PostgreSQL Source Code  git master
applyparallelworker.c File Reference
#include "postgres.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for applyparallelworker.c:

Go to the source code of this file.

Data Structures

struct  ParallelApplyWorkerEntry
 

Macros

#define PG_LOGICAL_APPLY_SHM_MAGIC   0x787ca067
 
#define PARALLEL_APPLY_KEY_SHARED   1
 
#define PARALLEL_APPLY_KEY_MQ   2
 
#define PARALLEL_APPLY_KEY_ERROR_QUEUE   3
 
#define DSM_QUEUE_SIZE   (16 * 1024 * 1024)
 
#define DSM_ERROR_QUEUE_SIZE   (16 * 1024)
 
#define SIZE_STATS_MESSAGE   (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
 
#define PARALLEL_APPLY_LOCK_STREAM   0
 
#define PARALLEL_APPLY_LOCK_XACT   1
 
#define SHM_SEND_RETRY_INTERVAL_MS   1000
 
#define SHM_SEND_TIMEOUT_MS   (10000 - SHM_SEND_RETRY_INTERVAL_MS)
 

Typedefs

typedef struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
 

Functions

static void pa_free_worker_info (ParallelApplyWorkerInfo *winfo)
 
static ParallelTransState pa_get_xact_state (ParallelApplyWorkerShared *wshared)
 
static PartialFileSetState pa_get_fileset_state (void)
 
static bool pa_can_start (void)
 
static bool pa_setup_dsm (ParallelApplyWorkerInfo *winfo)
 
static ParallelApplyWorkerInfopa_launch_parallel_worker (void)
 
void pa_allocate_worker (TransactionId xid)
 
ParallelApplyWorkerInfopa_find_worker (TransactionId xid)
 
static void pa_free_worker (ParallelApplyWorkerInfo *winfo)
 
void pa_detach_all_error_mq (void)
 
static bool pa_has_spooled_message_pending ()
 
static bool pa_process_spooled_messages_if_required (void)
 
static void ProcessParallelApplyInterrupts (void)
 
static void LogicalParallelApplyLoop (shm_mq_handle *mqh)
 
static void pa_shutdown (int code, Datum arg)
 
void ParallelApplyWorkerMain (Datum main_arg)
 
void HandleParallelApplyMessageInterrupt (void)
 
static void HandleParallelApplyMessage (StringInfo msg)
 
void HandleParallelApplyMessages (void)
 
bool pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 
void pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)
 
static void pa_wait_for_xact_state (ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
 
static void pa_wait_for_xact_finish (ParallelApplyWorkerInfo *winfo)
 
void pa_set_xact_state (ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
 
void pa_set_stream_apply_worker (ParallelApplyWorkerInfo *winfo)
 
static void pa_savepoint_name (Oid suboid, TransactionId xid, char *spname, Size szsp)
 
void pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)
 
void pa_reset_subtrans (void)
 
void pa_stream_abort (LogicalRepStreamAbortData *abort_data)
 
void pa_set_fileset_state (ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
 
void pa_lock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_lock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_decr_and_wait_stream_block (void)
 
void pa_xact_finish (ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 

Variables

static HTABParallelApplyTxnHash = NULL
 
static ListParallelApplyWorkerPool = NIL
 
ParallelApplyWorkerSharedMyParallelShared = NULL
 
volatile sig_atomic_t ParallelApplyMessagePending = false
 
static ParallelApplyWorkerInfostream_apply_worker = NULL
 
static Listsubxactlist = NIL
 

Macro Definition Documentation

◆ DSM_ERROR_QUEUE_SIZE

#define DSM_ERROR_QUEUE_SIZE   (16 * 1024)

Definition at line 195 of file applyparallelworker.c.

◆ DSM_QUEUE_SIZE

#define DSM_QUEUE_SIZE   (16 * 1024 * 1024)

Definition at line 187 of file applyparallelworker.c.

◆ PARALLEL_APPLY_KEY_ERROR_QUEUE

#define PARALLEL_APPLY_KEY_ERROR_QUEUE   3

Definition at line 184 of file applyparallelworker.c.

◆ PARALLEL_APPLY_KEY_MQ

#define PARALLEL_APPLY_KEY_MQ   2

Definition at line 183 of file applyparallelworker.c.

◆ PARALLEL_APPLY_KEY_SHARED

#define PARALLEL_APPLY_KEY_SHARED   1

Definition at line 182 of file applyparallelworker.c.

◆ PARALLEL_APPLY_LOCK_STREAM

#define PARALLEL_APPLY_LOCK_STREAM   0

Definition at line 209 of file applyparallelworker.c.

◆ PARALLEL_APPLY_LOCK_XACT

#define PARALLEL_APPLY_LOCK_XACT   1

Definition at line 210 of file applyparallelworker.c.

◆ PG_LOGICAL_APPLY_SHM_MAGIC

#define PG_LOGICAL_APPLY_SHM_MAGIC   0x787ca067

Definition at line 175 of file applyparallelworker.c.

◆ SHM_SEND_RETRY_INTERVAL_MS

#define SHM_SEND_RETRY_INTERVAL_MS   1000

◆ SHM_SEND_TIMEOUT_MS

#define SHM_SEND_TIMEOUT_MS   (10000 - SHM_SEND_RETRY_INTERVAL_MS)

◆ SIZE_STATS_MESSAGE

#define SIZE_STATS_MESSAGE   (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

Definition at line 203 of file applyparallelworker.c.

Typedef Documentation

◆ ParallelApplyWorkerEntry

Function Documentation

◆ HandleParallelApplyMessage()

static void HandleParallelApplyMessage ( StringInfo  msg)
static

Definition at line 1004 of file applyparallelworker.c.

1005 {
1006  char msgtype;
1007 
1008  msgtype = pq_getmsgbyte(msg);
1009 
1010  switch (msgtype)
1011  {
1012  case 'E': /* ErrorResponse */
1013  {
1014  ErrorData edata;
1015 
1016  /* Parse ErrorResponse. */
1017  pq_parse_errornotice(msg, &edata);
1018 
1019  /*
1020  * If desired, add a context line to show that this is a
1021  * message propagated from a parallel apply worker. Otherwise,
1022  * it can sometimes be confusing to understand what actually
1023  * happened.
1024  */
1025  if (edata.context)
1026  edata.context = psprintf("%s\n%s", edata.context,
1027  _("logical replication parallel apply worker"));
1028  else
1029  edata.context = pstrdup(_("logical replication parallel apply worker"));
1030 
1031  /*
1032  * Context beyond that should use the error context callbacks
1033  * that were in effect in LogicalRepApplyLoop().
1034  */
1036 
1037  /*
1038  * The actual error must have been reported by the parallel
1039  * apply worker.
1040  */
1041  ereport(ERROR,
1042  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1043  errmsg("logical replication parallel apply worker exited due to error"),
1044  errcontext("%s", edata.context)));
1045  }
1046 
1047  /*
1048  * Don't need to do anything about NoticeResponse and
1049  * NotifyResponse as the logical replication worker doesn't need
1050  * to send messages to the client.
1051  */
1052  case 'N':
1053  case 'A':
1054  break;
1055 
1056  default:
1057  elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1058  msgtype, msg->len);
1059  }
1060 }
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:305
ErrorContextCallback * error_context_stack
Definition: elog.c:95
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define _(x)
Definition: elog.c:91
#define errcontext
Definition: elog.h:196
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
char * pstrdup(const char *in)
Definition: mcxt.c:1644
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
Definition: pqmq.c:216
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
char * context
Definition: elog.h:443

References _, apply_error_context_stack, ErrorData::context, elog(), ereport, errcode(), errcontext, errmsg(), ERROR, error_context_stack, StringInfoData::len, pq_getmsgbyte(), pq_parse_errornotice(), psprintf(), and pstrdup().

Referenced by HandleParallelApplyMessages().

◆ HandleParallelApplyMessageInterrupt()

void HandleParallelApplyMessageInterrupt ( void  )

Definition at line 992 of file applyparallelworker.c.

993 {
994  InterruptPending = true;
996  SetLatch(MyLatch);
997 }
volatile sig_atomic_t ParallelApplyMessagePending
volatile sig_atomic_t InterruptPending
Definition: globals.c:30
struct Latch * MyLatch
Definition: globals.c:58
void SetLatch(Latch *latch)
Definition: latch.c:607

References InterruptPending, MyLatch, ParallelApplyMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ HandleParallelApplyMessages()

void HandleParallelApplyMessages ( void  )

Definition at line 1066 of file applyparallelworker.c.

1067 {
1068  ListCell *lc;
1069  MemoryContext oldcontext;
1070 
1071  static MemoryContext hpam_context = NULL;
1072 
1073  /*
1074  * This is invoked from ProcessInterrupts(), and since some of the
1075  * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1076  * for recursive calls if more signals are received while this runs. It's
1077  * unclear that recursive entry would be safe, and it doesn't seem useful
1078  * even if it is safe, so let's block interrupts until done.
1079  */
1080  HOLD_INTERRUPTS();
1081 
1082  /*
1083  * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1084  * don't want to risk leaking data into long-lived contexts, so let's do
1085  * our work here in a private context that we can reset on each use.
1086  */
1087  if (!hpam_context) /* first time through? */
1088  hpam_context = AllocSetContextCreate(TopMemoryContext,
1089  "HandleParallelApplyMessages",
1091  else
1092  MemoryContextReset(hpam_context);
1093 
1094  oldcontext = MemoryContextSwitchTo(hpam_context);
1095 
1097 
1098  foreach(lc, ParallelApplyWorkerPool)
1099  {
1101  Size nbytes;
1102  void *data;
1104 
1105  /*
1106  * The leader will detach from the error queue and set it to NULL
1107  * before preparing to stop all parallel apply workers, so we don't
1108  * need to handle error messages anymore. See
1109  * logicalrep_worker_detach.
1110  */
1111  if (!winfo->error_mq_handle)
1112  continue;
1113 
1114  res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1115 
1116  if (res == SHM_MQ_WOULD_BLOCK)
1117  continue;
1118  else if (res == SHM_MQ_SUCCESS)
1119  {
1120  StringInfoData msg;
1121 
1122  initStringInfo(&msg);
1123  appendBinaryStringInfo(&msg, data, nbytes);
1125  pfree(msg.data);
1126  }
1127  else
1128  ereport(ERROR,
1129  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1130  errmsg("lost connection to the logical replication parallel apply worker")));
1131  }
1132 
1133  MemoryContextSwitchTo(oldcontext);
1134 
1135  /* Might as well clear the context on our way out */
1136  MemoryContextReset(hpam_context);
1137 
1139 }
static List * ParallelApplyWorkerPool
static void HandleParallelApplyMessage(StringInfo msg)
size_t Size
Definition: c.h:589
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
void pfree(void *pointer)
Definition: mcxt.c:1456
MemoryContext TopMemoryContext
Definition: mcxt.c:141
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:573
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
shm_mq_handle * error_mq_handle

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), StringInfoData::data, data, ereport, errcode(), errmsg(), ERROR, ParallelApplyWorkerInfo::error_mq_handle, HandleParallelApplyMessage(), HOLD_INTERRUPTS, initStringInfo(), lfirst, MemoryContextReset(), MemoryContextSwitchTo(), ParallelApplyMessagePending, ParallelApplyWorkerPool, pfree(), res, RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ LogicalParallelApplyLoop()

static void LogicalParallelApplyLoop ( shm_mq_handle mqh)
static

Definition at line 734 of file applyparallelworker.c.

735 {
736  shm_mq_result shmq_res;
737  ErrorContextCallback errcallback;
739 
740  /*
741  * Init the ApplyMessageContext which we clean up after each replication
742  * protocol message.
743  */
745  "ApplyMessageContext",
747 
748  /*
749  * Push apply error context callback. Fields will be filled while applying
750  * a change.
751  */
752  errcallback.callback = apply_error_callback;
753  errcallback.previous = error_context_stack;
754  error_context_stack = &errcallback;
755 
756  for (;;)
757  {
758  void *data;
759  Size len;
760 
762 
763  /* Ensure we are reading the data into our memory context. */
765 
766  shmq_res = shm_mq_receive(mqh, &len, &data, true);
767 
768  if (shmq_res == SHM_MQ_SUCCESS)
769  {
770  StringInfoData s;
771  int c;
772 
773  if (len == 0)
774  elog(ERROR, "invalid message length");
775 
776  s.cursor = 0;
777  s.maxlen = -1;
778  s.data = (char *) data;
779  s.len = len;
780 
781  /*
782  * The first byte of messages sent from leader apply worker to
783  * parallel apply workers can only be 'w'.
784  */
785  c = pq_getmsgbyte(&s);
786  if (c != 'w')
787  elog(ERROR, "unexpected message \"%c\"", c);
788 
789  /*
790  * Ignore statistics fields that have been updated by the leader
791  * apply worker.
792  *
793  * XXX We can avoid sending the statistics fields from the leader
794  * apply worker but for that, it needs to rebuild the entire
795  * message by removing these fields which could be more work than
796  * simply ignoring these fields in the parallel apply worker.
797  */
799 
800  apply_dispatch(&s);
801  }
802  else if (shmq_res == SHM_MQ_WOULD_BLOCK)
803  {
804  /* Replay the changes from the file, if any. */
806  {
807  int rc;
808 
809  /* Wait for more work. */
810  rc = WaitLatch(MyLatch,
812  1000L,
814 
815  if (rc & WL_LATCH_SET)
817  }
818  }
819  else
820  {
821  Assert(shmq_res == SHM_MQ_DETACHED);
822 
823  ereport(ERROR,
824  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
825  errmsg("lost connection to the logical replication apply worker")));
826  }
827 
829  MemoryContextSwitchTo(oldcxt);
830  }
831 
832  /* Pop the error context stack. */
833  error_context_stack = errcallback.previous;
834 
835  MemoryContextSwitchTo(oldcxt);
836 }
static void ProcessParallelApplyInterrupts(void)
#define SIZE_STATS_MESSAGE
static bool pa_process_spooled_messages_if_required(void)
MemoryContext ApplyMessageContext
Definition: worker.c:307
void apply_dispatch(StringInfo s)
Definition: worker.c:3285
MemoryContext ApplyContext
Definition: worker.c:308
void apply_error_callback(void *arg)
Definition: worker.c:4916
void ResetLatch(Latch *latch)
Definition: latch.c:699
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:492
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
Assert(fmt[strlen(fmt) - 1] !='\n')
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
const void size_t len
char * c
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
@ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN
Definition: wait_event.h:45

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), ApplyContext, ApplyMessageContext, Assert(), ErrorContextCallback::callback, CurrentMemoryContext, StringInfoData::cursor, StringInfoData::data, data, elog(), ereport, errcode(), errmsg(), ERROR, error_context_stack, StringInfoData::len, len, StringInfoData::maxlen, MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, pa_process_spooled_messages_if_required(), pq_getmsgbyte(), ErrorContextCallback::previous, ProcessParallelApplyInterrupts(), ResetLatch(), SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SIZE_STATS_MESSAGE, WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by ParallelApplyWorkerMain().

◆ pa_allocate_worker()

void pa_allocate_worker ( TransactionId  xid)

Definition at line 469 of file applyparallelworker.c.

470 {
471  bool found;
472  ParallelApplyWorkerInfo *winfo = NULL;
474 
475  if (!pa_can_start())
476  return;
477 
478  winfo = pa_launch_parallel_worker();
479  if (!winfo)
480  return;
481 
482  /* First time through, initialize parallel apply worker state hashtable. */
484  {
485  HASHCTL ctl;
486 
487  MemSet(&ctl, 0, sizeof(ctl));
488  ctl.keysize = sizeof(TransactionId);
489  ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
490  ctl.hcxt = ApplyContext;
491 
492  ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
493  16, &ctl,
495  }
496 
497  /* Create an entry for the requested transaction. */
498  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
499  if (found)
500  elog(ERROR, "hash table corrupted");
501 
502  /* Update the transaction information in shared memory. */
503  SpinLockAcquire(&winfo->shared->mutex);
505  winfo->shared->xid = xid;
506  SpinLockRelease(&winfo->shared->mutex);
507 
508  winfo->in_use = true;
509  winfo->serialize_changes = false;
510  entry->winfo = winfo;
511  entry->xid = xid;
512 }
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static bool pa_can_start(void)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
static HTAB * ParallelApplyTxnHash
#define MemSet(start, val, len)
Definition: c.h:1004
uint32 TransactionId
Definition: c.h:636
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
ParallelApplyWorkerInfo * winfo
ParallelApplyWorkerShared * shared
ParallelTransState xact_state
@ PARALLEL_TRANS_UNKNOWN

References ApplyContext, elog(), HASHCTL::entrysize, ERROR, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, ParallelApplyWorkerInfo::in_use, HASHCTL::keysize, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire, SpinLockRelease, ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, ParallelApplyWorkerEntry::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_start().

◆ pa_can_start()

static bool pa_can_start ( void  )
static

Definition at line 265 of file applyparallelworker.c.

266 {
267  /* Only leader apply workers can start parallel apply workers. */
268  if (!am_leader_apply_worker())
269  return false;
270 
271  /*
272  * It is good to check for any change in the subscription parameter to
273  * avoid the case where for a very long time the change doesn't get
274  * reflected. This can happen when there is a constant flow of streaming
275  * transactions that are handled by parallel apply workers.
276  *
277  * It is better to do it before the below checks so that the latest values
278  * of subscription can be used for the checks.
279  */
281 
282  /*
283  * Don't start a new parallel apply worker if the subscription is not
284  * using parallel streaming mode, or if the publisher does not support
285  * parallel apply.
286  */
288  return false;
289 
290  /*
291  * Don't start a new parallel worker if user has set skiplsn as it's
292  * possible that they want to skip the streaming transaction. For
293  * streaming transactions, we need to serialize the transaction to a file
294  * so that we can get the last LSN of the transaction to judge whether to
295  * skip before starting to apply the change.
296  *
297  * One might think that we could allow parallelism if the first lsn of the
298  * transaction is greater than skiplsn, but we don't send it with the
299  * STREAM START message, and it doesn't seem worth sending the extra eight
300  * bytes with the STREAM START to enable parallelism for this case.
301  */
303  return false;
304 
305  /*
306  * For streaming transactions that are being applied using a parallel
307  * apply worker, we cannot decide whether to apply the change for a
308  * relation that is not in the READY state (see
309  * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
310  * time. So, we don't start the new parallel apply worker in this case.
311  */
312  if (!AllTablesyncsReady())
313  return false;
314 
315  return true;
316 }
void maybe_reread_subscription(void)
Definition: worker.c:3872
Subscription * MySubscription
Definition: worker.c:315
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
XLogRecPtr skiplsn
bool AllTablesyncsReady(void)
Definition: tablesync.c:1580
static bool am_leader_apply_worker(void)
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References AllTablesyncsReady(), am_leader_apply_worker(), maybe_reread_subscription(), MyLogicalRepWorker, MySubscription, LogicalRepWorker::parallel_apply, Subscription::skiplsn, and XLogRecPtrIsInvalid.

Referenced by pa_allocate_worker().

◆ pa_decr_and_wait_stream_block()

void pa_decr_and_wait_stream_block ( void  )

Definition at line 1594 of file applyparallelworker.c.

1595 {
1597 
1598  /*
1599  * It is only possible to not have any pending stream chunks when we are
1600  * applying spooled messages.
1601  */
1603  {
1605  return;
1606 
1607  elog(ERROR, "invalid pending streaming chunk 0");
1608  }
1609 
1611  {
1614  }
1615 }
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_has_spooled_message_pending()
ParallelApplyWorkerShared * MyParallelShared
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition: atomics.h:396
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:236
#define AccessShareLock
Definition: lockdefs.h:36
pg_atomic_uint32 pending_stream_count
static bool am_parallel_apply_worker(void)

References AccessShareLock, am_parallel_apply_worker(), Assert(), elog(), ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

◆ pa_detach_all_error_mq()

void pa_detach_all_error_mq ( void  )

Definition at line 622 of file applyparallelworker.c.

623 {
624  ListCell *lc;
625 
626  foreach(lc, ParallelApplyWorkerPool)
627  {
629 
630  if (winfo->error_mq_handle)
631  {
633  winfo->error_mq_handle = NULL;
634  }
635  }
636 }
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:844

References ParallelApplyWorkerInfo::error_mq_handle, lfirst, ParallelApplyWorkerPool, and shm_mq_detach().

Referenced by logicalrep_worker_detach().

◆ pa_find_worker()

ParallelApplyWorkerInfo* pa_find_worker ( TransactionId  xid)

Definition at line 518 of file applyparallelworker.c.

519 {
520  bool found;
522 
523  if (!TransactionIdIsValid(xid))
524  return NULL;
525 
527  return NULL;
528 
529  /* Return the cached parallel apply worker if valid. */
531  return stream_apply_worker;
532 
533  /* Find an entry for the requested transaction. */
534  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
535  if (found)
536  {
537  /* The worker must not have exited. */
538  Assert(entry->winfo->in_use);
539  return entry->winfo;
540  }
541 
542  return NULL;
543 }
static ParallelApplyWorkerInfo * stream_apply_worker
@ HASH_FIND
Definition: hsearch.h:113
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), HASH_FIND, hash_search(), ParallelApplyWorkerInfo::in_use, ParallelApplyTxnHash, stream_apply_worker, TransactionIdIsValid, and ParallelApplyWorkerEntry::winfo.

Referenced by get_transaction_apply_action().

◆ pa_free_worker()

static void pa_free_worker ( ParallelApplyWorkerInfo winfo)
static

Definition at line 556 of file applyparallelworker.c.

557 {
559  Assert(winfo->in_use);
561 
562  if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
563  elog(ERROR, "hash table corrupted");
564 
565  /*
566  * Stop the worker if there are enough workers in the pool.
567  *
568  * XXX Additionally, we also stop the worker if the leader apply worker
569  * serialize part of the transaction data due to a send timeout. This is
570  * because the message could be partially written to the queue and there
571  * is no way to clean the queue other than resending the message until it
572  * succeeds. Instead of trying to send the data which anyway would have
573  * been serialized and then letting the parallel apply worker deal with
574  * the spurious message, we stop the worker.
575  */
576  if (winfo->serialize_changes ||
579  {
581  pa_free_worker_info(winfo);
582 
583  return;
584  }
585 
586  winfo->in_use = false;
587  winfo->serialize_changes = false;
588 }
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
@ HASH_REMOVE
Definition: hsearch.h:115
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
Definition: launcher.c:618
int max_parallel_apply_workers_per_subscription
Definition: launcher.c:59
static int list_length(const List *l)
Definition: pg_list.h:152
@ PARALLEL_TRANS_FINISHED

References am_parallel_apply_worker(), Assert(), elog(), ERROR, HASH_REMOVE, hash_search(), ParallelApplyWorkerInfo::in_use, list_length(), logicalrep_pa_worker_stop(), max_parallel_apply_workers_per_subscription, pa_free_worker_info(), pa_get_xact_state(), PARALLEL_TRANS_FINISHED, ParallelApplyTxnHash, ParallelApplyWorkerPool, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.

Referenced by pa_xact_finish().

◆ pa_free_worker_info()

static void pa_free_worker_info ( ParallelApplyWorkerInfo winfo)
static

Definition at line 595 of file applyparallelworker.c.

596 {
597  Assert(winfo);
598 
599  if (winfo->mq_handle)
600  shm_mq_detach(winfo->mq_handle);
601 
602  if (winfo->error_mq_handle)
604 
605  /* Unlink the files with serialized changes. */
606  if (winfo->serialize_changes)
608 
609  if (winfo->dsm_seg)
610  dsm_detach(winfo->dsm_seg);
611 
612  /* Remove from the worker pool. */
614 
615  pfree(winfo);
616 }
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4199
void dsm_detach(dsm_segment *seg)
Definition: dsm.c:776
List * list_delete_ptr(List *list, void *datum)
Definition: list.c:871
shm_mq_handle * mq_handle

References Assert(), dsm_detach(), ParallelApplyWorkerInfo::dsm_seg, ParallelApplyWorkerInfo::error_mq_handle, list_delete_ptr(), ParallelApplyWorkerInfo::mq_handle, MyLogicalRepWorker, ParallelApplyWorkerPool, pfree(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, shm_mq_detach(), stream_cleanup_files(), LogicalRepWorker::subid, and ParallelApplyWorkerShared::xid.

Referenced by pa_free_worker(), and pa_launch_parallel_worker().

◆ pa_get_fileset_state()

static PartialFileSetState pa_get_fileset_state ( void  )
static

◆ pa_get_xact_state()

static ParallelTransState pa_get_xact_state ( ParallelApplyWorkerShared wshared)
static

Definition at line 1322 of file applyparallelworker.c.

1323 {
1324  ParallelTransState xact_state;
1325 
1326  SpinLockAcquire(&wshared->mutex);
1327  xact_state = wshared->xact_state;
1328  SpinLockRelease(&wshared->mutex);
1329 
1330  return xact_state;
1331 }
ParallelTransState

References ParallelApplyWorkerShared::mutex, SpinLockAcquire, SpinLockRelease, and ParallelApplyWorkerShared::xact_state.

Referenced by pa_free_worker(), pa_wait_for_xact_finish(), and pa_wait_for_xact_state().

◆ pa_has_spooled_message_pending()

static bool pa_has_spooled_message_pending ( )
static

Definition at line 642 of file applyparallelworker.c.

643 {
644  PartialFileSetState fileset_state;
645 
646  fileset_state = pa_get_fileset_state();
647 
648  return (fileset_state != FS_EMPTY);
649 }
static PartialFileSetState pa_get_fileset_state(void)
@ FS_EMPTY

References FS_EMPTY, and pa_get_fileset_state().

Referenced by pa_decr_and_wait_stream_block().

◆ pa_launch_parallel_worker()

static ParallelApplyWorkerInfo* pa_launch_parallel_worker ( void  )
static

Definition at line 404 of file applyparallelworker.c.

405 {
406  MemoryContext oldcontext;
407  bool launched;
409  ListCell *lc;
410 
411  /* Try to get an available parallel apply worker from the worker pool. */
412  foreach(lc, ParallelApplyWorkerPool)
413  {
414  winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
415 
416  if (!winfo->in_use)
417  return winfo;
418  }
419 
420  /*
421  * Start a new parallel apply worker.
422  *
423  * The worker info can be used for the lifetime of the worker process, so
424  * create it in a permanent context.
425  */
426  oldcontext = MemoryContextSwitchTo(ApplyContext);
427 
429 
430  /* Setup shared memory. */
431  if (!pa_setup_dsm(winfo))
432  {
433  MemoryContextSwitchTo(oldcontext);
434  pfree(winfo);
435  return NULL;
436  }
437 
442  InvalidOid,
443  dsm_segment_handle(winfo->dsm_seg));
444 
445  if (launched)
446  {
448  }
449  else
450  {
451  pa_free_worker_info(winfo);
452  winfo = NULL;
453  }
454 
455  MemoryContextSwitchTo(oldcontext);
456 
457  return winfo;
458 }
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
dsm_handle dsm_segment_handle(dsm_segment *seg)
Definition: dsm.c:1094
bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:306
List * lappend(List *list, void *datum)
Definition: list.c:338
void * palloc0(Size size)
Definition: mcxt.c:1257
#define InvalidOid
Definition: postgres_ext.h:36

References ApplyContext, LogicalRepWorker::dbid, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_handle(), ParallelApplyWorkerInfo::in_use, InvalidOid, lappend(), lfirst, logicalrep_worker_launch(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pa_free_worker_info(), pa_setup_dsm(), palloc0(), ParallelApplyWorkerPool, pfree(), and LogicalRepWorker::userid.

Referenced by pa_allocate_worker().

◆ pa_lock_stream()

void pa_lock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_lock_transaction()

void pa_lock_transaction ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_process_spooled_messages_if_required()

static bool pa_process_spooled_messages_if_required ( void  )
static

Definition at line 658 of file applyparallelworker.c.

659 {
660  PartialFileSetState fileset_state;
661 
662  fileset_state = pa_get_fileset_state();
663 
664  if (fileset_state == FS_EMPTY)
665  return false;
666 
667  /*
668  * If the leader apply worker is busy serializing the partial changes then
669  * acquire the stream lock now and wait for the leader worker to finish
670  * serializing the changes. Otherwise, the parallel apply worker won't get
671  * a chance to receive a STREAM_STOP (and acquire the stream lock) until
672  * the leader had serialized all changes which can lead to undetected
673  * deadlock.
674  *
675  * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
676  * worker has finished serializing the changes.
677  */
678  if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
679  {
682 
683  fileset_state = pa_get_fileset_state();
684  }
685 
686  /*
687  * We cannot read the file immediately after the leader has serialized all
688  * changes to the file because there may still be messages in the memory
689  * queue. We will apply all spooled messages the next time we call this
690  * function and that will ensure there are no messages left in the memory
691  * queue.
692  */
693  if (fileset_state == FS_SERIALIZE_DONE)
694  {
696  }
697  else if (fileset_state == FS_READY)
698  {
703  }
704 
705  return true;
706 }
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2025
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessShareLock, apply_spooled_messages(), ParallelApplyWorkerShared::fileset, FS_EMPTY, FS_READY, FS_SERIALIZE_DONE, FS_SERIALIZE_IN_PROGRESS, InvalidXLogRecPtr, MyParallelShared, pa_get_fileset_state(), pa_lock_stream(), pa_set_fileset_state(), pa_unlock_stream(), and ParallelApplyWorkerShared::xid.

Referenced by LogicalParallelApplyLoop().

◆ pa_reset_subtrans()

void pa_reset_subtrans ( void  )

Definition at line 1405 of file applyparallelworker.c.

1406 {
1407  /*
1408  * We don't need to free this explicitly as the allocated memory will be
1409  * freed at the transaction end.
1410  */
1411  subxactlist = NIL;
1412 }
static List * subxactlist
#define NIL
Definition: pg_list.h:68

References NIL, and subxactlist.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_stream_abort().

◆ pa_savepoint_name()

static void pa_savepoint_name ( Oid  suboid,
TransactionId  xid,
char *  spname,
Size  szsp 
)
static

Definition at line 1351 of file applyparallelworker.c.

1352 {
1353  snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1354 }
#define snprintf
Definition: port.h:238

References snprintf.

Referenced by pa_start_subtrans(), and pa_stream_abort().

◆ pa_send_data()

bool pa_send_data ( ParallelApplyWorkerInfo winfo,
Size  nbytes,
const void *  data 
)

Definition at line 1149 of file applyparallelworker.c.

1150 {
1151  int rc;
1152  shm_mq_result result;
1153  TimestampTz startTime = 0;
1154 
1156  Assert(!winfo->serialize_changes);
1157 
1158  /*
1159  * We don't try to send data to parallel worker for 'immediate' mode. This
1160  * is primarily used for testing purposes.
1161  */
1163  return false;
1164 
1165 /*
1166  * This timeout is a bit arbitrary but testing revealed that it is sufficient
1167  * to send the message unless the parallel apply worker is waiting on some
1168  * lock or there is a serious resource crunch. See the comments atop this file
1169  * to know why we are using a non-blocking way to send the message.
1170  */
1171 #define SHM_SEND_RETRY_INTERVAL_MS 1000
1172 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1173 
1174  for (;;)
1175  {
1176  result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1177 
1178  if (result == SHM_MQ_SUCCESS)
1179  return true;
1180  else if (result == SHM_MQ_DETACHED)
1181  ereport(ERROR,
1182  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1183  errmsg("could not send data to shared-memory queue")));
1184 
1185  Assert(result == SHM_MQ_WOULD_BLOCK);
1186 
1187  /* Wait before retrying. */
1188  rc = WaitLatch(MyLatch,
1192 
1193  if (rc & WL_LATCH_SET)
1194  {
1197  }
1198 
1199  if (startTime == 0)
1200  startTime = GetCurrentTimestamp();
1201  else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1203  return false;
1204  }
1205 }
#define SHM_SEND_TIMEOUT_MS
#define SHM_SEND_RETRY_INTERVAL_MS
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1719
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
#define unlikely(x)
Definition: c.h:295
int64 TimestampTz
Definition: timestamp.h:39
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
int logical_replication_mode
@ LOGICAL_REP_MODE_IMMEDIATE
Definition: reorderbuffer.h:28
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:330
@ WAIT_EVENT_LOGICAL_APPLY_SEND_DATA
Definition: wait_event.h:109
bool IsTransactionState(void)
Definition: xact.c:378

References Assert(), CHECK_FOR_INTERRUPTS, data, ereport, errcode(), errmsg(), ERROR, GetCurrentTimestamp(), IsTransactionState(), LOGICAL_REP_MODE_IMMEDIATE, logical_replication_mode, ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WAIT_EVENT_LOGICAL_APPLY_SEND_DATA, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_set_fileset_state()

◆ pa_set_stream_apply_worker()

void pa_set_stream_apply_worker ( ParallelApplyWorkerInfo winfo)

Definition at line 1337 of file applyparallelworker.c.

1338 {
1339  stream_apply_worker = winfo;
1340 }

References stream_apply_worker.

Referenced by apply_handle_stream_start(), and apply_handle_stream_stop().

◆ pa_set_xact_state()

void pa_set_xact_state ( ParallelApplyWorkerShared wshared,
ParallelTransState  xact_state 
)

◆ pa_setup_dsm()

static bool pa_setup_dsm ( ParallelApplyWorkerInfo winfo)
static

Definition at line 327 of file applyparallelworker.c.

328 {
330  Size segsize;
331  dsm_segment *seg;
332  shm_toc *toc;
334  shm_mq *mq;
335  Size queue_size = DSM_QUEUE_SIZE;
336  Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
337 
338  /*
339  * Estimate how much shared memory we need.
340  *
341  * Because the TOC machinery may choose to insert padding of oddly-sized
342  * requests, we must estimate each chunk separately.
343  *
344  * We need one key to register the location of the header, and two other
345  * keys to track the locations of the message queue and the error message
346  * queue.
347  */
350  shm_toc_estimate_chunk(&e, queue_size);
351  shm_toc_estimate_chunk(&e, error_queue_size);
352 
354  segsize = shm_toc_estimate(&e);
355 
356  /* Create the shared memory segment and establish a table of contents. */
357  seg = dsm_create(shm_toc_estimate(&e), 0);
358  if (!seg)
359  return false;
360 
362  segsize);
363 
364  /* Set up the header region. */
365  shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
366  SpinLockInit(&shared->mutex);
367 
371  shared->fileset_state = FS_EMPTY;
372 
374 
375  /* Set up message queue for the worker. */
376  mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
379 
380  /* Attach the queue. */
381  winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
382 
383  /* Set up error queue for the worker. */
384  mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
385  error_queue_size);
388 
389  /* Attach the queue. */
390  winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
391 
392  /* Return results to caller. */
393  winfo->dsm_seg = seg;
394  winfo->shared = shared;
395 
396  return true;
397 }
#define DSM_ERROR_QUEUE_SIZE
#define DSM_QUEUE_SIZE
#define PARALLEL_APPLY_KEY_SHARED
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
#define PARALLEL_APPLY_KEY_MQ
#define PG_LOGICAL_APPLY_SHM_MAGIC
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:218
void * dsm_segment_address(dsm_segment *seg)
Definition: dsm.c:1066
dsm_segment * dsm_create(Size size, int flags)
Definition: dsm.c:489
e
Definition: preproc-init.c:82
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:291
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:225
shm_mq * shm_mq_create(void *address, Size size)
Definition: shm_mq.c:178
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:207
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Definition: shm_toc.c:40
Size shm_toc_estimate(shm_toc_estimator *e)
Definition: shm_toc.c:263
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_initialize_estimator(e)
Definition: shm_toc.h:49
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
#define SpinLockInit(lock)
Definition: spin.h:60
PGPROC * MyProc
Definition: proc.c:66
Definition: shm_mq.c:73

References dsm_create(), DSM_ERROR_QUEUE_SIZE, DSM_QUEUE_SIZE, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_address(), ParallelApplyWorkerInfo::error_mq_handle, ParallelApplyWorkerShared::fileset_state, FS_EMPTY, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, ParallelApplyWorkerInfo::mq_handle, ParallelApplyWorkerShared::mutex, MyProc, PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PARALLEL_TRANS_UNKNOWN, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_init_u32(), PG_LOGICAL_APPLY_SHM_MAGIC, ParallelApplyWorkerInfo::shared, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_allocate(), shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_initialize_estimator, shm_toc_insert(), SpinLockInit, and ParallelApplyWorkerShared::xact_state.

Referenced by pa_launch_parallel_worker().

◆ pa_shutdown()

static void pa_shutdown ( int  code,
Datum  arg 
)
static

Definition at line 847 of file applyparallelworker.c.

848 {
852 
854 }
#define InvalidBackendId
Definition: backendid.h:23
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_PARALLEL_APPLY_MESSAGE
Definition: procsignal.h:38

References arg, DatumGetPointer(), dsm_detach(), InvalidBackendId, LogicalRepWorker::leader_pid, MyLogicalRepWorker, PROCSIG_PARALLEL_APPLY_MESSAGE, and SendProcSignal().

Referenced by ParallelApplyWorkerMain().

◆ pa_start_subtrans()

void pa_start_subtrans ( TransactionId  current_xid,
TransactionId  top_xid 
)

Definition at line 1365 of file applyparallelworker.c.

1366 {
1367  if (current_xid != top_xid &&
1368  !list_member_xid(subxactlist, current_xid))
1369  {
1370  MemoryContext oldctx;
1371  char spname[NAMEDATALEN];
1372 
1373  pa_savepoint_name(MySubscription->oid, current_xid,
1374  spname, sizeof(spname));
1375 
1376  elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1377 
1378  /* We must be in transaction block to define the SAVEPOINT. */
1379  if (!IsTransactionBlock())
1380  {
1381  if (!IsTransactionState())
1383 
1386  }
1387 
1388  DefineSavepoint(spname);
1389 
1390  /*
1391  * CommitTransactionCommand is needed to start a subtransaction after
1392  * issuing a SAVEPOINT inside a transaction block (see
1393  * StartSubTransaction()).
1394  */
1396 
1398  subxactlist = lappend_xid(subxactlist, current_xid);
1399  MemoryContextSwitchTo(oldctx);
1400  }
1401 }
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
#define DEBUG1
Definition: elog.h:30
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:392
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:741
MemoryContext TopTransactionContext
Definition: mcxt.c:146
#define NAMEDATALEN
void DefineSavepoint(const char *name)
Definition: xact.c:4218
void StartTransactionCommand(void)
Definition: xact.c:2937
bool IsTransactionBlock(void)
Definition: xact.c:4815
void BeginTransactionBlock(void)
Definition: xact.c:3769
void CommitTransactionCommand(void)
Definition: xact.c:3034

References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog(), IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.

Referenced by handle_streamed_transaction().

◆ pa_stream_abort()

void pa_stream_abort ( LogicalRepStreamAbortData abort_data)

Definition at line 1419 of file applyparallelworker.c.

1420 {
1421  TransactionId xid = abort_data->xid;
1422  TransactionId subxid = abort_data->subxid;
1423 
1424  /*
1425  * Update origin state so we can restart streaming from correct position
1426  * in case of crash.
1427  */
1430 
1431  /*
1432  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1433  * just free the subxactlist.
1434  */
1435  if (subxid == xid)
1436  {
1438 
1439  /*
1440  * Release the lock as we might be processing an empty streaming
1441  * transaction in which case the lock won't be released during
1442  * transaction rollback.
1443  *
1444  * Note that it's ok to release the transaction lock before aborting
1445  * the transaction because even if the parallel apply worker dies due
1446  * to crash or some other reason, such a transaction would still be
1447  * considered aborted.
1448  */
1450 
1452 
1453  if (IsTransactionBlock())
1454  {
1455  EndTransactionBlock(false);
1457  }
1458 
1460 
1462  }
1463  else
1464  {
1465  /* OK, so it's a subxact. Rollback to the savepoint. */
1466  int i;
1467  char spname[NAMEDATALEN];
1468 
1469  pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1470 
1471  elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1472 
1473  /*
1474  * Search the subxactlist, determine the offset tracked for the
1475  * subxact, and truncate the list.
1476  *
1477  * Note that for an empty sub-transaction we won't find the subxid
1478  * here.
1479  */
1480  for (i = list_length(subxactlist) - 1; i >= 0; i--)
1481  {
1483 
1484  if (xid_tmp == subxid)
1485  {
1486  RollbackToSavepoint(spname);
1489  break;
1490  }
1491  }
1492  }
1493 }
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
int i
Definition: isn.c:73
List * list_truncate(List *list, int new_size)
Definition: list.c:630
#define AccessExclusiveLock
Definition: lockdefs.h:43
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
static ListCell * list_nth_cell(const List *list, int n)
Definition: pg_list.h:277
#define lfirst_xid(lc)
Definition: pg_list.h:175
void RollbackToSavepoint(const char *name)
Definition: xact.c:4412
bool EndTransactionBlock(bool chain)
Definition: xact.c:3889
void AbortCurrentTransaction(void)
Definition: xact.c:3304

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog(), EndTransactionBlock(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, RollbackToSavepoint(), STATE_IDLE, subxactlist, LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ pa_switch_to_partial_serialize()

void pa_switch_to_partial_serialize ( ParallelApplyWorkerInfo winfo,
bool  stream_locked 
)

Definition at line 1214 of file applyparallelworker.c.

1216 {
1217  ereport(LOG,
1218  (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1219  winfo->shared->xid)));
1220 
1221  /*
1222  * The parallel apply worker could be stuck for some reason (say waiting
1223  * on some lock by other backend), so stop trying to send data directly to
1224  * it and start serializing data to the file instead.
1225  */
1226  winfo->serialize_changes = true;
1227 
1228  /* Initialize the stream fileset. */
1229  stream_start_internal(winfo->shared->xid, true);
1230 
1231  /*
1232  * Acquires the stream lock if not already to make sure that the parallel
1233  * apply worker will wait for the leader to release the stream lock until
1234  * the end of the transaction.
1235  */
1236  if (!stream_locked)
1238 
1240 }
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1453
#define LOG
Definition: elog.h:31

References AccessExclusiveLock, ereport, errmsg(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_unlock_stream()

void pa_unlock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_unlock_transaction()

◆ pa_wait_for_xact_finish()

static void pa_wait_for_xact_finish ( ParallelApplyWorkerInfo winfo)
static

Definition at line 1277 of file applyparallelworker.c.

1278 {
1279  /*
1280  * Wait until the parallel apply worker set the state to
1281  * PARALLEL_TRANS_STARTED which means it has acquired the transaction
1282  * lock. This is to prevent leader apply worker from acquiring the
1283  * transaction lock earlier than the parallel apply worker.
1284  */
1286 
1287  /*
1288  * Wait for the transaction lock to be released. This is required to
1289  * detect deadlock among leader and parallel apply workers. Refer to the
1290  * comments atop this file.
1291  */
1294 
1295  /*
1296  * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
1297  * apply worker failed while applying changes causing the lock to be
1298  * released.
1299  */
1301  ereport(ERROR,
1302  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1303  errmsg("lost connection to the logical replication parallel apply worker")));
1304 }
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
@ PARALLEL_TRANS_STARTED

References AccessShareLock, ereport, errcode(), errmsg(), ERROR, pa_get_xact_state(), pa_lock_transaction(), pa_unlock_transaction(), pa_wait_for_xact_state(), PARALLEL_TRANS_FINISHED, PARALLEL_TRANS_STARTED, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.

Referenced by pa_xact_finish().

◆ pa_wait_for_xact_state()

static void pa_wait_for_xact_state ( ParallelApplyWorkerInfo winfo,
ParallelTransState  xact_state 
)
static

Definition at line 1247 of file applyparallelworker.c.

1249 {
1250  for (;;)
1251  {
1252  /*
1253  * Stop if the transaction state has reached or exceeded the given
1254  * xact_state.
1255  */
1256  if (pa_get_xact_state(winfo->shared) >= xact_state)
1257  break;
1258 
1259  /* Wait to be signalled. */
1260  (void) WaitLatch(MyLatch,
1262  10L,
1264 
1265  /* Reset the latch so we don't spin. */
1267 
1268  /* An interrupt may have occurred while we were waiting. */
1270  }
1271 }
@ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE
Definition: wait_event.h:110

References CHECK_FOR_INTERRUPTS, MyLatch, pa_get_xact_state(), ResetLatch(), ParallelApplyWorkerInfo::shared, WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by pa_wait_for_xact_finish().

◆ pa_xact_finish()

void pa_xact_finish ( ParallelApplyWorkerInfo winfo,
XLogRecPtr  remote_lsn 
)

Definition at line 1621 of file applyparallelworker.c.

1622 {
1624 
1625  /*
1626  * Unlock the shared object lock so that parallel apply worker can
1627  * continue to receive and apply changes.
1628  */
1630 
1631  /*
1632  * Wait for that worker to finish. This is necessary to maintain commit
1633  * order which avoids failures due to transaction dependencies and
1634  * deadlocks.
1635  */
1636  pa_wait_for_xact_finish(winfo);
1637 
1638  if (!XLogRecPtrIsInvalid(remote_lsn))
1639  store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1640 
1641  pa_free_worker(winfo);
1642 }
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3449

References AccessExclusiveLock, am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsInvalid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ ParallelApplyWorkerMain()

void ParallelApplyWorkerMain ( Datum  main_arg)

Definition at line 860 of file applyparallelworker.c.

861 {
863  dsm_handle handle;
864  dsm_segment *seg;
865  shm_toc *toc;
866  shm_mq *mq;
867  shm_mq_handle *mqh;
868  shm_mq_handle *error_mqh;
869  RepOriginId originid;
870  int worker_slot = DatumGetInt32(main_arg);
871  char originname[NAMEDATALEN];
872 
874 
875  /* Setup signal handling. */
878  pqsignal(SIGTERM, die);
880 
881  /*
882  * Attach to the dynamic shared memory segment for the parallel apply, and
883  * find its table of contents.
884  *
885  * Like parallel query, we don't need resource owner by this time. See
886  * ParallelWorkerMain.
887  */
888  memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
889  seg = dsm_attach(handle);
890  if (!seg)
891  ereport(ERROR,
892  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
893  errmsg("unable to map dynamic shared memory segment")));
894 
896  if (!toc)
897  ereport(ERROR,
898  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
899  errmsg("bad magic number in dynamic shared memory segment")));
900 
901  /* Look up the shared information. */
902  shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
903  MyParallelShared = shared;
904 
905  /*
906  * Attach to the message queue.
907  */
908  mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
910  mqh = shm_mq_attach(mq, seg, NULL);
911 
912  /*
913  * Primary initialization is complete. Now, we can attach to our slot.
914  * This is to ensure that the leader apply worker does not write data to
915  * the uninitialized memory queue.
916  */
917  logicalrep_worker_attach(worker_slot);
918 
919  /*
920  * Register the shutdown callback after we are attached to the worker
921  * slot. This is to ensure that MyLogicalRepWorker remains valid when this
922  * callback is invoked.
923  */
925 
930 
931  /*
932  * Attach to the error queue.
933  */
936  error_mqh = shm_mq_attach(mq, seg, NULL);
937 
938  pq_redirect_to_shm_mq(seg, error_mqh);
941 
944 
946 
947  InitializingApplyWorker = false;
948 
949  /* Setup replication origin tracking. */
952  originname, sizeof(originname));
953  originid = replorigin_by_name(originname, false);
954 
955  /*
956  * The parallel apply worker doesn't need to monopolize this replication
957  * origin which was already acquired by its leader process.
958  */
960  replorigin_session_origin = originid;
962 
963  /*
964  * Setup callback for syscache so that we know when something changes in
965  * the subscription relation state.
966  */
969  (Datum) 0);
970 
971  set_apply_error_context_origin(originname);
972 
974 
975  /*
976  * The parallel apply worker must not get here because the parallel apply
977  * worker will only stop when it receives a SIGTERM or SIGINT from the
978  * leader, or when there is an error. None of these cases will allow the
979  * code to reach here.
980  */
981  Assert(false);
982 }
static void pa_shutdown(int code, Datum arg)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
void InitializeApplyWorker(void)
Definition: worker.c:4445
bool InitializingApplyWorker
Definition: worker.c:335
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:461
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5058
dsm_segment * dsm_attach(dsm_handle h)
Definition: dsm.c:638
uint32 dsm_handle
Definition: dsm_impl.h:55
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:109
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
void logicalrep_worker_attach(int slot)
Definition: launcher.c:692
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1095
#define die(msg)
Definition: pg_test_fsync.c:95
pqsigfunc pqsignal(int signo, pqsigfunc func)
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5660
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:193
void pq_set_parallel_leader(pid_t pid, BackendId backend_id)
Definition: pqmq.c:78
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition: pqmq.c:53
shm_toc * shm_toc_attach(uint64 magic, void *address)
Definition: shm_toc.c:64
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:272
#define SIGHUP
Definition: win32_port.h:176
uint16 RepOriginId
Definition: xlogdefs.h:65

References Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), BackgroundWorker::bgw_extra, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), DatumGetInt32(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::generation, InitializeApplyWorker(), InitializingApplyWorker, invalidate_syncing_table_states(), InvalidBackendId, InvalidOid, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalParallelApplyLoop(), logicalrep_worker_attach(), ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, ParallelApplyWorkerShared::mutex, MyBgworkerEntry, MyLogicalRepWorker, MyParallelShared, MyProc, MySubscription, NAMEDATALEN, Subscription::oid, pa_shutdown(), PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PG_LOGICAL_APPLY_SHM_MAGIC, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal(), ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, set_apply_error_context_origin(), shm_mq_attach(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), and SUBSCRIPTIONRELMAP.

◆ ProcessParallelApplyInterrupts()

static void ProcessParallelApplyInterrupts ( void  )
static

Definition at line 712 of file applyparallelworker.c.

713 {
715 
717  {
718  ereport(LOG,
719  (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
720  MySubscription->name)));
721 
722  proc_exit(0);
723  }
724 
726  {
727  ConfigReloadPending = false;
729  }
730 }
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void proc_exit(int code)
Definition: ipc.c:104

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, ereport, errmsg(), LOG, MySubscription, Subscription::name, PGC_SIGHUP, proc_exit(), ProcessConfigFile(), and ShutdownRequestPending.

Referenced by LogicalParallelApplyLoop().

Variable Documentation

◆ MyParallelShared

◆ ParallelApplyMessagePending

volatile sig_atomic_t ParallelApplyMessagePending = false

◆ ParallelApplyTxnHash

HTAB* ParallelApplyTxnHash = NULL
static

Definition at line 225 of file applyparallelworker.c.

Referenced by pa_allocate_worker(), pa_find_worker(), and pa_free_worker().

◆ ParallelApplyWorkerPool

◆ stream_apply_worker

ParallelApplyWorkerInfo* stream_apply_worker = NULL
static

Definition at line 252 of file applyparallelworker.c.

Referenced by pa_find_worker(), and pa_set_stream_apply_worker().

◆ subxactlist

List* subxactlist = NIL
static

Definition at line 255 of file applyparallelworker.c.

Referenced by pa_reset_subtrans(), pa_start_subtrans(), and pa_stream_abort().