Skip to content

Commit

Permalink
Reconnection improvements and extra metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecosta90 committed Dec 2, 2024
1 parent 522093d commit 7c40dee
Showing 1 changed file with 49 additions and 22 deletions.
71 changes: 49 additions & 22 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
)

var totalMessages uint64
var totalSubscribedChannels int64
var totalConnects uint64
var clusterSlicesMu sync.Mutex

type testResult struct {
Expand All @@ -48,41 +50,58 @@ type testResult struct {
Addresses []string `json:"Addresses"`
}

func subscriberRoutine(mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
func subscriberRoutine(clientName, mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
// Tell the caller we've stopped
defer wg.Done()
var reconnectTicker *time.Ticker
if connectionReconnectInterval > 0 {
reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Second)
reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Millisecond)
defer reconnectTicker.Stop()
} else {
reconnectTicker = time.NewTicker(1 * time.Second)
reconnectTicker.Stop()
}

var pubsub *redis.PubSub
nChannels := len(channels)

// Helper function to handle subscription based on mode
subscribe := func() {
if pubsub != nil {
// Unsubscribe based on mode before re-subscribing
if mode == "ssubscribe" {
if err := pubsub.SUnsubscribe(ctx, channels...); err != nil {
fmt.Printf("Error during SUnsubscribe: %v\n", err)
if nChannels > 1 {
// Unsubscribe based on mode before re-subscribing
if mode == "ssubscribe" {
if err := pubsub.SUnsubscribe(ctx, channels[1:]...); err != nil {
fmt.Printf("Error during SUnsubscribe: %v\n", err)
}
pubsub.Close()
atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:])))
pubsub = client.SSubscribe(ctx, channels[1:]...)
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:])))
} else {
if err := pubsub.Unsubscribe(ctx, channels[1:]...); err != nil {
fmt.Printf("Error during Unsubscribe: %v\n", err)
pubsub.Close()
atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:])))
pubsub = client.Subscribe(ctx, channels[1:]...)
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:])))
}
}
atomic.AddUint64(&totalConnects, 1)
} else {
if err := pubsub.Unsubscribe(ctx, channels...); err != nil {
fmt.Printf("Error during Unsubscribe: %v\n", err)
}
log.Println(fmt.Sprintf("Skipping (S)UNSUBSCRIBE given client %s had only one channel subscribed in this connection: %v.", clientName, channels))
}
pubsub.Close()
}
switch mode {
case "ssubscribe":
pubsub = client.SSubscribe(ctx, channels...)
default:
pubsub = client.Subscribe(ctx, channels...)
} else {
switch mode {
case "ssubscribe":
pubsub = client.SSubscribe(ctx, channels...)
default:
pubsub = client.Subscribe(ctx, channels...)
}
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels)))
atomic.AddUint64(&totalConnects, 1)
}

}

subscribe()
Expand Down Expand Up @@ -142,6 +161,7 @@ func main() {
json_out_file := flag.String("json-out-file", "", "Name of json output file, if not set, will not print to json.")
client_update_tick := flag.Int("client-update-tick", 1, "client update tick.")
test_time := flag.Int("test-time", 0, "Number of seconds to run the test, after receiving the first message.")
randSeed := flag.Int64("rand-seed", 12345, "Random deterministic seed.")
subscribe_prefix := flag.String("subscriber-prefix", "channel-", "prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum.")
client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.")
distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.")
Expand All @@ -167,6 +187,8 @@ func main() {
log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )"))
}
log.Println(fmt.Sprintf("pubsub-sub-bench (git_sha1:%s%s)", git_sha, git_dirty_str))
log.Println(fmt.Sprintf("using random seed:%d", *randSeed))
rand.Seed(*randSeed)

ctx := context.Background()
nodeCount := 0
Expand Down Expand Up @@ -292,7 +314,7 @@ func main() {
if *max_channels_per_subscriber == *min_channels_per_subscriber {
n_channels_this_conn = *max_channels_per_subscriber
} else {
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber - *min_channels_per_subscriber)
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
}
for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ {
new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum
Expand Down Expand Up @@ -330,12 +352,13 @@ func main() {
if *max_reconnect_interval == *min_reconnect_interval {
connectionReconnectInterval = *max_reconnect_interval
} else {
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *max_reconnect_interval
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
}
if connectionReconnectInterval > 0 {
log.Println(fmt.Sprintf("Using reconnection interval of %d for subscriber: %s", connectionReconnectInterval, subscriberName))
log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName))
}
go subscriberRoutine(*mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels))
go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
}
}
}
Expand Down Expand Up @@ -431,10 +454,11 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
start := time.Now()
prevTime := time.Now()
prevMessageCount := uint64(0)
prevConnectCount := uint64(0)
messageRateTs := []float64{}

w.Init(os.Stdout, 25, 0, 1, ' ', tabwriter.AlignRight)
fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \t"))
fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \tConnect Rate \tActive subscriptions\t"))
fmt.Fprint(w, "\n")
w.Flush()
for {
Expand All @@ -444,16 +468,19 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
now := time.Now()
took := now.Sub(prevTime)
messageRate := float64(totalMessages-prevMessageCount) / float64(took.Seconds())
connectRate := float64(totalConnects-prevConnectCount) / float64(took.Seconds())

if prevMessageCount == 0 && totalMessages != 0 {
start = time.Now()
}
if totalMessages != 0 {
messageRateTs = append(messageRateTs, messageRate)
}
prevMessageCount = totalMessages
prevConnectCount = totalConnects
prevTime = now

fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t", time.Since(start).Seconds(), totalMessages, messageRate))
fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t%.2f\t%d\t", time.Since(start).Seconds(), totalMessages, messageRate, connectRate, totalSubscribedChannels))
fmt.Fprint(w, "\r\n")
w.Flush()
if message_limit > 0 && totalMessages >= uint64(message_limit) {
Expand Down

0 comments on commit 7c40dee

Please sign in to comment.