From 789fcf5afbd1ea8ce628fed675fed5e4bd313987 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 29 Aug 2016 09:55:38 -0400 Subject: [PATCH 1/5] Move NetworkInfo from Registry to OsdnNode/OsdnMaster/ovsProxyPlugin Rather than implicitly caching NetworkInfo in the Registry, explicitly cache it in the node/master/proxy objects. --- pkg/sdn/plugin/common.go | 50 +++++++++++++++++++++++++++ pkg/sdn/plugin/master.go | 20 ++++++----- pkg/sdn/plugin/node.go | 6 ++-- pkg/sdn/plugin/proxy.go | 14 ++++---- pkg/sdn/plugin/registry.go | 69 +------------------------------------- pkg/sdn/plugin/subnets.go | 15 +++------ 6 files changed, 79 insertions(+), 95 deletions(-) diff --git a/pkg/sdn/plugin/common.go b/pkg/sdn/plugin/common.go index c91181150174..f696a73589ad 100644 --- a/pkg/sdn/plugin/common.go +++ b/pkg/sdn/plugin/common.go @@ -2,6 +2,7 @@ package plugin import ( "fmt" + "net" "strings" osapi "github.com/openshift/origin/pkg/sdn/api" @@ -22,3 +23,52 @@ func getPodContainerID(pod *kapi.Pod) string { func hostSubnetToString(subnet *osapi.HostSubnet) string { return fmt.Sprintf("%s (host: %q, ip: %q, subnet: %q)", subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet) } + +type NetworkInfo struct { + ClusterNetwork *net.IPNet + ServiceNetwork *net.IPNet + HostSubnetLength uint32 + PluginName string +} + +func validateClusterNetwork(network string, hostSubnetLength uint32, serviceNetwork string, pluginName string) (*NetworkInfo, error) { + _, cn, err := net.ParseCIDR(network) + if err != nil { + return nil, fmt.Errorf("Failed to parse ClusterNetwork CIDR %s: %v", network, err) + } + + _, sn, err := net.ParseCIDR(serviceNetwork) + if err != nil { + return nil, fmt.Errorf("Failed to parse ServiceNetwork CIDR %s: %v", serviceNetwork, err) + } + + if hostSubnetLength <= 0 || hostSubnetLength > 32 { + return nil, fmt.Errorf("Invalid HostSubnetLength %d (not between 1 and 32)", hostSubnetLength) + } + + return &NetworkInfo{ + ClusterNetwork: cn, + ServiceNetwork: sn, + HostSubnetLength: hostSubnetLength, + PluginName: pluginName, + }, nil +} + +func (ni *NetworkInfo) validateNodeIP(nodeIP string) error { + if nodeIP == "" || nodeIP == "127.0.0.1" { + return fmt.Errorf("Invalid node IP %q", nodeIP) + } + + // Ensure each node's NodeIP is not contained by the cluster network, + // which could cause a routing loop. (rhbz#1295486) + ipaddr := net.ParseIP(nodeIP) + if ipaddr == nil { + return fmt.Errorf("Failed to parse node IP %s", nodeIP) + } + + if ni.ClusterNetwork.Contains(ipaddr) { + return fmt.Errorf("Node IP %s conflicts with cluster network %s", nodeIP, ni.ClusterNetwork.String()) + } + + return nil +} diff --git a/pkg/sdn/plugin/master.go b/pkg/sdn/plugin/master.go index 9f9d8dcccb27..8eb318a50119 100644 --- a/pkg/sdn/plugin/master.go +++ b/pkg/sdn/plugin/master.go @@ -16,6 +16,7 @@ import ( type OsdnMaster struct { registry *Registry + networkInfo *NetworkInfo subnetAllocator *netutils.SubnetAllocator vnids *masterVNIDMap } @@ -32,26 +33,27 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie } // Validate command-line/config parameters - ni, err := validateClusterNetwork(networkConfig.ClusterNetworkCIDR, networkConfig.HostSubnetLength, networkConfig.ServiceNetworkCIDR, networkConfig.NetworkPluginName) + var err error + master.networkInfo, err = validateClusterNetwork(networkConfig.ClusterNetworkCIDR, networkConfig.HostSubnetLength, networkConfig.ServiceNetworkCIDR, networkConfig.NetworkPluginName) if err != nil { return err } - changed, net_err := master.isClusterNetworkChanged(ni) + changed, net_err := master.isClusterNetworkChanged() if changed { - if err = master.validateNetworkConfig(ni); err != nil { + if err = master.validateNetworkConfig(); err != nil { return err } - if err = master.registry.UpdateClusterNetwork(ni); err != nil { + if err = master.registry.UpdateClusterNetwork(master.networkInfo); err != nil { return err } } else if net_err != nil { - if err = master.registry.CreateClusterNetwork(ni); err != nil { + if err = master.registry.CreateClusterNetwork(master.networkInfo); err != nil { return err } } - if err = master.SubnetStartMaster(ni.ClusterNetwork, networkConfig.HostSubnetLength); err != nil { + if err = master.SubnetStartMaster(master.networkInfo.ClusterNetwork, networkConfig.HostSubnetLength); err != nil { return err } @@ -66,12 +68,13 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie return nil } -func (master *OsdnMaster) validateNetworkConfig(ni *NetworkInfo) error { +func (master *OsdnMaster) validateNetworkConfig() error { hostIPNets, err := netutils.GetHostIPNetworks([]string{TUN, LBR}) if err != nil { return err } + ni := master.networkInfo errList := []error{} // Ensure cluster and service network don't overlap with host networks @@ -120,11 +123,12 @@ func (master *OsdnMaster) validateNetworkConfig(ni *NetworkInfo) error { return kerrors.NewAggregate(errList) } -func (master *OsdnMaster) isClusterNetworkChanged(curNetwork *NetworkInfo) (bool, error) { +func (master *OsdnMaster) isClusterNetworkChanged() (bool, error) { oldNetwork, err := master.registry.GetNetworkInfo() if err != nil { return false, err } + curNetwork := master.networkInfo if curNetwork.ClusterNetwork.String() != oldNetwork.ClusterNetwork.String() || curNetwork.HostSubnetLength != oldNetwork.HostSubnetLength || diff --git a/pkg/sdn/plugin/node.go b/pkg/sdn/plugin/node.go index cbf856d3bc32..098b11f8da3c 100644 --- a/pkg/sdn/plugin/node.go +++ b/pkg/sdn/plugin/node.go @@ -23,6 +23,7 @@ import ( type OsdnNode struct { multitenant bool registry *Registry + networkInfo *NetworkInfo localIP string localSubnet *osapi.HostSubnet hostName string @@ -78,12 +79,13 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien } func (node *OsdnNode) Start() error { - ni, err := node.registry.GetNetworkInfo() + var err error + node.networkInfo, err = node.registry.GetNetworkInfo() if err != nil { return fmt.Errorf("Failed to get network information: %v", err) } - nodeIPTables := newNodeIPTables(ni.ClusterNetwork.String(), node.iptablesSyncPeriod) + nodeIPTables := newNodeIPTables(node.networkInfo.ClusterNetwork.String(), node.iptablesSyncPeriod) if err = nodeIPTables.Setup(); err != nil { return fmt.Errorf("Failed to set up iptables: %v", err) } diff --git a/pkg/sdn/plugin/proxy.go b/pkg/sdn/plugin/proxy.go index 224a63264cf8..21e8a0010da9 100644 --- a/pkg/sdn/plugin/proxy.go +++ b/pkg/sdn/plugin/proxy.go @@ -26,6 +26,7 @@ type proxyFirewallItem struct { type ovsProxyPlugin struct { registry *Registry + networkInfo *NetworkInfo baseEndpointsHandler pconfig.EndpointsConfigHandler lock sync.Mutex @@ -48,6 +49,11 @@ func NewProxyPlugin(pluginName string, osClient *osclient.Client, kClient *kclie func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) error { glog.Infof("Starting multitenant SDN proxy endpoint filter") + var err error + proxy.networkInfo, err = proxy.registry.GetNetworkInfo() + if err != nil { + return fmt.Errorf("could not get network info: %s", err) + } proxy.baseEndpointsHandler = baseHandler policies, err := proxy.registry.GetEgressNetworkPolicies() @@ -126,12 +132,6 @@ func (proxy *ovsProxyPlugin) updateEndpoints() { return } - ni, err := proxy.registry.GetNetworkInfo() - if err != nil { - glog.Warningf("Error fetching network information: %v", err) - return - } - filteredEndpoints := make([]kapi.Endpoints, 0, len(proxy.allEndpoints)) EndpointLoop: @@ -140,7 +140,7 @@ EndpointLoop: for _, ss := range ep.Subsets { for _, addr := range ss.Addresses { IP := net.ParseIP(addr.IP) - if !ni.ClusterNetwork.Contains(IP) && !ni.ServiceNetwork.Contains(IP) { + if !proxy.networkInfo.ClusterNetwork.Contains(IP) && !proxy.networkInfo.ServiceNetwork.Contains(IP) { if proxy.firewallBlocksIP(ns, IP) { glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.ObjectMeta.Name, ns, addr.IP) continue EndpointLoop diff --git a/pkg/sdn/plugin/registry.go b/pkg/sdn/plugin/registry.go index 7e9f5ab810d6..088c38ca295e 100644 --- a/pkg/sdn/plugin/registry.go +++ b/pkg/sdn/plugin/registry.go @@ -2,7 +2,6 @@ package plugin import ( "fmt" - "net" "strings" "time" @@ -19,19 +18,9 @@ import ( osapi "github.com/openshift/origin/pkg/sdn/api" ) -type NetworkInfo struct { - ClusterNetwork *net.IPNet - ServiceNetwork *net.IPNet - HostSubnetLength uint32 - PluginName string -} - type Registry struct { oClient *osclient.Client kClient *kclient.Client - - // Cache cluster network information - NetworkInfo *NetworkInfo } type ResourceName string @@ -158,45 +147,13 @@ func (registry *Registry) CreateClusterNetwork(ni *NetworkInfo) error { return nil } -func validateClusterNetwork(network string, hostSubnetLength uint32, serviceNetwork string, pluginName string) (*NetworkInfo, error) { - _, cn, err := net.ParseCIDR(network) - if err != nil { - return nil, fmt.Errorf("Failed to parse ClusterNetwork CIDR %s: %v", network, err) - } - - _, sn, err := net.ParseCIDR(serviceNetwork) - if err != nil { - return nil, fmt.Errorf("Failed to parse ServiceNetwork CIDR %s: %v", serviceNetwork, err) - } - - if hostSubnetLength <= 0 || hostSubnetLength > 32 { - return nil, fmt.Errorf("Invalid HostSubnetLength %d (not between 1 and 32)", hostSubnetLength) - } - - return &NetworkInfo{ - ClusterNetwork: cn, - ServiceNetwork: sn, - HostSubnetLength: hostSubnetLength, - PluginName: pluginName, - }, nil -} - func (registry *Registry) GetNetworkInfo() (*NetworkInfo, error) { - // Check if we got cached network info - if registry.NetworkInfo != nil { - return registry.NetworkInfo, nil - } - cn, err := registry.oClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault) if err != nil { return nil, err } - registry.NetworkInfo, err = validateClusterNetwork(cn.Network, cn.HostSubnetLength, cn.ServiceNetwork, cn.PluginName) - if err != nil { - return nil, err - } - return registry.NetworkInfo, nil + return validateClusterNetwork(cn.Network, cn.HostSubnetLength, cn.ServiceNetwork, cn.PluginName) } func (registry *Registry) GetNetNamespaces() ([]osapi.NetNamespace, error) { @@ -309,30 +266,6 @@ func (registry *Registry) RunEventQueue(resourceName ResourceName, process Proce } } -func (registry *Registry) ValidateNodeIP(nodeIP string) error { - if nodeIP == "" || nodeIP == "127.0.0.1" { - return fmt.Errorf("Invalid node IP %q", nodeIP) - } - - ni, err := registry.GetNetworkInfo() - if err != nil { - return fmt.Errorf("Failed to get network information: %v", err) - } - - // Ensure each node's NodeIP is not contained by the cluster network, - // which could cause a routing loop. (rhbz#1295486) - ipaddr := net.ParseIP(nodeIP) - if ipaddr == nil { - return fmt.Errorf("Failed to parse node IP %s", nodeIP) - } - - if ni.ClusterNetwork.Contains(ipaddr) { - return fmt.Errorf("Node IP %s conflicts with cluster network %s", nodeIP, ni.ClusterNetwork.String()) - } - - return nil -} - func clusterNetworkToString(n *osapi.ClusterNetwork) string { return fmt.Sprintf("%s (network: %q, hostSubnetBits: %d, serviceNetwork: %q, pluginName: %q)", n.Name, n.Network, n.HostSubnetLength, n.ServiceNetwork, n.PluginName) } diff --git a/pkg/sdn/plugin/subnets.go b/pkg/sdn/plugin/subnets.go index 608e4a53ee50..93cffa488088 100644 --- a/pkg/sdn/plugin/subnets.go +++ b/pkg/sdn/plugin/subnets.go @@ -28,7 +28,7 @@ func (master *OsdnMaster) SubnetStartMaster(clusterNetwork *net.IPNet, hostSubne } for _, sub := range subnets { subrange = append(subrange, sub.Subnet) - if err = master.registry.ValidateNodeIP(sub.HostIP); err != nil { + if err = master.networkInfo.validateNodeIP(sub.HostIP); err != nil { // Don't error out; just warn so the error can be corrected with 'oc' log.Errorf("Failed to validate HostSubnet %s: %v", hostSubnetToString(&sub), err) } else { @@ -47,7 +47,7 @@ func (master *OsdnMaster) SubnetStartMaster(clusterNetwork *net.IPNet, hostSubne func (master *OsdnMaster) addNode(nodeName string, nodeIP string) error { // Validate node IP before proceeding - if err := master.registry.ValidateNodeIP(nodeIP); err != nil { + if err := master.networkInfo.validateNodeIP(nodeIP); err != nil { return err } @@ -196,12 +196,7 @@ func (node *OsdnNode) SubnetStartNode(mtu uint32) (bool, error) { return false, err } - // Assume we are working with IPv4 - ni, err := node.registry.GetNetworkInfo() - if err != nil { - return false, err - } - networkChanged, err := node.SetupSDN(node.localSubnet.Subnet, ni.ClusterNetwork.String(), ni.ServiceNetwork.String(), mtu) + networkChanged, err := node.SetupSDN(node.localSubnet.Subnet, node.networkInfo.ClusterNetwork.String(), node.networkInfo.ServiceNetwork.String(), mtu) if err != nil { return false, err } @@ -231,7 +226,7 @@ func (node *OsdnNode) initSelfSubnet() error { return fmt.Errorf("Failed to get subnet for this host: %s, error: %v", node.hostName, err) } - if err = node.registry.ValidateNodeIP(subnet.HostIP); err != nil { + if err = node.networkInfo.validateNodeIP(subnet.HostIP); err != nil { return fmt.Errorf("Failed to validate own HostSubnet: %v", err) } @@ -263,7 +258,7 @@ func (node *OsdnNode) watchSubnets() { } } } - if err := node.registry.ValidateNodeIP(hs.HostIP); err != nil { + if err := node.networkInfo.validateNodeIP(hs.HostIP); err != nil { log.Warningf("Ignoring invalid subnet for node %s: %v", hs.HostIP, err) break } From a4d9f50efce650f1299125af4d82adb7d3d9f48d Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 14 Sep 2016 13:02:46 -0400 Subject: [PATCH 2/5] Move remaining NetworkInfo-related code out of registry.go --- pkg/sdn/plugin/common.go | 10 +++++++ pkg/sdn/plugin/master.go | 56 ++++++++++++++++++++++++-------------- pkg/sdn/plugin/node.go | 2 +- pkg/sdn/plugin/proxy.go | 2 +- pkg/sdn/plugin/registry.go | 43 ----------------------------- 5 files changed, 47 insertions(+), 66 deletions(-) diff --git a/pkg/sdn/plugin/common.go b/pkg/sdn/plugin/common.go index f696a73589ad..859e7a8bba21 100644 --- a/pkg/sdn/plugin/common.go +++ b/pkg/sdn/plugin/common.go @@ -5,6 +5,7 @@ import ( "net" "strings" + osclient "github.com/openshift/origin/pkg/client" osapi "github.com/openshift/origin/pkg/sdn/api" kapi "k8s.io/kubernetes/pkg/api" @@ -72,3 +73,12 @@ func (ni *NetworkInfo) validateNodeIP(nodeIP string) error { return nil } + +func getNetworkInfo(osClient *osclient.Client) (*NetworkInfo, error) { + cn, err := osClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault) + if err != nil { + return nil, err + } + + return validateClusterNetwork(cn.Network, cn.HostSubnetLength, cn.ServiceNetwork, cn.PluginName) +} diff --git a/pkg/sdn/plugin/master.go b/pkg/sdn/plugin/master.go index 8eb318a50119..64fdf1b4dc1d 100644 --- a/pkg/sdn/plugin/master.go +++ b/pkg/sdn/plugin/master.go @@ -8,8 +8,11 @@ import ( osclient "github.com/openshift/origin/pkg/client" osconfigapi "github.com/openshift/origin/pkg/cmd/server/api" + osapi "github.com/openshift/origin/pkg/sdn/api" "github.com/openshift/origin/pkg/util/netutils" + kapi "k8s.io/kubernetes/pkg/api" + kapiunversioned "k8s.io/kubernetes/pkg/api/unversioned" kclient "k8s.io/kubernetes/pkg/client/unversioned" kerrors "k8s.io/kubernetes/pkg/util/errors" ) @@ -39,18 +42,45 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie return err } - changed, net_err := master.isClusterNetworkChanged() - if changed { + createConfig := false + updateConfig := false + cn, err := master.registry.oClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault) + if err == nil { + if master.networkInfo.ClusterNetwork.String() != cn.Network || + master.networkInfo.HostSubnetLength != cn.HostSubnetLength || + master.networkInfo.ServiceNetwork.String() != cn.ServiceNetwork || + master.networkInfo.PluginName != cn.PluginName { + updateConfig = true + } + } else { + cn = &osapi.ClusterNetwork{ + TypeMeta: kapiunversioned.TypeMeta{Kind: "ClusterNetwork"}, + ObjectMeta: kapi.ObjectMeta{Name: osapi.ClusterNetworkDefault}, + } + createConfig = true + } + if createConfig || updateConfig { if err = master.validateNetworkConfig(); err != nil { return err } - if err = master.registry.UpdateClusterNetwork(master.networkInfo); err != nil { + cn.Network = master.networkInfo.ClusterNetwork.String() + cn.HostSubnetLength = master.networkInfo.HostSubnetLength + cn.ServiceNetwork = master.networkInfo.ServiceNetwork.String() + cn.PluginName = master.networkInfo.PluginName + } + + if createConfig { + cn, err := master.registry.oClient.ClusterNetwork().Create(cn) + if err != nil { return err } - } else if net_err != nil { - if err = master.registry.CreateClusterNetwork(master.networkInfo); err != nil { + log.Infof("Created ClusterNetwork %s", clusterNetworkToString(cn)) + } else if updateConfig { + cn, err := master.registry.oClient.ClusterNetwork().Update(cn) + if err != nil { return err } + log.Infof("Updated ClusterNetwork %s", clusterNetworkToString(cn)) } if err = master.SubnetStartMaster(master.networkInfo.ClusterNetwork, networkConfig.HostSubnetLength); err != nil { @@ -122,19 +152,3 @@ func (master *OsdnMaster) validateNetworkConfig() error { return kerrors.NewAggregate(errList) } - -func (master *OsdnMaster) isClusterNetworkChanged() (bool, error) { - oldNetwork, err := master.registry.GetNetworkInfo() - if err != nil { - return false, err - } - curNetwork := master.networkInfo - - if curNetwork.ClusterNetwork.String() != oldNetwork.ClusterNetwork.String() || - curNetwork.HostSubnetLength != oldNetwork.HostSubnetLength || - curNetwork.ServiceNetwork.String() != oldNetwork.ServiceNetwork.String() || - curNetwork.PluginName != oldNetwork.PluginName { - return true, nil - } - return false, nil -} diff --git a/pkg/sdn/plugin/node.go b/pkg/sdn/plugin/node.go index 098b11f8da3c..fc39e1663229 100644 --- a/pkg/sdn/plugin/node.go +++ b/pkg/sdn/plugin/node.go @@ -80,7 +80,7 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien func (node *OsdnNode) Start() error { var err error - node.networkInfo, err = node.registry.GetNetworkInfo() + node.networkInfo, err = getNetworkInfo(node.registry.oClient) if err != nil { return fmt.Errorf("Failed to get network information: %v", err) } diff --git a/pkg/sdn/plugin/proxy.go b/pkg/sdn/plugin/proxy.go index 21e8a0010da9..f3c00cd5a290 100644 --- a/pkg/sdn/plugin/proxy.go +++ b/pkg/sdn/plugin/proxy.go @@ -50,7 +50,7 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e glog.Infof("Starting multitenant SDN proxy endpoint filter") var err error - proxy.networkInfo, err = proxy.registry.GetNetworkInfo() + proxy.networkInfo, err = getNetworkInfo(proxy.registry.oClient) if err != nil { return fmt.Errorf("could not get network info: %s", err) } diff --git a/pkg/sdn/plugin/registry.go b/pkg/sdn/plugin/registry.go index 088c38ca295e..113ffffc05cd 100644 --- a/pkg/sdn/plugin/registry.go +++ b/pkg/sdn/plugin/registry.go @@ -113,49 +113,6 @@ func (registry *Registry) GetPod(nodeName, namespace, podName string) (*kapi.Pod return nil, nil } -func (registry *Registry) UpdateClusterNetwork(ni *NetworkInfo) error { - cn, err := registry.oClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault) - if err != nil { - return err - } - cn.Network = ni.ClusterNetwork.String() - cn.HostSubnetLength = ni.HostSubnetLength - cn.ServiceNetwork = ni.ServiceNetwork.String() - cn.PluginName = ni.PluginName - updatedNetwork, err := registry.oClient.ClusterNetwork().Update(cn) - if err != nil { - return err - } - log.Infof("Updated ClusterNetwork %s", clusterNetworkToString(updatedNetwork)) - return nil -} - -func (registry *Registry) CreateClusterNetwork(ni *NetworkInfo) error { - cn := &osapi.ClusterNetwork{ - TypeMeta: unversioned.TypeMeta{Kind: "ClusterNetwork"}, - ObjectMeta: kapi.ObjectMeta{Name: osapi.ClusterNetworkDefault}, - Network: ni.ClusterNetwork.String(), - HostSubnetLength: ni.HostSubnetLength, - ServiceNetwork: ni.ServiceNetwork.String(), - PluginName: ni.PluginName, - } - updatedNetwork, err := registry.oClient.ClusterNetwork().Create(cn) - if err != nil { - return err - } - log.Infof("Created ClusterNetwork %s", clusterNetworkToString(updatedNetwork)) - return nil -} - -func (registry *Registry) GetNetworkInfo() (*NetworkInfo, error) { - cn, err := registry.oClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault) - if err != nil { - return nil, err - } - - return validateClusterNetwork(cn.Network, cn.HostSubnetLength, cn.ServiceNetwork, cn.PluginName) -} - func (registry *Registry) GetNetNamespaces() ([]osapi.NetNamespace, error) { netNamespaceList, err := registry.oClient.NetNamespaces().List(kapi.ListOptions{}) if err != nil { From da69710403571c69592f4a70b2b042aafc7c7cdd Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 29 Aug 2016 09:57:55 -0400 Subject: [PATCH 3/5] Make NetworkInfo just cache cluster/service CIDRs Nothing needs to have the other info cached --- pkg/sdn/plugin/common.go | 28 +++++++++++----------------- pkg/sdn/plugin/master.go | 15 +++++++++------ 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/pkg/sdn/plugin/common.go b/pkg/sdn/plugin/common.go index 859e7a8bba21..f66c040e8125 100644 --- a/pkg/sdn/plugin/common.go +++ b/pkg/sdn/plugin/common.go @@ -26,32 +26,23 @@ func hostSubnetToString(subnet *osapi.HostSubnet) string { } type NetworkInfo struct { - ClusterNetwork *net.IPNet - ServiceNetwork *net.IPNet - HostSubnetLength uint32 - PluginName string + ClusterNetwork *net.IPNet + ServiceNetwork *net.IPNet } -func validateClusterNetwork(network string, hostSubnetLength uint32, serviceNetwork string, pluginName string) (*NetworkInfo, error) { - _, cn, err := net.ParseCIDR(network) +func parseNetworkInfo(clusterNetwork string, serviceNetwork string) (*NetworkInfo, error) { + _, cn, err := net.ParseCIDR(clusterNetwork) if err != nil { - return nil, fmt.Errorf("Failed to parse ClusterNetwork CIDR %s: %v", network, err) + return nil, fmt.Errorf("Failed to parse ClusterNetwork CIDR %s: %v", clusterNetwork, err) } - _, sn, err := net.ParseCIDR(serviceNetwork) if err != nil { return nil, fmt.Errorf("Failed to parse ServiceNetwork CIDR %s: %v", serviceNetwork, err) } - if hostSubnetLength <= 0 || hostSubnetLength > 32 { - return nil, fmt.Errorf("Invalid HostSubnetLength %d (not between 1 and 32)", hostSubnetLength) - } - return &NetworkInfo{ - ClusterNetwork: cn, - ServiceNetwork: sn, - HostSubnetLength: hostSubnetLength, - PluginName: pluginName, + ClusterNetwork: cn, + ServiceNetwork: sn, }, nil } @@ -70,6 +61,9 @@ func (ni *NetworkInfo) validateNodeIP(nodeIP string) error { if ni.ClusterNetwork.Contains(ipaddr) { return fmt.Errorf("Node IP %s conflicts with cluster network %s", nodeIP, ni.ClusterNetwork.String()) } + if ni.ServiceNetwork.Contains(ipaddr) { + return fmt.Errorf("Node IP %s conflicts with service network %s", nodeIP, ni.ServiceNetwork.String()) + } return nil } @@ -80,5 +74,5 @@ func getNetworkInfo(osClient *osclient.Client) (*NetworkInfo, error) { return nil, err } - return validateClusterNetwork(cn.Network, cn.HostSubnetLength, cn.ServiceNetwork, cn.PluginName) + return parseNetworkInfo(cn.Network, cn.ServiceNetwork) } diff --git a/pkg/sdn/plugin/master.go b/pkg/sdn/plugin/master.go index 64fdf1b4dc1d..426963330cf7 100644 --- a/pkg/sdn/plugin/master.go +++ b/pkg/sdn/plugin/master.go @@ -35,9 +35,8 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie registry: newRegistry(osClient, kClient), } - // Validate command-line/config parameters var err error - master.networkInfo, err = validateClusterNetwork(networkConfig.ClusterNetworkCIDR, networkConfig.HostSubnetLength, networkConfig.ServiceNetworkCIDR, networkConfig.NetworkPluginName) + master.networkInfo, err = parseNetworkInfo(networkConfig.ClusterNetworkCIDR, networkConfig.ServiceNetworkCIDR) if err != nil { return err } @@ -47,9 +46,9 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie cn, err := master.registry.oClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault) if err == nil { if master.networkInfo.ClusterNetwork.String() != cn.Network || - master.networkInfo.HostSubnetLength != cn.HostSubnetLength || + networkConfig.HostSubnetLength != cn.HostSubnetLength || master.networkInfo.ServiceNetwork.String() != cn.ServiceNetwork || - master.networkInfo.PluginName != cn.PluginName { + networkConfig.NetworkPluginName != cn.PluginName { updateConfig = true } } else { @@ -63,10 +62,14 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie if err = master.validateNetworkConfig(); err != nil { return err } + size, len := master.networkInfo.ClusterNetwork.Mask.Size() + if networkConfig.HostSubnetLength < 1 || networkConfig.HostSubnetLength >= uint32(len-size) { + return fmt.Errorf("invalid HostSubnetLength %d for network %s (must be from 1 to %d)", networkConfig.HostSubnetLength, networkConfig.ClusterNetworkCIDR, len-size) + } cn.Network = master.networkInfo.ClusterNetwork.String() - cn.HostSubnetLength = master.networkInfo.HostSubnetLength + cn.HostSubnetLength = networkConfig.HostSubnetLength cn.ServiceNetwork = master.networkInfo.ServiceNetwork.String() - cn.PluginName = master.networkInfo.PluginName + cn.PluginName = networkConfig.NetworkPluginName } if createConfig { From b94e4247cfdbb2e90c5ebd0f36a12867bb356796 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 29 Aug 2016 13:05:21 -0400 Subject: [PATCH 4/5] Kill off SDN "Registry" type This used to be an abstraction between kclient-based access and direct etcd access, back when openshift-sdn could be compiled standalone, but now it's just cruft. --- pkg/sdn/plugin/common.go | 74 +++++++++++ pkg/sdn/plugin/controller.go | 10 +- pkg/sdn/plugin/master.go | 20 +-- pkg/sdn/plugin/node.go | 29 ++++- pkg/sdn/plugin/plugin.go | 23 +++- pkg/sdn/plugin/proxy.go | 14 +- pkg/sdn/plugin/registry.go | 228 --------------------------------- pkg/sdn/plugin/subnets.go | 31 +++-- pkg/sdn/plugin/vnids_master.go | 42 +++--- pkg/sdn/plugin/vnids_node.go | 21 +-- 10 files changed, 200 insertions(+), 292 deletions(-) delete mode 100644 pkg/sdn/plugin/registry.go diff --git a/pkg/sdn/plugin/common.go b/pkg/sdn/plugin/common.go index f66c040e8125..39fa62de95e5 100644 --- a/pkg/sdn/plugin/common.go +++ b/pkg/sdn/plugin/common.go @@ -4,11 +4,16 @@ import ( "fmt" "net" "strings" + "time" + + "github.com/golang/glog" osclient "github.com/openshift/origin/pkg/client" osapi "github.com/openshift/origin/pkg/sdn/api" kapi "k8s.io/kubernetes/pkg/api" + kcache "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/fields" ) func getPodContainerID(pod *kapi.Pod) string { @@ -25,6 +30,10 @@ func hostSubnetToString(subnet *osapi.HostSubnet) string { return fmt.Sprintf("%s (host: %q, ip: %q, subnet: %q)", subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet) } +func clusterNetworkToString(n *osapi.ClusterNetwork) string { + return fmt.Sprintf("%s (network: %q, hostSubnetBits: %d, serviceNetwork: %q, pluginName: %q)", n.Name, n.Network, n.HostSubnetLength, n.ServiceNetwork, n.PluginName) +} + type NetworkInfo struct { ClusterNetwork *net.IPNet ServiceNetwork *net.IPNet @@ -76,3 +85,68 @@ func getNetworkInfo(osClient *osclient.Client) (*NetworkInfo, error) { return parseNetworkInfo(cn.Network, cn.ServiceNetwork) } + +type ResourceName string + +const ( + Nodes ResourceName = "Nodes" + Namespaces ResourceName = "Namespaces" + NetNamespaces ResourceName = "NetNamespaces" + Services ResourceName = "Services" + HostSubnets ResourceName = "HostSubnets" + Pods ResourceName = "Pods" + EgressNetworkPolicies ResourceName = "EgressNetworkPolicies" +) + +// Run event queue for the given resource. The 'process' function is called +// repeatedly with each available cache.Delta that describes state changes +// to an object. If the process function returns an error queued changes +// for that object are dropped but processing continues with the next available +// object's cache.Deltas. The error is logged with call stack information. +func runEventQueueForResource(client kcache.Getter, resourceName ResourceName, expectedType interface{}, selector fields.Selector, process ProcessEventFunc) { + rn := strings.ToLower(string(resourceName)) + lw := kcache.NewListWatchFromClient(client, rn, kapi.NamespaceAll, selector) + eventQueue := NewEventQueue(kcache.MetaNamespaceKeyFunc) + // Repopulate event queue every 30 mins + // Existing items in the event queue will have watch.Modified event type + kcache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run() + + // Run the queue + for { + eventQueue.Pop(process) + } +} + +// Run event queue for the given resource +func RunEventQueue(client kcache.Getter, resourceName ResourceName, process ProcessEventFunc) { + var expectedType interface{} + + switch resourceName { + case HostSubnets: + expectedType = &osapi.HostSubnet{} + case NetNamespaces: + expectedType = &osapi.NetNamespace{} + case Nodes: + expectedType = &kapi.Node{} + case Namespaces: + expectedType = &kapi.Namespace{} + case Services: + expectedType = &kapi.Service{} + case Pods: + expectedType = &kapi.Pod{} + case EgressNetworkPolicies: + expectedType = &osapi.EgressNetworkPolicy{} + default: + glog.Fatalf("Unknown resource %s during initialization of event queue", resourceName) + } + + runEventQueueForResource(client, resourceName, expectedType, fields.Everything(), process) +} + +func RunLocalPodsEventQueue(client kcache.Getter, nodeName string, process ProcessEventFunc) { + if nodeName == "" { + glog.Fatalf("LocalPods resource requires a node name") + } + + runEventQueueForResource(client, Pods, &kapi.Pod{}, fields.Set{"spec.host": nodeName}.AsSelector(), process) +} diff --git a/pkg/sdn/plugin/controller.go b/pkg/sdn/plugin/controller.go index c2e6f5e176af..e3e5b6e3820b 100644 --- a/pkg/sdn/plugin/controller.go +++ b/pkg/sdn/plugin/controller.go @@ -357,7 +357,7 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe } func (plugin *OsdnNode) updateEgressNetworkPolicyFailureLabel(failure bool) error { - node, err := plugin.registry.kClient.Nodes().Get(plugin.hostName) + node, err := plugin.kClient.Nodes().Get(plugin.hostName) if err != nil { return err } @@ -374,12 +374,12 @@ func (plugin *OsdnNode) updateEgressNetworkPolicyFailureLabel(failure bool) erro delete(node.Labels, EgressNetworkPolicyFailureLabel) } - _, err = plugin.registry.kClient.Nodes().UpdateStatus(node) + _, err = plugin.kClient.Nodes().UpdateStatus(node) return err } func (plugin *OsdnNode) SetupEgressNetworkPolicy() error { - policies, err := plugin.registry.GetEgressNetworkPolicies() + policies, err := plugin.osClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{}) if err != nil { if kapierrs.IsForbidden(err) { // 1.3 node running with 1.2-bootstrapped policies @@ -398,7 +398,7 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error { } } - for _, policy := range policies { + for _, policy := range policies.Items { vnid, err := plugin.vnids.GetVNID(policy.Namespace) if err != nil { glog.Warningf("Could not find netid for namespace %q: %v", policy.Namespace, err) @@ -419,7 +419,7 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error { } func (plugin *OsdnNode) watchEgressNetworkPolicies() { - plugin.registry.RunEventQueue(EgressNetworkPolicies, func(delta cache.Delta) error { + RunEventQueue(plugin.osClient, EgressNetworkPolicies, func(delta cache.Delta) error { policy := delta.Object.(*osapi.EgressNetworkPolicy) vnid, err := plugin.vnids.GetVNID(policy.Namespace) diff --git a/pkg/sdn/plugin/master.go b/pkg/sdn/plugin/master.go index 426963330cf7..30b75e52fb60 100644 --- a/pkg/sdn/plugin/master.go +++ b/pkg/sdn/plugin/master.go @@ -18,7 +18,8 @@ import ( ) type OsdnMaster struct { - registry *Registry + kClient *kclient.Client + osClient *osclient.Client networkInfo *NetworkInfo subnetAllocator *netutils.SubnetAllocator vnids *masterVNIDMap @@ -32,7 +33,8 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie log.Infof("Initializing SDN master of type %q", networkConfig.NetworkPluginName) master := &OsdnMaster{ - registry: newRegistry(osClient, kClient), + kClient: kClient, + osClient: osClient, } var err error @@ -43,7 +45,7 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie createConfig := false updateConfig := false - cn, err := master.registry.oClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault) + cn, err := master.osClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault) if err == nil { if master.networkInfo.ClusterNetwork.String() != cn.Network || networkConfig.HostSubnetLength != cn.HostSubnetLength || @@ -73,13 +75,13 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie } if createConfig { - cn, err := master.registry.oClient.ClusterNetwork().Create(cn) + cn, err := master.osClient.ClusterNetwork().Create(cn) if err != nil { return err } log.Infof("Created ClusterNetwork %s", clusterNetworkToString(cn)) } else if updateConfig { - cn, err := master.registry.oClient.ClusterNetwork().Update(cn) + cn, err := master.osClient.ClusterNetwork().Update(cn) if err != nil { return err } @@ -127,11 +129,11 @@ func (master *OsdnMaster) validateNetworkConfig() error { } // Ensure each host subnet is within the cluster network - subnets, err := master.registry.GetSubnets() + subnets, err := master.osClient.HostSubnets().List(kapi.ListOptions{}) if err != nil { return fmt.Errorf("Error in initializing/fetching subnets: %v", err) } - for _, sub := range subnets { + for _, sub := range subnets.Items { subnetIP, _, _ := net.ParseCIDR(sub.Subnet) if subnetIP == nil { errList = append(errList, fmt.Errorf("Failed to parse network address: %s", sub.Subnet)) @@ -143,11 +145,11 @@ func (master *OsdnMaster) validateNetworkConfig() error { } // Ensure each service is within the services network - services, err := master.registry.GetServices() + services, err := master.kClient.Services(kapi.NamespaceAll).List(kapi.ListOptions{}) if err != nil { return err } - for _, svc := range services { + for _, svc := range services.Items { if !ni.ServiceNetwork.Contains(net.ParseIP(svc.Spec.ClusterIP)) { errList = append(errList, fmt.Errorf("Error: Existing service with IP: %s is not part of service network: %s", svc.Spec.ClusterIP, ni.ServiceNetwork.String())) } diff --git a/pkg/sdn/plugin/node.go b/pkg/sdn/plugin/node.go index fc39e1663229..65aa18174182 100644 --- a/pkg/sdn/plugin/node.go +++ b/pkg/sdn/plugin/node.go @@ -15,14 +15,17 @@ import ( kapi "k8s.io/kubernetes/pkg/api" kclient "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/fields" kubeletTypes "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/labels" kexec "k8s.io/kubernetes/pkg/util/exec" kubeutilnet "k8s.io/kubernetes/pkg/util/net" ) type OsdnNode struct { multitenant bool - registry *Registry + kClient *kclient.Client + osClient *osclient.Client networkInfo *NetworkInfo localIP string localSubnet *osapi.HostSubnet @@ -66,7 +69,8 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien plugin := &OsdnNode{ multitenant: IsOpenShiftMultitenantNetworkPlugin(pluginName), - registry: newRegistry(osClient, kClient), + kClient: kClient, + osClient: osClient, localIP: selfIP, hostName: hostname, vnids: newNodeVNIDMap(), @@ -80,7 +84,7 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien func (node *OsdnNode) Start() error { var err error - node.networkInfo, err = getNetworkInfo(node.registry.oClient) + node.networkInfo, err = getNetworkInfo(node.osClient) if err != nil { return fmt.Errorf("Failed to get network information: %v", err) } @@ -126,7 +130,24 @@ func (node *OsdnNode) Start() error { } func (node *OsdnNode) GetLocalPods(namespace string) ([]kapi.Pod, error) { - return node.registry.GetRunningPods(node.hostName, namespace) + fieldSelector := fields.Set{"spec.host": node.hostName}.AsSelector() + opts := kapi.ListOptions{ + LabelSelector: labels.Everything(), + FieldSelector: fieldSelector, + } + podList, err := node.kClient.Pods(namespace).List(opts) + if err != nil { + return nil, err + } + + // Filter running pods + pods := make([]kapi.Pod, 0, len(podList.Items)) + for _, pod := range podList.Items { + if pod.Status.Phase == kapi.PodRunning { + pods = append(pods, pod) + } + } + return pods, nil } func (node *OsdnNode) markPodNetworkReady() { diff --git a/pkg/sdn/plugin/plugin.go b/pkg/sdn/plugin/plugin.go index 608d37d1002c..0b592c076240 100644 --- a/pkg/sdn/plugin/plugin.go +++ b/pkg/sdn/plugin/plugin.go @@ -11,8 +11,10 @@ import ( kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/apis/componentconfig" + "k8s.io/kubernetes/pkg/fields" kubeletTypes "k8s.io/kubernetes/pkg/kubelet/container" knetwork "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/labels" utilsets "k8s.io/kubernetes/pkg/util/sets" ) @@ -153,13 +155,32 @@ func getScriptError(output []byte) string { return string(output) } +func (plugin *OsdnNode) getPod(nodeName, namespace, podName string) (*kapi.Pod, error) { + fieldSelector := fields.Set{"spec.host": nodeName}.AsSelector() + opts := kapi.ListOptions{ + LabelSelector: labels.Everything(), + FieldSelector: fieldSelector, + } + podList, err := plugin.kClient.Pods(namespace).List(opts) + if err != nil { + return nil, err + } + + for _, pod := range podList.Items { + if pod.ObjectMeta.Name == podName { + return &pod, nil + } + } + return nil, nil +} + func (plugin *OsdnNode) SetUpPod(namespace string, name string, id kubeletTypes.ContainerID) error { err := plugin.WaitForPodNetworkReady() if err != nil { return err } - pod, err := plugin.registry.GetPod(plugin.hostName, namespace, name) + pod, err := plugin.getPod(plugin.hostName, namespace, name) if err != nil { return err } diff --git a/pkg/sdn/plugin/proxy.go b/pkg/sdn/plugin/proxy.go index f3c00cd5a290..0288672a3e0e 100644 --- a/pkg/sdn/plugin/proxy.go +++ b/pkg/sdn/plugin/proxy.go @@ -25,7 +25,8 @@ type proxyFirewallItem struct { } type ovsProxyPlugin struct { - registry *Registry + kClient *kclient.Client + osClient *osclient.Client networkInfo *NetworkInfo baseEndpointsHandler pconfig.EndpointsConfigHandler @@ -41,7 +42,8 @@ func NewProxyPlugin(pluginName string, osClient *osclient.Client, kClient *kclie } return &ovsProxyPlugin{ - registry: newRegistry(osClient, kClient), + kClient: kClient, + osClient: osClient, firewall: make(map[string][]proxyFirewallItem), }, nil } @@ -50,13 +52,13 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e glog.Infof("Starting multitenant SDN proxy endpoint filter") var err error - proxy.networkInfo, err = getNetworkInfo(proxy.registry.oClient) + proxy.networkInfo, err = getNetworkInfo(proxy.osClient) if err != nil { return fmt.Errorf("could not get network info: %s", err) } proxy.baseEndpointsHandler = baseHandler - policies, err := proxy.registry.GetEgressNetworkPolicies() + policies, err := proxy.osClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{}) if err != nil { if kapierrs.IsForbidden(err) { // controller.go will log an error about this @@ -64,7 +66,7 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e } return fmt.Errorf("could not get EgressNetworkPolicies: %s", err) } - for _, policy := range policies { + for _, policy := range policies.Items { proxy.updateNetworkPolicy(policy) } @@ -73,7 +75,7 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e } func (proxy *ovsProxyPlugin) watchEgressNetworkPolicies() { - proxy.registry.RunEventQueue(EgressNetworkPolicies, func(delta cache.Delta) error { + RunEventQueue(proxy.osClient, EgressNetworkPolicies, func(delta cache.Delta) error { policy := delta.Object.(*osapi.EgressNetworkPolicy) if delta.Type == cache.Deleted { policy.Spec.Egress = nil diff --git a/pkg/sdn/plugin/registry.go b/pkg/sdn/plugin/registry.go deleted file mode 100644 index 113ffffc05cd..000000000000 --- a/pkg/sdn/plugin/registry.go +++ /dev/null @@ -1,228 +0,0 @@ -package plugin - -import ( - "fmt" - "strings" - "time" - - log "github.com/golang/glog" - - kapi "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/client/cache" - kclient "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - - osclient "github.com/openshift/origin/pkg/client" - osapi "github.com/openshift/origin/pkg/sdn/api" -) - -type Registry struct { - oClient *osclient.Client - kClient *kclient.Client -} - -type ResourceName string - -const ( - Nodes ResourceName = "Nodes" - Namespaces ResourceName = "Namespaces" - NetNamespaces ResourceName = "NetNamespaces" - Services ResourceName = "Services" - HostSubnets ResourceName = "HostSubnets" - Pods ResourceName = "Pods" - EgressNetworkPolicies ResourceName = "EgressNetworkPolicies" -) - -func newRegistry(osClient *osclient.Client, kClient *kclient.Client) *Registry { - return &Registry{ - oClient: osClient, - kClient: kClient, - } -} - -func (registry *Registry) GetSubnets() ([]osapi.HostSubnet, error) { - hostSubnetList, err := registry.oClient.HostSubnets().List(kapi.ListOptions{}) - if err != nil { - return nil, err - } - return hostSubnetList.Items, nil -} - -func (registry *Registry) GetSubnet(nodeName string) (*osapi.HostSubnet, error) { - return registry.oClient.HostSubnets().Get(nodeName) -} - -func (registry *Registry) DeleteSubnet(nodeName string) error { - return registry.oClient.HostSubnets().Delete(nodeName) -} - -func (registry *Registry) CreateSubnet(nodeName, nodeIP, subnetCIDR string) (*osapi.HostSubnet, error) { - hs := &osapi.HostSubnet{ - TypeMeta: unversioned.TypeMeta{Kind: "HostSubnet"}, - ObjectMeta: kapi.ObjectMeta{Name: nodeName}, - Host: nodeName, - HostIP: nodeIP, - Subnet: subnetCIDR, - } - return registry.oClient.HostSubnets().Create(hs) -} - -func (registry *Registry) UpdateSubnet(hs *osapi.HostSubnet) (*osapi.HostSubnet, error) { - return registry.oClient.HostSubnets().Update(hs) -} - -func (registry *Registry) GetRunningPods(nodeName, namespace string) ([]kapi.Pod, error) { - fieldSelector := fields.Set{"spec.host": nodeName}.AsSelector() - opts := kapi.ListOptions{ - LabelSelector: labels.Everything(), - FieldSelector: fieldSelector, - } - podList, err := registry.kClient.Pods(namespace).List(opts) - if err != nil { - return nil, err - } - - // Filter running pods - pods := make([]kapi.Pod, 0, len(podList.Items)) - for _, pod := range podList.Items { - if pod.Status.Phase == kapi.PodRunning { - pods = append(pods, pod) - } - } - return pods, nil -} - -func (registry *Registry) GetPod(nodeName, namespace, podName string) (*kapi.Pod, error) { - fieldSelector := fields.Set{"spec.host": nodeName}.AsSelector() - opts := kapi.ListOptions{ - LabelSelector: labels.Everything(), - FieldSelector: fieldSelector, - } - podList, err := registry.kClient.Pods(namespace).List(opts) - if err != nil { - return nil, err - } - - for _, pod := range podList.Items { - if pod.ObjectMeta.Name == podName { - return &pod, nil - } - } - return nil, nil -} - -func (registry *Registry) GetNetNamespaces() ([]osapi.NetNamespace, error) { - netNamespaceList, err := registry.oClient.NetNamespaces().List(kapi.ListOptions{}) - if err != nil { - return nil, err - } - return netNamespaceList.Items, nil -} - -func (registry *Registry) GetNetNamespace(name string) (*osapi.NetNamespace, error) { - return registry.oClient.NetNamespaces().Get(name) -} - -func (registry *Registry) CreateNetNamespace(name string, id uint32) error { - netns := &osapi.NetNamespace{ - TypeMeta: unversioned.TypeMeta{Kind: "NetNamespace"}, - ObjectMeta: kapi.ObjectMeta{Name: name}, - NetName: name, - NetID: id, - } - _, err := registry.oClient.NetNamespaces().Create(netns) - return err -} - -func (registry *Registry) UpdateNetNamespace(netns *osapi.NetNamespace) (*osapi.NetNamespace, error) { - return registry.oClient.NetNamespaces().Update(netns) -} - -func (registry *Registry) DeleteNetNamespace(name string) error { - return registry.oClient.NetNamespaces().Delete(name) -} - -func (registry *Registry) GetServicesForNamespace(namespace string) ([]kapi.Service, error) { - return registry.getServices(namespace) -} - -func (registry *Registry) GetServices() ([]kapi.Service, error) { - return registry.getServices(kapi.NamespaceAll) -} - -func (registry *Registry) getServices(namespace string) ([]kapi.Service, error) { - kServList, err := registry.kClient.Services(namespace).List(kapi.ListOptions{}) - if err != nil { - return nil, err - } - - servList := make([]kapi.Service, 0, len(kServList.Items)) - for _, service := range kServList.Items { - if !kapi.IsServiceIPSet(&service) { - continue - } - servList = append(servList, service) - } - return servList, nil -} - -func (registry *Registry) GetEgressNetworkPolicies() ([]osapi.EgressNetworkPolicy, error) { - policyList, err := registry.oClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{}) - if err != nil { - return nil, err - } - return policyList.Items, nil -} - -// Run event queue for the given resource. The 'process' function is called -// repeatedly with each available cache.Delta that describes state changes -// to an object. If the process function returns an error queued changes -// for that object are dropped but processing continues with the next available -// object's cache.Deltas. The error is logged with call stack information. -func (registry *Registry) RunEventQueue(resourceName ResourceName, process ProcessEventFunc) { - var client cache.Getter - var expectedType interface{} - - switch resourceName { - case HostSubnets: - expectedType = &osapi.HostSubnet{} - client = registry.oClient - case NetNamespaces: - expectedType = &osapi.NetNamespace{} - client = registry.oClient - case Nodes: - expectedType = &kapi.Node{} - client = registry.kClient - case Namespaces: - expectedType = &kapi.Namespace{} - client = registry.kClient - case Services: - expectedType = &kapi.Service{} - client = registry.kClient - case Pods: - expectedType = &kapi.Pod{} - client = registry.kClient - case EgressNetworkPolicies: - expectedType = &osapi.EgressNetworkPolicy{} - client = registry.oClient - default: - log.Fatalf("Unknown resource %s during initialization of event queue", resourceName) - } - - lw := cache.NewListWatchFromClient(client, strings.ToLower(string(resourceName)), kapi.NamespaceAll, fields.Everything()) - eventQueue := NewEventQueue(cache.MetaNamespaceKeyFunc) - // Repopulate event queue every 30 mins - // Existing items in the event queue will have watch.Modified event type - cache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run() - - // Run the queue - for { - eventQueue.Pop(process) - } -} - -func clusterNetworkToString(n *osapi.ClusterNetwork) string { - return fmt.Sprintf("%s (network: %q, hostSubnetBits: %d, serviceNetwork: %q, pluginName: %q)", n.Name, n.Network, n.HostSubnetLength, n.ServiceNetwork, n.PluginName) -} diff --git a/pkg/sdn/plugin/subnets.go b/pkg/sdn/plugin/subnets.go index 93cffa488088..b3a0abf906eb 100644 --- a/pkg/sdn/plugin/subnets.go +++ b/pkg/sdn/plugin/subnets.go @@ -21,12 +21,12 @@ import ( func (master *OsdnMaster) SubnetStartMaster(clusterNetwork *net.IPNet, hostSubnetLength uint32) error { subrange := make([]string, 0) - subnets, err := master.registry.GetSubnets() + subnets, err := master.osClient.HostSubnets().List(kapi.ListOptions{}) if err != nil { log.Errorf("Error in initializing/fetching subnets: %v", err) return err } - for _, sub := range subnets { + for _, sub := range subnets.Items { subrange = append(subrange, sub.Subnet) if err = master.networkInfo.validateNodeIP(sub.HostIP); err != nil { // Don't error out; just warn so the error can be corrected with 'oc' @@ -52,14 +52,14 @@ func (master *OsdnMaster) addNode(nodeName string, nodeIP string) error { } // Check if subnet needs to be created or updated - sub, err := master.registry.GetSubnet(nodeName) + sub, err := master.osClient.HostSubnets().Get(nodeName) if err == nil { if sub.HostIP == nodeIP { return nil } else { // Node IP changed, update old subnet sub.HostIP = nodeIP - sub, err = master.registry.UpdateSubnet(sub) + sub, err = master.osClient.HostSubnets().Update(sub) if err != nil { return fmt.Errorf("Error updating subnet %s for node %s: %v", sub.Subnet, nodeName, err) } @@ -74,7 +74,14 @@ func (master *OsdnMaster) addNode(nodeName string, nodeIP string) error { return fmt.Errorf("Error allocating network for node %s: %v", nodeName, err) } - sub, err = master.registry.CreateSubnet(nodeName, nodeIP, sn.String()) + sub = &osapi.HostSubnet{ + TypeMeta: kapiunversioned.TypeMeta{Kind: "HostSubnet"}, + ObjectMeta: kapi.ObjectMeta{Name: nodeName}, + Host: nodeName, + HostIP: nodeIP, + Subnet: sn.String(), + } + sub, err = master.osClient.HostSubnets().Create(sub) if err != nil { master.subnetAllocator.ReleaseNetwork(sn) return fmt.Errorf("Error creating subnet %s for node %s: %v", sn.String(), nodeName, err) @@ -84,7 +91,7 @@ func (master *OsdnMaster) addNode(nodeName string, nodeIP string) error { } func (master *OsdnMaster) deleteNode(nodeName string) error { - sub, err := master.registry.GetSubnet(nodeName) + sub, err := master.osClient.HostSubnets().Get(nodeName) if err != nil { return fmt.Errorf("Error fetching subnet for node %q for deletion: %v", nodeName, err) } @@ -93,7 +100,7 @@ func (master *OsdnMaster) deleteNode(nodeName string) error { return fmt.Errorf("Error parsing subnet %q for node %q for deletion: %v", sub.Subnet, nodeName, err) } master.subnetAllocator.ReleaseNetwork(ipnet) - err = master.registry.DeleteSubnet(nodeName) + err = master.osClient.HostSubnets().Delete(nodeName) if err != nil { return fmt.Errorf("Error deleting subnet %v for node %q: %v", sub, nodeName, err) } @@ -123,7 +130,7 @@ func (master *OsdnMaster) clearInitialNodeNetworkUnavailableCondition(node *kapi var err error if knode != node { - knode, err = master.registry.kClient.Nodes().Get(node.ObjectMeta.Name) + knode, err = master.kClient.Nodes().Get(node.ObjectMeta.Name) if err != nil { return err } @@ -136,7 +143,7 @@ func (master *OsdnMaster) clearInitialNodeNetworkUnavailableCondition(node *kapi condition.Reason = "RouteCreated" condition.Message = "openshift-sdn cleared kubelet-set NoRouteCreated" condition.LastTransitionTime = kapiunversioned.Now() - knode, err = master.registry.kClient.Nodes().UpdateStatus(knode) + knode, err = master.kClient.Nodes().UpdateStatus(knode) if err == nil { cleared = true } @@ -152,7 +159,7 @@ func (master *OsdnMaster) clearInitialNodeNetworkUnavailableCondition(node *kapi func (master *OsdnMaster) watchNodes() { nodeAddressMap := map[types.UID]string{} - master.registry.RunEventQueue(Nodes, func(delta cache.Delta) error { + RunEventQueue(master.kClient, Nodes, func(delta cache.Delta) error { node := delta.Object.(*kapi.Node) name := node.ObjectMeta.Name uid := node.ObjectMeta.UID @@ -215,7 +222,7 @@ func (node *OsdnNode) initSelfSubnet() error { // Try every retryInterval and bail-out if it exceeds max retries for i := 0; i < retries; i++ { // Get subnet for current node - subnet, err = node.registry.GetSubnet(node.hostName) + subnet, err = node.osClient.HostSubnets().Get(node.hostName) if err == nil { break } @@ -238,7 +245,7 @@ func (node *OsdnNode) initSelfSubnet() error { // Only run on the nodes func (node *OsdnNode) watchSubnets() { subnets := make(map[string]*osapi.HostSubnet) - node.registry.RunEventQueue(HostSubnets, func(delta cache.Delta) error { + RunEventQueue(node.osClient, HostSubnets, func(delta cache.Delta) error { hs := delta.Object.(*osapi.HostSubnet) if hs.HostIP == node.localIP { return nil diff --git a/pkg/sdn/plugin/vnids_master.go b/pkg/sdn/plugin/vnids_master.go index 3d1ea7f26d00..cac4d2fa4577 100644 --- a/pkg/sdn/plugin/vnids_master.go +++ b/pkg/sdn/plugin/vnids_master.go @@ -7,10 +7,12 @@ import ( log "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/util/sets" utilwait "k8s.io/kubernetes/pkg/util/wait" + osclient "github.com/openshift/origin/pkg/client" osapi "github.com/openshift/origin/pkg/sdn/api" pnetid "github.com/openshift/origin/pkg/sdn/plugin/netid" ) @@ -69,13 +71,13 @@ func (vmap *masterVNIDMap) isAdminNamespace(nsName string) bool { return false } -func (vmap *masterVNIDMap) populateVNIDs(registry *Registry) error { - netnsList, err := registry.GetNetNamespaces() +func (vmap *masterVNIDMap) populateVNIDs(osClient *osclient.Client) error { + netnsList, err := osClient.NetNamespaces().List(kapi.ListOptions{}) if err != nil { return err } - for _, netns := range netnsList { + for _, netns := range netnsList.Items { vmap.setVNID(netns.NetName, netns.NetID) // Skip GlobalVNID, not part of netID allocation range @@ -194,7 +196,7 @@ func (vmap *masterVNIDMap) updateNetID(nsName string, action osapi.PodNetworkAct } // assignVNID, revokeVNID and updateVNID methods updates in-memory structs and persists etcd objects -func (vmap *masterVNIDMap) assignVNID(registry *Registry, nsName string) error { +func (vmap *masterVNIDMap) assignVNID(osClient *osclient.Client, nsName string) error { vmap.lock.Lock() defer vmap.lock.Unlock() @@ -205,7 +207,13 @@ func (vmap *masterVNIDMap) assignVNID(registry *Registry, nsName string) error { if !exists { // Create NetNamespace Object and update vnid map - err = registry.CreateNetNamespace(nsName, netid) + netns := &osapi.NetNamespace{ + TypeMeta: unversioned.TypeMeta{Kind: "NetNamespace"}, + ObjectMeta: kapi.ObjectMeta{Name: nsName}, + NetName: nsName, + NetID: netid, + } + _, err := osClient.NetNamespaces().Create(netns) if err != nil { vmap.releaseNetID(nsName) return err @@ -214,12 +222,12 @@ func (vmap *masterVNIDMap) assignVNID(registry *Registry, nsName string) error { return nil } -func (vmap *masterVNIDMap) revokeVNID(registry *Registry, nsName string) error { +func (vmap *masterVNIDMap) revokeVNID(osClient *osclient.Client, nsName string) error { vmap.lock.Lock() defer vmap.lock.Unlock() // Delete NetNamespace object - if err := registry.DeleteNetNamespace(nsName); err != nil { + if err := osClient.NetNamespaces().Delete(nsName); err != nil { return err } @@ -229,7 +237,7 @@ func (vmap *masterVNIDMap) revokeVNID(registry *Registry, nsName string) error { return nil } -func (vmap *masterVNIDMap) updateVNID(registry *Registry, netns *osapi.NetNamespace) error { +func (vmap *masterVNIDMap) updateVNID(osClient *osclient.Client, netns *osapi.NetNamespace) error { action, args, err := osapi.GetChangePodNetworkAnnotation(netns) if err == osapi.ErrorPodNetworkAnnotationNotFound { // Nothing to update @@ -246,7 +254,7 @@ func (vmap *masterVNIDMap) updateVNID(registry *Registry, netns *osapi.NetNamesp netns.NetID = netid osapi.DeleteChangePodNetworkAnnotation(netns) - if _, err := registry.UpdateNetNamespace(netns); err != nil { + if _, err := osClient.NetNamespaces().Update(netns); err != nil { return err } return nil @@ -255,7 +263,7 @@ func (vmap *masterVNIDMap) updateVNID(registry *Registry, netns *osapi.NetNamesp //--------------------- Master methods ---------------------- func (master *OsdnMaster) VnidStartMaster() error { - err := master.vnids.populateVNIDs(master.registry) + err := master.vnids.populateVNIDs(master.osClient) if err != nil { return err } @@ -266,20 +274,18 @@ func (master *OsdnMaster) VnidStartMaster() error { } func (master *OsdnMaster) watchNamespaces() { - registry := master.registry - - registry.RunEventQueue(Namespaces, func(delta cache.Delta) error { + RunEventQueue(master.kClient, Namespaces, func(delta cache.Delta) error { ns := delta.Object.(*kapi.Namespace) name := ns.ObjectMeta.Name log.V(5).Infof("Watch %s event for Namespace %q", delta.Type, name) switch delta.Type { case cache.Sync, cache.Added, cache.Updated: - if err := master.vnids.assignVNID(registry, name); err != nil { + if err := master.vnids.assignVNID(master.osClient, name); err != nil { return fmt.Errorf("Error assigning netid: %v", err) } case cache.Deleted: - if err := master.vnids.revokeVNID(registry, name); err != nil { + if err := master.vnids.revokeVNID(master.osClient, name); err != nil { return fmt.Errorf("Error revoking netid: %v", err) } } @@ -288,16 +294,14 @@ func (master *OsdnMaster) watchNamespaces() { } func (master *OsdnMaster) watchNetNamespaces() { - registry := master.registry - - registry.RunEventQueue(NetNamespaces, func(delta cache.Delta) error { + RunEventQueue(master.osClient, NetNamespaces, func(delta cache.Delta) error { netns := delta.Object.(*osapi.NetNamespace) name := netns.ObjectMeta.Name log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, name) switch delta.Type { case cache.Sync, cache.Added, cache.Updated: - err := master.vnids.updateVNID(registry, netns) + err := master.vnids.updateVNID(master.osClient, netns) if err != nil { return fmt.Errorf("Error updating netid: %v", err) } diff --git a/pkg/sdn/plugin/vnids_node.go b/pkg/sdn/plugin/vnids_node.go index 4cd03c31ffbe..58c1512d5988 100644 --- a/pkg/sdn/plugin/vnids_node.go +++ b/pkg/sdn/plugin/vnids_node.go @@ -14,6 +14,7 @@ import ( "k8s.io/kubernetes/pkg/util/sets" utilwait "k8s.io/kubernetes/pkg/util/wait" + osclient "github.com/openshift/origin/pkg/client" osapi "github.com/openshift/origin/pkg/sdn/api" ) @@ -117,13 +118,13 @@ func (vmap *nodeVNIDMap) unsetVNID(name string) (id uint32, err error) { return id, nil } -func (vmap *nodeVNIDMap) populateVNIDs(registry *Registry) error { - nets, err := registry.GetNetNamespaces() +func (vmap *nodeVNIDMap) populateVNIDs(osClient *osclient.Client) error { + nets, err := osClient.NetNamespaces().List(kapi.ListOptions{}) if err != nil { return err } - for _, net := range nets { + for _, net := range nets.Items { vmap.setVNID(net.Name, net.NetID) } return nil @@ -133,7 +134,7 @@ func (vmap *nodeVNIDMap) populateVNIDs(registry *Registry) error { func (node *OsdnNode) VnidStartNode() error { // Populate vnid map synchronously so that existing services can fetch vnid - err := node.vnids.populateVNIDs(node.registry) + err := node.vnids.populateVNIDs(node.osClient) if err != nil { return err } @@ -152,7 +153,7 @@ func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32) if err != nil { return err } - services, err := node.registry.GetServicesForNamespace(namespace) + services, err := node.kClient.Services(namespace).List(kapi.ListOptions{}) if err != nil { return err } @@ -168,7 +169,11 @@ func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32) } // Update OF rules for the old services in the namespace - for _, svc := range services { + for _, svc := range services.Items { + if !kapi.IsServiceIPSet(&svc) { + continue + } + if err = node.DeleteServiceRules(&svc); err != nil { log.Error(err) } @@ -186,7 +191,7 @@ func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32) } func (node *OsdnNode) watchNetNamespaces() { - node.registry.RunEventQueue(NetNamespaces, func(delta cache.Delta) error { + RunEventQueue(node.osClient, NetNamespaces, func(delta cache.Delta) error { netns := delta.Object.(*osapi.NetNamespace) log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, netns.ObjectMeta.Name) @@ -231,7 +236,7 @@ func isServiceChanged(oldsvc, newsvc *kapi.Service) bool { func (node *OsdnNode) watchServices() { services := make(map[string]*kapi.Service) - node.registry.RunEventQueue(Services, func(delta cache.Delta) error { + RunEventQueue(node.kClient, Services, func(delta cache.Delta) error { serv := delta.Object.(*kapi.Service) // Ignore headless services From eca63dabb73a39a6b0577defa9cc096dff48a17f Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Thu, 15 Sep 2016 20:00:44 -0400 Subject: [PATCH 5/5] Fix use of deprecated PodSpec field name --- pkg/sdn/plugin/node.go | 2 +- pkg/sdn/plugin/plugin.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sdn/plugin/node.go b/pkg/sdn/plugin/node.go index 65aa18174182..50f0e9feeb73 100644 --- a/pkg/sdn/plugin/node.go +++ b/pkg/sdn/plugin/node.go @@ -130,7 +130,7 @@ func (node *OsdnNode) Start() error { } func (node *OsdnNode) GetLocalPods(namespace string) ([]kapi.Pod, error) { - fieldSelector := fields.Set{"spec.host": node.hostName}.AsSelector() + fieldSelector := fields.Set{"spec.nodeName": node.hostName}.AsSelector() opts := kapi.ListOptions{ LabelSelector: labels.Everything(), FieldSelector: fieldSelector, diff --git a/pkg/sdn/plugin/plugin.go b/pkg/sdn/plugin/plugin.go index 0b592c076240..600a1a9a86e8 100644 --- a/pkg/sdn/plugin/plugin.go +++ b/pkg/sdn/plugin/plugin.go @@ -156,7 +156,7 @@ func getScriptError(output []byte) string { } func (plugin *OsdnNode) getPod(nodeName, namespace, podName string) (*kapi.Pod, error) { - fieldSelector := fields.Set{"spec.host": nodeName}.AsSelector() + fieldSelector := fields.Set{"spec.nodeName": nodeName}.AsSelector() opts := kapi.ListOptions{ LabelSelector: labels.Everything(), FieldSelector: fieldSelector,