Skip to content

Commit

Permalink
Add absolute time fetch method in Threadpool
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <[email protected]>
  • Loading branch information
sandeshkr419 committed Jun 19, 2023
1 parent c41ec05 commit 08d0289
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;

import java.util.function.LongSupplier;

/**
* Main Cluster Manager Node Service
*
Expand All @@ -23,8 +21,4 @@ public class ClusterManagerService extends MasterService {
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
}

public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, LongSupplier nanoTimeSupplier) {
super(settings, clusterSettings, threadPool, nanoTimeSupplier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -132,13 +131,7 @@ public class MasterService extends AbstractLifecycleComponent {
protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler;
private final ClusterManagerThrottlingStats throttlingStats;

private final LongSupplier nanoTimeSupplier;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, threadPool, System::nanoTime);
}

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, LongSupplier timeProvider) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));

this.slowTaskLoggingThreshold = CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
Expand All @@ -155,7 +148,6 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
throttlingStats
);
this.threadPool = threadPool;
this.nanoTimeSupplier = timeProvider;
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -299,14 +291,14 @@ private void runTasks(TaskInputs taskInputs) {
return;
}

final long computationStartTime = nanoTimeSupplier.getAsLong();
final long computationStartTime = threadPool.absoluteTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);

if (taskOutputs.clusterStateUnchanged()) {
final long notificationStartTime = nanoTimeSupplier.getAsLong();
final long notificationStartTime = threadPool.absoluteTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
Expand All @@ -317,7 +309,7 @@ private void runTasks(TaskInputs taskInputs) {
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
}
final long publicationStartTime = nanoTimeSupplier.getAsLong();
final long publicationStartTime = threadPool.absoluteTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
Expand All @@ -344,7 +336,7 @@ private void runTasks(TaskInputs taskInputs) {
}

private TimeValue getTimeSince(long startTimeNanos) {
return TimeValue.timeValueMillis(TimeValue.nsecToMSec(nanoTimeSupplier.getAsLong() - startTimeNanos));
return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.absoluteTimeInNanos() - startTimeNanos));
}

protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
Expand All @@ -366,7 +358,7 @@ protected boolean blockingAllowed() {
}

void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs) {
final long notificationStartTime = nanoTimeSupplier.getAsLong();
final long notificationStartTime = threadPool.absoluteTimeInNanos();
taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,15 @@ public long relativeTimeInNanos() {
return cachedTimeThread.relativeTimeInNanos();
}

/**
* Returns a value of nanoseconds that may be used for absolute time calculations.
*
* This method can be used for calculating precise time deltas.
*/
public long absoluteTimeInNanos() {
return System.nanoTime();
}

/**
* Returns the value of milliseconds since UNIX epoch.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
Expand All @@ -98,15 +97,16 @@
public class MasterServiceTests extends OpenSearchTestCase {

private static ThreadPool threadPool;
private static Long relativeTimeInMillis;

private static LongSupplier nanoTimeSupplier() {
return () -> relativeTimeInMillis * 1000000L;
}
private static long timeDiffInMillis;

@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool(MasterServiceTests.class.getName());
threadPool = new TestThreadPool(MasterServiceTests.class.getName()) {
@Override
public long absoluteTimeInNanos() {
return timeDiffInMillis * TimeValue.NSEC_PER_MSEC;
}
};
}

@AfterClass
Expand All @@ -119,7 +119,7 @@ public static void stopThreadPool() {

@Before
public void randomizeCurrentTime() {
relativeTimeInMillis = randomLongBetween(0L, 1L << 50);
timeDiffInMillis = randomLongBetween(0L, 1L << 50);
}

private ClusterManagerService createClusterManagerService(boolean makeClusterManager) {
Expand All @@ -130,8 +130,7 @@ private ClusterManagerService createClusterManagerService(boolean makeClusterMan
.put(Node.NODE_NAME_SETTING.getKey(), "test_node")
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
nanoTimeSupplier()
threadPool
);
final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
.nodes(
Expand Down Expand Up @@ -427,7 +426,7 @@ public void testClusterStateUpdateLogging() throws Exception {
clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(1).millis();
return currentState;
}

Expand All @@ -442,7 +441,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(2).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}

Expand All @@ -457,13 +456,13 @@ public void onFailure(String source, Exception e) {}
clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(3).millis();
return ClusterState.builder(currentState).incrementVersion().build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(4).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(4).millis();
}

@Override
Expand Down Expand Up @@ -1063,8 +1062,7 @@ public void testLongClusterStateUpdateLogging() throws Exception {
.put(Node.NODE_NAME_SETTING.getKey(), "test_node")
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
nanoTimeSupplier()
threadPool
)
) {

Expand All @@ -1082,12 +1080,12 @@ public void testLongClusterStateUpdateLogging() throws Exception {
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> {
if (event.source().contains("test5")) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
}
if (event.source().contains("test6")) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
throw new OpenSearchException("simulated error during slow publication which should trigger logging");
Expand All @@ -1103,7 +1101,7 @@ public void testLongClusterStateUpdateLogging() throws Exception {
clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += randomLongBetween(
timeDiffInMillis += randomLongBetween(
0L,
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
);
Expand All @@ -1126,7 +1124,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
Expand All @@ -1145,7 +1143,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
return ClusterState.builder(currentState).incrementVersion().build();
Expand All @@ -1164,7 +1162,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
return currentState;
Expand Down

0 comments on commit 08d0289

Please sign in to comment.