diff --git a/be/src/common/config.h b/be/src/common/config.h index 271464bf08acb8..4529ad2e8508f9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -413,6 +413,12 @@ namespace config { // This configuration is used to recover compaction under the corner case. // If this configuration is set to true, block will seek position. CONF_Bool(block_seek_position, "false"); + + // the max client cache number per host + // There are a variety of client caches in BE, but currently we use the + // same cache size configuration. + // TODO(cmy): use different config to set different client cache if necessary. + CONF_Int32(max_client_cache_size_per_host, "10"); } // namespace config } // namespace doris diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index b607faaf170523..2cef9d8e16a83b 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -135,15 +135,24 @@ Status ClientCacheHelper::create_client( void ClientCacheHelper::release_client(void** client_key) { DCHECK(*client_key != NULL) << "Trying to release NULL client"; boost::lock_guard lock(_lock); - ClientMap::iterator i = _client_map.find(*client_key); - DCHECK(i != _client_map.end()); - ThriftClientImpl* info = i->second; - //VLOG_RPC << "releasing client for " - // << info->ipaddress() << ":" << info->port(); - ClientCacheMap::iterator j = - _client_cache.find(make_network_address(info->ipaddress(), info->port())); + ClientMap::iterator client_map_entry = _client_map.find(*client_key); + DCHECK(client_map_entry != _client_map.end()); + ThriftClientImpl* info = client_map_entry->second; + ClientCacheMap::iterator j = _client_cache.find(make_network_address(info->ipaddress(), info->port())); DCHECK(j != _client_cache.end()); - j->second.push_back(*client_key); + + if (_max_cache_size_per_host >=0 && j->second.size() >= _max_cache_size_per_host) { + // cache of this host is full, close this client connection and remove it from _client_map + 
info->close(); + _client_map.erase(*client_key); + delete info; + + if (_metrics_enabled) { + _opened_clients->increment(-1); + } + } else { + j->second.push_back(*client_key); + } if (_metrics_enabled) { _used_clients->increment(-1); @@ -165,7 +174,10 @@ void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) { BOOST_FOREACH(void * client_key, cache_entry->second) { ClientMap::iterator client_map_entry = _client_map.find(client_key); DCHECK(client_map_entry != _client_map.end()); - client_map_entry->second->close(); + ThriftClientImpl* info = client_map_entry->second; + info->close(); + _client_map.erase(client_key); + delete info; } } diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h index b5780c2043a557..84054ab521db5a 100644 --- a/be/src/runtime/client_cache.h +++ b/be/src/runtime/client_cache.h @@ -89,7 +89,11 @@ class ClientCacheHelper { private: template friend class ClientCache; // Private constructor so that only ClientCache can instantiate this class. 
- ClientCacheHelper() : _metrics_enabled(false) { } + ClientCacheHelper() : _metrics_enabled(false), _max_cache_size_per_host(-1) { } + + ClientCacheHelper(int max_cache_size_per_host): + _metrics_enabled(false), + _max_cache_size_per_host(max_cache_size_per_host) { } // Protects all member variables // TODO: have more fine-grained locks or use lock-free data structures, @@ -108,6 +112,9 @@ class ClientCacheHelper { // MetricRegistry bool _metrics_enabled; + // max connections per host in this cache, -1 means unlimited + int _max_cache_size_per_host; + // Number of clients 'checked-out' from the cache std::unique_ptr _used_clients; @@ -196,6 +203,12 @@ class ClientCache { boost::mem_fn(&ClientCache::make_client), this, _1, _2); } + ClientCache(int max_cache_size) : _client_cache_helper(max_cache_size) { + _client_factory = + boost::bind( + boost::mem_fn(&ClientCache::make_client), this, _1, _2); + } + // Close all clients connected to the supplied address, (e.g., in // case of failure) so that on their next use they will have to be // Reopen'ed. 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 5d7b09f75d497a..9d7e1bc301e936 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -88,7 +88,7 @@ class ExecEnv { MetricRegistry* metrics() const { return _metrics; } DataStreamMgr* stream_mgr() { return _stream_mgr; } ResultBufferMgr* result_mgr() { return _result_mgr; } - ClientCache* client_cache() { return _client_cache; } + ClientCache* client_cache() { return _backend_client_cache; } ClientCache* frontend_client_cache() { return _frontend_client_cache; } ClientCache* broker_client_cache() { return _broker_client_cache; } ClientCache* extdatasource_client_cache() { return _extdatasource_client_cache; } @@ -136,7 +136,7 @@ class ExecEnv { MetricRegistry* _metrics = nullptr; DataStreamMgr* _stream_mgr = nullptr; ResultBufferMgr* _result_mgr = nullptr; - ClientCache* _client_cache = nullptr; + ClientCache* _backend_client_cache = nullptr; ClientCache* _frontend_client_cache = nullptr; ClientCache* _broker_client_cache = nullptr; ClientCache* _extdatasource_client_cache = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index b20657b942cdea..21e78cd4941a92 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -71,10 +71,10 @@ Status ExecEnv::_init(const std::vector& store_paths) { _metrics = DorisMetrics::metrics(); _stream_mgr = new DataStreamMgr(); _result_mgr = new ResultBufferMgr(); - _client_cache = new BackendServiceClientCache(); - _frontend_client_cache = new FrontendServiceClientCache(); - _broker_client_cache = new BrokerServiceClientCache(); - _extdatasource_client_cache = new ExtDataSourceServiceClientCache(); + _backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host); + _frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host); + _broker_client_cache = new 
BrokerServiceClientCache(config::max_client_cache_size_per_host); + _extdatasource_client_cache = new ExtDataSourceServiceClientCache(config::max_client_cache_size_per_host); _mem_tracker = nullptr; _pool_mem_trackers = new PoolMemTrackerRegistry(); _thread_mgr = new ThreadResourceMgr(); @@ -100,7 +100,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { _stream_load_executor = new StreamLoadExecutor(this); _routine_load_task_executor = new RoutineLoadTaskExecutor(this); - _client_cache->init_metrics(DorisMetrics::metrics(), "backend"); + _backend_client_cache->init_metrics(DorisMetrics::metrics(), "backend"); _frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend"); _broker_client_cache->init_metrics(DorisMetrics::metrics(), "broker"); _extdatasource_client_cache->init_metrics(DorisMetrics::metrics(), "extdatasource"); @@ -209,7 +209,7 @@ void ExecEnv::_destory() { delete _broker_client_cache; delete _extdatasource_client_cache; delete _frontend_client_cache; - delete _client_cache; + delete _backend_client_cache; delete _result_mgr; delete _stream_mgr; delete _stream_load_executor; diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index af5c3cfbc5c56f..c16f0e098c1d27 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -667,8 +667,11 @@ private void cancelInternal() { private void cancelRemoteFragmentsAsync() { for (BackendExecState backendExecState : backendExecStates) { - LOG.warn("cancelRemoteFragments initiated={} done={} hasCanceled={}", - backendExecState.initiated, backendExecState.done, backendExecState.hasCanceled); + TNetworkAddress address = backendExecState.getBackendAddress(); + LOG.info("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}", + backendExecState.initiated, backendExecState.done, backendExecState.hasCanceled, + 
address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId())); + backendExecState.lock(); try { if (!backendExecState.initiated) { @@ -681,19 +684,15 @@ private void cancelRemoteFragmentsAsync() { if (backendExecState.hasCanceled) { continue; } - TNetworkAddress address = backendExecState.getBackendAddress(); - TNetworkAddress brcAddress = toBrpcHost(address); - - LOG.info("cancelRemoteFragments ip={} port={} rpcParams={}", address.hostname, address.port, - DebugUtil.printId(backendExecState.getFragmentInstanceId())); + TNetworkAddress brpcAddress = toBrpcHost(address); try { BackendServiceProxy.getInstance().cancelPlanFragmentAsync( - brcAddress, backendExecState.getFragmentInstanceId()); + brpcAddress, backendExecState.getFragmentInstanceId()); } catch (RpcException e) { LOG.warn("cancel plan fragment get a exception, address={}:{}", - brcAddress.getHostname(), brcAddress.getPort()); - SimpleScheduler.updateBlacklistBackends(addressToBackendID.get(brcAddress)); + brpcAddress.getHostname(), brpcAddress.getPort()); + SimpleScheduler.updateBlacklistBackends(addressToBackendID.get(brpcAddress)); } backendExecState.hasCanceled = true;