Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reconnect for lwm2m gateway #1075

Merged
merged 7 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions pkg/gateway/lwm2m/client/lwm2m.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
registerPath = "/rd"

observeTaskSuffix = "-ob"

reconnectInterval = 5 * time.Second
maxReconnectBackoff = 60 * time.Second
rhoninl marked this conversation as resolved.
Show resolved Hide resolved
)

type Client struct {
Expand All @@ -44,6 +47,9 @@

udpConnection *udpClient.Conn
taskManager *TaskManager

reconnectCh chan struct{}
stopCh chan struct{}
}

type Config struct {
Expand All @@ -55,23 +61,100 @@

func NewClient(ctx context.Context, config Config) (*Client, error) {
var client = &Client{
ctx: context.TODO(),
ctx: ctx,

Check warning on line 64 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L64

Added line #L64 was not covered by tests
Config: config,
object: *NewObject(rootObjectId, nil),
taskManager: NewTaskManager(ctx),
dataCache: make(map[string]interface{}),
reconnectCh: make(chan struct{}),
stopCh: make(chan struct{}),

Check warning on line 70 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

return client, nil
}

func (c *Client) Start() error {
// Start connection monitor
go c.connectionMonitor()
rhoninl marked this conversation as resolved.
Show resolved Hide resolved

// Initial connection
if err := c.connect(); err != nil {
return err
}

Check warning on line 83 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L77-L83

Added lines #L77 - L83 were not covered by tests

return c.Register()

Check warning on line 85 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L85

Added line #L85 was not covered by tests
}

func (c *Client) connectionMonitor() {
rhoninl marked this conversation as resolved.
Show resolved Hide resolved
backoff := reconnectInterval

for {
select {
case <-c.stopCh:
return

Check warning on line 94 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L88-L94

Added lines #L88 - L94 were not covered by tests

case <-c.reconnectCh:
logger.Info("Connection lost, attempting to reconnect...")

for {
// Try to reconnect
err := c.reconnect()
if err == nil {
logger.Info("Successfully reconnected")
backoff = reconnectInterval // Reset backoff on successful connection
break

Check warning on line 105 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L96-L105

Added lines #L96 - L105 were not covered by tests
}

logger.Errorf("Failed to reconnect: %v", err)

// Exponential backoff with max limit
backoff = time.Duration(float64(backoff) * 1.5)
rhoninl marked this conversation as resolved.
Show resolved Hide resolved
if backoff > maxReconnectBackoff {
backoff = maxReconnectBackoff
}

Check warning on line 114 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L108-L114

Added lines #L108 - L114 were not covered by tests

select {
case <-c.stopCh:
return
case <-time.After(backoff):
continue

Check warning on line 120 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L116-L120

Added lines #L116 - L120 were not covered by tests
}
}
}
}
}

func (c *Client) reconnect() error {
// Close existing connection if any
if c.udpConnection != nil {
c.udpConnection.Close()
c.udpConnection = nil
}

Check warning on line 132 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L127-L132

Added lines #L127 - L132 were not covered by tests

// Establish new connection
if err := c.connect(); err != nil {
return err
}

Check warning on line 137 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L135-L137

Added lines #L135 - L137 were not covered by tests

// Update with the server
if err := c.Update(); err != nil {
return err
}

Check warning on line 142 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L140-L142

Added lines #L140 - L142 were not covered by tests

return nil

Check warning on line 144 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L144

Added line #L144 was not covered by tests
}

func (c *Client) connect() error {

Check warning on line 147 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L147

Added line #L147 was not covered by tests
udpClientOpts := []udp.Option{}

udpClientOpts = append(
udpClientOpts,
options.WithInactivityMonitor(time.Minute, func(cc *udpClient.Conn) {
_ = cc.Close()
logger.Warn("Connection inactive, triggering reconnect")
select {
case c.reconnectCh <- struct{}{}:
default:

Check warning on line 156 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L153-L156

Added lines #L153 - L156 were not covered by tests
}
}),
options.WithMux(c.handleRouter()),
)
Expand Down Expand Up @@ -133,12 +216,10 @@
}

// set query params for register request
// example: /rd?ep=shifu-gateway&lt=300&lwm2m=1.0&b=U
request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsEndpointName, c.EndpointName))
request.AddQuery(fmt.Sprintf("%s=%d", QueryParamslifeTime, c.Settings.LifeTimeSec))
request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsLwM2MVersion, lwM2MVersion))
request.AddQuery(fmt.Sprintf("%s=%s", QueryParamsBindingMode, defaultBindingMode))
// only accept text/plain
request.SetAccept(message.TextPlain)
resp, err := c.udpConnection.Do(request)
if err != nil {
Expand Down Expand Up @@ -452,5 +533,13 @@
}

func (c *Client) isActivity() bool {
return time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec) * time.Second))
return c.udpConnection != nil && time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec)*time.Second))

Check warning on line 536 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L536

Added line #L536 was not covered by tests
}

func (c *Client) Stop() error {
rhoninl marked this conversation as resolved.
Show resolved Hide resolved
close(c.stopCh)
if c.udpConnection != nil {
return c.udpConnection.Close()
}
return nil

Check warning on line 544 in pkg/gateway/lwm2m/client/lwm2m.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lwm2m/client/lwm2m.go#L539-L544

Added lines #L539 - L544 were not covered by tests
}
7 changes: 0 additions & 7 deletions pkg/gateway/lwm2m/gatewaylwm2m.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,6 @@ func (g *Gateway) Start() error {
return err
}

// Register the client to the server
err := g.client.Register()
if err != nil {
logger.Errorf("Error registering client: %v", err)
return err
}

// Ping the client every pingIntervalSec seconds, by default 30 seconds
t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec))
for range t.C {
Expand Down
Loading