PostgreSQL Source Code git master
wait.h File Reference
#include "nodes/parsenodes.h"
#include "parser/parse_node.h"
#include "tcop/dest.h"
Include dependency graph for wait.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ExecWaitStmt (ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 
TupleDesc WaitStmtResultDesc (WaitStmt *stmt)
 

Function Documentation

◆ ExecWaitStmt()

void ExecWaitStmt ( ParseState pstate,
WaitStmt stmt,
DestReceiver dest 
)

Definition at line 33 of file wait.c.

34{
35 XLogRecPtr lsn;
36 int64 timeout = 0;
37 WaitLSNResult waitLSNResult;
38 WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
39 bool throw = true;
40 TupleDesc tupdesc;
41 TupOutputState *tstate;
42 const char *result = "<unset>";
43 bool timeout_specified = false;
44 bool no_throw_specified = false;
45 bool mode_specified = false;
46
47 /* Parse and validate the mandatory LSN */
49 CStringGetDatum(stmt->lsn_literal)));
50
51 foreach_node(DefElem, defel, stmt->options)
52 {
53 if (strcmp(defel->defname, "mode") == 0)
54 {
55 char *mode_str;
56
57 if (mode_specified)
58 errorConflictingDefElem(defel, pstate);
59 mode_specified = true;
60
61 mode_str = defGetString(defel);
62
63 if (pg_strcasecmp(mode_str, "standby_replay") == 0)
65 else if (pg_strcasecmp(mode_str, "standby_write") == 0)
67 else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
69 else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
71 else
73 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
75 "WAIT", defel->defname, mode_str),
76 parser_errposition(pstate, defel->location)));
77 }
78 else if (strcmp(defel->defname, "timeout") == 0)
79 {
80 char *timeout_str;
81 const char *hintmsg;
82 double result;
83
84 if (timeout_specified)
85 errorConflictingDefElem(defel, pstate);
86 timeout_specified = true;
87
88 timeout_str = defGetString(defel);
89
90 if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
91 {
93 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
94 errmsg("invalid timeout value: \"%s\"", timeout_str),
95 hintmsg ? errhint("%s", _(hintmsg)) : 0);
96 }
97
98 /*
99 * Get rid of any fractional part in the input. This is so we
100 * don't fail on just-out-of-range values that would round into
101 * range.
102 */
103 result = rint(result);
104
105 /* Range check */
106 if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
108 errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
109 errmsg("timeout value is out of range"));
110
111 if (result < 0)
113 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114 errmsg("timeout cannot be negative"));
115
116 timeout = (int64) result;
117 }
118 else if (strcmp(defel->defname, "no_throw") == 0)
119 {
120 if (no_throw_specified)
121 errorConflictingDefElem(defel, pstate);
122
123 no_throw_specified = true;
124
125 throw = !defGetBoolean(defel);
126 }
127 else
128 {
130 errcode(ERRCODE_SYNTAX_ERROR),
131 errmsg("option \"%s\" not recognized",
132 defel->defname),
133 parser_errposition(pstate, defel->location));
134 }
135 }
136
137 /*
138 * We are going to wait for the LSN. We should first care that we don't
139 * hold a snapshot and correspondingly our MyProc->xmin is invalid.
140 * Otherwise, our snapshot could prevent the replay of WAL records
141 * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
142 * command, not a procedure or function.
143 *
144 * At first, we should check there is no active snapshot. According to
145 * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
146 * processed with a snapshot. Thankfully, we can pop this snapshot,
147 * because PortalRunUtility() can tolerate this.
148 */
149 if (ActiveSnapshotSet())
151
152 /*
153 * At second, invalidate a catalog snapshot if any. And we should be done
154 * with the preparation.
155 */
157
158 /* Give up if there is still an active or registered snapshot. */
161 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
162 errmsg("WAIT FOR must be called without an active or registered snapshot"),
163 errdetail("WAIT FOR cannot be executed from a function or procedure, nor within a transaction with an isolation level higher than READ COMMITTED."));
164
165 /*
166 * As the result we should hold no snapshot, and correspondingly our xmin
167 * should be unset.
168 */
170
171 /*
172 * Validate that the requested mode matches the current server state.
173 * Primary modes can only be used on a primary.
174 */
175 if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
176 {
177 if (RecoveryInProgress())
179 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
180 errmsg("recovery is in progress"),
181 errhint("Waiting for primary_flush can only be done on a primary server. "
182 "Use standby_flush mode on a standby server.")));
183 }
184
185 /* Now wait for the LSN */
186 waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
187
188 /*
189 * Process the result of WaitForLSN(). Throw appropriate error if needed.
190 */
191 switch (waitLSNResult)
192 {
194 /* Nothing to do on success */
195 result = "success";
196 break;
197
199 if (throw)
200 {
201 XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
202
203 switch (lsnType)
204 {
207 errcode(ERRCODE_QUERY_CANCELED),
208 errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
209 LSN_FORMAT_ARGS(lsn),
210 LSN_FORMAT_ARGS(currentLSN)));
211 break;
212
215 errcode(ERRCODE_QUERY_CANCELED),
216 errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
217 LSN_FORMAT_ARGS(lsn),
218 LSN_FORMAT_ARGS(currentLSN)));
219 break;
220
223 errcode(ERRCODE_QUERY_CANCELED),
224 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
225 LSN_FORMAT_ARGS(lsn),
226 LSN_FORMAT_ARGS(currentLSN)));
227 break;
228
231 errcode(ERRCODE_QUERY_CANCELED),
232 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
233 LSN_FORMAT_ARGS(lsn),
234 LSN_FORMAT_ARGS(currentLSN)));
235 break;
236
237 default:
238 elog(ERROR, "unexpected wait LSN type %d", lsnType);
240 }
241 }
242 else
243 result = "timeout";
244 break;
245
247 if (throw)
248 {
249 if (PromoteIsTriggered())
250 {
251 XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
252
253 switch (lsnType)
254 {
257 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
258 errmsg("recovery is not in progress"),
259 errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
260 LSN_FORMAT_ARGS(lsn),
261 LSN_FORMAT_ARGS(currentLSN)));
262 break;
263
266 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
267 errmsg("recovery is not in progress"),
268 errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
269 LSN_FORMAT_ARGS(lsn),
270 LSN_FORMAT_ARGS(currentLSN)));
271 break;
272
275 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
276 errmsg("recovery is not in progress"),
277 errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
278 LSN_FORMAT_ARGS(lsn),
279 LSN_FORMAT_ARGS(currentLSN)));
280 break;
281
282 default:
283 elog(ERROR, "unexpected wait LSN type %d", lsnType);
285 }
286 }
287 else
288 {
289 switch (lsnType)
290 {
293 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
294 errmsg("recovery is not in progress"),
295 errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
296 break;
297
300 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
301 errmsg("recovery is not in progress"),
302 errhint("Waiting for the standby_write LSN can only be executed during recovery."));
303 break;
304
307 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
308 errmsg("recovery is not in progress"),
309 errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
310 break;
311
312 default:
313 elog(ERROR, "unexpected wait LSN type %d", lsnType);
315 }
316 }
317 }
318 else
319 result = "not in recovery";
320 break;
321 }
322
323 /* need a tuple descriptor representing a single TEXT column */
324 tupdesc = WaitStmtResultDesc(stmt);
325
326 /* prepare for projection of tuples */
327 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
328
329 /* Send it */
330 do_text_output_oneline(tstate, result);
331
332 end_tup_output(tstate);
333}
int64_t int64
Definition: c.h:549
#define FLOAT8_FITS_IN_INT64(num)
Definition: c.h:1091
#define pg_unreachable()
Definition: c.h:347
#define unlikely(x)
Definition: c.h:418
char * defGetString(DefElem *def)
Definition: define.c:35
bool defGetBoolean(DefElem *def)
Definition: define.c:94
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:371
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define _(x)
Definition: elog.c:91
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2522
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2444
#define do_text_output_oneline(tstate, str_to_emit)
Definition: executor.h:628
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:684
bool parse_real(const char *value, double *result, int flags, const char **hintmsg)
Definition: guc.c:2833
#define GUC_UNIT_MS
Definition: guc.h:239
Assert(PointerIsAligned(start, uint64))
#define stmt
Definition: indent_codes.h:59
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:106
#define foreach_node(type, var, lst)
Definition: pg_list.h:496
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:32
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:380
bool ActiveSnapshotSet(void)
Definition: snapmgr.c:812
bool HaveRegisteredOrActiveSnapshot(void)
Definition: snapmgr.c:1644
void PopActiveSnapshot(void)
Definition: snapmgr.c:775
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:455
PGPROC * MyProc
Definition: proc.c:67
TransactionId xmin
Definition: proc.h:194
#define InvalidTransactionId
Definition: transam.h:31
TupleDesc WaitStmtResultDesc(WaitStmt *stmt)
Definition: wait.c:336
bool RecoveryInProgress(void)
Definition: xlog.c:6461
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool PromoteIsTriggered(void)
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition: xlogwait.c:89
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition: xlogwait.c:376
WaitLSNResult
Definition: xlogwait.h:26
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition: xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition: xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition: xlogwait.h:27
WaitLSNType
Definition: xlogwait.h:37
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition: xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition: xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition: xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition: xlogwait.h:40

References _, ActiveSnapshotSet(), Assert(), begin_tup_output_tupdesc(), CStringGetDatum(), DatumGetLSN(), defGetBoolean(), defGetString(), generate_unaccent_rules::dest, DirectFunctionCall1, do_text_output_oneline, elog, end_tup_output(), ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, errorConflictingDefElem(), FLOAT8_FITS_IN_INT64, foreach_node, GetCurrentLSNForWaitType(), GUC_UNIT_MS, HaveRegisteredOrActiveSnapshot(), InvalidateCatalogSnapshot(), InvalidTransactionId, LSN_FORMAT_ARGS, MyProc, parse_real(), parser_errposition(), pg_lsn_in(), pg_strcasecmp(), pg_unreachable, PopActiveSnapshot(), PromoteIsTriggered(), RecoveryInProgress(), stmt, TTSOpsVirtual, unlikely, WAIT_LSN_RESULT_NOT_IN_RECOVERY, WAIT_LSN_RESULT_SUCCESS, WAIT_LSN_RESULT_TIMEOUT, WAIT_LSN_TYPE_PRIMARY_FLUSH, WAIT_LSN_TYPE_STANDBY_FLUSH, WAIT_LSN_TYPE_STANDBY_REPLAY, WAIT_LSN_TYPE_STANDBY_WRITE, WaitForLSN(), WaitStmtResultDesc(), and PGPROC::xmin.

Referenced by standard_ProcessUtility().

◆ WaitStmtResultDesc()

TupleDesc WaitStmtResultDesc ( WaitStmt stmt)

Definition at line 336 of file wait.c.

337{
338 TupleDesc tupdesc;
339
340 /* Need a tuple descriptor representing a single TEXT column */
341 tupdesc = CreateTemplateTupleDesc(1);
342 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
343 TEXTOID, -1, 0);
344 return tupdesc;
345}
int16 AttrNumber
Definition: attnum.h:21
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:182
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:842

References CreateTemplateTupleDesc(), and TupleDescInitEntry().

Referenced by ExecWaitStmt(), and UtilityTupleDescriptor().