Skip to content

Commit

Permalink
Collect in progress segment download and upload gauges for controller
Browse files Browse the repository at this point in the history
  • Loading branch information
gviedma committed Aug 22, 2022
1 parent 718f41f commit 50b6063
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,14 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT("missingConsumingSegmentNewPartitionCount", false),

// Maximum duration of a missing consuming segment in ideal state (in minutes)
MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES("missingSegmentsMaxDurationInMinutes", false);
MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES("missingSegmentsMaxDurationInMinutes", false),

// Number of in progress segment downloads
CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT("segmentDownloadsInProgressCount", true),

// Number of in progress segment uploads
CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT("segmentUploadsInProgressCount", true);


private final String _gaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ResourceInfo;
import javax.ws.rs.core.Context;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.WriterInterceptor;
import javax.ws.rs.ext.WriterInterceptorContext;
import org.apache.pinot.common.metrics.ControllerMetrics;


/**
* A class that implements a JXRS request filter and writer interceptor to track the number of in progress requests
* using the gauge specified by the {@link TrackedByGauge} method annotation.
* The gauge specified by this annotation will be incremented when the request starts and decremented once the
* request has completed processing.
*
* See {@link PinotSegmentUploadDownloadRestletResource#downloadSegment} for an example of its usage.
*/
@Singleton
@Provider
@TrackInflightRequestMetrics
public class InflightRequestMetricsInterceptor implements ContainerRequestFilter, WriterInterceptor {

@Inject
ControllerMetrics _controllerMetrics;

@Context
ResourceInfo _resourceInfo;

public InflightRequestMetricsInterceptor() {
}

@VisibleForTesting
public InflightRequestMetricsInterceptor(ControllerMetrics controllerMetrics, ResourceInfo resourceInfo) {
_controllerMetrics = controllerMetrics;
_resourceInfo = resourceInfo;
}

@Override
public void filter(ContainerRequestContext req)
throws IOException {
TrackedByGauge trackedByGauge = _resourceInfo.getResourceMethod().getAnnotation(TrackedByGauge.class);
if (trackedByGauge != null) {
_controllerMetrics.addValueToGlobalGauge(trackedByGauge.gauge(), 1L);
}
}

@Override
public void aroundWriteTo(WriterInterceptorContext ctx)
throws IOException, WebApplicationException {
try {
ctx.proceed();
} finally {
TrackedByGauge trackedByGauge = _resourceInfo.getResourceMethod().getAnnotation(TrackedByGauge.class);
if (trackedByGauge != null) {
_controllerMetrics.addValueToGlobalGauge(trackedByGauge.gauge(), -1L);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
Expand Down Expand Up @@ -139,6 +140,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path("/segments/{tableName}/{segmentName}")
@ApiOperation(value = "Download a segment", notes = "Download a segment")
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT)
public Response downloadSegment(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
Expand Down Expand Up @@ -463,6 +466,8 @@ private void decryptFile(String crypterClassName, File tempEncryptedFile, File t
@ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT)
// We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST
// request if a multipart object is not sent. This endpoint does not move the segment to its final location;
// it keeps it at the downloadURI header that is set. We will not support this endpoint going forward.
Expand Down Expand Up @@ -501,6 +506,8 @@ public void uploadSegmentAsJson(String segmentJsonStr,
@ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT)
// For the multipart endpoint, we will always move segment to final location regardless of the segment endpoint.
public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
Expand Down Expand Up @@ -537,6 +544,8 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
@ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT)
// We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST
// request if a multipart object is not sent. This endpoint is recommended for use. It differs from the first
// endpoint in how it moves the segment to a Pinot-determined final directory.
Expand Down Expand Up @@ -576,6 +585,8 @@ public void uploadSegmentAsJsonV2(String segmentJsonStr,
@ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT)
// This behavior does not differ from v1 of the same endpoint.
public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.ws.rs.NameBinding;

/**
* JAX-RS annotation used to enable request metrics interceptor {@link InflightRequestMetricsInterceptor}.
*
* See {@link PinotSegmentUploadDownloadRestletResource#downloadSegment} for an example of its usage.
*/
@NameBinding
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface TrackInflightRequestMetrics {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.pinot.common.metrics.ControllerGauge;


/**
* A method-level annotation for tracking inflight request metrics using {@link InflightRequestMetricsInterceptor}.
* The gauge specified by this annotation will be incremented when the request starts and decremented once the request
* has completed processing.
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TrackedByGauge {
ControllerGauge gauge();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import java.io.IOException;
import java.lang.reflect.Method;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ResourceInfo;
import javax.ws.rs.ext.WriterInterceptorContext;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;


public class InflightRequestMetricsInterceptorTest {

@Mock
private ControllerMetrics _controllerMetrics;

@Mock
private ResourceInfo _resourceInfo;

@Mock
private ContainerRequestContext _containerRequestContext;

@Mock
private WriterInterceptorContext _writerInterceptorContext;

private InflightRequestMetricsInterceptor _interceptor;

@BeforeMethod
public void setUp() {
initMocks(this);
_interceptor = new InflightRequestMetricsInterceptor(_controllerMetrics, _resourceInfo);
}

@Test
public void testContainerRequestFilterIncrementsGauge()
throws Exception {
Method methodOne = TrackedClass.class.getDeclaredMethod("trackedMethod");
when(_resourceInfo.getResourceMethod()).thenReturn(methodOne);
_interceptor.filter(_containerRequestContext);
verify(_controllerMetrics)
.addValueToGlobalGauge(ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT, 1L);
}

@Test
public void testWriterInterceptorDecrementsGauge()
throws Exception {
Method methodOne = TrackedClass.class.getDeclaredMethod("trackedMethod");
when(_resourceInfo.getResourceMethod()).thenReturn(methodOne);
_interceptor.aroundWriteTo(_writerInterceptorContext);
verify(_controllerMetrics)
.addValueToGlobalGauge(ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT, -1L);
}

@Test(expectedExceptions = IOException.class)
public void testWriterInterceptorDecrementsGaugeWhenWriterThrowsException()
throws Exception {
Method methodOne = TrackedClass.class.getDeclaredMethod("trackedMethod");
when(_resourceInfo.getResourceMethod()).thenReturn(methodOne);
doThrow(new IOException()).when(_writerInterceptorContext).proceed();
try {
_interceptor.aroundWriteTo(_writerInterceptorContext);
} finally {
verify(_controllerMetrics).addValueToGlobalGauge(ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT, -1L);
}
}

private static class TrackedClass {
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT)
public void trackedMethod() {
}
}
}

0 comments on commit 50b6063

Please sign in to comment.