PostgreSQL Source Code git master
Loading...
Searching...
No Matches
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "lib/pairingheap.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define PG_LOGICAL_DIR   "pg_logical"
 
#define PG_LOGICAL_MAPPINGS_DIR   PG_LOGICAL_DIR "/mappings"
 
#define PG_LOGICAL_SNAPSHOTS_DIR   PG_LOGICAL_DIR "/snapshots"
 
#define RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define RBTXN_IS_SUBXACT   0x0002
 
#define RBTXN_IS_SERIALIZED   0x0004
 
#define RBTXN_IS_SERIALIZED_CLEAR   0x0008
 
#define RBTXN_IS_STREAMED   0x0010
 
#define RBTXN_HAS_PARTIAL_CHANGE   0x0020
 
#define RBTXN_IS_PREPARED   0x0040
 
#define RBTXN_SKIPPED_PREPARE   0x0080
 
#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100
 
#define RBTXN_SENT_PREPARE   0x0200
 
#define RBTXN_IS_COMMITTED   0x0400
 
#define RBTXN_IS_ABORTED   0x0800
 
#define RBTXN_DISTR_INVAL_OVERFLOWED   0x1000
 
#define RBTXN_PREPARE_STATUS_MASK   (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
 
#define rbtxn_has_catalog_changes(txn)
 
#define rbtxn_is_known_subxact(txn)
 
#define rbtxn_is_serialized(txn)
 
#define rbtxn_is_serialized_clear(txn)
 
#define rbtxn_has_partial_change(txn)
 
#define rbtxn_has_streamable_change(txn)
 
#define rbtxn_is_streamed(txn)
 
#define rbtxn_is_prepared(txn)
 
#define rbtxn_sent_prepare(txn)
 
#define rbtxn_is_committed(txn)
 
#define rbtxn_is_aborted(txn)
 
#define rbtxn_skip_prepared(txn)
 
#define rbtxn_distr_inval_overflowed(txn)
 
#define rbtxn_is_toptxn(txn)
 
#define rbtxn_is_subtxn(txn)
 
#define rbtxn_get_toptxn(txn)
 

Typedefs

typedef enum ReorderBufferChangeType ReorderBufferChangeType
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
 
typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
 
typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)
 

Enumerations

enum  DebugLogicalRepStreamingMode { DEBUG_LOGICAL_REP_STREAMING_BUFFERED , DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE }
 
enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT , REORDER_BUFFER_CHANGE_UPDATE , REORDER_BUFFER_CHANGE_DELETE , REORDER_BUFFER_CHANGE_MESSAGE ,
  REORDER_BUFFER_CHANGE_INVALIDATION , REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT , REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID , REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID ,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT , REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM , REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT , REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
HeapTuple ReorderBufferAllocTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferFreeTupleBuf (HeapTuple tuple)
 
ReorderBufferChange * ReorderBufferAllocChange (ReorderBuffer *rb)
 
void ReorderBufferFreeChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
Oid * ReorderBufferAllocRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferFreeRelids (ReorderBuffer *rb, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferAddDistributedInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, ReplOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
uint32 ReorderBufferGetInvalidations (ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
 
void StartupReorderBuffer (void)
 

Variables

PGDLLIMPORT int logical_decoding_work_mem
 
PGDLLIMPORT int debug_logical_replication_streaming
 

Macro Definition Documentation

◆ PG_LOGICAL_DIR

#define PG_LOGICAL_DIR   "pg_logical"

Definition at line 22 of file reorderbuffer.h.

◆ PG_LOGICAL_MAPPINGS_DIR

#define PG_LOGICAL_MAPPINGS_DIR   PG_LOGICAL_DIR "/mappings"

Definition at line 23 of file reorderbuffer.h.

◆ PG_LOGICAL_SNAPSHOTS_DIR

#define PG_LOGICAL_SNAPSHOTS_DIR   PG_LOGICAL_DIR "/snapshots"

Definition at line 24 of file reorderbuffer.h.

◆ RBTXN_DISTR_INVAL_OVERFLOWED

#define RBTXN_DISTR_INVAL_OVERFLOWED   0x1000

Definition at line 179 of file reorderbuffer.h.

◆ rbtxn_distr_inval_overflowed

#define rbtxn_distr_inval_overflowed (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
)
#define RBTXN_DISTR_INVAL_OVERFLOWED

Definition at line 270 of file reorderbuffer.h.

289 : (txn) \
290)
291
292typedef struct ReorderBufferTXN
293{
294 /* See above */
296
297 /* The transaction's transaction id, can be a toplevel or sub xid. */
299
300 /* Xid of top-level transaction, if known */
302
303 /*
304 * Global transaction id required for identification of prepared
305 * transactions.
306 */
307 char *gid;
308
309 /*
310 * LSN of the first data-carrying WAL record with knowledge about this
311 * xid. This is allowed to *not* be the first record adorned with this xid, if
312 * the previous records aren't relevant for logical decoding.
313 */
315
316 /* ----
317 * LSN of the record that led to this xact to be prepared or committed or
318 * aborted. This can be a
319 * * plain commit record
320 * * plain commit record, of a parent transaction
321 * * prepared transaction
322 * * prepared transaction commit
323 * * plain abort record
324 * * prepared transaction abort
325 *
326 * This can also become set to earlier values than transaction end when
327 * a transaction is spilled to disk; specifically it's set to the LSN of
328 * the latest change written to disk so far.
329 * ----
330 */
332
333 /*
334 * LSN pointing to the end of the commit record + 1.
335 */
337
338 /* Toplevel transaction for this subxact (NULL for top-level). */
339 struct ReorderBufferTXN *toptxn;
340
341 /*
342 * LSN of the last lsn at which snapshot information resides, so we can
343 * restart decoding from there and fully recover this transaction from
344 * WAL.
345 */
347
348 /* origin of the change that caused this transaction */
351
352 /*
353 * Commit or Prepare time, only known when we read the actual commit or
354 * prepare record.
355 */
356 union
357 {
361 };
362
363 /*
364 * The base snapshot is used to decode all changes until either this
365 * transaction modifies the catalog, or another catalog-modifying
366 * transaction commits.
367 */
370 dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
371
372 /*
373 * Snapshot/CID from the previous streaming run. Only valid for already
374 * streamed transactions (NULL/InvalidCommandId otherwise).
375 */
378
379 /*
380 * How many ReorderBufferChange's do we have in this txn.
381 *
382 * Changes in subtransactions are *not* included but tracked separately.
383 */
385
386 /*
387 * How many of the above entries are stored in memory in contrast to being
388 * spilled to disk.
389 */
391
392 /*
393 * List of ReorderBufferChange structs, including new Snapshots, new
394 * CommandIds and command invalidation messages.
395 */
397
398 /*
399 * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
400 * Those are always assigned to the toplevel transaction. (Keep track of
401 * #entries to create a hash of the right size)
402 */
405
406 /*
407 * On-demand built hash for looking up the above values.
408 */
410
411 /*
412 * Hash containing (potentially partial) toast entries. NULL if no toast
413 * tuples have been found for the current change.
414 */
416
417 /*
418 * non-hierarchical list of subtransactions that are *not* aborted. Only
419 * used in toplevel transactions.
420 */
423
424 /*
425 * Stored cache invalidations. This is not a linked list because we get
426 * all the invalidations at once.
427 */
430
431 /*
432 * Stores cache invalidation messages distributed by other transactions.
433 */
436
437 /* ---
438 * Position in one of two lists:
439 * * list of subtransactions if we are *known* to be subxact
440 * * list of toplevel xacts (can be an as-yet unknown subxact)
441 * ---
442 */
444
445 /*
446 * A node in the list of catalog modifying transactions
447 */
449
450 /*
451 * A node in txn_heap
452 */
454
455 /*
456 * Size of this transaction (changes currently in memory, in bytes).
457 */
458 Size size;
459
460 /* Size of top-transaction including sub-transactions. */
462
463 /*
464 * Private data pointer of the output plugin.
465 */
468
469/* so we can define the callbacks used inside struct ReorderBuffer itself */
470typedef struct ReorderBuffer ReorderBuffer;
471
472/* change callback signature */
474 ReorderBufferTXN *txn,
475 Relation relation,
476 ReorderBufferChange *change);
477
478/* truncate callback signature */
480 ReorderBufferTXN *txn,
481 int nrelations,
482 Relation relations[],
483 ReorderBufferChange *change);
484
485/* begin callback signature */
487 ReorderBufferTXN *txn);
488
489/* commit callback signature */
491 ReorderBufferTXN *txn,
492 XLogRecPtr commit_lsn);
493
494/* message callback signature */
496 ReorderBufferTXN *txn,
498 bool transactional,
499 const char *prefix, Size sz,
500 const char *message);
501
502/* begin prepare callback signature */
504 ReorderBufferTXN *txn);
505
506/* prepare callback signature */
508 ReorderBufferTXN *txn,
509 XLogRecPtr prepare_lsn);
510
511/* commit prepared callback signature */
513 ReorderBufferTXN *txn,
514 XLogRecPtr commit_lsn);
515
516/* rollback prepared callback signature */
518 ReorderBufferTXN *txn,
519 XLogRecPtr prepare_end_lsn,
520 TimestampTz prepare_time);
521
522/* start streaming transaction callback signature */
524 ReorderBufferTXN *txn,
525 XLogRecPtr first_lsn);
526
527/* stop streaming transaction callback signature */
529 ReorderBufferTXN *txn,
530 XLogRecPtr last_lsn);
531
532/* discard streamed transaction callback signature */
534 ReorderBufferTXN *txn,
535 XLogRecPtr abort_lsn);
536
537/* prepare streamed transaction callback signature */
539 ReorderBufferTXN *txn,
540 XLogRecPtr prepare_lsn);
541
542/* commit streamed transaction callback signature */
544 ReorderBufferTXN *txn,
545 XLogRecPtr commit_lsn);
546
547/* stream change callback signature */
549 ReorderBufferTXN *txn,
550 Relation relation,
551 ReorderBufferChange *change);
552
553/* stream message callback signature */
555 ReorderBufferTXN *txn,
557 bool transactional,
558 const char *prefix, Size sz,
559 const char *message);
560
561/* stream truncate callback signature */
563 ReorderBufferTXN *txn,
564 int nrelations,
565 Relation relations[],
566 ReorderBufferChange *change);
567
568/* update progress txn callback signature */
570 ReorderBufferTXN *txn,
571 XLogRecPtr lsn);
572
573struct ReorderBuffer
574{
575 /*
576 * xid => ReorderBufferTXN lookup table
577 */
578 HTAB *by_txn;
579
580 /*
581 * Transactions that could be a toplevel xact, ordered by LSN of the first
582 * record bearing that xid.
583 */
585
586 /*
587 * Transactions and subtransactions that have a base snapshot, ordered by
588 * LSN of the record which caused us to first obtain the base snapshot.
589 * This is not the same as toplevel_by_lsn, because we only set the base
590 * snapshot on the first logical-decoding-relevant record (eg. heap
591 * writes), whereas the initial LSN could be set by other operations.
592 */
594
595 /*
596 * Transactions and subtransactions that have modified system catalogs.
597 */
599
600 /*
601 * one-entry sized cache for by_txn. Very frequently the same txn gets
602 * looked up over and over again.
603 */
606
607 /*
608 * Callbacks to be called when a transaction commits.
609 */
615
616 /*
617 * Callbacks to be called when streaming a transaction at prepare time.
618 */
623
624 /*
625 * Callbacks to be called when streaming a transaction.
626 */
635
636 /*
637 * Callback to be called when updating progress during sending data of a
638 * transaction (and its subtransactions) to the output plugin.
639 */
641
642 /*
643 * Pointer that will be passed untouched to the callbacks.
644 */
645 void *private_data;
646
647 /*
648 * Saved output plugin option
649 */
650 bool output_rewrites;
651
652 /*
653 * Private memory context.
654 */
656
657 /*
658 * Memory contexts for specific types of objects
659 */
663
665
666 /* buffer for disk<->memory conversions */
667 char *outbuf;
669
670 /* memory accounting */
671 Size size;
672
673 /* Max-heap for sizes of all top-level and sub transactions */
675
676 /*
677 * Statistics about transactions spilled to disk.
678 *
679 * A single transaction may be spilled repeatedly, which is why we keep
680 * two different counters. For spilling, the transaction counter includes
681 * both toplevel transactions and subtransactions.
682 */
683 int64 spillTxns; /* number of transactions spilled to disk */
684 int64 spillCount; /* spill-to-disk invocation counter */
685 int64 spillBytes; /* amount of data spilled to disk */
686
687 /* Statistics about transactions streamed to the decoding output plugin */
688 int64 streamTxns; /* number of transactions streamed */
689 int64 streamCount; /* streaming invocation counter */
690 int64 streamBytes; /* amount of data decoded */
691
692 /* Number of times the logical_decoding_work_mem limit has been reached */
694
695 /*
696 * Statistics about all the transactions sent to the decoding output
697 * plugin
698 */
699 int64 totalTxns; /* total number of transactions sent */
700 int64 totalBytes; /* total amount of data decoded */
701};
702
703
706
708extern void ReorderBufferFreeTupleBuf(HeapTuple tuple);
709
712 ReorderBufferChange *change, bool upd_mem);
713
714extern Oid *ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids);
715extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids);
716
718 XLogRecPtr lsn, ReorderBufferChange *change,
719 bool toast_insert);
722 bool transactional, const char *prefix,
723 Size message_size, const char *message);
725 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
726 TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn);
728 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
729 XLogRecPtr two_phase_at,
730 TimestampTz commit_time,
731 ReplOriginId origin_id, XLogRecPtr origin_lsn,
732 char *gid, bool is_commit);
734 TransactionId subxid, XLogRecPtr lsn);
736 TransactionId subxid, XLogRecPtr commit_lsn,
737 XLogRecPtr end_lsn);
739 TimestampTz abort_time);
740extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid);
743
751 XLogRecPtr lsn, RelFileLocator locator,
752 ItemPointerData tid,
753 CommandId cmin, CommandId cmax, CommandId combocid);
755 Size nmsgs, SharedInvalidationMessage *msgs);
757 XLogRecPtr lsn, Size nmsgs,
759extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
760 SharedInvalidationMessage *invalidations);
762
766
768 XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
769 TimestampTz prepare_time,
770 ReplOriginId origin_id, XLogRecPtr origin_lsn);
772extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
776
778
780 TransactionId xid,
782
783extern void StartupReorderBuffer(void);
784
785#endif
int64_t int64
Definition c.h:615
uint32 bits32
Definition c.h:627
uint64_t uint64
Definition c.h:619
uint32_t uint32
Definition c.h:618
uint32 CommandId
Definition c.h:752
uint32 TransactionId
Definition c.h:738
size_t Size
Definition c.h:691
int64 TimestampTz
Definition timestamp.h:39
unsigned int Oid
static int fb(int x)
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
void(* ReorderBufferCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
void(* ReorderBufferUpdateProgressTxnCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)
void(* ReorderBufferStreamCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
void(* ReorderBufferStreamStartCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
void(* ReorderBufferApplyChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void(* ReorderBufferStreamPrepareCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void(* ReorderBufferStreamChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
void(* ReorderBufferCommitPreparedCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void(* ReorderBufferStreamMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
void(* ReorderBufferBeginCB)(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBuffer * ReorderBufferAllocate(void)
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, ReplOriginId origin_id, XLogRecPtr origin_lsn)
void(* ReorderBufferMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
void(* ReorderBufferApplyTruncateCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
void(* ReorderBufferBeginPrepareCB)(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
void(* ReorderBufferStreamAbortCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
void(* ReorderBufferPrepareCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
void(* ReorderBufferRollbackPreparedCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
void ReorderBufferFree(ReorderBuffer *rb)
void(* ReorderBufferStreamStopCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
void StartupReorderBuffer(void)
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
void(* ReorderBufferStreamTruncateCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
XLogRecPtr restart_decoding_lsn
pairingheap_node txn_node
TimestampTz commit_time
XLogRecPtr base_snapshot_lsn
TransactionId toplevel_xid
dlist_node catchange_node
SharedInvalidationMessage * invalidations
struct ReorderBufferTXN * toptxn
dlist_head tuplecids
XLogRecPtr first_lsn
TimestampTz abort_time
XLogRecPtr final_lsn
void * output_plugin_private
uint32 ninvalidations_distributed
XLogRecPtr origin_lsn
TimestampTz prepare_time
TransactionId xid
ReplOriginId origin_id
dlist_node base_snapshot_node
SharedInvalidationMessage * invalidations_distributed
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
dlist_head txns_by_base_snapshot_lsn
MemoryContext context
dclist_head catchange_txns
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
MemoryContext tup_context
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
dlist_head toplevel_by_lsn
pairingheap * txn_heap
ReorderBufferBeginCB begin
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21

◆ rbtxn_get_toptxn

#define rbtxn_get_toptxn (   txn)
Value:
( \
rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
)

Definition at line 288 of file reorderbuffer.h.

290 : (txn) \
291)

◆ RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 167 of file reorderbuffer.h.

◆ rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES

Definition at line 184 of file reorderbuffer.h.

◆ RBTXN_HAS_PARTIAL_CHANGE

#define RBTXN_HAS_PARTIAL_CHANGE   0x0020

Definition at line 172 of file reorderbuffer.h.

◆ rbtxn_has_partial_change

#define rbtxn_has_partial_change (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
)
#define RBTXN_HAS_PARTIAL_CHANGE

Definition at line 208 of file reorderbuffer.h.

◆ RBTXN_HAS_STREAMABLE_CHANGE

#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100

Definition at line 175 of file reorderbuffer.h.

◆ rbtxn_has_streamable_change

#define rbtxn_has_streamable_change (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
)
#define RBTXN_HAS_STREAMABLE_CHANGE

Definition at line 214 of file reorderbuffer.h.

◆ RBTXN_IS_ABORTED

#define RBTXN_IS_ABORTED   0x0800

Definition at line 178 of file reorderbuffer.h.

◆ rbtxn_is_aborted

#define rbtxn_is_aborted (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \
)
#define RBTXN_IS_ABORTED

Definition at line 258 of file reorderbuffer.h.

◆ RBTXN_IS_COMMITTED

#define RBTXN_IS_COMMITTED   0x0400

Definition at line 177 of file reorderbuffer.h.

◆ rbtxn_is_committed

#define rbtxn_is_committed (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \
)
#define RBTXN_IS_COMMITTED

Definition at line 252 of file reorderbuffer.h.

◆ rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
)
#define RBTXN_IS_SUBXACT

Definition at line 190 of file reorderbuffer.h.

◆ RBTXN_IS_PREPARED

#define RBTXN_IS_PREPARED   0x0040

Definition at line 173 of file reorderbuffer.h.

◆ rbtxn_is_prepared

#define rbtxn_is_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \
)
#define RBTXN_IS_PREPARED

Definition at line 240 of file reorderbuffer.h.

◆ RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 169 of file reorderbuffer.h.

◆ rbtxn_is_serialized

#define rbtxn_is_serialized (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
#define RBTXN_IS_SERIALIZED

Definition at line 196 of file reorderbuffer.h.

◆ RBTXN_IS_SERIALIZED_CLEAR

#define RBTXN_IS_SERIALIZED_CLEAR   0x0008

Definition at line 170 of file reorderbuffer.h.

◆ rbtxn_is_serialized_clear

#define rbtxn_is_serialized_clear (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
)
#define RBTXN_IS_SERIALIZED_CLEAR

Definition at line 202 of file reorderbuffer.h.

◆ RBTXN_IS_STREAMED

#define RBTXN_IS_STREAMED   0x0010

Definition at line 171 of file reorderbuffer.h.

◆ rbtxn_is_streamed

#define rbtxn_is_streamed (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
#define RBTXN_IS_STREAMED

Definition at line 228 of file reorderbuffer.h.

◆ rbtxn_is_subtxn

#define rbtxn_is_subtxn (   txn)
Value:
( \
(txn)->toptxn != NULL \
)

Definition at line 282 of file reorderbuffer.h.

◆ RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 168 of file reorderbuffer.h.

◆ rbtxn_is_toptxn

#define rbtxn_is_toptxn (   txn)
Value:
( \
(txn)->toptxn == NULL \
)

Definition at line 276 of file reorderbuffer.h.

◆ RBTXN_PREPARE_STATUS_MASK

#define RBTXN_PREPARE_STATUS_MASK   (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)

Definition at line 181 of file reorderbuffer.h.

◆ RBTXN_SENT_PREPARE

#define RBTXN_SENT_PREPARE   0x0200

Definition at line 176 of file reorderbuffer.h.

◆ rbtxn_sent_prepare

#define rbtxn_sent_prepare (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_SENT_PREPARE) != 0 \
)
#define RBTXN_SENT_PREPARE

Definition at line 246 of file reorderbuffer.h.

◆ rbtxn_skip_prepared

#define rbtxn_skip_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
#define RBTXN_SKIPPED_PREPARE

Definition at line 264 of file reorderbuffer.h.

◆ RBTXN_SKIPPED_PREPARE

#define RBTXN_SKIPPED_PREPARE   0x0080

Definition at line 174 of file reorderbuffer.h.

Typedef Documentation

◆ ReorderBuffer

Definition at line 471 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 474 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 480 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 487 of file reorderbuffer.h.

◆ ReorderBufferBeginPrepareCB

typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 504 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferChangeType

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 491 of file reorderbuffer.h.

◆ ReorderBufferCommitPreparedCB

typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 513 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 496 of file reorderbuffer.h.

◆ ReorderBufferPrepareCB

typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 508 of file reorderbuffer.h.

◆ ReorderBufferRollbackPreparedCB

typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)

Definition at line 518 of file reorderbuffer.h.

◆ ReorderBufferStreamAbortCB

typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)

Definition at line 534 of file reorderbuffer.h.

◆ ReorderBufferStreamChangeCB

typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 549 of file reorderbuffer.h.

◆ ReorderBufferStreamCommitCB

typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 544 of file reorderbuffer.h.

◆ ReorderBufferStreamMessageCB

typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 555 of file reorderbuffer.h.

◆ ReorderBufferStreamPrepareCB

typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 539 of file reorderbuffer.h.

◆ ReorderBufferStreamStartCB

typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)

Definition at line 524 of file reorderbuffer.h.

◆ ReorderBufferStreamStopCB

typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)

Definition at line 529 of file reorderbuffer.h.

◆ ReorderBufferStreamTruncateCB

typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 563 of file reorderbuffer.h.

◆ ReorderBufferTXN

◆ ReorderBufferUpdateProgressTxnCB

typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)

Definition at line 570 of file reorderbuffer.h.

Enumeration Type Documentation

◆ DebugLogicalRepStreamingMode

Enumerator
DEBUG_LOGICAL_REP_STREAMING_BUFFERED 
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE 

Definition at line 31 of file reorderbuffer.h.

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INVALIDATION 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 50 of file reorderbuffer.h.

51{
ReorderBufferChangeType
@ REORDER_BUFFER_CHANGE_INVALIDATION
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_MESSAGE
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
@ REORDER_BUFFER_CHANGE_TRUNCATE
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
@ REORDER_BUFFER_CHANGE_UPDATE

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)
extern

Definition at line 3085 of file reorderbuffer.c.

3087{
3088 ReorderBufferTXN *txn;
3089
3090 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3091 false);
3092
3093 /* unknown, nothing to remove */
3094 if (txn == NULL)
3095 return;
3096
3097 txn->abort_time = abort_time;
3098
3099 /* For streamed transactions notify the remote node about the abort. */
3100 if (rbtxn_is_streamed(txn))
3101 {
3102 rb->stream_abort(rb, txn, lsn);
3103
3104 /*
3105 * We might have decoded changes for this transaction that could load
3106 * the cache as per the current transaction's view (consider DDLs that
3107 * happened in this transaction). We don't want the decoding of future
3108 * transactions to use those cache entries so execute only the inval
3109 * messages in this transaction.
3110 */
3111 if (txn->ninvalidations > 0)
3113 txn->invalidations);
3114 }
3115
3116 /* cosmetic... */
3117 txn->final_lsn = lsn;
3118
3119 /* remove potential on-disk data, and deallocate */
3121}
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References ReorderBufferTXN::abort_time, fb(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)
extern

Definition at line 3131 of file reorderbuffer.c.

3132{
3134
3135 /*
3136 * Iterate through all (potential) toplevel TXNs and abort all that are
3137 * older than what can possibly be running. Once we've found the first
3138 * that is alive we stop; there might be some that acquired an xid earlier
3139 * but started writing later, but that's unlikely and they will be cleaned
3140 * up in a later call to this function.
3141 */
3142 dlist_foreach_modify(it, &rb->toplevel_by_lsn)
3143 {
3144 ReorderBufferTXN *txn;
3145
3146 txn = dlist_container(ReorderBufferTXN, node, it.cur);
3147
3148 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
3149 {
3150 elog(DEBUG2, "aborting old transaction %u", txn->xid);
3151
3152 /* Notify the remote node about the crash/immediate restart. */
3153 if (rbtxn_is_streamed(txn))
3154 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3155
3156 /* remove potential on-disk data, and deallocate this tx */
3158 }
3159 else
3160 return;
3161 }
3162}
#define DEBUG2
Definition elog.h:29
#define elog(elevel,...)
Definition elog.h:226
#define dlist_foreach_modify(iter, lhead)
Definition ilist.h:640
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References DEBUG2, dlist_container, dlist_foreach_modify, elog, fb(), InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddDistributedInvalidations()

void ReorderBufferAddDistributedInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)
extern

Definition at line 3581 of file reorderbuffer.c.

3584{
3585 ReorderBufferTXN *txn;
3586 MemoryContext oldcontext;
3587
3588 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3589
3590 oldcontext = MemoryContextSwitchTo(rb->context);
3591
3592 /*
3593 * Collect all the invalidations under the top transaction, if available,
3594 * so that we can execute them all together. See comments atop
3595 * ReorderBufferAddInvalidations.
3596 */
3597 txn = rbtxn_get_toptxn(txn);
3598
3599 Assert(nmsgs > 0);
3600
3602 {
3603 /*
3604 * Check the transaction has enough space for storing distributed
3605 * invalidation messages.
3606 */
3608 {
3609 /*
3610 * Mark the invalidation message as overflowed and free up the
3611 * messages accumulated so far.
3612 */
3614
3616 {
3620 }
3621 }
3622 else
3625 msgs, nmsgs);
3626 }
3627
3628 /* Queue the invalidation messages into the transaction */
3629 ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3630
3631 MemoryContextSwitchTo(oldcontext);
3632}
#define Assert(condition)
Definition c.h:945
void pfree(void *pointer)
Definition mcxt.c:1616
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static void ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out, uint32 *ninvals_out, SharedInvalidationMessage *msgs_new, Size nmsgs_new)
#define MAX_DISTR_INVAL_MSG_PER_TXN
static void ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
#define rbtxn_get_toptxn(txn)
#define rbtxn_distr_inval_overflowed(txn)

References Assert, fb(), ReorderBufferTXN::invalidations_distributed, MAX_DISTR_INVAL_MSG_PER_TXN, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations_distributed, pfree(), RBTXN_DISTR_INVAL_OVERFLOWED, rbtxn_distr_inval_overflowed, rbtxn_get_toptxn, ReorderBufferAccumulateInvalidations(), ReorderBufferQueueInvalidations(), ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildDistributeSnapshotAndInval().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)
extern

Definition at line 3540 of file reorderbuffer.c.

3543{
3544 ReorderBufferTXN *txn;
3545 MemoryContext oldcontext;
3546
3547 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3548
3549 oldcontext = MemoryContextSwitchTo(rb->context);
3550
3551 /*
3552 * Collect all the invalidations under the top transaction, if available,
3553 * so that we can execute them all together. See comments atop this
3554 * function.
3555 */
3556 txn = rbtxn_get_toptxn(txn);
3557
3558 Assert(nmsgs > 0);
3559
3561 &txn->ninvalidations,
3562 msgs, nmsgs);
3563
3564 ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3565
3566 MemoryContextSwitchTo(oldcontext);
3567}

References Assert, fb(), ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, rbtxn_get_toptxn, ReorderBufferAccumulateInvalidations(), ReorderBufferQueueInvalidations(), and ReorderBufferTXNByXid().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)
extern

Definition at line 3355 of file reorderbuffer.c.

3357{
3359
3360 change->data.command_id = cid;
3362
3363 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3364}
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
ReorderBufferChangeType action
union ReorderBufferChange::@117 data

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, fb(), REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferAllocChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)
extern

Definition at line 3454 of file reorderbuffer.c.

3458{
3460 ReorderBufferTXN *txn;
3461
3462 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3463
3464 change->data.tuplecid.locator = locator;
3465 change->data.tuplecid.tid = tid;
3466 change->data.tuplecid.cmin = cmin;
3467 change->data.tuplecid.cmax = cmax;
3468 change->data.tuplecid.combocid = combocid;
3469 change->lsn = lsn;
3470 change->txn = txn;
3472
3473 dlist_push_tail(&txn->tuplecids, &change->node);
3474 txn->ntuplecids++;
3475}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition ilist.h:364
struct ReorderBufferChange::@117::@121 tuplecid
ItemPointerData tid
struct ReorderBufferTXN * txn
RelFileLocator locator

References ReorderBufferChange::action, ReorderBufferChange::cmax, ReorderBufferChange::cmin, ReorderBufferChange::combocid, ReorderBufferChange::data, dlist_push_tail(), fb(), ReorderBufferChange::locator, ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferAllocChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

◆ ReorderBufferAllocate()

ReorderBuffer * ReorderBufferAllocate ( void  )
extern

Definition at line 325 of file reorderbuffer.c.

326{
327 ReorderBuffer *buffer;
330
332
333 /* allocate memory in own context, to have better accountability */
335 "ReorderBuffer",
337
338 buffer =
340
341 memset(&hash_ctl, 0, sizeof(hash_ctl));
342
343 buffer->context = new_ctx;
344
346 "Change",
348 sizeof(ReorderBufferChange));
349
351 "TXN",
353 sizeof(ReorderBufferTXN));
354
355 /*
356 * To minimize memory fragmentation caused by long-running transactions
357 * with changes spanning multiple memory blocks, we use a single
358 * fixed-size memory block for decoded tuple storage. The performance
359 * testing showed that the default memory block size maintains logical
360 * decoding performance without causing fragmentation due to concurrent
361 * transactions. One might think that we can use the max size as
362 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
363 * the memory fragmentation.
364 */
366 "Tuples",
370
371 hash_ctl.keysize = sizeof(TransactionId);
372 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
373 hash_ctl.hcxt = buffer->context;
374
375 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
377
379 buffer->by_txn_last_txn = NULL;
380
381 buffer->outbuf = NULL;
382 buffer->outbufsize = 0;
383 buffer->size = 0;
384
385 /* txn_heap is ordered by transaction size */
387
388 buffer->spillTxns = 0;
389 buffer->spillCount = 0;
390 buffer->spillBytes = 0;
391 buffer->streamTxns = 0;
392 buffer->streamCount = 0;
393 buffer->streamBytes = 0;
394 buffer->memExceededCount = 0;
395 buffer->totalTxns = 0;
396 buffer->totalBytes = 0;
397
399
400 dlist_init(&buffer->toplevel_by_lsn);
402 dclist_init(&buffer->catchange_txns);
403
404 /*
405 * Ensure there's no stale data from prior uses of this slot, in case some
406 * prior exit avoided calling ReorderBufferFree. Failure to do this can
407 * produce duplicated txns, and it's very cheap if there's nothing there.
408 */
410
411 return buffer;
412}
#define NameStr(name)
Definition c.h:837
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition generation.c:162
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_BLOBS
Definition hsearch.h:97
static void dlist_init(dlist_head *head)
Definition ilist.h:314
static void dclist_init(dclist_head *head)
Definition ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition memutils.h:189
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition slab.c:322
ReplicationSlot * MyReplicationSlot
Definition slot.c:149
ReplicationSlotPersistentData data
Definition slot.h:213
#define InvalidTransactionId
Definition transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), fb(), GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, InvalidTransactionId, InvalidXLogRecPtr, ReorderBuffer::memExceededCount, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferAllocChange()

◆ ReorderBufferAllocRelids()

Oid * ReorderBufferAllocRelids ( ReorderBuffer rb,
int  nrelids 
)
extern

Definition at line 626 of file reorderbuffer.c.

627{
628 Oid *relids;
630
631 alloc_len = sizeof(Oid) * nrelids;
632
633 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
634
635 return relids;
636}

References fb(), and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferAllocTupleBuf()

HeapTuple ReorderBufferAllocTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)
extern

Definition at line 593 of file reorderbuffer.c.

594{
595 HeapTuple tuple;
597
598 alloc_len = tuple_len + SizeofHeapTupleHeader;
599
600 tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
602 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
603
604 return tuple;
605}
#define HEAPTUPLESIZE
Definition htup.h:73
HeapTupleData * HeapTuple
Definition htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition htup.h:23
#define SizeofHeapTupleHeader
HeapTupleHeader t_data
Definition htup.h:68

References fb(), HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, and HeapTupleData::t_data.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)
extern

Definition at line 1100 of file reorderbuffer.c.

1102{
1103 ReorderBufferTXN *txn;
1105 bool new_top;
1106 bool new_sub;
1107
1108 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1109 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1110
1111 if (!new_sub)
1112 {
1114 {
1115 /* already associated, nothing to do */
1116 return;
1117 }
1118 else
1119 {
1120 /*
1121 * We already saw this transaction, but initially added it to the
1122 * list of top-level txns. Now that we know it's not top-level,
1123 * remove it from there.
1124 */
1125 dlist_delete(&subtxn->node);
1126 }
1127 }
1128
1129 subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1130 subtxn->toplevel_xid = xid;
1131 Assert(subtxn->nsubtxns == 0);
1132
1133 /* set the reference to top-level transaction */
1134 subtxn->toptxn = txn;
1135
1136 /* add to subtransaction list */
1137 dlist_push_tail(&txn->subtxns, &subtxn->node);
1138 txn->nsubtxns++;
1139
1140 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1142
1143 /* Verify LSN-ordering invariant */
1145}
static void dlist_delete(dlist_node *node)
Definition ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define rbtxn_is_known_subxact(txn)

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), fb(), ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
ReplOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
extern

Definition at line 2882 of file reorderbuffer.c.

2886{
2887 ReorderBufferTXN *txn;
2888
2889 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2890 false);
2891
2892 /* unknown transaction, nothing to replay */
2893 if (txn == NULL)
2894 return;
2895
2896 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2897 origin_id, origin_lsn);
2898}
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn)

References fb(), InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)
extern

Definition at line 1220 of file reorderbuffer.c.

1223{
1225
1226 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1227 InvalidXLogRecPtr, false);
1228
1229 /*
1230 * No need to do anything if that subtxn didn't contain any changes
1231 */
1232 if (!subtxn)
1233 return;
1234
1235 subtxn->final_lsn = commit_lsn;
1236 subtxn->end_lsn = end_lsn;
1237
1238 /*
1239 * Assign this subxact as a child of the toplevel xact (no-op if already
1240 * done.)
1241 */
1243}
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References fb(), InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
ReplOriginId  origin_id,
XLogRecPtr  origin_lsn,
char gid,
bool  is_commit 
)
extern

Definition at line 2999 of file reorderbuffer.c.

3004{
3005 ReorderBufferTXN *txn;
3006 XLogRecPtr prepare_end_lsn;
3007 TimestampTz prepare_time;
3008
3009 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
3010
3011 /* unknown transaction, nothing to do */
3012 if (txn == NULL)
3013 return;
3014
3015 /*
3016 * By this time the txn has the prepare record information, remember it to
3017 * be later used for rollback.
3018 */
3019 prepare_end_lsn = txn->end_lsn;
3020 prepare_time = txn->prepare_time;
3021
3022 /* add the gid in the txn */
3023 txn->gid = pstrdup(gid);
3024
3025 /*
3026 * It is possible that this transaction is not decoded at prepare time
3027 * either because by that time we didn't have a consistent snapshot, or
3028 * two_phase was not enabled, or it was decoded earlier but we have
3029 * restarted. We only need to send the prepare if it was not decoded
3030 * earlier. We don't need to decode the xact for aborts if it is not done
3031 * already.
3032 */
3033 if ((txn->final_lsn < two_phase_at) && is_commit)
3034 {
3035 /*
3036 * txn must have been marked as a prepared transaction and skipped but
3037 * not sent a prepare. Also, the prepare info must have been updated
3038 * in txn even if we skip prepare.
3039 */
3043
3044 /*
3045 * By this time the txn has the prepare record information, and it is
3046 * important to use that so that the downstream gets accurate
3047 * information. If we instead passed the commit information here,
3048 * the downstream could behave as if it had already replayed the
3049 * commit prepared after the restart.
3050 */
3051 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
3052 txn->prepare_time, txn->origin_id, txn->origin_lsn);
3053 }
3054
3055 txn->final_lsn = commit_lsn;
3056 txn->end_lsn = end_lsn;
3057 txn->commit_time = commit_time;
3058 txn->origin_id = origin_id;
3059 txn->origin_lsn = origin_lsn;
3060
3061 if (is_commit)
3062 rb->commit_prepared(rb, txn, commit_lsn);
3063 else
3064 rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
3065
3066 /* cleanup: make sure there's no cache pollution */
3068 txn->invalidations);
3070}
char * pstrdup(const char *in)
Definition mcxt.c:1781
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE_STATUS_MASK
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, fb(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)
extern

Definition at line 3178 of file reorderbuffer.c.

3179{
3180 ReorderBufferTXN *txn;
3181
3182 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3183 false);
3184
3185 /* unknown, nothing to forget */
3186 if (txn == NULL)
3187 return;
3188
3189 /* this transaction mustn't be streamed */
3191
3192 /* cosmetic... */
3193 txn->final_lsn = lsn;
3194
3195 /*
3196 * Process only cache invalidation messages in this transaction if there
3197 * are any. Even if we're not interested in the transaction's contents, it
3198 * could have manipulated the catalog and we need to update the caches
3199 * according to that.
3200 */
3201 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3203 txn->invalidations);
3204 else
3205 Assert(txn->ninvalidations == 0);
3206
3207 /* remove potential on-disk data, and deallocate */
3209}

References Assert, ReorderBufferTXN::base_snapshot, fb(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)
extern

Definition at line 418 of file reorderbuffer.c.

419{
420 MemoryContext context = rb->context;
421
422 /*
423 * We free separately allocated data by entirely scrapping reorderbuffer's
424 * memory context.
425 */
426 MemoryContextDelete(context);
427
428 /* Free disk space used by unconsumed reorder buffers */
430}
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472

References ReplicationSlot::data, fb(), MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeChange()

void ReorderBufferFreeChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)
extern

Definition at line 523 of file reorderbuffer.c.

525{
526 /* update memory accounting info */
527 if (upd_mem)
530
531 /* free contained data */
532 switch (change->action)
533 {
538 if (change->data.tp.newtuple)
539 {
541 change->data.tp.newtuple = NULL;
542 }
543
544 if (change->data.tp.oldtuple)
545 {
547 change->data.tp.oldtuple = NULL;
548 }
549 break;
551 if (change->data.msg.prefix != NULL)
552 pfree(change->data.msg.prefix);
553 change->data.msg.prefix = NULL;
554 if (change->data.msg.message != NULL)
555 pfree(change->data.msg.message);
556 change->data.msg.message = NULL;
557 break;
559 if (change->data.inval.invalidations)
560 pfree(change->data.inval.invalidations);
561 change->data.inval.invalidations = NULL;
562 break;
564 if (change->data.snapshot)
565 {
567 change->data.snapshot = NULL;
568 }
569 break;
570 /* no data in addition to the struct itself */
572 if (change->data.truncate.relids != NULL)
573 {
575 change->data.truncate.relids = NULL;
576 }
577 break;
582 break;
583 }
584
585 pfree(change);
586}
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
struct ReorderBufferChange::@117::@119 truncate
struct ReorderBufferChange::@117::@122 inval
struct ReorderBufferChange::@117::@120 msg
SharedInvalidationMessage * invalidations
struct ReorderBufferChange::@117::@118 tp

References ReorderBufferChange::action, ReorderBufferChange::data, fb(), ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::message, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, pfree(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeRelids(), ReorderBufferFreeSnap(), ReorderBufferFreeTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferFreeRelids()

void ReorderBufferFreeRelids ( ReorderBuffer *rb,
Oid *relids
)
extern

Definition at line 642 of file reorderbuffer.c.

643{
644 pfree(relids);
645}

References pfree().

Referenced by ReorderBufferFreeChange().

◆ ReorderBufferFreeTupleBuf()

void ReorderBufferFreeTupleBuf ( HeapTuple  tuple)
extern

Definition at line 611 of file reorderbuffer.c.

612{
613 pfree(tuple);
614}

References pfree().

Referenced by ReorderBufferFreeChange().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId * ReorderBufferGetCatalogChangesXacts ( ReorderBuffer *rb)
extern

Definition at line 3689 of file reorderbuffer.c.

3690{
3691 dlist_iter iter;
3692 TransactionId *xids = NULL;
3693 size_t xcnt = 0;
3694
3695 /* Quick return if the list is empty */
3696 if (dclist_count(&rb->catchange_txns) == 0)
3697 return NULL;
3698
3699 /* Initialize XID array */
3700 xids = palloc_array(TransactionId, dclist_count(&rb->catchange_txns));
3701 dclist_foreach(iter, &rb->catchange_txns)
3702 {
3704 catchange_node,
3705 iter.cur);
3706
3708
3709 xids[xcnt++] = txn->xid;
3710 }
3711
3712 qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3713
3714 Assert(xcnt == dclist_count(&rb->catchange_txns));
3715 return xids;
3716}
#define palloc_array(type, count)
Definition fe_memutils.h:76
#define dclist_container(type, membername, ptr)
Definition ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition ilist.h:932
#define dclist_foreach(iter, lhead)
Definition ilist.h:970
#define qsort(a, b, c, d)
Definition port.h:495
#define rbtxn_has_catalog_changes(txn)
dlist_node * cur
Definition ilist.h:179
int xidComparator(const void *arg1, const void *arg2)
Definition xid.c:152

References Assert, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, fb(), palloc_array, qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetInvalidations()

uint32 ReorderBufferGetInvalidations ( ReorderBuffer *rb,
TransactionId  xid,
SharedInvalidationMessage **  msgs 
)
extern

Definition at line 5629 of file reorderbuffer.c.

5631{
5632 ReorderBufferTXN *txn;
5633
5634 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5635 false);
5636
5637 if (txn == NULL)
5638 return 0;
5639
5640 *msgs = txn->invalidations;
5641
5642 return txn->ninvalidations;
5643}

References fb(), ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, and ReorderBufferTXNByXid().

Referenced by SnapBuildDistributeSnapshotAndInval().

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN * ReorderBufferGetOldestTXN ( ReorderBuffer *rb)
extern

Definition at line 1045 of file reorderbuffer.c.

1046{
1047 ReorderBufferTXN *txn;
1048
1050
1051 if (dlist_is_empty(&rb->toplevel_by_lsn))
1052 return NULL;
1053
1054 txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1055
1058 return txn;
1059}
#define dlist_head_element(type, membername, lhead)
Definition ilist.h:603
static bool dlist_is_empty(const dlist_head *head)
Definition ilist.h:336

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), fb(), ReorderBufferTXN::first_lsn, rbtxn_is_known_subxact, and XLogRecPtrIsValid.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer *rb)
extern

Definition at line 1073 of file reorderbuffer.c.

1074{
1075 ReorderBufferTXN *txn;
1076
1078
1079 if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1080 return InvalidTransactionId;
1081
1082 txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1083 &rb->txns_by_base_snapshot_lsn);
1084 return txn->base_snapshot->xmin;
1085}
TransactionId xmin
Definition snapshot.h:153

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), fb(), InvalidTransactionId, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer *rb,
uint32  ninvalidations,
SharedInvalidationMessage *invalidations
)
extern

Definition at line 3251 of file reorderbuffer.c.

3253{
3257 int i;
3258
3259 if (use_subtxn)
3261
3262 /*
3263 * Force invalidations to happen outside of a valid transaction - that way
3264 * entries will just be marked as invalid without accessing the catalog.
3265 * That's advantageous because we don't need to setup the full state
3266 * necessary for catalog access.
3267 */
3268 if (use_subtxn)
3270
3271 for (i = 0; i < ninvalidations; i++)
3272 LocalExecuteInvalidationMessage(&invalidations[i]);
3273
3274 if (use_subtxn)
3275 {
3278 CurrentResourceOwner = cowner;
3279 }
3280}
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition inval.c:823
int i
Definition isn.c:77
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5012
void BeginInternalSubTransaction(const char *name)
Definition xact.c:4717
void RollbackAndReleaseCurrentSubTransaction(void)
Definition xact.c:4819
void AbortCurrentTransaction(void)
Definition xact.c:3473

References AbortCurrentTransaction(), BeginInternalSubTransaction(), CurrentMemoryContext, CurrentResourceOwner, fb(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), MemoryContextSwitchTo(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn 
)
extern

Definition at line 3220 of file reorderbuffer.c.

3221{
3222 ReorderBufferTXN *txn;
3223
3224 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3225 false);
3226
3227 /* unknown, nothing to do */
3228 if (txn == NULL)
3229 return;
3230
3231 /*
3232 * Process cache invalidation messages if there are any. Even if we're not
3233 * interested in the transaction's contents, it could have manipulated the
3234 * catalog and we need to update the caches according to that.
3235 */
3236 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3238 txn->invalidations);
3239 else
3240 Assert(txn->ninvalidations == 0);
3241}

References Assert, ReorderBufferTXN::base_snapshot, fb(), ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer *rb,
TransactionId  xid,
char *gid
)
extern

Definition at line 2958 of file reorderbuffer.c.

2960{
2961 ReorderBufferTXN *txn;
2962
2963 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2964 false);
2965
2966 /* unknown transaction, nothing to replay */
2967 if (txn == NULL)
2968 return;
2969
2970 /*
2971 * txn must have been marked as a prepared transaction and must have
2972 * neither been skipped nor sent a prepare. Also, the prepare info must
2973 * have been updated in it by now.
2974 */
2977
2978 txn->gid = pstrdup(gid);
2979
2980 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2981 txn->prepare_time, txn->origin_id, txn->origin_lsn);
2982
2983 /*
2984 * Send a prepare if not already done so. This might occur if we have
2985 * detected a concurrent abort while replaying the non-streaming
2986 * transaction.
2987 */
2988 if (!rbtxn_sent_prepare(txn))
2989 {
2990 rb->prepare(rb, txn, txn->final_lsn);
2992 }
2993}
#define rbtxn_sent_prepare(txn)

References Assert, ReorderBufferTXN::end_lsn, fb(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodePrepare().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn 
)
extern

Definition at line 3293 of file reorderbuffer.c.

3294{
3295 /* many records won't have an xid assigned, centralize check here */
3296 if (xid != InvalidTransactionId)
3297 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3298}

References fb(), InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange *change,
bool  toast_insert 
)
extern

Definition at line 811 of file reorderbuffer.c.

813{
814 ReorderBufferTXN *txn;
815
816 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
817
818 /*
819 * If we have detected that the transaction is aborted while streaming the
820 * previous changes or by checking its CLOG, there is no point in
821 * collecting further changes for it.
822 */
823 if (rbtxn_is_aborted(txn))
824 {
825 /*
826 * We don't need to update memory accounting for this change as we
827 * have not added it to the queue yet.
828 */
829 ReorderBufferFreeChange(rb, change, false);
830 return;
831 }
832
833 /*
834 * The changes that are sent downstream are considered streamable. We
835 * remember such transactions so that only those will later be considered
836 * for streaming.
837 */
838 if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
844 {
845 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
846
848 }
849
850 change->lsn = lsn;
851 change->txn = txn;
852
854 dlist_push_tail(&txn->changes, &change->node);
855 txn->nentries++;
856 txn->nentries_mem++;
857
858 /* update memory accounting information */
861
862 /* process partial change */
864
865 /* check the memory limits and evict something if needed */
867}
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define rbtxn_is_aborted(txn)

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, dlist_push_tail(), fb(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, rbtxn_is_aborted, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferFreeChange(), ReorderBufferProcessPartialChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), ReorderBufferQueueInvalidations(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer *rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *prefix,
Size  message_size,
const char *message
)
extern

Definition at line 874 of file reorderbuffer.c.

878{
879 if (transactional)
880 {
881 MemoryContext oldcontext;
882 ReorderBufferChange *change;
883
885
886 /*
887 * We don't expect snapshots for transactional changes - we'll use the
888 * snapshot derived later during apply (unless the change gets
889 * skipped).
890 */
891 Assert(!snap);
892
893 oldcontext = MemoryContextSwitchTo(rb->context);
894
897 change->data.msg.prefix = pstrdup(prefix);
898 change->data.msg.message_size = message_size;
899 change->data.msg.message = palloc(message_size);
900 memcpy(change->data.msg.message, message, message_size);
901
902 ReorderBufferQueueChange(rb, xid, lsn, change, false);
903
904 MemoryContextSwitchTo(oldcontext);
905 }
906 else
907 {
908 ReorderBufferTXN *txn = NULL;
909 volatile Snapshot snapshot_now = snap;
910
911 /* Non-transactional changes require a valid snapshot. */
912 Assert(snapshot_now);
913
914 if (xid != InvalidTransactionId)
915 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
916
917 /* setup snapshot to allow catalog access */
918 SetupHistoricSnapshot(snapshot_now, NULL);
919 PG_TRY();
920 {
921 rb->message(rb, txn, lsn, false, prefix, message_size, message);
922
924 }
925 PG_CATCH();
926 {
928 PG_RE_THROW();
929 }
930 PG_END_TRY();
931 }
932}
#define PG_RE_THROW()
Definition elog.h:405
#define PG_TRY(...)
Definition elog.h:372
#define PG_END_TRY(...)
Definition elog.h:397
#define PG_CATCH(...)
Definition elog.h:382
void * palloc(Size size)
Definition mcxt.c:1387
void TeardownHistoricSnapshot(bool is_error)
Definition snapmgr.c:1685
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition snapmgr.c:1669

References ReorderBufferChange::action, Assert, ReorderBufferChange::data, fb(), InvalidTransactionId, MemoryContextSwitchTo(), ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBufferChange::prefix, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferAllocChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
ReplOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
extern

Definition at line 2905 of file reorderbuffer.c.

2909{
2910 ReorderBufferTXN *txn;
2911
2912 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2913
2914 /* unknown transaction, nothing to do */
2915 if (txn == NULL)
2916 return false;
2917
2918 /*
2919 * Remember the prepare information to be later used by commit prepared in
2920 * case we skip doing prepare.
2921 */
2922 txn->final_lsn = prepare_lsn;
2923 txn->end_lsn = end_lsn;
2924 txn->prepare_time = prepare_time;
2925 txn->origin_id = origin_id;
2926 txn->origin_lsn = origin_lsn;
2927
2928 /* Mark this transaction as a prepared transaction */
2931
2932 return true;
2933}

References Assert, ReorderBufferTXN::end_lsn, fb(), ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)
extern

Definition at line 3324 of file reorderbuffer.c.

3326{
3327 ReorderBufferTXN *txn;
3328 bool is_new;
3329
3330 Assert(snap != NULL);
3331
3332 /*
3333 * Fetch the transaction to operate on. If we know it's a subtransaction,
3334 * operate on its top-level transaction instead.
3335 */
3336 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3337 if (rbtxn_is_known_subxact(txn))
3338 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3339 NULL, InvalidXLogRecPtr, false);
3340 Assert(txn->base_snapshot == NULL);
3341
3342 txn->base_snapshot = snap;
3343 txn->base_snapshot_lsn = lsn;
3344 dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3345
3347}

References Assert, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), fb(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer *rb,
XLogRecPtr  ptr 
)
extern

Definition at line 1088 of file reorderbuffer.c.

1089{
1090 rb->current_restart_decoding_lsn = ptr;
1091}

References fb().

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer *rb,
TransactionId  xid 
)
extern

Definition at line 2937 of file reorderbuffer.c.

2938{
2939 ReorderBufferTXN *txn;
2940
2941 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2942
2943 /* unknown transaction, nothing to do */
2944 if (txn == NULL)
2945 return;
2946
2947 /* txn must have been marked as a prepared transaction */
2950}

References Assert, fb(), InvalidXLogRecPtr, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer *rb,
TransactionId  xid 
)
extern

Definition at line 3740 of file reorderbuffer.c.

3741{
3742 ReorderBufferTXN *txn;
3743
3744 txn = ReorderBufferTXNByXid(rb, xid, false,
3745 NULL, InvalidXLogRecPtr, false);
3746
3747 /* transaction isn't known yet, ergo no snapshot */
3748 if (txn == NULL)
3749 return false;
3750
3751 /* a known subtxn? operate on top-level txn instead */
3752 if (rbtxn_is_known_subxact(txn))
3753 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3754 NULL, InvalidXLogRecPtr, false);
3755
3756 return txn->base_snapshot != NULL;
3757}

References ReorderBufferTXN::base_snapshot, fb(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeSnapshotAndInval(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer *rb,
TransactionId  xid 
)
extern

Definition at line 3723 of file reorderbuffer.c.

3724{
3725 ReorderBufferTXN *txn;
3726
3727 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3728 false);
3729 if (txn == NULL)
3730 return false;
3731
3732 return rbtxn_has_catalog_changes(txn);
3733}

References fb(), InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn 
)
extern

Definition at line 3651 of file reorderbuffer.c.

3653{
3654 ReorderBufferTXN *txn;
3655
3656 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3657
3658 if (!rbtxn_has_catalog_changes(txn))
3659 {
3661 dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3662 }
3663
3664 /*
3665 * Mark top-level transaction as having catalog changes too if one of its
3666 * children has so that the ReorderBufferBuildTupleCidHash can
3667 * conveniently check just top-level transaction and decide whether to
3668 * build the hash table or not.
3669 */
3670 if (rbtxn_is_subtxn(txn))
3671 {
3672 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3673
3674 if (!rbtxn_has_catalog_changes(toptxn))
3675 {
3677 dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3678 }
3679 }
3680}
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition ilist.h:709
#define rbtxn_is_subtxn(txn)

References ReorderBufferTXN::catchange_node, dclist_push_tail(), fb(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildProcessNewCid(), and xact_decode().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )
extern

Definition at line 4939 of file reorderbuffer.c.

4940{
4942 struct dirent *logical_de;
4943
4946 {
4947 if (strcmp(logical_de->d_name, ".") == 0 ||
4948 strcmp(logical_de->d_name, "..") == 0)
4949 continue;
4950
4951 /* if it cannot be a slot, skip the directory */
4952 if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
4953 continue;
4954
4955 /*
4956 * ok, has to be a surviving logical slot, iterate and delete
4957 * everything starting with xid-*
4958 */
4960 }
4962}
int FreeDir(DIR *dir)
Definition fd.c:3009
DIR * AllocateDir(const char *dirname)
Definition fd.c:2891
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2957
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:268
#define PG_REPLSLOT_DIR
Definition slot.h:21
Definition dirent.c:26

References AllocateDir(), DEBUG2, fb(), FreeDir(), PG_REPLSLOT_DIR, ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

Variable Documentation

◆ debug_logical_replication_streaming

PGDLLIMPORT int debug_logical_replication_streaming
extern

◆ logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem
extern

Definition at line 226 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().