From ff8fef6ad0d52a3bbc2e046cd4cf86890934ca62 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Wed, 15 Sep 2021 18:13:49 +0800 Subject: [PATCH] Upgrade pulsar-client-go to 0.6.0 (#7909) MessageID interface of pulsar-client-go adds 4 more methods. So I add these methods into `MessageID` interface. Resolves: #7770 Signed-off-by: yangxuan --- go.mod | 2 +- go.sum | 7 +++++++ internal/util/mqclient/id.go | 12 ++++++++++++ internal/util/mqclient/pulsar_id.go | 20 ++++++++++++++++++++ internal/util/mqclient/pulsar_id_test.go | 5 +++++ internal/util/mqclient/rmq_id.go | 23 +++++++++++++++++++++++ internal/util/mqclient/rmq_id_test.go | 5 +++++ 7 files changed, 73 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 0878f40dba72e..f892d14d4bd87 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( ) replace ( - github.com/apache/pulsar-client-go => github.com/apache/pulsar-client-go v0.5.0 + github.com/apache/pulsar-client-go => github.com/apache/pulsar-client-go v0.6.0 google.golang.org/grpc => google.golang.org/grpc v1.38.0 github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 ) diff --git a/go.sum b/go.sum index baff6550654c3..8be0feb324eb1 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4= github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0= +github.com/AthenZ/athenz v1.10.15 h1:8Bc2W313k/ev/SGokuthNbzpwfg9W3frg3PKq1r943I= +github.com/AthenZ/athenz v1.10.15/go.mod h1:7KMpEuJ9E4+vMCMI3UQJxwWs0RZtQq7YXZ1IteUjdsc= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -59,6 +61,8 @@ github.com/antonmedv/expr v1.8.9 h1:O9stiHmHHww9b4ozhPx7T6BK7fXfOCHJ8ybxf0833zw= github.com/antonmedv/expr v1.8.9/go.mod h1:5qsM3oLGDND7sDmQGDXHkYfkjYMUX14qsgqmHhwGEk8= github.com/apache/pulsar-client-go v0.5.0 h1:cM2e6dXBa9OyPtvGHxZB1OlSOWQxsWzu45btBvtmpYo= github.com/apache/pulsar-client-go v0.5.0/go.mod h1:yj6hIv/EZXf5GgJJ8I3T13Yx9yspj8aF2QrJ5kzuueM= +github.com/apache/pulsar-client-go v0.6.0 h1:yKX7NsmJxR5mL6uIUxTTatNhMFlhurTASSZRJ9IULDg= +github.com/apache/pulsar-client-go v0.6.0/go.mod h1:A1P5VjjljsFKAD13w7/jmU3Dly2gcRvcobiULqQXhz4= github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd h1:P5kM7jcXJ7TaftX0/EMKiSJgvQc/ct+Fw0KMvcH3WuY= github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY= github.com/apache/thrift v0.14.2 h1:hY4rAyg7Eqbb27GB6gkhUKrRAuc8xRjlNtJq+LseKeY= @@ -71,6 +75,7 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aws/aws-sdk-go v1.30.8/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA= github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= @@ -249,6 +254,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -647,6 +653,7 @@ golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= diff --git a/internal/util/mqclient/id.go b/internal/util/mqclient/id.go index ed4d10345d5d8..5bea6143e80bc 100644 --- a/internal/util/mqclient/id.go +++ b/internal/util/mqclient/id.go @@ -14,4 +14,16 @@ package mqclient type MessageID interface { // Serialize the message id into a sequence of bytes that can be stored somewhere else Serialize() []byte + + // Get the message ledgerID + LedgerID() int64 + + // Get the message entryID + EntryID() int64 + + // Get the message batchIdx + BatchIdx() int32 + + // Get the message partitionIdx + PartitionIdx() int32 } diff --git a/internal/util/mqclient/pulsar_id.go b/internal/util/mqclient/pulsar_id.go index 8edd54aac3169..a0dd443d4f90a 100644 --- a/internal/util/mqclient/pulsar_id.go +++ b/internal/util/mqclient/pulsar_id.go @@ -21,10 +21,30 @@ type pulsarID struct { messageID pulsar.MessageID } +// Check if pulsarID implements pulsar.MessageID and MessageID interface +var _ pulsar.MessageID = &pulsarID{} +var _ MessageID = &pulsarID{} + func (pid *pulsarID) Serialize() []byte { return pid.messageID.Serialize() } +func (pid *pulsarID) LedgerID() int64 { + return pid.messageID.LedgerID() +} + +func (pid *pulsarID) EntryID() int64 { + return pid.messageID.EntryID() +} + +func (pid *pulsarID) BatchIdx() int32 { + return pid.messageID.BatchIdx() +} + +func (pid *pulsarID) PartitionIdx() int32 { + return pid.messageID.PartitionIdx() +} + func SerializePulsarMsgID(messageID pulsar.MessageID) []byte { return messageID.Serialize() } diff --git a/internal/util/mqclient/pulsar_id_test.go b/internal/util/mqclient/pulsar_id_test.go index 527071b94f422..01074961d4f9a 100644 --- a/internal/util/mqclient/pulsar_id_test.go +++ b/internal/util/mqclient/pulsar_id_test.go @@ -27,6 +27,11 @@ func TestPulsarID_Serialize(t *testing.T) { binary := pid.Serialize() assert.NotNil(t, binary) assert.NotZero(t, len(binary)) + + pid.LedgerID() + pid.EntryID() + pid.BatchIdx() + pid.PartitionIdx() } func Test_SerializePulsarMsgID(t *testing.T) { diff --git a/internal/util/mqclient/rmq_id.go b/internal/util/mqclient/rmq_id.go index 24b30055726a5..539e786e3dc4f 100644 --- a/internal/util/mqclient/rmq_id.go +++ b/internal/util/mqclient/rmq_id.go @@ -21,10 +21,33 @@ type rmqID struct { messageID rocksmq.UniqueID } +// Check if rmqID implements MessageID interface +var _ MessageID = &rmqID{} + func (rid *rmqID) Serialize() []byte { return SerializeRmqID(rid.messageID) } +func (rid *rmqID) LedgerID() int64 { + // TODO + return 0 +} + +func (rid *rmqID) EntryID() int64 { + // TODO + return 0 +} + +func (rid *rmqID) BatchIdx() int32 { + // TODO + return 0 +} + +func (rid *rmqID) PartitionIdx() int32 { + // TODO + return 0 +} + func SerializeRmqID(messageID int64) []byte { b := make([]byte, 8) binary.LittleEndian.PutUint64(b, uint64(messageID)) diff --git a/internal/util/mqclient/rmq_id_test.go b/internal/util/mqclient/rmq_id_test.go index da2db447284f9..35a156f8f6c16 100644 --- a/internal/util/mqclient/rmq_id_test.go +++ b/internal/util/mqclient/rmq_id_test.go @@ -25,6 +25,11 @@ func TestRmqID_Serialize(t *testing.T) { bin := rid.Serialize() assert.NotNil(t, bin) assert.NotZero(t, len(bin)) + + rid.LedgerID() + rid.EntryID() + rid.BatchIdx() + rid.PartitionIdx() } func Test_SerializeRmqID(t *testing.T) {