Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add register_schemas option to encode method #210

Merged
merged 8 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person")
# Data can be validated before encoding; if the message is invalid, an
# Avro::SchemaValidator::ValidationError describing the problem is raised
avro.encode({ "titl" => "hello, world" }, schema_name: "person", validate: true)

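# By default, encode registers the schema if it does not exist in the registry. Pass register_schemas: false to only
# look the schema up instead; AvroTurf::SchemaNotFoundError is raised if it is not found on the registry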
avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person", register_schemas: false)
```

### Inter-schema references
Expand Down
6 changes: 5 additions & 1 deletion lib/avro_turf/cached_confluent_schema_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ def initialize(upstream, cache: nil)
end

# Delegate the following methods to the upstream
%i(subjects subject_versions schema_subject_versions check compatible?
%i(subjects subject_versions schema_subject_versions compatible?
global_config update_global_config subject_config update_subject_config).each do |name|
define_method(name) do |*args|
instance_variable_get(:@upstream).send(name, *args)
end
end

# Returns the upstream registry's data for `schema` under `subject`.
# The cache is consulted first; on a miss the upstream is asked and its
# answer is handed to the cache, whose return value is returned.
def check(subject, schema)
  cached = @cache.lookup_data_by_schema(subject, schema)
  return cached if cached

  @cache.store_data_by_schema(subject, schema, @upstream.check(subject, schema))
end

# Returns the schema for `id`, preferring the cached copy. On a cache miss
# the schema is fetched from the upstream and passed to the cache's
# store_by_id, whose return value is returned.
def fetch(id)
  cached = @cache.lookup_by_id(id)
  return cached if cached

  @cache.store_by_id(id, @upstream.fetch(id))
end
Expand Down
19 changes: 19 additions & 0 deletions lib/avro_turf/disk_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def initialize(disk_path, logger: Logger.new($stdout))

@schemas_by_subject_version_path = File.join(disk_path, 'schemas_by_subject_version.json')
@schemas_by_subject_version = {}

@data_by_schema_path = File.join(disk_path, 'data_by_schema.json')
hash = read_from_disk_cache(@data_by_schema_path)
@data_by_schema = hash || {}
end

# override
Expand All @@ -40,6 +44,12 @@ def lookup_by_schema(subject, schema)
@ids_by_schema[key]
end

# override: looks the data up under a JSON-serializable String key built by
# concatenating the subject and the schema's string form
def lookup_data_by_schema(subject, schema)
  @data_by_schema["#{subject}#{schema}"]
end

# override to use a json serializable cache key and update the file cache
def store_by_schema(subject, schema, id)
key = "#{subject}#{schema}"
Expand All @@ -49,6 +59,15 @@ def store_by_schema(subject, schema, id)
id
end

# Stores `data` under the String key formed from subject + schema and
# rewrites the data-by-schema cache file with the updated hash.
#
# Returns `data`, or nil (storing and writing nothing) when `data` is
# nil or false.
def store_data_by_schema(subject, schema, data)
  if data
    @data_by_schema["#{subject}#{schema}"] = data
    write_to_disk_cache(@data_by_schema_path, @data_by_schema)
    data
  end
end

# checks instance var (in-memory cache) for schema
# checks disk cache if in-memory cache doesn't exists
# if file exists but no in-memory cache, read from file and sync in-memory cache
Expand Down
14 changes: 13 additions & 1 deletion lib/avro_turf/in_memory_cache.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# A cache for the CachedConfluentSchemaRegistry.
# Simply stores the schemas and ids in in-memory hashes.
class AvroTurf::InMemoryCache

# Sets up the empty in-memory hashes that back each lookup/store pair.
def initialize
# presumably id => schema (used by lookup_by_id) — TODO confirm
@schemas_by_id = {}
# [subject, schema] => id (written by store_by_schema)
@ids_by_schema = {}
# read by lookup_by_version using a subject + version String key
@schema_by_subject_version = {}
# [subject, schema] => data (written by store_data_by_schema)
@data_by_schema = {}
end

def lookup_by_id(id)
Expand All @@ -21,11 +21,23 @@ def lookup_by_schema(subject, schema)
@ids_by_schema[key]
end

# Returns the data stored for the [subject, schema] pair, or nil when
# nothing has been stored for it.
def lookup_data_by_schema(subject, schema)
  @data_by_schema[[subject, schema]]
end

# Records `id` for the [subject, schema] pair and returns `id`.
def store_by_schema(subject, schema, id)
  @ids_by_schema[[subject, schema]] = id
end

# Records `data` for the [subject, schema] pair and returns it.
# A nil or false `data` is not stored and nil is returned.
def store_data_by_schema(subject, schema, data)
  @data_by_schema[[subject, schema]] = data if data
end

def lookup_by_version(subject, version)
key = "#{subject}#{version}"
@schema_by_subject_version[key]
Expand Down
44 changes: 29 additions & 15 deletions lib/avro_turf/messaging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,33 @@ def initialize(

# Encodes a message using the specified schema.
#
# message - The message that should be encoded. Must be compatible with
# the schema.
# schema_name - The String name of the schema that should be used to encode
# the data.
# namespace - The namespace of the schema (optional).
# subject - The subject name the schema should be registered under in
# the schema registry (optional).
# version - The integer version of the schema that should be used to decode
# the data. Must match the schema used when encoding (optional).
# schema_id - The integer id of the schema that should be used to encode
# the data.
# validate - The boolean for performing complete message validation before
# encoding it, Avro::SchemaValidator::ValidationError with
# a descriptive message will be raised in case of invalid message.
# message - The message that should be encoded. Must be compatible with
# the schema.
# schema_name - The String name of the schema that should be used to encode
# the data.
# namespace - The namespace of the schema (optional).
# subject - The subject name the schema should be registered under in
# the schema registry (optional).
# version - The integer version of the schema that should be used to encode
# the data. Only used together with `subject` (optional).
# schema_id - The integer id of the schema that should be used to encode
# the data.
# validate - The boolean for performing complete message validation before
# encoding it, Avro::SchemaValidator::ValidationError with
# a descriptive message will be raised in case of invalid message.
# register_schemas - The boolean that indicates whether the schema should be
# registered in case it does not exist (default: true). When false,
# the schema is only looked up in the registry and
# AvroTurf::SchemaNotFoundError is raised if it is not found.
#
# Returns the encoded data as a String.
def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false)
def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false,
register_schemas: true)
schema, schema_id = if schema_id
fetch_schema_by_id(schema_id)
elsif subject && version
fetch_schema(subject: subject, version: version)
elsif schema_name && !register_schemas
fetch_schema_by_body(subject: subject, schema_name: schema_name, namespace: namespace)
elsif schema_name
register_schema(subject: subject, schema_name: schema_name, namespace: namespace)
else
Expand Down Expand Up @@ -228,6 +234,14 @@ def fetch_schema_by_id(schema_id)
[schema, schema_id]
end

# Looks up an already-registered schema by its body, without registering it.
#
# schema_name - The name of the schema to load from the schema store.
# subject     - The subject to check under; defaults to the schema's
#               full name when not given.
# namespace   - The namespace passed to the schema store lookup.
#
# Returns a two-element Array of the schema and the "id" entry of the
# registry's response.
# Raises SchemaNotFoundError when the registry check returns nothing.
def fetch_schema_by_body(schema_name:, subject: nil, namespace: nil)
  schema = @schema_store.find(schema_name, namespace)
  registered = @registry.check(subject || schema.fullname, schema)

  if registered
    [schema, registered["id"]]
  else
    raise SchemaNotFoundError, "Schema with structure: #{schema} not found on registry"
  end
end

# Schemas are registered under the full name of the top level Avro record
# type, or `subject` if it's provided.
def register_schema(schema_name:, subject: nil, namespace: nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class FakePrefixedConfluentSchemaRegistryServer < FakeConfluentSchemaRegistrySer

# Note: this does not actually handle the same schema registered under
# multiple subjects
schema_id = SCHEMAS.index(schema)
context, _subject = parse_qualified_subject(params[:subject])
schema_id = SCHEMAS[context].index(schema)
Comment on lines +67 to +68
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was already fixed in the parent but was failing here so all I did was copy the same behavior


halt(404, SCHEMA_NOT_FOUND) unless schema_id

Expand Down
26 changes: 23 additions & 3 deletions spec/cached_confluent_schema_registry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
let(:upstream) { instance_double(AvroTurf::ConfluentSchemaRegistry) }
let(:registry) { described_class.new(upstream) }
let(:id) { rand(999) }
let(:subject_name) { 'a_subject' }
let(:schema) do
{
type: "record",
Expand All @@ -25,8 +26,6 @@
end

describe "#register" do
let(:subject_name) { "a_subject" }

it "caches the result of register" do
# multiple calls return same result, with only one upstream call
allow(upstream).to receive(:register).with(subject_name, schema).and_return(id)
Expand All @@ -36,8 +35,29 @@
end
end

# Verifies that #check results are cached: repeated calls with the same
# subject and schema reach the upstream registry only once.
describe "#check" do
# Response the stubbed upstream returns from #check.
let(:schema_data) do
{
"subject" => subject_name,
"version" => 123,
"id" => id,
"schema" => schema
}
end

# Stub the upstream so the number of #check calls can be asserted below.
before do
allow(upstream).to receive(:check).with(subject_name, schema).and_return(schema_data)
end

it "caches the result of check" do
# multiple calls return same result, with only one upstream call
expect(registry.check(subject_name, schema)).to eq(schema_data)
expect(registry.check(subject_name, schema)).to eq(schema_data)
expect(upstream).to have_received(:check).exactly(1).times
end
end

describe '#subject_version' do
let(:subject_name) { 'a_subject' }
let(:version) { 1 }
let(:schema_with_meta) do
{
Expand Down
60 changes: 60 additions & 0 deletions spec/disk_cached_confluent_schema_registry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,66 @@
end
end

# Verifies #check against the disk-backed cache: entries preloaded in
# data_by_schema.json are served without calling upstream, and new entries
# are fetched once and then persisted to that file.
# NOTE(review): subject, version, id, schema, city_id and city_schema are
# defined outside this block — confirm against the surrounding spec.
describe "#check" do
let(:city_name) { "a_city" }
# Upstream response for the schema that is already in the disk cache.
let(:schema_data) do
{
"subject" => subject,
"version" => version,
"id" => id,
"schema" => schema
}
end

# Upstream response for the schema that is not yet cached.
let(:city_schema_data) do
{
"subject" => city_name,
"version" => version,
"id" => city_id,
"schema" => city_schema
}
end

# Cache file contents written before each example; keys are the
# subject and schema concatenated into one String.
let(:cache_before) do
{
"#{subject}#{schema}" => schema_data
}
end

# Expected cache file contents once the city schema has been checked.
let(:cache_after) do
{
"#{subject}#{schema}" => schema_data,
"#{city_name}#{city_schema}" => city_schema_data
}
end

# set up the disk cache to avoid performing the upstream check
before do
store_cache("data_by_schema.json", cache_before)
allow(upstream).to receive(:check).with(subject, schema).and_return(schema_data)
allow(upstream).to receive(:check).with(city_name, city_schema).and_return(city_schema_data)
end

context "when the schema is not found in the cache" do
it "makes only one request using upstream" do
# first call misses and goes upstream; second call is served from the cache
expect(registry.check(city_name, city_schema)).to eq(city_schema_data)
expect(registry.check(city_name, city_schema)).to eq(city_schema_data)
expect(upstream).to have_received(:check).with(city_name, city_schema).exactly(1).times
expect(load_cache("data_by_schema.json")).to eq cache_after
end
end

context "when schema is already in the cache" do
it "uses preloaded disk cache" do
# multiple calls return same result, with zero upstream calls
expect(registry.check(subject, schema)).to eq(schema_data)
expect(registry.check(subject, schema)).to eq(schema_data)
expect(upstream).to have_received(:check).exactly(0).times
expect(load_cache("data_by_schema.json")).to eq cache_before
end
end
end

it_behaves_like "a confluent schema registry client" do
let(:upstream) { AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger) }
let(:registry) { described_class.new(upstream) }
Expand Down
57 changes: 57 additions & 0 deletions spec/messaging_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@
}
AVSC
end

# Second message/schema pair ("city"), used by the register_schemas: false
# examples alongside the person schema.
let(:city_message) { { "name" => "Paris" } }
let(:city_schema_json) do
<<-AVSC
{
"name": "city",
"type": "record",
"fields": [
{
"type": "string",
"name": "name"
}
]
}
AVSC
end

# Parsed form of city_schema_json.
let(:city_schema) { Avro::Schema.parse(city_schema_json) }
let(:schema) { Avro::Schema.parse(schema_json) }

before do
Expand All @@ -49,6 +67,7 @@

before do
define_schema "person.avsc", schema_json
define_schema "city.avsc", city_schema_json
end

shared_examples_for "encoding and decoding with the schema from schema store" do
Expand Down Expand Up @@ -92,6 +111,16 @@
expect { avro.encode(message, subject: 'missing', version: 1) }.to raise_error(AvroTurf::SchemaNotFoundError)
end

# With register_schemas: false the schema is only looked up, so encoding fails
# when the registry does not know it.
# NOTE(review): presumably the city schema is never registered in this context — confirm.
it 'raises AvroTurf::SchemaNotFoundError when the schema does not exist on registry and register_schemas false' do
expect { avro.encode(city_message, schema_name: 'city', register_schemas: false) }.
to raise_error(AvroTurf::SchemaNotFoundError, "Schema with structure: #{city_schema} not found on registry")
end

# NOTE(review): relies on the person schema already being present on the registry — confirm setup.
it 'encodes with register_schemas false when the schema exists on the registry' do
data = avro.encode(message, schema_name: 'person', register_schemas: false)
expect(avro.decode(data, schema_name: 'person')).to eq message
end

it 'caches parsed schemas for decoding' do
data = avro.encode(message, subject: 'person', version: 1)
avro.decode(data)
Expand Down Expand Up @@ -364,6 +393,34 @@
end
end

# Exercises fetch_schema_by_body directly with a stubbed schema store and
# registry: it should return the schema together with the id reported by
# the registry's #check response.
context 'using fetch_schema_by_body' do
let(:subject_name) { 'city' }
let(:schema_name) { 'city' }
let(:namespace) { 'namespace' }
let(:city_schema_id) { 125 }
# Response the stubbed registry returns from #check.
let(:city_schema_data) do
{
"subject" => subject_name,
"version" => 123,
"id" => city_schema_id,
"schema" => city_schema
}
end

subject(:fetch_schema_by_body) do
avro.fetch_schema_by_body(schema_name: schema_name, namespace: namespace, subject: subject_name)
end

# Stub both collaborators so no schema file or registry request is needed.
before do
allow(schema_store).to receive(:find).with(schema_name, namespace).and_return(city_schema)
allow(registry).to receive(:check).with(subject_name, city_schema).and_return(city_schema_data)
end

it 'gets schema from registry' do
expect(fetch_schema_by_body).to eq([city_schema, city_schema_id])
end
end

context 'using register_schema' do
let(:schema_name) { 'schema_name' }

Expand Down
Loading