From c00c6ebadf3a9f2136064441f69e135f804b700c Mon Sep 17 00:00:00 2001 From: rhoninlee Date: Tue, 10 Dec 2024 11:15:19 +0800 Subject: [PATCH 1/7] add reconnect for lwm2m gateway --- pkg/gateway/lwm2m/client/lwm2m.go | 103 ++++++++++++++++++++++++++++-- pkg/gateway/lwm2m/gatewaylwm2m.go | 7 -- 2 files changed, 98 insertions(+), 12 deletions(-) diff --git a/pkg/gateway/lwm2m/client/lwm2m.go b/pkg/gateway/lwm2m/client/lwm2m.go index 24df738d2..c538dd5fc 100644 --- a/pkg/gateway/lwm2m/client/lwm2m.go +++ b/pkg/gateway/lwm2m/client/lwm2m.go @@ -30,6 +30,9 @@ const ( registerPath = "/rd" observeTaskSuffix = "-ob" + + reconnectInterval = 5 * time.Second + maxReconnectBackoff = 60 * time.Second ) type Client struct { @@ -44,6 +47,10 @@ type Client struct { udpConnection *udpClient.Conn taskManager *TaskManager + + reconnectCh chan struct{} + stopCh chan struct{} + connected bool } type Config struct { @@ -55,23 +62,102 @@ type Config struct { func NewClient(ctx context.Context, config Config) (*Client, error) { var client = &Client{ - ctx: context.TODO(), + ctx: ctx, Config: config, object: *NewObject(rootObjectId, nil), taskManager: NewTaskManager(ctx), dataCache: make(map[string]interface{}), + reconnectCh: make(chan struct{}), + stopCh: make(chan struct{}), } return client, nil } func (c *Client) Start() error { + // Start connection monitor + go c.connectionMonitor() + + // Initial connection + if err := c.connect(); err != nil { + return err + } + + return c.Register() +} + +func (c *Client) connectionMonitor() { + backoff := reconnectInterval + + for { + select { + case <-c.stopCh: + return + + case <-c.reconnectCh: + logger.Info("Connection lost, attempting to reconnect...") + + for { + // Try to reconnect + err := c.reconnect() + if err == nil { + logger.Info("Successfully reconnected") + backoff = reconnectInterval // Reset backoff on successful connection + break + } + + logger.Errorf("Failed to reconnect: %v", err) + + // Exponential backoff with max limit + backoff = time.Duration(float64(backoff) * 1.5) + if backoff > maxReconnectBackoff { + backoff = maxReconnectBackoff + } + + select { + case <-c.stopCh: + return + case <-time.After(backoff): + continue + } + } + } + } +} + +func (c *Client) reconnect() error { + // Close existing connection if any + if c.udpConnection != nil { + c.udpConnection.Close() + c.udpConnection = nil + } + + // Establish new connection + if err := c.connect(); err != nil { + return err + } + + // Re-register with the server + if err := c.Update(); err != nil { + return err + } + + c.connected = true + return nil +} + +func (c *Client) connect() error { udpClientOpts := []udp.Option{} udpClientOpts = append( udpClientOpts, options.WithInactivityMonitor(time.Minute, func(cc *udpClient.Conn) { - _ = cc.Close() + logger.Warn("Connection inactive, triggering reconnect") + c.connected = false + select { + case c.reconnectCh <- struct{}{}: + default: + } }), options.WithMux(c.handleRouter()), ) @@ -106,6 +192,7 @@ func (c *Client) Start() error { } c.udpConnection = conn + c.connected = true return nil } @@ -133,12 +220,10 @@ func (c *Client) Register() error { } // set query params for register request - // example: /rd?ep=shifu-gateway<=300&lwm2m=1.0&b=U request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsEndpointName, c.EndpointName)) request.AddQuery(fmt.Sprintf("%s=%d", QueryParamslifeTime, c.Settings.LifeTimeSec)) request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsLwM2MVersion, lwM2MVersion)) request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsBindingMode, defaultBindingMode)) - // only accept text/plain request.SetAccept(message.TextPlain) resp, err := c.udpConnection.Do(request) if err != nil { @@ -452,5 +537,13 @@ func (c *Client) CleanUp() { } func (c *Client) isActivity() bool { - return time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec) * time.Second)) + return c.udpConnection != nil && time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec)*time.Second)) +} + +func (c *Client) Stop() error { + close(c.stopCh) + if c.udpConnection != nil { + return c.udpConnection.Close() + } + return nil } diff --git a/pkg/gateway/lwm2m/gatewaylwm2m.go b/pkg/gateway/lwm2m/gatewaylwm2m.go index 8a7f44a9e..d52b633d6 100644 --- a/pkg/gateway/lwm2m/gatewaylwm2m.go +++ b/pkg/gateway/lwm2m/gatewaylwm2m.go @@ -155,13 +155,6 @@ func (g *Gateway) Start() error { return err } - // Register the client to the server - err := g.client.Register() - if err != nil { - logger.Errorf("Error registering client: %v", err) - return err - } - // Ping the client every pingIntervalSec seconds, by default 30 seconds t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec)) for range t.C { From 6280bc505690e9b6b539f4a67bf94033cd18a540 Mon Sep 17 00:00:00 2001 From: rhoninlee Date: Tue, 10 Dec 2024 11:22:14 +0800 Subject: [PATCH 2/7] remove unused var --- pkg/gateway/lwm2m/client/lwm2m.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/gateway/lwm2m/client/lwm2m.go b/pkg/gateway/lwm2m/client/lwm2m.go index c538dd5fc..5022f68c7 100644 --- a/pkg/gateway/lwm2m/client/lwm2m.go +++ b/pkg/gateway/lwm2m/client/lwm2m.go @@ -50,7 +50,6 @@ type Client struct { reconnectCh chan struct{} stopCh chan struct{} - connected bool } type Config struct { @@ -142,7 +141,6 @@ func (c *Client) reconnect() error { return err } - c.connected = true return nil } @@ -153,7 +151,6 @@ func (c *Client) connect() error { udpClientOpts, options.WithInactivityMonitor(time.Minute, func(cc *udpClient.Conn) { logger.Warn("Connection inactive, triggering reconnect") - c.connected = false select { case c.reconnectCh <- struct{}{}: default: @@ -192,7 +189,6 @@ func (c *Client) connect() error { } c.udpConnection = conn - c.connected = true return nil } From 34e57b6eef2ddf9c276cc8d9cc4c8b95bfef79ce Mon Sep 17 00:00:00 2001 From: rhoninlee Date: Tue, 10 Dec 2024 11:26:10 +0800 Subject: [PATCH 3/7] update comment --- pkg/gateway/lwm2m/client/lwm2m.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/lwm2m/client/lwm2m.go b/pkg/gateway/lwm2m/client/lwm2m.go index 5022f68c7..fbb66f1e9 100644 --- a/pkg/gateway/lwm2m/client/lwm2m.go +++ b/pkg/gateway/lwm2m/client/lwm2m.go @@ -136,7 +136,7 @@ func (c *Client) reconnect() error { return err } - // Re-register with the server + // Update with the server if err := c.Update(); err != nil { return err } From 4aa01a39c9bbde96ab4524426a6bc0da74589e57 Mon Sep 17 00:00:00 2001 From: rhoninlee Date: Wed, 11 Dec 2024 17:28:45 +0800 Subject: [PATCH 4/7] fix notify will crash when reconnect --- pkg/gateway/lwm2m/client/lwm2m.go | 41 ++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/pkg/gateway/lwm2m/client/lwm2m.go b/pkg/gateway/lwm2m/client/lwm2m.go index fbb66f1e9..2c430f853 100644 --- a/pkg/gateway/lwm2m/client/lwm2m.go +++ b/pkg/gateway/lwm2m/client/lwm2m.go @@ -33,6 +33,7 @@ const ( reconnectInterval = 5 * time.Second maxReconnectBackoff = 60 * time.Second + reconnectBackoffExp = 1.5 ) type Client struct { @@ -74,14 +75,14 @@ func NewClient(ctx context.Context, config Config) (*Client, error) { } func (c *Client) Start() error { - // Start connection monitor - go c.connectionMonitor() - // Initial connection if err := c.connect(); err != nil { return err } + // Start connection monitor + go c.connectionMonitor() + return c.Register() } @@ -108,7 +109,7 @@ func (c *Client) connectionMonitor() { logger.Errorf("Failed to reconnect: %v", err) // Exponential backoff with max limit - backoff = time.Duration(float64(backoff) * 1.5) + backoff = time.Duration(float64(backoff) * reconnectBackoffExp) if backoff > maxReconnectBackoff { backoff = maxReconnectBackoff } @@ -447,13 +448,20 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str c.taskManager.AddTask(objectId, time.Second*time.Duration(c.Settings.ObserveIntervalSec), func() { data, err := c.object.ReadAll(objectId) if err != nil { + logger.Errorf("failed to read data from object %s, error: %v", objectId, err) return } jsonData := data.ReadAsJSON() + // check if udp connection is nil + if c.udpConnection == nil { + logger.Errorf("udp connection is nil, ignore observe") + return + } + c.dataCache[objectId] = jsonData - err = sendResponse(w.Conn(), token, obs, jsonData) + err = sendResponse(c.udpConnection, token, obs, jsonData) if err != nil { logger.Errorf("failed to send response: %v", err) return @@ -473,6 +481,12 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str jsonData := data.ReadAsJSON() + // check if udp connection is nil + if c.udpConnection == nil { + logger.Errorf("udp connection is nil, ignore observe") + return + } + // check data is changed if data, exists := c.dataCache[objectId]; exists { if string(jsonData) == data { @@ -482,10 +496,12 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str } c.dataCache[objectId] = jsonData - err = sendResponse(w.Conn(), token, obs, jsonData) + err = sendResponse(c.udpConnection, token, obs, jsonData) if err != nil { + logger.Errorf("failed to send response: %v", err) return } + obs++ c.taskManager.ResetTask(objectId) }) @@ -530,16 +546,13 @@ func sendResponse(cc mux.Conn, token []byte, obs uint32, body string) error { func (c *Client) CleanUp() { c.taskManager.CancelAllTasks() _ = c.Delete() -} - -func (c *Client) isActivity() bool { - return c.udpConnection != nil && time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec)*time.Second)) -} -func (c *Client) Stop() error { close(c.stopCh) if c.udpConnection != nil { - return c.udpConnection.Close() + _ = c.udpConnection.Close() } - return nil +} + +func (c *Client) isActivity() bool { + return c.udpConnection != nil && time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec)*time.Second)) } From 6f0711328d8ffd9a2fa910d0fa029e0ba7eeec38 Mon Sep 17 00:00:00 2001 From: rhoninlee Date: Wed, 11 Dec 2024 17:33:39 +0800 Subject: [PATCH 5/7] disable ping interval by default --- pkg/gateway/lwm2m/gatewaylwm2m.go | 17 ++++++++++------- pkg/k8s/api/v1alpha1/edgedevice_types.go | 2 +- .../bases/shifu.edgenesis.io_edgedevices.yaml | 4 ++-- pkg/k8s/crd/install/config_crd.yaml | 4 ++-- pkg/k8s/crd/install/config_default.yaml | 4 ++-- pkg/k8s/crd/install/shifu_install.yml | 4 ++-- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/gateway/lwm2m/gatewaylwm2m.go b/pkg/gateway/lwm2m/gatewaylwm2m.go index d52b633d6..b28dbc2bf 100644 --- a/pkg/gateway/lwm2m/gatewaylwm2m.go +++ b/pkg/gateway/lwm2m/gatewaylwm2m.go @@ -155,15 +155,18 @@ func (g *Gateway) Start() error { return err } - // Ping the client every pingIntervalSec seconds, by default 30 seconds - t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec)) - for range t.C { - if err := g.client.Ping(); err != nil { - logger.Errorf("Error pinging client: %v", err) - g.ShutDown() - return err + if g.pingIntervalSec <= 0 { + // Ping the client every pingIntervalSec seconds,by default disable + t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec)) + for range t.C { + if err := g.client.Ping(); err != nil { + logger.Errorf("Error pinging client: %v", err) + g.ShutDown() + return err + } } } + return nil } diff --git a/pkg/k8s/api/v1alpha1/edgedevice_types.go b/pkg/k8s/api/v1alpha1/edgedevice_types.go index 54bcd9263..6de96fe7f 100644 --- a/pkg/k8s/api/v1alpha1/edgedevice_types.go +++ b/pkg/k8s/api/v1alpha1/edgedevice_types.go @@ -113,7 +113,7 @@ type LwM2MSetting struct { PSKIdentity *string `json:"pskIdentity,omitempty"` PSKKey *string `json:"pskKey,omitempty"` - // +kubebuilder:default=30 + // +kubebuilder:default=-1 PingIntervalSec int64 `json:"pingIntervalSec,omitempty"` // reference https://datatracker.ietf.org/doc/html/rfc7252#section-4.8.2 // +kubebuilder:default=247 diff --git a/pkg/k8s/crd/config/edgedevice/bases/shifu.edgenesis.io_edgedevices.yaml b/pkg/k8s/crd/config/edgedevice/bases/shifu.edgenesis.io_edgedevices.yaml index 0e6be5bc9..152f80cd9 100644 --- a/pkg/k8s/crd/config/edgedevice/bases/shifu.edgenesis.io_edgedevices.yaml +++ b/pkg/k8s/crd/config/edgedevice/bases/shifu.edgenesis.io_edgedevices.yaml @@ -73,7 +73,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: @@ -122,7 +122,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: diff --git a/pkg/k8s/crd/install/config_crd.yaml b/pkg/k8s/crd/install/config_crd.yaml index 6e28f2fc0..8c492b341 100644 --- a/pkg/k8s/crd/install/config_crd.yaml +++ b/pkg/k8s/crd/install/config_crd.yaml @@ -72,7 +72,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: @@ -121,7 +121,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: diff --git a/pkg/k8s/crd/install/config_default.yaml b/pkg/k8s/crd/install/config_default.yaml index 53ccf4872..f06ae8ea9 100644 --- a/pkg/k8s/crd/install/config_default.yaml +++ b/pkg/k8s/crd/install/config_default.yaml @@ -79,7 +79,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: @@ -128,7 +128,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: diff --git a/pkg/k8s/crd/install/shifu_install.yml b/pkg/k8s/crd/install/shifu_install.yml index 6919691f4..4e15e1703 100644 --- a/pkg/k8s/crd/install/shifu_install.yml +++ b/pkg/k8s/crd/install/shifu_install.yml @@ -79,7 +79,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: @@ -128,7 +128,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: From 1becb8f6b87f9c37e8d23d403f6810ba6fe32d3e Mon Sep 17 00:00:00 2001 From: rhoninlee Date: Wed, 11 Dec 2024 17:45:10 +0800 Subject: [PATCH 6/7] revert comment --- pkg/gateway/lwm2m/client/lwm2m.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/gateway/lwm2m/client/lwm2m.go b/pkg/gateway/lwm2m/client/lwm2m.go index 2c430f853..2ae79db78 100644 --- a/pkg/gateway/lwm2m/client/lwm2m.go +++ b/pkg/gateway/lwm2m/client/lwm2m.go @@ -217,10 +217,12 @@ func (c *Client) Register() error { } // set query params for register request + // example: /rd?ep=shifu-gateway<=300&lwm2m=1.0&b=U request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsEndpointName, c.EndpointName)) request.AddQuery(fmt.Sprintf("%s=%d", QueryParamslifeTime, c.Settings.LifeTimeSec)) request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsLwM2MVersion, lwM2MVersion)) request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsBindingMode, defaultBindingMode)) + // only accept text/plain request.SetAccept(message.TextPlain) resp, err := c.udpConnection.Do(request) if err != nil { From ff5183a984e05d6607387a1e30b4813b1b2cf6de Mon Sep 17 00:00:00 2001 From: rhoninlee Date: Wed, 11 Dec 2024 17:48:06 +0800 Subject: [PATCH 7/7] fix bad ping check --- pkg/gateway/lwm2m/gatewaylwm2m.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/lwm2m/gatewaylwm2m.go b/pkg/gateway/lwm2m/gatewaylwm2m.go index b28dbc2bf..e04925827 100644 --- a/pkg/gateway/lwm2m/gatewaylwm2m.go +++ b/pkg/gateway/lwm2m/gatewaylwm2m.go @@ -155,7 +155,7 @@ func (g *Gateway) Start() error { return err } - if g.pingIntervalSec <= 0 { + if g.pingIntervalSec > 0 { // Ping the client every pingIntervalSec seconds,by default disable t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec)) for range t.C {