Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Zeta] Improve JobMetrics fetch performance #4467

Merged
merged 2 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@
import lombok.NonNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;

Expand Down Expand Up @@ -449,53 +451,81 @@ public JobStatus getJobStatus() {
}

public List<RawJobMetrics> getCurrJobMetrics() {
return getCurrJobMetrics(ownedSlotProfilesIMap.values());
return getCurrJobMetrics(
ownedSlotProfilesIMap.keySet().stream()
.filter(
pipelineLocation ->
pipelineLocation.getJobId()
== this.getJobImmutableInformation().getJobId())
.collect(Collectors.toList()));
}

public List<RawJobMetrics> getCurrJobMetrics(List<PipelineLocation> pipelineLocations) {
Map<TaskGroupLocation, Address> taskGroupLocationSlotProfileMap = new HashMap<>();

ownedSlotProfilesIMap.forEach(
(pipelineLocation, map) -> {
if (pipelineLocations.contains(pipelineLocation)) {
map.forEach(
(taskGroupLocation, slotProfile) -> {
if (taskGroupLocation.getJobId()
== this.getJobImmutableInformation().getJobId()) {
taskGroupLocationSlotProfileMap.put(
taskGroupLocation, slotProfile.getWorker());
}
});
}
});

Map<Address, List<TaskGroupLocation>> taskGroupLocationMap = new HashMap<>();

for (Map.Entry<TaskGroupLocation, Address> entry :
taskGroupLocationSlotProfileMap.entrySet()) {
taskGroupLocationMap
.computeIfAbsent(entry.getValue(), k -> new ArrayList<>())
.add(entry.getKey());
}

return getCurrJobMetrics(taskGroupLocationMap);
}

public List<RawJobMetrics> getCurrJobMetrics(
Collection<Map<TaskGroupLocation, SlotProfile>> groupLocations) {
Map<Address, List<TaskGroupLocation>> groupLocationMap) {
List<RawJobMetrics> metrics = new ArrayList<>();
for (Map<TaskGroupLocation, SlotProfile> groupLocation : groupLocations) {
groupLocation.forEach(
(taskGroupLocation, slotProfile) -> {
if (taskGroupLocation.getJobId()
== this.getJobImmutableInformation().getJobId()) {
try {
if (nodeEngine
.getClusterService()
.getMember(slotProfile.getWorker())
!= null) {
RawJobMetrics rawJobMetrics =
(RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(
nodeEngine,
new GetTaskGroupMetricsOperation(
taskGroupLocation),
slotProfile.getWorker())
.get();
metrics.add(rawJobMetrics);
}
}
// HazelcastInstanceNotActiveException. It means that the node is
// offline, so waiting for the taskGroup to restore can be successful
catch (HazelcastInstanceNotActiveException e) {
LOGGER.warning(
String.format(
"%s get current job metrics with exception: %s.",
taskGroupLocation, ExceptionUtils.getMessage(e)));
} catch (Exception e) {
throw new SeaTunnelException(e.getMessage());
}

groupLocationMap.forEach(
(address, taskGroupLocations) -> {
try {
if (nodeEngine.getClusterService().getMember(address) != null) {
RawJobMetrics rawJobMetrics =
(RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(
nodeEngine,
new GetTaskGroupMetricsOperation(
taskGroupLocations),
address)
.get();
metrics.add(rawJobMetrics);
}
});
}
}
// HazelcastInstanceNotActiveException. It means that the node is
// offline, so waiting for the taskGroup to restore can be successful
catch (HazelcastInstanceNotActiveException e) {
LOGGER.warning(
String.format(
"%s get current job metrics with exception: %s.",
Arrays.toString(taskGroupLocations.toArray()),
ExceptionUtils.getMessage(e)));
} catch (Exception e) {
throw new SeaTunnelEngineException(ExceptionUtils.getMessage(e));
}
});
return metrics;
}

public void savePipelineMetricsToHistory(PipelineLocation pipelineLocation) {
List<RawJobMetrics> currJobMetrics =
this.getCurrJobMetrics(
Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation)));
this.getCurrJobMetrics(Collections.singletonList(pipelineLocation));
JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
long jobId = this.getJobImmutableInformation().getJobId();
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import com.hazelcast.internal.metrics.impl.MetricsCompressor;
import com.hazelcast.logging.ILogger;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class JobMetricsCollector implements MetricsCollector {

private final TaskGroupLocation taskGroupLocation;
private final List<String> taskGroupLocationStrs;
private final MetricsCompressor compressor;
private final ILogger logger;
private final UnaryOperator<MetricDescriptor> addPrefixFn;
Expand All @@ -40,7 +43,20 @@ public JobMetricsCollector(TaskGroupLocation taskGroupLocation, Member member, I
Objects.requireNonNull(member, "member");
this.logger = Objects.requireNonNull(logger, "logger");

this.taskGroupLocation = taskGroupLocation;
this.taskGroupLocationStrs = Collections.singletonList(taskGroupLocation.toString());
this.addPrefixFn = JobMetricsUtil.addMemberPrefixFn(member);
this.compressor = new MetricsCompressor();
}

public JobMetricsCollector(
List<TaskGroupLocation> taskGroupLocations, Member member, ILogger logger) {
Objects.requireNonNull(member, "member");
this.logger = Objects.requireNonNull(logger, "logger");

this.taskGroupLocationStrs =
taskGroupLocations.stream()
.map(TaskGroupLocation::toString)
.collect(Collectors.toList());
this.addPrefixFn = JobMetricsUtil.addMemberPrefixFn(member);
this.compressor = new MetricsCompressor();
}
Expand All @@ -49,7 +65,7 @@ public JobMetricsCollector(TaskGroupLocation taskGroupLocation, Member member, I
public void collectLong(MetricDescriptor descriptor, long value) {
String taskGroupLocationStr =
JobMetricsUtil.getTaskGroupLocationFromMetricsDescriptor(descriptor);
if (taskGroupLocation.toString().equals(taskGroupLocationStr)) {
if (taskGroupLocationStrs.contains(taskGroupLocationStr)) {
compressor.addLong(addPrefixFn.apply(descriptor), value);
}
}
Expand All @@ -58,7 +74,7 @@ public void collectLong(MetricDescriptor descriptor, long value) {
public void collectDouble(MetricDescriptor descriptor, double value) {
String taskGroupLocationStr =
JobMetricsUtil.getTaskGroupLocationFromMetricsDescriptor(descriptor);
if (taskGroupLocation.toString().equals(taskGroupLocationStr)) {
if (taskGroupLocationStrs.contains(taskGroupLocationStr)) {
compressor.addDouble(addPrefixFn.apply(descriptor), value);
}
}
Expand All @@ -67,7 +83,7 @@ public void collectDouble(MetricDescriptor descriptor, double value) {
public void collectException(MetricDescriptor descriptor, Exception e) {
String taskGroupLocationStr =
JobMetricsUtil.getTaskGroupLocationFromMetricsDescriptor(descriptor);
if (taskGroupLocation.toString().equals(taskGroupLocationStr)) {
if (taskGroupLocationStrs.contains(taskGroupLocationStr)) {
logger.warning("Exception when rendering job metrics: " + e, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,18 @@
import com.hazelcast.spi.impl.operationservice.Operation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GetTaskGroupMetricsOperation extends Operation implements IdentifiedDataSerializable {

private TaskGroupLocation taskGroupLocation;
private List<TaskGroupLocation> taskGroupLocations;
private RawJobMetrics response;

public GetTaskGroupMetricsOperation() {}

public GetTaskGroupMetricsOperation(TaskGroupLocation taskGroupLocation) {
this.taskGroupLocation = taskGroupLocation;
public GetTaskGroupMetricsOperation(List<TaskGroupLocation> taskGroupLocations) {
this.taskGroupLocations = taskGroupLocations;
}

@Override
Expand All @@ -56,27 +58,34 @@ public void run() {
"Caller "
+ callerAddress
+ " cannot get taskGroupLocation metrics"
+ taskGroupLocation.toString()
+ taskGroupLocations.toString()
+ " because it is not master. Master is: "
+ masterAddress);
}

JobMetricsCollector metricsRenderer =
new JobMetricsCollector(taskGroupLocation, nodeEngine.getLocalMember(), logger);
new JobMetricsCollector(taskGroupLocations, nodeEngine.getLocalMember(), logger);
nodeEngine.getMetricsRegistry().collect(metricsRenderer);
response = metricsRenderer.getMetrics();
}

@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
out.writeObject(taskGroupLocation);
out.writeInt(taskGroupLocations.size());
for (TaskGroupLocation taskGroupLocation : taskGroupLocations) {
out.writeObject(taskGroupLocation);
}
}

@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
taskGroupLocation = in.readObject();
int size = in.readInt();
this.taskGroupLocations = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
taskGroupLocations.add(in.readObject());
}
}

@Override
Expand Down