🐛 Source S3: fixed bug where sync could hang indefinitely #5197
Conversation
/test connector=connectors/source-s3
I don't want to block this, so if you can find solutions to the following issues, judge them to be insignificant/irrelevant, or if I'm just plain missing something, feel free to merge (the other comments are readability focused):
- 20s constant timeout -- is this reliable? what if it takes more time to read the data?
- how can we be certain that reading `blockSize*2` actually gives us the entire first row? (what if it's very big?)
schema_dict = {field.name: field.type for field in streaming_reader.schema}
return schema_dict

# boto3 stuff can't be pickled and so we can't multiprocess with the actual fileobject on Windows systems
what's a better word than "stuff" ? :P
you know... stuff... like things and whatever... 😅
Made this more clear!
# boto3 stuff can't be pickled and so we can't multiprocess with the actual fileobject on Windows systems
# we're reading block_size*2 bytes here, which we can then pass in and infer schema from block_size bytes
# the *2 is to give us a buffer as pyarrow figures out where lines actually end so it gets schema correct
file_sample = file.read(self._read_options()["block_size"] * 2)
how can we be certain that this block size completely covers the first row of data?
The easy answer would be to check for newline chars, but in cases where newlines are allowed within the data itself that isn't bulletproof. So really you need to parse the csv to be certain you've got enough of a block to parse the csv... bit of a logic loop!
I've decided to expose block_size in spec so the user can configure it (with instructions to increase it from default if having issues with schema detection).
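For reference, a minimal sketch of how a user-configurable block_size could feed pyarrow's streaming reader for inference (the function name and option values here are illustrative, not the connector's exact code):

```python
import io

from pyarrow import csv as pa_csv


def infer_schema_from_sample(sample_bytes: bytes, block_size: int) -> dict:
    # open_csv only needs to parse the first block to expose a schema, so a larger
    # user-supplied block_size gives inference more rows (and longer rows) to work with
    streaming_reader = pa_csv.open_csv(
        io.BytesIO(sample_bytes),
        read_options=pa_csv.ReadOptions(block_size=block_size),
        parse_options=pa_csv.ParseOptions(),
        convert_options=pa_csv.ConvertOptions(),
    )
    return {field.name: field.type for field in streaming_reader.schema}
```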
# we're reading block_size*2 bytes here, which we can then pass in and infer schema from block_size bytes
# the *2 is to give us a buffer as pyarrow figures out where lines actually end so it gets schema correct
file_sample = file.read(self._read_options()["block_size"] * 2)
schema_dict = None
for readability could we refactor this bit to be:
file_sample = file.read(self._read_options()["block_size"] * 2)
schema_dict = self.run_in_external_process(get_schema, file_sample, self._read_options(), self._parse_options(), self._convert_options())
return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True)
basically taking all the external process logic outside into a helper method?
I tried it in my IDE, here's what I had for reference in case it helps:
def run_in_external_process(self, fn, *args):
    schema_dict = None
    fail_count = 0
    while schema_dict is None:
        q_worker = mp.Queue()
        proc = mp.Process(
            target=multiprocess_queuer,
            args=(
                dill.dumps(fn),  # use dill to pickle the get_schema function for Windows-compatibility
                q_worker,
                *args,
            ),
        )
        proc.start()
        try:
            # this attempts to get return value from function with a 20 second timeout
            return q_worker.get(timeout=20)
        except mp.queues.Empty:
            fail_count += 1
            if fail_count > 2:
                raise TimeoutError("Timed out 3 times while trying to infer schema")
            self.logger.info("timed out on schema inference, retrying...")
        finally:
            try:
                proc.terminate()
            except Exception as e:
                self.logger.info(f"infer schema proc unterminated, error: {e}")
yeah like it, changed!
proc.start()
try:
    # this attempts to get return value from function with a 20 second timeout
    schema_dict = q_worker.get(timeout=20)
it might be useful to do an increasing backoff until 60 seconds or something just in case the schema is genuinely taking a long time to load
yeah I think the balancing act here is waiting long enough that we don't time out any long but non-hanging processes while failing relatively quickly in cases of hang or badly formed csvs that should fail inference.
Changed this to a doubling timeout in range 4 -> 60 seconds
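Roughly, the doubling timeout can be expressed as a small generator (a sketch of the idea, not the exact implementation):

```python
def backoff_timeouts(start: int = 4, cap: int = 60):
    """Yield 4, 8, 16, 32, 60, 60, ... so each retry waits longer, capped at 60s."""
    timeout = start
    while True:
        yield min(timeout, cap)
        timeout *= 2
```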
Now propagating any actual non-hang errors back up, so it will fail fast unless it's hanging, in which case we retry with increasing backoff as above.
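The fail-fast behaviour can be sketched like this: the child puts either the result or the exception it hit on the queue, and the parent re-raises anything that isn't a timeout. The helper name is illustrative, not the connector's exact code:

```python
import queue


def fetch_or_raise(q, timeout: int):
    try:
        result = q.get(timeout=timeout)
    except queue.Empty:
        # no answer at all -> treat as a hang and let the caller retry with backoff
        raise
    if isinstance(result, Exception):
        raise result  # the child hit a real error: propagate it and fail fast
    return result
```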
/test connector=connectors/source-s3
# Conflicts:
#   airbyte-integrations/connectors/source-s3/integration_tests/spec.json
/publish connector=connectors/source-s3
future work: we should maybe explore not using pyarrow if that's the problem
# Conflicts:
#   airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json
#   airbyte-config/init/src/main/resources/seed/source_definitions.yaml
What
will close #5160
I'm about 90% confident this issue was the cause of the problem. The hang only ever happens on schema inference rather than streaming records, and that issue indicates `csv::ColumnDecoder` as the root cause of the non-cancelling behaviour. When we're streaming records we're applying a known schema and no longer inferring, so this lines up.
How
Since the problem is that pyarrow csv schema inference isn't responding to signal interrupts, a standard timeout interrupt (either a manually built `signal` interrupt or a timeout on threads) has no effect and the hang still occurs.

To get around this, I've used multiprocessing to spawn a new process and run the schema inference in there. We can then kill that process after a timeout (20 seconds) if it still hasn't returned. I've set this to retry 3 times before raising an error. In my testing it has worked no later than the 2nd attempt (mostly on the first), but if it were to reach the retry limit, the sync will now actually error out rather than hang forever.
I've built this around Windows limitations and tested it on Windows as well, since Windows plays funny with python multiprocessing due to its lack of fork() support. These considerations are commented in the code (and are the reason it's less simple than it could be if this were unix-only).
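For context, a hedged sketch of what the spawned worker (referenced as multiprocess_queuer in the review above) can look like; the body is illustrative rather than the connector's exact code. On Windows the child is spawned rather than forked, so the target function is shipped across as dill-serialised bytes:

```python
import dill


def multiprocess_queuer(serialized_fn: bytes, q, *args) -> None:
    """Runs in the child process: rebuild the function with dill, execute it,
    and put either its return value or the raised exception on the queue."""
    fn = dill.loads(serialized_fn)
    try:
        q.put(fn(*args))
    except Exception as e:  # forward real errors so the parent can fail fast
        q.put(e)
```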
Note
Given the non-deterministic nature of the bug and the limited testing that affords, I can't guarantee with 100% certainty this solves it but it seems very likely based on my tests.
Updating a connector

Community member or Airbyter

- Secrets in the connector's spec are annotated with `airbyte_secret`
- Integration tests are passing: `./gradlew :airbyte-integrations:connectors:<name>:integrationTest`
- Connector's `README.md` is updated
- `docs/integrations/<source or destination>/<name>.md` is updated, including the changelog (see changelog example)

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

- The `/test connector=connectors/<name>` command is passing.
- The `/publish` command described here has been run.