Add Accelerated Kafka datasource. #330
Conversation
streamz/sources.py
engine: str (None)
    If engine is "cudf", streamz reads data (messages must be JSON)
    from Kafka in an accelerated manner directly into cudf dataframes.
    Please refer to the API here:
    github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py
This link will be updated in the future.
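For context, here is roughly what "reading JSON messages directly into cudf dataframes" amounts to. This is an illustrative sketch only: the accelerated engine does the parsing through librdkafka in C++, but the resulting dataframe is equivalent to parsing the raw JSON messages yourself.

```python
# Illustrative only: the pure-Python equivalent of turning a small
# batch of JSON Kafka messages into a cudf dataframe on the GPU.
import cudf

messages = ['{"id": 1, "value": 10.5}', '{"id": 2, "value": 11.0}']
gdf = cudf.read_json("\n".join(messages), lines=True)  # one row per message
print(gdf)
```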
FYI: all the Dask tests with @gen_cluster are failing. Any ideas how to resolve this? I am trying to look at this: https://distributed.dask.org/en/latest/develop.html#writing-tests
Okay, so installing pytest-tornasync fixes the tests locally. Can we install it on Travis as well?
I don't see why not.
I added it to Travis, but tests are still failing. Can someone please take a look?
I see this got merged last week: dask/distributed#3706. But marking tests with async def should work. Maybe we need to move up from Python 3.6. Nope, tried it; still doesn't work: https://travis-ci.org/github/python-streamz/streamz/builds/678266572
streamz/tests/test_dask.py
@@ -15,7 +15,7 @@
 @gen_cluster(client=True)
-def test_map(c, s, a, b):
+async def test_map(c, s, a, b):
Why do we suddenly need the async keyword here when it wasn't needed before? I think the @gen_cluster decorator should make this function asynchronous, correct?
There's been an upstream change in Dask that's causing tests unrelated to this PR to fail. I've posted the relevant information and links in the comments.
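For reference, this is roughly the shape a test takes after that upstream change; a sketch assuming dask.distributed's test utilities, not code from this PR:

```python
# With newer dask.distributed, @gen_cluster expects the wrapped test
# to be a native coroutine, so plain "def" becomes "async def".
from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_increment(c, s, a, b):
    # c = client, s = scheduler, a/b = workers, all injected by gen_cluster
    future = c.submit(lambda x: x + 1, 10)
    result = await future
    assert result == 11
```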
streamz/sources.py
@@ -503,7 +507,7 @@ def checkpoint_emit(_part):
     try:
         low, high = self.consumer.get_watermark_offsets(
             tp, timeout=0.1)
-    except (RuntimeError, ck.KafkaException):
+    except (RuntimeError, ck.KafkaException, ValueError):
         continue
Do we need a sleep here to prevent spamming the broker with offset queries? Also, I wonder if we need logging here to explain the reason for the ValueError.
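Something along these lines is what's being suggested; a sketch against the diff above (inside the existing polling loop), with a placeholder backoff duration and log level:

```python
import logging
import time

logger = logging.getLogger(__name__)

try:
    low, high = self.consumer.get_watermark_offsets(tp, timeout=0.1)
except (RuntimeError, ck.KafkaException, ValueError) as e:
    # Record why the offsets could not be fetched, then back off
    # briefly so we don't spam the broker with offset queries.
    logger.warning("get_watermark_offsets failed for %s: %s", tp, e)
    time.sleep(0.2)
    continue
```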
conda/environments/streamz_dev.yml
@@ -6,6 +6,7 @@ channels:
 dependencies:
   - python>=3.7
   - pytest
+  - pytest-tornasync
Do we need to add this dependency because of the introduction of the async keyword?
.travis.yml
@@ -7,7 +7,7 @@ language: python
 matrix:
   include:
-    - python: 3.6
+    - python: 3.7
What does this fix?
@@ -529,8 +533,14 @@ def checkpoint_emit(_part):
    def start(self):
        import confluent_kafka as ck
        if self.engine == "cudf":
            from custreamz import kafka
Is this a complete replacement for ck? (Does it include TopicPartition?)
Yes, if you look at the API calls like commit, committed, and get_watermark_offsets, they're exactly the same as CK, including the TopicPartition class. We've done this deliberately to keep code duplication as minimal as possible.
OK, so why not do from custreamz import kafka as ck and remove the conditionals?
Oh, I see what you're saying. A little clarification here: the custreamz.kafka APIs are exactly the same as CK's in the sense that they take TopicPartition object(s), but the TopicPartition class itself does not exist in custreamz.kafka.
Deserialization happens inside these APIs, and there is no concept of a TopicPartition there, since it is a librdkafka C++ implementation underneath: https://github.com/jdye64/cudf/blob/7307cbe4f33a67c7d31a8d48955d755d55b03e9d/python/custreamz/custreamz/kafka.py#L72
Should these changes also be made in from_kafka?
There's no checkpointing in from_kafka.
@CJ-Wright Any idea how we can resolve this? Tests are passing locally.
It seems like this was maybe an effort to use native Python 3 features, now that Python 2 is no longer supported. Tornado stayed Python 2/3 compatible by not using these newer Python 3 keywords.
Codecov Report
@@ Coverage Diff @@
## master #330 +/- ##
==========================================
+ Coverage 94.79% 95.42% +0.63%
==========================================
Files 13 13
Lines 1671 1661 -10
==========================================
+ Hits 1584 1585 +1
+ Misses 87 76 -11
Continue to review full report at Codecov.
Okay, with the upstream Dask error fixed (dask/distributed#3738), all tests are now passing.
Clearly coverage suffers here, since we don't test for CUDA on Travis. Indeed, that means we take it on trust that this code works... Can you think of a way we can get around that?
Yes, coverage definitely takes a small hit here. All the APIs that serve as a replacement for CK are tested as part of the cudf/custreamz nightly builds on gpuCI (we get a new build every 2-3 hours, so this is pretty reliable). If those pass, then it is guaranteed that the individual APIs added as part of this PR work as expected. As for actually having a test for the accelerated engine itself, that requires a GPU, so it would have to live in gpuCI rather than Travis.
At the minimum, I would suggest adding a badge to the front page (README) linking to those nightly tests, describing this somewhere, and adding no-coverage markers to the GPU-specific branches.
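The no-coverage markers would look something like this, using coverage.py's pragma comment on the GPU-only branch (sketch):

```python
def start(self):
    import confluent_kafka as ck
    if self.engine == "cudf":  # pragma: no cover  (exercised only on gpuCI)
        from custreamz import kafka
```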
Done. Please have a look.
@@ -21,3 +21,5 @@ BSD-3 Clause
     :alt: Documentation Status
 .. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg
     :target: https://pypi.python.org/pypi/streamz/
+.. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green
+    :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py
This link will be updated in the future.
This will install all GPU dependencies, including streamz.

Please refer to the RAPIDS custreamz.kafka API here:
github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py
This link will be updated in the future.
@CJ-Wright, do you want another look?
LGTM, to the extent that I understand this.
OK, putting it in then - thank you.
Thanks @martindurant, @CJ-Wright
I will keep the links to the custreamz.kafka API updated once the code moves to its permanent home.
This PR adds an engine parameter to from_kafka_batched. If engine="cudf", messages (for now, only JSON is supported) will be read from Kafka directly into a cuDF dataframe.

custreamz.kafka has the exact same API as Confluent Kafka, so it serves as a drop-in replacement in from_kafka_batched with minimal duplication of code. Under the hood, however, it reads messages from librdkafka and uploads them directly to the GPU as a cuDF dataframe, instead of gathering all the messages back from C++ into Python. This avoids the GIL issue we encountered with the Confluent Kafka consumer, and hence enables reading from Kafka faster and with fewer processes. The accelerated reader also adheres to the current checkpointing mechanism.

Folks interested in trying out custreamz would benefit from this accelerated Kafka reader. Anyone who does not want to use GPUs can use streamz as is, with the default engine=None.

I am skipping the tests for this PR, since we will have gpuCI tests in cuDF/cuStreamz once things consolidate.
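A hedged usage sketch based on this description: apart from the new engine keyword, the call follows the existing from_kafka_batched signature, and the broker/topic names are placeholders.

```python
from streamz import Stream

consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "custreamz-demo",
    "auto.offset.reset": "latest",
}

# engine="cudf" reads JSON messages straight into cuDF dataframes;
# the default engine=None keeps the usual Python-object batches.
source = Stream.from_kafka_batched(
    "my-topic",
    consumer_conf,
    poll_interval="1s",
    engine="cudf",
)
source.map(len).sink(print)  # print the row count of each cuDF batch
source.start()
```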