-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathhap.py
576 lines (484 loc) · 20.4 KB
/
hap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
#!/usr/bin/python3
############################
# HAPpy - Helium API Parser, Python
# https://github.com/co8/happy
#
# co8.com
# enrique r grullon
# discord: co8#1934
############################
# modules/libraries
from io import StringIO
import sys
import time
import requests
import json
from datetime import datetime
####
# Notes:
# To Add: Private methods, functions and data members
# _ : you shouldn't access this method because it's not part of the API
# __ : mangle the attribute names of a class to avoid conflicts of attribute names between classes
####
# main class
class happy:
    """HAPpy - Helium API Parser, Python.

    Takes the activity list of one Helium hotspot (fetched live from the
    Helium API, read from a JSON file, or handed in directly) and turns every
    activity into a flat dict.  The parsed list is exposed as ``ness`` with
    ``response`` as an alias of the same list.

    ``loadvars`` may be a path to a ``.json`` file or a dict.  Keys read by
    this class (all optional):

    * ``data`` - activities to parse instead of calling the API
    * ``json_file_input`` - JSON file with a ``"data"`` list (plus more vars)
    * ``json_file_output`` - file the parsed list is written to
    * ``max`` - keep only the first ``max`` activities
    * ``filter`` - type/subtype (str or list) to keep in the output
    * ``cursor`` - API cursor appended to the activity request
    * ``get_cursor_and_activities`` - fetch a cursor first, then activities
    * ``get_hotspot`` / ``get_wallet`` - also load hotspot / wallet details
      into ``vars["hotspot"]`` / ``vars["wallet"]``
    * ``reverse_sort`` - reverse the parsed list

    API failures print a short message and exit the interpreter.
    """

    # NOTE(review): the emoji string literals in this class look mis-encoded
    # in this copy of the file; they are reproduced unchanged - restore them
    # from the upstream source if possible.

    # Class-level defaults. __init__ rebinds the mutable ones on every
    # instance so that state is never shared between instances.
    hotspot = ""
    vars = {}
    activities = []  # input
    ness = response = []  # output
    #
    _helium_api_endpoint = "https://api.helium.io/v1/"
    # seconds before an API request is abandoned (reported as "Connectivity")
    _request_timeout = 30
    json_file_input = ""  # keep?
    _invalid_reason_short_names = {
        "witness_too_close": "Too Close",
        "witness_rssi_too_high": "RSSI Too High",
        "witness_rssi_below_lower_bound": "RSSI Below Lower Bound",
    }
    __reward_short_names = {
        "poc_witnesses": "Witness",
        "poc_challengees": "Beacon",
        "poc_challengers": "Challenger",
        "data_credits": "Data",
    }

    def _fetch_api_data(self, path, key, label):
        """GET ``path`` from the Helium API and return ``payload[key]``.

        The request and the JSON handling are guarded separately so that a
        body which is not valid JSON is reported as "Parsing JSON" rather
        than as a connectivity problem.  On any failure the message
        ``"<label> API Error: <reason>"`` is printed and the interpreter
        exits.
        """
        status = ""
        payload = None
        try:
            api_response = requests.get(
                self._helium_api_endpoint + path, timeout=self._request_timeout
            )
        except requests.RequestException:
            status = "Connectivity"
        else:
            try:
                payload = api_response.json()[key]
            except ValueError:  # includes simplejson.decoder.JSONDecodeError
                status = "Parsing JSON"
            except (IndexError, KeyError, TypeError):
                status = "JSON format"
        if status:
            print(f"{label} API Error: {status}")
            sys.exit()
        return payload

    def _load_json_data(self):
        """Load activities (and any extra vars) from the JSON input file.

        The file must hold an object with a ``"data"`` list; a missing
        ``"data"`` key raises ``KeyError``.  Every other top-level key is
        copied into ``self.vars``.
        """
        # get json_file_input from vars if exists
        if self.vars.get("json_file_input"):
            self.json_file_input = self.vars["json_file_input"]
        if not self.json_file_input:
            return
        with open(self.json_file_input, encoding="utf-8") as json_data_file:
            json_load = json.load(json_data_file)
        self.activities = json_load.pop("data")
        # if more vars after data, load in vars
        self.vars.update(json_load)

    def _load_hotspot_data(self):
        """Fetch hotspot details into ``vars["hotspot"]``.

        Only runs when ``get_hotspot`` or ``get_wallet`` is set (the wallet
        lookup needs the hotspot's owner address).
        """
        if not (self.vars.get("get_wallet") or self.vars.get("get_hotspot")):
            return
        hs = self._fetch_api_data("hotspots/" + self.hotspot, "data", "Hotspot")
        name = self.nice_hotspot_name(hs["name"])
        height = hs["status"]["height"]
        block = hs["block"]
        # block height percentage: hotspot height relative to the reported block
        block_height = round(height / block * 100, 3)
        block_height_text = str(block_height) + "%"
        self.vars["hotspot"] = {
            "address": hs["address"],
            "owner": hs["owner"],
            "name": name,
            "initials": self.nice_hotspot_initials(name),
            "status": str(hs["status"]["online"]).capitalize(),
            "block_height": block_height_text,
            "sync": "Synced" if block_height > 98 else block_height_text,
            "height": height,
            "block": block,
            "reward_scale": "{:.2f}".format(round(hs["reward_scale"], 2)),
        }

    # need to get hotspot data to get owner to then get wallet
    def _load_wallet_data(self):
        """Fetch the owner's wallet balances into ``vars["wallet"]``.

        Only runs when ``get_wallet`` is set.  Hotspot details are loaded
        first when the owner address is not known yet, and removed again
        afterwards unless ``get_hotspot`` was requested.
        """
        if not self.vars.get("get_wallet"):
            return
        # get owner from hotspot for request
        hotspot_vars = self.vars.get("hotspot")
        if not hotspot_vars or "owner" not in hotspot_vars:
            self._load_hotspot_data()
        owner = self.vars["hotspot"]["owner"]
        w = self._fetch_api_data("accounts/" + owner, "data", "Wallet")
        self.vars["wallet"] = {
            "block": w["block"],
            "balance": w["balance"],
            "balance_nice": self.nice_hnt_amount_or_seconds(w["balance"]),
            "dc_balance": w["dc_balance"],
            "dc_balance_nice": self.nice_hnt_amount_or_seconds(w["dc_balance"]),
            "staked_balance": w["staked_balance"],
            "staked_balance_nice": self.nice_hnt_amount_or_seconds(
                w["staked_balance"]
            ),
        }
        # delete hotspot from self.vars if not requested
        if not self.vars.get("get_hotspot"):
            self.vars.pop("hotspot", None)

    def trim_activities(self):
        """Keep only the first ``vars["max"]`` activities, if ``max`` is set.

        A new list is bound instead of deleting in place so that a list
        supplied by the caller is left untouched.
        """
        limit = self.vars.get("max")
        if limit and len(self.activities) > limit:
            self.activities = self.activities[:limit]

    def get_time(self):
        """Store the current time in ``vars["now"]`` and ``vars["time"]``.

        ``now`` is the rounded Unix timestamp, ``time`` is formatted as
        ``HH:MM mm/dd/yy``.
        """
        now = datetime.now()
        self.vars["now"] = round(now.timestamp())
        # spelled out instead of "%D", which is a platform-specific directive
        self.vars["time"] = now.strftime("%H:%M %m/%d/%y")

    def nice_date(self, time):
        """Format a Unix timestamp as upper-cased ``HH:MM DD/MON`` (local time)."""
        timestamp = datetime.fromtimestamp(time)
        return timestamp.strftime("%H:%M %d/%b").upper()

    def nice_hotspot_name(self, name):
        """Turn ``"some-hotspot-name"`` into ``"Some Hotspot Name"``."""
        return name.replace("-", " ").title()

    def nice_hotspot_initials(self, name):
        """Return the upper-cased first letter of every word in ``name``."""
        return "".join(word[0].upper() for word in name.split())

    def nice_hnt_amount_or_seconds(self, amt):
        """Format an amount (int) or a number of seconds (float) as text.

        * float: two decimals
        * int from 0 to 99999: divided by 100000000 and shown with up to
          eight decimals, trailing zeros stripped (``0`` becomes ``"0.000"``)
        * any other int: multiplied by 0.00000001 and shown with three decimals
        """
        nice_num = 0.00000001
        nice_num_small = 100000000
        # float. for time
        if isinstance(amt, float):
            return "{:.2f}".format(amt)
        # int. 8 decimal places for micropayments
        if 0 <= amt < 100000:
            amt_output = "{:.8f}".format(amt / nice_num_small).rstrip("0")
            # fix nice 0
            return "0.000" if amt_output == "0." else amt_output
        # int. up to 3 decimal payments
        return "{:.3f}".format(amt * nice_num)

    def _nice_invalid_reason(self, ir):
        """invalid reason nice name, or raw reason if not in dict"""
        if ir in self._invalid_reason_short_names:
            return self._invalid_reason_short_names[ir]
        return str(ir)

    def _reward_short_name(self, reward_type):
        """activity type name to short name, or upper-cased type if unknown"""
        if reward_type in self.__reward_short_names:
            return self.__reward_short_names[reward_type]
        return reward_type.upper()

    def _write_json(self):
        """Write the parsed list to the file named in ``vars["json_file_output"]``."""
        with open(self.vars["json_file_output"], "w", encoding="utf-8") as outfile:
            json.dump(self.ness, outfile)

    def _get_cursor_only(self):
        """Request the activity endpoint and store its ``cursor`` in ``vars``.

        Prints a message and exits when the cursor cannot be obtained.
        """
        activity_endpoint = (
            self._helium_api_endpoint + "hotspots/" + self.hotspot + "/activity/"
        )
        try:
            # LIVE API data
            activity_request = requests.get(
                activity_endpoint, timeout=self._request_timeout
            )
            self.vars["cursor"] = activity_request.json()["cursor"]
        except (requests.RequestException, ValueError, KeyError, TypeError):
            print("cannot get cursor from Helium API. I quit")
            sys.exit()

    ###############################################
    def _load_activity_data(self):
        """Fill ``self.activities`` from ``vars["data"]`` or from the live API.

        With ``get_cursor_and_activities`` set, a cursor is fetched first;
        otherwise a ``cursor`` given in ``vars`` is used as is.
        """
        # get activities from "data" in loadvars
        if "data" in self.vars:
            self.activities = self.vars["data"]
            return
        # get and set cursor only when asked to, so a given cursor is kept
        if self.vars.get("get_cursor_and_activities"):
            self._get_cursor_only()
        cursor = self.vars.get("cursor")
        add_cursor = f"?cursor={cursor}" if cursor else ""
        # LIVE API data
        self.activities = self._fetch_api_data(
            "hotspots/" + self.hotspot + "/activity/" + add_cursor,
            "data",
            "Activity",
        )

    ###############################################
    def _loop_activities(self):
        """Parse every activity into a flat dict and append it to ``ness``.

        Applies ``max`` before parsing and ``filter`` afterwards, then
        refreshes the ``response`` alias.
        """
        self.trim_activities()
        for activity in self.activities or []:
            # beacon sent, valid witness, invalid witness
            if activity["type"] == "poc_receipts_v1":
                self.ness.append(self._poc_receipts_v1(activity))
                continue

            # activity time
            parsed_activity = {
                "height": activity["height"],
                "hash": activity["hash"],
                "time": activity["time"],
                "time_nice": self.nice_date(activity["time"]),
                "type": activity["type"],
            }
            # reward
            if activity["type"] == "rewards_v2":
                parsed_activity["subtype"] = "rewards"
                parsed_activity["name"] = "Rewards"
                parsed_activity["emoji"] = "πͺ"
                parsed_activity["hnt_emoji"] = "π₯"
                # NOTE(review): every reward writes the same keys, so with
                # several rewards in one activity only the last one is kept.
                for reward in activity["rewards"]:
                    reward_type = self._reward_short_name(reward["type"])
                    parsed_activity["reward_type"] = reward_type
                    parsed_activity["subtype"] = "rewards_" + reward_type.lower()
                    parsed_activity["amount"] = reward["amount"]
                    parsed_activity["amount_nice"] = self.nice_hnt_amount_or_seconds(
                        reward["amount"]
                    )
            # transferred packets
            elif activity["type"] == "state_channel_close_v1":
                parsed_activity["subtype"] = "packets"
                parsed_activity["name"] = "Transferred Packets"
                parsed_activity["emoji"] = "π"
                # as with rewards, the last summary overwrites earlier ones
                for summary in activity["state_channel"]["summaries"]:
                    packet_plural = "s" if summary["num_packets"] != 1 else ""
                    parsed_activity["num_packets"] = summary["num_packets"]
                    parsed_activity["packets_text"] = f"Packet{packet_plural}"
                    parsed_activity["num_dcs"] = summary["num_dcs"]
            # ...challenge accepted
            elif activity["type"] == "poc_request_v1":
                parsed_activity["name"] = "Created Challenge..."
                parsed_activity["subtype"] = "created_challenge"
                parsed_activity["emoji"] = "π²"
            # other
            else:
                parsed_activity["name"] = activity["type"].upper()
                parsed_activity["emoji"] = "π"
            self.ness.append(parsed_activity)
        self._filter_ness()
        self._clone_ness_response()

    def _clone_ness_response(self):
        """Response is an alias of Ness"""
        self.response = self.ness

    def _filter_ness(self):
        """Keep only entries whose type or subtype is listed in ``vars["filter"]``.

        ``filter`` may be a single string or a collection of strings.
        Entries that carry no ``subtype`` are matched on ``type`` alone.
        """
        if "filter" not in self.vars:
            return
        filters = self.vars["filter"]
        # if string, convert filter to list
        if isinstance(filters, str):
            filters = [filters]
        self.ness = [
            entry
            for entry in self.ness
            if entry.get("type") in filters or entry.get("subtype") in filters
        ]

    def _poc_receipts_v1(self, activity):
        """Parse one ``poc_receipts_v1`` activity from this hotspot's view.

        The result describes one of: this hotspot challenged a beaconer, sent
        the beacon, or witnessed it (valid or invalid, with a short invalid
        reason).  Anything else gets the upper-cased activity type as name.
        Only the first element of ``activity["path"]`` is inspected; a
        missing or empty path is treated as having no challengee and no
        witnesses.
        """
        parsed_poc = {
            "height": activity["height"],
            "hash": activity["hash"],
            "time": activity["time"],
            "time_nice": self.nice_date(activity["time"]),
            "type": "poc_receipts_v1",
        }
        path = activity.get("path") or [{}]
        first_hop = path[0]
        witnesses = first_hop.get("witnesses") or []
        wit_count = len(witnesses)
        # pluralize Witness
        witness_text = "Witness" + ("es" if wit_count != 1 else "")

        # challenge accepted
        if activity.get("challenger") == self.hotspot:
            parsed_poc["name"] = "...Challenged Beaconer"
            parsed_poc["emoji"] = "π"
            parsed_poc["subtype"] = "challenged_beaconer"
            parsed_poc["witnesses"] = wit_count
            parsed_poc["witness_text"] = witness_text
        # beacon sent plus witness count and valid count
        elif first_hop.get("challengee") == self.hotspot:
            valid_wit_count = sum(1 for wit in witnesses if wit.get("is_valid"))
            if wit_count and valid_wit_count == wit_count:
                valid_wit_count = "All"
            parsed_poc["name"] = "Sent Beacon"
            parsed_poc["subtype"] = "sent_beacon"
            parsed_poc["emoji"] = "π"
            parsed_poc["witnesses"] = wit_count
            parsed_poc["witness_text"] = witness_text
            parsed_poc["valid_witnesses"] = valid_wit_count
        # witnessed beacon plus valid or invalid and invalid reason
        elif witnesses:
            valid_text = "Invalid"
            vw = 0  # valid witnesses
            valid_witness = False
            for w in witnesses:
                # valid witness count among witnesses
                if w.get("is_valid"):
                    vw += 1
                if w["gateway"] != self.hotspot:
                    continue
                if w.get("is_valid"):
                    valid_witness = True
                    valid_text = "Valid"
                    parsed_poc["emoji"] = "πΈ"
                elif "invalid_reason" in w:
                    valid_text = "Invalid"
                    parsed_poc["emoji"] = "π©"
                    parsed_poc["invalid_reason"] = self._nice_invalid_reason(
                        w["invalid_reason"]
                    )
                parsed_poc["subtype"] = f"{valid_text.lower()}_witness"
            # report "All" when this hotspot and every other witness is valid
            if valid_witness and vw == wit_count:
                vw = "All"
            parsed_poc["name"] = f"{valid_text} Witness"
            parsed_poc["witnesses"] = wit_count
            parsed_poc["witness_text"] = witness_text
            parsed_poc["valid_witnesses"] = vw
        # other
        else:
            parsed_poc["name"] = activity["type"].upper()
            parsed_poc["emoji"] = "π"
        return parsed_poc

    ##############################
    def __init__(self, hotspot, loadvars=None):
        """Load, parse and optionally export the activities of ``hotspot``.

        Args:
            hotspot: hotspot address; an empty value prints a message and
                exits.
            loadvars: path of a ``.json`` input file, or a dict of options
                (see the class docstring).  The dict is not modified.

        Activities come from, in order of precedence: ``loadvars["data"]``,
        the JSON input file, the live API.
        """
        if not hotspot:
            print("happy needs a hotspot address to get latest activities")
            sys.exit()

        # per-instance state (never mutate the class-level defaults)
        self.hotspot = hotspot
        self.vars = {}
        self.activities = []
        self.ness = []
        self.response = self.ness
        self.json_file_input = ""

        # loadvars is str and contains .json
        if isinstance(loadvars, str):
            if ".json" in loadvars:
                self.vars["json_file_input"] = loadvars
                self._load_json_data()
        elif isinstance(loadvars, dict):
            options = dict(loadvars)  # work on a copy, keep the caller's dict
            json_path = options.get("json_file_input")
            if isinstance(json_path, str) and ".json" in json_path:
                self.json_file_input = json_path
                self._load_json_data()
            # non-empty "data" is taken as the activities; an empty "data"
            # stays in vars, where _load_activity_data picks it up
            if options.get("data"):
                self.activities = options.pop("data")
            # everything else is loaded into vars (overrides vars from file)
            self.vars.update(options)

        # get live data only if neither a json file nor data was supplied
        if not self.json_file_input and not self.activities:
            self._load_activity_data()
        # loop activities
        self._loop_activities()
        # if get_hotspot (or get_wallet) set in loadvars
        self._load_hotspot_data()
        # if get_wallet set in loadvars
        self._load_wallet_data()
        # reverse sort if set in loadvars
        if self.vars.get("reverse_sort"):
            self.ness.reverse()
            self.response = self.ness  # ness and alias response
        # write to json file if set in loadvars
        if self.vars.get("json_file_output"):
            self._write_json()