A library that wraps PGMQ, a PostgreSQL message queue implementation, to make it easier to integrate into your application. By design, you can supply your own database access layer by implementing an Adapter that conforms to a simple protocol. The provided adapter uses HikariCP and next.jdbc.
- Documentation: Explanations of the functions, with usage examples where appropriate. It is generated by quickdoc from the docstrings in the code and includes a complete table of contents.
- Specs: This project uses clojure.spec.alpha to validate expectations. Documentation generated with clojure.spec.alpha/describe covers the functions, their expected inputs, and their return types. Entries are organized by namespaced function name, followed by the arguments of that function.
This will build a jar that can be used directly in your applications.
# babashka
bb jar
# build tools
clj -T:build all
This will run all tests. The suite currently includes an integration test that uses test containers against PostgreSQL 15, 16, and 17.
# babashka
bb test
bb test coverage
bb test watch
# build tools
clj -M:test --profile test
clj -M:test --profile coverage
clj -M:test --profile watch
This will update the local dependencies in deps.edn and bb.edn.
# babashka
bb upgrade
# build tools
clj -M:upgrade
com.thirstysink.pgmq-clj.core
- archive-messages - Archives messages msg-ids in a queue named queue-name using a given adapter.
- create-queue - Create a queue named queue-name using a given adapter.
- delete-message - Permanently deletes message with id msg-id in the queue named queue-name using a given adapter.
- delete-message-batch - Deletes all msg-ids messages in queue queue-name using a given adapter.
- drop-queue - Drop queue named queue-name using a given adapter.
- list-queues - List all queues using a given adapter.
- pop-message - Pops one message from the queue named queue-name using a given adapter.
- read-message - Read a quantity of messages from queue-name marking them invisible for visible_time seconds using a given adapter.
- send-message - Send one message to a queue queue-name with a payload that will not be read for delay seconds using a given adapter.
- send-message-batch - Sends payload to the queue named queue-name as a collection of messages that cannot be read for delay seconds using a given adapter.
com.thirstysink.pgmq-clj.db.adapter
- Adapter
- close - Performs database connection cleanup using this.
- execute! - Execute a sql statement and params with 0 or more return values using this.
- execute-one! - Execute a sql statement and params with 0 or 1 return values using this.
- query - Query the database with a given sql, params, and return results using this.
- with-transaction - Wrap a function f in a database transaction using this.
com.thirstysink.pgmq-clj.db.adapters.hikari-adapter
- ->pgobject - Transforms Clojure data to a PGobject x that contains the data as JSON.
- <-pgobject - Transform PGobject v containing json or jsonb value to Clojure data.
- ensure-pgmq-extension - Checks the database to verify that the pgmq extension is installed using the adapter.
- make-hikari-adapter - Create a new HikariAdapter instance.
com.thirstysink.pgmq-clj.instrumentation
- disable-instrumentation - Disables clojure.spec.alpha specs instrumentation.
- enable-instrumentation - Enables clojure.spec.alpha specs instrumentation.
- instrumentation-enabled? - A flag that indicates if instrumentation is enabled.
com.thirstysink.pgmq-clj.json
- ->json - Returns a JSON-encoding String for the given Clojure object.
com.thirstysink.pgmq-clj.specs
(archive-messages adapter queue-name msg-ids)
Function.
Archives messages msg-ids in a queue named queue-name using a given adapter.
This will remove the messages from queue-name and place them in an archive table named a_{queue-name}.
Example: (core/archive-messages adapter "test-queue" [3]) ;; => ()
(create-queue adapter queue-name)
Function.
Create a queue named queue-name using a given adapter.
Example:
(core/create-queue adapter "test-queue")
;; => nil
(delete-message adapter queue-name msg-id)
Function.
Permanently deletes message with id msg-id in the queue named queue-name using a given adapter.
Example: (core/delete-message adapter "test-queue" 3) ;; => true
(delete-message-batch adapter queue-name msg-ids)
Function.
Deletes all msg-ids messages in queue queue-name using a given adapter.
Example: (core/delete-message-batch adapter "test-queue" [2 5 6]) ;; => [2 5 6]
(drop-queue adapter queue-name)
Function.
Drop queue named queue-name using a given adapter.
Example:
(core/drop-queue adapter "test-queue-2")
;; => true
(list-queues adapter)
Function.
List all queues using a given adapter.
Example:
(core/list-queues adapter)
;; => [{:queue-name "test-queue",
:is-partitioned false,
:is-unlogged false,
:created-at
#object[java.time.Instant 0x680b0f16 "2025-03-20T01:01:42.842248Z"]}
{:queue-name "test-queue-2",
:is-partitioned false,
:is-unlogged false,
:created-at
#object[java.time.Instant 0x45e79bdf "2025-03-20T01:01:46.292274Z"]}
{:queue-name "test-queue-3",
:is-partitioned false,
:is-unlogged false,
:created-at
#object[java.time.Instant 0x19767429 "2025-03-20T01:01:54.665295Z"]}]
(pop-message adapter queue-name)
Function.
Pops one message from the queue named queue-name using a given adapter. The side-effect of this function is equivalent to reading and deleting a message. See also read-message and delete-message.
Example: (core/pop-message adapter "test-queue") ;; => {:msg-id 1, :read-ct 0, :enqueued-at #object[java.time.Instant 0x79684534 "2025-03-20T01:29:15.298975Z"], :vt #object[java.time.Instant 0x391acb50 "2025-03-20T01:30:45.300696Z"], :message {:user-id "0f83fbeb-345b-41ca-bbec-3bace0cff5b4", :order-count 12}, :headers {:TENANT "b5bda77b-8283-4a6d-8de8-40a5041a60ee"}}
(read-message adapter queue-name visible_time quantity filter)
Function.
Read a quantity of messages from queue-name, marking them invisible for visible_time seconds, using a given adapter. This function supports the ability to filter messages received when making a read request.
Here are some examples of how this conditional works:
- If conditional is an empty JSON object ('{}'::jsonb), the condition always evaluates to TRUE, and all messages are considered matching.
- If conditional is a JSON object with a single key-value pair, such as {'key': 'value'}, the condition checks if the message column contains a JSON object with the same key-value pair. For example:
message = {'key': 'value', 'other_key': 'other_value'}: // matches
message = {'other_key': 'other_value'}: // does not match
- If conditional is a JSON object with multiple key-value pairs, such as {'key1': 'value1', 'key2': 'value2'}, the condition checks if the message column contains a JSON object with all the specified key-value pairs. For example:
message = {'key1': 'value1', 'key2': 'value2', 'other_key': 'other_value'}: // matches
message = {'key1': 'value1', 'other_key': 'other_value'}: // does not match
Some examples of conditional JSONB values and their effects on the query:
- {}: matches all messages
- {'type': 'error'}: matches messages with a type key equal to 'error'
- {'type': 'error', 'severity': 'high'}: matches messages with both type equal to 'error' and severity equal to 'high'
- {'user_id': 123}: matches messages with a user_id key equal to 123
Example: (core/read-message adapter "test-queue" 10 88 nil) ;; => ({:msg-id 2, :read-ct 1, :enqueued-at #object[java.time.Instant 0x5f794b3d "2025-03-21T01:14:00.831673Z"], :vt #object[java.time.Instant 0x3fcde164 "2025-03-21T01:15:32.988540Z"], :message {:user-id "0f83fbeb-345b-41ca-bbec-3bace0cff5b4", :order-count 12}, :headers {:TENANT "b5bda77b-8283-4a6d-8de8-40a5041a60ee"}})
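The matching rules described for the conditional can be approximated in plain Clojure. The sketch below only illustrates those rules; it is not the library's or PGMQ's implementation. The helper matches-filter? is hypothetical, and it compares top-level key/value pairs only, so any nested-object matching the database may perform is not modelled.

```clojure
;; Hypothetical helper: true when every top-level key/value pair in
;; `conditional` is also present in `message`.
(defn matches-filter?
  [message conditional]
  (every? (fn [[k v]]
            (and (contains? message k)
                 (= v (get message k))))
          conditional))

;; An empty conditional matches everything.
(matches-filter? {:type "error" :severity "high"} {})               ;; => true
;; A single pair must be present in the message.
(matches-filter? {:type "error" :severity "high"} {:type "error"})  ;; => true
;; With several pairs, all of them must be present.
(matches-filter? {:type "error"} {:type "error" :severity "high"})  ;; => false
(matches-filter? {:user_id 123 :type "info"} {:user_id 123})        ;; => true
```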
(send-message adapter queue-name payload delay)
Function.
Send one message to a queue queue-name with a payload that will not be read for delay seconds using a given adapter.
A delay of 0 indicates it may be read immediately.
Example Payloads:
{:data {:foo "bad"} :headers {:x-data "baz"}}
{:data "feed" :headers {:version "3"}}
Example: (core/send-message adapter "test-queue" {:data {:order-count 12 :user-id "0f83fbeb-345b-41ca-bbec-3bace0cff5b4"} :headers {:TENANT "b5bda77b-8283-4a6d-8de8-40a5041a60ee"}} 90) ;; => 1
(send-message-batch adapter queue-name payload delay)
Function.
Sends payload to the queue named queue-name as a collection of messages that cannot be read for delay seconds using a given adapter. The payload should be a sequence of valid JSON objects. See also send-message.
Example Payloads:
[{:data {:foo "bar"} :headers {:x-data "bat"}}]
[{:data 10002 :headers {}} {:data "feed" :headers {:version "2"}} ]
Example: (core/send-message-batch adapter "test-queue" [{:data {:order-count 12 :user-id "0f83fbeb-345b-41ca-bbec-3bace0cff5b4"} :headers {:X-SESS-ID "b5bda77b-8283-4a6d-8de8-40a5041a60ee"}} {:data {:order-count 12 :user-id "da04bf11-018f-45c4-908f-62c33b6e8aa6"} :headers {:X-SESS-ID "b0ef0d6a-e587-4c28-b995-1efe8cb31c9e"}}] 15) ;; => [5 6]
(close this)
Function.
Performs database connection cleanup using this.
(execute! this sql params)
Function.
Execute a sql statement and params with 0 or more return values using this.
(execute-one! this sql params)
Function.
Execute a sql statement and params with 0 or 1 return values using this.
(query this sql params)
Function.
Query the database with a given sql, params, and return results using this.
(with-transaction this f)
Function.
Wrap a function f in a database transaction using this.
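Because the Adapter protocol is the extension point for a custom database access layer, a minimal sketch of an implementation may help. To stay runnable without the library or a database, the block declares a local stand-in protocol whose method names and argument lists are copied from the documentation above; in a real project you would implement com.thirstysink.pgmq-clj.db.adapter/Adapter instead. RecordingAdapter and its return values are hypothetical, and it is an assumption that the function passed to with-transaction receives the adapter as its only argument.

```clojure
;; Local stand-in for the library's Adapter protocol (assumption: same
;; method names and argument lists as documented above).
(defprotocol Adapter
  (execute! [this sql params])
  (execute-one! [this sql params])
  (query [this sql params])
  (with-transaction [this f])
  (close [this]))

;; Hypothetical adapter that records statements in an atom instead of
;; talking to a database. Intended for illustration or unit tests only.
(defrecord RecordingAdapter [log]
  Adapter
  (execute! [_ sql params]
    (swap! log conj [:execute! sql params])
    [])                                   ; zero or more rows
  (execute-one! [_ sql params]
    (swap! log conj [:execute-one! sql params])
    nil)                                  ; zero or one row
  (query [_ sql params]
    (swap! log conj [:query sql params])
    [])
  (with-transaction [this f]
    ;; Assumption: f takes the adapter. No real transaction is opened here.
    (f this))
  (close [_]
    nil))

(def recorder (->RecordingAdapter (atom [])))

(with-transaction recorder
  (fn [tx]
    (execute-one! tx "SELECT 1" [])
    (query tx "SELECT 2" [])))

@(:log recorder)
;; => [[:execute-one! "SELECT 1" []] [:query "SELECT 2" []]]
```

An adapter like this can stand in for a real one wherever only the order and content of the SQL calls matters.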
(->pgobject x)
Function.
Transforms Clojure data x to a PGobject that contains the data as JSON. The PGobject type defaults to jsonb but can be changed via the metadata key :pgtype.
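As a rough illustration of the metadata mechanism, the sketch below shows how a type name could be read from Clojure metadata with jsonb as the fallback. The helper pgtype-of is hypothetical and not part of the library, and it is an assumption that the :pgtype value is a string naming the PostgreSQL type.

```clojure
;; Hypothetical helper mirroring the documented behaviour: read the
;; :pgtype metadata key and fall back to "jsonb" when it is absent.
(defn pgtype-of
  [x]
  (or (:pgtype (meta x)) "jsonb"))

(pgtype-of {:order-count 12})                               ;; => "jsonb"
(pgtype-of (with-meta {:order-count 12} {:pgtype "json"}))  ;; => "json"
(pgtype-of ^{:pgtype "json"} [1 2 3])                       ;; => "json"
```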
(<-pgobject v)
Function.
Transforms a PGobject v containing a json or jsonb value to Clojure data.
(ensure-pgmq-extension adapter)
Function.
Checks the database to verify that the pgmq extension is installed using the adapter. If it is not installed, an exception is thrown.
Example:
(hikari/ensure-pgmq-extension adapter)
;; => nil
(make-hikari-adapter config)
Function.
Create a new HikariAdapter instance. The argument config provides database connection values. See https://github.com/tomekw/hikari-cp for additional details on the configuration options.
Setting | Description |
---|---|
jdbc-url | This property sets the JDBC connection URL. |
username | This property sets the default authentication username used when obtaining Connections from the underlying driver. |
password | This property sets the default authentication password used when obtaining Connections from the underlying driver. |
maximum-pool-size | This property controls the maximum size that the pool is allowed to reach, including both idle and in-use connections. |
minimum-idle | This property controls the minimum number of idle connections that HikariCP tries to maintain in the pool. |
Example:
(def adapter (hikari/make-hikari-adapter {:jdbc-url "jdbc:postgresql://0.0.0.0:5432/postgres" :username "postgres" :password "postgres"}))
;; => #'user/adapter
(disable-instrumentation)
(disable-instrumentation ns)
Function.
Disables clojure.spec.alpha specs instrumentation. If no namespace ns is provided it will disable instrumentation for com.thirstysink.pgmq-clj.core.
(enable-instrumentation)
(enable-instrumentation ns)
Function.
Enables clojure.spec.alpha specs instrumentation. If no namespace ns is provided it will instrument com.thirstysink.pgmq-clj.core.
A flag that indicates if instrumentation is enabled.
This is determined by the value of the environment variable PGMQCLJ_INSTRUMENTAION_ENABLED.
If the environment variable is set, the value will be true; otherwise, false.
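To show what spec instrumentation does in practice, here is a small self-contained sketch using clojure.spec.test.alpha directly. The function delay-seconds and its spec are hypothetical, and it is only an assumption that enable-instrumentation and disable-instrumentation build on instrument and unstrument in a similar way.

```clojure
(require '[clojure.spec.alpha :as s]
         '[clojure.spec.test.alpha :as stest])

;; Hypothetical function and spec, used only for this illustration.
(defn delay-seconds [seconds] seconds)

(s/fdef delay-seconds
  :args (s/cat :seconds (s/and int? #(>= % 0)))
  :ret int?)

;; With instrumentation on, a call that violates :args throws.
(stest/instrument `delay-seconds)
(def rejected?
  (try
    (delay-seconds -1)
    false
    (catch clojure.lang.ExceptionInfo _ true)))
;; rejected? => true

;; With instrumentation off, the same call goes through unchecked.
(stest/unstrument `delay-seconds)
(def unchecked-result (delay-seconds -1))
;; unchecked-result => -1
```

Instrumentation checks arguments on every call, which is why it is typically switched on for development and tests rather than left on unconditionally.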
Returns a JSON-encoding String for the given Clojure object. Takes an optional date format string that Date objects will be encoded with.
The default date format (in UTC) is: yyyy-MM-dd'T'HH:mm:ss'Z'
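For readers unfamiliar with the pattern syntax, the sketch below shows what the default format string produces, using java.text.SimpleDateFormat with a UTC time zone. The helper format-utc is hypothetical and not part of the library; whether ->json formats dates this way internally is an assumption.

```clojure
(import '(java.text SimpleDateFormat)
        '(java.util Date TimeZone))

;; Hypothetical helper: format a Date in UTC with the given pattern.
(defn format-utc
  [^Date date pattern]
  (let [formatter (doto (SimpleDateFormat. pattern)
                    (.setTimeZone (TimeZone/getTimeZone "UTC")))]
    (.format formatter date)))

;; The epoch rendered with the default pattern quoted above.
(format-utc (Date. 0) "yyyy-MM-dd'T'HH:mm:ss'Z'")
;; => "1970-01-01T00:00:00Z"

;; A custom pattern, as an optional date format string would allow.
(format-utc (Date. 0) "yyyy-MM-dd")
;; => "1970-01-01"
```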
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :msg-id :com.thirstysink.pgmq-clj.specs/msg-id) :ret boolean? :fn nil)
(or :string string? :keyword keyword?)
boolean?
(instance? java.time.Instant %)
(and number? pos?)
(instance? java.time.Instant %)
int?
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :visibility_time :com.thirstysink.pgmq-clj.specs/visibility_time :quantity :com.thirstysink.pgmq-clj.specs/quantity :filter :com.thirstysink.pgmq-clj.specs/json) :ret :com.thirstysink.pgmq-clj.specs/message-records :fn nil)
(conformer (zipmap (map :clojure.spec.alpha/k %) (map :clojure.spec.alpha/v %)) (map (fn [[k v]] #:clojure.spec.alpha{:k k, :v v}) %))
(fn [x] (or (map? x) (vector? x) (string? x) (number? x) (boolean? x) (nil? x)))
(fn [x] (or (map? x) (vector? x) (string? x) (number? x) (boolean? x) (nil? x)))
(coll-of :com.thirstysink.pgmq-clj.specs/msg-id)
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :payload :com.thirstysink.pgmq-clj.specs/payload-objects :delay :com.thirstysink.pgmq-clj.specs/delay) :ret :com.thirstysink.pgmq-clj.specs/msg-ids :fn nil)
(fn [x] (fn* [] (instance? java.time.Instant x)))
(nilable (map-of :com.thirstysink.pgmq-clj.specs/header-key :com.thirstysink.pgmq-clj.specs/header-value :min-count 0))
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :msg-ids :com.thirstysink.pgmq-clj.specs/msg-ids) :ret :com.thirstysink.pgmq-clj.specs/msg-ids :fn nil)
valid-queue-name?
boolean?
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name) :ret nil :fn nil)
(and :com.thirstysink.pgmq-clj.specs/msg-ids (complement empty?))
(coll-of :com.thirstysink.pgmq-clj.specs/queue-record)
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name) :ret :com.thirstysink.pgmq-clj.specs/message-record :fn nil)
(and int? (>= % 0))
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :msg-ids :com.thirstysink.pgmq-clj.specs/non-empty-msg-ids) :ret :com.thirstysink.pgmq-clj.specs/msg-ids :fn nil)
(coll-of :com.thirstysink.pgmq-clj.specs/payload-object)
(keys :req-un [:com.thirstysink.pgmq-clj.specs/data :com.thirstysink.pgmq-clj.specs/headers])
(satisfies? Adapter %)
int?
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name) :ret boolean? :fn nil)
(keys :req-un [:com.thirstysink.pgmq-clj.specs/msg-id :com.thirstysink.pgmq-clj.specs/read-ct :com.thirstysink.pgmq-clj.specs/enqueued-at :com.thirstysink.pgmq-clj.specs/vt :com.thirstysink.pgmq-clj.specs/message] :opt-un [:com.thirstysink.pgmq-clj.specs/headers])
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter) :ret :com.thirstysink.pgmq-clj.specs/queue-result :fn nil)
(or :string string? :number number? :list (coll-of (or :string string? :number number?)))
(instance? java.time.Instant %)
(fn [x] (or (map? x) (vector? x) (string? x) (number? x) (boolean? x) (nil? x)))
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :payload :com.thirstysink.pgmq-clj.specs/payload-object :delay :com.thirstysink.pgmq-clj.specs/delay) :ret :com.thirstysink.pgmq-clj.specs/msg-id :fn nil)
(keys :req-un [:com.thirstysink.pgmq-clj.specs/queue-name :com.thirstysink.pgmq-clj.specs/is-partitioned :com.thirstysink.pgmq-clj.specs/is-unlogged :com.thirstysink.pgmq-clj.specs/created-at])
(and int? (> % 0))
(coll-of :com.thirstysink.pgmq-clj.specs/mesage-record)
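The entries above have the shape of clojure.spec.alpha/describe output. As a small sketch of how a spec definition maps to such an entry, the block below defines two specs and describes them. The spec names under :example are hypothetical; only the predicates are taken from the listing.

```clojure
(require '[clojure.spec.alpha :as s])

;; Hypothetical spec names; the predicates are copied from the listing above.
(s/def :example/non-negative-int (s/and int? #(>= % 0)))
(s/def :example/name-or-keyword (s/or :string string? :keyword keyword?))

;; describe returns the abbreviated form of a spec.
(s/describe :example/non-negative-int)
;; => (and int? (>= % 0))
(s/describe :example/name-or-keyword)
;; => (or :string string? :keyword keyword?)

;; The same specs can validate and conform values.
(s/valid? :example/non-negative-int 0)          ;; => true
(s/valid? :example/non-negative-int -5)         ;; => false
(s/conform :example/name-or-keyword :TENANT)    ;; => [:keyword :TENANT]
```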