From 97bc092b27bf80809e3d5fcdf0c7fcb412844d47 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Wed, 26 Feb 2025 12:14:34 +0000 Subject: [PATCH] Remove AddFilesTransaction.updateTrackerFromLog --- .../runner/BulkImportJobDriver.java | 14 ++++++------- .../runner/BulkImportJobDriverTest.java | 12 ++--------- .../transaction/impl/AddFilesTransaction.java | 21 +++---------------- 3 files changed, 12 insertions(+), 35 deletions(-) diff --git a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java index 8adc33f6d2..5908dfb626 100644 --- a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java +++ b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java @@ -115,16 +115,16 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOExce Instant finishTime = getTime.get(); boolean asyncCommit = tableProperties.getBoolean(BULK_IMPORT_FILES_COMMIT_ASYNC); try { - AddFilesTransaction transaction = AddFilesTransaction.builder() - .jobId(job.getId()).taskId(taskId).jobRunId(jobRunId).writtenTime(finishTime) - .fileReferences(output.fileReferences()) - .updateTrackerFromLog(asyncCommit) - .build(); if (asyncCommit) { - asyncSender.send(StateStoreCommitRequest.create(table.getTableUniqueId(), transaction)); + asyncSender.send(StateStoreCommitRequest.create(table.getTableUniqueId(), + AddFilesTransaction.builder() + .jobId(job.getId()).taskId(taskId).jobRunId(jobRunId).writtenTime(finishTime) + .fileReferences(output.fileReferences()) + .build())); LOGGER.info("Submitted asynchronous request to state store committer to add {} files for job {} in table {}", output.numFiles(), job.getId(), table); } else { - transaction.synchronousCommit(stateStoreProvider.getStateStore(tableProperties)); + AddFilesTransaction.fromReferences(output.fileReferences()) + .synchronousCommit(stateStoreProvider.getStateStore(tableProperties)); LOGGER.info("Added {} files to statestore for job {} in table {}", output.numFiles(), job.getId(), table); } } catch (RuntimeException e) { diff --git a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java index b2d6f15d16..2719c0205d 100644 --- a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java +++ b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java @@ -180,13 +180,12 @@ void shouldCommitNewFilesAsynchronouslyWhenConfigured() throws Exception { assertThat(commitRequestQueue).containsExactly(StateStoreCommitRequest.create(tableProperties.get(TABLE_ID), AddFilesTransaction.builder() .jobId(job.getId()).taskId("test-task").jobRunId("test-run").writtenTime(finishTime) - .updateTrackerFromLog(true) .fileReferences(outputFiles) .build())); } @Test - void shouldRecordInTransactionLogThatTrackerUpdateNotRequiredForSynchronousCommit() throws Exception { + void shouldNotRecordJobTrackerUpdateDetailsInTransactionLogForSynchronousCommit() throws Exception { // Given tableProperties.set(BULK_IMPORT_FILES_COMMIT_ASYNC, "false"); BulkImportJob job = singleFileImportJob(); @@ -202,14 +201,7 @@ void shouldRecordInTransactionLogThatTrackerUpdateNotRequiredForSynchronousCommi // Then assertThat(transactionLogs.getLastFilesTransaction(tableProperties)) - .isEqualTo(AddFilesTransaction.builder() - .jobId(job.getId()) - .taskId("test-task") - .jobRunId("test-run") - .writtenTime(finishTime) - .updateTrackerFromLog(false) - .fileReferences(outputFiles) - .build()); + .isEqualTo(AddFilesTransaction.fromReferences(outputFiles)); } private void runJob( diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AddFilesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AddFilesTransaction.java index 91650d295d..76de8c6017 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AddFilesTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AddFilesTransaction.java @@ -53,7 +53,6 @@ public class AddFilesTransaction implements FileReferenceTransaction { private final String taskId; private final String jobRunId; private final Instant writtenTime; - private final boolean updateTrackerFromLog; private final List files; public AddFilesTransaction(List files) { @@ -65,7 +64,6 @@ private AddFilesTransaction(Builder builder) { taskId = builder.taskId; jobRunId = builder.jobRunId; writtenTime = builder.writtenTime; - updateTrackerFromLog = builder.updateTrackerFromLog; files = Objects.requireNonNull(builder.files, "files must not be null") .stream().map(file -> file.withCreatedUpdateTime(null)).toList(); } @@ -209,7 +207,7 @@ public List getFiles() { @Override public int hashCode() { - return Objects.hash(jobId, taskId, jobRunId, writtenTime, updateTrackerFromLog, files); + return Objects.hash(jobId, taskId, jobRunId, writtenTime, files); } @Override @@ -223,13 +221,13 @@ public boolean equals(Object obj) { AddFilesTransaction other = (AddFilesTransaction) obj; return Objects.equals(jobId, other.jobId) && Objects.equals(taskId, other.taskId) && Objects.equals(jobRunId, other.jobRunId) && Objects.equals(writtenTime, other.writtenTime) - && Objects.equals(updateTrackerFromLog, other.updateTrackerFromLog) && Objects.equals(files, other.files); + && Objects.equals(files, other.files); } @Override public String toString() { return "AddFilesTransaction{jobId=" + jobId + ", taskId=" + taskId + ", jobRunId=" + jobRunId + ", writtenTime=" - + writtenTime + ", updateTrackerFromLog=" + updateTrackerFromLog + ", files=" + files + "}"; + + writtenTime + ", files=" + files + "}"; } /** @@ -240,7 +238,6 @@ public static class Builder { private String taskId; private String jobRunId; private Instant writtenTime; - private boolean updateTrackerFromLog = true; private List files; private Builder() { @@ -279,18 +276,6 @@ public Builder writtenTime(Instant writtenTime) { return this; } - /** - * Sets the flag for whether to update the tracker following the transaction log. Default to true, set to false - * for synchronous commits. - * - * @param updateTrackerFromLog whether tracker needs to be updated - * @return this builder - */ - public Builder updateTrackerFromLog(boolean updateTrackerFromLog) { - this.updateTrackerFromLog = updateTrackerFromLog; - return this; - } - /** * Sets the ID of the job run that added these files. *