diff --git a/pkg/sfkafkalib/kafka/kafkaconsumer.go b/pkg/sfkafkalib/kafka/kafkaconsumer.go index 37a9193..847a02b 100644 --- a/pkg/sfkafkalib/kafka/kafkaconsumer.go +++ b/pkg/sfkafkalib/kafka/kafkaconsumer.go @@ -69,10 +69,12 @@ func CreateKafkaConsumer(ctx context.Context, conf *configuration.ConsumerConfig fmt.Println("CONSUMING MESSAGE: " + message.Key__c) if err := handleFunc(ctx, message); err != nil { log.Sugar().ErrorwContext(ctx, "error in kafka handler", zap.Error(err)) + panic(message) } case err := <-errChan: fmt.Println("ERROR") log.Sugar().ErrorwContext(ctx, "error in kafka handler", zap.Error(err)) + panic(err) } } }(ctx, outChan, errChan)