PostgreSQL Source Code  git master
test_decoding.c File Reference
#include "postgres.h"
#include "catalog/pg_type.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for test_decoding.c:

Go to the source code of this file.

Data Structures

struct  TestDecodingData
 
struct  TestDecodingTxnData
 

Functions

static void pg_decode_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pg_decode_shutdown (LogicalDecodingContext *ctx)
 
static void pg_decode_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_begin (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pg_decode_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pg_decode_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pg_decode_filter_prepare (LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
 
static void pg_decode_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pg_decode_stream_start (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_stream_start (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_stream_stop (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_stream_abort (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pg_decode_stream_prepare (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_stream_commit (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_stream_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_stream_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static void pg_decode_stream_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
void _PG_init (void)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void print_literal (StringInfo s, Oid typid, char *outputstr)
 
static void tuple_to_stringinfo (StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 

Variables

 PG_MODULE_MAGIC
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 121 of file test_decoding.c.

122 {
123  /* other plugins can perform things here */
124 }

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 128 of file test_decoding.c.

129 {
151 }
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)

References OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::filter_prepare_cb, OutputPluginCallbacks::message_cb, pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), pg_decode_commit_prepared_txn(), pg_decode_commit_txn(), pg_decode_filter(), pg_decode_filter_prepare(), pg_decode_message(), pg_decode_prepare_txn(), pg_decode_rollback_prepared_txn(), pg_decode_shutdown(), pg_decode_startup(), pg_decode_stream_abort(), pg_decode_stream_change(), pg_decode_stream_commit(), pg_decode_stream_message(), pg_decode_stream_prepare(), pg_decode_stream_start(), pg_decode_stream_stop(), pg_decode_stream_truncate(), pg_decode_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ pg_decode_begin_prepare_txn()

static void pg_decode_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 347 of file test_decoding.c.

348 {
350  TestDecodingTxnData *txndata =
352 
353  txndata->xact_wrote_changes = false;
354  txn->output_plugin_private = txndata;
355 
356  /*
357  * If asked to skip empty transactions, we'll emit BEGIN at the point
358  * where the first operation is received for this transaction.
359  */
360  if (data->skip_empty_xacts)
361  return;
362 
363  pg_output_begin(ctx, data, txn, true);
364 }
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1202
const void * data
MemoryContext context
Definition: logical.h:36
void * output_plugin_private
Definition: logical.h:76
void * output_plugin_private
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_begin_txn()

static void pg_decode_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 287 of file test_decoding.c.

288 {
290  TestDecodingTxnData *txndata =
292 
293  txndata->xact_wrote_changes = false;
294  txn->output_plugin_private = txndata;
295 
296  /*
297  * If asked to skip empty transactions, we'll emit BEGIN at the point
298  * where the first operation is received for this transaction.
299  */
300  if (data->skip_empty_xacts)
301  return;
302 
303  pg_output_begin(ctx, data, txn, true);
304 }

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_change()

static void pg_decode_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 600 of file test_decoding.c.

602 {
604  TestDecodingTxnData *txndata;
605  Form_pg_class class_form;
606  TupleDesc tupdesc;
607  MemoryContext old;
608 
610  txndata = txn->output_plugin_private;
611 
612  /* output BEGIN if we haven't yet */
613  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
614  {
615  pg_output_begin(ctx, data, txn, false);
616  }
617  txndata->xact_wrote_changes = true;
618 
619  class_form = RelationGetForm(relation);
620  tupdesc = RelationGetDescr(relation);
621 
622  /* Avoid leaking memory by using and resetting our own context */
623  old = MemoryContextSwitchTo(data->context);
624 
625  OutputPluginPrepareWrite(ctx, true);
626 
627  appendStringInfoString(ctx->out, "table ");
630  class_form->relrewrite ?
631  get_rel_name(class_form->relrewrite) :
632  NameStr(class_form->relname)));
633  appendStringInfoChar(ctx->out, ':');
634 
635  switch (change->action)
636  {
638  appendStringInfoString(ctx->out, " INSERT:");
639  if (change->data.tp.newtuple == NULL)
640  appendStringInfoString(ctx->out, " (no-tuple-data)");
641  else
642  tuple_to_stringinfo(ctx->out, tupdesc,
643  change->data.tp.newtuple,
644  false);
645  break;
647  appendStringInfoString(ctx->out, " UPDATE:");
648  if (change->data.tp.oldtuple != NULL)
649  {
650  appendStringInfoString(ctx->out, " old-key:");
651  tuple_to_stringinfo(ctx->out, tupdesc,
652  change->data.tp.oldtuple,
653  true);
654  appendStringInfoString(ctx->out, " new-tuple:");
655  }
656 
657  if (change->data.tp.newtuple == NULL)
658  appendStringInfoString(ctx->out, " (no-tuple-data)");
659  else
660  tuple_to_stringinfo(ctx->out, tupdesc,
661  change->data.tp.newtuple,
662  false);
663  break;
665  appendStringInfoString(ctx->out, " DELETE:");
666 
667  /* if there was no PK, we only know that a delete happened */
668  if (change->data.tp.oldtuple == NULL)
669  appendStringInfoString(ctx->out, " (no-tuple-data)");
670  /* In DELETE, only the replica identity is present; display that */
671  else
672  tuple_to_stringinfo(ctx->out, tupdesc,
673  change->data.tp.oldtuple,
674  true);
675  break;
676  default:
677  Assert(false);
678  }
679 
681  MemoryContextReset(data->context);
682 
683  OutputPluginWrite(ctx, true);
684 }
#define NameStr(name)
Definition: c.h:733
Assert(fmt[strlen(fmt) - 1] !='\n')
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:714
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:701
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3344
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1930
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1906
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:371
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
MemoryContextSwitchTo(old_ctx)
#define RelationGetForm(relation)
Definition: rel.h:501
#define RelationGetRelid(relation)
Definition: rel.h:507
#define RelationGetDescr(relation)
Definition: rel.h:533
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:46
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:48
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:47
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:12437
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:194
StringInfo out
Definition: logical.h:71
ReorderBufferChangeType action
Definition: reorderbuffer.h:75
union ReorderBufferChange::@104 data
struct ReorderBufferChange::@104::@105 tp
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)

References ReorderBufferChange::action, appendStringInfoChar(), appendStringInfoString(), Assert(), ReorderBufferChange::data, data, get_namespace_name(), get_rel_name(), get_rel_namespace(), MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), RelationGetDescr, RelationGetForm, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, tuple_to_stringinfo(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_commit_prepared_txn()

static void pg_decode_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 398 of file test_decoding.c.

400 {
402 
403  OutputPluginPrepareWrite(ctx, true);
404 
405  appendStringInfo(ctx->out, "COMMIT PREPARED %s",
406  quote_literal_cstr(txn->gid));
407 
408  if (data->include_xids)
409  appendStringInfo(ctx->out, ", txid %u", txn->xid);
410 
411  if (data->include_timestamp)
412  appendStringInfo(ctx->out, " (at %s)",
414 
415  OutputPluginWrite(ctx, true);
416 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1853
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
TimestampTz commit_time
TransactionId xid
union ReorderBufferTXN::@110 xact_time

References appendStringInfo(), ReorderBufferTXN::commit_time, data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_commit_txn()

static void pg_decode_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 319 of file test_decoding.c.

321 {
324  bool xact_wrote_changes = txndata->xact_wrote_changes;
325 
326  pfree(txndata);
327  txn->output_plugin_private = NULL;
328 
329  if (data->skip_empty_xacts && !xact_wrote_changes)
330  return;
331 
332  OutputPluginPrepareWrite(ctx, true);
333  if (data->include_xids)
334  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
335  else
336  appendStringInfoString(ctx->out, "COMMIT");
337 
338  if (data->include_timestamp)
339  appendStringInfo(ctx->out, " (at %s)",
341 
342  OutputPluginWrite(ctx, true);
343 }
void pfree(void *pointer)
Definition: mcxt.c:1508

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_filter()

static bool pg_decode_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 460 of file test_decoding.c.

462 {
464 
465  if (data->only_local && origin_id != InvalidRepOriginId)
466  return true;
467  return false;
468 }
#define InvalidRepOriginId
Definition: origin.h:33

References data, InvalidRepOriginId, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pg_decode_filter_prepare()

static bool pg_decode_filter_prepare ( LogicalDecodingContext ctx,
TransactionId  xid,
const char *  gid 
)
static

Definition at line 450 of file test_decoding.c.

452 {
453  if (strstr(gid, "_nodecode") != NULL)
454  return true;
455 
456  return false;
457 }

Referenced by _PG_output_plugin_init().

◆ pg_decode_message()

static void pg_decode_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 742 of file test_decoding.c.

745 {
747  TestDecodingTxnData *txndata;
748 
749  txndata = transactional ? txn->output_plugin_private : NULL;
750 
751  /* output BEGIN if we haven't yet for transactional messages */
752  if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
753  pg_output_begin(ctx, data, txn, false);
754 
755  if (transactional)
756  txndata->xact_wrote_changes = true;
757 
758  OutputPluginPrepareWrite(ctx, true);
759  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
760  transactional, prefix, sz);
761  appendBinaryStringInfo(ctx->out, message, sz);
762  OutputPluginWrite(ctx, true);
763 }
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:233

References appendBinaryStringInfo(), appendStringInfo(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_prepare_txn()

static void pg_decode_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 368 of file test_decoding.c.

370 {
373 
374  /*
375  * If asked to skip empty transactions, we'll emit PREPARE at the point
376  * where the first operation is received for this transaction.
377  */
378  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
379  return;
380 
381  OutputPluginPrepareWrite(ctx, true);
382 
383  appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
384  quote_literal_cstr(txn->gid));
385 
386  if (data->include_xids)
387  appendStringInfo(ctx->out, ", txid %u", txn->xid);
388 
389  if (data->include_timestamp)
390  appendStringInfo(ctx->out, " (at %s)",
392 
393  OutputPluginWrite(ctx, true);
394 }
TimestampTz prepare_time

References appendStringInfo(), data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), ReorderBufferTXN::prepare_time, quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_rollback_prepared_txn()

static void pg_decode_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 420 of file test_decoding.c.

424 {
426 
427  OutputPluginPrepareWrite(ctx, true);
428 
429  appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
430  quote_literal_cstr(txn->gid));
431 
432  if (data->include_xids)
433  appendStringInfo(ctx->out, ", txid %u", txn->xid);
434 
435  if (data->include_timestamp)
436  appendStringInfo(ctx->out, " (at %s)",
438 
439  OutputPluginWrite(ctx, true);
440 }

References appendStringInfo(), ReorderBufferTXN::commit_time, data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_shutdown()

static void pg_decode_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 277 of file test_decoding.c.

278 {
280 
281  /* cleanup our own resources via memory context reset */
282  MemoryContextDelete(data->context);
283 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:442

References data, MemoryContextDelete(), and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pg_decode_startup()

static void pg_decode_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 156 of file test_decoding.c.

158 {
159  ListCell *option;
161  bool enable_streaming = false;
162 
163  data = palloc0(sizeof(TestDecodingData));
164  data->context = AllocSetContextCreate(ctx->context,
165  "text conversion context",
167  data->include_xids = true;
168  data->include_timestamp = false;
169  data->skip_empty_xacts = false;
170  data->only_local = false;
171 
173 
175  opt->receive_rewrites = false;
176 
177  foreach(option, ctx->output_plugin_options)
178  {
179  DefElem *elem = lfirst(option);
180 
181  Assert(elem->arg == NULL || IsA(elem->arg, String));
182 
183  if (strcmp(elem->defname, "include-xids") == 0)
184  {
185  /* if option does not provide a value, it means its value is true */
186  if (elem->arg == NULL)
187  data->include_xids = true;
188  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
189  ereport(ERROR,
190  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191  errmsg("could not parse value \"%s\" for parameter \"%s\"",
192  strVal(elem->arg), elem->defname)));
193  }
194  else if (strcmp(elem->defname, "include-timestamp") == 0)
195  {
196  if (elem->arg == NULL)
197  data->include_timestamp = true;
198  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
199  ereport(ERROR,
200  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
201  errmsg("could not parse value \"%s\" for parameter \"%s\"",
202  strVal(elem->arg), elem->defname)));
203  }
204  else if (strcmp(elem->defname, "force-binary") == 0)
205  {
206  bool force_binary;
207 
208  if (elem->arg == NULL)
209  continue;
210  else if (!parse_bool(strVal(elem->arg), &force_binary))
211  ereport(ERROR,
212  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
213  errmsg("could not parse value \"%s\" for parameter \"%s\"",
214  strVal(elem->arg), elem->defname)));
215 
216  if (force_binary)
218  }
219  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
220  {
221 
222  if (elem->arg == NULL)
223  data->skip_empty_xacts = true;
224  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
225  ereport(ERROR,
226  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
227  errmsg("could not parse value \"%s\" for parameter \"%s\"",
228  strVal(elem->arg), elem->defname)));
229  }
230  else if (strcmp(elem->defname, "only-local") == 0)
231  {
232 
233  if (elem->arg == NULL)
234  data->only_local = true;
235  else if (!parse_bool(strVal(elem->arg), &data->only_local))
236  ereport(ERROR,
237  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
238  errmsg("could not parse value \"%s\" for parameter \"%s\"",
239  strVal(elem->arg), elem->defname)));
240  }
241  else if (strcmp(elem->defname, "include-rewrites") == 0)
242  {
243 
244  if (elem->arg == NULL)
245  continue;
246  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
247  ereport(ERROR,
248  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
249  errmsg("could not parse value \"%s\" for parameter \"%s\"",
250  strVal(elem->arg), elem->defname)));
251  }
252  else if (strcmp(elem->defname, "stream-changes") == 0)
253  {
254  if (elem->arg == NULL)
255  continue;
256  else if (!parse_bool(strVal(elem->arg), &enable_streaming))
257  ereport(ERROR,
258  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
259  errmsg("could not parse value \"%s\" for parameter \"%s\"",
260  strVal(elem->arg), elem->defname)));
261  }
262  else
263  {
264  ereport(ERROR,
265  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
266  errmsg("option \"%s\" = \"%s\" is unknown",
267  elem->defname,
268  elem->arg ? strVal(elem->arg) : "(null)")));
269  }
270  }
271 
272  ctx->streaming &= enable_streaming;
273 }
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void * palloc0(Size size)
Definition: mcxt.c:1334
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
Definition: output_plugin.h:20
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:811
Node * arg
Definition: parsenodes.h:812
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: value.h:64
#define strVal(v)
Definition: value.h:82

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, DefElem::arg, Assert(), LogicalDecodingContext::context, data, DefElem::defname, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OUTPUT_PLUGIN_TEXTUAL_OUTPUT, OutputPluginOptions::output_type, palloc0(), parse_bool(), OutputPluginOptions::receive_rewrites, LogicalDecodingContext::streaming, and strVal.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_abort()

static void pg_decode_stream_abort ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 819 of file test_decoding.c.

822 {
824 
825  /*
826  * stream abort can be sent for an individual subtransaction but we
827  * maintain the output_plugin_private only under the toptxn so if this is
828  * not the toptxn then fetch the toptxn.
829  */
830  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
831  TestDecodingTxnData *txndata = toptxn->output_plugin_private;
832  bool xact_wrote_changes = txndata->xact_wrote_changes;
833 
834  if (rbtxn_is_toptxn(txn))
835  {
836  Assert(txn->output_plugin_private != NULL);
837  pfree(txndata);
838  txn->output_plugin_private = NULL;
839  }
840 
841  if (data->skip_empty_xacts && !xact_wrote_changes)
842  return;
843 
844  OutputPluginPrepareWrite(ctx, true);
845  if (data->include_xids)
846  appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
847  else
848  appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
849  OutputPluginWrite(ctx, true);
850 }
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)

References appendStringInfo(), appendStringInfoString(), Assert(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), rbtxn_get_toptxn, rbtxn_is_toptxn, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_change()

static void pg_decode_stream_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 914 of file test_decoding.c.

918 {
921 
922  /* output stream start if we haven't yet */
923  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
924  {
925  pg_output_stream_start(ctx, data, txn, false);
926  }
927  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
928 
929  OutputPluginPrepareWrite(ctx, true);
930  if (data->include_xids)
931  appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
932  else
933  appendStringInfoString(ctx->out, "streaming change for transaction");
934  OutputPluginWrite(ctx, true);
935 }
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_commit()

static void pg_decode_stream_commit ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 880 of file test_decoding.c.

883 {
886  bool xact_wrote_changes = txndata->xact_wrote_changes;
887 
888  pfree(txndata);
889  txn->output_plugin_private = NULL;
890 
891  if (data->skip_empty_xacts && !xact_wrote_changes)
892  return;
893 
894  OutputPluginPrepareWrite(ctx, true);
895 
896  if (data->include_xids)
897  appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
898  else
899  appendStringInfoString(ctx->out, "committing streamed transaction");
900 
901  if (data->include_timestamp)
902  appendStringInfo(ctx->out, " (at %s)",
904 
905  OutputPluginWrite(ctx, true);
906 }

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_message()

static void pg_decode_stream_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 943 of file test_decoding.c.

946 {
947  /* Output stream start if we haven't yet for transactional messages. */
948  if (transactional)
949  {
952 
953  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
954  {
955  pg_output_stream_start(ctx, data, txn, false);
956  }
957  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
958  }
959 
960  OutputPluginPrepareWrite(ctx, true);
961 
962  if (transactional)
963  {
964  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
965  transactional, prefix, sz);
966  }
967  else
968  {
969  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
970  transactional, prefix, sz);
971  appendBinaryStringInfo(ctx->out, message, sz);
972  }
973 
974  OutputPluginWrite(ctx, true);
975 }

References appendBinaryStringInfo(), appendStringInfo(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_prepare()

static void pg_decode_stream_prepare ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 853 of file test_decoding.c.

856 {
859 
860  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
861  return;
862 
863  OutputPluginPrepareWrite(ctx, true);
864 
865  if (data->include_xids)
866  appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
867  quote_literal_cstr(txn->gid), txn->xid);
868  else
869  appendStringInfo(ctx->out, "preparing streamed transaction %s",
870  quote_literal_cstr(txn->gid));
871 
872  if (data->include_timestamp)
873  appendStringInfo(ctx->out, " (at %s)",
875 
876  OutputPluginWrite(ctx, true);
877 }

References appendStringInfo(), data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), ReorderBufferTXN::prepare_time, quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_start()

static void pg_decode_stream_start ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 766 of file test_decoding.c.

768 {
771 
772  /*
773  * Allocate the txn plugin data for the first stream in the transaction.
774  */
775  if (txndata == NULL)
776  {
777  txndata =
779  txndata->xact_wrote_changes = false;
780  txn->output_plugin_private = txndata;
781  }
782 
783  txndata->stream_wrote_changes = false;
784  if (data->skip_empty_xacts)
785  return;
786  pg_output_stream_start(ctx, data, txn, true);
787 }

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_stop()

static void pg_decode_stream_stop ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 801 of file test_decoding.c.

803 {
806 
807  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
808  return;
809 
810  OutputPluginPrepareWrite(ctx, true);
811  if (data->include_xids)
812  appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
813  else
814  appendStringInfoString(ctx->out, "closing a streamed block for transaction");
815  OutputPluginWrite(ctx, true);
816 }

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), TestDecodingTxnData::stream_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_truncate()

static void pg_decode_stream_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 982 of file test_decoding.c.

985 {
988 
989  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
990  {
991  pg_output_stream_start(ctx, data, txn, false);
992  }
993  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
994 
995  OutputPluginPrepareWrite(ctx, true);
996  if (data->include_xids)
997  appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
998  else
999  appendStringInfoString(ctx->out, "streaming truncate for transaction");
1000  OutputPluginWrite(ctx, true);
1001 }

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_truncate()

static void pg_decode_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 687 of file test_decoding.c.

689 {
691  TestDecodingTxnData *txndata;
692  MemoryContext old;
693  int i;
694 
696  txndata = txn->output_plugin_private;
697 
698  /* output BEGIN if we haven't yet */
699  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
700  {
701  pg_output_begin(ctx, data, txn, false);
702  }
703  txndata->xact_wrote_changes = true;
704 
705  /* Avoid leaking memory by using and resetting our own context */
706  old = MemoryContextSwitchTo(data->context);
707 
708  OutputPluginPrepareWrite(ctx, true);
709 
710  appendStringInfoString(ctx->out, "table ");
711 
712  for (i = 0; i < nrelations; i++)
713  {
714  if (i > 0)
715  appendStringInfoString(ctx->out, ", ");
716 
718  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
719  NameStr(relations[i]->rd_rel->relname)));
720  }
721 
722  appendStringInfoString(ctx->out, ": TRUNCATE:");
723 
724  if (change->data.truncate.restart_seqs
725  || change->data.truncate.cascade)
726  {
727  if (change->data.truncate.restart_seqs)
728  appendStringInfoString(ctx->out, " restart_seqs");
729  if (change->data.truncate.cascade)
730  appendStringInfoString(ctx->out, " cascade");
731  }
732  else
733  appendStringInfoString(ctx->out, " (no-flags)");
734 
736  MemoryContextReset(data->context);
737 
738  OutputPluginWrite(ctx, true);
739 }
int i
Definition: isn.c:73
struct ReorderBufferChange::@104::@106 truncate

References appendStringInfoString(), ReorderBufferChange::data, data, get_namespace_name(), i, MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), ReorderBufferChange::truncate, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_output_begin()

static void pg_output_begin ( LogicalDecodingContext ctx,
TestDecodingData data,
ReorderBufferTXN txn,
bool  last_write 
)
static

Definition at line 307 of file test_decoding.c.

308 {
309  OutputPluginPrepareWrite(ctx, last_write);
310  if (data->include_xids)
311  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
312  else
313  appendStringInfoString(ctx->out, "BEGIN");
314  OutputPluginWrite(ctx, last_write);
315 }

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), pg_decode_message(), and pg_decode_truncate().

◆ pg_output_stream_start()

static void pg_output_stream_start ( LogicalDecodingContext ctx,
TestDecodingData data,
ReorderBufferTXN txn,
bool  last_write 
)
static

Definition at line 790 of file test_decoding.c.

791 {
792  OutputPluginPrepareWrite(ctx, last_write);
793  if (data->include_xids)
794  appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
795  else
796  appendStringInfoString(ctx->out, "opening a streamed block for transaction");
797  OutputPluginWrite(ctx, last_write);
798 }

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_stream_change(), pg_decode_stream_message(), pg_decode_stream_start(), and pg_decode_stream_truncate().

◆ print_literal()

static void print_literal ( StringInfo  s,
Oid  typid,
char *  outputstr 
)
static

Definition at line 478 of file test_decoding.c.

479 {
480  const char *valptr;
481 
482  switch (typid)
483  {
484  case INT2OID:
485  case INT4OID:
486  case INT8OID:
487  case OIDOID:
488  case FLOAT4OID:
489  case FLOAT8OID:
490  case NUMERICOID:
491  /* NB: We don't care about Inf, NaN et al. */
492  appendStringInfoString(s, outputstr);
493  break;
494 
495  case BITOID:
496  case VARBITOID:
497  appendStringInfo(s, "B'%s'", outputstr);
498  break;
499 
500  case BOOLOID:
501  if (strcmp(outputstr, "t") == 0)
502  appendStringInfoString(s, "true");
503  else
504  appendStringInfoString(s, "false");
505  break;
506 
507  default:
508  appendStringInfoChar(s, '\'');
509  for (valptr = outputstr; *valptr; valptr++)
510  {
511  char ch = *valptr;
512 
513  if (SQL_STR_DOUBLE(ch, false))
514  appendStringInfoChar(s, ch);
515  appendStringInfoChar(s, ch);
516  }
517  appendStringInfoChar(s, '\'');
518  break;
519  }
520 }
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1150

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), and SQL_STR_DOUBLE.

Referenced by tuple_to_stringinfo().

◆ tuple_to_stringinfo()

static void tuple_to_stringinfo ( StringInfo  s,
TupleDesc  tupdesc,
HeapTuple  tuple,
bool  skip_nulls 
)
static

Definition at line 524 of file test_decoding.c.

525 {
526  int natt;
527 
528  /* print all columns individually */
529  for (natt = 0; natt < tupdesc->natts; natt++)
530  {
531  Form_pg_attribute attr; /* the attribute itself */
532  Oid typid; /* type of current attribute */
533  Oid typoutput; /* output function */
534  bool typisvarlena;
535  Datum origval; /* possibly toasted Datum */
536  bool isnull; /* column is null? */
537 
538  attr = TupleDescAttr(tupdesc, natt);
539 
540  /*
541  * don't print dropped columns, we can't be sure everything is
542  * available for them
543  */
544  if (attr->attisdropped)
545  continue;
546 
547  /*
548  * Don't print system columns, oid will already have been printed if
549  * present.
550  */
551  if (attr->attnum < 0)
552  continue;
553 
554  typid = attr->atttypid;
555 
556  /* get Datum from tuple */
557  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
558 
559  if (isnull && skip_nulls)
560  continue;
561 
562  /* print attribute name */
563  appendStringInfoChar(s, ' ');
564  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
565 
566  /* print attribute type */
567  appendStringInfoChar(s, '[');
569  appendStringInfoChar(s, ']');
570 
571  /* query output function */
572  getTypeOutputInfo(typid,
573  &typoutput, &typisvarlena);
574 
575  /* print separator */
576  appendStringInfoChar(s, ':');
577 
578  /* print data */
579  if (isnull)
580  appendStringInfoString(s, "null");
581  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
582  appendStringInfoString(s, "unchanged-toast-datum");
583  else if (!typisvarlena)
584  print_literal(s, typid,
585  OidOutputFunctionCall(typoutput, origval));
586  else
587  {
588  Datum val; /* definitely detoasted Datum */
589 
591  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
592  }
593  }
594 }
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:792
long val
Definition: informix.c:670
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2885
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
unsigned int Oid
Definition: postgres_ext.h:31
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12353
static void print_literal(StringInfo s, Oid typid, char *outputstr)
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290

References appendStringInfoChar(), appendStringInfoString(), format_type_be(), getTypeOutputInfo(), heap_getattr(), NameStr, TupleDescData::natts, OidOutputFunctionCall(), PG_DETOAST_DATUM, PointerGetDatum(), print_literal(), quote_identifier(), TupleDescAttr, val, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pg_decode_change().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 25 of file test_decoding.c.