Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Project Reactor #116

Closed
hartmut-co-uk opened this issue May 16, 2021 · 8 comments · Fixed by #146
Closed

Add support for Project Reactor #116

hartmut-co-uk opened this issue May 16, 2021 · 8 comments · Fixed by #146
Labels
draft-available enhancement New feature or request help wanted Extra attention is needed ver:0.4.0.0 wontfix This will not be worked on

Comments

@hartmut-co-uk
Copy link

Not explicitly mentioned on the Roadmap - but is support for Project Reactor planned also?

JavaRX and other streaming modules

Btw. congrats to the great work - I think this is going to fill a huge 'gap' for consumers / processing patterns.

@astubbs
Copy link
Contributor

astubbs commented Jul 27, 2021

Hey thanks a lot! Sorry long delay!

Are you actively using the project?

Reactor has been brought up - but the system is not CPU bound at all, so I'm not sure I see a point.

@astubbs astubbs added wontfix This will not be worked on enhancement New feature or request labels Jul 27, 2021
@astubbs astubbs closed this as completed Jul 27, 2021
@hartmut-co-uk
Copy link
Author

Hi @astubbs, I'm not yet (due to limitation to HTTP), but was planning to.

Reactor support wouldn't be about anything 'CPU bound' but simply as an alternative to Vert.x - for non-blocking IO.

As I understand currently only Vert.x HTTP is supported (?) - while I'd like to re-use already existing code to access my database and/or cache.

@astubbs astubbs reopened this Jul 27, 2021
@astubbs
Copy link
Contributor

astubbs commented Jul 27, 2021

Oh dang! Sorry @hartmut-co-uk - my bad. My brain translated that to Disrupter (https://lmax-exchange.github.io/disruptor/). Ah yes I remember Reactor now. - No plans to implement. But the implementation would be extremely similar to the vert.x module, and can reuse a lot of what the vert.x module does. I'd recommend forking the Vert.x class, and adapting to to Reactor, then taking out the common parts and making an abstract super between them.

Yes, currently vert.x module only supports http, generic interface work stalled due to lack of demand:
Add support for generic Vert.x execution types (i.e. other than http) #93

I have some in progress work to refactor the main class, but I'd stopped working on it:
issue: Remove non vertx operations from vertx implementation #99
pr: #133 Base class refactor

There is an issue with the queueing model for vert.x atm though, and will also have the same problem with Reactor I think: vert.x concurrency relies on WebClient only using a single host #113

Let me have a quick look...

@astubbs astubbs added the help wanted Extra attention is needed label Jul 27, 2021
@astubbs
Copy link
Contributor

astubbs commented Jul 27, 2021

@hartmut-co-uk can you provide a simple bit of code that demonstrates using reactor to do something async with a file or something you use it for?

@hartmut-co-uk
Copy link
Author

I'll first need to get my head around this again :-)
but maybe as a reference you could take a look at

Not sure if the parallelConsumer could provide a ~ Flux<ReceiverRecord<Integer, String>> similar to the linked SampleConsumer - or if it actually would need to be a Flux<GroupedFlux>.

🤔 or - since the parallelConsumer already comes with native support for the ordering mode maybe it could simply require a lambda returning a Mono<T> which the parallelConsumer subscribing to?

Something like

        // tag::example[]
        var resultStream = parallelConsumer.reactorStream(record -> {
            log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
            Map<String, String> params = UniMaps.of("recordKey", record.key(), "payload", record.value());
            return Mono.just(params); // here you would put your actual logic, could be anything e.g. Http/GRPC client request, database query, ..
        });
        // end::example[]

@astubbs
Copy link
Contributor

astubbs commented Jul 28, 2021

ok @hartmut-co-uk , well that was interesting! haha whoops. Check it out. It uses the same approach as Vert.x. the #subscribeOn should cause the user's Flux to execute in the thread pool - although I couldn't get subscribeOn to work correctly for me - only publishOn would get things in a different pool. Please test it out and see if it does what you think it should. There's a basic test that checks the processor runs the user' function and commits to records. I'm very keen to see a more interesting test that does something other than simply return a simply Mono. Perhaps you could hook up an http client? Cheers.

@hartmut-co-uk
Copy link
Author

👍 something like this?
#146 (review)

@astubbs
Copy link
Contributor

astubbs commented Jul 28, 2021

Ok, merging the prototype. Need help kicking the tires, writing docs and more tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
draft-available enhancement New feature or request help wanted Extra attention is needed ver:0.4.0.0 wontfix This will not be worked on
Projects
None yet
2 participants