Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exchange version - Step 1 #278

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,18 @@ perf-test-docker-build: perf-test-build
perf-test-docker-push: perf-test-docker-build
$(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:$(VERSION)

RABBITMQ_OCI ?= pivotalrabbitmq/rabbitmq-stream
RABBITMQ_OCI ?= rabbitmq:3-management
BUILDKIT_RUN_ARGS ?= --pull always
.PHONY: rabbitmq-server
rabbitmq-server:
$(BUILDKIT) run -it --rm --name rabbitmq-stream-go-client-test \
$(BUILDKIT) run -d --rm --name rabbitmq-stream-go-client-test \
-p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \
$(BUILDKIT_RUN_ARGS) \
$(RABBITMQ_OCI)
sleep 5
$(BUILDKIT) exec rabbitmq-stream-go-client-test rabbitmq-plugins enable rabbitmq_stream_management rabbitmq_amqp1_0


rabbitmq-ha-proxy:
cd compose/ha_tls; rm -rf tls-gen;
Expand All @@ -107,7 +110,7 @@ rabbitmq-server-tls:
-v $(shell pwd)/compose/tls/conf/:/etc/rabbitmq/ -v $(shell pwd)/compose/tls/tls-gen/basic/result/:/certs \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \
--pull always \
docker.io/rabbitmq:3.13-rc-management
docker.io/rabbitmq:3-management



113 changes: 113 additions & 0 deletions pkg/stream/available_features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package stream

import (
"fmt"
"regexp"
"strconv"
"strings"
"sync"
)

var lock = &sync.Mutex{}
var (
instance availableFeatures
)

type availableFeatures struct {
is313OrMore bool
is311OrMore bool
brokerVersion string
}

func availableFeaturesInstance() *availableFeatures {
if instance == (availableFeatures{}) {
lock.Lock()
defer lock.Unlock()
if instance == (availableFeatures{}) {
instance = availableFeatures{}
}
}
return &instance
}

func (a *availableFeatures) Is311OrMore() bool {
return a.is311OrMore
}

func (a *availableFeatures) Is313OrMore() bool {
return a.is313OrMore
}

func (a *availableFeatures) SetVersion(version string) error {
if extractVersion(version) == "" {
return fmt.Errorf("invalid version format: %s", version)
}
a.brokerVersion = version
a.is311OrMore = IsVersionGreaterOrEqual(extractVersion(version), "3.11.0")
a.is313OrMore = IsVersionGreaterOrEqual(extractVersion(version), "3.13.0")
return nil
}

func extractVersion(fullVersion string) string {
pattern := `(\d+\.\d+\.\d+)`
regex := regexp.MustCompile(pattern)
match := regex.FindStringSubmatch(fullVersion)

if len(match) > 1 {
return match[1]
}
return ""
}

func IsVersionGreaterOrEqual(version, target string) bool {
v1, err := parseVersion(version)
if err != nil {
return false
}

v2, err := parseVersion(target)
if err != nil {
return false
}
return v1.Compare(v2) >= 0
}

func parseVersion(version string) (Version, error) {
parts := strings.Split(version, ".")
if len(parts) != 3 {
return Version{}, fmt.Errorf("invalid version format: %s", version)
}

major, err := strconv.Atoi(parts[0])
if err != nil {
return Version{}, fmt.Errorf("invalid major version: %s", parts[0])
}

minor, err := strconv.Atoi(parts[1])
if err != nil {
return Version{}, fmt.Errorf("invalid minor version: %s", parts[1])
}

patch, err := strconv.Atoi(parts[2])
if err != nil {
return Version{}, fmt.Errorf("invalid patch version: %s", parts[2])
}

return Version{Major: major, Minor: minor, Patch: patch}, nil
}

type Version struct {
Major int
Minor int
Patch int
}

func (v Version) Compare(other Version) int {
if v.Major != other.Major {
return v.Major - other.Major
}
if v.Minor != other.Minor {
return v.Minor - other.Minor
}
return v.Patch - other.Patch
}
72 changes: 72 additions & 0 deletions pkg/stream/available_features_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package stream

import (
"fmt"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Available Features", func() {

It("Parse Version", func() {
v, err := parseVersion("1.2.3")
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal(Version{Major: 1, Minor: 2, Patch: 3}))

_, err = parseVersion("1.2")
Expect(err).To(HaveOccurred())
Expect(fmt.Sprintf("%s", err)).To(ContainSubstring("invalid version format: 1.2"))

_, err = parseVersion("error.3.3")
Expect(err).To(HaveOccurred())
Expect(fmt.Sprintf("%s", err)).To(ContainSubstring("invalid major version: error"))

_, err = parseVersion("1.error.3")
Expect(err).To(HaveOccurred())
Expect(fmt.Sprintf("%s", err)).To(ContainSubstring("invalid minor version: error"))

_, err = parseVersion("1.2.error")
Expect(err).To(HaveOccurred())
Expect(fmt.Sprintf("%s", err)).To(ContainSubstring("invalid patch version: error"))

v, err = parseVersion(extractVersion("3.12.1-rc1"))
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal(Version{Major: 3, Minor: 12, Patch: 1}))

v, err = parseVersion(extractVersion("3.13.1-alpha.234"))
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal(Version{Major: 3, Minor: 13, Patch: 1}))
})

It("Is Version Greater Or Equal", func() {
Expect(IsVersionGreaterOrEqual("1.2.3", "1.2.3")).To(BeTrue())
Expect(IsVersionGreaterOrEqual("1.2.3", "1.2.2")).To(BeTrue())
Expect(IsVersionGreaterOrEqual("1.2.3", "1.2.4")).To(BeFalse())
Expect(IsVersionGreaterOrEqual("1.2.3", "1.3.3")).To(BeFalse())
Expect(IsVersionGreaterOrEqual("1.2.3", "2.2.3")).To(BeFalse())
Expect(IsVersionGreaterOrEqual("3.1.3-alpha.1", "2.2.3")).To(BeFalse())
Expect(IsVersionGreaterOrEqual("3.3.3-rc.1", "2.2.3")).To(BeFalse())

Expect(IsVersionGreaterOrEqual("error.3.2", "2.2.3")).To(BeFalse())
Expect(IsVersionGreaterOrEqual("4.3.2", "2.error.3")).To(BeFalse())

})

It("Available Features", func() {
var availableFeatures = availableFeaturesInstance()
Expect(availableFeatures).NotTo(BeNil())
Expect(availableFeatures.SetVersion("error")).NotTo(BeNil())
Expect(availableFeatures.SetVersion("3.9.0")).To(BeNil())
Expect(availableFeatures.Is311OrMore()).To(BeFalse())
Expect(availableFeatures.Is313OrMore()).To(BeFalse())
Expect(availableFeatures.SetVersion("3.11.0")).To(BeNil())
Expect(availableFeatures.Is311OrMore()).To(BeTrue())
Expect(availableFeatures.Is313OrMore()).To(BeFalse())
Expect(availableFeatures.SetVersion("3.13.0")).To(BeNil())
Expect(availableFeatures.Is311OrMore()).To(BeTrue())
Expect(availableFeatures.Is313OrMore()).To(BeTrue())
Expect(availableFeatures.SetVersion("3.13.1-alpha.234")).To(BeNil())
Expect(availableFeatures.Is311OrMore()).To(BeTrue())
Expect(availableFeatures.Is313OrMore()).To(BeTrue())
})
})
20 changes: 17 additions & 3 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,18 @@ func (c *Client) connect() error {
c.socket.setOpen()

go c.handleResponse()
err2 := c.peerProperties()
serverProperties, err2 := c.peerProperties()

if err2 != nil {
logs.LogError("Can't set the peer-properties. Check if the stream server is running/reachable")
return err2
}
logs.LogDebug("Server properties: %s", serverProperties)
if serverProperties["version"] == "" {
logs.LogInfo(
"Server version is less than 3.11.0, skipping command version exchange")
}

pwd, _ := u.User.Password()
err2 = c.authenticate(u.User.Username(), pwd)
if err2 != nil {
Expand Down Expand Up @@ -216,7 +222,7 @@ func (c *Client) setConnectionName(connectionName string) {
c.clientProperties.items["connection_name"] = connectionName
}

func (c *Client) peerProperties() error {
func (c *Client) peerProperties() (map[string]string, error) {
clientPropertiesSize := 4 // size of the map, always there

c.clientProperties.items["product"] = "RabbitMQ Stream"
Expand All @@ -241,7 +247,15 @@ func (c *Client) peerProperties() error {
writeString(b, element)
}

return c.handleWrite(b.Bytes(), resp).Err
err := c.handleWriteWithResponse(b.Bytes(), resp, false)
if err.Err != nil {
return nil, err.Err
}

serverProperties := <-resp.data
_ = c.coordinator.RemoveResponseById(resp.correlationid)
return serverProperties.(map[string]string), nil

}

func (c *Client) authenticate(user string, password string) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (c *Client) handlePeerProperties(readProtocol *ReaderProtocol, r *bufio.Rea
return
}
res.code <- Code{id: readProtocol.ResponseCode}
res.data <- serverProperties

}

Expand Down
Loading