-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpingmaster.py
105 lines (87 loc) · 3.12 KB
/
pingmaster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import requests # to make HTTP requests
import json # library for handling JSON data
import time # time library
import math, statistics # maths library to compute Z-score
from boltiot import Bolt # importing Bolt from boltiot module
import speedtest # speedtest library to check Network speed
import conf # config file
# Client for the Bolt device, built from the credentials in conf.
mybolt = Bolt(conf.bolt_api_key, conf.device_id)
# Rolling list of recent ping samples used for the Z-score bounds.
ping_history = []

# Show the start-up banner; a missing/unreadable banner file is not fatal.
try:
    # The context manager closes the file even if read() or print() raises,
    # which the manual open()/close() pair did not guarantee.
    with open('pingmaster-header.txt', 'r') as banner_file:
        print(' ')
        print(banner_file.read())
except IOError:
    print('\nBanner File not found!')
# Function to compute Upper and Lower Bounds by Z-Score Analysis
def compute_bounds(ping_history, frame_size, factor):
    """Return ``[high, low]`` Z-score bounds around the newest ping sample.

    ``ping_history`` is trimmed in place so that only its latest
    ``frame_size`` entries remain. The spread is ``factor`` times the square
    root of the mean squared deviation of that window, and the bounds are the
    last sample in the window plus and minus that spread.

    Returns ``None`` when fewer than ``frame_size`` samples are available.
    """
    sample_count = len(ping_history)
    if sample_count < frame_size:
        return None

    # Discard the oldest samples in place; the caller's list is the window.
    surplus = sample_count - frame_size
    if surplus > 0:
        del ping_history[:surplus]

    window_mean = statistics.mean(ping_history)
    squared_deviation_sum = sum(
        math.pow(sample - window_mean, 2) for sample in ping_history
    )
    spread = factor * math.sqrt(squared_deviation_sum / frame_size)

    latest = ping_history[frame_size - 1]
    return [latest + spread, latest - spread]
# Function to test Network Speed
def test():
    """Run one speed test and return ``(download, upload, ping)``.

    The three values are taken from the ``download``, ``upload`` and ``ping``
    entries of the speedtest results dictionary. If any step raises, a
    connection-lost notice is printed and ``None`` is returned instead of a
    tuple.
    """
    try:
        tester = speedtest.Speedtest()
        tester.get_servers()
        tester.get_best_server()
        tester.download()
        tester.upload()
        report = tester.results.dict()
        return tuple(report[key] for key in ("download", "upload", "ping"))
    except Exception:
        print("\nInternet connection was lost :(")
        print("\nPlease connect to your Internet and try again !\n")
        return None
# Function to send output to Telegram
def telegram_message(message, timeout=10):
    """Send ``message`` to the configured Telegram chat.

    Args:
        message: Text of the alert to post.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``ok`` field of the JSON reply, or ``False`` if the request or
        the parsing of the reply raised (the error is printed).
    """
    url = "https://api.telegram.org/" + conf.telegram_bot_id + "/sendMessage"
    data = {"chat_id": conf.telegram_chat_id, "text": message}
    try:
        # requests waits indefinitely unless a timeout is given; a stalled
        # connection would otherwise freeze the whole monitoring loop.
        response = requests.request("POST", url, params=data, timeout=timeout)
        telegram_data = json.loads(response.text)
        return telegram_data["ok"]
    except Exception as e:
        print("\nAn error occurred in sending the alert message via Telegram")
        print(e)
        return False
while True:
    # Main monitoring loop: measure, compare with the Z-score bounds, alert.
    measurement = test()
    if measurement is None:
        # test() returns None (after printing a notice) when the speed test
        # fails. Unpacking None would raise TypeError and stop the monitor,
        # so wait and try again instead.
        time.sleep(10)
        continue
    d, u, p = measurement

    print('\nNetwork Information :\n')
    print('Download : {:.2f} Kb/s\n'.format(d / 1024))
    print('Upload : {:.2f} Kb/s\n'.format(u / 1024))
    print('Ping : {:.2f} \n'.format(p))
    ping_value = int(p)

    bound = compute_bounds(ping_history, conf.frame_size, conf.mul_factor)
    if not bound:
        # Still filling the first window: record the sample and measure again.
        required_data_count = conf.frame_size - len(ping_history)
        print("Not enough data to compute Z-score. Need ",required_data_count," more data point(s) !")
        ping_history.append(ping_value)
        continue

    try:
        # bound[0] is the upper bound; only upward spikes trigger an alert.
        if ping_value > bound[0]:
            print ("Ping increased suddenly. Sending an alert via Telegram....")
            message = "Your Ping has increased suddenly." + "\nCurrent Ping value is : " + str(ping_value)
            message += "\nConsider disconnecting some Devices from your Network or switch to a better Network !"
            telegram_status = telegram_message(message)
            print ("\nThis is the Telegram status : ", telegram_status, "\n")
            # NOTE(review): source indentation was lost; assumes each of the
            # nine iterations pulses pin 1 HIGH then LOW - confirm.
            for _ in range(9):
                mybolt.digitalWrite('1', 'HIGH')
                time.sleep(0.05)
                mybolt.digitalWrite('1', 'LOW')
        ping_history.append(ping_value)
    except Exception as e:
        print ("Error : ",e)
    time.sleep(10)