PostgreSQL Source Code  git master
test_decoding.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * test_decoding.c
4  * example logical decoding output plugin
5  *
6  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/test_decoding/test_decoding.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_type.h"
16 
17 #include "replication/logical.h"
18 #include "replication/origin.h"
19 
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
24 
26 
27 /* These must be available to dlsym() */
28 extern void _PG_init(void);
30 
31 typedef struct
32 {
38  bool only_local;
40 
42  bool is_init);
45  ReorderBufferTXN *txn);
47  TestDecodingData *data,
48  ReorderBufferTXN *txn,
49  bool last_write);
51  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
53  ReorderBufferTXN *txn, Relation rel,
54  ReorderBufferChange *change);
56  ReorderBufferTXN *txn,
57  int nrelations, Relation relations[],
58  ReorderBufferChange *change);
60  RepOriginId origin_id);
62  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
63  bool transactional, const char *prefix,
64  Size sz, const char *message);
65 
66 void
67 _PG_init(void)
68 {
69  /* other plugins can perform things here */
70 }
71 
72 /* specify output plugin callbacks */
73 void
75 {
77 
86 }
87 
88 
89 /* initialize this plugin */
90 static void
92  bool is_init)
93 {
95  TestDecodingData *data;
96 
97  data = palloc0(sizeof(TestDecodingData));
99  "text conversion context",
101  data->include_xids = true;
102  data->include_timestamp = false;
103  data->skip_empty_xacts = false;
104  data->only_local = false;
105 
106  ctx->output_plugin_private = data;
107 
109  opt->receive_rewrites = false;
110 
111  foreach(option, ctx->output_plugin_options)
112  {
113  DefElem *elem = lfirst(option);
114 
115  Assert(elem->arg == NULL || IsA(elem->arg, String));
116 
117  if (strcmp(elem->defname, "include-xids") == 0)
118  {
119  /* if option does not provide a value, it means its value is true */
120  if (elem->arg == NULL)
121  data->include_xids = true;
122  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
123  ereport(ERROR,
124  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
125  errmsg("could not parse value \"%s\" for parameter \"%s\"",
126  strVal(elem->arg), elem->defname)));
127  }
128  else if (strcmp(elem->defname, "include-timestamp") == 0)
129  {
130  if (elem->arg == NULL)
131  data->include_timestamp = true;
132  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
133  ereport(ERROR,
134  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
135  errmsg("could not parse value \"%s\" for parameter \"%s\"",
136  strVal(elem->arg), elem->defname)));
137  }
138  else if (strcmp(elem->defname, "force-binary") == 0)
139  {
140  bool force_binary;
141 
142  if (elem->arg == NULL)
143  continue;
144  else if (!parse_bool(strVal(elem->arg), &force_binary))
145  ereport(ERROR,
146  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
147  errmsg("could not parse value \"%s\" for parameter \"%s\"",
148  strVal(elem->arg), elem->defname)));
149 
150  if (force_binary)
152  }
153  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
154  {
155 
156  if (elem->arg == NULL)
157  data->skip_empty_xacts = true;
158  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
159  ereport(ERROR,
160  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
161  errmsg("could not parse value \"%s\" for parameter \"%s\"",
162  strVal(elem->arg), elem->defname)));
163  }
164  else if (strcmp(elem->defname, "only-local") == 0)
165  {
166 
167  if (elem->arg == NULL)
168  data->only_local = true;
169  else if (!parse_bool(strVal(elem->arg), &data->only_local))
170  ereport(ERROR,
171  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
172  errmsg("could not parse value \"%s\" for parameter \"%s\"",
173  strVal(elem->arg), elem->defname)));
174  }
175  else if (strcmp(elem->defname, "include-rewrites") == 0)
176  {
177 
178  if (elem->arg == NULL)
179  continue;
180  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
181  ereport(ERROR,
182  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
183  errmsg("could not parse value \"%s\" for parameter \"%s\"",
184  strVal(elem->arg), elem->defname)));
185  }
186  else
187  {
188  ereport(ERROR,
189  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
190  errmsg("option \"%s\" = \"%s\" is unknown",
191  elem->defname,
192  elem->arg ? strVal(elem->arg) : "(null)")));
193  }
194  }
195 }
196 
197 /* cleanup this plugin's resources */
198 static void
200 {
202 
203  /* cleanup our own resources via memory context reset */
205 }
206 
207 /* BEGIN callback */
208 static void
210 {
212 
213  data->xact_wrote_changes = false;
214  if (data->skip_empty_xacts)
215  return;
216 
217  pg_output_begin(ctx, data, txn, true);
218 }
219 
220 static void
222 {
223  OutputPluginPrepareWrite(ctx, last_write);
224  if (data->include_xids)
225  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
226  else
227  appendStringInfoString(ctx->out, "BEGIN");
228  OutputPluginWrite(ctx, last_write);
229 }
230 
231 /* COMMIT callback */
232 static void
234  XLogRecPtr commit_lsn)
235 {
237 
238  if (data->skip_empty_xacts && !data->xact_wrote_changes)
239  return;
240 
241  OutputPluginPrepareWrite(ctx, true);
242  if (data->include_xids)
243  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
244  else
245  appendStringInfoString(ctx->out, "COMMIT");
246 
247  if (data->include_timestamp)
248  appendStringInfo(ctx->out, " (at %s)",
250 
251  OutputPluginWrite(ctx, true);
252 }
253 
254 static bool
256  RepOriginId origin_id)
257 {
259 
260  if (data->only_local && origin_id != InvalidRepOriginId)
261  return true;
262  return false;
263 }
264 
265 /*
266  * Print literal `outputstr' already represented as string of type `typid'
267  * into stringbuf `s'.
268  *
269  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
270  * if standard_conforming_strings were enabled.
271  */
272 static void
273 print_literal(StringInfo s, Oid typid, char *outputstr)
274 {
275  const char *valptr;
276 
277  switch (typid)
278  {
279  case INT2OID:
280  case INT4OID:
281  case INT8OID:
282  case OIDOID:
283  case FLOAT4OID:
284  case FLOAT8OID:
285  case NUMERICOID:
286  /* NB: We don't care about Inf, NaN et al. */
287  appendStringInfoString(s, outputstr);
288  break;
289 
290  case BITOID:
291  case VARBITOID:
292  appendStringInfo(s, "B'%s'", outputstr);
293  break;
294 
295  case BOOLOID:
296  if (strcmp(outputstr, "t") == 0)
297  appendStringInfoString(s, "true");
298  else
299  appendStringInfoString(s, "false");
300  break;
301 
302  default:
303  appendStringInfoChar(s, '\'');
304  for (valptr = outputstr; *valptr; valptr++)
305  {
306  char ch = *valptr;
307 
308  if (SQL_STR_DOUBLE(ch, false))
309  appendStringInfoChar(s, ch);
310  appendStringInfoChar(s, ch);
311  }
312  appendStringInfoChar(s, '\'');
313  break;
314  }
315 }
316 
317 /* print the tuple 'tuple' into the StringInfo s */
318 static void
319 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
320 {
321  int natt;
322 
323  /* print all columns individually */
324  for (natt = 0; natt < tupdesc->natts; natt++)
325  {
326  Form_pg_attribute attr; /* the attribute itself */
327  Oid typid; /* type of current attribute */
328  Oid typoutput; /* output function */
329  bool typisvarlena;
330  Datum origval; /* possibly toasted Datum */
331  bool isnull; /* column is null? */
332 
333  attr = TupleDescAttr(tupdesc, natt);
334 
335  /*
336  * don't print dropped columns, we can't be sure everything is
337  * available for them
338  */
339  if (attr->attisdropped)
340  continue;
341 
342  /*
343  * Don't print system columns, oid will already have been printed if
344  * present.
345  */
346  if (attr->attnum < 0)
347  continue;
348 
349  typid = attr->atttypid;
350 
351  /* get Datum from tuple */
352  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
353 
354  if (isnull && skip_nulls)
355  continue;
356 
357  /* print attribute name */
358  appendStringInfoChar(s, ' ');
359  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
360 
361  /* print attribute type */
362  appendStringInfoChar(s, '[');
364  appendStringInfoChar(s, ']');
365 
366  /* query output function */
367  getTypeOutputInfo(typid,
368  &typoutput, &typisvarlena);
369 
370  /* print separator */
371  appendStringInfoChar(s, ':');
372 
373  /* print data */
374  if (isnull)
375  appendStringInfoString(s, "null");
376  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
377  appendStringInfoString(s, "unchanged-toast-datum");
378  else if (!typisvarlena)
379  print_literal(s, typid,
380  OidOutputFunctionCall(typoutput, origval));
381  else
382  {
383  Datum val; /* definitely detoasted Datum */
384 
385  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
386  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
387  }
388  }
389 }
390 
391 /*
392  * callback for individual changed tuples
393  */
394 static void
396  Relation relation, ReorderBufferChange *change)
397 {
398  TestDecodingData *data;
399  Form_pg_class class_form;
400  TupleDesc tupdesc;
401  MemoryContext old;
402 
403  data = ctx->output_plugin_private;
404 
405  /* output BEGIN if we haven't yet */
406  if (data->skip_empty_xacts && !data->xact_wrote_changes)
407  {
408  pg_output_begin(ctx, data, txn, false);
409  }
410  data->xact_wrote_changes = true;
411 
412  class_form = RelationGetForm(relation);
413  tupdesc = RelationGetDescr(relation);
414 
415  /* Avoid leaking memory by using and resetting our own context */
416  old = MemoryContextSwitchTo(data->context);
417 
418  OutputPluginPrepareWrite(ctx, true);
419 
420  appendStringInfoString(ctx->out, "table ");
425  class_form->relrewrite ?
426  get_rel_name(class_form->relrewrite) :
427  NameStr(class_form->relname)));
428  appendStringInfoChar(ctx->out, ':');
429 
430  switch (change->action)
431  {
433  appendStringInfoString(ctx->out, " INSERT:");
434  if (change->data.tp.newtuple == NULL)
435  appendStringInfoString(ctx->out, " (no-tuple-data)");
436  else
437  tuple_to_stringinfo(ctx->out, tupdesc,
438  &change->data.tp.newtuple->tuple,
439  false);
440  break;
442  appendStringInfoString(ctx->out, " UPDATE:");
443  if (change->data.tp.oldtuple != NULL)
444  {
445  appendStringInfoString(ctx->out, " old-key:");
446  tuple_to_stringinfo(ctx->out, tupdesc,
447  &change->data.tp.oldtuple->tuple,
448  true);
449  appendStringInfoString(ctx->out, " new-tuple:");
450  }
451 
452  if (change->data.tp.newtuple == NULL)
453  appendStringInfoString(ctx->out, " (no-tuple-data)");
454  else
455  tuple_to_stringinfo(ctx->out, tupdesc,
456  &change->data.tp.newtuple->tuple,
457  false);
458  break;
460  appendStringInfoString(ctx->out, " DELETE:");
461 
462  /* if there was no PK, we only know that a delete happened */
463  if (change->data.tp.oldtuple == NULL)
464  appendStringInfoString(ctx->out, " (no-tuple-data)");
465  /* In DELETE, only the replica identity is present; display that */
466  else
467  tuple_to_stringinfo(ctx->out, tupdesc,
468  &change->data.tp.oldtuple->tuple,
469  true);
470  break;
471  default:
472  Assert(false);
473  }
474 
477 
478  OutputPluginWrite(ctx, true);
479 }
480 
481 static void
483  int nrelations, Relation relations[], ReorderBufferChange *change)
484 {
485  TestDecodingData *data;
486  MemoryContext old;
487  int i;
488 
489  data = ctx->output_plugin_private;
490 
491  /* output BEGIN if we haven't yet */
492  if (data->skip_empty_xacts && !data->xact_wrote_changes)
493  {
494  pg_output_begin(ctx, data, txn, false);
495  }
496  data->xact_wrote_changes = true;
497 
498  /* Avoid leaking memory by using and resetting our own context */
499  old = MemoryContextSwitchTo(data->context);
500 
501  OutputPluginPrepareWrite(ctx, true);
502 
503  appendStringInfoString(ctx->out, "table ");
504 
505  for (i = 0; i < nrelations; i++)
506  {
507  if (i > 0)
508  appendStringInfoString(ctx->out, ", ");
509 
511  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
512  NameStr(relations[i]->rd_rel->relname)));
513  }
514 
515  appendStringInfoString(ctx->out, ": TRUNCATE:");
516 
517  if (change->data.truncate.restart_seqs
518  || change->data.truncate.cascade)
519  {
520  if (change->data.truncate.restart_seqs)
521  appendStringInfoString(ctx->out, " restart_seqs");
522  if (change->data.truncate.cascade)
523  appendStringInfoString(ctx->out, " cascade");
524  }
525  else
526  appendStringInfoString(ctx->out, " (no-flags)");
527 
530 
531  OutputPluginWrite(ctx, true);
532 }
533 
534 static void
536  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
537  const char *prefix, Size sz, const char *message)
538 {
539  OutputPluginPrepareWrite(ctx, true);
540  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
541  transactional, prefix, sz);
542  appendBinaryStringInfo(ctx->out, message, sz);
543  OutputPluginWrite(ctx, true);
544 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
LogicalDecodeTruncateCB truncate_cb
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
TimestampTz commit_time
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2674
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10626
#define RelationGetDescr(relation)
Definition: rel.h:449
struct ReorderBufferChange::@101::@102 tp
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define RelationGetForm(relation)
Definition: rel.h:417
PG_MODULE_MAGIC
Definition: test_decoding.c:25
LogicalDecodeMessageCB message_cb
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1754
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:58
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:608
void * output_plugin_private
Definition: logical.h:76
char * format_type_be(Oid type_oid)
Definition: format_type.c:326
MemoryContext context
Definition: logical.h:36
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
List * output_plugin_options
Definition: logical.h:59
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
OutputPluginOutputType output_type
Definition: output_plugin.h:28
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
static void print_literal(StringInfo s, Oid typid, char *outputstr)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3094
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
#define ereport(elevel, rest)
Definition: elog.h:141
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Node * arg
Definition: parsenodes.h:731
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10710
MemoryContext context
Definition: test_decoding.c:33
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:762
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void * palloc0(Size size)
Definition: mcxt.c:980
LogicalDecodeChangeCB change_cb
uintptr_t Datum
Definition: postgres.h:367
TransactionId xid
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: test_decoding.c:91
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:523
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:733
#define lfirst(lc)
Definition: pg_list.h:190
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:467
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
LogicalDecodeShutdownCB shutdown_cb
void _PG_init(void)
Definition: test_decoding.c:67
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:33
FormData_pg_class * Form_pg_class
Definition: pg_class.h:150
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1655
int errmsg(const char *fmt,...)
Definition: elog.c:822
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:536
StringInfo out
Definition: logical.h:71
int i
#define NameStr(name)
Definition: c.h:610
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1108
LogicalDecodeBeginCB begin_cb
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:235
char * defname
Definition: parsenodes.h:730
LogicalDecodeFilterByOriginCB filter_by_origin_cb
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define RelationGetRelid(relation)
Definition: rel.h:423
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
long val
Definition: informix.c:684
struct ReorderBufferChange::@101::@103 truncate
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: test_decoding.c:74
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:882