PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
test_decoding.c File Reference
#include "postgres.h"
#include "catalog/pg_type.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for test_decoding.c:

Go to the source code of this file.

Data Structures

struct  TestDecodingData
 
struct  TestDecodingTxnData
 

Functions

 PG_MODULE_MAGIC_EXT (.name="test_decoding",.version=PG_VERSION)
 
static void pg_decode_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pg_decode_shutdown (LogicalDecodingContext *ctx)
 
static void pg_decode_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_begin (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pg_decode_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pg_decode_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pg_decode_filter_prepare (LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
 
static void pg_decode_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pg_decode_stream_start (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_stream_start (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_stream_stop (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_stream_abort (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pg_decode_stream_prepare (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_stream_commit (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_stream_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_stream_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static void pg_decode_stream_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
void _PG_init (void)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void print_literal (StringInfo s, Oid typid, char *outputstr)
 
static void tuple_to_stringinfo (StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 124 of file test_decoding.c.

125{
126 /* other plugins can perform things here */
127}

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks * cb )

Definition at line 131 of file test_decoding.c.

132{
154}
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)

References OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::filter_prepare_cb, OutputPluginCallbacks::message_cb, pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), pg_decode_commit_prepared_txn(), pg_decode_commit_txn(), pg_decode_filter(), pg_decode_filter_prepare(), pg_decode_message(), pg_decode_prepare_txn(), pg_decode_rollback_prepared_txn(), pg_decode_shutdown(), pg_decode_startup(), pg_decode_stream_abort(), pg_decode_stream_change(), pg_decode_stream_commit(), pg_decode_stream_message(), pg_decode_stream_prepare(), pg_decode_stream_start(), pg_decode_stream_stop(), pg_decode_stream_truncate(), pg_decode_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ pg_decode_begin_prepare_txn()

static void pg_decode_begin_prepare_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn
)
static

Definition at line 350 of file test_decoding.c.

351{
353 TestDecodingTxnData *txndata =
355
356 txndata->xact_wrote_changes = false;
357 txn->output_plugin_private = txndata;
358
359 /*
360 * If asked to skip empty transactions, we'll emit BEGIN at the point
361 * where the first operation is received for this transaction.
362 */
363 if (data->skip_empty_xacts)
364 return;
365
366 pg_output_begin(ctx, data, txn, true);
367}
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1294
const void * data
MemoryContext context
Definition: logical.h:36
void * output_plugin_private
Definition: logical.h:76
void * output_plugin_private
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_begin_txn()

static void pg_decode_begin_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn
)
static

Definition at line 290 of file test_decoding.c.

291{
293 TestDecodingTxnData *txndata =
295
296 txndata->xact_wrote_changes = false;
297 txn->output_plugin_private = txndata;
298
299 /*
300 * If asked to skip empty transactions, we'll emit BEGIN at the point
301 * where the first operation is received for this transaction.
302 */
303 if (data->skip_empty_xacts)
304 return;
305
306 pg_output_begin(ctx, data, txn, true);
307}

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_change()

static void pg_decode_change ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change
)
static

Definition at line 603 of file test_decoding.c.

605{
607 TestDecodingTxnData *txndata;
608 Form_pg_class class_form;
609 TupleDesc tupdesc;
610 MemoryContext old;
611
613 txndata = txn->output_plugin_private;
614
615 /* output BEGIN if we haven't yet */
616 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
617 {
618 pg_output_begin(ctx, data, txn, false);
619 }
620 txndata->xact_wrote_changes = true;
621
622 class_form = RelationGetForm(relation);
623 tupdesc = RelationGetDescr(relation);
624
625 /* Avoid leaking memory by using and resetting our own context */
626 old = MemoryContextSwitchTo(data->context);
627
628 OutputPluginPrepareWrite(ctx, true);
629
630 appendStringInfoString(ctx->out, "table ");
633 class_form->relrewrite ?
634 get_rel_name(class_form->relrewrite) :
635 NameStr(class_form->relname)));
636 appendStringInfoChar(ctx->out, ':');
637
638 switch (change->action)
639 {
641 appendStringInfoString(ctx->out, " INSERT:");
642 if (change->data.tp.newtuple == NULL)
643 appendStringInfoString(ctx->out, " (no-tuple-data)");
644 else
645 tuple_to_stringinfo(ctx->out, tupdesc,
646 change->data.tp.newtuple,
647 false);
648 break;
650 appendStringInfoString(ctx->out, " UPDATE:");
651 if (change->data.tp.oldtuple != NULL)
652 {
653 appendStringInfoString(ctx->out, " old-key:");
654 tuple_to_stringinfo(ctx->out, tupdesc,
655 change->data.tp.oldtuple,
656 true);
657 appendStringInfoString(ctx->out, " new-tuple:");
658 }
659
660 if (change->data.tp.newtuple == NULL)
661 appendStringInfoString(ctx->out, " (no-tuple-data)");
662 else
663 tuple_to_stringinfo(ctx->out, tupdesc,
664 change->data.tp.newtuple,
665 false);
666 break;
668 appendStringInfoString(ctx->out, " DELETE:");
669
670 /* if there was no PK, we only know that a delete happened */
671 if (change->data.tp.oldtuple == NULL)
672 appendStringInfoString(ctx->out, " (no-tuple-data)");
673 /* In DELETE, only the replica identity is present; display that */
674 else
675 tuple_to_stringinfo(ctx->out, tupdesc,
676 change->data.tp.oldtuple,
677 true);
678 break;
679 default:
680 Assert(false);
681 }
682
684 MemoryContextReset(data->context);
685
686 OutputPluginWrite(ctx, true);
687}
#define NameStr(name)
Definition: c.h:717
Assert(PointerIsAligned(start, uint64))
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:703
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:690
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2068
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:2092
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3506
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:414
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
FormData_pg_class * Form_pg_class
Definition: pg_class.h:156
#define RelationGetForm(relation)
Definition: rel.h:510
#define RelationGetRelid(relation)
Definition: rel.h:516
#define RelationGetDescr(relation)
Definition: rel.h:542
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:13103
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
StringInfo out
Definition: logical.h:71
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@110 data
struct ReorderBufferChange::@110::@111 tp
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)

References ReorderBufferChange::action, appendStringInfoChar(), appendStringInfoString(), Assert(), ReorderBufferChange::data, data, get_namespace_name(), get_rel_name(), get_rel_namespace(), MemoryContextReset(), MemoryContextSwitchTo(), NameStr, ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), RelationGetDescr, RelationGetForm, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, tuple_to_stringinfo(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_commit_prepared_txn()

static void pg_decode_commit_prepared_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 401 of file test_decoding.c.

403{
405
406 OutputPluginPrepareWrite(ctx, true);
407
408 appendStringInfo(ctx->out, "COMMIT PREPARED %s",
409 quote_literal_cstr(txn->gid));
410
411 if (data->include_xids)
412 appendStringInfo(ctx->out, ", txid %u", txn->xid);
413
414 if (data->include_timestamp)
415 appendStringInfo(ctx->out, " (at %s)",
417
418 OutputPluginWrite(ctx, true);
419}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
TimestampTz commit_time
TransactionId xid
union ReorderBufferTXN::@116 xact_time

References appendStringInfo(), ReorderBufferTXN::commit_time, data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_commit_txn()

static void pg_decode_commit_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 322 of file test_decoding.c.

324{
327 bool xact_wrote_changes = txndata->xact_wrote_changes;
328
329 pfree(txndata);
330 txn->output_plugin_private = NULL;
331
332 if (data->skip_empty_xacts && !xact_wrote_changes)
333 return;
334
335 OutputPluginPrepareWrite(ctx, true);
336 if (data->include_xids)
337 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
338 else
339 appendStringInfoString(ctx->out, "COMMIT");
340
341 if (data->include_timestamp)
342 appendStringInfo(ctx->out, " (at %s)",
344
345 OutputPluginWrite(ctx, true);
346}
void pfree(void *pointer)
Definition: mcxt.c:2150

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_filter()

static bool pg_decode_filter ( LogicalDecodingContext * ctx,
RepOriginId  origin_id 
)
static

Definition at line 463 of file test_decoding.c.

465{
467
468 if (data->only_local && origin_id != InvalidRepOriginId)
469 return true;
470 return false;
471}
#define InvalidRepOriginId
Definition: origin.h:33

References data, InvalidRepOriginId, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pg_decode_filter_prepare()

static bool pg_decode_filter_prepare ( LogicalDecodingContext * ctx,
TransactionId  xid,
const char *  gid 
)
static

Definition at line 453 of file test_decoding.c.

455{
456 if (strstr(gid, "_nodecode") != NULL)
457 return true;
458
459 return false;
460}

Referenced by _PG_output_plugin_init().

◆ pg_decode_message()

static void pg_decode_message ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 745 of file test_decoding.c.

748{
750 TestDecodingTxnData *txndata;
751
752 txndata = transactional ? txn->output_plugin_private : NULL;
753
754 /* output BEGIN if we haven't yet for transactional messages */
755 if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
756 pg_output_begin(ctx, data, txn, false);
757
758 if (transactional)
759 txndata->xact_wrote_changes = true;
760
761 OutputPluginPrepareWrite(ctx, true);
762 appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
763 transactional, prefix, sz);
764 appendBinaryStringInfo(ctx->out, message, sz);
765 OutputPluginWrite(ctx, true);
766}
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:281

References appendBinaryStringInfo(), appendStringInfo(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_prepare_txn()

static void pg_decode_prepare_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 371 of file test_decoding.c.

373{
376
377 /*
378 * If asked to skip empty transactions, we don't emit PREPARE for a
379 * transaction that has not written any changes.
380 */
381 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
382 return;
383
384 OutputPluginPrepareWrite(ctx, true);
385
386 appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
387 quote_literal_cstr(txn->gid));
388
389 if (data->include_xids)
390 appendStringInfo(ctx->out, ", txid %u", txn->xid);
391
392 if (data->include_timestamp)
393 appendStringInfo(ctx->out, " (at %s)",
395
396 OutputPluginWrite(ctx, true);
397}
TimestampTz prepare_time

References appendStringInfo(), data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), ReorderBufferTXN::prepare_time, quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_rollback_prepared_txn()

static void pg_decode_rollback_prepared_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

◆ pg_decode_shutdown()

static void pg_decode_shutdown ( LogicalDecodingContext * ctx )
static

Definition at line 280 of file test_decoding.c.

281{
283
284 /* cleanup our own resources by deleting our memory context */
285 MemoryContextDelete(data->context);
286}
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485

References data, MemoryContextDelete(), and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pg_decode_startup()

static void pg_decode_startup ( LogicalDecodingContext * ctx,
OutputPluginOptions * opt,
bool  is_init 
)
static

Definition at line 159 of file test_decoding.c.

161{
164 bool enable_streaming = false;
165
166 data = palloc0(sizeof(TestDecodingData));
167 data->context = AllocSetContextCreate(ctx->context,
168 "text conversion context",
170 data->include_xids = true;
171 data->include_timestamp = false;
172 data->skip_empty_xacts = false;
173 data->only_local = false;
174
176
178 opt->receive_rewrites = false;
179
180 foreach(option, ctx->output_plugin_options)
181 {
182 DefElem *elem = lfirst(option);
183
184 Assert(elem->arg == NULL || IsA(elem->arg, String));
185
186 if (strcmp(elem->defname, "include-xids") == 0)
187 {
188 /* if option does not provide a value, it means its value is true */
189 if (elem->arg == NULL)
190 data->include_xids = true;
191 else if (!parse_bool(strVal(elem->arg), &data->include_xids))
193 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
194 errmsg("could not parse value \"%s\" for parameter \"%s\"",
195 strVal(elem->arg), elem->defname)));
196 }
197 else if (strcmp(elem->defname, "include-timestamp") == 0)
198 {
199 if (elem->arg == NULL)
200 data->include_timestamp = true;
201 else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
203 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
204 errmsg("could not parse value \"%s\" for parameter \"%s\"",
205 strVal(elem->arg), elem->defname)));
206 }
207 else if (strcmp(elem->defname, "force-binary") == 0)
208 {
209 bool force_binary;
210
211 if (elem->arg == NULL)
212 continue;
213 else if (!parse_bool(strVal(elem->arg), &force_binary))
215 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
216 errmsg("could not parse value \"%s\" for parameter \"%s\"",
217 strVal(elem->arg), elem->defname)));
218
219 if (force_binary)
221 }
222 else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
223 {
224
225 if (elem->arg == NULL)
226 data->skip_empty_xacts = true;
227 else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
229 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
230 errmsg("could not parse value \"%s\" for parameter \"%s\"",
231 strVal(elem->arg), elem->defname)));
232 }
233 else if (strcmp(elem->defname, "only-local") == 0)
234 {
235
236 if (elem->arg == NULL)
237 data->only_local = true;
238 else if (!parse_bool(strVal(elem->arg), &data->only_local))
240 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
241 errmsg("could not parse value \"%s\" for parameter \"%s\"",
242 strVal(elem->arg), elem->defname)));
243 }
244 else if (strcmp(elem->defname, "include-rewrites") == 0)
245 {
246
247 if (elem->arg == NULL)
248 continue;
249 else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
251 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
252 errmsg("could not parse value \"%s\" for parameter \"%s\"",
253 strVal(elem->arg), elem->defname)));
254 }
255 else if (strcmp(elem->defname, "stream-changes") == 0)
256 {
257 if (elem->arg == NULL)
258 continue;
259 else if (!parse_bool(strVal(elem->arg), &enable_streaming))
261 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
262 errmsg("could not parse value \"%s\" for parameter \"%s\"",
263 strVal(elem->arg), elem->defname)));
264 }
265 else
266 {
268 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
269 errmsg("option \"%s\" = \"%s\" is unknown",
270 elem->defname,
271 elem->arg ? strVal(elem->arg) : "(null)")));
272 }
273 }
274
275 ctx->streaming &= enable_streaming;
276}
bool parse_bool(const char *value, bool *result)
Definition: bool.c:31
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void * palloc0(Size size)
Definition: mcxt.c:1973
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define IsA(nodeptr, _type_)
Definition: nodes.h:164
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
Definition: output_plugin.h:20
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:826
Node * arg
Definition: parsenodes.h:827
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: value.h:64
#define strVal(v)
Definition: value.h:82

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, DefElem::arg, Assert(), LogicalDecodingContext::context, data, DefElem::defname, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OUTPUT_PLUGIN_TEXTUAL_OUTPUT, OutputPluginOptions::output_type, palloc0(), parse_bool(), OutputPluginOptions::receive_rewrites, LogicalDecodingContext::streaming, and strVal.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_abort()

static void pg_decode_stream_abort ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 822 of file test_decoding.c.

825{
827
828 /*
829 * stream abort can be sent for an individual subtransaction but we
830 * maintain the output_plugin_private only under the toptxn so if this is
831 * not the toptxn then fetch the toptxn.
832 */
833 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
835 bool xact_wrote_changes = txndata->xact_wrote_changes;
836
837 if (rbtxn_is_toptxn(txn))
838 {
839 Assert(txn->output_plugin_private != NULL);
840 pfree(txndata);
841 txn->output_plugin_private = NULL;
842 }
843
844 if (data->skip_empty_xacts && !xact_wrote_changes)
845 return;
846
847 OutputPluginPrepareWrite(ctx, true);
848 if (data->include_xids)
849 appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
850 else
851 appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
852 OutputPluginWrite(ctx, true);
853}
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)

References appendStringInfo(), appendStringInfoString(), Assert(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), rbtxn_get_toptxn, rbtxn_is_toptxn, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_change()

static void pg_decode_stream_change ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change
)
static

Definition at line 917 of file test_decoding.c.

921{
924
925 /* output stream start if we haven't yet */
926 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
927 {
928 pg_output_stream_start(ctx, data, txn, false);
929 }
930 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
931
932 OutputPluginPrepareWrite(ctx, true);
933 if (data->include_xids)
934 appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
935 else
936 appendStringInfoString(ctx->out, "streaming change for transaction");
937 OutputPluginWrite(ctx, true);
938}
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_commit()

static void pg_decode_stream_commit ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 883 of file test_decoding.c.

886{
889 bool xact_wrote_changes = txndata->xact_wrote_changes;
890
891 pfree(txndata);
892 txn->output_plugin_private = NULL;
893
894 if (data->skip_empty_xacts && !xact_wrote_changes)
895 return;
896
897 OutputPluginPrepareWrite(ctx, true);
898
899 if (data->include_xids)
900 appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
901 else
902 appendStringInfoString(ctx->out, "committing streamed transaction");
903
904 if (data->include_timestamp)
905 appendStringInfo(ctx->out, " (at %s)",
907
908 OutputPluginWrite(ctx, true);
909}

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_message()

static void pg_decode_stream_message ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 946 of file test_decoding.c.

949{
950 /* Output stream start if we haven't yet for transactional messages. */
951 if (transactional)
952 {
955
956 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
957 {
958 pg_output_stream_start(ctx, data, txn, false);
959 }
960 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
961 }
962
963 OutputPluginPrepareWrite(ctx, true);
964
965 if (transactional)
966 {
967 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
968 transactional, prefix, sz);
969 }
970 else
971 {
972 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
973 transactional, prefix, sz);
974 appendBinaryStringInfo(ctx->out, message, sz);
975 }
976
977 OutputPluginWrite(ctx, true);
978}

References appendBinaryStringInfo(), appendStringInfo(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_prepare()

static void pg_decode_stream_prepare ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 856 of file test_decoding.c.

/*
 * Streaming-mode PREPARE callback.
 *
 * Writes a single "preparing streamed transaction ..." line containing the
 * quoted GID, plus the xid when include_xids is set and the prepare
 * timestamp when include_timestamp is set.  Nothing is written when
 * skip_empty_xacts is set and the transaction has not written any changes.
 */
859{
860 TestDecodingData *data = ctx->output_plugin_private;
861 TestDecodingTxnData *txndata = txn->output_plugin_private;
862
863 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
864 return;
865
866 OutputPluginPrepareWrite(ctx, true);
867
868 if (data->include_xids)
869 appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
870 quote_literal_cstr(txn->gid), txn->xid);
871 else
872 appendStringInfo(ctx->out, "preparing streamed transaction %s",
873 quote_literal_cstr(txn->gid));
874
875 if (data->include_timestamp)
876 appendStringInfo(ctx->out, " (at %s)",
877 timestamptz_to_str(txn->xact_time.prepare_time));
878
879 OutputPluginWrite(ctx, true);
880}

References appendStringInfo(), data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), ReorderBufferTXN::prepare_time, quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_start()

static void pg_decode_stream_start ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn
)
static

Definition at line 769 of file test_decoding.c.

/*
 * Stream-start callback.
 *
 * Creates the per-transaction plugin state on the first stream of a
 * transaction, clears the per-stream "wrote changes" flag, and emits the
 * "opening a streamed block" line right away unless skip_empty_xacts is
 * set.  In that case the stream callbacks that produce output emit the
 * opening line themselves, before their first write in the stream.
 */
771{
772 TestDecodingData *data = ctx->output_plugin_private;
773 TestDecodingTxnData *txndata = txn->output_plugin_private;
774
775 /*
776 * Allocate the txn plugin data for the first stream in the transaction.
777 */
778 if (txndata == NULL)
779 {
780 txndata =
781 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
782 txndata->xact_wrote_changes = false;
783 txn->output_plugin_private = txndata;
784 }
785
786 txndata->stream_wrote_changes = false;
787 if (data->skip_empty_xacts)
788 return;
789 pg_output_stream_start(ctx, data, txn, true);
790}

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_stop()

static void pg_decode_stream_stop ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn
)
static

Definition at line 804 of file test_decoding.c.

/*
 * Stream-stop callback.
 *
 * Writes the "closing a streamed block" line (with the xid when
 * include_xids is set).  When skip_empty_xacts is set and nothing was
 * written during this stream, no closing line is written.
 */
806{
807 TestDecodingData *data = ctx->output_plugin_private;
808 TestDecodingTxnData *txndata = txn->output_plugin_private;
809
810 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
811 return;
812
813 OutputPluginPrepareWrite(ctx, true);
814 if (data->include_xids)
815 appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
816 else
817 appendStringInfoString(ctx->out, "closing a streamed block for transaction");
818 OutputPluginWrite(ctx, true);
819}

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), TestDecodingTxnData::stream_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_truncate()

static void pg_decode_stream_truncate ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange *  change
)
static

Definition at line 985 of file test_decoding.c.

/*
 * Streaming-mode TRUNCATE callback.
 *
 * Emits the "opening a streamed block" line first if skip_empty_xacts is
 * set and nothing has been written in the current stream yet, marks both the
 * transaction and the stream as having written changes, and then writes a
 * one-line "streaming truncate" notice.  The relation list and the change
 * record are not included in the output.
 */
988{
989 TestDecodingData *data = ctx->output_plugin_private;
990 TestDecodingTxnData *txndata = txn->output_plugin_private;
991
992 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
993 {
994 pg_output_stream_start(ctx, data, txn, false);
995 }
996 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
997
998 OutputPluginPrepareWrite(ctx, true);
999 if (data->include_xids)
1000 appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
1001 else
1002 appendStringInfoString(ctx->out, "streaming truncate for transaction");
1003 OutputPluginWrite(ctx, true);
1004}

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_truncate()

static void pg_decode_truncate ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange *  change
)
static

Definition at line 690 of file test_decoding.c.

/*
 * TRUNCATE callback.
 *
 * Writes one line of the form
 *     table <qualified name>[, <qualified name>...]: TRUNCATE: <flags>
 * where <flags> is " restart_seqs" and/or " cascade", or " (no-flags)" when
 * neither is set.  If skip_empty_xacts is set and the transaction has not
 * written anything yet, BEGIN is written first.
 */
692{
693 TestDecodingData *data;
694 TestDecodingTxnData *txndata;
695 MemoryContext old;
696 int i;
697
698 data = ctx->output_plugin_private;
699 txndata = txn->output_plugin_private;
700
701 /* output BEGIN if we haven't yet */
702 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
703 {
704 pg_output_begin(ctx, data, txn, false);
705 }
706 txndata->xact_wrote_changes = true;
707
708 /* Avoid leaking memory by using and resetting our own context */
709 old = MemoryContextSwitchTo(data->context);
710
711 OutputPluginPrepareWrite(ctx, true);
712
713 appendStringInfoString(ctx->out, "table ");
714
715 for (i = 0; i < nrelations; i++)
716 {
717 if (i > 0)
718 appendStringInfoString(ctx->out, ", ");
719
720 appendStringInfoString(ctx->out,
721 quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
722 NameStr(relations[i]->rd_rel->relname)));
723 }
724
725 appendStringInfoString(ctx->out, ": TRUNCATE:");
726
727 if (change->data.truncate.restart_seqs
728 || change->data.truncate.cascade)
729 {
730 if (change->data.truncate.restart_seqs)
731 appendStringInfoString(ctx->out, " restart_seqs");
732 if (change->data.truncate.cascade)
733 appendStringInfoString(ctx->out, " cascade");
734 }
735 else
736 appendStringInfoString(ctx->out, " (no-flags)");
737
/* return to the caller's context before resetting the scratch context */
738 MemoryContextSwitchTo(old);
739 MemoryContextReset(data->context);
740
741 OutputPluginWrite(ctx, true);
742}
int i
Definition: isn.c:77
struct ReorderBufferChange::@110::@112 truncate

References appendStringInfoString(), ReorderBufferChange::cascade, ReorderBufferChange::data, data, get_namespace_name(), i, MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), ReorderBufferChange::restart_seqs, ReorderBufferChange::truncate, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ PG_MODULE_MAGIC_EXT()

PG_MODULE_MAGIC_EXT ( name = "test_decoding",
version = PG_VERSION 
)

◆ pg_output_begin()

static void pg_output_begin ( LogicalDecodingContext *  ctx,
TestDecodingData *  data,
ReorderBufferTXN *  txn,
bool  last_write 
)
static

◆ pg_output_stream_start()

static void pg_output_stream_start ( LogicalDecodingContext *  ctx,
TestDecodingData *  data,
ReorderBufferTXN *  txn,
bool  last_write 
)
static

Definition at line 793 of file test_decoding.c.

/*
 * Write the "opening a streamed block" line for txn.
 *
 * last_write is handed unchanged to both OutputPluginPrepareWrite() and
 * OutputPluginWrite(); on this page, pg_decode_stream_start() passes true,
 * while the change-emitting stream callbacks, which follow up with their own
 * write, pass false.
 */
794{
795 OutputPluginPrepareWrite(ctx, last_write);
/* the xid is only part of the line when include_xids is enabled */
796 if (data->include_xids)
797 appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
798 else
799 appendStringInfoString(ctx->out, "opening a streamed block for transaction");
800 OutputPluginWrite(ctx, last_write);
801}

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_stream_change(), pg_decode_stream_message(), pg_decode_stream_start(), and pg_decode_stream_truncate().

◆ print_literal()

static void print_literal ( StringInfo  s,
Oid  typid,
char *  outputstr 
)
static

Definition at line 481 of file test_decoding.c.

/*
 * Append outputstr (the text output of a value of type typid) to s as a
 * literal:
 *   - integer, oid, float and numeric types are copied verbatim;
 *   - bit and varbit values are wrapped as B'...';
 *   - boolean "t" becomes true, anything else becomes false;
 *   - every other type is wrapped in single quotes, and any character for
 *     which SQL_STR_DOUBLE(ch, false) is true is emitted twice.
 */
482{
483 const char *valptr;
484
485 switch (typid)
486 {
487 case INT2OID:
488 case INT4OID:
489 case INT8OID:
490 case OIDOID:
491 case FLOAT4OID:
492 case FLOAT8OID:
493 case NUMERICOID:
494 /* NB: We don't care about Inf, NaN et al. */
495 appendStringInfoString(s, outputstr);
496 break;
497
498 case BITOID:
499 case VARBITOID:
500 appendStringInfo(s, "B'%s'", outputstr);
501 break;
502
503 case BOOLOID:
504 if (strcmp(outputstr, "t") == 0)
505 appendStringInfoString(s, "true");
506 else
507 appendStringInfoString(s, "false");
508 break;
509
510 default:
511 appendStringInfoChar(s, '\'');
512 for (valptr = outputstr; *valptr; valptr++)
513 {
514 char ch = *valptr;
515
/* a character that needs doubling is written once here and once below */
516 if (SQL_STR_DOUBLE(ch, false))
517 appendStringInfoChar(s, ch);
518 appendStringInfoChar(s, ch);
519 }
520 appendStringInfoChar(s, '\'');
521 break;
522 }
523}
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1134

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), and SQL_STR_DOUBLE.

Referenced by tuple_to_stringinfo().

◆ tuple_to_stringinfo()

static void tuple_to_stringinfo ( StringInfo  s,
TupleDesc  tupdesc,
HeapTuple  tuple,
bool  skip_nulls 
)
static

Definition at line 527 of file test_decoding.c.

/*
 * Append a textual rendering of tuple to s, one entry per column in the
 * form " name[type]:value".
 *
 * Dropped columns and system columns are skipped.  When skip_nulls is true,
 * null columns are omitted entirely; otherwise they are printed with the
 * value "null".  Varlena values that are still stored externally on disk are
 * printed as "unchanged-toast-datum" instead of being fetched.
 */
528{
529 int natt;
530
531 /* print all columns individually */
532 for (natt = 0; natt < tupdesc->natts; natt++)
533 {
534 Form_pg_attribute attr; /* the attribute itself */
535 Oid typid; /* type of current attribute */
536 Oid typoutput; /* output function */
537 bool typisvarlena;
538 Datum origval; /* possibly toasted Datum */
539 bool isnull; /* column is null? */
540
541 attr = TupleDescAttr(tupdesc, natt);
542
543 /*
544 * don't print dropped columns, we can't be sure everything is
545 * available for them
546 */
547 if (attr->attisdropped)
548 continue;
549
550 /*
551 * Don't print system columns, oid will already have been printed if
552 * present.
553 */
554 if (attr->attnum < 0)
555 continue;
556
557 typid = attr->atttypid;
558
559 /* get Datum from tuple */
560 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
561
562 if (isnull && skip_nulls)
563 continue;
564
565 /* print attribute name */
566 appendStringInfoChar(s, ' ');
567 appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
568
569 /* print attribute type */
570 appendStringInfoChar(s, '[');
571 appendStringInfoString(s, format_type_be(typid));
572 appendStringInfoChar(s, ']');
573
574 /* query output function */
575 getTypeOutputInfo(typid,
576 &typoutput, &typisvarlena);
577
578 /* print separator */
579 appendStringInfoChar(s, ':');
580
581 /* print data */
582 if (isnull)
583 appendStringInfoString(s, "null");
584 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
585 appendStringInfoString(s, "unchanged-toast-datum");
586 else if (!typisvarlena)
587 print_literal(s, typid,
588 OidOutputFunctionCall(typoutput, origval));
589 else
590 {
591 Datum val; /* definitely detoasted Datum */
592
593 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
594 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
595 }
596 }
597}
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:904
long val
Definition: informix.c:689
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:3047
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
uintptr_t Datum
Definition: postgres.h:69
unsigned int Oid
Definition: postgres_ext.h:30
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13019
static void print_literal(StringInfo s, Oid typid, char *outputstr)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290

References appendStringInfoChar(), appendStringInfoString(), format_type_be(), getTypeOutputInfo(), heap_getattr(), NameStr, TupleDescData::natts, OidOutputFunctionCall(), PG_DETOAST_DATUM, PointerGetDatum(), print_literal(), quote_identifier(), TupleDescAttr(), val, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pg_decode_change().