Skip to content

Commit 015cc7b

Browse files
binbin0325yesAnd92
authored andcommitted
refine config_client listen config (nacos-group#644)
1 parent 6e34ac4 commit 015cc7b

File tree

1 file changed

+67
-55
lines changed

1 file changed

+67
-55
lines changed

clients/config_client/config_client.go

+67-55
Original file line numberDiff line numberDiff line change
@@ -402,71 +402,61 @@ func (client *ConfigClient) startInternal() {
402402
}
403403

404404
func (client *ConfigClient) executeConfigListen() {
405-
listenCachesMap := make(map[int][]cacheData, 16)
406-
needAllSync := time.Since(client.lastAllSyncTime) >= constant.ALL_SYNC_INTERNAL
407-
for _, v := range client.cacheMap.Items() {
408-
cache, ok := v.(cacheData)
405+
var (
406+
needAllSync = time.Since(client.lastAllSyncTime) >= constant.ALL_SYNC_INTERNAL
407+
hasChangedKeys = false
408+
)
409+
410+
listenTaskMap := client.buildListenTask(needAllSync)
411+
if len(listenTaskMap) == 0 {
412+
return
413+
}
414+
415+
for taskId, caches := range listenTaskMap {
416+
request := buildConfigBatchListenRequest(caches)
417+
rpcClient := client.configProxy.createRpcClient(client.ctx, fmt.Sprintf("%d", taskId), client)
418+
iResponse, err := client.configProxy.requestProxy(rpcClient, request, 3000)
419+
if err != nil {
420+
logger.Warnf("ConfigBatchListenRequest failure, err:%v", err)
421+
continue
422+
}
423+
if iResponse == nil {
424+
logger.Warnf("ConfigBatchListenRequest failure, response is nil")
425+
continue
426+
}
427+
if !iResponse.IsSuccess() {
428+
logger.Warnf("ConfigBatchListenRequest failure, error code:%d", iResponse.GetErrorCode())
429+
continue
430+
}
431+
response, ok := iResponse.(*rpc_response.ConfigChangeBatchListenResponse)
409432
if !ok {
410433
continue
411434
}
412435

413-
if cache.isSyncWithServer {
414-
if cache.md5 != cache.cacheDataListener.lastMd5 {
415-
cache.executeListener()
416-
}
417-
if !needAllSync {
418-
continue
436+
if len(response.ChangedConfigs) > 0 {
437+
hasChangedKeys = true
438+
}
439+
changeKeys := make(map[string]struct{}, len(response.ChangedConfigs))
440+
for _, v := range response.ChangedConfigs {
441+
changeKey := util.GetConfigCacheKey(v.DataId, v.Group, v.Tenant)
442+
changeKeys[changeKey] = struct{}{}
443+
if value, ok := client.cacheMap.Get(changeKey); ok {
444+
cData := value.(cacheData)
445+
client.refreshContentAndCheck(cData, !cData.isInitializing)
419446
}
420447
}
421448

422-
cacheDatas := listenCachesMap[cache.taskId]
423-
cacheDatas = append(cacheDatas, cache)
424-
listenCachesMap[cache.taskId] = cacheDatas
425-
}
426-
hasChangedKeys := false
427-
if len(listenCachesMap) > 0 {
428-
for taskId, listenCaches := range listenCachesMap {
429-
request := buildConfigBatchListenRequest(listenCaches)
430-
rpcClient := client.configProxy.createRpcClient(client.ctx, fmt.Sprintf("%d", taskId), client)
431-
iResponse, err := client.configProxy.requestProxy(rpcClient, request, 3000)
432-
if err != nil {
433-
logger.Warnf("ConfigBatchListenRequest failure,err:%+v", err)
434-
continue
435-
}
436-
if iResponse == nil {
437-
logger.Warnf("ConfigBatchListenRequest failure, response is nil")
449+
for _, v := range caches {
450+
changeKey := util.GetConfigCacheKey(v.dataId, v.group, v.tenant)
451+
if _, ok := changeKeys[changeKey]; !ok {
452+
v.isSyncWithServer = true
453+
client.cacheMap.Set(changeKey, v)
438454
continue
439455
}
440-
if !iResponse.IsSuccess() {
441-
logger.Warnf("ConfigBatchListenRequest failure, error code:%+v", iResponse.GetErrorCode())
442-
continue
443-
}
444-
changeKeys := make(map[string]struct{})
445-
if response, ok := iResponse.(*rpc_response.ConfigChangeBatchListenResponse); ok {
446-
if len(response.ChangedConfigs) > 0 {
447-
hasChangedKeys = true
448-
for _, v := range response.ChangedConfigs {
449-
changeKey := util.GetConfigCacheKey(v.DataId, v.Group, v.Tenant)
450-
changeKeys[changeKey] = struct{}{}
451-
if cache, ok := client.cacheMap.Get(changeKey); !ok {
452-
continue
453-
} else {
454-
cacheData := cache.(cacheData)
455-
client.refreshContentAndCheck(cacheData, !cacheData.isInitializing)
456-
}
457-
}
458-
}
459-
460-
for _, v := range listenCaches {
461-
changeKey := util.GetConfigCacheKey(v.dataId, v.group, v.tenant)
462-
if _, ok := changeKeys[changeKey]; !ok {
463-
v.isSyncWithServer = true
464-
continue
465-
}
466-
v.isInitializing = true
467-
}
468-
}
456+
v.isInitializing = true
457+
client.cacheMap.Set(changeKey, v)
469458
}
459+
470460
}
471461
if needAllSync {
472462
client.lastAllSyncTime = time.Now()
@@ -509,6 +499,28 @@ func (client *ConfigClient) refreshContentAndCheck(cacheData cacheData, notify b
509499
}
510500
}
511501

502+
func (client *ConfigClient) buildListenTask(needAllSync bool) map[int][]cacheData {
503+
listenTaskMap := make(map[int][]cacheData, 8)
504+
505+
for _, v := range client.cacheMap.Items() {
506+
data, ok := v.(cacheData)
507+
if !ok {
508+
continue
509+
}
510+
511+
if data.isSyncWithServer {
512+
if data.md5 != data.cacheDataListener.lastMd5 {
513+
data.executeListener()
514+
}
515+
if !needAllSync {
516+
continue
517+
}
518+
}
519+
listenTaskMap[data.taskId] = append(listenTaskMap[data.taskId], data)
520+
}
521+
return listenTaskMap
522+
}
523+
512524
func (client *ConfigClient) asyncNotifyListenConfig() {
513525
go func() {
514526
client.listenExecute <- struct{}{}

0 commit comments

Comments
 (0)