Enable Consistent Data Push for Standalone Segment Push Job Runners #9295
@@ -27,20 +27,28 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@@ -81,10 +89,15 @@ protected List<String> getBloomFilterColumns() {
    return null;
  }

  @BeforeMethod
  public void setUpTest()
      throws IOException {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
  }

Review comment on `setUpTest()`: "I think that this is not actually necessary. We can call […]. For the 2 different tests, we can use different table names."

Reply: "Calling […]. Using two table names will also lead to more code for now, since it seems like […]. Hence, I prefer to keep the test structure as is, with the before and after method annotations, unless this is an anti-pattern. Another way is to create a different test class altogether, but I combined the two here to avoid slowing things down with too many tests."
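For reference, a minimal sketch of the reviewer's alternative: give each test its own table name so the shared `@BeforeMethod`/`@AfterMethod` pair is unnecessary. This assumes the fields and helpers of this test class; the test name and table-name suffix are hypothetical.

```java
// Sketch only: per-test table name instead of shared @BeforeMethod/@AfterMethod.
@Test
public void testUploadAndQueryWithConsistentPushIsolated()
    throws Exception {
  String tableName = DEFAULT_TABLE_NAME + "_consistentPush";  // hypothetical unique name
  TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
  try {
    // ... build segments and run the push job runners against tableName ...
  } finally {
    dropOfflineTable(tableName);  // per-test cleanup replaces the shared @AfterMethod
  }
}
```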

  @BeforeClass
  public void setUp()
      throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
    // Start Zk and Kafka
    startZk();

@@ -122,6 +135,7 @@ public void testUploadAndQuery()
    jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
    TableSpec tableSpec = new TableSpec();
    tableSpec.setTableName(DEFAULT_TABLE_NAME);
    tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
    jobSpec.setTableSpec(tableSpec);
    PinotClusterSpec clusterSpec = new PinotClusterSpec();
    clusterSpec.setControllerURI(_controllerBaseApiUrl);

@@ -188,6 +202,135 @@ public void testUploadAndQuery()
    testCountStar(numDocs);
  }

  /**
   * Runs both SegmentMetadataPushJobRunner and SegmentTarPushJobRunner with consistent data push enabled.
   * Checks that segments are loaded properly and that the segment lineage entries are in the expected states.
   */
  @Test
  public void testUploadAndQueryWithConsistentPush()
      throws Exception {
    // Create and upload the schema and table config
    Schema schema = createSchema();
    addSchema(schema);
    TableConfig offlineTableConfig = createOfflineTableConfigWithConsistentPush();
    addTableConfig(offlineTableConfig);

    List<File> avroFiles = getAllAvroFiles();

    String firstTimeStamp = Long.toString(System.currentTimeMillis());

    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, firstTimeStamp,
        _segmentDir, _tarDir);

    // First, test the standalone metadata push job runner
    BaseSegmentPushJobRunner runner = new SegmentMetadataPushJobRunner();
    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
    PushJobSpec pushJobSpec = new PushJobSpec();
    pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
    jobSpec.setPushJobSpec(pushJobSpec);
    PinotFSSpec fsSpec = new PinotFSSpec();
    fsSpec.setScheme("file");
    fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
    jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
    jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
    TableSpec tableSpec = new TableSpec();
    tableSpec.setTableName(DEFAULT_TABLE_NAME);
    tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
    jobSpec.setTableSpec(tableSpec);
    PinotClusterSpec clusterSpec = new PinotClusterSpec();
    clusterSpec.setControllerURI(_controllerBaseApiUrl);
    jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});

    File dataDir = new File(_controllerConfig.getDataDir());
    File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);

    Assert.assertEquals(_tarDir.listFiles().length, 1);

    runner.init(jobSpec);
    runner.run();

    // The pushed segment should be seen in dataDir
    Assert.assertTrue(dataDirSegments.exists());
    Assert.assertEquals(dataDirSegments.listFiles().length, 1);
    Assert.assertEquals(_tarDir.listFiles().length, 1);

    // Test that the segment was loaded
    JsonNode segmentsList = getSegmentsList();
    Assert.assertEquals(segmentsList.size(), 1);
    String firstSegmentName = segmentsList.get(0).asText();
    Assert.assertTrue(firstSegmentName.endsWith(firstTimeStamp));
    long numDocs = getNumDocs(firstSegmentName);
    testCountStar(numDocs);

    // Fetch the segment lineage entry after running segment metadata push with consistent push enabled.
    String segmentLineageResponse = ControllerTest.sendGetRequest(
        ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl)
            .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
    // The segment lineage entry should be in COMPLETED state.
    Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
    // segmentsFrom should be empty since we started with a blank table.
    Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[]"));
    // segmentsTo should contain the uploaded segment.
    Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"" + firstSegmentName + "\"]"));

    // Clear the segment and tar dirs
    for (File segment : _segmentDir.listFiles()) {
      FileUtils.deleteQuietly(segment);
    }
    for (File tar : _tarDir.listFiles()) {
      FileUtils.deleteQuietly(tar);
    }

    String secondTimeStamp = Long.toString(System.currentTimeMillis());

    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema, secondTimeStamp,
        _segmentDir, _tarDir);
    // Reset to a default PushJobSpec; copyToDeepStoreForMetadataPush does not apply to tar push
    jobSpec.setPushJobSpec(new PushJobSpec());

    // Now test the standalone tar push job runner
    runner = new SegmentTarPushJobRunner();

    Assert.assertEquals(dataDirSegments.listFiles().length, 1);
    Assert.assertEquals(_tarDir.listFiles().length, 1);

    runner.init(jobSpec);
    runner.run();

    Assert.assertEquals(_tarDir.listFiles().length, 1);

    // Test that the second segment was loaded
    segmentsList = getSegmentsList();
    Assert.assertEquals(segmentsList.size(), 2);
    String secondSegmentName = null;
    for (JsonNode segment : segmentsList) {
      if (segment.asText().endsWith(secondTimeStamp)) {
        secondSegmentName = segment.asText();
      }
    }
    Assert.assertNotNull(secondSegmentName);
    numDocs = getNumDocs(secondSegmentName);
    testCountStar(numDocs);

    // Fetch the segment lineage entry after running segment tar push with consistent push enabled.
    segmentLineageResponse = ControllerTest.sendGetRequest(
        ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl)
            .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
    // The segment lineage entry should be in COMPLETED state.
    Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
    // segmentsFrom should contain the previous segment, which the new one replaces.
    Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"" + firstSegmentName + "\"]"));
    // segmentsTo should contain the newly uploaded segment.
    Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"" + secondSegmentName + "\"]"));
  }
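
The lineage fetch-and-assert block appears once per runner. A small helper along the following lines could consolidate it; this is a sketch that reuses only calls already present in the test, and the helper name is hypothetical.

```java
// Hypothetical helper consolidating the duplicated lineage checks above.
private void assertSegmentLineage(String expectedSegmentsFrom, String expectedSegmentsTo)
    throws IOException {
  String response = ControllerTest.sendGetRequest(
      ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl)
          .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
  Assert.assertTrue(response.contains("\"state\":\"COMPLETED\""));
  Assert.assertTrue(response.contains("\"segmentsFrom\":[" + expectedSegmentsFrom + "]"));
  Assert.assertTrue(response.contains("\"segmentsTo\":[" + expectedSegmentsTo + "]"));
}
```

With such a helper, the metadata push check would be `assertSegmentLineage("", "\"" + firstSegmentName + "\"")` and the tar push check `assertSegmentLineage("\"" + firstSegmentName + "\"", "\"" + secondSegmentName + "\"")`.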

  protected TableConfig createOfflineTableConfigWithConsistentPush() {
    TableConfig offlineTableConfig = createOfflineTableConfig();
    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY", true));
    offlineTableConfig.setIngestionConfig(ingestionConfig);
    return offlineTableConfig;
  }
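
The final `true` in the `BatchIngestionConfig` constructor is the consistent-data-push flag for this REFRESH table. A quick sanity check of the helper might look like the sketch below, assuming `getConsistentDataPush()` and `getSegmentIngestionType()` are the accessors for those constructor arguments.

```java
// Sketch: verify the flags set by createOfflineTableConfigWithConsistentPush().
// The accessor names are assumptions about the BatchIngestionConfig API.
TableConfig config = createOfflineTableConfigWithConsistentPush();
BatchIngestionConfig batchConfig = config.getIngestionConfig().getBatchIngestionConfig();
Assert.assertEquals(batchConfig.getSegmentIngestionType(), "REFRESH");
Assert.assertTrue(batchConfig.getConsistentDataPush());
```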

  private long getNumDocs(String segmentName)
      throws IOException {
    return JsonUtils.stringToJsonNode(

@@ -198,8 +341,7 @@ private long getNumDocs(String segmentName)
   private JsonNode getSegmentsList()
       throws IOException {
     return JsonUtils.stringToJsonNode(sendGetRequest(
-        _controllerRequestURLBuilder.forSegmentListAPIWithTableType(DEFAULT_TABLE_NAME,
-            TableType.OFFLINE.toString())))
+        _controllerRequestURLBuilder.forSegmentListAPI(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())))
         .get(0).get("OFFLINE");
   }

@@ -217,10 +359,15 @@ public Boolean apply(@Nullable Void aVoid) {
    }, 100L, 300_000, "Failed to load " + countStarResult + " documents", true);
  }

  @AfterMethod
  public void tearDownTest()
      throws IOException {
    dropOfflineTable(getTableName());
  }

Review comment on `tearDownTest()`: "This is also not necessary if we use different table names."

  @AfterClass
  public void tearDown()
      throws Exception {
    dropOfflineTable(getTableName());
    stopServer();
    stopBroker();
    stopController();
Review comment: "Same for this."