diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 26d36f9e..3928db6c 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -52,6 +52,7 @@ -export([start_link/1, submit/1, submit/2, submit/3, submit_async/1, submit_async/2, + dispatch_sync/1, dispatch_sync/2, ready/2, idle/2, default_pool/0]). @@ -68,6 +69,7 @@ -spec submit(fun (() -> A) | mfargs(), 'reuse' | 'single') -> A. -spec submit(atom(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A. -spec submit_async(fun (() -> any()) | mfargs()) -> 'ok'. +-spec dispatch_sync(fun(() -> any()) | mfargs()) -> 'ok'. -spec ready(atom(), pid()) -> 'ok'. -spec idle(atom(), pid()) -> 'ok'. -spec default_pool() -> atom(). @@ -103,6 +105,13 @@ submit_async(Fun) -> submit_async(?DEFAULT_POOL, Fun). submit_async(Server, Fun) -> gen_server2:cast(Server, {run_async, Fun}). +dispatch_sync(Fun) -> + dispatch_sync(?DEFAULT_POOL, Fun). + +dispatch_sync(Server, Fun) -> + Pid = gen_server2:call(Server, {next_free, self()}, infinity), + worker_pool_worker:submit_async(Pid, Fun). + ready(Server, WPid) -> gen_server2:cast(Server, {ready, WPid}). idle(Server, WPid) -> gen_server2:cast(Server, {idle, WPid}). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 2718967d..09b5868a 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -60,7 +60,7 @@ submit(Pid, Fun, ProcessModel) -> gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity). submit_async(Pid, Fun) -> - gen_server2:cast(Pid, {submit_async, Fun}). + gen_server2:cast(Pid, {submit_async, Fun, self()}). set_maximum_since_use(Pid, Age) -> gen_server2:cast(Pid, {set_maximum_since_use, Age}). @@ -118,7 +118,13 @@ handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) -> ok = worker_pool:idle(get(worker_pool_name), self()), {noreply, undefined, hibernate}; -handle_cast({submit_async, Fun}, undefined) -> +handle_cast({submit_async, Fun, _CPid}, undefined) -> + run(Fun), + ok = worker_pool:idle(get(worker_pool_name), self()), + {noreply, undefined, hibernate}; + +handle_cast({submit_async, Fun, CPid}, {from, CPid, MRef}) -> + erlang:demonitor(MRef), run(Fun), ok = worker_pool:idle(get(worker_pool_name), self()), {noreply, undefined, hibernate}; diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl new file mode 100644 index 00000000..75a31aa8 --- /dev/null +++ b/test/worker_pool_SUITE.erl @@ -0,0 +1,230 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(worker_pool_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(POOL_SIZE, 1). +-define(POOL_NAME, test_pool). + +all() -> + [ + run_code_synchronously, + run_code_asynchronously, + set_timeout, + cancel_timeout, + cancel_timeout_by_setting, + dispatch_async_blocks_until_task_begins + ]. + +init_per_testcase(_, Config) -> + {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME), + rabbit_ct_helpers:set_config(Config, [{pool_sup, Pool}]). + +end_per_testcase(_, Config) -> + Pool = ?config(pool_sup, Config), + unlink(Pool), + exit(Pool, kill). + +run_code_synchronously(_) -> + Self = self(), + Test = make_ref(), + Sleep = 200, + {Time, Result} = timer:tc(fun() -> + worker_pool:submit(?POOL_NAME, + fun() -> + timer:sleep(Sleep), + Self ! {hi, Test}, + self() + end, + reuse) + end), + % Worker run synchronously + true = Time > Sleep, + % Worker have sent message + receive {hi, Test} -> ok + after 0 -> error(no_message_from_worker) + end, + % Worker is a separate process + true = (Self /= Result). + +run_code_asynchronously(_) -> + Self = self(), + Test = make_ref(), + Sleep = 200, + {Time, Result} = timer:tc(fun() -> + worker_pool:submit_async(?POOL_NAME, + fun() -> + timer:sleep(Sleep), + Self ! {hi, Test}, + self() + end) + end), + % Worker run synchronously + true = Time < Sleep, + % Worker have sent message + receive {hi, Test} -> ok + after Sleep + 100 -> error(no_message_from_worker) + end, + % Worker is a separate process + true = (Self /= Result). + +set_timeout(_) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + timer:sleep(1000), + + receive {hello, Worker, Test} -> ok + after 1000 -> exit(timeout_is_late) + end. + + +cancel_timeout(_) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + worker_pool_worker:next_job_from(Worker, Self), + Worker = worker_pool_worker:submit(Worker, + fun() -> + worker_pool_worker:clear_timeout(my_timeout), + Worker + end, + reuse), + + timer:sleep(1000), + receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled) + after 0 -> ok + end. + +cancel_timeout_by_setting(_) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + worker_pool_worker:next_job_from(Worker, Self), + Worker = worker_pool_worker:submit(Worker, + fun() -> + worker_pool_worker:set_timeout(my_timeout, 1000, + fun() -> + Self ! {hello_reset, self(), Test} + end), + Worker + end, + reuse), + + timer:sleep(1000), + receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled) + after 0 -> ok + end, + + receive {hello_reset, Worker, Test} -> ok + after 1000 -> exit(timeout_is_late) + end. + +dispatch_async_blocks_until_task_begins(_) -> + Self = self(), + + Waiter = fun() -> + Self ! {register, self()}, + receive + go -> ok + end + end, + + ok = worker_pool:dispatch_sync(?POOL_NAME, Waiter), + SomeWorker = receive + {register, WPid} -> WPid + after 250 -> + none + end, + ?assert(is_process_alive(SomeWorker), "Dispatched tasks should be running"), + Pid = spawn( + fun() -> + ok = worker_pool:dispatch_sync(?POOL_NAME, + Waiter), + Self ! done_waiting, + exit(normal) + end), + DidWait = receive + done_waiting -> + false + after 250 -> + true + end, + ?assert(DidWait, "dispatch_sync should block until there is a free worker"), + SomeWorker ! go, + DidFinish = receive + done_waiting -> + true + after 250 -> + false + end, + ?assert(DidFinish, "appearance of a free worker should unblock the dispatcher").