Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnection Improvements, Channel Metrics, and Random Seed Support #21

Merged
merged 4 commits into from
Dec 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 49 additions & 22 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
)

var totalMessages uint64
var totalSubscribedChannels int64
var totalConnects uint64
var clusterSlicesMu sync.Mutex

type testResult struct {
Expand All @@ -48,41 +50,58 @@ type testResult struct {
Addresses []string `json:"Addresses"`
}

func subscriberRoutine(mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
func subscriberRoutine(clientName, mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
// Tell the caller we've stopped
defer wg.Done()
var reconnectTicker *time.Ticker
if connectionReconnectInterval > 0 {
reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Second)
reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Millisecond)
defer reconnectTicker.Stop()
} else {
reconnectTicker = time.NewTicker(1 * time.Second)
reconnectTicker.Stop()
}

var pubsub *redis.PubSub
nChannels := len(channels)

// Helper function to handle subscription based on mode
subscribe := func() {
if pubsub != nil {
// Unsubscribe based on mode before re-subscribing
if mode == "ssubscribe" {
if err := pubsub.SUnsubscribe(ctx, channels...); err != nil {
fmt.Printf("Error during SUnsubscribe: %v\n", err)
if nChannels > 1 {
// Unsubscribe based on mode before re-subscribing
if mode == "ssubscribe" {
if err := pubsub.SUnsubscribe(ctx, channels[1:]...); err != nil {
fmt.Printf("Error during SUnsubscribe: %v\n", err)
}
pubsub.Close()
atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:])))
pubsub = client.SSubscribe(ctx, channels[1:]...)
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:])))
} else {
if err := pubsub.Unsubscribe(ctx, channels[1:]...); err != nil {
fmt.Printf("Error during Unsubscribe: %v\n", err)
pubsub.Close()
atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:])))
pubsub = client.Subscribe(ctx, channels[1:]...)
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:])))
}
}
atomic.AddUint64(&totalConnects, 1)
} else {
if err := pubsub.Unsubscribe(ctx, channels...); err != nil {
fmt.Printf("Error during Unsubscribe: %v\n", err)
}
log.Println(fmt.Sprintf("Skipping (S)UNSUBSCRIBE given client %s had only one channel subscribed in this connection: %v.", clientName, channels))
}
pubsub.Close()
}
switch mode {
case "ssubscribe":
pubsub = client.SSubscribe(ctx, channels...)
default:
pubsub = client.Subscribe(ctx, channels...)
} else {
switch mode {
case "ssubscribe":
pubsub = client.SSubscribe(ctx, channels...)
default:
pubsub = client.Subscribe(ctx, channels...)
}
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels)))
atomic.AddUint64(&totalConnects, 1)
}

}

subscribe()
Expand Down Expand Up @@ -142,6 +161,7 @@ func main() {
json_out_file := flag.String("json-out-file", "", "Name of json output file, if not set, will not print to json.")
client_update_tick := flag.Int("client-update-tick", 1, "client update tick.")
test_time := flag.Int("test-time", 0, "Number of seconds to run the test, after receiving the first message.")
randSeed := flag.Int64("rand-seed", 12345, "Random deterministic seed.")
subscribe_prefix := flag.String("subscriber-prefix", "channel-", "prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum.")
client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.")
distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.")
Expand All @@ -167,6 +187,8 @@ func main() {
log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )"))
}
log.Println(fmt.Sprintf("pubsub-sub-bench (git_sha1:%s%s)", git_sha, git_dirty_str))
log.Println(fmt.Sprintf("using random seed:%d", *randSeed))
rand.Seed(*randSeed)

ctx := context.Background()
nodeCount := 0
Expand Down Expand Up @@ -292,7 +314,7 @@ func main() {
if *max_channels_per_subscriber == *min_channels_per_subscriber {
n_channels_this_conn = *max_channels_per_subscriber
} else {
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber - *min_channels_per_subscriber)
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
}
for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ {
new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum
Expand Down Expand Up @@ -330,12 +352,13 @@ func main() {
if *max_reconnect_interval == *min_reconnect_interval {
connectionReconnectInterval = *max_reconnect_interval
} else {
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *max_reconnect_interval
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
}
if connectionReconnectInterval > 0 {
log.Println(fmt.Sprintf("Using reconnection interval of %d for subscriber: %s", connectionReconnectInterval, subscriberName))
log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName))
}
go subscriberRoutine(*mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels))
go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
}
}
}
Expand Down Expand Up @@ -431,10 +454,11 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
start := time.Now()
prevTime := time.Now()
prevMessageCount := uint64(0)
prevConnectCount := uint64(0)
messageRateTs := []float64{}

w.Init(os.Stdout, 25, 0, 1, ' ', tabwriter.AlignRight)
fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \t"))
fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \tConnect Rate \tActive subscriptions\t"))
fmt.Fprint(w, "\n")
w.Flush()
for {
Expand All @@ -444,16 +468,19 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
now := time.Now()
took := now.Sub(prevTime)
messageRate := float64(totalMessages-prevMessageCount) / float64(took.Seconds())
connectRate := float64(totalConnects-prevConnectCount) / float64(took.Seconds())

if prevMessageCount == 0 && totalMessages != 0 {
start = time.Now()
}
if totalMessages != 0 {
messageRateTs = append(messageRateTs, messageRate)
}
prevMessageCount = totalMessages
prevConnectCount = totalConnects
prevTime = now

fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t", time.Since(start).Seconds(), totalMessages, messageRate))
fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t%.2f\t%d\t", time.Since(start).Seconds(), totalMessages, messageRate, connectRate, totalSubscribedChannels))
fmt.Fprint(w, "\r\n")
w.Flush()
if message_limit > 0 && totalMessages >= uint64(message_limit) {
Expand Down
Loading