PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
tablesync.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for tablesync.c:

Go to the source code of this file.

Enumerations

enum  SyncingTablesState { SYNC_TABLE_STATE_NEEDS_REBUILD , SYNC_TABLE_STATE_REBUILD_STARTED , SYNC_TABLE_STATE_VALID }
 

Functions

static bool FetchTableStates (bool *started_tx)
 
static pg_noreturn void finish_sync_worker (void)
 
static bool wait_for_relation_state_change (Oid relid, char expected_state)
 
static bool wait_for_worker_state_change (char expected_state)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
static void process_syncing_tables_for_sync (XLogRecPtr current_lsn)
 
static void process_syncing_tables_for_apply (XLogRecPtr current_lsn)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
static List * make_copy_attnamelist (LogicalRepRelMapEntry *rel)
 
static int copy_read_data (void *outbuf, int minread, int maxread)
 
static void fetch_remote_table_info (char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
 
static void copy_table (Relation rel)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
static char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
static void start_table_sync (XLogRecPtr *origin_startpos, char **slotname)
 
static void run_tablesync_worker ()
 
void TablesyncWorkerMain (Datum main_arg)
 
bool AllTablesyncsReady (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 

Variables

static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD
 
static List * table_states_not_ready = NIL
 
static StringInfo copybuf = NULL
 

Enumeration Type Documentation

◆ SyncingTablesState

Enumerator
SYNC_TABLE_STATE_NEEDS_REBUILD 
SYNC_TABLE_STATE_REBUILD_STARTED 
SYNC_TABLE_STATE_VALID 

Definition at line 126 of file tablesync.c.

127{
SyncingTablesState
Definition: tablesync.c:127
@ SYNC_TABLE_STATE_REBUILD_STARTED
Definition: tablesync.c:129
@ SYNC_TABLE_STATE_VALID
Definition: tablesync.c:130
@ SYNC_TABLE_STATE_NEEDS_REBUILD
Definition: tablesync.c:128

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1738 of file tablesync.c.

1739{
1740 bool started_tx = false;
1741 bool has_subrels = false;
1742
1743 /* We need up-to-date sync state info for subscription tables here. */
1744 has_subrels = FetchTableStates(&started_tx);
1745
1746 if (started_tx)
1747 {
1748 CommitTransactionCommand();
1749 pgstat_report_stat(true);
1750 }
1751
1752 /*
1753 * Return false when there are no tables in subscription or not all tables
1754 * are in ready state; true otherwise.
1755 */
1756 return has_subrels && (table_states_not_ready == NIL);
1757}
#define NIL
Definition: pg_list.h:68
long pgstat_report_stat(bool force)
Definition: pgstat.c:691
static List * table_states_not_ready
Definition: tablesync.c:134
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1579
void CommitTransactionCommand(void)
Definition: xact.c:3157

References CommitTransactionCommand(), FetchTableStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by pa_can_start(), process_syncing_tables_for_apply(), and run_apply_worker().

◆ copy_read_data()

static int copy_read_data ( void *  outbuf,
int  minread,
int  maxread 
)
static

Definition at line 717 of file tablesync.c.

718{
719 int bytesread = 0;
720 int avail;
721
722 /* If there are some leftover data from previous read, use it. */
723 avail = copybuf->len - copybuf->cursor;
724 if (avail)
725 {
726 if (avail > maxread)
727 avail = maxread;
728 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
729 copybuf->cursor += avail;
730 maxread -= avail;
731 bytesread += avail;
732 }
733
734 while (maxread > 0 && bytesread < minread)
735 {
736 pgsocket fd = PGINVALID_SOCKET;
737 int len;
738 char *buf = NULL;
739
740 for (;;)
741 {
742 /* Try read the data. */
743 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
744
745 CHECK_FOR_INTERRUPTS();
746
747 if (len == 0)
748 break;
749 else if (len < 0)
750 return bytesread;
751 else
752 {
753 /* Process the data */
754 copybuf->data = buf;
755 copybuf->len = len;
756 copybuf->cursor = 0;
757
758 avail = copybuf->len - copybuf->cursor;
759 if (avail > maxread)
760 avail = maxread;
761 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
762 outbuf = (char *) outbuf + avail;
763 copybuf->cursor += avail;
764 maxread -= avail;
765 bytesread += avail;
766 }
767
768 if (maxread <= 0 || bytesread >= minread)
769 return bytesread;
770 }
771
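772 /*
773 * Wait for more data or latch.
774 */
775 (void) WaitLatchOrSocket(MyLatch,
776 WL_SOCKET_READABLE | WL_LATCH_SET |
777 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
778 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
779
780 ResetLatch(MyLatch);
781 }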
782
783 return bytesread;
784}
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:297
struct Latch * MyLatch
Definition: globals.c:64
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:221
void ResetLatch(Latch *latch)
Definition: latch.c:372
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
const void size_t len
static char * buf
Definition: pg_test_fsync.c:72
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static StringInfo copybuf
Definition: tablesync.c:137
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455

References buf, CHECK_FOR_INTERRUPTS, copybuf, StringInfoData::cursor, StringInfoData::data, fd(), StringInfoData::len, len, LogRepWorkerWalRcvConn, MyLatch, PGINVALID_SOCKET, ResetLatch(), WaitLatchOrSocket(), walrcv_receive, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by copy_table().

◆ copy_table()

static void copy_table ( Relation  rel)
static

Definition at line 1114 of file tablesync.c.

1115{
1116 LogicalRepRelMapEntry *relmapentry;
1117 LogicalRepRelation lrel;
1118 List *qual = NIL;
1119 WalRcvExecResult *res;
1120 StringInfoData cmd;
1121 CopyFromState cstate;
1122 List *attnamelist;
1123 ParseState *pstate;
1124 List *options = NIL;
1125 bool gencol_published = false;
1126
1127 /* Get the publisher relation info. */
1129 RelationGetRelationName(rel), &lrel, &qual,
1130 &gencol_published);
1131
1132 /* Put the relation into relmap. */
1134
1135 /* Map the publisher relation to local one. */
1136 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1137 Assert(rel == relmapentry->localrel);
1138
1139 /* Start copy on the publisher. */
1140 initStringInfo(&cmd);
1141
1142 /* Regular table with no row filter or generated columns */
1143 if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
1144 {
1145 appendStringInfo(&cmd, "COPY %s",
1147
1148 /* If the table has columns, then specify the columns */
1149 if (lrel.natts)
1150 {
1151 appendStringInfoString(&cmd, " (");
1152
1153 /*
1154 * XXX Do we need to list the columns in all cases? Maybe we're
1155 * replicating all columns?
1156 */
1157 for (int i = 0; i < lrel.natts; i++)
1158 {
1159 if (i > 0)
1160 appendStringInfoString(&cmd, ", ");
1161
1163 }
1164
1165 appendStringInfoChar(&cmd, ')');
1166 }
1167
1168 appendStringInfoString(&cmd, " TO STDOUT");
1169 }
1170 else
1171 {
1172 /*
1173 * For non-tables and tables with row filters, we need to do COPY
1174 * (SELECT ...), but we can't just do SELECT * because we may need to
1175 * copy only subset of columns including generated columns. For tables
1176 * with any row filters, build a SELECT query with OR'ed row filters
1177 * for COPY.
1178 *
1179 * We also need to use this same COPY (SELECT ...) syntax when
1180 * generated columns are published, because copy of generated columns
1181 * is not supported by the normal COPY.
1182 */
1183 appendStringInfoString(&cmd, "COPY (SELECT ");
1184 for (int i = 0; i < lrel.natts; i++)
1185 {
1187 if (i < lrel.natts - 1)
1188 appendStringInfoString(&cmd, ", ");
1189 }
1190
1191 appendStringInfoString(&cmd, " FROM ");
1192
1193 /*
1194 * For regular tables, make sure we don't copy data from a child that
1195 * inherits the named table as those will be copied separately.
1196 */
1197 if (lrel.relkind == RELKIND_RELATION)
1198 appendStringInfoString(&cmd, "ONLY ");
1199
1201 /* list of OR'ed filters */
1202 if (qual != NIL)
1203 {
1204 ListCell *lc;
1205 char *q = strVal(linitial(qual));
1206
1207 appendStringInfo(&cmd, " WHERE %s", q);
1208 for_each_from(lc, qual, 1)
1209 {
1210 q = strVal(lfirst(lc));
1211 appendStringInfo(&cmd, " OR %s", q);
1212 }
1213 list_free_deep(qual);
1214 }
1215
1216 appendStringInfoString(&cmd, ") TO STDOUT");
1217 }
1218
1219 /*
1220 * Prior to v16, initial table synchronization will use text format even
1221 * if the binary option is enabled for a subscription.
1222 */
1225 {
1226 appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1227 options = list_make1(makeDefElem("format",
1228 (Node *) makeString("binary"), -1));
1229 }
1230
1231 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1232 pfree(cmd.data);
1233 if (res->status != WALRCV_OK_COPY_OUT)
1234 ereport(ERROR,
1235 (errcode(ERRCODE_CONNECTION_FAILURE),
1236 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1237 lrel.nspname, lrel.relname, res->err)));
1239
1241
1242 pstate = make_parsestate(NULL);
1244 NULL, false, false);
1245
1246 attnamelist = make_copy_attnamelist(relmapentry);
1247 cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1248
1249 /* Do the copy */
1250 (void) CopyFrom(cstate);
1251
1252 logicalrep_rel_close(relmapentry, NoLock);
1253}
Subscription * MySubscription
Definition: worker.c:299
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1529
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:779
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
void list_free_deep(List *list)
Definition: list.c:1560
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3506
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition: makefuncs.c:637
void pfree(void *pointer)
Definition: mcxt.c:2146
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define lfirst(lc)
Definition: pg_list.h:172
#define list_make1(x1)
Definition: pg_list.h:212
#define for_each_from(cell, lst, N)
Definition: pg_list.h:414
#define linitial(l)
Definition: pg_list.h:178
#define RelationGetRelationName(relation)
Definition: rel.h:550
#define RelationGetNamespace(relation)
Definition: rel.h:557
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:13103
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13019
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:504
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:349
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: pg_list.h:54
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Definition: nodes.h:135
WalRcvExecStatus status
Definition: walreceiver.h:220
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:697
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
Definition: tablesync.c:796
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:717
String * makeString(char *str)
Definition: value.c:63
#define strVal(v)
Definition: value.h:82
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_server_version(conn)
Definition: walreceiver.h:447
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465

References AccessShareLock, addRangeTableEntryForRelation(), appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert(), LogicalRepRelation::attnames, BeginCopyFrom(), Subscription::binary, copy_read_data(), copybuf, CopyFrom(), StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, fetch_remote_table_info(), for_each_from, get_namespace_name(), i, initStringInfo(), lfirst, linitial, list_free_deep(), list_make1, LogicalRepRelMapEntry::localrel, logicalrep_rel_close(), logicalrep_rel_open(), logicalrep_relmap_update(), LogRepWorkerWalRcvConn, make_copy_attnamelist(), make_parsestate(), makeDefElem(), makeString(), makeStringInfo(), MySubscription, LogicalRepRelation::natts, NIL, NoLock, LogicalRepRelation::nspname, pfree(), quote_identifier(), quote_qualified_identifier(), RelationGetNamespace, RelationGetRelationName, LogicalRepRelation::relkind, LogicalRepRelation::relname, LogicalRepRelation::remoteid, WalRcvExecResult::status, strVal, walrcv_clear_result(), walrcv_exec, WALRCV_OK_COPY_OUT, and walrcv_server_version.

Referenced by LogicalRepSyncTableStart().

◆ fetch_remote_table_info()

static void fetch_remote_table_info ( char *  nspname,
char *  relname,
LogicalRepRelation *  lrel,
List **  qual,
bool *  gencol_published 
)
static

Definition at line 796 of file tablesync.c.

798{
799 WalRcvExecResult *res;
800 StringInfoData cmd;
801 TupleTableSlot *slot;
802 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
803 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
804 Oid qualRow[] = {TEXTOID};
805 bool isnull;
806 int natt;
807 StringInfo pub_names = NULL;
808 Bitmapset *included_cols = NULL;
810
811 lrel->nspname = nspname;
812 lrel->relname = relname;
813
814 /* First fetch Oid and replica identity. */
815 initStringInfo(&cmd);
816 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
817 " FROM pg_catalog.pg_class c"
818 " INNER JOIN pg_catalog.pg_namespace n"
819 " ON (c.relnamespace = n.oid)"
820 " WHERE n.nspname = %s"
821 " AND c.relname = %s",
822 quote_literal_cstr(nspname),
825 lengthof(tableRow), tableRow);
826
827 if (res->status != WALRCV_OK_TUPLES)
829 (errcode(ERRCODE_CONNECTION_FAILURE),
830 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
831 nspname, relname, res->err)));
832
834 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
836 (errcode(ERRCODE_UNDEFINED_OBJECT),
837 errmsg("table \"%s.%s\" not found on publisher",
838 nspname, relname)));
839
840 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
841 Assert(!isnull);
842 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
843 Assert(!isnull);
844 lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
845 Assert(!isnull);
846
849
850
851 /*
852 * Get column lists for each relation.
853 *
854 * We need to do this before fetching info about column names and types,
855 * so that we can skip columns that should not be replicated.
856 */
857 if (server_version >= 150000)
858 {
859 WalRcvExecResult *pubres;
860 TupleTableSlot *tslot;
861 Oid attrsRow[] = {INT2VECTOROID};
862
863 /* Build the pub_names comma-separated string. */
864 pub_names = makeStringInfo();
866
867 /*
868 * Fetch info about column lists for the relation (from all the
869 * publications).
870 */
871 resetStringInfo(&cmd);
872 appendStringInfo(&cmd,
873 "SELECT DISTINCT"
874 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
875 " THEN NULL ELSE gpt.attrs END)"
876 " FROM pg_publication p,"
877 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
878 " pg_class c"
879 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
880 " AND p.pubname IN ( %s )",
881 lrel->remoteid,
882 pub_names->data);
883
885 lengthof(attrsRow), attrsRow);
886
887 if (pubres->status != WALRCV_OK_TUPLES)
889 (errcode(ERRCODE_CONNECTION_FAILURE),
890 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
891 nspname, relname, pubres->err)));
892
893 /*
894 * We don't support the case where the column list is different for
895 * the same table when combining publications. See comments atop
896 * fetch_table_list. So there should be only one row returned.
897 * Although we already checked this when creating the subscription, we
898 * still need to check here in case the column list was changed after
899 * creating the subscription and before the sync worker is started.
900 */
901 if (tuplestore_tuple_count(pubres->tuplestore) > 1)
903 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
904 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
905 nspname, relname));
906
907 /*
908 * Get the column list and build a single bitmap with the attnums.
909 *
910 * If we find a NULL value, it means all the columns should be
911 * replicated.
912 */
914 if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
915 {
916 Datum cfval = slot_getattr(tslot, 1, &isnull);
917
918 if (!isnull)
919 {
920 ArrayType *arr;
921 int nelems;
922 int16 *elems;
923
924 arr = DatumGetArrayTypeP(cfval);
925 nelems = ARR_DIMS(arr)[0];
926 elems = (int16 *) ARR_DATA_PTR(arr);
927
928 for (natt = 0; natt < nelems; natt++)
929 included_cols = bms_add_member(included_cols, elems[natt]);
930 }
931
932 ExecClearTuple(tslot);
933 }
935
936 walrcv_clear_result(pubres);
937 }
938
939 /*
940 * Now fetch column names and types.
941 */
942 resetStringInfo(&cmd);
944 "SELECT a.attnum,"
945 " a.attname,"
946 " a.atttypid,"
947 " a.attnum = ANY(i.indkey)");
948
949 /* Generated columns can be replicated since version 18. */
950 if (server_version >= 180000)
951 appendStringInfoString(&cmd, ", a.attgenerated != ''");
952
953 appendStringInfo(&cmd,
954 " FROM pg_catalog.pg_attribute a"
955 " LEFT JOIN pg_catalog.pg_index i"
956 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
957 " WHERE a.attnum > 0::pg_catalog.int2"
958 " AND NOT a.attisdropped %s"
959 " AND a.attrelid = %u"
960 " ORDER BY a.attnum",
961 lrel->remoteid,
962 (server_version >= 120000 && server_version < 180000 ?
963 "AND a.attgenerated = ''" : ""),
964 lrel->remoteid);
966 server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
967
968 if (res->status != WALRCV_OK_TUPLES)
970 (errcode(ERRCODE_CONNECTION_FAILURE),
971 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
972 nspname, relname, res->err)));
973
974 /* We don't know the number of rows coming, so allocate enough space. */
975 lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
976 lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
977 lrel->attkeys = NULL;
978
979 /*
980 * Store the columns as a list of names. Ignore those that are not
981 * present in the column list, if there is one.
982 */
983 natt = 0;
985 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
986 {
987 char *rel_colname;
989
990 attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
991 Assert(!isnull);
992
993 /* If the column is not in the column list, skip it. */
994 if (included_cols != NULL && !bms_is_member(attnum, included_cols))
995 {
996 ExecClearTuple(slot);
997 continue;
998 }
999
1000 rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1001 Assert(!isnull);
1002
1003 lrel->attnames[natt] = rel_colname;
1004 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1005 Assert(!isnull);
1006
1007 if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1008 lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1009
1010 /* Remember if the remote table has published any generated column. */
1011 if (server_version >= 180000 && !(*gencol_published))
1012 {
1013 *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1014 Assert(!isnull);
1015 }
1016
1017 /* Should never happen. */
1018 if (++natt >= MaxTupleAttributeNumber)
1019 elog(ERROR, "too many columns in remote table \"%s.%s\"",
1020 nspname, relname);
1021
1022 ExecClearTuple(slot);
1023 }
1025
1026 lrel->natts = natt;
1027
1029
1030 /*
1031 * Get relation's row filter expressions. DISTINCT avoids the same
1032 * expression of a table in multiple publications from being included
1033 * multiple times in the final expression.
1034 *
1035 * We need to copy the row even if it matches just one of the
1036 * publications, so we later combine all the quals with OR.
1037 *
1038 * For initial synchronization, row filtering can be ignored in following
1039 * cases:
1040 *
1041 * 1) one of the subscribed publications for the table hasn't specified
1042 * any row filter
1043 *
1044 * 2) one of the subscribed publications has puballtables set to true
1045 *
1046 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1047 * that includes this relation
1048 */
1049 if (server_version >= 150000)
1050 {
1051 /* Reuse the already-built pub_names. */
1052 Assert(pub_names != NULL);
1053
1054 /* Check for row filters. */
1055 resetStringInfo(&cmd);
1056 appendStringInfo(&cmd,
1057 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1058 " FROM pg_publication p,"
1059 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1060 " WHERE gpt.relid = %u"
1061 " AND p.pubname IN ( %s )",
1062 lrel->remoteid,
1063 pub_names->data);
1064
1065 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1066
1067 if (res->status != WALRCV_OK_TUPLES)
1068 ereport(ERROR,
1069 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1070 nspname, relname, res->err)));
1071
1072 /*
1073 * Multiple row filter expressions for the same table will be combined
1074 * by COPY using OR. If any of the filter expressions for this table
1075 * are null, it means the whole table will be copied. In this case it
1076 * is not necessary to construct a unified row filter expression at
1077 * all.
1078 */
1080 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1081 {
1082 Datum rf = slot_getattr(slot, 1, &isnull);
1083
1084 if (!isnull)
1085 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1086 else
1087 {
1088 /* Ignore filters and cleanup as necessary. */
1089 if (*qual)
1090 {
1091 list_free_deep(*qual);
1092 *qual = NIL;
1093 }
1094 break;
1095 }
1096
1097 ExecClearTuple(slot);
1098 }
1100
1102 destroyStringInfo(pub_names);
1103 }
1104
1105 pfree(cmd.data);
1106}
#define ARR_DATA_PTR(a)
Definition: array.h:322
#define DatumGetArrayTypeP(X)
Definition: array.h:261
#define ARR_DIMS(a)
Definition: array.h:294
int16 AttrNumber
Definition: attnum.h:21
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
#define TextDatumGetCString(d)
Definition: builtins.h:98
int16_t int16
Definition: c.h:497
#define lengthof(array)
Definition: c.h:759
#define elog(elevel,...)
Definition: elog.h:226
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1427
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1443
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
List * lappend(List *list, void *datum)
Definition: list.c:339
void * palloc0(Size size)
Definition: mcxt.c:1969
int16 attnum
Definition: pg_attribute.h:74
NameData relname
Definition: pg_class.h:38
static int server_version
Definition: pg_dumpall.c:113
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
static bool DatumGetBool(Datum X)
Definition: postgres.h:95
uintptr_t Datum
Definition: postgres.h:69
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:247
static char DatumGetChar(Datum X)
Definition: postgres.h:117
static int16 DatumGetInt16(Datum X)
Definition: postgres.h:167
unsigned int Oid
Definition: postgres_ext.h:30
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
void destroyStringInfo(StringInfo str)
Definition: stringinfo.c:409
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
Bitmapset * attkeys
Definition: logicalproto.h:115
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:580
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:399
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:458
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207

References appendStringInfo(), appendStringInfoString(), ARR_DATA_PTR, ARR_DIMS, Assert(), LogicalRepRelation::attkeys, LogicalRepRelation::attnames, attnum, LogicalRepRelation::atttyps, bms_add_member(), bms_is_member(), StringInfoData::data, DatumGetArrayTypeP, DatumGetBool(), DatumGetChar(), DatumGetInt16(), DatumGetObjectId(), destroyStringInfo(), elog, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), GetPublicationsStr(), initStringInfo(), lappend(), lengthof, list_free_deep(), LogRepWorkerWalRcvConn, MakeSingleTupleTableSlot(), makeString(), makeStringInfo(), MaxTupleAttributeNumber, MySubscription, LogicalRepRelation::natts, NIL, LogicalRepRelation::nspname, palloc0(), pfree(), Subscription::publications, quote_literal_cstr(), LogicalRepRelation::relkind, relname, LogicalRepRelation::relname, LogicalRepRelation::remoteid, LogicalRepRelation::replident, resetStringInfo(), server_version, slot_getattr(), WalRcvExecResult::status, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), tuplestore_tuple_count(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and walrcv_server_version.

Referenced by copy_table().

◆ FetchTableStates()

static bool FetchTableStates ( bool *  started_tx)
static

Definition at line 1579 of file tablesync.c.

1580{
1581 static bool has_subrels = false;
1582
1583 *started_tx = false;
1584
1585 if (table_states_validity != SYNC_TABLE_STATE_VALID)
1586 {
1587 MemoryContext oldctx;
1588 List *rstates;
1589 ListCell *lc;
1590 SubscriptionRelState *rstate;
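1591
1592 table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
1593
1594 /* Clean the old lists. */
1595 list_free_deep(table_states_not_ready);
1596 table_states_not_ready = NIL;
1597
1598 if (!IsTransactionState())
1599 {
1600 StartTransactionCommand();
1601 *started_tx = true;
1602 }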
1603
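1604 /* Fetch all non-ready tables. */
1605 rstates = GetSubscriptionRelations(MySubscription->oid, true);
1606
1607 /* Allocate the tracking info in a permanent memory context. */
1608 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1609 foreach(lc, rstates)
1610 {
1611 rstate = palloc(sizeof(SubscriptionRelState));
1612 memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1613 table_states_not_ready = lappend(table_states_not_ready, rstate);
1614 }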
1615 MemoryContextSwitchTo(oldctx);
1616
1617 /*
1618 * Does the subscription have tables?
1619 *
1620 * If there were not-READY relations found then we know it does. But
1621 * if table_states_not_ready was empty we still need to check again to
1622 * see if there are 0 tables.
1623 */
1624 has_subrels = (table_states_not_ready != NIL) ||
1625 HasSubscriptionRelations(MySubscription->oid);
1626
1627 /*
1628 * If the subscription relation cache has been invalidated since we
1629 * entered this routine, we still use and return the relations we just
1630 * finished constructing, to avoid infinite loops, but we leave the
1631 * table states marked as stale so that we'll rebuild it again on next
1632 * access. Otherwise, we mark the table states as valid.
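1633 */
1634 if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
1635 table_states_validity = SYNC_TABLE_STATE_VALID;
1636 }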
1637
1638 return has_subrels;
1639}
void * palloc(Size size)
Definition: mcxt.c:1939
MemoryContext CacheMemoryContext
Definition: mcxt.c:168
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
List * GetSubscriptionRelations(Oid subid, bool not_ready)
bool HasSubscriptionRelations(Oid subid)
static SyncingTablesState table_states_validity
Definition: tablesync.c:133
bool IsTransactionState(void)
Definition: xact.c:387
void StartTransactionCommand(void)
Definition: xact.c:3059

References CacheMemoryContext, GetSubscriptionRelations(), HasSubscriptionRelations(), IsTransactionState(), lappend(), lfirst, list_free_deep(), MemoryContextSwitchTo(), MySubscription, NIL, Subscription::oid, palloc(), StartTransactionCommand(), SYNC_TABLE_STATE_REBUILD_STARTED, SYNC_TABLE_STATE_VALID, table_states_not_ready, and table_states_validity.

Referenced by AllTablesyncsReady(), and process_syncing_tables_for_apply().

◆ finish_sync_worker()

static pg_noreturn void finish_sync_worker ( void  )
static

Definition at line 143 of file tablesync.c.

144{
145 /*
146 * Commit any outstanding transaction. This is the usual case, unless
147 * there was nothing to do for the table.
148 */
149 if (IsTransactionState())
150 {
151 CommitTransactionCommand();
152 pgstat_report_stat(true);
153 }
154
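155 /* And flush all writes. */
156 XLogFlush(GetXLogWriteRecPtr());
157
158 StartTransactionCommand();
159 ereport(LOG,
160 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
161 MySubscription->name,
162 get_rel_name(MyLogicalRepWorker->relid))));
163 CommitTransactionCommand();
164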
165 /* Find the leader apply worker and signal it. */
166 logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
167
168 /* Stop gracefully */
169 proc_exit(0);
170}
#define LOG
Definition: elog.h:31
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:673
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:54
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2068
#define InvalidOid
Definition: postgres_ext.h:35
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9628
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2923

References CommitTransactionCommand(), ereport, errmsg(), get_rel_name(), GetXLogWriteRecPtr(), InvalidOid, IsTransactionState(), LOG, logicalrep_worker_wakeup(), MyLogicalRepWorker, MySubscription, Subscription::name, pgstat_report_stat(), proc_exit(), LogicalRepWorker::relid, StartTransactionCommand(), LogicalRepWorker::subid, and XLogFlush().

Referenced by LogicalRepSyncTableStart(), process_syncing_tables_for_sync(), and TablesyncWorkerMain().

◆ invalidate_syncing_table_states()

void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

◆ LogicalRepSyncTableStart()

static char * LogicalRepSyncTableStart ( XLogRecPtr * origin_startpos)
static

Definition at line 1289 of file tablesync.c.

/*
 * Start syncing the table in the sync worker.
 *
 * Reads the table's sync state, connects to the publisher, and (unless the
 * copy was already finished by an earlier run) creates the tablesync slot
 * and replication origin, copies the table, and records FINISHEDCOPY.  It
 * then publishes SYNCWAIT in shared memory and waits for the leader apply
 * worker to switch this worker to CATCHUP.
 *
 * *origin_startpos is set to the LSN at which streaming should start: either
 * the one returned by slot creation, or the origin's saved progress when
 * resuming after FINISHEDCOPY.
 *
 * Returns the palloc'd name of the tablesync replication slot.  Exits via
 * finish_sync_worker() instead of returning when the table is already
 * SYNCDONE, READY or UNKNOWN.
 */
1290{
1291    char       *slotname;
1292    char       *err;
1293    char        relstate;
1294    XLogRecPtr  relstate_lsn;
1295    Relation    rel;
1296    AclResult   aclresult;
1297    WalRcvExecResult *res;
1298    char        originname[NAMEDATALEN];
1299    RepOriginId originid;
1300    UserContext ucxt;
1301    bool        must_use_password;
1302    bool        run_as_owner;
1303
1304    /* Check the state of the table synchronization. */
1305    StartTransactionCommand();
1306    relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1307                                       MyLogicalRepWorker->relid,
1308                                       &relstate_lsn);
1309    CommitTransactionCommand();
1310
1311    /* Is the use of a password mandatory? */
1312    must_use_password = MySubscription->passwordrequired &&
1313        !MySubscription->ownersuperuser;
1314
1315    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1316    MyLogicalRepWorker->relstate = relstate;
1317    MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1318    SpinLockRelease(&MyLogicalRepWorker->relmutex);
1319
1320    /*
1321     * If synchronization is already done or no longer necessary, exit now
1322     * that we've updated shared memory state.
1323     */
1324    switch (relstate)
1325    {
1326        case SUBREL_STATE_SYNCDONE:
1327        case SUBREL_STATE_READY:
1328        case SUBREL_STATE_UNKNOWN:
1329            finish_sync_worker();   /* doesn't return */
1330    }
1331
1332    /* Calculate the name of the tablesync slot. */
1333    slotname = (char *) palloc(NAMEDATALEN);
1334    ReplicationSlotNameForTablesync(MySubscription->oid,
1335                                    MyLogicalRepWorker->relid,
1336                                    slotname,
1337                                    NAMEDATALEN);
1338
1339    /*
1340     * Here we use the slot name instead of the subscription name as the
1341     * application_name, so that it is different from the leader apply worker,
1342     * so that synchronous replication can distinguish them.
1343     */
1344    LogRepWorkerWalRcvConn =
1345        walrcv_connect(MySubscription->conninfo, true, true,
1346                       must_use_password,
1347                       slotname, &err);
1348    if (LogRepWorkerWalRcvConn == NULL)
1349        ereport(ERROR,
1350                (errcode(ERRCODE_CONNECTION_FAILURE),
1351                 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1352                        MySubscription->name, err)));
1353
1354    Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1355           MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1356           MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1357
1358    /* Assign the origin tracking record name. */
1359    ReplicationOriginNameForLogicalRep(MySubscription->oid,
1360                                       MyLogicalRepWorker->relid,
1361                                       originname,
1362                                       sizeof(originname));
1363
1364    if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1365    {
1366        /*
1367         * We have previously errored out before finishing the copy so the
1368         * replication slot might exist. We want to remove the slot if it
1369         * already exists and proceed.
1370         *
1371         * XXX We could also instead try to drop the slot, last time we failed
1372         * but for that, we might need to clean up the copy state as it might
1373         * be in the middle of fetching the rows. Also, if there is a network
1374         * breakdown then it wouldn't have succeeded so trying it next time
1375         * seems like a better bet.
1376         */
1377        ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1378    }
1379    else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1380    {
1381        /*
1382         * The COPY phase was previously done, but tablesync then crashed
1383         * before it was able to finish normally.
1384         */
1385        StartTransactionCommand();
1386
1387        /*
1388         * The origin tracking name must already exist. It was created first
1389         * time this tablesync was launched.
1390         */
1391        originid = replorigin_by_name(originname, false);
1392        replorigin_session_setup(originid, 0);
1393        replorigin_session_origin = originid;
1394        *origin_startpos = replorigin_session_get_progress(false);
1395
1396        CommitTransactionCommand();
1397
1398        goto copy_table_done;
1399    }
1400
1401    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1402    MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1403    MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1404    SpinLockRelease(&MyLogicalRepWorker->relmutex);
1405
1406    /* Update the state and make it visible to others. */
1407    StartTransactionCommand();
1408    UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1409                               MyLogicalRepWorker->relid,
1410                               MyLogicalRepWorker->relstate,
1411                               MyLogicalRepWorker->relstate_lsn);
1412    CommitTransactionCommand();
1413    pgstat_report_stat(true);
1414
1415    StartTransactionCommand();
1416
1417    /*
1418     * Use a standard write lock here. It might be better to disallow access
1419     * to the table while it's being synchronized. But we don't want to block
1420     * the main apply process from working and it has to open the relation in
1421     * RowExclusiveLock when remapping remote relation id to local one.
1422     */
1423    rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1424
1425    /*
1426     * Start a transaction in the remote node in REPEATABLE READ mode. This
1427     * ensures that both the replication slot we create (see below) and the
1428     * COPY are consistent with each other.
1429     */
1430    res = walrcv_exec(LogRepWorkerWalRcvConn,
1431                      "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1432                      0, NULL);
1433    if (res->status != WALRCV_OK_COMMAND)
1434        ereport(ERROR,
1435                (errcode(ERRCODE_CONNECTION_FAILURE),
1436                 errmsg("table copy could not start transaction on publisher: %s",
1437                        res->err)));
1438    walrcv_clear_result(res);
1439
1440    /*
1441     * Create a new permanent logical decoding slot. This slot will be used
1442     * for the catchup phase after COPY is done, so tell it to use the
1443     * snapshot to make the final data consistent.
1444     */
1445    walrcv_create_slot(LogRepWorkerWalRcvConn,
1446                       slotname, false /* permanent */ , false /* two_phase */ ,
1447                       MySubscription->failover,
1448                       CRS_USE_SNAPSHOT, origin_startpos);
1449
1450    /*
1451     * Setup replication origin tracking. The purpose of doing this before the
1452     * copy is to avoid doing the copy again due to any error in setting up
1453     * origin tracking.
1454     */
1455    originid = replorigin_by_name(originname, true);
1456    if (!OidIsValid(originid))
1457    {
1458        /*
1459         * Origin tracking does not exist, so create it now.
1460         *
1461         * Then advance to the LSN got from walrcv_create_slot. This is WAL
1462         * logged for the purpose of recovery. Locks are to prevent the
1463         * replication origin from vanishing while advancing.
1464         */
1465        originid = replorigin_create(originname);
1466
1467        LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1468        replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1469                           true /* go backward */ , true /* WAL log */ );
1470        UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1471
1472        replorigin_session_setup(originid, 0);
1473        replorigin_session_origin = originid;
1474    }
1475    else
1476    {
1477        ereport(ERROR,
1478                (errcode(ERRCODE_DUPLICATE_OBJECT),
1479                 errmsg("replication origin \"%s\" already exists",
1480                        originname)));
1481    }
1482
1483    /*
1484     * Make sure that the copy command runs as the table owner, unless the
1485     * user has opted out of that behaviour.
1486     */
1487    run_as_owner = MySubscription->runasowner;
1488    if (!run_as_owner)
1489        SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1490
1491    /*
1492     * Check that our table sync worker has permission to insert into the
1493     * target table.
1494     */
1495    aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1496                                  ACL_INSERT);
1497    if (aclresult != ACLCHECK_OK)
1498        aclcheck_error(aclresult,
1499                       get_relkind_objtype(rel->rd_rel->relkind),
1500                       RelationGetRelationName(rel));
1501
1502    /*
1503     * COPY FROM does not honor RLS policies. That is not a problem for
1504     * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1505     * who has it implicitly), but other roles should not be able to
1506     * circumvent RLS. Disallow logical replication into RLS enabled
1507     * relations for such roles.
1508     */
1509    if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
1510        ereport(ERROR,
1511                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1512                 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1513                        GetUserNameFromId(GetUserId(), true),
1514                        RelationGetRelationName(rel))));
1515
1516    /* Now do the initial data copy */
1517    PushActiveSnapshot(GetTransactionSnapshot());
1518    copy_table(rel);
1519    PopActiveSnapshot();
1520
1521    res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1522    if (res->status != WALRCV_OK_COMMAND)
1523        ereport(ERROR,
1524                (errcode(ERRCODE_CONNECTION_FAILURE),
1525                 errmsg("table copy could not finish transaction on publisher: %s",
1526                        res->err)));
1527    walrcv_clear_result(res);
1528
1529    if (!run_as_owner)
1530        RestoreUserContext(&ucxt);
1531
1532    table_close(rel, NoLock);
1533
1534    /* Make the copy visible. */
1535    CommandCounterIncrement();
1536
1537    /*
1538     * Update the persisted state to indicate the COPY phase is done; make it
1539     * visible to others.
1540     */
1541    UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1542                               MyLogicalRepWorker->relid,
1543                               SUBREL_STATE_FINISHEDCOPY,
1544                               MyLogicalRepWorker->relstate_lsn);
1545
1546    CommitTransactionCommand();
1547
1548copy_table_done:
1549
1550    elog(DEBUG1,
1551         "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1552         originname, LSN_FORMAT_ARGS(*origin_startpos));
1553
1554    /*
1555     * We are done with the initial data synchronization, update the state.
1556     */
1557    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1558    MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1559    MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1560    SpinLockRelease(&MyLogicalRepWorker->relmutex);
1561
1562    /*
1563     * Finally, wait until the leader apply worker tells us to catch up and
1564     * then return to let LogicalRepApplyLoop do it.
1565     */
1566    wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1567    return slotname;
1568}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2639
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4024
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:426
#define OidIsValid(objectId)
Definition: c.h:746
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:107
#define RowExclusiveLock
Definition: lockdefs.h:38
Oid GetUserId(void)
Definition: miscinit.c:520
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:1039
ObjectType get_relkind_objtype(char relkind)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
RepOriginId replorigin_session_origin
Definition: origin.c:163
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:888
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1097
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1237
#define ACL_INSERT
Definition: parsenodes.h:76
#define NAMEDATALEN
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define RelationGetRelid(relation)
Definition: rel.h:516
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:271
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:669
void PopActiveSnapshot(void)
Definition: snapmgr.c:762
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
XLogRecPtr relstate_lsn
Form_pg_class rd_rel
Definition: rel.h:111
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:231
static pg_noreturn void finish_sync_worker(void)
Definition: tablesync.c:143
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1273
static void copy_table(Relation rel)
Definition: tablesync.c:1114
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
void CommandCounterIncrement(void)
Definition: xact.c:1100
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert(), check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog, ereport, WalRcvExecResult::err, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, Subscription::failover, finish_sync_worker(), get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, Subscription::ownersuperuser, palloc(), Subscription::passwordrequired, pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), RestoreUserContext(), RLS_ENABLED, RowExclusiveLock, Subscription::runasowner, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), WalRcvExecResult::status, LogicalRepWorker::subid, SwitchToUntrustedUser(), table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.

Referenced by start_table_sync().

◆ make_copy_attnamelist()

static List * make_copy_attnamelist ( LogicalRepRelMapEntry * rel)
static

Definition at line 697 of file tablesync.c.

/*
 * Build a List of String nodes, one per remote attribute name in
 * rel->remoterel.attnames (natts entries, in order), and return it.
 * Returns NIL when the remote relation has no attributes.
 */
698{
699    List       *attnamelist = NIL;
700    int         i;
701
702    for (i = 0; i < rel->remoterel.natts; i++)
703    {
704        attnamelist = lappend(attnamelist,
705                              makeString(rel->remoterel.attnames[i]));
706    }
707
708
709    return attnamelist;
710}
LogicalRepRelation remoterel

References LogicalRepRelation::attnames, i, lappend(), makeString(), LogicalRepRelation::natts, NIL, and LogicalRepRelMapEntry::remoterel.

Referenced by copy_table().

◆ process_syncing_tables()

void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 666 of file tablesync.c.

/*
 * Process possible state change(s) of tables that are being synchronized,
 * dispatching on the type of the current logical replication worker:
 * tablesync workers and apply workers each have their own handler, and
 * parallel apply workers do nothing.  current_lsn is passed through to the
 * handler unchanged.
 */
667{
668    switch (MyLogicalRepWorker->type)
669    {
670        case WORKERTYPE_PARALLEL_APPLY:
671
672            /*
673             * Skip for parallel apply workers because they only operate on
674             * tables that are in a READY state. See pa_can_start() and
675             * should_apply_changes_for_rel().
676             */
677            break;
678
679        case WORKERTYPE_TABLESYNC:
680            process_syncing_tables_for_sync(current_lsn);
681            break;
682
683        case WORKERTYPE_APPLY:
684            process_syncing_tables_for_apply(current_lsn);
685            break;
686
687        case WORKERTYPE_UNKNOWN:
688            /* Should never happen. */
689            elog(ERROR, "Unknown worker type");
690    }
691}
LogicalRepWorkerType type
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:417
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:294
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY

References elog, ERROR, MyLogicalRepWorker, process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

◆ process_syncing_tables_for_apply()

static void process_syncing_tables_for_apply ( XLogRecPtr  current_lsn)
static

Definition at line 417 of file tablesync.c.

/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walks table_states_not_ready.  For a table in SYNCDONE whose LSN the apply
 * worker has reached (current_lsn >= rstate->lsn), drops the tablesync
 * origin and records the table as READY.  For any other table, looks for
 * its sync worker: if one exists and is in SYNCWAIT, switches it to CATCHUP,
 * wakes it and waits for SYNCDONE; if none exists and the per-subscription
 * limit allows, launches one, rate-limited per table by
 * wal_retrieve_retry_interval using the static last_start_times hash.
 *
 * If two_phase is still pending and all tablesyncs are ready, the worker
 * exits so that it is restarted with two_phase enabled.
 *
 * Must be called outside of a transaction (asserted on entry).
 */
418{
419    struct tablesync_start_time_mapping
420    {
421        Oid         relid;
422        TimestampTz last_start_time;
423    };
424    static HTAB *last_start_times = NULL;
425    ListCell   *lc;
426    bool        started_tx = false;
427    bool        should_exit = false;
428
429    Assert(!IsTransactionState());
430
431    /* We need up-to-date sync state info for subscription tables here. */
432    FetchTableStates(&started_tx);
433
434    /*
435     * Prepare a hash table for tracking last start times of workers, to avoid
436     * immediate restarts. We don't need it if there are no tables that need
437     * syncing.
438     */
439    if (table_states_not_ready != NIL && !last_start_times)
440    {
441        HASHCTL     ctl;
442
443        ctl.keysize = sizeof(Oid);
444        ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
445        last_start_times = hash_create("Logical replication table sync worker start times",
446                                       256, &ctl, HASH_ELEM | HASH_BLOBS);
447    }
448
449    /*
450     * Clean up the hash table when we're done with all tables (just to
451     * release the bit of memory).
452     */
453    else if (table_states_not_ready == NIL && last_start_times)
454    {
455        hash_destroy(last_start_times);
456        last_start_times = NULL;
457    }
458
459    /*
460     * Process all tables that are being synchronized.
461     */
462    foreach(lc, table_states_not_ready)
463    {
464        SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
465
466        if (rstate->state == SUBREL_STATE_SYNCDONE)
467        {
468            /*
469             * Apply has caught up to the position where the table sync has
470             * finished. Mark the table as ready so that the apply will just
471             * continue to replicate it normally.
472             */
473            if (current_lsn >= rstate->lsn)
474            {
475                char        originname[NAMEDATALEN];
476
477                rstate->state = SUBREL_STATE_READY;
478                rstate->lsn = current_lsn;
479                if (!started_tx)
480                {
481                    StartTransactionCommand();
482                    started_tx = true;
483                }
484
485                /*
486                 * Remove the tablesync origin tracking if exists.
487                 *
488                 * There is a chance that the user is concurrently performing
489                 * refresh for the subscription where we remove the table
490                 * state and its origin or the tablesync worker would have
491                 * already removed this origin. We can't rely on tablesync
492                 * worker to remove the origin tracking as if there is any
493                 * error while dropping we won't restart it to drop the
494                 * origin. So passing missing_ok = true.
495                 */
496                ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
497                                                   rstate->relid,
498                                                   originname,
499                                                   sizeof(originname));
500                replorigin_drop_by_name(originname, true, false);
501
502                /*
503                 * Update the state to READY only after the origin cleanup.
504                 */
505                UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
506                                           rstate->relid, rstate->state,
507                                           rstate->lsn);
508            }
509        }
510        else
511        {
512            LogicalRepWorker *syncworker;
513
514            /*
515             * Look for a sync worker for this relation.
516             */
517            LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
518
519            syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
520                                                rstate->relid, false);
521
522            if (syncworker)
523            {
524                /* Found one, update our copy of its state */
525                SpinLockAcquire(&syncworker->relmutex);
526                rstate->state = syncworker->relstate;
527                rstate->lsn = syncworker->relstate_lsn;
528                if (rstate->state == SUBREL_STATE_SYNCWAIT)
529                {
530                    /*
531                     * Sync worker is waiting for apply. Tell sync worker it
532                     * can catchup now.
533                     */
534                    syncworker->relstate = SUBREL_STATE_CATCHUP;
535                    syncworker->relstate_lsn =
536                        Max(syncworker->relstate_lsn, current_lsn);
537                }
538                SpinLockRelease(&syncworker->relmutex);
539
540                /* If we told worker to catch up, wait for it. */
541                if (rstate->state == SUBREL_STATE_SYNCWAIT)
542                {
543                    /* Signal the sync worker, as it may be waiting for us. */
544                    if (syncworker->proc)
545                        logicalrep_worker_wakeup_ptr(syncworker);
546
547                    /* Now safe to release the LWLock */
548                    LWLockRelease(LogicalRepWorkerLock);
549
550                    if (started_tx)
551                    {
552                        /*
553                         * We must commit the existing transaction to release
554                         * the existing locks before entering a busy loop.
555                         * This is required to avoid any undetected deadlocks
556                         * due to any existing lock as deadlock detector won't
557                         * be able to detect the waits on the latch.
558                         */
559                        CommitTransactionCommand();
560                        pgstat_report_stat(false);
561                    }
562
563                    /*
564                     * Enter busy loop and wait for synchronization worker to
565                     * reach expected state (or die trying).
566                     */
567                    StartTransactionCommand();
568                    started_tx = true;
569
570                    wait_for_relation_state_change(rstate->relid,
571                                                   SUBREL_STATE_SYNCDONE);
572                }
573                else
574                    LWLockRelease(LogicalRepWorkerLock);
575            }
576            else
577            {
578                /*
579                 * If there is no sync worker for this table yet, count
580                 * running sync workers for this subscription, while we have
581                 * the lock.
582                 */
583                int         nsyncworkers =
584                    logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
585
586                /* Now safe to release the LWLock */
587                LWLockRelease(LogicalRepWorkerLock);
588
589                /*
590                 * If there are free sync worker slot(s), start a new sync
591                 * worker for the table.
592                 */
593                if (nsyncworkers < max_sync_workers_per_subscription)
594                {
595                    TimestampTz now = GetCurrentTimestamp();
596                    struct tablesync_start_time_mapping *hentry;
597                    bool        found;
598
599                    hentry = hash_search(last_start_times, &rstate->relid,
600                                         HASH_ENTER, &found);
601
602                    if (!found ||
603                        TimestampDifferenceExceeds(hentry->last_start_time, now,
604                                                   wal_retrieve_retry_interval))
605                    {
606                        logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
607                                                 MyLogicalRepWorker->dbid,
608                                                 MySubscription->oid,
609                                                 MySubscription->name,
610                                                 MyLogicalRepWorker->userid,
611                                                 rstate->relid,
612                                                 DSM_HANDLE_INVALID);
613                        hentry->last_start_time = now;
614                    }
615                }
616            }
617        }
618    }
619
620    if (started_tx)
621    {
622        /*
623         * Even when the two_phase mode is requested by the user, it remains
624         * as 'pending' until all tablesyncs have reached READY state.
625         *
626         * When this happens, we restart the apply worker and (if the
627         * conditions are still ok) then the two_phase tri-state will become
628         * 'enabled' at that time.
629         *
630         * Note: If the subscription has no tables then leave the state as
631         * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
632         * work.
633         */
634        if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
635        {
636            CommandCounterIncrement();  /* make updates visible */
637            if (AllTablesyncsReady())
638            {
639                ereport(LOG,
640                        (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
641                                MySubscription->name)));
642                should_exit = true;
643            }
644        }
645
646        CommitTransactionCommand();
647        pgstat_report_stat(true);
648    }
649
650    if (should_exit)
651    {
652        /*
653         * Reset the last-start time for this worker so that the launcher will
654         * restart it without waiting for wal_retrieve_retry_interval.
655         */
656        ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
657
658        proc_exit(0);
659    }
660}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define Max(x, y)
Definition: c.h:969
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:297
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:693
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:234
static dshash_table * last_start_times
Definition: launcher.c:89
int max_sync_workers_per_subscription
Definition: launcher.c:51
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:845
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1072
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_SHARED
Definition: lwlock.h:115
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:416
tree ctl
Definition: radixtree.h:1838
Definition: dynahash.c:220
bool AllTablesyncsReady(void)
Definition: tablesync.c:1738
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:183
int wal_retrieve_retry_interval
Definition: xlog.c:134

References AllTablesyncsReady(), ApplyLauncherForgetWorkerStartTime(), Assert(), CommandCounterIncrement(), CommitTransactionCommand(), ctl, LogicalRepWorker::dbid, DSM_HANDLE_INVALID, ereport, errmsg(), FetchTableStates(), GetCurrentTimestamp(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), last_start_times, lfirst, LOG, logicalrep_sync_worker_count(), logicalrep_worker_find(), logicalrep_worker_launch(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, max_sync_workers_per_subscription, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, now(), Subscription::oid, pgstat_report_stat(), LogicalRepWorker::proc, proc_exit(), SubscriptionRelState::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_states_not_ready, TimestampDifferenceExceeds(), Subscription::twophasestate, UpdateSubscriptionRelState(), LogicalRepWorker::userid, wait_for_relation_state_change(), wal_retrieve_retry_interval, and WORKERTYPE_TABLESYNC.

Referenced by process_syncing_tables().

◆ process_syncing_tables_for_sync()

static void process_syncing_tables_for_sync ( XLogRecPtr  current_lsn)
static

Definition at line 294 of file tablesync.c.

/*
 * Handle table synchronization cooperation from the synchronization worker.
 *
 * If this worker is in CATCHUP and has reached the LSN the apply worker
 * asked for (current_lsn >= relstate_lsn), mark the table SYNCDONE in shared
 * memory and in the catalog, end streaming, drop the tablesync slot on the
 * publisher, drop the tablesync origin, and exit via finish_sync_worker().
 * Otherwise only the spinlock taken for the check is released.
 */
295{
296    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
297
298    if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
299        current_lsn >= MyLogicalRepWorker->relstate_lsn)
300    {
301        TimeLineID  tli;
302        char        syncslotname[NAMEDATALEN] = {0};
303        char        originname[NAMEDATALEN] = {0};
304
305        MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
306        MyLogicalRepWorker->relstate_lsn = current_lsn;
307
308        SpinLockRelease(&MyLogicalRepWorker->relmutex);
309
310        /*
311         * UpdateSubscriptionRelState must be called within a transaction.
312         */
313        if (!IsTransactionState())
314            StartTransactionCommand();
315
316        UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
317                                   MyLogicalRepWorker->relid,
318                                   MyLogicalRepWorker->relstate,
319                                   MyLogicalRepWorker->relstate_lsn);
320
321        /*
322         * End streaming so that LogRepWorkerWalRcvConn can be used to drop
323         * the slot.
324         */
325        walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
326
327        /*
328         * Cleanup the tablesync slot.
329         *
330         * This has to be done after updating the state because otherwise if
331         * there is an error while doing the database operations we won't be
332         * able to rollback dropped slot.
333         */
334        ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
335                                        MyLogicalRepWorker->relid,
336                                        syncslotname,
337                                        sizeof(syncslotname));
338
339        /*
340         * It is important to give an error if we are unable to drop the slot,
341         * otherwise, it won't be dropped till the corresponding subscription
342         * is dropped. So passing missing_ok = false.
343         */
344        ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
345
346        CommitTransactionCommand();
347        pgstat_report_stat(false);
348
349        /*
350         * Start a new transaction to clean up the tablesync origin tracking.
351         * This transaction will be ended within the finish_sync_worker().
352         * Now, even, if we fail to remove this here, the apply worker will
353         * ensure to clean it up afterward.
354         *
355         * We need to do this after the table state is set to SYNCDONE.
356         * Otherwise, if an error occurs while performing the database
357         * operation, the worker will be restarted and the in-memory state of
358         * replication progress (remote_lsn) won't be rolled-back which would
359         * have been cleared before restart. So, the restarted worker will use
360         * invalid replication progress state resulting in replay of
361         * transactions that have already been applied.
362         */
363        StartTransactionCommand();
364
365        ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
366                                           MyLogicalRepWorker->relid,
367                                           originname,
368                                           sizeof(originname));
369
370        /*
371         * Resetting the origin session removes the ownership of the slot.
372         * This is needed to allow the origin to be dropped.
373         */
374        replorigin_session_reset();
375        replorigin_session_origin = InvalidRepOriginId;
376        replorigin_session_origin_lsn = InvalidXLogRecPtr;
377        replorigin_session_origin_timestamp = 0;
378
379        /*
380         * Drop the tablesync's origin tracking if exists.
381         *
382         * There is a chance that the user is concurrently performing refresh
383         * for the subscription where we remove the table state and its origin
384         * or the apply worker would have removed this origin. So passing
385         * missing_ok = true.
386         */
387        replorigin_drop_by_name(originname, true, false);
388
389        finish_sync_worker();
390    }
391    else
392        SpinLockRelease(&MyLogicalRepWorker->relmutex);
393}
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
void replorigin_session_reset(void)
Definition: origin.c:1190
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
#define InvalidRepOriginId
Definition: origin.h:33
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
uint32 TimeLineID
Definition: xlogdefs.h:59

References CommitTransactionCommand(), finish_sync_worker(), InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, replorigin_session_reset(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.

Referenced by process_syncing_tables().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char *  syncslotname,
Size  szslot 
)

Definition at line 1273 of file tablesync.c.

/*
 * Write the tablesync slot name for (suboid, relid) into syncslotname.
 *
 * The name has the form "pg_<suboid>_sync_<relid>_<system identifier>".
 * snprintf() bounds the write to szslot bytes, so a too-small buffer
 * yields a truncated name rather than an overflow.
 */
1275{
1276 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1277 relid, GetSystemIdentifier());
1278}
#define UINT64_FORMAT
Definition: c.h:521
#define snprintf
Definition: port.h:239
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4734

References GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), process_syncing_tables_for_sync(), and ReportSlotConnectionError().

◆ run_tablesync_worker()

static void run_tablesync_worker ( )
static

Definition at line 1692 of file tablesync.c.

/*
 * Run the tablesync worker: perform the initial table sync, register the
 * tablesync origin name for apply error context, start streaming from the
 * slot and position returned by the sync, and then apply changes via
 * start_apply().
 */
1693{
1694    char        originname[NAMEDATALEN];
1695    XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
1696    char       *slotname = NULL;
1697    WalRcvStreamOptions options;
1698
1699    start_table_sync(&origin_startpos, &slotname);
1700
1701    ReplicationOriginNameForLogicalRep(MySubscription->oid,
1702                                       MyLogicalRepWorker->relid,
1703                                       originname,
1704                                       sizeof(originname));
1705
1706    set_apply_error_context_origin(originname);
1707
1708    set_stream_options(&options, slotname, &origin_startpos);
1709
1710    walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1711
1712    /* Apply the changes till we catchup with the apply worker. */
1713    start_apply(origin_startpos);
1714}
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4442
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4511
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5157
static char ** options
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
Definition: tablesync.c:1650
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451

References InvalidXLogRecPtr, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, NAMEDATALEN, Subscription::oid, options, LogicalRepWorker::relid, ReplicationOriginNameForLogicalRep(), set_apply_error_context_origin(), set_stream_options(), start_apply(), start_table_sync(), and walrcv_startstreaming.

Referenced by TablesyncWorkerMain().

◆ start_table_sync()

static void start_table_sync ( XLogRecPtr * origin_startpos,
char **  slotname 
)
static

Definition at line 1650 of file tablesync.c.

/*
 * Run LogicalRepSyncTableStart() under error handling.
 *
 * On error: if the subscription has disableonerr set, disable it and exit;
 * otherwise abort the transaction, report the subscription error to the
 * stats system and re-throw.
 *
 * On success, *origin_startpos has been filled in by the sync and *slotname
 * receives a copy of the slot name allocated in ApplyContext; the original
 * copy is freed.
 */
1651{
1652    char       *sync_slotname = NULL;
1653
1654    Assert(am_tablesync_worker());
1655
1656    PG_TRY();
1657    {
1658        /* Call initial sync. */
1659        sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1660    }
1661    PG_CATCH();
1662    {
1663        if (MySubscription->disableonerr)
1664            DisableSubscriptionAndExit();
1665        else
1666        {
1667            /*
1668             * Report the worker failed during table synchronization. Abort
1669             * the current transaction so that the stats message is sent in an
1670             * idle state.
1671             */
1672            AbortOutOfAnyTransaction();
1673            pgstat_report_subscription_error(MySubscription->oid, false);
1674
1675            PG_RE_THROW();
1676        }
1677    }
1678    PG_END_TRY();
1679
1680    /* allocate slot name in long-lived context */
1681    *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1682    pfree(sync_slotname);
1683}
void DisableSubscriptionAndExit(void)
Definition: worker.c:4816
MemoryContext ApplyContext
Definition: worker.c:292
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:2308
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1289
static bool am_tablesync_worker(void)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4862

References AbortOutOfAnyTransaction(), am_tablesync_worker(), ApplyContext, Assert(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MySubscription, Subscription::oid, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by run_tablesync_worker().

◆ TablesyncWorkerMain()

void TablesyncWorkerMain ( Datum  main_arg)

Definition at line 1718 of file tablesync.c.

1719{
1720 int worker_slot = DatumGetInt32(main_arg);
1721
1722 SetupApplyOrSyncWorker(worker_slot);
1723
1724 run_tablesync_worker();
1725
1726 finish_sync_worker();
1727}
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4742
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:207
static void run_tablesync_worker()
Definition: tablesync.c:1692

References DatumGetInt32(), finish_sync_worker(), run_tablesync_worker(), and SetupApplyOrSyncWorker().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1763 of file tablesync.c.

1764{
1765 Relation rel;
1766 HeapTuple tup;
1767 bool nulls[Natts_pg_subscription];
1768 bool replaces[Natts_pg_subscription];
1769 Datum values[Natts_pg_subscription];
1770
1771 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1772 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1773 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1774
1775 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1776 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1777 if (!HeapTupleIsValid(tup))
1778 elog(ERROR,
1779 "cache lookup failed for subscription oid %u",
1780 suboid);
1781
1782 /* Form a new tuple. */
1783 memset(values, 0, sizeof(values));
1784 memset(nulls, false, sizeof(nulls));
1785 memset(replaces, false, sizeof(replaces));
1786
1787 /* And update/set two_phase state */
1788 values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1789 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1790
1791 tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1792 values, nulls, replaces);
1793 CatalogTupleUpdate(rel, &tup->t_self, tup);
1794
1795 heap_freetuple(tup);
1796 table_close(rel, RowExclusiveLock);
1797}
static Datum values[MAXATTR]
Definition: bootstrap.c:151
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
static Datum CharGetDatum(char X)
Definition: postgres.h:127
#define RelationGetDescr(relation)
Definition: rel.h:542
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91

References Assert(), CatalogTupleUpdate(), CharGetDatum(), elog, ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by CreateSubscription(), and run_apply_worker().

◆ wait_for_relation_state_change()

static bool wait_for_relation_state_change ( Oid  relid,
char  expected_state 
)
static

Definition at line 183 of file tablesync.c.

184{
185 char state;
186
187 for (;;)
188 {
189 LogicalRepWorker *worker;
190 XLogRecPtr statelsn;
191
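192 CHECK_FOR_INTERRUPTS();
193
194 InvalidateCatalogSnapshot();
195 state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
196 relid, &statelsn);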
197
198 if (state == SUBREL_STATE_UNKNOWN)
199 break;
200
201 if (state == expected_state)
202 return true;
203
204 /* Check if the sync worker is still running and bail if not. */
205 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
206 worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
207 false);
208 LWLockRelease(LogicalRepWorkerLock);
209 if (!worker)
210 break;
211
212 (void) WaitLatch(MyLatch,
213 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
214 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
215
216 ResetLatch(MyLatch);
217 }
218
219 return false;
220}
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:443
Definition: regguts.h:323

References CHECK_FOR_INTERRUPTS, GetSubscriptionRelState(), InvalidateCatalogSnapshot(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by process_syncing_tables_for_apply().

◆ wait_for_worker_state_change()

static bool wait_for_worker_state_change ( char  expected_state)
static

Definition at line 231 of file tablesync.c.

232{
233 int rc;
234
235 for (;;)
236 {
237 LogicalRepWorker *worker;
238
239 CHECK_FOR_INTERRUPTS();
240
241 /*
242 * Done if already in correct state. (We assume this fetch is atomic
243 * enough to not give a misleading answer if we do it with no lock.)
244 */
245 if (MyLogicalRepWorker->relstate == expected_state)
246 return true;
247
248 /*
249 * Bail out if the apply worker has died, else signal it we're
250 * waiting.
251 */
252 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
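253 worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
254 InvalidOid, false);
255 if (worker && worker->proc)
256 logicalrep_worker_wakeup_ptr(worker);
257 LWLockRelease(LogicalRepWorkerLock);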
258 if (!worker)
259 break;
260
261 /*
262 * Wait. We expect to get a latch signal back from the apply worker,
263 * but use a timeout in case it dies without sending one.
264 */
265 rc = WaitLatch(MyLatch,
266 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
267 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
268
269 if (rc & WL_LATCH_SET)
270 ResetLatch(MyLatch);
271 }
272
273 return false;
274}

References CHECK_FOR_INTERRUPTS, InvalidOid, logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, LogicalRepWorker::proc, LogicalRepWorker::relstate, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by LogicalRepSyncTableStart().

Variable Documentation

◆ copybuf

◆ table_states_not_ready

List* table_states_not_ready = NIL
static

◆ table_states_validity

SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD
static

Definition at line 133 of file tablesync.c.

Referenced by FetchTableStates(), and invalidate_syncing_table_states().