forked from jooray/nutband
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlxmf_wrapper_client.py
242 lines (196 loc) · 8.82 KB
/
lxmf_wrapper_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import asyncio
import json
import RNS
import os
import time
import LXMF
import sys
import random
import string
class LXMFWrapperClient:
    """Process-wide LXMF client that pairs outgoing requests with replies.

    Every ``LXMFWrapperClient()`` call returns the same instance. Each
    outgoing message carries a ``req_id`` field; an incoming message with
    the same ``req_id`` is handed to the reply callback that was registered
    when the request was sent.
    """

    _instance = None

    def __new__(cls):
        """Create the shared instance on first use and return it afterwards."""
        if cls._instance is None:
            cls._instance = super(LXMFWrapperClient, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialise state once; later constructions keep the existing router."""
        # Python runs __init__ on every LXMFWrapperClient() call even though
        # __new__ returns the shared instance, so both steps are guarded.
        # Rebuilding the router would replace the identity and destination
        # that already-sent requests expect their replies on.
        if getattr(self, "reply_callbacks", None) is None:
            self.reply_callbacks = {}
        if getattr(self, "local_lxmf_destination", None) is None:
            self.create_lxmf_proxy()

    def random_id(self):
        """Return a random 4-character ASCII alphanumeric request id."""
        alphabet = string.ascii_letters + string.digits
        return ''.join(random.choices(alphabet, k=4))

    def receive_handler(self, lxm):
        """Route an incoming LXMF message to the callback awaiting it.

        The ``req_id`` field is removed from ``lxm.fields`` and looked up in
        ``self.reply_callbacks``. The callback is invoked as
        ``reply_callback(req_id, lxm)`` and unregistered, but only when the
        message's ``source_hash`` equals the destination hash the request
        was sent to. Anything else is logged and ignored.
        """
        fields = lxm.fields
        # Fields come from the network: tolerate a missing/empty container.
        req_id = fields.pop("req_id", None) if fields else None
        if req_id is None:
            print("Received reply with req_id not set")
            return

        try:
            entry = self.reply_callbacks.get(req_id)
        except TypeError:
            # An unhashable req_id (e.g. a list) cannot match any request.
            entry = None
        if entry is None:
            print(f"Received reply with unknown req_id {req_id}")
            return

        reply_callback, destination_hash = entry
        # Only call callbacks that come from the right source for the req_id
        # source_hash is signed, so it could not have come from anyone else
        if lxm.source_hash != destination_hash:
            print(f"Received reply for {req_id} from wrong source. Was expecting {destination_hash}, got {lxm.source_hash}")
            return

        del self.reply_callbacks[req_id]
        print (f"Calling reply_callback for {req_id}")
        reply_callback(req_id, lxm)

    async def send_lxmf_message(self, destination, content, fields,
                                delivery_callback, failed_callback,
                                reply_callback, req_id=None,
                                identity_timeout=300):
        """Send ``content`` and ``fields`` to ``destination`` over LXMF.

        Args:
            destination: Destination hash as a hex string.
            content: Message content passed to ``LXMF.LXMessage``.
            fields: Dict of message fields; ``"req_id"`` is written into it
                (the dict is mutated). ``None`` is treated as an empty dict.
            delivery_callback: Optional callable registered as the message's
                delivery callback.
            failed_callback: Optional callable registered as the message's
                failed callback.
            reply_callback: Optional callable invoked by ``receive_handler``
                as ``reply_callback(req_id, lxm)`` when a matching reply
                arrives from the same destination.
            req_id: Request id to use; a random one is generated when None.
            identity_timeout: Seconds to wait for the destination identity
                to become known after requesting a path.

        Exits the process with status 1 (``sys.exit``) when the identity
        cannot be recalled within ``identity_timeout`` seconds.
        """
        # Convert the hex string to the raw destination hash
        destination_bytes = bytes.fromhex(destination)

        # Check to see if RNS knows the identity
        destination_identity = RNS.Identity.recall(destination_bytes)
        if destination_identity is None:
            # Request it, then poll once per second until it arrives
            RNS.Transport.request_path(destination_bytes)
            print(f"Don't have identity for {destination}, waiting for it to arrive for {identity_timeout}s")
            deadline = time.monotonic() + identity_timeout
            while destination_identity is None and time.monotonic() < deadline:
                await asyncio.sleep(1)
                destination_identity = RNS.Identity.recall(destination_bytes)

        if destination_identity is None:
            print("Error: Cannot recall identity")
            sys.exit(1)

        lxmf_destination = RNS.Destination(
            destination_identity,
            RNS.Destination.OUT,
            RNS.Destination.SINGLE,
            "lxmf",
            "delivery"
        )

        if fields is None:
            fields = {}
        if req_id is None:
            req_id = self.random_id()
        fields["req_id"] = req_id

        # Create the lxm object
        lxm = LXMF.LXMessage(
            lxmf_destination,
            self.local_lxmf_destination,
            content,
            fields=fields,
            desired_method=LXMF.LXMessage.DIRECT
        )

        if delivery_callback is not None:
            lxm.register_delivery_callback(delivery_callback)
        if failed_callback is not None:
            lxm.register_failed_callback(failed_callback)
        if reply_callback is not None:
            # Remember who we asked, so receive_handler can verify the sender
            self.reply_callbacks[req_id] = (reply_callback, lxmf_destination.hash)

        # Send the message through the router
        self.lxm_router.handle_outbound(lxm)

    def create_lxmf_proxy(self):
        """Start Reticulum if needed and register a fresh LXMF delivery identity."""
        # Initialize Reticulum. It's a singleton, we do it once per process
        if RNS.Reticulum.get_instance() is None:
            RNS.Reticulum()

        # Reticulum / LXMF has permanent identity, but we specifically
        # don't want to be permanent, we will use per launch identity
        self.ID = RNS.Identity()

        userdir = os.path.expanduser("~")
        configdir = userdir+"/.lxmfproxy_client/"
        os.makedirs(configdir, exist_ok=True)

        self.lxm_router = LXMF.LXMRouter(identity = self.ID, storagepath = configdir)
        self.lxm_router.register_delivery_callback(self.receive_handler)
        self.local_lxmf_destination = self.lxm_router.register_delivery_identity(self.ID,display_name="LXMFProxy")
        self.local_lxmf_destination.announce()
class LXMFProxy:
    """httpx-like client that tunnels requests for mapped URLs over LXMF.

    URLs that start with a key of ``mappings`` are sent as LXMF messages to
    the mapped destination. Other URLs go to the supplied ``httpx`` client
    when ``httpx_allowed`` is true, and are rejected otherwise.
    """

    def __init__(self, lxmf_wrapper_client: "LXMFWrapperClient", httpx=None, httpx_allowed=False, mappings=None):
        """Store collaborators; must be called while an event loop is running.

        Args:
            lxmf_wrapper_client: Object providing ``random_id()`` and the
                ``send_lxmf_message(...)`` coroutine.
            httpx: Optional HTTP client used for unmapped URLs.
            httpx_allowed: Whether unmapped URLs may use ``httpx``.
            mappings: Dict of URL prefix -> LXMF destination hash (hex).
        """
        self.mappings = {} if mappings is None else mappings
        self.lxmf_wrapper_client = lxmf_wrapper_client
        self.httpx = httpx
        self.httpx_allowed = httpx_allowed
        self.futures = {}  # Dictionary to store futures mapped by req_id
        self.event_loop = asyncio.get_running_loop()

    def get_destination_for_url(self, url):
        """Return ``(destination, remaining_url)`` for the first matching prefix.

        When no mapping key is a prefix of ``url`` the result is
        ``(None, url)``. Only the leading prefix is stripped; later
        occurrences of the same text inside the URL are left intact.
        """
        for map_url, destination in self.mappings.items():
            if url.startswith(map_url):
                return (destination, url[len(map_url):])
        return (None, url)

    async def handle_request(self, method, url, *, data=None, json=None, headers=None, cookies=None, params=None, **kwargs):
        """Perform ``method`` on ``url`` via LXMF or the fallback HTTP client.

        For mapped URLs the request is packed into LXMF fields (``method``
        plus any of ``data``, ``json``, ``params``, ``headers``, ``cookies``
        that are not None), sent with the unmapped URL remainder as content,
        and the reply is returned wrapped in ``LXMFProxyResponse``. Extra
        ``kwargs`` are only forwarded on the plain-HTTP path.

        Raises:
            Exception: if the URL is unmapped and HTTP is disabled, or if
                LXMF reports the message as failed.
        """
        destination, new_url = self.get_destination_for_url(url)

        if destination is None:
            if not (self.httpx_allowed and self.httpx is not None):
                raise Exception(f"URL {url} not found in mappings and http(s) is disabled")
            if method.upper() == "GET":
                return await self.httpx.get(url, params=params, headers=headers, cookies=cookies, **kwargs)
            # Non-GET methods must keep their verb and body.
            return await self.httpx.request(method, url, data=data, json=json, params=params,
                                            headers=headers, cookies=cookies, **kwargs)

        fields = {"method": method}
        optional = {"data": data, "json": json, "params": params,
                    "headers": headers, "cookies": cookies}
        fields.update({key: value for key, value in optional.items() if value is not None})

        # Pick an id that is not already awaiting a reply on this proxy.
        req_id = self.lxmf_wrapper_client.random_id()
        while req_id in self.futures:
            req_id = self.lxmf_wrapper_client.random_id()
        future = asyncio.Future()
        self.futures[req_id] = future

        def describe_request(lxm):
            """Return a short description of the message for log lines."""
            lxm_fields = lxm.fields or {}
            return f"{lxm_fields.get('method', '')} request ID {lxm_fields.get('req_id', '')}"

        def delivery_callback(lxm):
            print("Delivered: " + describe_request(lxm))

        def failed_callback(lxm):
            """Fail the awaiting future so handle_request does not hang."""
            request_description = describe_request(lxm)
            print("Failed: " + request_description)
            error = Exception("Request failed: " + request_description)
            pending = self.futures.pop(req_id, None)
            if pending is not None and not pending.done():
                # Scheduled through the loop, like reply_callback, because
                # the callback may be invoked outside the loop's thread.
                self.event_loop.call_soon_threadsafe(pending.set_exception, error)
                return
            raise error

        def reply_callback(reply_id, lxm):
            """Resolve the future registered for ``reply_id`` with the reply."""
            pending = self.futures.pop(reply_id, None)
            if pending is not None and not pending.done():
                try:
                    self.event_loop.call_soon_threadsafe(pending.set_result, lxm)
                except Exception as e:
                    print(e)

        try:
            await self.lxmf_wrapper_client.send_lxmf_message(destination, new_url, fields,
                                                             delivery_callback, failed_callback,
                                                             reply_callback, req_id=req_id)
            lxm_reply = await future
        finally:
            # Drop the bookkeeping entry on errors and cancellation too.
            self.futures.pop(req_id, None)

        print (lxm_reply)
        return LXMFProxyResponse(lxm_reply)

    async def get(self, url, *, params=None, headers=None, cookies=None, **kwargs):
        """Issue a GET request; extra keyword arguments are passed through."""
        return await self.handle_request('GET', url, params=params, headers=headers, cookies=cookies, **kwargs)

    async def post(self, url, *, data=None, json=None, headers=None, cookies=None, **kwargs):
        """Issue a POST request; extra keyword arguments are passed through."""
        return await self.handle_request('POST', url, data=data, json=json, headers=headers, cookies=cookies, **kwargs)
class LXMFProxyResponse:
    """Minimal response object wrapping a received LXMF message.

    Exposes the ``content``, ``json()``, ``text()`` and
    ``raise_for_status()`` members that callers use on HTTP responses.
    """

    def __init__(self, lxm):
        """Keep the message and its raw content."""
        self.lxm = lxm
        # NOTE(review): LXMF appears to deliver content as bytes; both bytes
        # and str are handled below.
        self.content = lxm.content

    def json(self):
        """Parse the raw content as JSON and return the decoded value."""
        return json.loads(self.content)

    def text(self):
        """Return the content as ``str``, decoding bytes as UTF-8."""
        if isinstance(self.content, (bytes, bytearray)):
            # Undecodable bytes are replaced rather than raising.
            return self.content.decode("utf-8", errors="replace")
        return self.content

    def raise_for_status(self):
        """Do nothing; kept so callers can invoke it unconditionally."""
        pass