PostgreSQL Source Code  git master
test_decoding.c File Reference
#include "postgres.h"
#include "catalog/pg_type.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for test_decoding.c:

Go to the source code of this file.

Data Structures

struct  TestDecodingData
 

Functions

void _PG_init (void)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pg_decode_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pg_decode_shutdown (LogicalDecodingContext *ctx)
 
static void pg_decode_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pg_output_begin (LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 
static void pg_decode_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pg_decode_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pg_decode_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pg_decode_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pg_decode_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static void print_literal (StringInfo s, Oid typid, char *outputstr)
 
static void tuple_to_stringinfo (StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 

Variables

 PG_MODULE_MAGIC
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 67 of file test_decoding.c.

68 {
69  /* other plugins can perform things here */
70 }

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks *  cb )

Definition at line 74 of file test_decoding.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pg_decode_begin_txn(), pg_decode_change(), pg_decode_commit_txn(), pg_decode_filter(), pg_decode_message(), pg_decode_shutdown(), pg_decode_startup(), pg_decode_truncate(), OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, and OutputPluginCallbacks::truncate_cb.

75 {
77 
86 }
LogicalDecodeTruncateCB truncate_cb
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
LogicalDecodeMessageCB message_cb
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
LogicalDecodeChangeCB change_cb
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: test_decoding.c:91
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: test_decoding.c:74
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:834

◆ pg_decode_begin_txn()

static void pg_decode_begin_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn 
)
static

Definition at line 209 of file test_decoding.c.

References LogicalDecodingContext::output_plugin_private, pg_output_begin(), TestDecodingData::skip_empty_xacts, and TestDecodingData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

210 {
212 
213  data->xact_wrote_changes = false;
214  if (data->skip_empty_xacts)
215  return;
216 
217  pg_output_begin(ctx, data, txn, true);
218 }
void * output_plugin_private
Definition: logical.h:78
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)

◆ pg_decode_change()

static void pg_decode_change ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
Relation  rel,
ReorderBufferChange *  change 
)
static

Definition at line 402 of file test_decoding.c.

References ReorderBufferChange::action, appendStringInfoChar(), appendStringInfoString(), Assert, TestDecodingData::context, ReorderBufferChange::data, get_namespace_name(), get_rel_name(), get_rel_namespace(), MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), RelationGetDescr, RelationGetForm, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, TestDecodingData::skip_empty_xacts, ReorderBufferChange::tp, tuple_to_stringinfo(), and TestDecodingData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

404 {
405  TestDecodingData *data;
406  Form_pg_class class_form;
407  TupleDesc tupdesc;
408  MemoryContext old;
409 
410  data = ctx->output_plugin_private;
411 
412  /* output BEGIN if we haven't yet */
413  if (data->skip_empty_xacts && !data->xact_wrote_changes)
414  {
415  pg_output_begin(ctx, data, txn, false);
416  }
417  data->xact_wrote_changes = true;
418 
419  class_form = RelationGetForm(relation);
420  tupdesc = RelationGetDescr(relation);
421 
422  /* Avoid leaking memory by using and resetting our own context */
423  old = MemoryContextSwitchTo(data->context);
424 
425  OutputPluginPrepareWrite(ctx, true);
426 
427  appendStringInfoString(ctx->out, "table ");
432  class_form->relrewrite ?
433  get_rel_name(class_form->relrewrite) :
434  NameStr(class_form->relname)));
435  appendStringInfoChar(ctx->out, ':');
436 
437  switch (change->action)
438  {
440  appendStringInfoString(ctx->out, " INSERT:");
441  if (change->data.tp.newtuple == NULL)
442  appendStringInfoString(ctx->out, " (no-tuple-data)");
443  else
444  tuple_to_stringinfo(ctx->out, tupdesc,
445  &change->data.tp.newtuple->tuple,
446  false);
447  break;
449  appendStringInfoString(ctx->out, " UPDATE:");
450  if (change->data.tp.oldtuple != NULL)
451  {
452  appendStringInfoString(ctx->out, " old-key:");
453  tuple_to_stringinfo(ctx->out, tupdesc,
454  &change->data.tp.oldtuple->tuple,
455  true);
456  appendStringInfoString(ctx->out, " new-tuple:");
457  }
458 
459  if (change->data.tp.newtuple == NULL)
460  appendStringInfoString(ctx->out, " (no-tuple-data)");
461  else
462  tuple_to_stringinfo(ctx->out, tupdesc,
463  &change->data.tp.newtuple->tuple,
464  false);
465  break;
467  appendStringInfoString(ctx->out, " DELETE:");
468 
469  /* if there was no PK, we only know that a delete happened */
470  if (change->data.tp.oldtuple == NULL)
471  appendStringInfoString(ctx->out, " (no-tuple-data)");
472  /* In DELETE, only the replica identity is present; display that */
473  else
474  tuple_to_stringinfo(ctx->out, tupdesc,
475  &change->data.tp.oldtuple->tuple,
476  true);
477  break;
478  default:
479  Assert(false);
480  }
481 
484 
485  OutputPluginWrite(ctx, true);
486 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
#define RelationGetDescr(relation)
Definition: rel.h:433
#define RelationGetForm(relation)
Definition: rel.h:401
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1754
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:78
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
struct ReorderBufferChange::@102::@103 tp
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3051
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:169
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10574
MemoryContext context
Definition: test_decoding.c:33
union ReorderBufferChange::@102 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
#define Assert(condition)
Definition: c.h:699
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
FormData_pg_class * Form_pg_class
Definition: pg_class.h:93
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73
#define NameStr(name)
Definition: c.h:576
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define RelationGetRelid(relation)
Definition: rel.h:407

◆ pg_decode_commit_txn()

static void pg_decode_commit_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 233 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), ReorderBufferTXN::commit_time, TestDecodingData::include_timestamp, TestDecodingData::include_xids, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), TestDecodingData::skip_empty_xacts, timestamptz_to_str(), TestDecodingData::xact_wrote_changes, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

235 {
237 
238  if (data->skip_empty_xacts && !data->xact_wrote_changes)
239  return;
240 
241  OutputPluginPrepareWrite(ctx, true);
242  if (data->include_xids)
243  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
244  else
245  appendStringInfoString(ctx->out, "COMMIT");
246 
247  if (data->include_timestamp)
248  appendStringInfo(ctx->out, " (at %s)",
250 
251  OutputPluginWrite(ctx, true);
252 }
TimestampTz commit_time
void * output_plugin_private
Definition: logical.h:78
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1710

◆ pg_decode_filter()

static bool pg_decode_filter ( LogicalDecodingContext *  ctx,
RepOriginId  origin_id 
)
static

Definition at line 255 of file test_decoding.c.

References InvalidRepOriginId, TestDecodingData::only_local, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

257 {
259 
260  if (data->only_local && origin_id != InvalidRepOriginId)
261  return true;
262  return false;
263 }
void * output_plugin_private
Definition: logical.h:78
#define InvalidRepOriginId
Definition: origin.h:34

◆ pg_decode_message()

static void pg_decode_message ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 542 of file test_decoding.c.

References appendBinaryStringInfo(), appendStringInfo(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

545 {
546  OutputPluginPrepareWrite(ctx, true);
547  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
548  transactional, prefix, sz);
549  appendBinaryStringInfo(ctx->out, message, sz);
550  OutputPluginWrite(ctx, true);
551 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208

◆ pg_decode_shutdown()

static void pg_decode_shutdown ( LogicalDecodingContext *  ctx )
static

Definition at line 199 of file test_decoding.c.

References TestDecodingData::context, MemoryContextDelete(), and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

200 {
202 
203  /* cleanup our own resources by deleting our private memory context */
205 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
void * output_plugin_private
Definition: logical.h:78
MemoryContext context
Definition: test_decoding.c:33

◆ pg_decode_startup()

static void pg_decode_startup ( LogicalDecodingContext *  ctx,
OutputPluginOptions *  opt,
bool  is_init 
)
static

Definition at line 91 of file test_decoding.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, DefElem::arg, Assert, TestDecodingData::context, LogicalDecodingContext::context, DefElem::defname, ereport, errcode(), errmsg(), ERROR, TestDecodingData::include_timestamp, TestDecodingData::include_xids, IsA, lfirst, TestDecodingData::only_local, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OUTPUT_PLUGIN_TEXTUAL_OUTPUT, OutputPluginOptions::output_type, palloc0(), parse_bool(), OutputPluginOptions::receive_rewrites, TestDecodingData::skip_empty_xacts, and strVal.

Referenced by _PG_output_plugin_init().

93 {
95  TestDecodingData *data;
96 
97  data = palloc0(sizeof(TestDecodingData));
99  "text conversion context",
101  data->include_xids = true;
102  data->include_timestamp = false;
103  data->skip_empty_xacts = false;
104  data->only_local = false;
105 
106  ctx->output_plugin_private = data;
107 
109  opt->receive_rewrites = false;
110 
111  foreach(option, ctx->output_plugin_options)
112  {
113  DefElem *elem = lfirst(option);
114 
115  Assert(elem->arg == NULL || IsA(elem->arg, String));
116 
117  if (strcmp(elem->defname, "include-xids") == 0)
118  {
119  /* if option does not provide a value, it means its value is true */
120  if (elem->arg == NULL)
121  data->include_xids = true;
122  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
123  ereport(ERROR,
124  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
125  errmsg("could not parse value \"%s\" for parameter \"%s\"",
126  strVal(elem->arg), elem->defname)));
127  }
128  else if (strcmp(elem->defname, "include-timestamp") == 0)
129  {
130  if (elem->arg == NULL)
131  data->include_timestamp = true;
132  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
133  ereport(ERROR,
134  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
135  errmsg("could not parse value \"%s\" for parameter \"%s\"",
136  strVal(elem->arg), elem->defname)));
137  }
138  else if (strcmp(elem->defname, "force-binary") == 0)
139  {
140  bool force_binary;
141 
142  if (elem->arg == NULL)
143  continue;
144  else if (!parse_bool(strVal(elem->arg), &force_binary))
145  ereport(ERROR,
146  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
147  errmsg("could not parse value \"%s\" for parameter \"%s\"",
148  strVal(elem->arg), elem->defname)));
149 
150  if (force_binary)
152  }
153  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
154  {
155 
156  if (elem->arg == NULL)
157  data->skip_empty_xacts = true;
158  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
159  ereport(ERROR,
160  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
161  errmsg("could not parse value \"%s\" for parameter \"%s\"",
162  strVal(elem->arg), elem->defname)));
163  }
164  else if (strcmp(elem->defname, "only-local") == 0)
165  {
166 
167  if (elem->arg == NULL)
168  data->only_local = true;
169  else if (!parse_bool(strVal(elem->arg), &data->only_local))
170  ereport(ERROR,
171  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
172  errmsg("could not parse value \"%s\" for parameter \"%s\"",
173  strVal(elem->arg), elem->defname)));
174  }
175  else if (strcmp(elem->defname, "include-rewrites") == 0)
176  {
177 
178  if (elem->arg == NULL)
179  continue;
180  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
181  ereport(ERROR,
182  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
183  errmsg("could not parse value \"%s\" for parameter \"%s\"",
184  strVal(elem->arg), elem->defname)));
185  }
186  else
187  {
188  ereport(ERROR,
189  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
190  errmsg("option \"%s\" = \"%s\" is unknown",
191  elem->defname,
192  elem->arg ? strVal(elem->arg) : "(null)")));
193  }
194  }
195 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:568
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
void * output_plugin_private
Definition: logical.h:78
MemoryContext context
Definition: logical.h:38
List * output_plugin_options
Definition: logical.h:61
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define ereport(elevel, rest)
Definition: elog.h:122
Node * arg
Definition: parsenodes.h:731
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:170
MemoryContext context
Definition: test_decoding.c:33
void * palloc0(Size size)
Definition: mcxt.c:955
#define Assert(condition)
Definition: c.h:699
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:730

◆ pg_decode_truncate()

static void pg_decode_truncate ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange *  change 
)
static

Definition at line 489 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::context, ReorderBufferChange::data, get_namespace_name(), i, MemoryContextReset(), MemoryContextSwitchTo(), NameStr, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pg_output_begin(), quote_qualified_identifier(), TestDecodingData::skip_empty_xacts, ReorderBufferChange::truncate, and TestDecodingData::xact_wrote_changes.

Referenced by _PG_output_plugin_init().

491 {
492  TestDecodingData *data;
493  MemoryContext old;
494  int i;
495 
496  data = ctx->output_plugin_private;
497 
498  /* output BEGIN if we haven't yet */
499  if (data->skip_empty_xacts && !data->xact_wrote_changes)
500  {
501  pg_output_begin(ctx, data, txn, false);
502  }
503  data->xact_wrote_changes = true;
504 
505  /* Avoid leaking memory by using and resetting our own context */
506  old = MemoryContextSwitchTo(data->context);
507 
508  OutputPluginPrepareWrite(ctx, true);
509 
510  appendStringInfoString(ctx->out, "table ");
511 
512  for (i = 0; i < nrelations; i++)
513  {
514  if (i > 0)
515  appendStringInfoString(ctx->out, ", ");
516 
518  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
519  NameStr(relations[i]->rd_rel->relname)));
520  }
521 
522  appendStringInfoString(ctx->out, ": TRUNCATE:");
523 
524  if (change->data.truncate.restart_seqs
525  || change->data.truncate.cascade)
526  {
527  if (change->data.truncate.restart_seqs)
528  appendStringInfo(ctx->out, " restart_seqs");
529  if (change->data.truncate.cascade)
530  appendStringInfo(ctx->out, " cascade");
531  }
532  else
533  appendStringInfoString(ctx->out, " (no-flags)");
534 
537 
538  OutputPluginWrite(ctx, true);
539 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:78
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3051
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10574
MemoryContext context
Definition: test_decoding.c:33
union ReorderBufferChange::@102 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73
int i
#define NameStr(name)
Definition: c.h:576
struct ReorderBufferChange::@102::@104 truncate

◆ pg_output_begin()

static void pg_output_begin ( LogicalDecodingContext *  ctx,
TestDecodingData *  data,
ReorderBufferTXN *  txn,
bool  last_write 
)
static

Definition at line 221 of file test_decoding.c.

References appendStringInfo(), appendStringInfoString(), TestDecodingData::include_xids, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by pg_decode_begin_txn(), pg_decode_change(), and pg_decode_truncate().

222 {
223  OutputPluginPrepareWrite(ctx, last_write);
224  if (data->include_xids)
225  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
226  else
227  appendStringInfoString(ctx->out, "BEGIN");
228  OutputPluginWrite(ctx, last_write);
229 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73

◆ print_literal()

static void print_literal ( StringInfo  s,
Oid  typid,
char *  outputstr 
)
static

Definition at line 273 of file test_decoding.c.

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), and SQL_STR_DOUBLE.

Referenced by tuple_to_stringinfo().

274 {
275  const char *valptr;
276 
277  switch (typid)
278  {
279  case INT2OID:
280  case INT4OID:
281  case INT8OID:
282  case OIDOID:
283  case FLOAT4OID:
284  case FLOAT8OID:
285  case NUMERICOID:
286  /* NB: We don't care about Inf, NaN et al. */
287  appendStringInfoString(s, outputstr);
288  break;
289 
290  case BITOID:
291  case VARBITOID:
292  appendStringInfo(s, "B'%s'", outputstr);
293  break;
294 
295  case BOOLOID:
296  if (strcmp(outputstr, "t") == 0)
297  appendStringInfoString(s, "true");
298  else
299  appendStringInfoString(s, "false");
300  break;
301 
302  default:
303  appendStringInfoChar(s, '\'');
304  for (valptr = outputstr; *valptr; valptr++)
305  {
306  char ch = *valptr;
307 
308  if (SQL_STR_DOUBLE(ch, false))
309  appendStringInfoChar(s, ch);
310  appendStringInfoChar(s, ch);
311  }
312  appendStringInfoChar(s, '\'');
313  break;
314  }
315 }
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:169
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1002

◆ tuple_to_stringinfo()

static void tuple_to_stringinfo ( StringInfo  s,
TupleDesc  tupdesc,
HeapTuple  tuple,
bool  skip_nulls 
)
static

Definition at line 319 of file test_decoding.c.

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), format_type_be(), getTypeOutputInfo(), heap_getattr, HeapTupleHeaderGetOid, InvalidOid, NameStr, tupleDesc::natts, OidOutputFunctionCall(), PG_DETOAST_DATUM, PointerGetDatum, print_literal(), quote_identifier(), HeapTupleData::t_data, TupleDescAttr, val, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pg_decode_change().

320 {
321  int natt;
322  Oid oid;
323 
324  /* print oid of tuple, it's not included in the TupleDesc */
325  if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
326  {
327  appendStringInfo(s, " oid[oid]:%u", oid);
328  }
329 
330  /* print all columns individually */
331  for (natt = 0; natt < tupdesc->natts; natt++)
332  {
333  Form_pg_attribute attr; /* the attribute itself */
334  Oid typid; /* type of current attribute */
335  Oid typoutput; /* output function */
336  bool typisvarlena;
337  Datum origval; /* possibly toasted Datum */
338  bool isnull; /* column is null? */
339 
340  attr = TupleDescAttr(tupdesc, natt);
341 
342  /*
343  * don't print dropped columns, we can't be sure everything is
344  * available for them
345  */
346  if (attr->attisdropped)
347  continue;
348 
349  /*
350  * Don't print system columns, oid will already have been printed if
351  * present.
352  */
353  if (attr->attnum < 0)
354  continue;
355 
356  typid = attr->atttypid;
357 
358  /* get Datum from tuple */
359  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
360 
361  if (isnull && skip_nulls)
362  continue;
363 
364  /* print attribute name */
365  appendStringInfoChar(s, ' ');
366  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
367 
368  /* print attribute type */
369  appendStringInfoChar(s, '[');
371  appendStringInfoChar(s, ']');
372 
373  /* query output function */
374  getTypeOutputInfo(typid,
375  &typoutput, &typisvarlena);
376 
377  /* print separator */
378  appendStringInfoChar(s, ':');
379 
380  /* print data */
381  if (isnull)
382  appendStringInfoString(s, "null");
383  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
384  appendStringInfoString(s, "unchanged-toast-datum");
385  else if (!typisvarlena)
386  print_literal(s, typid,
387  OidOutputFunctionCall(typoutput, origval));
388  else
389  {
390  Datum val; /* definitely detoasted Datum */
391 
392  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
393  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
394  }
395  }
396 }
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2650
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10488
#define PointerGetDatum(X)
Definition: postgres.h:541
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:93
char * format_type_be(Oid type_oid)
Definition: format_type.c:328
unsigned int Oid
Definition: postgres_ext.h:31
int natts
Definition: tupdesc.h:82
HeapTupleHeader t_data
Definition: htup.h:68
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
static void print_literal(StringInfo s, Oid typid, char *outputstr)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:169
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:781
uintptr_t Datum
Definition: postgres.h:367
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleHeaderGetOid(tup)
Definition: htup_details.h:477
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1833
#define NameStr(name)
Definition: c.h:576
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:210
long val
Definition: informix.c:689

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 25 of file test_decoding.c.