diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 881451c654..fc564cbb89 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -480,6 +480,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { var err error if msg.Value != nil && msg.Payload != nil { p.log.Error("Can not set Value and Payload both") + request.callback(nil, request.msg, errors.New("can not set Value and Payload both")) return } @@ -493,6 +494,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if msg.Schema != nil && p.options.Schema != nil && msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() { p.publishSemaphore.Release() + request.callback(nil, request.msg, fmt.Errorf("msg schema can not match with producer schema")) p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic) return } @@ -528,6 +530,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if err != nil { p.publishSemaphore.Release() p.log.WithError(err).Error("get schema version fail") + request.callback(nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err)) return } p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)