From ab555e2bc40e39d1e785cc91cfe2952d9cb6a3e2 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Wed, 7 Sep 2016 10:10:00 -0700 Subject: [PATCH 1/2] Use complete port configs when plumbing mark rules Currently, a reference counting scheme is used to reference count all individual port configs that need to be plumbed in the ingress to make sure that in situations where a service with the same set of port configs is getting added or removed doesn't accidentally remove the port config plumbing if the add/remove notifications come out of order. This same reference counting scheme is also used for plumbing the port-based marking rules. But marking rules should not be plumbed based on that because marks are always different for different instantiations of the same service. So fixed the code to plumb port-based mark rules based on the complete set of port configs, while plumbing pure port rules and proxies based on a filter set of port configs based on the reference count. Signed-off-by: Jana Radhakrishnan --- service_linux.go | 115 +++++++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 39 deletions(-) diff --git a/service_linux.go b/service_linux.go index 3fafa9af44..e9d09fe40d 100644 --- a/service_linux.go +++ b/service_linux.go @@ -380,17 +380,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P } if addService { - var iPorts []*PortConfig + var filteredPorts []*PortConfig if sb.ingress { - iPorts = filterPortConfigs(ingressPorts, false) - if err := programIngress(gwIP, iPorts, false); err != nil { + filteredPorts = filterPortConfigs(ingressPorts, false) + if err := programIngress(gwIP, filteredPorts, false); err != nil { logrus.Errorf("Failed to add ingress: %v", err) return } } - logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts) - if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil { + logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts) + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, false); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) return } @@ -453,15 +453,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err) } - var iPorts []*PortConfig + var filteredPorts []*PortConfig if sb.ingress { - iPorts = filterPortConfigs(ingressPorts, true) - if err := programIngress(gwIP, iPorts, true); err != nil { + filteredPorts = filterPortConfigs(ingressPorts, true) + if err := programIngress(gwIP, filteredPorts, true); err != nil { logrus.Errorf("Failed to delete ingress: %v", err) } } - if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil { + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, true); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) } } @@ -715,33 +715,66 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error { return nil } +func writePortsToFile(ports []*PortConfig) (string, error) { + f, err := ioutil.TempFile("", "port_configs") + if err != nil { + return "", err + } + defer f.Close() + + buf, err := proto.Marshal(&EndpointRecord{ + IngressPorts: ports, + }) + + n, err := f.Write(buf) + if err != nil { + return "", err + } + + if n < len(buf) { + return "", io.ErrShortWrite + } + + return f.Name(), nil +} + +func readPortsFromFile(fileName string) ([]*PortConfig, error) { + buf, err := ioutil.ReadFile(fileName) + if err != nil { + return nil, err + } + + var epRec EndpointRecord + err = proto.Unmarshal(buf, &epRec) + if err != nil { + return nil, err + } + + return epRec.IngressPorts, nil +} + // Invoke fwmarker reexec routine to mark vip destined packets with // the passed firewall mark. -func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { - var ingressPortsFile string +func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, filteredPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { + var ( + ingressPortsFile string + filteredPortsFile string + ) + if len(ingressPorts) != 0 { - f, err := ioutil.TempFile("", "port_configs") + var err error + ingressPortsFile, err = writePortsToFile(ingressPorts) if err != nil { return err } + } - buf, err := proto.Marshal(&EndpointRecord{ - IngressPorts: ingressPorts, - }) - - n, err := f.Write(buf) + if len(filteredPorts) != 0 { + var err error + filteredPortsFile, err = writePortsToFile(filteredPorts) if err != nil { - f.Close() return err } - - if n < len(buf) { - f.Close() - return io.ErrShortWrite - } - - ingressPortsFile = f.Name() - f.Close() } addDelOpt := "-A" @@ -751,7 +784,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port cmd := &exec.Cmd{ Path: reexec.Self(), - Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()), + Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, filteredPortsFile, eIP.String()), Stdout: os.Stdout, Stderr: os.Stderr, } @@ -768,27 +801,29 @@ func fwMarker() { runtime.LockOSThread() defer runtime.UnlockOSThread() - if len(os.Args) < 7 { + if len(os.Args) < 8 { logrus.Error("invalid number of arguments..") os.Exit(1) } var ingressPorts []*PortConfig + var filteredPorts []*PortConfig if os.Args[5] != "" { - buf, err := ioutil.ReadFile(os.Args[5]) + var err error + ingressPorts, err = readPortsFromFile(os.Args[5]) if err != nil { - logrus.Errorf("Failed to read ports config file: %v", err) + logrus.Errorf("Failed reading ingress ports file: %v", err) os.Exit(6) } + } - var epRec EndpointRecord - err = proto.Unmarshal(buf, &epRec) + if os.Args[6] != "" { + var err error + filteredPorts, err = readPortsFromFile(os.Args[6]) if err != nil { - logrus.Errorf("Failed to unmarshal ports config data: %v", err) + logrus.Errorf("Failed reading filtered ports file: %v", err) os.Exit(7) } - - ingressPorts = epRec.IngressPorts } vip := os.Args[2] @@ -800,12 +835,14 @@ func fwMarker() { addDelOpt := os.Args[4] rules := [][]string{} - for _, iPort := range ingressPorts { + for _, iPort := range filteredPorts { rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d", addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) rules = append(rules, rule) + } - rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", + for _, iPort := range ingressPorts { + rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark)) rules = append(rules, rule) } @@ -823,9 +860,9 @@ func fwMarker() { } if addDelOpt == "-A" { - eIP, subnet, err := net.ParseCIDR(os.Args[6]) + eIP, subnet, err := net.ParseCIDR(os.Args[7]) if err != nil { - logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err) + logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[7], err) os.Exit(9) } From 6d441466098f220057bf0e35751c3f2ebcdcf7ed Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Wed, 21 Sep 2016 12:15:14 -0700 Subject: [PATCH 2/2] Delay port redirect until packet reaches container With port redirect in the ingress path happening before ipvs in the ingess sandbox, there is a chance of 5-tuple collision in the ipvs connection table for two entirely different services have different PublishedPorts but the same TargetPort. To disambiguate the ipvs connection table, delay the port redirect from PublishedPort to TargetPort until after the loadbalancing has happened in ipvs. To be specific, perform the redirect after the packet enters the real backend container namespace. Signed-off-by: Jana Radhakrishnan --- service_linux.go | 129 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 95 insertions(+), 34 deletions(-) diff --git a/service_linux.go b/service_linux.go index e9d09fe40d..5e3802a823 100644 --- a/service_linux.go +++ b/service_linux.go @@ -26,6 +26,7 @@ import ( func init() { reexec.Register("fwmarker", fwMarker) + reexec.Register("redirecter", redirecter) } func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service { @@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) { n := ep.getNetwork() eIP := ep.Iface().Address() + if n.ingress { + if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil { + logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err) + } + } + if sb.ingress { // For the ingress sandbox if this is not gateway // endpoint do nothing. @@ -390,7 +397,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P } logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts) - if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, false); err != nil { + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) return } @@ -461,7 +468,7 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po } } - if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, true); err != nil { + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) } } @@ -755,11 +762,8 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) { // Invoke fwmarker reexec routine to mark vip destined packets with // the passed firewall mark. -func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, filteredPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { - var ( - ingressPortsFile string - filteredPortsFile string - ) +func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { + var ingressPortsFile string if len(ingressPorts) != 0 { var err error @@ -767,14 +771,8 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port if err != nil { return err } - } - if len(filteredPorts) != 0 { - var err error - filteredPortsFile, err = writePortsToFile(filteredPorts) - if err != nil { - return err - } + defer os.Remove(ingressPortsFile) } addDelOpt := "-A" @@ -784,7 +782,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port cmd := &exec.Cmd{ Path: reexec.Self(), - Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, filteredPortsFile, eIP.String()), + Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()), Stdout: os.Stdout, Stderr: os.Stderr, } @@ -801,13 +799,12 @@ func fwMarker() { runtime.LockOSThread() defer runtime.UnlockOSThread() - if len(os.Args) < 8 { + if len(os.Args) < 7 { logrus.Error("invalid number of arguments..") os.Exit(1) } var ingressPorts []*PortConfig - var filteredPorts []*PortConfig if os.Args[5] != "" { var err error ingressPorts, err = readPortsFromFile(os.Args[5]) @@ -817,15 +814,6 @@ func fwMarker() { } } - if os.Args[6] != "" { - var err error - filteredPorts, err = readPortsFromFile(os.Args[6]) - if err != nil { - logrus.Errorf("Failed reading filtered ports file: %v", err) - os.Exit(7) - } - } - vip := os.Args[2] fwMark, err := strconv.ParseUint(os.Args[3], 10, 32) if err != nil { @@ -835,12 +823,6 @@ func fwMarker() { addDelOpt := os.Args[4] rules := [][]string{} - for _, iPort := range filteredPorts { - rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d", - addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) - rules = append(rules, rule) - } - for _, iPort := range ingressPorts { rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark)) @@ -860,9 +842,9 @@ func fwMarker() { } if addDelOpt == "-A" { - eIP, subnet, err := net.ParseCIDR(os.Args[7]) + eIP, subnet, err := net.ParseCIDR(os.Args[6]) if err != nil { - logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[7], err) + logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err) os.Exit(9) } @@ -889,3 +871,82 @@ func fwMarker() { } } } + +func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error { + var ingressPortsFile string + + if len(ingressPorts) != 0 { + var err error + ingressPortsFile, err = writePortsToFile(ingressPorts) + if err != nil { + return err + } + defer os.Remove(ingressPortsFile) + } + + cmd := &exec.Cmd{ + Path: reexec.Self(), + Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile), + Stdout: os.Stdout, + Stderr: os.Stderr, + } + + if err := cmd.Run(); err != nil { + return fmt.Errorf("reexec failed: %v", err) + } + + return nil +} + +// Redirecter reexec function. +func redirecter() { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + if len(os.Args) < 4 { + logrus.Error("invalid number of arguments..") + os.Exit(1) + } + + var ingressPorts []*PortConfig + if os.Args[3] != "" { + var err error + ingressPorts, err = readPortsFromFile(os.Args[3]) + if err != nil { + logrus.Errorf("Failed reading ingress ports file: %v", err) + os.Exit(2) + } + } + + eIP, _, err := net.ParseCIDR(os.Args[2]) + if err != nil { + logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err) + os.Exit(3) + } + + rules := [][]string{} + for _, iPort := range ingressPorts { + rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d", + eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) + rules = append(rules, rule) + } + + ns, err := netns.GetFromPath(os.Args[1]) + if err != nil { + logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err) + os.Exit(4) + } + defer ns.Close() + + if err := netns.Set(ns); err != nil { + logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err) + os.Exit(5) + } + + for _, rule := range rules { + if err := iptables.RawCombinedOutputNative(rule...); err != nil { + logrus.Errorf("setting up rule failed, %v: %v", rule, err) + os.Exit(5) + } + } +}