diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java index e35d69063cbd..96095888a1f3 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java @@ -18,15 +18,19 @@ */ package org.apache.pinot.common.lineage; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.helix.ZNRecord; +import org.apache.pinot.spi.utils.JsonUtils; /** @@ -146,4 +150,19 @@ public ZNRecord toZNRecord() { } return znRecord; } + + /** + * Returns a json representation of the segment lineage. + * Segment lineage entries are sorted in chronological order by default. + */ + public ObjectNode toJsonObject() { + ObjectNode jsonObject = JsonUtils.newObjectNode(); + jsonObject.put("tableNameWithType", _tableNameWithType); + LinkedHashMap sortedLineageEntries = new LinkedHashMap<>(); + _lineageEntries.entrySet().stream() + .sorted(Map.Entry.comparingByValue(Comparator.comparingLong(LineageEntry::getTimestamp))) + .forEachOrdered(x -> sortedLineageEntries.put(x.getKey(), x.getValue())); + jsonObject.set("lineageEntries", JsonUtils.objectToJsonNode(sortedLineageEntries)); + return jsonObject; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index eb44d98846f0..09fc0d4a0c52 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -60,6 +60,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.lineage.SegmentLineage; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.SegmentName; @@ -222,6 +223,36 @@ public List> getServerToSegmentsMap( return resultList; } + @GET + @Path("segments/{tableName}/lineage") + @Authenticate(AccessType.READ) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "List segment lineage", notes = "List segment lineage in chronologically sorted order") + public Response listSegmentLineage( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) { + TableType tableType = Constants.validateTableType(tableTypeStr); + if (tableType == null) { + throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime", + Response.Status.BAD_REQUEST); + } + String tableNameWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); + try { + Response.ResponseBuilder builder = Response.ok(); + SegmentLineage segmentLineage = _pinotHelixResourceManager.listSegmentLineage(tableNameWithType); + if (segmentLineage != null) { + builder.entity(segmentLineage.toJsonObject()); + } + return builder.build(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + String.format("Exception while listing segment lineage: %s for table: %s.", e.getMessage(), + tableNameWithType), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + @Deprecated @GET @Path("tables/{tableName}/segments") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 970d27efb8e1..b1143a987354 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -3295,6 +3295,15 @@ public void endReplaceSegments(String tableNameWithType, String segmentLineageEn tableNameWithType, segmentLineageEntryId); } + /** + * List the segment lineage + * + * @param tableNameWithType + */ + public SegmentLineage listSegmentLineage(String tableNameWithType) { + return SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType); + } + /** * Revert the segment replacement * diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java index 60be1f01c4c6..090d066eeda9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java @@ -18,7 +18,10 @@ */ package org.apache.pinot.controller.api; +import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; @@ -37,6 +40,7 @@ public class PinotSegmentRestletResourceTest { private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance(); private static final String TABLE_NAME = "pinotSegmentRestletResourceTestTable"; + private static final String TABLE_NAME_OFFLINE = TABLE_NAME + "_OFFLINE"; @BeforeClass public void setUp() @@ -44,6 +48,75 @@ public void setUp() TEST_INSTANCE.setupSharedStateAndValidate(); } + @Test + public void testListSegmentLineage() + throws Exception { + // Adding table + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(1).build(); + TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig); + + // Wait for the table addition + while (!TEST_INSTANCE.getHelixResourceManager().hasOfflineTable(TABLE_NAME)) { + Thread.sleep(100); + } + + Map segmentMetadataTable = new HashMap<>(); + + // Upload Segments + for (int i = 0; i < 4; i++) { + SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME, "s" + i); + TEST_INSTANCE.getHelixResourceManager() + .addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), segmentMetadata, "downloadUrl"); + segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata); + } + + // There should be no segment lineage at this point. + String segmentLineageResponse = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder() + .forListAllSegmentLineages(TABLE_NAME, TableType.OFFLINE.toString())); + Assert.assertEquals(segmentLineageResponse, ""); + + // Now starts to replace segments. + List segmentsFrom = Arrays.asList("s0", "s1"); + List segmentsTo = Arrays.asList("some_segment"); + String segmentLineageId = TEST_INSTANCE.getHelixResourceManager() + .startReplaceSegments(TABLE_NAME_OFFLINE, segmentsFrom, segmentsTo, false); + + // Replace more segments to add another entry to segment lineage. + segmentsFrom = Arrays.asList("s2", "s3"); + segmentsTo = Arrays.asList("another_segment"); + String nextSegmentLineageId = TEST_INSTANCE.getHelixResourceManager() + .startReplaceSegments(TABLE_NAME_OFFLINE, segmentsFrom, segmentsTo, false); + + // There should now be two segment lineage entries resulting from the operations above. + segmentLineageResponse = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder() + .forListAllSegmentLineages(TABLE_NAME, TableType.OFFLINE.toString())); + Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"IN_PROGRESS\"")); + Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"s0\",\"s1\"]")); + Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"some_segment\"]")); + Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"s2\",\"s3\"]")); + Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"another_segment\"]")); + // Ensures the two entries are sorted in chronological order by timestamp. + Assert.assertTrue( + segmentLineageResponse.indexOf(segmentLineageId) < segmentLineageResponse.indexOf(nextSegmentLineageId)); + + // List segment lineage should fail for non-existing table + Assert.assertThrows(IOException.class, () -> ControllerTest.sendGetRequest( + TEST_INSTANCE.getControllerRequestURLBuilder() + .forListAllSegmentLineages("non-existing-table", TableType.OFFLINE.toString()))); + + // List segment lineage should also fail for invalid table type. + Assert.assertThrows(IOException.class, () -> ControllerTest.sendGetRequest( + TEST_INSTANCE.getControllerRequestURLBuilder().forListAllSegmentLineages(TABLE_NAME, "invalid-type"))); + + // Delete segments + TEST_INSTANCE.getHelixResourceManager().deleteSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), + segmentMetadataTable.values().iterator().next().getName()); + + // Delete offline table + TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(TABLE_NAME); + } + @Test public void testSegmentCrcApi() throws Exception { @@ -81,12 +154,11 @@ public void testSegmentCrcApi() Assert.assertEquals(fetchedMetadata.get("segment.download.url"), "downloadUrl"); // use table name with table type - resp = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder() - .forSegmentMetadata(TABLE_NAME + "_OFFLINE", entry.getKey())); + resp = ControllerTest.sendGetRequest( + TEST_INSTANCE.getControllerRequestURLBuilder().forSegmentMetadata(TABLE_NAME + "_OFFLINE", entry.getKey())); fetchedMetadata = JsonUtils.stringToObject(resp, Map.class); Assert.assertEquals(fetchedMetadata.get("segment.download.url"), "downloadUrl"); - // Add more segments for (int i = 0; i < 5; i++) { SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME); @@ -104,6 +176,9 @@ public void testSegmentCrcApi() // Check crc api checkCrcRequest(segmentMetadataTable, 9); + + // Delete offline table + TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(TABLE_NAME); } private void checkCrcRequest(Map metadataTable, int expectedSize) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index c74be58e68ed..882a4a802a89 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -344,6 +344,10 @@ public String forSegmentMetadata(String tableName, String segmentName) { return StringUtil.join("/", _baseUrl, "segments", tableName, encode(segmentName), "metadata"); } + public String forListAllSegmentLineages(String tableName, String tableType) { + return StringUtil.join("/", _baseUrl, "segments", tableName, "lineage?type=" + tableType); + } + public String forListAllCrcInformationForTable(String tableName) { return StringUtil.join("/", _baseUrl, "tables", tableName, "segments", "crc"); }