diff --git a/changelog.d/276.feature b/changelog.d/276.feature new file mode 100644 index 00000000..f119ccdd --- /dev/null +++ b/changelog.d/276.feature @@ -0,0 +1 @@ +Add function `registerMetrics` to `MembershipQueue` to track metrics. \ No newline at end of file diff --git a/src/components/membership-queue.ts b/src/components/membership-queue.ts index f48a3bc4..927c7f6b 100644 --- a/src/components/membership-queue.ts +++ b/src/components/membership-queue.ts @@ -1,6 +1,7 @@ import { Bridge } from "../bridge"; import { get as getLogger } from "./logging"; import PQueue from "p-queue"; +import { Counter, Gauge } from "prom-client"; const log = getLogger("MembershipQueue"); @@ -17,6 +18,7 @@ interface QueueUserItem { userId: string; retry: boolean; req: ThinRequest; + ts: number; } export interface MembershipQueueOpts { @@ -39,6 +41,10 @@ const DEFAULT_OPTS = { */ export class MembershipQueue { private queues: Map = new Map(); + private pendingGauge?: Gauge<"type"|"instance_id">; + private processedCounter?: Counter<"type"|"instance_id"|"outcome">; + private failureReasonCounter?: Counter<"errcode"|"http_status"|"type">; + private ageOfLastProcessedGauge?: Gauge; constructor(private bridge: Bridge, private opts: MembershipQueueOpts) { this.opts = { ...DEFAULT_OPTS, ...this.opts}; @@ -50,6 +56,37 @@ export class MembershipQueue { } } + /** + * This should be called after starting the bridge in order + * to track metrics for the membership queue. 
+ */ + public registerMetrics() { + const metrics = this.bridge.getPrometheusMetrics(false); + + this.pendingGauge = metrics.addGauge({ + name: "membershipqueue_pending", + help: "Count of membership actions in the queue by type", + labels: ["type"] + }); + + this.processedCounter = metrics.addCounter({ + name: "membershipqueue_processed", + help: "Count of membership actions processed by type and outcome", + labels: ["type", "outcome"], + }); + + this.failureReasonCounter = metrics.addCounter({ + name: "membershipqueue_reason", + help: "Count of failures to process membership, by matrix errcode and http status", + labels: ["errcode", "http_status", "type"], /* "type" must be declared: serviceQueue passes it when incrementing this counter, and prom-client throws on labels missing from the registered set */ + }); + + this.ageOfLastProcessedGauge = metrics.addGauge({ + name: "membershipqueue_lastage", + help: "Gauge to measure the age of the last processed event", + }); + } + /** * Join a user to a room * @param roomId The roomId to join @@ -65,6 +102,7 @@ export class MembershipQueue { req, attempts: 0, type: "join", + ts: Date.now(), }); } @@ -88,6 +126,7 @@ export class MembershipQueue { reason, kickUser, type: "leave", + ts: Date.now(), }) } @@ -98,6 +137,9 @@ export class MembershipQueue { throw Error("Could not find queue for hash"); } queue.add(() => this.serviceQueue(item)); + this.pendingGauge?.inc({ + type: item.kickUser ? "kick" : item.type + }); } catch (ex) { log.error(`Failed to handle membership: ${ex}`); @@ -115,6 +157,7 @@ export class MembershipQueue { const reqIdStr = req.getId() ? `[${req.getId()}]`: ""; log.debug(`${reqIdStr} ${userId}@${roomId} -> ${type} (reason: ${reason || "none"}, kicker: ${kickUser})`); const intent = this.bridge.getIntent(kickUser || userId); + this.ageOfLastProcessedGauge?.set(Date.now() - item.ts); try { if (type === "join") { await intent.join(roomId); @@ -125,9 +168,30 @@ export class MembershipQueue { else { await intent.leave(roomId, reason); } + this.pendingGauge?.dec({ + type: item.kickUser ? "kick" : item.type + }); + this.processedCounter?.inc({ + type: item.kickUser ? 
"kick" : item.type, + outcome: "success", + }); } catch (ex) { + if (ex.errcode && ex.httpStatus) { + this.failureReasonCounter?.inc({ + type: item.kickUser ? "kick" : item.type, + errcode: ex.errcode, + http_status: ex.httpStatus + }); + } if (!this.shouldRetry(ex, attempts)) { + this.pendingGauge?.dec({ + type: item.kickUser ? "kick" : item.type + }); + this.processedCounter?.inc({ + type: item.kickUser ? "kick" : item.type, + outcome: "fail", + }); throw ex; } const delay = Math.min(