Can you consume without using a consumer group? #250

Closed
zhaojack opened this issue Sep 22, 2017 · 15 comments

@zhaojack

Is there a way to use confluent-kafka-python to consume without setting group.id?

The platform documentation says "You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka" and doesn't hint that anything is different about the Python API (or maybe I just missed it). In the Python API I see I can call consumer.assign. I tried that, and it gave me an _UNKNOWN_GROUP error.

@edenhill
Contributor

While the group.id is technically not required from a Kafka standpoint until you want to commit offsets, this client implementation requires the group.id to be set.
But if you are not going to commit/retrieve offsets and only use the assign() API you can set the group.id to anything.

@zhaojack
Author

So, if I set group.id to say, testgroup12345, as long as I don't touch the .subscribe or .commit functions, the server would never hear testgroup12345 so I wouldn't need to worry about littering the overall list of consumer groups with random test groups. Am I understanding this right?

Thanks for the quick answer by the way.

@edenhill
Contributor

Exactly, with two additions:

  • set enable.auto.commit to False
  • set the TopicPartition .offset to OFFSET_BEGINNING, OFFSET_END or an absolute offset to start consuming from; the default is to use the committed offset.
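
Put together, the setup described in this comment might look like the minimal sketch below. The broker address, the `consume_some` helper name and the `max_idle_polls` cutoff are assumptions made for illustration, not part of the client API; the group.id is the placeholder from the question. The import sits inside the helper only so that the config can be inspected without the client installed.

```python
# The group.id is required by the client but is not used for assign()-only consumption.
ASSIGN_ONLY_CONFIG = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address (assumption)
    "group.id": "testgroup12345",
    "enable.auto.commit": False,
}


def consume_some(topic, partition_ids, max_messages=5, max_idle_polls=10,
                 config=ASSIGN_ONLY_CONFIG):
    """Read up to max_messages from the given partitions, starting at the beginning."""
    # Imported here so the config above can be inspected without the client installed.
    from confluent_kafka import OFFSET_BEGINNING, Consumer, KafkaException, TopicPartition

    consumer = Consumer(config)
    # Explicit start offsets: without them the consumer falls back to the committed offset.
    consumer.assign([TopicPartition(topic, p, OFFSET_BEGINNING) for p in partition_ids])

    messages = []
    idle_polls = 0
    try:
        while len(messages) < max_messages and idle_polls < max_idle_polls:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                idle_polls += 1  # nothing arrived within the timeout
                continue
            if msg.error():
                raise KafkaException(msg.error())
            idle_polls = 0
            messages.append(msg)
    finally:
        consumer.close()
    return messages
```

Calling `consume_some("my_topic", [0])` would then read from partition 0 of a hypothetical topic without ever calling subscribe() or commit().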

@zhaojack
Author

Awesome. I will give that a try. Thank you!

@zhaojack
Author

Behavior confirmed. Closing.

@swenzel

swenzel commented Aug 27, 2020

I'm afraid this will not bypass all the consumer group mechanisms. If you have ACLs enabled and your Kafka user is not allowed to create consumer groups with the given id, you cannot consume:

%7|1598524567.315|CGRPQUERY|rdkafka#consumer-9| [thrd:main]: sasl_plaintext://broker03:9093/6: Group "swenzelgroup": querying for coordinator: intervaled in state query-coord
%7|1598524567.315|CGRPSTATE|rdkafka#consumer-9| [thrd:main]: Group "swenzelgroup" changed state query-coord -> wait-coord (v3, join-state assigned)
%7|1598524567.315|BROADCAST|rdkafka#consumer-9| [thrd:main]: Broadcasting state change
%7|1598524567.315|SEND|rdkafka#consumer-9| [thrd:sasl_plaintext://broker03:]: sasl_plaintext://broker03:9093/6: Sent FindCoordinatorRequest (v2, 36 bytes @ 0, CorrId 9)
%7|1598524567.377|RECV|rdkafka#consumer-9| [thrd:sasl_plaintext://broker03:]: sasl_plaintext://broker03:9093/6: Received FindCoordinatorResponse (v2, 18 bytes, CorrId 9, rtt 61.97ms)
%7|1598524567.377|CGRPCOORD|rdkafka#consumer-9| [thrd:main]: sasl_plaintext://broker03:9093/6: Group "swenzelgroup" FindCoordinator response error: GROUP_AUTHORIZATION_FAILED: Broker: Group authorization failed
%7|1598524567.377|CGRPSTATE|rdkafka#consumer-9| [thrd:main]: Group "swenzelgroup" changed state wait-coord -> query-coord (v3, join-state assigned)
%7|1598524567.377|BROADCAST|rdkafka#consumer-9| [thrd:main]: Broadcasting state change
%7|1598524568.314|CGRPQUERY|rdkafka#consumer-9| [thrd:main]: sasl_plaintext://broker03:9093/6: Group "swenzelgroup": querying for coordinator: intervaled in state query-coord
%7|1598524568.314|CGRPSTATE|rdkafka#consumer-9| [thrd:main]: Group "swenzelgroup" changed state query-coord -> wait-coord (v3, join-state assigned)
...

@deeTEEcee

deeTEEcee commented Dec 15, 2020

With the latest version (1.5.0), I followed #250 (comment) but I still get errors requiring the group to be set up when I run consumer.poll.

@edenhill
Contributor

edenhill commented Dec 15, 2020

@deeTEEcee You can, but this client still requires you to supply a group.id. It won't be used unless you call subscribe(), committed() or commit(), so if you're not using any of those functions you can simply generate a unique group.id (e.g. using uuid4()).
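
Generating a unique group.id as suggested could be sketched like this. The helper name `throwaway_group_config`, the "unused-" prefix and the broker address are assumptions made for illustration; enable.auto.commit is set to False as recommended earlier in the thread.

```python
import uuid


def throwaway_group_config(bootstrap_servers):
    """Build a consumer config with a unique group.id that is never meant to be used."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # The "unused-" prefix is an arbitrary choice; only uniqueness matters here.
        "group.id": f"unused-{uuid.uuid4()}",
        "enable.auto.commit": False,
    }


config = throwaway_group_config("localhost:9092")  # placeholder broker address
print(config["group.id"])
```

Each call produces a different group.id, so two processes built this way would not share a group even if one of them later called subscribe() by mistake.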

@deeTEEcee

deeTEEcee commented Dec 15, 2020

Sorry, to be more specific: I get the error even if I add the group.id and disable auto-commit (enable.auto.commit).

Current source code, to make things easier to see:

import os
import sys

from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

# rawq_brokers and my_topic are defined elsewhere in the script (not shown here).
config = {
    "bootstrap.servers": rawq_brokers or os.environ.get("RAWQ_BROKERS"),  # e.g: rawq-prod-0.rawq.sightmachine.io:9093
    "sasl.username": os.environ.get("RAWQ_USERNAME"),
    "sasl.password": os.environ.get("RAWQ_PASSWORD"),
    "sasl.mechanisms": "SCRAM-SHA-256",
    "security.protocol": "SASL_SSL",
    "group.id": 'random',
    "auto.offset.reset": 'earliest',
    "enable.auto.commit": False,  # we have to manually maintain this unless we add groups in the ACL.
    # "ssl.ca.location": "/etc/ssl/certs"
}

def basic_consume_loop(consumer):
    records = []
    try:
        # Stop once five messages have been collected.
        while len(records) < 5:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                records.append(msg)
                # msg_process(msg)
    finally:
        # Close down the consumer (auto commit is disabled, so no offsets are committed).
        consumer.close()
    print(records)



consumer = Consumer(config)
topics = consumer.list_topics().topics
# Assign every partition of the topic explicitly, starting from offset 0.
partitions = [TopicPartition(my_topic, partition=partition, offset=0) for partition in topics[my_topic].partitions]
consumer.assign(partitions)

basic_consume_loop(consumer)

@edenhill
Contributor

What errors are you seeing?

@deeTEEcee

/Users/dtcsight/.pyenv/versions/ftx/bin/python /opt/sightmachine/factorytx-core/scripts/rawq/confluent_kafka-test.py
Traceback (most recent call last):
  File "/opt/sightmachine/factorytx-core/scripts/rawq/confluent_kafka-test.py", line 74, in <module>
    basic_consume_loop(consumer)
  File "/opt/sightmachine/factorytx-core/scripts/rawq/confluent_kafka-test.py", line 56, in basic_consume_loop
    raise KafkaException(msg.error())
cimpl.KafkaException: KafkaError{code=GROUP_AUTHORIZATION_FAILED,val=30,str="FindCoordinator response error: Not authorized to access group: Group authorization failed."}

@edenhill
Contributor

Ah, right, we still look up the coordinator to be able to commit offsets, and that lookup will fail in this case.

Is it necessary to have this group acl in place?
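
Until that lookup can be avoided, a consume loop could at least tell this failure apart from other errors. The sketch below is one hedged way to do that: the helper name is hypothetical, and the numeric code is taken from the "val=30" in the traceback above rather than imported from the client, so the snippet runs without it.

```python
GROUP_AUTHORIZATION_FAILED = 30  # numeric code reported as "val=30" in the traceback above


def is_group_authorization_failure(error):
    """Return True if a message error is the group ACL failure discussed here.

    `error` is whatever msg.error() returned: None, or an object with a code() method.
    """
    return error is not None and error.code() == GROUP_AUTHORIZATION_FAILED
```

Inside a poll loop this could be used as `if is_group_authorization_failure(msg.error()): ...` to log a clearer hint about the missing group ACL before raising.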

@deeTEEcee

Not sure if that was a question for me but yeah, I do need the group acl activated for this. Otherwise, it breaks.

@deeTEEcee

deeTEEcee commented Sep 16, 2021

@edenhill I just learned today we also have this ticket which describes a very similar situation: confluentinc/librdkafka#3261

My team also uses a Java client for Kafka, and we were pretty confused about why group.id was needed on the Python consumer side when the same calls work in Java WITHOUT a group.id. But I think I've made enough attempts to say "hey, we're stuck with group.id for now".

@ethanttbui

ethanttbui commented Oct 11, 2024

@edenhill May I ask if there is any update on this topic? I tested the behavior mentioned by @deeTEEcee myself and found that the ACL error does not occur for me.
I am using confluent-kafka==2.5.3.
