Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to track controller segment download and upload requests in progress #9258

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,14 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT("missingConsumingSegmentNewPartitionCount", false),

// Maximum duration of a missing consuming segment in ideal state (in minutes)
MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES("missingSegmentsMaxDurationInMinutes", false);
MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES("missingSegmentsMaxDurationInMinutes", false),

// Number of in progress segment downloads
SEGMENT_DOWNLOADS_IN_PROGRESS("segmentDownloadsInProgress", true),

// Number of in progress segment uploads
SEGMENT_UPLOADS_IN_PROGRESS("segmentUploadsInProgress", true);


private final String _gaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ResourceInfo;
import javax.ws.rs.core.Context;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.WriterInterceptor;
import javax.ws.rs.ext.WriterInterceptorContext;
import org.apache.pinot.common.metrics.ControllerMetrics;


/**
* A class that implements a JXRS request filter and writer interceptor to track the number of in progress requests
* using the gauge specified by the {@link TrackedByGauge} method annotation.
* The gauge specified by this annotation will be incremented when the request starts and decremented once the
* request has completed processing.
*
* See {@link PinotSegmentUploadDownloadRestletResource#downloadSegment} for an example of its usage.
*/
@Singleton
@Provider
@TrackInflightRequestMetrics
public class InflightRequestMetricsInterceptor implements ContainerRequestFilter, WriterInterceptor {

@Inject
ControllerMetrics _controllerMetrics;

@Context
ResourceInfo _resourceInfo;

public InflightRequestMetricsInterceptor() {
}

@VisibleForTesting
public InflightRequestMetricsInterceptor(ControllerMetrics controllerMetrics, ResourceInfo resourceInfo) {
_controllerMetrics = controllerMetrics;
_resourceInfo = resourceInfo;
}

@Override
public void filter(ContainerRequestContext req)
throws IOException {
TrackedByGauge trackedByGauge = _resourceInfo.getResourceMethod().getAnnotation(TrackedByGauge.class);
if (trackedByGauge != null) {
_controllerMetrics.addValueToGlobalGauge(trackedByGauge.gauge(), 1L);
}
}

@Override
public void aroundWriteTo(WriterInterceptorContext ctx)
throws IOException, WebApplicationException {
try {
ctx.proceed();
} finally {
TrackedByGauge trackedByGauge = _resourceInfo.getResourceMethod().getAnnotation(TrackedByGauge.class);
if (trackedByGauge != null) {
_controllerMetrics.addValueToGlobalGauge(trackedByGauge.gauge(), -1L);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
Expand Down Expand Up @@ -348,6 +349,8 @@ public String segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
@Authenticate(AccessType.CREATE)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
public String segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
Expand Down Expand Up @@ -139,6 +140,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path("/segments/{tableName}/{segmentName}")
@ApiOperation(value = "Download a segment", notes = "Download a segment")
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS)
public Response downloadSegment(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
Expand Down Expand Up @@ -463,6 +466,8 @@ private void decryptFile(String crypterClassName, File tempEncryptedFile, File t
@ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
// We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST
// request if a multipart object is not sent. This endpoint does not move the segment to its final location;
// it keeps it at the downloadURI header that is set. We will not support this endpoint going forward.
Expand Down Expand Up @@ -501,6 +506,8 @@ public void uploadSegmentAsJson(String segmentJsonStr,
@ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
// For the multipart endpoint, we will always move segment to final location regardless of the segment endpoint.
public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
Expand Down Expand Up @@ -537,6 +544,8 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
@ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
// We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST
// request if a multipart object is not sent. This endpoint is recommended for use. It differs from the first
// endpoint in how it moves the segment to a Pinot-determined final directory.
Expand Down Expand Up @@ -576,6 +585,8 @@ public void uploadSegmentAsJsonV2(String segmentJsonStr,
@ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
// This behavior does not differ from v1 of the same endpoint.
public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.ws.rs.NameBinding;

/**
* JAX-RS annotation used to enable request metrics interceptor {@link InflightRequestMetricsInterceptor}.
*
* See {@link PinotSegmentUploadDownloadRestletResource#downloadSegment} for an example of its usage.
*/
@NameBinding
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface TrackInflightRequestMetrics {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.pinot.common.metrics.ControllerGauge;


/**
* A method-level annotation for tracking inflight request metrics using {@link InflightRequestMetricsInterceptor}.
* The gauge specified by this annotation will be incremented when the request starts and decremented once the request
* has completed processing.
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TrackedByGauge {
ControllerGauge gauge();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import java.io.IOException;
import java.lang.reflect.Method;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ResourceInfo;
import javax.ws.rs.ext.WriterInterceptorContext;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.mockito.Mock;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;


public class InflightRequestMetricsInterceptorTest {

@Mock
private ControllerMetrics _controllerMetrics;

@Mock
private ResourceInfo _resourceInfo;

@Mock
private ContainerRequestContext _containerRequestContext;

@Mock
private WriterInterceptorContext _writerInterceptorContext;

private InflightRequestMetricsInterceptor _interceptor;

@BeforeMethod
public void setUp() {
initMocks(this);
_interceptor = new InflightRequestMetricsInterceptor(_controllerMetrics, _resourceInfo);
}

@Test
public void testContainerRequestFilterIncrementsGauge()
throws Exception {
Method methodOne = TrackedClass.class.getDeclaredMethod("trackedMethod");
when(_resourceInfo.getResourceMethod()).thenReturn(methodOne);
_interceptor.filter(_containerRequestContext);
verify(_controllerMetrics)
.addValueToGlobalGauge(ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS, 1L);
}

@Test
public void testWriterInterceptorDecrementsGauge()
throws Exception {
Method methodOne = TrackedClass.class.getDeclaredMethod("trackedMethod");
when(_resourceInfo.getResourceMethod()).thenReturn(methodOne);
_interceptor.aroundWriteTo(_writerInterceptorContext);
verify(_controllerMetrics)
.addValueToGlobalGauge(ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS, -1L);
}

@Test(expectedExceptions = IOException.class)
public void testWriterInterceptorDecrementsGaugeWhenWriterThrowsException()
throws Exception {
Method methodOne = TrackedClass.class.getDeclaredMethod("trackedMethod");
when(_resourceInfo.getResourceMethod()).thenReturn(methodOne);
doThrow(new IOException()).when(_writerInterceptorContext).proceed();
try {
_interceptor.aroundWriteTo(_writerInterceptorContext);
} finally {
verify(_controllerMetrics).addValueToGlobalGauge(ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS, -1L);
}
}

private static class TrackedClass {
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS)
public void trackedMethod() {
}
}
}