From 6a210e16f645ff56937de5b4f8df2c3f97b8e910 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Wed, 7 Sep 2016 10:10:00 -0700 Subject: [PATCH] Use complete port configs when plumbing mark rules Currently, a reference counting scheme is used to reference count all individual port configs that need to be plumbed in the ingress to make sure that in situations where a service with the same set of port configs is getting added or removed doesn't accidentally remove the port config plumbing if the add/remove notifications come out of order. This same reference counting scheme is also used for plumbing the port-based marking rules. But marking rules should not be plumbed based on that because marks are always different for different instantiations of the same service. So fixed the code to plumb port-based mark rules based on the complete set of port configs, while plumbing pure port rules and proxies based on a filter set of port configs based on the reference count. Signed-off-by: Jana Radhakrishnan --- service_linux.go | 115 +++++++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 39 deletions(-) diff --git a/service_linux.go b/service_linux.go index 3fafa9af44..e9d09fe40d 100644 --- a/service_linux.go +++ b/service_linux.go @@ -380,17 +380,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P } if addService { - var iPorts []*PortConfig + var filteredPorts []*PortConfig if sb.ingress { - iPorts = filterPortConfigs(ingressPorts, false) - if err := programIngress(gwIP, iPorts, false); err != nil { + filteredPorts = filterPortConfigs(ingressPorts, false) + if err := programIngress(gwIP, filteredPorts, false); err != nil { logrus.Errorf("Failed to add ingress: %v", err) return } } - logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts) - if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil { + logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts) + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, false); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) return } @@ -453,15 +453,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err) } - var iPorts []*PortConfig + var filteredPorts []*PortConfig if sb.ingress { - iPorts = filterPortConfigs(ingressPorts, true) - if err := programIngress(gwIP, iPorts, true); err != nil { + filteredPorts = filterPortConfigs(ingressPorts, true) + if err := programIngress(gwIP, filteredPorts, true); err != nil { logrus.Errorf("Failed to delete ingress: %v", err) } } - if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil { + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, true); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) } } @@ -715,33 +715,66 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error { return nil } +func writePortsToFile(ports []*PortConfig) (string, error) { + f, err := ioutil.TempFile("", "port_configs") + if err != nil { + return "", err + } + defer f.Close() + + buf, err := proto.Marshal(&EndpointRecord{ + IngressPorts: ports, + }) + + n, err := f.Write(buf) + if err != nil { + return "", err + } + + if n < len(buf) { + return "", io.ErrShortWrite + } + + return f.Name(), nil +} + +func readPortsFromFile(fileName string) ([]*PortConfig, error) { + buf, err := ioutil.ReadFile(fileName) + if err != nil { + return nil, err + } + + var epRec EndpointRecord + err = proto.Unmarshal(buf, &epRec) + if err != nil { + return nil, err + } + + return epRec.IngressPorts, nil +} + // Invoke fwmarker reexec routine to mark vip destined packets with // the passed firewall mark. -func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { - var ingressPortsFile string +func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, filteredPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { + var ( + ingressPortsFile string + filteredPortsFile string + ) + if len(ingressPorts) != 0 { - f, err := ioutil.TempFile("", "port_configs") + var err error + ingressPortsFile, err = writePortsToFile(ingressPorts) if err != nil { return err } + } - buf, err := proto.Marshal(&EndpointRecord{ - IngressPorts: ingressPorts, - }) - - n, err := f.Write(buf) + if len(filteredPorts) != 0 { + var err error + filteredPortsFile, err = writePortsToFile(filteredPorts) if err != nil { - f.Close() return err } - - if n < len(buf) { - f.Close() - return io.ErrShortWrite - } - - ingressPortsFile = f.Name() - f.Close() } addDelOpt := "-A" @@ -751,7 +784,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port cmd := &exec.Cmd{ Path: reexec.Self(), - Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()), + Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, filteredPortsFile, eIP.String()), Stdout: os.Stdout, Stderr: os.Stderr, } @@ -768,27 +801,29 @@ func fwMarker() { runtime.LockOSThread() defer runtime.UnlockOSThread() - if len(os.Args) < 7 { + if len(os.Args) < 8 { logrus.Error("invalid number of arguments..") os.Exit(1) } var ingressPorts []*PortConfig + var filteredPorts []*PortConfig if os.Args[5] != "" { - buf, err := ioutil.ReadFile(os.Args[5]) + var err error + ingressPorts, err = readPortsFromFile(os.Args[5]) if err != nil { - logrus.Errorf("Failed to read ports config file: %v", err) + logrus.Errorf("Failed reading ingress ports file: %v", err) os.Exit(6) } + } - var epRec EndpointRecord - err = proto.Unmarshal(buf, &epRec) + if os.Args[6] != "" { + var err error + filteredPorts, err = readPortsFromFile(os.Args[6]) if err != nil { - logrus.Errorf("Failed to unmarshal ports config data: %v", err) + logrus.Errorf("Failed reading filtered ports file: %v", err) os.Exit(7) } - - ingressPorts = epRec.IngressPorts } vip := os.Args[2] @@ -800,12 +835,14 @@ func fwMarker() { addDelOpt := os.Args[4] rules := [][]string{} - for _, iPort := range ingressPorts { + for _, iPort := range filteredPorts { rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d", addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) rules = append(rules, rule) + } - rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", + for _, iPort := range ingressPorts { + rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark)) rules = append(rules, rule) } @@ -823,9 +860,9 @@ func fwMarker() { } if addDelOpt == "-A" { - eIP, subnet, err := net.ParseCIDR(os.Args[6]) + eIP, subnet, err := net.ParseCIDR(os.Args[7]) if err != nil { - logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err) + logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[7], err) os.Exit(9) }