Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Segment Lineage List API #9005 #9006

Merged
merged 1 commit into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pinot.common.lineage;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.spi.utils.JsonUtils;


/**
Expand Down Expand Up @@ -146,4 +150,19 @@ public ZNRecord toZNRecord() {
}
return znRecord;
}

/**
* Returns a json representation of the segment lineage.
* Segment lineage entries are sorted in chronological order by default.
*/
public ObjectNode toJsonObject() {
ObjectNode jsonObject = JsonUtils.newObjectNode();
jsonObject.put("tableNameWithType", _tableNameWithType);
LinkedHashMap<String, LineageEntry> sortedLineageEntries = new LinkedHashMap<>();
_lineageEntries.entrySet().stream()
.sorted(Map.Entry.comparingByValue(Comparator.comparingLong(LineageEntry::getTimestamp)))
.forEachOrdered(x -> sortedLineageEntries.put(x.getKey(), x.getValue()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@snleee FYI: this ensures that the entries returned by the list-segment-lineage REST API are in chronological order.

Copy link
Contributor

@walterddr walterddr Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO sorting is not necessary; at the very least, we shouldn't do it in the toJsonObject path. What do you think about adding the timestamp processing later?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. We agreed to file an issue and address it in a separate PR.

jsonObject.set("lineageEntries", JsonUtils.objectToJsonNode(sortedLineageEntries));
return jsonObject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SegmentName;
Expand Down Expand Up @@ -219,6 +220,36 @@ public List<Map<String, Object>> getServerToSegmentsMap(
return resultList;
}

/**
 * REST endpoint that returns the segment lineage of a table as JSON.
 *
 * <p>The table type is validated first (400 on an unrecognised value), then the raw table name is resolved to an
 * existing table name with type. When the resource manager returns no lineage, the response is a 200 with no
 * entity; otherwise the entity is the lineage's JSON representation. Any exception raised while reading or
 * serialising the lineage is wrapped in a 500.
 *
 * @param tableName raw table name taken from the URL path
 * @param tableTypeStr table type taken from the {@code type} query parameter
 * @return a 200 response, carrying the lineage JSON when a lineage exists
 */
@GET
@Path("segments/{tableName}/lineage")
@Authenticate(AccessType.READ)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "List segment lineage", notes = "List segment lineage in chronologically sorted order")
public Response listSegmentLineage(
    @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
    @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) {
  TableType tableType = Constants.validateTableType(tableTypeStr);
  if (tableType == null) {
    throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
        Response.Status.BAD_REQUEST);
  }
  String tableNameWithType =
      ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
  try {
    SegmentLineage lineage = _pinotHelixResourceManager.listSegmentLineage(tableNameWithType);
    if (lineage == null) {
      // Nothing recorded for this table: answer OK with an empty body.
      return Response.ok().build();
    }
    return Response.ok().entity(lineage.toJsonObject()).build();
  } catch (Exception e) {
    String message =
        String.format("Exception while listing segment lineage: %s for table: %s.", e.getMessage(),
            tableNameWithType);
    throw new ControllerApplicationException(LOGGER, message, Response.Status.INTERNAL_SERVER_ERROR, e);
  }
}

@Deprecated
@GET
@Path("tables/{tableName}/segments")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3274,6 +3274,15 @@ public void endReplaceSegments(String tableNameWithType, String segmentLineageEn
tableNameWithType, segmentLineageEntryId);
}

/**
 * Fetches the segment lineage of a table from the property store.
 *
 * @param tableNameWithType table name including its type suffix
 * @return whatever {@link SegmentLineageAccessHelper#getSegmentLineage} yields for the table; callers should be
 *         prepared for {@code null} (presumably returned when no lineage has been recorded - TODO confirm)
 */
public SegmentLineage listSegmentLineage(String tableNameWithType) {
  SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType);
  return segmentLineage;
}

/**
* Revert the segment replacement
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.pinot.controller.api;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
Expand All @@ -37,13 +40,83 @@
public class PinotSegmentRestletResourceTest {
private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance();
private static final String TABLE_NAME = "pinotSegmentRestletResourceTestTable";
private static final String TABLE_NAME_OFFLINE = TABLE_NAME + "_OFFLINE";

/**
 * One-time class setup: delegates to the shared {@code ControllerTest} instance's
 * {@code setupSharedStateAndValidate()}.
 * NOTE(review): presumably this brings up (or reuses) the shared controller state the tests below rely on -
 * confirm against ControllerTest.
 */
@BeforeClass
public void setUp()
throws Exception {
TEST_INSTANCE.setupSharedStateAndValidate();
}

/**
 * Exercises the list-segment-lineage REST endpoint end to end:
 * an empty response before any replacement has started, two IN_PROGRESS entries after two
 * {@code startReplaceSegments} calls (returned in chronological order), and failures for a non-existing table
 * and for an invalid table type.
 */
@Test
public void testListSegmentLineage()
    throws Exception {
  // Adding table
  TableConfig tableConfig =
      new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(1).build();
  TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);

  // Wait for the table addition. The wait is bounded so that a failed table creation fails this test with a
  // clear message instead of hanging the whole suite.
  long tableCreationDeadlineMs = System.currentTimeMillis() + 60_000L;
  while (!TEST_INSTANCE.getHelixResourceManager().hasOfflineTable(TABLE_NAME)) {
    Assert.assertTrue(System.currentTimeMillis() < tableCreationDeadlineMs,
        "Timed out waiting for offline table to be created: " + TABLE_NAME);
    Thread.sleep(100);
  }

  Map<String, SegmentMetadata> segmentMetadataTable = new HashMap<>();

  // Upload Segments
  for (int i = 0; i < 4; i++) {
    SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME, "s" + i);
    TEST_INSTANCE.getHelixResourceManager()
        .addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), segmentMetadata, "downloadUrl");
    segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata);
  }

  // There should be no segment lineage at this point.
  String segmentLineageResponse = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder()
      .forListAllSegmentLineages(TABLE_NAME, TableType.OFFLINE.toString()));
  Assert.assertEquals(segmentLineageResponse, "");

  // Now starts to replace segments.
  List<String> segmentsFrom = Arrays.asList("s0", "s1");
  List<String> segmentsTo = Arrays.asList("some_segment");
  String segmentLineageId = TEST_INSTANCE.getHelixResourceManager()
      .startReplaceSegments(TABLE_NAME_OFFLINE, segmentsFrom, segmentsTo, false);

  // Replace more segments to add another entry to segment lineage.
  segmentsFrom = Arrays.asList("s2", "s3");
  segmentsTo = Arrays.asList("another_segment");
  String nextSegmentLineageId = TEST_INSTANCE.getHelixResourceManager()
      .startReplaceSegments(TABLE_NAME_OFFLINE, segmentsFrom, segmentsTo, false);

  // There should now be two segment lineage entries resulting from the operations above.
  segmentLineageResponse = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder()
      .forListAllSegmentLineages(TABLE_NAME, TableType.OFFLINE.toString()));
  Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"IN_PROGRESS\""));
  Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"s0\",\"s1\"]"));
  Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"some_segment\"]"));
  Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"s2\",\"s3\"]"));
  Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"another_segment\"]"));
  // Ensures the two entries are sorted in chronological order by timestamp.
  Assert.assertTrue(
      segmentLineageResponse.indexOf(segmentLineageId) < segmentLineageResponse.indexOf(nextSegmentLineageId));

  // List segment lineage should fail for non-existing table
  Assert.assertThrows(IOException.class, () -> ControllerTest.sendGetRequest(
      TEST_INSTANCE.getControllerRequestURLBuilder()
          .forListAllSegmentLineages("non-existing-table", TableType.OFFLINE.toString())));

  // List segment lineage should also fail for invalid table type.
  Assert.assertThrows(IOException.class, () -> ControllerTest.sendGetRequest(
      TEST_INSTANCE.getControllerRequestURLBuilder().forListAllSegmentLineages(TABLE_NAME, "invalid-type")));

  // Delete segments
  TEST_INSTANCE.getHelixResourceManager().deleteSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
      segmentMetadataTable.values().iterator().next().getName());

  // Delete offline table
  TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(TABLE_NAME);
}

@Test
public void testSegmentCrcApi()
throws Exception {
Expand Down Expand Up @@ -81,12 +154,11 @@ public void testSegmentCrcApi()
Assert.assertEquals(fetchedMetadata.get("segment.download.url"), "downloadUrl");

// use table name with table type
resp = ControllerTest.sendGetRequest(TEST_INSTANCE.getControllerRequestURLBuilder()
.forSegmentMetadata(TABLE_NAME + "_OFFLINE", entry.getKey()));
resp = ControllerTest.sendGetRequest(
TEST_INSTANCE.getControllerRequestURLBuilder().forSegmentMetadata(TABLE_NAME + "_OFFLINE", entry.getKey()));
fetchedMetadata = JsonUtils.stringToObject(resp, Map.class);
Assert.assertEquals(fetchedMetadata.get("segment.download.url"), "downloadUrl");


// Add more segments
for (int i = 0; i < 5; i++) {
SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME);
Expand All @@ -104,6 +176,9 @@ public void testSegmentCrcApi()

// Check crc api
checkCrcRequest(segmentMetadataTable, 9);

// Delete offline table
TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(TABLE_NAME);
}

private void checkCrcRequest(Map<String, SegmentMetadata> metadataTable, int expectedSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ public String forSegmentMetadata(String tableName, String segmentName) {
return StringUtil.join("/", _baseUrl, "segments", tableName, encode(segmentName), "metadata");
}

/**
 * Builds the URL of the list-segment-lineage endpoint:
 * {@code <baseUrl>/segments/<tableName>/lineage?type=<tableType>}.
 *
 * @param tableName table name placed in the URL path as-is
 * @param tableType value of the {@code type} query parameter, appended as-is
 * @return the assembled URL string
 */
public String forListAllSegmentLineages(String tableName, String tableType) {
  // The query string rides along on the last path element so one join builds the whole URL.
  String lineageWithQuery = "lineage?type=" + tableType;
  return StringUtil.join("/", _baseUrl, "segments", tableName, lineageWithQuery);
}

/**
 * Builds the URL of the per-table segment CRC endpoint: {@code <baseUrl>/tables/<tableName>/segments/crc}.
 *
 * @param tableName table name placed in the URL path as-is
 * @return the assembled URL string
 */
public String forListAllCrcInformationForTable(String tableName) {
  String crcUrl = StringUtil.join("/", _baseUrl, "tables", tableName, "segments", "crc");
  return crcUrl;
}
Expand Down