Skip to content

Commit

Permalink
infer schema in multi process
Browse files Browse the repository at this point in the history
  • Loading branch information
Phlair committed Aug 4, 2021
1 parent 8a80bf7 commit f0fb6f6
Showing 1 changed file with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from operator import itemgetter
from traceback import format_exc
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
import time
import multiprocessing as mp

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models.airbyte_protocol import SyncMode
Expand Down Expand Up @@ -230,8 +232,29 @@ def _get_master_schema(self) -> Mapping[str, Any]:
file_reader = self.fileformatparser_class(self._format)
# time order isn't necessary here but we might as well use this method so we cache the list for later use
for _, storagefile in self.time_ordered_storagefile_iterator():
with storagefile.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f)

# DEBUG
def schema_func(storagefile, file_reader):
with storagefile.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f)
return this_schema

def process_queuer(func, q, *args, **kwargs):
q.put(func(*args, **kwargs))

q_worker = mp.Queue()
proc = mp.Process(target=process_queuer, args=(schema_func, q_worker, storagefile, file_reader))
proc.start()
try:
res = q_worker.get(timeout=20)
this_schema = res
except mp.queues.Empty:
raise TimeoutError()
finally:
try:
proc.terminate()
except:
pass

if this_schema == master_schema:
continue # exact schema match so go to next file
Expand Down

0 comments on commit f0fb6f6

Please sign in to comment.