PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
test_decoding.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * test_decoding.c
4  * example logical decoding output plugin
5  *
6  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/test_decoding/test_decoding.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 
17 #include "catalog/pg_class.h"
18 #include "catalog/pg_type.h"
19 
20 #include "nodes/parsenodes.h"
21 
23 #include "replication/logical.h"
24 #include "replication/message.h"
25 #include "replication/origin.h"
26 
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
34 
36 
37 /* These must be available to pg_dlsym() */
38 extern void _PG_init(void);
40 
41 typedef struct
42 {
48  bool only_local;
50 
52  bool is_init);
55  ReorderBufferTXN *txn);
57  TestDecodingData *data,
58  ReorderBufferTXN *txn,
59  bool last_write);
61  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
63  ReorderBufferTXN *txn, Relation rel,
64  ReorderBufferChange *change);
66  RepOriginId origin_id);
68  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
69  bool transactional, const char *prefix,
70  Size sz, const char *message);
71 
72 void
73 _PG_init(void)
74 {
75  /* other plugins can perform things here */
76 }
77 
78 /* specify output plugin callbacks */
79 void
81 {
83 
91 }
92 
93 
94 /* initialize this plugin */
95 static void
97  bool is_init)
98 {
100  TestDecodingData *data;
101 
102  data = palloc0(sizeof(TestDecodingData));
104  "text conversion context",
106  data->include_xids = true;
107  data->include_timestamp = false;
108  data->skip_empty_xacts = false;
109  data->only_local = false;
110 
111  ctx->output_plugin_private = data;
112 
114 
115  foreach(option, ctx->output_plugin_options)
116  {
117  DefElem *elem = lfirst(option);
118 
119  Assert(elem->arg == NULL || IsA(elem->arg, String));
120 
121  if (strcmp(elem->defname, "include-xids") == 0)
122  {
123  /* if option does not provide a value, it means its value is true */
124  if (elem->arg == NULL)
125  data->include_xids = true;
126  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
127  ereport(ERROR,
128  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129  errmsg("could not parse value \"%s\" for parameter \"%s\"",
130  strVal(elem->arg), elem->defname)));
131  }
132  else if (strcmp(elem->defname, "include-timestamp") == 0)
133  {
134  if (elem->arg == NULL)
135  data->include_timestamp = true;
136  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
137  ereport(ERROR,
138  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
139  errmsg("could not parse value \"%s\" for parameter \"%s\"",
140  strVal(elem->arg), elem->defname)));
141  }
142  else if (strcmp(elem->defname, "force-binary") == 0)
143  {
144  bool force_binary;
145 
146  if (elem->arg == NULL)
147  continue;
148  else if (!parse_bool(strVal(elem->arg), &force_binary))
149  ereport(ERROR,
150  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
151  errmsg("could not parse value \"%s\" for parameter \"%s\"",
152  strVal(elem->arg), elem->defname)));
153 
154  if (force_binary)
156  }
157  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
158  {
159 
160  if (elem->arg == NULL)
161  data->skip_empty_xacts = true;
162  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
163  ereport(ERROR,
164  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
165  errmsg("could not parse value \"%s\" for parameter \"%s\"",
166  strVal(elem->arg), elem->defname)));
167  }
168  else if (strcmp(elem->defname, "only-local") == 0)
169  {
170 
171  if (elem->arg == NULL)
172  data->only_local = true;
173  else if (!parse_bool(strVal(elem->arg), &data->only_local))
174  ereport(ERROR,
175  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
176  errmsg("could not parse value \"%s\" for parameter \"%s\"",
177  strVal(elem->arg), elem->defname)));
178  }
179  else
180  {
181  ereport(ERROR,
182  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
183  errmsg("option \"%s\" = \"%s\" is unknown",
184  elem->defname,
185  elem->arg ? strVal(elem->arg) : "(null)")));
186  }
187  }
188 }
189 
190 /* cleanup this plugin's resources */
191 static void
193 {
195 
196  /* cleanup our own resources via memory context reset */
198 }
199 
200 /* BEGIN callback */
201 static void
203 {
205 
206  data->xact_wrote_changes = false;
207  if (data->skip_empty_xacts)
208  return;
209 
210  pg_output_begin(ctx, data, txn, true);
211 }
212 
213 static void
215 {
216  OutputPluginPrepareWrite(ctx, last_write);
217  if (data->include_xids)
218  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
219  else
220  appendStringInfoString(ctx->out, "BEGIN");
221  OutputPluginWrite(ctx, last_write);
222 }
223 
224 /* COMMIT callback */
225 static void
227  XLogRecPtr commit_lsn)
228 {
230 
231  if (data->skip_empty_xacts && !data->xact_wrote_changes)
232  return;
233 
234  OutputPluginPrepareWrite(ctx, true);
235  if (data->include_xids)
236  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
237  else
238  appendStringInfoString(ctx->out, "COMMIT");
239 
240  if (data->include_timestamp)
241  appendStringInfo(ctx->out, " (at %s)",
243 
244  OutputPluginWrite(ctx, true);
245 }
246 
247 static bool
249  RepOriginId origin_id)
250 {
252 
253  if (data->only_local && origin_id != InvalidRepOriginId)
254  return true;
255  return false;
256 }
257 
258 /*
259  * Print literal `outputstr' already represented as string of type `typid'
260  * into stringbuf `s'.
261  *
262  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
263  * if standard_conforming_strings were enabled.
264  */
265 static void
266 print_literal(StringInfo s, Oid typid, char *outputstr)
267 {
268  const char *valptr;
269 
270  switch (typid)
271  {
272  case INT2OID:
273  case INT4OID:
274  case INT8OID:
275  case OIDOID:
276  case FLOAT4OID:
277  case FLOAT8OID:
278  case NUMERICOID:
279  /* NB: We don't care about Inf, NaN et al. */
280  appendStringInfoString(s, outputstr);
281  break;
282 
283  case BITOID:
284  case VARBITOID:
285  appendStringInfo(s, "B'%s'", outputstr);
286  break;
287 
288  case BOOLOID:
289  if (strcmp(outputstr, "t") == 0)
290  appendStringInfoString(s, "true");
291  else
292  appendStringInfoString(s, "false");
293  break;
294 
295  default:
296  appendStringInfoChar(s, '\'');
297  for (valptr = outputstr; *valptr; valptr++)
298  {
299  char ch = *valptr;
300 
301  if (SQL_STR_DOUBLE(ch, false))
302  appendStringInfoChar(s, ch);
303  appendStringInfoChar(s, ch);
304  }
305  appendStringInfoChar(s, '\'');
306  break;
307  }
308 }
309 
310 /* print the tuple 'tuple' into the StringInfo s */
311 static void
312 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
313 {
314  int natt;
315  Oid oid;
316 
317  /* print oid of tuple, it's not included in the TupleDesc */
318  if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
319  {
320  appendStringInfo(s, " oid[oid]:%u", oid);
321  }
322 
323  /* print all columns individually */
324  for (natt = 0; natt < tupdesc->natts; natt++)
325  {
326  Form_pg_attribute attr; /* the attribute itself */
327  Oid typid; /* type of current attribute */
328  Oid typoutput; /* output function */
329  bool typisvarlena;
330  Datum origval; /* possibly toasted Datum */
331  bool isnull; /* column is null? */
332 
333  attr = tupdesc->attrs[natt];
334 
335  /*
336  * don't print dropped columns, we can't be sure everything is
337  * available for them
338  */
339  if (attr->attisdropped)
340  continue;
341 
342  /*
343  * Don't print system columns, oid will already have been printed if
344  * present.
345  */
346  if (attr->attnum < 0)
347  continue;
348 
349  typid = attr->atttypid;
350 
351  /* get Datum from tuple */
352  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
353 
354  if (isnull && skip_nulls)
355  continue;
356 
357  /* print attribute name */
358  appendStringInfoChar(s, ' ');
359  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
360 
361  /* print attribute type */
362  appendStringInfoChar(s, '[');
364  appendStringInfoChar(s, ']');
365 
366  /* query output function */
367  getTypeOutputInfo(typid,
368  &typoutput, &typisvarlena);
369 
370  /* print separator */
371  appendStringInfoChar(s, ':');
372 
373  /* print data */
374  if (isnull)
375  appendStringInfoString(s, "null");
376  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
377  appendStringInfoString(s, "unchanged-toast-datum");
378  else if (!typisvarlena)
379  print_literal(s, typid,
380  OidOutputFunctionCall(typoutput, origval));
381  else
382  {
383  Datum val; /* definitely detoasted Datum */
384 
385  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
386  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
387  }
388  }
389 }
390 
391 /*
392  * callback for individual changed tuples
393  */
394 static void
396  Relation relation, ReorderBufferChange *change)
397 {
398  TestDecodingData *data;
399  Form_pg_class class_form;
400  TupleDesc tupdesc;
401  MemoryContext old;
402 
403  data = ctx->output_plugin_private;
404 
405  /* output BEGIN if we haven't yet */
406  if (data->skip_empty_xacts && !data->xact_wrote_changes)
407  {
408  pg_output_begin(ctx, data, txn, false);
409  }
410  data->xact_wrote_changes = true;
411 
412  class_form = RelationGetForm(relation);
413  tupdesc = RelationGetDescr(relation);
414 
415  /* Avoid leaking memory by using and resetting our own context */
416  old = MemoryContextSwitchTo(data->context);
417 
418  OutputPluginPrepareWrite(ctx, true);
419 
420  appendStringInfoString(ctx->out, "table ");
425  NameStr(class_form->relname)));
426  appendStringInfoChar(ctx->out, ':');
427 
428  switch (change->action)
429  {
431  appendStringInfoString(ctx->out, " INSERT:");
432  if (change->data.tp.newtuple == NULL)
433  appendStringInfoString(ctx->out, " (no-tuple-data)");
434  else
435  tuple_to_stringinfo(ctx->out, tupdesc,
436  &change->data.tp.newtuple->tuple,
437  false);
438  break;
440  appendStringInfoString(ctx->out, " UPDATE:");
441  if (change->data.tp.oldtuple != NULL)
442  {
443  appendStringInfoString(ctx->out, " old-key:");
444  tuple_to_stringinfo(ctx->out, tupdesc,
445  &change->data.tp.oldtuple->tuple,
446  true);
447  appendStringInfoString(ctx->out, " new-tuple:");
448  }
449 
450  if (change->data.tp.newtuple == NULL)
451  appendStringInfoString(ctx->out, " (no-tuple-data)");
452  else
453  tuple_to_stringinfo(ctx->out, tupdesc,
454  &change->data.tp.newtuple->tuple,
455  false);
456  break;
458  appendStringInfoString(ctx->out, " DELETE:");
459 
460  /* if there was no PK, we only know that a delete happened */
461  if (change->data.tp.oldtuple == NULL)
462  appendStringInfoString(ctx->out, " (no-tuple-data)");
463  /* In DELETE, only the replica identity is present; display that */
464  else
465  tuple_to_stringinfo(ctx->out, tupdesc,
466  &change->data.tp.oldtuple->tuple,
467  true);
468  break;
469  default:
470  Assert(false);
471  }
472 
475 
476  OutputPluginWrite(ctx, true);
477 }
478 
479 static void
481  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
482  const char *prefix, Size sz, const char *message)
483 {
484  OutputPluginPrepareWrite(ctx, true);
485  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
486  transactional, prefix, sz);
487  appendBinaryStringInfo(ctx->out, message, sz);
488  OutputPluginWrite(ctx, true);
489 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
TimestampTz commit_time
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:315
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:35
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2632
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10404
#define RelationGetDescr(relation)
Definition: rel.h:428
#define OIDOID
Definition: pg_type.h:328
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
#define PointerGetDatum(X)
Definition: postgres.h:562
#define NUMERICOID
Definition: pg_type.h:554
#define RelationGetForm(relation)
Definition: rel.h:410
PG_MODULE_MAGIC
Definition: test_decoding.c:35
LogicalDecodeMessageCB message_cb
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1750
union ReorderBufferChange::@93 data
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
uint16 RepOriginId
Definition: xlogdefs.h:51
#define VARBITOID
Definition: pg_type.h:546
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
void * output_plugin_private
Definition: logical.h:71
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
MemoryContext context
Definition: logical.h:38
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
List * output_plugin_options
Definition: logical.h:54
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
int natts
Definition: tupdesc.h:73
HeapTupleHeader t_data
Definition: htup.h:67
OutputPluginOutputType output_type
Definition: output_plugin.h:28
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
static void print_literal(StringInfo s, Oid typid, char *outputstr)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define INT2OID
Definition: pg_type.h:308
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
struct ReorderBufferChange::@93::@94 tp
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3033
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
#define ereport(elevel, rest)
Definition: elog.h:122
Node * arg
Definition: parsenodes.h:720
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:169
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10490
MemoryContext context
Definition: test_decoding.c:43
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:769
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
#define FLOAT4OID
Definition: pg_type.h:416
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:878
LogicalDecodeChangeCB change_cb
Definition: output_plugin.h:99
uintptr_t Datum
Definition: postgres.h:372
TransactionId xid
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: test_decoding.c:96
#define InvalidOid
Definition: postgres_ext.h:36
#define INT8OID
Definition: pg_type.h:304
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:490
#define BITOID
Definition: pg_type.h:542
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
size_t Size
Definition: c.h:356
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
LogicalDecodeShutdownCB shutdown_cb
#define FLOAT8OID
Definition: pg_type.h:419
void _PG_init(void)
Definition: test_decoding.c:73
#define BOOLOID
Definition: pg_type.h:288
LogicalDecodeStartupCB startup_cb
Definition: output_plugin.h:97
#define InvalidRepOriginId
Definition: origin.h:34
FormData_pg_class * Form_pg_class
Definition: pg_class.h:95
#define HeapTupleHeaderGetOid(tup)
Definition: htup_details.h:465
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1747
int errmsg(const char *fmt,...)
Definition: elog.c:797
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
StringInfo out
Definition: logical.h:66
#define NameStr(name)
Definition: c.h:499
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:507
LogicalDecodeBeginCB begin_cb
Definition: output_plugin.h:98
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:205
char * defname
Definition: parsenodes.h:719
LogicalDecodeFilterByOriginCB filter_by_origin_cb
#define RelationGetRelid(relation)
Definition: rel.h:416
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
long val
Definition: informix.c:689
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: test_decoding.c:80
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1710
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:783