PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pgrepack.c File Reference
#include "postgres.h"
#include "access/detoast.h"
#include "commands/repack.h"
#include "commands/repack_internal.h"
#include "replication/snapbuild.h"
#include "utils/memutils.h"
Include dependency graph for pgrepack.c:

Go to the source code of this file.

Functions

static void repack_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void repack_shutdown (LogicalDecodingContext *ctx)
 
static void repack_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void repack_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void repack_process_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void repack_store_change (LogicalDecodingContext *ctx, Relation relation, ConcurrentChangeKind kind, HeapTuple tuple)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 

Variables

 PG_MODULE_MAGIC
 

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 36 of file pgrepack.c.

37{
43}
static void repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition pgrepack.c:102
static void repack_shutdown(LogicalDecodingContext *ctx)
Definition pgrepack.c:82
static void repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition pgrepack.c:48
static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition pgrepack.c:111
static void repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition pgrepack.c:96
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb

References OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, repack_begin_txn(), repack_commit_txn(), repack_process_change(), repack_shutdown(), repack_startup(), OutputPluginCallbacks::shutdown_cb, and OutputPluginCallbacks::startup_cb.

◆ repack_begin_txn()

static void repack_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 96 of file pgrepack.c.

97{
98}

Referenced by _PG_output_plugin_init().

◆ repack_commit_txn()

static void repack_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 102 of file pgrepack.c.

104{
105}

Referenced by _PG_output_plugin_init().

◆ repack_process_change()

static void repack_process_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 111 of file pgrepack.c.

113{
116
117 /* Changes of other relation should not have been decoded. */
118 Assert(RelationGetRelid(relation) == private->relid);
119
120 /* Decode entry depending on its type */
121 switch (change->action)
122 {
124 {
125 HeapTuple newtuple;
126
127 newtuple = change->data.tp.newtuple;
128
129 /*
130 * Identity checks in the main function should have made this
131 * impossible.
132 */
133 if (newtuple == NULL)
134 elog(ERROR, "incomplete insert info");
135
136 repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
137 }
138 break;
140 {
141 HeapTuple oldtuple,
142 newtuple;
143
144 oldtuple = change->data.tp.oldtuple;
145 newtuple = change->data.tp.newtuple;
146
147 if (newtuple == NULL)
148 elog(ERROR, "incomplete update info");
149
150 if (oldtuple != NULL)
151 repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
152
153 repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
154 }
155 break;
157 {
158 HeapTuple oldtuple;
159
160 oldtuple = change->data.tp.oldtuple;
161
162 if (oldtuple == NULL)
163 elog(ERROR, "incomplete delete info");
164
165 repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
166 }
167 break;
168 default:
169
170 /*
171 * Should not come here. This includes TRUNCATE of the table being
172 * processed. heap_decode() cannot check the file locator easily,
173 * but we assume that TRUNCATE uses AccessExclusiveLock on the
174 * table so it should not occur during REPACK (CONCURRENTLY).
175 */
176 Assert(false);
177 break;
178 }
179}
#define PG_USED_FOR_ASSERTS_ONLY
Definition c.h:249
#define Assert(condition)
Definition c.h:943
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
static void repack_store_change(LogicalDecodingContext *ctx, Relation relation, ConcurrentChangeKind kind, HeapTuple tuple)
Definition pgrepack.c:191
static int fb(int x)
#define RelationGetRelid(relation)
Definition rel.h:516
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_UPDATE
#define CHANGE_UPDATE_OLD
#define CHANGE_DELETE
#define CHANGE_UPDATE_NEW
#define CHANGE_INSERT
void * output_writer_private
Definition logical.h:81
union ReorderBufferChange::@116 data
ReorderBufferChangeType action
struct ReorderBufferChange::@116::@117 tp

References ReorderBufferChange::action, Assert, CHANGE_DELETE, CHANGE_INSERT, CHANGE_UPDATE_NEW, CHANGE_UPDATE_OLD, ReorderBufferChange::data, elog, ERROR, fb(), ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, LogicalDecodingContext::output_writer_private, PG_USED_FOR_ASSERTS_ONLY, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, repack_store_change(), and ReorderBufferChange::tp.

Referenced by _PG_output_plugin_init().

◆ repack_shutdown()

static void repack_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 82 of file pgrepack.c.

83{
84}

Referenced by _PG_output_plugin_init().

◆ repack_startup()

static void repack_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 48 of file pgrepack.c.

50{
52
53 if (!AmRepackWorker())
56 errmsg("unsupported use of logical decoding plugin \"%s\"",
57 "pgrepack"),
58 errdetail("This plugin can only be used by %s.",
59 "REPACK (CONCURRENTLY)"));
60
61 /* Initial setup of our private state */
64 dstate->change_cxt = AllocSetContextCreate(ctx->context,
65 "REPACK - change",
67 /* repack_setup_logical_decoding fills in the rest */
69
70 /* Probably unnecessary, as we don't use the SQL interface ... */
72
73 if (ctx->output_plugin_options != NIL)
74 {
77 errmsg("this plugin does not expect any options"));
78 }
79}
int errcode(int sqlerrcode)
Definition elog.c:875
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
Definition elog.h:152
#define palloc0_object(type)
Definition fe_memutils.h:90
MemoryContext CurrentMemoryContext
Definition mcxt.c:161
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
static char * errmsg
@ OUTPUT_PLUGIN_BINARY_OUTPUT
#define NIL
Definition pg_list.h:68
bool AmRepackWorker(void)
MemoryContext context
Definition logical.h:36
List * output_plugin_options
Definition logical.h:59
OutputPluginOutputType output_type

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AmRepackWorker(), Assert, LogicalDecodingContext::context, CurrentMemoryContext, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, OutputPluginOptions::output_type, LogicalDecodingContext::output_writer_private, and palloc0_object.

Referenced by _PG_output_plugin_init().

◆ repack_store_change()

static void repack_store_change ( LogicalDecodingContext ctx,
Relation  relation,
ConcurrentChangeKind  kind,
HeapTuple  tuple 
)
static

Definition at line 191 of file pgrepack.c.

193{
196 BufFile *file;
197 List *attrs_ext = NIL;
198 int natt_ext;
199
201 file = dstate->file;
202
203 /* Store the change kind. */
204 BufFileWrite(file, &kind, 1);
205
206 /* Use a frequently-reset context to avoid dealing with leaks manually */
207 oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
208
209 /*
210 * If the tuple contains "external indirect" attributes, we need to write
211 * the contents to the file because we have no control over that memory.
212 */
213 if (HeapTupleHasExternal(tuple))
214 {
215 TupleDesc desc = RelationGetDescr(relation);
216 TupleTableSlot *slot;
217
218 /* Initialize the slot, if not done already */
219 if (dstate->slot == NULL)
220 {
222
223 MemoryContextSwitchTo(dstate->worker_cxt);
225 CurrentResourceOwner = dstate->worker_resowner;
227 MemoryContextSwitchTo(dstate->change_cxt);
229 }
230
231 slot = dstate->slot;
232 ExecStoreHeapTuple(tuple, slot, false);
233
234 /*
235 * Loop over all attributes, and find out which ones we need to spill
236 * separately, to wit: each one that's a non-null varlena and stored
237 * out of line.
238 */
239 for (int i = 0; i < desc->natts; i++)
240 {
243
244 if (attr->attisdropped || attr->attlen != -1 ||
245 slot_attisnull(slot, i + 1))
246 continue;
247
248 slot_getsomeattrs(slot, i + 1);
249
250 /*
251 * This is a non-null varlena datum, but we only care if it's
252 * out-of-line
253 */
256 continue;
257
258 /*
259 * We spill any indirect-external attributes separately from the
260 * heap tuple. Anything else is written as is.
261 */
264 else
265 {
266 /*
267 * Logical decoding should not produce "external expanded"
268 * attributes (those actually should never appear on disk), so
269 * only TOASTed attribute can be seen here.
270 *
271 * We get here if the table has external values but only
272 * in-line values are being updated now.
273 */
275 }
276 }
277
278 ExecClearTuple(slot);
279 }
280
281 /*
282 * First, write the original heap tuple, prefixed by its length. Note
283 * that the external-toast tag for each toasted attribute will be present
284 * in what we write, so that we know where to restore each one later.
285 */
286 BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
287 BufFileWrite(file, tuple->t_data, tuple->t_len);
288
289 /* Then, write the number of external attributes we found. */
291 BufFileWrite(file, &natt_ext, sizeof(natt_ext));
292
293 /* Finally, the attributes themselves, if any */
295 {
298 /* These attributes could be large, so free them right away */
300 }
301
302 /* Cleanup. */
304 MemoryContextReset(dstate->change_cxt);
305}
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:677
varlena * detoast_external_attr(varlena *attr)
Definition detoast.c:45
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsHeapTuple
Definition execTuples.c:85
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
static bool HeapTupleHasExternal(const HeapTupleData *tuple)
int i
Definition isn.c:77
List * lappend(List *list, void *datum)
Definition list.c:339
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:406
void pfree(void *pointer)
Definition mcxt.c:1619
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
static int list_length(const List *l)
Definition pg_list.h:152
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define RelationGetDescr(relation)
Definition rel.h:542
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
bool attisdropped
Definition tupdesc.h:78
uint32 t_len
Definition htup.h:64
HeapTupleHeader t_data
Definition htup.h:68
Definition pg_list.h:54
Datum * tts_values
Definition tuptable.h:131
Definition c.h:776
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:195
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition tuptable.h:376
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
static bool slot_attisnull(TupleTableSlot *slot, int attnum)
Definition tuptable.h:403
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)
Definition varatt.h:361
static Size VARSIZE_ANY(const void *PTR)
Definition varatt.h:460
static bool VARATT_IS_EXTERNAL(const void *PTR)
Definition varatt.h:354
static bool VARATT_IS_EXTERNAL_INDIRECT(const void *PTR)
Definition varatt.h:368

References Assert, CompactAttribute::attisdropped, CompactAttribute::attlen, BufFileWrite(), CurrentResourceOwner, DatumGetPointer(), detoast_external_attr(), ExecClearTuple(), ExecStoreHeapTuple(), fb(), foreach_ptr, HeapTupleHasExternal(), i, lappend(), list_length(), MakeSingleTupleTableSlot(), MemoryContextReset(), MemoryContextSwitchTo(), TupleDescData::natts, NIL, LogicalDecodingContext::output_writer_private, pfree(), RelationGetDescr, slot_attisnull(), slot_getsomeattrs(), HeapTupleData::t_data, HeapTupleData::t_len, TupleTableSlot::tts_values, TTSOpsHeapTuple, TupleDescCompactAttr(), VARATT_IS_EXTERNAL(), VARATT_IS_EXTERNAL_INDIRECT(), VARATT_IS_EXTERNAL_ONDISK(), and VARSIZE_ANY().

Referenced by repack_process_change().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 21 of file pgrepack.c.