Skip to content

Commit

Permalink
add handling for time.Time in avro encode extractor
Browse files Browse the repository at this point in the history
  • Loading branch information
samirketema committed Jun 12, 2024
1 parent 05dbc27 commit 7239c52
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (e extractor) Extract(v any) (avro.Schema, error) {
}

func (e extractor) extract(path []string, v reflect.Value, t reflect.Type) (avro.Schema, error) {
fmt.Printf("PATH: %+v\n", strings.Join(path, "."))
fmt.Printf("VALUE: %+v\n", v)
fmt.Printf("TYPE: %s\n", t.String())
fmt.Printf("T.KIND: %s\n", t.Kind())
fmt.Printf("-----\n")
if t == nil {
return nil, cerrors.Errorf("%s: can't get schema for untyped nil", strings.Join(path, ".")) // untyped nil
}
Expand Down Expand Up @@ -89,6 +94,10 @@ func (e extractor) extract(path []string, v reflect.Value, t reflect.Type) (avro
case reflect.Map:
return e.extractMap(path, v, t)
case reflect.Struct:
if t.String() == "time.Time" {
s := avro.NewPrimitiveLogicalSchema(avro.TimestampMillis)
return avro.NewPrimitiveSchema(avro.Long, s), nil
}
return e.extractStruct(path, v, t)
}
// Invalid, Uintptr, UnsafePointer, Uint64, Uint, Complex64, Complex128, Chan, Func
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ type ExtractAndUploadSchemaStrategy struct {
Subject string
}

func (str ExtractAndUploadSchemaStrategy) GetSchema(ctx context.Context, client *Client, _ log.CtxLogger, sd opencdc.StructuredData) (Schema, sr.SubjectSchema, error) {
func (str ExtractAndUploadSchemaStrategy) GetSchema(ctx context.Context, client *Client, logger log.CtxLogger, sd opencdc.StructuredData) (Schema, sr.SubjectSchema, error) {
sf, ok := DefaultSchemaFactories[str.Type]
if !ok {
return nil, sr.SubjectSchema{}, cerrors.Errorf("unknown schema type %q (%d)", str.Type.String(), str.Type)
}

logger.Debug(ctx).Msgf("AVRO created_at: %+v", sd["created_at"])
logger.Debug(ctx).Msgf("AVRO created_at type: %T", sd["created_at"])

s, err := sf.SchemaForType(sd)
if err != nil {
return nil, sr.SubjectSchema{}, cerrors.Errorf("could not extract avro schema: %w", err)
Expand Down

0 comments on commit 7239c52

Please sign in to comment.