PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pgrepack.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pgrepack.c
4 * Logical Replication output plugin for REPACK command
5 *
6 * Copyright (c) 2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/pgrepack/pgrepack.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "access/detoast.h"
18#include "utils/memutils.h"
19
21
23 OutputPluginOptions *opt, bool is_init);
26 ReorderBufferTXN *txn);
28 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
30 Relation rel, ReorderBufferChange *change);
31static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
33
34void
43
44
45/* initialize this plugin */
46static void
48 bool is_init)
49{
51
52 /* Probably unnecessary, as we don't use the SQL interface ... */
54
55 /*
56 * REPACK doesn't need access to shared catalogs, so we can speed up the
57 * historic snapshot creation by setting this flag. We'll only have to
58 * wait for transactions in our database.
59 */
60 opt->need_shared_catalogs = false;
61
62 if (ctx->output_plugin_options != NIL)
63 {
66 errmsg("this plugin does not expect any options"));
67 }
68}
69
70static void
74
75/*
76 * As we don't release the slot during processing of a particular table, there's
77 * no room for the SQL interface, even for debugging purposes. Therefore we need
78 * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin
79 * callbacks. (Although we might want to write custom callbacks, this API
80 * seems to be unnecessarily generic for our purposes.)
81 */
82
83/* BEGIN callback */
84static void
88
89/* COMMIT callback */
90static void
95
96/*
97 * Callback for individual changed tuples
98 */
99static void
101 Relation relation, ReorderBufferChange *change)
102{
105
106 /* Changes of other relations should not have been decoded. */
107 Assert(RelationGetRelid(relation) == private->relid);
108
109 /* Decode entry depending on its type */
110 switch (change->action)
111 {
113 {
114 HeapTuple newtuple;
115
116 newtuple = change->data.tp.newtuple;
117
118 /*
119 * Identity checks in the main function should have made this
120 * impossible.
121 */
122 if (newtuple == NULL)
123 elog(ERROR, "incomplete insert info");
124
125 repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
126 }
127 break;
129 {
130 HeapTuple oldtuple,
131 newtuple;
132
133 oldtuple = change->data.tp.oldtuple;
134 newtuple = change->data.tp.newtuple;
135
136 if (newtuple == NULL)
137 elog(ERROR, "incomplete update info");
138
139 if (oldtuple != NULL)
140 repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
141
142 repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
143 }
144 break;
146 {
147 HeapTuple oldtuple;
148
149 oldtuple = change->data.tp.oldtuple;
150
151 if (oldtuple == NULL)
152 elog(ERROR, "incomplete delete info");
153
154 repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
155 }
156 break;
157 default:
158
159 /*
160 * Should not come here. This includes TRUNCATE of the table being
161 * processed. heap_decode() cannot check the file locator easily,
162 * but we assume that TRUNCATE uses AccessExclusiveLock on the
163 * table so it should not occur during REPACK (CONCURRENTLY).
164 */
165 Assert(false);
166 break;
167 }
168}
169
170/*
171 * Write the given tuple, with the given change kind, to the repack spill
172 * file. Later, the repack decoding worker can read these and replay
173 * the operations on the new copy of the table.
174 *
175 * For each change affecting the table being repacked, we store enough
176 * information about each tuple involved, so that the change can be replayed
177 * on the new copy of the table.
178 */
179static void
182{
185 BufFile *file;
186 List *attrs_ext = NIL;
187 int natt_ext;
188
190 file = dstate->file;
191
192 /* Store the change kind. */
193 BufFileWrite(file, &kind, 1);
194
195 /* Use a frequently-reset context to avoid dealing with leaks manually */
196 oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
197
198 /*
199 * If the tuple contains "external indirect" attributes, we need to write
200 * the contents to the file because we have no control over that memory.
201 */
202 if (HeapTupleHasExternal(tuple))
203 {
204 TupleDesc desc = RelationGetDescr(relation);
205 TupleTableSlot *slot;
206
207 /* Initialize the slot, if not done already */
208 if (dstate->slot == NULL)
209 {
211
212 MemoryContextSwitchTo(dstate->worker_cxt);
214 CurrentResourceOwner = dstate->worker_resowner;
216 MemoryContextSwitchTo(dstate->change_cxt);
218 }
219
220 slot = dstate->slot;
221 ExecStoreHeapTuple(tuple, slot, false);
222
223 /*
224 * Loop over all attributes, and find out which ones we need to spill
225 * separately, to wit: each one that's a non-null varlena and stored
226 * out of line.
227 */
228 for (int i = 0; i < desc->natts; i++)
229 {
232
233 if (attr->attisdropped || attr->attlen != -1 ||
234 slot_attisnull(slot, i + 1))
235 continue;
236
237 slot_getsomeattrs(slot, i + 1);
238
239 /*
240 * This is a non-null varlena datum, but we only care if it's
241 * out-of-line
242 */
245 continue;
246
247 /*
248 * We spill any indirect-external attributes separately from the
249 * heap tuple. Anything else is written as is.
250 */
253 else
254 {
255 /*
256 * Logical decoding should not produce "external expanded"
257 * attributes (those actually should never appear on disk), so
258 * only TOASTed attributes can be seen here.
259 *
260 * We get here if the table has external values but only
261 * in-line values are being updated now.
262 */
264 }
265 }
266
267 ExecClearTuple(slot);
268 }
269
270 /*
271 * First, write the original heap tuple, prefixed by its length. Note
272 * that the external-toast tag for each toasted attribute will be present
273 * in what we write, so that we know where to restore each one later.
274 */
275 BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
276 BufFileWrite(file, tuple->t_data, tuple->t_len);
277
278 /* Then, write the number of external attributes we found. */
280 BufFileWrite(file, &natt_ext, sizeof(natt_ext));
281
282 /* Finally, the attributes themselves, if any */
284 {
287 /* These attributes could be large, so free them right away */
289 }
290
291 /* Cleanup. */
293 MemoryContextReset(dstate->change_cxt);
294}
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:677
#define PG_USED_FOR_ASSERTS_ONLY
Definition c.h:249
#define Assert(condition)
Definition c.h:943
varlena * detoast_external_attr(varlena *attr)
Definition detoast.c:45
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:227
#define ereport(elevel,...)
Definition elog.h:151
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsHeapTuple
Definition execTuples.c:85
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
static bool HeapTupleHasExternal(const HeapTupleData *tuple)
int i
Definition isn.c:77
List * lappend(List *list, void *datum)
Definition list.c:339
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
void pfree(void *pointer)
Definition mcxt.c:1616
static char * errmsg
@ OUTPUT_PLUGIN_BINARY_OUTPUT
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static void repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition pgrepack.c:91
static void repack_shutdown(LogicalDecodingContext *ctx)
Definition pgrepack.c:71
static void repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition pgrepack.c:47
PG_MODULE_MAGIC
Definition pgrepack.c:20
static void repack_store_change(LogicalDecodingContext *ctx, Relation relation, ConcurrentChangeKind kind, HeapTuple tuple)
Definition pgrepack.c:180
static void repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition pgrepack.c:85
static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition pgrepack.c:100
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition pgrepack.c:35
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
static int fb(int x)
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetDescr(relation)
Definition rel.h:542
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_UPDATE
#define CHANGE_UPDATE_OLD
#define CHANGE_DELETE
#define CHANGE_UPDATE_NEW
char ConcurrentChangeKind
#define CHANGE_INSERT
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
bool attisdropped
Definition tupdesc.h:78
uint32 t_len
Definition htup.h:64
HeapTupleHeader t_data
Definition htup.h:68
Definition pg_list.h:54
void * output_writer_private
Definition logical.h:81
void * output_plugin_private
Definition logical.h:76
List * output_plugin_options
Definition logical.h:59
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
OutputPluginOutputType output_type
ReorderBufferChangeType action
struct ReorderBufferChange::@120::@121 tp
union ReorderBufferChange::@120 data
Datum * tts_values
Definition tuptable.h:131
Definition c.h:776
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:195
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition tuptable.h:376
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
static bool slot_attisnull(TupleTableSlot *slot, int attnum)
Definition tuptable.h:403
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)
Definition varatt.h:361
static Size VARSIZE_ANY(const void *PTR)
Definition varatt.h:460
static bool VARATT_IS_EXTERNAL(const void *PTR)
Definition varatt.h:354
static bool VARATT_IS_EXTERNAL_INDIRECT(const void *PTR)
Definition varatt.h:368
uint64 XLogRecPtr
Definition xlogdefs.h:21