🐛 fixed OOM error when splitting a stream into several files #7074

Merged 8 commits on Oct 18, 2021. The diff below shows the changes from 4 commits.
@@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.3.15",
"dockerImageTag": "0.3.16",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
@@ -2,7 +2,7 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.3.17",
"dockerImageTag": "0.3.18",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
"icon": "redshift.svg"
}
@@ -42,7 +42,7 @@
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.15
dockerImageTag: 0.3.16
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
name: S3
@@ -52,7 +52,7 @@
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.17
dockerImageTag: 0.3.18
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.6
LABEL io.airbyte.version=0.3.7
LABEL io.airbyte.name=airbyte/destination-jdbc
@@ -30,6 +30,7 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.csv.CSVFormat;
@@ -40,6 +41,15 @@
public abstract class GcsStreamCopier implements StreamCopier {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class);
// Roll over to a new staging file roughly every 10,000,000 records. This keeps individual files easy to
// work with and speeds up writing large volumes of data.
// It also keeps the COPY request from failing with QUERY_TIMEOUT when the records from a single
// oversized file are copied into the staging table.
public static final int DEFAULT_PART = 1000;
public static final String UNDERSCORE = "_";

public final Map<String, Integer> filePrefixIndexMap = new HashMap<>();
public final Map<String, Integer> fileNamePartsMap = new HashMap<>();

private final Storage storageClient;
private final GcsConfig gcsConfig;
@@ -77,25 +87,52 @@ public GcsStreamCopier(String stagingFolder,
}

private String prepareGcsStagingFile() {
return String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 6) + "_" + streamName);
return String.join("/", stagingFolder, schemaName, getGcsStagingFileName());
}

private String getGcsStagingFileName() {
String result = 0 + UNDERSCORE + streamName;
if (filePrefixIndexMap.containsKey(streamName)) {
result = getGcsStagingFileNamePart(filePrefixIndexMap.get(streamName));
} else {
filePrefixIndexMap.put(streamName, 0);
fileNamePartsMap.put(result, 0);
}
return result;
}

private String getGcsStagingFileNamePart(Integer prefixIndex) {
String result = prefixIndex + UNDERSCORE + streamName;
if (fileNamePartsMap.containsKey(result) && fileNamePartsMap.get(result) < DEFAULT_PART) {
var partIndex = fileNamePartsMap.get(result) + 1;
fileNamePartsMap.put(result, partIndex);
} else {
int index = prefixIndex + 1;
result = index + UNDERSCORE + streamName;
filePrefixIndexMap.put(streamName, index);
fileNamePartsMap.put(result, 0);
}
return result;
}

@Override
public String prepareStagingFile() {
var name = prepareGcsStagingFile();
gcsStagingFiles.add(name);
var blobId = BlobId.of(gcsConfig.getBucketName(), name);
var blobInfo = BlobInfo.newBuilder(blobId).build();
var blob = storageClient.create(blobInfo);
var channel = blob.writer();
channels.put(name, channel);
OutputStream outputStream = Channels.newOutputStream(channel);

var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
try {
csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
if (!gcsStagingFiles.contains(name)) {
gcsStagingFiles.add(name);
var blobId = BlobId.of(gcsConfig.getBucketName(), name);
var blobInfo = BlobInfo.newBuilder(blobId).build();
var blob = storageClient.create(blobInfo);
var channel = blob.writer();
channels.put(name, channel);
OutputStream outputStream = Channels.newOutputStream(channel);

var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
try {
csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return name;
}
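To make the naming scheme above easier to follow, here is a minimal, self-contained sketch (not the PR's code; the StagingNameRoller class name and the "users" stream are made up, and the part limit is shrunk so the rollover is visible after a few calls): the copier keeps returning the current `<prefixIndex>_<streamName>` name until DEFAULT_PART batches have been written to it, then bumps the prefix and starts a fresh name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, standalone illustration of the prefix/part rollover used by the copiers in this PR.
public class StagingNameRoller {

  // Stands in for DEFAULT_PART; shrunk so the rollover shows up after a few calls.
  private static final int PARTS_PER_FILE = 3;
  private static final String UNDERSCORE = "_";

  private final Map<String, Integer> filePrefixIndexMap = new HashMap<>(); // stream name -> current prefix index
  private final Map<String, Integer> fileNamePartsMap = new HashMap<>();   // file name -> parts written so far

  public String nextStagingFileName(final String streamName) {
    if (!filePrefixIndexMap.containsKey(streamName)) {
      // First call for this stream: start with prefix 0 and zero parts written.
      final String first = 0 + UNDERSCORE + streamName;
      filePrefixIndexMap.put(streamName, 0);
      fileNamePartsMap.put(first, 0);
      return first;
    }
    final int prefixIndex = filePrefixIndexMap.get(streamName);
    final String current = prefixIndex + UNDERSCORE + streamName;
    if (fileNamePartsMap.get(current) < PARTS_PER_FILE) {
      // Keep writing parts into the current file until the limit is reached.
      fileNamePartsMap.put(current, fileNamePartsMap.get(current) + 1);
      return current;
    }
    // Limit reached: roll over to the next prefix and start counting parts again.
    final String next = (prefixIndex + 1) + UNDERSCORE + streamName;
    filePrefixIndexMap.put(streamName, prefixIndex + 1);
    fileNamePartsMap.put(next, 0);
    return next;
  }

  public static void main(final String[] args) {
    final StagingNameRoller roller = new StagingNameRoller();
    // Prints 0_users four times (initial call plus three parts), then 1_users four times, then 2_users ...
    for (int i = 0; i < 10; i++) {
      System.out.println(roller.nextStagingFileName("users"));
    }
  }
}
```

Because prepareStagingFile only creates a blob writer and CSVPrinter when the returned name is not already in gcsStagingFiles, each distinct name maps to exactly one open channel, which is what keeps memory bounded instead of growing with every call.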
@@ -46,8 +46,16 @@ public abstract class S3StreamCopier implements StreamCopier {
// us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
// WARNING: Too large a part size can cause potential OOM errors.
public static final int DEFAULT_PART_SIZE_MB = 10;
// Roll over to a new staging file roughly every 10,000,000 records. This keeps individual files easy to
// work with and speeds up writing large volumes of data.
// It also keeps the COPY request from failing with QUERY_TIMEOUT when the records from a single
// oversized file are copied into the staging table.
public static final int DEFAULT_PART = 1000;
public static final String UNDERSCORE = "_";

public final Map<String, Integer> filePrefixIndexMap = new HashMap<>();
public final Map<String, Integer> fileNamePartsMap = new HashMap<>();

protected final AmazonS3 s3Client;
protected final S3Config s3Config;
protected final String tmpTableName;
@@ -88,46 +96,62 @@ public S3StreamCopier(String stagingFolder,
}

private String prepareS3StagingFile() {
return String.join("/", stagingFolder, schemaName, getFilePrefixIndex() + "_" + s3FileName);
return String.join("/", stagingFolder, schemaName, getS3StagingFileName());
}

private Integer getFilePrefixIndex() {
int result = 0;
private String getS3StagingFileName() {
String result = 0 + UNDERSCORE + streamName;
if (filePrefixIndexMap.containsKey(s3FileName)) {
result = filePrefixIndexMap.get(s3FileName) + 1;
filePrefixIndexMap.put(s3FileName, result);
result = getS3StagingFileNamePart(filePrefixIndexMap.get(s3FileName));
} else {
filePrefixIndexMap.put(s3FileName, 0);
fileNamePartsMap.put(result, 0);
}
return result;
}

private String getS3StagingFileNamePart(Integer prefixIndex) {
String result = prefixIndex + UNDERSCORE + s3FileName;
if (fileNamePartsMap.containsKey(result) && fileNamePartsMap.get(result) < DEFAULT_PART) {
var partIndex = fileNamePartsMap.get(result) + 1;
fileNamePartsMap.put(result, partIndex);
} else {
int index = prefixIndex + 1;
result = index + UNDERSCORE + s3FileName;
filePrefixIndexMap.put(s3FileName, index);
fileNamePartsMap.put(result, 0);
}
return result;
}

@Override
public String prepareStagingFile() {
var name = prepareS3StagingFile();
s3StagingFiles.add(name);
LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
// The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
// have support for streaming multipart uploads;
// The alternative is first writing the entire output to disk before loading into S3. This is not
// feasible with large tables.
// Data is chunked into parts. A part is sent off to a queue to be uploaded once it has reached its
// configured part size.
// Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations.
var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client)
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
.queueCapacity(DEFAULT_QUEUE_CAPACITY)
.partSize(s3Config.getPartSize());
multipartUploadManagers.put(name, manager);
var outputStream = manager.getMultiPartOutputStreams().get(0);
// We only need one output stream as we only have one input stream. This is reasonably performant.
// See the above comment.
outputStreams.put(name, outputStream);
var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
try {
csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
if (!s3StagingFiles.contains(name)) {
s3StagingFiles.add(name);
LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
// The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
// have support for streaming multipart uploads;
// The alternative is first writing the entire output to disk before loading into S3. This is not
// feasible with large tables.
// Data is chunked into parts. A part is sent off to a queue to be uploaded once it has reached its
// configured part size.
// Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations.
var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client)
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
.queueCapacity(DEFAULT_QUEUE_CAPACITY)
.partSize(s3Config.getPartSize());
multipartUploadManagers.put(name, manager);
var outputStream = manager.getMultiPartOutputStreams().get(0);
// We only need one output stream as we only have one input stream. This is reasonably performant.
// See the above comment.
outputStreams.put(name, outputStream);
var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
try {
csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return name;
}
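As a sanity check on the figures quoted in the comments above, here is a throwaway calculation (the class is hypothetical; the queue capacity value is inferred from the 10 * 10 = 100 MB arithmetic in the comment, and 10,000 parts per upload is S3's multipart limit):

```java
// Back-of-the-envelope check of the limits quoted in the S3StreamCopier comments.
public class S3StagingLimits {

  public static void main(final String[] args) {
    final int partSizeMb = 10;             // DEFAULT_PART_SIZE_MB in the diff
    final int queueCapacity = 10;          // assumed DEFAULT_QUEUE_CAPACITY, per the comment's arithmetic
    final int maxPartsPerUpload = 10_000;  // S3 multipart upload part-count limit

    // In-flight memory held by one StreamTransferManager: queued parts * part size.
    System.out.println("Buffered per staging file: ~" + (queueCapacity * partSizeMb) + " MB");          // ~100 MB

    // Largest single staging file before the part-count limit is hit.
    System.out.println("Max single staging file: ~" + (maxPartsPerUpload * partSizeMb / 1000) + " GB"); // ~100 GB
  }
}
```

Each staging file keeps its own transfer manager, and with it roughly 100 MB of queued parts, alive until the copy completes; opening a brand-new file on every call is presumably what drove memory up, while reusing a file for up to DEFAULT_PART batches caps the number of live managers.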
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.17
LABEL io.airbyte.version=0.3.18
LABEL io.airbyte.name=airbyte/destination-redshift
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.15
LABEL io.airbyte.version=0.3.16
LABEL io.airbyte.name=airbyte/destination-snowflake
@@ -9,14 +9,24 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

@@ -132,4 +142,18 @@ protected void tearDown(TestDestinationEnv testEnv) throws Exception {
SnowflakeDatabase.getDatabase(baseConfig).execute(createSchemaQuery);
}

@ParameterizedTest
@ArgumentsSource(DataArgumentsProvider.class)
public void testSyncWithBillionRecords(String messagesFilename, String catalogFilename) throws Exception {
final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());

final List<AirbyteMessage> largeNumberRecords = Collections.nCopies(1000000, messages).stream().flatMap(List::stream).collect(Collectors.toList());

final JsonNode config = getConfig();
runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false);
}

}
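One note on the test above: Collections.nCopies(1000000, messages) does not deep-copy anything; it returns a constant-space view holding a single reference, and the flatMap/collect step builds a list of repeated references to the same few AirbyteMessage objects. A tiny, hypothetical illustration of that behaviour:

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo: repeating a list a million times only multiplies references, not objects.
public class NCopiesDemo {

  public static void main(final String[] args) {
    final List<String> messages = List.of("record-1", "record-2");

    // Constant-space view: one backing reference plus a count, no million-element array.
    final List<List<String>> copies = Collections.nCopies(1_000_000, messages);

    // Flattening allocates 2,000,000 list slots, but every slot points at one of the same two strings.
    final List<String> flattened = copies.stream().flatMap(List::stream).collect(Collectors.toList());

    System.out.println(flattened.size());                     // 2000000
    System.out.println(flattened.get(0) == flattened.get(2)); // true: the same object, not a copy
  }
}
```

So the test can push a very large record count through the destination without the test JVM itself needing a proportional amount of heap for the messages.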