Enable Consistent Data Push for Standalone Segment Push Job Runners #9295

Merged (11 commits, Sep 7, 2022)
@@ -18,18 +18,23 @@
*/
package org.apache.pinot.common.utils;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
@@ -57,6 +62,8 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -747,6 +754,72 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream
return uploadSegment(uri, segmentName, inputStream, null, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
}

/**
* Returns a map from table type (OFFLINE or REALTIME) to the list of segments for that table type.
* If tableType is left unspecified, both OFFLINE and REALTIME segments are returned in the map.
*/
public Map<String, List<String>> getSegments(URI controllerUri, String rawTableName, @Nullable TableType tableType,
boolean excludeReplacedSegments) throws Exception {
List<String> tableTypes;
if (tableType == null) {
tableTypes = Arrays.asList(TableType.OFFLINE.toString(), TableType.REALTIME.toString());
} else {
tableTypes = Collections.singletonList(tableType.toString());
}
ControllerRequestURLBuilder controllerRequestURLBuilder =
ControllerRequestURLBuilder.baseUrl(controllerUri.toString());
Map<String, List<String>> tableTypeToSegments = new HashMap<>();
for (String tableTypeToFilter : tableTypes) {
tableTypeToSegments.put(tableTypeToFilter, new ArrayList<>());
String uri = controllerRequestURLBuilder.forSegmentListAPI(rawTableName,
tableTypeToFilter, excludeReplacedSegments);
RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
HttpClient.setTimeout(requestBuilder, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
Review comment (Contributor Author): Same for this..

// Retry up to 5 times with exponential backoff (10 s initial delay, scale factor 2.0).
RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0).attempt(() -> {
try {
SimpleHttpResponse response =
HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(requestBuilder.build()));
LOGGER.info("Response {}: {} received for GET request to URI: {}", response.getStatusCode(),
response.getResponse(), uri);
tableTypeToSegments.put(tableTypeToFilter,
getSegmentNamesFromResponse(tableTypeToFilter, response.getResponse()));
return true;
} catch (SocketTimeoutException se) {
// Timeouts are transient, so signal the retry policy to try again.
return false;
} catch (HttpErrorStatusException e) {
// Log 404s explicitly; every HTTP error status falls through to a retry.
if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
LOGGER.error("Segments not found for table {} when sending request uri: {}", rawTableName, uri);
}
return false;
}
});
}
return tableTypeToSegments;
}
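
A minimal usage sketch of the new helper, assuming (as the surrounding uploadSegment overloads suggest) that it lives on Pinot's FileUploadDownloadClient and that a controller is reachable at the given URI:

// Hypothetical usage: list all OFFLINE and REALTIME segments for "myTable",
// excluding segments that a completed lineage entry has already replaced.
try (FileUploadDownloadClient client = new FileUploadDownloadClient()) {
  Map<String, List<String>> segmentsByType =
      client.getSegments(URI.create("http://localhost:9000"), "myTable", null, true);
  List<String> offlineSegments = segmentsByType.get(TableType.OFFLINE.toString());
}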

private List<String> getSegmentNamesFromResponse(String tableType, String responseString)
throws IOException {
List<String> segments = new ArrayList<>();
// The segment-list endpoint returns a JSON array of objects keyed by table type,
// e.g. [{"OFFLINE": ["mytable_0", "mytable_1"]}].
JsonNode responseJsonNode = JsonUtils.stringToJsonNode(responseString);
Iterator<JsonNode> responseElements = responseJsonNode.elements();
while (responseElements.hasNext()) {
JsonNode responseElementJsonNode = responseElements.next();
if (!responseElementJsonNode.has(tableType)) {
continue;
}
JsonNode jsonArray = responseElementJsonNode.get(tableType);
Iterator<JsonNode> elements = jsonArray.elements();
while (elements.hasNext()) {
JsonNode segmentJsonNode = elements.next();
segments.add(segmentJsonNode.asText());
}
}
return segments;
}
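
To make the parsing concrete, a small illustration of the response shape this helper expects, inferred from the integration tests below (which read array.get(0).get("OFFLINE")):

// Illustrative only: feed a hand-written segment-list response through the parser.
String sampleResponse = "[{\"OFFLINE\": [\"mytable_0\", \"mytable_1\"]}]";
List<String> names = getSegmentNamesFromResponse("OFFLINE", sampleResponse);
// names now contains ["mytable_0", "mytable_1"]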

/**
* Used by controllers to send requests to servers:
* Controller periodic task uses this endpoint to ask servers
---

@@ -219,7 +219,8 @@ public URI flush()
batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI, segmentDir.getAbsolutePath());
batchConfigMapOverride.put(BatchConfigProperties.INPUT_FORMAT, BUFFER_FILE_FORMAT.toString());
BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
_batchIngestionConfig.getSegmentIngestionType(), _batchIngestionConfig.getSegmentIngestionFrequency());
_batchIngestionConfig.getSegmentIngestionType(), _batchIngestionConfig.getSegmentIngestionFrequency(),
_batchIngestionConfig.getConsistentDataPush());

// Build segment
SegmentGeneratorConfig segmentGeneratorConfig =
---

@@ -142,15 +142,15 @@ public void testPinotSinkParallelWrite()
private int getNumSegments()
throws IOException {
String jsonOutputStr = sendGetRequest(
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(OFFLINE_TABLE_NAME, TableType.OFFLINE.toString()));
_controllerRequestURLBuilder.forSegmentListAPI(OFFLINE_TABLE_NAME, TableType.OFFLINE.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
return array.get(0).get("OFFLINE").size();
}

private int getTotalNumDocs()
throws IOException {
String jsonOutputStr = sendGetRequest(
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(OFFLINE_TABLE_NAME, TableType.OFFLINE.toString()));
_controllerRequestURLBuilder.forSegmentListAPI(OFFLINE_TABLE_NAME, TableType.OFFLINE.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
JsonNode segments = array.get(0).get("OFFLINE");
int totalDocCount = 0;
---

@@ -140,7 +140,7 @@ public void testSegmentListApi()
throws Exception {
{
String jsonOutputStr = sendGetRequest(
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(getTableName(), TableType.OFFLINE.toString()));
_controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
// There should be one element in the array
JsonNode element = array.get(0);
@@ -149,7 +149,7 @@
}
{
String jsonOutputStr = sendGetRequest(
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(getTableName(), TableType.REALTIME.toString()));
_controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.REALTIME.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
// There should be one element in the array
JsonNode element = array.get(0);
---

@@ -27,20 +27,28 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


@@ -81,10 +89,15 @@ protected List<String> getBloomFilterColumns() {
return null;
}

Review comment (Contributor): I think that this is not actually necessary. We can call the TestUtils.ensureDirectoriesExistAndEmpty() function in setUp() and clean those directories up in cleanUp(). For the 2 different tests, we can use different table names.

Reply (Contributor Author, @yuanbenson, Aug 30, 2022): Calling TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir) in setUp() is insufficient, as another test case, testUploadAndQuery, leaves data in the tmp directories. Using two table names would also mean more code for now, since BaseClusterIntegrationTest is written in a way that assumes one table/config per subclass (e.g. getTableName, createOfflineTableConfig). Hence, I prefer to keep the test structure as is, with the before- and after-method annotations, unless this is an anti-pattern. Another option is a separate test class, but combining the two here avoids the slowdown of too many test classes.

@BeforeMethod
public void setUpTest()
throws IOException {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
}

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start Zk and Kafka
startZk();

@@ -122,6 +135,7 @@ public void testUploadAndQuery()
jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
TableSpec tableSpec = new TableSpec();
tableSpec.setTableName(DEFAULT_TABLE_NAME);
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
jobSpec.setTableSpec(tableSpec);
PinotClusterSpec clusterSpec = new PinotClusterSpec();
clusterSpec.setControllerURI(_controllerBaseApiUrl);
@@ -188,6 +202,135 @@ public void testUploadAndQuery()
testCountStar(numDocs);
}

/**
* Runs both SegmentMetadataPushJobRunner and SegmentTarPushJobRunner with consistent data push enabled.
* Checks that segments are properly loaded and that the segment lineage entries end up in the expected states.
*/
@Test
public void testUploadAndQueryWithConsistentPush()
throws Exception {
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig offlineTableConfig = createOfflineTableConfigWithConsistentPush();
addTableConfig(offlineTableConfig);

List<File> avroFiles = getAllAvroFiles();

String firstTimeStamp = Long.toString(System.currentTimeMillis());

ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, firstTimeStamp,
_segmentDir, _tarDir);

// First test standalone metadata push job runner
BaseSegmentPushJobRunner runner = new SegmentMetadataPushJobRunner();
SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
PushJobSpec pushJobSpec = new PushJobSpec();
pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
jobSpec.setPushJobSpec(pushJobSpec);
PinotFSSpec fsSpec = new PinotFSSpec();
fsSpec.setScheme("file");
fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
TableSpec tableSpec = new TableSpec();
tableSpec.setTableName(DEFAULT_TABLE_NAME);
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
jobSpec.setTableSpec(tableSpec);
PinotClusterSpec clusterSpec = new PinotClusterSpec();
clusterSpec.setControllerURI(_controllerBaseApiUrl);
jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});

File dataDir = new File(_controllerConfig.getDataDir());
File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);

Assert.assertEquals(_tarDir.listFiles().length, 1);

runner.init(jobSpec);
runner.run();

// Segment should be seen in dataDir
Assert.assertTrue(dataDirSegments.exists());
Assert.assertEquals(dataDirSegments.listFiles().length, 1);
Assert.assertEquals(_tarDir.listFiles().length, 1);

// Test that the segment was loaded and is queryable
JsonNode segmentsList = getSegmentsList();
Assert.assertEquals(segmentsList.size(), 1);
String firstSegmentName = segmentsList.get(0).asText();
Assert.assertTrue(firstSegmentName.endsWith(firstTimeStamp));
long numDocs = getNumDocs(firstSegmentName);
testCountStar(numDocs);

// Fetch segment lineage entry after running segment metadata push with consistent push enabled.
String segmentLineageResponse = ControllerTest.sendGetRequest(
ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl)
.forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
// Segment lineage should be in completed state.
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
// SegmentsFrom should be empty as we started with a blank table.
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[]"));
// SegmentsTo should contain uploaded segment.
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"" + firstSegmentName + "\"]"));

// Clear segment and tar dir
for (File segment : _segmentDir.listFiles()) {
FileUtils.deleteQuietly(segment);
}
for (File tar : _tarDir.listFiles()) {
FileUtils.deleteQuietly(tar);
}

String secondTimeStamp = Long.toString(System.currentTimeMillis());

ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema, secondTimeStamp,
_segmentDir, _tarDir);
jobSpec.setPushJobSpec(new PushJobSpec());

// Now test standalone tar push job runner
runner = new SegmentTarPushJobRunner();

Assert.assertEquals(dataDirSegments.listFiles().length, 1);
Assert.assertEquals(_tarDir.listFiles().length, 1);

runner.init(jobSpec);
runner.run();

Assert.assertEquals(_tarDir.listFiles().length, 1);

// Test that the second segment was loaded and is queryable
segmentsList = getSegmentsList();
Assert.assertEquals(segmentsList.size(), 2);
String secondSegmentName = null;
for (JsonNode segment : segmentsList) {
if (segment.asText().endsWith(secondTimeStamp)) {
secondSegmentName = segment.asText();
}
}
Assert.assertNotNull(secondSegmentName);
numDocs = getNumDocs(secondSegmentName);
testCountStar(numDocs);

// Fetch segment lineage entry after running segment tar push with consistent push enabled.
segmentLineageResponse = ControllerTest.sendGetRequest(
ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl)
.forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
// Segment lineage should be in completed state.
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
// SegmentsFrom should contain the previous segment
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"" + firstSegmentName + "\"]"));
// SegmentsTo should contain uploaded segment.
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"" + secondSegmentName + "\"]"));
}

protected TableConfig createOfflineTableConfigWithConsistentPush() {
TableConfig offlineTableConfig = createOfflineTableConfig();
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY", true));
offlineTableConfig.setIngestionConfig(ingestionConfig);
return offlineTableConfig;
}
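
For reference, a sketch of how one might inspect the resulting ingestion section; the JSON field names in the comment are an assumption based on BatchIngestionConfig's property names, not verified against the serializer:

// Hypothetical: dump the ingestion config to confirm the consistent-push flag.
// Assumed (unverified) shape:
//   "batchIngestionConfig": {
//     "segmentIngestionType": "REFRESH",
//     "segmentIngestionFrequency": "DAILY",
//     "consistentDataPush": true
//   }
TableConfig config = createOfflineTableConfigWithConsistentPush();
System.out.println(JsonUtils.objectToString(config.getIngestionConfig()));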

private long getNumDocs(String segmentName)
throws IOException {
return JsonUtils.stringToJsonNode(
@@ -198,8 +341,7 @@ private long getNumDocs(String segmentName)
private JsonNode getSegmentsList()
throws IOException {
return JsonUtils.stringToJsonNode(sendGetRequest(
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(DEFAULT_TABLE_NAME,
TableType.OFFLINE.toString())))
_controllerRequestURLBuilder.forSegmentListAPI(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())))
.get(0).get("OFFLINE");
}

@@ -217,10 +359,15 @@ public Boolean apply(@Nullable Void aVoid) {
}, 100L, 300_000, "Failed to load " + countStarResult + " documents", true);
}

Review comment (Contributor): This is also not necessary if we use different table names.

@AfterMethod
public void tearDownTest()
throws IOException {
dropOfflineTable(getTableName());
}

@AfterClass
public void tearDown()
throws Exception {
dropOfflineTable(getTableName());
stopServer();
stopBroker();
stopController();
---

@@ -155,7 +155,7 @@ public void testFileBasedSegmentWriterAndDefaultUploader()
private int getNumSegments()
throws IOException {
String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.
forSegmentListAPIWithTableType(_tableNameWithType, TableType.OFFLINE.toString()));
forSegmentListAPI(_tableNameWithType, TableType.OFFLINE.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
return array.get(0).get("OFFLINE").size();
}
@@ -169,7 +169,7 @@ private int getTotalDocsFromQuery()
private int getNumDocsInLatestSegment()
throws IOException {
String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.
forSegmentListAPIWithTableType(_tableNameWithType, TableType.OFFLINE.toString()));
forSegmentListAPI(_tableNameWithType, TableType.OFFLINE.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
JsonNode segments = array.get(0).get("OFFLINE");
String segmentName = segments.get(segments.size() - 1).asText();