-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.py
125 lines (111 loc) · 3.98 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import hashlib
import hmac
import json
from typing import Any, Dict
import dramatiq
import httpx
import sentry_sdk
from dramatiq.brokers.redis import RedisBroker
from prometheus_client import Counter
from sentry_sdk.integrations.dramatiq import DramatiqIntegration
from circuit_breaker import CircuitBreaker, RedisBackend
from main import settings
# Error reporting is opt-in: the Sentry SDK is initialised (with the Dramatiq
# integration) only when a DSN is present in the settings.
if settings.sentry_dsn:
    sentry_sdk.init(dsn=settings.sentry_dsn, integrations=[DramatiqIntegration()])
# Message broker: every actor declared in this module is registered against
# this Redis-backed broker.
redis_broker = RedisBroker(url=settings.redis_url)
dramatiq.set_broker(redis_broker)

# Circuit breaker wrapped around calls to the target service. It is given a
# RedisBackend built from the same Redis URL as the broker.
# NOTE(review): the meaning and units of the threshold/timeouts are defined by
# the project's circuit_breaker module - presumably seconds; confirm there.
redis_backend = RedisBackend(url=settings.redis_url)
target_circuit = CircuitBreaker(
    key="target-service",
    backend=redis_backend,
    failure_threshold=5,
    half_open_timeout=30,
    reset_timeout=60,
)

# Forward attempts, labelled by outcome; forward_webhook increments it with
# status="success" or status="error".
WEBHOOK_FORWARDS = Counter(
    name="webhook_forwards_total",
    documentation="Total number of webhook forwards to target service",
    labelnames=["status"],
)
@dramatiq.actor(priority=0)
def update_ci_status(repo: str, sha: str) -> None:
    """Mark a commit as "pending" on GitHub for the ``builds/x86_64`` context.

    Sends ``POST /repos/{repo}/statuses/{sha}`` to the GitHub REST API,
    authenticated with ``settings.github_token``.

    Args:
        repo: Repository path interpolated into the URL (presumably
            ``owner/name`` - confirm against callers).
        sha: Commit SHA the status is attached to.

    Raises:
        Exception: Any failure of the request, including a non-2xx response
            (via ``raise_for_status``), is re-raised after the Sentry context
            has been set and the error printed.
    """
    # The request must go to GitHub's own API host: the bearer token is sent
    # in the Authorization header, so any other host would receive it.
    status_url = f"https://api.github.com/repos/{repo}/statuses/{sha}"
    try:
        with httpx.Client() as client:
            response = client.post(
                status_url,
                headers={
                    "Accept": "application/vnd.github.v3+json",
                    "Authorization": f"token {settings.github_token}",
                },
                json={
                    "state": "pending",
                    "context": "builds/x86_64",
                    "description": "Build pending",
                },
                timeout=10.0,
            )
            response.raise_for_status()
    except Exception as e:
        # Attach request details to the Sentry scope before the error
        # propagates, so the captured event identifies the commit.
        sentry_sdk.set_context(
            "github_api", {"repo": repo, "sha": sha, "operation": "update_ci_status"}
        )
        print(f"Error updating CI status: {e}")
        raise
def _hub_signature(secret: bytes, body: bytes, algorithm: str) -> str:
    """Return ``"<algorithm>=<hex HMAC of body>"`` for an X-Hub-Signature header."""
    return f"{algorithm}={hmac.new(secret, body, algorithm).hexdigest()}"


@dramatiq.actor(priority=10)
def forward_webhook(payload: Dict[str, Any], event_type: str) -> None:
    """Sign a webhook payload and POST it to the configured target service.

    The payload is serialized once; the HMAC-SHA1 and HMAC-SHA256 signatures
    (keyed with ``settings.target_service_secret``) are computed over those
    bytes, and the same bytes are sent as the request body. The call runs
    inside ``target_circuit.acquire()`` and its outcome is counted in
    ``WEBHOOK_FORWARDS``.

    Args:
        payload: JSON-serializable webhook body to forward.
        event_type: Value sent in the ``X-GitHub-Event`` header.

    Raises:
        Exception: Any failure (circuit acquisition, serialization, transport
            error, non-2xx response) is re-raised after the error counter is
            incremented and the Sentry context is set.
    """
    try:
        with target_circuit.acquire():
            circuit_state = target_circuit.get_state()
            if circuit_state == "half-open":
                print(
                    "Circuit breaker in half-open state for "
                    f"{settings.target_service_url}, attempting recovery"
                )
                sentry_sdk.set_context(
                    "circuit_breaker",
                    {
                        "state": "half-open",
                        "target_url": settings.target_service_url,
                        "event_type": event_type,
                    },
                )

            # Serialize exactly once: the receiver validates the signatures
            # against the raw request body, so the bytes that are signed must
            # be the bytes that are sent.
            payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
            secret = settings.target_service_secret.encode("utf-8")

            with httpx.Client() as client:
                response = client.post(
                    settings.target_service_url,
                    # `content=` sends the bytes untouched; `json=` would
                    # re-serialize the dict with a different layout and
                    # invalidate the signatures.
                    content=payload_bytes,
                    headers={
                        "Content-Type": "application/json",
                        "X-GitHub-Event": event_type,
                        "X-Hub-Signature": _hub_signature(
                            secret, payload_bytes, "sha1"
                        ),
                        "X-Hub-Signature-256": _hub_signature(
                            secret, payload_bytes, "sha256"
                        ),
                    },
                    timeout=30.0,
                )
                response.raise_for_status()
                WEBHOOK_FORWARDS.labels(status="success").inc()
    except Exception as e:
        WEBHOOK_FORWARDS.labels(status="error").inc()
        # Record what was being forwarded and the breaker state on the Sentry
        # scope before the error propagates.
        sentry_sdk.set_context(
            "webhook_forward",
            {
                "event_type": event_type,
                "target_url": settings.target_service_url,
                "circuit_breaker_state": target_circuit.get_state(),
                "payload_size": len(json.dumps(payload)),
            },
        )
        print(f"Error forwarding webhook: {e}")
        raise