PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "funcapi.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "nodes/makefuncs.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/reorderbuffer.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/timeout.h"
#include "utils/tqual.h"
#include "utils/syscache.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  SlotErrCallbackArg
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct SlotErrCallbackArg SlotErrCallbackArg
 

Functions

static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void reread_subscription (void)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static bool ensure_transaction (void)
 
static EStatecreate_estate_for_relation (LogicalRepRelMapEntry *rel)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_error_callback (void *arg)
 
static void slot_store_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
 
static void slot_modify_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void ApplyWorkerMain (Datum main_arg)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
WalReceiverConnwrconn = NULL
 
SubscriptionMySubscription = NULL
 
bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 

Macro Definition Documentation

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 87 of file worker.c.

Referenced by LogicalRepApplyLoop().

Typedef Documentation

Function Documentation

static void apply_dispatch ( StringInfo  s)
static

Definition at line 857 of file worker.c.

References apply_handle_begin(), apply_handle_commit(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_relation(), apply_handle_type(), apply_handle_update(), ereport, errcode(), errmsg(), ERROR, and pq_getmsgbyte().

Referenced by LogicalRepApplyLoop().

858 {
859  char action = pq_getmsgbyte(s);
860 
861  switch (action)
862  {
863  /* BEGIN */
864  case 'B':
866  break;
867  /* COMMIT */
868  case 'C':
870  break;
871  /* INSERT */
872  case 'I':
874  break;
875  /* UPDATE */
876  case 'U':
878  break;
879  /* DELETE */
880  case 'D':
882  break;
883  /* RELATION */
884  case 'R':
886  break;
887  /* TYPE */
888  case 'Y':
890  break;
891  /* ORIGIN */
892  case 'O':
894  break;
895  default:
896  ereport(ERROR,
897  (errcode(ERRCODE_PROTOCOL_VIOLATION),
898  errmsg("invalid logical replication message type %c", action)));
899  }
900 }
static void apply_handle_type(StringInfo s)
Definition: worker.c:513
static void apply_handle_insert(StringInfo s)
Definition: worker.c:543
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static void apply_handle_delete(StringInfo s)
Definition: worker.c:758
static void apply_handle_begin(StringInfo s)
Definition: worker.c:419
#define ereport(elevel, rest)
Definition: elog.h:122
static void apply_handle_commit(StringInfo s)
Definition: worker.c:438
static void apply_handle_update(StringInfo s)
Definition: worker.c:637
static void apply_handle_relation(StringInfo s)
Definition: worker.c:498
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void apply_handle_origin(StringInfo s)
Definition: worker.c:476
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void apply_handle_begin ( StringInfo  s)
static

Definition at line 419 of file worker.c.

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), NULL, pgstat_report_activity(), remote_final_lsn, and STATE_RUNNING.

Referenced by apply_dispatch().

420 {
421  LogicalRepBeginData begin_data;
422 
423  logicalrep_read_begin(s, &begin_data);
424 
425  remote_final_lsn = begin_data.final_lsn;
426 
427  in_remote_transaction = true;
428 
430 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:113
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:57
bool in_remote_transaction
Definition: worker.c:112
XLogRecPtr final_lsn
Definition: logicalproto.h:67
#define NULL
Definition: c.h:229
static void apply_handle_commit ( StringInfo  s)
static

Definition at line 438 of file worker.c.

References am_tablesync_worker(), Assert, LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, IsTransactionState(), logicalrep_read_commit(), NULL, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), remote_final_lsn, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, STATE_IDLE, and store_flush_position().

Referenced by apply_dispatch().

439 {
440  LogicalRepCommitData commit_data;
441 
442  logicalrep_read_commit(s, &commit_data);
443 
444  Assert(commit_data.commit_lsn == remote_final_lsn);
445 
446  /* The synchronization worker runs in single transaction. */
448  {
449  /*
450  * Update origin state so we can restart streaming from correct
451  * position in case of crash.
452  */
455 
457  pgstat_report_stat(false);
458 
459  store_flush_position(commit_data.end_lsn);
460  }
461 
462  in_remote_transaction = false;
463 
464  /* Process any tables that are being synchronized in parallel. */
465  process_syncing_tables(commit_data.end_lsn);
466 
468 }
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:960
static XLogRecPtr remote_final_lsn
Definition: worker.c:113
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:477
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
void CommitTransactionCommand(void)
Definition: xact.c:2749
bool in_remote_transaction
Definition: worker.c:112
static bool am_tablesync_worker(void)
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
bool IsTransactionState(void)
Definition: xact.c:350
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:92
XLogRecPtr commit_lsn
Definition: logicalproto.h:74
TimestampTz committime
Definition: logicalproto.h:76
void pgstat_report_stat(bool force)
Definition: pgstat.c:812
static void apply_handle_delete ( StringInfo  s)
static

Definition at line 758 of file worker.c.

References AfterTriggerEndQuery(), Assert, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, ensure_transaction(), ereport, errmsg(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSetSlotDescriptor(), ExecSimpleRelationDelete(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, NULL, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, REPLICA_IDENTITY_FULL, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_cstrings(), and LogicalRepTupleData::values.

Referenced by apply_dispatch().

759 {
761  LogicalRepTupleData oldtup;
762  LogicalRepRelId relid;
763  Oid idxoid;
764  EState *estate;
765  EPQState epqstate;
766  TupleTableSlot *remoteslot;
767  TupleTableSlot *localslot;
768  bool found;
769  MemoryContext oldctx;
770 
772 
773  relid = logicalrep_read_delete(s, &oldtup);
776  {
777  /*
778  * The relation can't become interesting in the middle of the
779  * transaction so it's safe to unlock it.
780  */
782  return;
783  }
784 
785  /* Check if we can do the delete. */
787 
788  /* Initialize the executor state. */
789  estate = create_estate_for_relation(rel);
790  remoteslot = ExecInitExtraTupleSlot(estate);
792  localslot = ExecInitExtraTupleSlot(estate);
794  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
795 
798 
799  /* Find the tuple using the replica identity index. */
801  slot_store_cstrings(remoteslot, rel, oldtup.values);
802  MemoryContextSwitchTo(oldctx);
803 
804  /*
805  * Try to find tuple using either replica identity index, primary key or
806  * if needed, sequential scan.
807  */
808  idxoid = GetRelationIdentityOrPK(rel->localrel);
809  Assert(OidIsValid(idxoid) ||
811 
812  if (OidIsValid(idxoid))
813  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
815  remoteslot, localslot);
816  else
818  remoteslot, localslot);
819  /* If found delete it. */
820  if (found)
821  {
822  EvalPlanQualSetSlot(&epqstate, localslot);
823 
824  /* Do the actual delete. */
825  ExecSimpleRelationDelete(estate, &epqstate, localslot);
826  }
827  else
828  {
829  /* The tuple to be deleted could not be found. */
830  ereport(DEBUG1,
831  (errmsg("logical replication could not find row for delete "
832  "in replication target %s",
834  }
835 
836  /* Cleanup. */
839 
840  /* Handle queued AFTER triggers. */
841  AfterTriggerEndQuery(estate);
842 
843  EvalPlanQualEnd(&epqstate);
844  ExecResetTupleTable(estate->es_tupleTable, false);
845  FreeExecutorState(estate);
846 
848 
850 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:135
#define NIL
Definition: pg_list.h:69
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:178
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:527
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:277
#define RelationGetDescr(relation)
Definition: rel.h:429
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:363
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:538
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3162
void FreeExecutorState(EState *estate)
Definition: execUtils.c:178
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
static bool ensure_transaction(void)
Definition: worker.c:151
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:297
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:437
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:223
#define ereport(elevel, rest)
Definition: elog.h:122
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:603
List * es_tupleTable
Definition: execnodes.h:450
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
void CommandCounterIncrement(void)
Definition: xact.c:922
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2751
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:461
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4188
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:220
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:423
static void apply_handle_insert ( StringInfo  s)
static

Definition at line 543 of file worker.c.

References AfterTriggerEndQuery(), CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSetSlotDescriptor(), ExecSimpleRelationInsert(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_cstrings(), and LogicalRepTupleData::values.

Referenced by apply_dispatch().

544 {
546  LogicalRepTupleData newtup;
547  LogicalRepRelId relid;
548  EState *estate;
549  TupleTableSlot *remoteslot;
550  MemoryContext oldctx;
551 
553 
554  relid = logicalrep_read_insert(s, &newtup);
557  {
558  /*
559  * The relation can't become interesting in the middle of the
560  * transaction so it's safe to unlock it.
561  */
563  return;
564  }
565 
566  /* Initialize the executor state. */
567  estate = create_estate_for_relation(rel);
568  remoteslot = ExecInitExtraTupleSlot(estate);
570 
571  /* Process and store remote tuple in the slot */
573  slot_store_cstrings(remoteslot, rel, newtup.values);
574  slot_fill_defaults(rel, estate, remoteslot);
575  MemoryContextSwitchTo(oldctx);
576 
579 
580  /* Do the insert. */
581  ExecSimpleRelationInsert(estate, remoteslot);
582 
583  /* Cleanup. */
586 
587  /* Handle queued AFTER triggers. */
588  AfterTriggerEndQuery(estate);
589 
590  ExecResetTupleTable(estate->es_tupleTable, false);
591  FreeExecutorState(estate);
592 
594 
596 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:135
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:178
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define RelationGetDescr(relation)
Definition: rel.h:429
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:363
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:160
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:217
void FreeExecutorState(EState *estate)
Definition: execUtils.c:178
static bool ensure_transaction(void)
Definition: worker.c:151
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:297
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:223
List * es_tupleTable
Definition: execnodes.h:450
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
void CommandCounterIncrement(void)
Definition: xact.c:922
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:461
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4188
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
uint32 LogicalRepRelId
Definition: logicalproto.h:39
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:423
static void apply_handle_origin ( StringInfo  s)
static

Definition at line 476 of file worker.c.

References am_tablesync_worker(), ereport, errcode(), errmsg(), ERROR, in_remote_transaction, and IsTransactionState().

Referenced by apply_dispatch().

477 {
478  /*
479  * ORIGIN message can only come inside remote transaction and before any
480  * actual writes.
481  */
482  if (!in_remote_transaction ||
484  ereport(ERROR,
485  (errcode(ERRCODE_PROTOCOL_VIOLATION),
486  errmsg("ORIGIN message sent out of order")));
487 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
bool in_remote_transaction
Definition: worker.c:112
static bool am_tablesync_worker(void)
#define ereport(elevel, rest)
Definition: elog.h:122
bool IsTransactionState(void)
Definition: xact.c:350
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void apply_handle_relation ( StringInfo  s)
static

Definition at line 498 of file worker.c.

References logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

499 {
500  LogicalRepRelation *rel;
501 
502  rel = logicalrep_read_rel(s);
504 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:324
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:158
static void apply_handle_type ( StringInfo  s)
static

Definition at line 513 of file worker.c.

References logicalrep_read_typ(), and logicalrep_typmap_update().

Referenced by apply_dispatch().

514 {
515  LogicalRepTyp typ;
516 
517  logicalrep_read_typ(s, &typ);
519 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:376
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:406
static void apply_handle_update ( StringInfo  s)
static

Definition at line 637 of file worker.c.

References AfterTriggerEndQuery(), Assert, LogicalRepTupleData::changed, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSetSlotDescriptor(), ExecSimpleRelationUpdate(), ExecStoreTuple(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), InvalidBuffer, LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, NULL, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, REPLICA_IDENTITY_FULL, LogicalRepRelation::replident, RowExclusiveLock, should_apply_changes_for_rel(), slot_modify_cstrings(), slot_store_cstrings(), TupleTableSlot::tts_tuple, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

638 {
640  LogicalRepRelId relid;
641  Oid idxoid;
642  EState *estate;
643  EPQState epqstate;
644  LogicalRepTupleData oldtup;
645  LogicalRepTupleData newtup;
646  bool has_oldtup;
647  TupleTableSlot *localslot;
648  TupleTableSlot *remoteslot;
649  bool found;
650  MemoryContext oldctx;
651 
653 
654  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
655  &newtup);
658  {
659  /*
660  * The relation can't become interesting in the middle of the
661  * transaction so it's safe to unlock it.
662  */
664  return;
665  }
666 
667  /* Check if we can do the update. */
669 
670  /* Initialize the executor state. */
671  estate = create_estate_for_relation(rel);
672  remoteslot = ExecInitExtraTupleSlot(estate);
674  localslot = ExecInitExtraTupleSlot(estate);
676  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
677 
680 
681  /* Build the search tuple. */
683  slot_store_cstrings(remoteslot, rel,
684  has_oldtup ? oldtup.values : newtup.values);
685  MemoryContextSwitchTo(oldctx);
686 
687  /*
688  * Try to find tuple using either replica identity index, primary key or
689  * if needed, sequential scan.
690  */
691  idxoid = GetRelationIdentityOrPK(rel->localrel);
692  Assert(OidIsValid(idxoid) ||
693  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
694 
695  if (OidIsValid(idxoid))
696  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
698  remoteslot, localslot);
699  else
701  remoteslot, localslot);
702 
703  ExecClearTuple(remoteslot);
704 
705  /*
706  * Tuple found.
707  *
708  * Note this will fail if there are other conflicting unique indexes.
709  */
710  if (found)
711  {
712  /* Process and store remote tuple in the slot */
714  ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
715  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
716  MemoryContextSwitchTo(oldctx);
717 
718  EvalPlanQualSetSlot(&epqstate, remoteslot);
719 
720  /* Do the actual update. */
721  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
722  }
723  else
724  {
725  /*
726  * The tuple to be updated could not be found.
727  *
728  * TODO what to do here, change the log level to LOG perhaps?
729  */
730  elog(DEBUG1,
731  "logical replication did not find row for update "
732  "in replication target relation \"%s\"",
734  }
735 
736  /* Cleanup. */
739 
740  /* Handle queued AFTER triggers. */
741  AfterTriggerEndQuery(estate);
742 
743  EvalPlanQualEnd(&epqstate);
744  ExecResetTupleTable(estate->es_tupleTable, false);
745  FreeExecutorState(estate);
746 
748 
750 }
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:135
#define NIL
Definition: pg_list.h:69
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:178
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:527
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
#define RelationGetDescr(relation)
Definition: rel.h:429
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define InvalidBuffer
Definition: buf.h:25
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:363
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:538
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3162
void FreeExecutorState(EState *estate)
Definition: execUtils.c:178
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
static bool ensure_transaction(void)
Definition: worker.c:151
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:297
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:437
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:223
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:603
List * es_tupleTable
Definition: execnodes.h:450
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
void CommandCounterIncrement(void)
Definition: xact.c:922
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2751
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:211
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:461
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4188
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:361
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
HeapTuple tts_tuple
Definition: tuptable.h:120
#define elog
Definition: elog.h:219
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:220
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:423
void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1449 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), am_tablesync_worker(), Assert, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BackgroundWorker::bgw_name, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, CurrentResourceOwner, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), logicalrep_worker_sighup(), logicalrep_worker_sigterm(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyBgworkerEntry, MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, NULL, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_S_SESSION, PGC_SUSET, PGC_USERSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, ResourceOwnerCreate(), server_version, SetConfigOption(), SIGHUP, Subscription::slotname, WalRcvStreamOptions::slotname, snprintf(), WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_disconnect, walrcv_identify_system, and walrcv_startstreaming.

1450 {
1451  int worker_slot = DatumGetInt32(main_arg);
1452  MemoryContext oldctx;
1453  char originname[NAMEDATALEN];
1454  XLogRecPtr origin_startpos;
1455  char *myslotname;
1457 
1458  /* Attach to slot */
1459  logicalrep_worker_attach(worker_slot);
1460 
1461  /* Setup signal handling */
1465 
1466  /* Initialise stats to a sanish value */
1469 
1470  /* Make it easy to identify our processes. */
1471  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
1473 
1474  /* Load the libpq-specific functions */
1475  load_file("libpqwalreceiver", false);
1476 
1479  "logical replication apply");
1480 
1481  /* Run as replica session replication role. */
1482  SetConfigOption("session_replication_role", "replica",
1484 
1485  /* Connect to our database. */
1488 
1489  /* Load the subscription into persistent memory context. */
1491  "ApplyContext",
1496  MySubscriptionValid = true;
1497  MemoryContextSwitchTo(oldctx);
1498 
1499  /* Setup synchronous commit according to the user's wishes */
1500  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1502 
1503  if (!MySubscription->enabled)
1504  {
1505  ereport(LOG,
1506  (errmsg("logical replication worker for subscription \"%s\" will not "
1507  "start because the subscription was disabled during startup",
1508  MySubscription->name)));
1509 
1510  proc_exit(0);
1511  }
1512 
1513  /* Keep us informed about subscription changes. */
1516  (Datum) 0);
1517 
1518  if (am_tablesync_worker())
1519  elog(LOG, "logical replication sync for subscription %s, table %s started",
1521  else
1522  elog(LOG, "logical replication apply for subscription %s started",
1523  MySubscription->name);
1524 
1526 
1527  /* Connect to the origin and start the replication. */
1528  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1530 
1531  if (am_tablesync_worker())
1532  {
1533  char *syncslotname;
1534 
1535  /* This is table synchroniation worker, call initial sync. */
1536  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1537 
1538  /* The slot name needs to be allocated in permanent memory context. */
1540  myslotname = pstrdup(syncslotname);
1541  MemoryContextSwitchTo(oldctx);
1542 
1543  pfree(syncslotname);
1544  }
1545  else
1546  {
1547  /* This is main apply worker */
1548  RepOriginId originid;
1549  TimeLineID startpointTLI;
1550  char *err;
1551  int server_version;
1552 
1553  myslotname = MySubscription->slotname;
1554 
1555  /*
1556  * This shouldn't happen if the subscription is enabled, but guard
1557  * against DDL bugs or manual catalog changes. (libpqwalreceiver
1558  * will crash if slot is NULL.
1559  */
1560  if (!myslotname)
1561  ereport(ERROR,
1562  (errmsg("subscription has no replication slot set")));
1563 
1564  /* Setup replication origin tracking. */
1566  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1567  originid = replorigin_by_name(originname, true);
1568  if (!OidIsValid(originid))
1569  originid = replorigin_create(originname);
1570  replorigin_session_setup(originid);
1571  replorigin_session_origin = originid;
1572  origin_startpos = replorigin_session_get_progress(false);
1574 
1575  wrconn = walrcv_connect(MySubscription->conninfo, true, myslotname,
1576  &err);
1577  if (wrconn == NULL)
1578  ereport(ERROR,
1579  (errmsg("could not connect to the publisher: %s", err)));
1580 
1581  /*
1582  * We don't really use the output identify_system for anything but it
1583  * does some initializations on the upstream so let's still call it.
1584  */
1585  (void) walrcv_identify_system(wrconn, &startpointTLI,
1586  &server_version);
1587 
1588  }
1589 
1590  /*
1591  * Setup callback for syscache so that we know when something changes in
1592  * the subscription relation state.
1593  */
1596  (Datum) 0);
1597 
1598  /* Build logical replication streaming options. */
1599  options.logical = true;
1600  options.startpoint = origin_startpos;
1601  options.slotname = myslotname;
1602  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1603  options.proto.logical.publication_names = MySubscription->publications;
1604 
1605  /* Start normal logical streaming replication. */
1606  walrcv_startstreaming(wrconn, &options);
1607 
1608  /* Run the main loop. */
1609  LogicalRepApplyLoop(origin_startpos);
1610 
1612 
1613  /* We should only get here if we received SIGTERM */
1614  proc_exit(0);
1615 }
Subscription * MySubscription
Definition: worker.c:109
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: launcher.c:636
WalReceiverConn * wrconn
Definition: worker.c:107
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:45
#define DatumGetInt32(X)
Definition: postgres.h:478
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2749
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:51
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:252
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1110
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:983
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:762
#define OidIsValid(objectId)
Definition: c.h:538
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1441
void pfree(void *pointer)
Definition: mcxt.c:950
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:248
void logicalrep_worker_attach(int slot)
Definition: launcher.c:536
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6672
XLogRecPtr startpoint
Definition: walreceiver.h:145
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5493
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
MemoryContext ApplyContext
Definition: worker.c:105
static char ** options
union WalRcvStreamOptions::@97 proto
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
uintptr_t Datum
Definition: postgres.h:372
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:622
#define SIGHUP
Definition: win32.h:188
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
TimestampTz last_recv_time
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:675
RepOriginId replorigin_session_origin
Definition: origin.c:150
void StartTransactionCommand(void)
Definition: xact.c:2679
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:82
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
bool MySubscriptionValid
Definition: worker.c:110
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:995
#define elog
Definition: elog.h:219
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:194
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5522
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 603 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), and apply_handle_update().

604 {
605  /* Updatable, no error. */
606  if (rel->updatable)
607  return;
608 
609  /*
610  * We are in error mode so it's fine this is somewhat slow. It's better to
611  * give user correct error.
612  */
614  {
615  ereport(ERROR,
616  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
617  errmsg("publisher does not send replica identity column "
618  "expected by the logical replication target relation \"%s.%s\"",
619  rel->remoterel.nspname, rel->remoterel.relname)));
620  }
621 
622  ereport(ERROR,
623  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
624  errmsg("logical replication target relation \"%s.%s\" has "
625  "neither REPLICA IDENTITY index nor PRIMARY "
626  "KEY and published relation does not have "
627  "REPLICA IDENTITY FULL",
628  rel->remoterel.nspname, rel->remoterel.relname)));
629 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:527
int errcode(int sqlerrcode)
Definition: elog.c:575
#define OidIsValid(objectId)
Definition: c.h:538
#define ERROR
Definition: elog.h:43
LogicalRepRelation remoterel
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
static EState* create_estate_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 178 of file worker.c.

References AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_num_result_relations, EState::es_range_table, EState::es_result_relation_info, EState::es_result_relations, EState::es_trig_tuple_slot, ExecInitExtraTupleSlot(), InitResultRelInfo(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NULL, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, ResultRelInfo::ri_TrigDesc, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

179 {
180  EState *estate;
181  ResultRelInfo *resultRelInfo;
182  RangeTblEntry *rte;
183 
184  estate = CreateExecutorState();
185 
186  rte = makeNode(RangeTblEntry);
187  rte->rtekind = RTE_RELATION;
188  rte->relid = RelationGetRelid(rel->localrel);
189  rte->relkind = rel->localrel->rd_rel->relkind;
190  estate->es_range_table = list_make1(rte);
191 
192  resultRelInfo = makeNode(ResultRelInfo);
193  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
194 
195  estate->es_result_relations = resultRelInfo;
196  estate->es_num_result_relations = 1;
197  estate->es_result_relation_info = resultRelInfo;
198 
199  /* Triggers might need a slot */
200  if (resultRelInfo->ri_TrigDesc)
201  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
202 
203  /* Prepare to catch AFTER triggers. */
205 
206  return estate;
207 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1297
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
List * es_range_table
Definition: execnodes.h:411
Form_pg_class rd_rel
Definition: rel.h:114
#define list_make1(x1)
Definition: pg_list.h:139
ResultRelInfo * es_result_relations
Definition: execnodes.h:421
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:437
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:378
EState * CreateExecutorState(void)
Definition: execUtils.c:80
int es_num_result_relations
Definition: execnodes.h:422
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4168
#define makeNode(_type_)
Definition: nodes.h:557
#define NULL
Definition: c.h:229
RTEKind rtekind
Definition: parsenodes.h:929
#define RelationGetRelid(relation)
Definition: rel.h:417
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:423
static bool ensure_transaction ( void  )
static

Definition at line 151 of file worker.c.

References CurrentMemoryContext, IsTransactionState(), MemoryContextSwitchTo(), MySubscriptionValid, reread_subscription(), and StartTransactionCommand().

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

152 {
153  if (IsTransactionState())
154  {
157 
158  return false;
159  }
160 
162 
163  if (!MySubscriptionValid)
165 
167  return true;
168 }
static void reread_subscription(void)
Definition: worker.c:1303
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static MemoryContext ApplyMessageContext
Definition: worker.c:104
void StartTransactionCommand(void)
Definition: xact.c:2679
bool IsTransactionState(void)
Definition: xact.c:350
bool MySubscriptionValid
Definition: worker.c:110
static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 916 of file worker.c.

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, pfree(), and FlushPosition::remote_end.

Referenced by send_feedback().

918 {
919  dlist_mutable_iter iter;
920  XLogRecPtr local_flush = GetFlushRecPtr();
921 
923  *flush = InvalidXLogRecPtr;
924 
926  {
927  FlushPosition *pos =
928  dlist_container(FlushPosition, node, iter.cur);
929 
930  *write = pos->remote_end;
931 
932  if (pos->local_end <= local_flush)
933  {
934  *flush = pos->remote_end;
935  dlist_delete(iter.cur);
936  pfree(pos);
937  }
938  else
939  {
940  /*
941  * Don't want to uselessly iterate over the rest of the list which
942  * could potentially be long. Instead get the last element and
943  * grab the write position from there.
944  */
945  pos = dlist_tail_element(FlushPosition, node,
946  &lsn_mapping);
947  *write = pos->remote_end;
948  *have_pending_txes = true;
949  return;
950  }
951  }
952 
953  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
954 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:96
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:93
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8222
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr local_end
Definition: worker.c:92
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 527 of file worker.c.

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by apply_handle_delete(), apply_handle_update(), and check_relation_updatable().

528 {
529  Oid idxoid;
530 
531  idxoid = RelationGetReplicaIndex(rel);
532 
533  if (!OidIsValid(idxoid))
534  idxoid = RelationGetPrimaryKeyIndex(rel);
535 
536  return idxoid;
537 }
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4676
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:538
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4655
static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 995 of file worker.c.

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), apply_dispatch(), buf, CHECK_FOR_INTERRUPTS, StringInfoData::cursor, StringInfoData::data, ereport, errmsg(), ERROR, fd(), GetCurrentTimestamp(), got_SIGHUP, got_SIGTERM, in_remote_transaction, StringInfoData::len, LOG, StringInfoData::maxlen, MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyProc, MySubscriptionValid, NAPTIME_PER_CYCLE, now(), NULL, PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pq_getmsgbyte(), pq_getmsgint64(), proc_exit(), process_syncing_tables(), ProcessConfigFile(), PGPROC::procLatch, reread_subscription(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by ApplyWorkerMain().

996 {
997  /*
998  * Init the ApplyMessageContext which we clean up after each replication
999  * protocol message.
1000  */
1002  "ApplyMessageContext",
1004 
1005  /* mark as idle, before starting to loop */
1007 
1008  while (!got_SIGTERM)
1009  {
1011  int rc;
1012  int len;
1013  char *buf = NULL;
1014  bool endofstream = false;
1015  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1016  bool ping_sent = false;
1017 
1019 
1020  len = walrcv_receive(wrconn, &buf, &fd);
1021 
1022  if (len != 0)
1023  {
1024  /* Process the data */
1025  for (;;)
1026  {
1028 
1029  if (len == 0)
1030  {
1031  break;
1032  }
1033  else if (len < 0)
1034  {
1035  ereport(LOG,
1036  (errmsg("data stream from publisher has ended")));
1037  endofstream = true;
1038  break;
1039  }
1040  else
1041  {
1042  int c;
1043  StringInfoData s;
1044 
1045  /* Reset timeout. */
1046  last_recv_timestamp = GetCurrentTimestamp();
1047  ping_sent = false;
1048 
1049  /* Ensure we are reading the data into our memory context. */
1051 
1052  s.data = buf;
1053  s.len = len;
1054  s.cursor = 0;
1055  s.maxlen = -1;
1056 
1057  c = pq_getmsgbyte(&s);
1058 
1059  if (c == 'w')
1060  {
1061  XLogRecPtr start_lsn;
1062  XLogRecPtr end_lsn;
1063  TimestampTz send_time;
1064 
1065  start_lsn = pq_getmsgint64(&s);
1066  end_lsn = pq_getmsgint64(&s);
1067  send_time = pq_getmsgint64(&s);
1068 
1069  if (last_received < start_lsn)
1070  last_received = start_lsn;
1071 
1072  if (last_received < end_lsn)
1073  last_received = end_lsn;
1074 
1075  UpdateWorkerStats(last_received, send_time, false);
1076 
1077  apply_dispatch(&s);
1078  }
1079  else if (c == 'k')
1080  {
1081  XLogRecPtr end_lsn;
1083  bool reply_requested;
1084 
1085  end_lsn = pq_getmsgint64(&s);
1086  timestamp = pq_getmsgint64(&s);
1087  reply_requested = pq_getmsgbyte(&s);
1088 
1089  if (last_received < end_lsn)
1090  last_received = end_lsn;
1091 
1092  send_feedback(last_received, reply_requested, false);
1093  UpdateWorkerStats(last_received, timestamp, true);
1094  }
1095  /* other message types are purposefully ignored */
1096 
1098  }
1099 
1100  len = walrcv_receive(wrconn, &buf, &fd);
1101  }
1102 
1103  /* confirm all writes at once */
1104  send_feedback(last_received, false, false);
1105  }
1106 
1107  if (!in_remote_transaction)
1108  {
1109  /*
1110  * If we didn't get any transactions for a while there might be
1111  * unconsumed invalidation messages in the queue, consume them
1112  * now.
1113  */
1115  if (!MySubscriptionValid)
1117 
1118  /* Process any table synchronization changes. */
1119  process_syncing_tables(last_received);
1120  }
1121 
1122  /* Cleanup the memory. */
1125 
1126  /* Check if we need to exit the streaming loop. */
1127  if (endofstream)
1128  {
1129  TimeLineID tli;
1130 
1131  walrcv_endstreaming(wrconn, &tli);
1132  break;
1133  }
1134 
1135  /*
1136  * Wait for more data or latch.
1137  */
1141  fd, NAPTIME_PER_CYCLE,
1143 
1144  /* Emergency bailout if postmaster has died */
1145  if (rc & WL_POSTMASTER_DEATH)
1146  proc_exit(1);
1147 
1148  if (got_SIGHUP)
1149  {
1150  got_SIGHUP = false;
1152  }
1153 
1154  if (rc & WL_TIMEOUT)
1155  {
1156  /*
1157  * We didn't receive anything new. If we haven't heard anything
1158  * from the server for more than wal_receiver_timeout / 2, ping
1159  * the server. Also, if it's been longer than
1160  * wal_receiver_status_interval since the last update we sent,
1161  * send a status update to the master anyway, to report any
1162  * progress in applying WAL.
1163  */
1164  bool requestReply = false;
1165 
1166  /*
1167  * Check if time since last receive from standby has reached the
1168  * configured limit.
1169  */
1170  if (wal_receiver_timeout > 0)
1171  {
1173  TimestampTz timeout;
1174 
1175  timeout =
1176  TimestampTzPlusMilliseconds(last_recv_timestamp,
1178 
1179  if (now >= timeout)
1180  ereport(ERROR,
1181  (errmsg("terminating logical replication worker due to timeout")));
1182 
1183  /*
1184  * We didn't receive anything new, for half of receiver
1185  * replication timeout. Ping the server.
1186  */
1187  if (!ping_sent)
1188  {
1189  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1190  (wal_receiver_timeout / 2));
1191  if (now >= timeout)
1192  {
1193  requestReply = true;
1194  ping_sent = true;
1195  }
1196  }
1197  }
1198 
1199  send_feedback(last_received, requestReply, requestReply);
1200  }
1201 
1203  }
1204 }
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1213
WalReceiverConn * wrconn
Definition: worker.c:107
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:254
uint32 TimeLineID
Definition: xlogdefs.h:45
void AcceptInvalidationMessages(void)
Definition: inval.c:679
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:477
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
int64 timestamp
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:979
static void reread_subscription(void)
Definition: worker.c:1303
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:256
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:99
static volatile sig_atomic_t got_SIGHUP
Definition: autovacuum.c:140
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
int wal_receiver_timeout
Definition: walreceiver.c:75
Latch procLatch
Definition: proc.h:103
#define ERROR
Definition: elog.h:43
#define NAPTIME_PER_CYCLE
Definition: worker.c:87
bool in_remote_transaction
Definition: worker.c:112
static volatile sig_atomic_t got_SIGTERM
Definition: autovacuum.c:142
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
char * c
static char * buf
Definition: pg_test_fsync.c:66
int pgsocket
Definition: port.h:22
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
Definition: guc.h:72
MemoryContext ApplyContext
Definition: worker.c:105
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
#define PGINVALID_SOCKET
Definition: port.h:24
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void apply_dispatch(StringInfo s)
Definition: worker.c:857
static MemoryContext ApplyMessageContext
Definition: worker.c:104
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
bool MySubscriptionValid
Definition: worker.c:110
int errmsg(const char *fmt,...)
Definition: elog.c:797
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define WL_LATCH_SET
Definition: latch.h:124
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void reread_subscription ( void  )
static

Definition at line 1303 of file worker.c.

References Assert, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, Subscription::enabled, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, newsub(), PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), LogicalRepWorker::subid, Subscription::synccommit, and walrcv_disconnect.

Referenced by ensure_transaction(), and LogicalRepApplyLoop().

1304 {
1305  MemoryContext oldctx;
1307  bool started_tx = false;
1308 
1309  /* This function might be called inside or outside of transaction. */
1310  if (!IsTransactionState())
1311  {
1313  started_tx = true;
1314  }
1315 
1316  /* Ensure allocations in permanent context. */
1318 
1319  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1320 
1321  /*
1322  * Exit if the subscription was removed. This normally should not happen
1323  * as the worker gets killed during DROP SUBSCRIPTION.
1324  */
1325  if (!newsub)
1326  {
1327  ereport(LOG,
1328  (errmsg("logical replication worker for subscription \"%s\" will "
1329  "stop because the subscription was removed",
1330  MySubscription->name)));
1331 
1333  proc_exit(0);
1334  }
1335 
1336  /*
1337  * Exit if the subscription was disabled. This normally should not happen
1338  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1339  */
1340  if (!newsub->enabled)
1341  {
1342  ereport(LOG,
1343  (errmsg("logical replication worker for subscription \"%s\" will "
1344  "stop because the subscription was disabled",
1345  MySubscription->name)));
1346 
1348  proc_exit(0);
1349  }
1350 
1351  /*
1352  * Exit if connection string was changed. The launcher will start new
1353  * worker.
1354  */
1355  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1356  {
1357  ereport(LOG,
1358  (errmsg("logical replication worker for subscription \"%s\" will "
1359  "restart because the connection information was changed",
1360  MySubscription->name)));
1361 
1363  proc_exit(0);
1364  }
1365 
1366  /*
1367  * Exit if subscription name was changed (it's used for
1368  * fallback_application_name). The launcher will start new worker.
1369  */
1370  if (strcmp(newsub->name, MySubscription->name) != 0)
1371  {
1372  ereport(LOG,
1373  (errmsg("logical replication worker for subscription \"%s\" will "
1374  "restart because subscription was renamed",
1375  MySubscription->name)));
1376 
1378  proc_exit(0);
1379  }
1380 
1381  /* !slotname should never happen when enabled is true. */
1382  Assert(newsub->slotname);
1383 
1384  /*
1385  * We need to make new connection to new slot if slot name has changed so
1386  * exit here as well if that's the case.
1387  */
1388  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1389  {
1390  ereport(LOG,
1391  (errmsg("logical replication worker for subscription \"%s\" will "
1392  "restart because the replication slot name was changed",
1393  MySubscription->name)));
1394 
1396  proc_exit(0);
1397  }
1398 
1399  /*
1400  * Exit if publication list was changed. The launcher will start new
1401  * worker.
1402  */
1404  {
1405  ereport(LOG,
1406  (errmsg("logical replication worker for subscription \"%s\" will "
1407  "restart because subscription's publications were changed",
1408  MySubscription->name)));
1409 
1411  proc_exit(0);
1412  }
1413 
1414  /* Check for other changes that should never happen too. */
1415  if (newsub->dbid != MySubscription->dbid)
1416  {
1417  elog(ERROR, "subscription %u changed unexpectedly",
1419  }
1420 
1421  /* Clean old subscription info and switch to new one. */
1424 
1425  MemoryContextSwitchTo(oldctx);
1426 
1427  /* Change synchronous commit according to the user's wishes */
1428  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1430 
1431  if (started_tx)
1433 
1434  MySubscriptionValid = true;
1435 }
Subscription * MySubscription
Definition: worker.c:109
WalReceiverConn * wrconn
Definition: worker.c:107
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2962
void CommitTransactionCommand(void)
Definition: xact.c:2749
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:99
#define LOG
Definition: elog.h:26
Subscription * GetSubscription(Oid subid, bool missing_ok)
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6672
List * publications
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext ApplyContext
Definition: worker.c:105
#define Assert(condition)
Definition: c.h:675
void StartTransactionCommand(void)
Definition: xact.c:2679
bool IsTransactionState(void)
Definition: xact.c:350
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
bool MySubscriptionValid
Definition: worker.c:110
void FreeSubscription(Subscription *sub)
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 1213 of file worker.c.

References StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, makeStringInfo(), MemoryContextSwitchTo(), now(), NULL, pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

1214 {
1215  static StringInfo reply_message = NULL;
1216  static TimestampTz send_time = 0;
1217 
1218  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1219  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1220  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1221 
1222  XLogRecPtr writepos;
1223  XLogRecPtr flushpos;
1224  TimestampTz now;
1225  bool have_pending_txes;
1226 
1227  /*
1228  * If the user doesn't want status to be reported to the publisher, be
1229  * sure to exit before doing anything at all.
1230  */
1231  if (!force && wal_receiver_status_interval <= 0)
1232  return;
1233 
1234  /* It's legal to not pass a recvpos */
1235  if (recvpos < last_recvpos)
1236  recvpos = last_recvpos;
1237 
1238  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1239 
1240  /*
1241  * No outstanding transactions to flush, we can report the latest received
1242  * position. This is important for synchronous replication.
1243  */
1244  if (!have_pending_txes)
1245  flushpos = writepos = recvpos;
1246 
1247  if (writepos < last_writepos)
1248  writepos = last_writepos;
1249 
1250  if (flushpos < last_flushpos)
1251  flushpos = last_flushpos;
1252 
1253  now = GetCurrentTimestamp();
1254 
1255  /* if we've already reported everything we're good */
1256  if (!force &&
1257  writepos == last_writepos &&
1258  flushpos == last_flushpos &&
1259  !TimestampDifferenceExceeds(send_time, now,
1261  return;
1262  send_time = now;
1263 
1264  if (!reply_message)
1265  {
1267 
1268  reply_message = makeStringInfo();
1269  MemoryContextSwitchTo(oldctx);
1270  }
1271  else
1272  resetStringInfo(reply_message);
1273 
1274  pq_sendbyte(reply_message, 'r');
1275  pq_sendint64(reply_message, recvpos); /* write */
1276  pq_sendint64(reply_message, flushpos); /* flush */
1277  pq_sendint64(reply_message, writepos); /* apply */
1278  pq_sendint64(reply_message, now); /* sendTime */
1279  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1280 
1281  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1282  force,
1283  (uint32) (recvpos >> 32), (uint32) recvpos,
1284  (uint32) (writepos >> 32), (uint32) writepos,
1285  (uint32) (flushpos >> 32), (uint32) flushpos
1286  );
1287 
1288  walrcv_send(wrconn, reply_message->data, reply_message->len);
1289 
1290  if (recvpos > last_recvpos)
1291  last_recvpos = recvpos;
1292  if (writepos > last_writepos)
1293  last_writepos = writepos;
1294  if (flushpos > last_flushpos)
1295  last_flushpos = flushpos;
1296 }
WalReceiverConn * wrconn
Definition: worker.c:107
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:916
static StringInfoData reply_message
Definition: walreceiver.c:111
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:268
MemoryContext ApplyContext
Definition: worker.c:105
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:258
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 135 of file worker.c.

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, LogicalRepRelMapEntry::statelsn, SUBREL_STATE_READY, and SUBREL_STATE_SYNCDONE.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

136 {
137  if (am_tablesync_worker())
138  return MyLogicalRepWorker->relid == rel->localreloid;
139  else
140  return (rel->state == SUBREL_STATE_READY ||
141  (rel->state == SUBREL_STATE_SYNCDONE &&
142  rel->statelsn <= remote_final_lsn));
143 }
static XLogRecPtr remote_final_lsn
Definition: worker.c:113
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
static bool am_tablesync_worker(void)
#define SUBREL_STATE_SYNCDONE
#define SUBREL_STATE_READY
static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 217 of file worker.c.

References LogicalRepRelMapEntry::attrmap, tupleDesc::attrs, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, LogicalRepRelation::natts, tupleDesc::natts, NULL, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, and TupleTableSlot::tts_values.

Referenced by apply_handle_insert().

219 {
220  TupleDesc desc = RelationGetDescr(rel->localrel);
221  int num_phys_attrs = desc->natts;
222  int i;
223  int attnum,
224  num_defaults = 0;
225  int *defmap;
226  ExprState **defexprs;
227  ExprContext *econtext;
228 
229  econtext = GetPerTupleExprContext(estate);
230 
231  /* We got all the data via replication, no need to evaluate anything. */
232  if (num_phys_attrs == rel->remoterel.natts)
233  return;
234 
235  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
236  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
237 
238  for (attnum = 0; attnum < num_phys_attrs; attnum++)
239  {
240  Expr *defexpr;
241 
242  if (desc->attrs[attnum]->attisdropped)
243  continue;
244 
245  if (rel->attrmap[attnum] >= 0)
246  continue;
247 
248  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
249 
250  if (defexpr != NULL)
251  {
252  /* Run the expression through planner */
253  defexpr = expression_planner(defexpr);
254 
255  /* Initialize executable expression in copycontext */
256  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
257  defmap[num_defaults] = attnum;
258  num_defaults++;
259  }
260 
261  }
262 
263  for (i = 0; i < num_defaults; i++)
264  slot->tts_values[defmap[i]] =
265  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
266 }
#define RelationGetDescr(relation)
Definition: rel.h:429
Expr * expression_planner(Expr *expr)
Definition: planner.c:5931
Form_pg_attribute * attrs
Definition: tupdesc.h:74
Datum * tts_values
Definition: tuptable.h:125
int natts
Definition: tupdesc.h:73
#define GetPerTupleExprContext(estate)
Definition: executor.h:456
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:126
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:266
Node * build_column_default(Relation rel, int attrno)
#define NULL
Definition: c.h:229
void * palloc(Size size)
Definition: mcxt.c:849
int i
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:113
static void slot_modify_cstrings ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
char **  values,
bool replaces 
)
static

Definition at line 361 of file worker.c.

References ErrorContextCallback::arg, SlotErrCallbackArg::attnum, LogicalRepRelMapEntry::attrmap, tupleDesc::attrs, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, tupleDesc::natts, NULL, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, LogicalRepRelMapEntry::remoterel, slot_getallattrs(), slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, and TupleTableSlot::tts_values.

Referenced by apply_handle_update().

363 {
364  int natts = slot->tts_tupleDescriptor->natts;
365  int i;
366  SlotErrCallbackArg errarg;
367  ErrorContextCallback errcallback;
368 
369  slot_getallattrs(slot);
370  ExecClearTuple(slot);
371 
372  /* Push callback + info on the error context stack */
373  errarg.rel = &rel->remoterel;
374  errarg.attnum = -1;
375  errcallback.callback = slot_store_error_callback;
376  errcallback.arg = (void *) &errarg;
377  errcallback.previous = error_context_stack;
378  error_context_stack = &errcallback;
379 
380  /* Call the "in" function for each replaced attribute */
381  for (i = 0; i < natts; i++)
382  {
384  int remoteattnum = rel->attrmap[i];
385 
386  if (remoteattnum >= 0 && !replaces[remoteattnum])
387  continue;
388 
389  if (remoteattnum >= 0 && values[remoteattnum] != NULL)
390  {
391  Oid typinput;
392  Oid typioparam;
393 
394  errarg.attnum = remoteattnum;
395 
396  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
397  slot->tts_values[i] = OidInputFunctionCall(typinput, values[i],
398  typioparam,
399  att->atttypmod);
400  slot->tts_isnull[i] = false;
401  }
402  else
403  {
404  slot->tts_values[i] = (Datum) 0;
405  slot->tts_isnull[i] = true;
406  }
407  }
408 
409  /* Pop the error context stack */
410  error_context_stack = errcallback.previous;
411 
412  ExecStoreVirtualTuple(slot);
413 }
LogicalRepRelation * rel
Definition: worker.c:100
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
Datum * tts_values
Definition: tuptable.h:125
unsigned int Oid
Definition: postgres_ext.h:31
struct ErrorContextCallback * previous
Definition: elog.h:238
int natts
Definition: tupdesc.h:73
ErrorContextCallback * error_context_stack
Definition: elog.c:88
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:126
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2599
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1237
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
uintptr_t Datum
Definition: postgres.h:372
#define NULL
Definition: c.h:229
static void slot_store_error_callback(void *arg)
Definition: worker.c:272
static Datum values[MAXATTR]
Definition: bootstrap.c:163
void(* callback)(void *arg)
Definition: elog.h:239
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1738
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
static void slot_store_cstrings ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
char **  values 
)
static

Definition at line 297 of file worker.c.

References ErrorContextCallback::arg, SlotErrCallbackArg::attnum, LogicalRepRelMapEntry::attrmap, tupleDesc::attrs, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, tupleDesc::natts, NULL, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, LogicalRepRelMapEntry::remoterel, slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, and TupleTableSlot::tts_values.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

299 {
300  int natts = slot->tts_tupleDescriptor->natts;
301  int i;
302  SlotErrCallbackArg errarg;
303  ErrorContextCallback errcallback;
304 
305  ExecClearTuple(slot);
306 
307  /* Push callback + info on the error context stack */
308  errarg.rel = &rel->remoterel;
309  errarg.attnum = -1;
310  errcallback.callback = slot_store_error_callback;
311  errcallback.arg = (void *) &errarg;
312  errcallback.previous = error_context_stack;
313  error_context_stack = &errcallback;
314 
315  /* Call the "in" function for each non-dropped attribute */
316  for (i = 0; i < natts; i++)
317  {
319  int remoteattnum = rel->attrmap[i];
320 
321  if (!att->attisdropped && remoteattnum >= 0 &&
322  values[remoteattnum] != NULL)
323  {
324  Oid typinput;
325  Oid typioparam;
326 
327  errarg.attnum = remoteattnum;
328 
329  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
330  slot->tts_values[i] = OidInputFunctionCall(typinput,
331  values[remoteattnum],
332  typioparam,
333  att->atttypmod);
334  slot->tts_isnull[i] = false;
335  }
336  else
337  {
338  /*
339  * We assign NULL to dropped attributes, NULL values, and missing
340  * values (missing values should be later filled using
341  * slot_fill_defaults).
342  */
343  slot->tts_values[i] = (Datum) 0;
344  slot->tts_isnull[i] = true;
345  }
346  }
347 
348  /* Pop the error context stack */
349  error_context_stack = errcallback.previous;
350 
351  ExecStoreVirtualTuple(slot);
352 }
LogicalRepRelation * rel
Definition: worker.c:100
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
Datum * tts_values
Definition: tuptable.h:125
unsigned int Oid
Definition: postgres_ext.h:31
struct ErrorContextCallback * previous
Definition: elog.h:238
int natts
Definition: tupdesc.h:73
ErrorContextCallback * error_context_stack
Definition: elog.c:88
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:126
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2599
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
uintptr_t Datum
Definition: postgres.h:372
#define NULL
Definition: c.h:229
static void slot_store_error_callback(void *arg)
Definition: worker.c:272
static Datum values[MAXATTR]
Definition: bootstrap.c:163
void(* callback)(void *arg)
Definition: elog.h:239
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1738
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
static void slot_store_error_callback ( void *  arg)
static

Definition at line 272 of file worker.c.

References LogicalRepRelation::attnames, SlotErrCallbackArg::attnum, LogicalRepRelation::atttyps, errcontext, format_type_be(), logicalrep_typmap_getid(), LogicalRepRelation::nspname, SlotErrCallbackArg::rel, and LogicalRepRelation::relname.

Referenced by slot_modify_cstrings(), and slot_store_cstrings().

273 {
275  Oid remotetypoid,
276  localtypoid;
277 
278  if (errarg->attnum < 0)
279  return;
280 
281  remotetypoid = errarg->rel->atttyps[errarg->attnum];
282  localtypoid = logicalrep_typmap_getid(remotetypoid);
283  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
284  "remote type %s, local type %s",
285  errarg->rel->nspname, errarg->rel->relname,
286  errarg->rel->attnames[errarg->attnum],
287  format_type_be(remotetypoid),
288  format_type_be(localtypoid));
289 }
LogicalRepRelation * rel
Definition: worker.c:100
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
Oid logicalrep_typmap_getid(Oid remoteid)
Definition: relation.c:437
unsigned int Oid
Definition: postgres_ext.h:31
#define errcontext
Definition: elog.h:164
void * arg
static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 960 of file worker.c.

References dlist_push_tail(), FlushPosition::local_end, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit().

961 {
962  FlushPosition *flushpos;
963 
964  /* Need to do this in permanent context */
966 
967  /* Track commit lsn */
968  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
969  flushpos->local_end = XactLastCommitEnd;
970  flushpos->remote_end = remote_lsn;
971 
972  dlist_push_tail(&lsn_mapping, &flushpos->node);
974 }
static dlist_head lsn_mapping
Definition: worker.c:96
dlist_node node
Definition: worker.c:91
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:338
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
XLogRecPtr remote_end
Definition: worker.c:93
MemoryContext ApplyContext
Definition: worker.c:105
XLogRecPtr local_end
Definition: worker.c:92
static MemoryContext ApplyMessageContext
Definition: worker.c:104
void * palloc(Size size)
Definition: mcxt.c:849
static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1441 of file worker.c.

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

1442 {
1443  MySubscriptionValid = false;
1444 }
bool MySubscriptionValid
Definition: worker.c:110
static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Definition at line 979 of file worker.c.

References GetCurrentTimestamp(), LogicalRepWorker::last_lsn, last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, MyLogicalRepWorker, LogicalRepWorker::reply_lsn, and LogicalRepWorker::reply_time.

Referenced by LogicalRepApplyLoop().

980 {
982  MyLogicalRepWorker->last_send_time = send_time;
984  if (reply)
985  {
987  MyLogicalRepWorker->reply_time = send_time;
988  }
989 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
TimestampTz last_send_time
XLogRecPtr last_lsn
XLogRecPtr last_lsn
Definition: walsender.c:210
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
XLogRecPtr reply_lsn
TimestampTz last_recv_time
TimestampTz reply_time

Variable Documentation

MemoryContext ApplyContext = NULL

Definition at line 105 of file worker.c.

MemoryContext ApplyMessageContext = NULL
static

Definition at line 104 of file worker.c.

bool in_remote_transaction = false
dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 96 of file worker.c.

Subscription* MySubscription = NULL
bool MySubscriptionValid = false
XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
static

Definition at line 113 of file worker.c.

Referenced by apply_handle_begin(), apply_handle_commit(), and should_apply_changes_for_rel().