Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements #93 - Generic Vert.x future support #94

Merged
merged 3 commits into from
Sep 3, 2021

Conversation

astubbs
Copy link
Contributor

@astubbs astubbs commented Mar 17, 2021

fixes #93

@astubbs astubbs linked an issue Mar 17, 2021 that may be closed by this pull request
@astubbs
Copy link
Contributor Author

astubbs commented Mar 17, 2021

@radoslawtwardy can you give me an example of the async io code you're using? I'd like something more concrete to use in my tests for this.

@radoslawtwardy
Copy link

So as I said i tried to wrote own ParallelEoSStreamProcessor (which inside call async-htpp-client and some other thinks), but also i tried to use VertxParallelStreamProcessor, so maybe so example with it:

    ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
            .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
            .maxConcurrency(10000) //I assume that processing mainly wait for IO
            .consumer(consumer)
            .build();

    VertxParallelStreamProcessor<String, String> vertxProcessor = VertxParallelStreamProcessor.createEosStreamProcessor(consumer, null, options);
    vertxProcessor.subscribe(of("test"));

    vertxProcessor.vertxHttpWebClient(
            (client, record) -> client.post(9000, "localhost", "/someEndpoint").sendJsonObject(new JsonObject("someBody")),
            resp -> resp.onSuccess(s -> logger.debug(s.statusMessage()))

This cause that i has 10000 threads !

I can show you own implementation of ParallelEoSStreamProcessor (which has same problem) - but i wrote it in scala (i'm scala developer)

@radoslawtwardy
Copy link

radoslawtwardy commented Mar 17, 2021

I've written some very simple code in Java - using ParallelEoSStreamProcessor to show you intention:

    //some simple and naive implementation of thread pool for IO operations - replace with some fork join pool
    ThreadPoolExecutor executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 32, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());

    KafkaConsumer<String, String> consumer = new KafkaConsumer(kafkaConsumerProps);
    ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
            .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
            .maxConcurrency(10000) //I assume that processing mainly wait for IO
            .consumer(consumer)
            .build();

    ParallelStreamProcessor<String, String> processor = ParallelStreamProcessor.createEosStreamProcessor(options);
    processor.subscribe(of("test"));

    processor.poll(record -> executor.execute(new Runnable() {
        @Override
        public void run() {
            //do some IO operations with record
        }
    }));

Of course using future is much better and elegant, but I think simplicity of this code show my case. I want delegate main work to custom thread pool which is designed for some kind of IO operations. It can be not only one thread pool (maybe few).
Your commit (if I understood) has good API, but not resolve my problem, that throughput are closely related with pool size of 'workerPool'

@astubbs
Copy link
Contributor Author

astubbs commented Mar 23, 2021

I don't understand - this PR adds future support for generic async functions to the vert.x module, which uses a small number of threads.

Is your fork published so I can see it? I am also a "Scala developer"? doesn't seem to be on this list: https://github.com/confluentinc/parallel-consumer/network/members.

You don't need to reimplement thread pools or a custom ParallelEoSStreamProcessor in order to use generic async operations - the vert.x module already does that for you (with this PR). It uses a thread pool by default of the number of cores as you've shown.

ParallelEoSStreamProcessor (which inside call async-htpp-client

You also don't need a custom implementation just to have async http requests - the vert.x http functions are already available in the vert.x module.

This cause that i has 10000 threads !

Are you saying that when you tried the http client in the vert.x module, you got 10,000 threads? That shouldn't happen, and if it did, please open an issue as it's a bug. You should get roughly the same number of threads as you have cores, unless it's overridden.

do some IO operations with record

Also if you could show some examples of the complex IO operations you're doing and the libraries used and what they return in the function that would really help as well.

@radoslawtwardy
Copy link

As you suggested i create issue #97

@radoslawtwardy
Copy link

Together with #98 this seems nice for me. Thanks!

@astubbs astubbs changed the title Implements #93 - Generic Vert.x future support Implements #93 - Generic Vert.x future and Java CompletableFuture support Mar 27, 2021
@astubbs astubbs force-pushed the master branch 6 times, most recently from 5c841c3 to bc85ba3 Compare March 30, 2021 10:25
@birdayz
Copy link
Contributor

birdayz commented Jun 6, 2021

hey! this is pretty interesting. sorry for being annoying, but are there any plans to finish this PR ?

@astubbs
Copy link
Contributor Author

astubbs commented Jun 7, 2021

Thanks @birdayz - yes! Just finishing #54 first - paying down some decently sized tech debt first before adding more features...

@astubbs astubbs marked this pull request as draft July 13, 2021 13:19
@astubbs
Copy link
Contributor Author

astubbs commented Jul 28, 2021

hey! this is pretty interesting. sorry for being annoying, but are there any plans to finish this PR ?

Yes. This is now supported in the Reactor module ( #116 ). The fixes for that, will enable this with Vert.x I think.

@astubbs astubbs self-assigned this Jul 28, 2021
@astubbs astubbs force-pushed the vertx-generic branch 3 times, most recently from 4da5bc4 to 291d61f Compare September 2, 2021 20:42
@astubbs astubbs marked this pull request as ready for review September 2, 2021 21:33
@astubbs astubbs merged commit ac66193 into confluentinc:master Sep 3, 2021
@astubbs astubbs deleted the vertx-generic branch September 3, 2021 09:55
@astubbs astubbs changed the title Implements #93 - Generic Vert.x future and Java CompletableFuture support Implements #93 - Generic Vert.x future support Sep 3, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for generic Vert.x execution types (i.e. other than http)
3 participants