-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfoo.py
104 lines (86 loc) · 2.64 KB
/
foo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/python
import socket
from statistics import mean
import asyncio
from time import time
from typing import Optional
async def measure_latency(
host: str,
port: int = 443,
timeout: float = 5,
runs: int = 1,
wait: float = 1,
human_output: bool = False,
) -> list:
"""
:rtype: list
Builds a list composed of latency_points
"""
latency_points = []
for i in range(runs):
await asyncio.sleep(wait)
last_latency_point = latency_point(
host=host,
port=port,
timeout=timeout,
)
if human_output:
if i == 0:
print("tcp-latency {}".format(host))
_human_output(
host=host,
port=port,
timeout=timeout,
latency_point=last_latency_point,
seq_number=i,
)
if i == len(range(runs)) - 1:
print(f"--- {host} tcp-latency statistics ---")
print(f"{i+1} packets transmitted")
if latency_points:
print(
f"rtt min/avg/max = {min(latency_points)}/{mean(latency_points)}/{max(latency_points)} ms", # noqa: E501
)
latency_points.append(last_latency_point)
return latency_points
def latency_point(host: str, port: int = 443, timeout: float = 5) -> Optional[float]:
"""
:rtype: Returns float if possible
Calculate a latency point using sockets. If something bad happens the point returned is None
"""
# New Socket and Time out
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
# Start a timer
s_start = time()
# Try to Connect
try:
s.connect((host, int(port)))
s.shutdown(socket.SHUT_RD)
# If something bad happens, the latency_point is None
except socket.timeout:
pass
return None
except OSError:
pass
return None
# Stop Timer
s_runtime = (time() - s_start) * 1000
return float(s_runtime)
def _human_output(
host: str, port: int, timeout: int, latency_point: float, seq_number: int
):
"""fstring based output for the console_script"""
# In case latency_point is None
if latency_point:
print(
f"{host}: tcp seq={seq_number} port={port} timeout={timeout} time={latency_point} ms",
)
else:
print(f"{host}: tcp seq={seq_number} port={port} timeout={timeout} failed")
async def main():
host = "202.80.104.24"
port = "8484"
a = await measure_latency(host=host, port=port, runs=5)
print(a)
new = asyncio.run(main())