Consumption rate of messages from one topic affects the rate of consumption from another topic #247

Open
omer-riv opened this issue Jan 22, 2025 · 0 comments

omer-riv commented Jan 22, 2025

Environment Information

  • OS: Ubuntu 24.10
  • Node version: 20.16.0
  • confluent-kafka-javascript version: 1.0.0

Summary

The following code produces messages to two topics and then consumes from both topics with partitionsConsumedConcurrently: 2. When consuming with eachBatch, only messages from the "slow topic" are consumed, which blocks consumption of the "fast topic". In other runs, some "fast topic" messages are consumed, but consumption of the fast topic is still blocked later while "slow topic" messages are being processed.

The expected behavior is that all "fast topic" messages are consumed very quickly, and that the presence of messages in another topic does not affect the consumption rate of the fast topic.
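To make the expected behavior concrete, here is a broker-free model. It is a hypothetical sketch, not the library's implementation: each partition is drained by its own async worker, so a slow handler delays only its own partition. The names drainIndependently, makeBatches and PartitionSim, the batch size of 50, and the 5 ms delay standing in for the 500 ms sleep are all illustrative assumptions.

```typescript
// Hypothetical, broker-free model of the behavior the issue expects:
// every partition gets its own async worker, so a slow eachBatch-style
// handler only delays the partition it is working on.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

interface PartitionSim {
  topic: string;
  batches: number[][]; // pre-split batches of message ids
  handlerDelayMs: number; // simulated per-batch processing time
}

// Split `total` message ids into batches of at most `size` messages.
function makeBatches(total: number, size: number): number[][] {
  const out: number[][] = [];
  for (let i = 0; i < total; i += size) {
    out.push(Array.from({ length: Math.min(size, total - i) }, (_, k) => i + k));
  }
  return out;
}

// Drain all partitions concurrently; record per-topic counts and the
// order in which the partitions finish.
async function drainIndependently(
  parts: PartitionSim[],
): Promise<{ finishOrder: string[]; consumed: Record<string, number> }> {
  const finishOrder: string[] = [];
  const consumed: Record<string, number> = {};
  await Promise.all(
    parts.map(async (p) => {
      consumed[p.topic] = 0;
      for (const batch of p.batches) {
        // Partitions with a nonzero delay wait on a timer; others only yield.
        await (p.handlerDelayMs > 0 ? sleep(p.handlerDelayMs) : Promise.resolve());
        consumed[p.topic] += batch.length;
      }
      finishOrder.push(p.topic);
    }),
  );
  return { finishOrder, consumed };
}

drainIndependently([
  { topic: "slow-topic", batches: makeBatches(250, 50), handlerDelayMs: 5 },
  { topic: "fast-topic", batches: makeBatches(250, 50), handlerDelayMs: 0 },
]).then(({ finishOrder, consumed }) => {
  // finishOrder is ["fast-topic", "slow-topic"]; both topics reach 250.
  console.log(finishOrder, consumed);
});
```

In this model the fast partition finishes all 250 messages before the slow partition completes even its first batch, which is the independence between topics the issue expects from partitionsConsumedConcurrently: 2.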

Reproduce

import { KafkaJS } from "@confluentinc/kafka-javascript";

import { config } from "../src/config";

const kafka = new KafkaJS.Kafka({
    kafkaJS: {
        brokers: config.kafkaBrokersAddress.split(","),
    },
});

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const fastTopicName = "fast-topic5";
const slowTopicName = "slow-topic5";
const consumerGroup = "cg5";

(async () => {
    const admin = kafka.admin();
    await admin.connect();

    await admin.createTopics({
        topics: [
            {
                topic: fastTopicName,
                numPartitions: 1,
            },
            {
                topic: slowTopicName,
                numPartitions: 1,
            },
        ],
    });

    console.log("Partitions added successfully!");
    await admin.disconnect();

    const producer = kafka.producer();
    await producer.connect();

    for (let i = 0; i < 5; i++) {
        const messages = [];
        for (let q = 0; q < 50; q++) {
            messages.push({ headers: {}, value: (q + i * 1000).toString() + " " + "a".repeat(1000) });
        }

        await producer.send({
            topic: slowTopicName,
            messages: messages,
        });

        await producer.send({
            topic: fastTopicName,
            messages: messages,
        });
    }
    await producer.disconnect();

    const consumer = kafka.consumer({
        kafkaJS: {
            groupId: consumerGroup,
            maxWaitTimeInMs: 5000,
            fromBeginning: true,
        },
    });
    await consumer.connect();
    await consumer.subscribe({ topics: [slowTopicName, fastTopicName] });

    const messagesConsumed: Record<string, number> = { [fastTopicName]: 0, [slowTopicName]: 0 };
    await consumer.run({
        partitionsConsumedConcurrently: 2,
        eachBatch: async ({ batch }) => {
            console.log("handling batch", {
                partition: batch.partition,
                topic: batch.topic,
            });

            if (batch.topic === slowTopicName) {
                await sleep(500);
            }

            messagesConsumed[batch.topic] += batch.messages.length;

            console.log("finished batch", {
                time: new Date(),
                partition: batch.partition,
                consumed: JSON.stringify(messagesConsumed),
            });
        },
    });

    await new Promise((resolve) => setTimeout(resolve, 30000));
    await consumer.disconnect();
})();

Output

handling batch { partition: 0, topic: 'slow-topic1' }
finished batch {
  time: 2025-01-22T17:00:55.045Z,
  partition: 0,
  consumed: '{"fast-topic1":0,"slow-topic1":1}'
}
handling batch { partition: 0, topic: 'slow-topic1' }
finished batch {
  time: 2025-01-22T17:00:55.546Z,
  partition: 0,
  consumed: '{"fast-topic1":0,"slow-topic1":2}'
}
handling batch { partition: 0, topic: 'slow-topic1' }
finished batch {
  time: 2025-01-22T17:00:56.048Z,
  partition: 0,
  consumed: '{"fast-topic1":0,"slow-topic1":4}'
}
...
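As a rough check on the log above, the three logged timestamps can be turned into a throughput estimate. This sketch reuses only those samples. The total of 250 slow-topic messages is an assumption: it supposes the logged run wrote 5 × 50 messages per topic, as the reproduction code does (the log's topic names differ from the code's). The helper names are illustrative.

```typescript
// Rough throughput estimate from the three "finished batch" lines in the log.
interface Sample {
  time: string; // ISO timestamp from the log
  slowConsumed: number; // cumulative slow-topic count at that time
}

const samples: Sample[] = [
  { time: "2025-01-22T17:00:55.045Z", slowConsumed: 1 },
  { time: "2025-01-22T17:00:55.546Z", slowConsumed: 2 },
  { time: "2025-01-22T17:00:56.048Z", slowConsumed: 4 },
];

// Gap between consecutive batches, in milliseconds.
function intervalsMs(s: Sample[]): number[] {
  const out: number[] = [];
  for (let i = 1; i < s.length; i++) {
    out.push(Date.parse(s[i].time) - Date.parse(s[i - 1].time));
  }
  return out;
}

// Time to consume `total` messages if the rate observed between the first
// and last samples held constant (a linear extrapolation).
function projectedDrainMs(s: Sample[], total: number): number {
  const first = s[0];
  const last = s[s.length - 1];
  const elapsedMs = Date.parse(last.time) - Date.parse(first.time);
  const perMs = (last.slowConsumed - first.slowConsumed) / elapsedMs;
  return total / perMs;
}

console.log(intervalsMs(samples)); // [ 501, 502 ]
console.log(Math.round(projectedDrainMs(samples, 250) / 1000)); // 84
```

Each batch takes about 500 ms (the handler's sleep) and carries only 1 to 2 messages. If batches stayed that small, draining 250 slow-topic messages would take roughly 84 s, well past the 30 s the reproduction waits before disconnecting, while the fast-topic count in these lines stays at 0.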