Skip to content

Commit

Permalink
Merge pull request #1432 from mrjana/lb
Browse files Browse the repository at this point in the history
Use complete port configs when plumbing mark rules
  • Loading branch information
mavenugo authored Sep 22, 2016
2 parents 892324f + 6d44146 commit f4de3a4
Showing 1 changed file with 141 additions and 43 deletions.
184 changes: 141 additions & 43 deletions service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

func init() {
reexec.Register("fwmarker", fwMarker)
reexec.Register("redirecter", redirecter)
}

func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
Expand Down Expand Up @@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
n := ep.getNetwork()
eIP := ep.Iface().Address()

if n.ingress {
if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
}
}

if sb.ingress {
// For the ingress sandbox if this is not gateway
// endpoint do nothing.
Expand Down Expand Up @@ -380,17 +387,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
}

if addService {
var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, iPorts, false); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, filteredPorts, false); err != nil {
logrus.Errorf("Failed to add ingress: %v", err)
return
}
}

logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
Expand Down Expand Up @@ -453,15 +460,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
}

var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, iPorts, true); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, filteredPorts, true); err != nil {
logrus.Errorf("Failed to delete ingress: %v", err)
}
}

if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
}
}
Expand Down Expand Up @@ -715,33 +722,57 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error {
return nil
}

func writePortsToFile(ports []*PortConfig) (string, error) {
f, err := ioutil.TempFile("", "port_configs")
if err != nil {
return "", err
}
defer f.Close()

buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ports,
})

n, err := f.Write(buf)
if err != nil {
return "", err
}

if n < len(buf) {
return "", io.ErrShortWrite
}

return f.Name(), nil
}

func readPortsFromFile(fileName string) ([]*PortConfig, error) {
buf, err := ioutil.ReadFile(fileName)
if err != nil {
return nil, err
}

var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
return nil, err
}

return epRec.IngressPorts, nil
}

// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var ingressPortsFile string
if len(ingressPorts) != 0 {
f, err := ioutil.TempFile("", "port_configs")
if err != nil {
return err
}

buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ingressPorts,
})

n, err := f.Write(buf)
if len(ingressPorts) != 0 {
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
f.Close()
return err
}

if n < len(buf) {
f.Close()
return io.ErrShortWrite
}

ingressPortsFile = f.Name()
f.Close()
defer os.Remove(ingressPortsFile)
}

addDelOpt := "-A"
Expand Down Expand Up @@ -775,20 +806,12 @@ func fwMarker() {

var ingressPorts []*PortConfig
if os.Args[5] != "" {
buf, err := ioutil.ReadFile(os.Args[5])
var err error
ingressPorts, err = readPortsFromFile(os.Args[5])
if err != nil {
logrus.Errorf("Failed to read ports config file: %v", err)
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(6)
}

var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
logrus.Errorf("Failed to unmarshal ports config data: %v", err)
os.Exit(7)
}

ingressPorts = epRec.IngressPorts
}

vip := os.Args[2]
Expand All @@ -801,11 +824,7 @@ func fwMarker() {

rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)

rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
rules = append(rules, rule)
}
Expand Down Expand Up @@ -852,3 +871,82 @@ func fwMarker() {
}
}
}

func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
var ingressPortsFile string

if len(ingressPorts) != 0 {
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
return err
}
defer os.Remove(ingressPortsFile)
}

cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile),
Stdout: os.Stdout,
Stderr: os.Stderr,
}

if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
}

return nil
}

// Redirecter reexec function.
func redirecter() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

if len(os.Args) < 4 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}

var ingressPorts []*PortConfig
if os.Args[3] != "" {
var err error
ingressPorts, err = readPortsFromFile(os.Args[3])
if err != nil {
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(2)
}
}

eIP, _, err := net.ParseCIDR(os.Args[2])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
os.Exit(3)
}

rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}

ns, err := netns.GetFromPath(os.Args[1])
if err != nil {
logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
os.Exit(4)
}
defer ns.Close()

if err := netns.Set(ns); err != nil {
logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
os.Exit(5)
}

for _, rule := range rules {
if err := iptables.RawCombinedOutputNative(rule...); err != nil {
logrus.Errorf("setting up rule failed, %v: %v", rule, err)
os.Exit(5)
}
}
}

0 comments on commit f4de3a4

Please sign in to comment.