From c27a9858d76c0abbefc217a50e96fc8a4cdd0dd3 Mon Sep 17 00:00:00 2001 From: Zinahia Date: Wed, 21 Aug 2024 13:56:21 +0200 Subject: [PATCH 1/8] Add read_only option to encode method --- .../cached_confluent_schema_registry.rb | 6 +++++- lib/avro_turf/in_memory_cache.rb | 13 +++++++++++++ lib/avro_turf/messaging.rb | 16 +++++++++++++++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/lib/avro_turf/cached_confluent_schema_registry.rb b/lib/avro_turf/cached_confluent_schema_registry.rb index b0934e9..b8c74c0 100644 --- a/lib/avro_turf/cached_confluent_schema_registry.rb +++ b/lib/avro_turf/cached_confluent_schema_registry.rb @@ -17,13 +17,17 @@ def initialize(upstream, cache: nil) end # Delegate the following methods to the upstream - %i(subjects subject_versions schema_subject_versions check compatible? + %i(subjects subject_versions schema_subject_versions compatible? global_config update_global_config subject_config update_subject_config).each do |name| define_method(name) do |*args| instance_variable_get(:@upstream).send(name, *args) end end + def check(subject, schema) + @cache.lookup_data_by_schema(subject, schema) || @cache.store_data_by_schema(subject, schema, @upstream.check(subject, schema)) + end + def fetch(id) @cache.lookup_by_id(id) || @cache.store_by_id(id, @upstream.fetch(id)) end diff --git a/lib/avro_turf/in_memory_cache.rb b/lib/avro_turf/in_memory_cache.rb index d02026b..4a5f44d 100644 --- a/lib/avro_turf/in_memory_cache.rb +++ b/lib/avro_turf/in_memory_cache.rb @@ -6,6 +6,7 @@ def initialize @schemas_by_id = {} @ids_by_schema = {} @schema_by_subject_version = {} + @data_by_schema = {} end def lookup_by_id(id) @@ -21,11 +22,23 @@ def lookup_by_schema(subject, schema) @ids_by_schema[key] end + def lookup_data_by_schema(subject, schema) + key = [subject, schema] + @data_by_schema[key] + end + def store_by_schema(subject, schema, id) key = [subject, schema] @ids_by_schema[key] = id end + def store_data_by_schema(subject, schema, data) + return unless data + + key = [subject, schema] + @data_by_schema[key] = data + end + def lookup_by_version(subject, version) key = "#{subject}#{version}" @schema_by_subject_version[key] diff --git a/lib/avro_turf/messaging.rb b/lib/avro_turf/messaging.rb index dea1bc3..955447d 100644 --- a/lib/avro_turf/messaging.rb +++ b/lib/avro_turf/messaging.rb @@ -120,13 +120,19 @@ def initialize( # validate - The boolean for performing complete message validation before # encoding it, Avro::SchemaValidator::ValidationError with # a descriptive message will be raised in case of invalid message. + # read_only - The boolean that indicates whether or not the schema should be + # registered in case it does not exist, or if it should be fetched + # from the registry without registering it (read_only: true). # # Returns the encoded data as a String. - def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false) + def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false, + read_only: false) schema, schema_id = if schema_id fetch_schema_by_id(schema_id) elsif subject && version fetch_schema(subject: subject, version: version) + elsif schema_name && read_only + fetch_schema_by_body(subject: subject, schema_name: schema_name, namespace: namespace) elsif schema_name register_schema(subject: subject, schema_name: schema_name, namespace: namespace) else @@ -228,6 +234,14 @@ def fetch_schema_by_id(schema_id) [schema, schema_id] end + def fetch_schema_by_body(schema_name:, subject: nil, namespace: nil) + schema = @schema_store.find(schema_name, namespace) + schema_data = @registry.check(subject || schema.fullname, schema) + raise SchemaNotFoundError.new("Schema with structure: #{schema} not found on registry") unless schema_data + + [schema, schema_data.fetch('id')] + end + # Schemas are registered under the full name of the top level Avro record # type, or `subject` if it's provided. def register_schema(schema_name:, subject: nil, namespace: nil) From 2c0cc2394cb0b4d059084f07c408af13fe059239 Mon Sep 17 00:00:00 2001 From: Zinahia Date: Wed, 21 Aug 2024 18:00:00 +0200 Subject: [PATCH 2/8] Add data_by_schema to disk cache --- lib/avro_turf/disk_cache.rb | 19 ++++++ lib/avro_turf/in_memory_cache.rb | 1 - ...k_cached_confluent_schema_registry_spec.rb | 60 +++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/lib/avro_turf/disk_cache.rb b/lib/avro_turf/disk_cache.rb index 54dc88f..5b6f4f4 100644 --- a/lib/avro_turf/disk_cache.rb +++ b/lib/avro_turf/disk_cache.rb @@ -16,6 +16,10 @@ def initialize(disk_path, logger: Logger.new($stdout)) @schemas_by_subject_version_path = File.join(disk_path, 'schemas_by_subject_version.json') @schemas_by_subject_version = {} + + @data_by_schema_path = File.join(disk_path, 'data_by_schema.json') + hash = read_from_disk_cache(@data_by_schema_path) + @data_by_schema = hash || {} end # override @@ -40,6 +44,12 @@ def lookup_by_schema(subject, schema) @ids_by_schema[key] end + # override to use a json serializable cache key + def lookup_data_by_schema(subject, schema) + key = "#{subject}#{schema}" + @data_by_schema[key] + end + # override to use a json serializable cache key and update the file cache def store_by_schema(subject, schema, id) key = "#{subject}#{schema}" @@ -49,6 +59,15 @@ def store_by_schema(subject, schema, id) id end + def store_data_by_schema(subject, schema, data) + return unless data + + key = "#{subject}#{schema}" + @data_by_schema[key] = data + write_to_disk_cache(@data_by_schema_path, @data_by_schema) + data + end + # checks instance var (in-memory cache) for schema # checks disk cache if in-memory cache doesn't exists # if file exists but no in-memory cache, read from file and sync in-memory cache diff --git a/lib/avro_turf/in_memory_cache.rb b/lib/avro_turf/in_memory_cache.rb index 4a5f44d..3e8ca09 100644 --- a/lib/avro_turf/in_memory_cache.rb +++ b/lib/avro_turf/in_memory_cache.rb @@ -1,7 +1,6 @@ # A cache for the CachedConfluentSchemaRegistry. # Simply stores the schemas and ids in in-memory hashes. class AvroTurf::InMemoryCache - def initialize @schemas_by_id = {} @ids_by_schema = {} diff --git a/spec/disk_cached_confluent_schema_registry_spec.rb b/spec/disk_cached_confluent_schema_registry_spec.rb index b0932fc..99cbf45 100644 --- a/spec/disk_cached_confluent_schema_registry_spec.rb +++ b/spec/disk_cached_confluent_schema_registry_spec.rb @@ -222,6 +222,66 @@ end end + describe "#check" do + let(:city_name) { "a_city" } + let(:schema_data) do + { + subject: subject, + version: version, + id: id, + schema: schema + }.to_json + end + + let(:city_schema_data) do + { + subject: city_name, + version: version, + id: city_id, + schema: city_schema + }.to_json + end + + let(:cache_before) do + { + "#{subject}#{schema}" => schema_data + } + end + + let(:cache_after) do + { + "#{subject}#{schema}" => schema_data, + "#{city_name}#{city_schema}" => city_schema_data + } + end + + # setup the disk cache to avoid performing the upstream fetch + before do + store_cache("data_by_schema.json", cache_before) + allow(upstream).to receive(:check).with(subject, schema).and_return(schema_data) + allow(upstream).to receive(:check).with(city_name, city_schema).and_return(city_schema_data) + end + + context "when the schema is not found in the cache" do + it "makes only one request using upstream" do + expect(registry.check(city_name, city_schema)).to eq(city_schema_data) + expect(registry.check(city_name, city_schema)).to eq(city_schema_data) + expect(upstream).to have_received(:check).with(city_name, city_schema).exactly(1).times + expect(load_cache("data_by_schema.json")).to eq cache_after + end + end + + context "when schema is already in the cache" do + it "uses preloaded disk cache" do + # multiple calls return same result, with zero upstream calls + expect(registry.check(subject, schema)).to eq(schema_data) + expect(registry.check(subject, schema)).to eq(schema_data) + expect(upstream).to have_received(:check).exactly(0).times + expect(load_cache("data_by_schema.json")).to eq cache_before + end + end + end + it_behaves_like "a confluent schema registry client" do let(:upstream) { AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger) } let(:registry) { described_class.new(upstream) } From ee71b704ed9119d2c797c136437be277d08e5fa9 Mon Sep 17 00:00:00 2001 From: Zinahia Date: Wed, 21 Aug 2024 18:06:38 +0200 Subject: [PATCH 3/8] Add spec for cached check --- spec/cached_confluent_schema_registry_spec.rb | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/spec/cached_confluent_schema_registry_spec.rb b/spec/cached_confluent_schema_registry_spec.rb index f6371a7..f37648a 100644 --- a/spec/cached_confluent_schema_registry_spec.rb +++ b/spec/cached_confluent_schema_registry_spec.rb @@ -6,6 +6,7 @@ let(:upstream) { instance_double(AvroTurf::ConfluentSchemaRegistry) } let(:registry) { described_class.new(upstream) } let(:id) { rand(999) } + let(:subject_name) { 'a_subject' } let(:schema) do { type: "record", @@ -25,8 +26,6 @@ end describe "#register" do - let(:subject_name) { "a_subject" } - it "caches the result of register" do # multiple calls return same result, with only one upstream call allow(upstream).to receive(:register).with(subject_name, schema).and_return(id) @@ -36,8 +35,29 @@ end end + describe "#check" do + let(:schema_data) do + { + subject: subject_name, + version: 123, + id: id, + schema: schema + }.to_json + end + + before do + allow(upstream).to receive(:check).with(subject_name, schema).and_return(schema_data) + end + + it "caches the result of check" do + # multiple calls return same result, with only one upstream call + expect(registry.check(subject_name, schema)).to eq(schema_data) + expect(registry.check(subject_name, schema)).to eq(schema_data) + expect(upstream).to have_received(:check).exactly(1).times + end + end + describe '#subject_version' do - let(:subject_name) { 'a_subject' } let(:version) { 1 } let(:schema_with_meta) do { From bfab2da915b79809066dc0b99779aaa4d6564d60 Mon Sep 17 00:00:00 2001 From: Zinahia Date: Thu, 22 Aug 2024 13:48:34 +0200 Subject: [PATCH 4/8] Add specs and fix fake prefixed confluent registry check method --- lib/avro_turf/messaging.rb | 2 +- ...efixed_confluent_schema_registry_server.rb | 3 +- spec/messaging_spec.rb | 52 +++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/lib/avro_turf/messaging.rb b/lib/avro_turf/messaging.rb index 955447d..dcef776 100644 --- a/lib/avro_turf/messaging.rb +++ b/lib/avro_turf/messaging.rb @@ -239,7 +239,7 @@ def fetch_schema_by_body(schema_name:, subject: nil, namespace: nil) schema_data = @registry.check(subject || schema.fullname, schema) raise SchemaNotFoundError.new("Schema with structure: #{schema} not found on registry") unless schema_data - [schema, schema_data.fetch('id')] + [schema, schema_data["id"]] end # Schemas are registered under the full name of the top level Avro record diff --git a/lib/avro_turf/test/fake_prefixed_confluent_schema_registry_server.rb b/lib/avro_turf/test/fake_prefixed_confluent_schema_registry_server.rb index e4168ad..44a3195 100644 --- a/lib/avro_turf/test/fake_prefixed_confluent_schema_registry_server.rb +++ b/lib/avro_turf/test/fake_prefixed_confluent_schema_registry_server.rb @@ -64,7 +64,8 @@ class FakePrefixedConfluentSchemaRegistryServer < FakeConfluentSchemaRegistrySer # Note: this does not actually handle the same schema registered under # multiple subjects - schema_id = SCHEMAS.index(schema) + context, _subject = parse_qualified_subject(params[:subject]) + schema_id = SCHEMAS[context].index(schema) halt(404, SCHEMA_NOT_FOUND) unless schema_id diff --git a/spec/messaging_spec.rb b/spec/messaging_spec.rb index 36c15d2..1e21ec0 100644 --- a/spec/messaging_spec.rb +++ b/spec/messaging_spec.rb @@ -36,6 +36,24 @@ } AVSC end + + let(:city_message) { { "name" => "Paris" } } + let(:city_schema_json) do + <<-AVSC + { + "name": "city", + "type": "record", + "fields": [ + { + "type": "string", + "name": "name" + } + ] + } + AVSC + end + + let(:city_schema) { Avro::Schema.parse(city_schema_json) } let(:schema) { Avro::Schema.parse(schema_json) } before do @@ -49,6 +67,7 @@ before do define_schema "person.avsc", schema_json + define_schema "city.avsc", city_schema_json end shared_examples_for "encoding and decoding with the schema from schema store" do @@ -92,6 +111,11 @@ expect { avro.encode(message, subject: 'missing', version: 1) }.to raise_error(AvroTurf::SchemaNotFoundError) end + it 'raises AvroTurf::SchemaNotFoundError when the schema does not exist on registry and read_only true' do + expect { avro.encode(city_message, schema_name: 'city', read_only: true) }. + to raise_error(AvroTurf::SchemaNotFoundError, "Schema with structure: #{city_schema} not found on registry") + end + it 'caches parsed schemas for decoding' do data = avro.encode(message, subject: 'person', version: 1) avro.decode(data) @@ -364,6 +388,34 @@ end end + context 'using fetch_schema_by_body' do + let(:subject_name) { 'city' } + let(:schema_name) { 'city' } + let(:namespace) { 'namespace' } + let(:city_schema_id) { 125 } + let(:city_schema_data) do + { + "subject" => subject_name, + "version" => 123, + "id" => city_schema_id, + "schema" => city_schema.to_s + } + end + + subject(:fetch_schema_by_body) do + avro.fetch_schema_by_body(schema_name: schema_name, namespace: namespace, subject: subject_name) + end + + before do + allow(schema_store).to receive(:find).with(schema_name, namespace).and_return(city_schema) + allow(registry).to receive(:check).with(subject_name, city_schema).and_return(city_schema_data) + end + + it 'gets schema from registry' do + expect(fetch_schema_by_body).to eq([city_schema, city_schema_id]) + end + end + context 'using register_schema' do let(:schema_name) { 'schema_name' } From 34daf707afab30e50667b87429e240e2be83dafc Mon Sep 17 00:00:00 2001 From: Zinahia Date: Thu, 22 Aug 2024 17:13:45 +0200 Subject: [PATCH 5/8] Stringify keys to ensure specs are the same as the real scenario --- spec/cached_confluent_schema_registry_spec.rb | 10 +++++----- ...k_cached_confluent_schema_registry_spec.rb | 20 +++++++++---------- spec/messaging_spec.rb | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/spec/cached_confluent_schema_registry_spec.rb b/spec/cached_confluent_schema_registry_spec.rb index f37648a..384b852 100644 --- a/spec/cached_confluent_schema_registry_spec.rb +++ b/spec/cached_confluent_schema_registry_spec.rb @@ -38,11 +38,11 @@ describe "#check" do let(:schema_data) do { - subject: subject_name, - version: 123, - id: id, - schema: schema - }.to_json + "subject" => subject_name, + "version" => 123, + "id" => id, + "schema" => schema + } end before do diff --git a/spec/disk_cached_confluent_schema_registry_spec.rb b/spec/disk_cached_confluent_schema_registry_spec.rb index 99cbf45..253d5d3 100644 --- a/spec/disk_cached_confluent_schema_registry_spec.rb +++ b/spec/disk_cached_confluent_schema_registry_spec.rb @@ -226,20 +226,20 @@ let(:city_name) { "a_city" } let(:schema_data) do { - subject: subject, - version: version, - id: id, - schema: schema - }.to_json + "subject" => subject, + "version" => version, + "id" => id, + "schema" => schema + } end let(:city_schema_data) do { - subject: city_name, - version: version, - id: city_id, - schema: city_schema - }.to_json + "subject" => city_name, + "version" => version, + "id" => city_id, + "schema" => city_schema + } end let(:cache_before) do diff --git a/spec/messaging_spec.rb b/spec/messaging_spec.rb index 1e21ec0..e3d0df0 100644 --- a/spec/messaging_spec.rb +++ b/spec/messaging_spec.rb @@ -398,7 +398,7 @@ "subject" => subject_name, "version" => 123, "id" => city_schema_id, - "schema" => city_schema.to_s + "schema" => city_schema } end From dbf2f9468efceb9c369e3ad34a2cd6cdc82d5da4 Mon Sep 17 00:00:00 2001 From: Zinahia Date: Thu, 22 Aug 2024 17:14:41 +0200 Subject: [PATCH 6/8] Add README explanation of the read_only option --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 78fa549..899f0a6 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,9 @@ avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person") # Data can be validated before encoding to get a description of problem through # Avro::SchemaValidator::ValidationError exception avro.encode({ "titl" => "hello, world" }, schema_name: "person", validate: true) + +# If you do not want to register the schema in case it does not exist, you can pass the read_only option +avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person", read_only: true) ``` ### Inter-schema references From 29bde8070975471765a771be5fa7d756f12da2bf Mon Sep 17 00:00:00 2001 From: Zinahia Date: Thu, 22 Aug 2024 17:27:09 +0200 Subject: [PATCH 7/8] Add spec for encoding the message with read_only: true --- spec/messaging_spec.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spec/messaging_spec.rb b/spec/messaging_spec.rb index e3d0df0..d737260 100644 --- a/spec/messaging_spec.rb +++ b/spec/messaging_spec.rb @@ -116,6 +116,11 @@ to raise_error(AvroTurf::SchemaNotFoundError, "Schema with structure: #{city_schema} not found on registry") end + it 'encodes with read_only true when the schema exists on the registry' do + data = avro.encode(message, schema_name: 'person', read_only: true) + expect(avro.decode(data, schema_name: 'person')).to eq message + end + it 'caches parsed schemas for decoding' do data = avro.encode(message, subject: 'person', version: 1) avro.decode(data) From c7d0c75e691af008cb078d9a89d0d6ae593bc942 Mon Sep 17 00:00:00 2001 From: Zinahia Date: Fri, 23 Aug 2024 10:00:54 +0200 Subject: [PATCH 8/8] Improve parameter name --- README.md | 4 ++-- lib/avro_turf/messaging.rb | 38 +++++++++++++++++++------------------- spec/messaging_spec.rb | 8 ++++---- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 899f0a6..05f4945 100644 --- a/README.md +++ b/README.md @@ -93,8 +93,8 @@ avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person") # Avro::SchemaValidator::ValidationError exception avro.encode({ "titl" => "hello, world" }, schema_name: "person", validate: true) -# If you do not want to register the schema in case it does not exist, you can pass the read_only option -avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person", read_only: true) +# If you do not want to register the schema in case it does not exist, you can pass the register_schemas option as false +avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person", register_schemas: false) ``` ### Inter-schema references diff --git a/lib/avro_turf/messaging.rb b/lib/avro_turf/messaging.rb index dcef776..f2a0c3c 100644 --- a/lib/avro_turf/messaging.rb +++ b/lib/avro_turf/messaging.rb @@ -106,32 +106,32 @@ def initialize( # Encodes a message using the specified schema. # - # message - The message that should be encoded. Must be compatible with - # the schema. - # schema_name - The String name of the schema that should be used to encode - # the data. - # namespace - The namespace of the schema (optional). - # subject - The subject name the schema should be registered under in - # the schema registry (optional). - # version - The integer version of the schema that should be used to decode - # the data. Must match the schema used when encoding (optional). - # schema_id - The integer id of the schema that should be used to encode - # the data. - # validate - The boolean for performing complete message validation before - # encoding it, Avro::SchemaValidator::ValidationError with - # a descriptive message will be raised in case of invalid message. - # read_only - The boolean that indicates whether or not the schema should be - # registered in case it does not exist, or if it should be fetched - # from the registry without registering it (read_only: true). + # message - The message that should be encoded. Must be compatible with + # the schema. + # schema_name - The String name of the schema that should be used to encode + # the data. + # namespace - The namespace of the schema (optional). + # subject - The subject name the schema should be registered under in + # the schema registry (optional). + # version - The integer version of the schema that should be used to decode + # the data. Must match the schema used when encoding (optional). + # schema_id - The integer id of the schema that should be used to encode + # the data. + # validate - The boolean for performing complete message validation before + # encoding it, Avro::SchemaValidator::ValidationError with + # a descriptive message will be raised in case of invalid message. + # register_schemas - The boolean that indicates whether or not the schema should be + # registered in case it does not exist, or if it should be fetched + # from the registry without registering it (register_schemas: false). # # Returns the encoded data as a String. def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false, - read_only: false) + register_schemas: true) schema, schema_id = if schema_id fetch_schema_by_id(schema_id) elsif subject && version fetch_schema(subject: subject, version: version) - elsif schema_name && read_only + elsif schema_name && !register_schemas fetch_schema_by_body(subject: subject, schema_name: schema_name, namespace: namespace) elsif schema_name register_schema(subject: subject, schema_name: schema_name, namespace: namespace) diff --git a/spec/messaging_spec.rb b/spec/messaging_spec.rb index d737260..82bc19a 100644 --- a/spec/messaging_spec.rb +++ b/spec/messaging_spec.rb @@ -111,13 +111,13 @@ expect { avro.encode(message, subject: 'missing', version: 1) }.to raise_error(AvroTurf::SchemaNotFoundError) end - it 'raises AvroTurf::SchemaNotFoundError when the schema does not exist on registry and read_only true' do - expect { avro.encode(city_message, schema_name: 'city', read_only: true) }. + it 'raises AvroTurf::SchemaNotFoundError when the schema does not exist on registry and register_schemas false' do + expect { avro.encode(city_message, schema_name: 'city', register_schemas: false) }. to raise_error(AvroTurf::SchemaNotFoundError, "Schema with structure: #{city_schema} not found on registry") end - it 'encodes with read_only true when the schema exists on the registry' do - data = avro.encode(message, schema_name: 'person', read_only: true) + it 'encodes with register_schemas false when the schema exists on the registry' do + data = avro.encode(message, schema_name: 'person', register_schemas: false) expect(avro.decode(data, schema_name: 'person')).to eq message end