Skip to content

Commit

Permalink
fix naming memory leak (alibaba#10606)
Browse files Browse the repository at this point in the history
* fix  naming memory leak

* fix test case

* fix test case
  • Loading branch information
shiyiyue1102 authored and wukong121 committed Aug 3, 2023
1 parent 579940a commit 3aecd68
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,15 @@ public static CredentialService getInstance() {

public static CredentialService getInstance(String appName) {
String key = appName != null ? appName : IdentifyConstants.NO_APP_NAME;
CredentialService instance = INSTANCES.get(key);
if (instance != null) {
return instance;
}
instance = new CredentialService(appName);
CredentialService previous = INSTANCES.putIfAbsent(key, instance);
if (previous != null) {
instance = previous;
if (INSTANCES.get(key) == null) {
synchronized (CredentialService.INSTANCES) {
if (INSTANCES.get(key) == null) {
CredentialService instance = new CredentialService(appName);
INSTANCES.putIfAbsent(key, instance);
}
}
}
return instance;
return INSTANCES.get(key);
}

public static CredentialService freeInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,5 +466,7 @@ public String getServerStatus() {
public void shutDown() throws NacosException {
serviceInfoHolder.shutdown();
clientProxy.shutdown();
NotifyCenter.deregisterSubscriber(changeNotifier);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, Se
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
labels.put(Constants.APPNAME, AppNameUtils.getAppName());
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels, RpcClientTlsConfig.properties(properties.asProperties()));
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
RpcClientTlsConfig.properties(properties.asProperties()));
this.redoService = new NamingGrpcRedoService(this);
NAMING_LOGGER.info("Create naming rpc client for uuid->{}", uuid);
start(serverListFactory, serviceInfoHolder);
}

Expand Down Expand Up @@ -376,8 +378,21 @@ private <T extends Response> T requestToServer(AbstractNamingRequest request, Cl

@Override
public void shutdown() throws NacosException {
rpcClient.shutdown();
NAMING_LOGGER.info("Shutdown naming grpc client proxy for uuid->{}", uuid);
redoService.shutdown();
shutDownAndRemove(uuid);
NotifyCenter.deregisterSubscriber(this);
}

private void shutDownAndRemove(String uuid) {
synchronized (RpcClientFactory.getAllClientEntries()) {
try {
RpcClientFactory.destroyClient(uuid);
NAMING_LOGGER.info("shutdown and remove naming rpc client for uuid ->{}", uuid);
} catch (NacosException e) {
NAMING_LOGGER.warn("Fail to shutdown naming rpc client for uuid ->{}", uuid);
}
}
}

public boolean isEnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.client.naming.event.InstancesChangeNotifier;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
Expand All @@ -46,6 +47,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -60,6 +62,8 @@ public class NacosNamingServiceTest {

private NamingClientProxy proxy;

private String uuid;

private InstancesChangeNotifier changeNotifier;

@Before
Expand All @@ -68,13 +72,16 @@ public void before() throws NoSuchFieldException, NacosException, IllegalAccessE
prop.setProperty("serverAddr", "localhost");
prop.put(PropertyKeyConst.NAMESPACE, "test");
client = new NacosNamingService(prop);

// inject proxy
proxy = mock(NamingHttpClientProxy.class);
Field serverProxyField = NacosNamingService.class.getDeclaredField("clientProxy");
serverProxyField.setAccessible(true);
serverProxyField.set(client, proxy);

// inject notifier
changeNotifier = mock(InstancesChangeNotifier.class);
doReturn(InstancesChangeEvent.class).when(changeNotifier).subscribeType();
Field changeNotifierField = NacosNamingService.class.getDeclaredField("changeNotifier");
changeNotifierField.setAccessible(true);
changeNotifierField.set(client, changeNotifier);
Expand All @@ -89,11 +96,10 @@ public void testRegisterInstance1() throws NacosException {
//when
client.registerInstance(serviceName, ip, port);
//then
verify(proxy, times(1))
.registerService(eq(serviceName), eq(Constants.DEFAULT_GROUP), argThat(
instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(Constants.DEFAULT_CLUSTER_NAME)));
verify(proxy, times(1)).registerService(eq(serviceName), eq(Constants.DEFAULT_GROUP),
argThat(instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(Constants.DEFAULT_CLUSTER_NAME)));
}

@Test
Expand All @@ -111,9 +117,8 @@ public void testBatchRegisterInstance() throws NacosException {
//when
client.batchRegisterInstance(serviceName, Constants.DEFAULT_GROUP, instanceList);
//then
verify(proxy, times(1))
.batchRegisterService(eq(serviceName), eq(Constants.DEFAULT_GROUP), argThat(
instances -> CollectionUtils.isEqualCollection(instanceList, instances)));
verify(proxy, times(1)).batchRegisterService(eq(serviceName), eq(Constants.DEFAULT_GROUP),
argThat(instances -> CollectionUtils.isEqualCollection(instanceList, instances)));
}

@Test
Expand Down Expand Up @@ -147,11 +152,10 @@ public void testRegisterInstance2() throws NacosException {
//when
client.registerInstance(serviceName, groupName, ip, port);
//then
verify(proxy, times(1))
.registerService(eq(serviceName), eq(groupName), argThat(
instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(Constants.DEFAULT_CLUSTER_NAME)));
verify(proxy, times(1)).registerService(eq(serviceName), eq(groupName),
argThat(instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(Constants.DEFAULT_CLUSTER_NAME)));
}

@Test
Expand All @@ -164,11 +168,10 @@ public void testRegisterInstance3() throws NacosException {
//when
client.registerInstance(serviceName, ip, port, clusterName);
//then
verify(proxy, times(1))
.registerService(eq(serviceName), eq(Constants.DEFAULT_GROUP), argThat(
instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(clusterName)));
verify(proxy, times(1)).registerService(eq(serviceName), eq(Constants.DEFAULT_GROUP),
argThat(instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(clusterName)));
}

@Test
Expand All @@ -182,11 +185,10 @@ public void testRegisterInstance4() throws NacosException {
//when
client.registerInstance(serviceName, groupName, ip, port, clusterName);
//then
verify(proxy, times(1))
.registerService(eq(serviceName), eq(groupName), argThat(
instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(clusterName)));
verify(proxy, times(1)).registerService(eq(serviceName), eq(groupName),
argThat(instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(clusterName)));
}

@Test
Expand Down Expand Up @@ -252,11 +254,10 @@ public void testDeregisterInstance2() throws NacosException {
//when
client.deregisterInstance(serviceName, groupName, ip, port);
//then
verify(proxy, times(1))
.deregisterService(eq(serviceName), eq(groupName), argThat(
instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(Constants.DEFAULT_CLUSTER_NAME)));
verify(proxy, times(1)).deregisterService(eq(serviceName), eq(groupName),
argThat(instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(Constants.DEFAULT_CLUSTER_NAME)));
}

@Test
Expand Down Expand Up @@ -286,11 +287,10 @@ public void testDeregisterInstance4() throws NacosException {
//when
client.deregisterInstance(serviceName, groupName, ip, port, clusterName);
//then
verify(proxy, times(1))
.deregisterService(eq(serviceName), eq(groupName), argThat(
instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(clusterName)));
verify(proxy, times(1)).deregisterService(eq(serviceName), eq(groupName),
argThat(instance -> instance.getIp().equals(ip) && instance.getPort() == port
&& Math.abs(instance.getWeight() - 1.0) < 0.01f && instance.getClusterName()
.equals(clusterName)));
}

@Test
Expand Down Expand Up @@ -391,8 +391,8 @@ public void testGetAllInstances7() throws NacosException {
//when
client.getAllInstances(serviceName, clusterList, false);
//then
verify(proxy, times(1))
.queryInstancesOfService(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2", 0, false);
verify(proxy, times(1)).queryInstancesOfService(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2", 0,
false);
}

@Test
Expand Down Expand Up @@ -482,8 +482,8 @@ public void testSelectInstances7() throws NacosException {
//when
client.selectInstances(serviceName, clusterList, true, false);
//then
verify(proxy, times(1))
.queryInstancesOfService(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2", 0, false);
verify(proxy, times(1)).queryInstancesOfService(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2", 0,
false);
}

@Test
Expand Down Expand Up @@ -583,8 +583,8 @@ public void testSelectOneHealthyInstance3() throws NacosException {
hosts.add(healthyInstance);
ServiceInfo infoWithHealthyInstance = new ServiceInfo();
infoWithHealthyInstance.setHosts(hosts);
when(proxy.queryInstancesOfService(anyString(), anyString(), anyString(), anyInt(), anyBoolean()))
.thenReturn(infoWithHealthyInstance);
when(proxy.queryInstancesOfService(anyString(), anyString(), anyString(), anyInt(), anyBoolean())).thenReturn(
infoWithHealthyInstance);

String serviceName = "service1";
//when
Expand All @@ -603,8 +603,8 @@ public void testSelectOneHealthyInstance4() throws NacosException {
hosts.add(healthyInstance);
ServiceInfo infoWithHealthyInstance = new ServiceInfo();
infoWithHealthyInstance.setHosts(hosts);
when(proxy.queryInstancesOfService(anyString(), anyString(), anyString(), anyInt(), anyBoolean()))
.thenReturn(infoWithHealthyInstance);
when(proxy.queryInstancesOfService(anyString(), anyString(), anyString(), anyInt(), anyBoolean())).thenReturn(
infoWithHealthyInstance);

String serviceName = "service1";
String groupName = "group1";
Expand Down Expand Up @@ -667,16 +667,16 @@ public void testSelectOneHealthyInstance7() throws NacosException {
hosts.add(healthyInstance);
ServiceInfo infoWithHealthyInstance = new ServiceInfo();
infoWithHealthyInstance.setHosts(hosts);
when(proxy.queryInstancesOfService(anyString(), anyString(), anyString(), anyInt(), anyBoolean()))
.thenReturn(infoWithHealthyInstance);
when(proxy.queryInstancesOfService(anyString(), anyString(), anyString(), anyInt(), anyBoolean())).thenReturn(
infoWithHealthyInstance);

String serviceName = "service1";
List<String> clusterList = Arrays.asList("cluster1", "cluster2");
//when
client.selectOneHealthyInstance(serviceName, clusterList, false);
//then
verify(proxy, times(1))
.queryInstancesOfService(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2", 0, false);
verify(proxy, times(1)).queryInstancesOfService(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2", 0,
false);
}

@Test
Expand All @@ -689,8 +689,8 @@ public void testSelectOneHealthyInstance8() throws NacosException {
hosts.add(healthyInstance);
ServiceInfo infoWithHealthyInstance = new ServiceInfo();
infoWithHealthyInstance.setHosts(hosts);
when(proxy.queryInstancesOfService(anyString(), anyString(), anyString(), anyInt(), anyBoolean()))
.thenReturn(infoWithHealthyInstance);
when(proxy.queryInstancesOfService(anyString(), anyString(), anyString(), anyInt(), anyBoolean())).thenReturn(
infoWithHealthyInstance);

String serviceName = "service1";
String groupName = "group1";
Expand Down Expand Up @@ -741,8 +741,8 @@ public void testSubscribe3() throws NacosException {
//when
client.subscribe(serviceName, clusterList, listener);
//then
verify(changeNotifier, times(1))
.registerListener(Constants.DEFAULT_GROUP, serviceName, "cluster1,cluster2", listener);
verify(changeNotifier, times(1)).registerListener(Constants.DEFAULT_GROUP, serviceName, "cluster1,cluster2",
listener);
verify(proxy, times(1)).subscribe(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2");
}

Expand Down Expand Up @@ -807,8 +807,8 @@ public void testUnSubscribe3() throws NacosException {
//when
client.unsubscribe(serviceName, clusterList, listener);
//then
verify(changeNotifier, times(1))
.deregisterListener(Constants.DEFAULT_GROUP, serviceName, "cluster1,cluster2", listener);
verify(changeNotifier, times(1)).deregisterListener(Constants.DEFAULT_GROUP, serviceName, "cluster1,cluster2",
listener);
verify(proxy, times(1)).unsubscribe(serviceName, Constants.DEFAULT_GROUP, "cluster1,cluster2");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientConfig;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.grpc.GrpcClient;
import com.alibaba.nacos.common.remote.client.grpc.GrpcClientConfig;
Expand Down Expand Up @@ -122,6 +123,8 @@ public class NamingGrpcClientProxyTest {

private Instance instance;

private String uuid;

@Rule
public final ExpectedException thrown = ExpectedException.none();

Expand All @@ -136,10 +139,17 @@ public void setUp() throws NacosException, NoSuchFieldException, IllegalAccessEx

final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
client = new NamingGrpcClientProxy(NAMESPACE_ID, proxy, factory, nacosClientProperties, holder);

Field uuidField = NamingGrpcClientProxy.class.getDeclaredField("uuid");
uuidField.setAccessible(true);
uuid = (String) uuidField.get(client);

Assert.assertTrue(RpcClientFactory.getClient(uuid) != null);
Field rpcClientField = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClientField.setAccessible(true);
((RpcClient) rpcClientField.get(client)).shutdown();
rpcClientField.set(client, this.rpcClient);

response = new InstanceResponse();
when(this.rpcClient.request(any())).thenReturn(response);
instance = new Instance();
Expand Down Expand Up @@ -398,7 +408,8 @@ public void testServerHealthy() {
@Test
public void testShutdown() throws Exception {
client.shutdown();
verify(this.rpcClient, times(1)).shutdown();
Assert.assertTrue(RpcClientFactory.getClient(uuid) == null);
//verify(this.rpcClient, times(1)).shutdown();
}

@Test
Expand Down

0 comments on commit 3aecd68

Please sign in to comment.