Skip to content

πŸš€ Async REST API Processing with PGMQ β€” Queue, Retry, Rate-Limit, DLQ

License

Notifications You must be signed in to change notification settings

arun0009/pgmq-taskq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

2 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

pgmq-taskq

An open-source asynchronous task processor built with Go and backed by PostgreSQL using the PGMQ extension. Submit tasks to call REST APIs asynchronously with client-defined rate limits, retries, and timeouts. Track processing status, recover failed tasks from a Dead Letter Queue (DLQ), configure task type rate limits, and monitor via Prometheus.

πŸš€ Architecture Overview

  • Client submits tasks via POST /tasks with custom config (retries, rate limits, headers, HTTP method).
  • Server enqueues messages into PGMQ (task_queue) and returns a task_id and message_ids.
  • Workers poll the queue, process tasks with exponential backoff retries, respect rate limits, update statuses in PostgreSQL, and move failed tasks to task_dlq.
  • Client can:
    • Retrieve task status via GET /tasks/{task_id}.
    • Recover failed tasks via POST /dlq/reprocess.
    • Configure task type rate limits via POST /task-types and retrieve them via GET /task-types/{task_type}.
  • Metrics are exposed at /metrics for observability.

This architecture ensures scalability (via worker pool), resilience (PGMQ + retries + DLQ), and observability (Prometheus metrics + structured logging).

🀠 How It Works

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#007bff', 'secondaryColor': '#28a745', 'lineColor': '#6c757d', 'actorBkg': '#e9ecef', 'noteBkgColor': '#f8f9fa' }}}%%
sequenceDiagram
    actor Client
    participant Server as HTTP Server<br>(Echo :8080)
    participant Queue as PGMQ<br>(task_queue)
    participant DLQ as PGMQ<br>(task_dlq)
    participant Workers as Workers<br>(MaxWorkers)
    participant Postgres as Postgres<br>(task_status)
    participant Metrics as Metrics<br>(/metrics)
    participant ExternalAPI as External API

    Client->>Server: POST /tasks<br>{APIConfig, Payloads}
    activate Server
    Note right of Server: APIConfig:<br>Method, RPS,<br>Retries, Headers
    Server->>Queue: Enqueue Messages
    activate Queue
    Server-->>Client: 201 Created<br>{task_id, queue_message_ids}
    deactivate Server

    loop Poll Queue
        Workers->>Queue: Read (1s if empty)
        activate Workers
        Queue-->>Workers: Messages
        Workers->>Postgres: Set pending
        activate Postgres
        Workers->>Metrics: Update metrics
        activate Metrics
        Note left of Workers: Apply RPS,<br>Exponential Backoff
        Workers->>ExternalAPI: HTTP Call<br>(POST/PATCH/PUT/DELETE)
        ExternalAPI-->>Workers: Response
        alt Success
            Workers->>Postgres: Set success
            Workers->>Queue: Delete Message
        else Failure
            alt Retry < MaxRetries
                Workers->>Postgres: Increment attempt
                Workers->>Queue: Archive Message
            else MaxRetries Reached
                Workers->>Postgres: Set failed
                Workers->>DLQ: Move to DLQ
                activate DLQ
                Workers->>Queue: Delete Message
                deactivate DLQ
            end
        end
        deactivate Postgres
        deactivate Metrics
        deactivate Workers
    end

    Client->>Server: GET /tasks/{task_id}
    activate Server
    Server->>Postgres: Query status
    activate Postgres
    Postgres-->>Server: Status data
    Server-->>Client: {task_id, statuses}
    deactivate Postgres
    deactivate Server

    Client->>Server: POST /dlq/reprocess<br>{task_type or task_id}
    activate Server
    Server->>DLQ: Read Messages
    activate DLQ
    Server->>Queue: Requeue Messages
    Server->>Postgres: Set pending
    activate Postgres
    Server-->>Client: {reprocessed_count, message_ids}
    deactivate Postgres
    deactivate DLQ
    deactivate Server

    Client->>Server: POST /task-types<br>{task_type, max_rps}
    activate Server
    Server->>Postgres: Insert/Update config
    activate Postgres
    Server-->>Client: {task_type, max_rps}
    deactivate Postgres
    deactivate Server

    Client->>Server: GET /task-types/{task_type}
    activate Server
    Server->>Postgres: Query config
    activate Postgres
    Postgres-->>Server: Config data
    Server-->>Client: {task_type, max_rps}
    deactivate Postgres
    deactivate Server

    Note over Client,Metrics: Scalable, Reliable, Observable

Loading

✨ Features

  • REST API:

    • POST /tasks: Submit tasks with multiple payloads for asynchronous processing.
    • GET /tasks/{task_id}: Check the status of a task's messages.
    • POST /dlq/reprocess: Recover failed tasks from the DLQ by task_type or task_id.
    • POST /task-types: Configure rate limits (max_rps) for a task type.
    • GET /task-types/{task_type}: Retrieve rate limit configuration for a task type.
    • GET /metrics: Prometheus-compatible metrics.
  • API Documentation: Swagger/OpenAPI specification available in docs/swagger.json and docs/swagger.yaml, with interactive Swagger UI at http://localhost:8080/swagger/index.html.

Flexible HTTP Methods: Supports POST, PATCH, PUT, and DELETE.

  • Customizable: Configure each batch with:

    • API endpoint, HTTP method, headers
    • max_rps, max_retries, retry_delay, timeout
  • Resilient:

    • Exponential backoff retries using go-resty.
    • Dead Letter Queue (task_dlq) for failed tasks.
    • Status tracking in PostgreSQL (task_status).
    • Backpressure via worker pool and buffered channels.
  • Generic: Works with any REST API

  • Production-Ready: Built with Echo, Logging (Zap), Prometheus, Dockerized, graceful shutdown

🧰 Prerequisites

  • Docker
  • Go 1.21+

🏁 Quick Start

1. Clone the repo

git clone https://github.com/arun0009/pgmq-taskq.git
cd pgmq-taskq

2. Start the server

docker-compose up --build

3. Create and enqueue a task

curl -X POST http://localhost:8080/tasks \
  -H "Content-Type: application/json" \
  -d '{
        "item_id": "abc123",
        "task_type": "update-resource",
        "max_retries": 3,
        "retry_delay": 1000,
        "url": "https://api.example.com",
        "method": "POST",
        "headers": {
          "Authorization": "Bearer token"
        },
        "payloads": [
          {
            "data": {
              "accountId": "1",
              "accountType": "savings"
            },
            "api_override": {
              "url": "https://override.example.com",
              "headers": {
                "Authorization": "Bearer different-token"
              }
            }
          },
          {
            "data": {
              "accountId": "2",
              "accountType": "checking"
            }
          },
           {
            "data": {
              "accountId": "3",
              "accountType": "checking"
            }
          }            
        ]
      }'

Sample response:

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000",
  "message_ids": [1, 2, 3]
}

4. Check Status

curl http://localhost:8080/tasks/123e4567-e89b-12d3-a456-426614174000

Sample response:

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000",
  "statuses": [
    {
      "message_id": 1,
      "item_id": "abc123",
      "task_type": "update-resource",
      "payload": {"accountId": "1", "accountType": "savings"},
      "status": "success",
      "error": "",
      "attempt": 0,
      "updated_at": "2025-04-19T12:00:00Z",
      "payload_id": 0
    },
    {
      "message_id": 2,
      "item_id": "abc123",
      "task_type": "update-resource",
      "payload": {"accountId": "2", "accountType": "checking"},
      "status": "pending",
      "error": "",
      "attempt": 0,
      "updated_at": "2025-04-19T12:00:01Z",
      "payload_id": 1
    },
    {
      "message_id": 3,
      "item_id": "abc123",
      "task_type": "update-resource",
      "payload": {"accountId": "3", "accountType": "checking"},
      "status": "failed",
      "error": "timeout",
      "attempt": 3,
      "updated_at": "2025-04-19T12:00:02Z",
      "payload_id": 2
    }
  ]
}

5. Recover Failed Tasks

To reprocess failed tasks from the DLQ by task_type or task_id:

curl -X POST http://localhost:8080/dlq/reprocess \
  -H "Content-Type: application/json" \
  -d '{
        "task_type": "update-resource"
      }'

Sample response:

{
  "reprocessed_count": 1,
  "message_ids": [4]
}

6. Configure Task Type Rate Limit

Set the maximum requests per second (max_rps) for a task type:

curl -X POST http://localhost:8080/task-types \
  -H "Content-Type: application/json" \
  -d '{
        "task_type": "update-resource",
        "max_rps": 10.0
      }'

Sample response:

{
 "task_type": "update-resource",
 "max_rps": 10.0,
 "created_at": "2025-04-19T12:00:00Z",
 "updated_at": "2025-04-19T12:00:00Z"
}

7. Retrieve Task Type Rate Limit

Get the rate limit configuration for a task type:

curl http://localhost:8080/task-types/update-resource

Sample Response

{
  "task_type": "update-resource",
  "max_rps": 10.0,
  "created_at": "2025-04-19T12:00:00Z",
  "updated_at": "2025-04-19T12:00:00Z"
}

8. View Metrics

Open http://localhost:8080/metrics

βš™οΈ Configuration

Variable Default Description
POSTGRES_DSN postgres://postgres:postgres@postgres:5432/postgres PostgreSQL DSN
MAX_WORKERS 10 Number of concurrent workers
MAX_QUEUE_BUFFER 100 In-memory buffer size
VISIBILITY_TIMEOUT 30s PGMQ visibility timeout
PORT 8080 HTTP server port
MAX_RETRIES 3 Max retry attempts for failed tasks
DLQ_QUEUE_NAME task_dlq Name of the Dead Letter Queue
RETRY_BASE_DELAY 1s Base delay for exponential backoff`
RETRY_MAX_DELAY 60s Max delay for exponential backoff`
TIMEOUT 5s HTTP client timeout
LOG_LEVEL info Log Level (logs to console)

Example .env file

POSTGRES_DSN=postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable
PORT=8080
MAX_WORKERS=10
MAX_QUEUE_BUFFER=100
TIMEOUT=5s
VISIBILITY_TIMEOUT=30s
MAX_RETRIES=3
RETRY_BASE_DELAY=1s
RETRY_MAX_DELAY=60s
DLQ_QUEUE_NAME=task_dlq
LOG_LEVEL=info

πŸ›  Development

Build

go mod tidy
go build -o pgmq-taskq ./server/cmd

Test

go test -v ./... 

Note: requires docker to be running as we are using testcontainers for pgmq

Example Client

go run examples/client/main.go

πŸ“œ License

MIT β€” feel free to use, fork, and contribute!

About

πŸš€ Async REST API Processing with PGMQ β€” Queue, Retry, Rate-Limit, DLQ

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published