Skip to content

Commit

Permalink
feat: support timeout for stopping sup pools
Browse files Browse the repository at this point in the history
  • Loading branch information
terry-xiaoyu committed Nov 26, 2024
1 parent d16e504 commit ceea46b
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/ecpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
, start_pool/3
, start_sup_pool/3
, stop_sup_pool/1
, stop_sup_pool/2
, get_client/1
, get_client/2
, pick_and_do/3
Expand Down Expand Up @@ -91,6 +92,9 @@ start_sup_pool(Pool, Mod, Opts) ->
%% Stop a supervised pool with the default stop options; simply delegates
%% to ecpool_sup:stop_pool/1.
stop_sup_pool(Pool) -> ecpool_sup:stop_pool(Pool).

%% @doc Stop a supervised pool with explicit stop options.
%% `Opts' is passed unchanged to `ecpool_sup:stop_pool/2', which reads the
%% optional `timeout' key (milliseconds or `infinity').
-spec(stop_sup_pool(term(), #{timeout => timeout()}) -> ok | {error, term()}).
stop_sup_pool(Pool, Opts) ->
    ecpool_sup:stop_pool(Pool, Opts).

%% @doc Get client/connection
-spec(get_client(pool_name()) -> get_client_ret()).
get_client(Pool) ->
Expand Down
79 changes: 73 additions & 6 deletions src/ecpool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
%% API
-export([ start_pool/3
, stop_pool/1
, stop_pool/2
, get_pool/1
]).

Expand All @@ -32,6 +33,9 @@
-export([init/1]).

-type pool_name() :: ecpool:pool_name().
%% Options accepted by stop_pool/2. `timeout' is in milliseconds; `infinity'
%% is also valid (and is the default), hence the built-in timeout() type.
-type stop_opts() :: #{timeout => timeout()}.

-define(STOP_TIMOEUT, infinity).

%% @doc Start supervisor.
-spec(start_link() -> {ok, pid()} | {error, term()}).
Expand All @@ -50,13 +54,30 @@ start_pool(Pool, Mod, Opts) ->
%% @doc Stop a pool using the default stop options (an empty option map).
-spec(stop_pool(Pool :: pool_name()) -> ok | {error, term()}).
stop_pool(Pool) ->
    DefaultOpts = #{},
    stop_pool(Pool, DefaultOpts).

%% @doc Stop a pool, bounding each request to the supervisor by the `timeout'
%% option (defaults to ?STOP_TIMOEUT when the key is absent).
%%
%% Returns `ok' once the child has been terminated and deleted,
%% `{error, not_found}' when the supervisor is not running (or went down
%% during the call), `{error, timeout}' when the final delete request also
%% timed out, or any `{error, Reason}' reported by the supervisor.
-spec(stop_pool(Pool :: pool_name(), stop_opts()) -> ok | {error, term()}).
stop_pool(Pool, Opts) ->
    ChildId = child_id(Pool),
    Timeout = maps:get(timeout, Opts, ?STOP_TIMOEUT),
    %% The supervisor is called through gen_server:call/3 directly so that a
    %% finite timeout can be supplied for the terminate request.
    try gen_server:call(?MODULE, {terminate_child, ChildId}, Timeout) of
        ok -> delete_child(ChildId, Timeout);
        {error, Reason} -> {error, Reason}
    catch
        exit:{R, _} when R == noproc; R == normal; R == shutdown ->
            {error, not_found};
        exit:{timeout, _} ->
            %% Sometimes the `ecpool_sup` does not respond to the terminate request
            %% because the `ecpool_pool_sup` process is stuck connecting. In that
            %% case we cancel the connection by force-killing it so that
            %% `ecpool_sup` is no longer blocked.
            _ = kill_ecpool_pool_sup_if_stuck(),
            %% Now the `ecpool_pool_sup` process can be in one of the following states:
            %%  - has been force killed, or
            %%  - has gone down by itself or by the `terminate_child` call, or
            %%  - is still running normally
            %% In any case we try to remove it from the child specs, as a
            %% `terminate_child` request has already been sent.
            delete_child(ChildId, Timeout)
    end.

%% @doc Get a pool.
-spec(get_pool(pool_name()) -> undefined | pid()).
Expand Down Expand Up @@ -90,3 +111,49 @@ pool_spec(Pool, Mod, Opts) ->

%% Build the child id used for a pool's entry in this supervisor.
child_id(Pool) ->
    {pool_sup, Pool}.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

%% Force-kill the `ecpool_pool_sup' process linked to this supervisor that is
%% blocked while starting, if there is one.
%%
%% Returns `ok' when a process was killed, otherwise the `{error, _}' tuple
%% explaining why nothing was killed.
kill_ecpool_pool_sup_if_stuck() ->
    %% whereis/1 returns `undefined' when the supervisor is not registered;
    %% passing that straight to process_info/2 would raise badarg.
    case whereis(?MODULE) of
        undefined ->
            {error, not_found};
        SupPid ->
            kill_stuck_linked_pool_sup(process_info(SupPid, links))
    end.

%% Act on the `links' info of the supervisor: `undefined' means it died
%% between the whereis/1 lookup and the process_info/2 call.
kill_stuck_linked_pool_sup(undefined) ->
    {error, not_found};
kill_stuck_linked_pool_sup({links, LinkedPids}) ->
    case search_ecpool_pool_sup_process(LinkedPids) of
        {ok, Pid} ->
            exit(Pid, kill),
            ok;
        Err ->
            Err
    end.

%% Find, among the given linked processes, an `ecpool_pool_sup' supervisor
%% that is blocked in `proc_lib:sync_start' (i.e. waiting for a child start).
%%
%% Returns `{ok, Pid}' for the first such process. Otherwise returns
%% `{error, not_stuck_in_start}' when at least one `ecpool_pool_sup' was seen
%% but none was stuck, or `{error, not_found}' when none was seen at all.
search_ecpool_pool_sup_process(Pids) ->
    search_ecpool_pool_sup_process(Pids, {error, not_found}).

%% `Miss' is the error to return if the list is exhausted without a hit.
search_ecpool_pool_sup_process([], Miss) ->
    Miss;
search_ecpool_pool_sup_process([Pid | Rest], Miss) ->
    case pool_sup_start_state(Pid) of
        stuck ->
            {ok, Pid};
        not_stuck ->
            %% Keep looking: another pool supervisor further down the list
            %% may be the one that is stuck.
            search_ecpool_pool_sup_process(Rest, {error, not_stuck_in_start});
        other ->
            search_ecpool_pool_sup_process(Rest, Miss)
    end.

%% Classify a linked item. Links may contain ports as well as pids, and
%% process_info/2 only accepts local pids, so anything else is `other'.
pool_sup_start_state(Pid) when is_pid(Pid), node(Pid) =:= node() ->
    case process_info(Pid, [dictionary, status, current_function]) of
        [{dictionary, Dict}, {status, Status}, {current_function, CurrFunc}] ->
            case proplists:get_value('$initial_call', Dict) of
                {supervisor, ecpool_pool_sup, _} ->
                    start_state(Status, CurrFunc);
                _ ->
                    other
            end;
        undefined ->
            %% The process has already exited.
            other
    end;
pool_sup_start_state(_PortOrRemotePid) ->
    other.

start_state(waiting, {proc_lib, sync_start, _}) -> stuck;
start_state(_Status, _CurrFunc) -> not_stuck.

%% Ask the supervisor to delete the child spec, waiting at most `Timeout'.
%% Exits of the call are mapped to error tuples: a timeout becomes
%% `{error, timeout}'; a missing or terminating supervisor becomes
%% `{error, not_found}'. Any other reply is returned unchanged.
delete_child(ChildId, Timeout) ->
    Request = {delete_child, ChildId},
    try
        gen_server:call(?MODULE, Request, Timeout)
    catch
        exit:{timeout, _} ->
            {error, timeout};
        exit:{noproc, _} ->
            {error, not_found};
        exit:{normal, _} ->
            {error, not_found};
        exit:{shutdown, _} ->
            {error, not_found}
    end.
14 changes: 14 additions & 0 deletions test/ecpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ groups() ->
[t_start_pool,
t_start_pool_any_name,
t_start_sup_pool,
t_start_sup_pool_timeout,
t_empty_pool,
t_empty_hash_pool,
t_restart_client,
Expand Down Expand Up @@ -113,6 +114,19 @@ t_start_sup_pool(_Config) ->
ecpool:stop_sup_pool(xpool),
?assertEqual([], ecpool_sup:pools()).

%% Stopping a pool whose start is still in progress must return within the
%% requested timeout rather than blocking until the start finishes.
t_start_sup_pool_timeout(_Config) ->
    Starter =
        fun() ->
            ?assertMatch({error, {killed, _}},
                         ecpool:start_sup_pool(timeout_pool, test_timeout_client, ?POOL_OPTS))
        end,
    spawn_link(Starter),
    %% Give the starter process time to issue the start request.
    timer:sleep(100),
    StopArgs = [timeout_pool, #{timeout => 200}],
    {ElapsedUs, StopResult} = timer:tc(ecpool, stop_sup_pool, StopArgs),
    %% timer:tc/3 reports microseconds; compare in milliseconds.
    ?assert(ElapsedUs/1000 < 500),
    %% The `ecpool:start_sup_pool/3` call had not completed before it was
    %% cancelled (killed), so `ecpool:stop_sup_pool/2` returns `{error, not_found}`.
    ?assertEqual({error, not_found}, StopResult),
    ?assertEqual([], ecpool_sup:pools()).

t_restart_client(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]),
Workers1 = ecpool:workers(?POOL),
Expand Down
26 changes: 26 additions & 0 deletions test/test_timeout_client.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(test_timeout_client).

-behaviour(ecpool_worker).

-export([connect/1]).

%% Test worker callback that simulates a slow connection: sleeps for the
%% `delay' option (5000 ms when absent) and then returns a freshly spawned,
%% linked process that exits immediately.
connect(Options) ->
    timer:sleep(proplists:get_value(delay, Options, 5000)),
    Worker = erlang:spawn_link(fun() -> ok end),
    {ok, Worker}.

0 comments on commit ceea46b

Please sign in to comment.