Skip to content

Commit

Permalink
[ISSUE #6910] Extract the interval of fetchNameServerAddr
Browse files Browse the repository at this point in the history
Co-authored-by: RongtongJin <[email protected]>
  • Loading branch information
totalo and RongtongJin authored Jun 23, 2023
1 parent fd4a397 commit 5d6ee63
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,6 @@
package org.apache.rocketmq.broker;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
Expand Down Expand Up @@ -159,6 +134,32 @@
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BrokerController {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
Expand Down Expand Up @@ -720,7 +721,7 @@ public void run() {
LOG.error("Failed to fetch nameServer address", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}, 1000 * 10, this.brokerConfig.getFetchNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.common;

import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;

import java.util.concurrent.TimeUnit;

public class BrokerConfig extends BrokerIdentity {

private String brokerConfigPath = null;
Expand Down Expand Up @@ -374,6 +375,11 @@ public class BrokerConfig extends BrokerIdentity {
private boolean usePIDColdCtrStrategy = true;
private long cgColdReadThreshold = 3 * 1024 * 1024;
private long globalColdReadThreshold = 100 * 1024 * 1024;

/**
* The interval to fetch namesrv addr, default value is 10 second
*/
private long fetchNamesrvAddrInterval = 10 * 1000;

public long getMaxPopPollingSize() {
return maxPopPollingSize;
Expand Down Expand Up @@ -1662,4 +1668,12 @@ public boolean isUseStaticSubscription() {
public void setUseStaticSubscription(boolean useStaticSubscription) {
this.useStaticSubscription = useStaticSubscription;
}

public long getFetchNamesrvAddrInterval() {
return fetchNamesrvAddrInterval;
}

public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.rocketmq.common;

import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
Expand All @@ -27,6 +25,9 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class BrokerIdentity {
private static final String DEFAULT_CLUSTER_NAME = "DefaultCluster";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@
*/
package org.apache.rocketmq.container;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
Expand All @@ -50,6 +39,18 @@
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BrokerContainer implements IBrokerContainer {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

Expand Down Expand Up @@ -177,7 +178,7 @@ public void run0() {
LOG.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}, 1000 * 10, this.brokerContainerConfig.getFetchNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
}

this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public class BrokerContainerConfig {
private String brokerContainerIP = NetworkUtil.getLocalAddress();

private String brokerConfigPaths = null;

/**
* The interval to fetch namesrv addr, default value is 10 second
*/
private long fetchNamesrvAddrInterval = 10 * 1000;

public String getRocketmqHome() {
return rocketmqHome;
Expand Down Expand Up @@ -82,5 +87,12 @@ public String getBrokerConfigPaths() {
public void setBrokerConfigPaths(String brokerConfigPaths) {
this.brokerConfigPaths = brokerConfigPaths;
}


public long getFetchNamesrvAddrInterval() {
return fetchNamesrvAddrInterval;
}

public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
}
}

0 comments on commit 5d6ee63

Please sign in to comment.