Tombstone messages appear in tableview entries #292

Closed
jim-glasson opened this issue Feb 4, 2025 · 5 comments · Fixed by #293

@jim-glasson

When using TableView, if a "tombstone" message is published to delete a key from the underlying compacted topic, the tombstone appears as an entry in the TableView with an empty value. If compaction has not run, any newly created TableView also contains the tombstone in its entries. If compaction has run, any TableView created afterwards does not include the tombstone entries; however, any previously existing TableView continues to include them.

Expected behavior: when a tombstone message is sent, the entry with the same key should be removed from every existing TableView and should not appear in any TableView created in the future. The removal should not require compaction to be run.
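To make the expected semantics concrete, here is a minimal sketch that models the view as a plain dictionary. It is not the Pulsar.Client implementation: `Apply` is a hypothetical helper, and the sketch assumes a tombstone is represented as a message with a null payload.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical helper (not part of Pulsar.Client): applies one keyed
// message to an in-memory view. Assumption: a null payload is a tombstone.
static void Apply(Dictionary<string, byte[]> view, string key, byte[]? value)
{
    if (value is null)
    {
        view.Remove(key);   // tombstone: the key disappears from the view
    }
    else
    {
        view[key] = value;  // regular message: the latest value wins
    }
}

var view = new Dictionary<string, byte[]>();

// Ten keyed messages, mirroring the repro.
for (int i = 0; i < 10; i++)
{
    Apply(view, i.ToString(), Encoding.Unicode.GetBytes($"{{\"id\":{i}}}"));
}

// Tombstones for keys 3, 4 and 5.
for (int i = 3; i < 6; i++)
{
    Apply(view, i.ToString(), null);
}

// Prints "0,1,2,6,7,8,9": the tombstoned keys are gone, with no compaction step involved.
Console.WriteLine(string.Join(",", view.Keys.OrderBy(k => k)));
```

In this model the removal happens as soon as the tombstone is applied, which is the behavior I expected from an existing TableView.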

I am using version 3.6.1 of the Pulsar.Client library and .NET 8.0.

The following code reproduces the issue. There are some pertinent details in comments in the code.

using System.Text;
using Pulsar.Client.Api;

namespace PulsarTableViewWriter;

class Program
{
    static async Task Main(string[] args)
    {
        // Pulsar is running locally in docker:
        // docker run -d --name pulsar -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:4.0.2 bin/pulsar standalone
        const string pulsarServiceUrl = "pulsar://localhost:6650";
        const string topicName = "persistent://public/default/mytopic";

        var client = await new PulsarClientBuilder()
            .ServiceUrl(pulsarServiceUrl)
            .BuildAsync();
        
        var producer = await client.NewProducer()
            .Topic(topicName)
            .CreateAsync();

        // Create 10 keyed messages.
        for (int counter = 0; counter < 10; counter++)
        {
            var message = producer.NewMessage(Encoding.Unicode.GetBytes($"{{\"id\":{counter}}}"), counter.ToString());
            await producer.SendAsync(message);
        }

        var reader = new TableViewReader(client);
        // Fire-and-forget: the display loop runs in the background until StopAsync is called.
        _ = reader.StartAsync(topicName);

        await Task.Delay(5000);
        
        // Send some tombstone messages to delete previous keys.
        for (int counter = 3; counter < 6; counter++)
        {
            // I couldn't find any definitive examples showing what value to use
            // here, but using "null as byte[]" instead of "new byte[0]" caused
            // the subsequent SendAsync command to hang, and no message was sent.
            // "new byte[0]" seems to produce the desired effect if disregarding
            // the unexpected tableview behavior.
            
            // var message = producer.NewMessage(null as byte[], counter.ToString()); // Causes SendAsync() to hang indefinitely.
            
            var message = producer.NewMessage(new byte[0], counter.ToString());
            await producer.SendAsync(message);
        }

        // The tableview's entries include the tombstone messages.
        // Placing a breakpoint here and compacting the topic
        // before proceeding won't make any difference - the
        // tableview's entries will still include the tombstone messages 
        await Task.Delay(10000);
        await reader.StopAsync();
        
        // The new tableview's entries also include the tombstone messages.
        // Placing a breakpoint here and compacting the topic before the
        // new tableview is created will cause the keys with the tombstone
        // messages to disappear.
        reader = new TableViewReader(client);
        // Fire-and-forget again for the second tableview.
        _ = reader.StartAsync(topicName);

        await Task.Delay(10000);

        await reader.StopAsync();
    }

    private class TableViewReader(PulsarClient pulsarClient)
    {
        private CancellationTokenSource cts = new CancellationTokenSource();
        private bool stopping = false;

        public async Task StartAsync(string topicName)
        {
            string subscriptionName = Guid.NewGuid().ToString(); // Subscription name

            var tableView = await pulsarClient.NewTableViewBuilder(Schema.BYTES())
                .Topic(topicName)
                .AutoUpdatePartitionsInterval(TimeSpan.FromSeconds(1))
                .CreateAsync();

            while (!stopping)
            {
                Console.Clear();
                await Task.Delay(200, cts.Token);
                
                foreach (var entry in tableView)
                {
                    Console.WriteLine($"Table entry: {entry.Key} {Encoding.Unicode.GetString(entry.Value)}");
                }

                if (tableView.Any(e => e.Value.Length == 0))
                {
                    Console.WriteLine("Tombstone messages are appearing in the tableview entries.");
                    Console.WriteLine("They will continue to appear after compaction is run unless a new tableview is created.");
                }
                
                await Task.Delay(2000, cts.Token);
            }
        }

        public async Task StopAsync()
        {
            // Set the flag so the loop exits, then cancel any pending delay.
            stopping = true;
            await cts.CancelAsync();
        }
    }
}


@Lanayx
Member

Lanayx commented Feb 5, 2025

Hi! I hope @nodece can help, as the developer of the TableView feature.

@Lanayx
Member

Lanayx commented Feb 8, 2025

Thank you, @nodece! This should be fixed in 3.7.0.

@RobertIndie
Contributor

RobertIndie commented Feb 9, 2025

Hi @jim-glasson, in addition to the fix in #293, you should also use a null value instead of new byte[0] to send the tombstone message.

var message = producer.NewMessage(null, counter.ToString());

Could you use the latest version and try again? The producer didn't hang during my testing. If it still does, it might be a different issue.

@jim-glasson
Author

@RobertIndie I have upgraded the library reference to 3.7.0 and changed to using null as byte[] rather than new byte[0] in my test harness and it is now working as expected. Thank you @nodece @RobertIndie @Lanayx!

Is 3.7.0 considered to be a "stable release"?

@Lanayx
Member

Lanayx commented Feb 11, 2025

> Is 3.7.0 considered to be a "stable release"?

It is as stable as any other release.
