Skip to content

Commit

Permalink
Improvements to GetWorkTimingInfosTracker when there is clock skew be…
Browse files Browse the repository at this point in the history
…tween the worker and service. (#30990)

- remove spammy log
- use service provided timestamps when scaling
  • Loading branch information
scwhittle authored Apr 16, 2024
1 parent 1a26ead commit a6f3ddf
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
Instant forwardedByDispatcherTiming =
getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
Instant now = Instant.ofEpochMilli(clock.getMillis());
if (forwardedByDispatcherTiming != null) {
if (forwardedByDispatcherTiming != null && now.isAfter(forwardedByDispatcherTiming)) {
Duration newDuration = new Duration(forwardedByDispatcherTiming, now);
aggregatedGetWorkStreamLatencies.compute(
State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
Expand All @@ -134,22 +134,23 @@ List<LatencyAttribution> getLatencyAttributions() {
if (workItemCreationLatency != null) {
latencyAttributions.add(workItemCreationLatency);
}
if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
LOG.warn(
"Work item creation time {} is after the work received time {}, "
+ "one or more GetWorkStream timing infos are missing.",
workItemCreationEndTime,
workItemLastChunkReceivedByWorkerTime);
return latencyAttributions;
}
long totalTransmissionDurationElapsedTime =
new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
long totalSumDurationTimeMills = 0;
for (SumAndMaxDurations duration : aggregatedGetWorkStreamLatencies.values()) {
totalSumDurationTimeMills += duration.sum.getMillis();
}
final long finalTotalSumDurationTimeMills = totalSumDurationTimeMills;

long totalTransmissionDurationElapsedTime;
if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
LOG.debug(
"Work item creation time {} is after the work received time {}, "
+ "one or more GetWorkStream timing infos are missing. Using raw times without scaling.",
workItemCreationEndTime,
workItemLastChunkReceivedByWorkerTime);
totalTransmissionDurationElapsedTime = finalTotalSumDurationTimeMills;
} else {
totalTransmissionDurationElapsedTime =
new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
}
aggregatedGetWorkStreamLatencies.forEach(
(state, duration) -> {
long scaledDuration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

import java.io.InputStream;
import java.io.SequenceInputStream;
Expand Down Expand Up @@ -1202,6 +1198,51 @@ public void testGetWorkTimingInfosTracker() throws Exception {
latencies.get(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER).getTotalDurationMillis());
}

@Test
public void testGetWorkTimingInfosTracker_ClockSkew() throws Exception {
  // Every service-reported timestamp carries a 50ms offset, while the worker clock handed to
  // the tracker is pinned at 50ms. All dispatcher events therefore land after the worker's
  // notion of "now".
  final int serviceOffsetMicros = 50 * 1000;
  GetWorkTimingInfosTracker timingTracker = new GetWorkTimingInfosTracker(() -> 50);

  // One batch of timing infos is reported per chunk; the list is reused between chunks.
  List<GetWorkStreamTimingInfo> chunkInfos = new ArrayList<>();
  for (int chunk = 0; chunk < 4; chunk++) {
    int creationStartUsec = serviceOffsetMicros;
    int creationEndUsec = serviceOffsetMicros + 10000;
    int receivedByDispatcherUsec = serviceOffsetMicros + (chunk + 11) * 1000;
    int forwardedByDispatcherUsec = serviceOffsetMicros + (chunk + 16) * 1000;

    GetWorkStreamTimingInfo creationStart =
        GetWorkStreamTimingInfo.newBuilder()
            .setEvent(Event.GET_WORK_CREATION_START)
            .setTimestampUsec(creationStartUsec)
            .build();
    GetWorkStreamTimingInfo creationEnd =
        GetWorkStreamTimingInfo.newBuilder()
            .setEvent(Event.GET_WORK_CREATION_END)
            .setTimestampUsec(creationEndUsec)
            .build();
    GetWorkStreamTimingInfo receivedByDispatcher =
        GetWorkStreamTimingInfo.newBuilder()
            .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
            .setTimestampUsec(receivedByDispatcherUsec)
            .build();
    GetWorkStreamTimingInfo forwardedByDispatcher =
        GetWorkStreamTimingInfo.newBuilder()
            .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
            .setTimestampUsec(forwardedByDispatcherUsec)
            .build();

    chunkInfos.add(creationStart);
    chunkInfos.add(creationEnd);
    chunkInfos.add(receivedByDispatcher);
    chunkInfos.add(forwardedByDispatcher);
    timingTracker.addTimingInfo(chunkInfos);
    chunkInfos.clear();
  }

  // Per-chunk durations fed to the tracker:
  //   GET_WORK_IN_WINDMILL_WORKER:        10, 10, 10, 10
  //   GET_WORK_IN_TRANSIT_TO_DISPATCHER:  1, 2, 3, 4 (sums to 10)
  //   GET_WORK_IN_TRANSIT_TO_USER_WORKER: never recorded, because the worker clock is not
  //                                       after the dispatcher's forwarded timestamp.
  List<LatencyAttribution> reported = timingTracker.getLatencyAttributions();
  assertEquals(2, reported.size());

  Map<State, LatencyAttribution> byState = new HashMap<>();
  reported.forEach(attribution -> byState.put(attribution.getState(), attribution));

  LatencyAttribution inWindmillWorker = byState.get(State.GET_WORK_IN_WINDMILL_WORKER);
  assertEquals(10L, inWindmillWorker.getTotalDurationMillis());

  LatencyAttribution inTransitToDispatcher =
      byState.get(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER);
  assertEquals(4L, inTransitToDispatcher.getTotalDurationMillis());

  assertNull(byState.get(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
}

class ResponseErrorInjector<Stream extends StreamObserver> {

private final Stream stream;
Expand Down

0 comments on commit a6f3ddf

Please sign in to comment.