Skip to content

Commit

Permalink
Add flag to configure the CRC (#110)
Browse files Browse the repository at this point in the history
Add the perftest flag
Closes #109
  • Loading branch information
Gsantomaggio authored Nov 18, 2021
1 parent f4b10c2 commit 96d0060
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 11 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,11 @@ With `ConsumerOptions` it is possible to customize the consumer behaviour.
```golang
stream.NewConsumerOptions().
SetConsumerName("my_consumer"). // set a consumer name
SetCRCCheck(false). // Enable/Disable the CRC control.
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
```
Disabling the CRC control can increase the performances.

See also "Offset Start" example in the [examples](./examples/) directory


Expand Down
3 changes: 2 additions & 1 deletion examples/getting_started.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func main() {
handleMessages,
stream.NewConsumerOptions().
SetConsumerName("my_consumer"). // set a consumer name
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
SetOffset(stream.OffsetSpecification{}.First()). // start consuming from the beginning
SetCRCCheck(false)) // Disable crc control, increase the performances
CheckErr(err)
channelClose := consumer.NotifyClose()
// channelClose receives all the closing events, here you can handle the
Expand Down
2 changes: 2 additions & 0 deletions perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
compression string
exitOnError bool
debugLogs bool
crcCheck bool
runDuration int
)

Expand All @@ -67,6 +68,7 @@ func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().BoolVarP(&exitOnError, "exit-on-error", "", true, "Close the app in case of error")
baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "", true, "Print stats")
baseCmd.PersistentFlags().BoolVarP(&debugLogs, "debug-logs", "", false, "Enable debug logs")
baseCmd.PersistentFlags().BoolVarP(&crcCheck, "crc-check", "", false, "Enable crc control")
baseCmd.PersistentFlags().StringSliceVarP(&streams, "streams", "", []string{"perf-test-go"}, "Stream names")
baseCmd.PersistentFlags().StringVarP(&maxLengthBytes, "max-length-bytes", "", "0", "Stream max length bytes, e.g. 10MB, 50GB, etc.")
baseCmd.PersistentFlags().IntVarP(&maxAge, "max-age", "", 0, "Stream Age in hours, e.g. 1,2.. 24 , etc.")
Expand Down
2 changes: 1 addition & 1 deletion perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func startConsumer(consumerName string, streamName string) error {
streamName,
handleMessages,
stream.NewConsumerOptions().
SetConsumerName(consumerName).SetOffset(offsetSpec))
SetConsumerName(consumerName).SetOffset(offsetSpec).SetCRCCheck(crcCheck))
if err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,28 @@ type ConsumerOptions struct {
autocommit bool
autoCommitStrategy *AutoCommitStrategy
Offset OffsetSpecification
CRCCheck bool
}

func NewConsumerOptions() *ConsumerOptions {
return &ConsumerOptions{
Offset: OffsetSpecification{}.Last(),
autocommit: false,
autoCommitStrategy: NewAutoCommitStrategy()}
autoCommitStrategy: NewAutoCommitStrategy(),
CRCCheck: false,
}
}

func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions {
c.ConsumerName = consumerName
return c
}

func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions {
c.CRCCheck = CRCCheck
return c
}

func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions {
c.autocommit = true
if autoCommitStrategy == nil {
Expand Down
9 changes: 7 additions & 2 deletions pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ var _ = Describe("Streaming Consumers", func() {
})

It("Multi Consumers per client", func() {
env, err := NewEnvironment(NewEnvironmentOptions().SetMaxConsumersPerClient(2))
env, err := NewEnvironment(NewEnvironmentOptions().
SetMaxConsumersPerClient(2))
Expect(err).NotTo(HaveOccurred())
streamName := uuid.New().String()
Expect(env.DeclareStream(streamName, nil)).
Expand Down Expand Up @@ -161,7 +162,9 @@ var _ = Describe("Streaming Consumers", func() {
consumer, err := env.NewConsumer(streamName,
func(consumerContext ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&messagesReceived, 1)
}, NewConsumerOptions().SetOffset(OffsetSpecification{}.Offset(50)))
}, NewConsumerOptions().
SetOffset(OffsetSpecification{}.Offset(50)).
SetCRCCheck(true))
Expect(err).NotTo(HaveOccurred())

Eventually(func() int32 {
Expand All @@ -185,6 +188,7 @@ var _ = Describe("Streaming Consumers", func() {
}, NewConsumerOptions().
SetOffset(OffsetSpecification{}.First()).
SetConsumerName("my_auto_consumer").
SetCRCCheck(false).
SetAutoCommit(NewAutoCommitStrategy().
SetCountBeforeStorage(100).
SetFlushInterval(50*time.Second))) // here we set a high value to do not trigger the time
Expand All @@ -206,6 +210,7 @@ var _ = Describe("Streaming Consumers", func() {
}, NewConsumerOptions().
SetOffset(OffsetSpecification{}.First()).
SetConsumerName("my_auto_consumer_timer").
SetCRCCheck(true).
SetAutoCommit(NewAutoCommitStrategy().
SetCountBeforeStorage(10000000). /// We avoid raising the timer
SetFlushInterval(1*time.Second)))
Expand Down
13 changes: 7 additions & 6 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,13 @@ func (c *Client) handleDeliver(r *bufio.Reader) {

/// headers ---> payload -> messages

checkSum := crc32.ChecksumIEEE(bytesBuffer)

if crc != checkSum {
logs.LogError("Error during the checkSum, expected %d, checksum %d", crc, checkSum)
panic("Error during CRC")
} /// ???
if consumer.options.CRCCheck {
checkSum := crc32.ChecksumIEEE(bytesBuffer)
if crc != checkSum {
logs.LogError("Error during the checkSum, expected %d, checksum %d", crc, checkSum)
panic("Error during CRC")
} /// ???
}

bufferReader := bytes.NewReader(bytesBuffer)
dataReader := bufio.NewReader(bufferReader)
Expand Down

0 comments on commit 96d0060

Please sign in to comment.