From 09b2a2fc5457d9e88c466b534ed2fa4353b46d6b Mon Sep 17 00:00:00 2001 From: Bearetta Date: Tue, 27 Aug 2024 16:07:51 -0400 Subject: [PATCH 1/4] fix: wrong configuration for the eth connection pool. * add a specific error for missing RPC URL * fix key prefix in config.toml of the example listener --- client/eth/connection_pool.go | 14 +++++++++++++- examples/listener/config.toml | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/client/eth/connection_pool.go b/client/eth/connection_pool.go index 3881a2ec..c2a3c8af 100644 --- a/client/eth/connection_pool.go +++ b/client/eth/connection_pool.go @@ -36,6 +36,17 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect cfg.HealthCheckInterval = defaultHealthCheckInterval } + // The LRU cache needs at least one URL provided for both HTTP and WS. + hasEthHttpUrls := len(cfg.EthHTTPURLs) > 0 + if !hasEthHttpUrls { + return nil, fmt.Errorf("ConnectionPool: missing URL for HTTP clients") + } + + hasEthWSUrls := len(cfg.EthWSURLs) > 0 + if !hasEthWSUrls { + return nil, fmt.Errorf("ConnectionPool: missing URL for WS clients") + } + cache, err := lru.NewWithEvict( len(cfg.EthHTTPURLs), func(_ string, v *HealthCheckedClient) { defer v.Close() @@ -46,8 +57,9 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect if err != nil { return nil, err } + wsCache, err := lru.NewWithEvict( - len(cfg.EthHTTPURLs), func(_ string, v *HealthCheckedClient) { + len(cfg.EthWSURLs), func(_ string, v *HealthCheckedClient) { defer v.Close() // The timeout is added so that any in progress // requests have a chance to complete before we close. diff --git a/examples/listener/config.toml b/examples/listener/config.toml index 26573255..642db720 100644 --- a/examples/listener/config.toml +++ b/examples/listener/config.toml @@ -19,7 +19,7 @@ Enabled = true Namespace = "example" Subsystem = "listener_app" -[App.ConnectionPool] +[ConnectionPool] EthHTTPURLs = ["http://localhost:10545"] EthWSURLs = ["ws://localhost:10546"] DefaultTimeout = "5s" From db5fcc146fabbaf53a39de8d9c3d9b9d854574c2 Mon Sep 17 00:00:00 2001 From: Bearetta Date: Tue, 27 Aug 2024 19:11:32 -0400 Subject: [PATCH 2/4] fix: wrong configuration for the eth connection pool. * make WS client an optional one * add tests for connection_pool.go * fix lint errors --- client/eth/connection_pool.go | 67 ++++++++++------- client/eth/connection_pool_test.go | 112 +++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 25 deletions(-) create mode 100644 client/eth/connection_pool_test.go diff --git a/client/eth/connection_pool.go b/client/eth/connection_pool.go index c2a3c8af..a343ec78 100644 --- a/client/eth/connection_pool.go +++ b/client/eth/connection_pool.go @@ -36,37 +36,43 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect cfg.HealthCheckInterval = defaultHealthCheckInterval } + var ( + cache *lru.Cache[string, *HealthCheckedClient] + wsCache *lru.Cache[string, *HealthCheckedClient] + err error + ) + // The LRU cache needs at least one URL provided for both HTTP and WS. - hasEthHttpUrls := len(cfg.EthHTTPURLs) > 0 - if !hasEthHttpUrls { + if len(cfg.EthHTTPURLs) == 0 { return nil, fmt.Errorf("ConnectionPool: missing URL for HTTP clients") } - hasEthWSUrls := len(cfg.EthWSURLs) > 0 - if !hasEthWSUrls { - return nil, fmt.Errorf("ConnectionPool: missing URL for WS clients") - } - - cache, err := lru.NewWithEvict( + cache, err = lru.NewWithEvict( len(cfg.EthHTTPURLs), func(_ string, v *HealthCheckedClient) { defer v.Close() // The timeout is added so that any in progress // requests have a chance to complete before we close. time.Sleep(cfg.DefaultTimeout) }) + if err != nil { return nil, err } - wsCache, err := lru.NewWithEvict( - len(cfg.EthWSURLs), func(_ string, v *HealthCheckedClient) { - defer v.Close() - // The timeout is added so that any in progress - // requests have a chance to complete before we close. - time.Sleep(cfg.DefaultTimeout) - }) - if err != nil { - return nil, err + if len(cfg.EthWSURLs) == 0 { + logger.Warn("ConnectionPool: missing URL for WS clients") + } else { + wsCache, err = lru.NewWithEvict( + len(cfg.EthWSURLs), func(_ string, v *HealthCheckedClient) { + defer v.Close() + // The timeout is added so that any in progress + // requests have a chance to complete before we close. + time.Sleep(cfg.DefaultTimeout) + }) + + if err != nil { + return nil, err + } } return &ConnectionPoolImpl{ @@ -80,6 +86,11 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect func (c *ConnectionPoolImpl) Close() error { c.mutex.Lock() defer c.mutex.Unlock() + + if c.cache == nil { + return nil + } + for _, client := range c.cache.Keys() { if err := c.removeClient(client); err != nil { return err @@ -111,21 +122,27 @@ func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { } func (c *ConnectionPoolImpl) GetHTTP() (Client, bool) { - c.mutex.Lock() - defer c.mutex.Unlock() -retry: - _, client, ok := c.cache.GetOldest() - if !client.Health() { - goto retry + if c.cache == nil { + return nil, false } - return client, ok + return c.getClient(c.cache) } func (c *ConnectionPoolImpl) GetWS() (Client, bool) { + // Because the WS URL is optional, we need to check if it's nil. + if c.wsCache == nil { + return nil, false + } + return c.getClient(c.wsCache) +} + +func (c *ConnectionPoolImpl) getClient( + cache *lru.Cache[string, *HealthCheckedClient], +) (Client, bool) { c.mutex.Lock() defer c.mutex.Unlock() retry: - _, client, ok := c.wsCache.GetOldest() + _, client, ok := cache.GetOldest() if !client.Health() { goto retry } diff --git a/client/eth/connection_pool_test.go b/client/eth/connection_pool_test.go new file mode 100644 index 00000000..cd90c57a --- /dev/null +++ b/client/eth/connection_pool_test.go @@ -0,0 +1,112 @@ +package eth_test + +import ( + "bytes" + "io" + "os" + "testing" + + "github.com/berachain/offchain-sdk/client/eth" + "github.com/berachain/offchain-sdk/log" + "github.com/stretchr/testify/require" +) + +var ( + HTTPURL = os.Getenv("ETH_HTTP_URL") + WSURL = os.Getenv("ETH_WS_URL") +) + +// InitConnectionPool initializes a new connection pool. +func InitConnectionPool( + cfg eth.ConnectionPoolConfig, writer io.Writer, +) (eth.ConnectionPool, error) { + logger := log.NewLogger(writer, "test-runner") + return eth.NewConnectionPoolImpl(cfg, logger) +} + +// TestNewConnectionPoolImpl_MissingURLs tests the case when the URLs are missing. +func TestNewConnectionPoolImpl_MissingURLs(t *testing.T) { + cfg := eth.ConnectionPoolConfig{} + var logBuffer bytes.Buffer + + _, err := InitConnectionPool(cfg, &logBuffer) + require.ErrorContains(t, err, "ConnectionPool: missing URL for HTTP clients") +} + +// TestNewConnectionPoolImpl_MissingWSURLs tests the case when the WS URLs are missing. +func TestNewConnectionPoolImpl_MissingWSURLs(t *testing.T) { + cfg := eth.ConnectionPoolConfig{ + EthHTTPURLs: []string{HTTPURL}, + } + var logBuffer bytes.Buffer + pool, err := InitConnectionPool(cfg, &logBuffer) + + require.NoError(t, err) + require.NotNil(t, pool) + require.Contains(t, logBuffer.String(), "ConnectionPool: missing URL for WS clients") +} + +// TestNewConnectionPoolImpl tests the case when the URLs are provided. +// It should the expected behavior. +func TestNewConnectionPoolImpl(t *testing.T) { + cfg := eth.ConnectionPoolConfig{ + EthHTTPURLs: []string{HTTPURL}, + EthWSURLs: []string{WSURL}, + } + var logBuffer bytes.Buffer + pool, err := InitConnectionPool(cfg, &logBuffer) + + require.NoError(t, err) + require.NotNil(t, pool) + require.Empty(t, logBuffer.String()) +} + +// TestGetHTTP tests the retrieval of the HTTP client when it +// has been set and the connection has been established. +func TestGetHTTP(t *testing.T) { + cfg := eth.ConnectionPoolConfig{ + EthHTTPURLs: []string{HTTPURL}, + } + var logBuffer bytes.Buffer + pool, _ := InitConnectionPool(cfg, &logBuffer) + err := pool.Dial("") + require.NoError(t, err) + + client, ok := pool.GetHTTP() + require.True(t, ok) + require.NotNil(t, client) +} + +// TestGetWS tests the retrieval of the HTTP client when it +// has been set and the connection has been established. +func TestGetWS(t *testing.T) { + cfg := eth.ConnectionPoolConfig{ + EthHTTPURLs: []string{HTTPURL}, + EthWSURLs: []string{WSURL}, + } + var logBuffer bytes.Buffer + pool, _ := InitConnectionPool(cfg, &logBuffer) + err := pool.Dial("") + + require.NoError(t, err) + + client, ok := pool.GetWS() + require.True(t, ok) + require.NotNil(t, client) +} + +// TestGetWS_WhenItIsNotSet tests the retrieval of the WS client when +// no WS URLs have been provided. +func TestGetWS_WhenItIsNotSet(t *testing.T) { + cfg := eth.ConnectionPoolConfig{ + EthHTTPURLs: []string{HTTPURL}, + } + var logBuffer bytes.Buffer + pool, _ := InitConnectionPool(cfg, &logBuffer) + err := pool.Dial("") + require.NoError(t, err) + + client, ok := pool.GetWS() + require.False(t, ok) + require.Nil(t, client) +} From 502165c3dd4a25446456afb057434b8a9f3082e0 Mon Sep 17 00:00:00 2001 From: Bearetta Date: Wed, 28 Aug 2024 11:29:43 -0400 Subject: [PATCH 3/4] refactor: improved eth.ConnectionPoolImpl * rename function getClient in getClientFrom * move mutex usage to user-facing functions --- client/eth/connection_pool.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/client/eth/connection_pool.go b/client/eth/connection_pool.go index a343ec78..dea15f87 100644 --- a/client/eth/connection_pool.go +++ b/client/eth/connection_pool.go @@ -42,7 +42,7 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect err error ) - // The LRU cache needs at least one URL provided for both HTTP and WS. + // The LRU cache needs at least one URL provided for HTTP. if len(cfg.EthHTTPURLs) == 0 { return nil, fmt.Errorf("ConnectionPool: missing URL for HTTP clients") } @@ -54,7 +54,6 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect // requests have a chance to complete before we close. time.Sleep(cfg.DefaultTimeout) }) - if err != nil { return nil, err } @@ -69,7 +68,6 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect // requests have a chance to complete before we close. time.Sleep(cfg.DefaultTimeout) }) - if err != nil { return nil, err } @@ -122,25 +120,30 @@ func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { } func (c *ConnectionPoolImpl) GetHTTP() (Client, bool) { + c.mutex.Lock() + defer c.mutex.Unlock() + if c.cache == nil { return nil, false } - return c.getClient(c.cache) + return c.getClientFrom(c.cache) } func (c *ConnectionPoolImpl) GetWS() (Client, bool) { + c.mutex.Lock() + defer c.mutex.Unlock() + // Because the WS URL is optional, we need to check if it's nil. if c.wsCache == nil { return nil, false } - return c.getClient(c.wsCache) + return c.getClientFrom(c.wsCache) } -func (c *ConnectionPoolImpl) getClient( +// NOTE: this function assumes the lock is held and cache is non-nil +func (c *ConnectionPoolImpl) getClientFrom( cache *lru.Cache[string, *HealthCheckedClient], ) (Client, bool) { - c.mutex.Lock() - defer c.mutex.Unlock() retry: _, client, ok := cache.GetOldest() if !client.Health() { From ed0f235b012f11e6cb779246d3cdcc0063602252 Mon Sep 17 00:00:00 2001 From: Bearetta Date: Wed, 28 Aug 2024 14:33:38 -0400 Subject: [PATCH 4/4] fix: skip tests for ConnectionPoolImpl if the env vars are not set. * Add a checkEnv function used to skip the test if the env vars are not set. * Remove unneeded nil checks for HTTP clients cache * Check if the WS Url cache is null before adding WS clients to the cache. --- client/eth/connection_pool.go | 15 ++++++++---- client/eth/connection_pool_test.go | 38 +++++++++++++++++++++++------- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/client/eth/connection_pool.go b/client/eth/connection_pool.go index dea15f87..268c4005 100644 --- a/client/eth/connection_pool.go +++ b/client/eth/connection_pool.go @@ -102,6 +102,8 @@ func (c *ConnectionPoolImpl) Dial(string) error { } func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { + // NOTE: Check the cache for the HTTP URL is not needed because it + // is guaranteed to be non-nil when a ConnectionPoolImpl is created. for _, url := range c.config.EthHTTPURLs { client := NewHealthCheckedClient(c.config.HealthCheckInterval, c.logger) if err := client.DialContextWithTimeout(ctx, url, c.config.DefaultTimeout); err != nil { @@ -109,6 +111,12 @@ func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { } c.cache.Add(url, client) } + + // Check is needed because the WS URL is optional. + if c.wsCache == nil { + return nil + } + for _, url := range c.config.EthWSURLs { client := NewHealthCheckedClient(c.config.HealthCheckInterval, c.logger) if err := client.DialContextWithTimeout(ctx, url, c.config.DefaultTimeout); err != nil { @@ -119,13 +127,12 @@ func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { return nil } +// NOTE: this function assumes the cache is non-nil +// because it is guaranteed to be non-nil when a ConnectionPoolImpl is created. func (c *ConnectionPoolImpl) GetHTTP() (Client, bool) { c.mutex.Lock() defer c.mutex.Unlock() - if c.cache == nil { - return nil, false - } return c.getClientFrom(c.cache) } @@ -140,7 +147,7 @@ func (c *ConnectionPoolImpl) GetWS() (Client, bool) { return c.getClientFrom(c.wsCache) } -// NOTE: this function assumes the lock is held and cache is non-nil +// NOTE: this function assumes the lock is held and cache is non-nil. func (c *ConnectionPoolImpl) getClientFrom( cache *lru.Cache[string, *HealthCheckedClient], ) (Client, bool) { diff --git a/client/eth/connection_pool_test.go b/client/eth/connection_pool_test.go index cd90c57a..1921bea3 100644 --- a/client/eth/connection_pool_test.go +++ b/client/eth/connection_pool_test.go @@ -16,20 +16,42 @@ var ( WSURL = os.Getenv("ETH_WS_URL") ) -// InitConnectionPool initializes a new connection pool. -func InitConnectionPool( +/******************************* HELPER FUNCTIONS ***************************************/ + +// NOTE: requires chain rpc url at env var `ETH_HTTP_URL` and `ETH_WS_URL`. +func checkEnv(t *testing.T) { + ethHTTPRPC := os.Getenv("ETH_HTTP_URL") + ethWSRPC := os.Getenv("ETH_WS_URL") + if ethHTTPRPC == "" || ethWSRPC == "" { + t.Skipf("Skipping test: no eth rpc url provided") + } +} + +// initConnectionPool initializes a new connection pool. +func initConnectionPool( cfg eth.ConnectionPoolConfig, writer io.Writer, ) (eth.ConnectionPool, error) { logger := log.NewLogger(writer, "test-runner") return eth.NewConnectionPoolImpl(cfg, logger) } +// Use Init function as a setup function for the tests. +// It means each test will have to call Init function to set up the test. +func Init( + cfg eth.ConnectionPoolConfig, writer io.Writer, t *testing.T, +) (eth.ConnectionPool, error) { + checkEnv(t) + return initConnectionPool(cfg, writer) +} + +/******************************* TEST CASES ***************************************/ + // TestNewConnectionPoolImpl_MissingURLs tests the case when the URLs are missing. func TestNewConnectionPoolImpl_MissingURLs(t *testing.T) { cfg := eth.ConnectionPoolConfig{} var logBuffer bytes.Buffer - _, err := InitConnectionPool(cfg, &logBuffer) + _, err := Init(cfg, &logBuffer, t) require.ErrorContains(t, err, "ConnectionPool: missing URL for HTTP clients") } @@ -39,7 +61,7 @@ func TestNewConnectionPoolImpl_MissingWSURLs(t *testing.T) { EthHTTPURLs: []string{HTTPURL}, } var logBuffer bytes.Buffer - pool, err := InitConnectionPool(cfg, &logBuffer) + pool, err := Init(cfg, &logBuffer, t) require.NoError(t, err) require.NotNil(t, pool) @@ -54,7 +76,7 @@ func TestNewConnectionPoolImpl(t *testing.T) { EthWSURLs: []string{WSURL}, } var logBuffer bytes.Buffer - pool, err := InitConnectionPool(cfg, &logBuffer) + pool, err := Init(cfg, &logBuffer, t) require.NoError(t, err) require.NotNil(t, pool) @@ -68,7 +90,7 @@ func TestGetHTTP(t *testing.T) { EthHTTPURLs: []string{HTTPURL}, } var logBuffer bytes.Buffer - pool, _ := InitConnectionPool(cfg, &logBuffer) + pool, _ := Init(cfg, &logBuffer, t) err := pool.Dial("") require.NoError(t, err) @@ -85,7 +107,7 @@ func TestGetWS(t *testing.T) { EthWSURLs: []string{WSURL}, } var logBuffer bytes.Buffer - pool, _ := InitConnectionPool(cfg, &logBuffer) + pool, _ := Init(cfg, &logBuffer, t) err := pool.Dial("") require.NoError(t, err) @@ -102,7 +124,7 @@ func TestGetWS_WhenItIsNotSet(t *testing.T) { EthHTTPURLs: []string{HTTPURL}, } var logBuffer bytes.Buffer - pool, _ := InitConnectionPool(cfg, &logBuffer) + pool, _ := Init(cfg, &logBuffer, t) err := pool.Dial("") require.NoError(t, err)