Using Binary Encode/Decoder #11

Open
malhomaid opened this issue Sep 14, 2020 · 1 comment

malhomaid commented Sep 14, 2020

Hi Matt,
Thank you for this project.

I had to use BinaryDecoder to decode my messages, but I didn't know how to tell DataFileReader to use the BinaryDecoder, so I used GenericDatumReader directly. I'm assuming you use DataFileReader in case the Avro file contains more than one message (which isn't my case, at least for now). Is that something required by the Connect framework?

Here is my code:

public SchemaAndValue toConnectData(String topic, byte[] value) {
  DatumReader<GenericRecord> datumReader;
  if (avroSchema != null) {
    datumReader = new GenericDatumReader<>(avroSchema);
  } else {
    datumReader = new GenericDatumReader<>();
  }
  GenericRecord instance;

  // binaryDecoder is an instance field so the decoder can be reused across calls.
  binaryDecoder = DecoderFactory.get().binaryDecoder(value, binaryDecoder);

  try {
    instance = datumReader.read(null, binaryDecoder);
  } catch (IOException ioe) {
    throw new DataException(String.format("Failed to deserialize Avro data from topic %s", topic), ioe);
  }

  if (instance == null) {
    logger.warn("Instance was null");
  }

  if (avroSchema != null) {
    return avroDataHelper.toConnectData(avroSchema, instance);
  } else {
    return avroDataHelper.toConnectData(instance.getSchema(), instance);
  }
}
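
For reference, here is my rough understanding of the difference between the two read paths (just a sketch to illustrate, not the converter's actual code): DataFileReader expects the Avro object container file format, i.e. a header with the magic bytes and an embedded writer schema followed by data blocks, which can hold many records. A BinaryDecoder reads raw datum bytes that carry no header, so the reader schema has to come from somewhere else:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

public class AvroReadPaths {

  // Container-file path: only works when "bytes" is a complete Avro object
  // container file (magic header + embedded writer schema + data blocks).
  static void readContainerFile(byte[] bytes) throws IOException {
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    try (DataFileReader<GenericRecord> fileReader =
        new DataFileReader<>(new SeekableByteArrayInput(bytes), datumReader)) {
      while (fileReader.hasNext()) {
        GenericRecord record = fileReader.next();
        // process record...
      }
    }
  }

  // Raw-datum path: works for bytes produced by a BinaryEncoder with no file
  // header; the reader schema has to be supplied out of band.
  static GenericRecord readRawDatum(byte[] bytes, Schema schema) throws IOException {
    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    return datumReader.read(null, decoder);
  }
}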

Brettuss commented Oct 26, 2021

I will preface my comment with - "I AM NOT A JAVA DEVELOPER, AND I DON'T KNOW JAVA."
I will also preface my comment with - "I AM NOT A DEVELOPER AT ALL"

I am a DBA who is dabbling in Data Engineering, but coding is my weakness.

I ran into an issue with a Python producer, outside my jurisdiction, that was producing Avro to a topic but was not using Schema Registry.

The serialization of the message value looked something like this:

import io
from avro.io import BinaryEncoder, DatumWriter

def serialize(myschema, myobject):
    # Writes the datum as raw Avro binary -- there is no object container
    # file header, so the bytes carry no schema.
    writer = DatumWriter(myschema)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer.write(myobject, encoder)
    raw_bytes = bytes_writer.getvalue()
    return raw_bytes

I was running into an issue with the RegistrylessAvroConverter - it was complaining that the message values were not valid Avro files. So, I went looking, found this issue from @mhomaid1, and got some feedback from the folks at r/ApacheKafka.

https://www.reddit.com/r/apachekafka/comments/qfpxb9/schema_registryless_avro_python_produced_messages/
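
In hindsight, the root cause seems to be that the stock converter expects each message value to be a complete .avro file, while my producer was sending raw datum bytes. Avro container files start with the four magic bytes "Obj" followed by 0x01, so a check like this (a hypothetical helper I'm adding for illustration, not something in the converter) would have shown me what was actually on the topic:

// Hypothetical helper for illustration only: true if the payload starts with
// the Avro object container file magic ("Obj" followed by the byte 1).
static boolean looksLikeAvroContainerFile(byte[] value) {
  return value != null
      && value.length >= 4
      && value[0] == 'O'
      && value[1] == 'b'
      && value[2] == 'j'
      && value[3] == 1;
}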

His solution worked, for the most part. Here is what mine ended up looking like:

public SchemaAndValue toConnectData(String topic, byte[] value) {
  DatumReader<GenericRecord> datumReader;
  if (avroSchema != null) {
    datumReader = new GenericDatumReader<>(avroSchema);
  } else {
    datumReader = new GenericDatumReader<>();
  }
  GenericRecord instance;

  // Passing null for the reuse argument creates a fresh BinaryDecoder per record.
  Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(value, null);

  try {
    instance = datumReader.read(null, binaryDecoder);
  } catch (IOException ioe) {
    throw new DataException(String.format("Failed to deserialize Avro data from topic %s", topic), ioe);
  }

  if (instance == null) {
    logger.warn("Instance was null");
  }

  if (avroSchema != null) {
    return avroDataHelper.toConnectData(avroSchema, instance);
  } else {
    return avroDataHelper.toConnectData(instance.getSchema(), instance);
  }
}

I also had to have the correct imports:

import org.apache.avro.io.*;
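
For what it's worth, if you'd rather not use the wildcard, I believe these are the specific classes the modified method pulls from that package (worth double-checking against your Avro version):

import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;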

I then had to compile while skipping the tests. I am guessing this was because I changed the .java file and the existing tests no longer applied to how the class was coded.

./gradlew build -x test

After that, my connector worked and was able to move data downstream. I am not happy about it, but I do appreciate the work of both @farmdawgnation and @mhomaid1.
