Skip to content

Commit

Permalink
Renames metrics and tracks realtime segment uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
gviedma committed Aug 22, 2022
1 parent 50b6063 commit e5faf6f
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES("missingSegmentsMaxDurationInMinutes", false),

// Number of in progress segment downloads
CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT("segmentDownloadsInProgressCount", true),
SEGMENT_DOWNLOADS_IN_PROGRESS("segmentDownloadsInProgress", true),

// Number of in progress segment uploads
CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT("segmentUploadsInProgressCount", true);
SEGMENT_UPLOADS_IN_PROGRESS("segmentUploadsInProgress", true);


private final String _gaugeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
Expand Down Expand Up @@ -348,6 +349,8 @@ public String segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
@Authenticate(AccessType.CREATE)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
public String segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public class PinotSegmentUploadDownloadRestletResource {
@Path("/segments/{tableName}/{segmentName}")
@ApiOperation(value = "Download a segment", notes = "Download a segment")
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT)
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS)
public Response downloadSegment(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
Expand Down Expand Up @@ -467,7 +467,7 @@ private void decryptFile(String crypterClassName, File tempEncryptedFile, File t
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT)
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
// We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST
// request if a multipart object is not sent. This endpoint does not move the segment to its final location;
// it keeps it at the downloadURI header that is set. We will not support this endpoint going forward.
Expand Down Expand Up @@ -507,7 +507,7 @@ public void uploadSegmentAsJson(String segmentJsonStr,
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT)
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
// For the multipart endpoint, we will always move segment to final location regardless of the segment endpoint.
public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
Expand Down Expand Up @@ -545,7 +545,7 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT)
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
// We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST
// request if a multipart object is not sent. This endpoint is recommended for use. It differs from the first
// endpoint in how it moves the segment to a Pinot-determined final directory.
Expand Down Expand Up @@ -586,7 +586,7 @@ public void uploadSegmentAsJsonV2(String segmentJsonStr,
@ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_UPLOADS_IN_PROGRESS_COUNT)
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
// This behavior does not differ from v1 of the same endpoint.
public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -65,7 +64,7 @@ public void testContainerRequestFilterIncrementsGauge()
when(_resourceInfo.getResourceMethod()).thenReturn(methodOne);
_interceptor.filter(_containerRequestContext);
verify(_controllerMetrics)
.addValueToGlobalGauge(ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT, 1L);
.addValueToGlobalGauge(ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS, 1L);
}

@Test
Expand All @@ -75,7 +74,7 @@ public void testWriterInterceptorDecrementsGauge()
when(_resourceInfo.getResourceMethod()).thenReturn(methodOne);
_interceptor.aroundWriteTo(_writerInterceptorContext);
verify(_controllerMetrics)
.addValueToGlobalGauge(ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT, -1L);
.addValueToGlobalGauge(ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS, -1L);
}

@Test(expectedExceptions = IOException.class)
Expand All @@ -87,13 +86,13 @@ public void testWriterInterceptorDecrementsGaugeWhenWriterThrowsException()
try {
_interceptor.aroundWriteTo(_writerInterceptorContext);
} finally {
verify(_controllerMetrics).addValueToGlobalGauge(ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT, -1L);
verify(_controllerMetrics).addValueToGlobalGauge(ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS, -1L);
}
}

private static class TrackedClass {
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.CONTROLLER_SEGMENT_DOWNLOADS_IN_PROGRESS_COUNT)
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_DOWNLOADS_IN_PROGRESS)
public void trackedMethod() {
}
}
Expand Down

0 comments on commit e5faf6f

Please sign in to comment.