PostgreSQL Source Code  git master
test_decoding.c File Reference
#include "postgres.h"
#include "catalog/pg_type.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for test_decoding.c:

Go to the source code of this file.

Data Structures

struct  TestDecodingData
 
struct  TestDecodingTxnData
 

Functions

static void pg_decode_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pg_decode_shutdown (LogicalDecodingContext *ctx)
 
static void pg_decode_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_begin (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pg_decode_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pg_decode_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pg_decode_filter_prepare (LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
 
static void pg_decode_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pg_decode_stream_start (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_stream_start (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_stream_stop (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_stream_abort (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pg_decode_stream_prepare (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_stream_commit (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_stream_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_stream_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static void pg_decode_stream_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
void _PG_init (void)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void print_literal (StringInfo s, Oid typid, char *outputstr)
 
static void tuple_to_stringinfo (StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 

Variables

 PG_MODULE_MAGIC
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 121 of file test_decoding.c.

122 {
123  /* other plugins can perform things here */
124 }

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks *  cb)

Definition at line 128 of file test_decoding.c.

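129 {
130  AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
131 
132  cb->startup_cb = pg_decode_startup;
133  cb->begin_cb = pg_decode_begin_txn;
134  cb->change_cb = pg_decode_change;
135  cb->truncate_cb = pg_decode_truncate;
136  cb->commit_cb = pg_decode_commit_txn;
137  cb->filter_by_origin_cb = pg_decode_filter;
138  cb->shutdown_cb = pg_decode_shutdown;
139  cb->message_cb = pg_decode_message;
140  cb->filter_prepare_cb = pg_decode_filter_prepare;
141  cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
142  cb->prepare_cb = pg_decode_prepare_txn;
143  cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
144  cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
145  cb->stream_start_cb = pg_decode_stream_start;
146  cb->stream_stop_cb = pg_decode_stream_stop;
147  cb->stream_abort_cb = pg_decode_stream_abort;
148  cb->stream_prepare_cb = pg_decode_stream_prepare;
149  cb->stream_commit_cb = pg_decode_stream_commit;
150  cb->stream_change_cb = pg_decode_stream_change;
151  cb->stream_message_cb = pg_decode_stream_message;
152  cb->stream_truncate_cb = pg_decode_stream_truncate;
153 }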
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:965
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::filter_prepare_cb, OutputPluginCallbacks::message_cb, pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), pg_decode_commit_prepared_txn(), pg_decode_commit_txn(), pg_decode_filter(), pg_decode_filter_prepare(), pg_decode_message(), pg_decode_prepare_txn(), pg_decode_rollback_prepared_txn(), pg_decode_shutdown(), pg_decode_startup(), pg_decode_stream_abort(), pg_decode_stream_change(), pg_decode_stream_commit(), pg_decode_stream_message(), pg_decode_stream_prepare(), pg_decode_stream_start(), pg_decode_stream_stop(), pg_decode_stream_truncate(), pg_decode_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ pg_decode_begin_prepare_txn()

static void pg_decode_begin_prepare_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn 
)
static

Definition at line 349 of file test_decoding.c.

350 {
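351  TestDecodingData *data = ctx->output_plugin_private;
352  TestDecodingTxnData *txndata =
353  MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));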
354 
355  txndata->xact_wrote_changes = false;
356  txn->output_plugin_private = txndata;
357 
358  /*
359  * If asked to skip empty transactions, we'll emit BEGIN at the point
360  * where the first operation is received for this transaction.
361  */
362  if (data->skip_empty_xacts)
363  return;
364 
365  pg_output_begin(ctx, data, txn, true);
366 }
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1048
const void * data
MemoryContext context
Definition: logical.h:36
void * output_plugin_private
Definition: logical.h:76
void * output_plugin_private
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_begin_txn()

static void pg_decode_begin_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn 
)
static

Definition at line 289 of file test_decoding.c.

290 {
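291  TestDecodingData *data = ctx->output_plugin_private;
292  TestDecodingTxnData *txndata =
293  MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));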
294 
295  txndata->xact_wrote_changes = false;
296  txn->output_plugin_private = txndata;
297 
298  /*
299  * If asked to skip empty transactions, we'll emit BEGIN at the point
300  * where the first operation is received for this transaction.
301  */
302  if (data->skip_empty_xacts)
303  return;
304 
305  pg_output_begin(ctx, data, txn, true);
306 }

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_change()

static void pg_decode_change ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change 
)
static

Definition at line 602 of file test_decoding.c.

604 {
605  TestDecodingData *data;
606  TestDecodingTxnData *txndata;
607  Form_pg_class class_form;
608  TupleDesc tupdesc;
609  MemoryContext old;
610 
611  data = ctx->output_plugin_private;
612  txndata = txn->output_plugin_private;
613 
614  /* output BEGIN if we haven't yet */
615  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
616  {
617  pg_output_begin(ctx, data, txn, false);
618  }
619  txndata->xact_wrote_changes = true;
620 
621  class_form = RelationGetForm(relation);
622  tupdesc = RelationGetDescr(relation);
623 
624  /* Avoid leaking memory by using and resetting our own context */
625  old = MemoryContextSwitchTo(data->context);
626 
627  OutputPluginPrepareWrite(ctx, true);
628 
629  appendStringInfoString(ctx->out, "table ");
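630  appendStringInfoString(ctx->out,
631  quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
632  class_form->relrewrite ?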
633  get_rel_name(class_form->relrewrite) :
634  NameStr(class_form->relname)));
635  appendStringInfoChar(ctx->out, ':');
636 
637  switch (change->action)
638  {
639  case REORDER_BUFFER_CHANGE_INSERT:
640  appendStringInfoString(ctx->out, " INSERT:");
641  if (change->data.tp.newtuple == NULL)
642  appendStringInfoString(ctx->out, " (no-tuple-data)");
643  else
644  tuple_to_stringinfo(ctx->out, tupdesc,
645  &change->data.tp.newtuple->tuple,
646  false);
647  break;
648  case REORDER_BUFFER_CHANGE_UPDATE:
649  appendStringInfoString(ctx->out, " UPDATE:");
650  if (change->data.tp.oldtuple != NULL)
651  {
652  appendStringInfoString(ctx->out, " old-key:");
653  tuple_to_stringinfo(ctx->out, tupdesc,
654  &change->data.tp.oldtuple->tuple,
655  true);
656  appendStringInfoString(ctx->out, " new-tuple:");
657  }
658 
659  if (change->data.tp.newtuple == NULL)
660  appendStringInfoString(ctx->out, " (no-tuple-data)");
661  else
662  tuple_to_stringinfo(ctx->out, tupdesc,
663  &change->data.tp.newtuple->tuple,
664  false);
665  break;
666  case REORDER_BUFFER_CHANGE_DELETE:
667  appendStringInfoString(ctx->out, " DELETE:");
668 
669  /* if there was no PK, we only know that a delete happened */
670  if (change->data.tp.oldtuple == NULL)
671  appendStringInfoString(ctx->out, " (no-tuple-data)");
672  /* In DELETE, only the replica identity is present; display that */
673  else
674  tuple_to_stringinfo(ctx->out, tupdesc,
675  &change->data.tp.oldtuple->tuple,
676  true);
677  break;
678  default:
679  Assert(false);
680  }
681 
682  MemoryContextSwitchTo(old);
683  MemoryContextReset(data->context);
684 
685  OutputPluginWrite(ctx, true);
686 }
#define NameStr(name)
Definition: c.h:730
Assert(fmt[strlen(fmt) - 1] !='\n')
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:664
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:651
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3331
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1934
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1910
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:314
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
#define RelationGetForm(relation)
Definition: rel.h:495
#define RelationGetRelid(relation)
Definition: rel.h:501
#define RelationGetDescr(relation)
Definition: rel.h:527
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:65
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:67
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:66
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11613
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
StringInfo out
Definition: logical.h:71
ReorderBufferChangeType action
Definition: reorderbuffer.h:94
struct ReorderBufferChange::@95::@96 tp
union ReorderBufferChange::@95 data
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)

References ReorderBufferChange::action, appendStringInfoChar(), appendStringInfoString(), Assert(), ReorderBufferChange::data, data, get_namespace_name(), get_rel_name(), get_rel_namespace(), MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), RelationGetDescr, RelationGetForm, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, tuple_to_stringinfo(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_commit_prepared_txn()

static void pg_decode_commit_prepared_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 400 of file test_decoding.c.

402 {
403  TestDecodingData *data = ctx->output_plugin_private;
404 
405  OutputPluginPrepareWrite(ctx, true);
406 
407  appendStringInfo(ctx->out, "COMMIT PREPARED %s",
408  quote_literal_cstr(txn->gid));
409 
410  if (data->include_xids)
411  appendStringInfo(ctx->out, ", txid %u", txn->xid);
412 
413  if (data->include_timestamp)
414  appendStringInfo(ctx->out, " (at %s)",
415  timestamptz_to_str(txn->xact_time.commit_time));
416 
417  OutputPluginWrite(ctx, true);
418 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1793
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TimestampTz commit_time
union ReorderBufferTXN::@101 xact_time
TransactionId xid

References appendStringInfo(), ReorderBufferTXN::commit_time, data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_commit_txn()

static void pg_decode_commit_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 321 of file test_decoding.c.

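323 {
324  TestDecodingData *data = ctx->output_plugin_private;
325  TestDecodingTxnData *txndata = txn->output_plugin_private;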
326  bool xact_wrote_changes = txndata->xact_wrote_changes;
327 
328  pfree(txndata);
329  txn->output_plugin_private = NULL;
330 
331  if (data->skip_empty_xacts && !xact_wrote_changes)
332  return;
333 
334  OutputPluginPrepareWrite(ctx, true);
335  if (data->include_xids)
336  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
337  else
338  appendStringInfoString(ctx->out, "COMMIT");
339 
340  if (data->include_timestamp)
341  appendStringInfo(ctx->out, " (at %s)",
342  timestamptz_to_str(txn->xact_time.commit_time));
343 
344  OutputPluginWrite(ctx, true);
345 }
void pfree(void *pointer)
Definition: mcxt.c:1436

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_filter()

static bool pg_decode_filter ( LogicalDecodingContext *  ctx,
RepOriginId  origin_id 
)
static

Definition at line 462 of file test_decoding.c.

464 {
465  TestDecodingData *data = ctx->output_plugin_private;
466 
467  if (data->only_local && origin_id != InvalidRepOriginId)
468  return true;
469  return false;
470 }
#define InvalidRepOriginId
Definition: origin.h:33

References data, InvalidRepOriginId, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pg_decode_filter_prepare()

static bool pg_decode_filter_prepare ( LogicalDecodingContext *  ctx,
TransactionId  xid,
const char *  gid 
)
static

Definition at line 452 of file test_decoding.c.

454 {
455  if (strstr(gid, "_nodecode") != NULL)
456  return true;
457 
458  return false;
459 }

Referenced by _PG_output_plugin_init().

◆ pg_decode_message()

static void pg_decode_message ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 744 of file test_decoding.c.

747 {
748  OutputPluginPrepareWrite(ctx, true);
749  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
750  transactional, prefix, sz);
751  appendBinaryStringInfo(ctx->out, message, sz);
752  OutputPluginWrite(ctx, true);
753 }
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:227

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pg_decode_prepare_txn()

static void pg_decode_prepare_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 370 of file test_decoding.c.

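372 {
373  TestDecodingData *data = ctx->output_plugin_private;
374  TestDecodingTxnData *txndata = txn->output_plugin_private;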
375 
376  /*
377  * If asked to skip empty transactions, we'll emit PREPARE at the point
378  * where the first operation is received for this transaction.
379  */
380  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
381  return;
382 
383  OutputPluginPrepareWrite(ctx, true);
384 
385  appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
386  quote_literal_cstr(txn->gid));
387 
388  if (data->include_xids)
389  appendStringInfo(ctx->out, ", txid %u", txn->xid);
390 
391  if (data->include_timestamp)
392  appendStringInfo(ctx->out, " (at %s)",
393  timestamptz_to_str(txn->xact_time.prepare_time));
394 
395  OutputPluginWrite(ctx, true);
396 }
TimestampTz prepare_time

References appendStringInfo(), data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), ReorderBufferTXN::prepare_time, quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_rollback_prepared_txn()

static void pg_decode_rollback_prepared_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 422 of file test_decoding.c.

426 {
427  TestDecodingData *data = ctx->output_plugin_private;
428 
429  OutputPluginPrepareWrite(ctx, true);
430 
431  appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
432  quote_literal_cstr(txn->gid));
433 
434  if (data->include_xids)
435  appendStringInfo(ctx->out, ", txid %u", txn->xid);
436 
437  if (data->include_timestamp)
438  appendStringInfo(ctx->out, " (at %s)",
439  timestamptz_to_str(txn->xact_time.commit_time));
440 
441  OutputPluginWrite(ctx, true);
442 }

References appendStringInfo(), ReorderBufferTXN::commit_time, data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_shutdown()

static void pg_decode_shutdown ( LogicalDecodingContext *  ctx)
static

Definition at line 279 of file test_decoding.c.

280 {
281  TestDecodingData *data = ctx->output_plugin_private;
282 
283  /* cleanup our own resources via memory context reset */
284  MemoryContextDelete(data->context);
285 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:387

References data, MemoryContextDelete(), and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pg_decode_startup()

static void pg_decode_startup ( LogicalDecodingContext *  ctx,
OutputPluginOptions *  opt,
bool  is_init 
)
static

Definition at line 158 of file test_decoding.c.

160 {
161  ListCell *option;
162  TestDecodingData *data;
163  bool enable_streaming = false;
164 
165  data = palloc0(sizeof(TestDecodingData));
166  data->context = AllocSetContextCreate(ctx->context,
167  "text conversion context",
168  ALLOCSET_DEFAULT_SIZES);
169  data->include_xids = true;
170  data->include_timestamp = false;
171  data->skip_empty_xacts = false;
172  data->only_local = false;
173 
174  ctx->output_plugin_private = data;
175 
176  opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
177  opt->receive_rewrites = false;
178 
179  foreach(option, ctx->output_plugin_options)
180  {
181  DefElem *elem = lfirst(option);
182 
183  Assert(elem->arg == NULL || IsA(elem->arg, String));
184 
185  if (strcmp(elem->defname, "include-xids") == 0)
186  {
187  /* if option does not provide a value, it means its value is true */
188  if (elem->arg == NULL)
189  data->include_xids = true;
190  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
191  ereport(ERROR,
192  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
193  errmsg("could not parse value \"%s\" for parameter \"%s\"",
194  strVal(elem->arg), elem->defname)));
195  }
196  else if (strcmp(elem->defname, "include-timestamp") == 0)
197  {
198  if (elem->arg == NULL)
199  data->include_timestamp = true;
200  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
201  ereport(ERROR,
202  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
203  errmsg("could not parse value \"%s\" for parameter \"%s\"",
204  strVal(elem->arg), elem->defname)));
205  }
206  else if (strcmp(elem->defname, "force-binary") == 0)
207  {
208  bool force_binary;
209 
210  if (elem->arg == NULL)
211  continue;
212  else if (!parse_bool(strVal(elem->arg), &force_binary))
213  ereport(ERROR,
214  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
215  errmsg("could not parse value \"%s\" for parameter \"%s\"",
216  strVal(elem->arg), elem->defname)));
217 
218  if (force_binary)
219  opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
220  }
221  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
222  {
223 
224  if (elem->arg == NULL)
225  data->skip_empty_xacts = true;
226  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
227  ereport(ERROR,
228  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
229  errmsg("could not parse value \"%s\" for parameter \"%s\"",
230  strVal(elem->arg), elem->defname)));
231  }
232  else if (strcmp(elem->defname, "only-local") == 0)
233  {
234 
235  if (elem->arg == NULL)
236  data->only_local = true;
237  else if (!parse_bool(strVal(elem->arg), &data->only_local))
238  ereport(ERROR,
239  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
240  errmsg("could not parse value \"%s\" for parameter \"%s\"",
241  strVal(elem->arg), elem->defname)));
242  }
243  else if (strcmp(elem->defname, "include-rewrites") == 0)
244  {
245 
246  if (elem->arg == NULL)
247  continue;
248  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
249  ereport(ERROR,
250  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
251  errmsg("could not parse value \"%s\" for parameter \"%s\"",
252  strVal(elem->arg), elem->defname)));
253  }
254  else if (strcmp(elem->defname, "stream-changes") == 0)
255  {
256  if (elem->arg == NULL)
257  continue;
258  else if (!parse_bool(strVal(elem->arg), &enable_streaming))
259  ereport(ERROR,
260  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
261  errmsg("could not parse value \"%s\" for parameter \"%s\"",
262  strVal(elem->arg), elem->defname)));
263  }
264  else
265  {
266  ereport(ERROR,
267  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
268  errmsg("option \"%s\" = \"%s\" is unknown",
269  elem->defname,
270  elem->arg ? strVal(elem->arg) : "(null)")));
271  }
272  }
273 
274  ctx->streaming &= enable_streaming;
275 }
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void * palloc0(Size size)
Definition: mcxt.c:1241
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define IsA(nodeptr, _type_)
Definition: nodes.h:179
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
Definition: output_plugin.h:20
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:810
Node * arg
Definition: parsenodes.h:811
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: value.h:64
#define strVal(v)
Definition: value.h:82

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, DefElem::arg, Assert(), LogicalDecodingContext::context, data, DefElem::defname, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OUTPUT_PLUGIN_TEXTUAL_OUTPUT, OutputPluginOptions::output_type, palloc0(), parse_bool(), OutputPluginOptions::receive_rewrites, LogicalDecodingContext::streaming, and strVal.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_abort()

static void pg_decode_stream_abort ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 809 of file test_decoding.c.

812 {
813  TestDecodingData *data = ctx->output_plugin_private;
814 
815  /*
816  * stream abort can be sent for an individual subtransaction but we
817  * maintain the output_plugin_private only under the toptxn so if this is
818  * not the toptxn then fetch the toptxn.
819  */
820  ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
821  TestDecodingTxnData *txndata = toptxn->output_plugin_private;
822  bool xact_wrote_changes = txndata->xact_wrote_changes;
823 
824  if (txn->toptxn == NULL)
825  {
826  Assert(txn->output_plugin_private != NULL);
827  pfree(txndata);
828  txn->output_plugin_private = NULL;
829  }
830 
831  if (data->skip_empty_xacts && !xact_wrote_changes)
832  return;
833 
834  OutputPluginPrepareWrite(ctx, true);
835  if (data->include_xids)
836  appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
837  else
838  appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
839  OutputPluginWrite(ctx, true);
840 }
struct ReorderBufferTXN * toptxn

References appendStringInfo(), appendStringInfoString(), Assert(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), ReorderBufferTXN::toptxn, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_change()

static void pg_decode_stream_change ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change 
)
static

Definition at line 904 of file test_decoding.c.

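908 {
909  TestDecodingData *data = ctx->output_plugin_private;
910  TestDecodingTxnData *txndata = txn->output_plugin_private;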
911 
912  /* output stream start if we haven't yet */
913  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
914  {
915  pg_output_stream_start(ctx, data, txn, false);
916  }
917  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
918 
919  OutputPluginPrepareWrite(ctx, true);
920  if (data->include_xids)
921  appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
922  else
923  appendStringInfoString(ctx->out, "streaming change for transaction");
924  OutputPluginWrite(ctx, true);
925 }
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_commit()

static void pg_decode_stream_commit ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 870 of file test_decoding.c.

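873 {
874  TestDecodingData *data = ctx->output_plugin_private;
875  TestDecodingTxnData *txndata = txn->output_plugin_private;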
876  bool xact_wrote_changes = txndata->xact_wrote_changes;
877 
878  pfree(txndata);
879  txn->output_plugin_private = NULL;
880 
881  if (data->skip_empty_xacts && !xact_wrote_changes)
882  return;
883 
884  OutputPluginPrepareWrite(ctx, true);
885 
886  if (data->include_xids)
887  appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
888  else
889  appendStringInfoString(ctx->out, "committing streamed transaction");
890 
891  if (data->include_timestamp)
892  appendStringInfo(ctx->out, " (at %s)",
893  timestamptz_to_str(txn->xact_time.commit_time));
894 
895  OutputPluginWrite(ctx, true);
896 }

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_message()

static void pg_decode_stream_message ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 933 of file test_decoding.c.

936 {
937  OutputPluginPrepareWrite(ctx, true);
938 
939  if (transactional)
940  {
941  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
942  transactional, prefix, sz);
943  }
944  else
945  {
946  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
947  transactional, prefix, sz);
948  appendBinaryStringInfo(ctx->out, message, sz);
949  }
950 
951  OutputPluginWrite(ctx, true);
952 }

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_prepare()

static void pg_decode_stream_prepare ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 843 of file test_decoding.c.

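846 {
847  TestDecodingData *data = ctx->output_plugin_private;
848  TestDecodingTxnData *txndata = txn->output_plugin_private;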
849 
850  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
851  return;
852 
853  OutputPluginPrepareWrite(ctx, true);
854 
855  if (data->include_xids)
856  appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
857  quote_literal_cstr(txn->gid), txn->xid);
858  else
859  appendStringInfo(ctx->out, "preparing streamed transaction %s",
860  quote_literal_cstr(txn->gid));
861 
862  if (data->include_timestamp)
863  appendStringInfo(ctx->out, " (at %s)",
864  timestamptz_to_str(txn->xact_time.prepare_time));
865 
866  OutputPluginWrite(ctx, true);
867 }

References appendStringInfo(), data, ReorderBufferTXN::gid, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), ReorderBufferTXN::prepare_time, quote_literal_cstr(), timestamptz_to_str(), ReorderBufferTXN::xact_time, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_start()

static void pg_decode_stream_start ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn
)
static

Definition at line 756 of file test_decoding.c.

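758 {
759  TestDecodingData *data = ctx->output_plugin_private;
760  TestDecodingTxnData *txndata = txn->output_plugin_private;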
761 
762  /*
763  * Allocate the txn plugin data for the first stream in the transaction.
764  */
765  if (txndata == NULL)
766  {
767  txndata =
768  MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
769  txndata->xact_wrote_changes = false;
770  txn->output_plugin_private = txndata;
771  }
772 
773  txndata->stream_wrote_changes = false;
774  if (data->skip_empty_xacts)
775  return;
776  pg_output_stream_start(ctx, data, txn, true);
777 }

References LogicalDecodingContext::context, data, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_stop()

static void pg_decode_stream_stop ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn
)
static

Definition at line 791 of file test_decoding.c.

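793 {
794  TestDecodingData *data = ctx->output_plugin_private;
795  TestDecodingTxnData *txndata = txn->output_plugin_private;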
796 
797  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
798  return;
799 
800  OutputPluginPrepareWrite(ctx, true);
801  if (data->include_xids)
802  appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
803  else
804  appendStringInfoString(ctx->out, "closing a streamed block for transaction");
805  OutputPluginWrite(ctx, true);
806 }

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), TestDecodingTxnData::stream_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_stream_truncate()

static void pg_decode_stream_truncate ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange *  change
)
static

Definition at line 959 of file test_decoding.c.

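962 {
963  TestDecodingData *data = ctx->output_plugin_private;
964  TestDecodingTxnData *txndata = txn->output_plugin_private;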
965 
966  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
967  {
968  pg_output_stream_start(ctx, data, txn, false);
969  }
970  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
971 
972  OutputPluginPrepareWrite(ctx, true);
973  if (data->include_xids)
974  appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
975  else
976  appendStringInfoString(ctx->out, "streaming truncate for transaction");
977  OutputPluginWrite(ctx, true);
978 }

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pg_decode_truncate()

static void pg_decode_truncate ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange *  change
)
static

Definition at line 689 of file test_decoding.c.

691 {
692  TestDecodingData *data;
693  TestDecodingTxnData *txndata;
694  MemoryContext old;
695  int i;
696 
697  data = ctx->output_plugin_private;
698  txndata = txn->output_plugin_private;
699 
700  /* output BEGIN if we haven't yet */
701  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
702  {
703  pg_output_begin(ctx, data, txn, false);
704  }
705  txndata->xact_wrote_changes = true;
706 
707  /* Avoid leaking memory by using and resetting our own context */
708  old = MemoryContextSwitchTo(data->context);
709 
710  OutputPluginPrepareWrite(ctx, true);
711 
712  appendStringInfoString(ctx->out, "table ");
713 
714  for (i = 0; i < nrelations; i++)
715  {
716  if (i > 0)
717  appendStringInfoString(ctx->out, ", ");
718 
719  appendStringInfoString(ctx->out,
720  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
721  NameStr(relations[i]->rd_rel->relname)));
722  }
723 
724  appendStringInfoString(ctx->out, ": TRUNCATE:");
725 
726  if (change->data.truncate.restart_seqs
727  || change->data.truncate.cascade)
728  {
729  if (change->data.truncate.restart_seqs)
730  appendStringInfoString(ctx->out, " restart_seqs");
731  if (change->data.truncate.cascade)
732  appendStringInfoString(ctx->out, " cascade");
733  }
734  else
735  appendStringInfoString(ctx->out, " (no-flags)");
736 
737  MemoryContextSwitchTo(old);
738  MemoryContextReset(data->context);
739 
740  OutputPluginWrite(ctx, true);
741 }
int i
Definition: isn.c:73
struct ReorderBufferChange::@95::@97 truncate

References appendStringInfoString(), ReorderBufferChange::data, data, get_namespace_name(), i, MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), ReorderBufferChange::truncate, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

◆ pg_output_begin()

static void pg_output_begin ( LogicalDecodingContext *  ctx,
TestDecodingData *  data,
ReorderBufferTXN *  txn,
bool  last_write 
)
static

Definition at line 309 of file test_decoding.c.

310 {
311  OutputPluginPrepareWrite(ctx, last_write);
312  if (data->include_xids)
313  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
314  else
315  appendStringInfoString(ctx->out, "BEGIN");
316  OutputPluginWrite(ctx, last_write);
317 }

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), and pg_decode_truncate().

◆ pg_output_stream_start()

static void pg_output_stream_start ( LogicalDecodingContext *  ctx,
TestDecodingData *  data,
ReorderBufferTXN *  txn,
bool  last_write 
)
static

Definition at line 780 of file test_decoding.c.

781 {
782  OutputPluginPrepareWrite(ctx, last_write);
783  if (data->include_xids)
784  appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
785  else
786  appendStringInfoString(ctx->out, "opening a streamed block for transaction");
787  OutputPluginWrite(ctx, last_write);
788 }

References appendStringInfo(), appendStringInfoString(), data, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_stream_change(), pg_decode_stream_start(), and pg_decode_stream_truncate().

◆ print_literal()

static void print_literal ( StringInfo  s,
Oid  typid,
char *  outputstr 
)
static

Definition at line 480 of file test_decoding.c.

481 {
482  const char *valptr;
483 
484  switch (typid)
485  {
486  case INT2OID:
487  case INT4OID:
488  case INT8OID:
489  case OIDOID:
490  case FLOAT4OID:
491  case FLOAT8OID:
492  case NUMERICOID:
493  /* NB: We don't care about Inf, NaN et al. */
494  appendStringInfoString(s, outputstr);
495  break;
496 
497  case BITOID:
498  case VARBITOID:
499  appendStringInfo(s, "B'%s'", outputstr);
500  break;
501 
502  case BOOLOID:
503  if (strcmp(outputstr, "t") == 0)
504  appendStringInfoString(s, "true");
505  else
506  appendStringInfoString(s, "false");
507  break;
508 
509  default:
510  appendStringInfoChar(s, '\'');
511  for (valptr = outputstr; *valptr; valptr++)
512  {
513  char ch = *valptr;
514 
515  if (SQL_STR_DOUBLE(ch, false))
516  appendStringInfoChar(s, ch);
517  appendStringInfoChar(s, ch);
518  }
519  appendStringInfoChar(s, '\'');
520  break;
521  }
522 }
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1153

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), and SQL_STR_DOUBLE.

Referenced by tuple_to_stringinfo().

◆ tuple_to_stringinfo()

static void tuple_to_stringinfo ( StringInfo  s,
TupleDesc  tupdesc,
HeapTuple  tuple,
bool  skip_nulls 
)
static

Definition at line 526 of file test_decoding.c.

527 {
528  int natt;
529 
530  /* print all columns individually */
531  for (natt = 0; natt < tupdesc->natts; natt++)
532  {
533  Form_pg_attribute attr; /* the attribute itself */
534  Oid typid; /* type of current attribute */
535  Oid typoutput; /* output function */
536  bool typisvarlena;
537  Datum origval; /* possibly toasted Datum */
538  bool isnull; /* column is null? */
539 
540  attr = TupleDescAttr(tupdesc, natt);
541 
542  /*
543  * don't print dropped columns, we can't be sure everything is
544  * available for them
545  */
546  if (attr->attisdropped)
547  continue;
548 
549  /*
550  * Don't print system columns, oid will already have been printed if
551  * present.
552  */
553  if (attr->attnum < 0)
554  continue;
555 
556  typid = attr->atttypid;
557 
558  /* get Datum from tuple */
559  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
560 
561  if (isnull && skip_nulls)
562  continue;
563 
564  /* print attribute name */
565  appendStringInfoChar(s, ' ');
566  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
567 
568  /* print attribute type */
569  appendStringInfoChar(s, '[');
570  appendStringInfoString(s, format_type_be(typid));
571  appendStringInfoChar(s, ']');
572 
573  /* query output function */
574  getTypeOutputInfo(typid,
575  &typoutput, &typisvarlena);
576 
577  /* print separator */
578  appendStringInfoChar(s, ':');
579 
580  /* print data */
581  if (isnull)
582  appendStringInfoString(s, "null");
583  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
584  appendStringInfoString(s, "unchanged-toast-datum");
585  else if (!typisvarlena)
586  print_literal(s, typid,
587  OidOutputFunctionCall(typoutput, origval));
588  else
589  {
590  Datum val; /* definitely detoasted Datum */
591 
592  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
593  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
594  }
595  }
596 }
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1750
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:792
long val
Definition: informix.c:664
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2865
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
unsigned int Oid
Definition: postgres_ext.h:31
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:11529
static void print_literal(StringInfo s, Oid typid, char *outputstr)
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290

References appendStringInfoChar(), appendStringInfoString(), format_type_be(), getTypeOutputInfo(), heap_getattr(), NameStr, TupleDescData::natts, OidOutputFunctionCall(), PG_DETOAST_DATUM, PointerGetDatum(), print_literal(), quote_identifier(), TupleDescAttr, val, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pg_decode_change().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 25 of file test_decoding.c.