Skip to content

Commit

Permalink
Add Segment Lineage List API #9005 (#9006)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanbenson authored Jul 13, 2022
1 parent 8e7ca65 commit 7eec296
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pinot.common.lineage;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.spi.utils.JsonUtils;


/**
Expand Down Expand Up @@ -146,4 +150,19 @@ public ZNRecord toZNRecord() {
}
return znRecord;
}

/**
* Returns a json representation of the segment lineage.
* Segment lineage entries are sorted in chronological order by default.
*/
public ObjectNode toJsonObject() {
ObjectNode jsonObject = JsonUtils.newObjectNode();
jsonObject.put("tableNameWithType", _tableNameWithType);
LinkedHashMap<String, LineageEntry> sortedLineageEntries = new LinkedHashMap<>();
_lineageEntries.entrySet().stream()
.sorted(Map.Entry.comparingByValue(Comparator.comparingLong(LineageEntry::getTimestamp)))
.forEachOrdered(x -> sortedLineageEntries.put(x.getKey(), x.getValue()));
jsonObject.set("lineageEntries", JsonUtils.objectToJsonNode(sortedLineageEntries));
return jsonObject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SegmentName;
Expand Down Expand Up @@ -222,6 +223,36 @@ public List<Map<String, Object>> getServerToSegmentsMap(
return resultList;
}

@GET
@Path("segments/{tableName}/lineage")
@Authenticate(AccessType.READ)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "List segment lineage", notes = "List segment lineage in chronologically sorted order")
public Response listSegmentLineage(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) {
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
Response.Status.BAD_REQUEST);
}
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
try {
Response.ResponseBuilder builder = Response.ok();
SegmentLineage segmentLineage = _pinotHelixResourceManager.listSegmentLineage(tableNameWithType);
if (segmentLineage != null) {
builder.entity(segmentLineage.toJsonObject());
}
return builder.build();
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
String.format("Exception while listing segment lineage: %s for table: %s.", e.getMessage(),
tableNameWithType),
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

@Deprecated
@GET
@Path("tables/{tableName}/segments")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3295,6 +3295,15 @@ public void endReplaceSegments(String tableNameWithType, String segmentLineageEn
tableNameWithType, segmentLineageEntryId);
}

/**
* List the segment lineage
*
* @param tableNameWithType
*/
public SegmentLineage listSegmentLineage(String tableNameWithType) {
return SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType);
}

/**
* Revert the segment replacement
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.pinot.controller.api;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
Expand All @@ -37,13 +40,83 @@
public class PinotSegmentRestletResourceTest {
private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance();
private static final String TABLE_NAME = "pinotSegmentRestletResourceTestTable";
private static final String TABLE_NAME_OFFLINE = TABLE_NAME + "_OFFLINE";

@BeforeClass
public void setUp()
throws Exception {
TEST_INSTANCE.setupSharedStateAndValidate();
}

@Test
public void testListSegmentLineage()
throws Exception {
// Adding table
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(1).build();
TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);

// Wait for the table addition
while (!TEST_INSTANCE.getHelixResourceManager().hasOfflineTable(TABLE_NAME)) {
Thread.sleep(100);
}

Map<String, SegmentMetadata> segmentMetadataTable = new HashMap<>();

// Upload Segments
for (int i = 0; i < 4; i++) {
SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME, "s" + i);
TEST_INSTANCE.getHelixResourceManager()
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), segmentMetadata, "downloadUrl");
segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata);
}

// There should be no segment lineage at this point.
String segmentLineageResponse = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder()
.forListAllSegmentLineages(TABLE_NAME, TableType.OFFLINE.toString()));
Assert.assertEquals(segmentLineageResponse, "");

// Now starts to replace segments.
List<String> segmentsFrom = Arrays.asList("s0", "s1");
List<String> segmentsTo = Arrays.asList("some_segment");
String segmentLineageId = TEST_INSTANCE.getHelixResourceManager()
.startReplaceSegments(TABLE_NAME_OFFLINE, segmentsFrom, segmentsTo, false);

// Replace more segments to add another entry to segment lineage.
segmentsFrom = Arrays.asList("s2", "s3");
segmentsTo = Arrays.asList("another_segment");
String nextSegmentLineageId = TEST_INSTANCE.getHelixResourceManager()
.startReplaceSegments(TABLE_NAME_OFFLINE, segmentsFrom, segmentsTo, false);

// There should now be two segment lineage entries resulting from the operations above.
segmentLineageResponse = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder()
.forListAllSegmentLineages(TABLE_NAME, TableType.OFFLINE.toString()));
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"IN_PROGRESS\""));
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"s0\",\"s1\"]"));
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"some_segment\"]"));
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"s2\",\"s3\"]"));
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"another_segment\"]"));
// Ensures the two entries are sorted in chronological order by timestamp.
Assert.assertTrue(
segmentLineageResponse.indexOf(segmentLineageId) < segmentLineageResponse.indexOf(nextSegmentLineageId));

// List segment lineage should fail for non-existing table
Assert.assertThrows(IOException.class, () -> ControllerTest.sendGetRequest(
TEST_INSTANCE.getControllerRequestURLBuilder()
.forListAllSegmentLineages("non-existing-table", TableType.OFFLINE.toString())));

// List segment lineage should also fail for invalid table type.
Assert.assertThrows(IOException.class, () -> ControllerTest.sendGetRequest(
TEST_INSTANCE.getControllerRequestURLBuilder().forListAllSegmentLineages(TABLE_NAME, "invalid-type")));

// Delete segments
TEST_INSTANCE.getHelixResourceManager().deleteSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
segmentMetadataTable.values().iterator().next().getName());

// Delete offline table
TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(TABLE_NAME);
}

@Test
public void testSegmentCrcApi()
throws Exception {
Expand Down Expand Up @@ -81,12 +154,11 @@ public void testSegmentCrcApi()
Assert.assertEquals(fetchedMetadata.get("segment.download.url"), "downloadUrl");

// use table name with table type
resp = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder()
.forSegmentMetadata(TABLE_NAME + "_OFFLINE", entry.getKey()));
resp = ControllerTest.sendGetRequest(
TEST_INSTANCE.getControllerRequestURLBuilder().forSegmentMetadata(TABLE_NAME + "_OFFLINE", entry.getKey()));
fetchedMetadata = JsonUtils.stringToObject(resp, Map.class);
Assert.assertEquals(fetchedMetadata.get("segment.download.url"), "downloadUrl");


// Add more segments
for (int i = 0; i < 5; i++) {
SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME);
Expand All @@ -104,6 +176,9 @@ public void testSegmentCrcApi()

// Check crc api
checkCrcRequest(segmentMetadataTable, 9);

// Delete offline table
TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(TABLE_NAME);
}

private void checkCrcRequest(Map<String, SegmentMetadata> metadataTable, int expectedSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ public String forSegmentMetadata(String tableName, String segmentName) {
return StringUtil.join("/", _baseUrl, "segments", tableName, encode(segmentName), "metadata");
}

public String forListAllSegmentLineages(String tableName, String tableType) {
return StringUtil.join("/", _baseUrl, "segments", tableName, "lineage?type=" + tableType);
}

public String forListAllCrcInformationForTable(String tableName) {
return StringUtil.join("/", _baseUrl, "tables", tableName, "segments", "crc");
}
Expand Down

0 comments on commit 7eec296

Please sign in to comment.