Memory leak in source-s3 while reading big CSV file #6870

Closed
amorskoy opened this issue Oct 7, 2021 · 10 comments · Fixed by #8252
@amorskoy

amorskoy commented Oct 7, 2021

Environment

  • Airbyte version: fresh master, commit 11645689431a69c689a15b620e4a2b6bc7b045c3
  • OS Version / Instance: Ubuntu 18.04
  • Deployment: Docker
  • Source Connector and version: source-s3:0.1.5
  • Destination Connector and version: destination-s3:0.1.12
  • Severity: Critical
  • Step where error happened: Sync job

Current Behavior

I have a CSV file on S3: 35 GB, 8M rows x 500 columns, synthetically generated with the Python Faker library.
A 1K-row sample is attached:
sample_synth_1K_500.csv

Memory consumption grows until OOM (30 GB on a 32 GB EC2 instance).
htop screenshots from mid-sync and from just before the OOM are attached:
memleak_source_s3
resources_8m_x_500

Expected Behavior

Expected a normal sync without OOM and with predictable, bounded RAM consumption for the connector's main.py.

Logs

Logs for the sync job are attached:
logs-6-0.txt

@amorskoy amorskoy added the type/bug Something isn't working label Oct 7, 2021
@marcosmarxm marcosmarxm added the area/connectors Connector related issues label Oct 7, 2021
@marcosmarxm
Member

Thanks for opening the issue @amorskoy! Currently we read the file in chunks rather than loading it all into memory. Maybe something isn't working as intended... we're going to investigate.

reader_options = {**self._reader_options}
if self._reader_format == "csv":
    reader_options["chunksize"] = 10000
    if skip_data:
        reader_options["nrows"] = 0
        reader_options["index_col"] = 0
    yield from reader(fp, **reader_options)
else:
    yield reader(fp, **reader_options)
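
For reference, the chunked read relies on standard pandas behavior: when chunksize is set, read_csv returns an iterator of DataFrames rather than one large frame, so only one chunk should be resident at a time. A standalone sketch of that pattern (the file path and chunk size here are illustrative, not the connector's actual configuration):

import pandas as pd


def iter_csv_chunks(path: str, chunksize: int = 10_000):
    # With chunksize set, read_csv returns a TextFileReader that yields
    # DataFrames of at most `chunksize` rows each.
    for chunk in pd.read_csv(path, chunksize=chunksize):
        yield chunk


# Memory should stay roughly flat as long as each chunk is released after use:
# row_count = sum(len(chunk) for chunk in iter_csv_chunks("sample_synth_1K_500.csv"))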

@marcosmarxm
Member

@amorskoy Can you share the script to create the mock file using Faker?

@amorskoy
Author

amorskoy commented Oct 7, 2021

@marcosmarxm Sure, let me do it in a few hours once I arrive.

@amorskoy
Author

amorskoy commented Oct 7, 2021

@marcosmarxm Here it is. I found it in some gist and slightly modified it to support cyclic headers and faster generation, using a suffix for deduplication. Not ideal for data processing, but for the connectivity domain this is enough.

import copy
import csv
import sys
from itertools import cycle

from faker import Faker
import datetime


# Fully generated rows are cached; once CACHE_SIZE rows exist, new rows are
# derived from cached ones instead of calling Faker again, which is much faster.
CACHE = []
CACHE_SIZE = 1000

def datagenerate(records, headers, out_path):
    fake = Faker('en_US')
    fake1 = Faker('en_GB')  # To generate phone numbers
    with open(out_path, 'wt') as csvFile:
        writer = csv.DictWriter(csvFile, fieldnames=headers)
        writer.writeheader()
        for i in range(records):
            # Once the cache is full, reuse cached rows, appending the row
            # index as a suffix so values stay unique across the file.
            if len(CACHE) == CACHE_SIZE:
                out_dict = copy.copy(CACHE[i % CACHE_SIZE])
                for k, v in out_dict.items():
                    out_dict[k] = f"{v}{i}"
            else:
                out_dict = {}
                full_name = fake.name()
                FLname = full_name.split(" ")
                Fname = FLname[0]
                Lname = FLname[1]
                domain_name = "@testDomain.com"
                userId = Fname + "." + Lname + domain_name

                for col in headers:
                    suf = col[col.index(" ") + 1:]
                    if suf == "Email Id": out_dict[col] = userId
                    elif suf == "Prefix": out_dict[col] = fake.prefix()
                    elif suf == "Name": out_dict[col] = fake.name()
                    elif suf == "Birth Date": out_dict[col] = fake.date(pattern="%d-%m-%Y", end_datetime=datetime.date(2000, 1, 1))
                    elif suf == "Phone Number": out_dict[col] = fake1.phone_number()
                    elif suf == "Additional Email Id": out_dict[col] = fake.email()
                    elif suf == "Address": out_dict[col] = fake.address().replace("\n", " ")
                    elif suf == "Zip Code": out_dict[col] = fake.zipcode()
                    elif suf == "City": out_dict[col] = fake.city()
                    elif suf == "State": out_dict[col] = fake.state()
                    elif suf == "Country": out_dict[col] = fake.country()
                    elif suf == "Year": out_dict[col] = fake.year()
                    elif suf == "Time": out_dict[col] = fake.time()
                    elif suf == "Link": out_dict[col] = fake.url()
                    elif suf == "Text": out_dict[col] = fake.word()
                    elif suf == "Ready": out_dict[col] = int(fake.pybool())

                CACHE.append(out_dict)


            writer.writerow(out_dict)


def generate_headers(num_columns):
    headers = ["Email Id", "Prefix", "Name", "Birth Date", "Phone Number",
               "Address", "Zip Code", "City", "State", "Country", "Year", "Time", "Link", "Text", "Ready"]

    headers_size = len(headers)
    allow_cycled_headers = ["Ready"]

    cycled_headers = cycle(allow_cycled_headers)
    base_iter = iter(headers)

    for i in range(num_columns):
        if i < headers_size:
            col_name = f"C{i} {next(base_iter)}"
        else:
            col_name = f"C{i} {next(cycled_headers)}"

        yield col_name


if __name__ == '__main__':
    records = int(sys.argv[1])
    num_columns = int(sys.argv[2])
    out_path = sys.argv[3]

    datagenerate(records, list(generate_headers(num_columns)), out_path)
    print("CSV generation complete!")

@amorskoy
Author

amorskoy commented Oct 7, 2021

Feel free to edit allow_cycled_headers to tune it for your needs

@sherifnada
Contributor

@amorskoy we recently merged an improvement to memory management in #6615 - can you upgrade your S3 connector and let us know if the issue persists?

@amorskoy
Author

@amorskoy we recently merged an improvement to memory management in #6615 - can you upgrade your S3 connector and let us know if the issue persists?

@sherifnada Thanks - I will, but it may take a few days as I am a little out of context at the moment. Please let me know whether I should close the issue in the meantime or leave it open until I can check.
Thanks

@sherifnada
Contributor

I'll keep this open until we hear back from you :)

@amorskoy
Author

amorskoy commented Nov 8, 2021

@sherifnada Well, it looks better, but it seems the leak is not fully removed on the latest Airbyte master - commit 154ecceda0350b33b840aab076b6a57b9aad358b
Connectors:

  • Source = airbyte/source-s3:0.1.6
  • Destination = airbyte/destination-s3:0.1.13

RAM consumption of the S3 reader's main.py seems to grow by ~10 MB per 1000 rows, so at around row 578K it is about 1925 MB and still growing.
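
For anyone trying to reproduce the growth rate, one quick way is to poll the reader process's RSS with psutil; a minimal sketch (the PID argument and sampling interval are illustrative, not part of the connector):

import sys
import time

import psutil  # third-party: pip install psutil


def watch_rss(pid: int, interval_s: float = 5.0) -> None:
    # Poll the resident set size of the target process until it exits.
    proc = psutil.Process(pid)
    try:
        while proc.is_running():
            rss_mb = proc.memory_info().rss / (1024 * 1024)
            print(f"RSS: {rss_mb:.1f} MB")
            time.sleep(interval_s)
    except psutil.NoSuchProcess:
        pass


if __name__ == "__main__":
    watch_rss(int(sys.argv[1]))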

@amorskoy
Author

cc @marcosmarxm sorry, I forgot to add you to the cc above ^^^^
