Skip to content

Commit

Permalink
Add per-process error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
ihnorton committed Oct 27, 2020
1 parent 96f27de commit d082ce8
Showing 1 changed file with 38 additions and 5 deletions.
43 changes: 38 additions & 5 deletions examples/parallel_csv_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np, pandas as pd
import sys, os, tempfile, time, glob
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from contextlib import contextmanager

# helper functions to generate data
Expand Down Expand Up @@ -37,13 +37,43 @@ def make_dataframe(col_size):
df = make_dataframe(target_length)
df.to_csv(output_path)

def from_csv_mp(csv_path, array_path, list_step_size=5, chunksize=100, max_workers=4,
def log_process_errors(*args, **kwargs):
    """Run ``tiledb.from_csv`` and log any failure to a per-process file.

    This is the callable submitted to the worker pool. All positional and
    keyword arguments are forwarded unchanged to ``tiledb.from_csv``. If
    that call raises, a report with the exception text and the call
    arguments is printed and written to ``ingest-err-PID_<pid>_<id>.log``
    (relative to the current working directory), and the exception is then
    re-raised so the submitting side still sees the failure.

    Args:
        *args: positional arguments passed through to ``tiledb.from_csv``.
        **kwargs: keyword arguments passed through to ``tiledb.from_csv``.

    Raises:
        Exception: whatever ``tiledb.from_csv`` raised, re-raised unchanged.
    """
    try:
        tiledb.from_csv(*args, **kwargs)
    except Exception as exc:
        # Random suffix keeps log names distinct if one PID fails repeatedly.
        # The dtype is explicit because randint's default integer type is
        # 32-bit on some platforms, where an int64-sized bound raises
        # ValueError and would hide the error we are trying to report.
        err_id = np.random.randint(np.iinfo(np.int64).max, dtype=np.int64)
        err_filename = "ingest-err-PID_{}_{}.log".format(os.getpid(), err_id)
        err = """ ------------------------
Caught exception:
------------------------
{}
------------------------
with args:
------------------------
{}
------------------------
{}
------------------------
this message saved to file: {}
""".format(exc, args, kwargs, err_filename)
        print(err)
        try:
            with open(err_filename, 'w') as f:
                # err is one string; write() emits it in a single call
                # (writelines() would iterate it character by character).
                f.write(err)
        except OSError as log_exc:
            # Logging is best-effort: never let a log-file problem replace
            # the ingestion error that is re-raised below.
            print("could not write error log {}: {}".format(err_filename, log_exc))
        # Bare raise re-raises the original from_csv exception.
        raise

def from_csv_mp(csv_path,
array_path,
list_step_size=5,
chunksize=100,
max_workers=4,
engine='processpool',
initial_file_count=5,
index_col=None,
parse_dates=None,
attr_types=None,
sparse=True,
allows_duplicates=True,
debug=False,
**kwargs):
"""
Expand All @@ -60,6 +90,7 @@ def from_csv_mp(csv_path, array_path, list_step_size=5, chunksize=100, max_worke

# Get a list of CSVs from the target path
csvs = glob.glob(csv_path + "/*.csv")

if len(csvs) < 1:
raise ValueError("Cannot ingest empty CSV list!")

Expand Down Expand Up @@ -93,12 +124,14 @@ def from_csv_mp(csv_path, array_path, list_step_size=5, chunksize=100, max_worke
# high level ingestion timing
start = time.time()
# ingest the data in parallel

#with ThreadPoolExecutor(max_workers=max_workers) as executor:
with ProcessPoolExecutor(max_workers=max_workers) as executor:
for first in range(0, len(csvs)+1, list_step_size):
for first in range(0, len(csvs), list_step_size):
last = min(len(csvs), first + list_step_size)
print(" Submitting task for CSV list range: ", (first, last))
task = executor.submit(
tiledb.from_csv,
log_process_errors,
*(array_path, csvs[first:last]),
**dict(chunksize=chunksize,
index_col=index_col,
Expand All @@ -107,7 +140,7 @@ def from_csv_mp(csv_path, array_path, list_step_size=5, chunksize=100, max_worke
column_types=attr_types,
engine='c',
debug=debug,
allows_duplicates=True),
allows_duplicates=allows_duplicates),
**kwargs,
mode='append')
tasks.append(task)
Expand Down

0 comments on commit d082ce8

Please sign in to comment.