Continue removing logrus (#17)
* Changed: renamed NewNodeController to NewNodeProcessor for consistency with the NodeProcessor type

* Refactored: continued changes to remove logrus
Changed: removed MainCallback for now

* Refactored: added slog to FileObject
samsamfire authored Dec 8, 2024
1 parent 257a4c2 commit 53145aa
Showing 6 changed files with 65 additions and 44 deletions.
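
The recurring pattern in this commit: constructors now take a *slog.Logger, fall back to slog.Default() when nil is passed, and scope the logger once with With(...) attributes instead of baking prefixes such as [NETWORK][x%x] into every message. A minimal standalone sketch of that pattern (the Processor type below is illustrative, not part of the library):

```go
package main

import "log/slog"

// Processor stands in for types such as NodeProcessor or FileObject
// that now carry their own scoped logger.
type Processor struct {
	logger *slog.Logger
}

// NewProcessor falls back to slog.Default() when no logger is supplied,
// then attaches contextual attributes once so every record carries them.
func NewProcessor(logger *slog.Logger, id uint8) *Processor {
	if logger == nil {
		logger = slog.Default()
	}
	return &Processor{logger: logger.With("service", "[CTRLR]", "id", id)}
}

func main() {
	p := NewProcessor(nil, 0x10)
	// With the default text handler this renders roughly as:
	// level=INFO msg="starting node background process" service=[CTRLR] id=16
	p.logger.Info("starting node background process")
}
```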
2 changes: 1 addition & 1 deletion pkg/network/network.go
@@ -307,7 +307,7 @@ func (network *Network) AddRemoteNode(nodeId uint8, odict any) (*n.RemoteNode, e
// Add any node to the network and return a node controller which can be used
// To control high level node behaviour (starting, stopping the node)
func (network *Network) AddNode(node n.Node) (*n.NodeProcessor, error) {
controller := n.NewNodeController(node)
controller := n.NewNodeProcessor(node, network.logger)
_, ok := network.controllers[node.GetID()]
if ok {
return nil, ErrIdConflict
27 changes: 16 additions & 11 deletions pkg/node/controller.go
@@ -2,36 +2,42 @@ package node

import (
"context"
"log/slog"
"sync"
"time"

"github.com/samsamfire/gocanopen/pkg/nmt"
log "github.com/sirupsen/logrus"
)

// [NodeProcessor] is responsible for handling the node
// internal CANopen stack processing.
type NodeProcessor struct {
logger *slog.Logger
node Node
cancel context.CancelFunc
resetHandler func(node Node, cmd uint8) error
wg *sync.WaitGroup
}

func NewNodeController(n Node) *NodeProcessor {
return &NodeProcessor{node: n, wg: &sync.WaitGroup{}}
func NewNodeProcessor(n Node, logger *slog.Logger) *NodeProcessor {

if logger == nil {
logger = slog.Default()
}

return &NodeProcessor{logger: logger.With("service", "[CTRLR]", "id", n.GetID()), node: n, wg: &sync.WaitGroup{}}
}

// background processing for [SYNC],[TPDO],[RPDO] services
func (c *NodeProcessor) background(ctx context.Context) {

const PeriodUs = 10_000
ticker := time.NewTicker(PeriodUs * time.Microsecond)
log.Infof("[NETWORK][x%x] starting node background process", c.node.GetID())
c.logger.Info("starting node background process")
for {
select {
case <-ctx.Done():
log.Infof("[NETWORK][x%x] exited node background process", c.node.GetID())
c.logger.Info("exited node background process")
ticker.Stop()
return
case <-ticker.C:
@@ -47,24 +53,25 @@ func (c *NodeProcessor) main(ctx context.Context) {

const PeriodUs = 1_000
ticker := time.NewTicker(PeriodUs * time.Microsecond)
log.Infof("[NETWORK][x%x] starting node main process", c.node.GetID())
c.logger.Info("starting node main process")
for {
select {
case <-ctx.Done():
log.Infof("[NETWORK][x%x] exited node main process", c.node.GetID())
c.logger.Info("exited node main process")
ticker.Stop()
return
case <-ticker.C:
// Process main
state := c.node.ProcessMain(false, PeriodUs, nil)
if state == nmt.ResetApp || state == nmt.ResetComm {
c.logger.Info("node reset requested")
if c.resetHandler != nil {
err := c.resetHandler(c.node, state)
if err != nil {
log.Warn("failed to reset node")
c.logger.Info("failed to reset node", "error", err)
}
} else {
log.Warn("no reset handler for node")
c.logger.Warn("no reset handler registered")
}
}
}
@@ -96,9 +103,7 @@ func (c *NodeProcessor) Start(ctx context.Context) error {
c.wg.Add(1)
go func() {
defer c.wg.Done()
log.Info("start sdo server processing")
server.Process(ctx)
log.Info("stop sdo server processing")
}()
}
return nil
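
The background and main loops above follow the standard ticker-plus-context shape. A self-contained sketch of that shape, with the period and the per-tick work as placeholders rather than the library's actual values:

```go
package main

import (
	"context"
	"log/slog"
	"time"
)

// runPeriodic mirrors the structure of NodeProcessor.background and
// NodeProcessor.main: tick at a fixed period, do one unit of work,
// and exit cleanly when the context is cancelled.
func runPeriodic(ctx context.Context, logger *slog.Logger, period time.Duration, work func()) {
	ticker := time.NewTicker(period)
	logger.Info("starting periodic process")
	for {
		select {
		case <-ctx.Done():
			logger.Info("exited periodic process")
			ticker.Stop()
			return
		case <-ticker.C:
			work()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	runPeriodic(ctx, slog.Default(), 10*time.Millisecond, func() {
		// one processing step, e.g. ProcessTPDO/ProcessRPDO in the real code
	})
}
```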
20 changes: 11 additions & 9 deletions pkg/node/node.go
@@ -1,14 +1,14 @@
package node

import (
"fmt"
"log/slog"
"sync"

canopen "github.com/samsamfire/gocanopen"
"github.com/samsamfire/gocanopen/pkg/config"
"github.com/samsamfire/gocanopen/pkg/od"
"github.com/samsamfire/gocanopen/pkg/sdo"
log "github.com/sirupsen/logrus"
)

const (
@@ -75,10 +75,6 @@ func (node *BaseNode) GetID() uint8 {
return node.id
}

func (node *BaseNode) SetMainCallback(mainCallback func(node Node)) {
node.mainCallback = mainCallback
}

func (node *BaseNode) Configurator() *config.NodeConfigurator {
return config.NewNodeConfigurator(node.id, node.logger, node.SDOClient)
}
@@ -89,26 +85,32 @@ func (node *BaseNode) Export(filename string) error {
countErrors := 0
for index, entry := range node.GetOD().Entries() {
if entry.ObjectType == od.ObjectTypeDOMAIN {
log.Warnf("skipping domain object %x", index)
node.logger.Warn("skipping domain object", "index", fmt.Sprintf("x%x", index))
continue
}
for j := range uint8(entry.SubCount()) {
buffer := make([]byte, 100)
n, err := node.ReadRaw(index, j, buffer)
if err != nil {
countErrors++
log.Warnf("failed to read remote value %x|%x : %v", index, j, err)
node.logger.Warn("failed to read remote value",
"index", fmt.Sprintf("x%x", index),
"subIndex", fmt.Sprintf("x%x", j),
"error", err)
continue
}
err = entry.WriteExactly(j, buffer[:n], true)
if err != nil {
log.Warnf("failed to write remote value to local od %x|%x : %v", index, j, err)
node.logger.Warn("failed to write remote value to local od",
"index", fmt.Sprintf("x%x", index),
"subIndex", fmt.Sprintf("x%x", j),
"error", err)
countErrors++
continue
}
countRead++
}
}
log.Infof("dump successful, read : %v, errors : %v", countRead, countErrors)
node.logger.Info("dump successful", "nbRead", countRead, "nbErrors", countErrors)
return od.ExportEDS(node.GetOD(), false, filename)
}
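
A note on the hex attributes in Export: slog takes key/value pairs rather than a format string, so index and sub-index are pre-formatted with fmt.Sprintf("x%x", ...). A quick standalone illustration of how such a record renders (values and error are made up):

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

func main() {
	var index uint16 = 0x1018
	var sub uint8 = 0x2
	// Pre-format hex values, as Export does, because slog attributes
	// are key/value pairs rather than printf-style arguments.
	slog.Warn("failed to read remote value",
		"index", fmt.Sprintf("x%x", index),
		"subIndex", fmt.Sprintf("x%x", sub),
		"error", errors.New("sdo timeout (illustrative)"),
	)
	// Typical text-handler output:
	// level=WARN msg="failed to read remote value" index=x1018 subIndex=x2 error="sdo timeout (illustrative)"
}
```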
32 changes: 21 additions & 11 deletions pkg/od/extensions.go
@@ -4,18 +4,31 @@ package od

import (
"io"
"log/slog"
"os"

log "github.com/sirupsen/logrus"
)

type FileObject struct {
logger *slog.Logger
FilePath string
WriteMode int
ReadMode int
File *os.File
}

func NewFileObject(path string, logger *slog.Logger, writeMode int, readMode int) *FileObject {

if logger == nil {
logger = slog.Default()
}

return &FileObject{
logger: logger.With("extension", "[FILE]"),
FilePath: path,
WriteMode: writeMode,
ReadMode: readMode}
}

// [SDO] Custom function for reading a file like object
func ReadEntryFileObject(stream *Stream, data []byte, countRead *uint16) error {
if stream == nil || data == nil || countRead == nil || stream.Subindex != 0 || stream.Object == nil {
@@ -28,7 +41,7 @@ func ReadEntryFileObject(stream *Stream, data []byte, countRead *uint16) error {
}
if stream.DataOffset == 0 {
var err error
log.Infof("[OD][EXTENSION][FILE] opening %v for reading", fileObject.FilePath)
fileObject.logger.Info("opening file for reading", "path", fileObject.FilePath)
fileObject.File, err = os.OpenFile(fileObject.FilePath, fileObject.ReadMode, 0644)
if err != nil {
return ErrDevIncompat
@@ -49,12 +62,12 @@ func ReadEntryFileObject(stream *Stream, data []byte, countRead *uint16) error {
return ErrPartial
case io.EOF, io.ErrUnexpectedEOF:
*countRead = uint16(countReadInt)
log.Infof("[OD][EXTENSION][FILE] finished reading %v", fileObject.FilePath)
fileObject.logger.Info("finished reading", "path", fileObject.FilePath)
fileObject.File.Close()
return nil
default:
// unexpected error
log.Errorf("[OD][EXTENSION][FILE] error reading file %v", err)
fileObject.logger.Warn("error reading", "path", fileObject.FilePath, "err", err)
fileObject.File.Close()
return ErrDevIncompat

@@ -73,7 +86,7 @@ func WriteEntryFileObject(stream *Stream, data []byte, countWritten *uint16) err
}
if stream.DataOffset == 0 {
var err error
log.Infof("[OD][EXTENSION][FILE] opening %v for writing", fileObject.FilePath)
fileObject.logger.Info("opening file for writing", "path", fileObject.FilePath)
fileObject.File, err = os.OpenFile(fileObject.FilePath, fileObject.WriteMode, 0644)
if err != nil {
return ErrDevIncompat
@@ -91,14 +104,14 @@ func WriteEntryFileObject(stream *Stream, data []byte, countWritten *uint16) err
*countWritten = uint16(countWrittenInt)
stream.DataOffset += uint32(countWrittenInt)
if stream.DataLength == stream.DataOffset {
log.Infof("[OD][EXTENSION][FILE] finished writing %v", fileObject.FilePath)
fileObject.logger.Info("finished writing", "path", fileObject.FilePath)
fileObject.File.Close()
return nil
} else {
return ErrPartial
}
} else {
log.Errorf("[OD][EXTENSION][FILE] error writing file %v", err)
fileObject.logger.Warn("error writing", "path", fileObject.FilePath, "err", err)
fileObject.File.Close()
return ErrDevIncompat
}
@@ -132,11 +145,8 @@ func ReadEntryReader(stream *Stream, data []byte, countRead *uint16) error {
return ErrPartial
case io.EOF, io.ErrUnexpectedEOF:
*countRead = uint16(countReadInt)
log.Infof("[OD][EXTENSION][FILE] finished reading")
return nil
default:
log.Errorf("[OD][EXTENSION][FILE] error reading file %v", err)
return ErrDevIncompat

}
}
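
The read path above distinguishes three outcomes: a full buffer means a partial transfer (ErrPartial, more data to stream), io.EOF or io.ErrUnexpectedEOF means the transfer is complete, and anything else is fatal (ErrDevIncompat). A standalone sketch of that classification using a bytes.Reader in place of the file; the error values here are local stand-ins for the od package's sentinels:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

var (
	errPartial     = errors.New("partial read, more data pending") // stand-in for od.ErrPartial
	errDevIncompat = errors.New("device incompatibility")          // stand-in for od.ErrDevIncompat
)

// classifyRead mirrors the switch in ReadEntryFileObject: a full chunk is a
// partial transfer, EOF ends the transfer cleanly, anything else is fatal.
func classifyRead(r io.Reader, buf []byte) (int, error) {
	n, err := io.ReadFull(r, buf)
	switch err {
	case nil:
		return n, errPartial
	case io.EOF, io.ErrUnexpectedEOF:
		return n, nil
	default:
		return n, errDevIncompat
	}
}

func main() {
	src := bytes.NewReader([]byte("hello canopen"))
	buf := make([]byte, 8)
	for {
		n, err := classifyRead(src, buf)
		fmt.Printf("read %d bytes: %q, status: %v\n", n, buf[:n], err)
		if err != errPartial {
			break
		}
	}
}
```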
4 changes: 2 additions & 2 deletions pkg/od/od.go
@@ -69,10 +69,10 @@ func (od *ObjectDictionary) AddVariableList(index uint16, name string, varList *
// readMode and writeMode should be given to determine what type of access to the file is allowed
// e.g. os.O_RDONLY if only reading is allowed
func (od *ObjectDictionary) AddFile(index uint16, indexName string, filePath string, readMode int, writeMode int) {
fileObject := &FileObject{FilePath: filePath, ReadMode: readMode, WriteMode: writeMode}
f := NewFileObject(filePath, od.logger, writeMode, readMode)
entry, _ := od.AddVariableType(index, indexName, DOMAIN, AttributeSdoRw, "") // Cannot error
entry.logger.Info("adding extension file i/o", "path", filePath)
entry.AddExtension(fileObject, ReadEntryFileObject, WriteEntryFileObject)
entry.AddExtension(f, ReadEntryFileObject, WriteEntryFileObject)
}

// AddReader adds an io.Reader object, of type DOMAIN to OD
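
The readMode/writeMode arguments to AddFile are standard os.OpenFile flags, as the doc comment notes. A short standalone illustration of typical combinations (the file path is hypothetical):

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	// Typical choices for a file-backed DOMAIN entry; these are the values
	// that would be handed to (*ObjectDictionary).AddFile as readMode/writeMode.
	readMode := os.O_RDONLY                             // SDO uploads only read the file
	writeMode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC // SDO downloads overwrite it

	f, err := os.OpenFile("/tmp/firmware.bin", writeMode, 0644)
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer f.Close()
	fmt.Printf("opened for writing; readMode=%#x writeMode=%#x\n", readMode, writeMode)
}
```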
24 changes: 14 additions & 10 deletions pkg/pdo/extensions.go
@@ -2,15 +2,14 @@ package pdo

import (
"encoding/binary"
"fmt"

canopen "github.com/samsamfire/gocanopen"
"github.com/samsamfire/gocanopen/pkg/od"
log "github.com/sirupsen/logrus"
)

// [RPDO] update communication parameter
func writeEntry14xx(stream *od.Stream, data []byte, countWritten *uint16) error {
log.Debug("[OD][EXTENSION][RPDO] updating communication parameter")
if stream == nil || data == nil || countWritten == nil || len(data) > 4 {
return od.ErrDevIncompat
}
@@ -33,7 +32,10 @@ func writeEntry14xx(stream *od.Stream, data []byte, countWritten *uint16) error
/* bits 11...29 must be zero, PDO must be disabled on change,
* CAN_ID == 0 is not allowed, mapping must be configured before
* enabling the PDO */
log.Debugf("[OD][EXTENSION][%v] updating pdo cob-id, valid : %v, canId : x%x", pdo.Type(), valid, canId)
rpdo.pdo.logger.Debug("updating cob-id",
"valid", valid,
"canId", fmt.Sprintf("x%x", canId),
)
if (cobId&0x3FFFF800) != 0 ||
valid && pdo.Valid && canId != uint32(pdo.configuredId) ||
valid && canopen.IsIDRestricted(uint16(canId)) ||
@@ -62,7 +64,10 @@ func writeEntry14xx(stream *od.Stream, data []byte, countWritten *uint16) error
return od.ErrDevIncompat
}
}
log.Debugf("[OD][EXTENSION][%v] updated pdo with cobId : x%x, valid : %v", pdo.Type(), pdo.configuredId&0x7FF, pdo.Valid)
rpdo.pdo.logger.Debug("updated cob-id",
"valid", valid,
"cobId", fmt.Sprintf("x%x", pdo.configuredId&0x7FF),
)
}

case 2:
@@ -77,14 +82,14 @@ func writeEntry14xx(stream *od.Stream, data []byte, countWritten *uint16) error
rpdo.rxNew[1] = false
}
rpdo.synchronous = synchronous
log.Debugf("[OD][EXTENSION][%v] updated pdo transmission type to : %v", pdo.Type(), transmissionType)
rpdo.pdo.logger.Debug("updated transmission type", "transmissionType", transmissionType)

case 5:
// Event timer
eventTime := binary.LittleEndian.Uint16(data)
rpdo.timeoutTimeUs = uint32(eventTime) * 1000
rpdo.timeoutTimer = 0
log.Debugf("[OD][EXTENSION][%v] updated pdo event timer to : %v us", pdo.Type(), eventTime)
rpdo.pdo.logger.Debug("updated event timer", "transmissionType", eventTime)
}

return od.WriteEntryDefault(stream, bufCopy, countWritten)
@@ -143,7 +148,7 @@ func writeEntry16xxOr1Axx(stream *od.Stream, data []byte, countWritten *uint16)
default:
return od.ErrDevIncompat
}
log.Debugf("[OD][EXTENSION][%v] updating mapping parameter", pdo.Type())
pdo.logger.Debug("updating mapping parameter")
// PDO must be disabled in order to allow mapping
if pdo.Valid || pdo.nbMapped != 0 && stream.Subindex > 0 {
return od.ErrUnsuppAccess
@@ -172,8 +177,7 @@ func writeEntry16xxOr1Axx(stream *od.Stream, data []byte, countWritten *uint16)
}
pdo.dataLength = pdoDataLength
pdo.nbMapped = mappedObjectsCount
log.Debugf("[OD][EXTENSION][%v] updated pdo number of mapped objects to : %v", pdo.Type(), mappedObjectsCount)

pdo.logger.Debug("updated number of mapped objects to", "count", mappedObjectsCount)
} else {
err := pdo.configureMap(binary.LittleEndian.Uint32(data), uint32(stream.Subindex)-1, pdo.IsRPDO)
if err != nil {
@@ -208,7 +212,7 @@ func writeEntry18xx(stream *od.Stream, data []byte, countWritten *uint16) error
// - PDO must be disabled on change
// - CAN_ID == 0 is not allowed
// - mapping must be configured before enabling the PDO
log.Debugf("[OD][EXTENSION][%v] updating pdo cob-id, valid : %v, canId : x%x", pdo.Type(), valid, canId)
pdo.logger.Debug("updating cob-id", "valid", valid, "canId", canId)

if (cobId&0x3FFFF800) != 0 ||
(valid && pdo.Valid && canId != uint32(pdo.configuredId)) ||
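
For context on the cobId/valid/canId values logged above: per CiA 301, bit 31 of a PDO communication parameter COB-ID entry marks the PDO as not valid, and the low 11 bits carry the CAN identifier (bits 11..29 must be zero, which is what the 0x3FFFF800 mask checks). A hedged standalone sketch of that decoding; the exact extraction in this package sits outside the shown hunks:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeCobId splits a little-endian COB-ID sub-entry into its parts per
// CiA 301: bit 31 set means "PDO not valid", bits 0..10 hold the CAN ID.
func decodeCobId(data []byte) (cobId uint32, valid bool, canId uint32) {
	cobId = binary.LittleEndian.Uint32(data)
	valid = cobId&0x80000000 == 0
	canId = cobId & 0x7FF
	return
}

func main() {
	// 0x80000200: PDO disabled (bit 31 set), CAN ID 0x200
	raw := []byte{0x00, 0x02, 0x00, 0x80}
	cobId, valid, canId := decodeCobId(raw)
	fmt.Printf("cobId=x%x valid=%v canId=x%x\n", cobId, valid, canId)
}
```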
