PostgreSQL Source Code  git master
test_decoding.c File Reference
#include "postgres.h"
#include "catalog/pg_type.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for test_decoding.c:

Go to the source code of this file.

Data Structures

struct  TestDecodingData
 
struct  TestDecodingTxnData
 

Functions

void _PG_init (void)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pg_decode_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pg_decode_shutdown (LogicalDecodingContext *ctx)
 
static void pg_decode_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_begin (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pg_decode_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pg_decode_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pg_decode_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pg_decode_filter_prepare (LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
 
static void pg_decode_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pg_decode_stream_start (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_stream_start (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_stream_stop (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_stream_abort (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pg_decode_stream_prepare (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_stream_commit (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_stream_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_stream_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static void pg_decode_stream_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void print_literal (StringInfo s, Oid typid, char *outputstr)
 
static void tuple_to_stringinfo (StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 

Variables

 PG_MODULE_MAGIC
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 125 of file test_decoding.c.

126 {
127  /* other plugins can perform things here */
128 }

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 132 of file test_decoding.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::filter_prepare_cb, OutputPluginCallbacks::message_cb, pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), pg_decode_commit_prepared_txn(), pg_decode_commit_txn(), pg_decode_filter(), pg_decode_filter_prepare(), pg_decode_message(), pg_decode_prepare_txn(), pg_decode_rollback_prepared_txn(), pg_decode_shutdown(), pg_decode_startup(), pg_decode_stream_abort(), pg_decode_stream_change(), pg_decode_stream_commit(), pg_decode_stream_message(), pg_decode_stream_prepare(), pg_decode_stream_start(), pg_decode_stream_stop(), pg_decode_stream_truncate(), pg_decode_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

133 {
135 
157 }
LogicalDecodeTruncateCB truncate_cb
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
LogicalDecodeStreamPrepareCB stream_prepare_cb
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamMessageCB stream_message_cb
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodeStreamAbortCB stream_abort_cb
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
LogicalDecodeBeginPrepareCB begin_prepare_cb
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodePrepareCB prepare_cb
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
LogicalDecodeCommitCB commit_cb
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodeChangeCB change_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeStartupCB startup_cb
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
LogicalDecodeStreamStartCB stream_start_cb
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeStreamChangeCB stream_change_cb
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:963

◆ pg_decode_begin_prepare_txn()

static void pg_decode_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 349 of file test_decoding.c.

References LogicalDecodingContext::context, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), TestDecodingData::skip_empty_xacts, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

350 {
352  TestDecodingTxnData *txndata =
354 
355  txndata->xact_wrote_changes = false;
356  txn->output_plugin_private = txndata;
357 
358  if (data->skip_empty_xacts)
359  return;
360 
361  pg_output_begin(ctx, data, txn, true);
362 }
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
void * output_plugin_private

◆ pg_decode_begin_txn()

static void pg_decode_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 293 of file test_decoding.c.

References LogicalDecodingContext::context, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), TestDecodingData::skip_empty_xacts, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

294 {
296  TestDecodingTxnData *txndata =
298 
299  txndata->xact_wrote_changes = false;
300  txn->output_plugin_private = txndata;
301 
302  if (data->skip_empty_xacts)
303  return;
304 
305  pg_output_begin(ctx, data, txn, true);
306 }
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
void * output_plugin_private

◆ pg_decode_change()

static void pg_decode_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  rel,
ReorderBufferChange change 
)
static

Definition at line 594 of file test_decoding.c.

References ReorderBufferChange::action, appendStringInfoChar(), appendStringInfoString(), Assert, TestDecodingData::context, ReorderBufferChange::data, get_namespace_name(), get_rel_name(), get_rel_namespace(), MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), RelationGetDescr, RelationGetForm, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, TestDecodingData::skip_empty_xacts, ReorderBufferChange::tp, tuple_to_stringinfo(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

596 {
597  TestDecodingData *data;
598  TestDecodingTxnData *txndata;
599  Form_pg_class class_form;
600  TupleDesc tupdesc;
601  MemoryContext old;
602 
603  data = ctx->output_plugin_private;
604  txndata = txn->output_plugin_private;
605 
606  /* output BEGIN if we haven't yet */
607  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
608  {
609  pg_output_begin(ctx, data, txn, false);
610  }
611  txndata->xact_wrote_changes = true;
612 
613  class_form = RelationGetForm(relation);
614  tupdesc = RelationGetDescr(relation);
615 
616  /* Avoid leaking memory by using and resetting our own context */
617  old = MemoryContextSwitchTo(data->context);
618 
619  OutputPluginPrepareWrite(ctx, true);
620 
621  appendStringInfoString(ctx->out, "table ");
624  class_form->relrewrite ?
625  get_rel_name(class_form->relrewrite) :
626  NameStr(class_form->relname)));
627  appendStringInfoChar(ctx->out, ':');
628 
629  switch (change->action)
630  {
632  appendStringInfoString(ctx->out, " INSERT:");
633  if (change->data.tp.newtuple == NULL)
634  appendStringInfoString(ctx->out, " (no-tuple-data)");
635  else
636  tuple_to_stringinfo(ctx->out, tupdesc,
637  &change->data.tp.newtuple->tuple,
638  false);
639  break;
641  appendStringInfoString(ctx->out, " UPDATE:");
642  if (change->data.tp.oldtuple != NULL)
643  {
644  appendStringInfoString(ctx->out, " old-key:");
645  tuple_to_stringinfo(ctx->out, tupdesc,
646  &change->data.tp.oldtuple->tuple,
647  true);
648  appendStringInfoString(ctx->out, " new-tuple:");
649  }
650 
651  if (change->data.tp.newtuple == NULL)
652  appendStringInfoString(ctx->out, " (no-tuple-data)");
653  else
654  tuple_to_stringinfo(ctx->out, tupdesc,
655  &change->data.tp.newtuple->tuple,
656  false);
657  break;
659  appendStringInfoString(ctx->out, " DELETE:");
660 
661  /* if there was no PK, we only know that a delete happened */
662  if (change->data.tp.oldtuple == NULL)
663  appendStringInfoString(ctx->out, " (no-tuple-data)");
664  /* In DELETE, only the replica identity is present; display that */
665  else
666  tuple_to_stringinfo(ctx->out, tupdesc,
667  &change->data.tp.oldtuple->tuple,
668  true);
669  break;
670  default:
671  Assert(false);
672  }
673 
676 
677  OutputPluginWrite(ctx, true);
678 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
#define RelationGetDescr(relation)
Definition: rel.h:503
#define RelationGetForm(relation)
Definition: rel.h:471
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1923
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
union ReorderBufferChange::@97 data
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11416
MemoryContext context
Definition: test_decoding.c:33
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
#define Assert(condition)
Definition: c.h:804
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
struct ReorderBufferChange::@97::@98 tp
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
#define NameStr(name)
Definition: c.h:681
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
void * output_plugin_private
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ pg_decode_commit_prepared_txn()

static void pg_decode_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 392 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, ReorderBufferTXN::gid, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

394 {
396 
397  OutputPluginPrepareWrite(ctx, true);
398 
399  appendStringInfo(ctx->out, "COMMIT PREPARED %s",
400  quote_literal_cstr(txn->gid));
401 
402  if (data->include_xids)
403  appendStringInfo(ctx->out, ", txid %u", txn->xid);
404 
405  if (data->include_timestamp)
406  appendStringInfo(ctx->out, " (at %s)",
408 
409  OutputPluginWrite(ctx, true);
410 }
TimestampTz commit_time
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ pg_decode_commit_txn()

static void pg_decode_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 321 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

323 {
326  bool xact_wrote_changes = txndata->xact_wrote_changes;
327 
328  pfree(txndata);
329  txn->output_plugin_private = NULL;
330 
331  if (data->skip_empty_xacts && !xact_wrote_changes)
332  return;
333 
334  OutputPluginPrepareWrite(ctx, true);
335  if (data->include_xids)
336  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
337  else
338  appendStringInfoString(ctx->out, "COMMIT");
339 
340  if (data->include_timestamp)
341  appendStringInfo(ctx->out, " (at %s)",
343 
344  OutputPluginWrite(ctx, true);
345 }
TimestampTz commit_time
void * output_plugin_private
Definition: logical.h:75
void pfree(void *pointer)
Definition: mcxt.c:1169
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ pg_decode_filter()

static bool pg_decode_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 454 of file test_decoding.c.

References InvalidRepOriginId, TestDecodingData::only_local, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

456 {
458 
459  if (data->only_local && origin_id != InvalidRepOriginId)
460  return true;
461  return false;
462 }
void * output_plugin_private
Definition: logical.h:75
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_decode_filter_prepare()

static bool pg_decode_filter_prepare ( LogicalDecodingContext ctx,
TransactionId  xid,
const char *  gid 
)
static

Definition at line 444 of file test_decoding.c.

Referenced by _PG_output_plugin_init().

446 {
447  if (strstr(gid, "_nodecode") != NULL)
448  return true;
449 
450  return false;
451 }

◆ pg_decode_message()

static void pg_decode_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 736 of file test_decoding.c.

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

739 {
740  OutputPluginPrepareWrite(ctx, true);
741  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
742  transactional, prefix, sz);
743  appendBinaryStringInfo(ctx->out, message, sz);
744  OutputPluginWrite(ctx, true);
745 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227

◆ pg_decode_prepare_txn()

static void pg_decode_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 366 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, ReorderBufferTXN::gid, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

368 {
371 
372  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
373  return;
374 
375  OutputPluginPrepareWrite(ctx, true);
376 
377  appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
378  quote_literal_cstr(txn->gid));
379 
380  if (data->include_xids)
381  appendStringInfo(ctx->out, ", txid %u", txn->xid);
382 
383  if (data->include_timestamp)
384  appendStringInfo(ctx->out, " (at %s)",
386 
387  OutputPluginWrite(ctx, true);
388 }
TimestampTz commit_time
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ pg_decode_rollback_prepared_txn()

static void pg_decode_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 414 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, ReorderBufferTXN::gid, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

418 {
420 
421  OutputPluginPrepareWrite(ctx, true);
422 
423  appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
424  quote_literal_cstr(txn->gid));
425 
426  if (data->include_xids)
427  appendStringInfo(ctx->out, ", txid %u", txn->xid);
428 
429  if (data->include_timestamp)
430  appendStringInfo(ctx->out, " (at %s)",
432 
433  OutputPluginWrite(ctx, true);
434 }
TimestampTz commit_time
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ pg_decode_shutdown()

static void pg_decode_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 283 of file test_decoding.c.

References TestDecodingData::context, MemoryContextDelete(), and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

284 {
286 
287  /* cleanup our own resources via memory context reset */
289 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: test_decoding.c:33

◆ pg_decode_startup()

static void pg_decode_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 162 of file test_decoding.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, DefElem::arg, Assert, TestDecodingData::context, LogicalDecodingContext::context, DefElem::defname, ereport, errcode(), errmsg(), ERROR, TestDecodingData::include_timestamp, TestDecodingData::include_xids, IsA, lfirst, TestDecodingData::only_local, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OUTPUT_PLUGIN_TEXTUAL_OUTPUT, OutputPluginOptions::output_type, palloc0(), parse_bool(), OutputPluginOptions::receive_rewrites, TestDecodingData::skip_empty_xacts, LogicalDecodingContext::streaming, and strVal.

Referenced by _PG_output_plugin_init().

164 {
165  ListCell *option;
166  TestDecodingData *data;
167  bool enable_streaming = false;
168 
169  data = palloc0(sizeof(TestDecodingData));
171  "text conversion context",
173  data->include_xids = true;
174  data->include_timestamp = false;
175  data->skip_empty_xacts = false;
176  data->only_local = false;
177 
178  ctx->output_plugin_private = data;
179 
181  opt->receive_rewrites = false;
182 
183  foreach(option, ctx->output_plugin_options)
184  {
185  DefElem *elem = lfirst(option);
186 
187  Assert(elem->arg == NULL || IsA(elem->arg, String));
188 
189  if (strcmp(elem->defname, "include-xids") == 0)
190  {
191  /* if option does not provide a value, it means its value is true */
192  if (elem->arg == NULL)
193  data->include_xids = true;
194  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
195  ereport(ERROR,
196  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
197  errmsg("could not parse value \"%s\" for parameter \"%s\"",
198  strVal(elem->arg), elem->defname)));
199  }
200  else if (strcmp(elem->defname, "include-timestamp") == 0)
201  {
202  if (elem->arg == NULL)
203  data->include_timestamp = true;
204  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
205  ereport(ERROR,
206  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
207  errmsg("could not parse value \"%s\" for parameter \"%s\"",
208  strVal(elem->arg), elem->defname)));
209  }
210  else if (strcmp(elem->defname, "force-binary") == 0)
211  {
212  bool force_binary;
213 
214  if (elem->arg == NULL)
215  continue;
216  else if (!parse_bool(strVal(elem->arg), &force_binary))
217  ereport(ERROR,
218  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
219  errmsg("could not parse value \"%s\" for parameter \"%s\"",
220  strVal(elem->arg), elem->defname)));
221 
222  if (force_binary)
224  }
225  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
226  {
227 
228  if (elem->arg == NULL)
229  data->skip_empty_xacts = true;
230  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
231  ereport(ERROR,
232  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
233  errmsg("could not parse value \"%s\" for parameter \"%s\"",
234  strVal(elem->arg), elem->defname)));
235  }
236  else if (strcmp(elem->defname, "only-local") == 0)
237  {
238 
239  if (elem->arg == NULL)
240  data->only_local = true;
241  else if (!parse_bool(strVal(elem->arg), &data->only_local))
242  ereport(ERROR,
243  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
244  errmsg("could not parse value \"%s\" for parameter \"%s\"",
245  strVal(elem->arg), elem->defname)));
246  }
247  else if (strcmp(elem->defname, "include-rewrites") == 0)
248  {
249 
250  if (elem->arg == NULL)
251  continue;
252  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
253  ereport(ERROR,
254  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
255  errmsg("could not parse value \"%s\" for parameter \"%s\"",
256  strVal(elem->arg), elem->defname)));
257  }
258  else if (strcmp(elem->defname, "stream-changes") == 0)
259  {
260  if (elem->arg == NULL)
261  continue;
262  else if (!parse_bool(strVal(elem->arg), &enable_streaming))
263  ereport(ERROR,
264  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
265  errmsg("could not parse value \"%s\" for parameter \"%s\"",
266  strVal(elem->arg), elem->defname)));
267  }
268  else
269  {
270  ereport(ERROR,
271  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
272  errmsg("option \"%s\" = \"%s\" is unknown",
273  elem->defname,
274  elem->arg ? strVal(elem->arg) : "(null)")));
275  }
276  }
277 
278  ctx->streaming &= enable_streaming;
279 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
#define AllocSetContextCreate
Definition: memutils.h:173
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:698
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
List * output_plugin_options
Definition: logical.h:58
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:46
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
Node * arg
Definition: parsenodes.h:747
MemoryContext context
Definition: test_decoding.c:33
void * palloc0(Size size)
Definition: mcxt.c:1093
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:909
char * defname
Definition: parsenodes.h:746

◆ pg_decode_stream_abort()

static void pg_decode_stream_abort ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 801 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), Assert, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), TestDecodingData::skip_empty_xacts, ReorderBufferTXN::toptxn, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

804 {
806 
807  /*
808  * stream abort can be sent for an individual subtransaction but we
809  * maintain the output_plugin_private only under the toptxn so if this is
810  * not the toptxn then fetch the toptxn.
811  */
812  ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
813  TestDecodingTxnData *txndata = toptxn->output_plugin_private;
814  bool xact_wrote_changes = txndata->xact_wrote_changes;
815 
816  if (txn->toptxn == NULL)
817  {
818  Assert(txn->output_plugin_private != NULL);
819  pfree(txndata);
820  txn->output_plugin_private = NULL;
821  }
822 
823  if (data->skip_empty_xacts && !xact_wrote_changes)
824  return;
825 
826  OutputPluginPrepareWrite(ctx, true);
827  if (data->include_xids)
828  appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
829  else
830  appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
831  OutputPluginWrite(ctx, true);
832 }
void * output_plugin_private
Definition: logical.h:75
void pfree(void *pointer)
Definition: mcxt.c:1169
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
struct ReorderBufferTXN * toptxn
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
#define Assert(condition)
Definition: c.h:804
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private

◆ pg_decode_stream_change()

static void pg_decode_stream_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 896 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingData::skip_empty_xacts, TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

900 {
903 
904  /* output stream start if we haven't yet */
905  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
906  {
907  pg_output_stream_start(ctx, data, txn, false);
908  }
909  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
910 
911  OutputPluginPrepareWrite(ctx, true);
912  if (data->include_xids)
913  appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
914  else
915  appendStringInfoString(ctx->out, "streaming change for transaction");
916  OutputPluginWrite(ctx, true);
917 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

◆ pg_decode_stream_commit()

static void pg_decode_stream_commit ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 862 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

865 {
868  bool xact_wrote_changes = txndata->xact_wrote_changes;
869 
870  pfree(txndata);
871  txn->output_plugin_private = NULL;
872 
873  if (data->skip_empty_xacts && !xact_wrote_changes)
874  return;
875 
876  OutputPluginPrepareWrite(ctx, true);
877 
878  if (data->include_xids)
879  appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
880  else
881  appendStringInfoString(ctx->out, "committing streamed transaction");
882 
883  if (data->include_timestamp)
884  appendStringInfo(ctx->out, " (at %s)",
886 
887  OutputPluginWrite(ctx, true);
888 }
TimestampTz commit_time
void * output_plugin_private
Definition: logical.h:75
void pfree(void *pointer)
Definition: mcxt.c:1169
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ pg_decode_stream_message()

static void pg_decode_stream_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 925 of file test_decoding.c.

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

928 {
929  OutputPluginPrepareWrite(ctx, true);
930 
931  if (transactional)
932  {
933  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
934  transactional, prefix, sz);
935  }
936  else
937  {
938  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
939  transactional, prefix, sz);
940  appendBinaryStringInfo(ctx->out, message, sz);
941  }
942 
943  OutputPluginWrite(ctx, true);
944 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227

◆ pg_decode_stream_prepare()

static void pg_decode_stream_prepare ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 835 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, ReorderBufferTXN::gid, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

838 {
841 
842  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
843  return;
844 
845  OutputPluginPrepareWrite(ctx, true);
846 
847  if (data->include_xids)
848  appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
849  quote_literal_cstr(txn->gid), txn->xid);
850  else
851  appendStringInfo(ctx->out, "preparing streamed transaction %s",
852  quote_literal_cstr(txn->gid));
853 
854  if (data->include_timestamp)
855  appendStringInfo(ctx->out, " (at %s)",
857 
858  OutputPluginWrite(ctx, true);
859 }
TimestampTz commit_time
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ pg_decode_stream_start()

static void pg_decode_stream_start ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 748 of file test_decoding.c.

References LogicalDecodingContext::context, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_stream_start(), TestDecodingData::skip_empty_xacts, TestDecodingTxnData::stream_wrote_changes, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

750 {
753 
754  /*
755  * Allocate the txn plugin data for the first stream in the transaction.
756  */
757  if (txndata == NULL)
758  {
759  txndata =
761  txndata->xact_wrote_changes = false;
762  txn->output_plugin_private = txndata;
763  }
764 
765  txndata->stream_wrote_changes = false;
766  if (data->skip_empty_xacts)
767  return;
768  pg_output_stream_start(ctx, data, txn, true);
769 }
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
void * output_plugin_private
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

◆ pg_decode_stream_stop()

static void pg_decode_stream_stop ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 783 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), TestDecodingData::skip_empty_xacts, TestDecodingTxnData::stream_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

785 {
788 
789  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
790  return;
791 
792  OutputPluginPrepareWrite(ctx, true);
793  if (data->include_xids)
794  appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
795  else
796  appendStringInfoString(ctx->out, "closing a streamed block for transaction");
797  OutputPluginWrite(ctx, true);
798 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private

◆ pg_decode_stream_truncate()

static void pg_decode_stream_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 951 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingData::skip_empty_xacts, TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

954 {
957 
958  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
959  {
960  pg_output_stream_start(ctx, data, txn, false);
961  }
962  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
963 
964  OutputPluginPrepareWrite(ctx, true);
965  if (data->include_xids)
966  appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
967  else
968  appendStringInfoString(ctx->out, "streaming truncate for transaction");
969  OutputPluginWrite(ctx, true);
970 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

◆ pg_decode_truncate()

static void pg_decode_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 681 of file test_decoding.c.

References appendStringInfoString(), TestDecodingData::context, ReorderBufferChange::data, get_namespace_name(), i, MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), TestDecodingData::skip_empty_xacts, ReorderBufferChange::truncate, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

683 {
684  TestDecodingData *data;
685  TestDecodingTxnData *txndata;
686  MemoryContext old;
687  int i;
688 
689  data = ctx->output_plugin_private;
690  txndata = txn->output_plugin_private;
691 
692  /* output BEGIN if we haven't yet */
693  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
694  {
695  pg_output_begin(ctx, data, txn, false);
696  }
697  txndata->xact_wrote_changes = true;
698 
699  /* Avoid leaking memory by using and resetting our own context */
700  old = MemoryContextSwitchTo(data->context);
701 
702  OutputPluginPrepareWrite(ctx, true);
703 
704  appendStringInfoString(ctx->out, "table ");
705 
706  for (i = 0; i < nrelations; i++)
707  {
708  if (i > 0)
709  appendStringInfoString(ctx->out, ", ");
710 
712  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
713  NameStr(relations[i]->rd_rel->relname)));
714  }
715 
716  appendStringInfoString(ctx->out, ": TRUNCATE:");
717 
718  if (change->data.truncate.restart_seqs
719  || change->data.truncate.cascade)
720  {
721  if (change->data.truncate.restart_seqs)
722  appendStringInfoString(ctx->out, " restart_seqs");
723  if (change->data.truncate.cascade)
724  appendStringInfoString(ctx->out, " cascade");
725  }
726  else
727  appendStringInfoString(ctx->out, " (no-flags)");
728 
731 
732  OutputPluginWrite(ctx, true);
733 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
union ReorderBufferChange::@97 data
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11416
MemoryContext context
Definition: test_decoding.c:33
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
struct ReorderBufferChange::@97::@99 truncate
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
int i
#define NameStr(name)
Definition: c.h:681
void * output_plugin_private

◆ pg_output_begin()

static void pg_output_begin ( LogicalDecodingContext ctx,
TestDecodingData data,
ReorderBufferTXN txn,
bool  last_write 
)
static

Definition at line 309 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), and pg_decode_truncate().

310 {
311  OutputPluginPrepareWrite(ctx, last_write);
312  if (data->include_xids)
313  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
314  else
315  appendStringInfoString(ctx->out, "BEGIN");
316  OutputPluginWrite(ctx, last_write);
317 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70

◆ pg_output_stream_start()

static void pg_output_stream_start ( LogicalDecodingContext ctx,
TestDecodingData data,
ReorderBufferTXN txn,
bool  last_write 
)
static

Definition at line 772 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_stream_change(), pg_decode_stream_start(), and pg_decode_stream_truncate().

773 {
774  OutputPluginPrepareWrite(ctx, last_write);
775  if (data->include_xids)
776  appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
777  else
778  appendStringInfoString(ctx->out, "opening a streamed block for transaction");
779  OutputPluginWrite(ctx, last_write);
780 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70

◆ print_literal()

static void print_literal ( StringInfo  s,
Oid  typid,
char *  outputstr 
)
static

Definition at line 472 of file test_decoding.c.

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), and SQL_STR_DOUBLE.

Referenced by tuple_to_stringinfo().

473 {
474  const char *valptr;
475 
476  switch (typid)
477  {
478  case INT2OID:
479  case INT4OID:
480  case INT8OID:
481  case OIDOID:
482  case FLOAT4OID:
483  case FLOAT8OID:
484  case NUMERICOID:
485  /* NB: We don't care about Inf, NaN et al. */
486  appendStringInfoString(s, outputstr);
487  break;
488 
489  case BITOID:
490  case VARBITOID:
491  appendStringInfo(s, "B'%s'", outputstr);
492  break;
493 
494  case BOOLOID:
495  if (strcmp(outputstr, "t") == 0)
496  appendStringInfoString(s, "true");
497  else
498  appendStringInfoString(s, "false");
499  break;
500 
501  default:
502  appendStringInfoChar(s, '\'');
503  for (valptr = outputstr; *valptr; valptr++)
504  {
505  char ch = *valptr;
506 
507  if (SQL_STR_DOUBLE(ch, false))
508  appendStringInfoChar(s, ch);
509  appendStringInfoChar(s, ch);
510  }
511  appendStringInfoChar(s, '\'');
512  break;
513  }
514 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1164

◆ tuple_to_stringinfo()

static void tuple_to_stringinfo ( StringInfo  s,
TupleDesc  tupdesc,
HeapTuple  tuple,
bool  skip_nulls 
)
static

Definition at line 518 of file test_decoding.c.

References appendStringInfoChar(), appendStringInfoString(), format_type_be(), getTypeOutputInfo(), heap_getattr, NameStr, TupleDescData::natts, OidOutputFunctionCall(), PG_DETOAST_DATUM, PointerGetDatum, print_literal(), quote_identifier(), TupleDescAttr, val, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pg_decode_change().

519 {
520  int natt;
521 
522  /* print all columns individually */
523  for (natt = 0; natt < tupdesc->natts; natt++)
524  {
525  Form_pg_attribute attr; /* the attribute itself */
526  Oid typid; /* type of current attribute */
527  Oid typoutput; /* output function */
528  bool typisvarlena;
529  Datum origval; /* possibly toasted Datum */
530  bool isnull; /* column is null? */
531 
532  attr = TupleDescAttr(tupdesc, natt);
533 
534  /*
535  * don't print dropped columns, we can't be sure everything is
536  * available for them
537  */
538  if (attr->attisdropped)
539  continue;
540 
541  /*
542  * Don't print system columns, oid will already have been printed if
543  * present.
544  */
545  if (attr->attnum < 0)
546  continue;
547 
548  typid = attr->atttypid;
549 
550  /* get Datum from tuple */
551  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
552 
553  if (isnull && skip_nulls)
554  continue;
555 
556  /* print attribute name */
557  appendStringInfoChar(s, ' ');
558  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
559 
560  /* print attribute type */
561  appendStringInfoChar(s, '[');
563  appendStringInfoChar(s, ']');
564 
565  /* query output function */
566  getTypeOutputInfo(typid,
567  &typoutput, &typisvarlena);
568 
569  /* print separator */
570  appendStringInfoChar(s, ':');
571 
572  /* print data */
573  if (isnull)
574  appendStringInfoString(s, "null");
575  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
576  appendStringInfoString(s, "unchanged-toast-datum");
577  else if (!typisvarlena)
578  print_literal(s, typid,
579  OidOutputFunctionCall(typoutput, origval));
580  else
581  {
582  Datum val; /* definitely detoasted Datum */
583 
584  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
585  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
586  }
587  }
588 }
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:327
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2854
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:11332
#define PointerGetDatum(X)
Definition: postgres.h:600
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
unsigned int Oid
Definition: postgres_ext.h:31
static void print_literal(StringInfo s, Oid typid, char *outputstr)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:761
uintptr_t Datum
Definition: postgres.h:411
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1653
#define NameStr(name)
Definition: c.h:681
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
long val
Definition: informix.c:664

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 25 of file test_decoding.c.