Skip to content

Commit

Permalink
new variable
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jan 31, 2023
1 parent 431f8c9 commit cf2a30d
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ def build_stream_processor(
primary_key = get_field(configured_stream, "primary_key", f"Undefined primary key for stream {stream_name}")

message = f"'json_schema'.'properties' are not defined for stream {stream_name}"
schema = get_field(stream_config, "json_schema", f"'json_schema' is not defined for stream {stream_name}")
if "properties" in schema:
properties = get_field(schema, "properties", message)
elif "oneOf" in schema:
options = filter(lambda option: "properties" in option, schema["oneOf"])
stream_schema = get_field(stream_config, "json_schema", f"'json_schema' is not defined for stream {stream_name}")
if "properties" in stream_schema:
properties = get_field(stream_schema, "properties", message)
elif "oneOf" in stream_schema:
options = list(filter(lambda option: "properties" in option, stream_schema["oneOf"]))
if len(options) == 0:
raise KeyError(f"Stream {stream_name} does not have any properties")
# If there are multiple oneOf options, just pick the first one - we don't really support oneOf to begin with
Expand Down

0 comments on commit cf2a30d

Please sign in to comment.