-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathinvite.py
194 lines (157 loc) · 7.46 KB
/
invite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
from typing import Optional, Type
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from maubot import Plugin, MessageEvent
from maubot.handlers import command
import json
import datetime
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("admin_secret")
helper.copy("legacy_mr")
helper.copy("reg_url")
helper.copy("reg_page")
helper.copy("admins")
helper.copy("expiration")
helper.copy("message")
class Invite(Plugin):
async def start(self) -> None:
await super().start()
self.config.load_and_update()
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
async def can_manage(self, evt: MessageEvent) -> bool:
if evt.sender in self.config["admins"]:
return True
else:
await evt.respond("You don't have permission to manage invitations for this server.")
return False
def set_api_endpoints(self) -> None:
self.config["api_url"] = self.config["reg_url"] + "/api"
if self.config["legacy_mr"] == True:
self.config["api_url"] = self.config["reg_url"]
@command.new(name="invite", help="Generate a unique invitation code to this matrix homeserver", \
require_subcommand=True)
async def invite(self, evt: MessageEvent) -> None:
pass
@invite.subcommand("generate", help="Generate a new invitation token.")
async def generate(self, evt: MessageEvent) -> None:
await evt.mark_read()
if not await self.can_manage(evt):
return
self.set_api_endpoints()
ex_date = datetime.datetime.strftime( \
(datetime.date.today() + datetime.timedelta(days=self.config["expiration"])), \
"%Y-%m-%d")
# use re-ordered date if using legacy code
if self.config["legacy_mr"] == True:
ex_date = datetime.datetime.strftime( \
(datetime.date.today() + datetime.timedelta(days=self.config["expiration"])), \
"%m.%d.%Y")
headers = {
'Authorization': f"SharedSecret {self.config['admin_secret']}",
'Content-Type': 'application/json'
}
try:
response = await self.http.post(f"{self.config['api_url']}/token", headers=headers, \
json={"max_usage": 1, "one_time": True, "ex_date": ex_date, "expiration_date": ex_date})
status = response.status
resp_json = await response.json()
except Exception as e:
body = await response.text()
await evt.respond(f"Uh oh! I got a {status} response from your registration endpoint:<br /> \
{body}<br /> \
which prompted me to produce this error:<br /> \
<code>{e.message}</code>", allow_html=True)
return None
try:
token = resp_json['name']
except Exception as e:
await evt.respond(f"I got a bad response back, sorry, something is borked. \n\
{resp_json}")
self.log.exception(e)
return None
msg = '<br />'.join(
[
f"Invitation token <b>{token}</b> created!",
f"",
f"Your unique url for registering is:",
f"{self.config['reg_url']}{self.config['reg_page']}?token={token}",
f"This invite token will expire in {self.config['expiration']} days.",
f"If it expires before use, you must request a new token."
])
if self.config['message']:
msg = self.config["message"].format(token=token, reg_url=self.config['reg_url'],
reg_page=self.config['reg_page'], expiration=self.config['expiration'])
await evt.respond(msg, allow_html=True)
@invite.subcommand("status", help="Return the status of an invite token.")
@command.argument("token", "Token", pass_raw=True, required=True)
async def status(self, evt: MessageEvent, token: str) -> None:
await evt.mark_read()
if not await self.can_manage(evt):
return
self.set_api_endpoints()
if not token:
await evt.respond("you must supply a token to check")
headers = {
'Authorization': f"SharedSecret {self.config['admin_secret']}",
'Content-Type': 'application/json'
}
try:
response = await self.http.get(f"{self.config['api_url']}/token/{token}", headers=headers)
resp_json = await response.json()
except Exception as e:
await evt.respond(f"request failed: {e.message}")
return None
# this isn't formatted nicely but i don't really care that much
await evt.respond(f"Status of token {token}: \n<pre><code format=json>{json.dumps(resp_json, indent=4)}</code></pre>", allow_html=True)
@invite.subcommand("revoke", help="Disable an existing invite token.")
@command.argument("token", "Token", pass_raw=True, required=True)
async def revoke(self, evt: MessageEvent, token: str) -> None:
await evt.mark_read()
if not await self.can_manage(evt):
return
self.set_api_endpoints()
if not token:
await evt.respond("you must supply a token to revoke")
headers = {
'Authorization': f"SharedSecret {self.config['admin_secret']}",
'Content-Type': 'application/json'
}
# this is a really gross way of handling legacy installs and should be cleaned up
# basically this command used to use PUT but now uses PATCH
if self.config["legacy_mr"] == True:
try:
response = await self.http.put(f"{self.config['api_url']}/token/{token}", headers=headers, \
json={"disable": True})
resp_json = await response.json()
except Exception as e:
await evt.respond(f"request failed: {e.message}")
return None
else:
try:
response = await self.http.patch(f"{self.config['api_url']}/token/{token}", headers=headers, \
json={"disabled": True})
resp_json = await response.json()
except Exception as e:
await evt.respond(f"request failed: {e.message}")
return None
# this isn't formatted nicely but i don't really care that much
await evt.respond(f"<pre><code format=json>{json.dumps(resp_json, indent=4)}</code></pre>", allow_html=True)
@invite.subcommand("list", help="List all tokens that have been generated.")
async def list(self, evt: MessageEvent) -> None:
await evt.mark_read()
if not await self.can_manage(evt):
return
self.set_api_endpoints()
headers = {
'Authorization': f"SharedSecret {self.config['admin_secret']}"
}
try:
response = await self.http.get(f"{self.config['api_url']}/token", headers=headers)
resp_json = await response.json()
except Exception as e:
await evt.respond(f"request failed: {e.message}")
return None
# this isn't formatted nicely but i don't really care that much
await evt.respond(f"<pre><code format=json>{json.dumps(resp_json, indent=4)}</code></pre>", allow_html=True)