Skip to content

Commit

Permalink
Fixing pcidevice device plugin stop deadlock
Browse files Browse the repository at this point in the history
Signed-off-by: Webber Huang <[email protected]>

Fixing codeFactor "Complex Method" in pcidevice plugin healthcheck()
  • Loading branch information
WebberHuang1118 committed Feb 19, 2024
1 parent 82535bf commit a42eb58
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
3 changes: 1 addition & 2 deletions pkg/controller/nodes/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"reflect"
"time"

"github.com/harvester/pcidevices/pkg/controller/gpudevice"

ctlnetworkv1beta1 "github.com/harvester/harvester-network-controller/pkg/generated/controllers/network.harvesterhci.io/v1beta1"
"github.com/jaypipes/ghw"
ctlcorev1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
Expand All @@ -17,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/labels"

"github.com/harvester/pcidevices/pkg/apis/devices.harvesterhci.io/v1beta1"
"github.com/harvester/pcidevices/pkg/controller/gpudevice"
"github.com/harvester/pcidevices/pkg/controller/pcidevice"
"github.com/harvester/pcidevices/pkg/controller/sriovdevice"
ctl "github.com/harvester/pcidevices/pkg/generated/controllers/devices.harvesterhci.io/v1beta1"
Expand Down
28 changes: 16 additions & 12 deletions pkg/deviceplugins/device_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
pciBasePath = "/sys/bus/pci/devices"
connectionTimeout = 120 * time.Second // Google gRPC default timeout
PCIResourcePrefix = "PCI_RESOURCE"
tickerTimeout = 30 * time.Second
)

type PCIDevice struct {
Expand Down Expand Up @@ -292,7 +293,6 @@ func (dp *PCIDevicePlugin) Allocate(_ context.Context, r *pluginapi.AllocateRequ
}

func (dp *PCIDevicePlugin) healthCheck() error {
logger := log.DefaultLogger()
monitoredDevices := make(map[string]string)
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand Down Expand Up @@ -348,31 +348,37 @@ func (dp *PCIDevicePlugin) healthCheck() error {
return fmt.Errorf("failed to watch device-plugin socket: %v", err)
}

return dp.performCheck(monitoredDevices, watcher)
}

func (dp *PCIDevicePlugin) performCheck(monitoredDevices map[string]string, watcher *fsnotify.Watcher) error {
for {
select {
case <-dp.stop:
return nil
case <-dp.done:
return nil
case err := <-watcher.Errors:
logger.Reason(err).Errorf("error watching devices and device plugin directory")
logrus.Errorf("error watching devices and device plugin directory: %v", err)
case event := <-watcher.Events:
logger.V(4).Infof("health Event: %v", event)
logrus.Infof("health Event: %v", event)
if monDevID, exist := monitoredDevices[event.Name]; exist {
// Health in this case is if the device path actually exists
if event.Op == fsnotify.Create {
logger.Infof("monitored device %s appeared", dp.resourceName)
logrus.Infof("monitored device %s appeared", dp.resourceName)
dp.health <- deviceHealth{
DevID: monDevID,
Health: pluginapi.Healthy,
}
} else if (event.Op == fsnotify.Remove) || (event.Op == fsnotify.Rename) {
logger.Infof("monitored device %s disappeared", dp.resourceName)
logrus.Infof("monitored device %s disappeared", dp.resourceName)
dp.health <- deviceHealth{
DevID: monDevID,
Health: pluginapi.Unhealthy,
}
}
} else if event.Name == dp.socketPath && event.Op == fsnotify.Remove {
logger.Infof("device socket file for device %s was removed, kubelet probably restarted.", dp.resourceName)
logrus.Infof("device socket file for device %s was removed, kubelet probably restarted.", dp.resourceName)
return nil
}
}
Expand All @@ -389,14 +395,12 @@ func (dp *PCIDevicePlugin) GetDeviceName() string {

// stopDevicePlugin stops the gRPC server
func (dp *PCIDevicePlugin) stopDevicePlugin() error {
defer func() {
if !IsChanClosed(dp.done) {
close(dp.done)
}
}()
if !IsChanClosed(dp.done) {
close(dp.done)
}

// Give the device plugin up to tickerTimeout to properly deregister
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(tickerTimeout)
defer ticker.Stop()
select {
case <-dp.deregistered:
Expand Down
6 changes: 6 additions & 0 deletions pkg/deviceplugins/deviceplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,11 @@ func (dp *PCIDevicePlugin) RemoveDevice(pd *v1beta1.PCIDevice, pdc *v1beta1.PCID
logrus.Infof("Removing %s from device plugin", resourceName)
dp.MarkPCIDeviceAsUnhealthy(pdc.Spec.Address)
}

for i, dev := range dp.devs {
if dev.ID == pdc.Spec.Address {
dp.devs[i].Health = pluginapi.Unhealthy
}
}
return nil
}

0 comments on commit a42eb58

Please sign in to comment.