diff --git a/pkg/gateway/lwm2m/client/lwm2m.go b/pkg/gateway/lwm2m/client/lwm2m.go index 24df738d2..2ae79db78 100644 --- a/pkg/gateway/lwm2m/client/lwm2m.go +++ b/pkg/gateway/lwm2m/client/lwm2m.go @@ -30,6 +30,10 @@ const ( registerPath = "/rd" observeTaskSuffix = "-ob" + + reconnectInterval = 5 * time.Second + maxReconnectBackoff = 60 * time.Second + reconnectBackoffExp = 1.5 ) type Client struct { @@ -44,6 +48,9 @@ type Client struct { udpConnection *udpClient.Conn taskManager *TaskManager + + reconnectCh chan struct{} + stopCh chan struct{} } type Config struct { @@ -55,23 +62,100 @@ type Config struct { func NewClient(ctx context.Context, config Config) (*Client, error) { var client = &Client{ - ctx: context.TODO(), + ctx: ctx, Config: config, object: *NewObject(rootObjectId, nil), taskManager: NewTaskManager(ctx), dataCache: make(map[string]interface{}), + reconnectCh: make(chan struct{}), + stopCh: make(chan struct{}), } return client, nil } func (c *Client) Start() error { + // Initial connection + if err := c.connect(); err != nil { + return err + } + + // Start connection monitor + go c.connectionMonitor() + + return c.Register() +} + +func (c *Client) connectionMonitor() { + backoff := reconnectInterval + + for { + select { + case <-c.stopCh: + return + + case <-c.reconnectCh: + logger.Info("Connection lost, attempting to reconnect...") + + for { + // Try to reconnect + err := c.reconnect() + if err == nil { + logger.Info("Successfully reconnected") + backoff = reconnectInterval // Reset backoff on successful connection + break + } + + logger.Errorf("Failed to reconnect: %v", err) + + // Exponential backoff with max limit + backoff = time.Duration(float64(backoff) * reconnectBackoffExp) + if backoff > maxReconnectBackoff { + backoff = maxReconnectBackoff + } + + select { + case <-c.stopCh: + return + case <-time.After(backoff): + continue + } + } + } + } +} + +func (c *Client) reconnect() error { + // Close existing connection if any + if c.udpConnection 
!= nil { + c.udpConnection.Close() + c.udpConnection = nil + } + + // Establish new connection + if err := c.connect(); err != nil { + return err + } + + // Update with the server + if err := c.Update(); err != nil { + return err + } + + return nil +} + +func (c *Client) connect() error { udpClientOpts := []udp.Option{} udpClientOpts = append( udpClientOpts, options.WithInactivityMonitor(time.Minute, func(cc *udpClient.Conn) { - _ = cc.Close() + logger.Warn("Connection inactive, triggering reconnect") + select { + case c.reconnectCh <- struct{}{}: + default: + } }), options.WithMux(c.handleRouter()), ) @@ -366,13 +450,20 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str c.taskManager.AddTask(objectId, time.Second*time.Duration(c.Settings.ObserveIntervalSec), func() { data, err := c.object.ReadAll(objectId) if err != nil { + logger.Errorf("failed to read data from object %s, error: %v", objectId, err) return } jsonData := data.ReadAsJSON() + // check if udp connection is nil + if c.udpConnection == nil { + logger.Errorf("udp connection is nil, ignore observe") + return + } + c.dataCache[objectId] = jsonData - err = sendResponse(w.Conn(), token, obs, jsonData) + err = sendResponse(c.udpConnection, token, obs, jsonData) if err != nil { logger.Errorf("failed to send response: %v", err) return @@ -392,6 +483,12 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str jsonData := data.ReadAsJSON() + // check if udp connection is nil + if c.udpConnection == nil { + logger.Errorf("udp connection is nil, ignore observe") + return + } + // check data is changed if data, exists := c.dataCache[objectId]; exists { if string(jsonData) == data { @@ -401,10 +498,12 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str } c.dataCache[objectId] = jsonData - err = sendResponse(w.Conn(), token, obs, jsonData) + err = sendResponse(c.udpConnection, token, obs, jsonData) if err != nil { + 
 logger.Errorf("failed to send response: %v", err) return } + obs++ c.taskManager.ResetTask(objectId) }) @@ -449,8 +548,13 @@ func sendResponse(cc mux.Conn, token []byte, obs uint32, body string) error { func (c *Client) CleanUp() { c.taskManager.CancelAllTasks() _ = c.Delete() + + close(c.stopCh) + if c.udpConnection != nil { + _ = c.udpConnection.Close() + } } func (c *Client) isActivity() bool { - return time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec) * time.Second)) + return c.udpConnection != nil && time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec)*time.Second)) } diff --git a/pkg/gateway/lwm2m/gatewaylwm2m.go b/pkg/gateway/lwm2m/gatewaylwm2m.go index 8a7f44a9e..e04925827 100644 --- a/pkg/gateway/lwm2m/gatewaylwm2m.go +++ b/pkg/gateway/lwm2m/gatewaylwm2m.go @@ -155,22 +155,18 @@ func (g *Gateway) Start() error { return err } - // Register the client to the server - err := g.client.Register() - if err != nil { - logger.Errorf("Error registering client: %v", err) - return err - } - - // Ping the client every pingIntervalSec seconds, by default 30 seconds - t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec)) - for range t.C { - if err := g.client.Ping(); err != nil { - logger.Errorf("Error pinging client: %v", err) - g.ShutDown() - return err + if g.pingIntervalSec > 0 { + // Ping the client every pingIntervalSec seconds; pinging is disabled by default + t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec)) + for range t.C { + if err := g.client.Ping(); err != nil { + logger.Errorf("Error pinging client: %v", err) + g.ShutDown() + return err + } } } + return nil } diff --git a/pkg/k8s/api/v1alpha1/edgedevice_types.go b/pkg/k8s/api/v1alpha1/edgedevice_types.go index 54bcd9263..6de96fe7f 100644 --- a/pkg/k8s/api/v1alpha1/edgedevice_types.go +++ b/pkg/k8s/api/v1alpha1/edgedevice_types.go @@ -113,7 +113,7 @@ type LwM2MSetting struct { PSKIdentity *string `json:"pskIdentity,omitempty"` PSKKey
*string `json:"pskKey,omitempty"` - // +kubebuilder:default=30 + // +kubebuilder:default=-1 PingIntervalSec int64 `json:"pingIntervalSec,omitempty"` // reference https://datatracker.ietf.org/doc/html/rfc7252#section-4.8.2 // +kubebuilder:default=247 diff --git a/pkg/k8s/crd/config/edgedevice/bases/shifu.edgenesis.io_edgedevices.yaml b/pkg/k8s/crd/config/edgedevice/bases/shifu.edgenesis.io_edgedevices.yaml index 0e6be5bc9..152f80cd9 100644 --- a/pkg/k8s/crd/config/edgedevice/bases/shifu.edgenesis.io_edgedevices.yaml +++ b/pkg/k8s/crd/config/edgedevice/bases/shifu.edgenesis.io_edgedevices.yaml @@ -73,7 +73,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: @@ -122,7 +122,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: diff --git a/pkg/k8s/crd/install/config_crd.yaml b/pkg/k8s/crd/install/config_crd.yaml index 6e28f2fc0..8c492b341 100644 --- a/pkg/k8s/crd/install/config_crd.yaml +++ b/pkg/k8s/crd/install/config_crd.yaml @@ -72,7 +72,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: @@ -121,7 +121,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: diff --git a/pkg/k8s/crd/install/config_default.yaml b/pkg/k8s/crd/install/config_default.yaml index 53ccf4872..f06ae8ea9 100644 --- a/pkg/k8s/crd/install/config_default.yaml +++ b/pkg/k8s/crd/install/config_default.yaml @@ -79,7 +79,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: @@ -128,7 +128,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: diff --git a/pkg/k8s/crd/install/shifu_install.yml b/pkg/k8s/crd/install/shifu_install.yml index 6919691f4..4e15e1703 100644 --- 
a/pkg/k8s/crd/install/shifu_install.yml +++ b/pkg/k8s/crd/install/shifu_install.yml @@ -79,7 +79,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: @@ -128,7 +128,7 @@ spec: format: int64 type: integer pingIntervalSec: - default: 30 + default: -1 format: int64 type: integer pskIdentity: