How to consume all messages on topic and then exit #825

Closed

ajdanaher opened this issue Jul 30, 2020 · 5 comments


ajdanaher commented Jul 30, 2020

I am using kafkajs 1.12.0 in my application, with a Kafka server (version 2.2.1) on my Mac and Node 12.14.1.
I am trying to follow the example from https://www.npmjs.com/package/kafkajs to set up the client and consume messages, but my program blocks in the following:

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
})

It fetches data from Kafka but never returns. I also tried setting maxWaitTimeInMs to 0 or 1 on the consumer, but no luck.
My requirement is to read all messages in the topic from the beginning and to return once they have all been consumed, but that is not happening. I am sure I am missing something in the configuration. Can you please suggest?

goriunov (Contributor) commented:

await consumer.run will not wait for you to consume all your messages; it just schedules the fetch loop. You need to wrap it in your own promise if you want to await all messages. Also, kafkajs will not exit after getting the last message; it will just keep waiting for more messages. So you also have to figure out when you have received the last message and manually stop your consumer.
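For illustration, here is a minimal sketch of that approach (an editorial addition, not from the thread): it snapshots each partition's end offset with the admin API's fetchTopicOffsets, wraps consumer.run in a promise, and resolves once every partition has been read up to that snapshot. The broker address, client id, group id, and topic name are placeholders, and it assumes a single consumer in the group and that nothing new is produced while draining.

const { Kafka } = require('kafkajs')

// Placeholder connection details; adjust for your cluster.
const kafka = new Kafka({ clientId: 'drain-example', brokers: ['localhost:9092'] })

const consumeAllAndExit = async topic => {
  // Snapshot the end offset of every partition before we start consuming.
  const admin = kafka.admin()
  await admin.connect()
  const partitionOffsets = await admin.fetchTopicOffsets(topic)
  await admin.disconnect()

  // `high` is the offset of the *next* message to be produced, so the last
  // existing message sits at high - 1. Empty partitions are skipped.
  const lastOffsets = new Map(
    partitionOffsets
      .filter(({ high, low }) => BigInt(high) > BigInt(low))
      .map(({ partition, high }) => [partition, BigInt(high) - 1n])
  )

  const consumer = kafka.consumer({ groupId: 'drain-example-group' })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })

  if (lastOffsets.size > 0) {
    // Wrap the fetch loop in our own promise and resolve once every
    // partition has reached the end offset recorded above.
    await new Promise((resolve, reject) => {
      consumer
        .run({
          eachMessage: async ({ partition, message }) => {
            console.log(`${partition} | ${message.offset}: ${message.value.toString()}`)
            const last = lastOffsets.get(partition)
            if (last !== undefined && BigInt(message.offset) >= last) {
              lastOffsets.delete(partition)
              if (lastOffsets.size === 0) resolve()
            }
          },
        })
        .catch(reject)
    })
  }

  // kafkajs will not stop on its own; disconnect manually.
  await consumer.disconnect()
}

consumeAllAndExit('topic-test').catch(e => console.error(e))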

@Nevon added the question label Jul 30, 2020
@Nevon changed the title from "** Help Wanted **" to "How to consume all messages on topic and then exit" Jul 30, 2020
Nevon (Collaborator) commented Aug 14, 2020

This is actually a little trickier than you might think. The question is, how do you know that you've reached the last message for all partitions? To know that, we need to know a few things:

  1. How many partitions are there?
  2. What's the highwatermark for each partition?

An added complication is that this could change during the execution of your program. Partitions can be added or removed and new messages can be produced.

Another question is what we should do if there is more than one consumer, in which case we would only ever be assigned a subset of all partitions. Should we exit once we've consumed all messages for the partitions we're assigned? What about the other partitions?
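For illustration, here is a sketch (an editorial addition, not from the thread) of how both of those questions can be answered up front with the kafkajs admin client: fetchTopicMetadata lists the partitions, and fetchTopicOffsets returns each partition's high watermark. It assumes an existing kafka client instance and a placeholder topic name.

const printTopicState = async topic => {
  const admin = kafka.admin()
  await admin.connect()

  // 1. How many partitions are there?
  const { topics } = await admin.fetchTopicMetadata({ topics: [topic] })
  console.log('partitions:', topics[0].partitions.map(p => p.partitionId))

  // 2. What's the highwatermark for each partition?
  // Each entry looks like { partition, offset, high, low }.
  const offsets = await admin.fetchTopicOffsets(topic)
  for (const { partition, high } of offsets) {
    console.log(`partition ${partition}: high watermark ${high}`)
  }

  await admin.disconnect()
}

printTopicState('topic-test').catch(console.error)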

A simple version would be to say that we want to consume all the messages for all the partitions we were assigned when the program started. This means we would miss messages produced in the meantime, not to mention that there could be other partitions we're not assigned, but it simplifies our lives a lot:

const topic = 'topic-test'
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  await consumer.connect()

  /*
   * 1. We need to know which partitions we are assigned.
   * 2. We need to know which partitions we have consumed the last offset for.
   * 3. If all assigned partitions have 0 lag, we exit.
   */

  /*
   * `consumedTopicPartitions` will be an object of all topic-partitions
   * and a boolean indicating whether or not we have consumed all
   * messages in that topic-partition. For example:
   *
   * {
   *   "topic-test-0": false,
   *   "topic-test-1": false,
   *   "topic-test-2": false
   * }
   */
  let consumedTopicPartitions = {}
  consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
    const { memberAssignment } = payload
    consumedTopicPartitions = Object.entries(memberAssignment).reduce(
      (topics, [topic, partitions]) => {
        for (const partition of partitions) {
          topics[`${topic}-${partition}`] = false
        }
        return topics
      },
      {}
    )
  })

  /*
   * This is extremely unergonomic, but if we are currently caught up to the head
   * of all topic-partitions, we won't actually get any batches, which means we'll
   * never find out that we are actually caught up. So as a workaround, what we can do
   * is to check in `FETCH_START` if we have previously made a fetch without
   * processing any batches in between. If so, it means that we received empty
   * fetch responses, meaning there was no more data to fetch.
   *
   * We need to initially set this to true, or we would immediately exit.
   */
  let processedBatch = true
  consumer.on(consumer.events.FETCH_START, async () => {
    if (processedBatch === false) {
      await consumer.disconnect()
      process.exit(0)
    }

    processedBatch = false
  })

  /*
   * Now whenever we have finished processing a batch, we'll update `consumedTopicPartitions`
   * and exit if all topic-partitions have been consumed.
   */
  consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
    const { topic, partition, offsetLag } = payload
    consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'

    if (Object.values(consumedTopicPartitions).every(consumed => Boolean(consumed))) {
      await consumer.disconnect()
      process.exit(0)
    }

    processedBatch = true
  })

  await consumer.subscribe({ topic, fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
      console.log(`- ${prefix} ${message.key}#${message.value}`)
    },
  })
}

run().catch(e => console.error(`[example/consumer] ${e.message}`, e))

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

errorTypes.forEach(type => {
  process.on(type, async e => {
    try {
      console.log(`process.on ${type}`)
      console.error(e)
      await consumer.disconnect()
      process.exit(0)
    } catch (_) {
      process.exit(1)
    }
  })
})

signalTraps.forEach(type => {
  process.once(type, async () => {
    try {
      await consumer.disconnect()
    } finally {
      process.kill(process.pid, type)
    }
  })
})

This is obviously not as ergonomic as we would like. KafkaJS is designed more towards the long-running continuous processing use-case, rather than the one-shot consume-and-exit use-case. I've often thought about whether we should expose a different kind of consumer for that use-case, but for now you'll have to piece it together yourself.

ajdanaher (Author) commented:

Thanks @Nevon and @goriunov. That clears up several concepts.

TheCodingFreakj commented:

@Nevon Hi. I implemented the solution with the END_BATCH_PROCESS event shown above. I am also prioritizing my topics. However, the condition you put in ("all topic-partitions have been consumed") never becomes true, so the consumer never disconnects. Meanwhile, when I send another webhook for processing, it says the consumer is already running. Why is this happening? Is there any workaround?

ilons commented Feb 3, 2023

> (quoting @TheCodingFreakj's comment above)

After spending WAY too much of my time (and with help from colleagues), I finally got to something that works.
TL;DR, this works:

    let processedBatch = true;
    consumer.on(consumer.events.FETCH_START, async () => {
      if (processedBatch === false) {
        const describeGroup = await consumer.describeGroup();
        if (describeGroup.state === "Stable") {
          await consumer.disconnect();
          process.exit(0);
        }
      }

      processedBatch = false;
    });

It ALSO works doing this:


    let processedBatch = true;
    consumer.on(consumer.events.FETCH_START, async () => {
      await new Promise(r => setTimeout(r, 1));
      await consumer.disconnect();
      process.exit(0);

      processedBatch = false;
    });

This all leads me to believe that something happens when calling describeGroup() which makes the disconnect() work.

Do you have any idea why this happens, @Nevon?
