Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reconnect for lwm2m gateway #1075

Merged
merged 7 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 109 additions & 5 deletions pkg/gateway/lwm2m/client/lwm2m.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
registerPath = "/rd"

observeTaskSuffix = "-ob"

reconnectInterval = 5 * time.Second
maxReconnectBackoff = 60 * time.Second
rhoninl marked this conversation as resolved.
Show resolved Hide resolved
reconnectBackoffExp = 1.5
)

type Client struct {
Expand All @@ -44,6 +48,9 @@

udpConnection *udpClient.Conn
taskManager *TaskManager

reconnectCh chan struct{}
stopCh chan struct{}
}

type Config struct {
Expand All @@ -55,23 +62,100 @@

func NewClient(ctx context.Context, config Config) (*Client, error) {
var client = &Client{
ctx: context.TODO(),
ctx: ctx,

Check warning on line 65 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L65

Added line #L65 was not covered by tests
Config: config,
object: *NewObject(rootObjectId, nil),
taskManager: NewTaskManager(ctx),
dataCache: make(map[string]interface{}),
reconnectCh: make(chan struct{}),
stopCh: make(chan struct{}),

Check warning on line 71 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L70-L71

Added lines #L70 - L71 were not covered by tests
}

return client, nil
}

func (c *Client) Start() error {
// Initial connection
if err := c.connect(); err != nil {
return err
}

Check warning on line 81 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L78-L81

Added lines #L78 - L81 were not covered by tests

// Start connection monitor
go c.connectionMonitor()

return c.Register()

Check warning on line 86 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L84-L86

Added lines #L84 - L86 were not covered by tests
}

func (c *Client) connectionMonitor() {
rhoninl marked this conversation as resolved.
Show resolved Hide resolved
backoff := reconnectInterval

for {
select {
case <-c.stopCh:
return

Check warning on line 95 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L89-L95

Added lines #L89 - L95 were not covered by tests

case <-c.reconnectCh:
logger.Info("Connection lost, attempting to reconnect...")

for {
// Try to reconnect
err := c.reconnect()
if err == nil {
logger.Info("Successfully reconnected")
backoff = reconnectInterval // Reset backoff on successful connection
break

Check warning on line 106 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L97-L106

Added lines #L97 - L106 were not covered by tests
}

logger.Errorf("Failed to reconnect: %v", err)

// Exponential backoff with max limit
backoff = time.Duration(float64(backoff) * reconnectBackoffExp)
if backoff > maxReconnectBackoff {
backoff = maxReconnectBackoff
}

Check warning on line 115 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L109-L115

Added lines #L109 - L115 were not covered by tests

select {
case <-c.stopCh:
return
case <-time.After(backoff):
continue

Check warning on line 121 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L117-L121

Added lines #L117 - L121 were not covered by tests
}
}
}
}
}

func (c *Client) reconnect() error {
// Close existing connection if any
if c.udpConnection != nil {
c.udpConnection.Close()
c.udpConnection = nil
}

Check warning on line 133 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L128-L133

Added lines #L128 - L133 were not covered by tests

// Establish new connection
if err := c.connect(); err != nil {
return err
}

Check warning on line 138 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L136-L138

Added lines #L136 - L138 were not covered by tests

// Update with the server
if err := c.Update(); err != nil {
return err
}

Check warning on line 143 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L141-L143

Added lines #L141 - L143 were not covered by tests

return nil

Check warning on line 145 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L145

Added line #L145 was not covered by tests
}

func (c *Client) connect() error {

Check warning on line 148 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L148

Added line #L148 was not covered by tests
udpClientOpts := []udp.Option{}

udpClientOpts = append(
udpClientOpts,
options.WithInactivityMonitor(time.Minute, func(cc *udpClient.Conn) {
_ = cc.Close()
logger.Warn("Connection inactive, triggering reconnect")
select {
case c.reconnectCh <- struct{}{}:
default:

Check warning on line 157 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L154-L157

Added lines #L154 - L157 were not covered by tests
}
}),
options.WithMux(c.handleRouter()),
)
Expand Down Expand Up @@ -366,13 +450,20 @@
c.taskManager.AddTask(objectId, time.Second*time.Duration(c.Settings.ObserveIntervalSec), func() {
data, err := c.object.ReadAll(objectId)
if err != nil {
logger.Errorf("failed to read data from object %s, error: %v", objectId, err)

Check warning on line 453 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L453

Added line #L453 was not covered by tests
return
}

jsonData := data.ReadAsJSON()

// check if udp connection is nil
if c.udpConnection == nil {
logger.Errorf("udp connection is nil, ignore observe")
return
}

Check warning on line 463 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L459-L463

Added lines #L459 - L463 were not covered by tests

c.dataCache[objectId] = jsonData
err = sendResponse(w.Conn(), token, obs, jsonData)
err = sendResponse(c.udpConnection, token, obs, jsonData)

Check warning on line 466 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L466

Added line #L466 was not covered by tests
if err != nil {
logger.Errorf("failed to send response: %v", err)
return
Expand All @@ -392,6 +483,12 @@

jsonData := data.ReadAsJSON()

// check if udp connection is nil
if c.udpConnection == nil {
logger.Errorf("udp connection is nil, ignore observe")
return
}

Check warning on line 490 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L486-L490

Added lines #L486 - L490 were not covered by tests

// check data is changed
if data, exists := c.dataCache[objectId]; exists {
if string(jsonData) == data {
Expand All @@ -401,10 +498,12 @@
}

c.dataCache[objectId] = jsonData
err = sendResponse(w.Conn(), token, obs, jsonData)
err = sendResponse(c.udpConnection, token, obs, jsonData)

Check warning on line 501 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L501

Added line #L501 was not covered by tests
if err != nil {
logger.Errorf("failed to send response: %v", err)

Check warning on line 503 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L503

Added line #L503 was not covered by tests
return
}

obs++
c.taskManager.ResetTask(objectId)
})
Expand Down Expand Up @@ -449,8 +548,13 @@
func (c *Client) CleanUp() {
c.taskManager.CancelAllTasks()
_ = c.Delete()

close(c.stopCh)
if c.udpConnection != nil {
_ = c.udpConnection.Close()
}

Check warning on line 555 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L551-L555

Added lines #L551 - L555 were not covered by tests
}

func (c *Client) isActivity() bool {
return time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec) * time.Second))
return c.udpConnection != nil && time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec)*time.Second))

Check warning on line 559 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L559

Added line #L559 was not covered by tests
}
24 changes: 10 additions & 14 deletions pkg/gateway/lwm2m/gatewaylwm2m.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,22 +155,18 @@
return err
}

// Register the client to the server
err := g.client.Register()
if err != nil {
logger.Errorf("Error registering client: %v", err)
return err
}

// Ping the client every pingIntervalSec seconds, by default 30 seconds
t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec))
for range t.C {
if err := g.client.Ping(); err != nil {
logger.Errorf("Error pinging client: %v", err)
g.ShutDown()
return err
if g.pingIntervalSec > 0 {
// Ping the client every pingIntervalSec seconds,by default disable
t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec))
for range t.C {
if err := g.client.Ping(); err != nil {
logger.Errorf("Error pinging client: %v", err)
g.ShutDown()
return err
}

Check warning on line 166 in pkg/gateway/lwm2m/gatewaylwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/gatewaylwm2m.go#L158-L166

Added lines #L158 - L166 were not covered by tests
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/k8s/api/v1alpha1/edgedevice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type LwM2MSetting struct {
PSKIdentity *string `json:"pskIdentity,omitempty"`
PSKKey *string `json:"pskKey,omitempty"`

// +kubebuilder:default=30
// +kubebuilder:default=-1
PingIntervalSec int64 `json:"pingIntervalSec,omitempty"`
// reference https://datatracker.ietf.org/doc/html/rfc7252#section-4.8.2
// +kubebuilder:default=247
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down Expand Up @@ -122,7 +122,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down
4 changes: 2 additions & 2 deletions pkg/k8s/crd/install/config_crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down Expand Up @@ -121,7 +121,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down
4 changes: 2 additions & 2 deletions pkg/k8s/crd/install/config_default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down Expand Up @@ -128,7 +128,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down
4 changes: 2 additions & 2 deletions pkg/k8s/crd/install/shifu_install.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down Expand Up @@ -128,7 +128,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down
Loading