Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 / GCS staging: use correct staging filename #11768

Merged
merged 4 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,36 +97,39 @@ public void createBucketObjectIfNotExists(final String objectPath) {
}

@Override
public String uploadRecordsToBucket(final SerializableBuffer recordsData, final String namespace, final String streamName, final String objectPath)
throws Exception {
public String uploadRecordsToBucket(final SerializableBuffer recordsData,
final String namespace,
final String streamName,
final String objectPath) {
final List<Exception> exceptionsThrown = new ArrayList<>();
boolean succeeded = false;
while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT && !succeeded) {
while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT) {
if (exceptionsThrown.size() > 0) {
LOGGER.info("Retrying to upload records into storage {} ({}/{}})", objectPath, exceptionsThrown.size(), UPLOAD_RETRY_LIMIT);
// Force a reconnection before retrying in case error was due to network issues...
s3Client = s3Config.resetS3Client();
}

try {
loadDataIntoBucket(objectPath, recordsData);
succeeded = true;
return loadDataIntoBucket(objectPath, recordsData);
} catch (final Exception e) {
LOGGER.error("Failed to upload records into storage {}", objectPath, e);
exceptionsThrown.add(e);
}
if (!succeeded) {
LOGGER.info("Retrying to upload records into storage {} ({}/{}})", objectPath, exceptionsThrown.size(), UPLOAD_RETRY_LIMIT);
// Force a reconnection before retrying in case error was due to network issues...
s3Client = s3Config.resetS3Client();
}
}
if (!succeeded) {
throw new RuntimeException(String.format("Exceptions thrown while uploading records into storage: %s", Strings.join(exceptionsThrown, "\n")));
}
return recordsData.getFilename();
throw new RuntimeException(String.format("Exceptions thrown while uploading records into storage: %s", Strings.join(exceptionsThrown, "\n")));
}

private void loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
/**
* Upload the file from {@code recordsData} to S3 and simplify the filename as <partId>.<extension>.
* @return the uploaded filename, which is different from the serialized buffer filename
*/
private String loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
final long partSize = s3Config.getFormatConfig() != null ? s3Config.getFormatConfig().getPartSize() : DEFAULT_PART_SIZE;
final String bucket = s3Config.getBucketName();
final String objectKeyWithPartId = String.format("%s%s%s", objectPath, getPartId(objectPath), getExtension(recordsData.getFilename()));
final String newFilename = getPartId(objectPath) + getExtension(recordsData.getFilename());
tuliren marked this conversation as resolved.
Show resolved Hide resolved
final String fullObjectKey = objectPath + newFilename;
final StreamTransferManager uploadManager = StreamTransferManagerHelper
.getDefault(bucket, objectKeyWithPartId, s3Client, partSize)
.getDefault(bucket, fullObjectKey, s3Client, partSize)
.checkIntegrity(true)
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
.queueCapacity(DEFAULT_QUEUE_CAPACITY);
Expand All @@ -145,10 +148,12 @@ private void loadDataIntoBucket(final String objectPath, final SerializableBuffe
uploadManager.complete();
}
}
if (!s3Client.doesObjectExist(bucket, objectKeyWithPartId)) {
LOGGER.error("Failed to upload data into storage, object {} not found", objectKeyWithPartId);
if (!s3Client.doesObjectExist(bucket, fullObjectKey)) {
LOGGER.error("Failed to upload data into storage, object {} not found", fullObjectKey);
throw new RuntimeException("Upload failed");
}
LOGGER.info("Uploaded buffer file to storage: {} -> {}", recordsData.getFilename(), fullObjectKey);
return newFilename;
}

protected static String getExtension(final String filename) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,8 @@ public String uploadRecordsToStage(final JdbcDatabase database,
final SerializableBuffer recordsData,
final String schemaName,
final String stageName,
final String stagingPath)
throws Exception {
AirbyteSentry.executeWithTracing("UploadRecordsToStage",
tuliren marked this conversation as resolved.
Show resolved Hide resolved
() -> s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath),
Map.of("stage", stageName, "path", stagingPath));
return recordsData.getFilename();
final String stagingPath) {
return s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath);
}

@Override
Expand Down