From 7c40dee5e03093fbeda8c77e2c3ec21836017979 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 2 Dec 2024 17:19:19 +0000 Subject: [PATCH] Reconnection improvements and extra metrics --- subscriber.go | 71 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 22 deletions(-) diff --git a/subscriber.go b/subscriber.go index af11c97..349231e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -30,6 +30,8 @@ const ( ) var totalMessages uint64 +var totalSubscribedChannels int64 +var totalConnects uint64 var clusterSlicesMu sync.Mutex type testResult struct { @@ -48,12 +50,12 @@ type testResult struct { Addresses []string `json:"Addresses"` } -func subscriberRoutine(mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) { +func subscriberRoutine(clientName, mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) { // Tell the caller we've stopped defer wg.Done() var reconnectTicker *time.Ticker if connectionReconnectInterval > 0 { - reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Second) + reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Millisecond) defer reconnectTicker.Stop() } else { reconnectTicker = time.NewTicker(1 * time.Second) @@ -61,28 +63,45 @@ func subscriberRoutine(mode string, channels []string, printMessages bool, conne } var pubsub *redis.PubSub + nChannels := len(channels) // Helper function to handle subscription based on mode subscribe := func() { if pubsub != nil { - // Unsubscribe based on mode before re-subscribing - if mode == "ssubscribe" { - if err := pubsub.SUnsubscribe(ctx, channels...); err != nil { - fmt.Printf("Error during SUnsubscribe: %v\n", err) + if nChannels > 1 { + // Unsubscribe based on mode before re-subscribing + if mode == "ssubscribe" { + if err := pubsub.SUnsubscribe(ctx, channels[1:]...); err != nil { + fmt.Printf("Error during SUnsubscribe: %v\n", err) + } + pubsub.Close() + atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:]))) + pubsub = client.SSubscribe(ctx, channels[1:]...) + atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:]))) + } else { + if err := pubsub.Unsubscribe(ctx, channels[1:]...); err != nil { + fmt.Printf("Error during Unsubscribe: %v\n", err) + pubsub.Close() + atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:]))) + pubsub = client.Subscribe(ctx, channels[1:]...) + atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:]))) + } } + atomic.AddUint64(&totalConnects, 1) } else { - if err := pubsub.Unsubscribe(ctx, channels...); err != nil { - fmt.Printf("Error during Unsubscribe: %v\n", err) - } + log.Println(fmt.Sprintf("Skipping (S)UNSUBSCRIBE given client %s had only one channel subscribed in this connection: %v.", clientName, channels)) } - pubsub.Close() - } - switch mode { - case "ssubscribe": - pubsub = client.SSubscribe(ctx, channels...) - default: - pubsub = client.Subscribe(ctx, channels...) + } else { + switch mode { + case "ssubscribe": + pubsub = client.SSubscribe(ctx, channels...) + default: + pubsub = client.Subscribe(ctx, channels...) + } + atomic.AddInt64(&totalSubscribedChannels, int64(len(channels))) + atomic.AddUint64(&totalConnects, 1) } + } subscribe() @@ -142,6 +161,7 @@ func main() { json_out_file := flag.String("json-out-file", "", "Name of json output file, if not set, will not print to json.") client_update_tick := flag.Int("client-update-tick", 1, "client update tick.") test_time := flag.Int("test-time", 0, "Number of seconds to run the test, after receiving the first message.") + randSeed := flag.Int64("rand-seed", 12345, "Random deterministic seed.") subscribe_prefix := flag.String("subscriber-prefix", "channel-", "prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum.") client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.") distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.") @@ -167,6 +187,8 @@ func main() { log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )")) } log.Println(fmt.Sprintf("pubsub-sub-bench (git_sha1:%s%s)", git_sha, git_dirty_str)) + log.Println(fmt.Sprintf("using random seed:%d", *randSeed)) + rand.Seed(*randSeed) ctx := context.Background() nodeCount := 0 @@ -292,7 +314,7 @@ func main() { if *max_channels_per_subscriber == *min_channels_per_subscriber { n_channels_this_conn = *max_channels_per_subscriber } else { - n_channels_this_conn = rand.Intn(*max_channels_per_subscriber - *min_channels_per_subscriber) + n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber } for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ { new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum @@ -330,12 +352,13 @@ func main() { if *max_reconnect_interval == *min_reconnect_interval { connectionReconnectInterval = *max_reconnect_interval } else { - connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *max_reconnect_interval + connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval } if connectionReconnectInterval > 0 { - log.Println(fmt.Sprintf("Using reconnection interval of %d for subscriber: %s", connectionReconnectInterval, subscriberName)) + log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName)) } - go subscriberRoutine(*mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client) + log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels)) + go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client) } } } @@ -431,10 +454,11 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw start := time.Now() prevTime := time.Now() prevMessageCount := uint64(0) + prevConnectCount := uint64(0) messageRateTs := []float64{} w.Init(os.Stdout, 25, 0, 1, ' ', tabwriter.AlignRight) - fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \t")) + fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \tConnect Rate \tActive subscriptions\t")) fmt.Fprint(w, "\n") w.Flush() for { @@ -444,6 +468,8 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw now := time.Now() took := now.Sub(prevTime) messageRate := float64(totalMessages-prevMessageCount) / float64(took.Seconds()) + connectRate := float64(totalConnects-prevConnectCount) / float64(took.Seconds()) + if prevMessageCount == 0 && totalMessages != 0 { start = time.Now() } @@ -451,9 +477,10 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw messageRateTs = append(messageRateTs, messageRate) } prevMessageCount = totalMessages + prevConnectCount = totalConnects prevTime = now - fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t", time.Since(start).Seconds(), totalMessages, messageRate)) + fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t%.2f\t%d\t", time.Since(start).Seconds(), totalMessages, messageRate, connectRate, totalSubscribedChannels)) fmt.Fprint(w, "\r\n") w.Flush() if message_limit > 0 && totalMessages >= uint64(message_limit) {