diff --git a/examples/http_loader.rb b/examples/http_loader.rb
index fea0a0f..a22c921 100644
--- a/examples/http_loader.rb
+++ b/examples/http_loader.rb
@@ -38,6 +38,8 @@
 # An example loader which is blocking and synchronous as a whole, but executes all of its operations concurrently.
 module Loaders
   class HTTPLoader < GraphQL::Batch::Loader
+    include GraphQL::Batch::Async
+
     def initialize(host:, size: 4, timeout: 4)
       super()
       @host = host
@@ -46,7 +48,7 @@ def initialize(host:, size: 4, timeout: 4)
       @futures = {}
     end
 
-    def perform_on_wait(operations)
+    def perform_early(operations)
       # This fans out and starts off all the concurrent work, which starts and
       # immediately returns Concurrent::Promises::Future` objects for each operation.
       operations.each do |operation|
@@ -55,9 +57,6 @@ def perform_on_wait(operations)
     end
 
     def perform(operations)
-      # Defer to let other non-async loaders run to completion first.
-      defer
-
       # Collect the futures (and possibly trigger any newly added ones)
       futures = operations.map do |operation|
         future(operation)
@@ -65,7 +64,7 @@ def perform(operations)
 
       # At this point, all of the concurrent work has been started.
 
-      # This converges back in, waiting on each concurrent future to finish, and fulfilling each
+      # Now it converges back in, waiting on each concurrent future to finish, and fulfilling each
       # (non-concurrent) Promise.rb promise.
       operations.each_with_index.each do |operation, index|
         fulfill(operation, futures[index].value!) # .value is a blocking call
diff --git a/lib/graphql/batch.rb b/lib/graphql/batch.rb
index 4e6d096..c77e1e5 100644
--- a/lib/graphql/batch.rb
+++ b/lib/graphql/batch.rb
@@ -38,5 +38,6 @@ def self.use(schema_defn, executor_class: GraphQL::Batch::Executor)
 
 require_relative "batch/version"
 require_relative "batch/loader"
+require_relative "batch/async"
 require_relative "batch/executor"
 require_relative "batch/setup_multiplex"
diff --git a/lib/graphql/batch/async.rb b/lib/graphql/batch/async.rb
new file mode 100644
index 0000000..0e4e330
--- /dev/null
+++ b/lib/graphql/batch/async.rb
@@ -0,0 +1,28 @@
+module GraphQL::Batch
+  module Async
+    def resolve
+      defer # Let other non-async loaders run to completion first.
+      @peek_queue_index = nil
+      super
+    end
+
+    def on_any_loader_wait
+      @peek_queue_index ||= 0
+      peek_queue = queue[@peek_queue_index..]
+      return if peek_queue.empty?
+      # Advance the cursor past everything just peeked, so each queued key is
+      # handed to perform_early exactly once. (Plain `=` would rewind the
+      # cursor once @peek_queue_index > 0 and re-trigger the async work.)
+      @peek_queue_index += peek_queue.size
+      perform_early(peek_queue)
+    end
+
+    def perform_early(keys)
+      raise NotImplementedError, "Implement GraphQL::Batch::Async#perform_early to trigger async operations early"
+    end
+
+    def perform(keys)
+      raise NotImplementedError, "Implement GraphQL::Batch::Async#perform to wait on the async operations"
+    end
+  end
+end
diff --git a/lib/graphql/batch/executor.rb b/lib/graphql/batch/executor.rb
index 8ca5cb2..b51deb6 100644
--- a/lib/graphql/batch/executor.rb
+++ b/lib/graphql/batch/executor.rb
@@ -53,8 +53,11 @@ def resolve(loader)
       @loading = was_loading
     end
 
-    def defer(_loader)
-      while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred })
+    # Defer the resolution of the current loader, allowing other loaders to be resolved first.
+    # This is useful when the current loader has kicked off async or concurrent work, and doesn't need to
+    # block execution of the current thread until later.
+    def defer_to_other_loaders
+      while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred && !loader.resolved? })
         resolve(non_deferred_loader)
       end
     end
@@ -62,7 +65,7 @@ def defer(_loader)
     def on_wait
       # FIXME: Better name?
       @loaders.each do |_, loader|
-        loader.on_any_wait
+        loader.on_any_loader_wait
       end
     end
 
diff --git a/lib/graphql/batch/loader.rb b/lib/graphql/batch/loader.rb
index 3f7022b..6c3c72e 100644
--- a/lib/graphql/batch/loader.rb
+++ b/lib/graphql/batch/loader.rb
@@ -67,12 +67,11 @@ def prime(key, value)
       cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
     end
 
-    def on_any_wait
-      return if resolved?
-      load_keys = queue # "Peek" the queue, but don't consume it.
-      # TODO: Should we have a "peek queue" / "async queue", that we can consume here, to prevent
-      # duplicate calls to perform_on_wait? (perform_on_wait should be idempotent anyway, but...)
-      perform_on_wait(load_keys)
+    # Called when any GraphQL::Batch::Loader starts waiting. May be called more than once per loader, if
+    # the loader is waiting multiple times. Will not be called once per promise.
+    #
+    # Use GraphQL::Batch::Async for the common way to use this.
+    def on_any_loader_wait
     end
 
     def resolve # :nodoc:
@@ -136,36 +135,6 @@ def fulfilled?(key)
       promise.pending? && promise.source != self
     end
 
-    def perform_on_wait(keys)
-      # FIXME: Better name?
-      # Interface to add custom code to e.g. trigger async operations when any loader starts waiting.
-      # Example:
-      #
-      # def initialize
-      #   super()
-      #   @futures = {}
-      # end
-      #
-      # def perform_on_wait(keys)
-      #   keys.each do |key|
-      #     future(key)
-      #   end
-      # end
-      #
-      # def perform(keys)
-      #   defer # let other non-async loaders run to completion first.
-      #   keys.each do |key|
-      #     future(key).value!
-      #   end
-      # end
-      #
-      # def future(key)
-      #   @futures[key] ||= Concurrent::Promises.future do
-      #     # Perform the async operation
-      #   end
-      # end
-    end
-
     # Must override to load the keys and call #fulfill for each key
     def perform(keys)
       raise NotImplementedError
@@ -188,7 +157,7 @@ def finish_resolve(key)
 
     def defer
       @deferred = true
-      executor.defer(self)
+      executor.defer_to_other_loaders
     ensure
       @deferred = false
     end