From 1147723faf733ca32dd8d9ec79a22e5af308e541 Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Tue, 18 Feb 2025 13:22:05 +0000
Subject: [PATCH] Use new helper in IngestJobRunnerIT

---
 .../ingest/runner/task/IngestJobRunnerIT.java | 65 +++++++++----------
 1 file changed, 30 insertions(+), 35 deletions(-)

diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/task/IngestJobRunnerIT.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/task/IngestJobRunnerIT.java
index eb94bae617..627ef77ab1 100644
--- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/task/IngestJobRunnerIT.java
+++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/task/IngestJobRunnerIT.java
@@ -30,7 +30,6 @@
 import sleeper.core.properties.table.TablePropertiesProvider;
 import sleeper.core.properties.testutils.FixedTablePropertiesProvider;
 import sleeper.core.record.Record;
-import sleeper.core.schema.Schema;
 import sleeper.core.schema.type.LongType;
 import sleeper.core.statestore.AllReferencesToAFile;
 import sleeper.core.statestore.FileReference;
@@ -40,6 +39,8 @@
 import sleeper.core.statestore.commit.StateStoreCommitRequest;
 import sleeper.core.statestore.commit.StateStoreCommitRequestSerDe;
 import sleeper.core.statestore.testutils.FixedStateStoreProvider;
+import sleeper.core.statestore.transactionlog.InMemoryTransactionLogStateStore;
+import sleeper.core.statestore.transactionlog.InMemoryTransactionLogs;
 import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
 import sleeper.core.tracker.ingest.job.InMemoryIngestJobTracker;
 import sleeper.core.tracker.ingest.job.IngestJobTracker;
@@ -70,7 +71,7 @@
 import static sleeper.core.properties.table.TableProperty.TABLE_ID;
 import static sleeper.core.properties.table.TableProperty.TABLE_NAME;
 import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties;
-import static sleeper.core.statestore.testutils.StateStoreTestHelper.inMemoryStateStoreWithFixedSinglePartition;
+import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTablePropertiesWithNoSchema;
 import static sleeper.core.tracker.ingest.job.IngestJobStatusTestData.ingestAddedFilesStatus;
 import static sleeper.core.tracker.job.run.JobRunTestData.jobRunOnTask;
 import static sleeper.ingest.core.job.IngestJobStatusFromJobTestData.ingestJobStatus;
@@ -80,6 +81,7 @@
 class IngestJobRunnerIT extends LocalStackTestBase {
 
     private final InstanceProperties instanceProperties = createTestInstanceProperties();
+    private final TableProperties tableProperties = createTestTablePropertiesWithNoSchema(instanceProperties);
     private final String tableName = "test-table";
     private final String tableId = UUID.randomUUID().toString();
     private final String dataBucketName = instanceProperties.get(DATA_BUCKET);
@@ -94,6 +96,9 @@ public void before() throws IOException {
         createBucket(dataBucketName);
         createBucket(ingestSourceBucketName);
         instanceProperties.set(STATESTORE_COMMITTER_QUEUE_URL, createFifoQueueGetUrl());
+        tableProperties.set(TABLE_NAME, tableName);
+        tableProperties.set(TABLE_ID, tableId);
+        tableProperties.set(INGEST_FILES_COMMIT_ASYNC, "false");
     }
 
     @Test
@@ -102,14 +107,15 @@ void shouldIngestParquetFiles() throws Exception {
         RecordGenerator.RecordListAndSchema recordListAndSchema = RecordGenerator.genericKey1D(
                 new LongType(),
                 LongStream.range(-5, 5).boxed().collect(Collectors.toList()));
-        StateStore stateStore = inMemoryStateStoreWithFixedSinglePartition(recordListAndSchema.sleeperSchema);
+        tableProperties.setSchema(recordListAndSchema.sleeperSchema);
+        StateStore stateStore = initialiseStateStore();
         List<String> files = writeParquetFilesForIngest(recordListAndSchema, "", 2);
         List<Record> doubledRecords = Stream.of(recordListAndSchema.recordList, recordListAndSchema.recordList)
                 .flatMap(List::stream)
                 .collect(Collectors.toList());
 
         // When
-        runIngestJob(stateStore, recordListAndSchema, files);
+        runIngestJob(stateStore, files);
 
         // Then
         List<FileReference> actualFiles = stateStore.getFileReferences();
@@ -130,6 +136,7 @@ void shouldIgnoreFilesOfUnreadableFormats() throws Exception {
         RecordGenerator.RecordListAndSchema recordListAndSchema = RecordGenerator.genericKey1D(
                 new LongType(),
                 LongStream.range(-100, 100).boxed().collect(Collectors.toList()));
+        tableProperties.setSchema(recordListAndSchema.sleeperSchema);
         List<String> files = writeParquetFilesForIngest(recordListAndSchema, "", 1);
         URI uri1 = new URI("s3a://" + ingestSourceBucketName + "/file-1.crc");
         FileSystem.get(uri1, hadoopConf).createNewFile(new Path(uri1));
         files.add(ingestSourceBucketName + "/file-1.crc");
@@ -137,10 +144,10 @@
         URI uri2 = new URI("s3a://" + ingestSourceBucketName + "/file-2.csv");
         FileSystem.get(uri2, hadoopConf).createNewFile(new Path(uri2));
         files.add(ingestSourceBucketName + "/file-2.csv");
-        StateStore stateStore = inMemoryStateStoreWithFixedSinglePartition(recordListAndSchema.sleeperSchema);
+        StateStore stateStore = initialiseStateStore();
 
         // When
-        runIngestJob(stateStore, recordListAndSchema, files);
+        runIngestJob(stateStore, files);
 
         // Then
         List<FileReference> actualFiles = stateStore.getFileReferences();
@@ -161,6 +168,7 @@ void shouldIngestParquetFilesInNestedDirectories() throws Exception {
         RecordGenerator.RecordListAndSchema recordListAndSchema = RecordGenerator.genericKey1D(
                 new LongType(),
                 LongStream.range(-5, 5).boxed().collect(Collectors.toList()));
+        tableProperties.setSchema(recordListAndSchema.sleeperSchema);
         int noOfTopLevelDirectories = 2;
         int noOfNestings = 4;
         int noOfFilesPerDirectory = 2;
@@ -176,10 +184,10 @@
                 .flatMap(List::stream).collect(Collectors.toList());
         List<Record> expectedRecords = Collections.nCopies(noOfTopLevelDirectories * noOfNestings * noOfFilesPerDirectory, recordListAndSchema.recordList).stream()
                 .flatMap(List::stream).collect(Collectors.toList());
-        StateStore stateStore = inMemoryStateStoreWithFixedSinglePartition(recordListAndSchema.sleeperSchema);
+        StateStore stateStore = initialiseStateStore();
 
         // When
-        runIngestJob(stateStore, recordListAndSchema, files);
+        runIngestJob(stateStore, files);
 
         // Then
         List<FileReference> actualFiles = stateStore.getFileReferences();
@@ -203,6 +211,7 @@ void shouldWriteRecordsFromTwoBuckets() throws Exception {
         RecordGenerator.RecordListAndSchema records2 = RecordGenerator.genericKey1D(
                 new LongType(),
                 LongStream.range(10, 20).boxed().collect(Collectors.toList()));
+        tableProperties.setSchema(records1.sleeperSchema);
         writeParquetFileForIngest(new Path("s3a://" + dataBucketName + "/ingest/file1.parquet"), records1);
         writeParquetFileForIngest(new Path("s3a://" + ingestSourceBucketName + "/ingest/file2.parquet"), records2);
 
@@ -214,12 +223,11 @@
         List<Record> expectedRecords = new ArrayList<>();
         expectedRecords.addAll(records1.recordList);
         expectedRecords.addAll(records2.recordList);
-        TableProperties tableProperties = createTableProperties(records1.sleeperSchema);
-        StateStore stateStore = inMemoryStateStoreWithFixedSinglePartition(records1.sleeperSchema);
+        StateStore stateStore = initialiseStateStore();
         fixTimes(Instant.parse("2024-06-20T15:33:01Z"), Instant.parse("2024-06-20T15:33:10Z"));
 
         // When
-        runIngestJob(tableProperties, stateStore, ingestJob);
+        runIngestJob(stateStore, ingestJob);
 
         // Then
         List<FileReference> actualFiles = stateStore.getFileReferences();
@@ -245,9 +253,9 @@ void shouldCommitFilesAsynchronously() throws Exception {
         RecordGenerator.RecordListAndSchema recordListAndSchema = RecordGenerator.genericKey1D(
                 new LongType(),
                 LongStream.range(-5, 5).boxed().collect(Collectors.toList()));
-        TableProperties tableProperties = createTableProperties(recordListAndSchema.sleeperSchema);
+        tableProperties.setSchema(recordListAndSchema.sleeperSchema);
         tableProperties.set(INGEST_FILES_COMMIT_ASYNC, "true");
-        StateStore stateStore = inMemoryStateStoreWithFixedSinglePartition(recordListAndSchema.sleeperSchema);
+        StateStore stateStore = initialiseStateStore();
         List<String> files = writeParquetFilesForIngest(recordListAndSchema, "", 1);
 
         IngestJob job = IngestJob.builder()
@@ -259,7 +267,7 @@
         fixTimes(Instant.parse("2024-06-20T15:10:00Z"), Instant.parse("2024-06-20T15:10:01Z"));
 
         // When
-        runIngestJob(tableProperties, stateStore, job);
+        runIngestJob(stateStore, job);
 
         // Then
         List<StateStoreCommitRequest> commitRequests = getCommitRequestsFromQueue(tableProperties);
@@ -302,30 +310,22 @@ private Stream<AllReferencesToAFile> streamFilesAdded(StateStoreCommitRequest commitRequest) {
 
     private void runIngestJob(
             StateStore stateStore,
-            RecordGenerator.RecordListAndSchema recordListAndSchema,
             List<String> files) throws Exception {
-        TableProperties tableProperties = createTableProperties(recordListAndSchema.sleeperSchema);
         IngestJob job = IngestJob.builder()
                 .tableName(tableName)
                 .tableId(tableProperties.get(TABLE_ID))
                 .id("id")
                 .files(files)
                 .build();
-        runIngestJob(tableProperties, stateStore, job);
+        runIngestJob(stateStore, job);
     }
 
-    private void runIngestJob(
-            TableProperties tableProperties,
-            StateStore stateStore,
-            IngestJob job) throws Exception {
+    private void runIngestJob(StateStore stateStore, IngestJob job) throws Exception {
         tracker.jobStarted(job.startedEventBuilder(timeSupplier.get()).taskId("test-task").jobRunId("test-job-run").build());
-        ingestJobRunner(instanceProperties, tableProperties, stateStore)
-                .ingest(job, "test-job-run");
+        ingestJobRunner(stateStore).ingest(job, "test-job-run");
    }
 
-    private IngestJobRunner ingestJobRunner(InstanceProperties instanceProperties,
-            TableProperties tableProperties,
-            StateStore stateStore) throws Exception {
+    private IngestJobRunner ingestJobRunner(StateStore stateStore) throws Exception {
         TablePropertiesProvider tablePropertiesProvider = new FixedTablePropertiesProvider(tableProperties);
         StateStoreProvider stateStoreProvider = new FixedStateStoreProvider(tableProperties, stateStore);
         return new IngestJobRunner(
@@ -345,15 +345,6 @@ private void fixTimes(Instant... times) {
         timeSupplier = List.of(times).iterator()::next;
     }
 
-    private TableProperties createTableProperties(Schema schema) {
-        TableProperties tableProperties = new TableProperties(instanceProperties);
-        tableProperties.set(TABLE_NAME, tableName);
-        tableProperties.set(TABLE_ID, tableId);
-        tableProperties.set(INGEST_FILES_COMMIT_ASYNC, "false");
-        tableProperties.setSchema(schema);
-        return tableProperties;
-    }
-
     private List<String> writeParquetFilesForIngest(
             RecordGenerator.RecordListAndSchema recordListAndSchema,
             String subDirectory,
@@ -381,4 +372,8 @@ private void writeParquetFileForIngest(
         writer.close();
     }
 
+    private StateStore initialiseStateStore() {
+        return InMemoryTransactionLogStateStore.createAndInitialise(tableProperties, new InMemoryTransactionLogs());
+    }
+
 }
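
Appended sketch (reviewer aid, not part of the commit): the net effect of this patch is that the test class shares one schema-less TableProperties, fixes the table name, table ID and synchronous-commit default once in before(), and each test then sets only its schema before calling the new initialiseStateStore() helper. A minimal standalone illustration of that pattern follows; the helper calls and property names are taken from the hunks above, while the class name, the method stubs and the import paths for InstanceProperties and TableProperties are assumptions.

    import sleeper.core.properties.instance.InstanceProperties;
    import sleeper.core.properties.table.TableProperties;
    import sleeper.core.schema.Schema;
    import sleeper.core.statestore.StateStore;
    import sleeper.core.statestore.transactionlog.InMemoryTransactionLogStateStore;
    import sleeper.core.statestore.transactionlog.InMemoryTransactionLogs;

    import static sleeper.core.properties.table.TableProperty.TABLE_ID;
    import static sleeper.core.properties.table.TableProperty.TABLE_NAME;
    import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties;
    import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTablePropertiesWithNoSchema;

    // Hypothetical class showing the setup pattern; identifiers mirror the patch.
    class SharedStateStoreSetupSketch {

        // One shared TableProperties per test class, created without a schema.
        private final InstanceProperties instanceProperties = createTestInstanceProperties();
        private final TableProperties tableProperties = createTestTablePropertiesWithNoSchema(instanceProperties);

        // Table identity is fixed once in setup, replacing the deleted
        // per-test createTableProperties(Schema) helper.
        void before() {
            tableProperties.set(TABLE_NAME, "test-table");
            tableProperties.set(TABLE_ID, "test-table-id");
        }

        // Builds an in-memory transaction-log state store from the shared
        // properties, as the new initialiseStateStore() helper does.
        StateStore initialiseStateStore() {
            return InMemoryTransactionLogStateStore.createAndInitialise(tableProperties, new InMemoryTransactionLogs());
        }

        // Example test-side usage: set the schema the test needs, then
        // initialise the store (schema construction elided).
        void exampleTest(Schema schema) {
            tableProperties.setSchema(schema);
            StateStore stateStore = initialiseStateStore();
            // ... run the ingest job against stateStore ...
        }
    }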