Skip to content

Commit

Permalink
franz-go: fix 32 bit alignment, fix a few lints
Browse files Browse the repository at this point in the history
Committing this before the next bigger lint-fixing commit

This commit moves a few atomics to the top of structs, ensuring
alignment. This was caught with the new github actions, which is a bit
slow.
  • Loading branch information
twmb committed Oct 11, 2022
1 parent 719c6f4 commit 203a837
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 13 deletions.
4 changes: 2 additions & 2 deletions examples/requesting/request_metadata.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package requesting
package main

import (
"context"
Expand All @@ -12,7 +12,7 @@ import (
"github.com/twmb/franz-go/pkg/kversion"
)

func requestMetadata() {
func main() {
seeds := []string{"localhost:9092"}
client, err := kgo.NewClient(
kgo.SeedBrokers(seeds...),
Expand Down
1 change: 1 addition & 0 deletions generate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type (
// The following types can be encoded "compact"; this happens on
// flexible versions. If adding types here, be sure to add the
// AsFromFlexible method below.

String struct {
FromFlexible bool
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
// brokerCxn manages an actual connection to a Kafka broker. This is separate
// the broker struct to allow lazy connection (re)creation.
type brokerCxn struct {
throttleUntil int64 // atomic nanosec

conn net.Conn

cl *Client
Expand All @@ -599,8 +601,6 @@ type brokerCxn struct {
mechanism sasl.Mechanism
expiry time.Time

throttleUntil int64 // atomic nanosec

corrID int32

// The following four fields are used for connection reaping.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func (o Offset) At(at int64) Offset {
}

type consumer struct {
cl *Client

bufferedRecords int64

cl *Client

pausedMu sync.Mutex // grabbed when updating paused
paused atomic.Value // loaded when issuing fetches

Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (*ConsumerBalancer) Balance(map[string]int32) IntoSyncAssignment {
panic("unreachable")
}

// Balance satisfies the GroupMemberBalancerOrError interface.
// BalanceOrError satisfies the GroupMemberBalancerOrError interface.
func (b *ConsumerBalancer) BalanceOrError(topics map[string]int32) (IntoSyncAssignment, error) {
return b.b.Balance(b, topics), b.err
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
)

type producer struct {
bufferedRecords int64
inflight int64 // high 16: # waiters, low 48: # inflight

cl *Client

topicsMu sync.Mutex // locked to prevent concurrent updates; reads are always atomic
Expand All @@ -34,8 +37,6 @@ type producer struct {
unknownTopicsMu sync.Mutex
unknownTopics map[string]*unknownTopicProduces

bufferedRecords int64

id atomic.Value
producingTxn uint32 // 1 if in txn

Expand All @@ -55,8 +56,6 @@ type producer struct {
mu sync.Mutex
c *sync.Cond

inflight int64 // high 16: # waiters, low 48: # inflight

batchPromises ringBatchPromise
promisesMu sync.Mutex

Expand Down Expand Up @@ -484,7 +483,7 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
p.waitBuffer <- struct{}{}
} else if buffered == 0 && atomic.LoadInt32(&p.flushing) > 0 {
p.mu.Lock()
p.mu.Unlock() // nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
p.c.Broadcast()
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/record_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

// RecordFormatter formats records.
type RecordFormatter struct {
fns []func([]byte, *FetchPartition, *Record) []byte
calls int64
fns []func([]byte, *FetchPartition, *Record) []byte
}

// AppendRecord appends a record to b given the parsed format and returns the
Expand Down

0 comments on commit 203a837

Please sign in to comment.