PostgreSQL Source Code  git master
message.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * message.c
4  * Generic logical messages.
5  *
6  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/message.c
10  *
11  * NOTES
12  *
13  * Generic logical messages allow XLOG logging of arbitrary binary blobs that
14  * get passed to the logical decoding plugin. In normal XLOG processing they
15  * are the same as a NOOP.
16  *
17  * These messages can be either transactional or non-transactional.
18  * Transactional messages are part of the current transaction and will be sent
19  * to the decoding plugin in the same way as DML operations.
20  * Non-transactional messages are sent to the plugin at the time when the
21  * logical decoding reads them from XLOG. This also means that transactional
22  * messages won't be delivered if the transaction was rolled back, but the
23  * non-transactional ones will always be delivered.
24  *
25  * Every message carries a prefix to avoid conflicts between different decoding
26  * plugins. Plugin authors must take extra care to use a unique prefix; a good
27  * option is, for example, the name of the extension.
28  *
29  * ---------------------------------------------------------------------------
30  */
31 
32 #include "postgres.h"
33 
34 #include "access/xact.h"
35 #include "access/xloginsert.h"
36 #include "miscadmin.h"
37 #include "nodes/execnodes.h"
38 #include "replication/logical.h"
39 #include "replication/message.h"
40 #include "utils/memutils.h"
41 
42 /*
43  * Write logical decoding message into XLog.
44  */
46 LogLogicalMessage(const char *prefix, const char *message, size_t size,
47  bool transactional, bool flush)
48 {
49  xl_logical_message xlrec;
50  XLogRecPtr lsn;
51 
52  /*
53  * Force xid to be allocated if we're emitting a transactional message.
54  */
55  if (transactional)
56  {
59  }
60 
61  xlrec.dbId = MyDatabaseId;
62  xlrec.transactional = transactional;
63  /* trailing zero is critical; see logicalmsg_desc */
64  xlrec.prefix_size = strlen(prefix) + 1;
65  xlrec.message_size = size;
66 
68  XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
69  XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
70  XLogRegisterData(unconstify(char *, message), size);
71 
72  /* allow origin filtering */
74 
75  lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
76 
77  /*
78  * Make sure that the message hits disk before leaving if emitting a
79  * non-transactional message when flush is requested.
80  */
81  if (!transactional && flush)
82  XLogFlush(lsn);
83  return lsn;
84 }
85 
86 /*
87  * Redo is basically just noop for logical decoding messages.
88  */
89 void
91 {
92  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
93 
94  if (info != XLOG_LOGICAL_MESSAGE)
95  elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
96 
97  /* This is only interesting for logical decoding, see decode.c. */
98 }
#define unconstify(underlying_type, expr)
Definition: c.h:1234
unsigned char uint8
Definition: c.h:493
#define PANIC
Definition: elog.h:42
Oid MyDatabaseId
Definition: globals.c:90
Assert(fmt[strlen(fmt) - 1] !='\n')
XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, bool transactional, bool flush)
Definition: message.c:46
void logicalmsg_redo(XLogReaderState *record)
Definition: message.c:90
#define SizeOfLogicalMessage
Definition: message.h:30
#define XLOG_LOGICAL_MESSAGE
Definition: message.h:37
bool transactional
Definition: message.h:23
bool IsTransactionState(void)
Definition: xact.c:378
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:445
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2733
#define XLOG_INCLUDE_ORIGIN
Definition: xlog.h:152
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:365
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:475
void XLogSetRecordFlags(uint8 flags)
Definition: xloginsert.c:457
void XLogBeginInsert(void)
Definition: xloginsert.c:150
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLR_INFO_MASK
Definition: xlogrecord.h:62