Skip to content

Commit

Permalink
remove unnecessary schema check when create sarama sink.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Oct 29, 2021
1 parent 3390157 commit dc3e683
Showing 1 changed file with 0 additions and 5 deletions.
5 changes: 0 additions & 5 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,6 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
}

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
scheme := strings.ToLower(sinkURI.Scheme)
if scheme != "kafka" && scheme != "kafka+ssl" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme)
}

config := kafka.NewConfig()
if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
Expand Down

0 comments on commit dc3e683

Please sign in to comment.