PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_publication.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_publication.c
4 * publication C API manipulation
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/catalog/pg_publication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/genam.h"
18#include "access/heapam.h"
19#include "access/htup_details.h"
20#include "access/tableam.h"
21#include "catalog/catalog.h"
22#include "catalog/dependency.h"
23#include "catalog/indexing.h"
24#include "catalog/namespace.h"
26#include "catalog/partition.h"
27#include "catalog/pg_inherits.h"
32#include "catalog/pg_type.h"
34#include "funcapi.h"
35#include "utils/array.h"
36#include "utils/builtins.h"
37#include "utils/catcache.h"
38#include "utils/fmgroids.h"
39#include "utils/lsyscache.h"
40#include "utils/rel.h"
41#include "utils/syscache.h"
42
43/* Records association between publication and published table */
44typedef struct
45{
46 Oid relid; /* OID of published table */
47 Oid pubid; /* OID of publication that publishes this
48 * table. */
50
51/*
52 * Check whether the relation can be in the given publication, and throw an
53 * appropriate error if not.
54 */
55static void
57{
58 Relation targetrel = pri->relation;
59 const char *errormsg;
60
61 if (pri->except)
62 errormsg = gettext_noop("cannot use publication EXCEPT clause for relation \"%s\"");
63 else
64 errormsg = gettext_noop("cannot add relation \"%s\" to publication");
65
66 /* If in EXCEPT clause, must be root partitioned table */
67 if (pri->except && targetrel->rd_rel->relispartition)
71 errdetail("This operation is not supported for individual partitions.")));
72
73 /* Must be a regular or partitioned table */
80
81 /* Can't be system table */
86 errdetail("This operation is not supported for system tables.")));
87
88 /* UNLOGGED and TEMP relations cannot be part of publication. */
89 if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
93 errdetail("This operation is not supported for temporary tables.")));
94 else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
98 errdetail("This operation is not supported for unlogged tables.")));
99}
100
101/*
102 * Check if schema can be in given publication and throw appropriate error if
103 * not.
104 */
105static void
107{
108 /* Can't be system namespace */
112 errmsg("cannot add schema \"%s\" to publication",
114 errdetail("This operation is not supported for system schemas.")));
115
116 /* Can't be temporary namespace */
120 errmsg("cannot add schema \"%s\" to publication",
122 errdetail("Temporary schemas cannot be replicated.")));
123}
124
125/*
126 * Returns whether the relation represented by the given OID and
127 * Form_pg_class entry is publishable.
128 *
129 * Performs the same checks as check_publication_add_relation() above, except
130 * that RELKIND_SEQUENCE is also accepted; it does not need the relation to be
131 * opened and does not throw errors. The additional relkind check exists to
132 * support ALL SEQUENCES publications.
133 *
134 * XXX This also excludes all tables with relid < FirstNormalObjectId,
135 * ie all tables created during initdb. This mainly affects the preinstalled
136 * information_schema. IsCatalogRelationOid() only excludes tables with
137 * relid < FirstUnpinnedObjectId, making that test rather redundant,
138 * but really we should get rid of the FirstNormalObjectId test not
139 * IsCatalogRelationOid. We can't do so today because we don't want
140 * information_schema tables to be considered publishable; but this test
141 * is really inadequate for that, since the information_schema could be
142 * dropped and reloaded and then it'll be considered publishable. The best
143 * long-term solution may be to add a "relispublishable" bool to pg_class,
144 * and depend on that instead of OID checks.
145 */
146static bool
148{
149 return (reltuple->relkind == RELKIND_RELATION ||
151 reltuple->relkind == RELKIND_SEQUENCE) &&
152 !IsCatalogRelationOid(relid) &&
153 reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
154 relid >= FirstNormalObjectId;
155}
156
157/*
158 * Another variant of is_publishable_class(), taking a Relation.
159 */
160bool
165
166/*
167 * SQL-callable variant of the above
168 *
169 * This returns null when the relation does not exist. This is intended to be
170 * used for example in psql to avoid gratuitous errors when there are
171 * concurrent catalog changes.
172 */
173Datum
175{
176 Oid relid = PG_GETARG_OID(0);
177 HeapTuple tuple;
178 bool result;
179
180 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
181 if (!HeapTupleIsValid(tuple))
183 result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
184 ReleaseSysCache(tuple);
185 PG_RETURN_BOOL(result);
186}
187
188/*
189 * Returns true if the ancestor is in the list of published relations.
190 * Otherwise, returns false.
191 */
192static bool
194{
195 ListCell *lc;
196
197 foreach(lc, table_infos)
198 {
199 Oid relid = ((published_rel *) lfirst(lc))->relid;
200
201 if (relid == ancestor)
202 return true;
203 }
204
205 return false;
206}
207
208/*
209 * Filter out the partitions whose parent tables are also present in the list.
210 */
211static void
213{
214 ListCell *lc;
215
216 foreach(lc, table_infos)
217 {
218 bool skip = false;
219 List *ancestors = NIL;
220 ListCell *lc2;
222
224 ancestors = get_partition_ancestors(table_info->relid);
225
226 foreach(lc2, ancestors)
227 {
229
231 {
232 skip = true;
233 break;
234 }
235 }
236
237 if (skip)
239 }
240}
241
242/*
243 * Returns true if any schema is associated with the publication, false if no
244 * schema is associated with the publication.
245 */
246bool
272
273/*
274 * Returns true if the publication has an explicitly included relation (i.e.,
275 * one not marked as EXCEPT).
276 */
277bool
279{
282 SysScanDesc scan;
284 bool result = false;
285
290 ObjectIdGetDatum(pubid));
291
294 true, NULL, 1, &scankey);
295 tup = systable_getnext(scan);
297 {
299
301
302 /*
303 * For any publication, pg_publication_rel contains either only EXCEPT
304 * entries or only explicitly included tables. Therefore, examining
305 * the first tuple is sufficient to determine table inclusion.
306 */
307 result = !pubrel->prexcept;
308 }
309
310 systable_endscan(scan);
312
313 return result;
314}
315
316/*
317 * Returns true if the relation has a column list associated with the
318 * publication, false otherwise.
319 *
320 * If a column list is found, the corresponding bitmap is returned through the
321 * cols parameter, if provided. The bitmap is constructed within the given
322 * memory context (mcxt).
323 */
324bool
326 Bitmapset **cols)
327{
329 bool found = false;
330
331 if (pub->alltables)
332 return false;
333
335 ObjectIdGetDatum(relid),
336 ObjectIdGetDatum(pub->oid));
338 {
340 bool isnull;
341
342 /* Lookup the column list attribute. */
345
346 /* Was a column list found? */
347 if (!isnull)
348 {
349 /* Build the column list bitmap in the given memory context. */
350 if (cols)
351 *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
352
353 found = true;
354 }
355
357 }
358
359 return found;
360}
361
362/*
363 * Gets the relations based on the publication partition option for a specified
364 * relation.
365 */
366List *
368 Oid relid)
369{
372 {
374 NULL);
375
377 result = list_concat(result, all_parts);
379 {
380 ListCell *lc;
381
382 foreach(lc, all_parts)
383 {
385
387 result = lappend_oid(result, partOid);
388 }
389 }
390 else
391 Assert(false);
392 }
393 else
394 result = lappend_oid(result, relid);
395
396 return result;
397}
398
399/*
400 * Returns the relid of the topmost ancestor that is published via this
401 * publication, if any, and stores its level in *ancestor_level (when that
402 * pointer is not NULL); otherwise returns InvalidOid.
403 *
404 * The ancestor_level value allows us to compare the results for multiple
405 * publications, and decide which value is higher up.
406 *
407 * Note that the list of ancestors should be ordered such that the topmost
408 * ancestor is at the end of the list.
409 */
410Oid
412{
413 ListCell *lc;
415 int level = 0;
416
417 /*
418 * Find the "topmost" ancestor that is in this publication.
419 */
420 foreach(lc, ancestors)
421 {
425
426 level++;
427
429 {
431
432 if (ancestor_level)
433 *ancestor_level = level;
434 }
435 else
436 {
439 {
441
442 if (ancestor_level)
443 *ancestor_level = level;
444 }
445 }
446
449 }
450
451 return topmost_relid;
452}
453
454/*
455 * attnumstoint2vector
456 * Convert a Bitmapset of AttrNumbers into an int2vector.
457 *
458 * AttrNumber numbers are 0-based, i.e., not offset by
459 * FirstLowInvalidHeapAttributeNumber.
460 */
461static int2vector *
463{
464 int2vector *result;
465 int n = bms_num_members(attrs);
466 int i = -1;
467 int j = 0;
468
469 result = buildint2vector(NULL, n);
470
471 while ((i = bms_next_member(attrs, i)) >= 0)
472 {
474
475 result->values[j++] = (int16) i;
476 }
477
478 return result;
479}
480
481/*
482 * Insert new publication / relation mapping.
483 */
486 bool if_not_exists, AlterPublicationStmt *alter_stmt)
487{
488 Relation rel;
491 bool nulls[Natts_pg_publication_rel];
492 Relation targetrel = pri->relation;
495 Bitmapset *attnums;
496 Publication *pub = GetPublication(pubid);
499 List *relids = NIL;
500 int i;
502
504
505 /*
506 * Check for duplicates. Note that this does not really prevent
507 * duplicates; it's here just to provide a nicer error message in the
508 * common case. The real protection is the unique key on the catalog.
509 */
511 ObjectIdGetDatum(pubid)))
512 {
514
515 if (if_not_exists)
517
520 errmsg("relation \"%s\" is already member of publication \"%s\"",
522 }
523
525
526 /* Validate and translate column names into a Bitmapset of attnums. */
527 attnums = pub_collist_validate(pri->relation, pri->columns);
528
529 /* Form a tuple. */
530 memset(values, 0, sizeof(values));
531 memset(nulls, false, sizeof(nulls));
532
537 ObjectIdGetDatum(pubid);
539 ObjectIdGetDatum(relid);
541 BoolGetDatum(pri->except);
542
543 /* Add qualifications, if available */
544 if (pri->whereClause != NULL)
546 else
547 nulls[Anum_pg_publication_rel_prqual - 1] = true;
548
549 /* Add column list, if available */
550 if (pri->columns)
552 else
553 nulls[Anum_pg_publication_rel_prattrs - 1] = true;
554
556
557 /* Insert tuple into catalog. */
560
561 /* Register dependencies as needed */
563
564 /* Add dependency on the publication */
567
568 /* Add dependency on the relation */
571
572 /* Add dependency on the objects mentioned in the qualifications */
573 if (pri->whereClause)
574 recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
576 false);
577
578 /* Add dependency on the columns, if any are listed */
579 i = -1;
580 while ((i = bms_next_member(attnums, i)) >= 0)
581 {
584 }
585
586 /* Close the table. */
588
589 /*
590 * Determine whether EXCEPT tables require explicit relcache invalidation.
591 *
592 * For CREATE PUBLICATION with EXCEPT tables, invalidation is skipped
593 * here, as CreatePublication() function invalidates all relations as part
594 * of defining a FOR ALL TABLES publication.
595 *
596 * For ALTER PUBLICATION, invalidation is needed only when adding an
597 * EXCEPT table to a publication already marked as ALL TABLES. For
598 * publications that were originally empty or defined as ALL SEQUENCES and
599 * are being converted to ALL TABLES, invalidation is skipped here, as
600 * AlterPublicationAllFlags() function invalidates all relations while
601 * marking the publication as ALL TABLES publication.
602 */
604 (alter_stmt->for_all_tables && pri->except);
605
606 if (!pri->except || inval_except_table)
607 {
608 /*
609 * Invalidate relcache so that publication info is rebuilt.
610 *
611 * For the partitioned tables, we must invalidate all partitions
612 * contained in the respective partition hierarchies, not just the one
613 * explicitly mentioned in the publication. This is required because
614 * we implicitly publish the child tables when the parent table is
615 * published.
616 */
618 relid);
619
621 }
622
623 return myself;
624}
625
626/*
627 * pub_collist_validate
628 * Process and validate the 'columns' list and ensure the columns are all
629 * valid to use for a publication. Checks for and raises an ERROR for
630 * any unknown columns, system columns, duplicate columns, or virtual
631 * generated columns.
632 *
633 * Looks up each column's attnum and returns a 0-based Bitmapset of the
634 * corresponding attnums.
635 */
636Bitmapset *
638{
639 Bitmapset *set = NULL;
640 ListCell *lc;
642
643 foreach(lc, columns)
644 {
645 char *colname = strVal(lfirst(lc));
647
651 errmsg("column \"%s\" of relation \"%s\" does not exist",
653
657 errmsg("cannot use system column \"%s\" in publication column list",
658 colname));
659
660 if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
663 errmsg("cannot use virtual generated column \"%s\" in publication column list",
664 colname));
665
666 if (bms_is_member(attnum, set))
669 errmsg("duplicate column \"%s\" in publication column list",
670 colname));
671
672 set = bms_add_member(set, attnum);
673 }
674
675 return set;
676}
677
678/*
679 * Transform a column list (represented by an array Datum) to a bitmapset.
680 *
681 * If columns isn't NULL, add the column numbers to that set.
682 *
683 * If mcxt isn't NULL, build the bitmapset in that context.
684 */
685Bitmapset *
687{
688 Bitmapset *result = columns;
689 ArrayType *arr;
690 int nelems;
691 int16 *elems;
693
695 nelems = ARR_DIMS(arr)[0];
696 elems = (int16 *) ARR_DATA_PTR(arr);
697
698 /* If a memory context was specified, switch to it. */
699 if (mcxt)
701
702 for (int i = 0; i < nelems; i++)
703 result = bms_add_member(result, elems[i]);
704
705 if (mcxt)
707
708 return result;
709}
710
711/*
712 * Returns a bitmap representing the columns of the specified table.
713 *
714 * Generated columns are included if include_gencols_type is
715 * PUBLISH_GENCOLS_STORED.
716 */
717Bitmapset *
718pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
719{
720 Bitmapset *result = NULL;
721 TupleDesc desc = RelationGetDescr(relation);
722
723 for (int i = 0; i < desc->natts; i++)
724 {
726
727 if (att->attisdropped)
728 continue;
729
730 if (att->attgenerated)
731 {
732 /* We only support replication of STORED generated cols. */
733 if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
734 continue;
735
736 /* User hasn't requested to replicate STORED generated cols. */
737 if (include_gencols_type != PUBLISH_GENCOLS_STORED)
738 continue;
739 }
740
741 result = bms_add_member(result, att->attnum);
742 }
743
744 return result;
745}
746
747/*
748 * Insert new publication / schema mapping.
749 */
751publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
752{
753 Relation rel;
757 Oid psschid;
758 Publication *pub = GetPublication(pubid);
762
764
765 /*
766 * Check for duplicates. Note that this does not really prevent
767 * duplicates; it's here just to provide a nicer error message in the
768 * common case. The real protection is the unique key on the catalog.
769 */
772 ObjectIdGetDatum(pubid)))
773 {
775
776 if (if_not_exists)
778
781 errmsg("schema \"%s\" is already member of publication \"%s\"",
783 }
784
786
787 /* Form a tuple */
788 memset(values, 0, sizeof(values));
789 memset(nulls, false, sizeof(nulls));
790
795 ObjectIdGetDatum(pubid);
798
800
801 /* Insert tuple into catalog */
804
806
807 /* Add dependency on the publication */
810
811 /* Add dependency on the schema */
814
815 /* Close the table */
817
818 /*
819 * Invalidate relcache so that publication info is rebuilt. See
820 * publication_add_relation for why we need to consider all the
821 * partitions.
822 */
826
827 return myself;
828}
829
830/*
831 * Internal function to get the list of publication oids for a relation.
832 *
833 * If except_flag is true, returns the list of publications that specified
834 * the relation in their EXCEPT clause; otherwise, returns the list of
835 * publications in which the relation is included.
836 */
837static List *
839{
840 List *result = NIL;
842
843 /* Find all publications associated with the relation. */
845 ObjectIdGetDatum(relid));
846 for (int i = 0; i < pubrellist->n_members; i++)
847 {
848 HeapTuple tup = &pubrellist->members[i]->tuple;
850 Oid pubid = pubrel->prpubid;
851
852 if (pubrel->prexcept == except_flag)
853 result = lappend_oid(result, pubid);
854 }
855
857
858 return result;
859}
860
861/*
862 * Gets list of publication oids for a relation.
863 */
864List *
866{
867 return get_relation_publications(relid, false);
868}
869
870/*
871 * Gets list of oids of publications that have the relation in EXCEPT clause.
872 */
873List *
875{
876 return get_relation_publications(relid, true);
877}
878
879/*
880 * Internal function to get the list of relation oids for a publication.
881 *
882 * If except_flag is true, returns the list of relations specified in the
883 * EXCEPT clause of the publication; otherwise, returns the list of relations
884 * included in the publication.
885 */
886static List *
888 bool except_flag)
889{
890 List *result;
893 SysScanDesc scan;
895
896 /* Find all relations associated with the publication. */
898
902 ObjectIdGetDatum(pubid));
903
905 true, NULL, 1, &scankey);
906
907 result = NIL;
908 while (HeapTupleIsValid(tup = systable_getnext(scan)))
909 {
911
913
914 if (except_flag == pubrel->prexcept)
916 pubrel->prrelid);
917 }
918
919 systable_endscan(scan);
921
922 /* Now sort and de-duplicate the result list */
923 list_sort(result, list_oid_cmp);
924 list_deduplicate_oid(result);
925
926 return result;
927}
928
929/*
930 * Gets list of relation oids that are associated with a publication.
931 *
932 * This should only be used for FOR TABLE publications; FOR ALL
933 * TABLES/SEQUENCES publications should use GetAllPublicationRelations().
934 */
935List *
937{
938 Assert(!GetPublication(pubid)->alltables);
939
940 return get_publication_relations(pubid, pub_partopt, false);
941}
942
943/*
944 * Gets list of table oids that were specified in the EXCEPT clause for a
945 * publication.
946 *
947 * This should only be used for FOR ALL TABLES publications.
948 */
949List *
951{
952 Assert(GetPublication(pubid)->alltables);
953
954 return get_publication_relations(pubid, pub_partopt, true);
955}
956
957/*
958 * Gets list of publication oids for publications marked as FOR ALL TABLES.
959 */
960List *
962{
963 List *result;
964 Relation rel;
966 SysScanDesc scan;
968
969 /* Find all publications that are marked as for all tables. */
971
975 BoolGetDatum(true));
976
977 scan = systable_beginscan(rel, InvalidOid, false,
978 NULL, 1, &scankey);
979
980 result = NIL;
981 while (HeapTupleIsValid(tup = systable_getnext(scan)))
982 {
983 Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
984
985 result = lappend_oid(result, oid);
986 }
987
988 systable_endscan(scan);
990
991 return result;
992}
993
994/*
995 * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
996 * publication.
997 *
998 * If the publication publishes partition changes via their respective root
999 * partitioned tables, we must exclude partitions in favor of including the
1000 * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
1001 * publication.
1002 *
1003 * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
1004 * in EXCEPT TABLE clause.
1005 */
1006List *
1007GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
1008{
 /*
  * NOTE(review): classRel is used by both scans below, but its declaration
  * and the statement that opens it are not visible in this listing.
  */
1010 ScanKeyData key[1];
1011 TableScanDesc scan;
1012 HeapTuple tuple;
1013 List *result = NIL;
1014 List *exceptlist = NIL;
1015
 /* Callers must not ask for sequences together with pubviaroot. */
1016 Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
1017
1018 /* EXCEPT filtering applies only to relations, not sequences */
1019 if (relkind == RELKIND_RELATION)
1020 exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
1023
1025
 /* First pass: the scan key is built from the caller-supplied relkind. */
1026 ScanKeyInit(&key[0],
1029 CharGetDatum(relkind));
1030
1031 scan = table_beginscan_catalog(classRel, 1, key);
1032
1033 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1034 {
1036 Oid relid = relForm->oid;
1037
 /*
  * Keep the relation only if it is publishable, is not a partition while
  * publishing via root, and is not in the EXCEPT list.
  */
1038 if (is_publishable_class(relid, relForm) &&
1039 !(relForm->relispartition && pubviaroot) &&
1040 !list_member_oid(exceptlist, relid))
1041 result = lappend_oid(result, relid);
1042 }
1043
1044 table_endscan(scan);
1045
 /*
  * Second pass, only when publishing via root: add publishable relations
  * that are not themselves partitions and are not in the EXCEPT list.
  * NOTE(review): the scan key arguments for this pass are not visible in
  * this listing; presumably it selects partitioned tables -- confirm.
  */
1046 if (pubviaroot)
1047 {
1048 ScanKeyInit(&key[0],
1052
1053 scan = table_beginscan_catalog(classRel, 1, key);
1054
1055 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1056 {
1058 Oid relid = relForm->oid;
1059
1060 if (is_publishable_class(relid, relForm) &&
1061 !relForm->relispartition &&
1062 !list_member_oid(exceptlist, relid))
1063 result = lappend_oid(result, relid);
1064 }
1065
1066 table_endscan(scan);
1067 }
1068
1070 return result;
1071}
1072
1073/*
1074 * Gets the list of schema oids for a publication.
1075 *
1076 * This should only be used for FOR TABLES IN SCHEMA publications.
1077 */
1078List *
1080{
1081 List *result = NIL;
1084 SysScanDesc scan;
1085 HeapTuple tup;
1086
1087 /* Find all schemas associated with the publication */
1089
1093 ObjectIdGetDatum(pubid));
1094
1097 true, NULL, 1, &scankey);
1098 while (HeapTupleIsValid(tup = systable_getnext(scan)))
1099 {
1101
1103
1104 result = lappend_oid(result, pubsch->pnnspid);
1105 }
1106
1107 systable_endscan(scan);
1109
1110 return result;
1111}
1112
1113/*
1114 * Gets the list of publication oids associated with a specified schema.
1115 */
1116List *
1118{
1119 List *result = NIL;
1121 int i;
1122
1123 /* Find all publications associated with the schema */
1126 for (i = 0; i < pubschlist->n_members; i++)
1127 {
1128 HeapTuple tup = &pubschlist->members[i]->tuple;
1130
1131 result = lappend_oid(result, pubid);
1132 }
1133
1135
1136 return result;
1137}
1138
1139/*
1140 * Get the list of publishable relation oids for a specified schema.
1141 */
1142List *
1144{
1146 ScanKeyData key[1];
1147 TableScanDesc scan;
1148 HeapTuple tuple;
1149 List *result = NIL;
1150
1152
1154
1155 ScanKeyInit(&key[0],
1159
1160 /* get all the relations present in the specified schema */
1161 scan = table_beginscan_catalog(classRel, 1, key);
1162 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1163 {
1165 Oid relid = relForm->oid;
1166 char relkind;
1167
1168 if (!is_publishable_class(relid, relForm))
1169 continue;
1170
1171 relkind = get_rel_relkind(relid);
1172 if (relkind == RELKIND_RELATION)
1173 result = lappend_oid(result, relid);
1174 else if (relkind == RELKIND_PARTITIONED_TABLE)
1175 {
1177
1178 /*
1179 * It is quite possible that some of the partitions are in a
1180 * different schema than the parent table, so we need to get such
1181 * partitions separately.
1182 */
1185 relForm->oid);
1186 result = list_concat_unique_oid(result, partitionrels);
1187 }
1188 }
1189
1190 table_endscan(scan);
1192 return result;
1193}
1194
1195/*
1196 * Gets the list of all relations published by FOR TABLES IN SCHEMA
1197 * publication.
1198 */
1199List *
1201{
1202 List *result = NIL;
1204 ListCell *cell;
1205
1206 foreach(cell, pubschemalist)
1207 {
1208 Oid schemaid = lfirst_oid(cell);
1209 List *schemaRels = NIL;
1210
1212 result = list_concat(result, schemaRels);
1213 }
1214
1215 return result;
1216}
1217
1218/*
1219 * Get publication using oid
1220 *
1221 * The Publication struct and its data are palloc'ed here.
1222 */
1225{
1226 HeapTuple tup;
1227 Publication *pub;
1229
1231 if (!HeapTupleIsValid(tup))
1232 elog(ERROR, "cache lookup failed for publication %u", pubid);
1233
1235
1237 pub->oid = pubid;
1238 pub->name = pstrdup(NameStr(pubform->pubname));
1239 pub->alltables = pubform->puballtables;
1240 pub->allsequences = pubform->puballsequences;
1241 pub->pubactions.pubinsert = pubform->pubinsert;
1242 pub->pubactions.pubupdate = pubform->pubupdate;
1243 pub->pubactions.pubdelete = pubform->pubdelete;
1244 pub->pubactions.pubtruncate = pubform->pubtruncate;
1245 pub->pubviaroot = pubform->pubviaroot;
1246 pub->pubgencols_type = pubform->pubgencols;
1247
1249
1250 return pub;
1251}
1252
1253/*
1254 * Get Publication using name.
1255 */
1257GetPublicationByName(const char *pubname, bool missing_ok)
1258{
1259 Oid oid;
1260
1261 oid = get_publication_oid(pubname, missing_ok);
1262
1263 return OidIsValid(oid) ? GetPublication(oid) : NULL;
1264}
1265
1266/*
1267 * Get information of the tables in the given publication array.
1268 *
1269 * Returns pubid, relid, column list, row filter for each table.
1270 */
1271Datum
1273{
1274#define NUM_PUBLICATION_TABLES_ELEM 4
1276 List *table_infos = NIL;
1277
1278 /* stuff done only on the first call of the function */
1279 if (SRF_IS_FIRSTCALL())
1280 {
1281 TupleDesc tupdesc;
1282 MemoryContext oldcontext;
1283 ArrayType *arr;
1284 Datum *elems;
1285 int nelems,
1286 i;
1287 bool viaroot = false;
1288
1289 /* create a function context for cross-call persistence */
1291
1292 /* switch to memory context appropriate for multiple function calls */
1293 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1294
1295 /*
1296 * Deconstruct the parameter into elements where each element is a
1297 * publication name.
1298 */
1299 arr = PG_GETARG_ARRAYTYPE_P(0);
1300 deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1301
1302 /* Get Oids of tables from each publication. */
1303 for (i = 0; i < nelems; i++)
1304 {
1307 ListCell *lc;
1308
1310
1311 /*
1312 * Publications support partitioned tables. If
1313 * publish_via_partition_root is false, all changes are replicated
1314 * using leaf partition identity and schema, so we only need
1315 * those. Otherwise, get the partitioned table itself.
1316 */
1317 if (pub_elem->alltables)
1320 pub_elem->pubviaroot);
1321 else
1322 {
1323 List *relids,
1324 *schemarelids;
1325
1327 pub_elem->pubviaroot ?
1331 pub_elem->pubviaroot ?
1335 }
1336
1337 /*
1338 * Record the published table and the corresponding publication so
1339 * that we can get row filters and column lists later.
1340 *
1341 * When a table is published by multiple publications, to obtain
1342 * all row filters and column lists, the structure related to this
1343 * table will be recorded multiple times.
1344 */
1345 foreach(lc, pub_elem_tables)
1346 {
1348
1349 table_info->relid = lfirst_oid(lc);
1350 table_info->pubid = pub_elem->oid;
1352 }
1353
1354 /* At least one publication is using publish_via_partition_root. */
1355 if (pub_elem->pubviaroot)
1356 viaroot = true;
1357 }
1358
1359 /*
1360 * If the publication publishes partition changes via their respective
1361 * root partitioned tables, we must exclude partitions in favor of
1362 * including the root partitioned tables. Otherwise, the function
1363 * could return both the child and parent tables which could cause
1364 * data of the child table to be double-published on the subscriber
1365 * side.
1366 */
1367 if (viaroot)
1369
1370 /* Construct a tuple descriptor for the result rows. */
1372 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1373 OIDOID, -1, 0);
1374 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1375 OIDOID, -1, 0);
1376 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1377 INT2VECTOROID, -1, 0);
1378 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1379 PG_NODE_TREEOID, -1, 0);
1380
1381 TupleDescFinalize(tupdesc);
1382 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
1383 funcctx->user_fctx = table_infos;
1384
1385 MemoryContextSwitchTo(oldcontext);
1386 }
1387
1388 /* stuff done on every call of the function */
1390 table_infos = (List *) funcctx->user_fctx;
1391
1392 if (funcctx->call_cntr < list_length(table_infos))
1393 {
1396 Publication *pub;
1398 Oid relid = table_info->relid;
1401 bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1402
1403 /*
1404 * Form tuple with appropriate data.
1405 */
1406
1407 pub = GetPublication(table_info->pubid);
1408
1409 values[0] = ObjectIdGetDatum(pub->oid);
1410 values[1] = ObjectIdGetDatum(relid);
1411
1412 /*
1413 * We don't consider row filters or column lists for FOR ALL TABLES or
1414 * FOR TABLES IN SCHEMA publications.
1415 */
1416 if (!pub->alltables &&
1419 ObjectIdGetDatum(pub->oid)))
1421 ObjectIdGetDatum(relid),
1422 ObjectIdGetDatum(pub->oid));
1423
1425 {
1426 /* Lookup the column list attribute. */
1429 &(nulls[2]));
1430
1431 /* Null indicates no filter. */
1434 &(nulls[3]));
1435 }
1436 else
1437 {
1438 nulls[2] = true;
1439 nulls[3] = true;
1440 }
1441
1442 /* Show all columns when the column list is not specified. */
1443 if (nulls[2])
1444 {
1445 Relation rel = table_open(relid, AccessShareLock);
1446 int nattnums = 0;
1447 int16 *attnums;
1448 TupleDesc desc = RelationGetDescr(rel);
1449 int i;
1450
1451 attnums = palloc_array(int16, desc->natts);
1452
1453 for (i = 0; i < desc->natts; i++)
1454 {
1456
1457 if (att->attisdropped)
1458 continue;
1459
1460 if (att->attgenerated)
1461 {
1462 /* We only support replication of STORED generated cols. */
1463 if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1464 continue;
1465
1466 /*
1467 * User hasn't requested to replicate STORED generated
1468 * cols.
1469 */
1471 continue;
1472 }
1473
1474 attnums[nattnums++] = att->attnum;
1475 }
1476
1477 if (nattnums > 0)
1478 {
1479 values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1480 nulls[2] = false;
1481 }
1482
1484 }
1485
1486 rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1487
1489 }
1490
1492}
1493
1494/*
1495 * Returns Oids of sequences in a publication.
1496 */
1497Datum
1499{
1501 List *sequences = NIL;
1502
1503 /* stuff done only on the first call of the function */
1504 if (SRF_IS_FIRSTCALL())
1505 {
1506 char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1507 Publication *publication;
1508 MemoryContext oldcontext;
1509
1510 /* create a function context for cross-call persistence */
1512
1513 /* switch to memory context appropriate for multiple function calls */
1514 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1515
1516 publication = GetPublicationByName(pubname, false);
1517
1518 if (publication->allsequences)
1521 false);
1522
1523 funcctx->user_fctx = sequences;
1524
1525 MemoryContextSwitchTo(oldcontext);
1526 }
1527
1528 /* stuff done on every call of the function */
1530 sequences = (List *) funcctx->user_fctx;
1531
1532 if (funcctx->call_cntr < list_length(sequences))
1533 {
1534 Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1535
1537 }
1538
1540}
#define PG_GETARG_ARRAYTYPE_P(n)
Definition array.h:263
#define ARR_DATA_PTR(a)
Definition array.h:322
#define DatumGetArrayTypeP(X)
Definition array.h:261
#define ARR_DIMS(a)
Definition array.h:294
void deconstruct_array_builtin(const ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
int16 AttrNumber
Definition attnum.h:21
#define AttrNumberIsForUserDefinedAttr(attributeNumber)
Definition attnum.h:41
#define InvalidAttrNumber
Definition attnum.h:23
int bms_next_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1290
int bms_num_members(const Bitmapset *a)
Definition bitmapset.c:744
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define TextDatumGetCString(d)
Definition builtins.h:99
#define NameStr(name)
Definition c.h:837
#define gettext_noop(x)
Definition c.h:1287
#define Assert(condition)
Definition c.h:945
int16_t int16
Definition c.h:613
#define PG_INT16_MAX
Definition c.h:672
#define OidIsValid(objectId)
Definition c.h:860
bool IsToastNamespace(Oid namespaceId)
Definition catalog.c:261
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
bool IsCatalogNamespace(Oid namespaceId)
Definition catalog.c:243
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
bool IsCatalogRelationOid(Oid relid)
Definition catalog.c:121
void recordDependencyOnSingleRelExpr(const ObjectAddress *depender, Node *expr, Oid relId, DependencyType behavior, DependencyType self_behavior, bool reverse_self)
@ DEPENDENCY_AUTO
Definition dependency.h:34
@ DEPENDENCY_NORMAL
Definition dependency.h:33
int errcode(int sqlerrcode)
Definition elog.c:874
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
#define palloc_object(type)
Definition fe_memutils.h:74
#define palloc_array(type, count)
Definition fe_memutils.h:76
#define PG_GETARG_OID(n)
Definition fmgr.h:275
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
#define SRF_IS_FIRSTCALL()
Definition funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition funcapi.h:306
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
#define SRF_RETURN_DONE(_funcctx)
Definition funcapi.h:328
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition heapam.c:1420
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1037
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1384
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
int2vector * buildint2vector(const int16 *int2s, int n)
Definition int.c:114
int j
Definition isn.c:78
int i
Definition isn.c:77
List * list_concat_unique_oid(List *list1, const List *list2)
Definition list.c:1469
List * lappend(List *list, void *datum)
Definition list.c:339
void list_sort(List *list, list_sort_comparator cmp)
Definition list.c:1674
List * list_concat(List *list1, const List *list2)
Definition list.c:561
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
void list_deduplicate_oid(List *list)
Definition list.c:1495
int list_oid_cmp(const ListCell *p1, const ListCell *p2)
Definition list.c:1703
void list_free(List *list)
Definition list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
#define NoLock
Definition lockdefs.h:34
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
bool get_rel_relispartition(Oid relid)
Definition lsyscache.c:2247
AttrNumber get_attnum(Oid relid, const char *attname)
Definition lsyscache.c:977
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2223
Oid get_publication_oid(const char *pubname, bool missing_ok)
Definition lsyscache.c:3847
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2172
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
char * pstrdup(const char *in)
Definition mcxt.c:1781
bool isAnyTempNamespace(Oid namespaceId)
Definition namespace.c:3759
static char * errmsg
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
#define ObjectAddressSubSet(addr, class_id, object_id, object_sub_id)
char * nodeToString(const void *obj)
Definition outfuncs.c:811
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
List * get_partition_ancestors(Oid relid)
Definition partition.c:134
int16 attnum
FormData_pg_attribute * Form_pg_attribute
static const struct exclude_list_item skip[]
int errdetail_relkind_not_supported(char relkind)
Definition pg_class.c:24
FormData_pg_class * Form_pg_class
Definition pg_class.h:160
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:47
static SequenceItem * sequences
Definition pg_dump.c:214
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
static Oid list_nth_oid(const List *list, int n)
Definition pg_list.h:353
#define foreach_delete_current(lst, var_or_cell)
Definition pg_list.h:423
static void * list_nth(const List *list, int n)
Definition pg_list.h:331
#define lfirst_oid(lc)
Definition pg_list.h:174
static int2vector * attnumstoint2vector(Bitmapset *attrs)
Bitmapset * pub_collist_validate(Relation targetrel, List *columns)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
bool is_schema_publication(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
List * GetPublicationSchemas(Oid pubid)
static void check_publication_add_relation(PublicationRelInfo *pri)
List * GetAllTablesPublications(void)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
List * GetRelationIncludedPublications(Oid relid)
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * GetSchemaPublications(Oid schemaid)
Datum pg_get_publication_tables(PG_FUNCTION_ARGS)
static void filter_partitions(List *table_infos)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
List * GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
static List * get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt, bool except_flag)
static bool is_publishable_class(Oid relid, Form_pg_class reltuple)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists, AlterPublicationStmt *alter_stmt)
Datum pg_get_publication_sequences(PG_FUNCTION_ARGS)
List * GetRelationExcludedPublications(Oid relid)
Bitmapset * pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
Publication * GetPublication(Oid pubid)
#define NUM_PUBLICATION_TABLES_ELEM
static bool is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
static void check_publication_add_schema(Oid schemaid)
List * GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
bool is_table_publication(Oid pubid)
static List * get_relation_publications(Oid relid, bool except_flag)
Datum pg_relation_is_publishable(PG_FUNCTION_ARGS)
bool is_publishable_relation(Relation rel)
List * GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
END_CATALOG_STRUCT typedef FormData_pg_publication * Form_pg_publication
PublicationPartOpt
@ PUBLICATION_PART_LEAF
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
END_CATALOG_STRUCT typedef FormData_pg_publication_namespace * Form_pg_publication_namespace
END_CATALOG_STRUCT typedef FormData_pg_publication_rel * Form_pg_publication_rel
static Datum PointerGetDatum(const void *X)
Definition postgres.h:342
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
void InvalidatePublicationRels(List *relids)
#define RelationGetForm(relation)
Definition rel.h:508
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
@ ForwardScanDirection
Definition sdir.h:28
#define BTEqualStrategyNumber
Definition stratnum.h:31
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
Definition pg_list.h:54
PublishGencolsType pubgencols_type
PublicationActions pubactions
Form_pg_class rd_rel
Definition rel.h:111
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition c.h:813
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:231
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
#define ReleaseSysCacheList(x)
Definition syscache.h:134
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition syscache.h:102
#define SearchSysCacheList1(cacheId, key1)
Definition syscache.h:127
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, ScanKeyData *key)
Definition tableam.c:113
static void table_endscan(TableScanDesc scan)
Definition tableam.h:1007
#define FirstNormalObjectId
Definition transam.h:197
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:165
void TupleDescFinalize(TupleDesc tupdesc)
Definition tupdesc.c:511
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:900
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:178
#define strVal(v)
Definition value.h:82
char * text_to_cstring(const text *t)
Definition varlena.c:217