Updating go-release-action to the latest #20

Merged 2 commits on Dec 2, 2024
4 changes: 2 additions & 2 deletions .github/workflows/publish.yml
@@ -19,8 +19,8 @@ jobs:
goos: [linux, darwin]
goarch: [amd64, arm64]
steps:
- uses: actions/checkout@v3
- uses: wangyoucao577/go-release-action@v1.36
- uses: actions/checkout@v4
- uses: wangyoucao577/go-release-action@v1
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
goos: ${{ matrix.goos }}
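Pointing at actions/checkout@v4 moves to the current major release of the checkout action, and wangyoucao577/go-release-action@v1 tracks the action's floating major-version tag instead of the fixed v1.36 (assuming the maintainer keeps v1 moving, per the usual convention), so future 1.x fixes are picked up without further workflow edits.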
131 changes: 99 additions & 32 deletions subscriber.go
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"os/signal"
"runtime/pprof"
@@ -47,32 +48,71 @@ type testResult struct {
Addresses []string `json:"Addresses"`
}

func subscriberRoutine(mode, channel string, printMessages bool, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
// tell the caller we've stopped
func subscriberRoutine(mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
// Tell the caller we've stopped
defer wg.Done()
switch mode {
case "ssubscribe":
spubsub := client.SSubscribe(ctx, channel)
defer spubsub.Close()
for {
msg, err := spubsub.ReceiveMessage(ctx)
if err != nil {
panic(err)
}
if printMessages {
fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload))
var reconnectTicker *time.Ticker
if connectionReconnectInterval > 0 {
reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Second)
defer reconnectTicker.Stop()
} else {
reconnectTicker = time.NewTicker(1 * time.Second)
reconnectTicker.Stop()
}

var pubsub *redis.PubSub

// Helper function to handle subscription based on mode
subscribe := func() {
if pubsub != nil {
// Unsubscribe based on mode before re-subscribing
if mode == "ssubscribe" {
if err := pubsub.SUnsubscribe(ctx, channels...); err != nil {
fmt.Printf("Error during SUnsubscribe: %v\n", err)
}
} else {
if err := pubsub.Unsubscribe(ctx, channels...); err != nil {
fmt.Printf("Error during Unsubscribe: %v\n", err)
}
}
atomic.AddUint64(&totalMessages, 1)
pubsub.Close()
}
switch mode {
case "ssubscribe":
pubsub = client.SSubscribe(ctx, channels...)
default:
pubsub = client.Subscribe(ctx, channels...)
}
break
case "subscribe":
fallthrough
default:
pubsub := client.Subscribe(ctx, channel)
defer pubsub.Close()
for {
}

subscribe()

for {
select {
case <-ctx.Done():
// Context cancelled, exit routine
if pubsub != nil {
if mode == "ssubscribe" {
_ = pubsub.SUnsubscribe(ctx, channels...)
} else {
_ = pubsub.Unsubscribe(ctx, channels...)
}
pubsub.Close()
}
return
case <-reconnectTicker.C:
// Reconnect interval triggered, unsubscribe and resubscribe
if reconnectTicker != nil {
subscribe()
}
default:
// Handle messages
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
// Handle Redis connection errors, e.g., reconnect immediately
if err == redis.Nil || err == context.DeadlineExceeded || err == context.Canceled {
continue
}
panic(err)
}
if printMessages {
@@ -81,7 +121,6 @@ func subscriberRoutine(mode, channel string, printMessages bool, ctx context.Con
atomic.AddUint64(&totalMessages, 1)
}
}

}
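One detail of the new routine worth calling out: when reconnection is disabled, the ticker is created and immediately stopped, which leaves reconnectTicker.C as a valid channel that simply never fires, so the select loop needs no nil checks. A minimal, self-contained sketch of that pattern (illustration only, not code from this PR):

package main

import (
	"fmt"
	"time"
)

func main() {
	// A stopped ticker keeps a valid (non-nil) channel that never fires,
	// so it can sit in a select arm without per-iteration nil checks.
	ticker := time.NewTicker(1 * time.Second)
	ticker.Stop() // "disabled": ticker.C will never deliver a tick

	deadline := time.After(2 * time.Second)
	for {
		select {
		case <-ticker.C:
			fmt.Println("tick") // unreachable while the ticker is stopped
		case <-deadline:
			fmt.Println("done: no ticks observed")
			return
		default:
			// hot path: the real routine calls pubsub.ReceiveMessage(ctx) here
			time.Sleep(50 * time.Millisecond)
		}
	}
}

Note that ReceiveMessage blocks inside the default arm, so the cancellation and reconnect arms are only re-evaluated between incoming messages: a message arrival or a context error is what wakes the loop.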

func main() {
Expand All @@ -95,6 +134,10 @@ func main() {
channel_minimum := flag.Int("channel-minimum", 1, "channel ID minimum value ( each channel has a dedicated thread ).")
channel_maximum := flag.Int("channel-maximum", 100, "channel ID maximum value ( each channel has a dedicated thread ).")
subscribers_per_channel := flag.Int("subscribers-per-channel", 1, "number of subscribers per channel.")
min_channels_per_subscriber := flag.Int("min-number-channels-per-subscriber", 1, "min number of channels to subscribe to, per connection.")
max_channels_per_subscriber := flag.Int("max-number-channels-per-subscriber", 1, "max number of channels to subscribe to, per connection.")
min_reconnect_interval := flag.Int("min-reconnect-interval", 0, "min reconnect interval, in seconds. If 0, the periodic (s)unsubscribe/(s)subscribe is disabled.")
max_reconnect_interval := flag.Int("max-reconnect-interval", 0, "max reconnect interval, in seconds. If 0, the periodic (s)unsubscribe/(s)subscribe is disabled.")
messages_per_channel_subscriber := flag.Int64("messages", 0, "Number of total messages per subscriber per channel.")
json_out_file := flag.String("json-out-file", "", "Name of json output file, if not set, will not print to json.")
client_update_tick := flag.Int("client-update-tick", 1, "client update tick.")
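With the defaults (channel counts of 1, reconnect intervals of 0) behavior matches the old single-channel, never-reconnecting subscriber. Passing, say, --min-number-channels-per-subscriber=2 --max-number-channels-per-subscriber=5 gives each connection a random channel count in that range, and the reconnect-interval pair picks a per-connection interval in seconds the same way; see the sketch at the end of this diff.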
@@ -191,16 +234,19 @@ func main() {
poolSize = subscriptions_per_node
log.Println(fmt.Sprintf("Setting per Node pool size of %d given you haven't specified a value and we have %d Subscriptions per node. You can control this option via --%s=<value>", poolSize, subscriptions_per_node, redisPoolSize))
clusterOptions.PoolSize = poolSize
log.Println("Reloading cluster state given we've changed pool size.")
clusterClient = redis.NewClusterClient(&clusterOptions)
// ReloadState reloads cluster state. It calls ClusterSlots func
// to get cluster slots information.
clusterClient.ReloadState(ctx)
err := clusterClient.Ping(ctx).Err()
if err != nil {
log.Fatal(err)
if *distributeSubscribers {
log.Println("Reloading cluster state given we've changed pool size.")
clusterClient = redis.NewClusterClient(&clusterOptions)
// ReloadState reloads cluster state. It calls ClusterSlots func
// to get cluster slots information.
clusterClient.ReloadState(ctx)
err := clusterClient.Ping(ctx).Err()
if err != nil {
log.Fatal(err)
}
nodeCount, nodeClients, nodesAddresses = updateSecondarySlicesCluster(clusterClient, ctx)
}
nodeCount, nodeClients, nodesAddresses = updateSecondarySlicesCluster(clusterClient, ctx)

}

log.Println(fmt.Sprintf("Detailing final setup used for benchmark."))
@@ -241,6 +287,18 @@ func main() {
for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ {
channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id)
for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ {
channels := []string{channel}
n_channels_this_conn := 1
if *max_channels_per_subscriber == *min_channels_per_subscriber {
n_channels_this_conn = *max_channels_per_subscriber
} else {
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber+1) + *min_channels_per_subscriber
}
for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ {
new_channel_id := rand.Intn(*channel_maximum-*channel_minimum+1) + *channel_minimum
new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id)
channels = append(channels, new_channel)
}
totalCreatedClients++
subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id)
var client *redis.Client
@@ -268,7 +326,16 @@ func main() {
}
}
wg.Add(1)
go subscriberRoutine(*mode, channel, *printMessages, ctx, &wg, client)
connectionReconnectInterval := 0
if *max_reconnect_interval == *min_reconnect_interval {
connectionReconnectInterval = *max_reconnect_interval
} else {
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval+1) + *min_reconnect_interval
}
if connectionReconnectInterval > 0 {
log.Println(fmt.Sprintf("Using reconnection interval of %d for subscriber: %s", connectionReconnectInterval, subscriberName))
}
go subscriberRoutine(*mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
}
}
}
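All three min/max flag pairs above sample a per-connection value with the same idiom; the inclusive form is rand.Intn(max-min+1) + min. A self-contained sketch with a hypothetical randBetween helper (not part of this PR):

package main

import (
	"fmt"
	"math/rand"
)

// randBetween draws a uniform value in the inclusive range [min, max],
// collapsing to min when the bounds are equal (the fixed-value case).
func randBetween(min, max int) int {
	if max <= min {
		return min
	}
	return rand.Intn(max-min+1) + min
}

func main() {
	// e.g. --min-number-channels-per-subscriber=2 --max-number-channels-per-subscriber=5
	for i := 0; i < 5; i++ {
		fmt.Println(randBetween(2, 5)) // always within 2..5
	}
}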