PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "funcapi.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "nodes/makefuncs.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/reorderbuffer.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/timeout.h"
#include "utils/tqual.h"
#include "utils/syscache.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  SlotErrCallbackArg
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct SlotErrCallbackArg SlotErrCallbackArg
 

Functions

static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void reread_subscription (void)
 
static bool ensure_transaction (void)
 
static EState * create_estate_for_relation (LogicalRepRelMapEntry *rel)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_error_callback (void *arg)
 
static void slot_store_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
 
static void slot_modify_cstrings (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void ApplyLoop (void)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void ApplyWorkerMain (Datum main_arg)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static MemoryContext ApplyContext = NULL
 
static MemoryContext ApplyCacheContext = NULL
 
WalReceiverConn * wrconn = NULL
 
Subscription * MySubscription = NULL
 
bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 

Macro Definition Documentation

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 86 of file worker.c.

Referenced by ApplyLoop().

Typedef Documentation

Function Documentation

static void apply_dispatch ( StringInfo  s)
static

Definition at line 777 of file worker.c.

References apply_handle_begin(), apply_handle_commit(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_relation(), apply_handle_type(), apply_handle_update(), ereport, errcode(), errmsg(), ERROR, and pq_getmsgbyte().

Referenced by ApplyLoop().

778 {
779  char action = pq_getmsgbyte(s);
780 
781  switch (action)
782  {
783  /* BEGIN */
784  case 'B':
786  break;
787  /* COMMIT */
788  case 'C':
790  break;
791  /* INSERT */
792  case 'I':
794  break;
795  /* UPDATE */
796  case 'U':
798  break;
799  /* DELETE */
800  case 'D':
802  break;
803  /* RELATION */
804  case 'R':
806  break;
807  /* TYPE */
808  case 'Y':
810  break;
811  /* ORIGIN */
812  case 'O':
814  break;
815  default:
816  ereport(ERROR,
817  (errcode(ERRCODE_PROTOCOL_VIOLATION),
818  errmsg("invalid logical replication message type %c", action)));
819  }
820 }
static void apply_handle_type(StringInfo s)
Definition: worker.c:472
static void apply_handle_insert(StringInfo s)
Definition: worker.c:502
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static void apply_handle_delete(StringInfo s)
Definition: worker.c:691
static void apply_handle_begin(StringInfo s)
Definition: worker.c:389
#define ereport(elevel, rest)
Definition: elog.h:122
static void apply_handle_commit(StringInfo s)
Definition: worker.c:409
static void apply_handle_update(StringInfo s)
Definition: worker.c:583
static void apply_handle_relation(StringInfo s)
Definition: worker.c:457
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void apply_handle_origin(StringInfo s)
Definition: worker.c:436
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void apply_handle_begin ( StringInfo  s)
static

Definition at line 389 of file worker.c.

References LogicalRepBeginData::committime, LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), NULL, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and STATE_RUNNING.

Referenced by apply_dispatch().

390 {
391  LogicalRepBeginData begin_data;
392 
393  logicalrep_read_begin(s, &begin_data);
394 
397 
398  in_remote_transaction = true;
399 
401 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2805
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:57
bool in_remote_transaction
Definition: worker.c:111
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
XLogRecPtr final_lsn
Definition: logicalproto.h:65
#define NULL
Definition: c.h:226
TimestampTz committime
Definition: logicalproto.h:66
static void apply_handle_commit ( StringInfo  s)
static

Definition at line 409 of file worker.c.

References Assert, LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, IsTransactionState(), logicalrep_read_commit(), NULL, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, STATE_IDLE, and store_flush_position().

Referenced by apply_dispatch().

410 {
411  LogicalRepCommitData commit_data;
412 
413  logicalrep_read_commit(s, &commit_data);
414 
417 
418  if (IsTransactionState())
419  {
421 
422  store_flush_position(commit_data.end_lsn);
423  }
424 
425  in_remote_transaction = false;
426 
428 }
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:880
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2805
void CommitTransactionCommand(void)
Definition: xact.c:2745
bool in_remote_transaction
Definition: worker.c:111
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
bool IsTransactionState(void)
Definition: xact.c:349
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:92
XLogRecPtr commit_lsn
Definition: logicalproto.h:72
TimestampTz committime
Definition: logicalproto.h:74
static void apply_handle_delete ( StringInfo  s)
static

Definition at line 691 of file worker.c.

References Assert, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, ensure_transaction(), ereport, errmsg(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSetSlotDescriptor(), ExecSimpleRelationDelete(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, NULL, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, REPLICA_IDENTITY_FULL, LogicalRepRelation::replident, RowExclusiveLock, slot_store_cstrings(), and LogicalRepTupleData::values.

Referenced by apply_dispatch().

692 {
694  LogicalRepTupleData oldtup;
695  LogicalRepRelId relid;
696  Oid idxoid;
697  EState *estate;
698  EPQState epqstate;
699  TupleTableSlot *remoteslot;
700  TupleTableSlot *localslot;
701  bool found;
702  MemoryContext oldctx;
703 
705 
706  relid = logicalrep_read_delete(s, &oldtup);
708 
709  /* Check if we can do the delete. */
711 
712  /* Initialize the executor state. */
713  estate = create_estate_for_relation(rel);
714  remoteslot = ExecInitExtraTupleSlot(estate);
716  localslot = ExecInitExtraTupleSlot(estate);
718  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
719 
722 
723  /* Find the tuple using the replica identity index. */
725  slot_store_cstrings(remoteslot, rel, oldtup.values);
726  MemoryContextSwitchTo(oldctx);
727 
728  /*
729  * Try to find tuple using either replica identity index, primary key
730  * or if needed, sequential scan.
731  */
732  idxoid = GetRelationIdentityOrPK(rel->localrel);
733  Assert(OidIsValid(idxoid) ||
735 
736  if (OidIsValid(idxoid))
737  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
739  remoteslot, localslot);
740  else
742  remoteslot, localslot);
743  /* If found delete it. */
744  if (found)
745  {
746  EvalPlanQualSetSlot(&epqstate, localslot);
747 
748  /* Do the actual delete. */
749  ExecSimpleRelationDelete(estate, &epqstate, localslot);
750  }
751  else
752  {
753  /* The tuple to be deleted could not be found.*/
754  ereport(DEBUG1,
755  (errmsg("logical replication could not find row for delete "
756  "in replication target %s",
758  }
759 
760  /* Cleanup. */
763  EvalPlanQualEnd(&epqstate);
764  ExecResetTupleTable(estate->es_tupleTable, false);
765  FreeExecutorState(estate);
766 
768 
770 }
#define NIL
Definition: pg_list.h:69
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:151
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:486
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:277
#define RelationGetDescr(relation)
Definition: rel.h:425
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:367
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:534
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3030
void FreeExecutorState(EState *estate)
Definition: execUtils.c:168
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
static bool ensure_transaction(void)
Definition: worker.c:125
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:267
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:433
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:230
#define ereport(elevel, rest)
Definition: elog.h:122
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:549
List * es_tupleTable
Definition: execnodes.h:399
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
void CommandCounterIncrement(void)
Definition: xact.c:921
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2619
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:33
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:343
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
uint32 LogicalRepRelId
Definition: logicalproto.h:37
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:228
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:384
static void apply_handle_insert ( StringInfo  s)
static

Definition at line 502 of file worker.c.

References CommandCounterIncrement(), create_estate_for_relation(), ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSetSlotDescriptor(), ExecSimpleRelationInsert(), FreeExecutorState(), GetPerTupleMemoryContext, GetTransactionSnapshot(), LogicalRepRelMapEntry::localrel, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, slot_fill_defaults(), slot_store_cstrings(), and LogicalRepTupleData::values.

Referenced by apply_dispatch().

503 {
505  LogicalRepTupleData newtup;
506  LogicalRepRelId relid;
507  EState *estate;
508  TupleTableSlot *remoteslot;
509  MemoryContext oldctx;
510 
512 
513  relid = logicalrep_read_insert(s, &newtup);
515 
516  /* Initialize the executor state. */
517  estate = create_estate_for_relation(rel);
518  remoteslot = ExecInitExtraTupleSlot(estate);
520 
521  /* Process and store remote tuple in the slot */
523  slot_store_cstrings(remoteslot, rel, newtup.values);
524  slot_fill_defaults(rel, estate, remoteslot);
525  MemoryContextSwitchTo(oldctx);
526 
529 
530  /* Do the insert. */
531  ExecSimpleRelationInsert(estate, remoteslot);
532 
533  /* Cleanup. */
536  ExecResetTupleTable(estate->es_tupleTable, false);
537  FreeExecutorState(estate);
538 
540 
542 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:151
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define RelationGetDescr(relation)
Definition: rel.h:425
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:367
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:160
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:187
void FreeExecutorState(EState *estate)
Definition: execUtils.c:168
static bool ensure_transaction(void)
Definition: worker.c:125
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:267
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
#define RowExclusiveLock
Definition: lockdefs.h:38
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:230
List * es_tupleTable
Definition: execnodes.h:399
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
void CommandCounterIncrement(void)
Definition: xact.c:921
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:33
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:343
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
uint32 LogicalRepRelId
Definition: logicalproto.h:37
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:384
static void apply_handle_origin ( StringInfo  s)
static

Definition at line 436 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, in_remote_transaction, and IsTransactionState().

Referenced by apply_dispatch().

437 {
438  /*
439  * ORIGIN message can only come inside remote transaction and before
440  * any actual writes.
441  */
443  ereport(ERROR,
444  (errcode(ERRCODE_PROTOCOL_VIOLATION),
445  errmsg("ORIGIN message sent out of order")));
446 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
bool in_remote_transaction
Definition: worker.c:111
#define ereport(elevel, rest)
Definition: elog.h:122
bool IsTransactionState(void)
Definition: xact.c:349
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void apply_handle_relation ( StringInfo  s)
static

Definition at line 457 of file worker.c.

References logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

458 {
459  LogicalRepRelation *rel;
460 
461  rel = logicalrep_read_rel(s);
463 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:324
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:165
static void apply_handle_type ( StringInfo  s)
static

Definition at line 472 of file worker.c.

References logicalrep_read_typ(), and logicalrep_typmap_update().

Referenced by apply_dispatch().

473 {
474  LogicalRepTyp typ;
475 
476  logicalrep_read_typ(s, &typ);
478 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:376
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:410
static void apply_handle_update ( StringInfo  s)
static

Definition at line 583 of file worker.c.

References Assert, LogicalRepTupleData::changed, check_relation_updatable(), CommandCounterIncrement(), create_estate_for_relation(), DEBUG1, elog, ensure_transaction(), EState::es_result_relation_info, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), ExecResetTupleTable(), ExecSetSlotDescriptor(), ExecSimpleRelationUpdate(), ExecStoreTuple(), FreeExecutorState(), GetPerTupleMemoryContext, GetRelationIdentityOrPK(), GetTransactionSnapshot(), InvalidBuffer, LogicalRepRelMapEntry::localrel, LockTupleExclusive, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NIL, NoLock, NULL, OidIsValid, PopActiveSnapshot(), PushActiveSnapshot(), RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, REPLICA_IDENTITY_FULL, LogicalRepRelation::replident, RowExclusiveLock, slot_modify_cstrings(), slot_store_cstrings(), TupleTableSlot::tts_tuple, and LogicalRepTupleData::values.

Referenced by apply_dispatch().

584 {
586  LogicalRepRelId relid;
587  Oid idxoid;
588  EState *estate;
589  EPQState epqstate;
590  LogicalRepTupleData oldtup;
591  LogicalRepTupleData newtup;
592  bool has_oldtup;
593  TupleTableSlot *localslot;
594  TupleTableSlot *remoteslot;
595  bool found;
596  MemoryContext oldctx;
597 
599 
600  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
601  &newtup);
603 
604  /* Check if we can do the update. */
606 
607  /* Initialize the executor state. */
608  estate = create_estate_for_relation(rel);
609  remoteslot = ExecInitExtraTupleSlot(estate);
611  localslot = ExecInitExtraTupleSlot(estate);
613  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
614 
617 
618  /* Build the search tuple. */
620  slot_store_cstrings(remoteslot, rel,
621  has_oldtup ? oldtup.values : newtup.values);
622  MemoryContextSwitchTo(oldctx);
623 
624  /*
625  * Try to find tuple using either replica identity index, primary key
626  * or if needed, sequential scan.
627  */
628  idxoid = GetRelationIdentityOrPK(rel->localrel);
629  Assert(OidIsValid(idxoid) ||
630  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
631 
632  if (OidIsValid(idxoid))
633  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
635  remoteslot, localslot);
636  else
638  remoteslot, localslot);
639 
640  ExecClearTuple(remoteslot);
641 
642  /*
643  * Tuple found.
644  *
645  * Note this will fail if there are other conflicting unique indexes.
646  */
647  if (found)
648  {
649  /* Process and store remote tuple in the slot */
651  ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
652  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
653  MemoryContextSwitchTo(oldctx);
654 
655  EvalPlanQualSetSlot(&epqstate, remoteslot);
656 
657  /* Do the actual update. */
658  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
659  }
660  else
661  {
662  /*
663  * The tuple to be updated could not be found.
664  *
665  * TODO what to do here, change the log level to LOG perhaps?
666  */
667  elog(DEBUG1,
668  "logical replication did not find row for update "
669  "in replication target relation \"%s\"",
671  }
672 
673  /* Cleanup. */
676  EvalPlanQualEnd(&epqstate);
677  ExecResetTupleTable(estate->es_tupleTable, false);
678  FreeExecutorState(estate);
679 
681 
683 }
#define NIL
Definition: pg_list.h:69
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:151
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:486
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
#define RelationGetDescr(relation)
Definition: rel.h:425
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define InvalidBuffer
Definition: buf.h:25
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:367
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
unsigned int Oid
Definition: postgres_ext.h:31
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:534
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3030
void FreeExecutorState(EState *estate)
Definition: execUtils.c:168
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
static bool ensure_transaction(void)
Definition: worker.c:125
LogicalRepRelation remoterel
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:267
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:433
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:230
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:549
List * es_tupleTable
Definition: execnodes.h:399
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
void CommandCounterIncrement(void)
Definition: xact.c:921
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2619
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:211
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:33
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:343
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:331
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
HeapTuple tts_tuple
Definition: tuptable.h:120
#define elog
Definition: elog.h:219
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
uint32 LogicalRepRelId
Definition: logicalproto.h:37
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:228
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:384
static void ApplyLoop ( void  )
static

Definition at line 915 of file worker.c.

References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, AllocSetContextCreate(), apply_dispatch(), buf, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), StringInfoData::cursor, StringInfoData::data, endpos, ereport, errmsg(), ERROR, fd(), GetCurrentTimestamp(), got_SIGTERM, in_remote_transaction, InvalidXLogRecPtr, StringInfoData::len, LOG, StringInfoData::maxlen, MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyProc, MySubscriptionValid, NAPTIME_PER_CYCLE, now(), NULL, PGINVALID_SOCKET, pgstat_report_activity(), pq_getmsgbyte(), pq_getmsgint64(), proc_exit(), PGPROC::procLatch, reread_subscription(), ResetLatch(), send_feedback(), StartTransactionCommand(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_receive, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by ApplyWorkerMain().

916 {
917  XLogRecPtr last_received = InvalidXLogRecPtr;
918 
919  /* Init the ApplyContext which we use for easier cleanup. */
921  "ApplyContext",
925 
926  /* mark as idle, before starting to loop */
928 
929  while (!got_SIGTERM)
930  {
932  int rc;
933  int len;
934  char *buf = NULL;
935  bool endofstream = false;
936  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
937  bool ping_sent = false;
938 
940 
941  len = walrcv_receive(wrconn, &buf, &fd);
942 
943  if (len != 0)
944  {
945  /* Process the data */
946  for (;;)
947  {
949 
950  if (len == 0)
951  {
952  break;
953  }
954  else if (len < 0)
955  {
956  ereport(LOG,
957  (errmsg("data stream from publisher has ended")));
958  endofstream = true;
959  break;
960  }
961  else
962  {
963  int c;
964  StringInfoData s;
965 
966  /* Reset timeout. */
967  last_recv_timestamp = GetCurrentTimestamp();
968  ping_sent = false;
969 
970  /* Ensure we are reading the data into our memory context. */
972 
973  s.data = buf;
974  s.len = len;
975  s.cursor = 0;
976  s.maxlen = -1;
977 
978  c = pq_getmsgbyte(&s);
979 
980  if (c == 'w')
981  {
982  XLogRecPtr start_lsn;
983  XLogRecPtr end_lsn;
984  TimestampTz send_time;
985 
986  start_lsn = pq_getmsgint64(&s);
987  end_lsn = pq_getmsgint64(&s);
988  send_time = pq_getmsgint64(&s);
989 
990  if (last_received < start_lsn)
991  last_received = start_lsn;
992 
993  if (last_received < end_lsn)
994  last_received = end_lsn;
995 
996  UpdateWorkerStats(last_received, send_time, false);
997 
998  apply_dispatch(&s);
999  }
1000  else if (c == 'k')
1001  {
1004  bool reply_requested;
1005 
1006  endpos = pq_getmsgint64(&s);
1007  timestamp = pq_getmsgint64(&s);
1008  reply_requested = pq_getmsgbyte(&s);
1009 
1010  send_feedback(endpos, reply_requested, false);
1011  UpdateWorkerStats(last_received, timestamp, true);
1012  }
1013  /* other message types are purposefully ignored */
1014  }
1015 
1016  len = walrcv_receive(wrconn, &buf, &fd);
1017  }
1018  }
1019 
1020  if (!in_remote_transaction)
1021  {
1022  /*
1023  * If we didn't get any transactions for a while there might be
1024  * unconsumed invalidation messages in the queue, consume them now.
1025  */
1027  /* Check for subscription change */
1028  if (!MySubscriptionValid)
1031  }
1032 
1033  /* confirm all writes at once */
1034  send_feedback(last_received, false, false);
1035 
1036  /* Cleanup the memory. */
1039 
1040  /* Check if we need to exit the streaming loop. */
1041  if (endofstream)
1042  break;
1043 
1044  /*
1045  * Wait for more data or latch.
1046  */
1050  fd, NAPTIME_PER_CYCLE,
1052 
1053  /* Emergency bailout if postmaster has died */
1054  if (rc & WL_POSTMASTER_DEATH)
1055  proc_exit(1);
1056 
1057  if (rc & WL_TIMEOUT)
1058  {
1059  /*
1060  * We didn't receive anything new. If we haven't heard
1061  * anything from the server for more than
1062  * wal_receiver_timeout / 2, ping the server. Also, if
1063  * it's been longer than wal_receiver_status_interval
1064  * since the last update we sent, send a status update to
1065  * the master anyway, to report any progress in applying
1066  * WAL.
1067  */
1068  bool requestReply = false;
1069 
1070  /*
1071  * Check if time since last receive from standby has
1072  * reached the configured limit.
1073  */
1074  if (wal_receiver_timeout > 0)
1075  {
1077  TimestampTz timeout;
1078 
1079  timeout =
1080  TimestampTzPlusMilliseconds(last_recv_timestamp,
1082 
1083  if (now >= timeout)
1084  ereport(ERROR,
1085  (errmsg("terminating logical replication worker due to timeout")));
1086 
1087  /*
1088  * We didn't receive anything new, for half of
1089  * receiver replication timeout. Ping the server.
1090  */
1091  if (!ping_sent)
1092  {
1093  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1094  (wal_receiver_timeout / 2));
1095  if (now >= timeout)
1096  {
1097  requestReply = true;
1098  ping_sent = true;
1099  }
1100  }
1101  }
1102 
1103  send_feedback(last_received, requestReply, requestReply);
1104  }
1105 
1107  }
1108 }
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1117
WalReceiverConn * wrconn
Definition: worker.c:106
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define WL_TIMEOUT
Definition: latch.h:127
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2805
int64 timestamp
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
void CommitTransactionCommand(void)
Definition: xact.c:2745
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:899
static void reread_subscription(void)
Definition: worker.c:1206
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:223
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:99
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(volatile Latch *latch)
Definition: latch.c:462
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:142
int wal_receiver_timeout
Definition: walreceiver.c:74
Latch procLatch
Definition: proc.h:93
static XLogRecPtr endpos
#define ERROR
Definition: elog.h:43
#define NAPTIME_PER_CYCLE
Definition: worker.c:86
bool in_remote_transaction
Definition: worker.c:111
static volatile sig_atomic_t got_SIGTERM
Definition: autovacuum.c:141
char * c
static char * buf
Definition: pg_test_fsync.c:65
int pgsocket
Definition: port.h:22
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
static MemoryContext ApplyContext
Definition: worker.c:103
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:88
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
#define PGINVALID_SOCKET
Definition: port.h:24
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void apply_dispatch(StringInfo s)
Definition: worker.c:777
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:321
void StartTransactionCommand(void)
Definition: xact.c:2675
bool MySubscriptionValid
Definition: worker.c:109
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:143
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:144
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define WL_LATCH_SET
Definition: latch.h:124
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1308 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), ApplyLoop(), Assert, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BackgroundWorker::bgw_name, CacheMemoryContext, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, CreateCacheMemoryContext(), CurrentResourceOwner, DatumGetObjectId, LogicalRepWorker::dbid, DEBUG1, elog, Subscription::enabled, ereport, errmsg(), ERROR, GetCurrentTimestamp(), GetSubscription(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), logicalrep_worker_sigterm(), MemoryContextSwitchTo(), MyBgworkerEntry, MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, NULL, Subscription::oid, OidIsValid, options, PGC_S_OVERRIDE, PGC_S_SESSION, PGC_SUSET, PGC_USERSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, Subscription::publications, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, ResourceOwnerCreate(), server_version, SetConfigOption(), Subscription::slotname, WalRcvStreamOptions::slotname, snprintf(), WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, LogicalRepWorker::userid, walrcv_connect, walrcv_disconnect, walrcv_identify_system, and walrcv_startstreaming.

Referenced by logicalrep_worker_launch().

1309 {
1310  int worker_slot = DatumGetObjectId(main_arg);
1311  MemoryContext oldctx;
1312  char originname[NAMEDATALEN];
1313  RepOriginId originid;
1314  XLogRecPtr origin_startpos;
1315  char *err;
1316  int server_version;
1317  TimeLineID startpointTLI;
1319 
1320  /* Attach to slot */
1321  logicalrep_worker_attach(worker_slot);
1322 
1323  /* Setup signal handling */
1326 
1327  /* Initialise stats to a sanish value */
1330 
1331  /* Make it easy to identify our processes. */
1332  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
1334 
1335  /* Load the libpq-specific functions */
1336  load_file("libpqwalreceiver", false);
1337 
1340  "logical replication apply");
1341 
1342  /* Run as replica session replication role. */
1343  SetConfigOption("session_replication_role", "replica",
1345 
1346  /* Connect to our database. */
1349 
1350  /* Load the subscription into persistent memory context. */
1353  "ApplyCacheContext",
1358  MySubscriptionValid = true;
1359  MemoryContextSwitchTo(oldctx);
1360 
1361  if (!MySubscription->enabled)
1362  {
1363  ereport(LOG,
1364  (errmsg("logical replication worker for subscription \"%s\" will not "
1365  "start because the subscription was disabled during startup",
1366  MySubscription->name)));
1367 
1368  proc_exit(0);
1369  }
1370 
1371  /* Keep us informed about subscription changes. */
1374  (Datum) 0);
1375 
1376  ereport(LOG,
1377  (errmsg("logical replication apply for subscription \"%s\" has started",
1378  MySubscription->name)));
1379 
1380  /* Setup replication origin tracking. */
1381  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1382  originid = replorigin_by_name(originname, true);
1383  if (!OidIsValid(originid))
1384  originid = replorigin_create(originname);
1385  replorigin_session_setup(originid);
1386  replorigin_session_origin = originid;
1387  origin_startpos = replorigin_session_get_progress(false);
1388 
1390 
1391  /* Connect to the origin and start the replication. */
1392  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1395  MySubscription->name, &err);
1396  if (wrconn == NULL)
1397  ereport(ERROR,
1398  (errmsg("could not connect to the publisher: %s", err)));
1399 
1400  /*
1401  * We don't really use the output of identify_system for anything
1402  * but it does some initializations on the upstream so let's still
1403  * call it.
1404  */
1405  (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version);
1406 
1407  /* Build logical replication streaming options. */
1408  options.logical = true;
1409  options.startpoint = origin_startpos;
1410  options.slotname = MySubscription->slotname;
1411  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1412  options.proto.logical.publication_names = MySubscription->publications;
1413 
1414  /* Start streaming from the slot. */
1415  walrcv_startstreaming(wrconn, &options);
1416 
1417  /* Run the main loop. */
1418  ApplyLoop();
1419 
1421 
1422  /* We should only get here if we received SIGTERM */
1423  proc_exit(0);
1424 }
Subscription * MySubscription
Definition: worker.c:108
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:106
#define DEBUG1
Definition: elog.h:25
static MemoryContext ApplyCacheContext
Definition: worker.c:104
uint32 TimeLineID
Definition: xlogdefs.h:45
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
#define DatumGetObjectId(X)
Definition: postgres.h:508
void CommitTransactionCommand(void)
Definition: xact.c:2745
union WalRcvStreamOptions::@53 proto
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:51
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:219
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1107
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:980
#define LOG
Definition: elog.h:26
#define OidIsValid(objectId)
Definition: c.h:534
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1300
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:215
void logicalrep_worker_attach(int slot)
Definition: launcher.c:407
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6629
XLogRecPtr startpoint
Definition: walreceiver.h:144
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5468
#define ereport(elevel, rest)
Definition: elog.h:122
static char ** options
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1381
uintptr_t Datum
Definition: postgres.h:374
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:457
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
TimestampTz last_recv_time
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:671
RepOriginId replorigin_session_origin
Definition: origin.c:150
void StartTransactionCommand(void)
Definition: xact.c:2675
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:133
static int server_version
Definition: pg_dumpall.c:81
void CreateCacheMemoryContext(void)
Definition: catcache.c:525
#define walrcv_disconnect(conn)
Definition: walreceiver.h:231
bool MySubscriptionValid
Definition: worker.c:109
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
static void ApplyLoop(void)
Definition: worker.c:915
TimestampTz reply_time
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5497
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:209
static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 549 of file worker.c.

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), and apply_handle_update().

550 {
551  /* Updatable, no error. */
552  if (rel->updatable)
553  return;
554 
555  /*
556  * We are in error mode so it's fine this is somewhat slow.
557  * It's better to give the user a correct error.
558  */
560  {
561  ereport(ERROR,
562  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
563  errmsg("publisher does not send replica identity column "
564  "expected by the logical replication target relation \"%s.%s\"",
565  rel->remoterel.nspname, rel->remoterel.relname)));
566  }
567 
568  ereport(ERROR,
569  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
570  errmsg("logical replication target relation \"%s.%s\" has "
571  "neither REPLICA IDENTITY index nor PRIMARY "
572  "KEY and published relation does not have "
573  "REPLICA IDENTITY FULL",
574  rel->remoterel.nspname, rel->remoterel.relname)));
575 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:486
int errcode(int sqlerrcode)
Definition: elog.c:575
#define OidIsValid(objectId)
Definition: c.h:534
#define ERROR
Definition: elog.h:43
LogicalRepRelation remoterel
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
static EState* create_estate_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 151 of file worker.c.

References CreateExecutorState(), EState::es_num_result_relations, EState::es_range_table, EState::es_result_relation_info, EState::es_result_relations, EState::es_trig_tuple_slot, ExecInitExtraTupleSlot(), InitResultRelInfo(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NULL, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, ResultRelInfo::ri_TrigDesc, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

152 {
153  EState *estate;
154  ResultRelInfo *resultRelInfo;
155  RangeTblEntry *rte;
156 
157  estate = CreateExecutorState();
158 
159  rte = makeNode(RangeTblEntry);
160  rte->rtekind = RTE_RELATION;
161  rte->relid = RelationGetRelid(rel->localrel);
162  rte->relkind = rel->localrel->rd_rel->relkind;
163  estate->es_range_table = list_make1(rte);
164 
165  resultRelInfo = makeNode(ResultRelInfo);
166  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
167 
168  estate->es_result_relations = resultRelInfo;
169  estate->es_num_result_relations = 1;
170  estate->es_result_relation_info = resultRelInfo;
171 
172  /* Triggers might need a slot */
173  if (resultRelInfo->ri_TrigDesc)
174  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
175 
176  return estate;
177 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1221
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
List * es_range_table
Definition: execnodes.h:372
Form_pg_class rd_rel
Definition: rel.h:113
#define list_make1(x1)
Definition: pg_list.h:133
ResultRelInfo * es_result_relations
Definition: execnodes.h:382
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:388
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:339
EState * CreateExecutorState(void)
Definition: execUtils.c:72
int es_num_result_relations
Definition: execnodes.h:383
#define makeNode(_type_)
Definition: nodes.h:556
#define NULL
Definition: c.h:226
RTEKind rtekind
Definition: parsenodes.h:882
#define RelationGetRelid(relation)
Definition: rel.h:413
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:384
static bool ensure_transaction ( void  )
static

Definition at line 125 of file worker.c.

References CurrentMemoryContext, IsTransactionState(), MemoryContextSwitchTo(), MySubscriptionValid, reread_subscription(), and StartTransactionCommand().

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

126 {
127  if (IsTransactionState())
128  {
131  return false;
132  }
133 
135 
136  if (!MySubscriptionValid)
138 
140  return true;
141 }
static void reread_subscription(void)
Definition: worker.c:1206
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static MemoryContext ApplyContext
Definition: worker.c:103
void StartTransactionCommand(void)
Definition: xact.c:2675
bool IsTransactionState(void)
Definition: xact.c:349
bool MySubscriptionValid
Definition: worker.c:109
static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 836 of file worker.c.

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, pfree(), and FlushPosition::remote_end.

Referenced by send_feedback().

838 {
839  dlist_mutable_iter iter;
840  XLogRecPtr local_flush = GetFlushRecPtr();
841 
843  *flush = InvalidXLogRecPtr;
844 
846  {
847  FlushPosition *pos =
848  dlist_container(FlushPosition, node, iter.cur);
849 
850  *write = pos->remote_end;
851 
852  if (pos->local_end <= local_flush)
853  {
854  *flush = pos->remote_end;
855  dlist_delete(iter.cur);
856  pfree(pos);
857  }
858  else
859  {
860  /*
861  * Don't want to uselessly iterate over the rest of the list which
862  * could potentially be long. Instead get the last element and
863  * grab the write position from there.
864  */
865  pos = dlist_tail_element(FlushPosition, node,
866  &lsn_mapping);
867  *write = pos->remote_end;
868  *have_pending_txes = true;
869  return;
870  }
871  }
872 
873  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
874 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:95
#define write(a, b, c)
Definition: win32.h:19
XLogRecPtr remote_end
Definition: worker.c:92
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8154
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:992
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr local_end
Definition: worker.c:91
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 486 of file worker.c.

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by apply_handle_delete(), apply_handle_update(), and check_relation_updatable().

487 {
488  Oid idxoid;
489 
490  idxoid = RelationGetReplicaIndex(rel);
491 
492  if (!OidIsValid(idxoid))
493  idxoid = RelationGetPrimaryKeyIndex(rel);
494 
495  return idxoid;
496 }
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4588
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:534
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4567
static void reread_subscription ( void  )
static

Definition at line 1206 of file worker.c.

References Subscription::conninfo, Subscription::dbid, elog, Subscription::enabled, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, newsub(), proc_exit(), Subscription::publications, Subscription::slotname, LogicalRepWorker::subid, and walrcv_disconnect.

Referenced by ApplyLoop(), and ensure_transaction().

1207 {
1208  MemoryContext oldctx;
1210 
1211  /* Ensure allocations in permanent context. */
1213 
1214  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1215 
1216  /*
1217  * Exit if the subscription was removed.
1218  * This normally should not happen as the worker gets killed
1219  * during DROP SUBSCRIPTION.
1220  */
1221  if (!newsub)
1222  {
1223  ereport(LOG,
1224  (errmsg("logical replication worker for subscription \"%s\" will "
1225  "stop because the subscription was removed",
1226  MySubscription->name)));
1227 
1229  proc_exit(0);
1230  }
1231 
1232  /*
1233  * Exit if connection string was changed. The launcher will start
1234  * new worker.
1235  */
1236  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1237  {
1238  ereport(LOG,
1239  (errmsg("logical replication worker for subscription \"%s\" will "
1240  "restart because the connection information was changed",
1241  MySubscription->name)));
1242 
1244  proc_exit(0);
1245  }
1246 
1247  /*
1248  * Exit if publication list was changed. The launcher will start
1249  * new worker.
1250  */
1252  {
1253  ereport(LOG,
1254  (errmsg("logical replication worker for subscription \"%s\" will "
1255  "restart because subscription's publications were changed",
1256  MySubscription->name)));
1257 
1259  proc_exit(0);
1260  }
1261 
1262  /*
1263  * Exit if the subscription was disabled.
1264  * This normally should not happen as the worker gets killed
1265  * during ALTER SUBSCRIPTION ... DISABLE.
1266  */
1267  if (!newsub->enabled)
1268  {
1269  ereport(LOG,
1270  (errmsg("logical replication worker for subscription \"%s\" will "
1271  "stop because the subscription was disabled",
1272  MySubscription->name)));
1273 
1275  proc_exit(0);
1276  }
1277 
1278  /* Check for other changes that should never happen too. */
1279  if (newsub->dbid != MySubscription->dbid ||
1280  strcmp(newsub->name, MySubscription->name) != 0 ||
1281  strcmp(newsub->slotname, MySubscription->slotname) != 0)
1282  {
1283  elog(ERROR, "subscription %u changed unexpectedly",
1285  }
1286 
1287  /* Clean old subscription info and switch to new one. */
1290 
1291  MemoryContextSwitchTo(oldctx);
1292 
1293  MySubscriptionValid = true;
1294 }
Subscription * MySubscription
Definition: worker.c:108
WalReceiverConn * wrconn
Definition: worker.c:106
static MemoryContext ApplyCacheContext
Definition: worker.c:104
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2870
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:99
#define LOG
Definition: elog.h:26
Subscription * GetSubscription(Oid subid, bool missing_ok)
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
#define ERROR
Definition: elog.h:43
List * publications
#define ereport(elevel, rest)
Definition: elog.h:122
#define walrcv_disconnect(conn)
Definition: walreceiver.h:231
bool MySubscriptionValid
Definition: worker.c:109
void FreeSubscription(Subscription *sub)
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 1117 of file worker.c.

References StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, makeStringInfo(), MemoryContextSwitchTo(), now(), NULL, pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by ApplyLoop().

1118 {
1119  static StringInfo reply_message = NULL;
1120  static TimestampTz send_time = 0;
1121 
1122  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1123  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1124  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1125 
1126  XLogRecPtr writepos;
1127  XLogRecPtr flushpos;
1128  TimestampTz now;
1129  bool have_pending_txes;
1130 
1131  /*
1132  * If the user doesn't want status to be reported to the publisher, be
1133  * sure to exit before doing anything at all.
1134  */
1135  if (!force && wal_receiver_status_interval <= 0)
1136  return;
1137 
1138  /* It's legal to not pass a recvpos */
1139  if (recvpos < last_recvpos)
1140  recvpos = last_recvpos;
1141 
1142  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1143 
1144  /*
1145  * No outstanding transactions to flush, we can report the latest
1146  * received position. This is important for synchronous replication.
1147  */
1148  if (!have_pending_txes)
1149  flushpos = writepos = recvpos;
1150 
1151  if (writepos < last_writepos)
1152  writepos = last_writepos;
1153 
1154  if (flushpos < last_flushpos)
1155  flushpos = last_flushpos;
1156 
1157  now = GetCurrentTimestamp();
1158 
1159  /* if we've already reported everything we're good */
1160  if (!force &&
1161  writepos == last_writepos &&
1162  flushpos == last_flushpos &&
1163  !TimestampDifferenceExceeds(send_time, now,
1165  return;
1166  send_time = now;
1167 
1168  if (!reply_message)
1169  {
1171  reply_message = makeStringInfo();
1172  MemoryContextSwitchTo(oldctx);
1173  }
1174  else
1175  resetStringInfo(reply_message);
1176 
1177  pq_sendbyte(reply_message, 'r');
1178  pq_sendint64(reply_message, recvpos); /* write */
1179  pq_sendint64(reply_message, flushpos); /* flush */
1180  pq_sendint64(reply_message, writepos); /* apply */
1181  pq_sendint64(reply_message, now); /* sendTime */
1182  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1183 
1184  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1185  force,
1186  (uint32) (recvpos >> 32), (uint32) recvpos,
1187  (uint32) (writepos >> 32), (uint32) writepos,
1188  (uint32) (flushpos >> 32), (uint32) flushpos
1189  );
1190 
1191  walrcv_send(wrconn, reply_message->data, reply_message->len);
1192 
1193  if (recvpos > last_recvpos)
1194  last_recvpos = recvpos;
1195  if (writepos > last_writepos)
1196  last_writepos = writepos;
1197  if (flushpos > last_flushpos)
1198  last_flushpos = flushpos;
1199 }
WalReceiverConn * wrconn
Definition: worker.c:106
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static MemoryContext ApplyCacheContext
Definition: worker.c:104
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
StringInfo makeStringInfo(void)
Definition: stringinfo.c:29
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int wal_receiver_status_interval
Definition: walreceiver.c:73
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1648
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:836
static StringInfoData reply_message
Definition: walreceiver.c:110
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:265
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:225
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 187 of file worker.c.

References LogicalRepRelMapEntry::attrmap, tupleDesc::attrs, build_column_default(), ExecEvalExpr, ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, LogicalRepRelation::natts, tupleDesc::natts, NULL, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, and TupleTableSlot::tts_values.

Referenced by apply_handle_insert().

189 {
190  TupleDesc desc = RelationGetDescr(rel->localrel);
191  int num_phys_attrs = desc->natts;
192  int i;
193  int attnum,
194  num_defaults = 0;
195  int *defmap;
196  ExprState **defexprs;
197  ExprContext *econtext;
198 
199  econtext = GetPerTupleExprContext(estate);
200 
201  /* We got all the data via replication, no need to evaluate anything. */
202  if (num_phys_attrs == rel->remoterel.natts)
203  return;
204 
205  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
206  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
207 
208  for (attnum = 0; attnum < num_phys_attrs; attnum++)
209  {
210  Expr *defexpr;
211 
212  if (desc->attrs[attnum]->attisdropped)
213  continue;
214 
215  if (rel->attrmap[attnum] >= 0)
216  continue;
217 
218  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
219 
220  if (defexpr != NULL)
221  {
222  /* Run the expression through planner */
223  defexpr = expression_planner(defexpr);
224 
225  /* Initialize executable expression in copycontext */
226  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
227  defmap[num_defaults] = attnum;
228  num_defaults++;
229  }
230 
231  }
232 
233  for (i = 0; i < num_defaults; i++)
234  slot->tts_values[defmap[i]] =
235  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
236 }
#define RelationGetDescr(relation)
Definition: rel.h:425
Expr * expression_planner(Expr *expr)
Definition: planner.c:5211
Form_pg_attribute * attrs
Definition: tupdesc.h:74
Datum * tts_values
Definition: tuptable.h:125
int natts
Definition: tupdesc.h:73
#define GetPerTupleExprContext(estate)
Definition: executor.h:338
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execQual.c:4266
#define ExecEvalExpr(expr, econtext, isNull)
Definition: executor.h:73
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:126
Node * build_column_default(Relation rel, int attrno)
#define NULL
Definition: c.h:226
void * palloc(Size size)
Definition: mcxt.c:891
int i
static void slot_modify_cstrings ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
char **  values,
bool replaces 
)
static

Definition at line 331 of file worker.c.

References ErrorContextCallback::arg, SlotErrCallbackArg::attnum, LogicalRepRelMapEntry::attrmap, tupleDesc::attrs, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, tupleDesc::natts, NULL, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, LogicalRepRelMapEntry::remoterel, slot_getallattrs(), slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, and TupleTableSlot::tts_values.

Referenced by apply_handle_update().

333 {
334  int natts = slot->tts_tupleDescriptor->natts;
335  int i;
336  SlotErrCallbackArg errarg;
337  ErrorContextCallback errcallback;
338 
339  slot_getallattrs(slot);
340  ExecClearTuple(slot);
341 
342  /* Push callback + info on the error context stack */
343  errarg.rel = &rel->remoterel;
344  errarg.attnum = -1;
345  errcallback.callback = slot_store_error_callback;
346  errcallback.arg = (void *) &errarg;
347  errcallback.previous = error_context_stack;
348  error_context_stack = &errcallback;
349 
350  /* Call the "in" function for each replaced attribute */
351  for (i = 0; i < natts; i++)
352  {
354  int remoteattnum = rel->attrmap[i];
355 
356  if (remoteattnum >= 0 && !replaces[remoteattnum])
357  continue;
358 
359  if (remoteattnum >= 0 && values[remoteattnum] != NULL)
360  {
361  Oid typinput;
362  Oid typioparam;
363 
364  errarg.attnum = remoteattnum;
365 
366  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
367  slot->tts_values[i] = OidInputFunctionCall(typinput, values[remoteattnum],
368  typioparam,
369  att->atttypmod);
370  slot->tts_isnull[i] = false;
371  }
372  else
373  {
374  slot->tts_values[i] = (Datum) 0;
375  slot->tts_isnull[i] = true;
376  }
377  }
378 
379  /* Pop the error context stack */
380  error_context_stack = errcallback.previous;
381 
382  ExecStoreVirtualTuple(slot);
383 }
LogicalRepRelation * rel
Definition: worker.c:99
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
Datum * tts_values
Definition: tuptable.h:125
unsigned int Oid
Definition: postgres_ext.h:31
struct ErrorContextCallback * previous
Definition: elog.h:238
int natts
Definition: tupdesc.h:73
ErrorContextCallback * error_context_stack
Definition: elog.c:88
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:126
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2567
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1239
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
uintptr_t Datum
Definition: postgres.h:374
#define NULL
Definition: c.h:226
static void slot_store_error_callback(void *arg)
Definition: worker.c:242
static Datum values[MAXATTR]
Definition: bootstrap.c:162
void(* callback)(void *arg)
Definition: elog.h:239
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1997
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
static void slot_store_cstrings ( TupleTableSlot * slot,
LogicalRepRelMapEntry * rel,
char **  values 
)
static

Definition at line 267 of file worker.c.

References ErrorContextCallback::arg, SlotErrCallbackArg::attnum, LogicalRepRelMapEntry::attrmap, tupleDesc::attrs, ErrorContextCallback::callback, error_context_stack, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeInputInfo(), i, tupleDesc::natts, NULL, OidInputFunctionCall(), ErrorContextCallback::previous, SlotErrCallbackArg::rel, LogicalRepRelMapEntry::remoterel, slot_store_error_callback(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, and TupleTableSlot::tts_values.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

269 {
270  int natts = slot->tts_tupleDescriptor->natts;
271  int i;
272  SlotErrCallbackArg errarg;
273  ErrorContextCallback errcallback;
274 
275  ExecClearTuple(slot);
276 
277  /* Push callback + info on the error context stack */
278  errarg.rel = &rel->remoterel;
279  errarg.attnum = -1;
280  errcallback.callback = slot_store_error_callback;
281  errcallback.arg = (void *) &errarg;
282  errcallback.previous = error_context_stack;
283  error_context_stack = &errcallback;
284 
285  /* Call the "in" function for each non-dropped attribute */
286  for (i = 0; i < natts; i++)
287  {
288  Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
289  int remoteattnum = rel->attrmap[i];
290 
291  if (!att->attisdropped && remoteattnum >= 0 &&
292  values[remoteattnum] != NULL)
293  {
294  Oid typinput;
295  Oid typioparam;
296 
297  errarg.attnum = remoteattnum;
298 
299  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
300  slot->tts_values[i] = OidInputFunctionCall(typinput,
301  values[remoteattnum],
302  typioparam,
303  att->atttypmod);
304  slot->tts_isnull[i] = false;
305  }
306  else
307  {
308  /*
309  * We assign NULL to dropped attributes, NULL values, and missing
310  * values (missing values should be later filled using
311  * slot_fill_defaults).
312  */
313  slot->tts_values[i] = (Datum) 0;
314  slot->tts_isnull[i] = true;
315  }
316  }
317 
318  /* Pop the error context stack */
319  error_context_stack = errcallback.previous;
320 
321  ExecStoreVirtualTuple(slot);
322 }
LogicalRepRelation * rel
Definition: worker.c:99
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
Datum * tts_values
Definition: tuptable.h:125
unsigned int Oid
Definition: postgres_ext.h:31
struct ErrorContextCallback * previous
Definition: elog.h:238
int natts
Definition: tupdesc.h:73
ErrorContextCallback * error_context_stack
Definition: elog.c:88
LogicalRepRelation remoterel
bool * tts_isnull
Definition: tuptable.h:126
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2567
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
uintptr_t Datum
Definition: postgres.h:374
#define NULL
Definition: c.h:226
static void slot_store_error_callback(void *arg)
Definition: worker.c:242
static Datum values[MAXATTR]
Definition: bootstrap.c:162
void(* callback)(void *arg)
Definition: elog.h:239
int i
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1997
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
static void slot_store_error_callback ( void *  arg)
static

Definition at line 242 of file worker.c.

References LogicalRepRelation::attnames, SlotErrCallbackArg::attnum, LogicalRepRelation::atttyps, errcontext, format_type_be(), logicalrep_typmap_getid(), LogicalRepRelation::nspname, SlotErrCallbackArg::rel, and LogicalRepRelation::relname.

Referenced by slot_modify_cstrings(), and slot_store_cstrings().

243 {
244  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
245  Oid remotetypoid,
246  localtypoid;
247 
248  if (errarg->attnum < 0)
249  return;
250 
251  remotetypoid = errarg->rel->atttyps[errarg->attnum];
252  localtypoid = logicalrep_typmap_getid(remotetypoid);
253  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
254  "remote type %s, local type %s",
255  errarg->rel->nspname, errarg->rel->relname,
256  errarg->rel->attnames[errarg->attnum],
257  format_type_be(remotetypoid),
258  format_type_be(localtypoid));
259 }
LogicalRepRelation * rel
Definition: worker.c:99
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
Oid logicalrep_typmap_getid(Oid remoteid)
Definition: relation.c:441
unsigned int Oid
Definition: postgres_ext.h:31
#define errcontext
Definition: elog.h:164
void * arg
static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 880 of file worker.c.

References dlist_push_tail(), FlushPosition::local_end, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit().

881 {
882  FlushPosition *flushpos;
883 
884  /* Need to do this in permanent context */
885  MemoryContextSwitchTo(ApplyCacheContext);
886 
887  /* Track commit lsn */
888  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
889  flushpos->local_end = XactLastCommitEnd;
890  flushpos->remote_end = remote_lsn;
891 
892  dlist_push_tail(&lsn_mapping, &flushpos->node);
893  MemoryContextSwitchTo(ApplyContext);
894 }
static MemoryContext ApplyCacheContext
Definition: worker.c:104
static dlist_head lsn_mapping
Definition: worker.c:95
dlist_node node
Definition: worker.c:90
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:337
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
XLogRecPtr remote_end
Definition: worker.c:92
static MemoryContext ApplyContext
Definition: worker.c:103
XLogRecPtr local_end
Definition: worker.c:91
void * palloc(Size size)
Definition: mcxt.c:891
static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1300 of file worker.c.

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

1301 {
1302  MySubscriptionValid = false;
1303 }
bool MySubscriptionValid
Definition: worker.c:109
static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Definition at line 899 of file worker.c.

References GetCurrentTimestamp(), LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, MyLogicalRepWorker, LogicalRepWorker::reply_lsn, and LogicalRepWorker::reply_time.

Referenced by ApplyLoop().

900 {
901  MyLogicalRepWorker->last_lsn = last_lsn;
902  MyLogicalRepWorker->last_send_time = send_time;
903  MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
904  if (reply)
905  {
906  MyLogicalRepWorker->reply_lsn = last_lsn;
907  MyLogicalRepWorker->reply_time = send_time;
908  }
909 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
TimestampTz last_send_time
XLogRecPtr last_lsn
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
XLogRecPtr reply_lsn
TimestampTz last_recv_time
TimestampTz reply_time

Variable Documentation

MemoryContext ApplyCacheContext = NULL
static

Definition at line 104 of file worker.c.

MemoryContext ApplyContext = NULL
static

Definition at line 103 of file worker.c.

bool in_remote_transaction = false

Definition at line 111 of file worker.c.

Referenced by apply_handle_begin(), apply_handle_commit(), apply_handle_origin(), and ApplyLoop().

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 95 of file worker.c.

Subscription* MySubscription = NULL

Definition at line 108 of file worker.c.

bool MySubscriptionValid = false

Definition at line 109 of file worker.c.

WalReceiverConn* wrconn = NULL

Definition at line 106 of file worker.c.

Referenced by CreateSubscription(), and DropSubscription().