PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
tcn.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * tcn.c
4 * triggered change notification support for PostgreSQL
5 *
6 * Portions Copyright (c) 2011-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * contrib/tcn/tcn.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres.h"
17
18#include "access/htup_details.h"
19#include "commands/async.h"
20#include "commands/trigger.h"
21#include "executor/spi.h"
22#include "lib/stringinfo.h"
23#include "utils/rel.h"
24#include "utils/syscache.h"
25
27 .name = "tcn",
28 .version = PG_VERSION
29);
30
31/*
32 * Copy from s (for source) to r (for result), wrapping with q (quote)
33 * characters and doubling any quote characters found.
34 */
35static void
36strcpy_quoted(StringInfo r, const char *s, const char q)
37{
39 while (*s)
40 {
41 if (*s == q)
44 s++;
45 }
47}
48
49/*
50 * triggered_change_notification
51 *
52 * This trigger function will send a notification of data modification with
53 * primary key values. The channel will be "tcn" unless the trigger is
54 * created with a parameter, in which case that parameter will be used.
55 */
57
60{
61 TriggerData *trigdata = (TriggerData *) fcinfo->context;
62 Trigger *trigger;
63 int nargs;
64 HeapTuple trigtuple;
65 Relation rel;
66 TupleDesc tupdesc;
67 char *channel;
68 char operation;
69 StringInfo payload = makeStringInfo();
70 bool foundPK;
71
72 List *indexoidlist;
73 ListCell *indexoidscan;
74
75 /* make sure it's called as a trigger */
76 if (!CALLED_AS_TRIGGER(fcinfo))
78 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
79 errmsg("triggered_change_notification: must be called as trigger")));
80
81 /* and that it's called after the change */
82 if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
84 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
85 errmsg("triggered_change_notification: must be called after the change")));
86
87 /* and that it's called for each row */
88 if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
90 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
91 errmsg("triggered_change_notification: must be called for each row")));
92
93 if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
94 operation = 'I';
95 else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
96 operation = 'U';
97 else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
98 operation = 'D';
99 else
100 {
101 elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
102 operation = 'X'; /* silence compiler warning */
103 }
104
105 trigger = trigdata->tg_trigger;
106 nargs = trigger->tgnargs;
107 if (nargs > 1)
109 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
110 errmsg("triggered_change_notification: must not be called with more than one parameter")));
111
112 if (nargs == 0)
113 channel = "tcn";
114 else
115 channel = trigger->tgargs[0];
116
117 /* get tuple data */
118 trigtuple = trigdata->tg_trigtuple;
119 rel = trigdata->tg_relation;
120 tupdesc = rel->rd_att;
121
122 foundPK = false;
123
124 /*
125 * Get the list of index OIDs for the table from the relcache, and look up
126 * each one in the pg_index syscache until we find one marked primary key
127 * (hopefully there isn't more than one such).
128 */
129 indexoidlist = RelationGetIndexList(rel);
130
131 foreach(indexoidscan, indexoidlist)
132 {
133 Oid indexoid = lfirst_oid(indexoidscan);
134 HeapTuple indexTuple;
136
137 indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
138 if (!HeapTupleIsValid(indexTuple)) /* should not happen */
139 elog(ERROR, "cache lookup failed for index %u", indexoid);
140 index = (Form_pg_index) GETSTRUCT(indexTuple);
141 /* we're only interested if it is the primary key and valid */
142 if (index->indisprimary && index->indisvalid)
143 {
144 int indnkeyatts = index->indnkeyatts;
145
146 if (indnkeyatts > 0)
147 {
148 int i;
149
150 foundPK = true;
151
152 strcpy_quoted(payload, RelationGetRelationName(rel), '"');
153 appendStringInfoCharMacro(payload, ',');
154 appendStringInfoCharMacro(payload, operation);
155
156 for (i = 0; i < indnkeyatts; i++)
157 {
158 int colno = index->indkey.values[i];
159 Form_pg_attribute attr = TupleDescAttr(tupdesc, colno - 1);
160
161 appendStringInfoCharMacro(payload, ',');
162 strcpy_quoted(payload, NameStr(attr->attname), '"');
163 appendStringInfoCharMacro(payload, '=');
164 strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
165 }
166
167 Async_Notify(channel, payload->data);
168 }
169 ReleaseSysCache(indexTuple);
170 break;
171 }
172 ReleaseSysCache(indexTuple);
173 }
174
175 list_free(indexoidlist);
176
177 if (!foundPK)
179 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
180 errmsg("triggered_change_notification: must be called on a table with a primary key")));
181
182 return PointerGetDatum(NULL); /* after trigger; value doesn't matter */
183}
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:591
#define NameStr(name)
Definition: c.h:717
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
int i
Definition: isn.c:77
void list_free(List *list)
Definition: list.c:1546
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
FormData_pg_index * Form_pg_index
Definition: pg_index.h:70
#define lfirst_oid(lc)
Definition: pg_list.h:174
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
unsigned int Oid
Definition: postgres_ext.h:30
#define RelationGetRelationName(relation)
Definition: rel.h:550
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4833
char * SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
Definition: spi.c:1221
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:231
Definition: pg_list.h:54
TupleDesc rd_att
Definition: rel.h:112
Relation tg_relation
Definition: trigger.h:35
TriggerEvent tg_event
Definition: trigger.h:34
Trigger * tg_trigger
Definition: trigger.h:38
HeapTuple tg_trigtuple
Definition: trigger.h:36
int16 tgnargs
Definition: reltrigger.h:38
Definition: type.h:96
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221
static void strcpy_quoted(StringInfo r, const char *s, const char q)
Definition: tcn.c:36
Datum triggered_change_notification(PG_FUNCTION_ARGS)
Definition: tcn.c:59
PG_MODULE_MAGIC_EXT(.name="tcn",.version=PG_VERSION)
PG_FUNCTION_INFO_V1(triggered_change_notification)
#define TRIGGER_FIRED_BY_DELETE(event)
Definition: trigger.h:113
#define CALLED_AS_TRIGGER(fcinfo)
Definition: trigger.h:26
#define TRIGGER_FIRED_FOR_ROW(event)
Definition: trigger.h:122
#define TRIGGER_FIRED_AFTER(event)
Definition: trigger.h:131
#define TRIGGER_FIRED_BY_INSERT(event)
Definition: trigger.h:110
#define TRIGGER_FIRED_BY_UPDATE(event)
Definition: trigger.h:116
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
const char * name