PostgreSQL Source Code  git master
logical.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * logical.c
3  * PostgreSQL logical decoding coordination
4  *
5  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/logical.c
9  *
10  * NOTES
11  * This file coordinates interaction between the various modules that
12  * together provide logical decoding, primarily by providing
13  * so-called LogicalDecodingContexts. The goal is to encapsulate most of the
14  * internal complexity for consumers of logical decoding, so they can
15  * create and consume a changestream with a low amount of code. Builtin
16  * consumers are the walsender and SQL SRF interface, but it's possible to
17  * add further ones without changing core code, e.g. to consume changes in
18  * a bgworker.
19  *
20  * The idea is that a consumer provides three callbacks, one to read WAL,
21  * one to prepare a data write, and a final one for actually writing since
22  * their implementation depends on the type of consumer. Check
23  * logicalfuncs.c for an example implementation of a fairly simple consumer
24  * and an implementation of a WAL reading callback that's suitable for
25  * simple consumers.
26  *-------------------------------------------------------------------------
27  */
28 
29 #include "postgres.h"
30 
31 #include "access/xact.h"
32 #include "access/xlogutils.h"
33 #include "fmgr.h"
34 #include "miscadmin.h"
35 #include "pgstat.h"
36 #include "replication/decode.h"
37 #include "replication/logical.h"
39 #include "replication/snapbuild.h"
40 #include "storage/proc.h"
41 #include "storage/procarray.h"
42 #include "utils/builtins.h"
43 #include "utils/inval.h"
44 #include "utils/memutils.h"
45 
46 /* data for errcontext callback */
48 {
50  const char *callback_name;
53 
54 /* wrappers around output plugin callbacks */
55 static void output_plugin_error_callback(void *arg);
57  bool is_init);
59 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
60 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
61  XLogRecPtr commit_lsn);
63 static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
64  XLogRecPtr prepare_lsn);
66  XLogRecPtr commit_lsn);
68  XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
69 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
70  Relation relation, ReorderBufferChange *change);
71 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
72  int nrelations, Relation relations[], ReorderBufferChange *change);
73 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
74  XLogRecPtr message_lsn, bool transactional,
75  const char *prefix, Size message_size, const char *message);
76 
77 /* streaming callbacks */
79  XLogRecPtr first_lsn);
81  XLogRecPtr last_lsn);
83  XLogRecPtr abort_lsn);
85  XLogRecPtr prepare_lsn);
87  XLogRecPtr commit_lsn);
89  Relation relation, ReorderBufferChange *change);
91  XLogRecPtr message_lsn, bool transactional,
92  const char *prefix, Size message_size, const char *message);
94  int nrelations, Relation relations[], ReorderBufferChange *change);
95 
96 /* callback to update txn's progress */
98  ReorderBufferTXN *txn,
99  XLogRecPtr lsn);
100 
101 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
102 
103 /*
104  * Make sure the current settings & environment are capable of doing logical
105  * decoding.
106  */
107 void
109 {
111 
112  /*
113  * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
114  * needs the same check.
115  */
116 
118  ereport(ERROR,
119  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
120  errmsg("logical decoding requires wal_level >= logical")));
121 
122  if (MyDatabaseId == InvalidOid)
123  ereport(ERROR,
124  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
125  errmsg("logical decoding requires a database connection")));
126 
127  if (RecoveryInProgress())
128  {
129  /*
130  * This check may have race conditions, but whenever
131  * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
132  * verify that there are no existing logical replication slots. And to
133  * avoid races around creating a new slot,
134  * CheckLogicalDecodingRequirements() is called once before creating
135  * the slot, and once when logical decoding is initially starting up.
136  */
138  ereport(ERROR,
139  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
140  errmsg("logical decoding on standby requires wal_level >= logical on the primary")));
141  }
142 }
143 
144 /*
145  * Helper function for CreateInitDecodingContext() and
146  * CreateDecodingContext() performing common tasks.
147  */
148 static LogicalDecodingContext *
149 StartupDecodingContext(List *output_plugin_options,
150  XLogRecPtr start_lsn,
151  TransactionId xmin_horizon,
152  bool need_full_snapshot,
153  bool fast_forward,
154  XLogReaderRoutine *xl_routine,
158 {
159  ReplicationSlot *slot;
160  MemoryContext context,
161  old_context;
163 
164  /* shorter lines... */
165  slot = MyReplicationSlot;
166 
168  "Logical decoding context",
170  old_context = MemoryContextSwitchTo(context);
171  ctx = palloc0(sizeof(LogicalDecodingContext));
172 
173  ctx->context = context;
174 
175  /*
176  * (re-)load output plugins, so we detect a bad (removed) output plugin
177  * now.
178  */
179  if (!fast_forward)
181 
182  /*
183  * Now that the slot's xmin has been set, we can announce ourselves as a
184  * logical decoding backend which doesn't need to be checked individually
185  * when computing the xmin horizon because the xmin is enforced via
186  * replication slots.
187  *
188  * We can only do so if we're outside of a transaction (i.e. the case when
189  * streaming changes via walsender), otherwise an already setup
190  * snapshot/xid would end up being ignored. That's not a particularly
191  * bothersome restriction since the SQL interface can't be used for
192  * streaming anyway.
193  */
195  {
196  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
199  LWLockRelease(ProcArrayLock);
200  }
201 
202  ctx->slot = slot;
203 
204  ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
205  if (!ctx->reader)
206  ereport(ERROR,
207  (errcode(ERRCODE_OUT_OF_MEMORY),
208  errmsg("out of memory"),
209  errdetail("Failed while allocating a WAL reading processor.")));
210 
212  ctx->snapshot_builder =
213  AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
214  need_full_snapshot, slot->data.two_phase_at);
215 
216  ctx->reorder->private_data = ctx;
217 
218  /* wrap output plugin callbacks, so we can add error context information */
224 
225  /*
226  * To support streaming, we require start/stop/abort/commit/change
227  * callbacks. The message and truncate callbacks are optional, similar to
228  * regular output plugins. We however enable streaming when at least one
229  * of the methods is enabled so that we can easily identify missing
230  * methods.
231  *
232  * We decide it here, but only check it later in the wrappers.
233  */
234  ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
235  (ctx->callbacks.stream_stop_cb != NULL) ||
236  (ctx->callbacks.stream_abort_cb != NULL) ||
237  (ctx->callbacks.stream_commit_cb != NULL) ||
238  (ctx->callbacks.stream_change_cb != NULL) ||
239  (ctx->callbacks.stream_message_cb != NULL) ||
240  (ctx->callbacks.stream_truncate_cb != NULL);
241 
242  /*
243  * streaming callbacks
244  *
245  * stream_message and stream_truncate callbacks are optional, so we do not
246  * fail with ERROR when missing, but the wrappers simply do nothing. We
247  * must set the ReorderBuffer callbacks to something, otherwise the calls
248  * from there will crash (we don't want to move the checks there).
249  */
258 
259 
260  /*
261  * To support two-phase logical decoding, we require
262  * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
263  * filter_prepare callback is optional. We however enable two-phase
264  * logical decoding when at least one of the methods is enabled so that we
265  * can easily identify missing methods.
266  *
267  * We decide it here, but only check it later in the wrappers.
268  */
269  ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
270  (ctx->callbacks.prepare_cb != NULL) ||
271  (ctx->callbacks.commit_prepared_cb != NULL) ||
272  (ctx->callbacks.rollback_prepared_cb != NULL) ||
273  (ctx->callbacks.stream_prepare_cb != NULL) ||
274  (ctx->callbacks.filter_prepare_cb != NULL);
275 
276  /*
277  * Callback to support decoding at prepare time.
278  */
283 
284  /*
285  * Callback to support updating progress during sending data of a
286  * transaction (and its subtransactions) to the output plugin.
287  */
289 
290  ctx->out = makeStringInfo();
291  ctx->prepare_write = prepare_write;
292  ctx->write = do_write;
293  ctx->update_progress = update_progress;
294 
295  ctx->output_plugin_options = output_plugin_options;
296 
297  ctx->fast_forward = fast_forward;
298 
299  MemoryContextSwitchTo(old_context);
300 
301  return ctx;
302 }
303 
304 /*
305  * Create a new decoding context, for a new logical slot.
306  *
307  * plugin -- contains the name of the output plugin
308  * output_plugin_options -- contains options passed to the output plugin
309  * need_full_snapshot -- if true, must obtain a snapshot able to read all
310  * tables; if false, one that can read only catalogs is acceptable.
311  * restart_lsn -- if given as invalid, it's this routine's responsibility to
312  * mark WAL as reserved by setting a convenient restart_lsn for the slot.
313  * Otherwise, we set for decoding to start from the given LSN without
314  * marking WAL reserved beforehand. In that scenario, it's up to the
315  * caller to guarantee that WAL remains available.
316  * xl_routine -- XLogReaderRoutine for underlying XLogReader
317  * prepare_write, do_write, update_progress --
318  * callbacks that perform the use-case dependent, actual, work.
319  *
320  * Needs to be called while in a memory context that's at least as long lived
321  * as the decoding context because further memory contexts will be created
322  * inside it.
323  *
324  * Returns an initialized decoding context after calling the output plugin's
325  * startup function.
326  */
329  List *output_plugin_options,
330  bool need_full_snapshot,
331  XLogRecPtr restart_lsn,
332  XLogReaderRoutine *xl_routine,
336 {
337  TransactionId xmin_horizon = InvalidTransactionId;
338  ReplicationSlot *slot;
339  NameData plugin_name;
341  MemoryContext old_context;
342 
343  /*
344  * On a standby, this check is also required while creating the slot.
345  * Check the comments in the function.
346  */
348 
349  /* shorter lines... */
350  slot = MyReplicationSlot;
351 
352  /* first some sanity checks that are unlikely to be violated */
353  if (slot == NULL)
354  elog(ERROR, "cannot perform logical decoding without an acquired slot");
355 
356  if (plugin == NULL)
357  elog(ERROR, "cannot initialize logical decoding without a specified plugin");
358 
359  /* Make sure the passed slot is suitable. These are user facing errors. */
360  if (SlotIsPhysical(slot))
361  ereport(ERROR,
362  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
363  errmsg("cannot use physical replication slot for logical decoding")));
364 
365  if (slot->data.database != MyDatabaseId)
366  ereport(ERROR,
367  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
368  errmsg("replication slot \"%s\" was not created in this database",
369  NameStr(slot->data.name))));
370 
371  if (IsTransactionState() &&
373  ereport(ERROR,
374  (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
375  errmsg("cannot create logical replication slot in transaction that has performed writes")));
376 
377  /*
378  * Register output plugin name with slot. We need the mutex to avoid
379  * concurrent reading of a partially copied string. But we don't want any
380  * complicated code while holding a spinlock, so do namestrcpy() outside.
381  */
382  namestrcpy(&plugin_name, plugin);
383  SpinLockAcquire(&slot->mutex);
384  slot->data.plugin = plugin_name;
385  SpinLockRelease(&slot->mutex);
386 
387  if (XLogRecPtrIsInvalid(restart_lsn))
389  else
390  {
391  SpinLockAcquire(&slot->mutex);
392  slot->data.restart_lsn = restart_lsn;
393  SpinLockRelease(&slot->mutex);
394  }
395 
396  /* ----
397  * This is a bit tricky: We need to determine a safe xmin horizon to start
398  * decoding from, to avoid starting from a running xacts record referring
399  * to xids whose rows have been vacuumed or pruned
400  * already. GetOldestSafeDecodingTransactionId() returns such a value, but
401  * without further interlock its return value might immediately be out of
402  * date.
403  *
404  * So we have to acquire the ProcArrayLock to prevent computation of new
405  * xmin horizons by other backends, get the safe decoding xid, and inform
406  * the slot machinery about the new limit. Once that's done the
407  * ProcArrayLock can be released as the slot machinery now is
408  * protecting against vacuum.
409  *
410  * Note that, temporarily, the data, not just the catalog, xmin has to be
411  * reserved if a data snapshot is to be exported. Otherwise the initial
412  * data snapshot created here is not guaranteed to be valid. After that
413  * the data xmin doesn't need to be managed anymore and the global xmin
414  * should be recomputed. As we are fine with losing the pegged data xmin
415  * after crash - no chance a snapshot would get exported anymore - we can
416  * get away with just setting the slot's
417  * effective_xmin. ReplicationSlotRelease will reset it again.
418  *
419  * ----
420  */
421  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
422 
423  xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
424 
425  SpinLockAcquire(&slot->mutex);
426  slot->effective_catalog_xmin = xmin_horizon;
427  slot->data.catalog_xmin = xmin_horizon;
428  if (need_full_snapshot)
429  slot->effective_xmin = xmin_horizon;
430  SpinLockRelease(&slot->mutex);
431 
433 
434  LWLockRelease(ProcArrayLock);
435 
438 
439  ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
440  need_full_snapshot, false,
441  xl_routine, prepare_write, do_write,
442  update_progress);
443 
444  /* call output plugin initialization callback */
445  old_context = MemoryContextSwitchTo(ctx->context);
446  if (ctx->callbacks.startup_cb != NULL)
447  startup_cb_wrapper(ctx, &ctx->options, true);
448  MemoryContextSwitchTo(old_context);
449 
450  /*
451  * We allow decoding of prepared transactions when the two_phase is
452  * enabled at the time of slot creation, or when the two_phase option is
453  * given at the streaming start, provided the plugin supports all the
454  * callbacks for two-phase.
455  */
456  ctx->twophase &= slot->data.two_phase;
457 
459 
460  return ctx;
461 }
462 
463 /*
464  * Create a new decoding context, for a logical slot that has previously been
465  * used already.
466  *
467  * start_lsn
468  * The LSN at which to start decoding. If InvalidXLogRecPtr, restart
469  * from the slot's confirmed_flush; otherwise, start from the specified
470  * location (but move it forwards to confirmed_flush if it's older than
471  * that, see below).
472  *
473  * output_plugin_options
474  * options passed to the output plugin.
475  *
476  * fast_forward
477  * bypass the generation of logical changes.
478  *
479  * xl_routine
480  * XLogReaderRoutine used by underlying xlogreader
481  *
482  * prepare_write, do_write, update_progress
483  * callbacks that have to be filled to perform the use-case dependent,
484  * actual work.
485  *
486  * Needs to be called while in a memory context that's at least as long lived
487  * as the decoding context because further memory contexts will be created
488  * inside it.
489  *
490  * Returns an initialized decoding context after calling the output plugin's
491  * startup function.
492  */
495  List *output_plugin_options,
496  bool fast_forward,
497  XLogReaderRoutine *xl_routine,
501 {
503  ReplicationSlot *slot;
504  MemoryContext old_context;
505 
506  /* shorter lines... */
507  slot = MyReplicationSlot;
508 
509  /* first some sanity checks that are unlikely to be violated */
510  if (slot == NULL)
511  elog(ERROR, "cannot perform logical decoding without an acquired slot");
512 
513  /* make sure the passed slot is suitable, these are user facing errors */
514  if (SlotIsPhysical(slot))
515  ereport(ERROR,
516  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
517  errmsg("cannot use physical replication slot for logical decoding")));
518 
519  if (slot->data.database != MyDatabaseId)
520  ereport(ERROR,
521  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
522  errmsg("replication slot \"%s\" was not created in this database",
523  NameStr(slot->data.name))));
524 
525  /*
526  * Do not allow consumption of a "synchronized" slot until the standby
527  * gets promoted.
528  */
529  if (RecoveryInProgress() && slot->data.synced)
530  ereport(ERROR,
531  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
532  errmsg("cannot use replication slot \"%s\" for logical decoding",
533  NameStr(slot->data.name)),
534  errdetail("This slot is being synchronized from the primary server."),
535  errhint("Specify another replication slot."));
536 
537  /*
538  * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
539  * "cannot get changes" wording in this errmsg because that'd be
540  * confusingly ambiguous about no changes being available when called from
541  * pg_logical_slot_get_changes_guts().
542  */
544  ereport(ERROR,
545  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
546  errmsg("can no longer get changes from replication slot \"%s\"",
548  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
549 
551  ereport(ERROR,
552  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
553  errmsg("can no longer get changes from replication slot \"%s\"",
555  errdetail("This slot has been invalidated because it was conflicting with recovery.")));
556 
559 
560  if (start_lsn == InvalidXLogRecPtr)
561  {
562  /* continue from last position */
563  start_lsn = slot->data.confirmed_flush;
564  }
565  else if (start_lsn < slot->data.confirmed_flush)
566  {
567  /*
568  * It might seem like we should error out in this case, but it's
569  * pretty common for a client to acknowledge a LSN it doesn't have to
570  * do anything for, and thus didn't store persistently, because the
571  * xlog records didn't result in anything relevant for logical
572  * decoding. Clients have to be able to do that to support synchronous
573  * replication.
574  *
575  * Starting at a different LSN than requested might not catch certain
576  * kinds of client errors; so the client may wish to check that
577  * confirmed_flush_lsn matches its expectations.
578  */
579  elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
580  LSN_FORMAT_ARGS(start_lsn),
582 
583  start_lsn = slot->data.confirmed_flush;
584  }
585 
586  ctx = StartupDecodingContext(output_plugin_options,
587  start_lsn, InvalidTransactionId, false,
588  fast_forward, xl_routine, prepare_write,
589  do_write, update_progress);
590 
591  /* call output plugin initialization callback */
592  old_context = MemoryContextSwitchTo(ctx->context);
593  if (ctx->callbacks.startup_cb != NULL)
594  startup_cb_wrapper(ctx, &ctx->options, false);
595  MemoryContextSwitchTo(old_context);
596 
597  /*
598  * We allow decoding of prepared transactions when the two_phase is
599  * enabled at the time of slot creation, or when the two_phase option is
600  * given at the streaming start, provided the plugin supports all the
601  * callbacks for two-phase.
602  */
603  ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
604 
605  /* Mark slot to allow two_phase decoding if not already marked */
606  if (ctx->twophase && !slot->data.two_phase)
607  {
608  SpinLockAcquire(&slot->mutex);
609  slot->data.two_phase = true;
610  slot->data.two_phase_at = start_lsn;
611  SpinLockRelease(&slot->mutex);
614  SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
615  }
616 
618 
619  ereport(LOG,
620  (errmsg("starting logical decoding for slot \"%s\"",
621  NameStr(slot->data.name)),
622  errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
624  LSN_FORMAT_ARGS(slot->data.restart_lsn))));
625 
626  return ctx;
627 }
628 
629 /*
630  * Returns true if a consistent initial decoding snapshot has been built.
631  */
632 bool
634 {
636 }
637 
638 /*
639  * Read from the decoding slot, until it is ready to start extracting changes.
640  */
641 void
643 {
644  ReplicationSlot *slot = ctx->slot;
645 
646  /* Initialize from where to start reading WAL. */
647  XLogBeginRead(ctx->reader, slot->data.restart_lsn);
648 
649  elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
651 
652  /* Wait for a consistent starting point */
653  for (;;)
654  {
655  XLogRecord *record;
656  char *err = NULL;
657 
658  /* the read_page callback waits for new WAL */
659  record = XLogReadRecord(ctx->reader, &err);
660  if (err)
661  elog(ERROR, "could not find logical decoding starting point: %s", err);
662  if (!record)
663  elog(ERROR, "could not find logical decoding starting point");
664 
666 
667  /* only continue till we found a consistent spot */
668  if (DecodingContextReady(ctx))
669  break;
670 
672  }
673 
674  SpinLockAcquire(&slot->mutex);
675  slot->data.confirmed_flush = ctx->reader->EndRecPtr;
676  if (slot->data.two_phase)
677  slot->data.two_phase_at = ctx->reader->EndRecPtr;
678  SpinLockRelease(&slot->mutex);
679 }
680 
681 /*
682  * Free a previously allocated decoding context, invoking the shutdown
683  * callback if necessary.
684  */
685 void
687 {
688  if (ctx->callbacks.shutdown_cb != NULL)
689  shutdown_cb_wrapper(ctx);
690 
693  XLogReaderFree(ctx->reader);
695 }
696 
697 /*
698  * Prepare a write using the context's output routine.
699  */
700 void
702 {
703  if (!ctx->accept_writes)
704  elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
705 
706  ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
707  ctx->prepared_write = true;
708 }
709 
710 /*
711  * Perform a write using the context's output routine.
 *
 * ctx -- decoding context whose write callback is invoked.
 * last_write -- passed through unchanged to the write callback.
 *
 * Requires ctx->prepared_write to be set; if it is not, an ERROR is
 * reported via elog() whose text says that OutputPluginPrepareWrite has to
 * be called first. Otherwise the context's write callback is called with
 * the context's current write_location and write_xid, and prepared_write
 * is cleared afterwards.
712  */
713 void
714 OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
715 {
 /* refuse to write unless the prepared_write flag is currently set */
716  if (!ctx->prepared_write)
717  elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
718 
 /* hand the data off to the write callback stored in the context */
719  ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
 /* clear the flag, so the check above fires again unless it is set anew */
720  ctx->prepared_write = false;
721 }
722 
723 /*
724  * Update progress tracking (if supported).
725  */
726 void
728  bool skipped_xact)
729 {
730  if (!ctx->update_progress)
731  return;
732 
733  ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
734  skipped_xact);
735 }
736 
737 /*
738  * Load the output plugin, lookup its output plugin init function, and check
739  * that it provides the required callbacks.
740  */
741 static void
743 {
744  LogicalOutputPluginInit plugin_init;
745 
746  plugin_init = (LogicalOutputPluginInit)
747  load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
748 
749  if (plugin_init == NULL)
750  elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
751 
752  /* ask the output plugin to fill the callback struct */
753  plugin_init(callbacks);
754 
755  if (callbacks->begin_cb == NULL)
756  elog(ERROR, "output plugins have to register a begin callback");
757  if (callbacks->change_cb == NULL)
758  elog(ERROR, "output plugins have to register a change callback");
759  if (callbacks->commit_cb == NULL)
760  elog(ERROR, "output plugins have to register a commit callback");
761 }
762 
763 static void
765 {
767 
768  /* not all callbacks have an associated LSN */
769  if (state->report_location != InvalidXLogRecPtr)
770  errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
771  NameStr(state->ctx->slot->data.name),
772  NameStr(state->ctx->slot->data.plugin),
773  state->callback_name,
774  LSN_FORMAT_ARGS(state->report_location));
775  else
776  errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
777  NameStr(state->ctx->slot->data.name),
778  NameStr(state->ctx->slot->data.plugin),
779  state->callback_name);
780 }
781 
782 static void
784 {
786  ErrorContextCallback errcallback;
787 
788  Assert(!ctx->fast_forward);
789 
790  /* Push callback + info on the error context stack */
791  state.ctx = ctx;
792  state.callback_name = "startup";
793  state.report_location = InvalidXLogRecPtr;
795  errcallback.arg = (void *) &state;
796  errcallback.previous = error_context_stack;
797  error_context_stack = &errcallback;
798 
799  /* set output state */
800  ctx->accept_writes = false;
801  ctx->end_xact = false;
802 
803  /* do the actual work: call callback */
804  ctx->callbacks.startup_cb(ctx, opt, is_init);
805 
806  /* Pop the error context stack */
807  error_context_stack = errcallback.previous;
808 }
809 
810 static void
812 {
814  ErrorContextCallback errcallback;
815 
816  Assert(!ctx->fast_forward);
817 
818  /* Push callback + info on the error context stack */
819  state.ctx = ctx;
820  state.callback_name = "shutdown";
821  state.report_location = InvalidXLogRecPtr;
823  errcallback.arg = (void *) &state;
824  errcallback.previous = error_context_stack;
825  error_context_stack = &errcallback;
826 
827  /* set output state */
828  ctx->accept_writes = false;
829  ctx->end_xact = false;
830 
831  /* do the actual work: call callback */
832  ctx->callbacks.shutdown_cb(ctx);
833 
834  /* Pop the error context stack */
835  error_context_stack = errcallback.previous;
836 }
837 
838 
839 /*
840  * Callbacks for ReorderBuffer which add in some more information and then call
841  * output_plugin.h plugins.
842  */
843 static void
845 {
846  LogicalDecodingContext *ctx = cache->private_data;
848  ErrorContextCallback errcallback;
849 
850  Assert(!ctx->fast_forward);
851 
852  /* Push callback + info on the error context stack */
853  state.ctx = ctx;
854  state.callback_name = "begin";
855  state.report_location = txn->first_lsn;
857  errcallback.arg = (void *) &state;
858  errcallback.previous = error_context_stack;
859  error_context_stack = &errcallback;
860 
861  /* set output state */
862  ctx->accept_writes = true;
863  ctx->write_xid = txn->xid;
864  ctx->write_location = txn->first_lsn;
865  ctx->end_xact = false;
866 
867  /* do the actual work: call callback */
868  ctx->callbacks.begin_cb(ctx, txn);
869 
870  /* Pop the error context stack */
871  error_context_stack = errcallback.previous;
872 }
873 
874 static void
876  XLogRecPtr commit_lsn)
877 {
878  LogicalDecodingContext *ctx = cache->private_data;
880  ErrorContextCallback errcallback;
881 
882  Assert(!ctx->fast_forward);
883 
884  /* Push callback + info on the error context stack */
885  state.ctx = ctx;
886  state.callback_name = "commit";
887  state.report_location = txn->final_lsn; /* beginning of commit record */
889  errcallback.arg = (void *) &state;
890  errcallback.previous = error_context_stack;
891  error_context_stack = &errcallback;
892 
893  /* set output state */
894  ctx->accept_writes = true;
895  ctx->write_xid = txn->xid;
896  ctx->write_location = txn->end_lsn; /* points to the end of the record */
897  ctx->end_xact = true;
898 
899  /* do the actual work: call callback */
900  ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
901 
902  /* Pop the error context stack */
903  error_context_stack = errcallback.previous;
904 }
905 
906 /*
907  * The functionality of begin_prepare is quite similar to begin with the
908  * exception that this will have gid (global transaction id) information which
909  * can be used by plugin. Now, we thought about extending the existing begin
910  * but that would break the replication protocol and additionally this looks
911  * cleaner.
912  */
913 static void
915 {
916  LogicalDecodingContext *ctx = cache->private_data;
918  ErrorContextCallback errcallback;
919 
920  Assert(!ctx->fast_forward);
921 
922  /* We're only supposed to call this when two-phase commits are supported */
923  Assert(ctx->twophase);
924 
925  /* Push callback + info on the error context stack */
926  state.ctx = ctx;
927  state.callback_name = "begin_prepare";
928  state.report_location = txn->first_lsn;
930  errcallback.arg = (void *) &state;
931  errcallback.previous = error_context_stack;
932  error_context_stack = &errcallback;
933 
934  /* set output state */
935  ctx->accept_writes = true;
936  ctx->write_xid = txn->xid;
937  ctx->write_location = txn->first_lsn;
938  ctx->end_xact = false;
939 
940  /*
941  * If the plugin supports two-phase commits then begin prepare callback is
942  * mandatory
943  */
944  if (ctx->callbacks.begin_prepare_cb == NULL)
945  ereport(ERROR,
946  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
947  errmsg("logical replication at prepare time requires a %s callback",
948  "begin_prepare_cb")));
949 
950  /* do the actual work: call callback */
951  ctx->callbacks.begin_prepare_cb(ctx, txn);
952 
953  /* Pop the error context stack */
954  error_context_stack = errcallback.previous;
955 }
956 
957 static void
959  XLogRecPtr prepare_lsn)
960 {
961  LogicalDecodingContext *ctx = cache->private_data;
963  ErrorContextCallback errcallback;
964 
965  Assert(!ctx->fast_forward);
966 
967  /* We're only supposed to call this when two-phase commits are supported */
968  Assert(ctx->twophase);
969 
970  /* Push callback + info on the error context stack */
971  state.ctx = ctx;
972  state.callback_name = "prepare";
973  state.report_location = txn->final_lsn; /* beginning of prepare record */
975  errcallback.arg = (void *) &state;
976  errcallback.previous = error_context_stack;
977  error_context_stack = &errcallback;
978 
979  /* set output state */
980  ctx->accept_writes = true;
981  ctx->write_xid = txn->xid;
982  ctx->write_location = txn->end_lsn; /* points to the end of the record */
983  ctx->end_xact = true;
984 
985  /*
986  * If the plugin supports two-phase commits then prepare callback is
987  * mandatory
988  */
989  if (ctx->callbacks.prepare_cb == NULL)
990  ereport(ERROR,
991  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
992  errmsg("logical replication at prepare time requires a %s callback",
993  "prepare_cb")));
994 
995  /* do the actual work: call callback */
996  ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
997 
998  /* Pop the error context stack */
999  error_context_stack = errcallback.previous;
1000 }
1001 
1002 static void
1004  XLogRecPtr commit_lsn)
1005 {
1006  LogicalDecodingContext *ctx = cache->private_data;
1008  ErrorContextCallback errcallback;
1009 
1010  Assert(!ctx->fast_forward);
1011 
1012  /* We're only supposed to call this when two-phase commits are supported */
1013  Assert(ctx->twophase);
1014 
1015  /* Push callback + info on the error context stack */
1016  state.ctx = ctx;
1017  state.callback_name = "commit_prepared";
1018  state.report_location = txn->final_lsn; /* beginning of commit record */
1019  errcallback.callback = output_plugin_error_callback;
1020  errcallback.arg = (void *) &state;
1021  errcallback.previous = error_context_stack;
1022  error_context_stack = &errcallback;
1023 
1024  /* set output state */
1025  ctx->accept_writes = true;
1026  ctx->write_xid = txn->xid;
1027  ctx->write_location = txn->end_lsn; /* points to the end of the record */
1028  ctx->end_xact = true;
1029 
1030  /*
1031  * If the plugin supports two-phase commits then commit prepared callback
1032  * is mandatory
1033  */
1034  if (ctx->callbacks.commit_prepared_cb == NULL)
1035  ereport(ERROR,
1036  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1037  errmsg("logical replication at prepare time requires a %s callback",
1038  "commit_prepared_cb")));
1039 
1040  /* do the actual work: call callback */
1041  ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
1042 
1043  /* Pop the error context stack */
1044  error_context_stack = errcallback.previous;
1045 }
1046 
1047 static void
1049  XLogRecPtr prepare_end_lsn,
1050  TimestampTz prepare_time)
1051 {
1052  LogicalDecodingContext *ctx = cache->private_data;
1054  ErrorContextCallback errcallback;
1055 
1056  Assert(!ctx->fast_forward);
1057 
1058  /* We're only supposed to call this when two-phase commits are supported */
1059  Assert(ctx->twophase);
1060 
1061  /* Push callback + info on the error context stack */
1062  state.ctx = ctx;
1063  state.callback_name = "rollback_prepared";
1064  state.report_location = txn->final_lsn; /* beginning of commit record */
1065  errcallback.callback = output_plugin_error_callback;
1066  errcallback.arg = (void *) &state;
1067  errcallback.previous = error_context_stack;
1068  error_context_stack = &errcallback;
1069 
1070  /* set output state */
1071  ctx->accept_writes = true;
1072  ctx->write_xid = txn->xid;
1073  ctx->write_location = txn->end_lsn; /* points to the end of the record */
1074  ctx->end_xact = true;
1075 
1076  /*
1077  * If the plugin support two-phase commits then rollback prepared callback
1078  * is mandatory
1079  */
1080  if (ctx->callbacks.rollback_prepared_cb == NULL)
1081  ereport(ERROR,
1082  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1083  errmsg("logical replication at prepare time requires a %s callback",
1084  "rollback_prepared_cb")));
1085 
1086  /* do the actual work: call callback */
1087  ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
1088  prepare_time);
1089 
1090  /* Pop the error context stack */
1091  error_context_stack = errcallback.previous;
1092 }
1093 
1094 static void
1096  Relation relation, ReorderBufferChange *change)
1097 {
1098  LogicalDecodingContext *ctx = cache->private_data;
1100  ErrorContextCallback errcallback;
1101 
1102  Assert(!ctx->fast_forward);
1103 
1104  /* Push callback + info on the error context stack */
1105  state.ctx = ctx;
1106  state.callback_name = "change";
1107  state.report_location = change->lsn;
1108  errcallback.callback = output_plugin_error_callback;
1109  errcallback.arg = (void *) &state;
1110  errcallback.previous = error_context_stack;
1111  error_context_stack = &errcallback;
1112 
1113  /* set output state */
1114  ctx->accept_writes = true;
1115  ctx->write_xid = txn->xid;
1116 
1117  /*
1118  * Report this change's lsn so replies from clients can give an up-to-date
1119  * answer. This won't ever be enough (and shouldn't be!) to confirm
1120  * receipt of this transaction, but it might allow another transaction's
1121  * commit to be confirmed with one message.
1122  */
1123  ctx->write_location = change->lsn;
1124 
1125  ctx->end_xact = false;
1126 
1127  ctx->callbacks.change_cb(ctx, txn, relation, change);
1128 
1129  /* Pop the error context stack */
1130  error_context_stack = errcallback.previous;
1131 }
1132 
1133 static void
1135  int nrelations, Relation relations[], ReorderBufferChange *change)
1136 {
1137  LogicalDecodingContext *ctx = cache->private_data;
1139  ErrorContextCallback errcallback;
1140 
1141  Assert(!ctx->fast_forward);
1142 
1143  if (!ctx->callbacks.truncate_cb)
1144  return;
1145 
1146  /* Push callback + info on the error context stack */
1147  state.ctx = ctx;
1148  state.callback_name = "truncate";
1149  state.report_location = change->lsn;
1150  errcallback.callback = output_plugin_error_callback;
1151  errcallback.arg = (void *) &state;
1152  errcallback.previous = error_context_stack;
1153  error_context_stack = &errcallback;
1154 
1155  /* set output state */
1156  ctx->accept_writes = true;
1157  ctx->write_xid = txn->xid;
1158 
1159  /*
1160  * Report this change's lsn so replies from clients can give an up-to-date
1161  * answer. This won't ever be enough (and shouldn't be!) to confirm
1162  * receipt of this transaction, but it might allow another transaction's
1163  * commit to be confirmed with one message.
1164  */
1165  ctx->write_location = change->lsn;
1166 
1167  ctx->end_xact = false;
1168 
1169  ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
1170 
1171  /* Pop the error context stack */
1172  error_context_stack = errcallback.previous;
1173 }
1174 
1175 bool
1177  const char *gid)
1178 {
1180  ErrorContextCallback errcallback;
1181  bool ret;
1182 
1183  Assert(!ctx->fast_forward);
1184 
1185  /* Push callback + info on the error context stack */
1186  state.ctx = ctx;
1187  state.callback_name = "filter_prepare";
1188  state.report_location = InvalidXLogRecPtr;
1189  errcallback.callback = output_plugin_error_callback;
1190  errcallback.arg = (void *) &state;
1191  errcallback.previous = error_context_stack;
1192  error_context_stack = &errcallback;
1193 
1194  /* set output state */
1195  ctx->accept_writes = false;
1196  ctx->end_xact = false;
1197 
1198  /* do the actual work: call callback */
1199  ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
1200 
1201  /* Pop the error context stack */
1202  error_context_stack = errcallback.previous;
1203 
1204  return ret;
1205 }
1206 
1207 bool
1209 {
1211  ErrorContextCallback errcallback;
1212  bool ret;
1213 
1214  Assert(!ctx->fast_forward);
1215 
1216  /* Push callback + info on the error context stack */
1217  state.ctx = ctx;
1218  state.callback_name = "filter_by_origin";
1219  state.report_location = InvalidXLogRecPtr;
1220  errcallback.callback = output_plugin_error_callback;
1221  errcallback.arg = (void *) &state;
1222  errcallback.previous = error_context_stack;
1223  error_context_stack = &errcallback;
1224 
1225  /* set output state */
1226  ctx->accept_writes = false;
1227  ctx->end_xact = false;
1228 
1229  /* do the actual work: call callback */
1230  ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
1231 
1232  /* Pop the error context stack */
1233  error_context_stack = errcallback.previous;
1234 
1235  return ret;
1236 }
1237 
1238 static void
1240  XLogRecPtr message_lsn, bool transactional,
1241  const char *prefix, Size message_size, const char *message)
1242 {
1243  LogicalDecodingContext *ctx = cache->private_data;
1245  ErrorContextCallback errcallback;
1246 
1247  Assert(!ctx->fast_forward);
1248 
1249  if (ctx->callbacks.message_cb == NULL)
1250  return;
1251 
1252  /* Push callback + info on the error context stack */
1253  state.ctx = ctx;
1254  state.callback_name = "message";
1255  state.report_location = message_lsn;
1256  errcallback.callback = output_plugin_error_callback;
1257  errcallback.arg = (void *) &state;
1258  errcallback.previous = error_context_stack;
1259  error_context_stack = &errcallback;
1260 
1261  /* set output state */
1262  ctx->accept_writes = true;
1263  ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
1264  ctx->write_location = message_lsn;
1265  ctx->end_xact = false;
1266 
1267  /* do the actual work: call callback */
1268  ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
1269  message_size, message);
1270 
1271  /* Pop the error context stack */
1272  error_context_stack = errcallback.previous;
1273 }
1274 
1275 static void
1277  XLogRecPtr first_lsn)
1278 {
1279  LogicalDecodingContext *ctx = cache->private_data;
1281  ErrorContextCallback errcallback;
1282 
1283  Assert(!ctx->fast_forward);
1284 
1285  /* We're only supposed to call this when streaming is supported. */
1286  Assert(ctx->streaming);
1287 
1288  /* Push callback + info on the error context stack */
1289  state.ctx = ctx;
1290  state.callback_name = "stream_start";
1291  state.report_location = first_lsn;
1292  errcallback.callback = output_plugin_error_callback;
1293  errcallback.arg = (void *) &state;
1294  errcallback.previous = error_context_stack;
1295  error_context_stack = &errcallback;
1296 
1297  /* set output state */
1298  ctx->accept_writes = true;
1299  ctx->write_xid = txn->xid;
1300 
1301  /*
1302  * Report this message's lsn so replies from clients can give an
1303  * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1304  * confirm receipt of this transaction, but it might allow another
1305  * transaction's commit to be confirmed with one message.
1306  */
1307  ctx->write_location = first_lsn;
1308 
1309  ctx->end_xact = false;
1310 
1311  /* in streaming mode, stream_start_cb is required */
1312  if (ctx->callbacks.stream_start_cb == NULL)
1313  ereport(ERROR,
1314  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1315  errmsg("logical streaming requires a %s callback",
1316  "stream_start_cb")));
1317 
1318  ctx->callbacks.stream_start_cb(ctx, txn);
1319 
1320  /* Pop the error context stack */
1321  error_context_stack = errcallback.previous;
1322 }
1323 
1324 static void
1326  XLogRecPtr last_lsn)
1327 {
1328  LogicalDecodingContext *ctx = cache->private_data;
1330  ErrorContextCallback errcallback;
1331 
1332  Assert(!ctx->fast_forward);
1333 
1334  /* We're only supposed to call this when streaming is supported. */
1335  Assert(ctx->streaming);
1336 
1337  /* Push callback + info on the error context stack */
1338  state.ctx = ctx;
1339  state.callback_name = "stream_stop";
1340  state.report_location = last_lsn;
1341  errcallback.callback = output_plugin_error_callback;
1342  errcallback.arg = (void *) &state;
1343  errcallback.previous = error_context_stack;
1344  error_context_stack = &errcallback;
1345 
1346  /* set output state */
1347  ctx->accept_writes = true;
1348  ctx->write_xid = txn->xid;
1349 
1350  /*
1351  * Report this message's lsn so replies from clients can give an
1352  * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1353  * confirm receipt of this transaction, but it might allow another
1354  * transaction's commit to be confirmed with one message.
1355  */
1356  ctx->write_location = last_lsn;
1357 
1358  ctx->end_xact = false;
1359 
1360  /* in streaming mode, stream_stop_cb is required */
1361  if (ctx->callbacks.stream_stop_cb == NULL)
1362  ereport(ERROR,
1363  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1364  errmsg("logical streaming requires a %s callback",
1365  "stream_stop_cb")));
1366 
1367  ctx->callbacks.stream_stop_cb(ctx, txn);
1368 
1369  /* Pop the error context stack */
1370  error_context_stack = errcallback.previous;
1371 }
1372 
1373 static void
1375  XLogRecPtr abort_lsn)
1376 {
1377  LogicalDecodingContext *ctx = cache->private_data;
1379  ErrorContextCallback errcallback;
1380 
1381  Assert(!ctx->fast_forward);
1382 
1383  /* We're only supposed to call this when streaming is supported. */
1384  Assert(ctx->streaming);
1385 
1386  /* Push callback + info on the error context stack */
1387  state.ctx = ctx;
1388  state.callback_name = "stream_abort";
1389  state.report_location = abort_lsn;
1390  errcallback.callback = output_plugin_error_callback;
1391  errcallback.arg = (void *) &state;
1392  errcallback.previous = error_context_stack;
1393  error_context_stack = &errcallback;
1394 
1395  /* set output state */
1396  ctx->accept_writes = true;
1397  ctx->write_xid = txn->xid;
1398  ctx->write_location = abort_lsn;
1399  ctx->end_xact = true;
1400 
1401  /* in streaming mode, stream_abort_cb is required */
1402  if (ctx->callbacks.stream_abort_cb == NULL)
1403  ereport(ERROR,
1404  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1405  errmsg("logical streaming requires a %s callback",
1406  "stream_abort_cb")));
1407 
1408  ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
1409 
1410  /* Pop the error context stack */
1411  error_context_stack = errcallback.previous;
1412 }
1413 
1414 static void
1416  XLogRecPtr prepare_lsn)
1417 {
1418  LogicalDecodingContext *ctx = cache->private_data;
1420  ErrorContextCallback errcallback;
1421 
1422  Assert(!ctx->fast_forward);
1423 
1424  /*
1425  * We're only supposed to call this when streaming and two-phase commits
1426  * are supported.
1427  */
1428  Assert(ctx->streaming);
1429  Assert(ctx->twophase);
1430 
1431  /* Push callback + info on the error context stack */
1432  state.ctx = ctx;
1433  state.callback_name = "stream_prepare";
1434  state.report_location = txn->final_lsn;
1435  errcallback.callback = output_plugin_error_callback;
1436  errcallback.arg = (void *) &state;
1437  errcallback.previous = error_context_stack;
1438  error_context_stack = &errcallback;
1439 
1440  /* set output state */
1441  ctx->accept_writes = true;
1442  ctx->write_xid = txn->xid;
1443  ctx->write_location = txn->end_lsn;
1444  ctx->end_xact = true;
1445 
1446  /* in streaming mode with two-phase commits, stream_prepare_cb is required */
1447  if (ctx->callbacks.stream_prepare_cb == NULL)
1448  ereport(ERROR,
1449  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1450  errmsg("logical streaming at prepare time requires a %s callback",
1451  "stream_prepare_cb")));
1452 
1453  ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
1454 
1455  /* Pop the error context stack */
1456  error_context_stack = errcallback.previous;
1457 }
1458 
1459 static void
1461  XLogRecPtr commit_lsn)
1462 {
1463  LogicalDecodingContext *ctx = cache->private_data;
1465  ErrorContextCallback errcallback;
1466 
1467  Assert(!ctx->fast_forward);
1468 
1469  /* We're only supposed to call this when streaming is supported. */
1470  Assert(ctx->streaming);
1471 
1472  /* Push callback + info on the error context stack */
1473  state.ctx = ctx;
1474  state.callback_name = "stream_commit";
1475  state.report_location = txn->final_lsn;
1476  errcallback.callback = output_plugin_error_callback;
1477  errcallback.arg = (void *) &state;
1478  errcallback.previous = error_context_stack;
1479  error_context_stack = &errcallback;
1480 
1481  /* set output state */
1482  ctx->accept_writes = true;
1483  ctx->write_xid = txn->xid;
1484  ctx->write_location = txn->end_lsn;
1485  ctx->end_xact = true;
1486 
1487  /* in streaming mode, stream_commit_cb is required */
1488  if (ctx->callbacks.stream_commit_cb == NULL)
1489  ereport(ERROR,
1490  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1491  errmsg("logical streaming requires a %s callback",
1492  "stream_commit_cb")));
1493 
1494  ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
1495 
1496  /* Pop the error context stack */
1497  error_context_stack = errcallback.previous;
1498 }
1499 
1500 static void
1502  Relation relation, ReorderBufferChange *change)
1503 {
1504  LogicalDecodingContext *ctx = cache->private_data;
1506  ErrorContextCallback errcallback;
1507 
1508  Assert(!ctx->fast_forward);
1509 
1510  /* We're only supposed to call this when streaming is supported. */
1511  Assert(ctx->streaming);
1512 
1513  /* Push callback + info on the error context stack */
1514  state.ctx = ctx;
1515  state.callback_name = "stream_change";
1516  state.report_location = change->lsn;
1517  errcallback.callback = output_plugin_error_callback;
1518  errcallback.arg = (void *) &state;
1519  errcallback.previous = error_context_stack;
1520  error_context_stack = &errcallback;
1521 
1522  /* set output state */
1523  ctx->accept_writes = true;
1524  ctx->write_xid = txn->xid;
1525 
1526  /*
1527  * Report this change's lsn so replies from clients can give an up-to-date
1528  * answer. This won't ever be enough (and shouldn't be!) to confirm
1529  * receipt of this transaction, but it might allow another transaction's
1530  * commit to be confirmed with one message.
1531  */
1532  ctx->write_location = change->lsn;
1533 
1534  ctx->end_xact = false;
1535 
1536  /* in streaming mode, stream_change_cb is required */
1537  if (ctx->callbacks.stream_change_cb == NULL)
1538  ereport(ERROR,
1539  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1540  errmsg("logical streaming requires a %s callback",
1541  "stream_change_cb")));
1542 
1543  ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
1544 
1545  /* Pop the error context stack */
1546  error_context_stack = errcallback.previous;
1547 }
1548 
1549 static void
1551  XLogRecPtr message_lsn, bool transactional,
1552  const char *prefix, Size message_size, const char *message)
1553 {
1554  LogicalDecodingContext *ctx = cache->private_data;
1556  ErrorContextCallback errcallback;
1557 
1558  Assert(!ctx->fast_forward);
1559 
1560  /* We're only supposed to call this when streaming is supported. */
1561  Assert(ctx->streaming);
1562 
1563  /* this callback is optional */
1564  if (ctx->callbacks.stream_message_cb == NULL)
1565  return;
1566 
1567  /* Push callback + info on the error context stack */
1568  state.ctx = ctx;
1569  state.callback_name = "stream_message";
1570  state.report_location = message_lsn;
1571  errcallback.callback = output_plugin_error_callback;
1572  errcallback.arg = (void *) &state;
1573  errcallback.previous = error_context_stack;
1574  error_context_stack = &errcallback;
1575 
1576  /* set output state */
1577  ctx->accept_writes = true;
1578  ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
1579  ctx->write_location = message_lsn;
1580  ctx->end_xact = false;
1581 
1582  /* do the actual work: call callback */
1583  ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
1584  message_size, message);
1585 
1586  /* Pop the error context stack */
1587  error_context_stack = errcallback.previous;
1588 }
1589 
1590 static void
1592  int nrelations, Relation relations[],
1593  ReorderBufferChange *change)
1594 {
1595  LogicalDecodingContext *ctx = cache->private_data;
1597  ErrorContextCallback errcallback;
1598 
1599  Assert(!ctx->fast_forward);
1600 
1601  /* We're only supposed to call this when streaming is supported. */
1602  Assert(ctx->streaming);
1603 
1604  /* this callback is optional */
1605  if (!ctx->callbacks.stream_truncate_cb)
1606  return;
1607 
1608  /* Push callback + info on the error context stack */
1609  state.ctx = ctx;
1610  state.callback_name = "stream_truncate";
1611  state.report_location = change->lsn;
1612  errcallback.callback = output_plugin_error_callback;
1613  errcallback.arg = (void *) &state;
1614  errcallback.previous = error_context_stack;
1615  error_context_stack = &errcallback;
1616 
1617  /* set output state */
1618  ctx->accept_writes = true;
1619  ctx->write_xid = txn->xid;
1620 
1621  /*
1622  * Report this change's lsn so replies from clients can give an up-to-date
1623  * answer. This won't ever be enough (and shouldn't be!) to confirm
1624  * receipt of this transaction, but it might allow another transaction's
1625  * commit to be confirmed with one message.
1626  */
1627  ctx->write_location = change->lsn;
1628 
1629  ctx->end_xact = false;
1630 
1631  ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
1632 
1633  /* Pop the error context stack */
1634  error_context_stack = errcallback.previous;
1635 }
1636 
1637 static void
1639  XLogRecPtr lsn)
1640 {
1641  LogicalDecodingContext *ctx = cache->private_data;
1643  ErrorContextCallback errcallback;
1644 
1645  Assert(!ctx->fast_forward);
1646 
1647  /* Push callback + info on the error context stack */
1648  state.ctx = ctx;
1649  state.callback_name = "update_progress_txn";
1650  state.report_location = lsn;
1651  errcallback.callback = output_plugin_error_callback;
1652  errcallback.arg = (void *) &state;
1653  errcallback.previous = error_context_stack;
1654  error_context_stack = &errcallback;
1655 
1656  /* set output state */
1657  ctx->accept_writes = false;
1658  ctx->write_xid = txn->xid;
1659 
1660  /*
1661  * Report this change's lsn so replies from clients can give an up-to-date
1662  * answer. This won't ever be enough (and shouldn't be!) to confirm
1663  * receipt of this transaction, but it might allow another transaction's
1664  * commit to be confirmed with one message.
1665  */
1666  ctx->write_location = lsn;
1667 
1668  ctx->end_xact = false;
1669 
1670  OutputPluginUpdateProgress(ctx, false);
1671 
1672  /* Pop the error context stack */
1673  error_context_stack = errcallback.previous;
1674 }
1675 
1676 /*
1677  * Set the required catalog xmin horizon for historic snapshots in the current
1678  * replication slot.
1679  *
1680  * Note that in the most cases, we won't be able to immediately use the xmin
1681  * to increase the xmin horizon: we need to wait till the client has confirmed
1682  * receiving current_lsn with LogicalConfirmReceivedLocation().
1683  */
1684 void
1686 {
1687  bool updated_xmin = false;
1688  ReplicationSlot *slot;
1689  bool got_new_xmin = false;
1690 
1691  slot = MyReplicationSlot;
1692 
1693  Assert(slot != NULL);
1694 
1695  SpinLockAcquire(&slot->mutex);
1696 
1697  /*
1698  * don't overwrite if we already have a newer xmin. This can happen if we
1699  * restart decoding in a slot.
1700  */
1702  {
1703  }
1704 
1705  /*
1706  * If the client has already confirmed up to this lsn, we directly can
1707  * mark this as accepted. This can happen if we restart decoding in a
1708  * slot.
1709  */
1710  else if (current_lsn <= slot->data.confirmed_flush)
1711  {
1712  slot->candidate_catalog_xmin = xmin;
1713  slot->candidate_xmin_lsn = current_lsn;
1714 
1715  /* our candidate can directly be used */
1716  updated_xmin = true;
1717  }
1718 
1719  /*
1720  * Only increase if the previous values have been applied, otherwise we
1721  * might never end up updating if the receiver acks too slowly.
1722  */
1723  else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
1724  {
1725  slot->candidate_catalog_xmin = xmin;
1726  slot->candidate_xmin_lsn = current_lsn;
1727 
1728  /*
1729  * Log new xmin at an appropriate log level after releasing the
1730  * spinlock.
1731  */
1732  got_new_xmin = true;
1733  }
1734  SpinLockRelease(&slot->mutex);
1735 
1736  if (got_new_xmin)
1737  elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
1738  LSN_FORMAT_ARGS(current_lsn));
1739 
1740  /* candidate already valid with the current flush position, apply */
1741  if (updated_xmin)
1743 }
1744 
1745 /*
1746  * Mark the minimal LSN (restart_lsn) we need to read to replay all
1747  * transactions that have not yet committed at current_lsn.
1748  *
1749  * Just like LogicalIncreaseXminForSlot this only takes effect when the
1750  * client has confirmed to have received current_lsn.
1751  */
1752 void
1754 {
1755  bool updated_lsn = false;
1756  ReplicationSlot *slot;
1757 
1758  slot = MyReplicationSlot;
1759 
1760  Assert(slot != NULL);
1761  Assert(restart_lsn != InvalidXLogRecPtr);
1762  Assert(current_lsn != InvalidXLogRecPtr);
1763 
1764  SpinLockAcquire(&slot->mutex);
1765 
1766  /* don't overwrite if have a newer restart lsn */
1767  if (restart_lsn <= slot->data.restart_lsn)
1768  {
1769  }
1770 
1771  /*
1772  * We might have already flushed far enough to directly accept this lsn,
1773  * in this case there is no need to check for existing candidate LSNs
1774  */
1775  else if (current_lsn <= slot->data.confirmed_flush)
1776  {
1777  slot->candidate_restart_valid = current_lsn;
1778  slot->candidate_restart_lsn = restart_lsn;
1779 
1780  /* our candidate can directly be used */
1781  updated_lsn = true;
1782  }
1783 
1784  /*
1785  * Only increase if the previous values have been applied, otherwise we
1786  * might never end up updating if the receiver acks too slowly. A missed
1787  * value here will just cause some extra effort after reconnecting.
1788  */
1790  {
1791  slot->candidate_restart_valid = current_lsn;
1792  slot->candidate_restart_lsn = restart_lsn;
1793  SpinLockRelease(&slot->mutex);
1794 
1795  elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
1796  LSN_FORMAT_ARGS(restart_lsn),
1797  LSN_FORMAT_ARGS(current_lsn));
1798  }
1799  else
1800  {
1801  XLogRecPtr candidate_restart_lsn;
1802  XLogRecPtr candidate_restart_valid;
1803  XLogRecPtr confirmed_flush;
1804 
1805  candidate_restart_lsn = slot->candidate_restart_lsn;
1806  candidate_restart_valid = slot->candidate_restart_valid;
1807  confirmed_flush = slot->data.confirmed_flush;
1808  SpinLockRelease(&slot->mutex);
1809 
1810  elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
1811  LSN_FORMAT_ARGS(restart_lsn),
1812  LSN_FORMAT_ARGS(current_lsn),
1813  LSN_FORMAT_ARGS(candidate_restart_lsn),
1814  LSN_FORMAT_ARGS(candidate_restart_valid),
1815  LSN_FORMAT_ARGS(confirmed_flush));
1816  }
1817 
1818  /* candidates are already valid with the current flush position, apply */
1819  if (updated_lsn)
1821 }
1822 
1823 /*
1824  * Handle a consumer's confirmation having received all changes up to lsn.
 *
 * lsn must be a valid location (asserted below).  Two local flags record
 * whether the xmin and/or the restart position were updated; if either was,
 * that is reported at DEBUG1.
 *
 * NOTE(review): the function-name line, every condition and the slot
 * assignments are absent from this listing (the inner line numbers skip),
 * so the exact conditions and the order of the updates cannot be confirmed
 * from here -- consult the complete file before changing this function.
1825  */
1826 void
1828 {
1829  Assert(lsn != InvalidXLogRecPtr);
1830 
1831  /* Do an unlocked check for candidate_lsn first. */
1834  {
1835  bool updated_xmin = false;
1836  bool updated_restart = false;
1837 
1839 
1841 
1842  /* if we're past the location required for bumping xmin, do so */
1845  {
1846  /*
1847  * We have to write the changed xmin to disk *before* we change
1848  * the in-memory value, otherwise after a crash we wouldn't know
1849  * that some catalog tuples might have been removed already.
1850  *
1851  * Ensure that by first writing to ->xmin and only update
1852  * ->effective_xmin once the new state is synced to disk. After a
1853  * crash ->effective_xmin is set to ->xmin.
1854  */
1857  {
1861  updated_xmin = true;
1862  }
1863  }
1864 
1867  {
1869 
1873  updated_restart = true;
1874  }
1875 
1877 
1878  /* first write new xmin to disk, so we know what's up after a crash */
1879  if (updated_xmin || updated_restart)
1880  {
 /* the flags are bools, so %u prints each of them as 0 or 1 */
1883  elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
1884  }
1885 
1886  /*
1887  * Now the new xmin is safely on disk, we can let the global value
1888  * advance. We do not take ProcArrayLock or similar since we only
1889  * advance xmin here and there's not much harm done by a concurrent
1890  * computation missing that.
1891  */
1892  if (updated_xmin)
1893  {
1897 
1900  }
1901  }
1902  else
1903  {
1907  }
1908 }
1909 
1910 /*
1911  * Clear logical streaming state during (sub)transaction abort.
 *
 * The visible part of the body resets bsysscan to false.
 *
 * NOTE(review): the function-name line and one statement are absent from
 * this listing (the inner line numbers skip), so any further state reset
 * here cannot be confirmed from this view.
1912  */
1913 void
1915 {
1917  bsysscan = false;
1918 }
1919 
1920 /*
1921  * Report stats for a slot.
1922  */
1923 void
1925 {
1926  ReorderBuffer *rb = ctx->reorder;
1927  PgStat_StatReplSlotEntry repSlotStat;
1928 
1929  /* Nothing to do if we don't have any replication stats to be sent. */
1930  if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
1931  return;
1932 
1933  elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
1934  rb,
1935  (long long) rb->spillTxns,
1936  (long long) rb->spillCount,
1937  (long long) rb->spillBytes,
1938  (long long) rb->streamTxns,
1939  (long long) rb->streamCount,
1940  (long long) rb->streamBytes,
1941  (long long) rb->totalTxns,
1942  (long long) rb->totalBytes);
1943 
1944  repSlotStat.spill_txns = rb->spillTxns;
1945  repSlotStat.spill_count = rb->spillCount;
1946  repSlotStat.spill_bytes = rb->spillBytes;
1947  repSlotStat.stream_txns = rb->streamTxns;
1948  repSlotStat.stream_count = rb->streamCount;
1949  repSlotStat.stream_bytes = rb->streamBytes;
1950  repSlotStat.total_txns = rb->totalTxns;
1951  repSlotStat.total_bytes = rb->totalBytes;
1952 
1953  pgstat_report_replslot(ctx->slot, &repSlotStat);
1954 
1955  rb->spillTxns = 0;
1956  rb->spillCount = 0;
1957  rb->spillBytes = 0;
1958  rb->streamTxns = 0;
1959  rb->streamCount = 0;
1960  rb->streamBytes = 0;
1961  rb->totalTxns = 0;
1962  rb->totalBytes = 0;
1963 }
1964 
1965 /*
1966  * Read up to the end of WAL starting from the decoding slot's restart_lsn.
1967  * Return true if any meaningful/decodable WAL records are encountered,
1968  * otherwise false.
 *
 * The read loop ends as soon as the decoding context reports
 * processing_required, or once the reader's EndRecPtr reaches end_of_wal.
 * A reader error message is raised as an ERROR; the PG_CATCH block
 * re-throws whatever error occurred inside PG_TRY.
 *
 * NOTE(review): the function-name line and several statements (context
 * declaration and creation, read start, cache invalidation, per-record
 * processing) are absent from this listing -- the inner line numbers skip --
 * so those steps cannot be confirmed from this view.
1969  */
1970 bool
1972 {
1973  bool has_pending_wal = false;
1974 
1976 
1977  PG_TRY();
1978  {
1980 
1981  /*
1982  * Create our decoding context in fast_forward mode, passing start_lsn
1983  * as InvalidXLogRecPtr, so that we start processing from the slot's
1984  * confirmed_flush.
1985  */
1987  NIL,
1988  true, /* fast_forward */
1989  XL_ROUTINE(.page_read = read_local_xlog_page,
1990  .segment_open = wal_segment_open,
1991  .segment_close = wal_segment_close),
1992  NULL, NULL, NULL);
1993 
1994  /*
1995  * Start reading at the slot's restart_lsn, which we know points to a
1996  * valid record.
1997  */
1999 
2000  /* Invalidate non-timetravel entries */
2002 
2003  /* Loop until the end of WAL or some changes are processed */
2004  while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
2005  {
2006  XLogRecord *record;
2007  char *errm = NULL;
2008 
2009  record = XLogReadRecord(ctx->reader, &errm);
2010 
 /* an error message from the reader is fatal for this check */
2011  if (errm)
2012  elog(ERROR, "could not find record for logical decoding: %s", errm);
2013 
2014  if (record != NULL)
2016 
 /* the flag set on the context decides whether the loop continues */
2017  has_pending_wal = ctx->processing_required;
2018 
2020  }
2021 
2022  /* Clean up */
2023  FreeDecodingContext(ctx);
2025  }
2026  PG_CATCH();
2027  {
2028  /* clear all timetravel entries */
2030 
2031  PG_RE_THROW();
2032  }
2033  PG_END_TRY();
2034 
2035  return has_pending_wal;
2036 }
#define NameStr(name)
Definition: c.h:733
uint32 TransactionId
Definition: c.h:639
size_t Size
Definition: c.h:592
int64 TimestampTz
Definition: timestamp.h:39
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:88
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
Definition: dfmgr.c:105
int errdetail(const char *fmt,...)
Definition: elog.c:1205
ErrorContextCallback * error_context_stack
Definition: elog.c:94
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define LOG
Definition: elog.h:31
#define PG_RE_THROW()
Definition: elog.h:411
#define errcontext
Definition: elog.h:196
#define PG_TRY(...)
Definition: elog.h:370
#define DEBUG2
Definition: elog.h:29
#define PG_END_TRY(...)
Definition: elog.h:395
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:380
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
void err(int eval, const char *fmt,...)
Definition: err.c:43
Oid MyDatabaseId
Definition: globals.c:91
void InvalidateSystemCaches(void)
Definition: inval.c:792
Assert(fmt[strlen(fmt) - 1] !='\n')
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: logical.c:1095
static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: logical.c:1003
static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr lsn)
Definition: logical.c:1638
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1827
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:686
bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
Definition: logical.c:1971
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
Definition: logical.c:1276
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: logical.c:875
static void output_plugin_error_callback(void *arg)
Definition: logical.c:764
static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
Definition: logical.c:914
static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: logical.c:1415
static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: logical.c:958
static LogicalDecodingContext * StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:149
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:714
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: logical.c:1591
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: logical.c:1134
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:642
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
Definition: logical.c:844
static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: logical.c:1048
bool DecodingContextReady(LogicalDecodingContext *ctx)
Definition: logical.c:633
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:727
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: logical.c:783
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1924
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
Definition: logical.c:1753
static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: logical.c:1501
static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: logical.c:1374
void ResetLogicalStreamingState(void)
Definition: logical.c:1914
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
Definition: logical.c:1685
struct LogicalErrorCallbackState LogicalErrorCallbackState
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
Definition: logical.c:1550
static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: logical.c:1460
bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
Definition: logical.c:1176
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx)
Definition: logical.c:811
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:701
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:108
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:328
bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: logical.c:1208
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
Definition: logical.c:742
static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
Definition: logical.c:1325
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
Definition: logical.c:1239
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:494
void(* LogicalOutputPluginWriterUpdateProgress)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, bool skipped_xact)
Definition: logical.h:27
void(* LogicalOutputPluginWriterWrite)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, bool last_write)
Definition: logical.h:19
LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite
Definition: logical.h:25
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1172
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1785
@ LW_EXCLUSIVE
Definition: lwlock.h:116
void * palloc0(Size size)
Definition: mcxt.c:1334
MemoryContext CurrentMemoryContext
Definition: mcxt.c:131
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:442
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
void * arg
const void * data
#define NIL
Definition: pg_list.h:68
static const char * plugin
void pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat)
#define InvalidOid
Definition: postgres_ext.h:36
#define PROC_IN_LOGICAL_DECODING
Definition: proc.h:61
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition: procarray.c:2932
ReorderBuffer * ReorderBufferAllocate(void)
void ReorderBufferFree(ReorderBuffer *rb)
void ReplicationSlotMarkDirty(void)
Definition: slot.c:983
void ReplicationSlotReserveWal(void)
Definition: slot.c:1374
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1022
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
void ReplicationSlotSave(void)
Definition: slot.c:965
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1078
void CheckSlotRequirements(void)
Definition: slot.c:1335
#define SlotIsPhysical(slot)
Definition: slot.h:206
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:51
@ RS_INVAL_NONE
Definition: slot.h:49
void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:424
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:406
void FreeSnapshotBuilder(SnapBuild *builder)
Definition: snapbuild.c:362
SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, XLogRecPtr two_phase_at)
Definition: snapbuild.c:316
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:46
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
Definition: pg_list.h:54
OutputPluginOptions options
Definition: logical.h:54
XLogReaderState * reader
Definition: logical.h:42
MemoryContext context
Definition: logical.h:36
struct SnapBuild * snapshot_builder
Definition: logical.h:44
StringInfo out
Definition: logical.h:71
XLogRecPtr write_location
Definition: logical.h:108
LogicalOutputPluginWriterPrepareWrite prepare_write
Definition: logical.h:64
OutputPluginCallbacks callbacks
Definition: logical.h:53
TransactionId write_xid
Definition: logical.h:109
List * output_plugin_options
Definition: logical.h:59
ReplicationSlot * slot
Definition: logical.h:39
LogicalOutputPluginWriterWrite write
Definition: logical.h:65
struct ReorderBuffer * reorder
Definition: logical.h:43
LogicalOutputPluginWriterUpdateProgress update_progress
Definition: logical.h:66
XLogRecPtr report_location
Definition: logical.c:51
LogicalDecodingContext * ctx
Definition: logical.c:49
const char * callback_name
Definition: logical.c:50
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
uint8 statusFlags
Definition: proc.h:238
int pgxactoff
Definition: proc.h:180
uint8 * statusFlags
Definition: proc.h:395
PgStat_Counter stream_count
Definition: pgstat.h:373
PgStat_Counter total_txns
Definition: pgstat.h:375
PgStat_Counter total_bytes
Definition: pgstat.h:376
PgStat_Counter spill_txns
Definition: pgstat.h:369
PgStat_Counter stream_txns
Definition: pgstat.h:372
PgStat_Counter spill_count
Definition: pgstat.h:370
PgStat_Counter stream_bytes
Definition: pgstat.h:374
PgStat_Counter spill_bytes
Definition: pgstat.h:371
XLogRecPtr first_lsn
XLogRecPtr final_lsn
XLogRecPtr end_lsn
TransactionId xid
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
ReorderBufferBeginCB begin
void * private_data
TransactionId catalog_xmin
Definition: slot.h:90
XLogRecPtr restart_lsn
Definition: slot.h:93
XLogRecPtr confirmed_flush
Definition: slot.h:104
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:96
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:194
TransactionId effective_catalog_xmin
Definition: slot.h:175
slock_t mutex
Definition: slot.h:151
XLogRecPtr candidate_restart_valid
Definition: slot.h:195
TransactionId effective_xmin
Definition: slot.h:174
XLogRecPtr candidate_restart_lsn
Definition: slot.h:196
TransactionId candidate_catalog_xmin
Definition: slot.h:193
ReplicationSlotPersistentData data
Definition: slot.h:178
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
Definition: c.h:728
Definition: regguts.h:323
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4910
bool bsysscan
Definition: xact.c:98
TransactionId CheckXidAlive
Definition: xact.c:97
bool IsTransactionState(void)
Definition: xact.c:379
TransactionId GetTopTransactionIdIfAny(void)
Definition: xact.c:433
bool RecoveryInProgress(void)
Definition: xlog.c:6201
int wal_level
Definition: xlog.c:131
int wal_segment_size
Definition: xlog.c:143
WalLevel GetActiveWalLevelOnStandby(void)
Definition: xlog.c:4749
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:74
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:389
void XLogReaderFree(XLogReaderState *state)
Definition: xlogreader.c:161
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:106
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:231
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: xlogutils.c:817
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: xlogutils.c:861