Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consume all messages on msk topic and then exit with aws lambda #1735

Open
KhalefAhmed opened this issue Jan 28, 2025 · 1 comment
Open

consume all messages on msk topic and then exit with aws lambda #1735

KhalefAhmed opened this issue Jan 28, 2025 · 1 comment

Comments

@KhalefAhmed
Copy link

KhalefAhmed commented Jan 28, 2025

  • Bug Description : The lambda function is unable to consume messages from the configured Kafka topic. After connecting to the Kafka broker, joining the consumer group, and starting to fetch messages, no messages are ultimately consumed.

  • Expected Behavior : The lambda function should be able to connect to the Kafka broker, join the consumer group, and continuously consume messages from the configured topic.

  • Observed Behavior : When the lambda function is executed, it successfully connects to the Kafka broker and joins the consumer group. However, no messages are ultimately consumed, and the lambda function does not appear to be making progress in processing the messages.

Environment

  • AWS: Lambda

  • KafkaJS version: 2.2.4

  • Kafka version: 3.6.0 (MSK)

  • Node.js version: 22

  • Additional Context : The lambda function is set up to consume messages from a Kafka topic, but it's not actually consuming any messages. There seems to be an issue with the connection or configuration that is preventing the consumption of messages.


  • the code of the concerned lambda :
import {Kafka} from 'kafkajs';
import {createMechanism} from '@jm18457/kafkajs-msk-iam-authentication-mechanism';

const config = {
    topic: MSKTopic,
    groupId: test-group,
    brokers: BROKER-LIST,
    region: AWS_REGION
};

const brokerList = config.brokers.split(',');
const kafka = new Kafka({
    brokers: brokerList,
    ssl: true,
    sasl: createMechanism({region: config.region})
});
const consumer = kafka.consumer({groupId: config.groupId});

export const handler = async (event, context) => {
    try {
        await consumer.connect();

        let consumedTopicPartitions = {};
        let processedBatch = true;

        consumer.on(consumer.events.GROUP_JOIN, async ({payload}) => {
            console.log("GROUP_JOIN")
            console.log(payload)
            const {memberAssignment} = payload;
            consumedTopicPartitions = Object.entries(memberAssignment).reduce((topics, [topic, partitions]) => {
                for (const partition in partitions) {
                    topics[`${topic}-${partition}`] = false;
                }
                return topics;
            }, {});

            console.log(consumedTopicPartitions)
        });

        consumer.on(consumer.events.FETCH_START, async () => {
            console.log("FETCH_START")
            if (!processedBatch) {
                await consumer.disconnect();
                process.exit(0);
            }
            processedBatch = false;
        });


        consumer.on(consumer.events.END_BATCH_PROCESS, async ({payload}) => {
            console.log("END_BATCH_PROCESS")

            const {topic, partition, offsetLag} = payload;
            consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0';
            if (Object.values(consumedTopicPartitions).every(Boolean)) {
                await consumer.disconnect();
                process.exit(0);
            }
            processedBatch = true;
        });

        await consumer.run({
            eachMessage: async ({topic, partition, message}) => {
                const prefix = `${topic}[${partition} | ${message.offset}] / ${new Date(Number(message.timestamp)).toISOString()}`;
                console.info(`- ${prefix} ${message.value}`);
            }
        });

    } catch (error) {
        console.error(`[consumer] ${error.message}`, error);
        context.fail(error);
    }
};

I have a lag greater than 0 before I launch my lambda and afterwards it doesn't change.
and my lambda is able to connect to the broker and join as consumer group

and I gave full IAM rights to my lambda to access the msk cluster to make sure it's not a rights issue

and finally I know it's not just about kafkajs but also about AWS but I hope you can help me

@KhalefAhmed
Copy link
Author

Hi @Nevon I've been inspired by your code here if you can help when you have time of course
#825 (comment)
Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant