PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
39 
40 /*
41  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
42  * everywhere else in Postgres.
43  *
44  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45  * CommitTs page numbering also wraps around at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
48  * explicit notice of that fact in this module, except when comparing segment
49  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50  */
51 
52 /*
53  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
54  * the largest possible file name is more than 5 chars long; see
55  * SlruScanDirectory.
56  */
57 typedef struct CommitTimestampEntry
58 {
62 
63 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
64  sizeof(RepOriginId))
65 
66 #define COMMIT_TS_XACTS_PER_PAGE \
67  (BLCKSZ / SizeOfCommitTimestampEntry)
68 
69 #define TransactionIdToCTsPage(xid) \
70  ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
71 #define TransactionIdToCTsEntry(xid) \
72  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 
74 /*
75  * Link to shared-memory data structures for CommitTs control
76  */
78 
79 #define CommitTsCtl (&CommitTsCtlData)
80 
81 /*
82  * We keep a cache of the last value set in shared memory.
83  *
84  * This is also a good place to keep the activation status. We keep this
85  * separate from the GUC so that the standby can activate the module if the
86  * primary has it active independently of the value of the GUC.
87  *
88  * This is protected by CommitTsLock. In some places, we use commitTsActive
89  * without acquiring the lock; where this happens, a comment explains the
90  * rationale for it.
91  */
92 typedef struct CommitTimestampShared
93 {
98 
100 
101 
102 /* GUC variable */
104 
105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106  TransactionId *subxids, TimestampTz ts,
107  RepOriginId nodeid, int pageno);
109  RepOriginId nodeid, int slotno);
110 static void error_commit_ts_disabled(void);
111 static int ZeroCommitTsPage(int pageno, bool writeXlog);
112 static bool CommitTsPagePrecedes(int page1, int page2);
113 static void ActivateCommitTs(void);
114 static void DeactivateCommitTs(void);
115 static void WriteZeroPageXlogRec(int pageno);
116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
117 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
119  RepOriginId nodeid);
120 
121 /*
122  * TransactionTreeSetCommitTsData
123  *
124  * Record the final commit timestamp of transaction entries in the commit log
125  * for a transaction and its subtransaction tree, as efficiently as possible.
126  *
127  * xid is the top level transaction id.
128  *
129  * subxids is an array of xids of length nsubxids, representing subtransactions
130  * in the tree of xid. In various cases nsubxids may be zero.
131  * The reason why tracking just the parent xid commit timestamp is not enough
132  * is that the subtrans SLRU does not stay valid across crashes (it's not
133  * permanent) so we need to keep the information about them here. If the
134  * subtrans implementation changes in the future, we might want to revisit the
135  * decision of storing timestamp info for each subxid.
136  *
137  * The write_xlog parameter tells us whether to include an XLog record of this
138  * or not. Normally, this is called from transaction commit routines (both
139  * normal and prepared) and the information will be stored in the transaction
140  * commit XLog record, and so they should pass "false" for this. The XLog redo
141  * code should use "false" here as well. Other callers probably want to pass
142  * true, so that the given values persist in case of crashes.
143  */
144 void
147  RepOriginId nodeid, bool write_xlog)
148 {
149  int i;
150  TransactionId headxid;
151  TransactionId newestXact;
152 
153  /*
154  * No-op if the module is not active.
155  *
156  * An unlocked read here is fine, because in a standby (the only place
157  * where the flag can change in flight) this routine is only called by the
158  * recovery process, which is also the only process which can change the
159  * flag.
160  */
161  if (!commitTsShared->commitTsActive)
162  return;
163 
164  /*
165  * Comply with the WAL-before-data rule: if caller specified it wants this
166  * value to be recorded in WAL, do so before touching the data.
167  */
168  if (write_xlog)
169  WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170 
171  /*
172  * Figure out the latest Xid in this batch: either the last subxid if
173  * there's any, otherwise the parent xid.
174  */
175  if (nsubxids > 0)
176  newestXact = subxids[nsubxids - 1];
177  else
178  newestXact = xid;
179 
180  /*
181  * We split the xids to set the timestamp to in groups belonging to the
182  * same SLRU page; the first element in each such set is its head. The
183  * first group has the main XID as the head; subsequent sets use the first
184  * subxid not on the previous page as head. This way, we only have to
185  * lock/modify each SLRU page once.
186  */
187  for (i = 0, headxid = xid;;)
188  {
189  int pageno = TransactionIdToCTsPage(headxid);
190  int j;
191 
192  for (j = i; j < nsubxids; j++)
193  {
194  if (TransactionIdToCTsPage(subxids[j]) != pageno)
195  break;
196  }
197  /* subxids[i..j] are on the same page as the head */
198 
199  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200  pageno);
201 
202  /* if we wrote out all subxids, we're done. */
203  if (j + 1 >= nsubxids)
204  break;
205 
206  /*
207  * Set the new head and skip over it, as well as over the subxids we
208  * just wrote.
209  */
210  headxid = subxids[j];
211  i += j - i + 1;
212  }
213 
214  /* update the cached value in shared memory */
215  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216  commitTsShared->xidLastCommit = xid;
217  commitTsShared->dataLastCommit.time = timestamp;
218  commitTsShared->dataLastCommit.nodeid = nodeid;
219 
220  /* and move forwards our endpoint, if needed */
223  LWLockRelease(CommitTsLock);
224 }
225 
226 /*
227  * Record the commit timestamp of transaction entries in the commit log for all
228  * entries on a single page. Atomic only on this page.
229  */
230 static void
232  TransactionId *subxids, TimestampTz ts,
233  RepOriginId nodeid, int pageno)
234 {
235  int slotno;
236  int i;
237 
238  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
239 
240  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241 
242  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243  for (i = 0; i < nsubxids; i++)
244  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245 
246  CommitTsCtl->shared->page_dirty[slotno] = true;
247 
248  LWLockRelease(CommitTsControlLock);
249 }
250 
251 /*
252  * Sets the commit timestamp of a single transaction.
253  *
254  * Must be called with CommitTsControlLock held
255  */
256 static void
258  RepOriginId nodeid, int slotno)
259 {
260  int entryno = TransactionIdToCTsEntry(xid);
261  CommitTimestampEntry entry;
262 
264 
265  entry.time = ts;
266  entry.nodeid = nodeid;
267 
268  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269  SizeOfCommitTimestampEntry * entryno,
271 }
272 
273 /*
274  * Interrogate the commit timestamp of a transaction.
275  *
276  * The return value indicates whether a commit timestamp record was found for
277  * the given xid. The timestamp value is returned in *ts (which may not be
278  * null), and the origin node for the Xid is returned in *nodeid, if it's not
279  * null.
280  */
281 bool
283  RepOriginId *nodeid)
284 {
285  int pageno = TransactionIdToCTsPage(xid);
286  int entryno = TransactionIdToCTsEntry(xid);
287  int slotno;
288  CommitTimestampEntry entry;
289  TransactionId oldestCommitTsXid;
290  TransactionId newestCommitTsXid;
291 
292  if (!TransactionIdIsValid(xid))
293  ereport(ERROR,
294  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296  else if (!TransactionIdIsNormal(xid))
297  {
298  /* frozen and bootstrap xids are always committed far in the past */
299  *ts = 0;
300  if (nodeid)
301  *nodeid = 0;
302  return false;
303  }
304 
305  LWLockAcquire(CommitTsLock, LW_SHARED);
306 
307  /* Error if module not enabled */
308  if (!commitTsShared->commitTsActive)
310 
311  /*
312  * If we're asked for the cached value, return that. Otherwise, fall
313  * through to read from SLRU.
314  */
315  if (commitTsShared->xidLastCommit == xid)
316  {
317  *ts = commitTsShared->dataLastCommit.time;
318  if (nodeid)
319  *nodeid = commitTsShared->dataLastCommit.nodeid;
320 
321  LWLockRelease(CommitTsLock);
322  return *ts != 0;
323  }
324 
325  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327  /* neither is invalid, or both are */
328  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329  LWLockRelease(CommitTsLock);
330 
331  /*
332  * Return empty if the requested value is outside our valid range.
333  */
334  if (!TransactionIdIsValid(oldestCommitTsXid) ||
335  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336  TransactionIdPrecedes(newestCommitTsXid, xid))
337  {
338  *ts = 0;
339  if (nodeid)
340  *nodeid = InvalidRepOriginId;
341  return false;
342  }
343 
344  /* lock is acquired by SimpleLruReadPage_ReadOnly */
345  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346  memcpy(&entry,
347  CommitTsCtl->shared->page_buffer[slotno] +
348  SizeOfCommitTimestampEntry * entryno,
350 
351  *ts = entry.time;
352  if (nodeid)
353  *nodeid = entry.nodeid;
354 
355  LWLockRelease(CommitTsControlLock);
356  return *ts != 0;
357 }
358 
359 /*
360  * Return the Xid of the latest committed transaction. (As far as this module
361  * is concerned, anyway; it's up to the caller to ensure the value is useful
362  * for its purposes.)
363  *
364  * ts and nodeid are filled with the corresponding data; they can be passed
365  * as NULL if not wanted.
366  */
369 {
370  TransactionId xid;
371 
372  LWLockAcquire(CommitTsLock, LW_SHARED);
373 
374  /* Error if module not enabled */
375  if (!commitTsShared->commitTsActive)
377 
378  xid = commitTsShared->xidLastCommit;
379  if (ts)
380  *ts = commitTsShared->dataLastCommit.time;
381  if (nodeid)
382  *nodeid = commitTsShared->dataLastCommit.nodeid;
383  LWLockRelease(CommitTsLock);
384 
385  return xid;
386 }
387 
388 static void
390 {
391  ereport(ERROR,
392  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393  errmsg("could not get commit timestamp data"),
395  errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
396  "track_commit_timestamp") :
397  errhint("Make sure the configuration parameter \"%s\" is set.",
398  "track_commit_timestamp")));
399 }
400 
401 /*
402  * SQL-callable wrapper to obtain commit time of a transaction
403  */
404 Datum
406 {
408  TimestampTz ts;
409  bool found;
410 
411  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412 
413  if (!found)
414  PG_RETURN_NULL();
415 
417 }
418 
419 
420 Datum
422 {
423  TransactionId xid;
424  TimestampTz ts;
425  Datum values[2];
426  bool nulls[2];
427  TupleDesc tupdesc;
428  HeapTuple htup;
429 
430  /* and construct a tuple with our data */
431  xid = GetLatestCommitTsData(&ts, NULL);
432 
433  /*
434  * Construct a tuple descriptor for the result row. This must match this
435  * function's pg_proc entry!
436  */
437  tupdesc = CreateTemplateTupleDesc(2, false);
438  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
439  XIDOID, -1, 0);
440  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
441  TIMESTAMPTZOID, -1, 0);
442  tupdesc = BlessTupleDesc(tupdesc);
443 
444  if (!TransactionIdIsNormal(xid))
445  {
446  memset(nulls, true, sizeof(nulls));
447  }
448  else
449  {
450  values[0] = TransactionIdGetDatum(xid);
451  nulls[0] = false;
452 
453  values[1] = TimestampTzGetDatum(ts);
454  nulls[1] = false;
455  }
456 
457  htup = heap_form_tuple(tupdesc, values, nulls);
458 
460 }
461 
462 
463 /*
464  * Number of shared CommitTS buffers.
465  *
466  * We use a very similar logic as for the number of CLOG buffers; see comments
467  * in CLOGShmemBuffers.
468  */
469 Size
471 {
472  return Min(16, Max(4, NBuffers / 1024));
473 }
474 
475 /*
476  * Shared memory sizing for CommitTs
477  */
478 Size
480 {
482  sizeof(CommitTimestampShared);
483 }
484 
485 /*
486  * Initialize CommitTs at system startup (postmaster start or standalone
487  * backend)
488  */
489 void
491 {
492  bool found;
493 
494  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
495  SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
496  CommitTsControlLock, "pg_commit_ts",
498 
499  commitTsShared = ShmemInitStruct("CommitTs shared",
500  sizeof(CommitTimestampShared),
501  &found);
502 
503  if (!IsUnderPostmaster)
504  {
505  Assert(!found);
506 
507  commitTsShared->xidLastCommit = InvalidTransactionId;
508  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
509  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
510  commitTsShared->commitTsActive = false;
511  }
512  else
513  Assert(found);
514 }
515 
516 /*
517  * This function must be called ONCE on system install.
518  *
519  * (The CommitTs directory is assumed to have been created by initdb, and
520  * CommitTsShmemInit must have been called already.)
521  */
522 void
524 {
525  /*
526  * Nothing to do here at present, unlike most other SLRU modules; segments
527  * are created when the server is started with this module enabled. See
528  * ActivateCommitTs.
529  */
530 }
531 
532 /*
533  * Initialize (or reinitialize) a page of CommitTs to zeroes.
534  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
535  *
536  * The page is not actually written, just set up in shared memory.
537  * The slot number of the new page is returned.
538  *
539  * Control lock must be held at entry, and will be held at exit.
540  */
541 static int
542 ZeroCommitTsPage(int pageno, bool writeXlog)
543 {
544  int slotno;
545 
546  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
547 
548  if (writeXlog)
549  WriteZeroPageXlogRec(pageno);
550 
551  return slotno;
552 }
553 
554 /*
555  * This must be called ONCE during postmaster or standalone-backend startup,
556  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
557  */
558 void
560 {
562 }
563 
564 /*
565  * This must be called ONCE during postmaster or standalone-backend startup,
566  * after recovery has finished.
567  */
568 void
570 {
571  /*
572  * If the feature is not enabled, turn it off for good. This also removes
573  * any leftover data.
574  *
575  * Conversely, we activate the module if the feature is enabled. This is
576  * not necessary in a master system because we already did it earlier, but
577  * if we're in a standby server that got promoted which had the feature
578  * enabled and was following a master that had the feature disabled, this
579  * is where we turn it on locally.
580  */
583  else
585 }
586 
587 /*
588  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
589  * XLog record in a standby.
590  */
591 void
592 CommitTsParameterChange(bool newvalue, bool oldvalue)
593 {
594  /*
595  * If the commit_ts module is disabled in this server and we get word from
596  * the master server that it is enabled there, activate it so that we can
597  * replay future WAL records involving it; also mark it as active on
598  * pg_control. If the old value was already set, we already did this, so
599  * don't do anything.
600  *
601  * If the module is disabled in the master, disable it here too, unless
602  * the module is enabled locally.
603  *
604  * Note this only runs in the recovery process, so an unlocked read is
605  * fine.
606  */
607  if (newvalue)
608  {
609  if (!commitTsShared->commitTsActive)
611  }
612  else if (commitTsShared->commitTsActive)
614 }
615 
616 /*
617  * Activate this module whenever necessary.
618  * This must happen during postmaster or standalone-backend startup,
619  * or during WAL replay anytime the track_commit_timestamp setting is
620  * changed in the master.
621  *
622  * The reason why this SLRU needs separate activation/deactivation functions is
623  * that it can be enabled/disabled during start and the activation/deactivation
624  * on master is propagated to slave via replay. Other SLRUs don't have this
625  * property and they can be just initialized during normal startup.
626  *
627  * This is in charge of creating the currently active segment, if it's not
628  * already there. The reason for this is that the server might have been
629  * running with this module disabled for a while and thus might have skipped
630  * the normal creation point.
631  */
632 static void
634 {
635  TransactionId xid;
636  int pageno;
637 
638  /* If we've done this already, there's nothing to do */
639  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
640  if (commitTsShared->commitTsActive)
641  {
642  LWLockRelease(CommitTsLock);
643  return;
644  }
645  LWLockRelease(CommitTsLock);
646 
648  pageno = TransactionIdToCTsPage(xid);
649 
650  /*
651  * Re-Initialize our idea of the latest page number.
652  */
653  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
654  CommitTsCtl->shared->latest_page_number = pageno;
655  LWLockRelease(CommitTsControlLock);
656 
657  /*
658  * If CommitTs is enabled, but it wasn't in the previous server run, we
659  * need to set the oldest and newest values to the next Xid; that way, we
660  * will not try to read data that might not have been set.
661  *
662  * XXX does this have a problem if a server is started with commitTs
663  * enabled, then started with commitTs disabled, then restarted with it
664  * enabled again? It doesn't look like it does, because there should be a
665  * checkpoint that sets the value to InvalidTransactionId at end of
666  * recovery; and so any chance of injecting new transactions without
667  * CommitTs values would occur after the oldestCommitTsXid has been set to
668  * Invalid temporarily.
669  */
670  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
672  {
675  }
676  LWLockRelease(CommitTsLock);
677 
678  /* Create the current segment file, if necessary */
680  {
681  int slotno;
682 
683  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
684  slotno = ZeroCommitTsPage(pageno, false);
686  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
687  LWLockRelease(CommitTsControlLock);
688  }
689 
690  /* Change the activation status in shared memory. */
691  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
692  commitTsShared->commitTsActive = true;
693  LWLockRelease(CommitTsLock);
694 }
695 
696 /*
697  * Deactivate this module.
698  *
699  * This must be called when the track_commit_timestamp parameter is turned off.
700  * This happens during postmaster or standalone-backend startup, or during WAL
701  * replay.
702  *
703  * Resets CommitTs into invalid state to make sure we don't hand back
704  * possibly-invalid data; also removes segments of old data.
705  */
706 static void
708 {
709  /*
710  * Cleanup the status in the shared memory.
711  *
712  * We reset everything in the commitTsShared record to prevent user from
713  * getting confusing data about last committed transaction on the standby
714  * when the module was activated repeatedly on the primary.
715  */
716  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
717 
718  commitTsShared->commitTsActive = false;
719  commitTsShared->xidLastCommit = InvalidTransactionId;
720  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
721  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
722 
725 
726  LWLockRelease(CommitTsLock);
727 
728  /*
729  * Remove *all* files. This is necessary so that there are no leftover
730  * files; in the case where this feature is later enabled after running
731  * with it disabled for some time there may be a gap in the file sequence.
732  * (We can probably tolerate out-of-sequence files, as they are going to
733  * be overwritten anyway when we wrap around, but it seems better to be
734  * tidy.)
735  */
736  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
738  LWLockRelease(CommitTsControlLock);
739 }
740 
741 /*
742  * This must be called ONCE during postmaster or standalone-backend shutdown
743  */
744 void
746 {
747  /* Flush dirty CommitTs pages to disk */
748  SimpleLruFlush(CommitTsCtl, false);
749 
750  /*
751  * fsync pg_commit_ts to ensure that any files flushed previously are
752  * durably on disk.
753  */
754  fsync_fname("pg_commit_ts", true);
755 }
756 
757 /*
758  * Perform a checkpoint --- either during shutdown, or on-the-fly
759  */
760 void
762 {
763  /* Flush dirty CommitTs pages to disk */
765 
766  /*
767  * fsync pg_commit_ts to ensure that any files flushed previously are
768  * durably on disk.
769  */
770  fsync_fname("pg_commit_ts", true);
771 }
772 
773 /*
774  * Make sure that CommitTs has room for a newly-allocated XID.
775  *
776  * NB: this is called while holding XidGenLock. We want it to be very fast
777  * most of the time; even when it's not so fast, no actual I/O need happen
778  * unless we're forced to write out a dirty CommitTs or xlog page to make room
779  * in shared memory.
780  *
781  * NB: the current implementation relies on track_commit_timestamp being
782  * PGC_POSTMASTER.
783  */
784 void
786 {
787  int pageno;
788 
789  /*
790  * Nothing to do if module not enabled. Note we do an unlocked read of
791  * the flag here, which is okay because this routine is only called from
792  * GetNewTransactionId, which is never called in a standby.
793  */
794  Assert(!InRecovery);
795  if (!commitTsShared->commitTsActive)
796  return;
797 
798  /*
799  * No work except at first XID of a page. But beware: just after
800  * wraparound, the first XID of page zero is FirstNormalTransactionId.
801  */
802  if (TransactionIdToCTsEntry(newestXact) != 0 &&
804  return;
805 
806  pageno = TransactionIdToCTsPage(newestXact);
807 
808  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
809 
810  /* Zero the page and make an XLOG entry about it */
811  ZeroCommitTsPage(pageno, !InRecovery);
812 
813  LWLockRelease(CommitTsControlLock);
814 }
815 
816 /*
817  * Remove all CommitTs segments before the one holding the passed
818  * transaction ID.
819  *
820  * Note that we don't need to flush XLOG here.
821  */
822 void
824 {
825  int cutoffPage;
826 
827  /*
828  * The cutoff point is the start of the segment containing oldestXact. We
829  * pass the *page* containing oldestXact to SimpleLruTruncate.
830  */
831  cutoffPage = TransactionIdToCTsPage(oldestXact);
832 
833  /* Check to see if there's any files that could be removed */
835  &cutoffPage))
836  return; /* nothing to remove */
837 
838  /* Write XLOG record */
839  WriteTruncateXlogRec(cutoffPage, oldestXact);
840 
841  /* Now we can remove the old CommitTs segment(s) */
842  SimpleLruTruncate(CommitTsCtl, cutoffPage);
843 }
844 
845 /*
846  * Set the limit values between which commit TS can be consulted.
847  */
848 void
850 {
851  /*
852  * Be careful not to overwrite values that are either further into the
853  * "future" or signal a disabled committs.
854  */
855  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
857  {
862  }
863  else
864  {
868  }
869  LWLockRelease(CommitTsLock);
870 }
871 
872 /*
873  * Move forwards the oldest commitTS value that can be consulted
874  */
875 void
877 {
878  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
882  LWLockRelease(CommitTsLock);
883 }
884 
885 
886 /*
887  * Decide which of two CLOG page numbers is "older" for truncation purposes.
888  *
889  * We need to use comparison of TransactionIds here in order to do the right
890  * thing with wraparound XID arithmetic. However, if we are asked about
891  * page number zero, we don't want to hand InvalidTransactionId to
892  * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
893  * offset both xids by FirstNormalTransactionId to avoid that.
894  */
895 static bool
896 CommitTsPagePrecedes(int page1, int page2)
897 {
898  TransactionId xid1;
899  TransactionId xid2;
900 
901  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
902  xid1 += FirstNormalTransactionId;
903  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
904  xid2 += FirstNormalTransactionId;
905 
906  return TransactionIdPrecedes(xid1, xid2);
907 }
908 
909 
910 /*
911  * Write a ZEROPAGE xlog record
912  */
913 static void
915 {
916  XLogBeginInsert();
917  XLogRegisterData((char *) (&pageno), sizeof(int));
918  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
919 }
920 
921 /*
922  * Write a TRUNCATE xlog record
923  */
924 static void
925 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
926 {
927  xl_commit_ts_truncate xlrec;
928 
929  xlrec.pageno = pageno;
930  xlrec.oldestXid = oldestXid;
931 
932  XLogBeginInsert();
933  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
934  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
935 }
936 
937 /*
938  * Write a SETTS xlog record
939  */
940 static void
943  RepOriginId nodeid)
944 {
945  xl_commit_ts_set record;
946 
947  record.timestamp = timestamp;
948  record.nodeid = nodeid;
949  record.mainxid = mainxid;
950 
951  XLogBeginInsert();
952  XLogRegisterData((char *) &record,
953  offsetof(xl_commit_ts_set, mainxid) +
954  sizeof(TransactionId));
955  XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
956  XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
957 }
958 
959 /*
960  * CommitTS resource manager's routines
961  */
962 void
964 {
965  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
966 
967  /* Backup blocks are not used in commit_ts records */
968  Assert(!XLogRecHasAnyBlockRefs(record));
969 
970  if (info == COMMIT_TS_ZEROPAGE)
971  {
972  int pageno;
973  int slotno;
974 
975  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
976 
977  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
978 
979  slotno = ZeroCommitTsPage(pageno, false);
981  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
982 
983  LWLockRelease(CommitTsControlLock);
984  }
985  else if (info == COMMIT_TS_TRUNCATE)
986  {
988 
990 
991  /*
992  * During XLOG replay, latest_page_number isn't set up yet; insert a
993  * suitable value to bypass the sanity test in SimpleLruTruncate.
994  */
995  CommitTsCtl->shared->latest_page_number = trunc->pageno;
996 
998  }
999  else if (info == COMMIT_TS_SETTS)
1000  {
1001  xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1002  int nsubxids;
1003  TransactionId *subxids;
1004 
1005  nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1006  sizeof(TransactionId));
1007  if (nsubxids > 0)
1008  {
1009  subxids = palloc(sizeof(TransactionId) * nsubxids);
1010  memcpy(subxids,
1012  sizeof(TransactionId) * nsubxids);
1013  }
1014  else
1015  subxids = NULL;
1016 
1017  TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1018  setts->timestamp, setts->nodeid, true);
1019  if (subxids)
1020  pfree(subxids);
1021  }
1022  else
1023  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1024 }
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:49
#define TIMESTAMPTZOID
Definition: pg_type.h:525
#define PG_GETARG_UINT32(n)
Definition: fmgr.h:235
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:95
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:421
int errhint(const char *fmt,...)
Definition: elog.c:987
#define COMMIT_TS_SETTS
Definition: commit_ts.h:51
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:397
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:849
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1350
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:941
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:63
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
Definition: commit_ts.c:231
#define CommitTsCtl
Definition: commit_ts.c:79
static int ZeroCommitTsPage(int pageno, bool writeXlog)
Definition: commit_ts.c:542
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1165
bool InRecovery
Definition: xlog.c:192
#define Min(x, y)
Definition: c.h:806
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:567
unsigned char uint8
Definition: c.h:266
uint16 RepOriginId
Definition: xlogdefs.h:51
void StartupCommitTs(void)
Definition: commit_ts.c:559
int errcode(int sqlerrcode)
Definition: elog.c:575
#define XIDOID
Definition: pg_type.h:336
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
bool RecoveryInProgress(void)
Definition: xlog.c:7872
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
Definition: commit_ts.c:925
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:405
#define PANIC
Definition: elog.h:53
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:145
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:40
void SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1100
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:470
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:876
TimestampTz timestamp
Definition: commit_ts.h:55
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:569
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
void pfree(void *pointer)
Definition: mcxt.c:950
#define XLogRecGetData(decoder)
Definition: xlogreader.h:220
#define FirstNormalTransactionId
Definition: transam.h:34
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:257
#define ERROR
Definition: elog.h:43
#define XLogRecGetDataLen(decoder)
Definition: xlogreader.h:221
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:66
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:592
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
TransactionId nextXid
Definition: transam.h:117
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:371
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:1031
bool track_commit_timestamp
Definition: commit_ts.c:103
#define SizeOfCommitTsSet
Definition: commit_ts.h:61
bool IsUnderPostmaster
Definition: globals.c:101
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:963
VariableCache ShmemVariableCache
Definition: varsup.c:34
#define InvalidTransactionId
Definition: transam.h:31
TransactionId oldestXid
Definition: commit_ts.h:67
TransactionId ReadNewTransactionId(void)
Definition: varsup.c:250
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
static void ActivateCommitTs(void)
Definition: commit_ts.c:633
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
Definition: slru.c:586
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:574
static void WriteZeroPageXlogRec(int pageno)
Definition: commit_ts.c:914
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:497
#define ereport(elevel, rest)
Definition: elog.h:122
struct CommitTimestampEntry CommitTimestampEntry
TransactionId oldestCommitTsXid
Definition: transam.h:129
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:216
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:823
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
TimestampTz time
Definition: commit_ts.c:59
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
#define TransactionIdGetDatum(X)
Definition: postgres.h:527
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1318
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:282
#define TransactionIdToCTsPage(xid)
Definition: commit_ts.c:69
uintptr_t Datum
Definition: postgres.h:372
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:50
static bool CommitTsPagePrecedes(int page1, int page2)
Definition: commit_ts.c:896
TransactionId xidLastCommit
Definition: commit_ts.c:94
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:463
void CheckPointCommitTs(void)
Definition: commit_ts.c:761
#define Max(x, y)
Definition: c.h:800
TransactionId mainxid
Definition: commit_ts.h:57
void ShutdownCommitTs(void)
Definition: commit_ts.c:745
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:785
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1373
TransactionId newestCommitTsXid
Definition: transam.h:130
size_t Size
Definition: c.h:356
Size CommitTsShmemSize(void)
Definition: commit_ts.c:479
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:71
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:77
#define InvalidRepOriginId
Definition: origin.h:34
static void DeactivateCommitTs(void)
Definition: commit_ts.c:707
static Datum values[MAXATTR]
Definition: bootstrap.c:163
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
RepOriginId nodeid
Definition: commit_ts.h:56
int i
int NBuffers
Definition: globals.c:123
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:222
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void BootStrapCommitTs(void)
Definition: commit_ts.c:523
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
RepOriginId nodeid
Definition: commit_ts.c:60
void XLogBeginInsert(void)
Definition: xloginsert.c:120
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid, bool write_xlog)
Definition: commit_ts.c:145
CommitTimestampShared * commitTsShared
Definition: commit_ts.c:99
int16 AttrNumber
Definition: attnum.h:21
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:389
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:259
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:70
#define PG_RETURN_NULL()
Definition: fmgr.h:305
#define offsetof(type, field)
Definition: c.h:555
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:368
void CommitTsShmemInit(void)
Definition: commit_ts.c:490
struct CommitTimestampShared CommitTimestampShared
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:165