PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_publication.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_publication.c
4 * publication C API manipulation
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/catalog/pg_publication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/genam.h"
18#include "access/heapam.h"
19#include "access/htup_details.h"
20#include "access/tableam.h"
21#include "catalog/catalog.h"
22#include "catalog/dependency.h"
23#include "catalog/indexing.h"
24#include "catalog/namespace.h"
26#include "catalog/partition.h"
27#include "catalog/pg_inherits.h"
32#include "catalog/pg_type.h"
34#include "funcapi.h"
35#include "utils/array.h"
36#include "utils/builtins.h"
37#include "utils/catcache.h"
38#include "utils/fmgroids.h"
39#include "utils/lsyscache.h"
40#include "utils/rel.h"
41#include "utils/syscache.h"
42
43/* Records association between publication and published table */
44typedef struct
45{
46 Oid relid; /* OID of published table */
47 Oid pubid; /* OID of publication that publishes this
48 * table. */
50
51/*
52 * Check if relation can be in given publication and throws appropriate
53 * error if not.
54 */
55static void
57{
58 Relation targetrel = pri->relation;
59 const char *errormsg;
60
61 if (pri->except)
62 errormsg = gettext_noop("cannot use publication EXCEPT clause for relation \"%s\"");
63 else
64 errormsg = gettext_noop("cannot add relation \"%s\" to publication");
65
66 /* If in EXCEPT clause, must be root partitioned table */
67 if (pri->except && targetrel->rd_rel->relispartition)
71 errdetail("This operation is not supported for individual partitions.")));
72
73 /* Must be a regular or partitioned table */
80
81 /* Can't be system table */
86 errdetail("This operation is not supported for system tables.")));
87
88 /* UNLOGGED and TEMP relations cannot be part of publication. */
89 if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
93 errdetail("This operation is not supported for temporary tables.")));
94 else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
98 errdetail("This operation is not supported for unlogged tables.")));
99}
100
101/*
102 * Check if schema can be in given publication and throw appropriate error if
103 * not.
104 */
105static void
107{
108 /* Can't be system namespace */
112 errmsg("cannot add schema \"%s\" to publication",
114 errdetail("This operation is not supported for system schemas.")));
115
116 /* Can't be temporary namespace */
120 errmsg("cannot add schema \"%s\" to publication",
122 errdetail("Temporary schemas cannot be replicated.")));
123}
124
125/*
126 * Returns if relation represented by oid and Form_pg_class entry
127 * is publishable.
128 *
129 * Does same checks as check_publication_add_relation() above except for
130 * RELKIND_SEQUENCE, but does not need relation to be opened and also does
131 * not throw errors. Here, the additional check is to support ALL SEQUENCES
132 * publication.
133 *
134 * XXX This also excludes all tables with relid < FirstNormalObjectId,
135 * ie all tables created during initdb. This mainly affects the preinstalled
136 * information_schema. IsCatalogRelationOid() only excludes tables with
137 * relid < FirstUnpinnedObjectId, making that test rather redundant,
138 * but really we should get rid of the FirstNormalObjectId test not
139 * IsCatalogRelationOid. We can't do so today because we don't want
140 * information_schema tables to be considered publishable; but this test
141 * is really inadequate for that, since the information_schema could be
142 * dropped and reloaded and then it'll be considered publishable. The best
143 * long-term solution may be to add a "relispublishable" bool to pg_class,
144 * and depend on that instead of OID checks.
145 */
146static bool
148{
149 return (reltuple->relkind == RELKIND_RELATION ||
151 reltuple->relkind == RELKIND_SEQUENCE) &&
152 !IsCatalogRelationOid(relid) &&
153 reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
154 relid >= FirstNormalObjectId;
155}
156
157/*
158 * Another variant of is_publishable_class(), taking a Relation.
159 */
160bool
165
166/*
167 * SQL-callable variant of the above
168 *
169 * This returns null when the relation does not exist. This is intended to be
170 * used for example in psql to avoid gratuitous errors when there are
171 * concurrent catalog changes.
172 */
173Datum
175{
176 Oid relid = PG_GETARG_OID(0);
177 HeapTuple tuple;
178 bool result;
179
180 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
181 if (!HeapTupleIsValid(tuple))
183 result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
184 ReleaseSysCache(tuple);
185 PG_RETURN_BOOL(result);
186}
187
188/*
189 * Returns true if the ancestor is in the list of published relations.
190 * Otherwise, returns false.
191 */
192static bool
194{
195 ListCell *lc;
196
197 foreach(lc, table_infos)
198 {
199 Oid relid = ((published_rel *) lfirst(lc))->relid;
200
201 if (relid == ancestor)
202 return true;
203 }
204
205 return false;
206}
207
208/*
209 * Filter out the partitions whose parent tables are also present in the list.
210 */
211static void
213{
214 ListCell *lc;
215
216 foreach(lc, table_infos)
217 {
218 bool skip = false;
219 List *ancestors = NIL;
220 ListCell *lc2;
222
224 ancestors = get_partition_ancestors(table_info->relid);
225
226 foreach(lc2, ancestors)
227 {
229
231 {
232 skip = true;
233 break;
234 }
235 }
236
237 if (skip)
239 }
240}
241
242/*
243 * Returns true if any schema is associated with the publication, false if no
244 * schema is associated with the publication.
245 */
246bool
272
273/*
274 * Returns true if the relation has column list associated with the
275 * publication, false otherwise.
276 *
277 * If a column list is found, the corresponding bitmap is returned through the
278 * cols parameter, if provided. The bitmap is constructed within the given
279 * memory context (mcxt).
280 */
281bool
283 Bitmapset **cols)
284{
286 bool found = false;
287
288 if (pub->alltables)
289 return false;
290
292 ObjectIdGetDatum(relid),
293 ObjectIdGetDatum(pub->oid));
295 {
297 bool isnull;
298
299 /* Lookup the column list attribute. */
302
303 /* Was a column list found? */
304 if (!isnull)
305 {
306 /* Build the column list bitmap in the given memory context. */
307 if (cols)
308 *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
309
310 found = true;
311 }
312
314 }
315
316 return found;
317}
318
319/*
320 * Gets the relations based on the publication partition option for a specified
321 * relation.
322 */
323List *
325 Oid relid)
326{
329 {
331 NULL);
332
334 result = list_concat(result, all_parts);
336 {
337 ListCell *lc;
338
339 foreach(lc, all_parts)
340 {
342
344 result = lappend_oid(result, partOid);
345 }
346 }
347 else
348 Assert(false);
349 }
350 else
351 result = lappend_oid(result, relid);
352
353 return result;
354}
355
356/*
357 * Returns the relid of the topmost ancestor that is published via this
358 * publication if any and set its ancestor level to ancestor_level,
359 * otherwise returns InvalidOid.
360 *
361 * The ancestor_level value allows us to compare the results for multiple
362 * publications, and decide which value is higher up.
363 *
364 * Note that the list of ancestors should be ordered such that the topmost
365 * ancestor is at the end of the list.
366 */
367Oid
369{
370 ListCell *lc;
372 int level = 0;
373
374 /*
375 * Find the "topmost" ancestor that is in this publication.
376 */
377 foreach(lc, ancestors)
378 {
382
383 level++;
384
386 {
388
389 if (ancestor_level)
390 *ancestor_level = level;
391 }
392 else
393 {
396 {
398
399 if (ancestor_level)
400 *ancestor_level = level;
401 }
402 }
403
406 }
407
408 return topmost_relid;
409}
410
411/*
412 * attnumstoint2vector
413 * Convert a Bitmapset of AttrNumbers into an int2vector.
414 *
415 * AttrNumber numbers are 0-based, i.e., not offset by
416 * FirstLowInvalidHeapAttributeNumber.
417 */
418static int2vector *
420{
421 int2vector *result;
422 int n = bms_num_members(attrs);
423 int i = -1;
424 int j = 0;
425
426 result = buildint2vector(NULL, n);
427
428 while ((i = bms_next_member(attrs, i)) >= 0)
429 {
431
432 result->values[j++] = (int16) i;
433 }
434
435 return result;
436}
437
438/*
439 * Insert new publication / relation mapping.
440 */
443 bool if_not_exists)
444{
445 Relation rel;
448 bool nulls[Natts_pg_publication_rel];
449 Relation targetrel = pri->relation;
452 Bitmapset *attnums;
453 Publication *pub = GetPublication(pubid);
456 List *relids = NIL;
457 int i;
458
460
461 /*
462 * Check for duplicates. Note that this does not really prevent
463 * duplicates, it's here just to provide nicer error message in common
464 * case. The real protection is the unique key on the catalog.
465 */
467 ObjectIdGetDatum(pubid)))
468 {
470
471 if (if_not_exists)
473
476 errmsg("relation \"%s\" is already member of publication \"%s\"",
478 }
479
481
482 /* Validate and translate column names into a Bitmapset of attnums. */
483 attnums = pub_collist_validate(pri->relation, pri->columns);
484
485 /* Form a tuple. */
486 memset(values, 0, sizeof(values));
487 memset(nulls, false, sizeof(nulls));
488
493 ObjectIdGetDatum(pubid);
495 ObjectIdGetDatum(relid);
497 BoolGetDatum(pri->except);
498
499 /* Add qualifications, if available */
500 if (pri->whereClause != NULL)
502 else
503 nulls[Anum_pg_publication_rel_prqual - 1] = true;
504
505 /* Add column list, if available */
506 if (pri->columns)
508 else
509 nulls[Anum_pg_publication_rel_prattrs - 1] = true;
510
512
513 /* Insert tuple into catalog. */
516
517 /* Register dependencies as needed */
519
520 /* Add dependency on the publication */
523
524 /* Add dependency on the relation */
527
528 /* Add dependency on the objects mentioned in the qualifications */
529 if (pri->whereClause)
530 recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
532 false);
533
534 /* Add dependency on the columns, if any are listed */
535 i = -1;
536 while ((i = bms_next_member(attnums, i)) >= 0)
537 {
540 }
541
542 /* Close the table. */
544
545 /*
546 * Relations excluded via the EXCEPT clause do not need explicit
547 * invalidation as CreatePublication() function invalidates all relations
548 * as part of defining a FOR ALL TABLES publication.
549 */
550 if (!pri->except)
551 {
552 /*
553 * Invalidate relcache so that publication info is rebuilt.
554 *
555 * For the partitioned tables, we must invalidate all partitions
556 * contained in the respective partition hierarchies, not just the one
557 * explicitly mentioned in the publication. This is required because
558 * we implicitly publish the child tables when the parent table is
559 * published.
560 */
562 relid);
563
565 }
566
567 return myself;
568}
569
570/*
571 * pub_collist_validate
572 * Process and validate the 'columns' list and ensure the columns are all
573 * valid to use for a publication. Checks for and raises an ERROR for
574 * any unknown columns, system columns, duplicate columns, or virtual
575 * generated columns.
576 *
577 * Looks up each column's attnum and returns a 0-based Bitmapset of the
578 * corresponding attnums.
579 */
580Bitmapset *
582{
583 Bitmapset *set = NULL;
584 ListCell *lc;
586
587 foreach(lc, columns)
588 {
589 char *colname = strVal(lfirst(lc));
591
595 errmsg("column \"%s\" of relation \"%s\" does not exist",
597
601 errmsg("cannot use system column \"%s\" in publication column list",
602 colname));
603
604 if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
607 errmsg("cannot use virtual generated column \"%s\" in publication column list",
608 colname));
609
610 if (bms_is_member(attnum, set))
613 errmsg("duplicate column \"%s\" in publication column list",
614 colname));
615
616 set = bms_add_member(set, attnum);
617 }
618
619 return set;
620}
621
622/*
623 * Transform a column list (represented by an array Datum) to a bitmapset.
624 *
625 * If columns isn't NULL, add the column numbers to that set.
626 *
627 * If mcxt isn't NULL, build the bitmapset in that context.
628 */
629Bitmapset *
631{
632 Bitmapset *result = columns;
633 ArrayType *arr;
634 int nelems;
635 int16 *elems;
637
639 nelems = ARR_DIMS(arr)[0];
640 elems = (int16 *) ARR_DATA_PTR(arr);
641
642 /* If a memory context was specified, switch to it. */
643 if (mcxt)
645
646 for (int i = 0; i < nelems; i++)
647 result = bms_add_member(result, elems[i]);
648
649 if (mcxt)
651
652 return result;
653}
654
655/*
656 * Returns a bitmap representing the columns of the specified table.
657 *
658 * Generated columns are included if include_gencols_type is
659 * PUBLISH_GENCOLS_STORED.
660 */
661Bitmapset *
662pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
663{
664 Bitmapset *result = NULL;
665 TupleDesc desc = RelationGetDescr(relation);
666
667 for (int i = 0; i < desc->natts; i++)
668 {
670
671 if (att->attisdropped)
672 continue;
673
674 if (att->attgenerated)
675 {
676 /* We only support replication of STORED generated cols. */
677 if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
678 continue;
679
680 /* User hasn't requested to replicate STORED generated cols. */
681 if (include_gencols_type != PUBLISH_GENCOLS_STORED)
682 continue;
683 }
684
685 result = bms_add_member(result, att->attnum);
686 }
687
688 return result;
689}
690
691/*
692 * Insert new publication / schema mapping.
693 */
695publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
696{
697 Relation rel;
701 Oid psschid;
702 Publication *pub = GetPublication(pubid);
706
708
709 /*
710 * Check for duplicates. Note that this does not really prevent
711 * duplicates, it's here just to provide nicer error message in common
712 * case. The real protection is the unique key on the catalog.
713 */
716 ObjectIdGetDatum(pubid)))
717 {
719
720 if (if_not_exists)
722
725 errmsg("schema \"%s\" is already member of publication \"%s\"",
727 }
728
730
731 /* Form a tuple */
732 memset(values, 0, sizeof(values));
733 memset(nulls, false, sizeof(nulls));
734
739 ObjectIdGetDatum(pubid);
742
744
745 /* Insert tuple into catalog */
748
750
751 /* Add dependency on the publication */
754
755 /* Add dependency on the schema */
758
759 /* Close the table */
761
762 /*
763 * Invalidate relcache so that publication info is rebuilt. See
764 * publication_add_relation for why we need to consider all the
765 * partitions.
766 */
770
771 return myself;
772}
773
774/*
775 * Internal function to get the list of publication oids for a relation.
776 *
777 * If except_flag is true, returns the list of publication that specified the
778 * relation in EXCEPT clause; otherwise, returns the list of publications
779 * in which relation is included.
780 */
781static List *
783{
784 List *result = NIL;
786
787 /* Find all publications associated with the relation. */
789 ObjectIdGetDatum(relid));
790 for (int i = 0; i < pubrellist->n_members; i++)
791 {
792 HeapTuple tup = &pubrellist->members[i]->tuple;
794 Oid pubid = pubrel->prpubid;
795
796 if (pubrel->prexcept == except_flag)
797 result = lappend_oid(result, pubid);
798 }
799
801
802 return result;
803}
804
805/*
806 * Gets list of publication oids for a relation.
807 */
808List *
810{
811 return get_relation_publications(relid, false);
812}
813
814/*
815 * Gets list of publication oids which has relation in EXCEPT clause.
816 */
817List *
819{
820 return get_relation_publications(relid, true);
821}
822
823/*
824 * Internal function to get the list of relation oids for a publication.
825 *
826 * If except_flag is true, returns the list of relations specified in the
827 * EXCEPT clause of the publication; otherwise, returns the list of relations
828 * included in the publication.
829 */
830static List *
832 bool except_flag)
833{
834 List *result;
837 SysScanDesc scan;
839
840 /* Find all relations associated with the publication. */
842
846 ObjectIdGetDatum(pubid));
847
849 true, NULL, 1, &scankey);
850
851 result = NIL;
852 while (HeapTupleIsValid(tup = systable_getnext(scan)))
853 {
855
857
858 if (except_flag == pubrel->prexcept)
860 pubrel->prrelid);
861 }
862
863 systable_endscan(scan);
865
866 /* Now sort and de-duplicate the result list */
867 list_sort(result, list_oid_cmp);
868 list_deduplicate_oid(result);
869
870 return result;
871}
872
873/*
874 * Gets list of relation oids that are associated with a publication.
875 *
876 * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
877 * should use GetAllPublicationRelations().
878 */
879List *
881{
882 Assert(!GetPublication(pubid)->alltables);
883
884 return get_publication_relations(pubid, pub_partopt, false);
885}
886
887/*
888 * Gets list of table oids that were specified in the EXCEPT clause for a
889 * publication.
890 *
891 * This should only be used FOR ALL TABLES publications.
892 */
893List *
895{
896 Assert(GetPublication(pubid)->alltables);
897
898 return get_publication_relations(pubid, pub_partopt, true);
899}
900
901/*
902 * Gets list of publication oids for publications marked as FOR ALL TABLES.
903 */
904List *
906{
907 List *result;
908 Relation rel;
910 SysScanDesc scan;
912
913 /* Find all publications that are marked as for all tables. */
915
919 BoolGetDatum(true));
920
921 scan = systable_beginscan(rel, InvalidOid, false,
922 NULL, 1, &scankey);
923
924 result = NIL;
925 while (HeapTupleIsValid(tup = systable_getnext(scan)))
926 {
927 Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
928
929 result = lappend_oid(result, oid);
930 }
931
932 systable_endscan(scan);
934
935 return result;
936}
937
938/*
939 * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
940 * publication.
941 *
942 * If the publication publishes partition changes via their respective root
943 * partitioned tables, we must exclude partitions in favor of including the
944 * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
945 * publication.
946 *
947 * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
948 * in EXCEPT TABLE clause.
949 */
950List *
951GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
952{
954 ScanKeyData key[1];
955 TableScanDesc scan;
956 HeapTuple tuple;
957 List *result = NIL;
959
960 Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
961
962 /* EXCEPT filtering applies only to relations, not sequences */
963 if (relkind == RELKIND_RELATION)
964 exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
967
969
970 ScanKeyInit(&key[0],
973 CharGetDatum(relkind));
974
975 scan = table_beginscan_catalog(classRel, 1, key);
976
977 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
978 {
980 Oid relid = relForm->oid;
981
982 if (is_publishable_class(relid, relForm) &&
983 !(relForm->relispartition && pubviaroot) &&
985 result = lappend_oid(result, relid);
986 }
987
988 table_endscan(scan);
989
990 if (pubviaroot)
991 {
992 ScanKeyInit(&key[0],
996
997 scan = table_beginscan_catalog(classRel, 1, key);
998
999 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1000 {
1002 Oid relid = relForm->oid;
1003
1004 if (is_publishable_class(relid, relForm) &&
1005 !relForm->relispartition &&
1006 !list_member_oid(exceptlist, relid))
1007 result = lappend_oid(result, relid);
1008 }
1009
1010 table_endscan(scan);
1011 }
1012
1014 return result;
1015}
1016
1017/*
1018 * Gets the list of schema oids for a publication.
1019 *
1020 * This should only be used FOR TABLES IN SCHEMA publications.
1021 */
1022List *
1024{
1025 List *result = NIL;
1028 SysScanDesc scan;
1029 HeapTuple tup;
1030
1031 /* Find all schemas associated with the publication */
1033
1037 ObjectIdGetDatum(pubid));
1038
1041 true, NULL, 1, &scankey);
1042 while (HeapTupleIsValid(tup = systable_getnext(scan)))
1043 {
1045
1047
1048 result = lappend_oid(result, pubsch->pnnspid);
1049 }
1050
1051 systable_endscan(scan);
1053
1054 return result;
1055}
1056
1057/*
1058 * Gets the list of publication oids associated with a specified schema.
1059 */
1060List *
1062{
1063 List *result = NIL;
1065 int i;
1066
1067 /* Find all publications associated with the schema */
1070 for (i = 0; i < pubschlist->n_members; i++)
1071 {
1072 HeapTuple tup = &pubschlist->members[i]->tuple;
1074
1075 result = lappend_oid(result, pubid);
1076 }
1077
1079
1080 return result;
1081}
1082
1083/*
1084 * Get the list of publishable relation oids for a specified schema.
1085 */
1086List *
1088{
1090 ScanKeyData key[1];
1091 TableScanDesc scan;
1092 HeapTuple tuple;
1093 List *result = NIL;
1094
1096
1098
1099 ScanKeyInit(&key[0],
1103
1104 /* get all the relations present in the specified schema */
1105 scan = table_beginscan_catalog(classRel, 1, key);
1106 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1107 {
1109 Oid relid = relForm->oid;
1110 char relkind;
1111
1112 if (!is_publishable_class(relid, relForm))
1113 continue;
1114
1115 relkind = get_rel_relkind(relid);
1116 if (relkind == RELKIND_RELATION)
1117 result = lappend_oid(result, relid);
1118 else if (relkind == RELKIND_PARTITIONED_TABLE)
1119 {
1121
1122 /*
1123 * It is quite possible that some of the partitions are in a
1124 * different schema than the parent table, so we need to get such
1125 * partitions separately.
1126 */
1129 relForm->oid);
1130 result = list_concat_unique_oid(result, partitionrels);
1131 }
1132 }
1133
1134 table_endscan(scan);
1136 return result;
1137}
1138
1139/*
1140 * Gets the list of all relations published by FOR TABLES IN SCHEMA
1141 * publication.
1142 */
1143List *
1145{
1146 List *result = NIL;
1148 ListCell *cell;
1149
1150 foreach(cell, pubschemalist)
1151 {
1152 Oid schemaid = lfirst_oid(cell);
1153 List *schemaRels = NIL;
1154
1156 result = list_concat(result, schemaRels);
1157 }
1158
1159 return result;
1160}
1161
1162/*
1163 * Get publication using oid
1164 *
1165 * The Publication struct and its data are palloc'ed here.
1166 */
1169{
1170 HeapTuple tup;
1171 Publication *pub;
1173
1175 if (!HeapTupleIsValid(tup))
1176 elog(ERROR, "cache lookup failed for publication %u", pubid);
1177
1179
1181 pub->oid = pubid;
1182 pub->name = pstrdup(NameStr(pubform->pubname));
1183 pub->alltables = pubform->puballtables;
1184 pub->allsequences = pubform->puballsequences;
1185 pub->pubactions.pubinsert = pubform->pubinsert;
1186 pub->pubactions.pubupdate = pubform->pubupdate;
1187 pub->pubactions.pubdelete = pubform->pubdelete;
1188 pub->pubactions.pubtruncate = pubform->pubtruncate;
1189 pub->pubviaroot = pubform->pubviaroot;
1190 pub->pubgencols_type = pubform->pubgencols;
1191
1193
1194 return pub;
1195}
1196
1197/*
1198 * Get Publication using name.
1199 */
1201GetPublicationByName(const char *pubname, bool missing_ok)
1202{
1203 Oid oid;
1204
1205 oid = get_publication_oid(pubname, missing_ok);
1206
1207 return OidIsValid(oid) ? GetPublication(oid) : NULL;
1208}
1209
1210/*
1211 * Get information of the tables in the given publication array.
1212 *
1213 * Returns pubid, relid, column list, row filter for each table.
1214 */
1215Datum
1217{
1218#define NUM_PUBLICATION_TABLES_ELEM 4
1220 List *table_infos = NIL;
1221
1222 /* stuff done only on the first call of the function */
1223 if (SRF_IS_FIRSTCALL())
1224 {
1225 TupleDesc tupdesc;
1226 MemoryContext oldcontext;
1227 ArrayType *arr;
1228 Datum *elems;
1229 int nelems,
1230 i;
1231 bool viaroot = false;
1232
1233 /* create a function context for cross-call persistence */
1235
1236 /* switch to memory context appropriate for multiple function calls */
1237 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1238
1239 /*
1240 * Deconstruct the parameter into elements where each element is a
1241 * publication name.
1242 */
1243 arr = PG_GETARG_ARRAYTYPE_P(0);
1244 deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1245
1246 /* Get Oids of tables from each publication. */
1247 for (i = 0; i < nelems; i++)
1248 {
1251 ListCell *lc;
1252
1254
1255 /*
1256 * Publications support partitioned tables. If
1257 * publish_via_partition_root is false, all changes are replicated
1258 * using leaf partition identity and schema, so we only need
1259 * those. Otherwise, get the partitioned table itself.
1260 */
1261 if (pub_elem->alltables)
1264 pub_elem->pubviaroot);
1265 else
1266 {
1267 List *relids,
1268 *schemarelids;
1269
1271 pub_elem->pubviaroot ?
1275 pub_elem->pubviaroot ?
1279 }
1280
1281 /*
1282 * Record the published table and the corresponding publication so
1283 * that we can get row filters and column lists later.
1284 *
1285 * When a table is published by multiple publications, to obtain
1286 * all row filters and column lists, the structure related to this
1287 * table will be recorded multiple times.
1288 */
1289 foreach(lc, pub_elem_tables)
1290 {
1292
1293 table_info->relid = lfirst_oid(lc);
1294 table_info->pubid = pub_elem->oid;
1296 }
1297
1298 /* At least one publication is using publish_via_partition_root. */
1299 if (pub_elem->pubviaroot)
1300 viaroot = true;
1301 }
1302
1303 /*
1304 * If the publication publishes partition changes via their respective
1305 * root partitioned tables, we must exclude partitions in favor of
1306 * including the root partitioned tables. Otherwise, the function
1307 * could return both the child and parent tables which could cause
1308 * data of the child table to be double-published on the subscriber
1309 * side.
1310 */
1311 if (viaroot)
1313
1314 /* Construct a tuple descriptor for the result rows. */
1316 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1317 OIDOID, -1, 0);
1318 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1319 OIDOID, -1, 0);
1320 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1321 INT2VECTOROID, -1, 0);
1322 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1323 PG_NODE_TREEOID, -1, 0);
1324
1325 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1326 funcctx->user_fctx = table_infos;
1327
1328 MemoryContextSwitchTo(oldcontext);
1329 }
1330
1331 /* stuff done on every call of the function */
1333 table_infos = (List *) funcctx->user_fctx;
1334
1335 if (funcctx->call_cntr < list_length(table_infos))
1336 {
1339 Publication *pub;
1341 Oid relid = table_info->relid;
1344 bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1345
1346 /*
1347 * Form tuple with appropriate data.
1348 */
1349
1350 pub = GetPublication(table_info->pubid);
1351
1352 values[0] = ObjectIdGetDatum(pub->oid);
1353 values[1] = ObjectIdGetDatum(relid);
1354
1355 /*
1356 * We don't consider row filters or column lists for FOR ALL TABLES or
1357 * FOR TABLES IN SCHEMA publications.
1358 */
1359 if (!pub->alltables &&
1362 ObjectIdGetDatum(pub->oid)))
1364 ObjectIdGetDatum(relid),
1365 ObjectIdGetDatum(pub->oid));
1366
1368 {
1369 /* Lookup the column list attribute. */
1372 &(nulls[2]));
1373
1374 /* Null indicates no filter. */
1377 &(nulls[3]));
1378 }
1379 else
1380 {
1381 nulls[2] = true;
1382 nulls[3] = true;
1383 }
1384
1385 /* Show all columns when the column list is not specified. */
1386 if (nulls[2])
1387 {
1388 Relation rel = table_open(relid, AccessShareLock);
1389 int nattnums = 0;
1390 int16 *attnums;
1391 TupleDesc desc = RelationGetDescr(rel);
1392 int i;
1393
1394 attnums = palloc_array(int16, desc->natts);
1395
1396 for (i = 0; i < desc->natts; i++)
1397 {
1399
1400 if (att->attisdropped)
1401 continue;
1402
1403 if (att->attgenerated)
1404 {
1405 /* We only support replication of STORED generated cols. */
1406 if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1407 continue;
1408
1409 /*
1410 * User hasn't requested to replicate STORED generated
1411 * cols.
1412 */
1414 continue;
1415 }
1416
1417 attnums[nattnums++] = att->attnum;
1418 }
1419
1420 if (nattnums > 0)
1421 {
1422 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1423 nulls[2] = false;
1424 }
1425
1427 }
1428
1429 rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1430
1432 }
1433
1435}
1436
1437/*
1438 * Returns Oids of sequences in a publication.
1439 */
1440Datum
1442{
1444 List *sequences = NIL;
1445
1446 /* stuff done only on the first call of the function */
1447 if (SRF_IS_FIRSTCALL())
1448 {
1449 char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1450 Publication *publication;
1451 MemoryContext oldcontext;
1452
1453 /* create a function context for cross-call persistence */
1455
1456 /* switch to memory context appropriate for multiple function calls */
1457 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1458
1459 publication = GetPublicationByName(pubname, false);
1460
1461 if (publication->allsequences)
1464 false);
1465
1466 funcctx->user_fctx = sequences;
1467
1468 MemoryContextSwitchTo(oldcontext);
1469 }
1470
1471 /* stuff done on every call of the function */
1473 sequences = (List *) funcctx->user_fctx;
1474
1475 if (funcctx->call_cntr < list_length(sequences))
1476 {
1477 Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1478
1480 }
1481
1483}
#define PG_GETARG_ARRAYTYPE_P(n)
Definition array.h:263
#define ARR_DATA_PTR(a)
Definition array.h:322
#define DatumGetArrayTypeP(X)
Definition array.h:261
#define ARR_DIMS(a)
Definition array.h:294
void deconstruct_array_builtin(const ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
int16 AttrNumber
Definition attnum.h:21
#define AttrNumberIsForUserDefinedAttr(attributeNumber)
Definition attnum.h:41
#define InvalidAttrNumber
Definition attnum.h:23
int bms_next_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1290
int bms_num_members(const Bitmapset *a)
Definition bitmapset.c:744
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
static Datum values[MAXATTR]
Definition bootstrap.c:187
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define TextDatumGetCString(d)
Definition builtins.h:99
#define NameStr(name)
Definition c.h:798
#define gettext_noop(x)
Definition c.h:1248
#define Assert(condition)
Definition c.h:906
int16_t int16
Definition c.h:574
#define PG_INT16_MAX
Definition c.h:633
#define OidIsValid(objectId)
Definition c.h:821
bool IsToastNamespace(Oid namespaceId)
Definition catalog.c:261
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
bool IsCatalogNamespace(Oid namespaceId)
Definition catalog.c:243
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
bool IsCatalogRelationOid(Oid relid)
Definition catalog.c:121
void recordDependencyOnSingleRelExpr(const ObjectAddress *depender, Node *expr, Oid relId, DependencyType behavior, DependencyType self_behavior, bool reverse_self)
@ DEPENDENCY_AUTO
Definition dependency.h:34
@ DEPENDENCY_NORMAL
Definition dependency.h:33
int errcode(int sqlerrcode)
Definition elog.c:874
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
#define palloc_object(type)
Definition fe_memutils.h:74
#define palloc_array(type, count)
Definition fe_memutils.h:76
#define PG_GETARG_OID(n)
Definition fmgr.h:275
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition heapam.c:1410
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
int2vector * buildint2vector(const int16 *int2s, int n)
Definition int.c:114
int j
Definition isn.c:78
int i
Definition isn.c:77
List * list_concat_unique_oid(List *list1, const List *list2)
Definition list.c:1469
List * lappend(List *list, void *datum)
Definition list.c:339
void list_sort(List *list, list_sort_comparator cmp)
Definition list.c:1674
List * list_concat(List *list1, const List *list2)
Definition list.c:561
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
void list_deduplicate_oid(List *list)
Definition list.c:1495
int list_oid_cmp(const ListCell *p1, const ListCell *p2)
Definition list.c:1703
void list_free(List *list)
Definition list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
#define NoLock
Definition lockdefs.h:34
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
bool get_rel_relispartition(Oid relid)
Definition lsyscache.c:2177
AttrNumber get_attnum(Oid relid, const char *attname)
Definition lsyscache.c:934
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2153
Oid get_publication_oid(const char *pubname, bool missing_ok)
Definition lsyscache.c:3777
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2102
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3518
char * pstrdup(const char *in)
Definition mcxt.c:1781
bool isAnyTempNamespace(Oid namespaceId)
Definition namespace.c:3759
static char * errmsg
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
#define ObjectAddressSubSet(addr, class_id, object_id, object_sub_id)
char * nodeToString(const void *obj)
Definition outfuncs.c:802
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
List * get_partition_ancestors(Oid relid)
Definition partition.c:134
int16 attnum
FormData_pg_attribute * Form_pg_attribute
static const struct exclude_list_item skip[]
int errdetail_relkind_not_supported(char relkind)
Definition pg_class.c:24
FormData_pg_class * Form_pg_class
Definition pg_class.h:160
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:47
static SequenceItem * sequences
Definition pg_dump.c:214
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
static Oid list_nth_oid(const List *list, int n)
Definition pg_list.h:321
#define foreach_delete_current(lst, var_or_cell)
Definition pg_list.h:391
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define lfirst_oid(lc)
Definition pg_list.h:174
static int2vector * attnumstoint2vector(Bitmapset *attrs)
Bitmapset * pub_collist_validate(Relation targetrel, List *columns)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
bool is_schema_publication(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
List * GetPublicationSchemas(Oid pubid)
static void check_publication_add_relation(PublicationRelInfo *pri)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetAllTablesPublications(void)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
List * GetRelationIncludedPublications(Oid relid)
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * GetSchemaPublications(Oid schemaid)
Datum pg_get_publication_tables(PG_FUNCTION_ARGS)
static void filter_partitions(List *table_infos)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
List * GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
static List * get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt, bool except_flag)
static bool is_publishable_class(Oid relid, Form_pg_class reltuple)
Datum pg_get_publication_sequences(PG_FUNCTION_ARGS)
List * GetRelationExcludedPublications(Oid relid)
Bitmapset * pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
Publication * GetPublication(Oid pubid)
#define NUM_PUBLICATION_TABLES_ELEM
static bool is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
static void check_publication_add_schema(Oid schemaid)
List * GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
static List * get_relation_publications(Oid relid, bool except_flag)
Datum pg_relation_is_publishable(PG_FUNCTION_ARGS)
bool is_publishable_relation(Relation rel)
List * GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
END_CATALOG_STRUCT typedef FormData_pg_publication * Form_pg_publication
PublicationPartOpt
@ PUBLICATION_PART_LEAF
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
END_CATALOG_STRUCT typedef FormData_pg_publication_namespace * Form_pg_publication_namespace
END_CATALOG_STRUCT typedef FormData_pg_publication_rel * Form_pg_publication_rel
static Datum PointerGetDatum(const void *X)
Definition postgres.h:352
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
void InvalidatePublicationRels(List *relids)
#define RelationGetForm(relation)
Definition rel.h:508
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
@ ForwardScanDirection
Definition sdir.h:28
#define BTEqualStrategyNumber
Definition stratnum.h:31
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
Definition pg_list.h:54
PublishGencolsType pubgencols_type
PublicationActions pubactions
Form_pg_class rd_rel
Definition rel.h:111
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition c.h:774
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:230
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:595
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
#define ReleaseSysCacheList(x)
Definition syscache.h:134
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition syscache.h:102
#define SearchSysCacheList1(cacheId, key1)
Definition syscache.h:127
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, ScanKeyData *key)
Definition tableam.c:113
static void table_endscan(TableScanDesc scan)
Definition tableam.h:1004
#define FirstNormalObjectId
Definition transam.h:197
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:165
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:825
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:160
#define strVal(v)
Definition value.h:82
char * text_to_cstring(const text *t)
Definition varlena.c:215