-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
125 lines (104 loc) · 4.33 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
Ko-fi WebSocket Bridge
This module implements a FastAPI-based WebSocket bridge for Ko-fi webhooks. It serves
as an intermediary that receives Ko-fi donation webhooks and forwards them to connected
clients via WebSocket connections.
Key Features:
- Receives and validates Ko-fi webhook notifications
- Maintains WebSocket connections with clients
- Forwards webhook data to corresponding clients based on verification tokens
- Implements connection health checks via ping/pong mechanism
- Provides basic API endpoints for status and version information
The bridge uses verification tokens to manage and authenticate WebSocket connections,
ensuring webhook data is delivered to the correct client.
For webhook format details, see:
https://help.ko-fi.com/hc/en-us/articles/360004162298-Does-Ko-fi-have-an-API-or-webhook
"""
import asyncio
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Form
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.exceptions import HTTPException
from starlette.middleware.cors import CORSMiddleware
# Open client sockets, keyed by the verification token taken from the
# /ws/{verification_token} path. The webhook handler looks clients up here.
active_connections: dict[str, WebSocket] = {}

# The interactive documentation pages are switched off for this service.
app = FastAPI(
    docs_url=None,  # no Swagger UI
    redoc_url=None,  # no ReDoc
    version="1.0.2",
)

# CORS: any origin, method and header is accepted, and credentials are allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
@app.get("/", response_class=HTMLResponse)
async def root():
    """
    Serve the application's home page.

    The page is the static file ``static/index.html``, which describes the
    purpose and features of the Ko-fi WebSocket bridge.
    """
    home_page = "static/index.html"
    return FileResponse(home_page)
@app.get("/ping")
async def _ping():
    """Answer ``GET /ping`` with a fixed ``pong`` message."""
    reply = {"message": "pong"}
    return reply
@app.get("/version")
async def _version():
    """Report the version string configured on the ``app`` instance."""
    current_version = app.version
    return {"version": current_version}
@app.post("/webhook")
async def ko_fi_webhook(data: str = Form(...)):
    """
    Handle an incoming Ko-fi webhook and relay it to the matching client.

    Ko-fi posts a form whose ``data`` field holds a JSON document; see
    https://help.ko-fi.com/hc/en-us/articles/360004162298-Does-Ko-fi-have-an-API-or-webhook#h_01HP1SMJAKE2HQ82A5011Z5648
    The document's ``verification_token`` selects the WebSocket registered in
    ``active_connections`` under the same token, and the whole document is
    forwarded to that socket as JSON.

    Delivery is best effort. The send is attempted up to three times, pausing
    one second after each ``ConnectionError``/``RuntimeError``. A
    ``WebSocketDisconnect`` unregisters the connection immediately. If all
    three attempts fail, the connection is unregistered and closed. When no
    client is connected for the token, the payload is dropped.

    Args:
        data: JSON-encoded webhook payload taken from the ``data`` form field.

    Returns:
        ``{"status": "success"}`` once the payload has been processed,
        whether or not it could be delivered to a client.

    Raises:
        HTTPException: 400 if ``data`` is not a JSON object, or if its
            ``verification_token`` is missing or not a string.
    """
    try:
        webhook_data = json.loads(data)
    except json.JSONDecodeError as exc:
        raise HTTPException(
            status_code=400, detail="Invalid JSON payload") from exc
    # Valid JSON that is not an object (list, number, ...) has no fields to read.
    if not isinstance(webhook_data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    verification_token = webhook_data.get('verification_token')
    if not verification_token:
        raise HTTPException(
            status_code=400, detail="Missing verification_token")
    # Registry keys are path strings; other types can never match, and an
    # unhashable value would raise on lookup.
    if not isinstance(verification_token, str):
        raise HTTPException(
            status_code=400, detail="Invalid verification_token")

    websocket = active_connections.get(verification_token)
    if websocket is None:
        # Nobody is listening for this token; nothing to forward.
        return {"status": "success"}

    for _ in range(3):  # Try 3 times
        try:
            await websocket.send_json(webhook_data)
            break
        except WebSocketDisconnect:
            # Only drop the entry if it still refers to this socket: the client
            # may have reconnected under the same token while we were awaiting.
            if active_connections.get(verification_token) is websocket:
                del active_connections[verification_token]
            break
        except (ConnectionError, RuntimeError):
            await asyncio.sleep(1)
    else:
        # Every attempt failed: treat the socket as dead.
        if active_connections.get(verification_token) is websocket:
            # Unregister first so a failing close() cannot leave a stale entry.
            del active_connections[verification_token]
            try:
                await websocket.close()
            except (WebSocketDisconnect, ConnectionError, RuntimeError):
                # Closing a socket that is already gone is expected to fail;
                # the connection has been unregistered either way.
                pass
    return {"status": "success"}
@app.websocket("/ws/{verification_token}")
async def websocket_endpoint(websocket: WebSocket, verification_token: str):
    """
    Accept a client WebSocket and register it to receive Ko-fi webhooks.

    The connection is stored in ``active_connections`` under the token from
    the URL path, replacing any connection previously stored under that token.
    While the connection is open, every ``"ping"`` text message from the
    client is answered with ``"pong"``; other messages are ignored.

    When the receive loop ends, whether by a normal client disconnect or by
    any other error, the connection is removed from ``active_connections``,
    but only if the registered entry is still this socket. That way a stale
    connection shutting down cannot unregister a newer one that reused the
    same token.

    Args:
        websocket: The incoming WebSocket connection.
        verification_token: Ko-fi verification token identifying the client.
    """
    await websocket.accept()
    active_connections[verification_token] = websocket
    try:
        while True:
            message = await websocket.receive_text()
            if message == "ping":
                # Keep connection alive
                await websocket.send_text("pong")
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in the finally clause.
        pass
    finally:
        # Runs for unexpected errors too, so no stale entry is left behind.
        if active_connections.get(verification_token) is websocket:
            del active_connections[verification_token]