67 int nrelations,
Relation relations[],
73 bool transactional,
const char *prefix,
74 Size sz,
const char *message);
113 bool transactional,
const char *prefix,
114 Size sz,
const char *message);
117 int nrelations,
Relation relations[],
163 bool enable_streaming =
false;
167 "text conversion context",
169 data->include_xids =
true;
170 data->include_timestamp =
false;
171 data->skip_empty_xacts =
false;
172 data->only_local =
false;
185 if (strcmp(elem->
defname,
"include-xids") == 0)
188 if (elem->
arg == NULL)
189 data->include_xids =
true;
192 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
193 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
196 else if (strcmp(elem->
defname,
"include-timestamp") == 0)
198 if (elem->
arg == NULL)
199 data->include_timestamp =
true;
202 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
203 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
206 else if (strcmp(elem->
defname,
"force-binary") == 0)
210 if (elem->
arg == NULL)
214 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
215 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
221 else if (strcmp(elem->
defname,
"skip-empty-xacts") == 0)
224 if (elem->
arg == NULL)
225 data->skip_empty_xacts =
true;
228 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
229 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
232 else if (strcmp(elem->
defname,
"only-local") == 0)
235 if (elem->
arg == NULL)
236 data->only_local =
true;
239 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
240 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
243 else if (strcmp(elem->
defname,
"include-rewrites") == 0)
246 if (elem->
arg == NULL)
250 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
251 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
254 else if (strcmp(elem->
defname,
"stream-changes") == 0)
256 if (elem->
arg == NULL)
260 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
261 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
267 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
268 errmsg(
"option \"%s\" = \"%s\" is unknown",
302 if (
data->skip_empty_xacts)
312 if (
data->include_xids)
331 if (
data->skip_empty_xacts && !xact_wrote_changes)
335 if (
data->include_xids)
340 if (
data->include_timestamp)
362 if (
data->skip_empty_xacts)
388 if (
data->include_xids)
391 if (
data->include_timestamp)
410 if (
data->include_xids)
413 if (
data->include_timestamp)
434 if (
data->include_xids)
437 if (
data->include_timestamp)
455 if (strstr(gid,
"_nodecode") != NULL)
503 if (strcmp(outputstr,
"t") == 0)
511 for (valptr = outputstr; *valptr; valptr++)
531 for (natt = 0; natt < tupdesc->
natts; natt++)
546 if (attr->attisdropped)
553 if (attr->attnum < 0)
556 typid = attr->atttypid;
559 origval =
heap_getattr(tuple, natt + 1, tupdesc, &isnull);
561 if (isnull && skip_nulls)
575 &typoutput, &typisvarlena);
585 else if (!typisvarlena)
632 class_form->relrewrite ?
634 NameStr(class_form->relname)));
641 if (change->
data.
tp.newtuple == NULL)
645 &change->
data.
tp.newtuple->tuple,
650 if (change->
data.
tp.oldtuple != NULL)
654 &change->
data.
tp.oldtuple->tuple,
659 if (change->
data.
tp.newtuple == NULL)
663 &change->
data.
tp.newtuple->tuple,
670 if (change->
data.
tp.oldtuple == NULL)
675 &change->
data.
tp.oldtuple->tuple,
714 for (
i = 0;
i < nrelations;
i++)
721 NameStr(relations[
i]->rd_rel->relname)));
746 const char *prefix,
Size sz,
const char *message)
750 transactional, prefix, sz);
774 if (
data->skip_empty_xacts)
783 if (
data->include_xids)
801 if (
data->include_xids)
831 if (
data->skip_empty_xacts && !xact_wrote_changes)
835 if (
data->include_xids)
855 if (
data->include_xids)
862 if (
data->include_timestamp)
881 if (
data->skip_empty_xacts && !xact_wrote_changes)
886 if (
data->include_xids)
891 if (
data->include_timestamp)
920 if (
data->include_xids)
935 const char *prefix,
Size sz,
const char *message)
942 transactional, prefix, sz);
946 appendStringInfo(ctx->
out,
"streaming message: transactional: %d prefix: %s, sz: %zu content:",
947 transactional, prefix, sz);
960 int nrelations,
Relation relations[],
973 if (
data->include_xids)
const char * timestamptz_to_str(TimestampTz t)
bool parse_bool(const char *value, bool *result)
#define AssertVariableIsOfType(varname, typename)
#define SQL_STR_DOUBLE(ch, escape_backslash)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
char * OidOutputFunctionCall(Oid functionId, Datum val)
#define PG_DETOAST_DATUM(datum)
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Assert(fmt[strlen(fmt) - 1] !='\n')
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
char * get_namespace_name(Oid nspid)
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Oid get_rel_namespace(Oid relid)
char * get_rel_name(Oid relid)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
void * palloc0(Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define IsA(nodeptr, _type_)
#define InvalidRepOriginId
@ OUTPUT_PLUGIN_BINARY_OUTPUT
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
FormData_pg_attribute * Form_pg_attribute
FormData_pg_class * Form_pg_class
static Datum PointerGetDatum(const void *X)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetForm(relation)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_UPDATE
const char * quote_identifier(const char *ident)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void * output_plugin_private
List * output_plugin_options
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
OutputPluginOutputType output_type
ReorderBufferChangeType action
struct ReorderBufferChange::@95::@97 truncate
struct ReorderBufferChange::@95::@96 tp
union ReorderBufferChange::@95 data
struct ReorderBufferTXN * toptxn
void * output_plugin_private
union ReorderBufferTXN::@101 xact_time
bool stream_wrote_changes
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void print_literal(StringInfo s, Oid typid, char *outputstr)
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
#define TupleDescAttr(tupdesc, i)
#define VARATT_IS_EXTERNAL_ONDISK(PTR)