Skip to content

Commit

Permalink
Merge pull request #10924 from danwinship/kill-registry
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot authored Sep 27, 2016
2 parents a464734 + eca63da commit 4750429
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 441 deletions.
128 changes: 128 additions & 0 deletions pkg/sdn/plugin/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@ package plugin

import (
"fmt"
"net"
"strings"
"time"

"github.com/golang/glog"

osclient "github.com/openshift/origin/pkg/client"
osapi "github.com/openshift/origin/pkg/sdn/api"

kapi "k8s.io/kubernetes/pkg/api"
kcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/fields"
)

func getPodContainerID(pod *kapi.Pod) string {
Expand All @@ -22,3 +29,124 @@ func getPodContainerID(pod *kapi.Pod) string {
func hostSubnetToString(subnet *osapi.HostSubnet) string {
return fmt.Sprintf("%s (host: %q, ip: %q, subnet: %q)", subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet)
}

func clusterNetworkToString(n *osapi.ClusterNetwork) string {
return fmt.Sprintf("%s (network: %q, hostSubnetBits: %d, serviceNetwork: %q, pluginName: %q)", n.Name, n.Network, n.HostSubnetLength, n.ServiceNetwork, n.PluginName)
}

type NetworkInfo struct {
ClusterNetwork *net.IPNet
ServiceNetwork *net.IPNet
}

func parseNetworkInfo(clusterNetwork string, serviceNetwork string) (*NetworkInfo, error) {
_, cn, err := net.ParseCIDR(clusterNetwork)
if err != nil {
return nil, fmt.Errorf("Failed to parse ClusterNetwork CIDR %s: %v", clusterNetwork, err)
}
_, sn, err := net.ParseCIDR(serviceNetwork)
if err != nil {
return nil, fmt.Errorf("Failed to parse ServiceNetwork CIDR %s: %v", serviceNetwork, err)
}

return &NetworkInfo{
ClusterNetwork: cn,
ServiceNetwork: sn,
}, nil
}

func (ni *NetworkInfo) validateNodeIP(nodeIP string) error {
if nodeIP == "" || nodeIP == "127.0.0.1" {
return fmt.Errorf("Invalid node IP %q", nodeIP)
}

// Ensure each node's NodeIP is not contained by the cluster network,
// which could cause a routing loop. (rhbz#1295486)
ipaddr := net.ParseIP(nodeIP)
if ipaddr == nil {
return fmt.Errorf("Failed to parse node IP %s", nodeIP)
}

if ni.ClusterNetwork.Contains(ipaddr) {
return fmt.Errorf("Node IP %s conflicts with cluster network %s", nodeIP, ni.ClusterNetwork.String())
}
if ni.ServiceNetwork.Contains(ipaddr) {
return fmt.Errorf("Node IP %s conflicts with service network %s", nodeIP, ni.ServiceNetwork.String())
}

return nil
}

func getNetworkInfo(osClient *osclient.Client) (*NetworkInfo, error) {
cn, err := osClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault)
if err != nil {
return nil, err
}

return parseNetworkInfo(cn.Network, cn.ServiceNetwork)
}

type ResourceName string

const (
Nodes ResourceName = "Nodes"
Namespaces ResourceName = "Namespaces"
NetNamespaces ResourceName = "NetNamespaces"
Services ResourceName = "Services"
HostSubnets ResourceName = "HostSubnets"
Pods ResourceName = "Pods"
EgressNetworkPolicies ResourceName = "EgressNetworkPolicies"
)

// Run event queue for the given resource. The 'process' function is called
// repeatedly with each available cache.Delta that describes state changes
// to an object. If the process function returns an error queued changes
// for that object are dropped but processing continues with the next available
// object's cache.Deltas. The error is logged with call stack information.
func runEventQueueForResource(client kcache.Getter, resourceName ResourceName, expectedType interface{}, selector fields.Selector, process ProcessEventFunc) {
rn := strings.ToLower(string(resourceName))
lw := kcache.NewListWatchFromClient(client, rn, kapi.NamespaceAll, selector)
eventQueue := NewEventQueue(kcache.MetaNamespaceKeyFunc)
// Repopulate event queue every 30 mins
// Existing items in the event queue will have watch.Modified event type
kcache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run()

// Run the queue
for {
eventQueue.Pop(process)
}
}

// Run event queue for the given resource
func RunEventQueue(client kcache.Getter, resourceName ResourceName, process ProcessEventFunc) {
var expectedType interface{}

switch resourceName {
case HostSubnets:
expectedType = &osapi.HostSubnet{}
case NetNamespaces:
expectedType = &osapi.NetNamespace{}
case Nodes:
expectedType = &kapi.Node{}
case Namespaces:
expectedType = &kapi.Namespace{}
case Services:
expectedType = &kapi.Service{}
case Pods:
expectedType = &kapi.Pod{}
case EgressNetworkPolicies:
expectedType = &osapi.EgressNetworkPolicy{}
default:
glog.Fatalf("Unknown resource %s during initialization of event queue", resourceName)
}

runEventQueueForResource(client, resourceName, expectedType, fields.Everything(), process)
}

func RunLocalPodsEventQueue(client kcache.Getter, nodeName string, process ProcessEventFunc) {
if nodeName == "" {
glog.Fatalf("LocalPods resource requires a node name")
}

runEventQueueForResource(client, Pods, &kapi.Pod{}, fields.Set{"spec.host": nodeName}.AsSelector(), process)
}
10 changes: 5 additions & 5 deletions pkg/sdn/plugin/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe
}

func (plugin *OsdnNode) updateEgressNetworkPolicyFailureLabel(failure bool) error {
node, err := plugin.registry.kClient.Nodes().Get(plugin.hostName)
node, err := plugin.kClient.Nodes().Get(plugin.hostName)
if err != nil {
return err
}
Expand All @@ -374,12 +374,12 @@ func (plugin *OsdnNode) updateEgressNetworkPolicyFailureLabel(failure bool) erro
delete(node.Labels, EgressNetworkPolicyFailureLabel)
}

_, err = plugin.registry.kClient.Nodes().UpdateStatus(node)
_, err = plugin.kClient.Nodes().UpdateStatus(node)
return err
}

func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
policies, err := plugin.registry.GetEgressNetworkPolicies()
policies, err := plugin.osClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{})
if err != nil {
if kapierrs.IsForbidden(err) {
// 1.3 node running with 1.2-bootstrapped policies
Expand All @@ -398,7 +398,7 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
}
}

for _, policy := range policies {
for _, policy := range policies.Items {
vnid, err := plugin.vnids.GetVNID(policy.Namespace)
if err != nil {
glog.Warningf("Could not find netid for namespace %q: %v", policy.Namespace, err)
Expand All @@ -419,7 +419,7 @@ func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
}

func (plugin *OsdnNode) watchEgressNetworkPolicies() {
plugin.registry.RunEventQueue(EgressNetworkPolicies, func(delta cache.Delta) error {
RunEventQueue(plugin.osClient, EgressNetworkPolicies, func(delta cache.Delta) error {
policy := delta.Object.(*osapi.EgressNetworkPolicy)

vnid, err := plugin.vnids.GetVNID(policy.Namespace)
Expand Down
85 changes: 54 additions & 31 deletions pkg/sdn/plugin/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@ import (

osclient "github.com/openshift/origin/pkg/client"
osconfigapi "github.com/openshift/origin/pkg/cmd/server/api"
osapi "github.com/openshift/origin/pkg/sdn/api"
"github.com/openshift/origin/pkg/util/netutils"

kapi "k8s.io/kubernetes/pkg/api"
kapiunversioned "k8s.io/kubernetes/pkg/api/unversioned"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
kerrors "k8s.io/kubernetes/pkg/util/errors"
)

type OsdnMaster struct {
registry *Registry
kClient *kclient.Client
osClient *osclient.Client
networkInfo *NetworkInfo
subnetAllocator *netutils.SubnetAllocator
vnids *masterVNIDMap
}
Expand All @@ -28,30 +33,62 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie
log.Infof("Initializing SDN master of type %q", networkConfig.NetworkPluginName)

master := &OsdnMaster{
registry: newRegistry(osClient, kClient),
kClient: kClient,
osClient: osClient,
}

// Validate command-line/config parameters
ni, err := validateClusterNetwork(networkConfig.ClusterNetworkCIDR, networkConfig.HostSubnetLength, networkConfig.ServiceNetworkCIDR, networkConfig.NetworkPluginName)
var err error
master.networkInfo, err = parseNetworkInfo(networkConfig.ClusterNetworkCIDR, networkConfig.ServiceNetworkCIDR)
if err != nil {
return err
}

changed, net_err := master.isClusterNetworkChanged(ni)
if changed {
if err = master.validateNetworkConfig(ni); err != nil {
createConfig := false
updateConfig := false
cn, err := master.osClient.ClusterNetwork().Get(osapi.ClusterNetworkDefault)
if err == nil {
if master.networkInfo.ClusterNetwork.String() != cn.Network ||
networkConfig.HostSubnetLength != cn.HostSubnetLength ||
master.networkInfo.ServiceNetwork.String() != cn.ServiceNetwork ||
networkConfig.NetworkPluginName != cn.PluginName {
updateConfig = true
}
} else {
cn = &osapi.ClusterNetwork{
TypeMeta: kapiunversioned.TypeMeta{Kind: "ClusterNetwork"},
ObjectMeta: kapi.ObjectMeta{Name: osapi.ClusterNetworkDefault},
}
createConfig = true
}
if createConfig || updateConfig {
if err = master.validateNetworkConfig(); err != nil {
return err
}
if err = master.registry.UpdateClusterNetwork(ni); err != nil {
size, len := master.networkInfo.ClusterNetwork.Mask.Size()
if networkConfig.HostSubnetLength < 1 || networkConfig.HostSubnetLength >= uint32(len-size) {
return fmt.Errorf("invalid HostSubnetLength %d for network %s (must be from 1 to %d)", networkConfig.HostSubnetLength, networkConfig.ClusterNetworkCIDR, len-size)
}
cn.Network = master.networkInfo.ClusterNetwork.String()
cn.HostSubnetLength = networkConfig.HostSubnetLength
cn.ServiceNetwork = master.networkInfo.ServiceNetwork.String()
cn.PluginName = networkConfig.NetworkPluginName
}

if createConfig {
cn, err := master.osClient.ClusterNetwork().Create(cn)
if err != nil {
return err
}
} else if net_err != nil {
if err = master.registry.CreateClusterNetwork(ni); err != nil {
log.Infof("Created ClusterNetwork %s", clusterNetworkToString(cn))
} else if updateConfig {
cn, err := master.osClient.ClusterNetwork().Update(cn)
if err != nil {
return err
}
log.Infof("Updated ClusterNetwork %s", clusterNetworkToString(cn))
}

if err = master.SubnetStartMaster(ni.ClusterNetwork, networkConfig.HostSubnetLength); err != nil {
if err = master.SubnetStartMaster(master.networkInfo.ClusterNetwork, networkConfig.HostSubnetLength); err != nil {
return err
}

Expand All @@ -66,12 +103,13 @@ func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclie
return nil
}

func (master *OsdnMaster) validateNetworkConfig(ni *NetworkInfo) error {
func (master *OsdnMaster) validateNetworkConfig() error {
hostIPNets, err := netutils.GetHostIPNetworks([]string{TUN, LBR})
if err != nil {
return err
}

ni := master.networkInfo
errList := []error{}

// Ensure cluster and service network don't overlap with host networks
Expand All @@ -91,11 +129,11 @@ func (master *OsdnMaster) validateNetworkConfig(ni *NetworkInfo) error {
}

// Ensure each host subnet is within the cluster network
subnets, err := master.registry.GetSubnets()
subnets, err := master.osClient.HostSubnets().List(kapi.ListOptions{})
if err != nil {
return fmt.Errorf("Error in initializing/fetching subnets: %v", err)
}
for _, sub := range subnets {
for _, sub := range subnets.Items {
subnetIP, _, _ := net.ParseCIDR(sub.Subnet)
if subnetIP == nil {
errList = append(errList, fmt.Errorf("Failed to parse network address: %s", sub.Subnet))
Expand All @@ -107,30 +145,15 @@ func (master *OsdnMaster) validateNetworkConfig(ni *NetworkInfo) error {
}

// Ensure each service is within the services network
services, err := master.registry.GetServices()
services, err := master.kClient.Services(kapi.NamespaceAll).List(kapi.ListOptions{})
if err != nil {
return err
}
for _, svc := range services {
for _, svc := range services.Items {
if !ni.ServiceNetwork.Contains(net.ParseIP(svc.Spec.ClusterIP)) {
errList = append(errList, fmt.Errorf("Error: Existing service with IP: %s is not part of service network: %s", svc.Spec.ClusterIP, ni.ServiceNetwork.String()))
}
}

return kerrors.NewAggregate(errList)
}

func (master *OsdnMaster) isClusterNetworkChanged(curNetwork *NetworkInfo) (bool, error) {
oldNetwork, err := master.registry.GetNetworkInfo()
if err != nil {
return false, err
}

if curNetwork.ClusterNetwork.String() != oldNetwork.ClusterNetwork.String() ||
curNetwork.HostSubnetLength != oldNetwork.HostSubnetLength ||
curNetwork.ServiceNetwork.String() != oldNetwork.ServiceNetwork.String() ||
curNetwork.PluginName != oldNetwork.PluginName {
return true, nil
}
return false, nil
}
Loading

0 comments on commit 4750429

Please sign in to comment.