From 53e258eda8a70b4983804d9342183e33d758d898 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 23 Nov 2023 15:45:09 +0100 Subject: [PATCH] Translate v2 requests into v3 ClusterMemberAttrSetRequest and ClusterVersionSetRequest Signed-off-by: Marek Siarkowicz --- server/etcdserver/apply_v2.go | 43 +++++++++++++++++---------- server/etcdserver/server.go | 8 ++--- server/etcdserver/server_test.go | 50 +++++++++++++++++++++++++------- 3 files changed, 69 insertions(+), 32 deletions(-) diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index f9ec04d518d5..751057054695 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -18,32 +18,43 @@ import ( "encoding/json" "path" - "github.com/coreos/go-semver/semver" "go.uber.org/zap" - "go.etcd.io/etcd/server/v3/etcdserver/api" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" ) -func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) { - if r.Method != "PUT" || (!storeMemberAttributeRegexp.MatchString(r.Path) && r.Path != membership.StoreClusterVersionKey()) { - s.lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method)) - } - if storeMemberAttributeRegexp.MatchString(r.Path) { - id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path)) +func v2ToV3Request(lg *zap.Logger, r *RequestV2) pb.InternalRaftRequest { + if r.Method == "PUT" && storeMemberAttributeRegexp.MatchString(r.Path) { + id := membership.MustParseMemberIDFromKey(lg, path.Dir(r.Path)) var attr membership.Attributes if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { - s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) + lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) } - if s.cluster != nil { - s.cluster.UpdateAttributes(id, attr, shouldApplyV3) + return pb.InternalRaftRequest{ + Header: &pb.RequestHeader{ + ID: r.ID, + }, + ClusterMemberAttrSet: &membershippb.ClusterMemberAttrSetRequest{ + Member_ID: uint64(id), + MemberAttributes: &membershippb.Attributes{ + Name: attr.Name, + ClientUrls: attr.ClientURLs, + }, + }, } } - // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 - if r.Path == membership.StoreClusterVersionKey() { - if s.cluster != nil { - // persist to backend given v2store can be very stale - s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) + if r.Method == "PUT" && r.Path == membership.StoreClusterVersionKey() { + return pb.InternalRaftRequest{ + Header: &pb.RequestHeader{ + ID: r.ID, + }, + ClusterVersionSet: &membershippb.ClusterVersionSetRequest{ + Ver: r.Val, + }, } } + lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method)) + return pb.InternalRaftRequest{} } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index f468400b2d00..9639ee22f0bd 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1849,17 +1849,13 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership. rp := &r pbutil.MustUnmarshal(rp, e.Data) s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp)) - s.applyV2Request((*RequestV2)(rp), shouldApplyV3) - s.w.Trigger(r.ID, Response{}) - return + raftReq = v2ToV3Request(s.lg, (*RequestV2)(rp)) } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) - s.applyV2Request(req, shouldApplyV3) - s.w.Trigger(req.ID, Response{}) - return + raftReq = v2ToV3Request(s.lg, req) } id := raftReq.ID diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 944307dfa9c5..21772fa17b2a 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -50,6 +51,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm" apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/etcdserver/errors" @@ -153,11 +155,19 @@ func TestV2SetMemberAttributes(t *testing.T) { defer betesting.Close(t, be) cl := newTestClusterWithBackend(t, []*membership.Member{{ID: 1}}, be) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: mockstore.NewRecorder(), - cluster: cl, + lgMu: new(sync.RWMutex), + lg: zaptest.NewLogger(t), + v2store: mockstore.NewRecorder(), + cluster: cl, + consistIndex: cindex.NewConsistentIndex(be), + w: wait.New(), + } + as, err := v3alarm.NewAlarmStore(srv.lg, schema.NewAlarmBackend(srv.lg, be)) + if err != nil { + t.Fatal(err) } + srv.alarmStore = as + srv.uberApply = srv.NewUberApplier() req := pb.Request{ Method: "PUT", @@ -165,7 +175,13 @@ func TestV2SetMemberAttributes(t *testing.T) { Path: membership.MemberAttributesStorePath(1), Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`, } - srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth) + data, err := proto.Marshal(&req) + if err != nil { + t.Fatal(err) + } + srv.applyEntryNormal(&raftpb.Entry{ + Data: data, + }) w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}} if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) { t.Errorf("attributes = %v, want %v", g, w) @@ -180,11 +196,19 @@ func TestV2SetClusterVersion(t *testing.T) { cl := newTestClusterWithBackend(t, []*membership.Member{}, be) cl.SetVersion(semver.New("3.4.0"), api.UpdateCapability, membership.ApplyBoth) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: mockstore.NewRecorder(), - cluster: cl, + lgMu: new(sync.RWMutex), + lg: zaptest.NewLogger(t), + v2store: mockstore.NewRecorder(), + cluster: cl, + consistIndex: cindex.NewConsistentIndex(be), + w: wait.New(), + } + as, err := v3alarm.NewAlarmStore(srv.lg, schema.NewAlarmBackend(srv.lg, be)) + if err != nil { + t.Fatal(err) } + srv.alarmStore = as + srv.uberApply = srv.NewUberApplier() req := pb.Request{ Method: "PUT", @@ -192,7 +216,13 @@ func TestV2SetClusterVersion(t *testing.T) { Path: membership.StoreClusterVersionKey(), Val: "3.5.0", } - srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth) + data, err := proto.Marshal(&req) + if err != nil { + t.Fatal(err) + } + srv.applyEntryNormal(&raftpb.Entry{ + Data: data, + }) if g := cl.Version(); !reflect.DeepEqual(*g, version.V3_5) { t.Errorf("attributes = %v, want %v", *g, version.V3_5) }