Skip to content

Commit

Permalink
Remove AddFilesTransaction.updateTrackerFromLog
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Feb 26, 2025
1 parent 1cbf7f8 commit 97bc092
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,16 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOExce
Instant finishTime = getTime.get();
boolean asyncCommit = tableProperties.getBoolean(BULK_IMPORT_FILES_COMMIT_ASYNC);
try {
AddFilesTransaction transaction = AddFilesTransaction.builder()
.jobId(job.getId()).taskId(taskId).jobRunId(jobRunId).writtenTime(finishTime)
.fileReferences(output.fileReferences())
.updateTrackerFromLog(asyncCommit)
.build();
if (asyncCommit) {
asyncSender.send(StateStoreCommitRequest.create(table.getTableUniqueId(), transaction));
asyncSender.send(StateStoreCommitRequest.create(table.getTableUniqueId(),
AddFilesTransaction.builder()
.jobId(job.getId()).taskId(taskId).jobRunId(jobRunId).writtenTime(finishTime)
.fileReferences(output.fileReferences())
.build()));
LOGGER.info("Submitted asynchronous request to state store committer to add {} files for job {} in table {}", output.numFiles(), job.getId(), table);
} else {
transaction.synchronousCommit(stateStoreProvider.getStateStore(tableProperties));
AddFilesTransaction.fromReferences(output.fileReferences())
.synchronousCommit(stateStoreProvider.getStateStore(tableProperties));
LOGGER.info("Added {} files to statestore for job {} in table {}", output.numFiles(), job.getId(), table);
}
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,12 @@ void shouldCommitNewFilesAsynchronouslyWhenConfigured() throws Exception {
assertThat(commitRequestQueue).containsExactly(StateStoreCommitRequest.create(tableProperties.get(TABLE_ID),
AddFilesTransaction.builder()
.jobId(job.getId()).taskId("test-task").jobRunId("test-run").writtenTime(finishTime)
.updateTrackerFromLog(true)
.fileReferences(outputFiles)
.build()));
}

@Test
void shouldRecordInTransactionLogThatTrackerUpdateNotRequiredForSynchronousCommit() throws Exception {
void shouldNotRecordJobTrackerUpdateDetailsInTransactionLogForSynchronousCommit() throws Exception {
// Given
tableProperties.set(BULK_IMPORT_FILES_COMMIT_ASYNC, "false");
BulkImportJob job = singleFileImportJob();
Expand All @@ -202,14 +201,7 @@ void shouldRecordInTransactionLogThatTrackerUpdateNotRequiredForSynchronousCommi

// Then
assertThat(transactionLogs.getLastFilesTransaction(tableProperties))
.isEqualTo(AddFilesTransaction.builder()
.jobId(job.getId())
.taskId("test-task")
.jobRunId("test-run")
.writtenTime(finishTime)
.updateTrackerFromLog(false)
.fileReferences(outputFiles)
.build());
.isEqualTo(AddFilesTransaction.fromReferences(outputFiles));
}

private void runJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public class AddFilesTransaction implements FileReferenceTransaction {
private final String taskId;
private final String jobRunId;
private final Instant writtenTime;
private final boolean updateTrackerFromLog;
private final List<AllReferencesToAFile> files;

public AddFilesTransaction(List<AllReferencesToAFile> files) {
Expand All @@ -65,7 +64,6 @@ private AddFilesTransaction(Builder builder) {
taskId = builder.taskId;
jobRunId = builder.jobRunId;
writtenTime = builder.writtenTime;
updateTrackerFromLog = builder.updateTrackerFromLog;
files = Objects.requireNonNull(builder.files, "files must not be null")
.stream().map(file -> file.withCreatedUpdateTime(null)).toList();
}
Expand Down Expand Up @@ -209,7 +207,7 @@ public List<AllReferencesToAFile> getFiles() {

@Override
public int hashCode() {
return Objects.hash(jobId, taskId, jobRunId, writtenTime, updateTrackerFromLog, files);
return Objects.hash(jobId, taskId, jobRunId, writtenTime, files);
}

@Override
Expand All @@ -223,13 +221,13 @@ public boolean equals(Object obj) {
AddFilesTransaction other = (AddFilesTransaction) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(taskId, other.taskId)
&& Objects.equals(jobRunId, other.jobRunId) && Objects.equals(writtenTime, other.writtenTime)
&& Objects.equals(updateTrackerFromLog, other.updateTrackerFromLog) && Objects.equals(files, other.files);
&& Objects.equals(files, other.files);
}

@Override
public String toString() {
return "AddFilesTransaction{jobId=" + jobId + ", taskId=" + taskId + ", jobRunId=" + jobRunId + ", writtenTime="
+ writtenTime + ", updateTrackerFromLog=" + updateTrackerFromLog + ", files=" + files + "}";
+ writtenTime + ", files=" + files + "}";
}

/**
Expand All @@ -240,7 +238,6 @@ public static class Builder {
private String taskId;
private String jobRunId;
private Instant writtenTime;
private boolean updateTrackerFromLog = true;
private List<AllReferencesToAFile> files;

private Builder() {
Expand Down Expand Up @@ -279,18 +276,6 @@ public Builder writtenTime(Instant writtenTime) {
return this;
}

/**
* Sets the flag for whether to update the tracker following the transaction log. Default to true, set to false
* for synchronous commits.
*
* @param updateTrackerFromLog whether tracker needs to be updated
* @return this builder
*/
public Builder updateTrackerFromLog(boolean updateTrackerFromLog) {
this.updateTrackerFromLog = updateTrackerFromLog;
return this;
}

/**
* Sets the ID of the job run that added these files.
*
Expand Down

0 comments on commit 97bc092

Please sign in to comment.