PostgreSQL Source Code  git master
test_decoding.c File Reference
#include "postgres.h"
#include "catalog/pg_type.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for test_decoding.c:

Go to the source code of this file.

Data Structures

struct  TestDecodingData
 
struct  TestDecodingTxnData
 

Functions

void _PG_init (void)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pg_decode_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pg_decode_shutdown (LogicalDecodingContext *ctx)
 
static void pg_decode_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_begin (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pg_decode_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pg_decode_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pg_decode_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pg_decode_filter_prepare (LogicalDecodingContext *ctx, const char *gid)
 
static void pg_decode_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pg_decode_stream_start (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_stream_start (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_stream_stop (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_stream_abort (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pg_decode_stream_prepare (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pg_decode_stream_commit (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_stream_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_stream_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static void pg_decode_stream_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void print_literal (StringInfo s, Oid typid, char *outputstr)
 
static void tuple_to_stringinfo (StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 

Variables

 PG_MODULE_MAGIC
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 124 of file test_decoding.c.

125 {
126  /* other plugins can perform things here */
127 }

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 131 of file test_decoding.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::filter_prepare_cb, OutputPluginCallbacks::message_cb, pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), pg_decode_commit_prepared_txn(), pg_decode_commit_txn(), pg_decode_filter(), pg_decode_filter_prepare(), pg_decode_message(), pg_decode_prepare_txn(), pg_decode_rollback_prepared_txn(), pg_decode_shutdown(), pg_decode_startup(), pg_decode_stream_abort(), pg_decode_stream_change(), pg_decode_stream_commit(), pg_decode_stream_message(), pg_decode_stream_prepare(), pg_decode_stream_start(), pg_decode_stream_stop(), pg_decode_stream_truncate(), pg_decode_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

132 {
134 
156 }
LogicalDecodeTruncateCB truncate_cb
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
LogicalDecodeStreamPrepareCB stream_prepare_cb
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamMessageCB stream_message_cb
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodeStreamAbortCB stream_abort_cb
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
LogicalDecodeBeginPrepareCB begin_prepare_cb
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodePrepareCB prepare_cb
LogicalDecodeCommitCB commit_cb
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodeChangeCB change_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeStartupCB startup_cb
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
LogicalDecodeStreamStartCB stream_start_cb
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeStreamChangeCB stream_change_cb
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:963

◆ pg_decode_begin_prepare_txn()

static void pg_decode_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 348 of file test_decoding.c.

References LogicalDecodingContext::context, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), TestDecodingData::skip_empty_xacts, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

349 {
351  TestDecodingTxnData *txndata =
353 
354  txndata->xact_wrote_changes = false;
355  txn->output_plugin_private = txndata;
356 
357  if (data->skip_empty_xacts)
358  return;
359 
360  pg_output_begin(ctx, data, txn, true);
361 }
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
void * output_plugin_private

◆ pg_decode_begin_txn()

static void pg_decode_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 292 of file test_decoding.c.

References LogicalDecodingContext::context, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_begin(), TestDecodingData::skip_empty_xacts, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

293 {
295  TestDecodingTxnData *txndata =
297 
298  txndata->xact_wrote_changes = false;
299  txn->output_plugin_private = txndata;
300 
301  if (data->skip_empty_xacts)
302  return;
303 
304  pg_output_begin(ctx, data, txn, true);
305 }
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
void * output_plugin_private

◆ pg_decode_change()

static void pg_decode_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  rel,
ReorderBufferChange change 
)
static

Definition at line 592 of file test_decoding.c.

References ReorderBufferChange::action, appendStringInfoChar(), appendStringInfoString(), Assert, TestDecodingData::context, ReorderBufferChange::data, get_namespace_name(), get_rel_name(), get_rel_namespace(), MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), RelationGetDescr, RelationGetForm, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, TestDecodingData::skip_empty_xacts, ReorderBufferChange::tp, tuple_to_stringinfo(), and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

594 {
595  TestDecodingData *data;
596  TestDecodingTxnData *txndata;
597  Form_pg_class class_form;
598  TupleDesc tupdesc;
599  MemoryContext old;
600 
601  data = ctx->output_plugin_private;
602  txndata = txn->output_plugin_private;
603 
604  /* output BEGIN if we haven't yet */
605  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
606  {
607  pg_output_begin(ctx, data, txn, false);
608  }
609  txndata->xact_wrote_changes = true;
610 
611  class_form = RelationGetForm(relation);
612  tupdesc = RelationGetDescr(relation);
613 
614  /* Avoid leaking memory by using and resetting our own context */
615  old = MemoryContextSwitchTo(data->context);
616 
617  OutputPluginPrepareWrite(ctx, true);
618 
619  appendStringInfoString(ctx->out, "table ");
622  class_form->relrewrite ?
623  get_rel_name(class_form->relrewrite) :
624  NameStr(class_form->relname)));
625  appendStringInfoChar(ctx->out, ':');
626 
627  switch (change->action)
628  {
630  appendStringInfoString(ctx->out, " INSERT:");
631  if (change->data.tp.newtuple == NULL)
632  appendStringInfoString(ctx->out, " (no-tuple-data)");
633  else
634  tuple_to_stringinfo(ctx->out, tupdesc,
635  &change->data.tp.newtuple->tuple,
636  false);
637  break;
639  appendStringInfoString(ctx->out, " UPDATE:");
640  if (change->data.tp.oldtuple != NULL)
641  {
642  appendStringInfoString(ctx->out, " old-key:");
643  tuple_to_stringinfo(ctx->out, tupdesc,
644  &change->data.tp.oldtuple->tuple,
645  true);
646  appendStringInfoString(ctx->out, " new-tuple:");
647  }
648 
649  if (change->data.tp.newtuple == NULL)
650  appendStringInfoString(ctx->out, " (no-tuple-data)");
651  else
652  tuple_to_stringinfo(ctx->out, tupdesc,
653  &change->data.tp.newtuple->tuple,
654  false);
655  break;
657  appendStringInfoString(ctx->out, " DELETE:");
658 
659  /* if there was no PK, we only know that a delete happened */
660  if (change->data.tp.oldtuple == NULL)
661  appendStringInfoString(ctx->out, " (no-tuple-data)");
662  /* In DELETE, only the replica identity is present; display that */
663  else
664  tuple_to_stringinfo(ctx->out, tupdesc,
665  &change->data.tp.oldtuple->tuple,
666  true);
667  break;
668  default:
669  Assert(false);
670  }
671 
674 
675  OutputPluginWrite(ctx, true);
676 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
#define RelationGetDescr(relation)
Definition: rel.h:483
#define RelationGetForm(relation)
Definition: rel.h:451
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1923
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11078
MemoryContext context
Definition: test_decoding.c:33
struct ReorderBufferChange::@96::@97 tp
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
#define Assert(condition)
Definition: c.h:804
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
#define NameStr(name)
Definition: c.h:681
union ReorderBufferChange::@96 data
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
void * output_plugin_private
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ pg_decode_commit_prepared_txn()

static void pg_decode_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 391 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, ReorderBufferTXN::gid, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

393 {
395 
396  OutputPluginPrepareWrite(ctx, true);
397 
398  appendStringInfo(ctx->out, "COMMIT PREPARED %s",
399  quote_literal_cstr(txn->gid));
400 
401  if (data->include_xids)
402  appendStringInfo(ctx->out, ", txid %u", txn->xid);
403 
404  if (data->include_timestamp)
405  appendStringInfo(ctx->out, " (at %s)",
407 
408  OutputPluginWrite(ctx, true);
409 }
TimestampTz commit_time
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ pg_decode_commit_txn()

static void pg_decode_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 320 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

322 {
325  bool xact_wrote_changes = txndata->xact_wrote_changes;
326 
327  pfree(txndata);
328  txn->output_plugin_private = NULL;
329 
330  if (data->skip_empty_xacts && !xact_wrote_changes)
331  return;
332 
333  OutputPluginPrepareWrite(ctx, true);
334  if (data->include_xids)
335  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
336  else
337  appendStringInfoString(ctx->out, "COMMIT");
338 
339  if (data->include_timestamp)
340  appendStringInfo(ctx->out, " (at %s)",
342 
343  OutputPluginWrite(ctx, true);
344 }
TimestampTz commit_time
void * output_plugin_private
Definition: logical.h:75
void pfree(void *pointer)
Definition: mcxt.c:1057
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ pg_decode_filter()

static bool pg_decode_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 452 of file test_decoding.c.

References InvalidRepOriginId, TestDecodingData::only_local, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

454 {
456 
457  if (data->only_local && origin_id != InvalidRepOriginId)
458  return true;
459  return false;
460 }
void * output_plugin_private
Definition: logical.h:75
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_decode_filter_prepare()

static bool pg_decode_filter_prepare ( LogicalDecodingContext ctx,
const char *  gid 
)
static

Definition at line 443 of file test_decoding.c.

Referenced by _PG_output_plugin_init().

444 {
445  if (strstr(gid, "_nodecode") != NULL)
446  return true;
447 
448  return false;
449 }

◆ pg_decode_message()

static void pg_decode_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 734 of file test_decoding.c.

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

737 {
738  OutputPluginPrepareWrite(ctx, true);
739  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
740  transactional, prefix, sz);
741  appendBinaryStringInfo(ctx->out, message, sz);
742  OutputPluginWrite(ctx, true);
743 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227

◆ pg_decode_prepare_txn()

static void pg_decode_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 365 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, ReorderBufferTXN::gid, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

367 {
370 
371  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
372  return;
373 
374  OutputPluginPrepareWrite(ctx, true);
375 
376  appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
377  quote_literal_cstr(txn->gid));
378 
379  if (data->include_xids)
380  appendStringInfo(ctx->out, ", txid %u", txn->xid);
381 
382  if (data->include_timestamp)
383  appendStringInfo(ctx->out, " (at %s)",
385 
386  OutputPluginWrite(ctx, true);
387 }
TimestampTz commit_time
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ pg_decode_rollback_prepared_txn()

static void pg_decode_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 413 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, ReorderBufferTXN::gid, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), timestamptz_to_str(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

417 {
419 
420  OutputPluginPrepareWrite(ctx, true);
421 
422  appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
423  quote_literal_cstr(txn->gid));
424 
425  if (data->include_xids)
426  appendStringInfo(ctx->out, ", txid %u", txn->xid);
427 
428  if (data->include_timestamp)
429  appendStringInfo(ctx->out, " (at %s)",
431 
432  OutputPluginWrite(ctx, true);
433 }
TimestampTz commit_time
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ pg_decode_shutdown()

static void pg_decode_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 282 of file test_decoding.c.

References TestDecodingData::context, MemoryContextDelete(), and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

283 {
285 
286  /* cleanup our own resources via memory context reset */
288 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: test_decoding.c:33

◆ pg_decode_startup()

static void pg_decode_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 161 of file test_decoding.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, DefElem::arg, Assert, TestDecodingData::context, LogicalDecodingContext::context, DefElem::defname, ereport, errcode(), errmsg(), ERROR, TestDecodingData::include_timestamp, TestDecodingData::include_xids, IsA, lfirst, TestDecodingData::only_local, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OUTPUT_PLUGIN_TEXTUAL_OUTPUT, OutputPluginOptions::output_type, palloc0(), parse_bool(), OutputPluginOptions::receive_rewrites, TestDecodingData::skip_empty_xacts, LogicalDecodingContext::streaming, and strVal.

Referenced by _PG_output_plugin_init().

163 {
164  ListCell *option;
165  TestDecodingData *data;
166  bool enable_streaming = false;
167 
168  data = palloc0(sizeof(TestDecodingData));
170  "text conversion context",
172  data->include_xids = true;
173  data->include_timestamp = false;
174  data->skip_empty_xacts = false;
175  data->only_local = false;
176 
177  ctx->output_plugin_private = data;
178 
180  opt->receive_rewrites = false;
181 
182  foreach(option, ctx->output_plugin_options)
183  {
184  DefElem *elem = lfirst(option);
185 
186  Assert(elem->arg == NULL || IsA(elem->arg, String));
187 
188  if (strcmp(elem->defname, "include-xids") == 0)
189  {
190  /* if option does not provide a value, it means its value is true */
191  if (elem->arg == NULL)
192  data->include_xids = true;
193  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
194  ereport(ERROR,
195  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
196  errmsg("could not parse value \"%s\" for parameter \"%s\"",
197  strVal(elem->arg), elem->defname)));
198  }
199  else if (strcmp(elem->defname, "include-timestamp") == 0)
200  {
201  if (elem->arg == NULL)
202  data->include_timestamp = true;
203  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
204  ereport(ERROR,
205  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
206  errmsg("could not parse value \"%s\" for parameter \"%s\"",
207  strVal(elem->arg), elem->defname)));
208  }
209  else if (strcmp(elem->defname, "force-binary") == 0)
210  {
211  bool force_binary;
212 
213  if (elem->arg == NULL)
214  continue;
215  else if (!parse_bool(strVal(elem->arg), &force_binary))
216  ereport(ERROR,
217  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
218  errmsg("could not parse value \"%s\" for parameter \"%s\"",
219  strVal(elem->arg), elem->defname)));
220 
221  if (force_binary)
223  }
224  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
225  {
226 
227  if (elem->arg == NULL)
228  data->skip_empty_xacts = true;
229  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
230  ereport(ERROR,
231  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
232  errmsg("could not parse value \"%s\" for parameter \"%s\"",
233  strVal(elem->arg), elem->defname)));
234  }
235  else if (strcmp(elem->defname, "only-local") == 0)
236  {
237 
238  if (elem->arg == NULL)
239  data->only_local = true;
240  else if (!parse_bool(strVal(elem->arg), &data->only_local))
241  ereport(ERROR,
242  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
243  errmsg("could not parse value \"%s\" for parameter \"%s\"",
244  strVal(elem->arg), elem->defname)));
245  }
246  else if (strcmp(elem->defname, "include-rewrites") == 0)
247  {
248 
249  if (elem->arg == NULL)
250  continue;
251  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
252  ereport(ERROR,
253  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
254  errmsg("could not parse value \"%s\" for parameter \"%s\"",
255  strVal(elem->arg), elem->defname)));
256  }
257  else if (strcmp(elem->defname, "stream-changes") == 0)
258  {
259  if (elem->arg == NULL)
260  continue;
261  else if (!parse_bool(strVal(elem->arg), &enable_streaming))
262  ereport(ERROR,
263  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
264  errmsg("could not parse value \"%s\" for parameter \"%s\"",
265  strVal(elem->arg), elem->defname)));
266  }
267  else
268  {
269  ereport(ERROR,
270  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
271  errmsg("option \"%s\" = \"%s\" is unknown",
272  elem->defname,
273  elem->arg ? strVal(elem->arg) : "(null)")));
274  }
275  }
276 
277  ctx->streaming &= enable_streaming;
278 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:584
#define AllocSetContextCreate
Definition: memutils.h:170
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:694
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
List * output_plugin_options
Definition: logical.h:58
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:45
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
Node * arg
Definition: parsenodes.h:734
MemoryContext context
Definition: test_decoding.c:33
void * palloc0(Size size)
Definition: mcxt.c:981
#define ereport(elevel,...)
Definition: elog.h:155
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:905
char * defname
Definition: parsenodes.h:733

◆ pg_decode_stream_abort()

static void pg_decode_stream_abort ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 799 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), Assert, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), TestDecodingData::skip_empty_xacts, ReorderBufferTXN::toptxn, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

802 {
804 
805  /*
806  * stream abort can be sent for an individual subtransaction but we
807  * maintain the output_plugin_private only under the toptxn so if this is
808  * not the toptxn then fetch the toptxn.
809  */
810  ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
811  TestDecodingTxnData *txndata = toptxn->output_plugin_private;
812  bool xact_wrote_changes = txndata->xact_wrote_changes;
813 
814  if (txn->toptxn == NULL)
815  {
816  Assert(txn->output_plugin_private != NULL);
817  pfree(txndata);
818  txn->output_plugin_private = NULL;
819  }
820 
821  if (data->skip_empty_xacts && !xact_wrote_changes)
822  return;
823 
824  OutputPluginPrepareWrite(ctx, true);
825  if (data->include_xids)
826  appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
827  else
828  appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
829  OutputPluginWrite(ctx, true);
830 }
void * output_plugin_private
Definition: logical.h:75
void pfree(void *pointer)
Definition: mcxt.c:1057
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
struct ReorderBufferTXN * toptxn
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
#define Assert(condition)
Definition: c.h:804
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private

◆ pg_decode_stream_change()

static void pg_decode_stream_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 894 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingData::skip_empty_xacts, TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

898 {
901 
902  /* output stream start if we haven't yet */
903  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
904  {
905  pg_output_stream_start(ctx, data, txn, false);
906  }
907  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
908 
909  OutputPluginPrepareWrite(ctx, true);
910  if (data->include_xids)
911  appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
912  else
913  appendStringInfoString(ctx->out, "streaming change for transaction");
914  OutputPluginWrite(ctx, true);
915 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

◆ pg_decode_stream_commit()

static void pg_decode_stream_commit ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 860 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

863 {
866  bool xact_wrote_changes = txndata->xact_wrote_changes;
867 
868  pfree(txndata);
869  txn->output_plugin_private = NULL;
870 
871  if (data->skip_empty_xacts && !xact_wrote_changes)
872  return;
873 
874  OutputPluginPrepareWrite(ctx, true);
875 
876  if (data->include_xids)
877  appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
878  else
879  appendStringInfoString(ctx->out, "committing streamed transaction");
880 
881  if (data->include_timestamp)
882  appendStringInfo(ctx->out, " (at %s)",
884 
885  OutputPluginWrite(ctx, true);
886 }
TimestampTz commit_time
void * output_plugin_private
Definition: logical.h:75
void pfree(void *pointer)
Definition: mcxt.c:1057
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ pg_decode_stream_message()

static void pg_decode_stream_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 923 of file test_decoding.c.

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

926 {
927  OutputPluginPrepareWrite(ctx, true);
928 
929  if (transactional)
930  {
931  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
932  transactional, prefix, sz);
933  }
934  else
935  {
936  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
937  transactional, prefix, sz);
938  appendBinaryStringInfo(ctx->out, message, sz);
939  }
940 
941  OutputPluginWrite(ctx, true);
942 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227

◆ pg_decode_stream_prepare()

static void pg_decode_stream_prepare ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 833 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, ReorderBufferTXN::gid, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), quote_literal_cstr(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

836 {
839 
840  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
841  return;
842 
843  OutputPluginPrepareWrite(ctx, true);
844 
845  if (data->include_xids)
846  appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
847  quote_literal_cstr(txn->gid), txn->xid);
848  else
849  appendStringInfo(ctx->out, "preparing streamed transaction %s",
850  quote_literal_cstr(txn->gid));
851 
852  if (data->include_timestamp)
853  appendStringInfo(ctx->out, " (at %s)",
855 
856  OutputPluginWrite(ctx, true);
857 }
TimestampTz commit_time
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ pg_decode_stream_start()

static void pg_decode_stream_start ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 746 of file test_decoding.c.

References LogicalDecodingContext::context, MemoryContextAllocZero(), LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, pg_output_stream_start(), TestDecodingData::skip_empty_xacts, TestDecodingTxnData::stream_wrote_changes, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

748 {
751 
752  /*
753  * Allocate the txn plugin data for the first stream in the transaction.
754  */
755  if (txndata == NULL)
756  {
757  txndata =
759  txndata->xact_wrote_changes = false;
760  txn->output_plugin_private = txndata;
761  }
762 
763  txndata->stream_wrote_changes = false;
764  if (data->skip_empty_xacts)
765  return;
766  pg_output_stream_start(ctx, data, txn, true);
767 }
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
void * output_plugin_private
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

◆ pg_decode_stream_stop()

static void pg_decode_stream_stop ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 781 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), TestDecodingData::skip_empty_xacts, TestDecodingTxnData::stream_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

783 {
786 
787  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
788  return;
789 
790  OutputPluginPrepareWrite(ctx, true);
791  if (data->include_xids)
792  appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
793  else
794  appendStringInfoString(ctx->out, "closing a streamed block for transaction");
795  OutputPluginWrite(ctx, true);
796 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private

◆ pg_decode_stream_truncate()

static void pg_decode_stream_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 949 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_stream_start(), TestDecodingData::skip_empty_xacts, TestDecodingTxnData::stream_wrote_changes, TestDecodingTxnData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

952 {
955 
956  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
957  {
958  pg_output_stream_start(ctx, data, txn, false);
959  }
960  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
961 
962  OutputPluginPrepareWrite(ctx, true);
963  if (data->include_xids)
964  appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
965  else
966  appendStringInfoString(ctx->out, "streaming truncate for transaction");
967  OutputPluginWrite(ctx, true);
968 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
void * output_plugin_private
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

◆ pg_decode_truncate()

static void pg_decode_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 679 of file test_decoding.c.

References appendStringInfoString(), TestDecodingData::context, ReorderBufferChange::data, get_namespace_name(), i, MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), TestDecodingData::skip_empty_xacts, ReorderBufferChange::truncate, and TestDecodingTxnData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

681 {
682  TestDecodingData *data;
683  TestDecodingTxnData *txndata;
684  MemoryContext old;
685  int i;
686 
687  data = ctx->output_plugin_private;
688  txndata = txn->output_plugin_private;
689 
690  /* output BEGIN if we haven't yet */
691  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
692  {
693  pg_output_begin(ctx, data, txn, false);
694  }
695  txndata->xact_wrote_changes = true;
696 
697  /* Avoid leaking memory by using and resetting our own context */
698  old = MemoryContextSwitchTo(data->context);
699 
700  OutputPluginPrepareWrite(ctx, true);
701 
702  appendStringInfoString(ctx->out, "table ");
703 
704  for (i = 0; i < nrelations; i++)
705  {
706  if (i > 0)
707  appendStringInfoString(ctx->out, ", ");
708 
710  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
711  NameStr(relations[i]->rd_rel->relname)));
712  }
713 
714  appendStringInfoString(ctx->out, ": TRUNCATE:");
715 
716  if (change->data.truncate.restart_seqs
717  || change->data.truncate.cascade)
718  {
719  if (change->data.truncate.restart_seqs)
720  appendStringInfoString(ctx->out, " restart_seqs");
721  if (change->data.truncate.cascade)
722  appendStringInfoString(ctx->out, " cascade");
723  }
724  else
725  appendStringInfoString(ctx->out, " (no-flags)");
726 
729 
730  OutputPluginWrite(ctx, true);
731 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11078
MemoryContext context
Definition: test_decoding.c:33
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
struct ReorderBufferChange::@96::@98 truncate
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70
int i
#define NameStr(name)
Definition: c.h:681
union ReorderBufferChange::@96 data
void * output_plugin_private

◆ pg_output_begin()

static void pg_output_begin ( LogicalDecodingContext ctx,
TestDecodingData data,
ReorderBufferTXN txn,
bool  last_write 
)
static

Definition at line 308 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_begin_prepare_txn(), pg_decode_begin_txn(), pg_decode_change(), and pg_decode_truncate().

309 {
310  OutputPluginPrepareWrite(ctx, last_write);
311  if (data->include_xids)
312  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
313  else
314  appendStringInfoString(ctx->out, "BEGIN");
315  OutputPluginWrite(ctx, last_write);
316 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70

◆ pg_output_stream_start()

static void pg_output_stream_start ( LogicalDecodingContext ctx,
TestDecodingData data,
ReorderBufferTXN txn,
bool  last_write 
)
static

Definition at line 770 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_stream_change(), pg_decode_stream_start(), and pg_decode_stream_truncate().

771 {
772  OutputPluginPrepareWrite(ctx, last_write);
773  if (data->include_xids)
774  appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
775  else
776  appendStringInfoString(ctx->out, "opening a streamed block for transaction");
777  OutputPluginWrite(ctx, last_write);
778 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:629
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:642
StringInfo out
Definition: logical.h:70

◆ print_literal()

static void print_literal ( StringInfo  s,
Oid  typid,
char *  outputstr 
)
static

Definition at line 470 of file test_decoding.c.

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), and SQL_STR_DOUBLE.

Referenced by tuple_to_stringinfo().

471 {
472  const char *valptr;
473 
474  switch (typid)
475  {
476  case INT2OID:
477  case INT4OID:
478  case INT8OID:
479  case OIDOID:
480  case FLOAT4OID:
481  case FLOAT8OID:
482  case NUMERICOID:
483  /* NB: We don't care about Inf, NaN et al. */
484  appendStringInfoString(s, outputstr);
485  break;
486 
487  case BITOID:
488  case VARBITOID:
489  appendStringInfo(s, "B'%s'", outputstr);
490  break;
491 
492  case BOOLOID:
493  if (strcmp(outputstr, "t") == 0)
494  appendStringInfoString(s, "true");
495  else
496  appendStringInfoString(s, "false");
497  break;
498 
499  default:
500  appendStringInfoChar(s, '\'');
501  for (valptr = outputstr; *valptr; valptr++)
502  {
503  char ch = *valptr;
504 
505  if (SQL_STR_DOUBLE(ch, false))
506  appendStringInfoChar(s, ch);
507  appendStringInfoChar(s, ch);
508  }
509  appendStringInfoChar(s, '\'');
510  break;
511  }
512 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1164

◆ tuple_to_stringinfo()

static void tuple_to_stringinfo ( StringInfo  s,
TupleDesc  tupdesc,
HeapTuple  tuple,
bool  skip_nulls 
)
static

Definition at line 516 of file test_decoding.c.

References appendStringInfoChar(), appendStringInfoString(), format_type_be(), getTypeOutputInfo(), heap_getattr, NameStr, TupleDescData::natts, OidOutputFunctionCall(), PG_DETOAST_DATUM, PointerGetDatum, print_literal(), quote_identifier(), TupleDescAttr, val, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pg_decode_change().

517 {
518  int natt;
519 
520  /* print all columns individually */
521  for (natt = 0; natt < tupdesc->natts; natt++)
522  {
523  Form_pg_attribute attr; /* the attribute itself */
524  Oid typid; /* type of current attribute */
525  Oid typoutput; /* output function */
526  bool typisvarlena;
527  Datum origval; /* possibly toasted Datum */
528  bool isnull; /* column is null? */
529 
530  attr = TupleDescAttr(tupdesc, natt);
531 
532  /*
533  * don't print dropped columns, we can't be sure everything is
534  * available for them
535  */
536  if (attr->attisdropped)
537  continue;
538 
539  /*
540  * Don't print system columns, oid will already have been printed if
541  * present.
542  */
543  if (attr->attnum < 0)
544  continue;
545 
546  typid = attr->atttypid;
547 
548  /* get Datum from tuple */
549  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
550 
551  if (isnull && skip_nulls)
552  continue;
553 
554  /* print attribute name */
555  appendStringInfoChar(s, ' ');
556  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
557 
558  /* print attribute type */
559  appendStringInfoChar(s, '[');
561  appendStringInfoChar(s, ']');
562 
563  /* query output function */
564  getTypeOutputInfo(typid,
565  &typoutput, &typisvarlena);
566 
567  /* print separator */
568  appendStringInfoChar(s, ':');
569 
570  /* print data */
571  if (isnull)
572  appendStringInfoString(s, "null");
573  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
574  appendStringInfoString(s, "unchanged-toast-datum");
575  else if (!typisvarlena)
576  print_literal(s, typid,
577  OidOutputFunctionCall(typoutput, origval));
578  else
579  {
580  Datum val; /* definitely detoasted Datum */
581 
582  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
583  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
584  }
585  }
586 }
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2854
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10994
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
unsigned int Oid
Definition: postgres_ext.h:31
static void print_literal(StringInfo s, Oid typid, char *outputstr)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:762
uintptr_t Datum
Definition: postgres.h:367
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1656
#define NameStr(name)
Definition: c.h:681
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
long val
Definition: informix.c:664

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 25 of file test_decoding.c.