PostgreSQL Source Code git master
Loading...
Searching...
No Matches
tablesync.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for tablesync.c:

Go to the source code of this file.

Functions

static bool wait_for_table_state_change (Oid relid, char expected_state)
 
static bool wait_for_worker_state_change (char expected_state)
 
void ProcessSyncingTablesForSync (XLogRecPtr current_lsn)
 
void ProcessSyncingTablesForApply (XLogRecPtr current_lsn)
 
static Listmake_copy_attnamelist (LogicalRepRelMapEntry *rel)
 
static int copy_read_data (void *outbuf, int minread, int maxread)
 
static void fetch_remote_table_info (char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
 
static void copy_table (Relation rel)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
static charLogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
static void start_table_sync (XLogRecPtr *origin_startpos, char **slotname)
 
static void run_tablesync_worker (void)
 
void TableSyncWorkerMain (Datum main_arg)
 
bool AllTablesyncsReady (void)
 
bool HasSubscriptionTablesCached (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 

Variables

Listtable_states_not_ready = NIL
 
static StringInfo copybuf = NULL
 

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1596 of file tablesync.c.

1597{
1598 bool started_tx;
1599 bool has_tables;
1600
1601 /* We need up-to-date sync state info for subscription tables here. */
1603
1604 if (started_tx)
1605 {
1607 pgstat_report_stat(true);
1608 }
1609
1610 /*
1611 * Return false when there are no tables in subscription or not all tables
1612 * are in ready state; true otherwise.
1613 */
1614 return has_tables && (table_states_not_ready == NIL);
1615}
#define NIL
Definition pg_list.h:68
long pgstat_report_stat(bool force)
Definition pgstat.c:704
static int fb(int x)
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition syncutils.c:203
List * table_states_not_ready
Definition tablesync.c:126
void CommitTransactionCommand(void)
Definition xact.c:3178

References CommitTransactionCommand(), fb(), FetchRelationStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by pa_can_start(), ProcessSyncingTablesForApply(), run_apply_worker(), and wait_for_local_flush().

◆ copy_read_data()

static int copy_read_data ( void outbuf,
int  minread,
int  maxread 
)
static

Definition at line 645 of file tablesync.c.

646{
647 int bytesread = 0;
648 int avail;
649
650 /* If there are some leftover data from previous read, use it. */
651 avail = copybuf->len - copybuf->cursor;
652 if (avail)
653 {
654 if (avail > maxread)
655 avail = maxread;
656 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
657 copybuf->cursor += avail;
658 maxread -= avail;
659 bytesread += avail;
660 }
661
662 while (maxread > 0 && bytesread < minread)
663 {
665 int len;
666 char *buf = NULL;
667
668 for (;;)
669 {
670 /* Try read the data. */
672
674
675 if (len == 0)
676 break;
677 else if (len < 0)
678 return bytesread;
679 else
680 {
681 /* Process the data */
682 copybuf->data = buf;
683 copybuf->len = len;
684 copybuf->cursor = 0;
685
686 avail = copybuf->len - copybuf->cursor;
687 if (avail > maxread)
688 avail = maxread;
689 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
690 outbuf = (char *) outbuf + avail;
691 copybuf->cursor += avail;
692 maxread -= avail;
693 bytesread += avail;
694 }
695
697 return bytesread;
698 }
699
700 /*
701 * Wait for more data or latch.
702 */
707
709 }
710
711 return bytesread;
712}
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:478
struct Latch * MyLatch
Definition globals.c:63
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
const void size_t len
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pgsocket
Definition port.h:29
#define PGINVALID_SOCKET
Definition port.h:31
static int fd(const char *x, int i)
static StringInfo copybuf
Definition tablesync.c:128
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define walrcv_receive(conn, buffer, wait_fd)

References buf, CHECK_FOR_INTERRUPTS, copybuf, StringInfoData::cursor, StringInfoData::data, fb(), fd(), StringInfoData::len, len, LogRepWorkerWalRcvConn, MyLatch, PGINVALID_SOCKET, ResetLatch(), WaitLatchOrSocket(), walrcv_receive, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by copy_table().

◆ copy_table()

static void copy_table ( Relation  rel)
static

Definition at line 1042 of file tablesync.c.

1043{
1044 LogicalRepRelMapEntry *relmapentry;
1046 List *qual = NIL;
1047 WalRcvExecResult *res;
1048 StringInfoData cmd;
1049 CopyFromState cstate;
1051 ParseState *pstate;
1052 List *options = NIL;
1053 bool gencol_published = false;
1054
1055 /* Get the publisher relation info. */
1057 RelationGetRelationName(rel), &lrel, &qual,
1059
1060 /* Put the relation into relmap. */
1062
1063 /* Map the publisher relation to local one. */
1064 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1065 Assert(rel == relmapentry->localrel);
1066
1067 /* Start copy on the publisher. */
1068 initStringInfo(&cmd);
1069
1070 /* Regular or partitioned table with no row filter or generated columns */
1071 if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
1072 && qual == NIL && !gencol_published)
1073 {
1074 appendStringInfo(&cmd, "COPY %s",
1075 quote_qualified_identifier(lrel.nspname, lrel.relname));
1076
1077 /* If the table has columns, then specify the columns */
1078 if (lrel.natts)
1079 {
1080 appendStringInfoString(&cmd, " (");
1081
1082 /*
1083 * XXX Do we need to list the columns in all cases? Maybe we're
1084 * replicating all columns?
1085 */
1086 for (int i = 0; i < lrel.natts; i++)
1087 {
1088 if (i > 0)
1089 appendStringInfoString(&cmd, ", ");
1090
1092 }
1093
1094 appendStringInfoChar(&cmd, ')');
1095 }
1096
1097 appendStringInfoString(&cmd, " TO STDOUT");
1098 }
1099 else
1100 {
1101 /*
1102 * For non-tables and tables with row filters, we need to do COPY
1103 * (SELECT ...), but we can't just do SELECT * because we may need to
1104 * copy only subset of columns including generated columns. For tables
1105 * with any row filters, build a SELECT query with OR'ed row filters
1106 * for COPY.
1107 *
1108 * We also need to use this same COPY (SELECT ...) syntax when
1109 * generated columns are published, because copy of generated columns
1110 * is not supported by the normal COPY.
1111 */
1112 appendStringInfoString(&cmd, "COPY (SELECT ");
1113 for (int i = 0; i < lrel.natts; i++)
1114 {
1116 if (i < lrel.natts - 1)
1117 appendStringInfoString(&cmd, ", ");
1118 }
1119
1120 appendStringInfoString(&cmd, " FROM ");
1121
1122 /*
1123 * For regular tables, make sure we don't copy data from a child that
1124 * inherits the named table as those will be copied separately.
1125 */
1126 if (lrel.relkind == RELKIND_RELATION)
1127 appendStringInfoString(&cmd, "ONLY ");
1128
1130 /* list of OR'ed filters */
1131 if (qual != NIL)
1132 {
1133 ListCell *lc;
1134 char *q = strVal(linitial(qual));
1135
1136 appendStringInfo(&cmd, " WHERE %s", q);
1137 for_each_from(lc, qual, 1)
1138 {
1139 q = strVal(lfirst(lc));
1140 appendStringInfo(&cmd, " OR %s", q);
1141 }
1142 list_free_deep(qual);
1143 }
1144
1145 appendStringInfoString(&cmd, ") TO STDOUT");
1146 }
1147
1148 /*
1149 * Prior to v16, initial table synchronization will use text format even
1150 * if the binary option is enabled for a subscription.
1151 */
1154 {
1155 appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1156 options = list_make1(makeDefElem("format",
1157 (Node *) makeString("binary"), -1));
1158 }
1159
1161 pfree(cmd.data);
1162 if (res->status != WALRCV_OK_COPY_OUT)
1163 ereport(ERROR,
1165 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1166 lrel.nspname, lrel.relname, res->err)));
1168
1170
1171 pstate = make_parsestate(NULL);
1173 NULL, false, false);
1174
1175 attnamelist = make_copy_attnamelist(relmapentry);
1176 cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1177
1178 /* Do the copy */
1179 (void) CopyFrom(cstate);
1180
1181 logicalrep_rel_close(relmapentry, NoLock);
1182}
Subscription * MySubscription
Definition worker.c:480
#define Assert(condition)
Definition c.h:885
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition copyfrom.c:1525
uint64 CopyFrom(CopyFromState cstate)
Definition copyfrom.c:779
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
int i
Definition isn.c:77
void list_free_deep(List *list)
Definition list.c:1560
#define NoLock
Definition lockdefs.h:34
#define AccessShareLock
Definition lockdefs.h:36
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3518
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition makefuncs.c:637
void pfree(void *pointer)
Definition mcxt.c:1616
ParseState * make_parsestate(ParseState *parentParseState)
Definition parse_node.c:39
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, LOCKMODE lockmode, Alias *alias, bool inh, bool inFromCl)
#define lfirst(lc)
Definition pg_list.h:172
#define list_make1(x1)
Definition pg_list.h:212
#define for_each_from(cell, lst, N)
Definition pg_list.h:414
#define linitial(l)
Definition pg_list.h:178
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RelationGetNamespace(relation)
Definition rel.h:555
char * quote_qualified_identifier(const char *qualifier, const char *ident)
const char * quote_identifier(const char *ident)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition relation.c:165
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition relation.c:518
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition relation.c:362
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition pg_list.h:54
Definition nodes.h:135
WalRcvExecStatus status
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition tablesync.c:625
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
Definition tablesync.c:724
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition tablesync.c:645
String * makeString(char *str)
Definition value.c:63
#define strVal(v)
Definition value.h:82
@ WALRCV_OK_COPY_OUT
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)

References AccessShareLock, addRangeTableEntryForRelation(), appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert, BeginCopyFrom(), Subscription::binary, copy_read_data(), copybuf, CopyFrom(), StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, fb(), fetch_remote_table_info(), for_each_from, get_namespace_name(), i, initStringInfo(), lfirst, linitial, list_free_deep(), list_make1, LogicalRepRelMapEntry::localrel, logicalrep_rel_close(), logicalrep_rel_open(), logicalrep_relmap_update(), LogRepWorkerWalRcvConn, make_copy_attnamelist(), make_parsestate(), makeDefElem(), makeString(), makeStringInfo(), MySubscription, NIL, NoLock, pfree(), quote_identifier(), quote_qualified_identifier(), RelationGetNamespace, RelationGetRelationName, WalRcvExecResult::status, strVal, walrcv_clear_result(), walrcv_exec, WALRCV_OK_COPY_OUT, and walrcv_server_version.

Referenced by LogicalRepSyncTableStart().

◆ fetch_remote_table_info()

static void fetch_remote_table_info ( char nspname,
char relname,
LogicalRepRelation lrel,
List **  qual,
bool gencol_published 
)
static

Definition at line 724 of file tablesync.c.

726{
727 WalRcvExecResult *res;
728 StringInfoData cmd;
729 TupleTableSlot *slot;
732 Oid qualRow[] = {TEXTOID};
733 bool isnull;
734 int natt;
735 StringInfo pub_names = NULL;
738
739 lrel->nspname = nspname;
740 lrel->relname = relname;
741
742 /* First fetch Oid and replica identity. */
743 initStringInfo(&cmd);
744 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
745 " FROM pg_catalog.pg_class c"
746 " INNER JOIN pg_catalog.pg_namespace n"
747 " ON (c.relnamespace = n.oid)"
748 " WHERE n.nspname = %s"
749 " AND c.relname = %s",
750 quote_literal_cstr(nspname),
754
755 if (res->status != WALRCV_OK_TUPLES)
758 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
759 nspname, relname, res->err)));
760
762 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
765 errmsg("table \"%s.%s\" not found on publisher",
766 nspname, relname)));
767
768 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
769 Assert(!isnull);
770 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
771 Assert(!isnull);
772 lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
773 Assert(!isnull);
774
777
778
779 /*
780 * Get column lists for each relation.
781 *
782 * We need to do this before fetching info about column names and types,
783 * so that we can skip columns that should not be replicated.
784 */
785 if (server_version >= 150000)
786 {
790
791 /* Build the pub_names comma-separated string. */
792 pub_names = makeStringInfo();
794
795 /*
796 * Fetch info about column lists for the relation (from all the
797 * publications).
798 */
799 resetStringInfo(&cmd);
800 appendStringInfo(&cmd,
801 "SELECT DISTINCT"
802 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
803 " THEN NULL ELSE gpt.attrs END)"
804 " FROM pg_publication p,"
805 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
806 " pg_class c"
807 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
808 " AND p.pubname IN ( %s )",
809 lrel->remoteid,
810 pub_names->data);
811
814
815 if (pubres->status != WALRCV_OK_TUPLES)
818 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
819 nspname, relname, pubres->err)));
820
821 /*
822 * We don't support the case where the column list is different for
823 * the same table when combining publications. See comments atop
824 * fetch_relation_list. So there should be only one row returned.
825 * Although we already checked this when creating the subscription, we
826 * still need to check here in case the column list was changed after
827 * creating the subscription and before the sync worker is started.
828 */
829 if (tuplestore_tuple_count(pubres->tuplestore) > 1)
832 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
833 nspname, relname));
834
835 /*
836 * Get the column list and build a single bitmap with the attnums.
837 *
838 * If we find a NULL value, it means all the columns should be
839 * replicated.
840 */
842 if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
843 {
844 Datum cfval = slot_getattr(tslot, 1, &isnull);
845
846 if (!isnull)
847 {
848 ArrayType *arr;
849 int nelems;
850 int16 *elems;
851
853 nelems = ARR_DIMS(arr)[0];
854 elems = (int16 *) ARR_DATA_PTR(arr);
855
856 for (natt = 0; natt < nelems; natt++)
858 }
859
861 }
863
865 }
866
867 /*
868 * Now fetch column names and types.
869 */
870 resetStringInfo(&cmd);
872 "SELECT a.attnum,"
873 " a.attname,"
874 " a.atttypid,"
875 " a.attnum = ANY(i.indkey)");
876
877 /* Generated columns can be replicated since version 18. */
878 if (server_version >= 180000)
879 appendStringInfoString(&cmd, ", a.attgenerated != ''");
880
881 appendStringInfo(&cmd,
882 " FROM pg_catalog.pg_attribute a"
883 " LEFT JOIN pg_catalog.pg_index i"
884 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
885 " WHERE a.attnum > 0::pg_catalog.int2"
886 " AND NOT a.attisdropped %s"
887 " AND a.attrelid = %u"
888 " ORDER BY a.attnum",
889 lrel->remoteid,
890 (server_version >= 120000 && server_version < 180000 ?
891 "AND a.attgenerated = ''" : ""),
892 lrel->remoteid);
895
896 if (res->status != WALRCV_OK_TUPLES)
899 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
900 nspname, relname, res->err)));
901
902 /* We don't know the number of rows coming, so allocate enough space. */
903 lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
904 lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
905 lrel->attkeys = NULL;
906
907 /*
908 * Store the columns as a list of names. Ignore those that are not
909 * present in the column list, if there is one.
910 */
911 natt = 0;
913 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
914 {
915 char *rel_colname;
917
918 attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
919 Assert(!isnull);
920
921 /* If the column is not in the column list, skip it. */
923 {
924 ExecClearTuple(slot);
925 continue;
926 }
927
928 rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
929 Assert(!isnull);
930
931 lrel->attnames[natt] = rel_colname;
932 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
933 Assert(!isnull);
934
935 if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
936 lrel->attkeys = bms_add_member(lrel->attkeys, natt);
937
938 /* Remember if the remote table has published any generated column. */
939 if (server_version >= 180000 && !(*gencol_published))
940 {
941 *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
942 Assert(!isnull);
943 }
944
945 /* Should never happen. */
947 elog(ERROR, "too many columns in remote table \"%s.%s\"",
948 nspname, relname);
949
950 ExecClearTuple(slot);
951 }
953
954 lrel->natts = natt;
955
957
958 /*
959 * Get relation's row filter expressions. DISTINCT avoids the same
960 * expression of a table in multiple publications from being included
961 * multiple times in the final expression.
962 *
963 * We need to copy the row even if it matches just one of the
964 * publications, so we later combine all the quals with OR.
965 *
966 * For initial synchronization, row filtering can be ignored in following
967 * cases:
968 *
969 * 1) one of the subscribed publications for the table hasn't specified
970 * any row filter
971 *
972 * 2) one of the subscribed publications has puballtables set to true
973 *
974 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
975 * that includes this relation
976 */
977 if (server_version >= 150000)
978 {
979 /* Reuse the already-built pub_names. */
980 Assert(pub_names != NULL);
981
982 /* Check for row filters. */
983 resetStringInfo(&cmd);
984 appendStringInfo(&cmd,
985 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
986 " FROM pg_publication p,"
987 " LATERAL pg_get_publication_tables(p.pubname) gpt"
988 " WHERE gpt.relid = %u"
989 " AND p.pubname IN ( %s )",
990 lrel->remoteid,
991 pub_names->data);
992
994
995 if (res->status != WALRCV_OK_TUPLES)
997 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
998 nspname, relname, res->err)));
999
1000 /*
1001 * Multiple row filter expressions for the same table will be combined
1002 * by COPY using OR. If any of the filter expressions for this table
1003 * are null, it means the whole table will be copied. In this case it
1004 * is not necessary to construct a unified row filter expression at
1005 * all.
1006 */
1008 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1009 {
1010 Datum rf = slot_getattr(slot, 1, &isnull);
1011
1012 if (!isnull)
1013 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1014 else
1015 {
1016 /* Ignore filters and cleanup as necessary. */
1017 if (*qual)
1018 {
1019 list_free_deep(*qual);
1020 *qual = NIL;
1021 }
1022 break;
1023 }
1024
1025 ExecClearTuple(slot);
1026 }
1028
1030 destroyStringInfo(pub_names);
1031 }
1032
1033 pfree(cmd.data);
1034}
#define ARR_DATA_PTR(a)
Definition array.h:322
#define DatumGetArrayTypeP(X)
Definition array.h:261
#define ARR_DIMS(a)
Definition array.h:294
int16 AttrNumber
Definition attnum.h:21
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
#define TextDatumGetCString(d)
Definition builtins.h:99
int16_t int16
Definition c.h:553
#define lengthof(array)
Definition c.h:815
#define elog(elevel,...)
Definition elog.h:226
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
#define MaxTupleAttributeNumber
List * lappend(List *list, void *datum)
Definition list.c:339
void * palloc0(Size size)
Definition mcxt.c:1417
int16 attnum
NameData relname
Definition pg_class.h:40
static int server_version
Definition pg_dumpall.c:121
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static Oid DatumGetObjectId(Datum X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static char DatumGetChar(Datum X)
Definition postgres.h:122
static int16 DatumGetInt16(Datum X)
Definition postgres.h:172
unsigned int Oid
char * quote_literal_cstr(const char *rawstr)
Definition quote.c:101
void destroyStringInfo(StringInfo str)
Definition stringinfo.c:409
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
Tuplestorestate * tuplestore
TupleDesc tupledesc
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition tuplestore.c:580
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:398
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:457
@ WALRCV_OK_TUPLES

References appendStringInfo(), appendStringInfoString(), ARR_DATA_PTR, ARR_DIMS, Assert, attnum, bms_add_member(), bms_is_member(), StringInfoData::data, DatumGetArrayTypeP, DatumGetBool(), DatumGetChar(), DatumGetInt16(), DatumGetObjectId(), destroyStringInfo(), elog, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), fb(), GetPublicationsStr(), initStringInfo(), lappend(), lengthof, list_free_deep(), LogRepWorkerWalRcvConn, MakeSingleTupleTableSlot(), makeString(), makeStringInfo(), MaxTupleAttributeNumber, MySubscription, NIL, palloc0(), pfree(), Subscription::publications, quote_literal_cstr(), relname, resetStringInfo(), server_version, slot_getattr(), WalRcvExecResult::status, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), tuplestore_tuple_count(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and walrcv_server_version.

Referenced by copy_table().

◆ HasSubscriptionTablesCached()

bool HasSubscriptionTablesCached ( void  )

Definition at line 1626 of file tablesync.c.

1627{
1628 bool started_tx;
1629 bool has_tables;
1630
1631 /* We need up-to-date subscription tables info here */
1633
1634 if (started_tx)
1635 {
1637 pgstat_report_stat(true);
1638 }
1639
1640 return has_tables;
1641}

References CommitTransactionCommand(), fb(), FetchRelationStates(), and pgstat_report_stat().

Referenced by wait_for_local_flush().

◆ LogicalRepSyncTableStart()

static char * LogicalRepSyncTableStart ( XLogRecPtr origin_startpos)
static

Definition at line 1218 of file tablesync.c.

1219{
1220 char *slotname;
1221 char *err;
1222 char relstate;
1223 XLogRecPtr relstate_lsn;
1224 Relation rel;
1226 WalRcvExecResult *res;
1227 char originname[NAMEDATALEN];
1230 bool must_use_password;
1231 bool run_as_owner;
1232
1233 /* Check the state of the table synchronization. */
1237 &relstate_lsn);
1239
1240 /* Is the use of a password mandatory? */
1243
1245 MyLogicalRepWorker->relstate = relstate;
1246 MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1248
1249 /*
1250 * If synchronization is already done or no longer necessary, exit now
1251 * that we've updated shared memory state.
1252 */
1253 switch (relstate)
1254 {
1256 case SUBREL_STATE_READY:
1258 FinishSyncWorker(); /* doesn't return */
1259 }
1260
1261 /* Calculate the name of the tablesync slot. */
1262 slotname = (char *) palloc(NAMEDATALEN);
1265 slotname,
1266 NAMEDATALEN);
1267
1268 /*
1269 * Here we use the slot name instead of the subscription name as the
1270 * application_name, so that it is different from the leader apply worker,
1271 * so that synchronous replication can distinguish them.
1272 */
1276 slotname, &err);
1278 ereport(ERROR,
1280 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1281 MySubscription->name, err)));
1282
1286
1287 /* Assign the origin tracking record name. */
1290 originname,
1291 sizeof(originname));
1292
1294 {
1295 /*
1296 * We have previously errored out before finishing the copy so the
1297 * replication slot might exist. We want to remove the slot if it
1298 * already exists and proceed.
1299 *
1300 * XXX We could also instead try to drop the slot, last time we failed
1301 * but for that, we might need to clean up the copy state as it might
1302 * be in the middle of fetching the rows. Also, if there is a network
1303 * breakdown then it wouldn't have succeeded so trying it next time
1304 * seems like a better bet.
1305 */
1307 }
1309 {
1310 /*
1311 * The COPY phase was previously done, but tablesync then crashed
1312 * before it was able to finish normally.
1313 */
1315
1316 /*
1317 * The origin tracking name must already exist. It was created first
1318 * time this tablesync was launched.
1319 */
1324
1326
1327 goto copy_table_done;
1328 }
1329
1334
1335 /*
1336 * Update the state, create the replication origin, and make them visible
1337 * to others.
1338 */
1344 false);
1345
1346 /*
1347 * Create the replication origin in a separate transaction from the one
1348 * that sets up the origin in shared memory. This prevents the risk that
1349 * changes to the origin in shared memory cannot be rolled back if the
1350 * transaction aborts.
1351 */
1353 if (!OidIsValid(originid))
1355
1357 pgstat_report_stat(true);
1358
1360
1361 /*
1362 * Use a standard write lock here. It might be better to disallow access
1363 * to the table while it's being synchronized. But we don't want to block
1364 * the main apply process from working and it has to open the relation in
1365 * RowExclusiveLock when remapping remote relation id to local one.
1366 */
1368
1369 /*
1370 * Start a transaction in the remote node in REPEATABLE READ mode. This
1371 * ensures that both the replication slot we create (see below) and the
1372 * COPY are consistent with each other.
1373 */
1375 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1376 0, NULL);
1377 if (res->status != WALRCV_OK_COMMAND)
1378 ereport(ERROR,
1380 errmsg("table copy could not start transaction on publisher: %s",
1381 res->err)));
1383
1384 /*
1385 * Create a new permanent logical decoding slot. This slot will be used
1386 * for the catchup phase after COPY is done, so tell it to use the
1387 * snapshot to make the final data consistent.
1388 */
1390 slotname, false /* permanent */ , false /* two_phase */ ,
1393
1394 /*
1395 * Advance the origin to the LSN got from walrcv_create_slot and then set
1396 * up the origin. The advancement is WAL logged for the purpose of
1397 * recovery. Locks are to prevent the replication origin from vanishing
1398 * while advancing.
1399 *
1400 * The purpose of doing these before the copy is to avoid doing the copy
1401 * again due to any error in advancing or setting up origin tracking.
1402 */
1405 true /* go backward */ , true /* WAL log */ );
1407
1410
1411 /*
1412 * If the user did not opt to run as the owner of the subscription
1413 * ('run_as_owner'), then copy the table as the owner of the table.
1414 */
1416 if (!run_as_owner)
1417 SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1418
1419 /*
1420 * Check that our table sync worker has permission to insert into the
1421 * target table.
1422 */
1424 ACL_INSERT);
1425 if (aclresult != ACLCHECK_OK)
1427 get_relkind_objtype(rel->rd_rel->relkind),
1429
1430 /*
1431 * COPY FROM does not honor RLS policies. That is not a problem for
1432 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1433 * who has it implicitly), but other roles should not be able to
1434 * circumvent RLS. Disallow logical replication into RLS enabled
1435 * relations for such roles.
1436 */
1438 ereport(ERROR,
1440 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1443
1444 /* Now do the initial data copy */
1446 copy_table(rel);
1448
1449 res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1450 if (res->status != WALRCV_OK_COMMAND)
1451 ereport(ERROR,
1453 errmsg("table copy could not finish transaction on publisher: %s",
1454 res->err)));
1456
1457 if (!run_as_owner)
1459
1460 table_close(rel, NoLock);
1461
1462 /* Make the copy visible. */
1464
1465 /*
1466 * Update the persisted state to indicate the COPY phase is done; make it
1467 * visible to others.
1468 */
1473 false);
1474
1476
1478
1479 elog(DEBUG1,
1480 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1482
1483 /*
1484 * We are done with the initial data synchronization, update the state.
1485 */
1490
1491 /*
1492 * Finally, wait until the leader apply worker tells us to catch up and
1493 * then return to let LogicalRepApplyLoop do it.
1494 */
1496 return slotname;
1497}
AclResult
Definition acl.h:182
@ ACLCHECK_OK
Definition acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2654
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition aclchk.c:4057
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:644
#define OidIsValid(objectId)
Definition c.h:800
#define DEBUG1
Definition elog.h:30
void err(int eval, const char *fmt,...)
Definition err.c:43
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:56
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
#define RowExclusiveLock
Definition lockdefs.h:38
void * palloc(Size size)
Definition mcxt.c:1387
Oid GetUserId(void)
Definition miscinit.c:469
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:988
ObjectType get_relkind_objtype(char relkind)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:262
ReplOriginXactState replorigin_xact_state
Definition origin.c:166
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:231
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:918
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1328
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1146
#define ACL_INSERT
Definition parsenodes.h:76
#define NAMEDATALEN
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
#define InvalidOid
#define RelationGetRelid(relation)
Definition rel.h:514
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition rls.c:52
@ RLS_ENABLED
Definition rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
XLogRecPtr relstate_lsn
Form_pg_class rd_rel
Definition rel.h:111
ReplOriginId origin
Definition origin.h:45
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
pg_noreturn void FinishSyncWorker(void)
Definition syncutils.c:50
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
static bool wait_for_worker_state_change(char expected_state)
Definition tablesync.c:190
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1202
static void copy_table(Relation rel)
Definition tablesync.c:1042
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition usercontext.c:87
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
@ WALRCV_OK_COMMAND
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
void CommandCounterIncrement(void)
Definition xact.c:1101
void StartTransactionCommand(void)
Definition xact.c:3080
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert, check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog, ereport, WalRcvExecResult::err, err(), errcode(), errmsg(), ERROR, Subscription::failover, fb(), FinishSyncWorker(), get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, ReplOriginXactState::origin, Subscription::ownersuperuser, palloc(), Subscription::passwordrequired, pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_setup(), replorigin_xact_state, RestoreUserContext(), RLS_ENABLED, RowExclusiveLock, Subscription::runasowner, SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), WalRcvExecResult::status, LogicalRepWorker::subid, SwitchToUntrustedUser(), table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.

Referenced by start_table_sync().

◆ make_copy_attnamelist()

static List * make_copy_attnamelist ( LogicalRepRelMapEntry rel)
static

Definition at line 625 of file tablesync.c.

626{
628 int i;
629
630 for (i = 0; i < rel->remoterel.natts; i++)
631 {
634 }
635
636
637 return attnamelist;
638}
LogicalRepRelation remoterel

References LogicalRepRelation::attnames, fb(), i, lappend(), makeString(), LogicalRepRelation::natts, NIL, and LogicalRepRelMapEntry::remoterel.

Referenced by copy_table().

◆ ProcessSyncingTablesForApply()

void ProcessSyncingTablesForApply ( XLogRecPtr  current_lsn)

Definition at line 367 of file tablesync.c.

368{
370 {
371 Oid relid;
372 TimestampTz last_start_time;
373 };
374 static HTAB *last_start_times = NULL;
375 ListCell *lc;
376 bool started_tx;
377 bool should_exit = false;
378 Relation rel = NULL;
379
381
382 /* We need up-to-date sync state info for subscription tables here. */
384
385 /*
386 * Prepare a hash table for tracking last start times of workers, to avoid
387 * immediate restarts. We don't need it if there are no tables that need
388 * syncing.
389 */
391 {
392 HASHCTL ctl;
393
394 ctl.keysize = sizeof(Oid);
395 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
396 last_start_times = hash_create("Logical replication table sync worker start times",
397 256, &ctl, HASH_ELEM | HASH_BLOBS);
398 }
399
400 /*
401 * Clean up the hash table when we're done with all tables (just to
402 * release the bit of memory).
403 */
405 {
408 }
409
410 /*
411 * Process all tables that are being synchronized.
412 */
413 foreach(lc, table_states_not_ready)
414 {
416
417 if (!started_tx)
418 {
420 started_tx = true;
421 }
422
424
425 if (rstate->state == SUBREL_STATE_SYNCDONE)
426 {
427 /*
428 * Apply has caught up to the position where the table sync has
429 * finished. Mark the table as ready so that the apply will just
430 * continue to replicate it normally.
431 */
432 if (current_lsn >= rstate->lsn)
433 {
435
436 rstate->state = SUBREL_STATE_READY;
437 rstate->lsn = current_lsn;
438
439 /*
440 * Remove the tablesync origin tracking if exists.
441 *
442 * There is a chance that the user is concurrently performing
443 * refresh for the subscription where we remove the table
444 * state and its origin or the tablesync worker would have
445 * already removed this origin. We can't rely on tablesync
446 * worker to remove the origin tracking as if there is any
447 * error while dropping we won't restart it to drop the
448 * origin. So passing missing_ok = true.
449 *
450 * Lock the subscription and origin in the same order as we
451 * are doing during DDL commands to avoid deadlocks. See
452 * AlterSubscription_refresh.
453 */
455 0, AccessShareLock);
456
457 if (!rel)
459
461 rstate->relid,
463 sizeof(originname));
465
466 /*
467 * Update the state to READY only after the origin cleanup.
468 */
470 rstate->relid, rstate->state,
471 rstate->lsn, true);
472 }
473 }
474 else
475 {
477
478 /*
479 * Look for a sync worker for this relation.
480 */
482
485 rstate->relid, false);
486
487 if (syncworker)
488 {
489 /* Found one, update our copy of its state */
490 SpinLockAcquire(&syncworker->relmutex);
491 rstate->state = syncworker->relstate;
492 rstate->lsn = syncworker->relstate_lsn;
493 if (rstate->state == SUBREL_STATE_SYNCWAIT)
494 {
495 /*
496 * Sync worker is waiting for apply. Tell sync worker it
497 * can catchup now.
498 */
500 syncworker->relstate_lsn =
501 Max(syncworker->relstate_lsn, current_lsn);
502 }
503 SpinLockRelease(&syncworker->relmutex);
504
505 /* If we told worker to catch up, wait for it. */
506 if (rstate->state == SUBREL_STATE_SYNCWAIT)
507 {
508 /* Signal the sync worker, as it may be waiting for us. */
509 if (syncworker->proc)
511
512 /* Now safe to release the LWLock */
514
515 if (started_tx)
516 {
517 /*
518 * We must commit the existing transaction to release
519 * the existing locks before entering a busy loop.
520 * This is required to avoid any undetected deadlocks
521 * due to any existing lock as deadlock detector won't
522 * be able to detect the waits on the latch.
523 *
524 * Also close any tables prior to the commit.
525 */
526 if (rel)
527 {
528 table_close(rel, NoLock);
529 rel = NULL;
530 }
532 pgstat_report_stat(false);
533 }
534
535 /*
536 * Enter busy loop and wait for synchronization worker to
537 * reach expected state (or die trying).
538 */
540 started_tx = true;
541
544 }
545 else
547 }
548 else
549 {
550 /*
551 * If there is no sync worker for this table yet, count
552 * running sync workers for this subscription, while we have
553 * the lock.
554 */
555 int nsyncworkers =
558 bool found;
559
560 /* Now safe to release the LWLock */
562
564 HASH_ENTER, &found);
565 if (!found)
566 hentry->last_start_time = 0;
567
569 rstate->relid, &hentry->last_start_time);
570 }
571 }
572 }
573
574 /* Close table if opened */
575 if (rel)
576 table_close(rel, NoLock);
577
578
579 if (started_tx)
580 {
581 /*
582 * Even when the two_phase mode is requested by the user, it remains
583 * as 'pending' until all tablesyncs have reached READY state.
584 *
585 * When this happens, we restart the apply worker and (if the
586 * conditions are still ok) then the two_phase tri-state will become
587 * 'enabled' at that time.
588 *
589 * Note: If the subscription has no tables then leave the state as
590 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
591 * work.
592 */
594 {
595 CommandCounterIncrement(); /* make updates visible */
596 if (AllTablesyncsReady())
597 {
598 ereport(LOG,
599 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
601 should_exit = true;
602 }
603 }
604
606 pgstat_report_stat(true);
607 }
608
609 if (should_exit)
610 {
611 /*
612 * Reset the last-start time for this worker so that the launcher will
613 * restart it without waiting for wal_retrieve_retry_interval.
614 */
616
617 proc_exit(0);
618 }
619}
#define Max(x, y)
Definition c.h:1013
int64 TimestampTz
Definition timestamp.h:39
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
void hash_destroy(HTAB *hashp)
Definition dynahash.c:865
#define LOG
Definition elog.h:31
@ HASH_ENTER
Definition hsearch.h:114
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_BLOBS
Definition hsearch.h:97
void proc_exit(int code)
Definition ipc.c:105
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:746
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:258
static dshash_table * last_start_times
Definition launcher.c:91
int logicalrep_sync_worker_count(Oid subid)
Definition launcher.c:927
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1154
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2153
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:447
tree ctl
Definition radixtree.h:1838
Size keysize
Definition hsearch.h:75
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition syncutils.c:118
bool AllTablesyncsReady(void)
Definition tablesync.c:1596
static bool wait_for_table_state_change(Oid relid, char expected_state)
Definition tablesync.c:141
@ WORKERTYPE_TABLESYNC
bool IsTransactionState(void)
Definition xact.c:388

References AccessShareLock, AllTablesyncsReady(), ApplyLauncherForgetWorkerStartTime(), Assert, CommandCounterIncrement(), CommitTransactionCommand(), ctl, ereport, errmsg(), fb(), FetchRelationStates(), get_rel_relkind(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), HASHCTL::keysize, last_start_times, launch_sync_worker(), lfirst, LockSharedObject(), LOG, logicalrep_sync_worker_count(), logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, NoLock, Subscription::oid, pgstat_report_stat(), proc_exit(), SubscriptionRelState::relid, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), RowExclusiveLock, SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_close(), table_open(), table_states_not_ready, Subscription::twophasestate, UpdateSubscriptionRelState(), wait_for_table_state_change(), and WORKERTYPE_TABLESYNC.

Referenced by ProcessSyncingRelations().

◆ ProcessSyncingTablesForSync()

void ProcessSyncingTablesForSync ( XLogRecPtr  current_lsn)

Definition at line 245 of file tablesync.c.

246{
248
251 {
252 TimeLineID tli;
253 char syncslotname[NAMEDATALEN] = {0};
254 char originname[NAMEDATALEN] = {0};
255
258
260
261 /*
262 * UpdateSubscriptionRelState must be called within a transaction.
263 */
264 if (!IsTransactionState())
266
271 false);
272
273 /*
274 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
275 * the slot.
276 */
278
279 /*
280 * Cleanup the tablesync slot.
281 *
282 * This has to be done after updating the state because otherwise if
283 * there is an error while doing the database operations we won't be
284 * able to rollback dropped slot.
285 */
289 sizeof(syncslotname));
290
291 /*
292 * It is important to give an error if we are unable to drop the slot,
293 * otherwise, it won't be dropped till the corresponding subscription
294 * is dropped. So passing missing_ok = false.
295 */
297
299 pgstat_report_stat(false);
300
301 /*
302 * Start a new transaction to clean up the tablesync origin tracking.
303 * This transaction will be ended within the FinishSyncWorker(). Now,
304 * even, if we fail to remove this here, the apply worker will ensure
305 * to clean it up afterward.
306 *
307 * We need to do this after the table state is set to SYNCDONE.
308 * Otherwise, if an error occurs while performing the database
309 * operation, the worker will be restarted and the in-memory state of
310 * replication progress (remote_lsn) won't be rolled-back which would
311 * have been cleared before restart. So, the restarted worker will use
312 * invalid replication progress state resulting in replay of
313 * transactions that have already been applied.
314 */
316
320 sizeof(originname));
321
322 /*
323 * Resetting the origin session removes the ownership of the slot.
324 * This is needed to allow the origin to be dropped.
325 */
328
329 /*
330 * Drop the tablesync's origin tracking if exists.
331 *
332 * There is a chance that the user is concurrently performing refresh
333 * for the subscription where we remove the table state and its origin
334 * or the apply worker would have removed this origin. So passing
335 * missing_ok = true.
336 */
338
340 }
341 else
343}
void replorigin_session_reset(void)
Definition origin.c:1276
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1352
#define walrcv_endstreaming(conn, next_tli)
uint32 TimeLineID
Definition xlogdefs.h:63

References CommitTransactionCommand(), fb(), FinishSyncWorker(), IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_reset(), replorigin_xact_clear(), SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.

Referenced by ProcessSyncingRelations().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char syncslotname,
Size  szslot 
)

Definition at line 1202 of file tablesync.c.

1204{
1205 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1206 relid, GetSystemIdentifier());
1207}
#define UINT64_FORMAT
Definition c.h:577
#define snprintf
Definition port.h:260
uint64 GetSystemIdentifier(void)
Definition xlog.c:4610

References fb(), GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), ProcessSyncingTablesForSync(), and ReportSlotConnectionError().

◆ run_tablesync_worker()

static void run_tablesync_worker ( void  )
static

Definition at line 1550 of file tablesync.c.

1551{
1552 char originname[NAMEDATALEN];
1554 char *slotname = NULL;
1556
1557 start_table_sync(&origin_startpos, &slotname);
1558
1561 originname,
1562 sizeof(originname));
1563
1565
1567
1569
1570 /* Apply the changes till we catchup with the apply worker. */
1572}
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition worker.c:5553
void start_apply(XLogRecPtr origin_startpos)
Definition worker.c:5622
void set_apply_error_context_origin(char *originname)
Definition worker.c:6342
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
Definition tablesync.c:1508
#define walrcv_startstreaming(conn, options)

References fb(), InvalidXLogRecPtr, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, NAMEDATALEN, Subscription::oid, LogicalRepWorker::relid, ReplicationOriginNameForLogicalRep(), set_apply_error_context_origin(), set_stream_options(), start_apply(), start_table_sync(), and walrcv_startstreaming.

Referenced by TableSyncWorkerMain().

◆ start_table_sync()

static void start_table_sync ( XLogRecPtr origin_startpos,
char **  slotname 
)
static

Definition at line 1508 of file tablesync.c.

1509{
1510 char *sync_slotname = NULL;
1511
1513
1514 PG_TRY();
1515 {
1516 /* Call initial sync. */
1518 }
1519 PG_CATCH();
1520 {
1523 else
1524 {
1525 /*
1526 * Report the worker failed during table synchronization. Abort
1527 * the current transaction so that the stats message is sent in an
1528 * idle state.
1529 */
1532
1533 PG_RE_THROW();
1534 }
1535 }
1536 PG_END_TRY();
1537
1538 /* allocate slot name in long-lived context */
1541}
void DisableSubscriptionAndExit(void)
Definition worker.c:5985
MemoryContext ApplyContext
Definition worker.c:473
#define PG_RE_THROW()
Definition elog.h:405
#define PG_TRY(...)
Definition elog.h:372
#define PG_END_TRY(...)
Definition elog.h:397
#define PG_CATCH(...)
Definition elog.h:382
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
void pgstat_report_subscription_error(Oid subid)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition tablesync.c:1218
static bool am_tablesync_worker(void)
void AbortOutOfAnyTransaction(void)
Definition xact.c:4884

References AbortOutOfAnyTransaction(), am_tablesync_worker(), ApplyContext, Assert, Subscription::disableonerr, DisableSubscriptionAndExit(), fb(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MySubscription, Subscription::oid, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by run_tablesync_worker().

◆ TableSyncWorkerMain()

void TableSyncWorkerMain ( Datum  main_arg)

Definition at line 1576 of file tablesync.c.

1577{
1579
1581
1583
1585}
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5925
static int32 DatumGetInt32(Datum X)
Definition postgres.h:212
static void run_tablesync_worker(void)
Definition tablesync.c:1550

References DatumGetInt32(), fb(), FinishSyncWorker(), run_tablesync_worker(), and SetupApplyOrSyncWorker().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1647 of file tablesync.c.

1648{
1649 Relation rel;
1650 HeapTuple tup;
1651 bool nulls[Natts_pg_subscription];
1654
1658
1661 if (!HeapTupleIsValid(tup))
1662 elog(ERROR,
1663 "cache lookup failed for subscription oid %u",
1664 suboid);
1665
1666 /* Form a new tuple. */
1667 memset(values, 0, sizeof(values));
1668 memset(nulls, false, sizeof(nulls));
1669 memset(replaces, false, sizeof(replaces));
1670
1671 /* And update/set two_phase state */
1674
1676 values, nulls, replaces);
1677 CatalogTupleUpdate(rel, &tup->t_self, tup);
1678
1681}
static Datum values[MAXATTR]
Definition bootstrap.c:147
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition htup.h:78
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define RelationGetDescr(relation)
Definition rel.h:540
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91

References Assert, CatalogTupleUpdate(), CharGetDatum(), elog, ERROR, fb(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, table_close(), table_open(), and values.

Referenced by CreateSubscription(), and run_apply_worker().

◆ wait_for_table_state_change()

static bool wait_for_table_state_change ( Oid  relid,
char  expected_state 
)
static

Definition at line 141 of file tablesync.c.

142{
143 char state;
144
145 for (;;)
146 {
147 LogicalRepWorker *worker;
148 XLogRecPtr statelsn;
149
151
154 relid, &statelsn);
155
157 break;
158
159 if (state == expected_state)
160 return true;
161
162 /* Check if the sync worker is still running and bail if not. */
166 false);
168 if (!worker)
169 break;
170
174
176 }
177
178 return false;
179}
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455

References CHECK_FOR_INTERRUPTS, fb(), GetSubscriptionRelState(), InvalidateCatalogSnapshot(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, and WORKERTYPE_TABLESYNC.

Referenced by ProcessSyncingTablesForApply().

◆ wait_for_worker_state_change()

static bool wait_for_worker_state_change ( char  expected_state)
static

Definition at line 190 of file tablesync.c.

191{
192 int rc;
193
194 for (;;)
195 {
196 LogicalRepWorker *worker;
197
199
200 /*
201 * Done if already in correct state. (We assume this fetch is atomic
202 * enough to not give a misleading answer if we do it with no lock.)
203 */
205 return true;
206
207 /*
208 * Bail out if the apply worker has died, else signal it we're
209 * waiting.
210 */
214 false);
215 if (worker && worker->proc)
218 if (!worker)
219 break;
220
221 /*
222 * Wait. We expect to get a latch signal back from the apply worker,
223 * but use a timeout in case it dies without sending one.
224 */
225 rc = WaitLatch(MyLatch,
228
229 if (rc & WL_LATCH_SET)
231 }
232
233 return false;
234}
@ WORKERTYPE_APPLY

References CHECK_FOR_INTERRUPTS, fb(), InvalidOid, logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, LogicalRepWorker::proc, LogicalRepWorker::relstate, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, and WORKERTYPE_APPLY.

Referenced by LogicalRepSyncTableStart().

Variable Documentation

◆ copybuf

◆ table_states_not_ready

List* table_states_not_ready = NIL