DO
$$
BEGIN
-- create the pgq access roles unless they already exist
-- (pg_roles is checked instead of pg_user so that existing NOLOGIN roles are also found)
IF NOT EXISTS (
SELECT 1
FROM pg_catalog.pg_roles
WHERE rolname = 'pgq_reader') THEN
CREATE ROLE pgq_reader;
END IF;
IF NOT EXISTS (
SELECT 1
FROM pg_catalog.pg_roles
WHERE rolname = 'pgq_writer') THEN
CREATE ROLE pgq_writer;
END IF;
IF NOT EXISTS (
SELECT 1
FROM pg_catalog.pg_roles
WHERE rolname = 'pgq_admin') THEN
CREATE ROLE pgq_admin;
END IF;
END
$$;
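-- The roles above are meant to carry pgq access permissions (granted per
-- queue by pgq.grant_perms()); they can be given to concrete login roles
-- with an ordinary GRANT, e.g. ('some_user' is a hypothetical role):
--
--   grant pgq_reader to some_user;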
-- ----------------------------------------------------------------------
-- Section: Internal Tables
--
-- Overview:
-- pgq.queue - Queue configuration
-- pgq.consumer - Consumer names
-- pgq.subscription - Consumer registrations
-- pgq.tick - Per-queue snapshots (ticks)
-- pgq.event_* - Data tables
-- pgq.retry_queue - Events to be retried later
--
--
-- Standard triggers store events in the pgq.event_* data tables.
-- There is one top-level event table pgq.event_<queue_id> for each queue,
-- inherited from pgq.event_template, with three tables for the actual data:
-- pgq.event_<queue_id>_0 to pgq.event_<queue_id>_2.
--
-- The active table is rotated at an interval, so that once all the consumers
-- have passed some point, the oldest one can be emptied with the TRUNCATE
-- command for efficiency.
--
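-- For illustration, the data tables of an existing queue can be listed with
-- a query along these lines ('myqueue' is a hypothetical queue name; the
-- implicit-LATERAL function call needs PostgreSQL 9.3+):
--
--   select queue_data_pfx || '_' || i as data_table
--     from pgq.queue, generate_series(0, queue_ntables - 1) i
--    where queue_name = 'myqueue';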
--
-- ----------------------------------------------------------------------
set client_min_messages = 'warning';
set default_with_oids = 'off';
-- drop schema if exists pgq cascade;
create schema pgq;
-- ----------------------------------------------------------------------
-- Table: pgq.consumer
--
-- Name to id lookup for consumers
--
-- Columns:
-- co_id - consumer's id for internal usage
-- co_name - consumer's name for external usage
-- ----------------------------------------------------------------------
create table pgq.consumer (
co_id serial,
co_name text not null,
constraint consumer_pkey primary key (co_id),
constraint consumer_name_uq UNIQUE (co_name)
);
-- ----------------------------------------------------------------------
-- Table: pgq.queue
--
-- Information about available queues
--
-- Columns:
-- queue_id - queue id for internal usage
-- queue_name - queue name visible outside
-- queue_ntables - how many data tables the queue has
-- queue_cur_table - which data table is currently active
-- queue_rotation_period - period for data table rotation
-- queue_switch_step1 - tx when rotation happened
-- queue_switch_step2 - tx after rotation was committed
-- queue_switch_time - time when switch happened
-- queue_external_ticker - ticks come from some external sources
-- queue_ticker_paused - ticker is paused
-- queue_disable_insert - disallow pgq.insert_event()
-- queue_ticker_max_count - max number of events a batch should contain
-- queue_ticker_max_lag - max time events may wait before a tick is forced
-- queue_ticker_idle_period - how often to tick when no events happen
-- queue_per_tx_limit - Max number of events single TX can insert
-- queue_data_pfx - prefix for data table names
-- queue_event_seq - sequence for event id's
-- queue_tick_seq - sequence for tick id's
-- ----------------------------------------------------------------------
create table pgq.queue (
queue_id serial,
queue_name text not null,
queue_ntables integer not null default 3,
queue_cur_table integer not null default 0,
queue_rotation_period interval not null default '2 hours',
queue_switch_step1 bigint not null default txid_current(),
queue_switch_step2 bigint default txid_current(),
queue_switch_time timestamptz not null default now(),
queue_external_ticker boolean not null default false,
queue_disable_insert boolean not null default false,
queue_ticker_paused boolean not null default false,
queue_ticker_max_count integer not null default 500,
queue_ticker_max_lag interval not null default '3 seconds',
queue_ticker_idle_period interval not null default '1 minute',
queue_per_tx_limit integer,
queue_data_pfx text not null,
queue_event_seq text not null,
queue_tick_seq text not null,
constraint queue_pkey primary key (queue_id),
constraint queue_name_uq unique (queue_name)
);
-- ----------------------------------------------------------------------
-- Table: pgq.tick
--
-- Snapshots for event batching
--
-- Columns:
-- tick_queue - queue id whose tick it is
-- tick_id - tick's id (per-queue)
-- tick_time - time when tick happened
-- tick_snapshot - transaction state
-- tick_event_seq - last value for event seq
-- ----------------------------------------------------------------------
create table pgq.tick (
tick_queue int4 not null,
tick_id bigint not null,
tick_time timestamptz not null default now(),
tick_snapshot txid_snapshot not null default txid_current_snapshot(),
tick_event_seq bigint not null, -- may be NULL on upgraded dbs
constraint tick_pkey primary key (tick_queue, tick_id),
constraint tick_queue_fkey foreign key (tick_queue)
references pgq.queue (queue_id)
);
-- ----------------------------------------------------------------------
-- Sequence: pgq.batch_id_seq
--
-- Sequence for batch id's.
-- ----------------------------------------------------------------------
create sequence pgq.batch_id_seq;
-- ----------------------------------------------------------------------
-- Table: pgq.subscription
--
-- Consumer registration on a queue.
--
-- Columns:
--
-- sub_id - subscription id for internal usage
-- sub_queue - queue id
-- sub_consumer - consumer's id
-- sub_last_tick - last tick the consumer processed
-- sub_active - last time the subscription was seen active
-- sub_batch - shortcut for queue_id/consumer_id/tick_id
-- sub_next_tick - batch end pos
-- ----------------------------------------------------------------------
create table pgq.subscription (
sub_id serial not null,
sub_queue int4 not null,
sub_consumer int4 not null,
sub_last_tick bigint,
sub_active timestamptz not null default now(),
sub_batch bigint,
sub_next_tick bigint,
constraint subscription_pkey primary key (sub_queue, sub_consumer),
constraint subscription_batch_idx unique (sub_batch),
constraint sub_queue_fkey foreign key (sub_queue)
references pgq.queue (queue_id),
constraint sub_consumer_fkey foreign key (sub_consumer)
references pgq.consumer (co_id)
);
-- ----------------------------------------------------------------------
-- Table: pgq.event_template
--
-- Parent table for all event tables
--
-- Columns:
-- ev_id - event's id, supposed to be unique per queue
-- ev_time - when the event was inserted
-- ev_txid - transaction id which inserted the event
-- ev_owner - subscription id that wanted to retry this
-- ev_retry - how many times the event has been retried, NULL for new events
-- ev_type - consumer/producer can specify what the data fields contain
-- ev_data - data field
-- ev_extra1 - extra data field
-- ev_extra2 - extra data field
-- ev_extra3 - extra data field
-- ev_extra4 - extra data field
-- ----------------------------------------------------------------------
create table pgq.event_template (
ev_id bigint not null,
ev_time timestamptz not null,
ev_txid bigint not null default txid_current(),
ev_owner int4,
ev_retry int4,
ev_type text,
ev_data text,
ev_extra1 text,
ev_extra2 text,
ev_extra3 text,
ev_extra4 text
);
-- ----------------------------------------------------------------------
-- Table: pgq.retry_queue
--
-- Events to be retried. When the retry time is reached, they will
-- be put back into the main queue.
--
-- Columns:
-- ev_retry_after - time when the event should be re-inserted into the main queue
-- ev_queue - queue id, used to speed up event copy into queue
-- * - same as pgq.event_template
-- ----------------------------------------------------------------------
create table pgq.retry_queue (
ev_retry_after timestamptz not null,
ev_queue int4 not null,
like pgq.event_template,
constraint rq_pkey primary key (ev_owner, ev_id),
constraint rq_queue_id_fkey foreign key (ev_queue)
references pgq.queue (queue_id)
);
alter table pgq.retry_queue alter column ev_owner set not null;
alter table pgq.retry_queue alter column ev_txid drop not null;
create index rq_retry_idx on pgq.retry_queue (ev_retry_after);
create or replace function pgq.batch_event_sql(x_batch_id bigint)
returns text as $$
-- ----------------------------------------------------------------------
-- Function: pgq.batch_event_sql(1)
-- Creates SELECT statement that fetches events for this batch.
--
-- Parameters:
-- x_batch_id - ID of an active batch.
--
-- Returns:
-- SQL statement.
-- ----------------------------------------------------------------------
-- ----------------------------------------------------------------------
-- Algorithm description:
-- Given 2 snapshots, sn1 and sn2 with sn1 having xmin1, xmax1
-- and sn2 having xmin2, xmax2 create expression that filters
-- right txid's from event table.
--
-- Simplest solution would be
-- > WHERE ev_txid >= xmin1 AND ev_txid <= xmax2
-- > AND NOT txid_visible_in_snapshot(ev_txid, sn1)
-- > AND txid_visible_in_snapshot(ev_txid, sn2)
--
-- The simple solution has a problem with long transactions (xmin1 very low).
-- All the batches that happen while the long tx is active would need
-- to scan all events in that range. Two optimizations are used here:
--
-- 1) Use [xmax1..xmax2] for the range scan. That limits the range to
-- txids that actually happened between the two snapshots. For txids
-- in the range [xmin1..xmax1], look up which ones were actually
-- committed between the snapshots and search for them by exact
-- value using an IN (..) list.
--
-- 2) As most transactions are short, there may be a lot of them that were
-- just below xmax1 but committed before xmax2. So check whether there
-- are IDs near xmax1 and lower the range start to include them,
-- thus decreasing the size of the IN (..) list.
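--
-- Illustrative sketch (snapshot values are made up): with
-- last_snapshot = '90:100:92,97' and cur_snapshot = '100:110:105',
-- txids 92 and 97 finished between the two ticks. Without optimization (2)
-- the generated per-table condition would be roughly
--
--   WHERE ev_txid >= 100 AND ev_txid <= 110
--     AND txid_visible_in_snapshot(ev_txid, cur.tick_snapshot)
--     AND NOT txid_visible_in_snapshot(ev_txid, last.tick_snapshot)
--   UNION ALL ... WHERE ev_txid IN (92, 97)
--
-- but because 92 and 97 are within 100 txids of xmax1, the range start is
-- simply lowered to 92 and the IN (..) list stays empty.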
-- ----------------------------------------------------------------------
declare
rec record;
sql text;
tbl text;
arr text;
part text;
select_fields text;
retry_expr text;
batch record;
begin
select s.sub_last_tick, s.sub_next_tick, s.sub_id, s.sub_queue,
txid_snapshot_xmax(last.tick_snapshot) as tx_start,
txid_snapshot_xmax(cur.tick_snapshot) as tx_end,
last.tick_snapshot as last_snapshot,
cur.tick_snapshot as cur_snapshot
into batch
from pgq.subscription s, pgq.tick last, pgq.tick cur
where s.sub_batch = x_batch_id
and last.tick_queue = s.sub_queue
and last.tick_id = s.sub_last_tick
and cur.tick_queue = s.sub_queue
and cur.tick_id = s.sub_next_tick;
if not found then
raise exception 'batch not found';
end if;
-- load older transactions
arr := '';
for rec in
-- active tx-es in prev_snapshot that were committed in cur_snapshot
select id1 from
txid_snapshot_xip(batch.last_snapshot) id1 left join
txid_snapshot_xip(batch.cur_snapshot) id2 on (id1 = id2)
where id2 is null
order by 1 desc
loop
-- try to avoid big IN expression, so try to include nearby
-- tx'es into range
if batch.tx_start - 100 <= rec.id1 then
batch.tx_start := rec.id1;
else
if arr = '' then
arr := rec.id1::text;
else
arr := arr || ',' || rec.id1::text;
end if;
end if;
end loop;
-- must match pgq.event_template
select_fields := 'select ev_id, ev_time, ev_txid, ev_retry, ev_type,'
|| ' ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4';
retry_expr := ' and (ev_owner is null or ev_owner = '
|| batch.sub_id::text || ')';
-- now generate query that goes over all potential tables
sql := '';
for rec in
select xtbl from pgq.batch_event_tables(x_batch_id) xtbl
loop
tbl := pgq.quote_fqname(rec.xtbl);
-- this gets newer events that are definitely not visible in prev_snapshot
part := select_fields
|| ' from pgq.tick cur, pgq.tick last, ' || tbl || ' ev '
|| ' where cur.tick_id = ' || batch.sub_next_tick::text
|| ' and cur.tick_queue = ' || batch.sub_queue::text
|| ' and last.tick_id = ' || batch.sub_last_tick::text
|| ' and last.tick_queue = ' || batch.sub_queue::text
|| ' and ev.ev_txid >= ' || batch.tx_start::text
|| ' and ev.ev_txid <= ' || batch.tx_end::text
|| ' and txid_visible_in_snapshot(ev.ev_txid, cur.tick_snapshot)'
|| ' and not txid_visible_in_snapshot(ev.ev_txid, last.tick_snapshot)'
|| retry_expr;
-- now include older tx-es, that were ongoing
-- at the time of prev_snapshot
if arr <> '' then
part := part || ' union all '
|| select_fields || ' from ' || tbl || ' ev '
|| ' where ev.ev_txid in (' || arr || ')'
|| retry_expr;
end if;
if sql = '' then
sql := part;
else
sql := sql || ' union all ' || part;
end if;
end loop;
if sql = '' then
raise exception 'could not construct sql for batch %', x_batch_id;
end if;
return sql || ' order by 1';
end;
$$ language plpgsql; -- no perms needed
create or replace function pgq.batch_event_tables(x_batch_id bigint)
returns setof text as $$
-- ----------------------------------------------------------------------
-- Function: pgq.batch_event_tables(1)
--
-- Returns the set of table names where this batch's events may reside.
--
-- Parameters:
-- x_batch_id - ID of an active batch.
-- ----------------------------------------------------------------------
declare
nr integer;
tbl text;
use_prev integer;
use_next integer;
batch record;
begin
select
txid_snapshot_xmin(last.tick_snapshot) as tx_min, -- absolute minimum
txid_snapshot_xmax(cur.tick_snapshot) as tx_max, -- absolute maximum
q.queue_data_pfx, q.queue_ntables,
q.queue_cur_table, q.queue_switch_step1, q.queue_switch_step2
into batch
from pgq.tick last, pgq.tick cur, pgq.subscription s, pgq.queue q
where cur.tick_id = s.sub_next_tick
and cur.tick_queue = s.sub_queue
and last.tick_id = s.sub_last_tick
and last.tick_queue = s.sub_queue
and s.sub_batch = x_batch_id
and q.queue_id = s.sub_queue;
if not found then
raise exception 'Cannot find data for batch %', x_batch_id;
end if;
-- unless the batch is definitely in only one of the tables, look into both
if batch.tx_max < batch.queue_switch_step1 then
use_prev := 1;
use_next := 0;
elsif batch.queue_switch_step2 is not null
and (batch.tx_min > batch.queue_switch_step2)
then
use_prev := 0;
use_next := 1;
else
use_prev := 1;
use_next := 1;
end if;
if use_prev then
nr := batch.queue_cur_table - 1;
if nr < 0 then
nr := batch.queue_ntables - 1;
end if;
tbl := batch.queue_data_pfx || '_' || nr::text;
return next tbl;
end if;
if use_next then
tbl := batch.queue_data_pfx || '_' || batch.queue_cur_table::text;
return next tbl;
end if;
return;
end;
$$ language plpgsql; -- no perms needed
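-- Illustrative sketch (all values are made up): with queue_switch_step1 = 5000,
-- queue_switch_step2 = 5010 and queue_cur_table = 1, a batch whose tx_max is
-- 4800 needs only the previous table (pgq.event_N_0), a batch whose tx_min is
-- 5200 needs only the current one (pgq.event_N_1), and a batch spanning the
-- switch gets both names returned:
--
--   select * from pgq.batch_event_tables(5001);  -- 5001 is a hypothetical batch id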
create or replace function pgq.batch_retry(
i_batch_id bigint,
i_retry_seconds integer)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.batch_retry(2)
--
-- Put whole batch into retry queue, to be processed again later.
--
-- Parameters:
-- i_batch_id - ID of an active batch.
-- i_retry_seconds - how many seconds to wait before the events are put back into the queue
--
-- Returns:
-- number of events inserted
-- Calls:
-- None
-- Tables directly manipulated:
-- pgq.retry_queue
-- ----------------------------------------------------------------------
declare
_retry timestamptz;
_cnt integer;
_s record;
begin
_retry := current_timestamp + ((i_retry_seconds::text || ' seconds')::interval);
select * into _s from pgq.subscription where sub_batch = i_batch_id;
if not found then
raise exception 'batch_retry: batch % not found', i_batch_id;
end if;
insert into pgq.retry_queue (ev_retry_after, ev_queue,
ev_id, ev_time, ev_txid, ev_owner, ev_retry,
ev_type, ev_data, ev_extra1, ev_extra2,
ev_extra3, ev_extra4)
select distinct _retry, _s.sub_queue,
b.ev_id, b.ev_time, NULL::int8, _s.sub_id, coalesce(b.ev_retry, 0) + 1,
b.ev_type, b.ev_data, b.ev_extra1, b.ev_extra2,
b.ev_extra3, b.ev_extra4
from pgq.get_batch_events(i_batch_id) b
left join pgq.retry_queue rq
on (rq.ev_id = b.ev_id
and rq.ev_owner = _s.sub_id
and rq.ev_queue = _s.sub_queue)
where rq.ev_id is null;
GET DIAGNOSTICS _cnt = ROW_COUNT;
return _cnt;
end;
$$ language plpgsql security definer;
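-- Usage sketch (5001 is a hypothetical batch id, normally obtained from
-- pgq.next_batch()); after pushing the events into the retry queue the
-- consumer would still close the batch:
--
--   select pgq.batch_retry(5001, 300);   -- retry the whole batch in 5 minutes
--   select pgq.finish_batch(5001);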
create or replace function pgq.create_queue(i_queue_name text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.create_queue(1)
--
-- Creates new queue with given name.
--
-- Returns:
-- 0 - queue already exists
-- 1 - queue created
-- Calls:
-- pgq.grant_perms(i_queue_name);
-- pgq.ticker(i_queue_name);
-- pgq.tune_storage(i_queue_name);
-- Tables directly manipulated:
-- insert - pgq.queue
-- create - pgq.event_N () inherits (pgq.event_template)
-- create - pgq.event_N_0 .. pgq.event_N_M () inherits (pgq.event_N)
-- ----------------------------------------------------------------------
declare
tblpfx text;
tblname text;
idxpfx text;
idxname text;
sql text;
id integer;
tick_seq text;
ev_seq text;
n_tables integer;
begin
if i_queue_name is null then
raise exception 'Invalid NULL value';
end if;
-- check if exists
perform 1 from pgq.queue where queue_name = i_queue_name;
if found then
return 0;
end if;
-- insert queue row
id := nextval('pgq.queue_queue_id_seq');
tblpfx := 'pgq.event_' || id::text;
idxpfx := 'event_' || id::text;
tick_seq := 'pgq.event_' || id::text || '_tick_seq';
ev_seq := 'pgq.event_' || id::text || '_id_seq';
insert into pgq.queue (queue_id, queue_name,
queue_data_pfx, queue_event_seq, queue_tick_seq)
values (id, i_queue_name, tblpfx, ev_seq, tick_seq);
select queue_ntables into n_tables from pgq.queue
where queue_id = id;
-- create seqs
execute 'CREATE SEQUENCE ' || pgq.quote_fqname(tick_seq);
execute 'CREATE SEQUENCE ' || pgq.quote_fqname(ev_seq);
-- create data tables
execute 'CREATE TABLE ' || pgq.quote_fqname(tblpfx) || ' () '
|| ' INHERITS (pgq.event_template)';
for i in 0 .. (n_tables - 1) loop
tblname := tblpfx || '_' || i::text;
idxname := idxpfx || '_' || i::text || '_txid_idx';
execute 'CREATE TABLE ' || pgq.quote_fqname(tblname) || ' () '
|| ' INHERITS (' || pgq.quote_fqname(tblpfx) || ')';
execute 'ALTER TABLE ' || pgq.quote_fqname(tblname) || ' ALTER COLUMN ev_id '
|| ' SET DEFAULT nextval(' || quote_literal(ev_seq) || ')';
execute 'create index ' || quote_ident(idxname) || ' on '
|| pgq.quote_fqname(tblname) || ' (ev_txid)';
end loop;
perform pgq.grant_perms(i_queue_name);
perform pgq.ticker(i_queue_name);
perform pgq.tune_storage(i_queue_name);
return 1;
end;
$$ language plpgsql security definer;
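-- Usage sketch ('myqueue' is a hypothetical queue name):
--
--   select pgq.create_queue('myqueue');   -- returns 1, or 0 if it already existed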
create or replace function pgq.current_event_table(x_queue_name text)
returns text as $$
-- ----------------------------------------------------------------------
-- Function: pgq.current_event_table(1)
--
-- Return active event table for particular queue.
-- Event can be added to it without going via functions,
-- e.g. by COPY.
--
-- If the queue is disabled and the GUC session_replication_role <> 'replica',
-- an exception is raised.
--
-- Put differently: the event table of a disabled queue is returned
-- only on a replica.
--
-- Note:
-- The result is valid only during current transaction.
--
-- Permissions:
-- Actual insertion requires superuser access.
--
-- Parameters:
-- x_queue_name - Queue name.
-- ----------------------------------------------------------------------
declare
res text;
disabled boolean;
begin
select queue_data_pfx || '_' || queue_cur_table::text,
queue_disable_insert
into res, disabled
from pgq.queue where queue_name = x_queue_name;
if not found then
raise exception 'Event queue not found';
end if;
if disabled then
if current_setting('session_replication_role') <> 'replica' then
raise exception 'Writing to queue disabled';
end if;
end if;
return res;
end;
$$ language plpgsql; -- no perms needed
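-- Usage sketch ('myqueue' is a hypothetical queue name). The returned table
-- can be loaded directly, e.g. with COPY, within the same transaction and
-- with superuser rights; unlisted columns such as ev_id fall back to their
-- defaults:
--
--   select pgq.current_event_table('myqueue');   -- e.g. 'pgq.event_7_1'
--   -- COPY pgq.event_7_1 (ev_time, ev_type, ev_data) FROM stdin;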
create or replace function pgq.drop_queue(x_queue_name text, x_force bool)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.drop_queue(2)
--
-- Drop queue and all associated tables.
--
-- Parameters:
-- x_queue_name - queue name
-- x_force - ignore (drop) existing consumers
-- Returns:
-- 1 - success
-- Calls:
-- pgq.unregister_consumer(queue_name, consumer_name) - when x_force is set
-- Tables directly manipulated:
-- delete - pgq.queue
-- drop - pgq.event_N (), pgq.event_N_0 .. pgq.event_N_M
-- ----------------------------------------------------------------------
declare
tblname text;
q record;
num integer;
begin
-- check if exists
select * into q from pgq.queue
where queue_name = x_queue_name
for update;
if not found then
raise exception 'No such event queue';
end if;
if x_force then
perform pgq.unregister_consumer(queue_name, consumer_name)
from pgq.get_consumer_info(x_queue_name);
else
-- check if no consumers
select count(*) into num from pgq.subscription
where sub_queue = q.queue_id;
if num > 0 then
raise exception 'cannot drop queue, consumers still attached';
end if;
end if;
-- drop data tables
for i in 0 .. (q.queue_ntables - 1) loop
tblname := q.queue_data_pfx || '_' || i::text;
execute 'DROP TABLE ' || pgq.quote_fqname(tblname);
end loop;
execute 'DROP TABLE ' || pgq.quote_fqname(q.queue_data_pfx);
-- delete ticks
delete from pgq.tick where tick_queue = q.queue_id;
-- drop seqs
-- FIXME: any checks needed here?
execute 'DROP SEQUENCE ' || pgq.quote_fqname(q.queue_tick_seq);
execute 'DROP SEQUENCE ' || pgq.quote_fqname(q.queue_event_seq);
-- delete queue row
delete from pgq.queue
where queue_name = x_queue_name;
return 1;
end;
$$ language plpgsql security definer;
create or replace function pgq.drop_queue(x_queue_name text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.drop_queue(1)
--
-- Drop queue and all associated tables.
-- The queue must not have any registered consumers.
--
-- ----------------------------------------------------------------------
begin
return pgq.drop_queue(x_queue_name, false);
end;
$$ language plpgsql strict;
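-- Usage sketch ('myqueue' is a hypothetical queue name):
--
--   select pgq.drop_queue('myqueue');         -- fails if consumers are still registered
--   select pgq.drop_queue('myqueue', true);   -- unregisters any consumers first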
create or replace function pgq.event_retry(
x_batch_id bigint,
x_event_id bigint,
x_retry_time timestamptz)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.event_retry(3a)
--
-- Put the event into retry queue, to be processed again later.
--
-- Parameters:
-- x_batch_id - ID of an active batch.
-- x_event_id - event id
-- x_retry_time - Time when the event should be put back into queue
--
-- Returns:
-- 1 - success
-- 0 - event already in retry queue
-- Calls:
-- None
-- Tables directly manipulated:
-- insert - pgq.retry_queue
-- ----------------------------------------------------------------------
begin
insert into pgq.retry_queue (ev_retry_after, ev_queue,
ev_id, ev_time, ev_txid, ev_owner, ev_retry, ev_type, ev_data,
ev_extra1, ev_extra2, ev_extra3, ev_extra4)
select x_retry_time, sub_queue,
ev_id, ev_time, NULL, sub_id, coalesce(ev_retry, 0) + 1,
ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4
from pgq.get_batch_events(x_batch_id),
pgq.subscription
where sub_batch = x_batch_id
and ev_id = x_event_id;
if not found then
raise exception 'event not found';
end if;
return 1;
-- don't worry if the event is already in the retry queue
exception
when unique_violation then
return 0;
end;
$$ language plpgsql security definer;
create or replace function pgq.event_retry(
x_batch_id bigint,
x_event_id bigint,
x_retry_seconds integer)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.event_retry(3b)
--
-- Put the event into the retry queue, to be processed again later.
--
-- Parameters:
-- x_batch_id - ID of an active batch.
-- x_event_id - event id
-- x_retry_seconds - how many seconds to wait before the event is put back into the queue
--
-- Returns:
-- 1 - success
-- 0 - event already in retry queue
-- Calls:
-- pgq.event_retry(3a)
-- Tables directly manipulated:
-- None
-- ----------------------------------------------------------------------
declare
new_retry timestamptz;
begin
new_retry := current_timestamp + ((x_retry_seconds::text || ' seconds')::interval);
return pgq.event_retry(x_batch_id, x_event_id, new_retry);
end;
$$ language plpgsql security definer;
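-- Usage sketch (5001 and 123456 are hypothetical batch and event ids,
-- normally obtained from pgq.next_batch() / pgq.get_batch_events()):
--
--   select pgq.event_retry(5001, 123456, 60);                          -- retry in 60 seconds
--   select pgq.event_retry(5001, 123456, now() + interval '1 hour');   -- retry at a given time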
create or replace function pgq.event_retry_raw(
x_queue text,
x_consumer text,
x_retry_after timestamptz,
x_ev_id bigint,
x_ev_time timestamptz,
x_ev_retry integer,
x_ev_type text,
x_ev_data text,
x_ev_extra1 text,
x_ev_extra2 text,
x_ev_extra3 text,
x_ev_extra4 text)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.event_retry_raw(12)
--
-- Allows full control over what goes to retry queue.
--
-- Parameters:
-- x_queue - name of the queue
-- x_consumer - name of the consumer
-- x_retry_after - when the event should be processed again
-- x_ev_id - event id
-- x_ev_time - creation time
-- x_ev_retry - retry count
-- x_ev_type - user data
-- x_ev_data - user data
-- x_ev_extra1 - user data
-- x_ev_extra2 - user data
-- x_ev_extra3 - user data
-- x_ev_extra4 - user data
--
-- Returns:
-- Event ID.
-- ----------------------------------------------------------------------
declare
q record;
id bigint;
begin
select sub_id, queue_event_seq, sub_queue into q
from pgq.consumer, pgq.queue, pgq.subscription
where queue_name = x_queue
and co_name = x_consumer
and sub_consumer = co_id
and sub_queue = queue_id;
if not found then
raise exception 'consumer not registered';
end if;
id := x_ev_id;
if id is null then
id := nextval(q.queue_event_seq);
end if;
insert into pgq.retry_queue (ev_retry_after, ev_queue,
ev_id, ev_time, ev_owner, ev_retry,
ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4)
values (x_retry_after, q.sub_queue,
id, x_ev_time, q.sub_id, x_ev_retry,
x_ev_type, x_ev_data, x_ev_extra1, x_ev_extra2,
x_ev_extra3, x_ev_extra4);
return id;
end;
$$ language plpgsql security definer;
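-- Usage sketch ('myqueue' and 'myconsumer' are hypothetical names; passing
-- NULL as the event id makes the function allocate one from the queue's
-- event sequence):
--
--   select pgq.event_retry_raw('myqueue', 'myconsumer',
--       now() + interval '10 minutes',
--       null, now(), 0,
--       'my_ev_type', 'my_ev_data', null, null, null, null);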
create or replace function pgq.find_tick_helper(
in i_queue_id int4,
in i_prev_tick_id int8,
in i_prev_tick_time timestamptz,
in i_prev_tick_seq int8,
in i_min_count int8,
in i_min_interval interval,
out next_tick_id int8,
out next_tick_time timestamptz,
out next_tick_seq int8)
as $$
-- ----------------------------------------------------------------------
-- Function: pgq.find_tick_helper(6)
--
-- Helper function for pgq.next_batch_custom() to do extended tick search.
-- ----------------------------------------------------------------------
declare
sure boolean;
can_set boolean;
t record;
cnt int8;
ival interval;
begin
-- first, fetch last tick of the queue
select tick_id, tick_time, tick_event_seq into t
from pgq.tick
where tick_queue = i_queue_id
and tick_id > i_prev_tick_id
order by tick_queue desc, tick_id desc
limit 1;
if not found then
return;
end if;
-- check whether batch would end up within reasonable limits
sure := true;
can_set := false;
if i_min_count is not null then
cnt = t.tick_event_seq - i_prev_tick_seq;
if cnt >= i_min_count then
can_set := true;
end if;
if cnt > i_min_count * 2 then
sure := false;
end if;
end if;
if i_min_interval is not null then
ival = t.tick_time - i_prev_tick_time;
if ival >= i_min_interval then
can_set := true;
end if;
if ival > i_min_interval * 2 then
sure := false;
end if;
end if;
-- if last tick too far away, do large scan
if not sure then
select tick_id, tick_time, tick_event_seq into t
from pgq.tick
where tick_queue = i_queue_id
and tick_id > i_prev_tick_id
and ((i_min_count is not null and (tick_event_seq - i_prev_tick_seq) >= i_min_count)
or
(i_min_interval is not null and (tick_time - i_prev_tick_time) >= i_min_interval))
order by tick_queue asc, tick_id asc
limit 1;
can_set := true;
end if;
if can_set then
next_tick_id := t.tick_id;
next_tick_time := t.tick_time;
next_tick_seq := t.tick_event_seq;
end if;
return;
end;
$$ language plpgsql stable;
create or replace function pgq.finish_batch(
x_batch_id bigint)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.finish_batch(1)
--
-- Closes a batch. No more operations can be done with events
-- of this batch.
--
-- Parameters:
-- x_batch_id - id of batch.
--
-- Returns:
-- 1 if batch was found, 0 otherwise.
-- Calls:
-- None
-- Tables directly manipulated:
-- update - pgq.subscription
-- ----------------------------------------------------------------------
begin
update pgq.subscription
set sub_active = now(),
sub_last_tick = sub_next_tick,
sub_next_tick = null,
sub_batch = null
where sub_batch = x_batch_id;
if not found then
raise warning 'finish_batch: batch % not found', x_batch_id;
return 0;
end if;
return 1;
end;
$$ language plpgsql security definer;
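-- Usage sketch of a full consumer cycle with pgq's standard API ('myqueue'
-- and 'myconsumer' are hypothetical names; pgq.next_batch() returns NULL
-- when there is nothing to process yet):
--
--   select pgq.register_consumer('myqueue', 'myconsumer');
--   select pgq.next_batch('myqueue', 'myconsumer');    -- e.g. 5001
--   select * from pgq.get_batch_events(5001);
--   select pgq.finish_batch(5001);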
create or replace function pgq.force_tick(i_queue_name text)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.force_tick(1)
--
-- Simulate lots of events happening to force the ticker to tick.
--
-- Should be called in a loop, with some delay, until the last tick
-- changes or too much time has passed.
--
-- Such a function is needed because parallel calls of pgq.ticker() are
-- dangerous, and cannot be protected with locks as the snapshot
-- is taken before locking.
--
-- Parameters:
-- i_queue_name - Name of the queue
--
-- Returns:
-- Currently last tick id.
-- ----------------------------------------------------------------------
declare