PostgreSQL Source Code git master
message.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * message.c
4 * Generic logical messages.
5 *
6 * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/message.c
10 *
11 * NOTES
12 *
13 * Generic logical messages allow XLOG logging of arbitrary binary blobs that
14 * get passed to the logical decoding plugin. In normal XLOG processing they
15 * are same as NOOP.
16 *
17 * These messages can be either transactional or non-transactional.
18 * Transactional messages are part of current transaction and will be sent to
19 * decoding plugin using in a same way as DML operations.
20 * Non-transactional messages are sent to the plugin at the time when the
21 * logical decoding reads them from XLOG. This also means that transactional
22 * messages won't be delivered if the transaction was rolled back but the
23 * non-transactional one will always be delivered.
24 *
25 * Every message carries prefix to avoid conflicts between different decoding
26 * plugins. The plugin authors must take extra care to use unique prefix,
27 * good options seems to be for example to use the name of the extension.
28 *
29 * ---------------------------------------------------------------------------
30 */
31
32#include "postgres.h"
33
34#include "access/xact.h"
35#include "access/xloginsert.h"
36#include "miscadmin.h"
37#include "replication/message.h"
38
39/*
40 * Write logical decoding message into XLog.
41 */
43LogLogicalMessage(const char *prefix, const char *message, size_t size,
44 bool transactional, bool flush)
45{
47 XLogRecPtr lsn;
48
49 /*
50 * Force xid to be allocated if we're emitting a transactional message.
51 */
52 if (transactional)
53 {
56 }
57
58 xlrec.dbId = MyDatabaseId;
59 xlrec.transactional = transactional;
60 /* trailing zero is critical; see logicalmsg_desc */
61 xlrec.prefix_size = strlen(prefix) + 1;
62 xlrec.message_size = size;
63
66 XLogRegisterData(prefix, xlrec.prefix_size);
67 XLogRegisterData(message, size);
68
69 /* allow origin filtering */
71
72 lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
73
74 /*
75 * Make sure that the message hits disk before leaving if emitting a
76 * non-transactional message when flush is requested.
77 */
78 if (!transactional && flush)
79 XLogFlush(lsn);
80 return lsn;
81}
82
83/*
84 * Redo is basically just noop for logical decoding messages.
85 */
86void
88{
89 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
90
91 if (info != XLOG_LOGICAL_MESSAGE)
92 elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
93
94 /* This is only interesting for logical decoding, see decode.c. */
95}
uint8_t uint8
Definition: c.h:486
#define Assert(condition)
Definition: c.h:815
#define PANIC
Definition: elog.h:42
#define elog(elevel,...)
Definition: elog.h:225
Oid MyDatabaseId
Definition: globals.c:93
XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, bool transactional, bool flush)
Definition: message.c:43
void logicalmsg_redo(XLogReaderState *record)
Definition: message.c:87
#define SizeOfLogicalMessage
Definition: message.h:30
#define XLOG_LOGICAL_MESSAGE
Definition: message.h:37
static pg_noinline void Size size
Definition: slab.c:607
bool transactional
Definition: message.h:23
bool IsTransactionState(void)
Definition: xact.c:386
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:453
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2805
#define XLOG_INCLUDE_ORIGIN
Definition: xlog.h:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:364
void XLogSetRecordFlags(uint8 flags)
Definition: xloginsert.c:456
void XLogBeginInsert(void)
Definition: xloginsert.c:149
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410