/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"context"
	"fmt"
	"net"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/retry"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
	netutil "k8s.io/utils/net"
)
// Repair is a controller loop that periodically examines all service ClusterIP allocations
// and logs any errors, and then sets the compacted and accurate list of all allocated IPs.
//
// Handles:
// * Duplicate ClusterIP assignments caused by operator action or undetected race conditions
// * ClusterIPs that do not match the currently configured range
// * Allocations to services that were not actually created due to a crash or power loss
// * Migrates old versions of Kubernetes services into the atomic ipallocator model automatically
//
// Can be run at infrequent intervals, and is best performed on startup of the master.
// Is level driven and idempotent - all valid ClusterIPs will be updated into the ipallocator
// map at the end of a single execution loop if no race is encountered.
//
// TODO: allocate new IPs if necessary
// TODO: perform repair?
type Repair struct {
	interval      time.Duration
	serviceClient corev1client.ServicesGetter

	networkByFamily   map[v1.IPFamily]*net.IPNet                    // networks we operate on, by their family
	allocatorByFamily map[v1.IPFamily]rangeallocation.RangeRegistry // allocators we use, by their family

	leaksByFamily map[v1.IPFamily]map[string]int // counter per leaked IP per family
	recorder      record.EventRecorder
}
// How many times we need to detect a leak before we clean up. This is to
// avoid races between allocating an IP and using it.
const numRepairsBeforeLeakCleanup = 3

// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")})
	recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})

	// build *ByFamily struct members
	networkByFamily := make(map[v1.IPFamily]*net.IPNet)
	allocatorByFamily := make(map[v1.IPFamily]rangeallocation.RangeRegistry)
	leaksByFamily := make(map[v1.IPFamily]map[string]int)

	primary := v1.IPv4Protocol
	secondary := v1.IPv6Protocol
	if netutil.IsIPv6(network.IP) {
		primary = v1.IPv6Protocol
}
	networkByFamily[primary] = network
	allocatorByFamily[primary] = alloc
	leaksByFamily[primary] = make(map[string]int)
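	// If a secondary service CIDR is configured (dual-stack), track it under the
	// opposite IP family so the repair loop covers that range as well.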
	if secondaryNetwork != nil && secondaryNetwork.IP != nil {
		if primary == v1.IPv6Protocol {
			secondary = v1.IPv4Protocol
}
		networkByFamily[secondary] = secondaryNetwork
		allocatorByFamily[secondary] = secondaryAlloc
		leaksByFamily[secondary] = make(map[string]int)
}
	return &Repair{
interval: interval,
serviceClient: serviceClient,
networkByFamily: networkByFamily,
allocatorByFamily: allocatorByFamily,
leaksByFamily: leaksByFamily,
recorder: recorder,
}
}
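// A minimal wiring sketch (not part of this file; the real call sites live in the
// apiserver's controller setup, and the variable names below are illustrative only):
//
//	repair := NewRepair(interval, serviceClient, eventClient, serviceCIDR, serviceIPRegistry, secondaryServiceCIDR, secondaryServiceIPRegistry)
//	repair.RunUntil(stopCh)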
// RunUntil starts the controller until the provided ch is closed.
func (c *Repair) RunUntil(ch chan struct{}) {
wait.Until(func() {
		if err := c.RunOnce(); err != nil {
runtime.HandleError(err)
}
}, c.interval, ch)
}
// RunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) RunOnce() error {
	return retry.RetryOnConflict(retry.DefaultBackoff, c.runOnce)
}
// runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) runOnce() error {
	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
	// or if they are executed against different leaders,
	// the ordering guarantee required to ensure no IP is allocated twice is violated.
	// ListServices must return a ResourceVersion higher than the etcd index Get triggers,
	// and the release code must not release services that have had IPs allocated but not yet been created
	// See #8295

	// If etcd server is not running we should wait for some time and fail only then. This is particularly
	// important when we start apiserver and etcd at the same time.
	snapshotByFamily := make(map[v1.IPFamily]*api.RangeAllocation)
	storedByFamily := make(map[v1.IPFamily]ipallocator.Interface)

	err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
		for family, allocator := range c.allocatorByFamily {
			// get snapshot if it is not there
			if _, ok := snapshotByFamily[family]; !ok {
				snapshot, err := allocator.Get()
				if err != nil {
					return false, err
}
				snapshotByFamily[family] = snapshot
}
}
		return true, nil
})
	if err != nil {
		return fmt.Errorf("unable to refresh the service IP block: %v", err)
}
	// ensure that ranges are assigned
	for family, snapshot := range snapshotByFamily {
		if snapshot.Range == "" {
			snapshot.Range = c.networkByFamily[family].String()
}
}
	// Create an allocator because it is easy to use.
	for family, snapshot := range snapshotByFamily {
		stored, err := ipallocator.NewFromSnapshot(snapshot)
		if err != nil {
			return fmt.Errorf("unable to rebuild allocator from snapshots for family:%v with error:%v", family, err)
}
		storedByFamily[family] = stored
}
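	// Build a fresh, empty allocator per configured family; these are repopulated
	// below from the live Service list and then persisted as the new authoritative state.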
	rebuiltByFamily := make(map[v1.IPFamily]*ipallocator.Range)
	for family, network := range c.networkByFamily {
		rebuilt, err := ipallocator.NewCIDRRange(network)
		if err != nil {
			return fmt.Errorf("unable to create CIDR range for family %v: %v", family, err)
}
		rebuiltByFamily[family] = rebuilt
}
	// We explicitly send no resource version, since the resource version
	// of 'snapshot' is from a different collection, it's not comparable to
	// the service collection. The caching layer keeps per-collection RVs,
	// and this is proper, since in theory the collections could be hosted
	// in separate etcd (or even non-etcd) instances.
	list, err := c.serviceClient.Services(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("unable to refresh the service IP block: %v", err)
}
	getFamilyByIP := func(ip net.IP) v1.IPFamily {
		if netutil.IsIPv6(ip) {
			return v1.IPv6Protocol
}
		return v1.IPv4Protocol
}
	// Check every Service's ClusterIP, and rebuild the state as we think it should be.
	for _, svc := range list.Items {
		if !helper.IsServiceIPSet(&svc) {
			// didn't need a cluster IP
			continue
}
		for _, ip := range svc.Spec.ClusterIPs {
			ip := net.ParseIP(ip)
			if ip == nil {
				// cluster IP is corrupt
				c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s is not a valid IP; please recreate service", ip)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", ip, svc.Name, svc.Namespace))
continue
}
			family := getFamilyByIP(ip)
			if _, ok := rebuiltByFamily[family]; !ok {
				// this service is using an IPFamily no longer configured on cluster
				c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family)
runtime.HandleError(fmt.Errorf("the cluster IP %s(%s) for service %s/%s is of ip family that is no longer configured on cluster; please recreate", ip, family, svc.Name, svc.Namespace))
continue
}
			// mark it as in-use
			actualAlloc := rebuiltByFamily[family]
			switch err := actualAlloc.Allocate(ip); err {
			case nil:
				actualStored := storedByFamily[family]
				if actualStored.Has(ip) {
					// remove it from the old set, so we can find leaks
					actualStored.Release(ip)
} else {
					// cluster IP doesn't seem to be allocated
					c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotAllocated", "Cluster IP [%v]:%s is not allocated; repairing", family, ip)
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not allocated; repairing", family, ip, svc.Name, svc.Namespace))
}
				delete(c.leaksByFamily[family], ip.String()) // it is used, so it can't be leaked
			case ipallocator.ErrAllocated:
				// cluster IP is duplicate
				c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to multiple services; please recreate", family, ip, svc.Name, svc.Namespace))
			case err.(*ipallocator.ErrNotInRange):
				// cluster IP is out of range
				c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPOutOfRange", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family])
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not within the service CIDR %s; please recreate", family, ip, svc.Name, svc.Namespace, c.networkByFamily[family]))
			case ipallocator.ErrFull:
				// somehow we are out of IPs
				cidr := actualAlloc.CIDR()
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
returnfmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
default:
c.recorder.Eventf(&svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip)
returnfmt.Errorf("unable to allocate cluster IP [%v]:%s for service %s/%s due to an unknown error, exiting: %v", family, ip, svc.Name, svc.Namespace, err)
}
}
}
	// leak check
	for family, leaks := range c.leaksByFamily {
c.checkLeaked(leaks, storedByFamily[family], rebuiltByFamily[family])
}
	// save logic
	// Blast the rebuilt state into storage.
	for family, rebuilt := range rebuiltByFamily {
		err = c.saveSnapShot(rebuilt, c.allocatorByFamily[family], snapshotByFamily[family])
		if err != nil {
			return err
}
}
	return nil
}
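// saveSnapShot persists the rebuilt allocator state through the range registry.
// Conflict errors are returned unchanged so that the RetryOnConflict wrapper in
// RunOnce can retry the whole repair pass.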
func (c *Repair) saveSnapShot(rebuilt *ipallocator.Range, alloc rangeallocation.RangeRegistry, snapshot *api.RangeAllocation) error {
	if err := rebuilt.Snapshot(snapshot); err != nil {
		return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
}
	if err := alloc.CreateOrUpdate(snapshot); err != nil {
		if errors.IsConflict(err) {
			return err
}
returnfmt.Errorf("unable to persist the updated service IP allocations: %v", err)
}
	return nil
}
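// checkLeaked handles IPs that are still present in the stored allocator but were
// not seen on any Service: each one is kept allocated in the rebuilt set until it
// has been observed leaked numRepairsBeforeLeakCleanup times (to ride out
// allocate-then-create races), after which it is left out and becomes available for reuse.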
func (c *Repair) checkLeaked(leaks map[string]int, stored ipallocator.Interface, rebuilt *ipallocator.Range) {
	// Check for IPs that are left in the old set. They appear to have been leaked.
	stored.ForEach(func(ip net.IP) {
		count, found := leaks[ip.String()]
switch {
		case !found:
			// flag it to be cleaned up after any races (hopefully) are gone
			runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked: flagging for later clean up", ip))
			count = numRepairsBeforeLeakCleanup - 1
			fallthrough
		case count > 0:
			// pretend it is still in use until count expires
			leaks[ip.String()] = count - 1
			if err := rebuilt.Allocate(ip); err != nil {
runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
}
default:
			// do not add it to the rebuilt set, which means it will be available for reuse
			runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
}
})
}