From 9c01ead91f482366b2d0cee6ba7f69eb6398cbe1 Mon Sep 17 00:00:00 2001 From: Marc Tarnutzer Date: Thu, 4 May 2023 23:51:15 +0200 Subject: [PATCH 1/5] switch to gobwas/ws, add compression --- connection.go | 199 ++++++++++++++++++++++++++++++++++++++++--- go.mod | 6 +- go.sum | 12 ++- relay.go | 18 ++-- subscription_test.go | 41 +++++++++ 5 files changed, 250 insertions(+), 26 deletions(-) diff --git a/connection.go b/connection.go index b3dd52e..bc4c1e8 100644 --- a/connection.go +++ b/connection.go @@ -1,34 +1,213 @@ package nostr import ( + "bytes" + "compress/flate" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" "sync" - "github.com/gorilla/websocket" + "github.com/gobwas/httphead" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsflate" + "github.com/gobwas/ws/wsutil" ) type Connection struct { - socket *websocket.Conn - mutex sync.Mutex + conn net.Conn + enableCompression bool + controlHandler wsutil.FrameHandlerFunc + flateReader *wsflate.Reader + reader *wsutil.Reader + flateWriter *wsflate.Writer + writer *wsutil.Writer + mutex sync.Mutex } -func NewConnection(socket *websocket.Conn) *Connection { - return &Connection{ - socket: socket, +func NewConnection(ctx context.Context, url string, requestHeader http.Header, enableCompression bool) (*Connection, error) { + dialer := ws.Dialer{ + Header: ws.HandshakeHeaderHTTP(requestHeader), + } + state := ws.StateClientSide + if enableCompression { + state |= ws.StateExtended + dialer.Extensions = []httphead.Option{ + wsflate.DefaultParameters.Option(), + } + } + + conn, _, _, err := dialer.Dial(ctx, url) + if err != nil { + return nil, fmt.Errorf("failed to dial: %w", err) + } + + // reader + var flateReader *wsflate.Reader + var msgState wsflate.MessageState + if enableCompression { + msgState.SetCompressed(true) + + flateReader = wsflate.NewReader(nil, func(r io.Reader) wsflate.Decompressor { + return flate.NewReader(r) + }) + } + + controlHandler := wsutil.ControlFrameHandler(conn, ws.StateClientSide) + reader := &wsutil.Reader{ + Source: conn, + State: state, + OnIntermediate: controlHandler, + Extensions: []wsutil.RecvExtension{ + &msgState, + }, + } + + // writer + var flateWriter *wsflate.Writer + if enableCompression { + flateWriter = wsflate.NewWriter(nil, func(w io.Writer) wsflate.Compressor { + fw, err := flate.NewWriter(w, 4) + if err != nil { + InfoLogger.Printf("Failed to create flate writer: %v", err) + } + return fw + }) } + + writer := wsutil.NewWriter( + conn, + state, + ws.OpBinary, + ) + writer.SetExtensions(&msgState) + + return &Connection{ + conn: conn, + enableCompression: enableCompression, + controlHandler: controlHandler, + flateReader: flateReader, + reader: reader, + flateWriter: flateWriter, + writer: writer, + }, nil } func (c *Connection) WriteJSON(v any) error { c.mutex.Lock() defer c.mutex.Unlock() - return c.socket.WriteJSON(v) + + if c.enableCompression { + c.flateWriter.Reset(c.writer) + if err := json.NewEncoder(c.flateWriter).Encode(v); err != nil { + return fmt.Errorf("failed to encode json: %w", err) + } + + err := c.flateWriter.Close() + if err != nil { + return fmt.Errorf("failed to close flate writer: %w", err) + } + } else { + if err := json.NewEncoder(c.writer).Encode(v); err != nil { + return fmt.Errorf("failed to encode json: %w", err) + } + } + + err := c.writer.Flush() + if err != nil { + return fmt.Errorf("failed to flush writer: %w", err) + } + + return nil +} + +func (c *Connection) Ping() error { + c.mutex.Lock() + defer c.mutex.Unlock() + + return wsutil.WriteClientMessage(c.writer, ws.OpPing, nil) } -func (c *Connection) WriteMessage(messageType int, data []byte) error { +func (c *Connection) WriteMessage(data []byte) error { c.mutex.Lock() defer c.mutex.Unlock() - return c.socket.WriteMessage(messageType, data) + + if c.enableCompression { + c.flateWriter.Reset(c.writer) + if _, err := io.Copy(c.flateWriter, bytes.NewReader(data)); err != nil { + return fmt.Errorf("failed to write message: %w", err) + } + + err := c.flateWriter.Close() + if err != nil { + return fmt.Errorf("failed to close flate writer: %w", err) + } + } else { + if _, err := io.Copy(c.writer, bytes.NewReader(data)); err != nil { + return fmt.Errorf("failed to write message: %w", err) + } + } + + err := c.writer.Flush() + if err != nil { + return fmt.Errorf("failed to flush writer: %w", err) + } + + return nil +} + +func (c *Connection) ReadMessage(ctx context.Context) ([]byte, error) { + for { + select { + case <-ctx.Done(): + return nil, errors.New("context canceled") + default: + } + + h, err := c.reader.NextFrame() + if err != nil { + c.conn.Close() + return nil, fmt.Errorf("failed to advance frame: %w", err) + } + + if h.OpCode.IsControl() { + if err := c.controlHandler(h, c.reader); err != nil { + return nil, fmt.Errorf("failed to handle control frame: %w", err) + } + } else if h.OpCode == ws.OpBinary || + h.OpCode == ws.OpText { + break + } + + if err := c.reader.Discard(); err != nil { + return nil, fmt.Errorf("failed to discard: %w", err) + } + } + + buf := new(bytes.Buffer) + if c.enableCompression { + c.flateReader.Reset(c.reader) + if _, err := io.Copy(buf, c.flateReader); err != nil { + return nil, fmt.Errorf("failed to read message: %w", err) + } + } else { + if _, err := io.Copy(buf, c.reader); err != nil { + return nil, fmt.Errorf("failed to read message: %w", err) + } + } + + return buf.Bytes(), nil } func (c *Connection) Close() error { - return c.socket.Close() + err := c.conn.Close() + if err != nil { + return fmt.Errorf("failed to close connection: %w", err) + } + + return nil } diff --git a/go.mod b/go.mod index d4b97bc..13bb6b5 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,11 @@ require ( github.com/SaveTheRbtz/generic-sync-map-go v0.0.0-20220414055132-a37292614db8 github.com/btcsuite/btcd/btcec/v2 v2.2.0 github.com/btcsuite/btcd/btcutil v1.1.3 - github.com/gorilla/websocket v1.4.2 + github.com/gobwas/httphead v0.1.0 + github.com/gobwas/ws v1.2.0 github.com/mailru/easyjson v0.7.7 github.com/tyler-smith/go-bip32 v1.0.0 github.com/tyler-smith/go-bip39 v1.1.0 - github.com/valyala/fastjson v1.6.3 golang.org/x/exp v0.0.0-20221106115401-f9659909a136 golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc ) @@ -21,6 +21,8 @@ require ( github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/gobwas/pool v0.2.1 // indirect github.com/josharian/intern v1.0.0 // indirect golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect + golang.org/x/sys v0.6.0 // indirect ) diff --git a/go.sum b/go.sum index d1e5afa..3340f78 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,12 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeC github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.2.0 h1:u0p9s3xLYpZCA1z5JgCkMeB34CKCMMQbM+G8Ii7YD0I= +github.com/gobwas/ws v1.2.0/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -52,8 +58,6 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -83,8 +87,6 @@ github.com/tyler-smith/go-bip32 v1.0.0 h1:sDR9juArbUgX+bO/iblgZnMPeWY1KZMUC2AFUJ github.com/tyler-smith/go-bip32 v1.0.0/go.mod h1:onot+eHknzV4BVPwrzqY5OoVpyCvnwD7lMawL5aQupE= github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3CWg+kkNaLt55U= -github.com/valyala/fastjson v1.6.3 h1:tAKFnnwmeMGPbwJ7IwxcTPCNr3uIzoIj3/Fh90ra4xc= -github.com/valyala/fastjson v1.6.3/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= golang.org/x/crypto v0.0.0-20170613210332-850760c427c5/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -108,6 +110,8 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/relay.go b/relay.go index abbbb09..fcec2db 100644 --- a/relay.go +++ b/relay.go @@ -9,7 +9,6 @@ import ( "time" s "github.com/SaveTheRbtz/generic-sync-map-go" - "github.com/gorilla/websocket" ) type Status int @@ -39,8 +38,9 @@ type Relay struct { URL string RequestHeader http.Header // e.g. for origin header - Connection *Connection - subscriptions s.MapOf[string, *Subscription] + Connection *Connection + EnableCompression bool + subscriptions s.MapOf[string, *Subscription] Challenges chan string // NIP-42 Challenges Notices chan string @@ -90,11 +90,12 @@ func (r *Relay) Connect(ctx context.Context) error { defer cancel() } - socket, _, err := websocket.DefaultDialer.DialContext(ctx, r.URL, r.RequestHeader) + conn, err := NewConnection(ctx, r.URL, r.RequestHeader, r.EnableCompression) if err != nil { cancel() return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err) } + r.Connection = conn r.Challenges = make(chan string) r.Notices = make(chan string) @@ -108,9 +109,6 @@ func (r *Relay) Connect(ctx context.Context) error { r.mutex.Unlock() }() - conn := NewConnection(socket) - r.Connection = conn - // ping every 29 seconds go func() { ticker := time.NewTicker(29 * time.Second) @@ -119,7 +117,7 @@ func (r *Relay) Connect(ctx context.Context) error { for { select { case <-ticker.C: - err := conn.WriteMessage(websocket.PingMessage, nil) + err := conn.Ping() if err != nil { InfoLogger.Printf("{%s} error writing ping: %v; closing websocket", r.URL, err) return @@ -132,13 +130,13 @@ func (r *Relay) Connect(ctx context.Context) error { go func() { defer cancel() for { - typ, message, err := conn.socket.ReadMessage() + message, err := conn.ReadMessage(r.ConnectionContext) if err != nil { r.ConnectionError = err break } - if typ != websocket.TextMessage || len(message) == 0 || message[0] != '[' { + if len(message) == 0 || message[0] != '[' { continue } diff --git a/subscription_test.go b/subscription_test.go index f004c67..7931487 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -43,3 +43,44 @@ end: t.Errorf("expected 2 events, got %d", events) } } + +func TestSubscribeEnableCompression(t *testing.T) { + rl := &Relay{URL: NormalizeURL("wss://relay.damus.io"), EnableCompression: true} + err := rl.Connect(context.Background()) + if err != nil { + t.Fatalf("connection failed: %v", err) + } + defer rl.Close() + + sub, err := rl.Subscribe(context.Background(), Filters{{Kinds: []int{1}, Limit: 2}}) + if err != nil { + t.Errorf("subscription failed: %v", err) + return + } + + timeout := time.After(5 * time.Second) + events := 0 + + for { + select { + case event := <-sub.Events: + if event == nil { + t.Errorf("event is nil: %v", event) + } + events++ + case <-sub.EndOfStoredEvents: + goto end + case <-rl.ConnectionContext.Done(): + t.Errorf("connection closed: %v", rl.ConnectionContext.Err()) + goto end + case <-timeout: + t.Errorf("timeout") + goto end + } + } + +end: + if events != 2 { + t.Errorf("expected 2 events, got %d", events) + } +} From f152a5e0c714aae888b2fb81d388fe850406a682 Mon Sep 17 00:00:00 2001 From: Marc Tarnutzer Date: Thu, 4 May 2023 23:54:39 +0200 Subject: [PATCH 2/5] simplify connection close --- connection.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/connection.go b/connection.go index bc4c1e8..af2c9e3 100644 --- a/connection.go +++ b/connection.go @@ -204,10 +204,5 @@ func (c *Connection) ReadMessage(ctx context.Context) ([]byte, error) { } func (c *Connection) Close() error { - err := c.conn.Close() - if err != nil { - return fmt.Errorf("failed to close connection: %w", err) - } - - return nil + return c.conn.Close() } From ee9502bc3e234a022a6b6f88486607e417dd105a Mon Sep 17 00:00:00 2001 From: Marc Tarnutzer Date: Fri, 5 May 2023 12:14:29 +0200 Subject: [PATCH 3/5] fix: outgoing pings --- connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connection.go b/connection.go index af2c9e3..ec45fdd 100644 --- a/connection.go +++ b/connection.go @@ -129,7 +129,7 @@ func (c *Connection) Ping() error { c.mutex.Lock() defer c.mutex.Unlock() - return wsutil.WriteClientMessage(c.writer, ws.OpPing, nil) + return wsutil.WriteClientMessage(c.conn, ws.OpPing, nil) } func (c *Connection) WriteMessage(data []byte) error { From c86e907142689830a707251a568dc99bad703468 Mon Sep 17 00:00:00 2001 From: Marc Tarnutzer Date: Fri, 5 May 2023 22:00:25 +0200 Subject: [PATCH 4/5] enable compression by default --- connection.go | 23 ++++++++++++++--------- relay.go | 7 +++---- subscription_test.go | 41 ----------------------------------------- 3 files changed, 17 insertions(+), 54 deletions(-) diff --git a/connection.go b/connection.go index ec45fdd..e858d3b 100644 --- a/connection.go +++ b/connection.go @@ -29,23 +29,28 @@ type Connection struct { mutex sync.Mutex } -func NewConnection(ctx context.Context, url string, requestHeader http.Header, enableCompression bool) (*Connection, error) { +func NewConnection(ctx context.Context, url string, requestHeader http.Header) (*Connection, error) { dialer := ws.Dialer{ Header: ws.HandshakeHeaderHTTP(requestHeader), - } - state := ws.StateClientSide - if enableCompression { - state |= ws.StateExtended - dialer.Extensions = []httphead.Option{ + Extensions: []httphead.Option{ wsflate.DefaultParameters.Option(), - } + }, } - - conn, _, _, err := dialer.Dial(ctx, url) + conn, _, hs, err := dialer.Dial(ctx, url) if err != nil { return nil, fmt.Errorf("failed to dial: %w", err) } + enableCompression := false + state := ws.StateClientSide + for _, extension := range hs.Extensions { + if string(extension.Name) == wsflate.ExtensionName { + enableCompression = true + state |= ws.StateExtended + break + } + } + // reader var flateReader *wsflate.Reader var msgState wsflate.MessageState diff --git a/relay.go b/relay.go index fcec2db..a4af3d0 100644 --- a/relay.go +++ b/relay.go @@ -38,9 +38,8 @@ type Relay struct { URL string RequestHeader http.Header // e.g. for origin header - Connection *Connection - EnableCompression bool - subscriptions s.MapOf[string, *Subscription] + Connection *Connection + subscriptions s.MapOf[string, *Subscription] Challenges chan string // NIP-42 Challenges Notices chan string @@ -90,7 +89,7 @@ func (r *Relay) Connect(ctx context.Context) error { defer cancel() } - conn, err := NewConnection(ctx, r.URL, r.RequestHeader, r.EnableCompression) + conn, err := NewConnection(ctx, r.URL, r.RequestHeader) if err != nil { cancel() return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err) diff --git a/subscription_test.go b/subscription_test.go index 7931487..f004c67 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -43,44 +43,3 @@ end: t.Errorf("expected 2 events, got %d", events) } } - -func TestSubscribeEnableCompression(t *testing.T) { - rl := &Relay{URL: NormalizeURL("wss://relay.damus.io"), EnableCompression: true} - err := rl.Connect(context.Background()) - if err != nil { - t.Fatalf("connection failed: %v", err) - } - defer rl.Close() - - sub, err := rl.Subscribe(context.Background(), Filters{{Kinds: []int{1}, Limit: 2}}) - if err != nil { - t.Errorf("subscription failed: %v", err) - return - } - - timeout := time.After(5 * time.Second) - events := 0 - - for { - select { - case event := <-sub.Events: - if event == nil { - t.Errorf("event is nil: %v", event) - } - events++ - case <-sub.EndOfStoredEvents: - goto end - case <-rl.ConnectionContext.Done(): - t.Errorf("connection closed: %v", rl.ConnectionContext.Err()) - goto end - case <-timeout: - t.Errorf("timeout") - goto end - } - } - -end: - if events != 2 { - t.Errorf("expected 2 events, got %d", events) - } -} From 69b9d82bb1135bd95e3b633e731fbabea0758b7c Mon Sep 17 00:00:00 2001 From: Marc Tarnutzer Date: Sat, 6 May 2023 01:48:01 +0200 Subject: [PATCH 5/5] check if messages are compressed on receive --- connection.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/connection.go b/connection.go index e858d3b..a54fdd7 100644 --- a/connection.go +++ b/connection.go @@ -26,6 +26,7 @@ type Connection struct { reader *wsutil.Reader flateWriter *wsflate.Writer writer *wsutil.Writer + msgState *wsflate.MessageState mutex sync.Mutex } @@ -67,6 +68,7 @@ func NewConnection(ctx context.Context, url string, requestHeader http.Header) ( Source: conn, State: state, OnIntermediate: controlHandler, + CheckUTF8: false, Extensions: []wsutil.RecvExtension{ &msgState, }, @@ -98,6 +100,7 @@ func NewConnection(ctx context.Context, url string, requestHeader http.Header) ( flateReader: flateReader, reader: reader, flateWriter: flateWriter, + msgState: &msgState, writer: writer, }, nil } @@ -106,7 +109,7 @@ func (c *Connection) WriteJSON(v any) error { c.mutex.Lock() defer c.mutex.Unlock() - if c.enableCompression { + if c.enableCompression && c.msgState.IsCompressed() { c.flateWriter.Reset(c.writer) if err := json.NewEncoder(c.flateWriter).Encode(v); err != nil { return fmt.Errorf("failed to encode json: %w", err) @@ -141,7 +144,7 @@ func (c *Connection) WriteMessage(data []byte) error { c.mutex.Lock() defer c.mutex.Unlock() - if c.enableCompression { + if c.msgState.IsCompressed() && c.enableCompression { c.flateWriter.Reset(c.writer) if _, err := io.Copy(c.flateWriter, bytes.NewReader(data)); err != nil { return fmt.Errorf("failed to write message: %w", err) @@ -194,7 +197,7 @@ func (c *Connection) ReadMessage(ctx context.Context) ([]byte, error) { } buf := new(bytes.Buffer) - if c.enableCompression { + if c.msgState.IsCompressed() && c.enableCompression { c.flateReader.Reset(c.reader) if _, err := io.Copy(buf, c.flateReader); err != nil { return nil, fmt.Errorf("failed to read message: %w", err)