Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Allow LB to work for apps with IP per task #144

Merged
merged 2 commits into from
Apr 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea
templates/
__pycache__
*.pyc
169 changes: 122 additions & 47 deletions marathon_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
### Command Line Usage
"""

from logging.handlers import SysLogHandler
from operator import attrgetter
from shutil import move
from tempfile import mkstemp
Expand All @@ -44,8 +43,8 @@
import socket
import time
import dateutil.parser
import math
import threading
import traceback
import random
import hashlib

Expand All @@ -54,16 +53,36 @@

class MarathonBackend(object):
    """
    A single task endpoint that the load balancer can route traffic to.

    Two backends are considered equal when they share the same host, IP
    address and port. The draining flag is mutable state rather than part of
    the identity, so it is deliberately excluded from equality and hashing.
    """

    def __init__(self, host, ip, port, draining):
        """
        :param host: The host that is running this task.
        :param ip: The IP address used to access the task.
        :param port: The port used to access a service on the task.
        :param draining: Whether access to this task should be drained.
        """
        # The host that is running this task.
        self.host = host

        # The IP address used to access the task. For tasks using
        # IP-per-task, this is the actual IP address of the task; otherwise,
        # it is the IP address resolved from the hostname.
        self.ip = ip

        # The port used to access a particular service on a task. For tasks
        # using IP-per-task, this is the actual port exposed by the task;
        # otherwise, it is the port exposed on the host.
        self.port = port

        # Whether we should be draining access to this task in the LB.
        self.draining = draining

    def _identity(self):
        # With IP-per-task several tasks may share a host and a port, so the
        # IP address has to be part of what makes a backend unique.
        return (self.host, self.ip, self.port)

    def __eq__(self, other):
        # Defined together with __hash__ so that sets of backends collapse
        # duplicate endpoints instead of comparing by object identity.
        if not isinstance(other, MarathonBackend):
            return NotImplemented
        return self._identity() == other._identity()

    def __hash__(self):
        return hash(self._identity())

    def __repr__(self):
        return "MarathonBackend(%r, %r, %r)" % (self.host, self.ip, self.port)


class MarathonService(object):
Expand Down Expand Up @@ -93,8 +112,8 @@ def __init__(self, appId, servicePort, healthCheck):
if healthCheck['protocol'] == 'HTTP':
self.mode = 'http'

def add_backend(self, host, port, draining):
self.backends.add(MarathonBackend(host, port, draining))
def add_backend(self, host, ip, port, draining):
self.backends.add(MarathonBackend(host, ip, port, draining))

def __hash__(self):
return hash(self.servicePort)
Expand Down Expand Up @@ -408,12 +427,25 @@ def config(apps, groups, bind_http_https, ssl_certs, templater):
key_func = attrgetter('host', 'port')
for backendServer in sorted(app.backends, key=key_func):
logger.debug(
"backend server at %s:%d",
backendServer.host,
backendServer.port)
serverName = re.sub(
r'[^a-zA-Z0-9\-]', '_',
backendServer.host + '_' + str(backendServer.port))
"backend server %s:%d on %s",
backendServer.ip,
backendServer.port,
backendServer.host)

# Create a unique, friendly name for the backend server. We concat
# the host, task IP and task port together. If the host and task
# IP are actually the same then omit one for clarity.
if backendServer.host != backendServer.ip:
serverName = re.sub(
r'[^a-zA-Z0-9\-]', '_',
(backendServer.host + '_' +
backendServer.ip + '_' +
str(backendServer.port)))
else:
serverName = re.sub(
r'[^a-zA-Z0-9\-]', '_',
(backendServer.ip + '_' +
str(backendServer.port)))
shortHashedServerName = hashlib.sha1(serverName.encode()) \
.hexdigest()[:10]

Expand Down Expand Up @@ -449,26 +481,19 @@ def config(apps, groups, bind_http_https, ssl_certs, templater):
healthCheckPortOptions=' port ' +
str(healthCheckPort) if healthCheckPort else ''
)
ipv4 = resolve_ip(backendServer.host)

if ipv4 is not None:
backend_server_options = templater \
.haproxy_backend_server_options(app)
backends += backend_server_options.format(
host=backendServer.host,
host_ipv4=ipv4,
port=backendServer.port,
serverName=serverName,
cookieOptions=' check cookie ' +
shortHashedServerName if app.sticky else '',
healthCheckOptions=healthCheckOptions
if healthCheckOptions else '',
otherOptions=' disabled' if backendServer.draining else ''
)
else:
logger.warning("Could not resolve ip for host %s, "
"ignoring this backend",
backendServer.host)
backend_server_options = templater \
.haproxy_backend_server_options(app)
backends += backend_server_options.format(
host=backendServer.host,
host_ipv4=backendServer.ip,
port=backendServer.port,
serverName=serverName,
cookieOptions=' check cookie ' +
shortHashedServerName if app.sticky else '',
healthCheckOptions=healthCheckOptions
if healthCheckOptions else '',
otherOptions=' disabled' if backendServer.draining else ''
)

if bind_http_https:
config += http_frontends
Expand Down Expand Up @@ -790,6 +815,55 @@ def get_health_check(app, portIndex):
return None


def is_ip_per_task(app):
    """
    Tell whether an application definition uses IP-per-task.

    An application counts as IP-per-task when its 'ipAddress' field is
    present and not None; an empty dictionary still counts.
    :param app: The application to check.
    :return: True if using IP per task, False otherwise.
    """
    ip_address_config = app.get('ipAddress')
    if ip_address_config is None:
        return False
    return True


def get_task_ip_and_ports(app, task):
    """
    Work out the address and ports through which a task can be reached.

    For an IP-per-task application the result is the task's own IP address
    together with the discovery ports declared by the application. For any
    other application it is the resolved IP address of the task's host
    together with the ports exposed on that host.
    :param app: The application owning the task.
    :param task: The task.
    :return: Tuple of (ip address, [ports]). Returns (None, None) if no IP
             address could be resolved or found for the task.
    """
    # NOTE(review): a unit test covering this function was requested during
    # code review.

    if not is_ip_per_task(app):
        # Classic host networking: the task is reached through its host.
        logger.debug("Using host port mapping")
        ports = task.get('ports', [])
        ip = resolve_ip(task['host'])
        if not ip:
            logger.warning("Could not resolve ip for host %s, ignoring",
                           task['host'])
            return None, None
    else:
        # The app ipAddress field is present and not None, so this app is
        # using IP per task. The ipAddress may be an empty dictionary though,
        # in which case there are no discovery ports.
        logger.debug("Using IP per container")
        allocated = task.get('ipAddresses')
        if not allocated:
            logger.warning("Task %s does not yet have an ip address allocated",
                           task['id'])
            return None, None

        # At the moment, Mesos only supports a single IP address, so just
        # take the first IP in the list.
        ip = allocated[0]['ipAddress']

        discovery_info = app['ipAddress'].get('discovery', {})
        ports = []
        for port_spec in discovery_info.get('ports', []):
            ports.append(int(port_spec['number']))

    logger.debug("Returning: %r, %r", ip, ports)
    return ip, ports


def get_apps(marathon):
apps = marathon.list()
logger.debug("got apps %s", [app["id"] for app in apps])
Expand Down Expand Up @@ -839,11 +913,11 @@ def get_apps(marathon):
target_instances = \
int(new['labels']['HAPROXY_DEPLOYMENT_TARGET_INSTANCES'])

# mark N tasks from old app as draining, where N is the
# number of instances in the new app
old_tasks = sorted(old['tasks'],
key=lambda task: task['host'] +
":" + str(task['ports']))
# Mark N tasks from old app as draining, where N is the
# number of instances in the new app. Sort the old tasks so that
# order is deterministic (i.e. so that we always drain the same
# tasks).
old_tasks = sorted(old['tasks'], key=lambda task: task['id'])

healthy_new_instances = 0
if len(app['healthChecks']) > 0:
Expand Down Expand Up @@ -909,7 +983,7 @@ def get_apps(marathon):

for task in app['tasks']:
# Marathon 0.7.6 bug workaround
if len(task['host']) == 0:
if not task['host']:
logger.warning("Ignoring Marathon task without host " +
task['id'])
continue
Expand All @@ -925,22 +999,21 @@ def get_apps(marathon):
if not alive:
continue

task_ports = task.get('ports', [])
draining = False
if 'draining' in task:
draining = task['draining']
task_ip, task_ports = get_task_ip_and_ports(app, task)
if not task_ip:
logger.warning("Task has no resolvable IP address - skip")
continue

draining = task.get('draining', False)

# if different versions of app have different number of ports,
# try to match as many ports as possible
number_of_defined_ports = min(len(task_ports), len(service_ports))

for i in range(number_of_defined_ports):
task_port = task_ports[i]
service_port = service_ports[i]
for task_port, service_port in zip(task_ports, service_ports):
service = marathon_app.services.get(service_port, None)
if service:
service.groups = marathon_app.groups
service.add_backend(task['host'],
task_ip,
task_port,
draining)

Expand All @@ -950,6 +1023,7 @@ def get_apps(marathon):
for service in list(marathon_app.services.values()):
if service.backends:
apps_list.append(service)

return apps_list


Expand Down Expand Up @@ -1156,6 +1230,7 @@ def process_sse_events(marathon, config_file, groups,
except:
print(event.data)
print("Unexpected error:", sys.exc_info()[0])
traceback.print_stack()
raise
finally:
processor.stop()
Expand Down
Loading