Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PSUPCLPL-15272] IPIP connectivity check rework #652

Merged
merged 5 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 57 additions & 26 deletions kubemarine/procedures/check_iaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -1325,17 +1325,23 @@ def ipip_connectivity(cluster: KubernetesCluster) -> None:
raise TestWarn("Check cannot be completed", hint='\n'.join(skipped_msgs))

if failed_nodes:
raise TestFailure(f"Check firewall settings, IP in IP traffic is not allowed between nodes.",
hint='\n'.join(failed_nodes))
raise TestFailure(f"Check firewall settings for all nodes in the cluster, "
"IP in IP traffic is not allowed between nodes.", hint='\n'.join(failed_nodes))

if group.nodes_amount() == 2:
skipped_msgs.append("Change nodes order in 'cluster.yaml' and run the check "
"10 minutes later")
raise TestWarn("Check has been succeded for the second node but cannot be completed for "
"the first node", hint='\n'.join(skipped_msgs))

def check_ipip_tunnel(group: NodeGroup) -> Set[str]:

group_to_rollback = group
cluster = group.cluster

# Copy binaries to the nodes
random_port = str(random.randint(50000, 65535))
random_sport = str(random.randint(50000, 65535))
random_dport = str(random.randint(50000, 65535))
failed_nodes: Set[str] = set()
recv_cmd: Dict[str, str] = {}
trns_cmd: Dict[str, str] = {}
Expand All @@ -1350,51 +1356,76 @@ def check_ipip_tunnel(group: NodeGroup) -> Set[str]:
fake_addr = str(ipaddress.IPv4Address(int_ip))
# That is used as number of packets for transmitter
timeout = int(cluster.inventory['globals']['timeout_download'])
for node in group.get_ordered_members_configs_list():
host = node['internal_address']
# Transmitter start command
# Transmitter starts first and sends IPIP packets every 1 second until the timeout comes or
# the process is killed by terminating command
trns_item_cmd: List[str] = []
for node_item in group.get_ordered_members_configs_list():
if node_item['internal_address'] != host:
trns_item_cmd.append(f"nohup {ipip_check} -mode client -src {host} -int {fake_addr} "
f"-ext {node_item['internal_address']} -dport {random_port} "
f"-msg {msg} -timeout {timeout} > /dev/null 2>&1 & echo $! >> {ipip_check}.pid")
trns_cmd[host] = '& sudo '.join(trns_item_cmd)
# Receiver start command
# Receiver starts after the transmitter and try to get IPIP packets within 3 seconds from eache node
recv_cmd[host] = f"{ipip_check} -mode server -ext {host} -int {fake_addr} -dport {random_port} " \
f"-msg {msg} -timeout 3 2> /dev/null"
nodes_list = group.get_ordered_members_configs_list()
# The ring circuit is used for the procedure. Each node in the ring transmit IPIP packets to the next node in the ring
# and receive IPIP packets from the previous node of the ring.
# That makes check more robast to some IP filters implementation.
recv_neighbor_node: Dict[str, str] = {}
trns_neighbor_host = ""
if len(nodes_list) > 2:
node_number = 0
for node in nodes_list:
host = node['internal_address']
if node_number < len(nodes_list) - 1:
recv_neighbor_node[nodes_list[node_number + 1]['name']] = node['name']
trns_neighbor_host = nodes_list[node_number + 1]['internal_address']
else:
recv_neighbor_node[nodes_list[0]['name']] = node['name']
trns_neighbor_host = nodes_list[0]['internal_address']
# Transmitter start command
# Transmitter starts first and sends IPIP packets every 1 second until the timeout comes or
# the process is killed by terminating command
trns_cmd[host] = f"nohup {ipip_check} -mode client -src {host} -int {fake_addr} " \
f"-ext {trns_neighbor_host} -sport {random_sport} -dport {random_dport} " \
f"-msg {msg} -timeout {timeout} > /dev/null 2>&1 & echo $! >> {ipip_check}.pid"
# Receiver start command
# Receiver starts after the transmitter and try to get IPIP packets within 3 seconds from neighbor node
recv_cmd[host] = f"{ipip_check} -mode server -ext {host} -int {fake_addr} -sport {random_sport}" \
f" -dport {random_dport} -msg {msg} -timeout 3 2> /dev/null"
node_number += 1
else:
# Two nodes have only one transmitter and only one receiver
host = nodes_list[0]['internal_address']
recv_neighbor_node[nodes_list[1]['name']] = nodes_list[0]['name']
trns_neighbor_host = nodes_list[1]['internal_address']
trns_cmd[host] = f"nohup {ipip_check} -mode client -src {host} -int {fake_addr} " \
f"-ext {trns_neighbor_host} -sport {random_sport} -dport {random_dport} " \
f"-msg {msg} -timeout {timeout} > /dev/null 2>&1 & echo $! >> {ipip_check}.pid"
host = nodes_list[1]['internal_address']
recv_cmd[host] = f"{ipip_check} -mode server -ext {host} -int {fake_addr} -sport {random_sport} " \
f"-dport {random_dport} -msg {msg} -timeout 3 2> /dev/null"

try:
collector = CollectorCallback(group.cluster)
cluster.log.debug("Copy binaries to the nodes")
group.put(binary_check_path, f"{ipip_check}.gz")
group.sudo(f"gzip -d {ipip_check}.gz")
group.sudo(f"sudo chmod +x {ipip_check}")
# Run transmitters
# Run transmitters if it's applicable for node
cluster.log.debug("Run transmitters")
with group.new_executor() as exe:
for node_exe in exe.group.get_ordered_members_list():
host_int = node_exe.get_config()['internal_address']
node_exe.sudo(f"{trns_cmd[host_int]}")
# Run receivers and get results
if trns_cmd.get(host_int, ""):
node_exe.sudo(f"{trns_cmd[host_int]}")
# Run receivers and get results if it's applicable for node
cluster.log.debug("Run receivers")
with group.new_executor() as exe:
for node_exe in exe.group.get_ordered_members_list():
host_int = node_exe.get_config()['internal_address']
node_exe.sudo(f"{recv_cmd[host_int]}", warn=True, callback=collector)
if recv_cmd.get(host_int, ""):
node_exe.sudo(f"{recv_cmd[host_int]}", warn=True, callback=collector)

for host, item in collector.result.items():
node_name = cluster.get_node_name(host)
item_list: Set[str] = set()
if len(item.stdout) > 0:
for log_item in item.stdout.split("\n")[:-1]:
item_list.add(log_item)
for node in group.get_ordered_members_configs_list():
if node['internal_address'] not in item_list and node['connect_to'] != host:
failed_nodes.add(f"{node['name']} -> {node_name}")
# Check if the neighbor IP is in logs
trns_node = group.get_member_by_name(recv_neighbor_node[node_name]).get_config()
if trns_node['internal_address'] not in item_list:
failed_nodes.add(f"{trns_node['name']} -> {node_name}")

return failed_nodes
finally:
Expand Down
17 changes: 12 additions & 5 deletions kubemarine/resources/scripts/source/ipip_check/ipip_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ import (
var (
mode, msg, src, dstExt, dstInt string
srcIP, dstExtIP, dstIntIP net.IP
dport, timeout uint
sport, dport, timeout uint
)

func customUsage() {
fmt.Printf("Usage of %s:\n", os.Args[0])
fmt.Printf("%s -mode client -src 192.168.0.1 -ext 192.168.0.2 -int 240.0.0.1 -dport 54545 -msg Message -timeout 10\n",
fmt.Printf("%s -mode client -src 192.168.0.1 -ext 192.168.0.2 -int 240.0.0.1 -sport 45455 -dport 54545 -msg Message -timeout 10\n",
os.Args[0])
fmt.Printf("%s -mode server -ext 192.168.0.2 -int 240.0.0.1 -dport 54545 -msg Message -timeout 3\n",
fmt.Printf("%s -mode server -ext 192.168.0.2 -int 240.0.0.1 -sport 45455 -dport 54545 -msg Message -timeout 3\n",
os.Args[0])
fmt.Println("Where:")
flag.PrintDefaults()
fmt.Println("Note: Pay attention to the fact that some implementations of packet filters might includes rule that allows 'related' traffic (eg.: Security Groups implementation in OpenStack). That means the mode changing on the same host might lead to incorrect results of the check.")
}

func parseParam() error {
Expand All @@ -58,6 +59,7 @@ func parseParam() error {
flag.StringVar(&dstInt, "int", "", "Internal destination IP address")
// UDP port number (UDP Dst Port)
flag.UintVar(&dport, "dport", 65000, "Destination UDP port")
flag.UintVar(&sport, "sport", 53, "Source UDP port")
flag.UintVar(&timeout, "timeout", 0, "Operation timeout")
flag.StringVar(&msg, "msg", "", "Message as UDP payload")
flag.Parse()
Expand All @@ -76,6 +78,9 @@ func parseParam() error {
if dstIntIP == nil {
return errors.New("Internal destination address is invalid")
}
if sport > 65535 {
return errors.New("Source UDP port out of range")
}
if dport > 65535 {
return errors.New("Destination UDP port out of range")
}
Expand All @@ -90,6 +95,7 @@ func runSrv() {
dstExtIPaddr := net.IPAddr{
IP: dstExtIP,
}
srcUDPPort := layers.UDPPort(sport)
dstUDPPort := layers.UDPPort(dport)
// Listen on the external IP address. The payload protocol is IPIP
ipConn, err := net.ListenIP("ip4:4", &dstExtIPaddr)
Expand Down Expand Up @@ -131,7 +137,8 @@ func runSrv() {
if packet.Layers()[1].LayerType() == layers.LayerTypeUDP {
udpLayer := packet.Layers()[1]
udpPacket := udpLayer.(*layers.UDP)
if udpPacket.DstPort == dstUDPPort {
if udpPacket.DstPort == dstUDPPort &&
udpPacket.SrcPort == srcUDPPort {
payload := fmt.Sprintf("%s", packet.Layers()[1].LayerPayload())
// Check UDP paylaod
if payload == msg {
Expand Down Expand Up @@ -162,7 +169,7 @@ func runClt() {
}
// Describe UDP packet
udpLayer := layers.UDP{
SrcPort: layers.UDPPort(53),
SrcPort: layers.UDPPort(uint16(sport)),
DstPort: layers.UDPPort(uint16(dport)),
}
udpLayer.SetNetworkLayerForChecksum(&ipLayer2)
Expand Down
Loading