PostgreSQL Source Code  git master
tablesync.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "replication/slot.h"
#include "replication/origin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for tablesync.c:

Go to the source code of this file.

Functions

static bool FetchTableStates (bool *started_tx)
 
static void pg_attribute_noreturn () finish_sync_worker(void)
 
static bool wait_for_relation_state_change (Oid relid, char expected_state)
 
static bool wait_for_worker_state_change (char expected_state)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
static void process_syncing_tables_for_sync (XLogRecPtr current_lsn)
 
static void process_syncing_tables_for_apply (XLogRecPtr current_lsn)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
static Listmake_copy_attnamelist (LogicalRepRelMapEntry *rel)
 
static int copy_read_data (void *outbuf, int minread, int maxread)
 
static void fetch_remote_table_info (char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
 
static void copy_table (Relation rel)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
static char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
static void start_table_sync (XLogRecPtr *origin_startpos, char **slotname)
 
static void run_tablesync_worker ()
 
void TablesyncWorkerMain (Datum main_arg)
 
bool AllTablesyncsReady (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 

Variables

static bool table_states_valid = false
 
static Listtable_states_not_ready = NIL
 
static StringInfo copybuf = NULL
 

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1714 of file tablesync.c.

1715 {
1716  bool started_tx = false;
1717  bool has_subrels = false;
1718 
1719  /* We need up-to-date sync state info for subscription tables here. */
1720  has_subrels = FetchTableStates(&started_tx);
1721 
1722  if (started_tx)
1723  {
1725  pgstat_report_stat(true);
1726  }
1727 
1728  /*
1729  * Return false when there are no tables in subscription or not all tables
1730  * are in ready state; true otherwise.
1731  */
1732  return has_subrels && (table_states_not_ready == NIL);
1733 }
#define NIL
Definition: pg_list.h:68
long pgstat_report_stat(bool force)
Definition: pgstat.c:582
static List * table_states_not_ready
Definition: tablesync.c:127
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1565
void CommitTransactionCommand(void)
Definition: xact.c:3050

References CommitTransactionCommand(), FetchTableStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by pa_can_start(), process_syncing_tables_for_apply(), and run_apply_worker().

◆ copy_read_data()

static int copy_read_data ( void *  outbuf,
int  minread,
int  maxread 
)
static

Definition at line 711 of file tablesync.c.

712 {
713  int bytesread = 0;
714  int avail;
715 
716  /* If there are some leftover data from previous read, use it. */
717  avail = copybuf->len - copybuf->cursor;
718  if (avail)
719  {
720  if (avail > maxread)
721  avail = maxread;
722  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
723  copybuf->cursor += avail;
724  maxread -= avail;
725  bytesread += avail;
726  }
727 
728  while (maxread > 0 && bytesread < minread)
729  {
731  int len;
732  char *buf = NULL;
733 
734  for (;;)
735  {
736  /* Try read the data. */
738 
740 
741  if (len == 0)
742  break;
743  else if (len < 0)
744  return bytesread;
745  else
746  {
747  /* Process the data */
748  copybuf->data = buf;
749  copybuf->len = len;
750  copybuf->cursor = 0;
751 
752  avail = copybuf->len - copybuf->cursor;
753  if (avail > maxread)
754  avail = maxread;
755  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
756  outbuf = (void *) ((char *) outbuf + avail);
757  copybuf->cursor += avail;
758  maxread -= avail;
759  bytesread += avail;
760  }
761 
762  if (maxread <= 0 || bytesread >= minread)
763  return bytesread;
764  }
765 
766  /*
767  * Wait for more data or latch.
768  */
769  (void) WaitLatchOrSocket(MyLatch,
772  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
773 
775  }
776 
777  return bytesread;
778 }
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:321
struct Latch * MyLatch
Definition: globals.c:59
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:566
void ResetLatch(Latch *latch)
Definition: latch.c:725
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const void size_t len
static char * buf
Definition: pg_test_fsync.c:73
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static StringInfo copybuf
Definition: tablesync.c:130
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:452

References buf, CHECK_FOR_INTERRUPTS, copybuf, StringInfoData::cursor, StringInfoData::data, fd(), StringInfoData::len, len, LogRepWorkerWalRcvConn, MyLatch, PGINVALID_SOCKET, ResetLatch(), WaitLatchOrSocket(), walrcv_receive, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by copy_table().

◆ copy_table()

static void copy_table ( Relation  rel)
static

Definition at line 1108 of file tablesync.c.

1109 {
1110  LogicalRepRelMapEntry *relmapentry;
1111  LogicalRepRelation lrel;
1112  List *qual = NIL;
1114  StringInfoData cmd;
1115  CopyFromState cstate;
1116  List *attnamelist;
1117  ParseState *pstate;
1118  List *options = NIL;
1119 
1120  /* Get the publisher relation info. */
1122  RelationGetRelationName(rel), &lrel, &qual);
1123 
1124  /* Put the relation into relmap. */
1125  logicalrep_relmap_update(&lrel);
1126 
1127  /* Map the publisher relation to local one. */
1128  relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1129  Assert(rel == relmapentry->localrel);
1130 
1131  /* Start copy on the publisher. */
1132  initStringInfo(&cmd);
1133 
1134  /* Regular table with no row filter */
1135  if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1136  {
1137  appendStringInfo(&cmd, "COPY %s",
1139 
1140  /* If the table has columns, then specify the columns */
1141  if (lrel.natts)
1142  {
1143  appendStringInfoString(&cmd, " (");
1144 
1145  /*
1146  * XXX Do we need to list the columns in all cases? Maybe we're
1147  * replicating all columns?
1148  */
1149  for (int i = 0; i < lrel.natts; i++)
1150  {
1151  if (i > 0)
1152  appendStringInfoString(&cmd, ", ");
1153 
1155  }
1156 
1157  appendStringInfoString(&cmd, ")");
1158  }
1159 
1160  appendStringInfoString(&cmd, " TO STDOUT");
1161  }
1162  else
1163  {
1164  /*
1165  * For non-tables and tables with row filters, we need to do COPY
1166  * (SELECT ...), but we can't just do SELECT * because we need to not
1167  * copy generated columns. For tables with any row filters, build a
1168  * SELECT query with OR'ed row filters for COPY.
1169  */
1170  appendStringInfoString(&cmd, "COPY (SELECT ");
1171  for (int i = 0; i < lrel.natts; i++)
1172  {
1174  if (i < lrel.natts - 1)
1175  appendStringInfoString(&cmd, ", ");
1176  }
1177 
1178  appendStringInfoString(&cmd, " FROM ");
1179 
1180  /*
1181  * For regular tables, make sure we don't copy data from a child that
1182  * inherits the named table as those will be copied separately.
1183  */
1184  if (lrel.relkind == RELKIND_RELATION)
1185  appendStringInfoString(&cmd, "ONLY ");
1186 
1188  /* list of OR'ed filters */
1189  if (qual != NIL)
1190  {
1191  ListCell *lc;
1192  char *q = strVal(linitial(qual));
1193 
1194  appendStringInfo(&cmd, " WHERE %s", q);
1195  for_each_from(lc, qual, 1)
1196  {
1197  q = strVal(lfirst(lc));
1198  appendStringInfo(&cmd, " OR %s", q);
1199  }
1200  list_free_deep(qual);
1201  }
1202 
1203  appendStringInfoString(&cmd, ") TO STDOUT");
1204  }
1205 
1206  /*
1207  * Prior to v16, initial table synchronization will use text format even
1208  * if the binary option is enabled for a subscription.
1209  */
1212  {
1213  appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1214  options = list_make1(makeDefElem("format",
1215  (Node *) makeString("binary"), -1));
1216  }
1217 
1218  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1219  pfree(cmd.data);
1220  if (res->status != WALRCV_OK_COPY_OUT)
1221  ereport(ERROR,
1222  (errcode(ERRCODE_CONNECTION_FAILURE),
1223  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1224  lrel.nspname, lrel.relname, res->err)));
1226 
1227  copybuf = makeStringInfo();
1228 
1229  pstate = make_parsestate(NULL);
1230  (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1231  NULL, false, false);
1232 
1233  attnamelist = make_copy_attnamelist(relmapentry);
1234  cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1235 
1236  /* Do the copy */
1237  (void) CopyFrom(cstate);
1238 
1239  logicalrep_rel_close(relmapentry, NoLock);
1240 }
Subscription * MySubscription
Definition: worker.c:323
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1373
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:633
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free_deep(List *list)
Definition: list.c:1560
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3321
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition: makefuncs.c:549
void pfree(void *pointer)
Definition: mcxt.c:1431
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:44
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define lfirst(lc)
Definition: pg_list.h:172
#define list_make1(x1)
Definition: pg_list.h:212
#define for_each_from(cell, lst, N)
Definition: pg_list.h:414
#define linitial(l)
Definition: pg_list.h:178
#define RelationGetRelationName(relation)
Definition: rel.h:538
#define RelationGetNamespace(relation)
Definition: rel.h:545
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:11975
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:12059
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:165
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:328
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:474
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: pg_list.h:54
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Definition: nodes.h:129
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:711
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
Definition: tablesync.c:787
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:691
String * makeString(char *str)
Definition: value.c:63
#define strVal(v)
Definition: value.h:82
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:208
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:468
#define walrcv_server_version(conn)
Definition: walreceiver.h:444
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:462

References AccessShareLock, addRangeTableEntryForRelation(), appendStringInfo(), appendStringInfoString(), Assert(), LogicalRepRelation::attnames, BeginCopyFrom(), Subscription::binary, copy_read_data(), copybuf, CopyFrom(), StringInfoData::data, ereport, errcode(), errmsg(), ERROR, fetch_remote_table_info(), for_each_from, get_namespace_name(), i, initStringInfo(), lfirst, linitial, list_free_deep(), list_make1, LogicalRepRelMapEntry::localrel, logicalrep_rel_close(), logicalrep_rel_open(), logicalrep_relmap_update(), LogRepWorkerWalRcvConn, make_copy_attnamelist(), make_parsestate(), makeDefElem(), makeString(), makeStringInfo(), MySubscription, LogicalRepRelation::natts, NIL, NoLock, LogicalRepRelation::nspname, pfree(), quote_identifier(), quote_qualified_identifier(), RelationGetNamespace, RelationGetRelationName, LogicalRepRelation::relkind, LogicalRepRelation::relname, LogicalRepRelation::remoteid, res, strVal, walrcv_clear_result(), walrcv_exec, WALRCV_OK_COPY_OUT, and walrcv_server_version.

Referenced by LogicalRepSyncTableStart().

◆ fetch_remote_table_info()

static void fetch_remote_table_info ( char *  nspname,
char *  relname,
LogicalRepRelation lrel,
List **  qual 
)
static

Definition at line 787 of file tablesync.c.

789 {
791  StringInfoData cmd;
792  TupleTableSlot *slot;
793  Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
794  Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
795  Oid qualRow[] = {TEXTOID};
796  bool isnull;
797  int natt;
798  ListCell *lc;
799  Bitmapset *included_cols = NULL;
800 
801  lrel->nspname = nspname;
802  lrel->relname = relname;
803 
804  /* First fetch Oid and replica identity. */
805  initStringInfo(&cmd);
806  appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
807  " FROM pg_catalog.pg_class c"
808  " INNER JOIN pg_catalog.pg_namespace n"
809  " ON (c.relnamespace = n.oid)"
810  " WHERE n.nspname = %s"
811  " AND c.relname = %s",
812  quote_literal_cstr(nspname),
815  lengthof(tableRow), tableRow);
816 
817  if (res->status != WALRCV_OK_TUPLES)
818  ereport(ERROR,
819  (errcode(ERRCODE_CONNECTION_FAILURE),
820  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
821  nspname, relname, res->err)));
822 
823  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
824  if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
825  ereport(ERROR,
826  (errcode(ERRCODE_UNDEFINED_OBJECT),
827  errmsg("table \"%s.%s\" not found on publisher",
828  nspname, relname)));
829 
830  lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
831  Assert(!isnull);
832  lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
833  Assert(!isnull);
834  lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
835  Assert(!isnull);
836 
839 
840 
841  /*
842  * Get column lists for each relation.
843  *
844  * We need to do this before fetching info about column names and types,
845  * so that we can skip columns that should not be replicated.
846  */
848  {
849  WalRcvExecResult *pubres;
850  TupleTableSlot *tslot;
851  Oid attrsRow[] = {INT2VECTOROID};
852  StringInfoData pub_names;
853 
854  initStringInfo(&pub_names);
855  foreach(lc, MySubscription->publications)
856  {
857  if (foreach_current_index(lc) > 0)
858  appendStringInfoString(&pub_names, ", ");
860  }
861 
862  /*
863  * Fetch info about column lists for the relation (from all the
864  * publications).
865  */
866  resetStringInfo(&cmd);
867  appendStringInfo(&cmd,
868  "SELECT DISTINCT"
869  " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
870  " THEN NULL ELSE gpt.attrs END)"
871  " FROM pg_publication p,"
872  " LATERAL pg_get_publication_tables(p.pubname) gpt,"
873  " pg_class c"
874  " WHERE gpt.relid = %u AND c.oid = gpt.relid"
875  " AND p.pubname IN ( %s )",
876  lrel->remoteid,
877  pub_names.data);
878 
880  lengthof(attrsRow), attrsRow);
881 
882  if (pubres->status != WALRCV_OK_TUPLES)
883  ereport(ERROR,
884  (errcode(ERRCODE_CONNECTION_FAILURE),
885  errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
886  nspname, relname, pubres->err)));
887 
888  /*
889  * We don't support the case where the column list is different for
890  * the same table when combining publications. See comments atop
891  * fetch_table_list. So there should be only one row returned.
892  * Although we already checked this when creating the subscription, we
893  * still need to check here in case the column list was changed after
894  * creating the subscription and before the sync worker is started.
895  */
896  if (tuplestore_tuple_count(pubres->tuplestore) > 1)
897  ereport(ERROR,
898  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
899  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
900  nspname, relname));
901 
902  /*
903  * Get the column list and build a single bitmap with the attnums.
904  *
905  * If we find a NULL value, it means all the columns should be
906  * replicated.
907  */
909  if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
910  {
911  Datum cfval = slot_getattr(tslot, 1, &isnull);
912 
913  if (!isnull)
914  {
915  ArrayType *arr;
916  int nelems;
917  int16 *elems;
918 
919  arr = DatumGetArrayTypeP(cfval);
920  nelems = ARR_DIMS(arr)[0];
921  elems = (int16 *) ARR_DATA_PTR(arr);
922 
923  for (natt = 0; natt < nelems; natt++)
924  included_cols = bms_add_member(included_cols, elems[natt]);
925  }
926 
927  ExecClearTuple(tslot);
928  }
930 
931  walrcv_clear_result(pubres);
932 
933  pfree(pub_names.data);
934  }
935 
936  /*
937  * Now fetch column names and types.
938  */
939  resetStringInfo(&cmd);
940  appendStringInfo(&cmd,
941  "SELECT a.attnum,"
942  " a.attname,"
943  " a.atttypid,"
944  " a.attnum = ANY(i.indkey)"
945  " FROM pg_catalog.pg_attribute a"
946  " LEFT JOIN pg_catalog.pg_index i"
947  " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
948  " WHERE a.attnum > 0::pg_catalog.int2"
949  " AND NOT a.attisdropped %s"
950  " AND a.attrelid = %u"
951  " ORDER BY a.attnum",
952  lrel->remoteid,
954  "AND a.attgenerated = ''" : ""),
955  lrel->remoteid);
957  lengthof(attrRow), attrRow);
958 
959  if (res->status != WALRCV_OK_TUPLES)
960  ereport(ERROR,
961  (errcode(ERRCODE_CONNECTION_FAILURE),
962  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
963  nspname, relname, res->err)));
964 
965  /* We don't know the number of rows coming, so allocate enough space. */
966  lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
967  lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
968  lrel->attkeys = NULL;
969 
970  /*
971  * Store the columns as a list of names. Ignore those that are not
972  * present in the column list, if there is one.
973  */
974  natt = 0;
975  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
976  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
977  {
978  char *rel_colname;
980 
981  attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
982  Assert(!isnull);
983 
984  /* If the column is not in the column list, skip it. */
985  if (included_cols != NULL && !bms_is_member(attnum, included_cols))
986  {
987  ExecClearTuple(slot);
988  continue;
989  }
990 
991  rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
992  Assert(!isnull);
993 
994  lrel->attnames[natt] = rel_colname;
995  lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
996  Assert(!isnull);
997 
998  if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
999  lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1000 
1001  /* Should never happen. */
1002  if (++natt >= MaxTupleAttributeNumber)
1003  elog(ERROR, "too many columns in remote table \"%s.%s\"",
1004  nspname, relname);
1005 
1006  ExecClearTuple(slot);
1007  }
1009 
1010  lrel->natts = natt;
1011 
1013 
1014  /*
1015  * Get relation's row filter expressions. DISTINCT avoids the same
1016  * expression of a table in multiple publications from being included
1017  * multiple times in the final expression.
1018  *
1019  * We need to copy the row even if it matches just one of the
1020  * publications, so we later combine all the quals with OR.
1021  *
1022  * For initial synchronization, row filtering can be ignored in following
1023  * cases:
1024  *
1025  * 1) one of the subscribed publications for the table hasn't specified
1026  * any row filter
1027  *
1028  * 2) one of the subscribed publications has puballtables set to true
1029  *
1030  * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1031  * that includes this relation
1032  */
1034  {
1035  StringInfoData pub_names;
1036 
1037  /* Build the pubname list. */
1038  initStringInfo(&pub_names);
1040  {
1041  char *pubname = strVal(pubstr);
1042 
1043  if (foreach_current_index(pubstr) > 0)
1044  appendStringInfoString(&pub_names, ", ");
1045 
1046  appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
1047  }
1048 
1049  /* Check for row filters. */
1050  resetStringInfo(&cmd);
1051  appendStringInfo(&cmd,
1052  "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1053  " FROM pg_publication p,"
1054  " LATERAL pg_get_publication_tables(p.pubname) gpt"
1055  " WHERE gpt.relid = %u"
1056  " AND p.pubname IN ( %s )",
1057  lrel->remoteid,
1058  pub_names.data);
1059 
1060  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1061 
1062  if (res->status != WALRCV_OK_TUPLES)
1063  ereport(ERROR,
1064  (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1065  nspname, relname, res->err)));
1066 
1067  /*
1068  * Multiple row filter expressions for the same table will be combined
1069  * by COPY using OR. If any of the filter expressions for this table
1070  * are null, it means the whole table will be copied. In this case it
1071  * is not necessary to construct a unified row filter expression at
1072  * all.
1073  */
1074  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1075  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1076  {
1077  Datum rf = slot_getattr(slot, 1, &isnull);
1078 
1079  if (!isnull)
1080  *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1081  else
1082  {
1083  /* Ignore filters and cleanup as necessary. */
1084  if (*qual)
1085  {
1086  list_free_deep(*qual);
1087  *qual = NIL;
1088  }
1089  break;
1090  }
1091 
1092  ExecClearTuple(slot);
1093  }
1095 
1097  }
1098 
1099  pfree(cmd.data);
1100 }
#define ARR_DATA_PTR(a)
Definition: array.h:322
#define DatumGetArrayTypeP(X)
Definition: array.h:261
#define ARR_DIMS(a)
Definition: array.h:294
int16 AttrNumber
Definition: attnum.h:21
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:523
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:828
#define TextDatumGetCString(d)
Definition: builtins.h:98
signed short int16
Definition: c.h:482
#define lengthof(array)
Definition: c.h:777
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1253
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1237
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
List * lappend(List *list, void *datum)
Definition: list.c:339
void * palloc0(Size size)
Definition: mcxt.c:1232
int16 attnum
Definition: pg_attribute.h:74
NameData relname
Definition: pg_class.h:38
#define foreach_current_index(var_or_cell)
Definition: pg_list.h:403
#define foreach_node(type, var, lst)
Definition: pg_list.h:496
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
uintptr_t Datum
Definition: postgres.h:64
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:242
static char DatumGetChar(Datum X)
Definition: postgres.h:112
static int16 DatumGetInt16(Datum X)
Definition: postgres.h:162
unsigned int Oid
Definition: postgres_ext.h:31
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
Bitmapset * attkeys
Definition: logicalproto.h:115
Definition: value.h:64
Tuplestorestate * tuplestore
Definition: walreceiver.h:222
TupleDesc tupledesc
Definition: walreceiver.h:223
WalRcvExecStatus status
Definition: walreceiver.h:219
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:546
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:389
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:206

References appendStringInfo(), appendStringInfoString(), ARR_DATA_PTR, ARR_DIMS, Assert(), LogicalRepRelation::attkeys, LogicalRepRelation::attnames, attnum, LogicalRepRelation::atttyps, bms_add_member(), bms_is_member(), StringInfoData::data, DatumGetArrayTypeP, DatumGetBool(), DatumGetChar(), DatumGetInt16(), DatumGetObjectId(), elog(), ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), foreach_current_index, foreach_node, initStringInfo(), lappend(), lengthof, lfirst, list_free_deep(), LogRepWorkerWalRcvConn, MakeSingleTupleTableSlot(), makeString(), MaxTupleAttributeNumber, MySubscription, LogicalRepRelation::natts, NIL, LogicalRepRelation::nspname, palloc0(), pfree(), Subscription::publications, quote_literal_cstr(), LogicalRepRelation::relkind, relname, LogicalRepRelation::relname, LogicalRepRelation::remoteid, LogicalRepRelation::replident, res, resetStringInfo(), slot_getattr(), WalRcvExecResult::status, strVal, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), tuplestore_tuple_count(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and walrcv_server_version.

Referenced by copy_table().

◆ FetchTableStates()

static bool FetchTableStates ( bool started_tx)
static

Definition at line 1565 of file tablesync.c.

1566 {
1567  static bool has_subrels = false;
1568 
1569  *started_tx = false;
1570 
1571  if (!table_states_valid)
1572  {
1573  MemoryContext oldctx;
1574  List *rstates;
1575  ListCell *lc;
1576  SubscriptionRelState *rstate;
1577 
1578  /* Clean the old lists. */
1581 
1582  if (!IsTransactionState())
1583  {
1585  *started_tx = true;
1586  }
1587 
1588  /* Fetch all non-ready tables. */
1589  rstates = GetSubscriptionRelations(MySubscription->oid, true);
1590 
1591  /* Allocate the tracking info in a permanent memory context. */
1593  foreach(lc, rstates)
1594  {
1595  rstate = palloc(sizeof(SubscriptionRelState));
1596  memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1598  }
1599  MemoryContextSwitchTo(oldctx);
1600 
1601  /*
1602  * Does the subscription have tables?
1603  *
1604  * If there were not-READY relations found then we know it does. But
1605  * if table_states_not_ready was empty we still need to check again to
1606  * see if there are 0 tables.
1607  */
1608  has_subrels = (table_states_not_ready != NIL) ||
1610 
1611  table_states_valid = true;
1612  }
1613 
1614  return has_subrels;
1615 }
MemoryContext CacheMemoryContext
Definition: mcxt.c:144
void * palloc(Size size)
Definition: mcxt.c:1201
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
List * GetSubscriptionRelations(Oid subid, bool not_ready)
bool HasSubscriptionRelations(Oid subid)
static bool table_states_valid
Definition: tablesync.c:126
bool IsTransactionState(void)
Definition: xact.c:378
void StartTransactionCommand(void)
Definition: xact.c:2953

References CacheMemoryContext, GetSubscriptionRelations(), HasSubscriptionRelations(), IsTransactionState(), lappend(), lfirst, list_free_deep(), MemoryContextSwitchTo(), MySubscription, NIL, Subscription::oid, palloc(), StartTransactionCommand(), table_states_not_ready, and table_states_valid.

Referenced by AllTablesyncsReady(), and process_syncing_tables_for_apply().

◆ invalidate_syncing_table_states()

void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

Definition at line 274 of file tablesync.c.

275 {
276  table_states_valid = false;
277 }

References table_states_valid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ LogicalRepSyncTableStart()

static char* LogicalRepSyncTableStart ( XLogRecPtr origin_startpos)
static

Definition at line 1276 of file tablesync.c.

1277 {
1278  char *slotname;
1279  char *err;
1280  char relstate;
1281  XLogRecPtr relstate_lsn;
1282  Relation rel;
1283  AclResult aclresult;
1285  char originname[NAMEDATALEN];
1286  RepOriginId originid;
1287  UserContext ucxt;
1288  bool must_use_password;
1289  bool run_as_owner;
1290 
1291  /* Check the state of the table synchronization. */
1295  &relstate_lsn);
1297 
1298  /* Is the use of a password mandatory? */
1299  must_use_password = MySubscription->passwordrequired &&
1301 
1303  MyLogicalRepWorker->relstate = relstate;
1304  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1306 
1307  /*
1308  * If synchronization is already done or no longer necessary, exit now
1309  * that we've updated shared memory state.
1310  */
1311  switch (relstate)
1312  {
1313  case SUBREL_STATE_SYNCDONE:
1314  case SUBREL_STATE_READY:
1315  case SUBREL_STATE_UNKNOWN:
1316  finish_sync_worker(); /* doesn't return */
1317  }
1318 
1319  /* Calculate the name of the tablesync slot. */
1320  slotname = (char *) palloc(NAMEDATALEN);
1323  slotname,
1324  NAMEDATALEN);
1325 
1326  /*
1327  * Here we use the slot name instead of the subscription name as the
1328  * application_name, so that it is different from the leader apply worker,
1329  * so that synchronous replication can distinguish them.
1330  */
1332  walrcv_connect(MySubscription->conninfo, true, true,
1333  must_use_password,
1334  slotname, &err);
1335  if (LogRepWorkerWalRcvConn == NULL)
1336  ereport(ERROR,
1337  (errcode(ERRCODE_CONNECTION_FAILURE),
1338  errmsg("could not connect to the publisher: %s", err)));
1339 
1340  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1341  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1342  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1343 
1344  /* Assign the origin tracking record name. */
1347  originname,
1348  sizeof(originname));
1349 
1350  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1351  {
1352  /*
1353  * We have previously errored out before finishing the copy so the
1354  * replication slot might exist. We want to remove the slot if it
1355  * already exists and proceed.
1356  *
1357  * XXX We could also instead try to drop the slot, last time we failed
1358  * but for that, we might need to clean up the copy state as it might
1359  * be in the middle of fetching the rows. Also, if there is a network
1360  * breakdown then it wouldn't have succeeded so trying it next time
1361  * seems like a better bet.
1362  */
1364  }
1365  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1366  {
1367  /*
1368  * The COPY phase was previously done, but tablesync then crashed
1369  * before it was able to finish normally.
1370  */
1372 
1373  /*
1374  * The origin tracking name must already exist. It was created first
1375  * time this tablesync was launched.
1376  */
1377  originid = replorigin_by_name(originname, false);
1378  replorigin_session_setup(originid, 0);
1379  replorigin_session_origin = originid;
1380  *origin_startpos = replorigin_session_get_progress(false);
1381 
1383 
1384  goto copy_table_done;
1385  }
1386 
1388  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1391 
1392  /* Update the state and make it visible to others. */
1399  pgstat_report_stat(true);
1400 
1402 
1403  /*
1404  * Use a standard write lock here. It might be better to disallow access
1405  * to the table while it's being synchronized. But we don't want to block
1406  * the main apply process from working and it has to open the relation in
1407  * RowExclusiveLock when remapping remote relation id to local one.
1408  */
1410 
1411  /*
1412  * Start a transaction in the remote node in REPEATABLE READ mode. This
1413  * ensures that both the replication slot we create (see below) and the
1414  * COPY are consistent with each other.
1415  */
1417  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1418  0, NULL);
1419  if (res->status != WALRCV_OK_COMMAND)
1420  ereport(ERROR,
1421  (errcode(ERRCODE_CONNECTION_FAILURE),
1422  errmsg("table copy could not start transaction on publisher: %s",
1423  res->err)));
1425 
1426  /*
1427  * Create a new permanent logical decoding slot. This slot will be used
1428  * for the catchup phase after COPY is done, so tell it to use the
1429  * snapshot to make the final data consistent.
1430  */
1432  slotname, false /* permanent */ , false /* two_phase */ ,
1434  CRS_USE_SNAPSHOT, origin_startpos);
1435 
1436  /*
1437  * Setup replication origin tracking. The purpose of doing this before the
1438  * copy is to avoid doing the copy again due to any error in setting up
1439  * origin tracking.
1440  */
1441  originid = replorigin_by_name(originname, true);
1442  if (!OidIsValid(originid))
1443  {
1444  /*
1445  * Origin tracking does not exist, so create it now.
1446  *
1447  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1448  * logged for the purpose of recovery. Locks are to prevent the
1449  * replication origin from vanishing while advancing.
1450  */
1451  originid = replorigin_create(originname);
1452 
1453  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1454  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1455  true /* go backward */ , true /* WAL log */ );
1456  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1457 
1458  replorigin_session_setup(originid, 0);
1459  replorigin_session_origin = originid;
1460  }
1461  else
1462  {
1463  ereport(ERROR,
1465  errmsg("replication origin \"%s\" already exists",
1466  originname)));
1467  }
1468 
1469  /*
1470  * Make sure that the copy command runs as the table owner, unless the
1471  * user has opted out of that behaviour.
1472  */
1473  run_as_owner = MySubscription->runasowner;
1474  if (!run_as_owner)
1475  SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1476 
1477  /*
1478  * Check that our table sync worker has permission to insert into the
1479  * target table.
1480  */
1481  aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1482  ACL_INSERT);
1483  if (aclresult != ACLCHECK_OK)
1484  aclcheck_error(aclresult,
1485  get_relkind_objtype(rel->rd_rel->relkind),
1487 
1488  /*
1489  * COPY FROM does not honor RLS policies. That is not a problem for
1490  * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1491  * who has it implicitly), but other roles should not be able to
1492  * circumvent RLS. Disallow logical replication into RLS enabled
1493  * relations for such roles.
1494  */
1496  ereport(ERROR,
1497  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1498  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1499  GetUserNameFromId(GetUserId(), true),
1500  RelationGetRelationName(rel))));
1501 
1502  /* Now do the initial data copy */
1504  copy_table(rel);
1506 
1507  res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1508  if (res->status != WALRCV_OK_COMMAND)
1509  ereport(ERROR,
1510  (errcode(ERRCODE_CONNECTION_FAILURE),
1511  errmsg("table copy could not finish transaction on publisher: %s",
1512  res->err)));
1514 
1515  if (!run_as_owner)
1516  RestoreUserContext(&ucxt);
1517 
1518  table_close(rel, NoLock);
1519 
1520  /* Make the copy visible. */
1522 
1523  /*
1524  * Update the persisted state to indicate the COPY phase is done; make it
1525  * visible to others.
1526  */
1529  SUBREL_STATE_FINISHEDCOPY,
1531 
1533 
1534 copy_table_done:
1535 
1536  elog(DEBUG1,
1537  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1538  originname, LSN_FORMAT_ARGS(*origin_startpos));
1539 
1540  /*
1541  * We are done with the initial data synchronization, update the state.
1542  */
1544  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1545  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1547 
1548  /*
1549  * Finally, wait until the leader apply worker tells us to catch up and
1550  * then return to let LogicalRepApplyLoop do it.
1551  */
1552  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1553  return slotname;
1554 }
AclResult
Definition: acl.h:181
@ ACLCHECK_OK
Definition: acl.h:182
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2701
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4081
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:453
#define OidIsValid(objectId)
Definition: c.h:764
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define RowExclusiveLock
Definition: lockdefs.h:38
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:981
Oid GetUserId(void)
Definition: miscinit.c:515
ObjectType get_relkind_objtype(char relkind)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:222
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:253
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:889
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1098
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1238
#define ACL_INSERT
Definition: parsenodes.h:76
#define NAMEDATALEN
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationGetRelid(relation)
Definition: rel.h:504
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:223
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:655
void PopActiveSnapshot(void)
Definition: snapmgr.c:750
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
XLogRecPtr relstate_lsn
Form_pg_class rd_rel
Definition: rel.h:111
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:225
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1260
static void copy_table(Relation rel)
Definition: tablesync.c:1108
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:432
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:204
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:456
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
void CommandCounterIncrement(void)
Definition: xact.c:1078
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert(), check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog(), ereport, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, Subscription::failover, get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, Subscription::ownersuperuser, palloc(), Subscription::passwordrequired, pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), res, RestoreUserContext(), RLS_ENABLED, RowExclusiveLock, Subscription::runasowner, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, SwitchToUntrustedUser(), table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.

Referenced by start_table_sync().

◆ make_copy_attnamelist()

static List* make_copy_attnamelist ( LogicalRepRelMapEntry rel)
static

Definition at line 691 of file tablesync.c.

692 {
693  List *attnamelist = NIL;
694  int i;
695 
696  for (i = 0; i < rel->remoterel.natts; i++)
697  {
698  attnamelist = lappend(attnamelist,
699  makeString(rel->remoterel.attnames[i]));
700  }
701 
702 
703  return attnamelist;
704 }
LogicalRepRelation remoterel

References LogicalRepRelation::attnames, i, lappend(), makeString(), LogicalRepRelation::natts, NIL, and LogicalRepRelMapEntry::remoterel.

Referenced by copy_table().

◆ pg_attribute_noreturn()

static void pg_attribute_noreturn ( )
static

Definition at line 136 of file tablesync.c.

138 {
139  /*
140  * Commit any outstanding transaction. This is the usual case, unless
141  * there was nothing to do for the table.
142  */
143  if (IsTransactionState())
144  {
146  pgstat_report_stat(true);
147  }
148 
149  /* And flush all writes. */
151 
153  ereport(LOG,
154  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
158 
159  /* Find the leader apply worker and signal it. */
161 
162  /* Stop gracefully */
163  proc_exit(0);
164 }
#define LOG
Definition: elog.h:31
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:682
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1905
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9288
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2733

References CommitTransactionCommand(), ereport, errmsg(), get_rel_name(), GetXLogWriteRecPtr(), InvalidOid, IsTransactionState(), LOG, logicalrep_worker_wakeup(), MyLogicalRepWorker, MySubscription, Subscription::name, pgstat_report_stat(), proc_exit(), LogicalRepWorker::relid, StartTransactionCommand(), LogicalRepWorker::subid, and XLogFlush().

◆ process_syncing_tables()

void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 660 of file tablesync.c.

661 {
662  switch (MyLogicalRepWorker->type)
663  {
665 
666  /*
667  * Skip for parallel apply workers because they only operate on
668  * tables that are in a READY state. See pa_can_start() and
669  * should_apply_changes_for_rel().
670  */
671  break;
672 
674  process_syncing_tables_for_sync(current_lsn);
675  break;
676 
677  case WORKERTYPE_APPLY:
679  break;
680 
681  case WORKERTYPE_UNKNOWN:
682  /* Should never happen. */
683  elog(ERROR, "Unknown worker type");
684  }
685 }
LogicalRepWorkerType type
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:411
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:288
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY

References elog(), ERROR, MyLogicalRepWorker, process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

◆ process_syncing_tables_for_apply()

static void process_syncing_tables_for_apply ( XLogRecPtr  current_lsn)
static

Definition at line 411 of file tablesync.c.

412 {
413  struct tablesync_start_time_mapping
414  {
415  Oid relid;
416  TimestampTz last_start_time;
417  };
418  static HTAB *last_start_times = NULL;
419  ListCell *lc;
420  bool started_tx = false;
421  bool should_exit = false;
422 
424 
425  /* We need up-to-date sync state info for subscription tables here. */
426  FetchTableStates(&started_tx);
427 
428  /*
429  * Prepare a hash table for tracking last start times of workers, to avoid
430  * immediate restarts. We don't need it if there are no tables that need
431  * syncing.
432  */
434  {
435  HASHCTL ctl;
436 
437  ctl.keysize = sizeof(Oid);
438  ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
439  last_start_times = hash_create("Logical replication table sync worker start times",
440  256, &ctl, HASH_ELEM | HASH_BLOBS);
441  }
442 
443  /*
444  * Clean up the hash table when we're done with all tables (just to
445  * release the bit of memory).
446  */
448  {
450  last_start_times = NULL;
451  }
452 
453  /*
454  * Process all tables that are being synchronized.
455  */
456  foreach(lc, table_states_not_ready)
457  {
459 
460  if (rstate->state == SUBREL_STATE_SYNCDONE)
461  {
462  /*
463  * Apply has caught up to the position where the table sync has
464  * finished. Mark the table as ready so that the apply will just
465  * continue to replicate it normally.
466  */
467  if (current_lsn >= rstate->lsn)
468  {
469  char originname[NAMEDATALEN];
470 
471  rstate->state = SUBREL_STATE_READY;
472  rstate->lsn = current_lsn;
473  if (!started_tx)
474  {
476  started_tx = true;
477  }
478 
479  /*
480  * Remove the tablesync origin tracking if exists.
481  *
482  * There is a chance that the user is concurrently performing
483  * refresh for the subscription where we remove the table
484  * state and its origin or the tablesync worker would have
485  * already removed this origin. We can't rely on tablesync
486  * worker to remove the origin tracking as if there is any
487  * error while dropping we won't restart it to drop the
488  * origin. So passing missing_ok = true.
489  */
491  rstate->relid,
492  originname,
493  sizeof(originname));
494  replorigin_drop_by_name(originname, true, false);
495 
496  /*
497  * Update the state to READY only after the origin cleanup.
498  */
500  rstate->relid, rstate->state,
501  rstate->lsn);
502  }
503  }
504  else
505  {
506  LogicalRepWorker *syncworker;
507 
508  /*
509  * Look for a sync worker for this relation.
510  */
511  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
512 
514  rstate->relid, false);
515 
516  if (syncworker)
517  {
518  /* Found one, update our copy of its state */
519  SpinLockAcquire(&syncworker->relmutex);
520  rstate->state = syncworker->relstate;
521  rstate->lsn = syncworker->relstate_lsn;
522  if (rstate->state == SUBREL_STATE_SYNCWAIT)
523  {
524  /*
525  * Sync worker is waiting for apply. Tell sync worker it
526  * can catchup now.
527  */
528  syncworker->relstate = SUBREL_STATE_CATCHUP;
529  syncworker->relstate_lsn =
530  Max(syncworker->relstate_lsn, current_lsn);
531  }
532  SpinLockRelease(&syncworker->relmutex);
533 
534  /* If we told worker to catch up, wait for it. */
535  if (rstate->state == SUBREL_STATE_SYNCWAIT)
536  {
537  /* Signal the sync worker, as it may be waiting for us. */
538  if (syncworker->proc)
539  logicalrep_worker_wakeup_ptr(syncworker);
540 
541  /* Now safe to release the LWLock */
542  LWLockRelease(LogicalRepWorkerLock);
543 
544  if (started_tx)
545  {
546  /*
547  * We must commit the existing transaction to release
548  * the existing locks before entering a busy loop.
549  * This is required to avoid any undetected deadlocks
550  * due to any existing lock as deadlock detector won't
551  * be able to detect the waits on the latch.
552  */
554  pgstat_report_stat(false);
555  }
556 
557  /*
558  * Enter busy loop and wait for synchronization worker to
559  * reach expected state (or die trying).
560  */
562  started_tx = true;
563 
565  SUBREL_STATE_SYNCDONE);
566  }
567  else
568  LWLockRelease(LogicalRepWorkerLock);
569  }
570  else
571  {
572  /*
573  * If there is no sync worker for this table yet, count
574  * running sync workers for this subscription, while we have
575  * the lock.
576  */
577  int nsyncworkers =
579 
580  /* Now safe to release the LWLock */
581  LWLockRelease(LogicalRepWorkerLock);
582 
583  /*
584  * If there are free sync worker slot(s), start a new sync
585  * worker for the table.
586  */
587  if (nsyncworkers < max_sync_workers_per_subscription)
588  {
590  struct tablesync_start_time_mapping *hentry;
591  bool found;
592 
593  hentry = hash_search(last_start_times, &rstate->relid,
594  HASH_ENTER, &found);
595 
596  if (!found ||
597  TimestampDifferenceExceeds(hentry->last_start_time, now,
599  {
605  rstate->relid,
607  hentry->last_start_time = now;
608  }
609  }
610  }
611  }
612  }
613 
614  if (started_tx)
615  {
616  /*
617  * Even when the two_phase mode is requested by the user, it remains
618  * as 'pending' until all tablesyncs have reached READY state.
619  *
620  * When this happens, we restart the apply worker and (if the
621  * conditions are still ok) then the two_phase tri-state will become
622  * 'enabled' at that time.
623  *
624  * Note: If the subscription has no tables then leave the state as
625  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
626  * work.
627  */
629  {
630  CommandCounterIncrement(); /* make updates visible */
631  if (AllTablesyncsReady())
632  {
633  ereport(LOG,
634  (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
635  MySubscription->name)));
636  should_exit = true;
637  }
638  }
639 
641  pgstat_report_stat(true);
642  }
643 
644  if (should_exit)
645  {
646  /*
647  * Reset the last-start time for this worker so that the launcher will
648  * restart it without waiting for wal_retrieve_retry_interval.
649  */
651 
652  proc_exit(0);
653  }
654 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1791
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
#define Max(x, y)
Definition: c.h:987
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:306
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:249
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:702
static dshash_table * last_start_times
Definition: launcher.c:95
int max_sync_workers_per_subscription
Definition: launcher.c:58
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:854
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1081
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:117
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:412
#define LOGICALREP_TWOPHASE_STATE_PENDING
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:220
bool AllTablesyncsReady(void)
Definition: tablesync.c:1714
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:177
int wal_retrieve_retry_interval
Definition: xlog.c:138

References AllTablesyncsReady(), ApplyLauncherForgetWorkerStartTime(), Assert(), CommandCounterIncrement(), CommitTransactionCommand(), LogicalRepWorker::dbid, DSM_HANDLE_INVALID, HASHCTL::entrysize, ereport, errmsg(), FetchTableStates(), GetCurrentTimestamp(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), HASHCTL::keysize, last_start_times, lfirst, LOG, logicalrep_sync_worker_count(), LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_find(), logicalrep_worker_launch(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, max_sync_workers_per_subscription, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, now(), Subscription::oid, pgstat_report_stat(), LogicalRepWorker::proc, proc_exit(), SubscriptionRelState::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_states_not_ready, TimestampDifferenceExceeds(), Subscription::twophasestate, UpdateSubscriptionRelState(), LogicalRepWorker::userid, wait_for_relation_state_change(), wal_retrieve_retry_interval, and WORKERTYPE_TABLESYNC.

Referenced by process_syncing_tables().

◆ process_syncing_tables_for_sync()

static void process_syncing_tables_for_sync ( XLogRecPtr  current_lsn)
static

Definition at line 288 of file tablesync.c.

289 {
291 
292  if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
293  current_lsn >= MyLogicalRepWorker->relstate_lsn)
294  {
295  TimeLineID tli;
296  char syncslotname[NAMEDATALEN] = {0};
297  char originname[NAMEDATALEN] = {0};
298 
299  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
300  MyLogicalRepWorker->relstate_lsn = current_lsn;
301 
303 
304  /*
305  * UpdateSubscriptionRelState must be called within a transaction.
306  */
307  if (!IsTransactionState())
309 
314 
315  /*
316  * End streaming so that LogRepWorkerWalRcvConn can be used to drop
317  * the slot.
318  */
320 
321  /*
322  * Cleanup the tablesync slot.
323  *
324  * This has to be done after updating the state because otherwise if
325  * there is an error while doing the database operations we won't be
326  * able to rollback dropped slot.
327  */
330  syncslotname,
331  sizeof(syncslotname));
332 
333  /*
334  * It is important to give an error if we are unable to drop the slot,
335  * otherwise, it won't be dropped till the corresponding subscription
336  * is dropped. So passing missing_ok = false.
337  */
339 
341  pgstat_report_stat(false);
342 
343  /*
344  * Start a new transaction to clean up the tablesync origin tracking.
345  * This transaction will be ended within the finish_sync_worker().
346  * Now, even, if we fail to remove this here, the apply worker will
347  * ensure to clean it up afterward.
348  *
349  * We need to do this after the table state is set to SYNCDONE.
350  * Otherwise, if an error occurs while performing the database
351  * operation, the worker will be restarted and the in-memory state of
352  * replication progress (remote_lsn) won't be rolled-back which would
353  * have been cleared before restart. So, the restarted worker will use
354  * invalid replication progress state resulting in replay of
355  * transactions that have already been applied.
356  */
358 
361  originname,
362  sizeof(originname));
363 
364  /*
365  * Resetting the origin session removes the ownership of the slot.
366  * This is needed to allow the origin to be dropped.
367  */
372 
373  /*
374  * Drop the tablesync's origin tracking if exists.
375  *
376  * There is a chance that the user is concurrently performing refresh
377  * for the subscription where we remove the table state and its origin
378  * or the apply worker would have removed this origin. So passing
379  * missing_ok = true.
380  */
381  replorigin_drop_by_name(originname, true, false);
382 
383  finish_sync_worker();
384  }
385  else
387 }
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
void replorigin_session_reset(void)
Definition: origin.c:1191
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
#define InvalidRepOriginId
Definition: origin.h:33
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:450
uint32 TimeLineID
Definition: xlogdefs.h:59

References CommitTransactionCommand(), InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, replorigin_session_reset(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.

Referenced by process_syncing_tables().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char *  syncslotname,
Size  szslot 
)

Definition at line 1260 of file tablesync.c.

1262 {
1263  snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1264  relid, GetSystemIdentifier());
1265 }
#define UINT64_FORMAT
Definition: c.h:538
#define snprintf
Definition: port.h:238
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4463

References GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), process_syncing_tables_for_sync(), and ReportSlotConnectionError().

◆ run_tablesync_worker()

static void run_tablesync_worker ( )
static

Definition at line 1668 of file tablesync.c.

1669 {
1670  char originname[NAMEDATALEN];
1671  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1672  char *slotname = NULL;
1674 
1675  start_table_sync(&origin_startpos, &slotname);
1676 
1679  originname,
1680  sizeof(originname));
1681 
1682  set_apply_error_context_origin(originname);
1683 
1684  set_stream_options(&options, slotname, &origin_startpos);
1685 
1687 
1688  /* Apply the changes till we catchup with the apply worker. */
1689  start_apply(origin_startpos);
1690 }
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4363
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4450
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5060
static char ** options
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
Definition: tablesync.c:1626
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:448

References InvalidXLogRecPtr, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, NAMEDATALEN, Subscription::oid, options, LogicalRepWorker::relid, ReplicationOriginNameForLogicalRep(), set_apply_error_context_origin(), set_stream_options(), start_apply(), start_table_sync(), and walrcv_startstreaming.

Referenced by TablesyncWorkerMain().

◆ start_table_sync()

static void start_table_sync ( XLogRecPtr origin_startpos,
char **  slotname 
)
static

Definition at line 1626 of file tablesync.c.

1627 {
1628  char *sync_slotname = NULL;
1629 
1631 
1632  PG_TRY();
1633  {
1634  /* Call initial sync. */
1635  sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1636  }
1637  PG_CATCH();
1638  {
1641  else
1642  {
1643  /*
1644  * Report the worker failed during table synchronization. Abort
1645  * the current transaction so that the stats message is sent in an
1646  * idle state.
1647  */
1650 
1651  PG_RE_THROW();
1652  }
1653  }
1654  PG_END_TRY();
1655 
1656  /* allocate slot name in long-lived context */
1657  *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1658  pfree(sync_slotname);
1659 }
void DisableSubscriptionAndExit(void)
Definition: worker.c:4730
MemoryContext ApplyContext
Definition: worker.c:316
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1606
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1276
static bool am_tablesync_worker(void)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4728

References AbortOutOfAnyTransaction(), am_tablesync_worker(), ApplyContext, Assert(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MySubscription, Subscription::oid, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by run_tablesync_worker().

◆ TablesyncWorkerMain()

void TablesyncWorkerMain ( Datum  main_arg)

Definition at line 1694 of file tablesync.c.

1695 {
1696  int worker_slot = DatumGetInt32(main_arg);
1697 
1698  SetupApplyOrSyncWorker(worker_slot);
1699 
1701 
1702  finish_sync_worker();
1703 }
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4669
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
static void run_tablesync_worker()
Definition: tablesync.c:1668

References DatumGetInt32(), run_tablesync_worker(), and SetupApplyOrSyncWorker().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1739 of file tablesync.c.

1740 {
1741  Relation rel;
1742  HeapTuple tup;
1743  bool nulls[Natts_pg_subscription];
1744  bool replaces[Natts_pg_subscription];
1745  Datum values[Natts_pg_subscription];
1746 
1748  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1749  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1750 
1751  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1752  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1753  if (!HeapTupleIsValid(tup))
1754  elog(ERROR,
1755  "cache lookup failed for subscription oid %u",
1756  suboid);
1757 
1758  /* Form a new tuple. */
1759  memset(values, 0, sizeof(values));
1760  memset(nulls, false, sizeof(nulls));
1761  memset(replaces, false, sizeof(replaces));
1762 
1763  /* And update/set two_phase state */
1764  values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1765  replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1766 
1767  tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1768  values, nulls, replaces);
1769  CatalogTupleUpdate(rel, &tup->t_self, tup);
1770 
1771  heap_freetuple(tup);
1773 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_ENABLED
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define RelationGetDescr(relation)
Definition: rel.h:530
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86

References Assert(), CatalogTupleUpdate(), CharGetDatum(), elog(), ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by CreateSubscription(), and run_apply_worker().

◆ wait_for_relation_state_change()

static bool wait_for_relation_state_change ( Oid  relid,
char  expected_state 
)
static

Definition at line 177 of file tablesync.c.

178 {
179  char state;
180 
181  for (;;)
182  {
183  LogicalRepWorker *worker;
184  XLogRecPtr statelsn;
185 
187 
190  relid, &statelsn);
191 
192  if (state == SUBREL_STATE_UNKNOWN)
193  break;
194 
195  if (state == expected_state)
196  return true;
197 
198  /* Check if the sync worker is still running and bail if not. */
199  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
201  false);
202  LWLockRelease(LogicalRepWorkerLock);
203  if (!worker)
204  break;
205 
206  (void) WaitLatch(MyLatch,
208  1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
209 
211  }
212 
213  return false;
214 }
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:518
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:429
Definition: regguts.h:323

References CHECK_FOR_INTERRUPTS, GetSubscriptionRelState(), InvalidateCatalogSnapshot(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by process_syncing_tables_for_apply().

◆ wait_for_worker_state_change()

static bool wait_for_worker_state_change ( char  expected_state)
static

Definition at line 225 of file tablesync.c.

226 {
227  int rc;
228 
229  for (;;)
230  {
231  LogicalRepWorker *worker;
232 
234 
235  /*
236  * Done if already in correct state. (We assume this fetch is atomic
237  * enough to not give a misleading answer if we do it with no lock.)
238  */
239  if (MyLogicalRepWorker->relstate == expected_state)
240  return true;
241 
242  /*
243  * Bail out if the apply worker has died, else signal it we're
244  * waiting.
245  */
246  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
248  InvalidOid, false);
249  if (worker && worker->proc)
251  LWLockRelease(LogicalRepWorkerLock);
252  if (!worker)
253  break;
254 
255  /*
256  * Wait. We expect to get a latch signal back from the apply worker,
257  * but use a timeout in case it dies without sending one.
258  */
259  rc = WaitLatch(MyLatch,
261  1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
262 
263  if (rc & WL_LATCH_SET)
265  }
266 
267  return false;
268 }

References CHECK_FOR_INTERRUPTS, InvalidOid, logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, LogicalRepWorker::proc, LogicalRepWorker::relstate, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by LogicalRepSyncTableStart().

Variable Documentation

◆ copybuf

◆ table_states_not_ready

List* table_states_not_ready = NIL
static

◆ table_states_valid

bool table_states_valid = false
static

Definition at line 126 of file tablesync.c.

Referenced by FetchTableStates(), and invalidate_syncing_table_states().