PostgreSQL Source Code  git master
tablesync.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "replication/slot.h"
#include "replication/origin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for tablesync.c:

Go to the source code of this file.

Functions

static bool FetchTableStates (bool *started_tx)
 
static void pg_attribute_noreturn () finish_sync_worker(void)
 
static bool wait_for_relation_state_change (Oid relid, char expected_state)
 
static bool wait_for_worker_state_change (char expected_state)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
static void process_syncing_tables_for_sync (XLogRecPtr current_lsn)
 
static void process_syncing_tables_for_apply (XLogRecPtr current_lsn)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
static Listmake_copy_attnamelist (LogicalRepRelMapEntry *rel)
 
static int copy_read_data (void *outbuf, int minread, int maxread)
 
static void fetch_remote_table_info (char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
 
static void copy_table (Relation rel)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
bool AllTablesyncsReady (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 

Variables

static bool table_states_valid = false
 
static Listtable_states_not_ready = NIL
 
static StringInfo copybuf = NULL
 

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1541 of file tablesync.c.

1542 {
1543  bool started_tx = false;
1544  bool has_subrels = false;
1545 
1546  /* We need up-to-date sync state info for subscription tables here. */
1547  has_subrels = FetchTableStates(&started_tx);
1548 
1549  if (started_tx)
1550  {
1552  pgstat_report_stat(true);
1553  }
1554 
1555  /*
1556  * Return false when there are no tables in subscription or not all tables
1557  * are in ready state; true otherwise.
1558  */
1559  return has_subrels && (table_states_not_ready == NIL);
1560 }
#define NIL
Definition: pg_list.h:66
long pgstat_report_stat(bool force)
Definition: pgstat.c:565
static List * table_states_not_ready
Definition: tablesync.c:124
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1480
void CommitTransactionCommand(void)
Definition: xact.c:3022

References CommitTransactionCommand(), FetchTableStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by ApplyWorkerMain(), and process_syncing_tables_for_apply().

◆ copy_read_data()

static int copy_read_data ( void *  outbuf,
int  minread,
int  maxread 
)
static

Definition at line 663 of file tablesync.c.

664 {
665  int bytesread = 0;
666  int avail;
667 
668  /* If there are some leftover data from previous read, use it. */
669  avail = copybuf->len - copybuf->cursor;
670  if (avail)
671  {
672  if (avail > maxread)
673  avail = maxread;
674  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
675  copybuf->cursor += avail;
676  maxread -= avail;
677  bytesread += avail;
678  }
679 
680  while (maxread > 0 && bytesread < minread)
681  {
683  int len;
684  char *buf = NULL;
685 
686  for (;;)
687  {
688  /* Try read the data. */
690 
692 
693  if (len == 0)
694  break;
695  else if (len < 0)
696  return bytesread;
697  else
698  {
699  /* Process the data */
700  copybuf->data = buf;
701  copybuf->len = len;
702  copybuf->cursor = 0;
703 
704  avail = copybuf->len - copybuf->cursor;
705  if (avail > maxread)
706  avail = maxread;
707  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
708  outbuf = (void *) ((char *) outbuf + avail);
709  copybuf->cursor += avail;
710  maxread -= avail;
711  bytesread += avail;
712  }
713 
714  if (maxread <= 0 || bytesread >= minread)
715  return bytesread;
716  }
717 
718  /*
719  * Wait for more data or latch.
720  */
721  (void) WaitLatchOrSocket(MyLatch,
725 
727  }
728 
729  return bytesread;
730 }
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:251
struct Latch * MyLatch
Definition: globals.c:58
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
void ResetLatch(Latch *latch)
Definition: latch.c:683
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
const void size_t len
static char * buf
Definition: pg_test_fsync.c:67
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static StringInfo copybuf
Definition: tablesync.c:127
@ WAIT_EVENT_LOGICAL_SYNC_DATA
Definition: wait_event.h:108
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:426

References buf, CHECK_FOR_INTERRUPTS, copybuf, StringInfoData::cursor, StringInfoData::data, fd(), StringInfoData::len, len, LogRepWorkerWalRcvConn, MyLatch, PGINVALID_SOCKET, ResetLatch(), WAIT_EVENT_LOGICAL_SYNC_DATA, WaitLatchOrSocket(), walrcv_receive, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by copy_table().

◆ copy_table()

static void copy_table ( Relation  rel)
static

Definition at line 1059 of file tablesync.c.

1060 {
1061  LogicalRepRelMapEntry *relmapentry;
1062  LogicalRepRelation lrel;
1063  List *qual = NIL;
1065  StringInfoData cmd;
1066  CopyFromState cstate;
1067  List *attnamelist;
1068  ParseState *pstate;
1069 
1070  /* Get the publisher relation info. */
1072  RelationGetRelationName(rel), &lrel, &qual);
1073 
1074  /* Put the relation into relmap. */
1075  logicalrep_relmap_update(&lrel);
1076 
1077  /* Map the publisher relation to local one. */
1078  relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1079  Assert(rel == relmapentry->localrel);
1080 
1081  /* Start copy on the publisher. */
1082  initStringInfo(&cmd);
1083 
1084  /* Regular table with no row filter */
1085  if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1086  {
1087  appendStringInfo(&cmd, "COPY %s (",
1089 
1090  /*
1091  * XXX Do we need to list the columns in all cases? Maybe we're
1092  * replicating all columns?
1093  */
1094  for (int i = 0; i < lrel.natts; i++)
1095  {
1096  if (i > 0)
1097  appendStringInfoString(&cmd, ", ");
1098 
1100  }
1101 
1102  appendStringInfoString(&cmd, ") TO STDOUT");
1103  }
1104  else
1105  {
1106  /*
1107  * For non-tables and tables with row filters, we need to do COPY
1108  * (SELECT ...), but we can't just do SELECT * because we need to not
1109  * copy generated columns. For tables with any row filters, build a
1110  * SELECT query with OR'ed row filters for COPY.
1111  */
1112  appendStringInfoString(&cmd, "COPY (SELECT ");
1113  for (int i = 0; i < lrel.natts; i++)
1114  {
1116  if (i < lrel.natts - 1)
1117  appendStringInfoString(&cmd, ", ");
1118  }
1119 
1120  appendStringInfoString(&cmd, " FROM ");
1121 
1122  /*
1123  * For regular tables, make sure we don't copy data from a child that
1124  * inherits the named table as those will be copied separately.
1125  */
1126  if (lrel.relkind == RELKIND_RELATION)
1127  appendStringInfoString(&cmd, "ONLY ");
1128 
1130  /* list of OR'ed filters */
1131  if (qual != NIL)
1132  {
1133  ListCell *lc;
1134  char *q = strVal(linitial(qual));
1135 
1136  appendStringInfo(&cmd, " WHERE %s", q);
1137  for_each_from(lc, qual, 1)
1138  {
1139  q = strVal(lfirst(lc));
1140  appendStringInfo(&cmd, " OR %s", q);
1141  }
1142  list_free_deep(qual);
1143  }
1144 
1145  appendStringInfoString(&cmd, ") TO STDOUT");
1146  }
1147  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1148  pfree(cmd.data);
1149  if (res->status != WALRCV_OK_COPY_OUT)
1150  ereport(ERROR,
1151  (errcode(ERRCODE_CONNECTION_FAILURE),
1152  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1153  lrel.nspname, lrel.relname, res->err)));
1155 
1156  copybuf = makeStringInfo();
1157 
1158  pstate = make_parsestate(NULL);
1159  (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1160  NULL, false, false);
1161 
1162  attnamelist = make_copy_attnamelist(relmapentry);
1163  cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
1164 
1165  /* Do the copy */
1166  (void) CopyFrom(cstate);
1167 
1168  logicalrep_rel_close(relmapentry, NoLock);
1169 }
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1333
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:632
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define ERROR
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:145
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free_deep(List *list)
Definition: list.c:1559
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3331
void pfree(void *pointer)
Definition: mcxt.c:1306
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:43
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define lfirst(lc)
Definition: pg_list.h:170
#define for_each_from(cell, lst, N)
Definition: pg_list.h:412
#define linitial(l)
Definition: pg_list.h:176
#define RelationGetRelationName(relation)
Definition: rel.h:535
#define RelationGetNamespace(relation)
Definition: rel.h:542
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:11529
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11613
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:319
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:456
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: pg_list.h:52
LogicalRepRelId remoteid
Definition: logicalproto.h:102
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:663
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
Definition: tablesync.c:739
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:643
#define strVal(v)
Definition: value.h:82
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:440
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:434

References AccessShareLock, addRangeTableEntryForRelation(), appendStringInfo(), appendStringInfoString(), Assert(), LogicalRepRelation::attnames, BeginCopyFrom(), copy_read_data(), copybuf, CopyFrom(), StringInfoData::data, ereport, errcode(), errmsg(), ERROR, fetch_remote_table_info(), for_each_from, get_namespace_name(), i, initStringInfo(), lfirst, linitial, list_free_deep(), LogicalRepRelMapEntry::localrel, logicalrep_rel_close(), logicalrep_rel_open(), logicalrep_relmap_update(), LogRepWorkerWalRcvConn, make_copy_attnamelist(), make_parsestate(), makeStringInfo(), LogicalRepRelation::natts, NIL, NoLock, LogicalRepRelation::nspname, pfree(), quote_identifier(), quote_qualified_identifier(), RelationGetNamespace, RelationGetRelationName, LogicalRepRelation::relkind, LogicalRepRelation::relname, LogicalRepRelation::remoteid, res, strVal, walrcv_clear_result(), walrcv_exec, and WALRCV_OK_COPY_OUT.

Referenced by LogicalRepSyncTableStart().

◆ fetch_remote_table_info()

static void fetch_remote_table_info ( char *  nspname,
char *  relname,
LogicalRepRelation lrel,
List **  qual 
)
static

Definition at line 739 of file tablesync.c.

741 {
743  StringInfoData cmd;
744  TupleTableSlot *slot;
745  Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
746  Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
747  Oid qualRow[] = {TEXTOID};
748  bool isnull;
749  int natt;
750  ListCell *lc;
751  Bitmapset *included_cols = NULL;
752 
753  lrel->nspname = nspname;
754  lrel->relname = relname;
755 
756  /* First fetch Oid and replica identity. */
757  initStringInfo(&cmd);
758  appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
759  " FROM pg_catalog.pg_class c"
760  " INNER JOIN pg_catalog.pg_namespace n"
761  " ON (c.relnamespace = n.oid)"
762  " WHERE n.nspname = %s"
763  " AND c.relname = %s",
764  quote_literal_cstr(nspname),
767  lengthof(tableRow), tableRow);
768 
769  if (res->status != WALRCV_OK_TUPLES)
770  ereport(ERROR,
771  (errcode(ERRCODE_CONNECTION_FAILURE),
772  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
773  nspname, relname, res->err)));
774 
775  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
776  if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
777  ereport(ERROR,
778  (errcode(ERRCODE_UNDEFINED_OBJECT),
779  errmsg("table \"%s.%s\" not found on publisher",
780  nspname, relname)));
781 
782  lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
783  Assert(!isnull);
784  lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
785  Assert(!isnull);
786  lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
787  Assert(!isnull);
788 
791 
792 
793  /*
794  * Get column lists for each relation.
795  *
796  * We need to do this before fetching info about column names and types,
797  * so that we can skip columns that should not be replicated.
798  */
800  {
801  WalRcvExecResult *pubres;
802  TupleTableSlot *tslot;
803  Oid attrsRow[] = {INT2VECTOROID};
804  StringInfoData pub_names;
805  initStringInfo(&pub_names);
806  foreach(lc, MySubscription->publications)
807  {
808  if (foreach_current_index(lc) > 0)
809  appendStringInfoString(&pub_names, ", ");
811  }
812 
813  /*
814  * Fetch info about column lists for the relation (from all the
815  * publications).
816  */
817  resetStringInfo(&cmd);
818  appendStringInfo(&cmd,
819  "SELECT DISTINCT"
820  " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
821  " THEN NULL ELSE gpt.attrs END)"
822  " FROM pg_publication p,"
823  " LATERAL pg_get_publication_tables(p.pubname) gpt,"
824  " pg_class c"
825  " WHERE gpt.relid = %u AND c.oid = gpt.relid"
826  " AND p.pubname IN ( %s )",
827  lrel->remoteid,
828  pub_names.data);
829 
831  lengthof(attrsRow), attrsRow);
832 
833  if (pubres->status != WALRCV_OK_TUPLES)
834  ereport(ERROR,
835  (errcode(ERRCODE_CONNECTION_FAILURE),
836  errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
837  nspname, relname, pubres->err)));
838 
839  /*
840  * We don't support the case where the column list is different for
841  * the same table when combining publications. See comments atop
842  * fetch_table_list. So there should be only one row returned.
843  * Although we already checked this when creating the subscription, we
844  * still need to check here in case the column list was changed after
845  * creating the subscription and before the sync worker is started.
846  */
847  if (tuplestore_tuple_count(pubres->tuplestore) > 1)
848  ereport(ERROR,
849  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
850  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
851  nspname, relname));
852 
853  /*
854  * Get the column list and build a single bitmap with the attnums.
855  *
856  * If we find a NULL value, it means all the columns should be
857  * replicated.
858  */
860  if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
861  {
862  Datum cfval = slot_getattr(tslot, 1, &isnull);
863 
864  if (!isnull)
865  {
866  ArrayType *arr;
867  int nelems;
868  int16 *elems;
869 
870  arr = DatumGetArrayTypeP(cfval);
871  nelems = ARR_DIMS(arr)[0];
872  elems = (int16 *) ARR_DATA_PTR(arr);
873 
874  for (natt = 0; natt < nelems; natt++)
875  included_cols = bms_add_member(included_cols, elems[natt]);
876  }
877 
878  ExecClearTuple(tslot);
879  }
881 
882  walrcv_clear_result(pubres);
883 
884  pfree(pub_names.data);
885  }
886 
887  /*
888  * Now fetch column names and types.
889  */
890  resetStringInfo(&cmd);
891  appendStringInfo(&cmd,
892  "SELECT a.attnum,"
893  " a.attname,"
894  " a.atttypid,"
895  " a.attnum = ANY(i.indkey)"
896  " FROM pg_catalog.pg_attribute a"
897  " LEFT JOIN pg_catalog.pg_index i"
898  " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
899  " WHERE a.attnum > 0::pg_catalog.int2"
900  " AND NOT a.attisdropped %s"
901  " AND a.attrelid = %u"
902  " ORDER BY a.attnum",
903  lrel->remoteid,
905  "AND a.attgenerated = ''" : ""),
906  lrel->remoteid);
908  lengthof(attrRow), attrRow);
909 
910  if (res->status != WALRCV_OK_TUPLES)
911  ereport(ERROR,
912  (errcode(ERRCODE_CONNECTION_FAILURE),
913  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
914  nspname, relname, res->err)));
915 
916  /* We don't know the number of rows coming, so allocate enough space. */
917  lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
918  lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
919  lrel->attkeys = NULL;
920 
921  /*
922  * Store the columns as a list of names. Ignore those that are not
923  * present in the column list, if there is one.
924  */
925  natt = 0;
926  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
927  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
928  {
929  char *rel_colname;
931 
932  attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
933  Assert(!isnull);
934 
935  /* If the column is not in the column list, skip it. */
936  if (included_cols != NULL && !bms_is_member(attnum, included_cols))
937  {
938  ExecClearTuple(slot);
939  continue;
940  }
941 
942  rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
943  Assert(!isnull);
944 
945  lrel->attnames[natt] = rel_colname;
946  lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
947  Assert(!isnull);
948 
949  if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
950  lrel->attkeys = bms_add_member(lrel->attkeys, natt);
951 
952  /* Should never happen. */
953  if (++natt >= MaxTupleAttributeNumber)
954  elog(ERROR, "too many columns in remote table \"%s.%s\"",
955  nspname, relname);
956 
957  ExecClearTuple(slot);
958  }
960 
961  lrel->natts = natt;
962 
964 
965  /*
966  * Get relation's row filter expressions. DISTINCT avoids the same
967  * expression of a table in multiple publications from being included
968  * multiple times in the final expression.
969  *
970  * We need to copy the row even if it matches just one of the
971  * publications, so we later combine all the quals with OR.
972  *
973  * For initial synchronization, row filtering can be ignored in following
974  * cases:
975  *
976  * 1) one of the subscribed publications for the table hasn't specified
977  * any row filter
978  *
979  * 2) one of the subscribed publications has puballtables set to true
980  *
981  * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
982  * that includes this relation
983  */
985  {
986  StringInfoData pub_names;
987 
988  /* Build the pubname list. */
989  initStringInfo(&pub_names);
990  foreach(lc, MySubscription->publications)
991  {
992  char *pubname = strVal(lfirst(lc));
993 
994  if (foreach_current_index(lc) > 0)
995  appendStringInfoString(&pub_names, ", ");
996 
997  appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
998  }
999 
1000  /* Check for row filters. */
1001  resetStringInfo(&cmd);
1002  appendStringInfo(&cmd,
1003  "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1004  " FROM pg_publication p,"
1005  " LATERAL pg_get_publication_tables(p.pubname) gpt"
1006  " WHERE gpt.relid = %u"
1007  " AND p.pubname IN ( %s )",
1008  lrel->remoteid,
1009  pub_names.data);
1010 
1011  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1012 
1013  if (res->status != WALRCV_OK_TUPLES)
1014  ereport(ERROR,
1015  (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1016  nspname, relname, res->err)));
1017 
1018  /*
1019  * Multiple row filter expressions for the same table will be combined
1020  * by COPY using OR. If any of the filter expressions for this table
1021  * are null, it means the whole table will be copied. In this case it
1022  * is not necessary to construct a unified row filter expression at
1023  * all.
1024  */
1025  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1026  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1027  {
1028  Datum rf = slot_getattr(slot, 1, &isnull);
1029 
1030  if (!isnull)
1031  *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1032  else
1033  {
1034  /* Ignore filters and cleanup as necessary. */
1035  if (*qual)
1036  {
1037  list_free_deep(*qual);
1038  *qual = NIL;
1039  }
1040  break;
1041  }
1042 
1043  ExecClearTuple(slot);
1044  }
1046 
1048  }
1049 
1050  pfree(cmd.data);
1051 }
#define ARR_DATA_PTR(a)
Definition: array.h:315
#define DatumGetArrayTypeP(X)
Definition: array.h:254
#define ARR_DIMS(a)
Definition: array.h:287
int16 AttrNumber
Definition: attnum.h:21
Subscription * MySubscription
Definition: worker.c:253
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:428
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:739
#define TextDatumGetCString(d)
Definition: builtins.h:86
signed short int16
Definition: c.h:429
#define lengthof(array)
Definition: c.h:724
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1238
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
List * lappend(List *list, void *datum)
Definition: list.c:338
void * palloc0(Size size)
Definition: mcxt.c:1230
int16 attnum
Definition: pg_attribute.h:83
NameData relname
Definition: pg_class.h:38
#define foreach_current_index(cell)
Definition: pg_list.h:401
static bool DatumGetBool(Datum X)
Definition: postgres.h:438
uintptr_t Datum
Definition: postgres.h:412
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:590
static char DatumGetChar(Datum X)
Definition: postgres.h:460
static int16 DatumGetInt16(Datum X)
Definition: postgres.h:510
unsigned int Oid
Definition: postgres_ext.h:31
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
Bitmapset * attkeys
Definition: logicalproto.h:110
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:546
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:389
String * makeString(char *str)
Definition: value.c:63
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
#define walrcv_server_version(conn)
Definition: walreceiver.h:418

References appendStringInfo(), appendStringInfoString(), ARR_DATA_PTR, ARR_DIMS, Assert(), LogicalRepRelation::attkeys, LogicalRepRelation::attnames, attnum, LogicalRepRelation::atttyps, bms_add_member(), bms_is_member(), StringInfoData::data, DatumGetArrayTypeP, DatumGetBool(), DatumGetChar(), DatumGetInt16(), DatumGetObjectId(), elog(), ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), foreach_current_index, initStringInfo(), lappend(), lengthof, lfirst, list_free_deep(), LogRepWorkerWalRcvConn, MakeSingleTupleTableSlot(), makeString(), MaxTupleAttributeNumber, MySubscription, LogicalRepRelation::natts, NIL, LogicalRepRelation::nspname, palloc0(), pfree(), Subscription::publications, quote_literal_cstr(), LogicalRepRelation::relkind, relname, LogicalRepRelation::relname, LogicalRepRelation::remoteid, LogicalRepRelation::replident, res, resetStringInfo(), slot_getattr(), WalRcvExecResult::status, strVal, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), tuplestore_tuple_count(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and walrcv_server_version.

Referenced by copy_table().

◆ FetchTableStates()

static bool FetchTableStates ( bool started_tx)
static

Definition at line 1480 of file tablesync.c.

1481 {
1482  static bool has_subrels = false;
1483 
1484  *started_tx = false;
1485 
1486  if (!table_states_valid)
1487  {
1488  MemoryContext oldctx;
1489  List *rstates;
1490  ListCell *lc;
1491  SubscriptionRelState *rstate;
1492 
1493  /* Clean the old lists. */
1496 
1497  if (!IsTransactionState())
1498  {
1500  *started_tx = true;
1501  }
1502 
1503  /* Fetch all non-ready tables. */
1504  rstates = GetSubscriptionRelations(MySubscription->oid, true);
1505 
1506  /* Allocate the tracking info in a permanent memory context. */
1508  foreach(lc, rstates)
1509  {
1510  rstate = palloc(sizeof(SubscriptionRelState));
1511  memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1513  }
1514  MemoryContextSwitchTo(oldctx);
1515 
1516  /*
1517  * Does the subscription have tables?
1518  *
1519  * If there were not-READY relations found then we know it does. But
1520  * if table_state_not_ready was empty we still need to check again to
1521  * see if there are 0 tables.
1522  */
1523  has_subrels = (table_states_not_ready != NIL) ||
1525 
1526  table_states_valid = true;
1527  }
1528 
1529  return has_subrels;
1530 }
MemoryContext CacheMemoryContext
Definition: mcxt.c:133
void * palloc(Size size)
Definition: mcxt.c:1199
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
List * GetSubscriptionRelations(Oid subid, bool not_ready)
bool HasSubscriptionRelations(Oid subid)
static bool table_states_valid
Definition: tablesync.c:123
bool IsTransactionState(void)
Definition: xact.c:377
void StartTransactionCommand(void)
Definition: xact.c:2925

References CacheMemoryContext, GetSubscriptionRelations(), HasSubscriptionRelations(), IsTransactionState(), lappend(), lfirst, list_free_deep(), MemoryContextSwitchTo(), MySubscription, NIL, Subscription::oid, palloc(), StartTransactionCommand(), table_states_not_ready, and table_states_valid.

Referenced by AllTablesyncsReady(), and process_syncing_tables_for_apply().

◆ invalidate_syncing_table_states()

void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

Definition at line 271 of file tablesync.c.

272 {
273  table_states_valid = false;
274 }

References table_states_valid.

Referenced by ApplyWorkerMain().

◆ LogicalRepSyncTableStart()

char* LogicalRepSyncTableStart ( XLogRecPtr origin_startpos)

Definition at line 1205 of file tablesync.c.

1206 {
1207  char *slotname;
1208  char *err;
1209  char relstate;
1210  XLogRecPtr relstate_lsn;
1211  Relation rel;
1212  AclResult aclresult;
1214  char originname[NAMEDATALEN];
1215  RepOriginId originid;
1216 
1217  /* Check the state of the table synchronization. */
1221  &relstate_lsn);
1223 
1225  MyLogicalRepWorker->relstate = relstate;
1226  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1228 
1229  /*
1230  * If synchronization is already done or no longer necessary, exit now
1231  * that we've updated shared memory state.
1232  */
1233  switch (relstate)
1234  {
1235  case SUBREL_STATE_SYNCDONE:
1236  case SUBREL_STATE_READY:
1237  case SUBREL_STATE_UNKNOWN:
1238  finish_sync_worker(); /* doesn't return */
1239  }
1240 
1241  /* Calculate the name of the tablesync slot. */
1242  slotname = (char *) palloc(NAMEDATALEN);
1245  slotname,
1246  NAMEDATALEN);
1247 
1248  /*
1249  * Here we use the slot name instead of the subscription name as the
1250  * application_name, so that it is different from the main apply worker,
1251  * so that synchronous replication can distinguish them.
1252  */
1254  walrcv_connect(MySubscription->conninfo, true, slotname, &err);
1255  if (LogRepWorkerWalRcvConn == NULL)
1256  ereport(ERROR,
1257  (errcode(ERRCODE_CONNECTION_FAILURE),
1258  errmsg("could not connect to the publisher: %s", err)));
1259 
1260  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1261  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1262  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1263 
1264  /* Assign the origin tracking record name. */
1267  originname,
1268  sizeof(originname));
1269 
1270  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1271  {
1272  /*
1273  * We have previously errored out before finishing the copy so the
1274  * replication slot might exist. We want to remove the slot if it
1275  * already exists and proceed.
1276  *
1277  * XXX We could also instead try to drop the slot, last time we failed
1278  * but for that, we might need to clean up the copy state as it might
1279  * be in the middle of fetching the rows. Also, if there is a network
1280  * breakdown then it wouldn't have succeeded so trying it next time
1281  * seems like a better bet.
1282  */
1284  }
1285  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1286  {
1287  /*
1288  * The COPY phase was previously done, but tablesync then crashed
1289  * before it was able to finish normally.
1290  */
1292 
1293  /*
1294  * The origin tracking name must already exist. It was created first
1295  * time this tablesync was launched.
1296  */
1297  originid = replorigin_by_name(originname, false);
1298  replorigin_session_setup(originid);
1299  replorigin_session_origin = originid;
1300  *origin_startpos = replorigin_session_get_progress(false);
1301 
1303 
1304  goto copy_table_done;
1305  }
1306 
1308  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1311 
1312  /* Update the state and make it visible to others. */
1319  pgstat_report_stat(true);
1320 
1322 
1323  /*
1324  * Use a standard write lock here. It might be better to disallow access
1325  * to the table while it's being synchronized. But we don't want to block
1326  * the main apply process from working and it has to open the relation in
1327  * RowExclusiveLock when remapping remote relation id to local one.
1328  */
1330 
1331  /*
1332  * Check that our table sync worker has permission to insert into the
1333  * target table.
1334  */
1335  aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1336  ACL_INSERT);
1337  if (aclresult != ACLCHECK_OK)
1338  aclcheck_error(aclresult,
1339  get_relkind_objtype(rel->rd_rel->relkind),
1341 
1342  /*
1343  * COPY FROM does not honor RLS policies. That is not a problem for
1344  * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1345  * who has it implicitly), but other roles should not be able to
1346  * circumvent RLS. Disallow logical replication into RLS enabled
1347  * relations for such roles.
1348  */
1350  ereport(ERROR,
1351  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1352  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1353  GetUserNameFromId(GetUserId(), true),
1354  RelationGetRelationName(rel))));
1355 
1356  /*
1357  * Start a transaction in the remote node in REPEATABLE READ mode. This
1358  * ensures that both the replication slot we create (see below) and the
1359  * COPY are consistent with each other.
1360  */
1362  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1363  0, NULL);
1364  if (res->status != WALRCV_OK_COMMAND)
1365  ereport(ERROR,
1366  (errcode(ERRCODE_CONNECTION_FAILURE),
1367  errmsg("table copy could not start transaction on publisher: %s",
1368  res->err)));
1370 
1371  /*
1372  * Create a new permanent logical decoding slot. This slot will be used
1373  * for the catchup phase after COPY is done, so tell it to use the
1374  * snapshot to make the final data consistent.
1375  *
1376  * Prevent cancel/die interrupts while creating slot here because it is
1377  * possible that before the server finishes this command, a concurrent
1378  * drop subscription happens which would complete without removing this
1379  * slot leading to a dangling slot on the server.
1380  */
1381  HOLD_INTERRUPTS();
1383  slotname, false /* permanent */ , false /* two_phase */ ,
1384  CRS_USE_SNAPSHOT, origin_startpos);
1386 
1387  /*
1388  * Setup replication origin tracking. The purpose of doing this before the
1389  * copy is to avoid doing the copy again due to any error in setting up
1390  * origin tracking.
1391  */
1392  originid = replorigin_by_name(originname, true);
1393  if (!OidIsValid(originid))
1394  {
1395  /*
1396  * Origin tracking does not exist, so create it now.
1397  *
1398  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1399  * logged for the purpose of recovery. Locks are to prevent the
1400  * replication origin from vanishing while advancing.
1401  */
1402  originid = replorigin_create(originname);
1403 
1404  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1405  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1406  true /* go backward */ , true /* WAL log */ );
1407  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1408 
1409  replorigin_session_setup(originid);
1410  replorigin_session_origin = originid;
1411  }
1412  else
1413  {
1414  ereport(ERROR,
1416  errmsg("replication origin \"%s\" already exists",
1417  originname)));
1418  }
1419 
1420  /* Now do the initial data copy */
1422  copy_table(rel);
1424 
1425  res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1426  if (res->status != WALRCV_OK_COMMAND)
1427  ereport(ERROR,
1428  (errcode(ERRCODE_CONNECTION_FAILURE),
1429  errmsg("table copy could not finish transaction on publisher: %s",
1430  res->err)));
1432 
1433  table_close(rel, NoLock);
1434 
1435  /* Make the copy visible. */
1437 
1438  /*
1439  * Update the persisted state to indicate the COPY phase is done; make it
1440  * visible to others.
1441  */
1444  SUBREL_STATE_FINISHEDCOPY,
1446 
1448 
1449 copy_table_done:
1450 
1451  elog(DEBUG1,
1452  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1453  originname, LSN_FORMAT_ARGS(*origin_startpos));
1454 
1455  /*
1456  * We are done with the initial data synchronization, update the state.
1457  */
1459  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1460  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1462 
1463  /*
1464  * Finally, wait until the main apply worker tells us to catch up and then
1465  * return to let LogicalRepApplyLoop do it.
1466  */
1467  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1468  return slotname;
1469 }
AclResult
Definition: acl.h:183
@ ACLCHECK_OK
Definition: acl.h:184
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3485
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4746
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:376
#define OidIsValid(objectId)
Definition: c.h:711
#define DEBUG1
Definition: elog.h:26
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:967
Oid GetUserId(void)
Definition: miscinit.c:497
ObjectType get_relkind_objtype(char relkind)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1083
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:884
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1218
#define ACL_INSERT
Definition: parsenodes.h:83
#define NAMEDATALEN
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationGetRelid(relation)
Definition: rel.h:501
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:251
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:683
void PopActiveSnapshot(void)
Definition: snapmgr.c:778
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
XLogRecPtr relstate_lsn
Form_pg_class rd_rel
Definition: rel.h:110
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:222
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1189
static void copy_table(Relation rel)
Definition: tablesync.c:1059
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:430
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:408
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
void CommandCounterIncrement(void)
Definition: xact.c:1077
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert(), check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog(), ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), HOLD_INTERRUPTS, InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, palloc(), pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), res, RESUME_INTERRUPTS, RLS_ENABLED, RowExclusiveLock, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.

Referenced by start_table_sync().

◆ make_copy_attnamelist()

static List* make_copy_attnamelist ( LogicalRepRelMapEntry rel)
static

Definition at line 643 of file tablesync.c.

644 {
645  List *attnamelist = NIL;
646  int i;
647 
648  for (i = 0; i < rel->remoterel.natts; i++)
649  {
650  attnamelist = lappend(attnamelist,
651  makeString(rel->remoterel.attnames[i]));
652  }
653 
654 
655  return attnamelist;
656 }
LogicalRepRelation remoterel

References LogicalRepRelation::attnames, i, lappend(), makeString(), LogicalRepRelation::natts, NIL, and LogicalRepRelMapEntry::remoterel.

Referenced by copy_table().

◆ pg_attribute_noreturn()

static void pg_attribute_noreturn ( )
static

Definition at line 133 of file tablesync.c.

135 {
136  /*
137  * Commit any outstanding transaction. This is the usual case, unless
138  * there was nothing to do for the table.
139  */
140  if (IsTransactionState())
141  {
143  pgstat_report_stat(true);
144  }
145 
146  /* And flush all writes. */
148 
150  ereport(LOG,
151  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
155 
156  /* Find the main apply worker and signal it. */
158 
159  /* Stop gracefully */
160  proc_exit(0);
161 }
#define LOG
Definition: elog.h:27
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:534
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1910
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:8874
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2512

References CommitTransactionCommand(), ereport, errmsg(), get_rel_name(), GetXLogWriteRecPtr(), InvalidOid, IsTransactionState(), LOG, logicalrep_worker_wakeup(), MyLogicalRepWorker, MySubscription, Subscription::name, pgstat_report_stat(), proc_exit(), LogicalRepWorker::relid, StartTransactionCommand(), LogicalRepWorker::subid, and XLogFlush().

◆ process_syncing_tables()

void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 631 of file tablesync.c.

632 {
633  if (am_tablesync_worker())
634  process_syncing_tables_for_sync(current_lsn);
635  else
637 }
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:408
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:285
static bool am_tablesync_worker(void)

References am_tablesync_worker(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

◆ process_syncing_tables_for_apply()

static void process_syncing_tables_for_apply ( XLogRecPtr  current_lsn)
static

Definition at line 408 of file tablesync.c.

409 {
410  struct tablesync_start_time_mapping
411  {
412  Oid relid;
413  TimestampTz last_start_time;
414  };
415  static HTAB *last_start_times = NULL;
416  ListCell *lc;
417  bool started_tx = false;
418 
420 
421  /* We need up-to-date sync state info for subscription tables here. */
422  FetchTableStates(&started_tx);
423 
424  /*
425  * Prepare a hash table for tracking last start times of workers, to avoid
426  * immediate restarts. We don't need it if there are no tables that need
427  * syncing.
428  */
429  if (table_states_not_ready != NIL && !last_start_times)
430  {
431  HASHCTL ctl;
432 
433  ctl.keysize = sizeof(Oid);
434  ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
435  last_start_times = hash_create("Logical replication table sync worker start times",
436  256, &ctl, HASH_ELEM | HASH_BLOBS);
437  }
438 
439  /*
440  * Clean up the hash table when we're done with all tables (just to
441  * release the bit of memory).
442  */
443  else if (table_states_not_ready == NIL && last_start_times)
444  {
445  hash_destroy(last_start_times);
446  last_start_times = NULL;
447  }
448 
449  /*
450  * Even when the two_phase mode is requested by the user, it remains as
451  * 'pending' until all tablesyncs have reached READY state.
452  *
453  * When this happens, we restart the apply worker and (if the conditions
454  * are still ok) then the two_phase tri-state will become 'enabled' at
455  * that time.
456  *
457  * Note: If the subscription has no tables then leave the state as
458  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
459  * work.
460  */
463  {
464  ereport(LOG,
465  (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
466  MySubscription->name)));
467 
468  proc_exit(0);
469  }
470 
471  /*
472  * Process all tables that are being synchronized.
473  */
474  foreach(lc, table_states_not_ready)
475  {
477 
478  if (rstate->state == SUBREL_STATE_SYNCDONE)
479  {
480  /*
481  * Apply has caught up to the position where the table sync has
482  * finished. Mark the table as ready so that the apply will just
483  * continue to replicate it normally.
484  */
485  if (current_lsn >= rstate->lsn)
486  {
487  char originname[NAMEDATALEN];
488 
489  rstate->state = SUBREL_STATE_READY;
490  rstate->lsn = current_lsn;
491  if (!started_tx)
492  {
494  started_tx = true;
495  }
496 
497  /*
498  * Remove the tablesync origin tracking if exists.
499  *
500  * There is a chance that the user is concurrently performing
501  * refresh for the subscription where we remove the table
502  * state and its origin or the tablesync worker would have
503  * already removed this origin. We can't rely on tablesync
504  * worker to remove the origin tracking as if there is any
505  * error while dropping we won't restart it to drop the
506  * origin. So passing missing_ok = true.
507  */
509  rstate->relid,
510  originname,
511  sizeof(originname));
512  replorigin_drop_by_name(originname, true, false);
513 
514  /*
515  * Update the state to READY only after the origin cleanup.
516  */
518  rstate->relid, rstate->state,
519  rstate->lsn);
520  }
521  }
522  else
523  {
524  LogicalRepWorker *syncworker;
525 
526  /*
527  * Look for a sync worker for this relation.
528  */
529  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
530 
532  rstate->relid, false);
533 
534  if (syncworker)
535  {
536  /* Found one, update our copy of its state */
537  SpinLockAcquire(&syncworker->relmutex);
538  rstate->state = syncworker->relstate;
539  rstate->lsn = syncworker->relstate_lsn;
540  if (rstate->state == SUBREL_STATE_SYNCWAIT)
541  {
542  /*
543  * Sync worker is waiting for apply. Tell sync worker it
544  * can catchup now.
545  */
546  syncworker->relstate = SUBREL_STATE_CATCHUP;
547  syncworker->relstate_lsn =
548  Max(syncworker->relstate_lsn, current_lsn);
549  }
550  SpinLockRelease(&syncworker->relmutex);
551 
552  /* If we told worker to catch up, wait for it. */
553  if (rstate->state == SUBREL_STATE_SYNCWAIT)
554  {
555  /* Signal the sync worker, as it may be waiting for us. */
556  if (syncworker->proc)
557  logicalrep_worker_wakeup_ptr(syncworker);
558 
559  /* Now safe to release the LWLock */
560  LWLockRelease(LogicalRepWorkerLock);
561 
562  /*
563  * Enter busy loop and wait for synchronization worker to
564  * reach expected state (or die trying).
565  */
566  if (!started_tx)
567  {
569  started_tx = true;
570  }
571 
573  SUBREL_STATE_SYNCDONE);
574  }
575  else
576  LWLockRelease(LogicalRepWorkerLock);
577  }
578  else
579  {
580  /*
581  * If there is no sync worker for this table yet, count
582  * running sync workers for this subscription, while we have
583  * the lock.
584  */
585  int nsyncworkers =
587 
588  /* Now safe to release the LWLock */
589  LWLockRelease(LogicalRepWorkerLock);
590 
591  /*
592  * If there are free sync worker slot(s), start a new sync
593  * worker for the table.
594  */
595  if (nsyncworkers < max_sync_workers_per_subscription)
596  {
598  struct tablesync_start_time_mapping *hentry;
599  bool found;
600 
601  hentry = hash_search(last_start_times, &rstate->relid,
602  HASH_ENTER, &found);
603 
604  if (!found ||
605  TimestampDifferenceExceeds(hentry->last_start_time, now,
607  {
612  rstate->relid);
613  hentry->last_start_time = now;
614  }
615  }
616  }
617  }
618  }
619 
620  if (started_tx)
621  {
623  pgstat_report_stat(true);
624  }
625 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1719
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1573
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1537
#define Max(x, y)
Definition: c.h:931
int64 TimestampTz
Definition: timestamp.h:39
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:215
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:554
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
Definition: launcher.c:266
int max_sync_workers_per_subscription
Definition: launcher.c:57
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:664
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1194
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1802
@ LW_SHARED
Definition: lwlock.h:113
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:426
#define LOGICALREP_TWOPHASE_STATE_PENDING
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:220
bool AllTablesyncsReady(void)
Definition: tablesync.c:1541
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:174
int wal_retrieve_retry_interval
Definition: xlog.c:137

References AllTablesyncsReady(), Assert(), CommitTransactionCommand(), LogicalRepWorker::dbid, HASHCTL::entrysize, ereport, errmsg(), FetchTableStates(), GetCurrentTimestamp(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), HASHCTL::keysize, lfirst, LOG, logicalrep_sync_worker_count(), LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_find(), logicalrep_worker_launch(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, max_sync_workers_per_subscription, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, now(), Subscription::oid, pgstat_report_stat(), LogicalRepWorker::proc, proc_exit(), SubscriptionRelState::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_states_not_ready, TimestampDifferenceExceeds(), Subscription::twophasestate, UpdateSubscriptionRelState(), LogicalRepWorker::userid, wait_for_relation_state_change(), and wal_retrieve_retry_interval.

Referenced by process_syncing_tables().

◆ process_syncing_tables_for_sync()

static void process_syncing_tables_for_sync ( XLogRecPtr  current_lsn)
static

Definition at line 285 of file tablesync.c.

286 {
288 
289  if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
290  current_lsn >= MyLogicalRepWorker->relstate_lsn)
291  {
292  TimeLineID tli;
293  char syncslotname[NAMEDATALEN] = {0};
294  char originname[NAMEDATALEN] = {0};
295 
296  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
297  MyLogicalRepWorker->relstate_lsn = current_lsn;
298 
300 
301  /*
302  * UpdateSubscriptionRelState must be called within a transaction.
303  */
304  if (!IsTransactionState())
306 
311 
312  /*
313  * End streaming so that LogRepWorkerWalRcvConn can be used to drop
314  * the slot.
315  */
317 
318  /*
319  * Cleanup the tablesync slot.
320  *
321  * This has to be done after updating the state because otherwise if
322  * there is an error while doing the database operations we won't be
323  * able to rollback dropped slot.
324  */
327  syncslotname,
328  sizeof(syncslotname));
329 
330  /*
331  * It is important to give an error if we are unable to drop the slot,
332  * otherwise, it won't be dropped till the corresponding subscription
333  * is dropped. So passing missing_ok = false.
334  */
336 
338  pgstat_report_stat(false);
339 
340  /*
341  * Start a new transaction to clean up the tablesync origin tracking.
342  * This transaction will be ended within the finish_sync_worker().
343  * Now, even, if we fail to remove this here, the apply worker will
344  * ensure to clean it up afterward.
345  *
346  * We need to do this after the table state is set to SYNCDONE.
347  * Otherwise, if an error occurs while performing the database
348  * operation, the worker will be restarted and the in-memory state of
349  * replication progress (remote_lsn) won't be rolled-back which would
350  * have been cleared before restart. So, the restarted worker will use
351  * invalid replication progress state resulting in replay of
352  * transactions that have already been applied.
353  */
355 
358  originname,
359  sizeof(originname));
360 
361  /*
362  * Resetting the origin session removes the ownership of the slot.
363  * This is needed to allow the origin to be dropped.
364  */
369 
370  /*
371  * Drop the tablesync's origin tracking if exists.
372  *
373  * There is a chance that the user is concurrently performing refresh
374  * for the subscription where we remove the table state and its origin
375  * or the apply worker would have removed this origin. So passing
376  * missing_ok = true.
377  */
378  replorigin_drop_by_name(originname, true, false);
379 
380  finish_sync_worker();
381  }
382  else
384 }
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
void replorigin_session_reset(void)
Definition: origin.c:1171
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
#define InvalidRepOriginId
Definition: origin.h:33
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:424
uint32 TimeLineID
Definition: xlogdefs.h:59

References CommitTransactionCommand(), InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, replorigin_session_reset(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.

Referenced by process_syncing_tables().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char *  syncslotname,
Size  szslot 
)

Definition at line 1189 of file tablesync.c.

1191 {
1192  snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1193  relid, GetSystemIdentifier());
1194 }
#define UINT64_FORMAT
Definition: c.h:485
#define snprintf
Definition: port.h:238
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4181

References GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), process_syncing_tables_for_sync(), and ReportSlotConnectionError().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1566 of file tablesync.c.

1567 {
1568  Relation rel;
1569  HeapTuple tup;
1570  bool nulls[Natts_pg_subscription];
1571  bool replaces[Natts_pg_subscription];
1572  Datum values[Natts_pg_subscription];
1573 
1575  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1576  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1577 
1578  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1580  if (!HeapTupleIsValid(tup))
1581  elog(ERROR,
1582  "cache lookup failed for subscription oid %u",
1583  suboid);
1584 
1585  /* Form a new tuple. */
1586  memset(values, 0, sizeof(values));
1587  memset(nulls, false, sizeof(nulls));
1588  memset(replaces, false, sizeof(replaces));
1589 
1590  /* And update/set two_phase state */
1591  values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1592  replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1593 
1594  tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1595  values, nulls, replaces);
1596  CatalogTupleUpdate(rel, &tup->t_self, tup);
1597 
1598  heap_freetuple(tup);
1600 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_ENABLED
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:600
static Datum CharGetDatum(char X)
Definition: postgres.h:470
#define RelationGetDescr(relation)
Definition: rel.h:527
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ SUBSCRIPTIONOID
Definition: syscache.h:99

References Assert(), CatalogTupleUpdate(), CharGetDatum(), elog(), ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by ApplyWorkerMain(), and CreateSubscription().

◆ wait_for_relation_state_change()

static bool wait_for_relation_state_change ( Oid  relid,
char  expected_state 
)
static

Definition at line 174 of file tablesync.c.

175 {
176  char state;
177 
178  for (;;)
179  {
180  LogicalRepWorker *worker;
181  XLogRecPtr statelsn;
182 
184 
187  relid, &statelsn);
188 
189  if (state == SUBREL_STATE_UNKNOWN)
190  break;
191 
192  if (state == expected_state)
193  return true;
194 
195  /* Check if the sync worker is still running and bail if not. */
196  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
198  false);
199  LWLockRelease(LogicalRepWorkerLock);
200  if (!worker)
201  break;
202 
203  (void) WaitLatch(MyLatch,
206 
208  }
209 
210  return false;
211 }
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:476
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:457
Definition: regguts.h:318
@ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
Definition: wait_event.h:109

References CHECK_FOR_INTERRUPTS, GetSubscriptionRelState(), InvalidateCatalogSnapshot(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, ResetLatch(), LogicalRepWorker::subid, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by process_syncing_tables_for_apply().

◆ wait_for_worker_state_change()

static bool wait_for_worker_state_change ( char  expected_state)
static

Definition at line 222 of file tablesync.c.

223 {
224  int rc;
225 
226  for (;;)
227  {
228  LogicalRepWorker *worker;
229 
231 
232  /*
233  * Done if already in correct state. (We assume this fetch is atomic
234  * enough to not give a misleading answer if we do it with no lock.)
235  */
236  if (MyLogicalRepWorker->relstate == expected_state)
237  return true;
238 
239  /*
240  * Bail out if the apply worker has died, else signal it we're
241  * waiting.
242  */
243  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
245  InvalidOid, false);
246  if (worker && worker->proc)
248  LWLockRelease(LogicalRepWorkerLock);
249  if (!worker)
250  break;
251 
252  /*
253  * Wait. We expect to get a latch signal back from the apply worker,
254  * but use a timeout in case it dies without sending one.
255  */
256  rc = WaitLatch(MyLatch,
259 
260  if (rc & WL_LATCH_SET)
262  }
263 
264  return false;
265 }

References CHECK_FOR_INTERRUPTS, InvalidOid, logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, LogicalRepWorker::proc, LogicalRepWorker::relstate, ResetLatch(), LogicalRepWorker::subid, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by LogicalRepSyncTableStart().

Variable Documentation

◆ copybuf

◆ table_states_not_ready

List* table_states_not_ready = NIL
static

◆ table_states_valid

bool table_states_valid = false
static

Definition at line 123 of file tablesync.c.

Referenced by FetchTableStates(), and invalidate_syncing_table_states().