diff --git a/go.mod b/go.mod index 84f61fdee02..5b38d3219b5 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( github.com/uber/jaeger-client-go v2.30.0+incompatible go.uber.org/atomic v1.11.0 go.uber.org/goleak v1.3.0 - golang.org/x/crypto v0.31.0 + golang.org/x/crypto v0.32.0 golang.org/x/net v0.33.0 golang.org/x/sync v0.10.0 golang.org/x/time v0.8.0 @@ -78,9 +78,9 @@ require ( github.com/shirou/gopsutil/v4 v4.24.12 github.com/thanos-io/objstore v0.0.0-20250120094545-4b72edf06a59 github.com/tjhop/slog-gokit v0.1.2 - github.com/twmb/franz-go v1.18.0 + github.com/twmb/franz-go v1.18.1 github.com/twmb/franz-go/pkg/kadm v1.14.0 - github.com/twmb/franz-go/pkg/kfake v0.0.0-20241202133023-293b7c4c56bb + github.com/twmb/franz-go/pkg/kfake v0.0.0-20250121044851-f30c518d6b72 github.com/twmb/franz-go/pkg/kmsg v1.9.0 github.com/twmb/franz-go/plugin/kotel v1.5.0 github.com/twmb/franz-go/plugin/kprom v1.1.0 @@ -89,7 +89,7 @@ require ( go.opentelemetry.io/otel v1.33.0 go.opentelemetry.io/otel/trace v1.33.0 go.uber.org/multierr v1.11.0 - golang.org/x/term v0.27.0 + golang.org/x/term v0.28.0 google.golang.org/api v0.213.0 google.golang.org/protobuf v1.36.3 sigs.k8s.io/kustomize/kyaml v0.18.1 @@ -277,7 +277,7 @@ require ( golang.org/x/exp v0.0.0-20241215155358-4a5509556b9e // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/oauth2 v0.24.0 // indirect - golang.org/x/sys v0.28.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.28.0 // indirect google.golang.org/genproto v0.0.0-20241113202542-65e8d215514f // indirect diff --git a/go.sum b/go.sum index 819c849e73c..cdbccc9ab1c 100644 --- a/go.sum +++ b/go.sum @@ -1734,8 +1734,8 @@ github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9f github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4= -github.com/twmb/franz-go/pkg/kfake v0.0.0-20241202133023-293b7c4c56bb h1:cNa7PaAvkAhYIOootH/5gRO9ljbI3MIObf5qU/PKFKY= -github.com/twmb/franz-go/pkg/kfake v0.0.0-20241202133023-293b7c4c56bb/go.mod h1:nkBI/wGFp7t1NJnnCeJdS4sX5atPAqwCPpDXKuI7SC8= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20250121044851-f30c518d6b72 h1:gPKjLspEpYIMuRrpgPe6+wFpcaoo4rb9PGmvZS0trDE= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20250121044851-f30c518d6b72/go.mod h1:zCgWGv7Rg9B70WV6T+tUbifRJnx60gGTFU/U4xZpyUA= github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= github.com/twmb/franz-go/plugin/kotel v1.5.0 h1:TiPfGUbQK384OO7ZYGdo7JuPCbJn+/8njQ/D9Je9CDE= @@ -1848,8 +1848,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2150,8 +2150,8 @@ golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2168,8 +2168,8 @@ golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= -golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= -golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/01_fetch.go b/vendor/github.com/twmb/franz-go/pkg/kfake/01_fetch.go index 53f9a8e6ee9..a674e7c65a6 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/01_fetch.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/01_fetch.go @@ -51,7 +51,7 @@ func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, er if !ok || pd.createdAt.After(creq.at) { continue } - if pd.leader != creq.cc.b { + if pd.leader != creq.cc.b && !pd.followers.has(creq.cc.b) { returnEarly = true // NotLeaderForPartition break out } @@ -162,7 +162,7 @@ full: } continue } - if pd.leader != creq.cc.b { + if pd.leader != creq.cc.b && !pd.followers.has(creq.cc.b) { p := donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code) p.CurrentLeader.LeaderID = pd.leader.node p.CurrentLeader.LeaderEpoch = pd.epoch diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/cluster.go b/vendor/github.com/twmb/franz-go/pkg/kfake/cluster.go index 3720cf61e7d..f9504dee16a 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/cluster.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/cluster.go @@ -980,6 +980,20 @@ func (c *Cluster) CoordinatorFor(key string) int32 { return n } +// LeaderFor returns the node ID of the topic partition. If the partition +// does not exist, this returns -1. +func (c *Cluster) LeaderFor(topic string, partition int32) int32 { + n := int32(-1) + c.admin(func() { + pd, ok := c.data.tps.getp(topic, partition) + if !ok { + return + } + n = pd.leader.node + }) + return n +} + // RehashCoordinators simulates group and transacational ID coordinators moving // around. All group and transactional IDs are rekeyed. This forces clients to // reload coordinators. @@ -1084,3 +1098,16 @@ func (c *Cluster) shufflePartitionsLocked() { p.epoch++ }) } + +// SetFollowers sets the node IDs of brokers that can also serve fetch requests +// for a partition. Setting followers to an empty or nil slice reverts to the +// default of only the leader being able to serve fetch requests. +func (c *Cluster) SetFollowers(topic string, partition int32, followers []int32) { + c.admin(func() { + pd, ok := c.data.tps.getp(topic, partition) + if !ok { + return + } + pd.followers = append([]int32(nil), followers...) + }) +} diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/data.go b/vendor/github.com/twmb/franz-go/pkg/kfake/data.go index 566d51b6956..b2947b59f94 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/data.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/data.go @@ -42,14 +42,17 @@ type ( nbytes int64 // abortedTxns - rf int8 - leader *broker + rf int8 + leader *broker + followers followers watch map[*watchFetch]struct{} createdAt time.Time } + followers []int32 + partBatch struct { kmsg.RecordBatch nbytes int @@ -68,6 +71,15 @@ type ( } ) +func (fs followers) has(b *broker) bool { + for _, f := range fs { + if f == b.node { + return true + } + } + return false +} + func (d *data) mkt(t string, nparts int, nreplicas int, configs map[string]*string) { if d.tps != nil { if _, exists := d.tps[t]; exists { @@ -88,6 +100,9 @@ func (d *data) mkt(t string, nparts int, nreplicas int, configs map[string]*stri } if nreplicas < 0 { nreplicas = 3 // cluster default + if nreplicas > len(d.c.bs) { + nreplicas = len(d.c.bs) + } } d.id2t[id] = t d.t2id[t] = id diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/topic_partition.go b/vendor/github.com/twmb/franz-go/pkg/kfake/topic_partition.go index ac409c53e7d..f7e90150dd5 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/topic_partition.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/topic_partition.go @@ -1,5 +1,7 @@ package kfake +import "sort" + type tps[V any] map[string]map[int32]*V func (tps *tps[V]) getp(t string, p int32) (*V, bool) { @@ -73,3 +75,100 @@ func (tps *tps[V]) delp(t string, p int32) { delete(*tps, t) } } + +// TopicInfo contains snapshot-in-time metadata about an existing topic. +type TopicInfo struct { + TopicID [16]byte // TopicID is the UUID of the topic. + NumReplicas int // NumReplicas is the replication factor for all partitions in this topic. + Configs map[string]*string // Configs contains all configuration values specified for this topic. +} + +// PartitionInfo contains snapshot-in-time metadata about an existing partition. +type PartitionInfo struct { + HighWatermark int64 // HighWatermark is the latest offset present in the partition. + LastStableOffset int64 // LastStableOffset is the last stable offset. + LogStartOffset int64 // LogStartOffsets is the first offset present in the partition. + Epoch int32 // Epoch is the current "epoch" of the partition -- how many times the partition transferred leaders. + MaxTimestamp int64 // MaxTimestamp is the current max timestamp across all batches. + NumBytes int64 // NumBytes is the current amount of data stored in the partition. + Leader int32 // Leader is the current leader of the partition. +} + +func (pd *partData) info() *PartitionInfo { + return &PartitionInfo{ + HighWatermark: pd.highWatermark, + LastStableOffset: pd.lastStableOffset, + LogStartOffset: pd.logStartOffset, + Epoch: pd.epoch, + MaxTimestamp: pd.maxTimestamp, + NumBytes: pd.nbytes, + Leader: pd.leader.node, + } +} + +// TopicInfo returns information about a topic if it exists. +func (c *Cluster) TopicInfo(topic string) *TopicInfo { + var i *TopicInfo + c.admin(func() { + id, exists := c.data.t2id[topic] + if !exists { + return + } + clone := func(m map[string]*string) map[string]*string { // a deeper maps.Clone + m2 := make(map[string]*string, len(m)) + for k, v := range m { + var v2 *string + if v != nil { + vv := *v + v2 = &vv + } + m2[k] = v2 + } + return m2 + } + i = &TopicInfo{ + TopicID: id, + NumReplicas: c.data.treplicas[topic], + Configs: clone(c.data.tcfgs[topic]), + } + }) + return i +} + +// PartitionInfo returns information about a partition if it exists. +func (c *Cluster) PartitionInfo(topic string, partition int32) *PartitionInfo { + var i *PartitionInfo + c.admin(func() { + pd, ok := c.data.tps.getp(topic, partition) + if !ok { + return + } + i = pd.info() + }) + return i +} + +// PartitionInfos returns information about all partitions in a topic, +// if it exists. The partitions are returned in sorted partition order, +// with partition 0 at index 0, partition 1 at index 1, etc. +func (c *Cluster) PartitionInfos(topic string) []*PartitionInfo { + var is []*PartitionInfo + c.admin(func() { + t, ok := c.data.tps.gett(topic) + if !ok { + return + } + partitions := make([]int32, 0, len(t)) + for p := range t { + partitions = append(partitions, p) + } + sort.Slice(partitions, func(i, j int) bool { + return partitions[i] < partitions[j] + }) + for _, p := range partitions { + pd, _ := c.data.tps.getp(topic, p) + is = append(is, pd.info()) + } + }) + return is +} diff --git a/vendor/golang.org/x/crypto/pkcs12/crypto.go b/vendor/golang.org/x/crypto/pkcs12/crypto.go index 96f4a1a56ec..212538cb5a8 100644 --- a/vendor/golang.org/x/crypto/pkcs12/crypto.go +++ b/vendor/golang.org/x/crypto/pkcs12/crypto.go @@ -26,7 +26,7 @@ type pbeCipher interface { create(key []byte) (cipher.Block, error) // deriveKey returns a key derived from the given password and salt. deriveKey(salt, password []byte, iterations int) []byte - // deriveKey returns an IV derived from the given password and salt. + // deriveIV returns an IV derived from the given password and salt. deriveIV(salt, password []byte, iterations int) []byte } diff --git a/vendor/golang.org/x/sys/unix/syscall_dragonfly.go b/vendor/golang.org/x/sys/unix/syscall_dragonfly.go index 97cb916f2c9..be8c0020701 100644 --- a/vendor/golang.org/x/sys/unix/syscall_dragonfly.go +++ b/vendor/golang.org/x/sys/unix/syscall_dragonfly.go @@ -246,6 +246,18 @@ func Sendfile(outfd int, infd int, offset *int64, count int) (written int, err e return sendfile(outfd, infd, offset, count) } +func Dup3(oldfd, newfd, flags int) error { + if oldfd == newfd || flags&^O_CLOEXEC != 0 { + return EINVAL + } + how := F_DUP2FD + if flags&O_CLOEXEC != 0 { + how = F_DUP2FD_CLOEXEC + } + _, err := fcntl(oldfd, how, newfd) + return err +} + /* * Exposed directly */ diff --git a/vendor/golang.org/x/sys/windows/dll_windows.go b/vendor/golang.org/x/sys/windows/dll_windows.go index 4e613cf6335..3ca814f54d4 100644 --- a/vendor/golang.org/x/sys/windows/dll_windows.go +++ b/vendor/golang.org/x/sys/windows/dll_windows.go @@ -43,8 +43,8 @@ type DLL struct { // LoadDLL loads DLL file into memory. // // Warning: using LoadDLL without an absolute path name is subject to -// DLL preloading attacks. To safely load a system DLL, use LazyDLL -// with System set to true, or use LoadLibraryEx directly. +// DLL preloading attacks. To safely load a system DLL, use [NewLazySystemDLL], +// or use [LoadLibraryEx] directly. func LoadDLL(name string) (dll *DLL, err error) { namep, err := UTF16PtrFromString(name) if err != nil { @@ -271,6 +271,9 @@ func (d *LazyDLL) NewProc(name string) *LazyProc { } // NewLazyDLL creates new LazyDLL associated with DLL file. +// +// Warning: using NewLazyDLL without an absolute path name is subject to +// DLL preloading attacks. To safely load a system DLL, use [NewLazySystemDLL]. func NewLazyDLL(name string) *LazyDLL { return &LazyDLL{Name: name} } @@ -410,7 +413,3 @@ func loadLibraryEx(name string, system bool) (*DLL, error) { } return &DLL{Name: name, Handle: h}, nil } - -type errString string - -func (s errString) Error() string { return string(s) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 085c594fb0a..92e64d82f8a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1196,7 +1196,7 @@ github.com/tklauser/go-sysconf # github.com/tklauser/numcpus v0.6.1 ## explicit; go 1.13 github.com/tklauser/numcpus -# github.com/twmb/franz-go v1.18.0 => github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 +# github.com/twmb/franz-go v1.18.1 => github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kbin github.com/twmb/franz-go/pkg/kerr @@ -1209,7 +1209,7 @@ github.com/twmb/franz-go/pkg/sasl/plain # github.com/twmb/franz-go/pkg/kadm v1.14.0 ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kadm -# github.com/twmb/franz-go/pkg/kfake v0.0.0-20241202133023-293b7c4c56bb +# github.com/twmb/franz-go/pkg/kfake v0.0.0-20250121044851-f30c518d6b72 ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kfake # github.com/twmb/franz-go/pkg/kmsg v1.9.0 @@ -1380,7 +1380,7 @@ go.uber.org/zap/internal/color go.uber.org/zap/internal/exit go.uber.org/zap/zapcore go.uber.org/zap/zapgrpc -# golang.org/x/crypto v0.31.0 +# golang.org/x/crypto v0.32.0 ## explicit; go 1.20 golang.org/x/crypto/argon2 golang.org/x/crypto/bcrypt @@ -1440,14 +1440,14 @@ golang.org/x/oauth2/jwt golang.org/x/sync/errgroup golang.org/x/sync/semaphore golang.org/x/sync/singleflight -# golang.org/x/sys v0.28.0 +# golang.org/x/sys v0.29.0 ## explicit; go 1.18 golang.org/x/sys/cpu golang.org/x/sys/plan9 golang.org/x/sys/unix golang.org/x/sys/windows golang.org/x/sys/windows/registry -# golang.org/x/term v0.27.0 +# golang.org/x/term v0.28.0 ## explicit; go 1.18 golang.org/x/term # golang.org/x/text v0.21.0