-
Notifications
You must be signed in to change notification settings - Fork 179
/
Copy pathpg_repack.c
2381 lines (2080 loc) · 68.5 KB
/
pg_repack.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* pg_repack.c: bin/pg_repack.c
*
* Portions Copyright (c) 2008-2011, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2011, Itagaki Takahiro
* Portions Copyright (c) 2012-2020, The Reorg Development Team
*/
/**
 * @brief Client Modules
 */

/* Project home page. */
const char *PROGRAM_URL		= "https://reorg.github.io/pg_repack/";
/* Issue tracker URL (the canonical github.com address, not a mirror host). */
const char *PROGRAM_ISSUES	= "https://github.com/reorg/pg_repack/issues";

#ifdef REPACK_VERSION
/* macro trick to stringify a macro expansion */
#define xstr(s) str(s)
#define str(s) #s
const char *PROGRAM_VERSION = xstr(REPACK_VERSION);
#else
/* REPACK_VERSION was not supplied at build time. */
const char *PROGRAM_VERSION = "unknown";
#endif
#include "pgut/pgut-fe.h"
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
/*
 * APPLY_COUNT: number of log rows applied per transaction. Larger values
 * may be faster, but make for longer transactions in the REDO phase.
 */
#define APPLY_COUNT 1000
/*
 * SWITCH_THRESHOLD_DEFAULT: once fewer than this many tuples are seen in
 * the log table, we consider ourselves ready to perform the switch.
 * This is the initial value of the --switch-threshold setting.
 */
#define SWITCH_THRESHOLD_DEFAULT 100
/* POLL_TIMEOUT: poll() or select() timeout, in seconds */
#define POLL_TIMEOUT 3
/* Compile an array of existing transactions which are active during
 * pg_repack's setup. Some transactions we can safely ignore:
 * a. The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
 *    servers. See https://github.com/reorg/pg_reorg/issues/1
 * b. Our own database connections
 * c. Other pg_repack clients, as distinguished by application_name, which
 *    may be operating on other tables at the same time. See
 *    https://github.com/reorg/pg_repack/issues/1
 * d. open transactions/locks existing on other databases than the actual
 *    processing relation (except for locks on shared objects)
 * e. VACUUMs which are always executed outside transaction blocks.
 *
 * Note, there is some redundancy in how the filtering is done (e.g. excluding
 * based on pg_backend_pid() and application_name), but that shouldn't hurt
 * anything. Also, the test of application_name is not bulletproof -- for
 * instance, the application name when running installcheck will be
 * pg_regress.
 *
 * All variants take two parameters: $1 is a backend pid to exclude in
 * addition to pg_backend_pid(), $2 is the application name to exclude.
 */
/* Variant selected for servers >= 9.2 (pg_stat_activity.pid / .query). */
#define SQL_XID_SNAPSHOT_90200 \
"SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
" FROM pg_locks AS l " \
" LEFT JOIN pg_stat_activity AS a " \
" ON l.pid = a.pid " \
" LEFT JOIN pg_database AS d " \
" ON a.datid = d.oid " \
" WHERE l.locktype = 'virtualxid' " \
" AND l.pid NOT IN (pg_backend_pid(), $1) " \
" AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
" AND (a.application_name IS NULL OR a.application_name <> $2)" \
" AND a.query !~* E'^\\\\s*vacuum\\\\s+' " \
" AND a.query !~ E'^autovacuum: ' " \
" AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)"
/* Variant selected for 9.0 <= server < 9.2 (procpid / current_query). */
#define SQL_XID_SNAPSHOT_90000 \
"SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
" FROM pg_locks AS l " \
" LEFT JOIN pg_stat_activity AS a " \
" ON l.pid = a.procpid " \
" LEFT JOIN pg_database AS d " \
" ON a.datid = d.oid " \
" WHERE l.locktype = 'virtualxid' " \
" AND l.pid NOT IN (pg_backend_pid(), $1) " \
" AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
" AND (a.application_name IS NULL OR a.application_name <> $2)" \
" AND a.current_query !~* E'^\\\\s*vacuum\\\\s+' " \
" AND a.current_query !~ E'^autovacuum: ' " \
" AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)"
/* application_name is not available before 9.0. The last condition of
 * the WHERE clause is just there to consume the $2 parameter (application
 * name), so that all variants accept the same parameter list.
 */
#define SQL_XID_SNAPSHOT_80300 \
"SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
" FROM pg_locks AS l" \
" LEFT JOIN pg_stat_activity AS a " \
" ON l.pid = a.procpid " \
" LEFT JOIN pg_database AS d " \
" ON a.datid = d.oid " \
" WHERE l.locktype = 'virtualxid' AND l.pid NOT IN (pg_backend_pid(), $1)" \
" AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
" AND a.current_query !~* E'^\\\\s*vacuum\\\\s+' " \
" AND a.current_query !~ E'^autovacuum: ' " \
" AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)" \
" AND ($2::text IS NOT NULL)"
/*
 * Pick the snapshot query matching the server version of the global
 * "connection" (evaluated at the point of use).
 */
#define SQL_XID_SNAPSHOT \
(PQserverVersion(connection) >= 90200 ? SQL_XID_SNAPSHOT_90200 : \
(PQserverVersion(connection) >= 90000 ? SQL_XID_SNAPSHOT_90000 : \
SQL_XID_SNAPSHOT_80300))
/* Later, check whether any of the transactions we saw before are still
 * alive, and wait for them to go away. $1 is the array of virtual
 * transaction ids collected by SQL_XID_SNAPSHOT.
 */
#define SQL_XID_ALIVE \
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
/* To be run while our main connection holds an AccessExclusive lock on the
 * target table, and our secondary conn is attempting to grab an AccessShare
 * lock. We know that "granted" must be false for these queries because
 * we already hold the AccessExclusive lock. Also, we only care about other
 * transactions trying to grab an ACCESS EXCLUSIVE lock, because we are only
 * trying to kill off disallowed DDL commands, e.g. ALTER TABLE or TRUNCATE.
 *
 * The three queries below are printf-style templates: %u is replaced by the
 * OID of the target relation. They differ only in what is done with the
 * matching backends: cancel the query, terminate the backend, or just list
 * the pids.
 */
#define CANCEL_COMPETING_LOCKS \
"SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
" AND granted = false AND relation = %u"\
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
#define KILL_COMPETING_LOCKS \
"SELECT pg_terminate_backend(pid) "\
"FROM pg_locks WHERE locktype = 'relation'"\
" AND granted = false AND relation = %u"\
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
#define COUNT_COMPETING_LOCKS \
"SELECT pid FROM pg_locks WHERE locktype = 'relation'" \
" AND granted = false AND relation = %u" \
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
/* Will be used as a unique prefix for advisory locks (kept as a string). */
#define REPACK_LOCK_PREFIX_STR "16185446"
/*
 * Build state of a single index; used to track parallel index builds.
 */
typedef enum
{
UNPROCESSED,
INPROGRESS,
FINISHED
} index_status_t;
/*
 * per-index information
 */
typedef struct repack_index
{
Oid target_oid; /* target: OID of the index */
const char *create_index; /* CREATE INDEX statement text */
index_status_t status; /* Track parallel build statuses. */
int worker_idx; /* which worker conn is handling this index */
} repack_index;
/*
 * per-table information
 *
 * Most members are filled by repack_one_database() from one row of the
 * repack.tables view; the string members hold SQL text and point into the
 * query result (they are not copied).
 */
typedef struct repack_table
{
const char *target_name; /* target: relname */
Oid target_oid; /* target: OID */
Oid target_toast; /* target: toast OID */
Oid target_tidx; /* target: toast index OID */
Oid pkid; /* target: PK OID */
Oid ckid; /* target: CK OID */
const char *create_pktype; /* CREATE TYPE pk */
const char *create_log; /* CREATE TABLE log */
const char *create_trigger; /* CREATE TRIGGER repack_trigger */
const char *enable_trigger; /* ALTER TABLE ENABLE ALWAYS TRIGGER repack_trigger */
const char *create_table; /* CREATE TABLE table AS SELECT WITH NO DATA */
const char *dest_tablespace; /* Destination tablespace */
const char *copy_data; /* INSERT INTO (plus optional ORDER BY) */
const char *alter_col_storage; /* ALTER TABLE ALTER COLUMN SET STORAGE */
const char *drop_columns; /* ALTER TABLE DROP COLUMNs */
const char *delete_log; /* DELETE FROM log */
const char *lock_table; /* LOCK TABLE table */
const char *sql_peek; /* SQL used in flush */
const char *sql_insert; /* SQL used in flush */
const char *sql_delete; /* SQL used in flush */
const char *sql_update; /* SQL used in flush */
const char *sql_pop; /* SQL used in flush */
int n_indexes; /* number of indexes */
repack_index *indexes; /* info on each index */
} repack_table;
/*
 * Forward declarations of the file-local helpers, so that they can be
 * defined in any order below.
 */
static bool is_superuser(void);
static void check_tablespace(void);
static bool preliminary_checks(char *errbuf, size_t errsize);
static bool is_requested_relation_exists(char *errbuf, size_t errsize);
static void repack_all_databases(const char *order_by);
static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
static void repack_one_table(repack_table *table, const char *order_by);
static bool repack_table_indexes(PGresult *index_details);
static bool repack_all_indexes(char *errbuf, size_t errsize);
static void repack_cleanup(bool fatal, const repack_table *table);
static void repack_cleanup_callback(bool fatal, void *userdata);
static bool rebuild_indexes(const repack_table *table);
/* Accessors for result cells; see their definitions for NULL handling. */
static char *getstr(PGresult *res, int row, int col);
static Oid getoid(PGresult *res, int row, int col);
/* Locking helpers. */
static bool advisory_lock(PGconn *conn, const char *relid);
static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
static bool kill_ddl(PGconn *conn, Oid relid, bool terminate);
static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name);
/* SQLSTATE codes this program tests for explicitly. */
#define SQLSTATE_INVALID_SCHEMA_NAME	"3F000"
#define SQLSTATE_UNDEFINED_FUNCTION		"42883"
#define SQLSTATE_LOCK_NOT_AVAILABLE		"55P03"

/*
 * Tell whether the SQLSTATE attached to a result equals the given code.
 *
 * res:   result to inspect
 * state: five-character SQLSTATE code to compare against
 *
 * Returns false when the result carries no SQLSTATE field at all:
 * PQresultErrorField() returns NULL in that case, and NULL must never be
 * handed to strcmp().
 */
static bool
sqlstate_equals(PGresult *res, const char *state)
{
	const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);

	return sqlstate != NULL && strcmp(sqlstate, state) == 0;
}
/*
 * Command-line settings. Each variable is bound to one entry of the
 * options[] table below; the values here are the defaults.
 */
static bool analyze = true; /* bound to --no-analyze (-Z) */
static bool alldb = false; /* --all (-a) */
static bool noorder = false; /* --no-order (-n) */
static SimpleStringList parent_table_list = {NULL, NULL}; /* --parent-table (-I) */
static SimpleStringList table_list = {NULL, NULL}; /* --table (-t) */
static SimpleStringList schema_list = {NULL, NULL}; /* --schema (-c) */
static char *orderby = NULL; /* --order-by (-o) */
static char *tablespace = NULL; /* --tablespace (-s) */
static bool moveidx = false; /* --moveidx (-S) */
static SimpleStringList r_index = {NULL, NULL}; /* --index (-i) */
static bool only_indexes = false; /* --only-indexes (-x) */
static int wait_timeout = 60; /* in seconds */
static int jobs = 0; /* number of concurrent worker conns. */
static bool dryrun = false; /* --dry-run (-N) */
static unsigned int temp_obj_num = 0; /* temporary objects counter */
static bool no_kill_backend = false; /* abandon when timed-out */
static bool no_superuser_check = false; /* --no-superuser-check (-k) */
static SimpleStringList exclude_extension_list = {NULL, NULL}; /* don't repack tables of these extensions */
static bool error_on_invalid_index = false; /* don't repack when invalid index is found */
static int switch_threshold = SWITCH_THRESHOLD_DEFAULT; /* --switch-threshold */
/*
 * Write the decimal representation of "value" into "buffer" and return
 * "buffer". The caller must supply room for every digit plus the
 * terminating NUL (at least 11 bytes for a 32-bit unsigned int).
 */
static char *
utoa(unsigned int value, char *buffer)
{
	char		reversed[3 * sizeof(unsigned int)];	/* digits, lowest first */
	int			ndigits = 0;
	char	   *out = buffer;

	/* Peel off digits from the low end; a zero value still yields "0". */
	do
	{
		reversed[ndigits++] = (char) ('0' + value % 10);
		value /= 10;
	} while (value != 0);

	/* Emit them highest digit first. */
	while (ndigits > 0)
		*out++ = reversed[--ndigits];
	*out = '\0';

	return buffer;
}
/*
 * Command-line option table handed to pgut_getopt() in main().
 *
 * Each entry gives: a type code, the short option character, the long
 * option name and the variable receiving the value. The type codes are
 * interpreted by pgut (not visible in this file); judging by the bound
 * variables, 'b' is a boolean flag, 'B' presumably an inverted boolean
 * (--no-analyze is bound to "analyze"), 'i' an integer, 's' a string and
 * 'l' a string list -- TODO confirm against pgut.
 * Entries whose short option is a small integer (1, 2) presumably have a
 * long form only. The all-zero entry terminates the table.
 */
static pgut_option options[] =
{
{ 'b', 'a', "all", &alldb },
{ 'l', 't', "table", &table_list },
{ 'l', 'I', "parent-table", &parent_table_list },
{ 'l', 'c', "schema", &schema_list },
{ 'b', 'n', "no-order", &noorder },
{ 'b', 'N', "dry-run", &dryrun },
{ 's', 'o', "order-by", &orderby },
{ 's', 's', "tablespace", &tablespace },
{ 'b', 'S', "moveidx", &moveidx },
{ 'l', 'i', "index", &r_index },
{ 'b', 'x', "only-indexes", &only_indexes },
{ 'i', 'T', "wait-timeout", &wait_timeout },
{ 'B', 'Z', "no-analyze", &analyze },
{ 'i', 'j', "jobs", &jobs },
{ 'b', 'D', "no-kill-backend", &no_kill_backend },
{ 'b', 'k', "no-superuser-check", &no_superuser_check },
{ 'l', 'C', "exclude-extension", &exclude_extension_list },
{ 'b', 2, "error-on-invalid-index", &error_on_invalid_index },
{ 'i', 1, "switch-threshold", &switch_threshold },
{ 0 },
};
/*
 * Program entry point.
 *
 * Parses the command line, validates the combination of options, and then
 * dispatches to one of three modes:
 *   - index-only repack (--index / --only-indexes): repack_all_indexes()
 *   - every database (--all): repack_all_databases()
 *   - a single database: repack_one_database()
 *
 * Invalid option combinations are reported through ereport(ERROR, ...).
 * Returns 0 when control reaches the end of the function.
 */
int
main(int argc, char *argv[])
{
	int			i;
	char		errbuf[256];

	i = pgut_getopt(argc, argv, options);

	/* A single trailing argument is taken as the database name. */
	if (i == argc - 1)
		dbname = argv[i];
	else if (i < argc)
		ereport(ERROR,
			(errcode(EINVAL),
			 errmsg("too many arguments")));

	check_tablespace();

	if (dryrun)
		elog(INFO, "Dry run enabled, not executing repack");

	if (r_index.head || only_indexes)
	{
		/* Index-only mode: reject options that make no sense here. */
		if (r_index.head && table_list.head)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --index (-i) and --table (-t)")));
		if (r_index.head && parent_table_list.head)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --index (-i) and --parent-table (-I)")));
		else if (r_index.head && only_indexes)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --index (-i) and --only-indexes (-x)")));
		else if (r_index.head && exclude_extension_list.head)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --index (-i) and --exclude-extension (-C)")));
		else if (only_indexes && !(table_list.head || parent_table_list.head))
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot repack all indexes of database, specify the table(s)"
					   " via --table (-t) or --parent-table (-I)")));
		else if (only_indexes && exclude_extension_list.head)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --only-indexes (-x) and --exclude-extension (-C)")));
		else if (alldb)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot repack specific index(es) in all databases")));
		else
		{
			/*
			 * Warn about every option that is ignored in index-only mode,
			 * not just the first one found.
			 */
			if (orderby)
				ereport(WARNING, (errcode(EINVAL),
					errmsg("option -o (--order-by) has no effect while repacking indexes")));
			if (noorder)
				ereport(WARNING, (errcode(EINVAL),
					errmsg("option -n (--no-order) has no effect while repacking indexes")));
			if (!analyze)
				ereport(WARNING, (errcode(EINVAL),
					errmsg("ANALYZE is not performed after repacking indexes, -Z (--no-analyze) has no effect")));
			if (jobs)
				ereport(WARNING, (errcode(EINVAL),
					errmsg("option -j (--jobs) has no effect, repacking indexes does not use parallel jobs")));

			if (!repack_all_indexes(errbuf, sizeof(errbuf)))
				ereport(ERROR,
					(errcode(ERROR), errmsg("%s", errbuf)));
		}
	}
	else
	{
		/* Table mode: validate table/schema/extension selectors. */
		if (schema_list.head && (table_list.head || parent_table_list.head))
			ereport(ERROR,
				(errcode(EINVAL),
				 errmsg("cannot repack specific table(s) in schema, use schema.table notation instead")));

		if (exclude_extension_list.head && table_list.head)
			ereport(ERROR,
				(errcode(EINVAL),
				 errmsg("cannot specify --table (-t) and --exclude-extension (-C)")));

		if (exclude_extension_list.head && parent_table_list.head)
			ereport(ERROR,
				(errcode(EINVAL),
				 errmsg("cannot specify --parent-table (-I) and --exclude-extension (-C)")));

		/* An empty ORDER BY string means "do not order at all". */
		if (noorder)
			orderby = "";

		if (alldb)
		{
			if (table_list.head || parent_table_list.head)
				ereport(ERROR,
					(errcode(EINVAL),
					 errmsg("cannot repack specific table(s) in all databases")));
			if (schema_list.head)
				ereport(ERROR,
					(errcode(EINVAL),
					 errmsg("cannot repack specific schema(s) in all databases")));
			repack_all_databases(orderby);
		}
		else
		{
			if (!repack_one_database(orderby, errbuf, sizeof(errbuf)))
				ereport(ERROR,
					(errcode(ERROR), errmsg("%s failed with error: %s", PROGRAM_NAME, errbuf)));
		}
	}

	return 0;
}
/*
 * Report whether the connected user is a database superuser.
 * Borrowed from psql/common.c
 *
 * The answer comes from the "is_superuser" parameter reported by the
 * server, so it is only reliable with a protocol-3.0 or newer backend;
 * otherwise the result is always "false". When --no-superuser-check was
 * given, the check is bypassed and "true" is returned unconditionally.
 */
bool
is_superuser(void)
{
	const char *status;

	/* The user asked us not to check. */
	if (no_superuser_check)
		return true;

	/* Without a connection there is nobody to ask. */
	if (connection == NULL)
		return false;

	status = PQparameterStatus(connection, "is_superuser");

	return status != NULL && strcmp(status, "on") == 0;
}
/*
 * Check that the tablespace requested with --tablespace exists.
 *
 * When no tablespace was requested, only verify that --moveidx was not
 * given on its own. Otherwise connect to the database and look the name up
 * in pg_tablespace.
 *
 * Every failure (bad option combination, unknown tablespace, failed
 * lookup query) is raised through ereport(ERROR, ...).
 */
void
check_tablespace(void)
{
	PGresult   *res = NULL;
	const char *params[1];

	if (tablespace == NULL)
	{
		/* nothing to check, but let's see the options */
		if (moveidx)
		{
			ereport(ERROR,
				(errcode(EINVAL),
				 errmsg("cannot specify --moveidx (-S) without --tablespace (-s)")));
		}
		return;
	}

	/* check if the tablespace exists */
	reconnect(ERROR);
	params[0] = tablespace;
	res = execute_elevel(
		"select spcname from pg_tablespace where spcname = $1",
		1, params, DEBUG2);

	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		if (PQntuples(res) == 0)
		{
			ereport(ERROR,
				(errcode(EINVAL),
				 errmsg("the tablespace \"%s\" doesn't exist", tablespace)));
		}
	}
	else
	{
		/* The lookup itself failed; report the server's message. */
		ereport(ERROR,
			(errcode(EINVAL),
			 errmsg("error checking the tablespace: %s",
					PQerrorMessage(connection))));
	}

	CLEARPGRES(res);
}
/*
 * Sanity checks to run on a fresh connection before doing any work:
 * the user must be a superuser, and both the C library and the SQL
 * extension installed in the database must match this program's version.
 * On success the session is also configured (statement_timeout,
 * search_path, client_min_messages).
 *
 * errbuf/errsize: optional buffer receiving a description of the failure.
 *
 * Returns true when all checks passed, false otherwise.
 */
static bool
preliminary_checks(char *errbuf, size_t errsize)
{
	bool		passed = false;
	PGresult   *res = NULL;
	const char *installed;
	char		expected[64];

	if (!is_superuser())
	{
		if (errbuf)
			snprintf(errbuf, errsize, "You must be a superuser to use %s",
					 PROGRAM_NAME);
		goto cleanup;
	}

	/* Query the extension version. Exit if no match */
	res = execute_elevel("select repack.version(), repack.version_sql()",
						 0, NULL, DEBUG2);

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		/*
		 * A missing "repack" schema or missing version functions (version
		 * too old) both mean the extension is not usable in this database.
		 */
		bool		not_installed =
			sqlstate_equals(res, SQLSTATE_INVALID_SCHEMA_NAME)
			|| sqlstate_equals(res, SQLSTATE_UNDEFINED_FUNCTION);

		if (errbuf)
		{
			if (not_installed)
				snprintf(errbuf, errsize,
						 "%s %s is not installed in the database",
						 PROGRAM_NAME, PROGRAM_VERSION);
			else
				/* Anything else: pass the server's message through */
				snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
		}
		goto cleanup;
	}

	/* the string is something like "pg_repack 1.1.7" */
	snprintf(expected, sizeof(expected), "%s %s", PROGRAM_NAME, PROGRAM_VERSION);

	/* first column: version of the C library */
	installed = getstr(res, 0, 0);
	if (strcmp(expected, installed) != 0)
	{
		if (errbuf)
			snprintf(errbuf, errsize,
					 "program '%s' does not match database library '%s'",
					 expected, installed);
		goto cleanup;
	}

	/* second column: version of the SQL extension */
	installed = getstr(res, 0, 1);
	if (strcmp(expected, installed) != 0)
	{
		if (errbuf)
			snprintf(errbuf, errsize,
					 "extension '%s' required, found '%s';"
					 " please drop and re-create the extension",
					 expected, installed);
		goto cleanup;
	}
	CLEARPGRES(res);

	/* Disable statement timeout. */
	command("SET statement_timeout = 0", 0, NULL);

	/* Restrict search_path to system catalog. */
	command("SET search_path = pg_catalog, pg_temp, public", 0, NULL);

	/* To avoid annoying "create implicit ..." messages. */
	command("SET client_min_messages = warning", 0, NULL);

	passed = true;

cleanup:
	CLEARPGRES(res);
	return passed;
}
/*
 * Verify that every relation named with --table or --parent-table exists,
 * and format a user-friendly message listing the missing ones otherwise.
 *
 * errbuf/errsize: optional buffer receiving the message on failure.
 *
 * Returns true when nothing was requested, when the server is too old to
 * run the check (no to_regclass(text) before 9.6), or when all requested
 * relations were found; false when some are missing or the query failed.
 */
static bool
is_requested_relation_exists(char *errbuf, size_t errsize)
{
	bool		all_found = false;
	PGresult   *res = NULL;
	const char **params = NULL;
	int			nparams = 0;
	int			num_relations;
	int			missing;
	int			i;
	StringInfoData sql;
	StringInfoData rel_names;
	SimpleStringListCell *cell;

	num_relations = simple_string_list_size(parent_table_list) +
		simple_string_list_size(table_list);

	/*
	 * Nothing to do when no relation was requested, or when the server has
	 * no suitable to_regclass(text).
	 */
	if (num_relations == 0 || PQserverVersion(connection) < 90600)
		return true;

	params = pgut_malloc(num_relations * sizeof(char *));
	initStringInfo(&sql);

	/* One VALUES row per requested relation, tagged with the wanted kind. */
	appendStringInfoString(&sql, "SELECT r FROM (VALUES ");
	for (cell = table_list.head; cell != NULL; cell = cell->next)
	{
		if (nparams > 0)
			appendStringInfoChar(&sql, ',');
		appendStringInfo(&sql, "($%d, 'r')", nparams + 1);
		params[nparams++] = cell->val;
	}
	for (cell = parent_table_list.head; cell != NULL; cell = cell->next)
	{
		if (nparams > 0)
			appendStringInfoChar(&sql, ',');
		appendStringInfo(&sql, "($%d, 'p')", nparams + 1);
		params[nparams++] = cell->val;
	}
	appendStringInfoString(&sql,
		") AS given_t(r,kind) WHERE"
		/* regular --table relation or inherited --parent-table */
		" NOT EXISTS("
		" SELECT FROM repack.tables WHERE relid=to_regclass(given_t.r))"
		/* declarative partitioned --parent-table */
		" AND NOT EXISTS("
		" SELECT FROM pg_catalog.pg_class c WHERE c.oid=to_regclass(given_t.r) AND c.relkind = given_t.kind AND given_t.kind = 'p')"
	);

	/* double check the parameters array is sane */
	if (nparams != num_relations)
	{
		if (errbuf)
			snprintf(errbuf, errsize,
					 "internal error: bad parameters count: %i instead of %i",
					 nparams, num_relations);
		goto cleanup;
	}

	res = execute_elevel(sql.data, nparams, params, DEBUG2);
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		if (errbuf)
			snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
		goto cleanup;
	}

	/* Every returned row names a relation that could not be found. */
	missing = PQntuples(res);
	if (missing == 0)
	{
		all_found = true;
		goto cleanup;
	}

	/* Build the quoted, comma-separated list of missing names. */
	initStringInfo(&rel_names);
	for (i = 0; i < missing; i++)
	{
		if (i > 0)
			appendStringInfoString(&rel_names, ", ");
		appendStringInfo(&rel_names, "\"%s\"", getstr(res, i, 0));
	}

	if (errbuf)
	{
		if (missing > 1)
			snprintf(errbuf, errsize,
					 "relations do not exist: %s", rel_names.data);
		else
			snprintf(errbuf, errsize,
					 "ERROR: relation %s does not exist", rel_names.data);
	}
	termStringInfo(&rel_names);

cleanup:
	CLEARPGRES(res);
	termStringInfo(&sql);
	free(params);
	return all_found;
}
/*
 * Call repack_one_database for each database.
 *
 * Connects to the "postgres" database to obtain the list of databases that
 * allow connections, then repacks them one after another by pointing the
 * global "dbname" at each name in turn. A database whose repack fails is
 * reported at INFO level and skipped. In dry-run mode the databases are
 * only listed.
 *
 * orderby: ordering specification passed through to repack_one_database().
 */
static void
repack_all_databases(const char *orderby)
{
	PGresult   *result;
	int			i;

	dbname = "postgres";
	reconnect(ERROR);

	if (!is_superuser())
		elog(ERROR, "You must be a superuser to use %s", PROGRAM_NAME);

	result = execute("SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", 0, NULL);
	disconnect();

	for (i = 0; i < PQntuples(result); i++)
	{
		char		errbuf[256];

		/* Not copied: dbname points into "result" while we work on it. */
		dbname = PQgetvalue(result, i, 0);

		elog(INFO, "repacking database \"%s\"", dbname);
		if (dryrun)
			continue;

		if (!repack_one_database(orderby, errbuf, sizeof(errbuf)))
			elog(INFO, "database \"%s\" skipped: %s", dbname, errbuf);
	}

	/*
	 * dbname still points into the result set; repoint it at a string with
	 * static storage before the result is freed, so that nothing running
	 * later can dereference freed memory through it.
	 */
	dbname = "postgres";
	CLEARPGRES(result);
}
/*
 * Fetch one cell of a result as a string, or NULL for an SQL NULL.
 * The string is not copied: it points into the result.
 */
static char *
getstr(PGresult *res, int row, int col)
{
	return PQgetisnull(res, row, col) ? NULL : PQgetvalue(res, row, col);
}
/*
 * Fetch one cell of a result as an OID, parsing its text as a base-10
 * number. An SQL NULL yields InvalidOid.
 */
static Oid
getoid(PGresult *res, int row, int col)
{
	if (!PQgetisnull(res, row, col))
		return (Oid) strtoul(PQgetvalue(res, row, col), NULL, 10);

	return InvalidOid;
}
/*
 * Call repack_one_table for the target tables or each table in a database.
 *
 * Connects to the current "dbname", runs the preliminary checks, selects
 * the target tables from the repack.tables view according to the
 * --table / --parent-table / --schema / --exclude-extension options, and
 * repacks them one by one.
 *
 * orderby: NULL  -> order by the table's cluster key, when it has one;
 *          ""    -> no ordering at all;
 *          other -> used verbatim as the ORDER BY list.
 * errbuf/errsize: optional buffer receiving a description of the failure.
 *
 * Returns true when the table list could be obtained and processed, false
 * when a check or the selection query failed. The connection is closed
 * before returning in either case.
 */
static bool
repack_one_database(const char *orderby, char *errbuf, size_t errsize)
{
	bool		ret = false;
	PGresult   *res = NULL;
	int			i;
	int			num;
	StringInfoData sql;
	SimpleStringListCell *cell;
	const char **params = NULL;
	int			iparam = 0;
	size_t		num_parent_tables,
				num_tables,
				num_schemas,
				num_params,
				num_excluded_extensions;

	num_parent_tables = simple_string_list_size(parent_table_list);
	num_tables = simple_string_list_size(table_list);
	num_schemas = simple_string_list_size(schema_list);
	num_excluded_extensions = simple_string_list_size(exclude_extension_list);

	/* 1st param is the user-specified tablespace */
	num_params = num_excluded_extensions +
				 num_parent_tables +
				 num_tables +
				 num_schemas + 1;
	params = pgut_malloc(num_params * sizeof(char *));

	initStringInfo(&sql);

	reconnect(ERROR);

	/* No sense in setting up concurrent workers if --jobs=1 */
	if (jobs > 1)
		setup_workers(jobs);

	if (!preliminary_checks(errbuf, errsize))
		goto cleanup;

	if (!is_requested_relation_exists(errbuf, errsize))
		goto cleanup;

	/* acquire target tables */
	appendStringInfoString(&sql,
		"SELECT t.*,"
		" coalesce(v.tablespace, t.tablespace_orig) as tablespace_dest"
		" FROM repack.tables t, "
		" (VALUES (quote_ident($1::text))) as v (tablespace)"
		" WHERE ");

	/* May be NULL, in which case the original tablespace is kept. */
	params[iparam++] = tablespace;
	if (num_tables || num_parent_tables)
	{
		/* standalone tables */
		if (num_tables)
		{
			appendStringInfoString(&sql, "(");
			for (cell = table_list.head; cell; cell = cell->next)
			{
				/* Construct table name placeholders to be used by PQexecParams */
				appendStringInfo(&sql, "relid = $%d::regclass", iparam + 1);
				params[iparam++] = cell->val;
				if (cell->next)
					appendStringInfoString(&sql, " OR ");
			}
			appendStringInfoString(&sql, ")");
		}

		if (num_tables && num_parent_tables)
			appendStringInfoString(&sql, " OR ");

		/* parent tables + inherited children */
		if (num_parent_tables)
		{
			appendStringInfoString(&sql, "(");
			for (cell = parent_table_list.head; cell; cell = cell->next)
			{
				/* Construct table name placeholders to be used by PQexecParams */
				appendStringInfo(&sql,
								 "relid = ANY(repack.get_table_and_inheritors($%d::regclass))",
								 iparam + 1);
				params[iparam++] = cell->val;
				if (cell->next)
					appendStringInfoString(&sql, " OR ");
			}
			appendStringInfoString(&sql, ")");
		}
	}
	else if (num_schemas)
	{
		appendStringInfoString(&sql, "schemaname IN (");
		for (cell = schema_list.head; cell; cell = cell->next)
		{
			/* Construct schema name placeholders to be used by PQexecParams */
			appendStringInfo(&sql, "$%d", iparam + 1);
			params[iparam++] = cell->val;
			if (cell->next)
				appendStringInfoString(&sql, ", ");
		}
		appendStringInfoString(&sql, ")");
	}
	else
	{
		/* No explicit selection: take every table that has a key. */
		appendStringInfoString(&sql, "pkid IS NOT NULL");
	}

	/* Exclude tables which belong to extensions */
	if (exclude_extension_list.head)
	{
		appendStringInfoString(&sql, " AND t.relid NOT IN"
									 " (SELECT d.objid::regclass"
									 " FROM pg_depend d JOIN pg_extension e"
									 " ON d.refobjid = e.oid"
									 " WHERE d.classid = 'pg_class'::regclass AND (");

		/* List all excluded extensions */
		for (cell = exclude_extension_list.head; cell; cell = cell->next)
		{
			appendStringInfo(&sql, "e.extname = $%d", iparam + 1);
			params[iparam++] = cell->val;

			appendStringInfoString(&sql, cell->next ? " OR " : ")");
		}

		/* Close subquery */
		appendStringInfoString(&sql, ")");
	}

	/* Ensure the regression tests get a consistent ordering of tables */
	appendStringInfoString(&sql, " ORDER BY t.relname, t.schemaname");

	/* double check the parameters array is sane */
	if ((size_t) iparam != num_params)
	{
		if (errbuf)
			snprintf(errbuf, errsize,
					 "internal error: bad parameters count: %i instead of %zu",
					 iparam, num_params);
		goto cleanup;
	}

	res = execute_elevel(sql.data, (int) num_params, params, DEBUG2);

	/* on error skip the database */
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		/* Return the error message otherwise */
		if (errbuf)
			snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
		goto cleanup;
	}

	num = PQntuples(res);

	for (i = 0; i < num; i++)
	{
		repack_table table;
		StringInfoData copy_sql;
		const char *ckey;
		int			c = 0;

		/*
		 * Columns are read positionally, so the order below has to follow
		 * the column order of the repack.tables view (defined outside this
		 * file).
		 */
		table.target_name = getstr(res, i, c++);
		table.target_oid = getoid(res, i, c++);
		table.target_toast = getoid(res, i, c++);
		table.target_tidx = getoid(res, i, c++);
		c++;					/* Skip schemaname */
		table.pkid = getoid(res, i, c++);
		table.ckid = getoid(res, i, c++);

		if (table.pkid == 0)
		{
			ereport(WARNING,
					(errcode(E_PG_COMMAND),
					 errmsg("relation \"%s\" must have a primary key or not-null unique keys", table.target_name)));
			continue;
		}

		table.create_pktype = getstr(res, i, c++);
		table.create_log = getstr(res, i, c++);
		table.create_trigger = getstr(res, i, c++);
		table.enable_trigger = getstr(res, i, c++);
		table.create_table = getstr(res, i, c++);
		c++;					/* skip tablespace_orig; tablespace_dest is used instead */
		table.copy_data = getstr(res, i, c++);
		table.alter_col_storage = getstr(res, i, c++);
		table.drop_columns = getstr(res, i, c++);
		table.delete_log = getstr(res, i, c++);
		table.lock_table = getstr(res, i, c++);
		ckey = getstr(res, i, c++);
		table.sql_peek = getstr(res, i, c++);
		table.sql_insert = getstr(res, i, c++);
		table.sql_delete = getstr(res, i, c++);
		table.sql_update = getstr(res, i, c++);
		table.sql_pop = getstr(res, i, c++);
		table.dest_tablespace = getstr(res, i, c++);

		/* Craft Copy SQL */
		initStringInfo(&copy_sql);
		appendStringInfoString(&copy_sql, table.copy_data);
		if (!orderby)
		{
			if (ckey != NULL)
			{
				/* CLUSTER mode */
				appendStringInfoString(&copy_sql, " ORDER BY ");
				appendStringInfoString(&copy_sql, ckey);
			}

			/* else, VACUUM FULL mode (non-clustered tables) */
		}
		else if (!orderby[0])
		{
			/* VACUUM FULL mode (for clustered tables too), do nothing */
		}
		else
		{
			/* User specified ORDER BY */
			appendStringInfoString(&copy_sql, " ORDER BY ");
			appendStringInfoString(&copy_sql, orderby);
		}
		table.copy_data = copy_sql.data;

		repack_one_table(&table, orderby);

		/*
		 * The crafted statement is only needed while this table is being
		 * processed; release it so it does not pile up table after table.
		 */
		termStringInfo(&copy_sql);
	}
	ret = true;

cleanup:
	CLEARPGRES(res);
	disconnect();
	termStringInfo(&sql);
	free(params);
	return ret;
}
/*
 * Apply pending log entries of a table by calling repack.repack_apply()
 * on the given connection.
 *
 * conn:  connection to run the call on
 * table: supplies the five SQL snippets (peek/insert/delete/update/pop)
 * count: passed as the last argument of repack.repack_apply()
 *
 * Returns the first column of the first result row, converted with atoi().
 */
static int
apply_log(PGconn *conn, const repack_table *table, int count)
{
	char		count_text[12];
	const char *args[6] = {
		table->sql_peek,
		table->sql_insert,
		table->sql_delete,
		table->sql_update,
		table->sql_pop,
		utoa(count, count_text)
	};
	PGresult   *res;
	int			applied;

	res = pgut_execute(conn,
					   "SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
					   6, args);
	applied = atoi(PQgetvalue(res, 0, 0));
	CLEARPGRES(res);

	return applied;
}
/*
* Create indexes on temp table, possibly using multiple worker connections
* concurrently if the user asked for --jobs=...