PostgreSQL Source Code  git master
test_decoding.c File Reference
#include "postgres.h"
#include "catalog/pg_type.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for test_decoding.c:

Go to the source code of this file.

Data Structures

struct  TestDecodingData
 

Functions

void _PG_init (void)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pg_decode_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pg_decode_shutdown (LogicalDecodingContext *ctx)
 
static void pg_decode_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_begin (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pg_decode_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pg_decode_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pg_decode_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static void pg_decode_stream_start (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_stream_stop (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_decode_stream_abort (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pg_decode_stream_commit (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_stream_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pg_decode_stream_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static void pg_decode_stream_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void print_literal (StringInfo s, Oid typid, char *outputstr)
 
static void tuple_to_stringinfo (StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 

Variables

 PG_MODULE_MAGIC
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 89 of file test_decoding.c.

90 {
91  /* other plugins can perform things here */
92 }

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 96 of file test_decoding.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pg_decode_begin_txn(), pg_decode_change(), pg_decode_commit_txn(), pg_decode_filter(), pg_decode_message(), pg_decode_shutdown(), pg_decode_startup(), pg_decode_stream_abort(), pg_decode_stream_change(), pg_decode_stream_commit(), pg_decode_stream_message(), pg_decode_stream_start(), pg_decode_stream_stop(), pg_decode_stream_truncate(), pg_decode_truncate(), OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

97 {
99 
115 }
LogicalDecodeTruncateCB truncate_cb
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamMessageCB stream_message_cb
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodeStreamAbortCB stream_abort_cb
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
LogicalDecodeCommitCB commit_cb
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodeChangeCB change_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeStartupCB startup_cb
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeStreamChangeCB stream_change_cb
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: test_decoding.c:96
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:904

◆ pg_decode_begin_txn()

static void pg_decode_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 251 of file test_decoding.c.

References LogicalDecodingContext::output_plugin_private, pg_output_begin(), TestDecodingData::skip_empty_xacts, and TestDecodingData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

252 {
254 
255  data->xact_wrote_changes = false;
256  if (data->skip_empty_xacts)
257  return;
258 
259  pg_output_begin(ctx, data, txn, true);
260 }
void * output_plugin_private
Definition: logical.h:75
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

◆ pg_decode_change()

static void pg_decode_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  rel,
ReorderBufferChange change 
)
static

Definition at line 437 of file test_decoding.c.

References ReorderBufferChange::action, appendStringInfoChar(), appendStringInfoString(), Assert, TestDecodingData::context, ReorderBufferChange::data, get_namespace_name(), get_rel_name(), get_rel_namespace(), MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), RelationGetDescr, RelationGetForm, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, TestDecodingData::skip_empty_xacts, ReorderBufferChange::tp, tuple_to_stringinfo(), and TestDecodingData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

439 {
440  TestDecodingData *data;
441  Form_pg_class class_form;
442  TupleDesc tupdesc;
443  MemoryContext old;
444 
445  data = ctx->output_plugin_private;
446 
447  /* output BEGIN if we haven't yet */
448  if (data->skip_empty_xacts && !data->xact_wrote_changes)
449  {
450  pg_output_begin(ctx, data, txn, false);
451  }
452  data->xact_wrote_changes = true;
453 
454  class_form = RelationGetForm(relation);
455  tupdesc = RelationGetDescr(relation);
456 
457  /* Avoid leaking memory by using and resetting our own context */
458  old = MemoryContextSwitchTo(data->context);
459 
460  OutputPluginPrepareWrite(ctx, true);
461 
462  appendStringInfoString(ctx->out, "table ");
465  class_form->relrewrite ?
466  get_rel_name(class_form->relrewrite) :
467  NameStr(class_form->relname)));
468  appendStringInfoChar(ctx->out, ':');
469 
470  switch (change->action)
471  {
473  appendStringInfoString(ctx->out, " INSERT:");
474  if (change->data.tp.newtuple == NULL)
475  appendStringInfoString(ctx->out, " (no-tuple-data)");
476  else
477  tuple_to_stringinfo(ctx->out, tupdesc,
478  &change->data.tp.newtuple->tuple,
479  false);
480  break;
482  appendStringInfoString(ctx->out, " UPDATE:");
483  if (change->data.tp.oldtuple != NULL)
484  {
485  appendStringInfoString(ctx->out, " old-key:");
486  tuple_to_stringinfo(ctx->out, tupdesc,
487  &change->data.tp.oldtuple->tuple,
488  true);
489  appendStringInfoString(ctx->out, " new-tuple:");
490  }
491 
492  if (change->data.tp.newtuple == NULL)
493  appendStringInfoString(ctx->out, " (no-tuple-data)");
494  else
495  tuple_to_stringinfo(ctx->out, tupdesc,
496  &change->data.tp.newtuple->tuple,
497  false);
498  break;
500  appendStringInfoString(ctx->out, " DELETE:");
501 
502  /* if there was no PK, we only know that a delete happened */
503  if (change->data.tp.oldtuple == NULL)
504  appendStringInfoString(ctx->out, " (no-tuple-data)");
505  /* In DELETE, only the replica identity is present; display that */
506  else
507  tuple_to_stringinfo(ctx->out, tupdesc,
508  &change->data.tp.oldtuple->tuple,
509  true);
510  break;
511  default:
512  Assert(false);
513  }
514 
517 
518  OutputPluginWrite(ctx, true);
519 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
#define RelationGetDescr(relation)
Definition: rel.h:482
#define RelationGetForm(relation)
Definition: rel.h:450
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1864
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3191
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10811
MemoryContext context
Definition: test_decoding.c:33
union ReorderBufferChange::@99 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
struct ReorderBufferChange::@99::@100 tp
#define Assert(condition)
Definition: c.h:745
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70
#define NameStr(name)
Definition: c.h:622
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1840
#define RelationGetRelid(relation)
Definition: rel.h:456

◆ pg_decode_commit_txn()

static void pg_decode_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 275 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

277 {
279 
280  if (data->skip_empty_xacts && !data->xact_wrote_changes)
281  return;
282 
283  OutputPluginPrepareWrite(ctx, true);
284  if (data->include_xids)
285  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
286  else
287  appendStringInfoString(ctx->out, "COMMIT");
288 
289  if (data->include_timestamp)
290  appendStringInfo(ctx->out, " (at %s)",
292 
293  OutputPluginWrite(ctx, true);
294 }
TimestampTz commit_time
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1736

◆ pg_decode_filter()

static bool pg_decode_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 297 of file test_decoding.c.

References InvalidRepOriginId, TestDecodingData::only_local, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

299 {
301 
302  if (data->only_local && origin_id != InvalidRepOriginId)
303  return true;
304  return false;
305 }
void * output_plugin_private
Definition: logical.h:75
#define InvalidRepOriginId
Definition: origin.h:33

◆ pg_decode_message()

static void pg_decode_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 575 of file test_decoding.c.

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

578 {
579  OutputPluginPrepareWrite(ctx, true);
580  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
581  transactional, prefix, sz);
582  appendBinaryStringInfo(ctx->out, message, sz);
583  OutputPluginWrite(ctx, true);
584 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227

◆ pg_decode_shutdown()

static void pg_decode_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 241 of file test_decoding.c.

References TestDecodingData::context, MemoryContextDelete(), and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

242 {
244 
245  /* cleanup our own resources via memory context reset */
247 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: test_decoding.c:33

◆ pg_decode_startup()

static void pg_decode_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 120 of file test_decoding.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, DefElem::arg, Assert, TestDecodingData::context, LogicalDecodingContext::context, DefElem::defname, ereport, errcode(), errmsg(), ERROR, TestDecodingData::include_timestamp, TestDecodingData::include_xids, IsA, lfirst, TestDecodingData::only_local, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OUTPUT_PLUGIN_TEXTUAL_OUTPUT, OutputPluginOptions::output_type, palloc0(), parse_bool(), OutputPluginOptions::receive_rewrites, TestDecodingData::skip_empty_xacts, LogicalDecodingContext::streaming, and strVal.

Referenced by _PG_output_plugin_init().

122 {
123  ListCell *option;
124  TestDecodingData *data;
125  bool enable_streaming = false;
126 
127  data = palloc0(sizeof(TestDecodingData));
129  "text conversion context",
131  data->include_xids = true;
132  data->include_timestamp = false;
133  data->skip_empty_xacts = false;
134  data->only_local = false;
135 
136  ctx->output_plugin_private = data;
137 
139  opt->receive_rewrites = false;
140 
141  foreach(option, ctx->output_plugin_options)
142  {
143  DefElem *elem = lfirst(option);
144 
145  Assert(elem->arg == NULL || IsA(elem->arg, String));
146 
147  if (strcmp(elem->defname, "include-xids") == 0)
148  {
149  /* if option does not provide a value, it means its value is true */
150  if (elem->arg == NULL)
151  data->include_xids = true;
152  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
153  ereport(ERROR,
154  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
155  errmsg("could not parse value \"%s\" for parameter \"%s\"",
156  strVal(elem->arg), elem->defname)));
157  }
158  else if (strcmp(elem->defname, "include-timestamp") == 0)
159  {
160  if (elem->arg == NULL)
161  data->include_timestamp = true;
162  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
163  ereport(ERROR,
164  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
165  errmsg("could not parse value \"%s\" for parameter \"%s\"",
166  strVal(elem->arg), elem->defname)));
167  }
168  else if (strcmp(elem->defname, "force-binary") == 0)
169  {
170  bool force_binary;
171 
172  if (elem->arg == NULL)
173  continue;
174  else if (!parse_bool(strVal(elem->arg), &force_binary))
175  ereport(ERROR,
176  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
177  errmsg("could not parse value \"%s\" for parameter \"%s\"",
178  strVal(elem->arg), elem->defname)));
179 
180  if (force_binary)
182  }
183  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
184  {
185 
186  if (elem->arg == NULL)
187  data->skip_empty_xacts = true;
188  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
189  ereport(ERROR,
190  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191  errmsg("could not parse value \"%s\" for parameter \"%s\"",
192  strVal(elem->arg), elem->defname)));
193  }
194  else if (strcmp(elem->defname, "only-local") == 0)
195  {
196 
197  if (elem->arg == NULL)
198  data->only_local = true;
199  else if (!parse_bool(strVal(elem->arg), &data->only_local))
200  ereport(ERROR,
201  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
202  errmsg("could not parse value \"%s\" for parameter \"%s\"",
203  strVal(elem->arg), elem->defname)));
204  }
205  else if (strcmp(elem->defname, "include-rewrites") == 0)
206  {
207 
208  if (elem->arg == NULL)
209  continue;
210  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
211  ereport(ERROR,
212  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
213  errmsg("could not parse value \"%s\" for parameter \"%s\"",
214  strVal(elem->arg), elem->defname)));
215  }
216  else if (strcmp(elem->defname, "stream-changes") == 0)
217  {
218  if (elem->arg == NULL)
219  continue;
220  else if (!parse_bool(strVal(elem->arg), &enable_streaming))
221  ereport(ERROR,
222  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
223  errmsg("could not parse value \"%s\" for parameter \"%s\"",
224  strVal(elem->arg), elem->defname)));
225  }
226  else
227  {
228  ereport(ERROR,
229  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
230  errmsg("option \"%s\" = \"%s\" is unknown",
231  elem->defname,
232  elem->arg ? strVal(elem->arg) : "(null)")));
233  }
234  }
235 
236  ctx->streaming &= enable_streaming;
237 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
#define AllocSetContextCreate
Definition: memutils.h:170
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:610
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
List * output_plugin_options
Definition: logical.h:58
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
Node * arg
Definition: parsenodes.h:733
MemoryContext context
Definition: test_decoding.c:33
void * palloc0(Size size)
Definition: mcxt.c:980
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:824
char * defname
Definition: parsenodes.h:732

◆ pg_decode_stream_abort()

static void pg_decode_stream_abort ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 627 of file test_decoding.c.

References appendStringInfo(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

630 {
632 
633  OutputPluginPrepareWrite(ctx, true);
634  if (data->include_xids)
635  appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
636  else
637  appendStringInfo(ctx->out, "aborting streamed (sub)transaction");
638  OutputPluginWrite(ctx, true);
639 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70

◆ pg_decode_stream_change()

static void pg_decode_stream_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 672 of file test_decoding.c.

References appendStringInfo(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

676 {
678 
679  OutputPluginPrepareWrite(ctx, true);
680  if (data->include_xids)
681  appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
682  else
683  appendStringInfo(ctx->out, "streaming change for transaction");
684  OutputPluginWrite(ctx, true);
685 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70

◆ pg_decode_stream_commit()

static void pg_decode_stream_commit ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 646 of file test_decoding.c.

References appendStringInfo(), ReorderBufferTXN::commit_time, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), timestamptz_to_str(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

649 {
651 
652  OutputPluginPrepareWrite(ctx, true);
653 
654  if (data->include_xids)
655  appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
656  else
657  appendStringInfo(ctx->out, "committing streamed transaction");
658 
659  if (data->include_timestamp)
660  appendStringInfo(ctx->out, " (at %s)",
662 
663  OutputPluginWrite(ctx, true);
664 }
TimestampTz commit_time
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1736

◆ pg_decode_stream_message()

static void pg_decode_stream_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 693 of file test_decoding.c.

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

696 {
697  OutputPluginPrepareWrite(ctx, true);
698 
699  if (transactional)
700  {
701  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
702  transactional, prefix, sz);
703  }
704  else
705  {
706  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
707  transactional, prefix, sz);
708  appendBinaryStringInfo(ctx->out, message, sz);
709  }
710 
711  OutputPluginWrite(ctx, true);
712 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227

◆ pg_decode_stream_start()

static void pg_decode_stream_start ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 591 of file test_decoding.c.

References appendStringInfo(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

593 {
595 
596  OutputPluginPrepareWrite(ctx, true);
597  if (data->include_xids)
598  appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
599  else
600  appendStringInfo(ctx->out, "opening a streamed block for transaction");
601  OutputPluginWrite(ctx, true);
602 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70

◆ pg_decode_stream_stop()

static void pg_decode_stream_stop ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 609 of file test_decoding.c.

References appendStringInfo(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

611 {
613 
614  OutputPluginPrepareWrite(ctx, true);
615  if (data->include_xids)
616  appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
617  else
618  appendStringInfo(ctx->out, "closing a streamed block for transaction");
619  OutputPluginWrite(ctx, true);
620 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70

◆ pg_decode_stream_truncate()

static void pg_decode_stream_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 719 of file test_decoding.c.

References appendStringInfo(), TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

722 {
724 
725  OutputPluginPrepareWrite(ctx, true);
726  if (data->include_xids)
727  appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
728  else
729  appendStringInfo(ctx->out, "streaming truncate for transaction");
730  OutputPluginWrite(ctx, true);
731 }
void * output_plugin_private
Definition: logical.h:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70

◆ pg_decode_truncate()

static void pg_decode_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 522 of file test_decoding.c.

References appendStringInfoString(), TestDecodingData::context, ReorderBufferChange::data, get_namespace_name(), i, MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), TestDecodingData::skip_empty_xacts, ReorderBufferChange::truncate, and TestDecodingData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

524 {
525  TestDecodingData *data;
526  MemoryContext old;
527  int i;
528 
529  data = ctx->output_plugin_private;
530 
531  /* output BEGIN if we haven't yet */
532  if (data->skip_empty_xacts && !data->xact_wrote_changes)
533  {
534  pg_output_begin(ctx, data, txn, false);
535  }
536  data->xact_wrote_changes = true;
537 
538  /* Avoid leaking memory by using and resetting our own context */
539  old = MemoryContextSwitchTo(data->context);
540 
541  OutputPluginPrepareWrite(ctx, true);
542 
543  appendStringInfoString(ctx->out, "table ");
544 
545  for (i = 0; i < nrelations; i++)
546  {
547  if (i > 0)
548  appendStringInfoString(ctx->out, ", ");
549 
551  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
552  NameStr(relations[i]->rd_rel->relname)));
553  }
554 
555  appendStringInfoString(ctx->out, ": TRUNCATE:");
556 
557  if (change->data.truncate.restart_seqs
558  || change->data.truncate.cascade)
559  {
560  if (change->data.truncate.restart_seqs)
561  appendStringInfoString(ctx->out, " restart_seqs");
562  if (change->data.truncate.cascade)
563  appendStringInfoString(ctx->out, " cascade");
564  }
565  else
566  appendStringInfoString(ctx->out, " (no-flags)");
567 
570 
571  OutputPluginWrite(ctx, true);
572 }
struct ReorderBufferChange::@99::@101 truncate
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3191
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10811
MemoryContext context
Definition: test_decoding.c:33
union ReorderBufferChange::@99 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70
int i
#define NameStr(name)
Definition: c.h:622

◆ pg_output_begin()

static void pg_output_begin ( LogicalDecodingContext ctx,
TestDecodingData data,
ReorderBufferTXN txn,
bool  last_write 
)
static

Definition at line 263 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_begin_txn(), pg_decode_change(), and pg_decode_truncate().

264 {
265  OutputPluginPrepareWrite(ctx, last_write);
266  if (data->include_xids)
267  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
268  else
269  appendStringInfoString(ctx->out, "BEGIN");
270  OutputPluginWrite(ctx, last_write);
271 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
StringInfo out
Definition: logical.h:70

◆ print_literal()

static void print_literal ( StringInfo  s,
Oid  typid,
char *  outputstr 
)
static

Definition at line 315 of file test_decoding.c.

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), and SQL_STR_DOUBLE.

Referenced by tuple_to_stringinfo().

316 {
317  const char *valptr;
318 
319  switch (typid)
320  {
321  case INT2OID:
322  case INT4OID:
323  case INT8OID:
324  case OIDOID:
325  case FLOAT4OID:
326  case FLOAT8OID:
327  case NUMERICOID:
328  /* NB: We don't care about Inf, NaN et al. */
329  appendStringInfoString(s, outputstr);
330  break;
331 
332  case BITOID:
333  case VARBITOID:
334  appendStringInfo(s, "B'%s'", outputstr);
335  break;
336 
337  case BOOLOID:
338  if (strcmp(outputstr, "t") == 0)
339  appendStringInfoString(s, "true");
340  else
341  appendStringInfoString(s, "false");
342  break;
343 
344  default:
345  appendStringInfoChar(s, '\'');
346  for (valptr = outputstr; *valptr; valptr++)
347  {
348  char ch = *valptr;
349 
350  if (SQL_STR_DOUBLE(ch, false))
351  appendStringInfoChar(s, ch);
352  appendStringInfoChar(s, ch);
353  }
354  appendStringInfoChar(s, '\'');
355  break;
356  }
357 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1105

◆ tuple_to_stringinfo()

static void tuple_to_stringinfo ( StringInfo  s,
TupleDesc  tupdesc,
HeapTuple  tuple,
bool  skip_nulls 
)
static

Definition at line 361 of file test_decoding.c.

References appendStringInfoChar(), appendStringInfoString(), format_type_be(), getTypeOutputInfo(), heap_getattr, NameStr, TupleDescData::natts, OidOutputFunctionCall(), PG_DETOAST_DATUM, PointerGetDatum, print_literal(), quote_identifier(), TupleDescAttr, val, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pg_decode_change().

362 {
363  int natt;
364 
365  /* print all columns individually */
366  for (natt = 0; natt < tupdesc->natts; natt++)
367  {
368  Form_pg_attribute attr; /* the attribute itself */
369  Oid typid; /* type of current attribute */
370  Oid typoutput; /* output function */
371  bool typisvarlena;
372  Datum origval; /* possibly toasted Datum */
373  bool isnull; /* column is null? */
374 
375  attr = TupleDescAttr(tupdesc, natt);
376 
377  /*
378  * don't print dropped columns, we can't be sure everything is
379  * available for them
380  */
381  if (attr->attisdropped)
382  continue;
383 
384  /*
385  * Don't print system columns, oid will already have been printed if
386  * present.
387  */
388  if (attr->attnum < 0)
389  continue;
390 
391  typid = attr->atttypid;
392 
393  /* get Datum from tuple */
394  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
395 
396  if (isnull && skip_nulls)
397  continue;
398 
399  /* print attribute name */
400  appendStringInfoChar(s, ' ');
401  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
402 
403  /* print attribute type */
404  appendStringInfoChar(s, '[');
406  appendStringInfoChar(s, ']');
407 
408  /* query output function */
409  getTypeOutputInfo(typid,
410  &typoutput, &typisvarlena);
411 
412  /* print separator */
413  appendStringInfoChar(s, ':');
414 
415  /* print data */
416  if (isnull)
417  appendStringInfoString(s, "null");
418  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
419  appendStringInfoString(s, "unchanged-toast-datum");
420  else if (!typisvarlena)
421  print_literal(s, typid,
422  OidOutputFunctionCall(typoutput, origval));
423  else
424  {
425  Datum val; /* definitely detoasted Datum */
426 
427  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
428  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
429  }
430  }
431 }
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2784
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10727
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
unsigned int Oid
Definition: postgres_ext.h:31
static void print_literal(StringInfo s, Oid typid, char *outputstr)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:762
uintptr_t Datum
Definition: postgres.h:367
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1657
#define NameStr(name)
Definition: c.h:622
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
long val
Definition: informix.c:664

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 25 of file test_decoding.c.