
Why not release the GIL on rd_kafka_assign and rd_kafka_new ? #1023

Closed
mkmoisen opened this issue Jan 29, 2021 · 8 comments

@mkmoisen
Contributor

In Consumer.c, the Consumer_assign function calls rd_kafka_assign without dropping the GIL. This function takes only C arguments, no PyObject* pointers. My assumption is that it results in IO to the broker, so I think we can drop the GIL here:

Py_BEGIN_ALLOW_THREADS
err = rd_kafka_assign(self->rk, NULL);
Py_END_ALLOW_THREADS

Likewise, the Consumer_init function calls rd_kafka_new. I'm not certain, but I think this also results in IO to the broker, so the GIL could be dropped here as well:

Py_BEGIN_ALLOW_THREADS
self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                            errstr, sizeof(errstr));
Py_END_ALLOW_THREADS  

I would assume there are more invocations of rd_kafka_* functions that result in IO and where we could drop the GIL.

Is there any reason in particular why the GIL isn't being dropped in these situations?


The reason I ask is that I recently started using a new broker with SSL. If I launch four consumer threads, each thread takes about 30 seconds to issue Consumer(conf), and it blocks all other threads (Kafka consumers, REST API threads, etc.) during this time.

In addition, once each consumer thread's on_assign callback is invoked, the successive call to c.assign(partitions) takes another 30 seconds and blocks all other threads in the application.

I hypothesize that these calls are not releasing the GIL, which is resulting in blocking of all the other threads in the application.

(Of course, it shouldn't be taking this long to call Consumer(conf) and c.assign(partitions) - there is probably some network issue in my new broker. But regardless, if the GIL was released, the other threads wouldn't be blocked.)
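One way to confirm the stall really is GIL-bound (and not just slowness elsewhere) is a watchdog thread that records the gaps between its own wake-ups: while a C call holds the GIL, no Python thread can run, so the gaps grow to the length of the call. This is a hedged sketch of my own; `gil_watchdog` and the tick interval are my names, not part of confluent-kafka, and `time.sleep` stands in for the `Consumer(conf)` / `c.assign(partitions)` call:

```python
import threading
import time

def gil_watchdog(stop, gaps, tick=0.05):
    """Record elapsed time between wake-ups; a gap far beyond `tick`
    means some call held the GIL for roughly that long."""
    last = time.monotonic()
    while not stop.is_set():
        time.sleep(tick)
        now = time.monotonic()
        gaps.append(now - last)
        last = now

stop = threading.Event()
gaps = []
t = threading.Thread(target=gil_watchdog, args=(stop, gaps), daemon=True)
t.start()

# In the real application this would be Consumer(conf) or c.assign(partitions).
# time.sleep releases the GIL, so here the watchdog keeps ticking normally.
time.sleep(0.5)

stop.set()
t.join()
print(max(gaps))
```

If the constructor held the GIL for 30 seconds, max(gaps) would approach 30 seconds instead of staying near the tick interval.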

@edenhill
Contributor

edenhill commented Feb 5, 2021

What librdkafka version are you on?

I'm guessing the new state synchronization in assign() (which was added to avoid race conditions in the app) may be to blame for this stall, in which case it makes sense to release the GIL for assign(), but I would really like a reproducer with full logging enabled to understand what is going on.

As for rd_kafka_new(), it will not block on any IO; it just waits for its threads to start, which should be near-instant and not require releasing the GIL.

@mkmoisen
Contributor Author

@edenhill I've reproduced this on 1.5 and 1.6.

Sorry, but do you know how I can fully enable the logs on the consumer side?

I do not have easy access to the broker logs unfortunately.

Thanks!

@edenhill
Contributor

edenhill commented Apr 6, 2021

Add "debug": "all,-fetch" to your consumer config
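As a concrete sketch of the setting edenhill describes, the debug property goes into the ordinary consumer config dict; the broker address and group id below are placeholders, not values from this issue's setup:

```python
# Hypothetical consumer config; bootstrap.servers and group.id are placeholders.
conf = {
    "bootstrap.servers": "hello.com:9092",
    "group.id": "my_group",
    "debug": "all,-fetch",  # all debug contexts except the noisy fetch logs
}
# c = confluent_kafka.Consumer(conf)  # debug lines are emitted to stderr by default
print(conf["debug"])
```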

@mkmoisen
Contributor Author

mkmoisen commented Apr 7, 2021

Hi @edenhill

I added debugging, and immediately noticed one issue: a call is made to refresh the kerberos ticket, and this call takes a very long time, 40-120 seconds.

For example:

%7|1617813262.244|SASLREFRESH|rdkafka#consumer-1| [thrd:app]: Refreshing Kerberos ticket with command: kinit -R -t "/opt/app-root/src/user.keytab" -k [email protected] || kinit -t "/opt/app-root/src/user.keytab" -k [email protected]
%7|1617813309.346|SASLREFRESH|rdkafka#consumer-1| [thrd:app]: Kerberos ticket refreshed in 47102ms

I also executed the same command directly on my unix environment, and it also takes about 40 seconds each time.

Would you happen to know what might cause this to be so slow?

In addition, it seems the kerberos ticket can be refreshed multiple times for a given consumer. In a 5 minute span I counted 4-5 refreshes per consumer. Do you know why the ticket needs to be refreshed so often?


Here are the logs just for Consumer(conf), for your reference:

Main thread at 16:34:22.2
0: before Consumer(conf) at 16:34:22.2
%7|1617813262.244|SASL|rdkafka#consumer-1| [thrd:app]: Selected provider Cyrus for SASL mechanism GSSAPI
%7|1617813262.244|SASLREFRESH|rdkafka#consumer-1| [thrd:app]: Refreshing Kerberos ticket with command: kinit -R -t "/opt/app-root/src/user.keytab" -k [email protected] || kinit -t "/opt/app-root/src/user.keytab" -k [email protected]
%7|1617813309.346|SASLREFRESH|rdkafka#consumer-1| [thrd:app]: Kerberos ticket refreshed in 47102ms
%7|1617813309.346|OPENSSL|rdkafka#consumer-1| [thrd:app]: librdkafka built with OpenSSL version 0x100020bf
%7|1617813309.346|SSL|rdkafka#consumer-1| [thrd:app]: Loading CA certificate(s) from file /opt/app-root/src/CARoot.pem
%7|1617813309.346|SSL|rdkafka#consumer-1| [thrd:app]: Loading public key from file /opt/app-root/src/my.server.certificate.pem
%7|1617813309.347|SSL|rdkafka#consumer-1| [thrd:app]: Loading private key file from /opt/app-root/src/key.pem
%7|1617813309.347|SSLPASSWD|rdkafka#consumer-1| [thrd:app]: Private key requires password
%7|1617813309.349|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "my_group": updating member id "(not-set)" -> ""
%7|1617813309.349|WAKEUPFD|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Enabled low-latency ops queue wake-ups
%7|1617813309.349|BROKER|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
%7|1617813309.349|BRKMAIN|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1617813309.349|WAKEUPFD|rdkafka#consumer-1| [thrd:app]: sasl_ssl://hello.com:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1617813309.349|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1617813309.349|BROKER|rdkafka#consumer-1| [thrd:app]: sasl_ssl://hello.com:9092/bootstrap: Added new broker with NodeId -1
%7|1617813309.349|BRKMAIN|rdkafka#consumer-1| [thrd:sasl_ssl://hello.com:9092/bootstrap]: sasl_ssl://hello.com:9092/bootstrap: Enter main broker thread
%7|1617813309.349|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.6.1 (0x10601ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS ZLIB SSL SASL_CYRUS HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xffbff)
%7|1617813309.349|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "my_group" changed state init -> query-coord (join-state init)
%7|1617813309.349|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1617813309.349|CONNECT|rdkafka#consumer-1| [thrd:main]: sasl_ssl://hello.com:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1617813309.349|CONF|rdkafka#consumer-1| [thrd:app]: Client configuration:
%7|1617813309.349|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "my_group": no broker available for coordinator query: intervaled in state query-coord
%7|1617813309.349|CONF|rdkafka#consumer-1| [thrd:app]:   client.software.name = confluent-kafka-python
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   client.software.version = 1.6.0-rdkafka-1.6.1
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   metadata.broker.list = hello.com:9092
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,interceptor,plugin,consumer,admin,eos,mock,assignor,conf
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   error_cb = 0x7f56be1ff8d0
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   opaque = 0x7f56bb3b5158
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   default_topic_conf = 0x7f56ac005b00
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   security.protocol = sasl_ssl
%7|1617813309.350|CONNECT|rdkafka#consumer-1| [thrd:sasl_ssl://hello.com:9092/bootstrap]: sasl_ssl://hello.com:9092/bootstrap: Received CONNECT op
%7|1617813309.350|STATE|rdkafka#consumer-1| [thrd:sasl_ssl://hello.com:9092/bootstrap]: sasl_ssl://hello.com:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1617813309.350|BROADCAST|rdkafka#consumer-1| [thrd:sasl_ssl://hello.com:9092/bootstrap]: Broadcasting state change
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   ssl.key.location = [redacted]
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   ssl.certificate.location = /opt/app-root/src/sbpsuml.server.certificate.pem
%7|1617813309.350|CONNECT|rdkafka#consumer-1| [thrd:sasl_ssl://hello.com:9092/bootstrap]: sasl_ssl://hello.com:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1617813309.350|STATE|rdkafka#consumer-1| [thrd:sasl_ssl://hello.com:9092/bootstrap]: sasl_ssl://hello.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   ssl.ca.location = /opt/app-root/src/CARoot.pem
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   sasl.mechanisms = GSSAPI
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   sasl.kerberos.service.name = kafka
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   sasl.kerberos.principal = [email protected]
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   sasl.kerberos.keytab = /opt/app-root/src/user.keytab
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   group.id = my_group
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   session.timeout.ms = 120000
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   max.poll.interval.ms = 300000
%7|1617813309.350|BROADCAST|rdkafka#consumer-1| [thrd:sasl_ssl://hello.com:9092/bootstrap]: Broadcasting state change
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   enable.auto.commit = false
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   isolation.level = read_committed
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   rebalance_cb = 0x7f56be202000
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   offset_commit_cb = 0x7f56be201e70
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]: Default topic configuration:
%7|1617813309.350|CONF|rdkafka#consumer-1| [thrd:app]:   auto.offset.reset = smallest
Main thread at 16:35:09.3
0: after Consumer(conf) at 16:35:09.3

@edenhill
Contributor

edenhill commented Apr 8, 2021

Would you happen to know what might cause this to be so slow?

Kerberos and me are like this: 💀
Seriously, I don't know, could it be DNS/reverse-DNS related?

I'll move the initial kinit refresh out from rd_kafka_new() to avoid this blocking behaviour. Good find 👍 !

edenhill added a commit to confluentinc/librdkafka that referenced this issue Apr 8, 2021
@edenhill
Contributor

edenhill commented Apr 8, 2021

Closing. Fix included in the next version.

@edenhill edenhill closed this as completed Apr 8, 2021
@mkmoisen
Contributor Author

mkmoisen commented Apr 8, 2021

Hi @edenhill thanks! I will test it out today.

I would have thought that calling kinit -t ... -k ... would create some ticket on the local system with some expiration date, and that confluent_kafka could check the expiration date to determine whether or not it needs to execute kinit. It seems like instead, confluent_kafka will always invoke kinit.

Would you please clarify how it works?
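An application-side workaround sketch for the check described above: the credential cache can be queried with `klist -s`, which exits 0 when a valid (non-expired) ticket exists, so the slow kinit would only run when actually needed. This is my own hedged sketch, not something confluent_kafka or librdkafka does; the helper name is invented:

```python
import subprocess

def have_valid_ticket():
    """Return True if the default credential cache holds a non-expired
    ticket, False if it does not or if klist is not installed."""
    try:
        # klist -s is silent and reports validity via its exit status.
        return subprocess.run(["klist", "-s"]).returncode == 0
    except FileNotFoundError:
        return False

# Only pay the slow kinit cost when the cached ticket has expired, e.g.:
# if not have_valid_ticket():
#     subprocess.run(["kinit", "-t", "/opt/app-root/src/user.keytab", "-k", principal])
print(have_valid_ticket())
```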

@edenhill
Contributor

edenhill commented Apr 9, 2021

It is not smart like that. librdkafka itself does not know what is going on with Kerberos; it simply calls into libsasl2/Cyrus SASL and shells out to the kinit command.

edenhill added a commit to confluentinc/librdkafka that referenced this issue Apr 14, 2021
edenhill added a commit to confluentinc/librdkafka that referenced this issue Apr 14, 2021