/*
 * NOTE: this text was extracted from the PostgreSQL doxygen listing of
 * pgstat.c ("git master"); navigation header converted to a comment.
 */
1 /* ----------
2  * pgstat.c
3  *
4  * All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  * TODO: - Separate collector, postmaster and backend stuff
7  * into different files.
8  *
9  * - Add some automatic call for pgstat vacuuming.
10  *
11  * - Add a pgstat config column to pg_database, so this
12  * entire thing can be enabled/disabled on a per db basis.
13  *
14  * Copyright (c) 2001-2021, PostgreSQL Global Development Group
15  *
16  * src/backend/postmaster/pgstat.c
17  * ----------
18  */
19 #include "postgres.h"
20 
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31 #ifdef HAVE_SYS_SELECT_H
32 #include <sys/select.h>
33 #endif
34 
35 #include "access/heapam.h"
36 #include "access/htup_details.h"
37 #include "access/tableam.h"
38 #include "access/transam.h"
39 #include "access/twophase_rmgr.h"
40 #include "access/xact.h"
41 #include "access/xlogprefetch.h"
42 #include "catalog/partition.h"
43 #include "catalog/pg_database.h"
44 #include "catalog/pg_proc.h"
45 #include "common/ip.h"
46 #include "executor/instrument.h"
47 #include "libpq/libpq.h"
48 #include "libpq/pqsignal.h"
49 #include "mb/pg_wchar.h"
50 #include "miscadmin.h"
51 #include "pgstat.h"
52 #include "postmaster/autovacuum.h"
54 #include "postmaster/interrupt.h"
55 #include "postmaster/postmaster.h"
56 #include "replication/slot.h"
57 #include "replication/walsender.h"
58 #include "storage/backendid.h"
59 #include "storage/dsm.h"
60 #include "storage/fd.h"
61 #include "storage/ipc.h"
62 #include "storage/latch.h"
63 #include "storage/lmgr.h"
64 #include "storage/pg_shmem.h"
65 #include "storage/proc.h"
66 #include "storage/procsignal.h"
67 #include "utils/builtins.h"
68 #include "utils/guc.h"
69 #include "utils/memutils.h"
70 #include "utils/ps_status.h"
71 #include "utils/rel.h"
72 #include "utils/snapmgr.h"
73 #include "utils/timestamp.h"
74 
75 /* ----------
76  * Timer definitions.
77  * ----------
78  */
79 #define PGSTAT_STAT_INTERVAL 500 /* Minimum time between stats file
80  * updates; in milliseconds. */
81 
82 #define PGSTAT_RETRY_DELAY 10 /* How long to wait between checks for a
83  * new file; in milliseconds. */
84 
85 #define PGSTAT_MAX_WAIT_TIME 10000 /* Maximum time to wait for a stats
86  * file update; in milliseconds. */
87 
88 #define PGSTAT_INQ_INTERVAL 640 /* How often to ping the collector for a
89  * new file; in milliseconds. */
90 
91 #define PGSTAT_RESTART_INTERVAL 60 /* How often to attempt to restart a
92  * failed statistics collector; in
93  * seconds. */
94 
95 #define PGSTAT_POLL_LOOP_COUNT (PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY)
96 #define PGSTAT_INQ_LOOP_COUNT (PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY)
97 
98 /* Minimum receive buffer size for the collector's socket. */
99 #define PGSTAT_MIN_RCVBUF (100 * 1024)
100 
101 
102 /* ----------
103  * The initial size hints for the hash tables used in the collector.
104  * ----------
105  */
106 #define PGSTAT_DB_HASH_SIZE 16
107 #define PGSTAT_TAB_HASH_SIZE 512
108 #define PGSTAT_FUNCTION_HASH_SIZE 512
109 
110 
111 /* ----------
112  * GUC parameters
113  * ----------
114  */
115 bool pgstat_track_counts = false;
117 
118 /* ----------
119  * Built from GUC parameter
120  * ----------
121  */
123 char *pgstat_stat_filename = NULL;
124 char *pgstat_stat_tmpname = NULL;
125 
126 /*
127  * BgWriter and WAL global statistics counters.
128  * Stored directly in a stats message structure so they can be sent
129  * without needing to copy things around. We assume these init to zeroes.
130  */
133 
134 /*
135  * WAL usage counters saved from pgWALUsage at the previous call to
136  * pgstat_report_wal(). This is used to calculate how much WAL usage
137  * happens between pgstat_report_wal() calls, by substracting
138  * the previous counters from the current ones.
139  */
141 
142 /*
143  * List of SLRU names that we keep stats for. There is no central registry of
144  * SLRUs, so we use this fixed list instead. The "other" entry is used for
145  * all SLRUs without an explicit entry (e.g. SLRUs in extensions).
146  */
147 static const char *const slru_names[] = {
148  "CommitTs",
149  "MultiXactMember",
150  "MultiXactOffset",
151  "Notify",
152  "Serial",
153  "Subtrans",
154  "Xact",
155  "other" /* has to be last */
156 };
157 
158 #define SLRU_NUM_ELEMENTS lengthof(slru_names)
159 
160 /*
161  * SLRU statistics counts waiting to be sent to the collector. These are
162  * stored directly in stats message format so they can be sent without needing
163  * to copy things around. We assume this variable inits to zeroes. Entries
164  * are one-to-one with slru_names[].
165  */
167 
168 /* ----------
169  * Local data
170  * ----------
171  */
173 
175 
177 
178 static bool pgStatRunningInCollector = false;
179 
180 /*
181  * Structures in which backends store per-table info that's waiting to be
182  * sent to the collector.
183  *
184  * NOTE: once allocated, TabStatusArray structures are never moved or deleted
185  * for the life of the backend. Also, we zero out the t_id fields of the
186  * contained PgStat_TableStatus structs whenever they are not actively in use.
187  * This allows relcache pgstat_info pointers to be treated as long-lived data,
188  * avoiding repeated searches in pgstat_initstats() when a relation is
189  * repeatedly opened during a transaction.
190  */
191 #define TABSTAT_QUANTUM 100 /* we alloc this many at a time */
192 
193 typedef struct TabStatusArray
194 {
195  struct TabStatusArray *tsa_next; /* link to next array, if any */
196  int tsa_used; /* # entries currently used */
199 
201 
202 /*
203  * pgStatTabHash entry: map from relation OID to PgStat_TableStatus pointer
204  */
205 typedef struct TabStatHashEntry
206 {
210 
211 /*
212  * Hash table for O(1) t_id -> tsa_entry lookup
213  */
214 static HTAB *pgStatTabHash = NULL;
215 
216 /*
217  * Backends store per-function info that's waiting to be sent to the collector
218  * in this hash table (indexed by function OID).
219  */
220 static HTAB *pgStatFunctions = NULL;
221 
222 /*
223  * Indicates if backend has some function stats that it hasn't yet
224  * sent to the collector.
225  */
226 static bool have_function_stats = false;
227 
228 /*
229  * Tuple insertion/deletion counts for an open transaction can't be propagated
230  * into PgStat_TableStatus counters until we know if it is going to commit
231  * or abort. Hence, we keep these counts in per-subxact structs that live
232  * in TopTransactionContext. This data structure is designed on the assumption
233  * that subxacts won't usually modify very many tables.
234  */
235 typedef struct PgStat_SubXactStatus
236 {
237  int nest_level; /* subtransaction nest level */
238  struct PgStat_SubXactStatus *prev; /* higher-level subxact if any */
239  PgStat_TableXactStatus *first; /* head of list for this subxact */
241 
243 
244 static int pgStatXactCommit = 0;
245 static int pgStatXactRollback = 0;
251 
252 /* Record that's written to 2PC state file when pgstat state is persisted */
253 typedef struct TwoPhasePgStatRecord
254 {
255  PgStat_Counter tuples_inserted; /* tuples inserted in xact */
256  PgStat_Counter tuples_updated; /* tuples updated in xact */
257  PgStat_Counter tuples_deleted; /* tuples deleted in xact */
258  PgStat_Counter inserted_pre_trunc; /* tuples inserted prior to truncate */
259  PgStat_Counter updated_pre_trunc; /* tuples updated prior to truncate */
260  PgStat_Counter deleted_pre_trunc; /* tuples deleted prior to truncate */
261  Oid t_id; /* table's OID */
262  bool t_shared; /* is it a shared catalog? */
263  bool t_truncated; /* was the relation truncated? */
265 
266 /*
267  * Info about current "snapshot" of stats file
268  */
270 static HTAB *pgStatDBHash = NULL;
271 
272 /*
273  * Cluster wide statistics, kept in the stats collector.
274  * Contains statistics that are not collected per database
275  * or per table.
276  */
282 static int nReplSlotStats;
284 
285 /*
286  * List of OIDs of databases we need to write out. If an entry is InvalidOid,
287  * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
288  * will write both that DB's data and the shared stats.
289  */
291 
292 /*
293  * Total time charged to functions so far in the current backend.
294  * We use this to help separate "self" and "other" time charges.
295  * (We assume this initializes to zero.)
296  */
298 
299 
300 /* ----------
301  * Local function forward declarations
302  * ----------
303  */
304 #ifdef EXEC_BACKEND
305 static pid_t pgstat_forkexec(void);
306 #endif
307 
308 NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn();
309 
310 static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
312  Oid tableoid, bool create);
313 static void pgstat_write_statsfiles(bool permanent, bool allDbs);
314 static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
315 static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
316 static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
317 static void backend_read_statsfile(void);
318 
319 static bool pgstat_write_statsfile_needed(void);
320 static bool pgstat_db_requested(Oid databaseid);
321 
322 static int pgstat_replslot_index(const char *name, bool create_it);
323 static void pgstat_reset_replslot(int i, TimestampTz ts);
324 
325 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
326 static void pgstat_send_funcstats(void);
327 static void pgstat_send_slru(void);
328 static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid);
329 static void pgstat_send_connstats(bool disconnect, TimestampTz last_report);
330 
331 static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);
332 
333 static void pgstat_setup_memcxt(void);
334 
335 static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
336 static void pgstat_send(void *msg, int len);
337 
338 static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
339 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
340 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
341 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
342 static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
347 static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
348 static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
349 static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
350 static void pgstat_recv_anl_ancestors(PgStat_MsgAnlAncestors *msg, int len);
351 static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
352 static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
353 static void pgstat_recv_wal(PgStat_MsgWal *msg, int len);
354 static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
356 static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
357 static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
359 static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
361 static void pgstat_recv_connstat(PgStat_MsgConn *msg, int len);
362 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
363 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
364 
365 /* ------------------------------------------------------------
366  * Public functions called from postmaster follow
367  * ------------------------------------------------------------
368  */
369 
370 /* ----------
371  * pgstat_init() -
372  *
373  * Called from postmaster at startup. Create the resources required
374  * by the statistics collector process. If unable to do so, do not
375  * fail --- better to let the postmaster start with stats collection
376  * disabled.
377  * ----------
378  */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;
	int			tries = 0;

#define TESTBYTEVAL ((char) 199)

	/*
	 * This static assertion verifies that we didn't mess up the calculations
	 * involved in selecting maximum payload sizes for our UDP messages.
	 * Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would
	 * be silent performance loss from fragmentation, it seems worth having a
	 * compile-time cross-check that we didn't.
	 */
	StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,
					 "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination.  We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		if (++tries > 1)
			ereport(LOG,
					(errmsg("trying another address for the statistics collector")));

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket. This
		 * is to catch situations where the socket can be created but will not
		 * actually pass data (for instance, because kernel packet filtering
		 * rules prevent it).
		 */
		test_byte = TESTBYTEVAL;

retry1:
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry1;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);

			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

retry2:
		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry2;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock == PGINVALID_SOCKET)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind, statistics messages will be discarded; backends won't
	 * block waiting to send messages to the collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	/*
	 * Try to ensure that the socket's receive buffer is at least
	 * PGSTAT_MIN_RCVBUF bytes, so that it won't easily overflow and lose
	 * data.  Use of UDP protocol means that we are willing to lose data under
	 * heavy load, but we don't want it to happen just because of ridiculously
	 * small default buffer sizes (such as 8KB on older Windows versions).
	 */
	{
		int			old_rcvbuf;
		int			new_rcvbuf;
		ACCEPT_TYPE_ARG3 rcvbufsize = sizeof(old_rcvbuf);

		if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
					   (char *) &old_rcvbuf, &rcvbufsize) < 0)
		{
			ereport(LOG,
					(errmsg("getsockopt(%s) failed: %m", "SO_RCVBUF")));
			/* if we can't get existing size, always try to set it */
			old_rcvbuf = 0;
		}

		new_rcvbuf = PGSTAT_MIN_RCVBUF;
		if (old_rcvbuf < new_rcvbuf)
		{
			if (setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
						   (char *) &new_rcvbuf, sizeof(new_rcvbuf)) < 0)
				ereport(LOG,
						(errmsg("setsockopt(%s) failed: %m", "SO_RCVBUF")));
		}
	}

	pg_freeaddrinfo_all(hints.ai_family, addrs);

	/* Now that we have a long-lived socket, tell fd.c about it. */
	ReserveExternalFD();

	return;

startup_failed:
	ereport(LOG,
			(errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock != PGINVALID_SOCKET)
		closesocket(pgStatSock);
	pgStatSock = PGINVALID_SOCKET;

	/*
	 * Adjust GUC variables to suppress useless activity, and for debugging
	 * purposes (seeing track_counts off is a clue that we failed here). We
	 * use PGC_S_OVERRIDE because there is no point in trying to turn it back
	 * on from postgresql.conf without a restart.
	 */
	SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}
662 
663 /*
664  * subroutine for pgstat_reset_all
665  */
666 static void
668 {
669  DIR *dir;
670  struct dirent *entry;
671  char fname[MAXPGPATH * 2];
672 
673  dir = AllocateDir(directory);
674  while ((entry = ReadDir(dir, directory)) != NULL)
675  {
676  int nchars;
677  Oid tmp_oid;
678 
679  /*
680  * Skip directory entries that don't match the file names we write.
681  * See get_dbstat_filename for the database-specific pattern.
682  */
683  if (strncmp(entry->d_name, "global.", 7) == 0)
684  nchars = 7;
685  else
686  {
687  nchars = 0;
688  (void) sscanf(entry->d_name, "db_%u.%n",
689  &tmp_oid, &nchars);
690  if (nchars <= 0)
691  continue;
692  /* %u allows leading whitespace, so reject that */
693  if (strchr("0123456789", entry->d_name[3]) == NULL)
694  continue;
695  }
696 
697  if (strcmp(entry->d_name + nchars, "tmp") != 0 &&
698  strcmp(entry->d_name + nchars, "stat") != 0)
699  continue;
700 
701  snprintf(fname, sizeof(fname), "%s/%s", directory,
702  entry->d_name);
703  unlink(fname);
704  }
705  FreeDir(dir);
706 }
707 
708 /*
709  * pgstat_reset_all() -
710  *
711  * Remove the stats files. This is currently used only if WAL
712  * recovery is needed after a crash.
713  */
714 void
716 {
719 }
720 
721 #ifdef EXEC_BACKEND
722 
723 /*
724  * pgstat_forkexec() -
725  *
726  * Format up the arglist for, then fork and exec, statistics collector process
727  */
728 static pid_t
729 pgstat_forkexec(void)
730 {
731  char *av[10];
732  int ac = 0;
733 
734  av[ac++] = "postgres";
735  av[ac++] = "--forkcol";
736  av[ac++] = NULL; /* filled in by postmaster_forkexec */
737 
738  av[ac] = NULL;
739  Assert(ac < lengthof(av));
740 
741  return postmaster_forkexec(ac, av);
742 }
743 #endif /* EXEC_BACKEND */
744 
745 
746 /*
747  * pgstat_start() -
748  *
749  * Called from postmaster at startup or after an existing collector
750  * died. Attempt to fire up a fresh statistics collector.
751  *
752  * Returns PID of child process, or 0 if fail.
753  *
754  * Note: if fail, we will be called again from the postmaster main loop.
755  */
756 int
758 {
759  time_t curtime;
760  pid_t pgStatPid;
761 
762  /*
763  * Check that the socket is there, else pgstat_init failed and we can do
764  * nothing useful.
765  */
767  return 0;
768 
769  /*
770  * Do nothing if too soon since last collector start. This is a safety
771  * valve to protect against continuous respawn attempts if the collector
772  * is dying immediately at launch. Note that since we will be re-called
773  * from the postmaster main loop, we will get another chance later.
774  */
775  curtime = time(NULL);
776  if ((unsigned int) (curtime - last_pgstat_start_time) <
777  (unsigned int) PGSTAT_RESTART_INTERVAL)
778  return 0;
779  last_pgstat_start_time = curtime;
780 
781  /*
782  * Okay, fork off the collector.
783  */
784 #ifdef EXEC_BACKEND
785  switch ((pgStatPid = pgstat_forkexec()))
786 #else
787  switch ((pgStatPid = fork_process()))
788 #endif
789  {
790  case -1:
791  ereport(LOG,
792  (errmsg("could not fork statistics collector: %m")));
793  return 0;
794 
795 #ifndef EXEC_BACKEND
796  case 0:
797  /* in postmaster child ... */
799 
800  /* Close the postmaster's sockets */
801  ClosePostmasterPorts(false);
802 
803  /* Drop our connection to postmaster's shared memory, as well */
804  dsm_detach_all();
806 
807  PgstatCollectorMain(0, NULL);
808  break;
809 #endif
810 
811  default:
812  return (int) pgStatPid;
813  }
814 
815  /* shouldn't get here */
816  return 0;
817 }
818 
819 void
821 {
823 }
824 
825 /* ------------------------------------------------------------
826  * Public functions used by backends follow
827  *------------------------------------------------------------
828  */
829 
830 
831 /* ----------
832  * pgstat_report_stat() -
833  *
834  * Must be called by processes that performs DML: tcop/postgres.c, logical
835  * receiver processes, SPI worker, etc. to send the so far collected
836  * per-table and function usage statistics to the collector. Note that this
837  * is called only when not within a transaction, so it is fair to use
838  * transaction stop time as an approximation of current time.
839  *
840  * "disconnect" is "true" only for the last call before the backend
841  * exits. This makes sure that no data is lost and that interrupted
842  * sessions are reported correctly.
843  * ----------
844  */
845 void
846 pgstat_report_stat(bool disconnect)
847 {
848  /* we assume this inits to all zeroes: */
849  static const PgStat_TableCounts all_zeroes;
850  static TimestampTz last_report = 0;
851 
853  PgStat_MsgTabstat regular_msg;
854  PgStat_MsgTabstat shared_msg;
855  TabStatusArray *tsa;
856  int i;
857 
858  /* Don't expend a clock check if nothing to do */
859  if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) &&
860  pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
861  !have_function_stats && !disconnect)
862  return;
863 
864  /*
865  * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
866  * msec since we last sent one, or the backend is about to exit.
867  */
869  if (!disconnect &&
871  return;
872 
873  /* for backends, send connection statistics */
874  if (MyBackendType == B_BACKEND)
875  pgstat_send_connstats(disconnect, last_report);
876 
877  last_report = now;
878 
879  /*
880  * Destroy pgStatTabHash before we start invalidating PgStat_TableEntry
881  * entries it points to. (Should we fail partway through the loop below,
882  * it's okay to have removed the hashtable already --- the only
883  * consequence is we'd get multiple entries for the same table in the
884  * pgStatTabList, and that's safe.)
885  */
886  if (pgStatTabHash)
887  hash_destroy(pgStatTabHash);
888  pgStatTabHash = NULL;
889 
890  /*
891  * Scan through the TabStatusArray struct(s) to find tables that actually
892  * have counts, and build messages to send. We have to separate shared
893  * relations from regular ones because the databaseid field in the message
894  * header has to depend on that.
895  */
896  regular_msg.m_databaseid = MyDatabaseId;
897  shared_msg.m_databaseid = InvalidOid;
898  regular_msg.m_nentries = 0;
899  shared_msg.m_nentries = 0;
900 
901  for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
902  {
903  for (i = 0; i < tsa->tsa_used; i++)
904  {
905  PgStat_TableStatus *entry = &tsa->tsa_entries[i];
906  PgStat_MsgTabstat *this_msg;
907  PgStat_TableEntry *this_ent;
908 
909  /* Shouldn't have any pending transaction-dependent counts */
910  Assert(entry->trans == NULL);
911 
912  /*
913  * Ignore entries that didn't accumulate any actual counts, such
914  * as indexes that were opened by the planner but not used.
915  */
916  if (memcmp(&entry->t_counts, &all_zeroes,
917  sizeof(PgStat_TableCounts)) == 0)
918  continue;
919 
920  /*
921  * OK, insert data into the appropriate message, and send if full.
922  */
923  this_msg = entry->t_shared ? &shared_msg : &regular_msg;
924  this_ent = &this_msg->m_entry[this_msg->m_nentries];
925  this_ent->t_id = entry->t_id;
926  memcpy(&this_ent->t_counts, &entry->t_counts,
927  sizeof(PgStat_TableCounts));
928  if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
929  {
930  pgstat_send_tabstat(this_msg);
931  this_msg->m_nentries = 0;
932  }
933  }
934  /* zero out PgStat_TableStatus structs after use */
935  MemSet(tsa->tsa_entries, 0,
936  tsa->tsa_used * sizeof(PgStat_TableStatus));
937  tsa->tsa_used = 0;
938  }
939 
940  /*
941  * Send partial messages. Make sure that any pending xact commit/abort
942  * gets counted, even if there are no table stats to send.
943  */
944  if (regular_msg.m_nentries > 0 ||
946  pgstat_send_tabstat(&regular_msg);
947  if (shared_msg.m_nentries > 0)
948  pgstat_send_tabstat(&shared_msg);
949 
950  /* Now, send function statistics */
952 
953  /* Send WAL statistics */
955 
956  /* Finally send SLRU statistics */
958 }
959 
960 /*
961  * Subroutine for pgstat_report_stat: finish and send a tabstat message
962  */
963 static void
965 {
966  int n;
967  int len;
968 
969  /* It's unlikely we'd get here with no socket, but maybe not impossible */
971  return;
972 
973  /*
974  * Report and reset accumulated xact commit/rollback and I/O timings
975  * whenever we send a normal tabstat message
976  */
977  if (OidIsValid(tsmsg->m_databaseid))
978  {
983  pgStatXactCommit = 0;
984  pgStatXactRollback = 0;
987  }
988  else
989  {
990  tsmsg->m_xact_commit = 0;
991  tsmsg->m_xact_rollback = 0;
992  tsmsg->m_block_read_time = 0;
993  tsmsg->m_block_write_time = 0;
994  }
995 
996  n = tsmsg->m_nentries;
997  len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
998  n * sizeof(PgStat_TableEntry);
999 
1001  pgstat_send(tsmsg, len);
1002 }
1003 
1004 /*
1005  * Subroutine for pgstat_report_stat: populate and send a function stat message
1006  */
1007 static void
1009 {
1010  /* we assume this inits to all zeroes: */
1011  static const PgStat_FunctionCounts all_zeroes;
1012 
1013  PgStat_MsgFuncstat msg;
1016 
1017  if (pgStatFunctions == NULL)
1018  return;
1019 
1021  msg.m_databaseid = MyDatabaseId;
1022  msg.m_nentries = 0;
1023 
1024  hash_seq_init(&fstat, pgStatFunctions);
1025  while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
1026  {
1027  PgStat_FunctionEntry *m_ent;
1028 
1029  /* Skip it if no counts accumulated since last time */
1030  if (memcmp(&entry->f_counts, &all_zeroes,
1031  sizeof(PgStat_FunctionCounts)) == 0)
1032  continue;
1033 
1034  /* need to convert format of time accumulators */
1035  m_ent = &msg.m_entry[msg.m_nentries];
1036  m_ent->f_id = entry->f_id;
1037  m_ent->f_numcalls = entry->f_counts.f_numcalls;
1040 
1041  if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES)
1042  {
1043  pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1044  msg.m_nentries * sizeof(PgStat_FunctionEntry));
1045  msg.m_nentries = 0;
1046  }
1047 
1048  /* reset the entry's counts */
1049  MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
1050  }
1051 
1052  if (msg.m_nentries > 0)
1053  pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1054  msg.m_nentries * sizeof(PgStat_FunctionEntry));
1055 
1056  have_function_stats = false;
1057 }
1058 
1059 
1060 /* ----------
1061  * pgstat_vacuum_stat() -
1062  *
1063  * Will tell the collector about objects he can get rid of.
1064  * ----------
1065  */
1066 void
1068 {
1069  HTAB *htab;
1070  PgStat_MsgTabpurge msg;
1071  PgStat_MsgFuncpurge f_msg;
1072  HASH_SEQ_STATUS hstat;
1073  PgStat_StatDBEntry *dbentry;
1074  PgStat_StatTabEntry *tabentry;
1075  PgStat_StatFuncEntry *funcentry;
1076  int len;
1077 
1079  return;
1080 
1081  /*
1082  * If not done for this transaction, read the statistics collector stats
1083  * file into some hash tables.
1084  */
1086 
1087  /*
1088  * Read pg_database and make a list of OIDs of all existing databases
1089  */
1090  htab = pgstat_collect_oids(DatabaseRelationId, Anum_pg_database_oid);
1091 
1092  /*
1093  * Search the database hash table for dead databases and tell the
1094  * collector to drop them.
1095  */
1096  hash_seq_init(&hstat, pgStatDBHash);
1097  while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
1098  {
1099  Oid dbid = dbentry->databaseid;
1100 
1102 
1103  /* the DB entry for shared tables (with InvalidOid) is never dropped */
1104  if (OidIsValid(dbid) &&
1105  hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
1106  pgstat_drop_database(dbid);
1107  }
1108 
1109  /* Clean up */
1110  hash_destroy(htab);
1111 
1112  /*
1113  * Lookup our own database entry; if not found, nothing more to do.
1114  */
1115  dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1116  (void *) &MyDatabaseId,
1117  HASH_FIND, NULL);
1118  if (dbentry == NULL || dbentry->tables == NULL)
1119  return;
1120 
1121  /*
1122  * Similarly to above, make a list of all known relations in this DB.
1123  */
1124  htab = pgstat_collect_oids(RelationRelationId, Anum_pg_class_oid);
1125 
1126  /*
1127  * Initialize our messages table counter to zero
1128  */
1129  msg.m_nentries = 0;
1130 
1131  /*
1132  * Check for all tables listed in stats hashtable if they still exist.
1133  */
1134  hash_seq_init(&hstat, dbentry->tables);
1135  while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
1136  {
1137  Oid tabid = tabentry->tableid;
1138 
1140 
1141  if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
1142  continue;
1143 
1144  /*
1145  * Not there, so add this table's Oid to the message
1146  */
1147  msg.m_tableid[msg.m_nentries++] = tabid;
1148 
1149  /*
1150  * If the message is full, send it out and reinitialize to empty
1151  */
1152  if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
1153  {
1154  len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1155  + msg.m_nentries * sizeof(Oid);
1156 
1158  msg.m_databaseid = MyDatabaseId;
1159  pgstat_send(&msg, len);
1160 
1161  msg.m_nentries = 0;
1162  }
1163  }
1164 
1165  /*
1166  * Send the rest
1167  */
1168  if (msg.m_nentries > 0)
1169  {
1170  len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1171  + msg.m_nentries * sizeof(Oid);
1172 
1174  msg.m_databaseid = MyDatabaseId;
1175  pgstat_send(&msg, len);
1176  }
1177 
1178  /* Clean up */
1179  hash_destroy(htab);
1180 
1181  /*
1182  * Now repeat the above steps for functions. However, we needn't bother
1183  * in the common case where no function stats are being collected.
1184  */
1185  if (dbentry->functions != NULL &&
1186  hash_get_num_entries(dbentry->functions) > 0)
1187  {
1188  htab = pgstat_collect_oids(ProcedureRelationId, Anum_pg_proc_oid);
1189 
1191  f_msg.m_databaseid = MyDatabaseId;
1192  f_msg.m_nentries = 0;
1193 
1194  hash_seq_init(&hstat, dbentry->functions);
1195  while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
1196  {
1197  Oid funcid = funcentry->functionid;
1198 
1200 
1201  if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
1202  continue;
1203 
1204  /*
1205  * Not there, so add this function's Oid to the message
1206  */
1207  f_msg.m_functionid[f_msg.m_nentries++] = funcid;
1208 
1209  /*
1210  * If the message is full, send it out and reinitialize to empty
1211  */
1212  if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
1213  {
1214  len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1215  + f_msg.m_nentries * sizeof(Oid);
1216 
1217  pgstat_send(&f_msg, len);
1218 
1219  f_msg.m_nentries = 0;
1220  }
1221  }
1222 
1223  /*
1224  * Send the rest
1225  */
1226  if (f_msg.m_nentries > 0)
1227  {
1228  len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1229  + f_msg.m_nentries * sizeof(Oid);
1230 
1231  pgstat_send(&f_msg, len);
1232  }
1233 
1234  hash_destroy(htab);
1235  }
1236 }
1237 
1238 
1239 /* ----------
1240  * pgstat_collect_oids() -
1241  *
1242  * Collect the OIDs of all objects listed in the specified system catalog
1243  * into a temporary hash table. Caller should hash_destroy the result
1244  * when done with it. (However, we make the table in CurrentMemoryContext
1245  * so that it will be freed properly in event of an error.)
1246  * ----------
1247  */
1248 static HTAB *
1249 pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid)
1250 {
1251  HTAB *htab;
1252  HASHCTL hash_ctl;
1253  Relation rel;
1254  TableScanDesc scan;
1255  HeapTuple tup;
1256  Snapshot snapshot;
1257 
1258  hash_ctl.keysize = sizeof(Oid);
1259  hash_ctl.entrysize = sizeof(Oid);
1260  hash_ctl.hcxt = CurrentMemoryContext;
1261  htab = hash_create("Temporary table of OIDs",
1263  &hash_ctl,
1265 
1266  rel = table_open(catalogid, AccessShareLock);
1267  snapshot = RegisterSnapshot(GetLatestSnapshot());
1268  scan = table_beginscan(rel, snapshot, 0, NULL);
1269  while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
1270  {
1271  Oid thisoid;
1272  bool isnull;
1273 
1274  thisoid = heap_getattr(tup, anum_oid, RelationGetDescr(rel), &isnull);
1275  Assert(!isnull);
1276 
1278 
1279  (void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
1280  }
1281  table_endscan(scan);
1282  UnregisterSnapshot(snapshot);
1284 
1285  return htab;
1286 }
1287 
1288 
1289 /* ----------
1290  * pgstat_drop_database() -
1291  *
1292  * Tell the collector that we just dropped a database.
1293  * (If the message gets lost, we will still clean the dead DB eventually
1294  * via future invocations of pgstat_vacuum_stat().)
1295  * ----------
1296  */
1297 void
1299 {
1300  PgStat_MsgDropdb msg;
1301 
1303  return;
1304 
1306  msg.m_databaseid = databaseid;
1307  pgstat_send(&msg, sizeof(msg));
1308 }
1309 
1310 
1311 /* ----------
1312  * pgstat_drop_relation() -
1313  *
1314  * Tell the collector that we just dropped a relation.
1315  * (If the message gets lost, we will still clean the dead entry eventually
1316  * via future invocations of pgstat_vacuum_stat().)
1317  *
1318  * Currently not used for lack of any good place to call it; we rely
1319  * entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
1320  * ----------
1321  */
#ifdef NOT_USED
void
pgstat_drop_relation(Oid relid)
{
	PgStat_MsgTabpurge msg;
	int			len;

	/* Nothing to do if the stats collector is not up */
	if (pgStatSock == PGINVALID_SOCKET)
		return;

	msg.m_tableid[0] = relid;
	msg.m_nentries = 1;

	len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
	msg.m_databaseid = MyDatabaseId;
	pgstat_send(&msg, len);
}
#endif							/* NOT_USED */
1342 
1343 
1344 /* ----------
1345  * pgstat_send_connstats() -
1346  *
1347  * Tell the collector about session statistics.
1348  * The parameter "disconnect" will be true when the backend exits.
1349  * "last_report" is the last time we were called (0 if never).
1350  * ----------
1351  */
1352 static void
1353 pgstat_send_connstats(bool disconnect, TimestampTz last_report)
1354 {
1355  PgStat_MsgConn msg;
1356  long secs;
1357  int usecs;
1358 
1360  return;
1361 
1363  msg.m_databaseid = MyDatabaseId;
1364 
1365  /* session time since the last report */
1366  TimestampDifference(((last_report == 0) ? MyStartTimestamp : last_report),
1368  &secs, &usecs);
1369  msg.m_session_time = secs * 1000000 + usecs;
1370 
1372 
1374  pgStatActiveTime = 0;
1375 
1378 
1379  /* report a new session only the first time */
1380  msg.m_count = (last_report == 0) ? 1 : 0;
1381 
1382  pgstat_send(&msg, sizeof(PgStat_MsgConn));
1383 }
1384 
1385 
1386 /* ----------
1387  * pgstat_reset_counters() -
1388  *
1389  * Tell the statistics collector to reset counters for our database.
1390  *
1391  * Permission checking for this function is managed through the normal
1392  * GRANT system.
1393  * ----------
1394  */
1395 void
1397 {
1399 
1401  return;
1402 
1404  msg.m_databaseid = MyDatabaseId;
1405  pgstat_send(&msg, sizeof(msg));
1406 }
1407 
1408 /* ----------
1409  * pgstat_reset_shared_counters() -
1410  *
1411  * Tell the statistics collector to reset cluster-wide shared counters.
1412  *
1413  * Permission checking for this function is managed through the normal
1414  * GRANT system.
1415  * ----------
1416  */
1417 void
1418 pgstat_reset_shared_counters(const char *target)
1419 {
1421 
1423  return;
1424 
1425  if (strcmp(target, "archiver") == 0)
1427  else if (strcmp(target, "bgwriter") == 0)
1429  else if (strcmp(target, "wal") == 0)
1430  msg.m_resettarget = RESET_WAL;
1431  else if (strcmp(target, "prefetch_recovery") == 0)
1432  {
1433  /*
1434  * We can't ask the stats collector to do this for us as it is not
1435  * attached to shared memory.
1436  */
1438  return;
1439  }
1440  else
1441  ereport(ERROR,
1442  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1443  errmsg("unrecognized reset target: \"%s\"", target),
1444  errhint("Target must be \"archiver\", \"bgwriter\", \"wal\" or \"prefetch_recovery\".")));
1445 
1447  pgstat_send(&msg, sizeof(msg));
1448 }
1449 
1450 /* ----------
1451  * pgstat_reset_single_counter() -
1452  *
1453  * Tell the statistics collector to reset a single counter.
1454  *
1455  * Permission checking for this function is managed through the normal
1456  * GRANT system.
1457  * ----------
1458  */
1459 void
1461 {
1463 
1465  return;
1466 
1468  msg.m_databaseid = MyDatabaseId;
1469  msg.m_resettype = type;
1470  msg.m_objectid = objoid;
1471 
1472  pgstat_send(&msg, sizeof(msg));
1473 }
1474 
1475 /* ----------
1476  * pgstat_reset_slru_counter() -
1477  *
1478  * Tell the statistics collector to reset a single SLRU counter, or all
1479  * SLRU counters (when name is null).
1480  *
1481  * Permission checking for this function is managed through the normal
1482  * GRANT system.
1483  * ----------
1484  */
1485 void
1487 {
1489 
1491  return;
1492 
1494  msg.m_index = (name) ? pgstat_slru_index(name) : -1;
1495 
1496  pgstat_send(&msg, sizeof(msg));
1497 }
1498 
1499 /* ----------
1500  * pgstat_reset_replslot_counter() -
1501  *
1502  * Tell the statistics collector to reset a single replication slot
1503  * counter, or all replication slots counters (when name is null).
1504  *
1505  * Permission checking for this function is managed through the normal
1506  * GRANT system.
1507  * ----------
1508  */
1509 void
1511 {
1513 
1515  return;
1516 
1517  if (name)
1518  {
1519  ReplicationSlot *slot;
1520 
1521  /*
1522  * Check if the slot exists with the given name. It is possible that by
1523  * the time this message is executed the slot is dropped but at least
1524  * this check will ensure that the given name is for a valid slot.
1525  */
1526  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1527  slot = SearchNamedReplicationSlot(name);
1528  LWLockRelease(ReplicationSlotControlLock);
1529 
1530  if (!slot)
1531  ereport(ERROR,
1532  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1533  errmsg("replication slot \"%s\" does not exist",
1534  name)));
1535 
1536  /*
1537  * Nothing to do for physical slots as we collect stats only for
1538  * logical slots.
1539  */
1540  if (SlotIsPhysical(slot))
1541  return;
1542 
1543  namestrcpy(&msg.m_slotname, name);
1544  msg.clearall = false;
1545  }
1546  else
1547  msg.clearall = true;
1548 
1550 
1551  pgstat_send(&msg, sizeof(msg));
1552 }
1553 
1554 /* ----------
1555  * pgstat_report_autovac() -
1556  *
1557  * Called from autovacuum.c to report startup of an autovacuum process.
1558  * We are called before InitPostgres is done, so can't rely on MyDatabaseId;
1559  * the db OID must be passed in, instead.
1560  * ----------
1561  */
1562 void
1564 {
1566 
1568  return;
1569 
1571  msg.m_databaseid = dboid;
1573 
1574  pgstat_send(&msg, sizeof(msg));
1575 }
1576 
1577 
1578 /* ---------
1579  * pgstat_report_vacuum() -
1580  *
1581  * Tell the collector about the table we just vacuumed.
1582  * ---------
1583  */
1584 void
1585 pgstat_report_vacuum(Oid tableoid, bool shared,
1586  PgStat_Counter livetuples, PgStat_Counter deadtuples)
1587 {
1588  PgStat_MsgVacuum msg;
1589 
1591  return;
1592 
1594  msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
1595  msg.m_tableoid = tableoid;
1598  msg.m_live_tuples = livetuples;
1599  msg.m_dead_tuples = deadtuples;
1600  pgstat_send(&msg, sizeof(msg));
1601 }
1602 
1603 /* --------
1604  * pgstat_report_analyze() -
1605  *
1606  * Tell the collector about the table we just analyzed.
1607  *
1608  * Caller must provide new live- and dead-tuples estimates, as well as a
1609  * flag indicating whether to reset the changes_since_analyze counter.
1610  * Exceptional support only changes_since_analyze for partitioned tables,
1611  * though they don't have any data. This counter will tell us whether
1612  * partitioned tables need autoanalyze or not.
1613  * --------
1614  */
1615 void
1617  PgStat_Counter livetuples, PgStat_Counter deadtuples,
1618  bool resetcounter)
1619 {
1620  PgStat_MsgAnalyze msg;
1621 
1623  return;
1624 
1625  /*
1626  * Unlike VACUUM, ANALYZE might be running inside a transaction that has
1627  * already inserted and/or deleted rows in the target table. ANALYZE will
1628  * have counted such rows as live or dead respectively. Because we will
1629  * report our counts of such rows at transaction end, we should subtract
1630  * off these counts from what we send to the collector now, else they'll
1631  * be double-counted after commit. (This approach also ensures that the
1632  * collector ends up with the right numbers if we abort instead of
1633  * committing.)
1634  *
1635  * For partitioned tables, we don't report live and dead tuples, because
1636  * such tables don't have any data.
1637  */
1638  if (rel->pgstat_info != NULL)
1639  {
1641 
1642  if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1643  /* If this rel is partitioned, skip modifying */
1644  livetuples = deadtuples = 0;
1645  else
1646  {
1647  for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
1648  {
1649  livetuples -= trans->tuples_inserted - trans->tuples_deleted;
1650  deadtuples -= trans->tuples_updated + trans->tuples_deleted;
1651  }
1652  /* count stuff inserted by already-aborted subxacts, too */
1653  deadtuples -= rel->pgstat_info->t_counts.t_delta_dead_tuples;
1654  /* Since ANALYZE's counts are estimates, we could have underflowed */
1655  livetuples = Max(livetuples, 0);
1656  deadtuples = Max(deadtuples, 0);
1657  }
1658 
1659  }
1660 
1662  msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
1663  msg.m_tableoid = RelationGetRelid(rel);
1665  msg.m_resetcounter = resetcounter;
1667  msg.m_live_tuples = livetuples;
1668  msg.m_dead_tuples = deadtuples;
1669  pgstat_send(&msg, sizeof(msg));
1670 
1671 }
1672 
1673 /*
1674  * pgstat_report_anl_ancestors
1675  *
1676  * Send list of partitioned table ancestors of the given partition to the
1677  * collector. The collector is in charge of propagating the analyze tuple
1678  * counts from the partition to its ancestors. This is necessary so that
1679  * other processes can decide whether to analyze the partitioned tables.
1680  */
1681 void
1683 {
1685  List *ancestors;
1686  ListCell *lc;
1687 
1689  msg.m_databaseid = MyDatabaseId;
1690  msg.m_tableoid = relid;
1691  msg.m_nancestors = 0;
1692 
1693  ancestors = get_partition_ancestors(relid);
1694  foreach(lc, ancestors)
1695  {
1696  Oid ancestor = lfirst_oid(lc);
1697 
1698  msg.m_ancestors[msg.m_nancestors] = ancestor;
1700  {
1701  pgstat_send(&msg, offsetof(PgStat_MsgAnlAncestors, m_ancestors[0]) +
1702  msg.m_nancestors * sizeof(Oid));
1703  msg.m_nancestors = 0;
1704  }
1705  }
1706 
1707  if (msg.m_nancestors > 0)
1708  pgstat_send(&msg, offsetof(PgStat_MsgAnlAncestors, m_ancestors[0]) +
1709  msg.m_nancestors * sizeof(Oid));
1710 
1711  list_free(ancestors);
1712 }
1713 
1714 /* --------
1715  * pgstat_report_recovery_conflict() -
1716  *
1717  * Tell the collector about a Hot Standby recovery conflict.
1718  * --------
1719  */
1720 void
1722 {
1724 
1726  return;
1727 
1729  msg.m_databaseid = MyDatabaseId;
1730  msg.m_reason = reason;
1731  pgstat_send(&msg, sizeof(msg));
1732 }
1733 
1734 /* --------
1735  * pgstat_report_deadlock() -
1736  *
1737  * Tell the collector about a deadlock detected.
1738  * --------
1739  */
1740 void
1742 {
1743  PgStat_MsgDeadlock msg;
1744 
1746  return;
1747 
1749  msg.m_databaseid = MyDatabaseId;
1750  pgstat_send(&msg, sizeof(msg));
1751 }
1752 
1753 
1754 
1755 /* --------
1756  * pgstat_report_checksum_failures_in_db() -
1757  *
1758  * Tell the collector about one or more checksum failures.
1759  * --------
1760  */
1761 void
1763 {
1765 
1767  return;
1768 
1770  msg.m_databaseid = dboid;
1771  msg.m_failurecount = failurecount;
1773 
1774  pgstat_send(&msg, sizeof(msg));
1775 }
1776 
1777 /* --------
1778  * pgstat_report_checksum_failure() -
1779  *
1780  * Tell the collector about a checksum failure.
1781  * --------
1782  */
1783 void
1785 {
1787 }
1788 
1789 /* --------
1790  * pgstat_report_tempfile() -
1791  *
1792  * Tell the collector about a temporary file.
1793  * --------
1794  */
1795 void
1796 pgstat_report_tempfile(size_t filesize)
1797 {
1798  PgStat_MsgTempFile msg;
1799 
1801  return;
1802 
1804  msg.m_databaseid = MyDatabaseId;
1805  msg.m_filesize = filesize;
1806  pgstat_send(&msg, sizeof(msg));
1807 }
1808 
1809 /* ----------
1810  * pgstat_report_replslot() -
1811  *
1812  * Tell the collector about replication slot statistics.
1813  * ----------
1814  */
1815 void
1817 {
1818  PgStat_MsgReplSlot msg;
1819 
1820  /*
1821  * Prepare and send the message
1822  */
1824  namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
1825  msg.m_drop = false;
1826  msg.m_spill_txns = repSlotStat->spill_txns;
1827  msg.m_spill_count = repSlotStat->spill_count;
1828  msg.m_spill_bytes = repSlotStat->spill_bytes;
1829  msg.m_stream_txns = repSlotStat->stream_txns;
1830  msg.m_stream_count = repSlotStat->stream_count;
1831  msg.m_stream_bytes = repSlotStat->stream_bytes;
1832  msg.m_total_txns = repSlotStat->total_txns;
1833  msg.m_total_bytes = repSlotStat->total_bytes;
1834  pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1835 }
1836 
1837 /* ----------
1838  * pgstat_report_replslot_drop() -
1839  *
1840  * Tell the collector about dropping the replication slot.
1841  * ----------
1842  */
1843 void
1844 pgstat_report_replslot_drop(const char *slotname)
1845 {
1846  PgStat_MsgReplSlot msg;
1847 
1849  namestrcpy(&msg.m_slotname, slotname);
1850  msg.m_drop = true;
1851  pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1852 }
1853 
1854 /* ----------
1855  * pgstat_ping() -
1856  *
1857  * Send some junk data to the collector to increase traffic.
1858  * ----------
1859  */
1860 void
1862 {
1863  PgStat_MsgDummy msg;
1864 
1866  return;
1867 
1869  pgstat_send(&msg, sizeof(msg));
1870 }
1871 
1872 /* ----------
1873  * pgstat_send_inquiry() -
1874  *
1875  * Notify collector that we need fresh data.
1876  * ----------
1877  */
1878 static void
1879 pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
1880 {
1881  PgStat_MsgInquiry msg;
1882 
1884  msg.clock_time = clock_time;
1885  msg.cutoff_time = cutoff_time;
1886  msg.databaseid = databaseid;
1887  pgstat_send(&msg, sizeof(msg));
1888 }
1889 
1890 
1891 /*
1892  * Initialize function call usage data.
1893  * Called by the executor before invoking a function.
1894  */
1895 void
1898 {
1899  PgStat_BackendFunctionEntry *htabent;
1900  bool found;
1901 
1902  if (pgstat_track_functions <= fcinfo->flinfo->fn_stats)
1903  {
1904  /* stats not wanted */
1905  fcu->fs = NULL;
1906  return;
1907  }
1908 
1909  if (!pgStatFunctions)
1910  {
1911  /* First time through - initialize function stat table */
1912  HASHCTL hash_ctl;
1913 
1914  hash_ctl.keysize = sizeof(Oid);
1915  hash_ctl.entrysize = sizeof(PgStat_BackendFunctionEntry);
1916  pgStatFunctions = hash_create("Function stat entries",
1918  &hash_ctl,
1919  HASH_ELEM | HASH_BLOBS);
1920  }
1921 
1922  /* Get the stats entry for this function, create if necessary */
1923  htabent = hash_search(pgStatFunctions, &fcinfo->flinfo->fn_oid,
1924  HASH_ENTER, &found);
1925  if (!found)
1926  MemSet(&htabent->f_counts, 0, sizeof(PgStat_FunctionCounts));
1927 
1928  fcu->fs = &htabent->f_counts;
1929 
1930  /* save stats for this function, later used to compensate for recursion */
1931  fcu->save_f_total_time = htabent->f_counts.f_total_time;
1932 
1933  /* save current backend-wide total time */
1934  fcu->save_total = total_func_time;
1935 
1936  /* get clock time as of function start */
1938 }
1939 
1940 /*
1941  * find_funcstat_entry - find any existing PgStat_BackendFunctionEntry entry
1942  * for specified function
1943  *
1944  * If no entry, return NULL, don't create a new one
1945  */
1948 {
1949  if (pgStatFunctions == NULL)
1950  return NULL;
1951 
1952  return (PgStat_BackendFunctionEntry *) hash_search(pgStatFunctions,
1953  (void *) &func_id,
1954  HASH_FIND, NULL);
1955 }
1956 
1957 /*
1958  * Calculate function call usage and update stat counters.
1959  * Called by the executor after invoking a function.
1960  *
1961  * In the case of a set-returning function that runs in value-per-call mode,
1962  * we will see multiple pgstat_init_function_usage/pgstat_end_function_usage
1963  * calls for what the user considers a single call of the function. The
1964  * finalize flag should be TRUE on the last call.
1965  */
1966 void
1968 {
1969  PgStat_FunctionCounts *fs = fcu->fs;
1970  instr_time f_total;
1971  instr_time f_others;
1972  instr_time f_self;
1973 
1974  /* stats not wanted? */
1975  if (fs == NULL)
1976  return;
1977 
1978  /* total elapsed time in this function call */
1979  INSTR_TIME_SET_CURRENT(f_total);
1980  INSTR_TIME_SUBTRACT(f_total, fcu->f_start);
1981 
1982  /* self usage: elapsed minus anything already charged to other calls */
1983  f_others = total_func_time;
1984  INSTR_TIME_SUBTRACT(f_others, fcu->save_total);
1985  f_self = f_total;
1986  INSTR_TIME_SUBTRACT(f_self, f_others);
1987 
1988  /* update backend-wide total time */
1990 
1991  /*
1992  * Compute the new f_total_time as the total elapsed time added to the
1993  * pre-call value of f_total_time. This is necessary to avoid
1994  * double-counting any time taken by recursive calls of myself. (We do
1995  * not need any similar kluge for self time, since that already excludes
1996  * any recursive calls.)
1997  */
1998  INSTR_TIME_ADD(f_total, fcu->save_f_total_time);
1999 
2000  /* update counters in function stats table */
2001  if (finalize)
2002  fs->f_numcalls++;
2003  fs->f_total_time = f_total;
2004  INSTR_TIME_ADD(fs->f_self_time, f_self);
2005 
2006  /* indicate that we have something to send */
2007  have_function_stats = true;
2008 }
2009 
2010 
2011 /* ----------
2012  * pgstat_initstats() -
2013  *
2014  * Initialize a relcache entry to count access statistics.
2015  * Called whenever a relation is opened.
2016  *
2017  * We assume that a relcache entry's pgstat_info field is zeroed by
2018  * relcache.c when the relcache entry is made; thereafter it is long-lived
2019  * data. We can avoid repeated searches of the TabStatus arrays when the
2020  * same relation is touched repeatedly within a transaction.
2021  * ----------
2022  */
2023 void
2025 {
2026  Oid rel_id = rel->rd_id;
2027  char relkind = rel->rd_rel->relkind;
2028 
2029  /* We only count stats for things that have storage */
2030  if (!RELKIND_HAS_STORAGE(relkind) &&
2031  relkind != RELKIND_PARTITIONED_TABLE)
2032  {
2033  rel->pgstat_info = NULL;
2034  return;
2035  }
2036 
2038  {
2039  /* We're not counting at all */
2040  rel->pgstat_info = NULL;
2041  return;
2042  }
2043 
2044  /*
2045  * If we already set up this relation in the current transaction, nothing
2046  * to do.
2047  */
2048  if (rel->pgstat_info != NULL &&
2049  rel->pgstat_info->t_id == rel_id)
2050  return;
2051 
2052  /* Else find or make the PgStat_TableStatus entry, and update link */
2053  rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
2054 }
2055 
2056 /*
2057  * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
2058  */
2059 static PgStat_TableStatus *
2060 get_tabstat_entry(Oid rel_id, bool isshared)
2061 {
2062  TabStatHashEntry *hash_entry;
2063  PgStat_TableStatus *entry;
2064  TabStatusArray *tsa;
2065  bool found;
2066 
2067  /*
2068  * Create hash table if we don't have it already.
2069  */
2070  if (pgStatTabHash == NULL)
2071  {
2072  HASHCTL ctl;
2073 
2074  ctl.keysize = sizeof(Oid);
2075  ctl.entrysize = sizeof(TabStatHashEntry);
2076 
2077  pgStatTabHash = hash_create("pgstat TabStatusArray lookup hash table",
2079  &ctl,
2080  HASH_ELEM | HASH_BLOBS);
2081  }
2082 
2083  /*
2084  * Find an entry or create a new one.
2085  */
2086  hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_ENTER, &found);
2087  if (!found)
2088  {
2089  /* initialize new entry with null pointer */
2090  hash_entry->tsa_entry = NULL;
2091  }
2092 
2093  /*
2094  * If entry is already valid, we're done.
2095  */
2096  if (hash_entry->tsa_entry)
2097  return hash_entry->tsa_entry;
2098 
2099  /*
2100  * Locate the first pgStatTabList entry with free space, making a new list
2101  * entry if needed. Note that we could get an OOM failure here, but if so
2102  * we have left the hashtable and the list in a consistent state.
2103  */
2104  if (pgStatTabList == NULL)
2105  {
2106  /* Set up first pgStatTabList entry */
2107  pgStatTabList = (TabStatusArray *)
2109  sizeof(TabStatusArray));
2110  }
2111 
2112  tsa = pgStatTabList;
2113  while (tsa->tsa_used >= TABSTAT_QUANTUM)
2114  {
2115  if (tsa->tsa_next == NULL)
2116  tsa->tsa_next = (TabStatusArray *)
2118  sizeof(TabStatusArray));
2119  tsa = tsa->tsa_next;
2120  }
2121 
2122  /*
2123  * Allocate a PgStat_TableStatus entry within this list entry. We assume
2124  * the entry was already zeroed, either at creation or after last use.
2125  */
2126  entry = &tsa->tsa_entries[tsa->tsa_used++];
2127  entry->t_id = rel_id;
2128  entry->t_shared = isshared;
2129 
2130  /*
2131  * Now we can fill the entry in pgStatTabHash.
2132  */
2133  hash_entry->tsa_entry = entry;
2134 
2135  return entry;
2136 }
2137 
2138 /*
2139  * find_tabstat_entry - find any existing PgStat_TableStatus entry for rel
2140  *
2141  * If no entry, return NULL, don't create a new one
2142  *
2143  * Note: if we got an error in the most recent execution of pgstat_report_stat,
2144  * it's possible that an entry exists but there's no hashtable entry for it.
2145  * That's okay, we'll treat this case as "doesn't exist".
2146  */
2149 {
2150  TabStatHashEntry *hash_entry;
2151 
2152  /* If hashtable doesn't exist, there are no entries at all */
2153  if (!pgStatTabHash)
2154  return NULL;
2155 
2156  hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_FIND, NULL);
2157  if (!hash_entry)
2158  return NULL;
2159 
2160  /* Note that this step could also return NULL, but that's correct */
2161  return hash_entry->tsa_entry;
2162 }
2163 
2164 /*
2165  * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed
2166  */
2167 static PgStat_SubXactStatus *
2169 {
2170  PgStat_SubXactStatus *xact_state;
2171 
2172  xact_state = pgStatXactStack;
2173  if (xact_state == NULL || xact_state->nest_level != nest_level)
2174  {
2175  xact_state = (PgStat_SubXactStatus *)
2177  sizeof(PgStat_SubXactStatus));
2178  xact_state->nest_level = nest_level;
2179  xact_state->prev = pgStatXactStack;
2180  xact_state->first = NULL;
2181  pgStatXactStack = xact_state;
2182  }
2183  return xact_state;
2184 }
2185 
2186 /*
2187  * add_tabstat_xact_level - add a new (sub)transaction state record
2188  */
2189 static void
2190 add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
2191 {
2192  PgStat_SubXactStatus *xact_state;
2194 
2195  /*
2196  * If this is the first rel to be modified at the current nest level, we
2197  * first have to push a transaction stack entry.
2198  */
2199  xact_state = get_tabstat_stack_level(nest_level);
2200 
2201  /* Now make a per-table stack entry */
2202  trans = (PgStat_TableXactStatus *)
2204  sizeof(PgStat_TableXactStatus));
2205  trans->nest_level = nest_level;
2206  trans->upper = pgstat_info->trans;
2207  trans->parent = pgstat_info;
2208  trans->next = xact_state->first;
2209  xact_state->first = trans;
2210  pgstat_info->trans = trans;
2211 }
2212 
2213 /*
2214  * pgstat_count_heap_insert - count a tuple insertion of n tuples
2215  */
2216 void
2218 {
2219  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2220 
2221  if (pgstat_info != NULL)
2222  {
2223  /* We have to log the effect at the proper transactional level */
2224  int nest_level = GetCurrentTransactionNestLevel();
2225 
2226  if (pgstat_info->trans == NULL ||
2227  pgstat_info->trans->nest_level != nest_level)
2228  add_tabstat_xact_level(pgstat_info, nest_level);
2229 
2230  pgstat_info->trans->tuples_inserted += n;
2231  }
2232 }
2233 
2234 /*
2235  * pgstat_count_heap_update - count a tuple update
2236  */
2237 void
2239 {
2240  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2241 
2242  if (pgstat_info != NULL)
2243  {
2244  /* We have to log the effect at the proper transactional level */
2245  int nest_level = GetCurrentTransactionNestLevel();
2246 
2247  if (pgstat_info->trans == NULL ||
2248  pgstat_info->trans->nest_level != nest_level)
2249  add_tabstat_xact_level(pgstat_info, nest_level);
2250 
2251  pgstat_info->trans->tuples_updated++;
2252 
2253  /* t_tuples_hot_updated is nontransactional, so just advance it */
2254  if (hot)
2255  pgstat_info->t_counts.t_tuples_hot_updated++;
2256  }
2257 }
2258 
2259 /*
2260  * pgstat_count_heap_delete - count a tuple deletion
2261  */
2262 void
2264 {
2265  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2266 
2267  if (pgstat_info != NULL)
2268  {
2269  /* We have to log the effect at the proper transactional level */
2270  int nest_level = GetCurrentTransactionNestLevel();
2271 
2272  if (pgstat_info->trans == NULL ||
2273  pgstat_info->trans->nest_level != nest_level)
2274  add_tabstat_xact_level(pgstat_info, nest_level);
2275 
2276  pgstat_info->trans->tuples_deleted++;
2277  }
2278 }
2279 
2280 /*
2281  * pgstat_truncate_save_counters
2282  *
2283  * Whenever a table is truncated, we save its i/u/d counters so that they can
2284  * be cleared, and if the (sub)xact that executed the truncate later aborts,
2285  * the counters can be restored to the saved (pre-truncate) values. Note we do
2286  * this on the first truncate in any particular subxact level only.
2287  */
2288 static void
2290 {
2291  if (!trans->truncated)
2292  {
2293  trans->inserted_pre_trunc = trans->tuples_inserted;
2294  trans->updated_pre_trunc = trans->tuples_updated;
2295  trans->deleted_pre_trunc = trans->tuples_deleted;
2296  trans->truncated = true;
2297  }
2298 }
2299 
2300 /*
2301  * pgstat_truncate_restore_counters - restore counters when a truncate aborts
2302  */
2303 static void
2305 {
2306  if (trans->truncated)
2307  {
2308  trans->tuples_inserted = trans->inserted_pre_trunc;
2309  trans->tuples_updated = trans->updated_pre_trunc;
2310  trans->tuples_deleted = trans->deleted_pre_trunc;
2311  }
2312 }
2313 
2314 /*
2315  * pgstat_count_truncate - update tuple counters due to truncate
2316  */
2317 void
2319 {
2320  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2321 
2322  if (pgstat_info != NULL)
2323  {
2324  /* We have to log the effect at the proper transactional level */
2325  int nest_level = GetCurrentTransactionNestLevel();
2326 
2327  if (pgstat_info->trans == NULL ||
2328  pgstat_info->trans->nest_level != nest_level)
2329  add_tabstat_xact_level(pgstat_info, nest_level);
2330 
2331  pgstat_truncate_save_counters(pgstat_info->trans);
2332  pgstat_info->trans->tuples_inserted = 0;
2333  pgstat_info->trans->tuples_updated = 0;
2334  pgstat_info->trans->tuples_deleted = 0;
2335  }
2336 }
2337 
2338 /*
2339  * pgstat_update_heap_dead_tuples - update dead-tuples count
2340  *
2341  * The semantics of this are that we are reporting the nontransactional
2342  * recovery of "delta" dead tuples; so t_delta_dead_tuples decreases
2343  * rather than increasing, and the change goes straight into the per-table
2344  * counter, not into transactional state.
2345  */
2346 void
2348 {
2349  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2350 
2351  if (pgstat_info != NULL)
2352  pgstat_info->t_counts.t_delta_dead_tuples -= delta;
2353 }
2354 
2355 
2356 /* ----------
2357  * AtEOXact_PgStat
2358  *
2359  * Called from access/transam/xact.c at top-level transaction commit/abort.
2360  * ----------
2361  */
2362 void
2363 AtEOXact_PgStat(bool isCommit, bool parallel)
2364 {
2365  PgStat_SubXactStatus *xact_state;
2366 
2367  /* Don't count parallel worker transaction stats */
2368  if (!parallel)
2369  {
2370  /*
2371  * Count transaction commit or abort. (We use counters, not just
2372  * bools, in case the reporting message isn't sent right away.)
2373  */
2374  if (isCommit)
2375  pgStatXactCommit++;
2376  else
2378  }
2379 
2380  /*
2381  * Transfer transactional insert/update counts into the base tabstat
2382  * entries. We don't bother to free any of the transactional state, since
2383  * it's all in TopTransactionContext and will go away anyway.
2384  */
2385  xact_state = pgStatXactStack;
2386  if (xact_state != NULL)
2387  {
2389 
2390  Assert(xact_state->nest_level == 1);
2391  Assert(xact_state->prev == NULL);
2392  for (trans = xact_state->first; trans != NULL; trans = trans->next)
2393  {
2394  PgStat_TableStatus *tabstat;
2395 
2396  Assert(trans->nest_level == 1);
2397  Assert(trans->upper == NULL);
2398  tabstat = trans->parent;
2399  Assert(tabstat->trans == trans);
2400  /* restore pre-truncate stats (if any) in case of aborted xact */
2401  if (!isCommit)
2403  /* count attempted actions regardless of commit/abort */
2404  tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2405  tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2406  tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2407  if (isCommit)
2408  {
2409  tabstat->t_counts.t_truncated = trans->truncated;
2410  if (trans->truncated)
2411  {
2412  /* forget live/dead stats seen by backend thus far */
2413  tabstat->t_counts.t_delta_live_tuples = 0;
2414  tabstat->t_counts.t_delta_dead_tuples = 0;
2415  }
2416  /* insert adds a live tuple, delete removes one */
2417  tabstat->t_counts.t_delta_live_tuples +=
2418  trans->tuples_inserted - trans->tuples_deleted;
2419  /* update and delete each create a dead tuple */
2420  tabstat->t_counts.t_delta_dead_tuples +=
2421  trans->tuples_updated + trans->tuples_deleted;
2422  /* insert, update, delete each count as one change event */
2423  tabstat->t_counts.t_changed_tuples +=
2424  trans->tuples_inserted + trans->tuples_updated +
2425  trans->tuples_deleted;
2426  }
2427  else
2428  {
2429  /* inserted tuples are dead, deleted tuples are unaffected */
2430  tabstat->t_counts.t_delta_dead_tuples +=
2431  trans->tuples_inserted + trans->tuples_updated;
2432  /* an aborted xact generates no changed_tuple events */
2433  }
2434  tabstat->trans = NULL;
2435  }
2436  }
2437  pgStatXactStack = NULL;
2438 
2439  /* Make sure any stats snapshot is thrown away */
2441 }
2442 
2443 /* ----------
2444  * AtEOSubXact_PgStat
2445  *
2446  * Called from access/transam/xact.c at subtransaction commit/abort.
2447  * ----------
2448  */
2449 void
2450 AtEOSubXact_PgStat(bool isCommit, int nestDepth)
2451 {
2452  PgStat_SubXactStatus *xact_state;
2453 
2454  /*
2455  * Transfer transactional insert/update counts into the next higher
2456  * subtransaction state.
2457  */
2458  xact_state = pgStatXactStack;
2459  if (xact_state != NULL &&
2460  xact_state->nest_level >= nestDepth)
2461  {
2463  PgStat_TableXactStatus *next_trans;
2464 
2465  /* delink xact_state from stack immediately to simplify reuse case */
2466  pgStatXactStack = xact_state->prev;
2467 
2468  for (trans = xact_state->first; trans != NULL; trans = next_trans)
2469  {
2470  PgStat_TableStatus *tabstat;
2471 
2472  next_trans = trans->next;
2473  Assert(trans->nest_level == nestDepth);
2474  tabstat = trans->parent;
2475  Assert(tabstat->trans == trans);
2476  if (isCommit)
2477  {
2478  if (trans->upper && trans->upper->nest_level == nestDepth - 1)
2479  {
2480  if (trans->truncated)
2481  {
2482  /* propagate the truncate status one level up */
2484  /* replace upper xact stats with ours */
2485  trans->upper->tuples_inserted = trans->tuples_inserted;
2486  trans->upper->tuples_updated = trans->tuples_updated;
2487  trans->upper->tuples_deleted = trans->tuples_deleted;
2488  }
2489  else
2490  {
2491  trans->upper->tuples_inserted += trans->tuples_inserted;
2492  trans->upper->tuples_updated += trans->tuples_updated;
2493  trans->upper->tuples_deleted += trans->tuples_deleted;
2494  }
2495  tabstat->trans = trans->upper;
2496  pfree(trans);
2497  }
2498  else
2499  {
2500  /*
2501  * When there isn't an immediate parent state, we can just
2502  * reuse the record instead of going through a
2503  * palloc/pfree pushup (this works since it's all in
2504  * TopTransactionContext anyway). We have to re-link it
2505  * into the parent level, though, and that might mean
2506  * pushing a new entry into the pgStatXactStack.
2507  */
2508  PgStat_SubXactStatus *upper_xact_state;
2509 
2510  upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
2511  trans->next = upper_xact_state->first;
2512  upper_xact_state->first = trans;
2513  trans->nest_level = nestDepth - 1;
2514  }
2515  }
2516  else
2517  {
2518  /*
2519  * On abort, update top-level tabstat counts, then forget the
2520  * subtransaction
2521  */
2522 
2523  /* first restore values obliterated by truncate */
2525  /* count attempted actions regardless of commit/abort */
2526  tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2527  tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2528  tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2529  /* inserted tuples are dead, deleted tuples are unaffected */
2530  tabstat->t_counts.t_delta_dead_tuples +=
2531  trans->tuples_inserted + trans->tuples_updated;
2532  tabstat->trans = trans->upper;
2533  pfree(trans);
2534  }
2535  }
2536  pfree(xact_state);
2537  }
2538 }
2539 
2540 
2541 /*
2542  * AtPrepare_PgStat
2543  * Save the transactional stats state at 2PC transaction prepare.
2544  *
2545  * In this phase we just generate 2PC records for all the pending
2546  * transaction-dependent stats work.
2547  */
2548 void
2550 {
2551  PgStat_SubXactStatus *xact_state;
2552 
2553  xact_state = pgStatXactStack;
2554  if (xact_state != NULL)
2555  {
2557 
2558  Assert(xact_state->nest_level == 1);
2559  Assert(xact_state->prev == NULL);
2560  for (trans = xact_state->first; trans != NULL; trans = trans->next)
2561  {
2562  PgStat_TableStatus *tabstat;
2563  TwoPhasePgStatRecord record;
2564 
2565  Assert(trans->nest_level == 1);
2566  Assert(trans->upper == NULL);
2567  tabstat = trans->parent;
2568  Assert(tabstat->trans == trans);
2569 
2570  record.tuples_inserted = trans->tuples_inserted;
2571  record.tuples_updated = trans->tuples_updated;
2572  record.tuples_deleted = trans->tuples_deleted;
2573  record.inserted_pre_trunc = trans->inserted_pre_trunc;
2574  record.updated_pre_trunc = trans->updated_pre_trunc;
2575  record.deleted_pre_trunc = trans->deleted_pre_trunc;
2576  record.t_id = tabstat->t_id;
2577  record.t_shared = tabstat->t_shared;
2578  record.t_truncated = trans->truncated;
2579 
2581  &record, sizeof(TwoPhasePgStatRecord));
2582  }
2583  }
2584 }
2585 
2586 /*
2587  * PostPrepare_PgStat
2588  * Clean up after successful PREPARE.
2589  *
2590  * All we need do here is unlink the transaction stats state from the
2591  * nontransactional state. The nontransactional action counts will be
2592  * reported to the stats collector immediately, while the effects on live
2593  * and dead tuple counts are preserved in the 2PC state file.
2594  *
2595  * Note: AtEOXact_PgStat is not called during PREPARE.
2596  */
2597 void
2599 {
2600  PgStat_SubXactStatus *xact_state;
2601 
2602  /*
2603  * We don't bother to free any of the transactional state, since it's all
2604  * in TopTransactionContext and will go away anyway.
2605  */
2606  xact_state = pgStatXactStack;
2607  if (xact_state != NULL)
2608  {
2610 
2611  for (trans = xact_state->first; trans != NULL; trans = trans->next)
2612  {
2613  PgStat_TableStatus *tabstat;
2614 
2615  tabstat = trans->parent;
2616  tabstat->trans = NULL;
2617  }
2618  }
2619  pgStatXactStack = NULL;
2620 
2621  /* Make sure any stats snapshot is thrown away */
2623 }
2624 
2625 /*
2626  * 2PC processing routine for COMMIT PREPARED case.
2627  *
2628  * Load the saved counts into our local pgstats state.
2629  */
2630 void
2632  void *recdata, uint32 len)
2633 {
2634  TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2635  PgStat_TableStatus *pgstat_info;
2636 
2637  /* Find or create a tabstat entry for the rel */
2638  pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2639 
2640  /* Same math as in AtEOXact_PgStat, commit case */
2641  pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2642  pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2643  pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2644  pgstat_info->t_counts.t_truncated = rec->t_truncated;
2645  if (rec->t_truncated)
2646  {
2647  /* forget live/dead stats seen by backend thus far */
2648  pgstat_info->t_counts.t_delta_live_tuples = 0;
2649  pgstat_info->t_counts.t_delta_dead_tuples = 0;
2650  }
2651  pgstat_info->t_counts.t_delta_live_tuples +=
2652  rec->tuples_inserted - rec->tuples_deleted;
2653  pgstat_info->t_counts.t_delta_dead_tuples +=
2654  rec->tuples_updated + rec->tuples_deleted;
2655  pgstat_info->t_counts.t_changed_tuples +=
2656  rec->tuples_inserted + rec->tuples_updated +
2657  rec->tuples_deleted;
2658 }
2659 
2660 /*
2661  * 2PC processing routine for ROLLBACK PREPARED case.
2662  *
2663  * Load the saved counts into our local pgstats state, but treat them
2664  * as aborted.
2665  */
2666 void
2668  void *recdata, uint32 len)
2669 {
2670  TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2671  PgStat_TableStatus *pgstat_info;
2672 
2673  /* Find or create a tabstat entry for the rel */
2674  pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2675 
2676  /* Same math as in AtEOXact_PgStat, abort case */
2677  if (rec->t_truncated)
2678  {
2679  rec->tuples_inserted = rec->inserted_pre_trunc;
2680  rec->tuples_updated = rec->updated_pre_trunc;
2681  rec->tuples_deleted = rec->deleted_pre_trunc;
2682  }
2683  pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2684  pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2685  pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2686  pgstat_info->t_counts.t_delta_dead_tuples +=
2687  rec->tuples_inserted + rec->tuples_updated;
2688 }
2689 
2690 
2691 /* ----------
2692  * pgstat_fetch_stat_dbentry() -
2693  *
2694  * Support function for the SQL-callable pgstat* functions. Returns
2695  * the collected statistics for one database or NULL. NULL doesn't mean
2696  * that the database doesn't exist, it is just not yet known by the
2697  * collector, so the caller is better off to report ZERO instead.
2698  * ----------
2699  */
2702 {
2703  /*
2704  * If not done for this transaction, read the statistics collector stats
2705  * file into some hash tables.
2706  */
2708 
2709  /*
2710  * Lookup the requested database; return NULL if not found
2711  */
2712  return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2713  (void *) &dbid,
2714  HASH_FIND, NULL);
2715 }
2716 
2717 
2718 /* ----------
2719  * pgstat_fetch_stat_tabentry() -
2720  *
2721  * Support function for the SQL-callable pgstat* functions. Returns
2722  * the collected statistics for one table or NULL. NULL doesn't mean
2723  * that the table doesn't exist, it is just not yet known by the
2724  * collector, so the caller is better off to report ZERO instead.
2725  * ----------
2726  */
2729 {
2730  Oid dbid;
2731  PgStat_StatDBEntry *dbentry;
2732  PgStat_StatTabEntry *tabentry;
2733 
2734  /*
2735  * If not done for this transaction, read the statistics collector stats
2736  * file into some hash tables.
2737  */
2739 
2740  /*
2741  * Lookup our database, then look in its table hash table.
2742  */
2743  dbid = MyDatabaseId;
2744  dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2745  (void *) &dbid,
2746  HASH_FIND, NULL);
2747  if (dbentry != NULL && dbentry->tables != NULL)
2748  {
2749  tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2750  (void *) &relid,
2751  HASH_FIND, NULL);
2752  if (tabentry)
2753  return tabentry;
2754  }
2755 
2756  /*
2757  * If we didn't find it, maybe it's a shared table.
2758  */
2759  dbid = InvalidOid;
2760  dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2761  (void *) &dbid,
2762  HASH_FIND, NULL);
2763  if (dbentry != NULL && dbentry->tables != NULL)
2764  {
2765  tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2766  (void *) &relid,
2767  HASH_FIND, NULL);
2768  if (tabentry)
2769  return tabentry;
2770  }
2771 
2772  return NULL;
2773 }
2774 
2775 
2776 /* ----------
2777  * pgstat_fetch_stat_funcentry() -
2778  *
2779  * Support function for the SQL-callable pgstat* functions. Returns
2780  * the collected statistics for one function or NULL.
2781  * ----------
2782  */
2785 {
2786  PgStat_StatDBEntry *dbentry;
2787  PgStat_StatFuncEntry *funcentry = NULL;
2788 
2789  /* load the stats file if needed */
2791 
2792  /* Lookup our database, then find the requested function. */
2794  if (dbentry != NULL && dbentry->functions != NULL)
2795  {
2796  funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
2797  (void *) &func_id,
2798  HASH_FIND, NULL);
2799  }
2800 
2801  return funcentry;
2802 }
2803 
2804 
2805 /*
2806  * ---------
2807  * pgstat_fetch_stat_archiver() -
2808  *
2809  * Support function for the SQL-callable pgstat* functions. Returns
2810  * a pointer to the archiver statistics struct.
2811  * ---------
2812  */
2815 {
2817 
2818  return &archiverStats;
2819 }
2820 
2821 
2822 /*
2823  * ---------
2824  * pgstat_fetch_global() -
2825  *
2826  * Support function for the SQL-callable pgstat* functions. Returns
2827  * a pointer to the global statistics struct.
2828  * ---------
2829  */
2832 {
2834 
2835  return &globalStats;
2836 }
2837 
2838 /*
2839  * ---------
2840  * pgstat_fetch_stat_wal() -
2841  *
2842  * Support function for the SQL-callable pgstat* functions. Returns
2843  * a pointer to the WAL statistics struct.
2844  * ---------
2845  */
2848 {
2850 
2851  return &walStats;
2852 }
2853 
2854 /*
2855  * ---------
2856  * pgstat_fetch_slru() -
2857  *
2858  * Support function for the SQL-callable pgstat* functions. Returns
2859  * a pointer to the slru statistics struct.
2860  * ---------
2861  */
2864 {
2866 
2867  return slruStats;
2868 }
2869 
2870 /*
2871  * ---------
2872  * pgstat_fetch_replslot() -
2873  *
2874  * Support function for the SQL-callable pgstat* functions. Returns
2875  * a pointer to the replication slot statistics struct and sets the
2876  * number of entries in nslots_p.
2877  * ---------
2878  */
2881 {
2883 
2884  *nslots_p = nReplSlotStats;
2885  return replSlotStats;
2886 }
2887 
2888 /*
2889  * ---------
2890  * pgstat_fetch_recoveryprefetch() -
2891  *
2892  * Support function for restoring the counters managed by xlogprefetch.c.
2893  * ---------
2894  */
2897 {
2899 
2900  return &recoveryPrefetchStats;
2901 }
2902 
2903 
2904 /*
2905  * Shut down a single backend's statistics reporting at process exit.
2906  *
2907  * Flush any remaining statistics counts out to the collector.
2908  * Without this, operations triggered during backend exit (such as
2909  * temp table deletions) won't be counted.
2910  */
2911 static void
2913 {
2914  /*
2915  * If we got as far as discovering our own database ID, we can report what
2916  * we did to the collector. Otherwise, we'd be sending an invalid
2917  * database ID, so forget it. (This means that accesses to pg_database
2918  * during failed backend starts might never get counted.)
2919  */
2920  if (OidIsValid(MyDatabaseId))
2921  pgstat_report_stat(true);
2922 }
2923 
2924 /* ----------
2925  * pgstat_initialize() -
2926  *
2927  * Initialize pgstats state, and set up our on-proc-exit hook.
2928  * Called from InitPostgres and AuxiliaryProcessMain.
2929  *
2930  * NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful.
2931  * ----------
2932  */
2933 void
2935 {
2936  /*
2937  * Initialize prevWalUsage with pgWalUsage so that pgstat_report_wal() can
2938  * calculate how much pgWalUsage counters are increased by substracting
2939  * prevWalUsage from pgWalUsage.
2940  */
2941  prevWalUsage = pgWalUsage;
2942 
2943  /* Set up a process-exit hook to clean up */
2945 }
2946 
2947 /* ------------------------------------------------------------
2948  * Local support functions follow
2949  * ------------------------------------------------------------
2950  */
2951 
2952 
2953 /* ----------
2954  * pgstat_setheader() -
2955  *
2956  * Set common header fields in a statistics message
2957  * ----------
2958  */
2959 static void
2961 {
2962  hdr->m_type = mtype;
2963 }
2964 
2965 
2966 /* ----------
2967  * pgstat_send() -
2968  *
2969  * Send out one statistics message to the collector
2970  * ----------
2971  */
2972 static void
2973 pgstat_send(void *msg, int len)
2974 {
2975  int rc;
2976 
2978  return;
2979 
2980  ((PgStat_MsgHdr *) msg)->m_size = len;
2981 
2982  /* We'll retry after EINTR, but ignore all other failures */
2983  do
2984  {
2985  rc = send(pgStatSock, msg, len, 0);
2986  } while (rc < 0 && errno == EINTR);
2987 
2988 #ifdef USE_ASSERT_CHECKING
2989  /* In debug builds, log send failures ... */
2990  if (rc < 0)
2991  elog(LOG, "could not send to statistics collector: %m");
2992 #endif
2993 }
2994 
2995 /* ----------
2996  * pgstat_send_archiver() -
2997  *
2998  * Tell the collector about the WAL file that we successfully
2999  * archived or failed to archive.
3000  * ----------
3001  */
3002 void
3003 pgstat_send_archiver(const char *xlog, bool failed)
3004 {
3005  PgStat_MsgArchiver msg;
3006 
3007  /*
3008  * Prepare and send the message
3009  */
3011  msg.m_failed = failed;
3012  strlcpy(msg.m_xlog, xlog, sizeof(msg.m_xlog));
3014  pgstat_send(&msg, sizeof(msg));
3015 }
3016 
3017 /* ----------
3018  * pgstat_send_bgwriter() -
3019  *
3020  * Send bgwriter statistics to the collector
3021  * ----------
3022  */
3023 void
3025 {
3026  /* We assume this initializes to zeroes */
3027  static const PgStat_MsgBgWriter all_zeroes;
3028 
3029  /*
3030  * This function can be called even if nothing at all has happened. In
3031  * this case, avoid sending a completely empty message to the stats
3032  * collector.
3033  */
3034  if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
3035  return;
3036 
3037  /*
3038  * Prepare and send the message
3039  */
3040  pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
3041  pgstat_send(&BgWriterStats, sizeof(BgWriterStats));
3042 
3043  /*
3044  * Clear out the statistics buffer, so it can be re-used.
3045  */
3046  MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
3047 }
3048 
3049 /* ----------
3050  * pgstat_report_wal() -
3051  *
3052  * Calculate how much WAL usage counters are increased and send
3053  * WAL statistics to the collector.
3054  *
3055  * Must be called by processes that generate WAL.
3056  * ----------
3057  */
3058 void
3060 {
3061  WalUsage walusage;
3062 
3063  /*
3064  * Calculate how much WAL usage counters are increased by substracting the
3065  * previous counters from the current ones. Fill the results in WAL stats
3066  * message.
3067  */
3068  MemSet(&walusage, 0, sizeof(WalUsage));
3069  WalUsageAccumDiff(&walusage, &pgWalUsage, &prevWalUsage);
3070 
3071  WalStats.m_wal_records = walusage.wal_records;
3072  WalStats.m_wal_fpi = walusage.wal_fpi;
3073  WalStats.m_wal_bytes = walusage.wal_bytes;
3074 
3075  /*
3076  * Send WAL stats message to the collector.
3077  */
3078  if (!pgstat_send_wal(true))
3079  return;
3080 
3081  /*
3082  * Save the current counters for the subsequent calculation of WAL usage.
3083  */
3084  prevWalUsage = pgWalUsage;
3085 }
3086 
3087 /* ----------
3088  * pgstat_send_wal() -
3089  *
3090  * Send WAL statistics to the collector.
3091  *
3092  * If 'force' is not set, WAL stats message is only sent if enough time has
3093  * passed since last one was sent to reach PGSTAT_STAT_INTERVAL.
3094  *
3095  * Return true if the message is sent, and false otherwise.
3096  * ----------
3097  */
3098 bool
3099 pgstat_send_wal(bool force)
3100 {
3101  /* We assume this initializes to zeroes */
3102  static const PgStat_MsgWal all_zeroes;
3103  static TimestampTz sendTime = 0;
3104 
3105  /*
3106  * This function can be called even if nothing at all has happened. In
3107  * this case, avoid sending a completely empty message to the stats
3108  * collector.
3109  */
3110  if (memcmp(&WalStats, &all_zeroes, sizeof(PgStat_MsgWal)) == 0)
3111  return false;
3112 
3113  if (!force)
3114  {
3116 
3117  /*
3118  * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
3119  * msec since we last sent one.
3120  */
3121  if (!TimestampDifferenceExceeds(sendTime, now, PGSTAT_STAT_INTERVAL))
3122  return false;
3123  sendTime = now;
3124  }
3125 
3126  /*
3127  * Prepare and send the message
3128  */
3130  pgstat_send(&WalStats, sizeof(WalStats));
3131 
3132  /*
3133  * Clear out the statistics buffer, so it can be re-used.
3134  */
3135  MemSet(&WalStats, 0, sizeof(WalStats));
3136 
3137  return true;
3138 }
3139 
3140 /* ----------
3141  * pgstat_send_slru() -
3142  *
3143  * Send SLRU statistics to the collector
3144  * ----------
3145  */
3146 static void
3148 {
3149  /* We assume this initializes to zeroes */
3150  static const PgStat_MsgSLRU all_zeroes;
3151 
3152  for (int i = 0; i < SLRU_NUM_ELEMENTS; i++)
3153  {
3154  /*
3155  * This function can be called even if nothing at all has happened. In
3156  * this case, avoid sending a completely empty message to the stats
3157  * collector.
3158  */
3159  if (memcmp(&SLRUStats[i], &all_zeroes, sizeof(PgStat_MsgSLRU)) == 0)
3160  continue;
3161 
3162  /* set the SLRU type before each send */
3163  SLRUStats[i].m_index = i;
3164 
3165  /*
3166  * Prepare and send the message
3167  */
3168  pgstat_setheader(&SLRUStats[i].m_hdr, PGSTAT_MTYPE_SLRU);
3169  pgstat_send(&SLRUStats[i], sizeof(PgStat_MsgSLRU));
3170 
3171  /*
3172  * Clear out the statistics buffer, so it can be re-used.
3173  */
3174  MemSet(&SLRUStats[i], 0, sizeof(PgStat_MsgSLRU));
3175  }
3176 }
3177 
3178 
3179 /* ----------
3180  * pgstat_send_recoveryprefetch() -
3181  *
3182  * Send recovery prefetch statistics to the collector
3183  * ----------
3184  */
3185 void
3187 {
3189 
3191  msg.m_stats = *stats;
3192  pgstat_send(&msg, sizeof(msg));
3193 }
3194 
3195 
3196 /* ----------
3197  * PgstatCollectorMain() -
3198  *
3199  * Start up the statistics collector process. This is the body of the
3200  * postmaster child process.
3201  *
3202  * The argc/argv parameters are valid only in EXEC_BACKEND case.
3203  * ----------
3204  */
3205 NON_EXEC_STATIC void
3206 PgstatCollectorMain(int argc, char *argv[])
3207 {
3208  int len;
3209  PgStat_Msg msg;
3210  int wr;
3211  WaitEvent event;
3212  WaitEventSet *wes;
3213 
3214  /*
3215  * Ignore all signals usually bound to some action in the postmaster,
3216  * except SIGHUP and SIGQUIT. Note we don't need a SIGUSR1 handler to
3217  * support latch operations, because we only use a local latch.
3218  */
3220  pqsignal(SIGINT, SIG_IGN);
3221  pqsignal(SIGTERM, SIG_IGN);
3227  /* Reset some signals that are accepted by postmaster but not here */
3230 
3232  init_ps_display(NULL);
3233 
3234  /*
3235  * Read in existing stats files or initialize the stats to zero.
3236  */
3237  pgStatRunningInCollector = true;
3238  pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
3239 
3240  /* Prepare to wait for our latch or data in our socket. */
3244  AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL);
3245 
3246  /*
3247  * Loop to process messages until we get SIGQUIT or detect ungraceful
3248  * death of our parent postmaster.
3249  *
3250  * For performance reasons, we don't want to do ResetLatch/WaitLatch after
3251  * every message; instead, do that only after a recv() fails to obtain a
3252  * message. (This effectively means that if backends are sending us stuff
3253  * like mad, we won't notice postmaster death until things slack off a
3254  * bit; which seems fine.) To do that, we have an inner loop that
3255  * iterates as long as recv() succeeds. We do check ConfigReloadPending
3256  * inside the inner loop, which means that such interrupts will get
3257  * serviced but the latch won't get cleared until next time there is a
3258  * break in the action.
3259  */
3260  for (;;)
3261  {
3262  /* Clear any already-pending wakeups */
3264 
3265  /*
3266  * Quit if we get SIGQUIT from the postmaster.
3267  */
3269  break;
3270 
3271  /*
3272  * Inner loop iterates as long as we keep getting messages, or until
3273  * ShutdownRequestPending becomes set.
3274  */
3275  while (!ShutdownRequestPending)
3276  {
3277  /*
3278  * Reload configuration if we got SIGHUP from the postmaster.
3279  */
3280  if (ConfigReloadPending)
3281  {
3282  ConfigReloadPending = false;
3284  }
3285 
3286  /*
3287  * Write the stats file(s) if a new request has arrived that is
3288  * not satisfied by existing file(s).
3289  */
3291  pgstat_write_statsfiles(false, false);
3292 
3293  /*
3294  * Try to receive and process a message. This will not block,
3295  * since the socket is set to non-blocking mode.
3296  *
3297  * XXX On Windows, we have to force pgwin32_recv to cooperate,
3298  * despite the previous use of pg_set_noblock() on the socket.
3299  * This is extremely broken and should be fixed someday.
3300  */
3301 #ifdef WIN32
3302  pgwin32_noblock = 1;
3303 #endif
3304 
3305  len = recv(pgStatSock, (char *) &msg,
3306  sizeof(PgStat_Msg), 0);
3307 
3308 #ifdef WIN32
3309  pgwin32_noblock = 0;
3310 #endif
3311 
3312  if (len < 0)
3313  {
3314  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
3315  break; /* out of inner loop */
3316  ereport(ERROR,
3318  errmsg("could not read statistics message: %m")));
3319  }
3320 
3321  /*
3322  * We ignore messages that are smaller than our common header
3323  */
3324  if (len < sizeof(PgStat_MsgHdr))
3325  continue;
3326 
3327  /*
3328  * The received length must match the length in the header
3329  */
3330  if (msg.msg_hdr.m_size != len)
3331  continue;
3332 
3333  /*
3334  * O.K. - we accept this message. Process it.
3335  */
3336  switch (msg.msg_hdr.m_type)
3337  {
3338  case PGSTAT_MTYPE_DUMMY:
3339  break;
3340 
3341  case PGSTAT_MTYPE_INQUIRY:
3342  pgstat_recv_inquiry(&msg.msg_inquiry, len);
3343  break;
3344 
3345  case PGSTAT_MTYPE_TABSTAT:
3346  pgstat_recv_tabstat(&msg.msg_tabstat, len);
3347  break;
3348 
3349  case PGSTAT_MTYPE_TABPURGE:
3350  pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
3351  break;
3352 
3353  case PGSTAT_MTYPE_DROPDB:
3354  pgstat_recv_dropdb(&msg.msg_dropdb, len);
3355  break;
3356 
3358  pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
3359  break;
3360 
3362  pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
3363  len);
3364  break;
3365 
3367  pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
3368  len);
3369  break;
3370 
3372  pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
3373  len);
3374  break;
3375 
3377  pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
3378  len);
3379  break;
3380 
3382  pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
3383  break;
3384 
3385  case PGSTAT_MTYPE_VACUUM:
3386  pgstat_recv_vacuum(&msg.msg_vacuum, len);
3387  break;
3388 
3389  case PGSTAT_MTYPE_ANALYZE:
3390  pgstat_recv_analyze(&msg.msg_analyze, len);
3391  break;
3392 
3394  pgstat_recv_anl_ancestors(&msg.msg_anl_ancestors, len);
3395  break;
3396 
3397  case PGSTAT_MTYPE_ARCHIVER:
3398  pgstat_recv_archiver(&msg.msg_archiver, len);
3399  break;
3400 
3401  case PGSTAT_MTYPE_BGWRITER:
3402  pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
3403  break;
3404 
3405  case PGSTAT_MTYPE_WAL:
3406  pgstat_recv_wal(&msg.msg_wal, len);
3407  break;
3408 
3409  case PGSTAT_MTYPE_SLRU:
3410  pgstat_recv_slru(&msg.msg_slru, len);
3411  break;
3412 
3414  pgstat_recv_recoveryprefetch(&msg.msg_recoveryprefetch, len);
3415  break;
3416 
3417  case PGSTAT_MTYPE_FUNCSTAT:
3418  pgstat_recv_funcstat(&msg.msg_funcstat, len);
3419  break;
3420 
3422  pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
3423  break;
3424 
3426  pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
3427  len);
3428  break;
3429 
3430  case PGSTAT_MTYPE_DEADLOCK:
3431  pgstat_recv_deadlock(&msg.msg_deadlock, len);
3432  break;
3433 
3434  case PGSTAT_MTYPE_TEMPFILE:
3435  pgstat_recv_tempfile(&msg.msg_tempfile, len);
3436  break;
3437 
3439  pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
3440  len);
3441  break;
3442 
3443  case PGSTAT_MTYPE_REPLSLOT:
3444  pgstat_recv_replslot(&msg.msg_replslot, len);
3445  break;
3446 
3448  pgstat_recv_connstat(&msg.msg_conn, len);
3449  break;
3450 
3451  default:
3452  break;
3453  }
3454  } /* end of inner message-processing loop */
3455 
3456  /* Sleep until there's something to do */
3457 #ifndef WIN32
3458  wr = WaitEventSetWait(wes, -1L, &event, 1, WAIT_EVENT_PGSTAT_MAIN);
3459 #else
3460 
3461  /*
3462  * Windows, at least in its Windows Server 2003 R2 incarnation,
3463  * sometimes loses FD_READ events. Waking up and retrying the recv()
3464  * fixes that, so don't sleep indefinitely. This is a crock of the
3465  * first water, but until somebody wants to debug exactly what's
3466  * happening there, this is the best we can do. The two-second
3467  * timeout matches our pre-9.2 behavior, and needs to be short enough
3468  * to not provoke "using stale statistics" complaints from
3469  * backend_read_statsfile.
3470  */
3471  wr = WaitEventSetWait(wes, 2 * 1000L /* msec */ , &event, 1,
3473 #endif
3474 
3475  /*
3476  * Emergency bailout if postmaster has died. This is to avoid the
3477  * necessity for manual cleanup of all postmaster children.
3478  */
3479  if (wr == 1 && event.events == WL_POSTMASTER_DEATH)
3480  break;
3481  } /* end of outer loop */
3482 
3483  /*
3484  * Save the final stats to reuse at next startup.
3485  */
3486  pgstat_write_statsfiles(true, true);
3487 
3488  FreeWaitEventSet(wes);
3489 
3490  exit(0);
3491 }
3492 
3493 /*
3494  * Subroutine to clear stats in a database entry
3495  *
3496  * Tables and functions hashes are initialized to empty.
3497  */
3498 static void
3500 {
3501  HASHCTL hash_ctl;
3502 
3503  dbentry->n_xact_commit = 0;
3504  dbentry->n_xact_rollback = 0;
3505  dbentry->n_blocks_fetched = 0;
3506  dbentry->n_blocks_hit = 0;
3507  dbentry->n_tuples_returned = 0;
3508  dbentry->n_tuples_fetched = 0;
3509  dbentry->n_tuples_inserted = 0;
3510  dbentry->n_tuples_updated = 0;
3511  dbentry->n_tuples_deleted = 0;
3512  dbentry->last_autovac_time = 0;
3513  dbentry->n_conflict_tablespace = 0;
3514  dbentry->n_conflict_lock = 0;
3515  dbentry->n_conflict_snapshot = 0;
3516  dbentry->n_conflict_bufferpin = 0;
3517  dbentry->n_conflict_startup_deadlock = 0;
3518  dbentry->n_temp_files = 0;
3519  dbentry->n_temp_bytes = 0;
3520  dbentry->n_deadlocks = 0;
3521  dbentry->n_checksum_failures = 0;
3522  dbentry->last_checksum_failure = 0;
3523  dbentry->n_block_read_time = 0;
3524  dbentry->n_block_write_time = 0;
3525  dbentry->n_sessions = 0;
3526  dbentry->total_session_time = 0;
3527  dbentry->total_active_time = 0;
3528  dbentry->total_idle_in_xact_time = 0;
3529  dbentry->n_sessions_abandoned = 0;
3530  dbentry->n_sessions_fatal = 0;
3531  dbentry->n_sessions_killed = 0;
3532 
3534  dbentry->stats_timestamp = 0;
3535 
3536  hash_ctl.keysize = sizeof(Oid);
3537  hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3538  dbentry->tables = hash_create("Per-database table",
3540  &hash_ctl,
3541  HASH_ELEM | HASH_BLOBS);
3542 
3543  hash_ctl.keysize = sizeof(Oid);
3544  hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
3545  dbentry->functions = hash_create("Per-database function",
3547  &hash_ctl,
3548  HASH_ELEM | HASH_BLOBS);
3549 }
3550 
3551 /*
3552  * Lookup the hash table entry for the specified database. If no hash
3553  * table entry exists, initialize it, if the create parameter is true.
3554  * Else, return NULL.
3555  */
3556 static PgStat_StatDBEntry *
3557 pgstat_get_db_entry(Oid databaseid, bool create)
3558 {
3559  PgStat_StatDBEntry *result;
3560  bool found;
3561  HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
3562 
3563  /* Lookup or create the hash table entry for this database */
3564  result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3565  &databaseid,
3566  action, &found);
3567 
3568  if (!create && !found)
3569  return NULL;
3570 
3571  /*
3572  * If not found, initialize the new one. This creates empty hash tables
3573  * for tables and functions, too.
3574  */
3575  if (!found)
3576  reset_dbentry_counters(result);
3577 
3578  return result;
3579 }
3580 
3581 
3582 /*
3583  * Lookup the hash table entry for the specified table. If no hash
3584  * table entry exists, initialize it, if the create parameter is true.
3585  * Else, return NULL.
3586  */
3587 static PgStat_StatTabEntry *
3588 pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
3589 {
3590  PgStat_StatTabEntry *result;
3591  bool found;
3592  HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
3593 
3594  /* Lookup or create the hash table entry for this table */
3595  result = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
3596  &tableoid,
3597  action, &found);
3598 
3599  if (!create && !found)
3600  return NULL;
3601 
3602  /* If not found, initialize the new one. */
3603  if (!found)
3604  {
3605  result->numscans = 0;
3606  result->tuples_returned = 0;
3607  result->tuples_fetched = 0;
3608  result->tuples_inserted = 0;
3609  result->tuples_updated = 0;
3610  result->tuples_deleted = 0;
3611  result->tuples_hot_updated = 0;
3612  result->n_live_tuples = 0;
3613  result->n_dead_tuples = 0;
3614  result->changes_since_analyze = 0;
3615  result->changes_since_analyze_reported = 0;
3616  result->inserts_since_vacuum = 0;
3617  result->blocks_fetched = 0;
3618  result->blocks_hit = 0;
3619  result->vacuum_timestamp = 0;
3620  result->vacuum_count = 0;
3621  result->autovac_vacuum_timestamp = 0;
3622  result->autovac_vacuum_count = 0;
3623  result->analyze_timestamp = 0;
3624  result->analyze_count = 0;
3625  result->autovac_analyze_timestamp = 0;
3626  result->autovac_analyze_count = 0;
3627  }
3628 
3629  return result;
3630 }
3631 
3632 
3633 /* ----------
3634  * pgstat_write_statsfiles() -
3635  * Write the global statistics file, as well as requested DB files.
3636  *
3637  * 'permanent' specifies writing to the permanent files not temporary ones.
3638  * When true (happens only when the collector is shutting down), also remove
3639  * the temporary files so that backends starting up under a new postmaster
3640  * can't read old data before the new collector is ready.
3641  *
3642  * When 'allDbs' is false, only the requested databases (listed in
3643  * pending_write_requests) will be written; otherwise, all databases
3644  * will be written.
3645  * ----------
3646  */
3647 static void
3648 pgstat_write_statsfiles(bool permanent, bool allDbs)
3649 {
3650  HASH_SEQ_STATUS hstat;
3651  PgStat_StatDBEntry *dbentry;
3652  FILE *fpout;
3653  int32 format_id;
3654  const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
3655  const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3656  int rc;
3657  int i;
3658 
3659  elog(DEBUG2, "writing stats file \"%s\"", statfile);
3660 
3661  /*
3662  * Open the statistics temp file to write out the current values.
3663  */
3664  fpout = AllocateFile(tmpfile, PG_BINARY_W);
3665  if (fpout == NULL)
3666  {
3667  ereport(LOG,
3669  errmsg("could not open temporary statistics file \"%s\": %m",
3670  tmpfile)));
3671  return;
3672  }
3673 
3674  /*
3675  * Set the timestamp of the stats file.
3676  */
3677  globalStats.stats_timestamp = GetCurrentTimestamp();
3678 
3679  /*
3680  * Write the file header --- currently just a format ID.
3681  */
3682  format_id = PGSTAT_FILE_FORMAT_ID;
3683  rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
3684  (void) rc; /* we'll check for error with ferror */
3685 
3686  /*
3687  * Write global stats struct
3688  */
3689  rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout);
3690  (void) rc; /* we'll check for error with ferror */
3691 
3692  /*
3693  * Write archiver stats struct
3694  */
3695  rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout);
3696  (void) rc; /* we'll check for error with ferror */
3697 
3698  /*
3699  * Write WAL stats struct
3700  */
3701  rc = fwrite(&walStats, sizeof(walStats), 1, fpout);
3702  (void) rc; /* we'll check for error with ferror */
3703 
3704  /*
3705  * Write SLRU stats struct
3706  */
3707  rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
3708  (void) rc; /* we'll check for error with ferror */
3709 
3710  /*
3711  * Write recovery prefetch stats struct
3712  */
3713  rc = fwrite(&recoveryPrefetchStats, sizeof(recoveryPrefetchStats), 1,
3714  fpout);
3715  (void) rc; /* we'll check for error with ferror */
3716 
3717  /*
3718  * Walk through the database table.
3719  */
3720  hash_seq_init(&hstat, pgStatDBHash);
3721  while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
3722  {
3723  /*
3724  * Write out the table and function stats for this DB into the
3725  * appropriate per-DB stat file, if required.
3726  */
3727  if (allDbs || pgstat_db_requested(dbentry->databaseid))
3728  {
3729  /* Make DB's timestamp consistent with the global stats */
3730  dbentry->stats_timestamp = globalStats.stats_timestamp;
3731 
3732  pgstat_write_db_statsfile(dbentry, permanent);
3733  }
3734 
3735  /*
3736  * Write out the DB entry. We don't write the tables or functions
3737  * pointers, since they're of no use to any other process.
3738  */
3739  fputc('D', fpout);
3740  rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
3741  (void) rc; /* we'll check for error with ferror */
3742  }
3743 
3744  /*
3745  * Write replication slot stats struct
3746  */
3747  for (i = 0; i < nReplSlotStats; i++)
3748  {
3749  fputc('R', fpout);
3750  rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
3751  (void) rc; /* we'll check for error with ferror */
3752  }
3753 
3754  /*
3755  * No more output to be done. Close the temp file and replace the old
3756  * pgstat.stat with it. The ferror() check replaces testing for error
3757  * after each individual fputc or fwrite above.
3758  */
3759  fputc('E', fpout);
3760 
3761  if (ferror(fpout))
3762  {
3763  ereport(LOG,
3765  errmsg("could not write temporary statistics file \"%s\": %m",
3766  tmpfile)));
3767  FreeFile(fpout);
3768  unlink(tmpfile);
3769  }
3770  else if (FreeFile(fpout) < 0)
3771  {
3772  ereport(LOG,
3774  errmsg("could not close temporary statistics file \"%s\": %m",
3775  tmpfile)));
3776  unlink(tmpfile);
3777  }
3778  else if (rename(tmpfile, statfile) < 0)
3779  {
3780  ereport(LOG,
3782  errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3783  tmpfile, statfile)));
3784  unlink(tmpfile);
3785  }
3786 
3787  if (permanent)
3788  unlink(pgstat_stat_filename);
3789 
3790  /*
3791  * Now throw away the list of requests. Note that requests sent after we
3792  * started the write are still waiting on the network socket.
3793  */
3794  list_free(pending_write_requests);
3795  pending_write_requests = NIL;
3796 }
3797 
3798 /*
3799  * return the filename for a DB stat file; filename is the output buffer,
3800  * of length len.
3801  */
3802 static void
3803 get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
3804  char *filename, int len)
3805 {
3806  int printed;
3807 
3808  /* NB -- pgstat_reset_remove_files knows about the pattern this uses */
3809  printed = snprintf(filename, len, "%s/db_%u.%s",
3810  permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
3812  databaseid,
3813  tempname ? "tmp" : "stat");
3814  if (printed >= len)
3815  elog(ERROR, "overlength pgstat path");
3816 }
3817 
3818 /* ----------
3819  * pgstat_write_db_statsfile() -
3820  * Write the stat file for a single database.
3821  *
3822  * If writing to the permanent file (happens when the collector is
3823  * shutting down only), remove the temporary file so that backends
3824  * starting up under a new postmaster can't read the old data before
3825  * the new collector is ready.
3826  * ----------
3827  */
3828 static void
3830 {
3831  HASH_SEQ_STATUS tstat;
3833  PgStat_StatTabEntry *tabentry;
3834  PgStat_StatFuncEntry *funcentry;
3835  FILE *fpout;
3836  int32 format_id;
3837  Oid dbid = dbentry->databaseid;
3838  int rc;
3839  char tmpfile[MAXPGPATH];
3840  char statfile[MAXPGPATH];
3841 
3842  get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
3843  get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);
3844 
3845  elog(DEBUG2, "writing stats file \"%s\"", statfile);
3846 
3847  /*
3848  * Open the statistics temp file to write out the current values.
3849  */
3850  fpout = AllocateFile(tmpfile, PG_BINARY_W);
3851  if (fpout == NULL)
3852  {
3853  ereport(LOG,
3855  errmsg("could not open temporary statistics file \"%s\": %m",
3856  tmpfile)));
3857  return;
3858  }
3859 
3860  /*
3861  * Write the file header --- currently just a format ID.
3862  */
3863  format_id = PGSTAT_FILE_FORMAT_ID;
3864  rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
3865  (void) rc; /* we'll check for error with ferror */
3866 
3867  /*
3868  * Walk through the database's access stats per table.
3869  */
3870  hash_seq_init(&tstat, dbentry->tables);
3871  while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
3872  {
3873  fputc('T', fpout);
3874  rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
3875  (void) rc; /* we'll check for error with ferror */
3876  }
3877 
3878  /*
3879  * Walk through the database's function stats table.
3880  */
3881  hash_seq_init(&fstat, dbentry->functions);
3882  while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
3883  {
3884  fputc('F', fpout);
3885  rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
3886  (void) rc; /* we'll check for error with ferror */
3887  }
3888 
3889  /*
3890  * No more output to be done. Close the temp file and replace the old
3891  * pgstat.stat with it. The ferror() check replaces testing for error
3892  * after each individual fputc or fwrite above.
3893  */
3894  fputc('E', fpout);
3895 
3896  if (ferror(fpout))
3897  {
3898  ereport(LOG,
3900  errmsg("could not write temporary statistics file \"%s\": %m",
3901  tmpfile)));
3902  FreeFile(fpout);
3903  unlink(tmpfile);
3904  }
3905  else if (FreeFile(fpout) < 0)
3906  {
3907  ereport(LOG,
3909  errmsg("could not close temporary statistics file \"%s\": %m",
3910  tmpfile)));
3911  unlink(tmpfile);
3912  }
3913  else if (rename(tmpfile, statfile) < 0)
3914  {
3915  ereport(LOG,
3917  errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3918  tmpfile, statfile)));
3919  unlink(tmpfile);
3920  }
3921 
3922  if (permanent)
3923  {
3924  get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
3925 
3926  elog(DEBUG2, "removing temporary stats file \"%s\"", statfile);
3927  unlink(statfile);
3928  }
3929 }
3930 
3931 /* ----------
3932  * pgstat_read_statsfiles() -
3933  *
3934  * Reads in some existing statistics collector files and returns the
3935  * databases hash table that is the top level of the data.
3936  *
3937  * If 'onlydb' is not InvalidOid, it means we only want data for that DB
3938  * plus the shared catalogs ("DB 0"). We'll still populate the DB hash
3939  * table for all databases, but we don't bother even creating table/function
3940  * hash tables for other databases.
3941  *
3942  * 'permanent' specifies reading from the permanent files not temporary ones.
3943  * When true (happens only when the collector is starting up), remove the
3944  * files after reading; the in-memory status is now authoritative, and the
3945  * files would be out of date in case somebody else reads them.
3946  *
3947  * If a 'deep' read is requested, table/function stats are read, otherwise
3948  * the table/function hash tables remain empty.
3949  * ----------
3950  */
3951 static HTAB *
3952 pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
3953 {
3954  PgStat_StatDBEntry *dbentry;
3955  PgStat_StatDBEntry dbbuf;
3956  HASHCTL hash_ctl;
3957  HTAB *dbhash;
3958  FILE *fpin;
3959  int32 format_id;
3960  bool found;
3961  const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3962  int i;
3963 
3964  /*
3965  * The tables will live in pgStatLocalContext.
3966  */
3968 
3969  /*
3970  * Create the DB hashtable
3971  */
3972  hash_ctl.keysize = sizeof(Oid);
3973  hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
3974  hash_ctl.hcxt = pgStatLocalContext;
3975  dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
3977 
3978  /* Allocate the space for replication slot statistics */
3979  replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
3981  * sizeof(PgStat_ReplSlotStats));
3982  nReplSlotStats = 0;
3983 
3984  /*
3985  * Clear out global, archiver, WAL and SLRU statistics so they start from
3986  * zero in case we can't load an existing statsfile.
3987  */
3988  memset(&globalStats, 0, sizeof(globalStats));
3989  memset(&archiverStats, 0, sizeof(archiverStats));
3990  memset(&walStats, 0, sizeof(walStats));
3991  memset(&slruStats, 0, sizeof(slruStats));
3992  memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
3993 
3994  /*
3995  * Set the current timestamp (will be kept only in case we can't load an
3996  * existing statsfile).
3997  */
3998  globalStats.stat_reset_timestamp = GetCurrentTimestamp();
3999  archiverStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
4000  walStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
4001 
4002  /*
4003  * Set the same reset timestamp for all SLRU items too.
4004  */
4005  for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
4006  slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
4007 
4008  /*
4009  * Set the same reset timestamp for all replication slots too.
4010  */
4011  for (i = 0; i < max_replication_slots; i++)
4012  replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
4013 
4014  /*
4015  * Try to open the stats file. If it doesn't exist, the backends simply
4016  * return zero for anything and the collector simply starts from scratch
4017  * with empty counters.
4018  *
4019  * ENOENT is a possibility if the stats collector is not running or has
4020  * not yet written the stats file the first time. Any other failure
4021  * condition is suspicious.
4022  */
4023  if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4024  {
4025  if (errno != ENOENT)
4028  errmsg("could not open statistics file \"%s\": %m",
4029  statfile)));
4030  return dbhash;
4031  }
4032 
4033  /*
4034  * Verify it's of the expected format.
4035  */
4036  if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4037  format_id != PGSTAT_FILE_FORMAT_ID)
4038  {
4040  (errmsg("corrupted statistics file \"%s\"", statfile)));
4041  goto done;
4042  }
4043 
4044  /*
4045  * Read global stats struct
4046  */
4047  if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
4048  {
4050  (errmsg("corrupted statistics file \"%s\"", statfile)));
4051  memset(&globalStats, 0, sizeof(globalStats));
4052  goto done;
4053  }
4054 
4055  /*
4056  * In the collector, disregard the timestamp we read from the permanent
4057  * stats file; we should be willing to write a temp stats file immediately
4058  * upon the first request from any backend. This only matters if the old
4059  * file's timestamp is less than PGSTAT_STAT_INTERVAL ago, but that's not
4060  * an unusual scenario.
4061  */
4063  globalStats.stats_timestamp = 0;
4064 
4065  /*
4066  * Read archiver stats struct
4067  */
4068  if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats))
4069  {
4071  (errmsg("corrupted statistics file \"%s\"", statfile)));
4072  memset(&archiverStats, 0, sizeof(archiverStats));
4073  goto done;
4074  }
4075 
4076  /*
4077  * Read WAL stats struct
4078  */
4079  if (fread(&walStats, 1, sizeof(walStats), fpin) != sizeof(walStats))
4080  {
4082  (errmsg("corrupted statistics file \"%s\"", statfile)));
4083  memset(&walStats, 0, sizeof(walStats));
4084  goto done;
4085  }
4086 
4087  /*
4088  * Read SLRU stats struct
4089  */
4090  if (fread(slruStats, 1, sizeof(slruStats), fpin) != sizeof(slruStats))
4091  {
4093  (errmsg("corrupted statistics file \"%s\"", statfile)));
4094  memset(&slruStats, 0, sizeof(slruStats));
4095  goto done;
4096  }
4097 
4098  /*
4099  * Read recoveryPrefetchStats struct
4100  */
4101  if (fread(&recoveryPrefetchStats, 1, sizeof(recoveryPrefetchStats),
4102  fpin) != sizeof(recoveryPrefetchStats))
4103  {
4105  (errmsg("corrupted statistics file \"%s\"", statfile)));
4106  memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
4107  goto done;
4108  }
4109 
4110  /*
4111  * We found an existing collector stats file. Read it and put all the
4112  * hashtable entries into place.
4113  */
4114  for (;;)
4115  {
4116  switch (fgetc(fpin))
4117  {
4118  /*
4119  * 'D' A PgStat_StatDBEntry struct describing a database
4120  * follows.
4121  */
4122  case 'D':
4123  if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
4124  fpin) != offsetof(PgStat_StatDBEntry, tables))
4125  {
4127  (errmsg("corrupted statistics file \"%s\"",
4128  statfile)));
4129  goto done;
4130  }
4131 
4132  /*
4133  * Add to the DB hash
4134  */
4135  dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
4136  (void *) &dbbuf.databaseid,
4137  HASH_ENTER,
4138  &found);
4139  if (found)
4140  {
4142  (errmsg("corrupted statistics file \"%s\"",
4143  statfile)));
4144  goto done;
4145  }
4146 
4147  memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
4148  dbentry->tables = NULL;
4149  dbentry->functions = NULL;
4150 
4151  /*
4152  * In the collector, disregard the timestamp we read from the
4153  * permanent stats file; we should be willing to write a temp
4154  * stats file immediately upon the first request from any
4155  * backend.
4156  */
4158  dbentry->stats_timestamp = 0;
4159 
4160  /*
4161  * Don't create tables/functions hashtables for uninteresting
4162  * databases.
4163  */
4164  if (onlydb != InvalidOid)
4165  {
4166  if (dbbuf.databaseid != onlydb &&
4167  dbbuf.databaseid != InvalidOid)
4168  break;
4169  }
4170 
4171  hash_ctl.keysize = sizeof(Oid);
4172  hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
4173  hash_ctl.hcxt = pgStatLocalContext;
4174  dbentry->tables = hash_create("Per-database table",
4176  &hash_ctl,
4178 
4179  hash_ctl.keysize = sizeof(Oid);
4180  hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
4181  hash_ctl.hcxt = pgStatLocalContext;
4182  dbentry->functions = hash_create("Per-database function",
4184  &hash_ctl,
4186 
4187  /*
4188  * If requested, read the data from the database-specific
4189  * file. Otherwise we just leave the hashtables empty.
4190  */
4191  if (deep)
4193  dbentry->tables,
4194  dbentry->functions,
4195  permanent);
4196 
4197  break;
4198 
4199  /*
4200  * 'R' A PgStat_ReplSlotStats struct describing a replication
4201  * slot follows.
4202  */
4203  case 'R':
4204  if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
4205  != sizeof(PgStat_ReplSlotStats))
4206  {
4208  (errmsg("corrupted statistics file \"%s\"",
4209  statfile)));
4210  memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
4211  goto done;
4212  }
4213  nReplSlotStats++;
4214  break;
4215 
4216  case 'E':
4217  goto done;
4218 
4219  default:
4221  (errmsg("corrupted statistics file \"%s\"",
4222  statfile)));
4223  goto done;
4224  }
4225  }
4226 
4227 done:
4228  FreeFile(fpin);
4229 
4230  /* If requested to read the permanent file, also get rid of it. */
4231  if (permanent)
4232  {
4233  elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4234  unlink(statfile);
4235  }
4236 
4237  return dbhash;
4238 }
4239 
4240 
4241 /* ----------
4242  * pgstat_read_db_statsfile() -
4243  *
4244  * Reads in the existing statistics collector file for the given database,
4245  * filling the passed-in tables and functions hash tables.
4246  *
4247  * As in pgstat_read_statsfiles, if the permanent file is requested, it is
4248  * removed after reading.
4249  *
4250  * Note: this code has the ability to skip storing per-table or per-function
4251  * data, if NULL is passed for the corresponding hashtable. That's not used
4252  * at the moment though.
4253  * ----------
4254  */
4255 static void
4256 pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
4257  bool permanent)
4258 {
4259  PgStat_StatTabEntry *tabentry;
4260  PgStat_StatTabEntry tabbuf;
4261  PgStat_StatFuncEntry funcbuf;
4262  PgStat_StatFuncEntry *funcentry;
4263  FILE *fpin;
4264  int32 format_id;
4265  bool found;
4266  char statfile[MAXPGPATH];
4267 
4268  get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
4269 
4270  /*
4271  * Try to open the stats file. If it doesn't exist, the backends simply
4272  * return zero for anything and the collector simply starts from scratch
4273  * with empty counters.
4274  *
4275  * ENOENT is a possibility if the stats collector is not running or has
4276  * not yet written the stats file the first time. Any other failure
4277  * condition is suspicious.
4278  */
4279  if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4280  {
4281  if (errno != ENOENT)
4284  errmsg("could not open statistics file \"%s\": %m",
4285  statfile)));
4286  return;
4287  }
4288 
4289  /*
4290  * Verify it's of the expected format.
4291  */
4292  if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4293  format_id != PGSTAT_FILE_FORMAT_ID)
4294  {
4296  (errmsg("corrupted statistics file \"%s\"", statfile)));
4297  goto done;
4298  }
4299 
4300  /*
4301  * We found an existing collector stats file. Read it and put all the
4302  * hashtable entries into place.
4303  */
4304  for (;;)
4305  {
4306  switch (fgetc(fpin))
4307  {
4308  /*
4309  * 'T' A PgStat_StatTabEntry follows.
4310  */
4311  case 'T':
4312  if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
4313  fpin) != sizeof(PgStat_StatTabEntry))
4314  {
4316  (errmsg("corrupted statistics file \"%s\"",
4317  statfile)));
4318  goto done;
4319  }
4320 
4321  /*
4322  * Skip if table data not wanted.
4323  */
4324  if (tabhash == NULL)
4325  break;
4326 
4327  tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
4328  (void *) &tabbuf.tableid,
4329  HASH_ENTER, &found);
4330 
4331  if (found)
4332  {
4334  (errmsg("corrupted statistics file \"%s\"",
4335  statfile)));
4336  goto done;
4337  }
4338 
4339  memcpy(tabentry, &tabbuf, sizeof(tabbuf));
4340  break;
4341 
4342  /*
4343  * 'F' A PgStat_StatFuncEntry follows.
4344  */
4345  case 'F':
4346  if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry),
4347  fpin) != sizeof(PgStat_StatFuncEntry))
4348  {
4350  (errmsg("corrupted statistics file \"%s\"",
4351  statfile)));
4352  goto done;
4353  }
4354 
4355  /*
4356  * Skip if function data not wanted.
4357  */
4358  if (funchash == NULL)
4359  break;
4360 
4361  funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
4362  (void *) &funcbuf.functionid,
4363  HASH_ENTER, &found);
4364 
4365  if (found)
4366  {
4368  (errmsg("corrupted statistics file \"%s\"",
4369  statfile)));
4370  goto done;
4371  }
4372 
4373  memcpy(funcentry, &funcbuf, sizeof(funcbuf));
4374  break;
4375 
4376  /*
4377  * 'E' The EOF marker of a complete stats file.
4378  */
4379  case 'E':
4380  goto done;
4381 
4382  default:
4384  (errmsg("corrupted statistics file \"%s\"",
4385  statfile)));
4386  goto done;
4387  }
4388  }
4389 
4390 done:
4391  FreeFile(fpin);
4392 
4393  if (permanent)
4394  {
4395  elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4396  unlink(statfile);
4397  }
4398 }
4399 
4400 /* ----------
4401  * pgstat_read_db_statsfile_timestamp() -
4402  *
4403  * Attempt to determine the timestamp of the last db statfile write.
4404  * Returns true if successful; the timestamp is stored in *ts. The caller must
4405  * rely on timestamp stored in *ts iff the function returns true.
4406  *
4407  * This needs to be careful about handling databases for which no stats file
4408  * exists, such as databases without a stat entry or those not yet written:
4409  *
4410  * - if there's a database entry in the global file, return the corresponding
4411  * stats_timestamp value.
4412  *
4413  * - if there's no db stat entry (e.g. for a new or inactive database),
4414  * there's no stats_timestamp value, but also nothing to write so we return
4415  * the timestamp of the global statfile.
4416  * ----------
4417  */
4418 static bool
4419 pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
4420  TimestampTz *ts)
4421 {
4422  PgStat_StatDBEntry dbentry;
4423  PgStat_GlobalStats myGlobalStats;
4424  PgStat_ArchiverStats myArchiverStats;
4425  PgStat_WalStats myWalStats;
4426  PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
4427  PgStat_ReplSlotStats myReplSlotStats;
4428  PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
4429  FILE *fpin;
4430  int32 format_id;
4431  const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
4432 
4433  /*
4434  * Try to open the stats file. As above, anything but ENOENT is worthy of
4435  * complaining about.
4436  */
4437  if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4438  {
4439  if (errno != ENOENT)
4442  errmsg("could not open statistics file \"%s\": %m",
4443  statfile)));
4444  return false;
4445  }
4446 
4447  /*
4448  * Verify it's of the expected format.
4449  */
4450  if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4451  format_id != PGSTAT_FILE_FORMAT_ID)
4452  {
4454  (errmsg("corrupted statistics file \"%s\"", statfile)));
4455  FreeFile(fpin);
4456  return false;
4457  }
4458 
4459  /*
4460  * Read global stats struct
4461  */
4462  if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
4463  fpin) != sizeof(myGlobalStats))
4464  {
4466  (errmsg("corrupted statistics file \"%s\"", statfile)));
4467  FreeFile(fpin);
4468  return false;
4469  }
4470 
4471  /*
4472  * Read archiver stats struct
4473  */
4474  if (fread(&myArchiverStats, 1, sizeof(myArchiverStats),
4475  fpin) != sizeof(myArchiverStats))
4476  {
4478  (errmsg("corrupted statistics file \"%s\"", statfile)));
4479  FreeFile(fpin);
4480  return false;
4481  }
4482 
4483  /*
4484  * Read WAL stats struct
4485  */
4486  if (fread(&myWalStats, 1, sizeof(myWalStats), fpin) != sizeof(myWalStats))
4487  {
4489  (errmsg("corrupted statistics file \"%s\"", statfile)));
4490  FreeFile(fpin);
4491  return false;
4492  }
4493 
4494  /*
4495  * Read SLRU stats struct
4496  */
4497  if (fread(mySLRUStats, 1, sizeof(mySLRUStats), fpin) != sizeof(mySLRUStats))
4498  {
4500  (errmsg("corrupted statistics file \"%s\"", statfile)));
4501  FreeFile(fpin);
4502  return false;
4503  }
4504 
4505  /*
4506  * Read recovery prefetch stats struct
4507  */
4508  if (fread(&myRecoveryPrefetchStats, 1, sizeof(myRecoveryPrefetchStats),
4509  fpin) != sizeof(myRecoveryPrefetchStats))
4510  {
4512  (errmsg("corrupted statistics file \"%s\"", statfile)));
4513  FreeFile(fpin);
4514  return false;
4515  }
4516 
4517  /* By default, we're going to return the timestamp of the global file. */
4518  *ts = myGlobalStats.stats_timestamp;
4519 
4520  /*
4521  * We found an existing collector stats file. Read it and look for a
4522  * record for the requested database. If found, use its timestamp.
4523  */
4524  for (;;)
4525  {
4526  switch (fgetc(fpin))
4527  {
4528  /*
4529  * 'D' A PgStat_StatDBEntry struct describing a database
4530  * follows.
4531  */
4532  case 'D':
4533  if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
4534  fpin) != offsetof(PgStat_StatDBEntry, tables))
4535  {
4537  (errmsg("corrupted statistics file \"%s\"",
4538  statfile)));
4539  FreeFile(fpin);
4540  return false;
4541  }
4542 
4543  /*
4544  * If this is the DB we're looking for, save its timestamp and
4545  * we're done.
4546  */
4547  if (dbentry.databaseid == databaseid)
4548  {
4549  *ts = dbentry.stats_timestamp;
4550  goto done;
4551  }
4552 
4553  break;
4554 
4555  /*
4556  * 'R' A PgStat_ReplSlotStats struct describing a replication
4557  * slot follows.
4558  */
4559  case 'R':
4560  if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
4561  != sizeof(PgStat_ReplSlotStats))
4562  {
4564  (errmsg("corrupted statistics file \"%s\"",
4565  statfile)));
4566  FreeFile(fpin);
4567  return false;
4568  }
4569  break;
4570 
4571  case 'E':
4572  goto done;
4573 
4574  default:
4575  {
4577  (errmsg("corrupted statistics file \"%s\"",
4578  statfile)));
4579  FreeFile(fpin);
4580  return false;
4581  }
4582  }
4583  }
4584 
4585 done:
4586  FreeFile(fpin);
4587  return true;
4588 }
4589 
4590 /*
4591  * If not already done, read the statistics collector stats file into
4592  * some hash tables. The results will be kept until pgstat_clear_snapshot()
4593  * is called (typically, at end of transaction).
4594  */
4595 static void
4597 {
4598  TimestampTz min_ts = 0;
4599  TimestampTz ref_ts = 0;
4600  Oid inquiry_db;
4601  int count;
4602 
4603  /* already read it? */
4604  if (pgStatDBHash)
4605  return;
4607 
4608  /*
4609  * In a normal backend, we check staleness of the data for our own DB, and
4610  * so we send MyDatabaseId in inquiry messages. In the autovac launcher,
4611  * check staleness of the shared-catalog data, and send InvalidOid in
4612  * inquiry messages so as not to force writing unnecessary data.
4613  */
4615  inquiry_db = InvalidOid;
4616  else
4617  inquiry_db = MyDatabaseId;
4618 
4619  /*
4620  * Loop until fresh enough stats file is available or we ran out of time.
4621  * The stats inquiry message is sent repeatedly in case collector drops
4622  * it; but not every single time, as that just swamps the collector.
4623  */
4624  for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++)
4625  {
4626  bool ok;
4627  TimestampTz file_ts = 0;
4628  TimestampTz cur_ts;
4629 
4631 
4632  ok = pgstat_read_db_statsfile_timestamp(inquiry_db, false, &file_ts);
4633 
4634  cur_ts = GetCurrentTimestamp();
4635  /* Calculate min acceptable timestamp, if we didn't already */
4636  if (count == 0 || cur_ts < ref_ts)
4637  {
4638  /*
4639  * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
4640  * msec before now. This indirectly ensures that the collector
4641  * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
4642  * an autovacuum worker, however, we want a lower delay to avoid
4643  * using stale data, so we use PGSTAT_RETRY_DELAY (since the
4644  * number of workers is low, this shouldn't be a problem).
4645  *
4646  * We don't recompute min_ts after sleeping, except in the
4647  * unlikely case that cur_ts went backwards. So we might end up
4648  * accepting a file a bit older than PGSTAT_STAT_INTERVAL. In
4649  * practice that shouldn't happen, though, as long as the sleep
4650  * time is less than PGSTAT_STAT_INTERVAL; and we don't want to
4651  * tell the collector that our cutoff time is less than what we'd
4652  * actually accept.
4653  */
4654  ref_ts = cur_ts;
4656  min_ts = TimestampTzPlusMilliseconds(ref_ts,
4658  else
4659  min_ts = TimestampTzPlusMilliseconds(ref_ts,
4661  }
4662 
4663  /*
4664  * If the file timestamp is actually newer than cur_ts, we must have
4665  * had a clock glitch (system time went backwards) or there is clock
4666  * skew between our processor and the stats collector's processor.
4667  * Accept the file, but send an inquiry message anyway to make
4668  * pgstat_recv_inquiry do a sanity check on the collector's time.
4669  */
4670  if (ok && file_ts > cur_ts)
4671  {
4672  /*
4673  * A small amount of clock skew between processors isn't terribly
4674  * surprising, but a large difference is worth logging. We
4675  * arbitrarily define "large" as 1000 msec.
4676  */
4677  if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000))
4678  {
4679  char *filetime;
4680  char *mytime;
4681 
4682  /* Copy because timestamptz_to_str returns a static buffer */
4683  filetime = pstrdup(timestamptz_to_str(file_ts));
4684  mytime = pstrdup(timestamptz_to_str(cur_ts));
4685  ereport(LOG,
4686  (errmsg("statistics collector's time %s is later than backend local time %s",
4687  filetime, mytime)));
4688  pfree(filetime);
4689  pfree(mytime);
4690  }
4691 
4692  pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
4693  break;
4694  }
4695 
4696  /* Normal acceptance case: file is not older than cutoff time */
4697  if (ok && file_ts >= min_ts)
4698  break;
4699 
4700  /*
4701  * If we're in crash recovery, the collector may not even be running,
4702  * so work with what we have.
4703  */
4704  if (InRecovery)
4705  break;
4706 
4707  /* Not there or too old, so kick the collector and wait a bit */
4708  if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
4709  pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
4710 
4711  pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
4712  }
4713 
4714  if (count >= PGSTAT_POLL_LOOP_COUNT)
4715  ereport(LOG,
4716  (errmsg("using stale statistics instead of current ones "
4717  "because stats collector is not responding")));
4718 
4719  /*
4720  * Autovacuum launcher wants stats about all databases, but a shallow read
4721  * is sufficient. Regular backends want a deep read for just the tables
4722  * they can see (MyDatabaseId + shared catalogs).
4723  */
4725  pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
4726  else
4727  pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
4728 }
4729 
4730 
4731 /* ----------
4732  * pgstat_setup_memcxt() -
4733  *
4734  * Create pgStatLocalContext, if not already done.
4735  * ----------
4736  */
4737 static void
4739 {
4740  if (!pgStatLocalContext)
4741  pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
4742  "Statistics snapshot",
4744 }
4745 
4746 
4747 /* ----------
4748  * pgstat_clear_snapshot() -
4749  *
4750  * Discard any data collected in the current transaction. Any subsequent
4751  * request will cause new snapshots to be read.
4752  *
4753  * This is also invoked during transaction commit or abort to discard
4754  * the no-longer-wanted snapshot.
4755  * ----------
4756  */
4757 void
4759 {
4760  /* Release memory, if any was allocated */
4761  if (pgStatLocalContext)
4762  MemoryContextDelete(pgStatLocalContext);
4763 
4764  /* Reset variables */
4765  pgStatLocalContext = NULL;
4766  pgStatDBHash = NULL;
4767  replSlotStats = NULL;
4768  nReplSlotStats = 0;
4769 
4770  /*
4771  * Historically the backend_status.c facilities lived in this file, and
4772  * were reset with the same function. For now keep it that way, and
4773  * forward the reset request.
4774  */
4776 }
4777 
4778 
4779 /* ----------
4780  * pgstat_recv_inquiry() -
4781  *
4782  * Process stat inquiry requests.
4783  * ----------
4784  */
4785 static void
4787 {
4788  PgStat_StatDBEntry *dbentry;
4789 
4790  elog(DEBUG2, "received inquiry for database %u", msg->databaseid);
4791 
4792  /*
4793  * If there's already a write request for this DB, there's nothing to do.
4794  *
4795  * Note that if a request is found, we return early and skip the below
4796  * check for clock skew. This is okay, since the only way for a DB
4797  * request to be present in the list is that we have been here since the
4798  * last write round. It seems sufficient to check for clock skew once per
4799  * write round.
4800  */
4801  if (list_member_oid(pending_write_requests, msg->databaseid))
4802  return;
4803 
4804  /*
4805  * Check to see if we last wrote this database at a time >= the requested
4806  * cutoff time. If so, this is a stale request that was generated before
4807  * we updated the DB file, and we don't need to do so again.
4808  *
4809  * If the requestor's local clock time is older than stats_timestamp, we
4810  * should suspect a clock glitch, ie system time going backwards; though
4811  * the more likely explanation is just delayed message receipt. It is
4812  * worth expending a GetCurrentTimestamp call to be sure, since a large
4813  * retreat in the system clock reading could otherwise cause us to neglect
4814  * to update the stats file for a long time.
4815  */
4816  dbentry = pgstat_get_db_entry(msg->databaseid, false);
4817  if (dbentry == NULL)
4818  {
4819  /*
4820  * We have no data for this DB. Enter a write request anyway so that
4821  * the global stats will get updated. This is needed to prevent
4822  * backend_read_statsfile from waiting for data that we cannot supply,
4823  * in the case of a new DB that nobody has yet reported any stats for.
4824  * See the behavior of pgstat_read_db_statsfile_timestamp.
4825  */
4826  }
4827  else if (msg->clock_time < dbentry->stats_timestamp)
4828  {
4829  TimestampTz cur_ts = GetCurrentTimestamp();
4830 
4831  if (cur_ts < dbentry->stats_timestamp)
4832  {
4833  /*
4834  * Sure enough, time went backwards. Force a new stats file write
4835  * to get back in sync; but first, log a complaint.
4836  */
4837  char *writetime;
4838  char *mytime;
4839 
4840  /* Copy because timestamptz_to_str returns a static buffer */
4841  writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
4842  mytime = pstrdup(timestamptz_to_str(cur_ts));
4843  ereport(LOG,
4844  (errmsg("stats_timestamp %s is later than collector's time %s for database %u",
4845  writetime, mytime, dbentry->databaseid)));
4846  pfree(writetime);
4847  pfree(mytime);
4848  }
4849  else
4850  {
4851  /*
4852  * Nope, it's just an old request. Assuming msg's clock_time is
4853  * >= its cutoff_time, it must be stale, so we can ignore it.
4854  */
4855  return;
4856  }
4857  }
4858  else if (msg->cutoff_time <= dbentry->stats_timestamp)
4859  {
4860  /* Stale request, ignore it */
4861  return;
4862  }
4863 
4864  /*
4865  * We need to write this DB, so create a request.
4866  */
4867  pending_write_requests = lappend_oid(pending_write_requests,
4868  msg->databaseid);
4869 }
4870 
4871 
4872 /* ----------
4873  * pgstat_recv_tabstat() -
4874  *
4875  * Count what the backend has done.
4876  * ----------
4877  */
4878 static void
4880 {
4881  PgStat_StatDBEntry *dbentry;
4882  PgStat_StatTabEntry *tabentry;
4883  int i;
4884  bool found;
4885 
4886  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
4887 
4888  /*
4889  * Update database-wide stats.
4890  */
4891  dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
4892  dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
4893  dbentry->n_block_read_time += msg->m_block_read_time;
4894  dbentry->n_block_write_time += msg->m_block_write_time;
4895 
4896  /*
4897  * Process all table entries in the message.
4898  */
4899  for (i = 0; i < msg->m_nentries; i++)
4900  {
4901  PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);
4902 
4903  tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
4904  (void *) &(tabmsg->t_id),
4905  HASH_ENTER, &found);
4906 
4907  if (!found)
4908  {
4909  /*
4910  * If it's a new table entry, initialize counters to the values we
4911  * just got.
4912  */
4913  tabentry->numscans = tabmsg->t_counts.t_numscans;
4914  tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned;
4915  tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched;
4916  tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted;
4917  tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated;
4918  tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted;
4919  tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated;
4920  tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples;
4921  tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples;
4922  tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples;
4923  tabentry->changes_since_analyze_reported = 0;
4924  tabentry->inserts_since_vacuum = tabmsg->t_counts.t_tuples_inserted;
4925  tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched;
4926  tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit;
4927 
4928  tabentry->vacuum_timestamp = 0;
4929  tabentry->vacuum_count = 0;
4930  tabentry->autovac_vacuum_timestamp = 0;
4931  tabentry->autovac_vacuum_count = 0;
4932  tabentry->analyze_timestamp = 0;
4933  tabentry->analyze_count = 0;
4934  tabentry->autovac_analyze_timestamp = 0;
4935  tabentry->autovac_analyze_count = 0;
4936  }
4937  else
4938  {
4939  /*
4940  * Otherwise add the values to the existing entry.
4941  */
4942  tabentry->numscans += tabmsg->t_counts.t_numscans;
4943  tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned;
4944  tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
4945  tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
4946  tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated;
4947  tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
4948  tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated;
4949  /* If table was truncated, first reset the live/dead counters */
4950  if (tabmsg->t_counts.t_truncated)
4951  {
4952  tabentry->n_live_tuples = 0;
4953  tabentry->n_dead_tuples = 0;
4954  tabentry->inserts_since_vacuum = 0;
4955  }
4956  tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples;
4957  tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples;
4958  tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples;
4959  tabentry->inserts_since_vacuum += tabmsg->t_counts.t_tuples_inserted;
4960  tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
4961  tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit;
4962  }
4963 
4964  /* Clamp n_live_tuples in case of negative delta_live_tuples */
4965  tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
4966  /* Likewise for n_dead_tuples */
4967  tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
4968 
4969  /*
4970  * Add per-table stats to the per-database entry, too.
4971  */
4972  dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned;
4973  dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
4974  dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
4975  dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated;
4976  dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
4977  dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
4978  dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
4979  }
4980 }
4981 
4982 
4983 /* ----------
4984  * pgstat_recv_tabpurge() -
4985  *
4986  * Arrange for dead table removal.
4987  * ----------
4988  */
4989 static void
4991 {
4992  PgStat_StatDBEntry *dbentry;
4993  int i;
4994 
4995  dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
4996 
4997  /*
4998  * No need to purge if we don't even know the database.
4999  */
5000  if (!dbentry || !dbentry->tables)
5001  return;
5002 
5003  /*
5004  * Process all table entries in the message.
5005  */
5006  for (i = 0; i < msg->m_nentries; i++)
5007  {
5008  /* Remove from hashtable if present; we don't care if it's not. */
5009  (void) hash_search(dbentry->tables,
5010  (void *) &(msg->m_tableid[i]),
5011  HASH_REMOVE, NULL);
5012  }
5013 }
5014 
5015 
5016 /* ----------
5017  * pgstat_recv_dropdb() -
5018  *
5019  * Arrange for dead database removal
5020  * ----------
5021  */
5022 static void
5024 {
5025  Oid dbid = msg->m_databaseid;
5026  PgStat_StatDBEntry *dbentry;
5027 
5028  /*
5029  * Lookup the database in the hashtable.
5030  */
5031  dbentry = pgstat_get_db_entry(dbid, false);
5032 
5033  /*
5034  * If found, remove it (along with the db statfile).
5035  */
5036  if (dbentry)
5037  {
5038  char statfile[MAXPGPATH];
5039 
5040  get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
5041 
5042  elog(DEBUG2, "removing stats file \"%s\"", statfile);
5043  unlink(statfile);
5044 
5045  if (dbentry->tables != NULL)
5046  hash_destroy(dbentry->tables);
5047  if (dbentry->functions != NULL)
5048  hash_destroy(dbentry->functions);
5049 
5050  if (hash_search(pgStatDBHash,
5051  (void *) &dbid,
5052  HASH_REMOVE, NULL) == NULL)
5053  ereport(ERROR,
5054  (errmsg("database hash table corrupted during cleanup --- abort")));
5055  }
5056 }
5057 
5058 
5059 /* ----------
5060  * pgstat_recv_resetcounter() -
5061  *
5062  * Reset the statistics for the specified database.
5063  * ----------
5064  */
5065 static void
5067 {
5068  PgStat_StatDBEntry *dbentry;
5069 
5070  /*
5071  * Lookup the database in the hashtable. Nothing to do if not there.
5072  */
5073  dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5074 
5075  if (!dbentry)
5076  return;
5077 
5078  /*
5079  * We simply throw away all the database's table entries by recreating a
5080  * new hash table for them.
5081  */
5082  if (dbentry->tables != NULL)
5083  hash_destroy(dbentry->tables);
5084  if (dbentry->functions != NULL)
5085  hash_destroy(dbentry->functions);
5086 
5087  dbentry->tables = NULL;
5088  dbentry->functions = NULL;
5089 
5090  /*
5091  * Reset database-level stats, too. This creates empty hash tables for
5092  * tables and functions.
5093  */
5094  reset_dbentry_counters(dbentry);
5095 }
5096 
5097 /* ----------
5098  * pgstat_recv_resetsharedcounter() -
5099  *
5100  * Reset some shared statistics of the cluster.
5101  * ----------
5102  */
5103 static void
5105 {
5106  if (msg->m_resettarget == RESET_BGWRITER)
5107  {
5108  /* Reset the global background writer statistics for the cluster. */
5109  memset(&globalStats, 0, sizeof(globalStats));
5110  globalStats.stat_reset_timestamp = GetCurrentTimestamp();
5111  }
5112  else if (msg->m_resettarget == RESET_ARCHIVER)
5113  {
5114  /* Reset the archiver statistics for the cluster. */
5115  memset(&archiverStats, 0, sizeof(archiverStats));
5116  archiverStats.stat_reset_timestamp = GetCurrentTimestamp();
5117  }
5118  else if (msg->m_resettarget == RESET_WAL)
5119  {
5120  /* Reset the WAL statistics for the cluster. */
5121  memset(&walStats, 0, sizeof(walStats));
5123  }
5124 
5125  /*
5126  * Presumably the sender of this message validated the target, don't
5127  * complain here if it's not valid
5128  */
5129 }
5130 
5131 /* ----------
5132  * pgstat_recv_resetsinglecounter() -
5133  *
5134  * Reset a statistics for a single object
5135  * ----------
5136  */
5137 static void
5139 {
5140  PgStat_StatDBEntry *dbentry;
5141 
5142  dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5143 
5144  if (!dbentry)
5145  return;
5146 
5147  /* Set the reset timestamp for the whole database */
5149 
5150  /* Remove object if it exists, ignore it if not */
5151  if (msg->m_resettype == RESET_TABLE)
5152  (void) hash_search(dbentry->tables, (void *) &(msg->m_objectid),
5153  HASH_REMOVE, NULL);
5154  else if (msg->m_resettype == RESET_FUNCTION)
5155  (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
5156  HASH_REMOVE, NULL);
5157 }
5158 
5159 /* ----------
5160  * pgstat_recv_resetslrucounter() -
5161  *
5162  * Reset some SLRU statistics of the cluster.
5163  * ----------
5164  */
5165 static void
5167 {
5168  int i;
5170 
5171  for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
5172  {
5173  /* reset entry with the given index, or all entries (index is -1) */
5174  if ((msg->m_index == -1) || (msg->m_index == i))
5175  {
5176  memset(&slruStats[i], 0, sizeof(slruStats[i]));
5177  slruStats[i].stat_reset_timestamp = ts;
5178  }
5179  }
5180 }
5181 
5182 /* ----------
5183  * pgstat_recv_resetreplslotcounter() -
5184  *
5185  * Reset some replication slot statistics of the cluster.
5186  * ----------
5187  */
5188 static void
5190  int len)
5191 {
5192  int i;
5193  int idx = -1;
5194  TimestampTz ts;
5195 
5196  ts = GetCurrentTimestamp();
5197  if (msg->clearall)
5198  {
5199  for (i = 0; i < nReplSlotStats; i++)
5200  pgstat_reset_replslot(i, ts);
5201  }
5202  else
5203  {
5204  /* Get the index of replication slot statistics to reset */
5205  idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
5206 
5207  /*
5208  * Nothing to do if the given slot entry is not found. This could
5209  * happen when the slot with the given name is removed and the
5210  * corresponding statistics entry is also removed before receiving the
5211  * reset message.
5212  */
5213  if (idx < 0)
5214  return;
5215 
5216  /* Reset the stats for the requested replication slot */
5217  pgstat_reset_replslot(idx, ts);
5218  }
5219 }
5220 
5221 
5222 /* ----------
5223  * pgstat_recv_autovac() -
5224  *
5225  * Process an autovacuum signaling message.
5226  * ----------
5227  */
5228 static void
5230 {
5231  PgStat_StatDBEntry *dbentry;
5232 
5233  /*
5234  * Store the last autovacuum time in the database's hashtable entry.
5235  */
5236  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5237 
5238  dbentry->last_autovac_time = msg->m_start_time;
5239 }
5240 
5241 /* ----------
5242  * pgstat_recv_vacuum() -
5243  *
5244  * Process a VACUUM message.
5245  * ----------
5246  */
5247 static void
5249 {
5250  PgStat_StatDBEntry *dbentry;
5251  PgStat_StatTabEntry *tabentry;
5252 
5253  /*
5254  * Store the data in the table's hashtable entry.
5255  */
5256  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5257 
5258  tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5259 
5260  tabentry->n_live_tuples = msg->m_live_tuples;
5261  tabentry->n_dead_tuples = msg->m_dead_tuples;
5262 
5263  /*
5264  * It is quite possible that a non-aggressive VACUUM ended up skipping
5265  * various pages, however, we'll zero the insert counter here regardless.
5266  * It's currently used only to track when we need to perform an "insert"
5267  * autovacuum, which are mainly intended to freeze newly inserted tuples.
5268  * Zeroing this may just mean we'll not try to vacuum the table again
5269  * until enough tuples have been inserted to trigger another insert
5270  * autovacuum. An anti-wraparound autovacuum will catch any persistent
5271  * stragglers.
5272  */
5273  tabentry->inserts_since_vacuum = 0;
5274 
5275  if (msg->m_autovacuum)
5276  {
5277  tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
5278  tabentry->autovac_vacuum_count++;
5279  }
5280  else
5281  {
5282  tabentry->vacuum_timestamp = msg->m_vacuumtime;
5283  tabentry->vacuum_count++;
5284  }
5285 }
5286 
5287 /* ----------
5288  * pgstat_recv_analyze() -
5289  *
5290  * Process an ANALYZE message.
5291  * ----------
5292  */
5293 static void
5295 {
5296  PgStat_StatDBEntry *dbentry;
5297  PgStat_StatTabEntry *tabentry;
5298 
5299  /*
5300  * Store the data in the table's hashtable entry.
5301  */
5302  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5303 
5304  tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5305 
5306  tabentry->n_live_tuples = msg->m_live_tuples;
5307  tabentry->n_dead_tuples = msg->m_dead_tuples;
5308 
5309  /*
5310  * If commanded, reset changes_since_analyze to zero. This forgets any
5311  * changes that were committed while the ANALYZE was in progress, but we
5312  * have no good way to estimate how many of those there were.
5313  */
5314  if (msg->m_resetcounter)
5315  {
5316  tabentry->changes_since_analyze = 0;
5317  tabentry->changes_since_analyze_reported = 0;
5318  }
5319 
5320  if (msg->m_autovacuum)
5321  {
5322  tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
5323  tabentry->autovac_analyze_count++;
5324  }
5325  else
5326  {
5327  tabentry->analyze_timestamp = msg->m_analyzetime;
5328  tabentry->analyze_count++;
5329  }
5330 }
5331 
5332 static void
5334 {
5335  PgStat_StatDBEntry *dbentry;
5336  PgStat_StatTabEntry *tabentry;
5337 
5338  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5339 
5340  tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5341 
5342  for (int i = 0; i < msg->m_nancestors; i++)
5343  {
5344  Oid ancestor_relid = msg->m_ancestors[i];
5345  PgStat_StatTabEntry *ancestor;
5346 
5347  ancestor = pgstat_get_tab_entry(dbentry, ancestor_relid, true);
5348  ancestor->changes_since_analyze +=
5350  }
5351 
5353 
5354 }
5355 
5356 /* ----------
5357  * pgstat_recv_archiver() -
5358  *
5359  * Process a ARCHIVER message.
5360  * ----------
5361  */
5362 static void
5364 {
5365  if (msg->m_failed)
5366  {
5367  /* Failed archival attempt */
5368  ++archiverStats.failed_count;
5369  memcpy(archiverStats.last_failed_wal, msg->m_xlog,
5370  sizeof(archiverStats.last_failed_wal));
5371  archiverStats.last_failed_timestamp = msg->m_timestamp;
5372  }
5373  else
5374  {
5375  /* Successful archival operation */
5376  ++archiverStats.archived_count;
5377  memcpy(archiverStats.last_archived_wal, msg->m_xlog,
5378  sizeof(archiverStats.last_archived_wal));
5379  archiverStats.last_archived_timestamp = msg->m_timestamp;
5380  }
5381 }
5382 
5383 /* ----------
5384  * pgstat_recv_bgwriter() -
5385  *
5386  * Process a BGWRITER message.
5387  * ----------
5388  */
5389 static void
5391 {
5392  globalStats.timed_checkpoints += msg->m_timed_checkpoints;
5393  globalStats.requested_checkpoints += msg->m_requested_checkpoints;
5394  globalStats.checkpoint_write_time += msg->m_checkpoint_write_time;
5395  globalStats.checkpoint_sync_time += msg->m_checkpoint_sync_time;
5397  globalStats.buf_written_clean += msg->m_buf_written_clean;
5398  globalStats.maxwritten_clean += msg->m_maxwritten_clean;
5399  globalStats.buf_written_backend += msg->m_buf_written_backend;
5400  globalStats.buf_fsync_backend += msg->m_buf_fsync_backend;
5401  globalStats.buf_alloc += msg->m_buf_alloc;
5402 }
5403 
5404 /* ----------
5405  * pgstat_recv_wal() -
5406  *
5407  * Process a WAL message.
5408  * ----------
5409  */
5410 static void
5412 {
5413  walStats.wal_records += msg->m_wal_records;
5414  walStats.wal_fpi += msg->m_wal_fpi;
5415  walStats.wal_bytes += msg->m_wal_bytes;
5416  walStats.wal_buffers_full += msg->m_wal_buffers_full;
5417  walStats.wal_write += msg->m_wal_write;
5418  walStats.wal_sync += msg->m_wal_sync;
5419  walStats.wal_write_time += msg->m_wal_write_time;
5420  walStats.wal_sync_time += msg->m_wal_sync_time;
5421 }
5422 
5423 /* ----------
5424  * pgstat_recv_slru() -
5425  *
5426  * Process a SLRU message.
5427  * ----------
5428  */
5429 static void
5431 {
5432  slruStats[msg->m_index].blocks_zeroed += msg->m_blocks_zeroed;
5433  slruStats[msg->m_index].blocks_hit += msg->m_blocks_hit;
5434  slruStats[msg->m_index].blocks_read += msg->m_blocks_read;
5435  slruStats[msg->m_index].blocks_written += msg->m_blocks_written;
5436  slruStats[msg->m_index].blocks_exists += msg->m_blocks_exists;
5437  slruStats[msg->m_index].flush += msg->m_flush;
5438  slruStats[msg->m_index].truncate += msg->m_truncate;
5439 }
5440 
5441 /* ----------
5442  * pgstat_recv_recoveryprefetch() -
5443  *
5444  * Process a recovery prefetch message.
5445  * ----------
5446  */
5447 static void
5449 {
5450  recoveryPrefetchStats = msg->m_stats;
5451 }
5452 
5453 /* ----------
5454  * pgstat_recv_recoveryconflict() -
5455  *
5456  * Process a RECOVERYCONFLICT message.
5457  * ----------
5458  */
5459 static void
5461 {
5462  PgStat_StatDBEntry *dbentry;
5463 
5464  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5465 
5466  switch (msg->m_reason)
5467  {
5469 
5470  /*
5471  * Since we drop the information about the database as soon as it
5472  * replicates, there is no point in counting these conflicts.
5473  */
5474  break;
5476  dbentry->n_conflict_tablespace++;
5477  break;
5479  dbentry->n_conflict_lock++;
5480  break;
5482  dbentry->n_conflict_snapshot++;
5483  break;
5485  dbentry->n_conflict_bufferpin++;
5486  break;
5488  dbentry->n_conflict_startup_deadlock++;
5489  break;
5490  }
5491 }
5492 
5493 /* ----------
5494  * pgstat_recv_deadlock() -
5495  *
5496  * Process a DEADLOCK message.
5497  * ----------
5498  */
5499 static void
5501 {
5502  PgStat_StatDBEntry *dbentry;
5503 
5504  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5505 
5506  dbentry->n_deadlocks++;
5507 }
5508 
5509 /* ----------
5510  * pgstat_recv_checksum_failure() -
5511  *
5512  * Process a CHECKSUMFAILURE message.
5513  * ----------
5514  */
5515 static void
5517 {
5518  PgStat_StatDBEntry *dbentry;
5519 
5520  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5521 
5522  dbentry->n_checksum_failures += msg->m_failurecount;
5523  dbentry->last_checksum_failure = msg->m_failure_time;
5524 }
5525 
5526 /* ----------
5527  * pgstat_recv_replslot() -
5528  *
5529  * Process a REPLSLOT message.
5530  * ----------
5531  */
5532 static void
5534 {
5535  int idx;
5536 
5537  /*
5538  * Get the index of replication slot statistics. On dropping, we don't
5539  * create the new statistics.
5540  */
5541  idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
5542 
5543  /*
5544  * The slot entry is not found or there is no space to accommodate the new
5545  * entry. This could happen when the message for the creation of a slot
5546  * reached before the drop message even though the actual operations
5547  * happen in reverse order. In such a case, the next update of the
5548  * statistics for the same slot will create the required entry.
5549  */
5550  if (idx < 0)
5551  return;
5552 
5553  /* it must be a valid replication slot index */
5554  Assert(idx < nReplSlotStats);
5555 
5556  if (msg->m_drop)
5557  {
5558  /* Remove the replication slot statistics with the given name */
5559  if (idx < nReplSlotStats - 1)
5560  memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
5561  sizeof(PgStat_ReplSlotStats));
5562  nReplSlotStats--;
5563  }
5564  else
5565  {
5566  /* Update the replication slot statistics */
5567  replSlotStats[idx].spill_txns += msg->m_spill_txns;
5568  replSlotStats[idx].spill_count += msg->m_spill_count;
5569  replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
5570  replSlotStats[idx].stream_txns += msg->m_stream_txns;
5571  replSlotStats[idx].stream_count += msg->m_stream_count;
5572  replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
5573  replSlotStats[idx].total_txns += msg->m_total_txns;
5574  replSlotStats[idx].total_bytes += msg->m_total_bytes;
5575  }
5576 }
5577 
5578 /* ----------
5579  * pgstat_recv_connstat() -
5580  *
5581  * Process connection information.
5582  * ----------
5583  */
5584 static void
5586 {
5587  PgStat_StatDBEntry *dbentry;
5588 
5589  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5590 
5591  dbentry->n_sessions += msg->m_count;
5592  dbentry->total_session_time += msg->m_session_time;
5593  dbentry->total_active_time += msg->m_active_time;
5595  switch (msg->m_disconnect)
5596  {
5597  case DISCONNECT_NOT_YET:
5598  case DISCONNECT_NORMAL:
5599  /* we don't collect these */
5600  break;
5601  case DISCONNECT_CLIENT_EOF:
5602  dbentry->n_sessions_abandoned++;
5603  break;
5604  case DISCONNECT_FATAL:
5605  dbentry->n_sessions_fatal++;
5606  break;
5607  case DISCONNECT_KILLED:
5608  dbentry->n_sessions_killed++;
5609  break;
5610  }
5611 }
5612 
5613 /* ----------
5614  * pgstat_recv_tempfile() -
5615  *
5616  * Process a TEMPFILE message.
5617  * ----------
5618  */
5619 static void
5621 {
5622  PgStat_StatDBEntry *dbentry;
5623 
5624  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5625 
5626  dbentry->n_temp_bytes += msg->m_filesize;
5627  dbentry->n_temp_files += 1;
5628 }
5629 
5630 /* ----------
5631  * pgstat_recv_funcstat() -
5632  *
5633  * Count what the backend has done.
5634  * ----------
5635  */
5636 static void
5638 {
5639  PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
5640  PgStat_StatDBEntry *dbentry;
5641  PgStat_StatFuncEntry *funcentry;
5642  int i;
5643  bool found;
5644 
5645  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5646 
5647  /*
5648  * Process all function entries in the message.
5649  */
5650  for (i = 0; i < msg->m_nentries; i++, funcmsg++)
5651  {
5652  funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
5653  (void *) &(funcmsg->f_id),
5654  HASH_ENTER, &found);
5655 
5656  if (!found)
5657  {
5658  /*
5659  * If it's a new function entry, initialize counters to the values
5660  * we just got.
5661  */
5662  funcentry->f_numcalls = funcmsg->f_numcalls;
5663  funcentry->f_total_time = funcmsg->f_total_time;
5664  funcentry->f_self_time = funcmsg->f_self_time;
5665  }
5666  else
5667  {
5668  /*
5669  * Otherwise add the values to the existing entry.
5670  */
5671  funcentry->f_numcalls += funcmsg->f_numcalls;
5672  funcentry->f_total_time += funcmsg->f_total_time;
5673  funcentry->f_self_time += funcmsg->f_self_time;
5674  }
5675  }
5676 }
5677 
5678 /* ----------
5679  * pgstat_recv_funcpurge() -
5680  *
5681  * Arrange for dead function removal.
5682  * ----------
5683  */
5684 static void
5686 {
5687  PgStat_StatDBEntry *dbentry;
5688  int i;
5689 
5690  dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5691 
5692  /*
5693  * No need to purge if we don't even know the database.
5694  */
5695  if (!dbentry || !dbentry->functions)
5696  return;
5697 
5698  /*
5699  * Process all function entries in the message.
5700  */
5701  for (i = 0; i < msg->m_nentries; i++)
5702  {
5703  /* Remove from hashtable if present; we don't care if it's not. */
5704  (void) hash_search(dbentry->functions,
5705  (void *) &(msg->m_functionid[i]),
5706  HASH_REMOVE, NULL);
5707  }
5708 }
5709 
5710 /* ----------
5711  * pgstat_write_statsfile_needed() -
5712  *
5713  * Do we need to write out any stats files?
5714  * ----------
5715  */
5716 static bool
5718 {
5719  if (pending_write_requests != NIL)
5720  return true;
5721 
5722  /* Everything was written recently */
5723  return false;
5724 }
5725 
5726 /* ----------
5727  * pgstat_db_requested() -
5728  *
5729  * Checks whether stats for a particular DB need to be written to a file.
5730  * ----------
5731  */
5732 static bool
5734 {
5735  /*
5736  * If any requests are outstanding at all, we should write the stats for
5737  * shared catalogs (the "database" with OID 0). This ensures that
5738  * backends will see up-to-date stats for shared catalogs, even though
5739  * they send inquiry messages mentioning only their own DB.
5740  */
5741  if (databaseid == InvalidOid && pending_write_requests != NIL)
5742  return true;
5743 
5744  /* Search to see if there's an open request to write this database. */
5745  if (list_member_oid(pending_write_requests, databaseid))
5746  return true;
5747 
5748  return false;
5749 }
5750 
5751 /* ----------
5752  * pgstat_replslot_index
5753  *
5754  * Return the index of entry of a replication slot with the given name, or
5755  * -1 if the slot is not found.
5756  *
5757  * create_it tells whether to create the new slot entry if it is not found.
5758  * ----------
5759  */
5760 static int
5761 pgstat_replslot_index(const char *name, bool create_it)
5762 {
5763  int i;
5764 
5766  for (i = 0; i < nReplSlotStats; i++)
5767  {
5768  if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
5769  return i; /* found */
5770  }
5771 
5772  /*
5773  * The slot is not found. We don't want to register the new statistics if
5774  * the list is already full or the caller didn't request.
5775  */
5776  if (i == max_replication_slots || !create_it)
5777  return -1;
5778 
5779  /* Register new slot */
5780  memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
5781  namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
5782 
5783  return nReplSlotStats++;
5784 }
5785 
5786 /* ----------
5787  * pgstat_reset_replslot
5788  *
5789  * Reset the replication slot stats at index 'i'.
5790  * ----------
5791  */
5792 static void
5794 {
5795  /* reset only counters. Don't clear slot name */
5796  replSlotStats[i].spill_txns = 0;
5797  replSlotStats[i].spill_count = 0;
5798  replSlotStats[i].spill_bytes = 0;
5799  replSlotStats[i].stream_txns = 0;
5800  replSlotStats[i].stream_count = 0;
5801  replSlotStats[i].stream_bytes = 0;
5802  replSlotStats[i].total_txns = 0;
5803  replSlotStats[i].total_bytes = 0;
5804  replSlotStats[i].stat_reset_timestamp = ts;
5805 }
5806 
5807 /*
5808  * pgstat_slru_index
5809  *
5810  * Determine index of entry for a SLRU with a given name. If there's no exact
5811  * match, returns index of the last "other" entry used for SLRUs defined in
5812  * external projects.
5813  */
5814 int
5816 {
5817  int i;
5818 
5819  for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
5820  {
5821  if (strcmp(slru_names[i], name) == 0)
5822  return i;
5823  }
5824 
5825  /* return index of the last entry (which is the "other" one) */
5826  return (SLRU_NUM_ELEMENTS - 1);
5827 }
5828 
5829 /*
5830  * pgstat_slru_name
5831  *
5832  * Returns SLRU name for an index. The index may be above SLRU_NUM_ELEMENTS,
5833  * in which case this returns NULL. This allows writing code that does not
5834  * know the number of entries in advance.
5835  */
5836 const char *
5837 pgstat_slru_name(int slru_idx)
5838 {
5839  if (slru_idx < 0 || slru_idx >= SLRU_NUM_ELEMENTS)
5840  return NULL;
5841 
5842  return slru_names[slru_idx];
5843 }
5844 
5845 /*
5846  * slru_entry
5847  *
5848  * Returns pointer to entry with counters for given SLRU (based on the name
5849  * stored in SlruCtl as lwlock tranche name).
5850  */
5851 static inline PgStat_MsgSLRU *
5852 slru_entry(int slru_idx)
5853 {
5854  /*
5855  * The postmaster should never register any SLRU statistics counts; if it
5856  * did, the counts would be duplicated into child processes via fork().
5857  */
5859 
5860  Assert((slru_idx >= 0) && (slru_idx < SLRU_NUM_ELEMENTS));
5861 
5862  return &SLRUStats[slru_idx];
5863 }
5864 
5865 /*
5866  * SLRU statistics count accumulation functions --- called from slru.c
5867  */
5868 
5869 void
5871 {