1 /* ----------
2  * pgstat.c
3  *
4  * All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  * TODO: - Separate collector, postmaster and backend stuff
7  * into different files.
8  *
9  * - Add some automatic call for pgstat vacuuming.
10  *
11  * - Add a pgstat config column to pg_database, so this
12  * entire thing can be enabled/disabled on a per db basis.
13  *
14  * Copyright (c) 2001-2021, PostgreSQL Global Development Group
15  *
16  * src/backend/postmaster/pgstat.c
17  * ----------
18  */
19 #include "postgres.h"
20 
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31 #ifdef HAVE_SYS_SELECT_H
32 #include <sys/select.h>
33 #endif
34 
35 #include "access/heapam.h"
36 #include "access/htup_details.h"
37 #include "access/tableam.h"
38 #include "access/transam.h"
39 #include "access/twophase_rmgr.h"
40 #include "access/xact.h"
41 #include "catalog/catalog.h"
42 #include "catalog/pg_database.h"
43 #include "catalog/pg_proc.h"
44 #include "common/ip.h"
45 #include "executor/instrument.h"
46 #include "libpq/libpq.h"
47 #include "libpq/pqsignal.h"
48 #include "mb/pg_wchar.h"
49 #include "miscadmin.h"
50 #include "pgstat.h"
51 #include "postmaster/autovacuum.h"
53 #include "postmaster/interrupt.h"
54 #include "postmaster/postmaster.h"
55 #include "replication/slot.h"
56 #include "replication/walsender.h"
57 #include "storage/backendid.h"
58 #include "storage/dsm.h"
59 #include "storage/fd.h"
60 #include "storage/ipc.h"
61 #include "storage/latch.h"
62 #include "storage/lmgr.h"
63 #include "storage/pg_shmem.h"
64 #include "storage/proc.h"
65 #include "storage/procsignal.h"
66 #include "utils/builtins.h"
67 #include "utils/guc.h"
68 #include "utils/memutils.h"
69 #include "utils/ps_status.h"
70 #include "utils/rel.h"
71 #include "utils/snapmgr.h"
72 #include "utils/timestamp.h"
73 
74 /* ----------
75  * Timer definitions.
76  * ----------
77  */
78 #define PGSTAT_STAT_INTERVAL 500 /* Minimum time between stats file
79  * updates; in milliseconds. */
80 
81 #define PGSTAT_RETRY_DELAY 10 /* How long to wait between checks for a
82  * new file; in milliseconds. */
83 
84 #define PGSTAT_MAX_WAIT_TIME 10000 /* Maximum time to wait for a stats
85  * file update; in milliseconds. */
86 
87 #define PGSTAT_INQ_INTERVAL 640 /* How often to ping the collector for a
88  * new file; in milliseconds. */
89 
90 #define PGSTAT_RESTART_INTERVAL 60 /* How often to attempt to restart a
91  * failed statistics collector; in
92  * seconds. */
93 
94 #define PGSTAT_POLL_LOOP_COUNT (PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY)
95 #define PGSTAT_INQ_LOOP_COUNT (PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY)
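/*
 * For reference, with the defaults above these loop counts work out to
 * PGSTAT_POLL_LOOP_COUNT = 10000 / 10 = 1000 polls and
 * PGSTAT_INQ_LOOP_COUNT = 640 / 10 = 64 polls, i.e. a backend waiting for a
 * fresh stats file re-sends its inquiry message roughly every 64 retry
 * sleeps.
 */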
96 
97 /* Minimum receive buffer size for the collector's socket. */
98 #define PGSTAT_MIN_RCVBUF (100 * 1024)
99 
100 
101 /* ----------
102  * The initial size hints for the hash tables used in the collector.
103  * ----------
104  */
105 #define PGSTAT_DB_HASH_SIZE 16
106 #define PGSTAT_TAB_HASH_SIZE 512
107 #define PGSTAT_FUNCTION_HASH_SIZE 512
108 #define PGSTAT_REPLSLOT_HASH_SIZE 32
109 
110 
111 /* ----------
112  * GUC parameters
113  * ----------
114  */
115 bool pgstat_track_counts = false;
116 int pgstat_track_functions = TRACK_FUNC_OFF;
117 
118 /* ----------
119  * Built from GUC parameter
120  * ----------
121  */
122 char *pgstat_stat_directory = NULL;
123 char *pgstat_stat_filename = NULL;
124 char *pgstat_stat_tmpname = NULL;
125 
126 /*
127  * BgWriter and WAL global statistics counters.
128  * Stored directly in a stats message structure so they can be sent
129  * without needing to copy things around. We assume these init to zeroes.
130  */
131 PgStat_MsgBgWriter PendingBgWriterStats;
132 PgStat_MsgCheckpointer PendingCheckpointerStats;
133 PgStat_MsgWal WalStats;
134 
135 /*
136  * WAL usage counters saved from pgWALUsage at the previous call to
137  * pgstat_send_wal(). This is used to calculate how much WAL usage
138  * happens between pgstat_send_wal() calls, by subtracting
139  * the previous counters from the current ones.
140  */
141 static WalUsage prevWalUsage;
142 
143 /*
144  * List of SLRU names that we keep stats for. There is no central registry of
145  * SLRUs, so we use this fixed list instead. The "other" entry is used for
146  * all SLRUs without an explicit entry (e.g. SLRUs in extensions).
147  */
148 static const char *const slru_names[] = {
149  "CommitTs",
150  "MultiXactMember",
151  "MultiXactOffset",
152  "Notify",
153  "Serial",
154  "Subtrans",
155  "Xact",
156  "other" /* has to be last */
157 };
158 
159 #define SLRU_NUM_ELEMENTS lengthof(slru_names)
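/*
 * Illustrative sketch (not verbatim from this file): pgstat_slru_index(),
 * used below for SLRU reset requests, resolves a name against this list
 * roughly as follows, falling back to the last ("other") slot when the name
 * is not found, e.g. for an SLRU created by an extension:
 *
 *     for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
 *         if (strcmp(slru_names[i], name) == 0)
 *             return i;
 *     return SLRU_NUM_ELEMENTS - 1;
 */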
160 
161 /*
162  * SLRU statistics counts waiting to be sent to the collector. These are
163  * stored directly in stats message format so they can be sent without needing
164  * to copy things around. We assume this variable inits to zeroes. Entries
165  * are one-to-one with slru_names[].
166  */
167 static PgStat_MsgSLRU SLRUStats[SLRU_NUM_ELEMENTS];
168 
169 /* ----------
170  * Local data
171  * ----------
172  */
173 NON_EXEC_STATIC pgsocket pgStatSock = PGINVALID_SOCKET;
174 
175 static struct sockaddr_storage pgStatAddr;
176 
177 static time_t last_pgstat_start_time;
178 
179 static bool pgStatRunningInCollector = false;
180 
181 /*
182  * Structures in which backends store per-table info that's waiting to be
183  * sent to the collector.
184  *
185  * NOTE: once allocated, TabStatusArray structures are never moved or deleted
186  * for the life of the backend. Also, we zero out the t_id fields of the
187  * contained PgStat_TableStatus structs whenever they are not actively in use.
188  * This allows relcache pgstat_info pointers to be treated as long-lived data,
189  * avoiding repeated searches in pgstat_initstats() when a relation is
190  * repeatedly opened during a transaction.
191  */
192 #define TABSTAT_QUANTUM 100 /* we alloc this many at a time */
193 
194 typedef struct TabStatusArray
195 {
196  struct TabStatusArray *tsa_next; /* link to next array, if any */
197  int tsa_used; /* # entries currently used */
198  PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM]; /* per-table data */
199 } TabStatusArray;
200 
201 static TabStatusArray *pgStatTabList = NULL;
202 
203 /*
204  * pgStatTabHash entry: map from relation OID to PgStat_TableStatus pointer
205  */
206 typedef struct TabStatHashEntry
207 {
208  Oid t_id; /* relation OID (hash key) */
209  PgStat_TableStatus *tsa_entry; /* link to the TabStatusArray slot */
210 } TabStatHashEntry;
211 
212 /*
213  * Hash table for O(1) t_id -> tsa_entry lookup
214  */
215 static HTAB *pgStatTabHash = NULL;
216 
217 /*
218  * Backends store per-function info that's waiting to be sent to the collector
219  * in this hash table (indexed by function OID).
220  */
221 static HTAB *pgStatFunctions = NULL;
222 
223 /*
224  * Indicates if backend has some function stats that it hasn't yet
225  * sent to the collector.
226  */
227 static bool have_function_stats = false;
228 
229 /*
230  * Tuple insertion/deletion counts for an open transaction can't be propagated
231  * into PgStat_TableStatus counters until we know if it is going to commit
232  * or abort. Hence, we keep these counts in per-subxact structs that live
233  * in TopTransactionContext. This data structure is designed on the assumption
234  * that subxacts won't usually modify very many tables.
235  */
236 typedef struct PgStat_SubXactStatus
237 {
238  int nest_level; /* subtransaction nest level */
239  struct PgStat_SubXactStatus *prev; /* higher-level subxact if any */
240  PgStat_TableXactStatus *first; /* head of list for this subxact */
241 } PgStat_SubXactStatus;
242 
243 static PgStat_SubXactStatus *pgStatXactStack = NULL;
244 
245 static int pgStatXactCommit = 0;
246 static int pgStatXactRollback = 0;
247 PgStat_Counter pgStatBlockReadTime = 0;
248 PgStat_Counter pgStatBlockWriteTime = 0;
249 static TimestampTz pgLastSessionReportTime = 0;
250 PgStat_Counter pgStatActiveTime = 0;
251 PgStat_Counter pgStatTransactionIdleTime = 0;
252 SessionEndType pgStatSessionEndCause = DISCONNECT_NORMAL;
253 
254 /* Record that's written to 2PC state file when pgstat state is persisted */
255 typedef struct TwoPhasePgStatRecord
256 {
257  PgStat_Counter tuples_inserted; /* tuples inserted in xact */
258  PgStat_Counter tuples_updated; /* tuples updated in xact */
259  PgStat_Counter tuples_deleted; /* tuples deleted in xact */
260  PgStat_Counter inserted_pre_trunc; /* tuples inserted prior to truncate */
261  PgStat_Counter updated_pre_trunc; /* tuples updated prior to truncate */
262  PgStat_Counter deleted_pre_trunc; /* tuples deleted prior to truncate */
263  Oid t_id; /* table's OID */
264  bool t_shared; /* is it a shared catalog? */
265  bool t_truncated; /* was the relation truncated? */
266 } TwoPhasePgStatRecord;
267 
268 /*
269  * Info about current "snapshot" of stats file
270  */
272 static HTAB *pgStatDBHash = NULL;
273 
274 /*
275  * Cluster wide statistics, kept in the stats collector.
276  * Contains statistics that are not collected per database
277  * or per table.
278  */
283 static HTAB *replSlotStatHash = NULL;
284 
285 /*
286  * List of OIDs of databases we need to write out. If an entry is InvalidOid,
287  * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
288  * will write both that DB's data and the shared stats.
289  */
291 
292 /*
293  * Total time charged to functions so far in the current backend.
294  * We use this to help separate "self" and "other" time charges.
295  * (We assume this initializes to zero.)
296  */
297 static instr_time total_func_time;
298 
299 /*
300  * For assertions that check pgstat is not used before initialization / after
301  * shutdown.
302  */
303 #ifdef USE_ASSERT_CHECKING
304 static bool pgstat_is_initialized = false;
305 static bool pgstat_is_shutdown = false;
306 #endif
307 
308 
309 /* ----------
310  * Local function forward declarations
311  * ----------
312  */
313 #ifdef EXEC_BACKEND
314 static pid_t pgstat_forkexec(void);
315 #endif
316 
317 NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn();
318 
319 static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
320 static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
321  Oid tableoid, bool create);
322 static void pgstat_write_statsfiles(bool permanent, bool allDbs);
323 static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
324 static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
325 static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
326 static void backend_read_statsfile(void);
327 
328 static bool pgstat_write_statsfile_needed(void);
329 static bool pgstat_db_requested(Oid databaseid);
330 
333 
335 static void pgstat_send_funcstats(void);
336 static void pgstat_send_slru(void);
337 static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid);
338 static bool pgstat_should_report_connstat(void);
339 static void pgstat_report_disconnect(Oid dboid);
340 
341 static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);
342 
343 static void pgstat_setup_memcxt(void);
344 static void pgstat_assert_is_up(void);
345 
346 static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
347 static void pgstat_send(void *msg, int len);
348 
349 static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
350 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
351 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
352 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
353 static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
358 static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
359 static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
360 static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
361 static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
362 static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
363 static void pgstat_recv_checkpointer(PgStat_MsgCheckpointer *msg, int len);
364 static void pgstat_recv_wal(PgStat_MsgWal *msg, int len);
365 static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
366 static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
367 static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
369 static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
371 static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
372 static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
373 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
374 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
375 
376 /* ------------------------------------------------------------
377  * Public functions called from postmaster follow
378  * ------------------------------------------------------------
379  */
380 
381 /* ----------
382  * pgstat_init() -
383  *
384  * Called from postmaster at startup. Create the resources required
385  * by the statistics collector process. If unable to do so, do not
386  * fail --- better to let the postmaster start with stats collection
387  * disabled.
388  * ----------
389  */
390 void
391 pgstat_init(void)
392 {
393  ACCEPT_TYPE_ARG3 alen;
394  struct addrinfo *addrs = NULL,
395  *addr,
396  hints;
397  int ret;
398  fd_set rset;
399  struct timeval tv;
400  char test_byte;
401  int sel_res;
402  int tries = 0;
403 
404 #define TESTBYTEVAL ((char) 199)
405 
406  /*
407  * This static assertion verifies that we didn't mess up the calculations
408  * involved in selecting maximum payload sizes for our UDP messages.
409  * Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would
410  * be silent performance loss from fragmentation, it seems worth having a
411  * compile-time cross-check that we didn't.
412  */
413  StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,
414  "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");
415 
416  /*
417  * Create the UDP socket for sending and receiving statistic messages
418  */
419  hints.ai_flags = AI_PASSIVE;
420  hints.ai_family = AF_UNSPEC;
421  hints.ai_socktype = SOCK_DGRAM;
422  hints.ai_protocol = 0;
423  hints.ai_addrlen = 0;
424  hints.ai_addr = NULL;
425  hints.ai_canonname = NULL;
426  hints.ai_next = NULL;
427  ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
428  if (ret || !addrs)
429  {
430  ereport(LOG,
431  (errmsg("could not resolve \"localhost\": %s",
432  gai_strerror(ret))));
433  goto startup_failed;
434  }
435 
436  /*
437  * On some platforms, pg_getaddrinfo_all() may return multiple addresses
438  * only one of which will actually work (eg, both IPv6 and IPv4 addresses
439  * when kernel will reject IPv6). Worse, the failure may occur at the
440  * bind() or perhaps even connect() stage. So we must loop through the
441  * results till we find a working combination. We will generate LOG
442  * messages, but no error, for bogus combinations.
443  */
444  for (addr = addrs; addr; addr = addr->ai_next)
445  {
446 #ifdef HAVE_UNIX_SOCKETS
447  /* Ignore AF_UNIX sockets, if any are returned. */
448  if (addr->ai_family == AF_UNIX)
449  continue;
450 #endif
451 
452  if (++tries > 1)
453  ereport(LOG,
454  (errmsg("trying another address for the statistics collector")));
455 
456  /*
457  * Create the socket.
458  */
459  if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET)
460  {
461  ereport(LOG,
463  errmsg("could not create socket for statistics collector: %m")));
464  continue;
465  }
466 
467  /*
468  * Bind it to a kernel assigned port on localhost and get the assigned
469  * port via getsockname().
470  */
471  if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
472  {
473  ereport(LOG,
475  errmsg("could not bind socket for statistics collector: %m")));
478  continue;
479  }
480 
481  alen = sizeof(pgStatAddr);
482  if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
483  {
484  ereport(LOG,
486  errmsg("could not get address of socket for statistics collector: %m")));
489  continue;
490  }
491 
492  /*
493  * Connect the socket to its own address. This saves a few cycles by
494  * not having to respecify the target address on every send. This also
495  * provides a kernel-level check that only packets from this same
496  * address will be received.
497  */
498  if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
499  {
500  ereport(LOG,
502  errmsg("could not connect socket for statistics collector: %m")));
505  continue;
506  }
507 
508  /*
509  * Try to send and receive a one-byte test message on the socket. This
510  * is to catch situations where the socket can be created but will not
511  * actually pass data (for instance, because kernel packet filtering
512  * rules prevent it).
513  */
514  test_byte = TESTBYTEVAL;
515 
516 retry1:
517  if (send(pgStatSock, &test_byte, 1, 0) != 1)
518  {
519  if (errno == EINTR)
520  goto retry1; /* if interrupted, just retry */
521  ereport(LOG,
523  errmsg("could not send test message on socket for statistics collector: %m")));
526  continue;
527  }
528 
529  /*
530  * There could possibly be a little delay before the message can be
531  * received. We arbitrarily allow up to half a second before deciding
532  * it's broken.
533  */
534  for (;;) /* need a loop to handle EINTR */
535  {
536  FD_ZERO(&rset);
537  FD_SET(pgStatSock, &rset);
538 
539  tv.tv_sec = 0;
540  tv.tv_usec = 500000;
541  sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
542  if (sel_res >= 0 || errno != EINTR)
543  break;
544  }
545  if (sel_res < 0)
546  {
547  ereport(LOG,
549  errmsg("select() failed in statistics collector: %m")));
552  continue;
553  }
554  if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
555  {
556  /*
557  * This is the case we actually think is likely, so take pains to
558  * give a specific message for it.
559  *
560  * errno will not be set meaningfully here, so don't use it.
561  */
562  ereport(LOG,
563  (errcode(ERRCODE_CONNECTION_FAILURE),
564  errmsg("test message did not get through on socket for statistics collector")));
567  continue;
568  }
569 
570  test_byte++; /* just make sure variable is changed */
571 
572 retry2:
573  if (recv(pgStatSock, &test_byte, 1, 0) != 1)
574  {
575  if (errno == EINTR)
576  goto retry2; /* if interrupted, just retry */
577  ereport(LOG,
579  errmsg("could not receive test message on socket for statistics collector: %m")));
582  continue;
583  }
584 
585  if (test_byte != TESTBYTEVAL) /* strictly paranoia ... */
586  {
587  ereport(LOG,
588  (errcode(ERRCODE_INTERNAL_ERROR),
589  errmsg("incorrect test message transmission on socket for statistics collector")));
592  continue;
593  }
594 
595  /* If we get here, we have a working socket */
596  break;
597  }
598 
599  /* Did we find a working address? */
600  if (!addr || pgStatSock == PGINVALID_SOCKET)
601  goto startup_failed;
602 
603  /*
604  * Set the socket to non-blocking IO. This ensures that if the collector
605  * falls behind, statistics messages will be discarded; backends won't
606  * block waiting to send messages to the collector.
607  */
608  if (!pg_set_noblock(pgStatSock))
609  {
610  ereport(LOG,
612  errmsg("could not set statistics collector socket to nonblocking mode: %m")));
613  goto startup_failed;
614  }
615 
616  /*
617  * Try to ensure that the socket's receive buffer is at least
618  * PGSTAT_MIN_RCVBUF bytes, so that it won't easily overflow and lose
619  * data. Use of UDP protocol means that we are willing to lose data under
620  * heavy load, but we don't want it to happen just because of ridiculously
621  * small default buffer sizes (such as 8KB on older Windows versions).
622  */
623  {
624  int old_rcvbuf;
625  int new_rcvbuf;
626  ACCEPT_TYPE_ARG3 rcvbufsize = sizeof(old_rcvbuf);
627 
628  if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
629  (char *) &old_rcvbuf, &rcvbufsize) < 0)
630  {
631  ereport(LOG,
632  (errmsg("%s(%s) failed: %m", "getsockopt", "SO_RCVBUF")));
633  /* if we can't get existing size, always try to set it */
634  old_rcvbuf = 0;
635  }
636 
637  new_rcvbuf = PGSTAT_MIN_RCVBUF;
638  if (old_rcvbuf < new_rcvbuf)
639  {
640  if (setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
641  (char *) &new_rcvbuf, sizeof(new_rcvbuf)) < 0)
642  ereport(LOG,
643  (errmsg("%s(%s) failed: %m", "setsockopt", "SO_RCVBUF")));
644  }
645  }
646 
647  pg_freeaddrinfo_all(hints.ai_family, addrs);
648 
649  /* Now that we have a long-lived socket, tell fd.c about it. */
651 
652  return;
653 
654 startup_failed:
655  ereport(LOG,
656  (errmsg("disabling statistics collector for lack of working socket")));
657 
658  if (addrs)
659  pg_freeaddrinfo_all(hints.ai_family, addrs);
660 
664 
665  /*
666  * Adjust GUC variables to suppress useless activity, and for debugging
667  * purposes (seeing track_counts off is a clue that we failed here). We
668  * use PGC_S_OVERRIDE because there is no point in trying to turn it back
669  * on from postgresql.conf without a restart.
670  */
671  SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
672 }
673 
674 /*
675  * subroutine for pgstat_reset_all
676  */
677 static void
678 pgstat_reset_remove_files(const char *directory)
679 {
680  DIR *dir;
681  struct dirent *entry;
682  char fname[MAXPGPATH * 2];
683 
684  dir = AllocateDir(directory);
685  while ((entry = ReadDir(dir, directory)) != NULL)
686  {
687  int nchars;
688  Oid tmp_oid;
689 
690  /*
691  * Skip directory entries that don't match the file names we write.
692  * See get_dbstat_filename for the database-specific pattern.
693  */
694  if (strncmp(entry->d_name, "global.", 7) == 0)
695  nchars = 7;
696  else
697  {
698  nchars = 0;
699  (void) sscanf(entry->d_name, "db_%u.%n",
700  &tmp_oid, &nchars);
701  if (nchars <= 0)
702  continue;
703  /* %u allows leading whitespace, so reject that */
704  if (strchr("0123456789", entry->d_name[3]) == NULL)
705  continue;
706  }
707 
708  if (strcmp(entry->d_name + nchars, "tmp") != 0 &&
709  strcmp(entry->d_name + nchars, "stat") != 0)
710  continue;
711 
712  snprintf(fname, sizeof(fname), "%s/%s", directory,
713  entry->d_name);
714  unlink(fname);
715  }
716  FreeDir(dir);
717 }
718 
719 /*
720  * pgstat_reset_all() -
721  *
722  * Remove the stats files. This is currently used only if WAL
723  * recovery is needed after a crash.
724  */
725 void
726 pgstat_reset_all(void)
727 {
728  pgstat_reset_remove_files(pgstat_stat_directory);
729  pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
730 }
731 
732 #ifdef EXEC_BACKEND
733 
734 /*
735  * pgstat_forkexec() -
736  *
737  * Format up the arglist for, then fork and exec, statistics collector process
738  */
739 static pid_t
740 pgstat_forkexec(void)
741 {
742  char *av[10];
743  int ac = 0;
744 
745  av[ac++] = "postgres";
746  av[ac++] = "--forkcol";
747  av[ac++] = NULL; /* filled in by postmaster_forkexec */
748 
749  av[ac] = NULL;
750  Assert(ac < lengthof(av));
751 
752  return postmaster_forkexec(ac, av);
753 }
754 #endif /* EXEC_BACKEND */
755 
756 
757 /*
758  * pgstat_start() -
759  *
760  * Called from postmaster at startup or after an existing collector
761  * died. Attempt to fire up a fresh statistics collector.
762  *
763  * Returns PID of child process, or 0 if fail.
764  *
765  * Note: if fail, we will be called again from the postmaster main loop.
766  */
767 int
768 pgstat_start(void)
769 {
770  time_t curtime;
771  pid_t pgStatPid;
772 
773  /*
774  * Check that the socket is there, else pgstat_init failed and we can do
775  * nothing useful.
776  */
777  if (pgStatSock == PGINVALID_SOCKET)
778  return 0;
779 
780  /*
781  * Do nothing if too soon since last collector start. This is a safety
782  * valve to protect against continuous respawn attempts if the collector
783  * is dying immediately at launch. Note that since we will be re-called
784  * from the postmaster main loop, we will get another chance later.
785  */
786  curtime = time(NULL);
787  if ((unsigned int) (curtime - last_pgstat_start_time) <
788  (unsigned int) PGSTAT_RESTART_INTERVAL)
789  return 0;
790  last_pgstat_start_time = curtime;
791 
792  /*
793  * Okay, fork off the collector.
794  */
795 #ifdef EXEC_BACKEND
796  switch ((pgStatPid = pgstat_forkexec()))
797 #else
798  switch ((pgStatPid = fork_process()))
799 #endif
800  {
801  case -1:
802  ereport(LOG,
803  (errmsg("could not fork statistics collector: %m")));
804  return 0;
805 
806 #ifndef EXEC_BACKEND
807  case 0:
808  /* in postmaster child ... */
810 
811  /* Close the postmaster's sockets */
812  ClosePostmasterPorts(false);
813 
814  /* Drop our connection to postmaster's shared memory, as well */
815  dsm_detach_all();
817 
818  PgstatCollectorMain(0, NULL);
819  break;
820 #endif
821 
822  default:
823  return (int) pgStatPid;
824  }
825 
826  /* shouldn't get here */
827  return 0;
828 }
829 
830 void
831 allow_immediate_pgstat_restart(void)
832 {
833  last_pgstat_start_time = 0;
834 }
835 
836 /* ------------------------------------------------------------
837  * Public functions used by backends follow
838  *------------------------------------------------------------
839  */
840 
841 
842 /* ----------
843  * pgstat_report_stat() -
844  *
845  * Must be called by processes that perform DML: tcop/postgres.c, logical
846  * receiver processes, SPI worker, etc. to send the so far collected
847  * per-table and function usage statistics to the collector. Note that this
848  * is called only when not within a transaction, so it is fair to use
849  * transaction stop time as an approximation of current time.
850  *
851  * "disconnect" is "true" only for the last call before the backend
852  * exits. This makes sure that no data is lost and that interrupted
853  * sessions are reported correctly.
854  * ----------
855  */
856 void
857 pgstat_report_stat(bool disconnect)
858 {
859  /* we assume this inits to all zeroes: */
860  static const PgStat_TableCounts all_zeroes;
861  static TimestampTz last_report = 0;
862 
863  TimestampTz now;
864  PgStat_MsgTabstat regular_msg;
865  PgStat_MsgTabstat shared_msg;
866  TabStatusArray *tsa;
867  int i;
868 
870 
871  /*
872  * Don't expend a clock check if nothing to do.
873  *
874  * To determine whether any WAL activity has occurred since last time, not
875  * only the number of generated WAL records but also the numbers of WAL
876  * writes and syncs need to be checked, because even a transaction that
877  * generates no WAL records can write or sync WAL data when flushing the
878  * data pages.
879  */
880  if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) &&
881  pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
882  pgWalUsage.wal_records == prevWalUsage.wal_records &&
883  WalStats.m_wal_write == 0 && WalStats.m_wal_sync == 0 &&
884  !have_function_stats && !disconnect)
885  return;
886 
887  /*
888  * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
889  * msec since we last sent one, or the backend is about to exit.
890  */
891  now = GetCurrentTransactionStopTimestamp();
892  if (!disconnect &&
893  !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
894  return;
895 
896  last_report = now;
897 
898  if (disconnect)
899  pgstat_report_disconnect(MyDatabaseId);
900 
901  /*
902  * Destroy pgStatTabHash before we start invalidating PgStat_TableEntry
903  * entries it points to. (Should we fail partway through the loop below,
904  * it's okay to have removed the hashtable already --- the only
905  * consequence is we'd get multiple entries for the same table in the
906  * pgStatTabList, and that's safe.)
907  */
908  if (pgStatTabHash)
909  hash_destroy(pgStatTabHash);
910  pgStatTabHash = NULL;
911 
912  /*
913  * Scan through the TabStatusArray struct(s) to find tables that actually
914  * have counts, and build messages to send. We have to separate shared
915  * relations from regular ones because the databaseid field in the message
916  * header has to depend on that.
917  */
918  regular_msg.m_databaseid = MyDatabaseId;
919  shared_msg.m_databaseid = InvalidOid;
920  regular_msg.m_nentries = 0;
921  shared_msg.m_nentries = 0;
922 
923  for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
924  {
925  for (i = 0; i < tsa->tsa_used; i++)
926  {
927  PgStat_TableStatus *entry = &tsa->tsa_entries[i];
928  PgStat_MsgTabstat *this_msg;
929  PgStat_TableEntry *this_ent;
930 
931  /* Shouldn't have any pending transaction-dependent counts */
932  Assert(entry->trans == NULL);
933 
934  /*
935  * Ignore entries that didn't accumulate any actual counts, such
936  * as indexes that were opened by the planner but not used.
937  */
938  if (memcmp(&entry->t_counts, &all_zeroes,
939  sizeof(PgStat_TableCounts)) == 0)
940  continue;
941 
942  /*
943  * OK, insert data into the appropriate message, and send if full.
944  */
945  this_msg = entry->t_shared ? &shared_msg : &regular_msg;
946  this_ent = &this_msg->m_entry[this_msg->m_nentries];
947  this_ent->t_id = entry->t_id;
948  memcpy(&this_ent->t_counts, &entry->t_counts,
949  sizeof(PgStat_TableCounts));
950  if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
951  {
952  pgstat_send_tabstat(this_msg, now);
953  this_msg->m_nentries = 0;
954  }
955  }
956  /* zero out PgStat_TableStatus structs after use */
957  MemSet(tsa->tsa_entries, 0,
958  tsa->tsa_used * sizeof(PgStat_TableStatus));
959  tsa->tsa_used = 0;
960  }
961 
962  /*
963  * Send partial messages. Make sure that any pending xact commit/abort
964  * and connection stats get counted, even if there are no table stats to
965  * send.
966  */
967  if (regular_msg.m_nentries > 0 ||
968  pgStatXactCommit > 0 || pgStatXactRollback > 0 || disconnect)
969  pgstat_send_tabstat(&regular_msg, now);
970  if (shared_msg.m_nentries > 0)
971  pgstat_send_tabstat(&shared_msg, now);
972 
973  /* Now, send function statistics */
974  pgstat_send_funcstats();
975 
976  /* Send WAL statistics */
977  pgstat_send_wal(true);
978 
979  /* Finally send SLRU statistics */
980  pgstat_send_slru();
981 }
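/*
 * Usage sketch (illustrative, not verbatim from this file): per the header
 * comment above, callers such as tcop/postgres.c invoke this between
 * transactions and once more at backend exit, along the lines of
 *
 *     pgstat_report_stat(false);   -- idle between queries, may be a no-op
 *     ...
 *     pgstat_report_stat(true);    -- final report during backend shutdown
 *
 * The "false" calls are rate-limited by PGSTAT_STAT_INTERVAL; the final
 * "true" call always sends, so disconnect statistics are not lost.
 */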
982 
983 /*
984  * Subroutine for pgstat_report_stat: finish and send a tabstat message
985  */
986 static void
987 pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now)
988 {
989  int n;
990  int len;
991 
992  /* It's unlikely we'd get here with no socket, but maybe not impossible */
993  if (pgStatSock == PGINVALID_SOCKET)
994  return;
995 
996  /*
997  * Report and reset accumulated xact commit/rollback and I/O timings
998  * whenever we send a normal tabstat message
999  */
1000  if (OidIsValid(tsmsg->m_databaseid))
1001  {
1002  tsmsg->m_xact_commit = pgStatXactCommit;
1003  tsmsg->m_xact_rollback = pgStatXactRollback;
1004  tsmsg->m_block_read_time = pgStatBlockReadTime;
1005  tsmsg->m_block_write_time = pgStatBlockWriteTime;
1006 
1007  if (pgstat_should_report_connstat())
1008  {
1009  long secs;
1010  int usecs;
1011 
1012  /*
1013  * pgLastSessionReportTime is initialized to MyStartTimestamp by
1014  * pgstat_report_connect().
1015  */
1016  TimestampDifference(pgLastSessionReportTime, now, &secs, &usecs);
1017  pgLastSessionReportTime = now;
1018  tsmsg->m_session_time = (PgStat_Counter) secs * 1000000 + usecs;
1019  tsmsg->m_active_time = pgStatActiveTime;
1020  tsmsg->m_idle_in_xact_time = pgStatTransactionIdleTime;
1021  }
1022  else
1023  {
1024  tsmsg->m_session_time = 0;
1025  tsmsg->m_active_time = 0;
1026  tsmsg->m_idle_in_xact_time = 0;
1027  }
1028  pgStatXactCommit = 0;
1029  pgStatXactRollback = 0;
1030  pgStatBlockReadTime = 0;
1031  pgStatBlockWriteTime = 0;
1032  pgStatActiveTime = 0;
1033  pgStatTransactionIdleTime = 0;
1034  }
1035  else
1036  {
1037  tsmsg->m_xact_commit = 0;
1038  tsmsg->m_xact_rollback = 0;
1039  tsmsg->m_block_read_time = 0;
1040  tsmsg->m_block_write_time = 0;
1041  tsmsg->m_session_time = 0;
1042  tsmsg->m_active_time = 0;
1043  tsmsg->m_idle_in_xact_time = 0;
1044  }
1045 
1046  n = tsmsg->m_nentries;
1047  len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
1048  n * sizeof(PgStat_TableEntry);
1049 
1049 
1050  pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
1051  pgstat_send(tsmsg, len);
1052 }
1053 
1054 /*
1055  * Subroutine for pgstat_report_stat: populate and send a function stat message
1056  */
1057 static void
1058 pgstat_send_funcstats(void)
1059 {
1060  /* we assume this inits to all zeroes: */
1061  static const PgStat_FunctionCounts all_zeroes;
1062 
1063  PgStat_MsgFuncstat msg;
1064  PgStat_BackendFunctionEntry *entry;
1065  HASH_SEQ_STATUS fstat;
1066 
1067  if (pgStatFunctions == NULL)
1068  return;
1069 
1071  msg.m_databaseid = MyDatabaseId;
1072  msg.m_nentries = 0;
1073 
1074  hash_seq_init(&fstat, pgStatFunctions);
1075  while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
1076  {
1077  PgStat_FunctionEntry *m_ent;
1078 
1079  /* Skip it if no counts accumulated since last time */
1080  if (memcmp(&entry->f_counts, &all_zeroes,
1081  sizeof(PgStat_FunctionCounts)) == 0)
1082  continue;
1083 
1084  /* need to convert format of time accumulators */
1085  m_ent = &msg.m_entry[msg.m_nentries];
1086  m_ent->f_id = entry->f_id;
1087  m_ent->f_numcalls = entry->f_counts.f_numcalls;
1088  m_ent->f_total_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_total_time);
1089  m_ent->f_self_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_self_time);
1090 
1091  if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES)
1092  {
1093  pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1094  msg.m_nentries * sizeof(PgStat_FunctionEntry));
1095  msg.m_nentries = 0;
1096  }
1097 
1098  /* reset the entry's counts */
1099  MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
1100  }
1101 
1102  if (msg.m_nentries > 0)
1103  pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1104  msg.m_nentries * sizeof(PgStat_FunctionEntry));
1105 
1106  have_function_stats = false;
1107 }
1108 
1109 
1110 /* ----------
1111  * pgstat_vacuum_stat() -
1112  *
1113  * Will tell the collector about objects it can get rid of.
1114  * ----------
1115  */
1116 void
1117 pgstat_vacuum_stat(void)
1118 {
1119  HTAB *htab;
1120  PgStat_MsgTabpurge msg;
1121  PgStat_MsgFuncpurge f_msg;
1122  HASH_SEQ_STATUS hstat;
1123  PgStat_StatDBEntry *dbentry;
1124  PgStat_StatTabEntry *tabentry;
1125  PgStat_StatFuncEntry *funcentry;
1126  int len;
1127 
1127 
1128  if (pgStatSock == PGINVALID_SOCKET)
1129  return;
1130 
1131  /*
1132  * If not done for this transaction, read the statistics collector stats
1133  * file into some hash tables.
1134  */
1135  backend_read_statsfile();
1136 
1137  /*
1138  * Read pg_database and make a list of OIDs of all existing databases
1139  */
1140  htab = pgstat_collect_oids(DatabaseRelationId, Anum_pg_database_oid);
1141 
1142  /*
1143  * Search the database hash table for dead databases and tell the
1144  * collector to drop them.
1145  */
1146  hash_seq_init(&hstat, pgStatDBHash);
1147  while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
1148  {
1149  Oid dbid = dbentry->databaseid;
1150 
1152 
1153  /* the DB entry for shared tables (with InvalidOid) is never dropped */
1154  if (OidIsValid(dbid) &&
1155  hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
1156  pgstat_drop_database(dbid);
1157  }
1158 
1159  /* Clean up */
1160  hash_destroy(htab);
1161 
1162  /*
1163  * Search for all the dead replication slots in stats hashtable and tell
1164  * the stats collector to drop them.
1165  */
1166  if (replSlotStatHash)
1167  {
1168  PgStat_StatReplSlotEntry *slotentry;
1169 
1170  hash_seq_init(&hstat, replSlotStatHash);
1171  while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
1172  {
1174 
1175  if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
1177  }
1178  }
1179 
1180  /*
1181  * Lookup our own database entry; if not found, nothing more to do.
1182  */
1183  dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1184  (void *) &MyDatabaseId,
1185  HASH_FIND, NULL);
1186  if (dbentry == NULL || dbentry->tables == NULL)
1187  return;
1188 
1189  /*
1190  * Similarly to above, make a list of all known relations in this DB.
1191  */
1192  htab = pgstat_collect_oids(RelationRelationId, Anum_pg_class_oid);
1193 
1194  /*
1195  * Initialize our messages table counter to zero
1196  */
1197  msg.m_nentries = 0;
1198 
1199  /*
1200  * Check for all tables listed in stats hashtable if they still exist.
1201  */
1202  hash_seq_init(&hstat, dbentry->tables);
1203  while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
1204  {
1205  Oid tabid = tabentry->tableid;
1206 
1208 
1209  if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
1210  continue;
1211 
1212  /*
1213  * Not there, so add this table's Oid to the message
1214  */
1215  msg.m_tableid[msg.m_nentries++] = tabid;
1216 
1217  /*
1218  * If the message is full, send it out and reinitialize to empty
1219  */
1220  if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
1221  {
1222  len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1223  + msg.m_nentries * sizeof(Oid);
1224 
1226  msg.m_databaseid = MyDatabaseId;
1227  pgstat_send(&msg, len);
1228 
1229  msg.m_nentries = 0;
1230  }
1231  }
1232 
1233  /*
1234  * Send the rest
1235  */
1236  if (msg.m_nentries > 0)
1237  {
1238  len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1239  + msg.m_nentries * sizeof(Oid);
1240 
1242  msg.m_databaseid = MyDatabaseId;
1243  pgstat_send(&msg, len);
1244  }
1245 
1246  /* Clean up */
1247  hash_destroy(htab);
1248 
1249  /*
1250  * Now repeat the above steps for functions. However, we needn't bother
1251  * in the common case where no function stats are being collected.
1252  */
1253  if (dbentry->functions != NULL &&
1254  hash_get_num_entries(dbentry->functions) > 0)
1255  {
1256  htab = pgstat_collect_oids(ProcedureRelationId, Anum_pg_proc_oid);
1257 
1259  f_msg.m_databaseid = MyDatabaseId;
1260  f_msg.m_nentries = 0;
1261 
1262  hash_seq_init(&hstat, dbentry->functions);
1263  while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
1264  {
1265  Oid funcid = funcentry->functionid;
1266 
1268 
1269  if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
1270  continue;
1271 
1272  /*
1273  * Not there, so add this function's Oid to the message
1274  */
1275  f_msg.m_functionid[f_msg.m_nentries++] = funcid;
1276 
1277  /*
1278  * If the message is full, send it out and reinitialize to empty
1279  */
1280  if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
1281  {
1282  len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1283  + f_msg.m_nentries * sizeof(Oid);
1284 
1285  pgstat_send(&f_msg, len);
1286 
1287  f_msg.m_nentries = 0;
1288  }
1289  }
1290 
1291  /*
1292  * Send the rest
1293  */
1294  if (f_msg.m_nentries > 0)
1295  {
1296  len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1297  + f_msg.m_nentries * sizeof(Oid);
1298 
1299  pgstat_send(&f_msg, len);
1300  }
1301 
1302  hash_destroy(htab);
1303  }
1304 }
1305 
1306 
1307 /* ----------
1308  * pgstat_collect_oids() -
1309  *
1310  * Collect the OIDs of all objects listed in the specified system catalog
1311  * into a temporary hash table. Caller should hash_destroy the result
1312  * when done with it. (However, we make the table in CurrentMemoryContext
1313  * so that it will be freed properly in event of an error.)
1314  * ----------
1315  */
1316 static HTAB *
1317 pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid)
1318 {
1319  HTAB *htab;
1320  HASHCTL hash_ctl;
1321  Relation rel;
1322  TableScanDesc scan;
1323  HeapTuple tup;
1324  Snapshot snapshot;
1325 
1326  hash_ctl.keysize = sizeof(Oid);
1327  hash_ctl.entrysize = sizeof(Oid);
1328  hash_ctl.hcxt = CurrentMemoryContext;
1329  htab = hash_create("Temporary table of OIDs",
1330  PGSTAT_TAB_HASH_SIZE,
1331  &hash_ctl,
1332  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1333 
1334  rel = table_open(catalogid, AccessShareLock);
1335  snapshot = RegisterSnapshot(GetLatestSnapshot());
1336  scan = table_beginscan(rel, snapshot, 0, NULL);
1337  while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
1338  {
1339  Oid thisoid;
1340  bool isnull;
1341 
1342  thisoid = heap_getattr(tup, anum_oid, RelationGetDescr(rel), &isnull);
1343  Assert(!isnull);
1344 
1346 
1347  (void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
1348  }
1349  table_endscan(scan);
1350  UnregisterSnapshot(snapshot);
1351  table_close(rel, AccessShareLock);
1352 
1353  return htab;
1354 }
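/*
 * Usage sketch (illustrative, not verbatim from this file): callers such as
 * pgstat_vacuum_stat() above use the returned hash purely as an existence
 * set and are responsible for freeing it, e.g.
 *
 *     HTAB *htab = pgstat_collect_oids(DatabaseRelationId, Anum_pg_database_oid);
 *
 *     if (hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
 *         pgstat_drop_database(dbid);    -- OID no longer exists
 *
 *     hash_destroy(htab);
 */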
1355 
1356 
1357 /* ----------
1358  * pgstat_drop_database() -
1359  *
1360  * Tell the collector that we just dropped a database.
1361  * (If the message gets lost, we will still clean the dead DB eventually
1362  * via future invocations of pgstat_vacuum_stat().)
1363  * ----------
1364  */
1365 void
1366 pgstat_drop_database(Oid databaseid)
1367 {
1368  PgStat_MsgDropdb msg;
1369 
1371  return;
1372 
1374  msg.m_databaseid = databaseid;
1375  pgstat_send(&msg, sizeof(msg));
1376 }
1377 
1378 
1379 /* ----------
1380  * pgstat_drop_relation() -
1381  *
1382  * Tell the collector that we just dropped a relation.
1383  * (If the message gets lost, we will still clean the dead entry eventually
1384  * via future invocations of pgstat_vacuum_stat().)
1385  *
1386  * Currently not used for lack of any good place to call it; we rely
1387  * entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
1388  * ----------
1389  */
1390 #ifdef NOT_USED
1391 void
1392 pgstat_drop_relation(Oid relid)
1393 {
1394  PgStat_MsgTabpurge msg;
1395  int len;
1396 
1398  return;
1399 
1400  msg.m_tableid[0] = relid;
1401  msg.m_nentries = 1;
1402 
1403  len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
1404 
1406  msg.m_databaseid = MyDatabaseId;
1407  pgstat_send(&msg, len);
1408 }
1409 #endif /* NOT_USED */
1410 
1411 /* ----------
1412  * pgstat_reset_counters() -
1413  *
1414  * Tell the statistics collector to reset counters for our database.
1415  *
1416  * Permission checking for this function is managed through the normal
1417  * GRANT system.
1418  * ----------
1419  */
1420 void
1421 pgstat_reset_counters(void)
1422 {
1424 
1426  return;
1427 
1429  msg.m_databaseid = MyDatabaseId;
1430  pgstat_send(&msg, sizeof(msg));
1431 }
1432 
1433 /* ----------
1434  * pgstat_reset_shared_counters() -
1435  *
1436  * Tell the statistics collector to reset cluster-wide shared counters.
1437  *
1438  * Permission checking for this function is managed through the normal
1439  * GRANT system.
1440  * ----------
1441  */
1442 void
1443 pgstat_reset_shared_counters(const char *target)
1444 {
1446 
1448  return;
1449 
1450  if (strcmp(target, "archiver") == 0)
1452  else if (strcmp(target, "bgwriter") == 0)
1454  else if (strcmp(target, "wal") == 0)
1455  msg.m_resettarget = RESET_WAL;
1456  else
1457  ereport(ERROR,
1458  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1459  errmsg("unrecognized reset target: \"%s\"", target),
1460  errhint("Target must be \"archiver\", \"bgwriter\", or \"wal\".")));
1461 
1463  pgstat_send(&msg, sizeof(msg));
1464 }
1465 
1466 /* ----------
1467  * pgstat_reset_single_counter() -
1468  *
1469  * Tell the statistics collector to reset a single counter.
1470  *
1471  * Permission checking for this function is managed through the normal
1472  * GRANT system.
1473  * ----------
1474  */
1475 void
1476 pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
1477 {
1479 
1481  return;
1482 
1484  msg.m_databaseid = MyDatabaseId;
1485  msg.m_resettype = type;
1486  msg.m_objectid = objoid;
1487 
1488  pgstat_send(&msg, sizeof(msg));
1489 }
1490 
1491 /* ----------
1492  * pgstat_reset_slru_counter() -
1493  *
1494  * Tell the statistics collector to reset a single SLRU counter, or all
1495  * SLRU counters (when name is null).
1496  *
1497  * Permission checking for this function is managed through the normal
1498  * GRANT system.
1499  * ----------
1500  */
1501 void
1502 pgstat_reset_slru_counter(const char *name)
1503 {
1505 
1507  return;
1508 
1510  msg.m_index = (name) ? pgstat_slru_index(name) : -1;
1511 
1512  pgstat_send(&msg, sizeof(msg));
1513 }
1514 
1515 /* ----------
1516  * pgstat_reset_replslot_counter() -
1517  *
1518  * Tell the statistics collector to reset a single replication slot
1519  * counter, or all replication slots counters (when name is null).
1520  *
1521  * Permission checking for this function is managed through the normal
1522  * GRANT system.
1523  * ----------
1524  */
1525 void
1526 pgstat_reset_replslot_counter(const char *name)
1527 {
1529 
1531  return;
1532 
1533  if (name)
1534  {
1535  namestrcpy(&msg.m_slotname, name);
1536  msg.clearall = false;
1537  }
1538  else
1539  msg.clearall = true;
1540 
1542 
1543  pgstat_send(&msg, sizeof(msg));
1544 }
1545 
1546 /* ----------
1547  * pgstat_report_autovac() -
1548  *
1549  * Called from autovacuum.c to report startup of an autovacuum process.
1550  * We are called before InitPostgres is done, so can't rely on MyDatabaseId;
1551  * the db OID must be passed in, instead.
1552  * ----------
1553  */
1554 void
1555 pgstat_report_autovac(Oid dboid)
1556 {
1558 
1560  return;
1561 
1563  msg.m_databaseid = dboid;
1565 
1566  pgstat_send(&msg, sizeof(msg));
1567 }
1568 
1569 
1570 /* ---------
1571  * pgstat_report_vacuum() -
1572  *
1573  * Tell the collector about the table we just vacuumed.
1574  * ---------
1575  */
1576 void
1577 pgstat_report_vacuum(Oid tableoid, bool shared,
1578  PgStat_Counter livetuples, PgStat_Counter deadtuples)
1579 {
1580  PgStat_MsgVacuum msg;
1581 
1583  return;
1584 
1586  msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
1587  msg.m_tableoid = tableoid;
1590  msg.m_live_tuples = livetuples;
1591  msg.m_dead_tuples = deadtuples;
1592  pgstat_send(&msg, sizeof(msg));
1593 }
1594 
1595 /* --------
1596  * pgstat_report_analyze() -
1597  *
1598  * Tell the collector about the table we just analyzed.
1599  *
1600  * Caller must provide new live- and dead-tuples estimates, as well as a
1601  * flag indicating whether to reset the changes_since_analyze counter.
1602  * --------
1603  */
1604 void
1605 pgstat_report_analyze(Relation rel,
1606  PgStat_Counter livetuples, PgStat_Counter deadtuples,
1607  bool resetcounter)
1608 {
1609  PgStat_MsgAnalyze msg;
1610 
1612  return;
1613 
1614  /*
1615  * Unlike VACUUM, ANALYZE might be running inside a transaction that has
1616  * already inserted and/or deleted rows in the target table. ANALYZE will
1617  * have counted such rows as live or dead respectively. Because we will
1618  * report our counts of such rows at transaction end, we should subtract
1619  * off these counts from what we send to the collector now, else they'll
1620  * be double-counted after commit. (This approach also ensures that the
1621  * collector ends up with the right numbers if we abort instead of
1622  * committing.)
1623  *
1624  * Waste no time on partitioned tables, though.
1625  */
1626  if (rel->pgstat_info != NULL &&
1627  rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1628  {
1629  PgStat_TableXactStatus *trans;
1630 
1631  for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
1632  {
1633  livetuples -= trans->tuples_inserted - trans->tuples_deleted;
1634  deadtuples -= trans->tuples_updated + trans->tuples_deleted;
1635  }
1636  /* count stuff inserted by already-aborted subxacts, too */
1637  deadtuples -= rel->pgstat_info->t_counts.t_delta_dead_tuples;
1638  /* Since ANALYZE's counts are estimates, we could have underflowed */
1639  livetuples = Max(livetuples, 0);
1640  deadtuples = Max(deadtuples, 0);
1641  }
1642 
1644  msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
1645  msg.m_tableoid = RelationGetRelid(rel);
1647  msg.m_resetcounter = resetcounter;
1649  msg.m_live_tuples = livetuples;
1650  msg.m_dead_tuples = deadtuples;
1651  pgstat_send(&msg, sizeof(msg));
1652 }
1653 
1654 /* --------
1655  * pgstat_report_recovery_conflict() -
1656  *
1657  * Tell the collector about a Hot Standby recovery conflict.
1658  * --------
1659  */
1660 void
1661 pgstat_report_recovery_conflict(int reason)
1662 {
1664 
1666  return;
1667 
1669  msg.m_databaseid = MyDatabaseId;
1670  msg.m_reason = reason;
1671  pgstat_send(&msg, sizeof(msg));
1672 }
1673 
1674 /* --------
1675  * pgstat_report_deadlock() -
1676  *
1677  * Tell the collector about a deadlock detected.
1678  * --------
1679  */
1680 void
1681 pgstat_report_deadlock(void)
1682 {
1683  PgStat_MsgDeadlock msg;
1684 
1686  return;
1687 
1689  msg.m_databaseid = MyDatabaseId;
1690  pgstat_send(&msg, sizeof(msg));
1691 }
1692 
1693 
1694 
1695 /* --------
1696  * pgstat_report_checksum_failures_in_db() -
1697  *
1698  * Tell the collector about one or more checksum failures.
1699  * --------
1700  */
1701 void
1702 pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount)
1703 {
1705 
1707  return;
1708 
1710  msg.m_databaseid = dboid;
1711  msg.m_failurecount = failurecount;
1713 
1714  pgstat_send(&msg, sizeof(msg));
1715 }
1716 
1717 /* --------
1718  * pgstat_report_checksum_failure() -
1719  *
1720  * Tell the collector about a checksum failure.
1721  * --------
1722  */
1723 void
1724 pgstat_report_checksum_failure(void)
1725 {
1726  pgstat_report_checksum_failures_in_db(MyDatabaseId, 1);
1727 }
1728 
1729 /* --------
1730  * pgstat_report_tempfile() -
1731  *
1732  * Tell the collector about a temporary file.
1733  * --------
1734  */
1735 void
1736 pgstat_report_tempfile(size_t filesize)
1737 {
1738  PgStat_MsgTempFile msg;
1739 
1741  return;
1742 
1744  msg.m_databaseid = MyDatabaseId;
1745  msg.m_filesize = filesize;
1746  pgstat_send(&msg, sizeof(msg));
1747 }
1748 
1749 /* --------
1750  * pgstat_report_connect() -
1751  *
1752  * Tell the collector about a new connection.
1753  * --------
1754  */
1755 void
1756 pgstat_report_connect(Oid dboid)
1757 {
1758  PgStat_MsgConnect msg;
1759 
1761  return;
1762 
1764 
1766  msg.m_databaseid = MyDatabaseId;
1767  pgstat_send(&msg, sizeof(PgStat_MsgConnect));
1768 }
1769 
1770 /* --------
1771  * pgstat_report_disconnect() -
1772  *
1773  * Tell the collector about a disconnect.
1774  * --------
1775  */
1776 static void
1777 pgstat_report_disconnect(Oid dboid)
1778 {
1779  PgStat_MsgDisconnect msg;
1780 
1782  return;
1783 
1785  msg.m_databaseid = MyDatabaseId;
1787  pgstat_send(&msg, sizeof(PgStat_MsgDisconnect));
1788 }
1789 
1790 /* --------
1791  * pgstat_should_report_connstat() -
1792  *
1793  * We report session statistics only for normal backend processes. Parallel
1794  * workers run in parallel, so they don't contribute to session times, even
1795  * though they use CPU time. Walsender processes could be considered here,
1796  * but they have different session characteristics from normal backends (for
1797  * example, they are always "active"), so they would skew session statistics.
1798  * ----------
1799  */
1800 static bool
1801 pgstat_should_report_connstat(void)
1802 {
1803  return MyBackendType == B_BACKEND;
1804 }
1805 
1806 /* ----------
1807  * pgstat_report_replslot() -
1808  *
1809  * Tell the collector about replication slot statistics.
1810  * ----------
1811  */
1812 void
1813 pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
1814 {
1815  PgStat_MsgReplSlot msg;
1816 
1817  /*
1818  * Prepare and send the message
1819  */
1821  namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
1822  msg.m_create = false;
1823  msg.m_drop = false;
1824  msg.m_spill_txns = repSlotStat->spill_txns;
1825  msg.m_spill_count = repSlotStat->spill_count;
1826  msg.m_spill_bytes = repSlotStat->spill_bytes;
1827  msg.m_stream_txns = repSlotStat->stream_txns;
1828  msg.m_stream_count = repSlotStat->stream_count;
1829  msg.m_stream_bytes = repSlotStat->stream_bytes;
1830  msg.m_total_txns = repSlotStat->total_txns;
1831  msg.m_total_bytes = repSlotStat->total_bytes;
1832  pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1833 }
1834 
1835 /* ----------
1836  * pgstat_report_replslot_create() -
1837  *
1838  * Tell the collector about creating the replication slot.
1839  * ----------
1840  */
1841 void
1842 pgstat_report_replslot_create(const char *slotname)
1843 {
1844  PgStat_MsgReplSlot msg;
1845 
1847  namestrcpy(&msg.m_slotname, slotname);
1848  msg.m_create = true;
1849  msg.m_drop = false;
1850  pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1851 }
1852 
1853 /* ----------
1854  * pgstat_report_replslot_drop() -
1855  *
1856  * Tell the collector about dropping the replication slot.
1857  * ----------
1858  */
1859 void
1860 pgstat_report_replslot_drop(const char *slotname)
1861 {
1862  PgStat_MsgReplSlot msg;
1863 
1865  namestrcpy(&msg.m_slotname, slotname);
1866  msg.m_create = false;
1867  msg.m_drop = true;
1868  pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1869 }
1870 
1871 /* ----------
1872  * pgstat_ping() -
1873  *
1874  * Send some junk data to the collector to increase traffic.
1875  * ----------
1876  */
1877 void
1878 pgstat_ping(void)
1879 {
1880  PgStat_MsgDummy msg;
1881 
1883  return;
1884 
1886  pgstat_send(&msg, sizeof(msg));
1887 }
1888 
1889 /* ----------
1890  * pgstat_send_inquiry() -
1891  *
1892  * Notify collector that we need fresh data.
1893  * ----------
1894  */
1895 static void
1896 pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
1897 {
1898  PgStat_MsgInquiry msg;
1899 
1901  msg.clock_time = clock_time;
1902  msg.cutoff_time = cutoff_time;
1903  msg.databaseid = databaseid;
1904  pgstat_send(&msg, sizeof(msg));
1905 }
1906 
1907 
1908 /*
1909  * Initialize function call usage data.
1910  * Called by the executor before invoking a function.
1911  */
1912 void
1913 pgstat_init_function_usage(FunctionCallInfo fcinfo,
1914  PgStat_FunctionCallUsage *fcu)
1915 {
1916  PgStat_BackendFunctionEntry *htabent;
1917  bool found;
1918 
1919  if (pgstat_track_functions <= fcinfo->flinfo->fn_stats)
1920  {
1921  /* stats not wanted */
1922  fcu->fs = NULL;
1923  return;
1924  }
1925 
1926  if (!pgStatFunctions)
1927  {
1928  /* First time through - initialize function stat table */
1929  HASHCTL hash_ctl;
1930 
1931  hash_ctl.keysize = sizeof(Oid);
1932  hash_ctl.entrysize = sizeof(PgStat_BackendFunctionEntry);
1933  pgStatFunctions = hash_create("Function stat entries",
1935  &hash_ctl,
1936  HASH_ELEM | HASH_BLOBS);
1937  }
1938 
1939  /* Get the stats entry for this function, create if necessary */
1940  htabent = hash_search(pgStatFunctions, &fcinfo->flinfo->fn_oid,
1941  HASH_ENTER, &found);
1942  if (!found)
1943  MemSet(&htabent->f_counts, 0, sizeof(PgStat_FunctionCounts));
1944 
1945  fcu->fs = &htabent->f_counts;
1946 
1947  /* save stats for this function, later used to compensate for recursion */
1948  fcu->save_f_total_time = htabent->f_counts.f_total_time;
1949 
1950  /* save current backend-wide total time */
1951  fcu->save_total = total_func_time;
1952 
1953  /* get clock time as of function start */
1954  INSTR_TIME_SET_CURRENT(fcu->f_start);
1955 }
1956 
1957 /*
1958  * find_funcstat_entry - find any existing PgStat_BackendFunctionEntry entry
1959  * for specified function
1960  *
1961  * If no entry, return NULL, don't create a new one
1962  */
1963 PgStat_BackendFunctionEntry *
1964 find_funcstat_entry(Oid func_id)
1965 {
1967 
1968  if (pgStatFunctions == NULL)
1969  return NULL;
1970 
1971  return (PgStat_BackendFunctionEntry *) hash_search(pgStatFunctions,
1972  (void *) &func_id,
1973  HASH_FIND, NULL);
1974 }
1975 
1976 /*
1977  * Calculate function call usage and update stat counters.
1978  * Called by the executor after invoking a function.
1979  *
1980  * In the case of a set-returning function that runs in value-per-call mode,
1981  * we will see multiple pgstat_init_function_usage/pgstat_end_function_usage
1982  * calls for what the user considers a single call of the function. The
1983  * finalize flag should be TRUE on the last call.
1984  */
1985 void
1986 pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize)
1987 {
1988  PgStat_FunctionCounts *fs = fcu->fs;
1989  instr_time f_total;
1990  instr_time f_others;
1991  instr_time f_self;
1992 
1993  /* stats not wanted? */
1994  if (fs == NULL)
1995  return;
1996 
1997  /* total elapsed time in this function call */
1998  INSTR_TIME_SET_CURRENT(f_total);
1999  INSTR_TIME_SUBTRACT(f_total, fcu->f_start);
2000 
2001  /* self usage: elapsed minus anything already charged to other calls */
2002  f_others = total_func_time;
2003  INSTR_TIME_SUBTRACT(f_others, fcu->save_total);
2004  f_self = f_total;
2005  INSTR_TIME_SUBTRACT(f_self, f_others);
2006 
2007  /* update backend-wide total time */
2008  INSTR_TIME_ADD(total_func_time, f_self);
2009 
2010  /*
2011  * Compute the new f_total_time as the total elapsed time added to the
2012  * pre-call value of f_total_time. This is necessary to avoid
2013  * double-counting any time taken by recursive calls of myself. (We do
2014  * not need any similar kluge for self time, since that already excludes
2015  * any recursive calls.)
2016  */
2017  INSTR_TIME_ADD(f_total, fcu->save_f_total_time);
2018 
2019  /* update counters in function stats table */
2020  if (finalize)
2021  fs->f_numcalls++;
2022  fs->f_total_time = f_total;
2023  INSTR_TIME_ADD(fs->f_self_time, f_self);
2024 
2025  /* indicate that we have something to send */
2026  have_function_stats = true;
2027 }
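/*
 * Usage sketch (illustrative, not verbatim from this file): the executor
 * brackets each tracked function call with the two routines above, e.g.
 *
 *     PgStat_FunctionCallUsage fcu;
 *
 *     pgstat_init_function_usage(fcinfo, &fcu);
 *     result = FunctionCallInvoke(fcinfo);
 *     pgstat_end_function_usage(&fcu, true);
 *
 * For a value-per-call set-returning function the same pattern repeats per
 * row, passing finalize = true only on the last call so that f_numcalls is
 * incremented just once.
 */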
2028 
2029 
2030 /* ----------
2031  * pgstat_initstats() -
2032  *
2033  * Initialize a relcache entry to count access statistics.
2034  * Called whenever a relation is opened.
2035  *
2036  * We assume that a relcache entry's pgstat_info field is zeroed by
2037  * relcache.c when the relcache entry is made; thereafter it is long-lived
2038  * data. We can avoid repeated searches of the TabStatus arrays when the
2039  * same relation is touched repeatedly within a transaction.
2040  * ----------
2041  */
2042 void
2043 pgstat_initstats(Relation rel)
2044 {
2045  Oid rel_id = rel->rd_id;
2046  char relkind = rel->rd_rel->relkind;
2047 
2048  /*
2049  * We only count stats for relations with storage and partitioned tables
2050  */
2051  if (!RELKIND_HAS_STORAGE(relkind) && relkind != RELKIND_PARTITIONED_TABLE)
2052  {
2053  rel->pgstat_info = NULL;
2054  return;
2055  }
2056 
2056 
2057  if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
2058  {
2059  /* We're not counting at all */
2060  rel->pgstat_info = NULL;
2061  return;
2062  }
2063 
2064  /*
2065  * If we already set up this relation in the current transaction, nothing
2066  * to do.
2067  */
2068  if (rel->pgstat_info != NULL &&
2069  rel->pgstat_info->t_id == rel_id)
2070  return;
2071 
2072  /* Else find or make the PgStat_TableStatus entry, and update link */
2073  rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
2074 }
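/*
 * Usage sketch (illustrative, not verbatim from this file): the relation
 * open path calls pgstat_initstats(rel) each time a relation is opened, so
 * by the time the counting routines further down run, rel->pgstat_info
 * either points at a live PgStat_TableStatus or is NULL (stats disabled),
 * e.g.
 *
 *     pgstat_initstats(rel);                    -- done when rel is opened
 *     ...
 *     pgstat_count_heap_insert(rel, ntuples);   -- no-op if pgstat_info == NULL
 */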
2075 
2076 /*
2077  * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
2078  */
2079 static PgStat_TableStatus *
2080 get_tabstat_entry(Oid rel_id, bool isshared)
2081 {
2082  TabStatHashEntry *hash_entry;
2083  PgStat_TableStatus *entry;
2084  TabStatusArray *tsa;
2085  bool found;
2086 
2088 
2089  /*
2090  * Create hash table if we don't have it already.
2091  */
2092  if (pgStatTabHash == NULL)
2093  {
2094  HASHCTL ctl;
2095 
2096  ctl.keysize = sizeof(Oid);
2097  ctl.entrysize = sizeof(TabStatHashEntry);
2098 
2099  pgStatTabHash = hash_create("pgstat TabStatusArray lookup hash table",
2100  TABSTAT_QUANTUM,
2101  &ctl,
2102  HASH_ELEM | HASH_BLOBS);
2103  }
2104 
2105  /*
2106  * Find an entry or create a new one.
2107  */
2108  hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_ENTER, &found);
2109  if (!found)
2110  {
2111  /* initialize new entry with null pointer */
2112  hash_entry->tsa_entry = NULL;
2113  }
2114 
2115  /*
2116  * If entry is already valid, we're done.
2117  */
2118  if (hash_entry->tsa_entry)
2119  return hash_entry->tsa_entry;
2120 
2121  /*
2122  * Locate the first pgStatTabList entry with free space, making a new list
2123  * entry if needed. Note that we could get an OOM failure here, but if so
2124  * we have left the hashtable and the list in a consistent state.
2125  */
2126  if (pgStatTabList == NULL)
2127  {
2128  /* Set up first pgStatTabList entry */
2129  pgStatTabList = (TabStatusArray *)
2130  MemoryContextAllocZero(TopMemoryContext,
2131  sizeof(TabStatusArray));
2132  }
2133 
2134  tsa = pgStatTabList;
2135  while (tsa->tsa_used >= TABSTAT_QUANTUM)
2136  {
2137  if (tsa->tsa_next == NULL)
2138  tsa->tsa_next = (TabStatusArray *)
2139  MemoryContextAllocZero(TopMemoryContext,
2140  sizeof(TabStatusArray));
2141  tsa = tsa->tsa_next;
2142  }
2143 
2144  /*
2145  * Allocate a PgStat_TableStatus entry within this list entry. We assume
2146  * the entry was already zeroed, either at creation or after last use.
2147  */
2148  entry = &tsa->tsa_entries[tsa->tsa_used++];
2149  entry->t_id = rel_id;
2150  entry->t_shared = isshared;
2151 
2152  /*
2153  * Now we can fill the entry in pgStatTabHash.
2154  */
2155  hash_entry->tsa_entry = entry;
2156 
2157  return entry;
2158 }
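As a side note for readers following the allocation scheme: get_tabstat_entry() hands out PgStat_TableStatus slots from TabStatusArray chunks of TABSTAT_QUANTUM entries and indexes them with pgStatTabHash. The standalone C sketch below illustrates only the chunked-allocation half of that pattern; QUANTUM, Chunk, Entry and get_entry are invented names and this is not PostgreSQL code.

/* Illustrative sketch only; allocation-failure handling kept minimal. */
#include <stdio.h>
#include <stdlib.h>

#define QUANTUM 4                      /* entries per chunk; pgstat uses a larger TABSTAT_QUANTUM */

typedef struct Entry { unsigned id; } Entry;

typedef struct Chunk
{
    struct Chunk *next;
    int           used;
    Entry         entries[QUANTUM];    /* zeroed by calloc, much like MemoryContextAllocZero */
} Chunk;

static Chunk *chunk_list;

static Entry *
get_entry(unsigned id)
{
    Chunk *c;

    if (chunk_list == NULL)
        chunk_list = calloc(1, sizeof(Chunk));

    /* find the first chunk with free space, extending the list if needed */
    c = chunk_list;
    while (c->used >= QUANTUM)
    {
        if (c->next == NULL)
            c->next = calloc(1, sizeof(Chunk));
        c = c->next;
    }

    c->entries[c->used].id = id;
    return &c->entries[c->used++];
}

int
main(void)
{
    for (unsigned i = 0; i < 10; i++)
        get_entry(i);
    printf("chunk1 used=%d, chunk2 used=%d\n",
           chunk_list->used, chunk_list->next->used);   /* prints 4 and 4 */
    return 0;
}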
2159 
2160 /*
2161  * find_tabstat_entry - find any existing PgStat_TableStatus entry for rel
2162  *
2163  * If no entry, return NULL, don't create a new one
2164  *
2165  * Note: if we got an error in the most recent execution of pgstat_report_stat,
2166  * it's possible that an entry exists but there's no hashtable entry for it.
2167  * That's okay, we'll treat this case as "doesn't exist".
2168  */
2169 PgStat_TableStatus *
2170 find_tabstat_entry(Oid rel_id)
2171 {
2172  TabStatHashEntry *hash_entry;
2173 
2174  /* If hashtable doesn't exist, there are no entries at all */
2175  if (!pgStatTabHash)
2176  return NULL;
2177 
2178  hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_FIND, NULL);
2179  if (!hash_entry)
2180  return NULL;
2181 
2182  /* Note that this step could also return NULL, but that's correct */
2183  return hash_entry->tsa_entry;
2184 }
2185 
2186 /*
2187  * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed
2188  */
2189 static PgStat_SubXactStatus *
2190 get_tabstat_stack_level(int nest_level)
2191 {
2192  PgStat_SubXactStatus *xact_state;
2193 
2194  xact_state = pgStatXactStack;
2195  if (xact_state == NULL || xact_state->nest_level != nest_level)
2196  {
2197  xact_state = (PgStat_SubXactStatus *)
2198  MemoryContextAlloc(TopTransactionContext,
2199  sizeof(PgStat_SubXactStatus));
2200  xact_state->nest_level = nest_level;
2201  xact_state->prev = pgStatXactStack;
2202  xact_state->first = NULL;
2203  pgStatXactStack = xact_state;
2204  }
2205  return xact_state;
2206 }
2207 
2208 /*
2209  * add_tabstat_xact_level - add a new (sub)transaction state record
2210  */
2211 static void
2212 add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
2213 {
2214  PgStat_SubXactStatus *xact_state;
2215  PgStat_TableXactStatus *trans;
2216 
2217  /*
2218  * If this is the first rel to be modified at the current nest level, we
2219  * first have to push a transaction stack entry.
2220  */
2221  xact_state = get_tabstat_stack_level(nest_level);
2222 
2223  /* Now make a per-table stack entry */
2224  trans = (PgStat_TableXactStatus *)
2225  MemoryContextAllocZero(TopTransactionContext,
2226  sizeof(PgStat_TableXactStatus));
2227  trans->nest_level = nest_level;
2228  trans->upper = pgstat_info->trans;
2229  trans->parent = pgstat_info;
2230  trans->next = xact_state->first;
2231  xact_state->first = trans;
2232  pgstat_info->trans = trans;
2233 }
2234 
2235 /*
2236  * pgstat_count_heap_insert - count a tuple insertion of n tuples
2237  */
2238 void
2239 pgstat_count_heap_insert(Relation rel, PgStat_Counter n)
2240 {
2241  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2242 
2243  if (pgstat_info != NULL)
2244  {
2245  /* We have to log the effect at the proper transactional level */
2246  int nest_level = GetCurrentTransactionNestLevel();
2247 
2248  if (pgstat_info->trans == NULL ||
2249  pgstat_info->trans->nest_level != nest_level)
2250  add_tabstat_xact_level(pgstat_info, nest_level);
2251 
2252  pgstat_info->trans->tuples_inserted += n;
2253  }
2254 }
2255 
2256 /*
2257  * pgstat_count_heap_update - count a tuple update
2258  */
2259 void
2260 pgstat_count_heap_update(Relation rel, bool hot)
2261 {
2262  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2263 
2264  if (pgstat_info != NULL)
2265  {
2266  /* We have to log the effect at the proper transactional level */
2267  int nest_level = GetCurrentTransactionNestLevel();
2268 
2269  if (pgstat_info->trans == NULL ||
2270  pgstat_info->trans->nest_level != nest_level)
2271  add_tabstat_xact_level(pgstat_info, nest_level);
2272 
2273  pgstat_info->trans->tuples_updated++;
2274 
2275  /* t_tuples_hot_updated is nontransactional, so just advance it */
2276  if (hot)
2277  pgstat_info->t_counts.t_tuples_hot_updated++;
2278  }
2279 }
2280 
2281 /*
2282  * pgstat_count_heap_delete - count a tuple deletion
2283  */
2284 void
2285 pgstat_count_heap_delete(Relation rel)
2286 {
2287  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2288 
2289  if (pgstat_info != NULL)
2290  {
2291  /* We have to log the effect at the proper transactional level */
2292  int nest_level = GetCurrentTransactionNestLevel();
2293 
2294  if (pgstat_info->trans == NULL ||
2295  pgstat_info->trans->nest_level != nest_level)
2296  add_tabstat_xact_level(pgstat_info, nest_level);
2297 
2298  pgstat_info->trans->tuples_deleted++;
2299  }
2300 }
2301 
2302 /*
2303  * pgstat_truncate_save_counters
2304  *
2305  * Whenever a table is truncated, we save its i/u/d counters so that they can
2306  * be cleared, and if the (sub)xact that executed the truncate later aborts,
2307  * the counters can be restored to the saved (pre-truncate) values. Note we do
2308  * this on the first truncate in any particular subxact level only.
2309  */
2310 static void
2311 pgstat_truncate_save_counters(PgStat_TableXactStatus *trans)
2312 {
2313  if (!trans->truncated)
2314  {
2315  trans->inserted_pre_trunc = trans->tuples_inserted;
2316  trans->updated_pre_trunc = trans->tuples_updated;
2317  trans->deleted_pre_trunc = trans->tuples_deleted;
2318  trans->truncated = true;
2319  }
2320 }
2321 
2322 /*
2323  * pgstat_truncate_restore_counters - restore counters when a truncate aborts
2324  */
2325 static void
2326 pgstat_truncate_restore_counters(PgStat_TableXactStatus *trans)
2327 {
2328  if (trans->truncated)
2329  {
2330  trans->tuples_inserted = trans->inserted_pre_trunc;
2331  trans->tuples_updated = trans->updated_pre_trunc;
2332  trans->tuples_deleted = trans->deleted_pre_trunc;
2333  }
2334 }
2335 
2336 /*
2337  * pgstat_count_truncate - update tuple counters due to truncate
2338  */
2339 void
2340 pgstat_count_truncate(Relation rel)
2341 {
2342  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2343 
2344  if (pgstat_info != NULL)
2345  {
2346  /* We have to log the effect at the proper transactional level */
2347  int nest_level = GetCurrentTransactionNestLevel();
2348 
2349  if (pgstat_info->trans == NULL ||
2350  pgstat_info->trans->nest_level != nest_level)
2351  add_tabstat_xact_level(pgstat_info, nest_level);
2352 
2353  pgstat_truncate_save_counters(pgstat_info->trans);
2354  pgstat_info->trans->tuples_inserted = 0;
2355  pgstat_info->trans->tuples_updated = 0;
2356  pgstat_info->trans->tuples_deleted = 0;
2357  }
2358 }
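To make the save/clear/restore sequence above concrete, here is a tiny standalone C model of it (not PostgreSQL code; TableXact and the function names are invented): the first truncate at a given (sub)xact level snapshots the i/u/d counters, the truncate zeroes them, and an abort puts the snapshot back.

/* Minimal sketch of the truncate save/restore dance, using invented names. */
#include <stdio.h>
#include <stdbool.h>

typedef struct
{
    long inserted, updated, deleted;
    long inserted_pre_trunc, updated_pre_trunc, deleted_pre_trunc;
    bool truncated;
} TableXact;

static void
count_truncate(TableXact *t)
{
    if (!t->truncated)                  /* remember pre-truncate values once per level */
    {
        t->inserted_pre_trunc = t->inserted;
        t->updated_pre_trunc = t->updated;
        t->deleted_pre_trunc = t->deleted;
        t->truncated = true;
    }
    t->inserted = t->updated = t->deleted = 0;
}

static void
restore_on_abort(TableXact *t)
{
    if (t->truncated)
    {
        t->inserted = t->inserted_pre_trunc;
        t->updated = t->updated_pre_trunc;
        t->deleted = t->deleted_pre_trunc;
    }
}

int
main(void)
{
    TableXact t = {.inserted = 5, .updated = 1, .deleted = 2};

    count_truncate(&t);                 /* TRUNCATE zeroes the transactional counters */
    restore_on_abort(&t);               /* the abort path restores the old values */
    printf("inserted=%ld updated=%ld deleted=%ld\n",
           t.inserted, t.updated, t.deleted);   /* 5 1 2 */
    return 0;
}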
2359 
2360 /*
2361  * pgstat_update_heap_dead_tuples - update dead-tuples count
2362  *
2363  * The semantics of this are that we are reporting the nontransactional
2364  * recovery of "delta" dead tuples; so t_delta_dead_tuples decreases
2365  * rather than increasing, and the change goes straight into the per-table
2366  * counter, not into transactional state.
2367  */
2368 void
2369 pgstat_update_heap_dead_tuples(Relation rel, int delta)
2370 {
2371  PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2372 
2373  if (pgstat_info != NULL)
2374  pgstat_info->t_counts.t_delta_dead_tuples -= delta;
2375 }
2376 
2377 
2378 /* ----------
2379  * AtEOXact_PgStat
2380  *
2381  * Called from access/transam/xact.c at top-level transaction commit/abort.
2382  * ----------
2383  */
2384 void
2385 AtEOXact_PgStat(bool isCommit, bool parallel)
2386 {
2387  PgStat_SubXactStatus *xact_state;
2388 
2389  /* Don't count parallel worker transaction stats */
2390  if (!parallel)
2391  {
2392  /*
2393  * Count transaction commit or abort. (We use counters, not just
2394  * bools, in case the reporting message isn't sent right away.)
2395  */
2396  if (isCommit)
2397  pgStatXactCommit++;
2398  else
2399  pgStatXactRollback++;
2400  }
2401 
2402  /*
2403  * Transfer transactional insert/update counts into the base tabstat
2404  * entries. We don't bother to free any of the transactional state, since
2405  * it's all in TopTransactionContext and will go away anyway.
2406  */
2407  xact_state = pgStatXactStack;
2408  if (xact_state != NULL)
2409  {
2410  PgStat_TableXactStatus *trans;
2411 
2412  Assert(xact_state->nest_level == 1);
2413  Assert(xact_state->prev == NULL);
2414  for (trans = xact_state->first; trans != NULL; trans = trans->next)
2415  {
2416  PgStat_TableStatus *tabstat;
2417 
2418  Assert(trans->nest_level == 1);
2419  Assert(trans->upper == NULL);
2420  tabstat = trans->parent;
2421  Assert(tabstat->trans == trans);
2422  /* restore pre-truncate stats (if any) in case of aborted xact */
2423  if (!isCommit)
2424  pgstat_truncate_restore_counters(trans);
2425  /* count attempted actions regardless of commit/abort */
2426  tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2427  tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2428  tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2429  if (isCommit)
2430  {
2431  tabstat->t_counts.t_truncated = trans->truncated;
2432  if (trans->truncated)
2433  {
2434  /* forget live/dead stats seen by backend thus far */
2435  tabstat->t_counts.t_delta_live_tuples = 0;
2436  tabstat->t_counts.t_delta_dead_tuples = 0;
2437  }
2438  /* insert adds a live tuple, delete removes one */
2439  tabstat->t_counts.t_delta_live_tuples +=
2440  trans->tuples_inserted - trans->tuples_deleted;
2441  /* update and delete each create a dead tuple */
2442  tabstat->t_counts.t_delta_dead_tuples +=
2443  trans->tuples_updated + trans->tuples_deleted;
2444  /* insert, update, delete each count as one change event */
2445  tabstat->t_counts.t_changed_tuples +=
2446  trans->tuples_inserted + trans->tuples_updated +
2447  trans->tuples_deleted;
2448  }
2449  else
2450  {
2451  /* inserted tuples are dead, deleted tuples are unaffected */
2452  tabstat->t_counts.t_delta_dead_tuples +=
2453  trans->tuples_inserted + trans->tuples_updated;
2454  /* an aborted xact generates no changed_tuple events */
2455  }
2456  tabstat->trans = NULL;
2457  }
2458  }
2459  pgStatXactStack = NULL;
2460 
2461  /* Make sure any stats snapshot is thrown away */
2462  pgstat_clear_snapshot();
2463 }
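The live/dead-tuple arithmetic applied above can be seen in isolation in the following standalone C toy (XactCounts and apply_at_eoxact are invented names, not pgstat APIs): on commit, inserts minus deletes go to the live-tuple delta and updates plus deletes to the dead-tuple delta; on abort, inserts and updates both become dead tuples.

/* Toy model of the commit/abort arithmetic; field names only mirror the pgstat counters. */
#include <stdio.h>
#include <stdbool.h>

typedef struct { long inserted, updated, deleted; } XactCounts;

static void
apply_at_eoxact(bool isCommit, XactCounts x, long *live, long *dead)
{
    if (isCommit)
    {
        *live += x.inserted - x.deleted;    /* insert adds a live tuple, delete removes one */
        *dead += x.updated + x.deleted;     /* update and delete each leave a dead tuple */
    }
    else
        *dead += x.inserted + x.updated;    /* aborted inserts/updates become dead tuples */
}

int
main(void)
{
    XactCounts x = {.inserted = 10, .updated = 3, .deleted = 2};
    long live = 0, dead = 0;

    apply_at_eoxact(true, x, &live, &dead);
    printf("commit: live %+ld, dead %+ld\n", live, dead);   /* +8, +5 */

    live = dead = 0;
    apply_at_eoxact(false, x, &live, &dead);
    printf("abort:  live %+ld, dead %+ld\n", live, dead);   /* +0, +13 */
    return 0;
}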
2464 
2465 /* ----------
2466  * AtEOSubXact_PgStat
2467  *
2468  * Called from access/transam/xact.c at subtransaction commit/abort.
2469  * ----------
2470  */
2471 void
2472 AtEOSubXact_PgStat(bool isCommit, int nestDepth)
2473 {
2474  PgStat_SubXactStatus *xact_state;
2475 
2476  /*
2477  * Transfer transactional insert/update counts into the next higher
2478  * subtransaction state.
2479  */
2480  xact_state = pgStatXactStack;
2481  if (xact_state != NULL &&
2482  xact_state->nest_level >= nestDepth)
2483  {
2484  PgStat_TableXactStatus *trans;
2485  PgStat_TableXactStatus *next_trans;
2486 
2487  /* delink xact_state from stack immediately to simplify reuse case */
2488  pgStatXactStack = xact_state->prev;
2489 
2490  for (trans = xact_state->first; trans != NULL; trans = next_trans)
2491  {
2492  PgStat_TableStatus *tabstat;
2493 
2494  next_trans = trans->next;
2495  Assert(trans->nest_level == nestDepth);
2496  tabstat = trans->parent;
2497  Assert(tabstat->trans == trans);
2498  if (isCommit)
2499  {
2500  if (trans->upper && trans->upper->nest_level == nestDepth - 1)
2501  {
2502  if (trans->truncated)
2503  {
2504  /* propagate the truncate status one level up */
2505  pgstat_truncate_save_counters(trans->upper);
2506  /* replace upper xact stats with ours */
2507  trans->upper->tuples_inserted = trans->tuples_inserted;
2508  trans->upper->tuples_updated = trans->tuples_updated;
2509  trans->upper->tuples_deleted = trans->tuples_deleted;
2510  }
2511  else
2512  {
2513  trans->upper->tuples_inserted += trans->tuples_inserted;
2514  trans->upper->tuples_updated += trans->tuples_updated;
2515  trans->upper->tuples_deleted += trans->tuples_deleted;
2516  }
2517  tabstat->trans = trans->upper;
2518  pfree(trans);
2519  }
2520  else
2521  {
2522  /*
2523  * When there isn't an immediate parent state, we can just
2524  * reuse the record instead of going through a
2525  * palloc/pfree pushup (this works since it's all in
2526  * TopTransactionContext anyway). We have to re-link it
2527  * into the parent level, though, and that might mean
2528  * pushing a new entry into the pgStatXactStack.
2529  */
2530  PgStat_SubXactStatus *upper_xact_state;
2531 
2532  upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
2533  trans->next = upper_xact_state->first;
2534  upper_xact_state->first = trans;
2535  trans->nest_level = nestDepth - 1;
2536  }
2537  }
2538  else
2539  {
2540  /*
2541  * On abort, update top-level tabstat counts, then forget the
2542  * subtransaction
2543  */
2544 
2545  /* first restore values obliterated by truncate */
2546  pgstat_truncate_restore_counters(trans);
2547  /* count attempted actions regardless of commit/abort */
2548  tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2549  tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2550  tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2551  /* inserted tuples are dead, deleted tuples are unaffected */
2552  tabstat->t_counts.t_delta_dead_tuples +=
2553  trans->tuples_inserted + trans->tuples_updated;
2554  tabstat->trans = trans->upper;
2555  pfree(trans);
2556  }
2557  }
2558  pfree(xact_state);
2559  }
2560 }
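On subcommit the child's counts are folded into the immediate parent level, except that a truncate in the child replaces, rather than adds to, the parent's counts. A minimal standalone C model of that rule follows (invented names, not the real PgStat_TableXactStatus machinery).

/* Tiny model of folding a subtransaction's counts into its parent on subcommit. */
#include <stdio.h>

typedef struct { long ins, upd, del; } Level;

static void
merge_up(Level *parent, const Level *child, int child_truncated)
{
    if (child_truncated)
    {
        /* a truncate in the child wipes out whatever the parent had counted */
        parent->ins = child->ins;
        parent->upd = child->upd;
        parent->del = child->del;
    }
    else
    {
        parent->ins += child->ins;
        parent->upd += child->upd;
        parent->del += child->del;
    }
}

int
main(void)
{
    Level parent = {5, 0, 1};
    Level child = {2, 3, 0};

    merge_up(&parent, &child, 0);
    printf("ins=%ld upd=%ld del=%ld\n", parent.ins, parent.upd, parent.del);   /* 7 3 1 */
    return 0;
}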
2561 
2562 
2563 /*
2564  * AtPrepare_PgStat
2565  * Save the transactional stats state at 2PC transaction prepare.
2566  *
2567  * In this phase we just generate 2PC records for all the pending
2568  * transaction-dependent stats work.
2569  */
2570 void
2571 AtPrepare_PgStat(void)
2572 {
2573  PgStat_SubXactStatus *xact_state;
2574 
2575  xact_state = pgStatXactStack;
2576  if (xact_state != NULL)
2577  {
2578  PgStat_TableXactStatus *trans;
2579 
2580  Assert(xact_state->nest_level == 1);
2581  Assert(xact_state->prev == NULL);
2582  for (trans = xact_state->first; trans != NULL; trans = trans->next)
2583  {
2584  PgStat_TableStatus *tabstat;
2585  TwoPhasePgStatRecord record;
2586 
2587  Assert(trans->nest_level == 1);
2588  Assert(trans->upper == NULL);
2589  tabstat = trans->parent;
2590  Assert(tabstat->trans == trans);
2591 
2592  record.tuples_inserted = trans->tuples_inserted;
2593  record.tuples_updated = trans->tuples_updated;
2594  record.tuples_deleted = trans->tuples_deleted;
2595  record.inserted_pre_trunc = trans->inserted_pre_trunc;
2596  record.updated_pre_trunc = trans->updated_pre_trunc;
2597  record.deleted_pre_trunc = trans->deleted_pre_trunc;
2598  record.t_id = tabstat->t_id;
2599  record.t_shared = tabstat->t_shared;
2600  record.t_truncated = trans->truncated;
2601 
2602  RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
2603  &record, sizeof(TwoPhasePgStatRecord));
2604  }
2605  }
2606 }
2607 
2608 /*
2609  * PostPrepare_PgStat
2610  * Clean up after successful PREPARE.
2611  *
2612  * All we need do here is unlink the transaction stats state from the
2613  * nontransactional state. The nontransactional action counts will be
2614  * reported to the stats collector immediately, while the effects on live
2615  * and dead tuple counts are preserved in the 2PC state file.
2616  *
2617  * Note: AtEOXact_PgStat is not called during PREPARE.
2618  */
2619 void
2620 PostPrepare_PgStat(void)
2621 {
2622  PgStat_SubXactStatus *xact_state;
2623 
2624  /*
2625  * We don't bother to free any of the transactional state, since it's all
2626  * in TopTransactionContext and will go away anyway.
2627  */
2628  xact_state = pgStatXactStack;
2629  if (xact_state != NULL)
2630  {
2631  PgStat_TableXactStatus *trans;
2632 
2633  for (trans = xact_state->first; trans != NULL; trans = trans->next)
2634  {
2635  PgStat_TableStatus *tabstat;
2636 
2637  tabstat = trans->parent;
2638  tabstat->trans = NULL;
2639  }
2640  }
2641  pgStatXactStack = NULL;
2642 
2643  /* Make sure any stats snapshot is thrown away */
2644  pgstat_clear_snapshot();
2645 }
2646 
2647 /*
2648  * 2PC processing routine for COMMIT PREPARED case.
2649  *
2650  * Load the saved counts into our local pgstats state.
2651  */
2652 void
2653 pgstat_twophase_postcommit(TransactionId xid, uint16 info,
2654  void *recdata, uint32 len)
2655 {
2656  TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2657  PgStat_TableStatus *pgstat_info;
2658 
2659  /* Find or create a tabstat entry for the rel */
2660  pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2661 
2662  /* Same math as in AtEOXact_PgStat, commit case */
2663  pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2664  pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2665  pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2666  pgstat_info->t_counts.t_truncated = rec->t_truncated;
2667  if (rec->t_truncated)
2668  {
2669  /* forget live/dead stats seen by backend thus far */
2670  pgstat_info->t_counts.t_delta_live_tuples = 0;
2671  pgstat_info->t_counts.t_delta_dead_tuples = 0;
2672  }
2673  pgstat_info->t_counts.t_delta_live_tuples +=
2674  rec->tuples_inserted - rec->tuples_deleted;
2675  pgstat_info->t_counts.t_delta_dead_tuples +=
2676  rec->tuples_updated + rec->tuples_deleted;
2677  pgstat_info->t_counts.t_changed_tuples +=
2678  rec->tuples_inserted + rec->tuples_updated +
2679  rec->tuples_deleted;
2680 }
2681 
2682 /*
2683  * 2PC processing routine for ROLLBACK PREPARED case.
2684  *
2685  * Load the saved counts into our local pgstats state, but treat them
2686  * as aborted.
2687  */
2688 void
2689 pgstat_twophase_postabort(TransactionId xid, uint16 info,
2690  void *recdata, uint32 len)
2691 {
2692  TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2693  PgStat_TableStatus *pgstat_info;
2694 
2695  /* Find or create a tabstat entry for the rel */
2696  pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2697 
2698  /* Same math as in AtEOXact_PgStat, abort case */
2699  if (rec->t_truncated)
2700  {
2701  rec->tuples_inserted = rec->inserted_pre_trunc;
2702  rec->tuples_updated = rec->updated_pre_trunc;
2703  rec->tuples_deleted = rec->deleted_pre_trunc;
2704  }
2705  pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2706  pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2707  pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2708  pgstat_info->t_counts.t_delta_dead_tuples +=
2709  rec->tuples_inserted + rec->tuples_updated;
2710 }
2711 
2712 
2713 /* ----------
2714  * pgstat_fetch_stat_dbentry() -
2715  *
2716  * Support function for the SQL-callable pgstat* functions. Returns
2717  * the collected statistics for one database or NULL. NULL doesn't mean
2718  * that the database doesn't exist; it is just not yet known to the
2719  * collector, so the caller is better off reporting ZERO instead.
2720  * ----------
2721  */
2722 PgStat_StatDBEntry *
2723 pgstat_fetch_stat_dbentry(Oid dbid)
2724 {
2725  /*
2726  * If not done for this transaction, read the statistics collector stats
2727  * file into some hash tables.
2728  */
2729  backend_read_statsfile();
2730 
2731  /*
2732  * Lookup the requested database; return NULL if not found
2733  */
2734  return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2735  (void *) &dbid,
2736  HASH_FIND, NULL);
2737 }
2738 
2739 
2740 /* ----------
2741  * pgstat_fetch_stat_tabentry() -
2742  *
2743  * Support function for the SQL-callable pgstat* functions. Returns
2744  * the collected statistics for one table or NULL. NULL doesn't mean
2745  * that the table doesn't exist; it is just not yet known to the
2746  * collector, so the caller is better off reporting ZERO instead.
2747  * ----------
2748  */
2749 PgStat_StatTabEntry *
2750 pgstat_fetch_stat_tabentry(Oid relid)
2751 {
2752  Oid dbid;
2753  PgStat_StatDBEntry *dbentry;
2754  PgStat_StatTabEntry *tabentry;
2755 
2756  /*
2757  * If not done for this transaction, read the statistics collector stats
2758  * file into some hash tables.
2759  */
2760  backend_read_statsfile();
2761 
2762  /*
2763  * Lookup our database, then look in its table hash table.
2764  */
2765  dbid = MyDatabaseId;
2766  dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2767  (void *) &dbid,
2768  HASH_FIND, NULL);
2769  if (dbentry != NULL && dbentry->tables != NULL)
2770  {
2771  tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2772  (void *) &relid,
2773  HASH_FIND, NULL);
2774  if (tabentry)
2775  return tabentry;
2776  }
2777 
2778  /*
2779  * If we didn't find it, maybe it's a shared table.
2780  */
2781  dbid = InvalidOid;
2782  dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2783  (void *) &dbid,
2784  HASH_FIND, NULL);
2785  if (dbentry != NULL && dbentry->tables != NULL)
2786  {
2787  tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2788  (void *) &relid,
2789  HASH_FIND, NULL);
2790  if (tabentry)
2791  return tabentry;
2792  }
2793 
2794  return NULL;
2795 }
2796 
2797 
2798 /* ----------
2799  * pgstat_fetch_stat_funcentry() -
2800  *
2801  * Support function for the SQL-callable pgstat* functions. Returns
2802  * the collected statistics for one function or NULL.
2803  * ----------
2804  */
2805 PgStat_StatFuncEntry *
2806 pgstat_fetch_stat_funcentry(Oid func_id)
2807 {
2808  PgStat_StatDBEntry *dbentry;
2809  PgStat_StatFuncEntry *funcentry = NULL;
2810 
2811  /* load the stats file if needed */
2812  backend_read_statsfile();
2813 
2814  /* Lookup our database, then find the requested function. */
2815  dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
2816  if (dbentry != NULL && dbentry->functions != NULL)
2817  {
2818  funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
2819  (void *) &func_id,
2820  HASH_FIND, NULL);
2821  }
2822 
2823  return funcentry;
2824 }
2825 
2826 
2827 /*
2828  * ---------
2829  * pgstat_fetch_stat_archiver() -
2830  *
2831  * Support function for the SQL-callable pgstat* functions. Returns
2832  * a pointer to the archiver statistics struct.
2833  * ---------
2834  */
2835 PgStat_ArchiverStats *
2836 pgstat_fetch_stat_archiver(void)
2837 {
2838  backend_read_statsfile();
2839 
2840  return &archiverStats;
2841 }
2842 
2843 /*
2844  * ---------
2845  * pgstat_fetch_stat_bgwriter() -
2846  *
2847  * Support function for the SQL-callable pgstat* functions. Returns
2848  * a pointer to the bgwriter statistics struct.
2849  * ---------
2850  */
2851 PgStat_BgWriterStats *
2852 pgstat_fetch_stat_bgwriter(void)
2853 {
2854  backend_read_statsfile();
2855 
2856  return &globalStats.bgwriter;
2857 }
2858 
2859 /*
2860  * ---------
2861  * pgstat_fetch_stat_checkpointer() -
2862  *
2863  * Support function for the SQL-callable pgstat* functions. Returns
2864  * a pointer to the checkpointer statistics struct.
2865  * ---------
2866  */
2867 PgStat_CheckpointerStats *
2868 pgstat_fetch_stat_checkpointer(void)
2869 {
2870  backend_read_statsfile();
2871 
2872  return &globalStats.checkpointer;
2873 }
2874 
2875 /*
2876  * ---------
2877  * pgstat_fetch_global() -
2878  *
2879  * Support function for the SQL-callable pgstat* functions. Returns
2880  * a pointer to the global statistics struct.
2881  * ---------
2882  */
2883 PgStat_GlobalStats *
2884 pgstat_fetch_global(void)
2885 {
2886  backend_read_statsfile();
2887 
2888  return &globalStats;
2889 }
2890 
2891 /*
2892  * ---------
2893  * pgstat_fetch_stat_wal() -
2894  *
2895  * Support function for the SQL-callable pgstat* functions. Returns
2896  * a pointer to the WAL statistics struct.
2897  * ---------
2898  */
2899 PgStat_WalStats *
2900 pgstat_fetch_stat_wal(void)
2901 {
2902  backend_read_statsfile();
2903 
2904  return &walStats;
2905 }
2906 
2907 /*
2908  * ---------
2909  * pgstat_fetch_slru() -
2910  *
2911  * Support function for the SQL-callable pgstat* functions. Returns
2912  * a pointer to the slru statistics struct.
2913  * ---------
2914  */
2915 PgStat_SLRUStats *
2916 pgstat_fetch_slru(void)
2917 {
2918  backend_read_statsfile();
2919 
2920  return slruStats;
2921 }
2922 
2923 /*
2924  * ---------
2925  * pgstat_fetch_replslot() -
2926  *
2927  * Support function for the SQL-callable pgstat* functions. Returns
2928  * a pointer to the replication slot statistics struct.
2929  * ---------
2930  */
2931 PgStat_StatReplSlotEntry *
2932 pgstat_fetch_replslot(NameData slotname)
2933 {
2934  backend_read_statsfile();
2935 
2936  return pgstat_get_replslot_entry(slotname, false);
2937 }
2938 
2939 /*
2940  * Shut down a single backend's statistics reporting at process exit.
2941  *
2942  * Flush any remaining statistics counts out to the collector.
2943  * Without this, operations triggered during backend exit (such as
2944  * temp table deletions) won't be counted.
2945  */
2946 static void
2947 pgstat_shutdown_hook(int code, Datum arg)
2948 {
2949  Assert(!pgstat_is_shutdown);
2950 
2951  /*
2952  * If we got as far as discovering our own database ID, we can report what
2953  * we did to the collector. Otherwise, we'd be sending an invalid
2954  * database ID, so forget it. (This means that accesses to pg_database
2955  * during failed backend starts might never get counted.)
2956  */
2957  if (OidIsValid(MyDatabaseId))
2958  pgstat_report_stat(true);
2959 
2960 #ifdef USE_ASSERT_CHECKING
2961  pgstat_is_shutdown = true;
2962 #endif
2963 }
2964 
2965 /* ----------
2966  * pgstat_initialize() -
2967  *
2968  * Initialize pgstats state, and set up our on-proc-exit hook. Called from
2969  * BaseInit().
2970  *
2971  * NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful.
2972  * ----------
2973  */
2974 void
2975 pgstat_initialize(void)
2976 {
2977  Assert(!pgstat_is_initialized);
2978 
2979  /*
2980  * Initialize prevWalUsage with pgWalUsage so that pgstat_send_wal() can
2981  * calculate how much the pgWalUsage counters have increased by subtracting
2982  * prevWalUsage from pgWalUsage.
2983  */
2984  prevWalUsage = pgWalUsage;
2985 
2986  /* Set up a process-exit hook to clean up */
2987  on_shmem_exit(pgstat_shutdown_hook, 0);
2988 
2989 #ifdef USE_ASSERT_CHECKING
2990  pgstat_is_initialized = true;
2991 #endif
2992 }
2993 
2994 /* ------------------------------------------------------------
2995  * Local support functions follow
2996  * ------------------------------------------------------------
2997  */
2998 
2999 
3000 /* ----------
3001  * pgstat_setheader() -
3002  *
3003  * Set common header fields in a statistics message
3004  * ----------
3005  */
3006 static void
3007 pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
3008 {
3009  hdr->m_type = mtype;
3010 }
3011 
3012 
3013 /* ----------
3014  * pgstat_send() -
3015  *
3016  * Send out one statistics message to the collector
3017  * ----------
3018  */
3019 static void
3020 pgstat_send(void *msg, int len)
3021 {
3022  int rc;
3023 
3025 
3026  if (pgStatSock == PGINVALID_SOCKET)
3027  return;
3028 
3029  ((PgStat_MsgHdr *) msg)->m_size = len;
3030 
3031  /* We'll retry after EINTR, but ignore all other failures */
3032  do
3033  {
3034  rc = send(pgStatSock, msg, len, 0);
3035  } while (rc < 0 && errno == EINTR);
3036 
3037 #ifdef USE_ASSERT_CHECKING
3038  /* In debug builds, log send failures ... */
3039  if (rc < 0)
3040  elog(LOG, "could not send to statistics collector: %m");
3041 #endif
3042 }
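The retry-on-EINTR send loop is a generic pattern; the self-contained sketch below demonstrates it against a local AF_UNIX datagram socketpair instead of the collector's UDP socket (assumed POSIX environment; not PostgreSQL code).

/* Sketch of the EINTR-retry send pattern on a datagram socket. */
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

static ssize_t
send_retry_eintr(int sock, const void *msg, size_t len)
{
    ssize_t rc;

    do
    {
        rc = send(sock, msg, len, 0);
    } while (rc < 0 && errno == EINTR);     /* retry only after signal interruption */

    return rc;                              /* other errors are simply reported back */
}

int
main(void)
{
    int         sv[2];
    char        buf[32];
    const char *msg = "stats message";

    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) != 0)
        return 1;

    if (send_retry_eintr(sv[0], msg, strlen(msg)) < 0)
        perror("send");

    ssize_t n = recv(sv[1], buf, sizeof(buf), 0);
    printf("received %zd bytes\n", n);

    close(sv[0]);
    close(sv[1]);
    return 0;
}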
3043 
3044 /* ----------
3045  * pgstat_send_archiver() -
3046  *
3047  * Tell the collector about the WAL file that we successfully
3048  * archived or failed to archive.
3049  * ----------
3050  */
3051 void
3052 pgstat_send_archiver(const char *xlog, bool failed)
3053 {
3054  PgStat_MsgArchiver msg;
3055 
3056  /*
3057  * Prepare and send the message
3058  */
3059  pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ARCHIVER);
3060  msg.m_failed = failed;
3061  strlcpy(msg.m_xlog, xlog, sizeof(msg.m_xlog));
3062  msg.m_timestamp = GetCurrentTimestamp();
3063  pgstat_send(&msg, sizeof(msg));
3064 }
3065 
3066 /* ----------
3067  * pgstat_send_bgwriter() -
3068  *
3069  * Send bgwriter statistics to the collector
3070  * ----------
3071  */
3072 void
3073 pgstat_send_bgwriter(void)
3074 {
3075  /* We assume this initializes to zeroes */
3076  static const PgStat_MsgBgWriter all_zeroes;
3077 
3079 
3080  /*
3081  * This function can be called even if nothing at all has happened. In
3082  * this case, avoid sending a completely empty message to the stats
3083  * collector.
3084  */
3085  if (memcmp(&PendingBgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
3086  return;
3087 
3088  /*
3089  * Prepare and send the message
3090  */
3091  pgstat_setheader(&PendingBgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
3092  pgstat_send(&PendingBgWriterStats, sizeof(PendingBgWriterStats));
3093 
3094  /*
3095  * Clear out the statistics buffer, so it can be re-used.
3096  */
3097  MemSet(&PendingBgWriterStats, 0, sizeof(PendingBgWriterStats));
3098 }
3099 
3100 /* ----------
3101  * pgstat_send_checkpointer() -
3102  *
3103  * Send checkpointer statistics to the collector
3104  * ----------
3105  */
3106 void
3107 pgstat_send_checkpointer(void)
3108 {
3109  /* We assume this initializes to zeroes */
3110  static const PgStat_MsgCheckpointer all_zeroes;
3111 
3112  /*
3113  * This function can be called even if nothing at all has happened. In
3114  * this case, avoid sending a completely empty message to the stats
3115  * collector.
3116  */
3117  if (memcmp(&PendingCheckpointerStats, &all_zeroes, sizeof(PgStat_MsgCheckpointer)) == 0)
3118  return;
3119 
3120  /*
3121  * Prepare and send the message
3122  */
3123  pgstat_setheader(&PendingCheckpointerStats.m_hdr, PGSTAT_MTYPE_CHECKPOINTER);
3124  pgstat_send(&PendingCheckpointerStats, sizeof(PendingCheckpointerStats));
3125 
3126  /*
3127  * Clear out the statistics buffer, so it can be re-used.
3128  */
3129  MemSet(&PendingCheckpointerStats, 0, sizeof(PendingCheckpointerStats));
3130 }
3131 
3132 /* ----------
3133  * pgstat_send_wal() -
3134  *
3135  * Send WAL statistics to the collector.
3136  *
 3137  * If 'force' is not set, a WAL stats message is only sent if enough time
 3138  * has passed since the last one was sent to reach PGSTAT_STAT_INTERVAL.
3139  * ----------
3140  */
3141 void
3142 pgstat_send_wal(bool force)
3143 {
3144  static TimestampTz sendTime = 0;
3145 
3146  /*
3147  * This function can be called even if nothing at all has happened. In
3148  * this case, avoid sending a completely empty message to the stats
3149  * collector.
3150  *
3151  * Check wal_records counter to determine whether any WAL activity has
3152  * happened since last time. Note that other WalUsage counters don't need
3153  * to be checked because they are incremented always together with
3154  * wal_records counter.
3155  *
3156  * m_wal_buffers_full also doesn't need to be checked because it's
3157  * incremented only when at least one WAL record is generated (i.e.,
3158  * wal_records counter is incremented). But for safely, we assert that
 3159  * wal_records counter is incremented). But for safety, we assert that
 3160  * m_wal_buffers_full is always zero when no WAL record is generated.
3161  * This function can be called by a process like walwriter that normally
3162  * generates no WAL records. To determine whether any WAL activity has
3163  * happened at that process since the last time, the numbers of WAL writes
3164  * and syncs are also checked.
3165  */
3166  if (pgWalUsage.wal_records == prevWalUsage.wal_records &&
3167  WalStats.m_wal_write == 0 && WalStats.m_wal_sync == 0)
3168  {
3169  Assert(WalStats.m_wal_buffers_full == 0);
3170  return;
3171  }
3172 
3173  if (!force)
3174  {
3175  TimestampTz now = GetCurrentTimestamp();
3176 
3177  /*
3178  * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
3179  * msec since we last sent one to avoid overloading the stats
3180  * collector.
3181  */
3182  if (!TimestampDifferenceExceeds(sendTime, now, PGSTAT_STAT_INTERVAL))
3183  return;
3184  sendTime = now;
3185  }
3186 
3187  /*
3188  * Set the counters related to generated WAL data if the counters were
3189  * updated.
3190  */
3191  if (pgWalUsage.wal_records != prevWalUsage.wal_records)
3192  {
3193  WalUsage walusage;
3194 
3195  /*
 3196  * Calculate how much the WAL usage counters have increased by
 3197  * subtracting the previous counters from the current ones, and fill
 3198  * the results into the WAL stats message.
3199  */
3200  MemSet(&walusage, 0, sizeof(WalUsage));
3201  WalUsageAccumDiff(&walusage, &pgWalUsage, &prevWalUsage);
3202 
3203  WalStats.m_wal_records = walusage.wal_records;
3204  WalStats.m_wal_fpi = walusage.wal_fpi;
3205  WalStats.m_wal_bytes = walusage.wal_bytes;
3206 
3207  /*
3208  * Save the current counters for the subsequent calculation of WAL
3209  * usage.
3210  */
3211  prevWalUsage = pgWalUsage;
3212  }
3213 
3214  /*
3215  * Prepare and send the message
3216  */
3217  pgstat_setheader(&WalStats.m_hdr, PGSTAT_MTYPE_WAL);
3218  pgstat_send(&WalStats, sizeof(WalStats));
3219 
3220  /*
3221  * Clear out the statistics buffer, so it can be re-used.
3222  */
3223  MemSet(&WalStats, 0, sizeof(WalStats));
3224 }
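The prev-versus-current diffing that feeds the WAL message is ordinary counter bookkeeping; the standalone sketch below shows the same idea with an invented Usage struct standing in for WalUsage (it is not the real WalUsageAccumDiff).

/* Toy version of diffing cumulative counters against a saved snapshot. */
#include <stdio.h>
#include <stdint.h>

typedef struct { uint64_t records, fpi, bytes; } Usage;

static Usage
usage_diff(Usage cur, Usage prev)
{
    return (Usage){cur.records - prev.records,
                   cur.fpi - prev.fpi,
                   cur.bytes - prev.bytes};
}

int
main(void)
{
    Usage prev = {100, 4, 8192};
    Usage cur  = {130, 5, 12288};

    Usage delta = usage_diff(cur, prev);      /* what would go into the stats message */
    prev = cur;                               /* remember the snapshot for the next report */

    printf("records=%llu fpi=%llu bytes=%llu\n",
           (unsigned long long) delta.records,
           (unsigned long long) delta.fpi,
           (unsigned long long) delta.bytes);
    return 0;
}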
3225 
3226 /* ----------
3227  * pgstat_send_slru() -
3228  *
3229  * Send SLRU statistics to the collector
3230  * ----------
3231  */
3232 static void
3233 pgstat_send_slru(void)
3234 {
3235  /* We assume this initializes to zeroes */
3236  static const PgStat_MsgSLRU all_zeroes;
3237 
3238  for (int i = 0; i < SLRU_NUM_ELEMENTS; i++)
3239  {
3240  /*
3241  * This function can be called even if nothing at all has happened. In
3242  * this case, avoid sending a completely empty message to the stats
3243  * collector.
3244  */
3245  if (memcmp(&SLRUStats[i], &all_zeroes, sizeof(PgStat_MsgSLRU)) == 0)
3246  continue;
3247 
3248  /* set the SLRU type before each send */
3249  SLRUStats[i].m_index = i;
3250 
3251  /*
3252  * Prepare and send the message
3253  */
3254  pgstat_setheader(&SLRUStats[i].m_hdr, PGSTAT_MTYPE_SLRU);
3255  pgstat_send(&SLRUStats[i], sizeof(PgStat_MsgSLRU));
3256 
3257  /*
3258  * Clear out the statistics buffer, so it can be re-used.
3259  */
3260  MemSet(&SLRUStats[i], 0, sizeof(PgStat_MsgSLRU));
3261  }
3262 }
3263 
3264 
3265 /* ----------
3266  * PgstatCollectorMain() -
3267  *
3268  * Start up the statistics collector process. This is the body of the
3269  * postmaster child process.
3270  *
3271  * The argc/argv parameters are valid only in EXEC_BACKEND case.
3272  * ----------
3273  */
3274 NON_EXEC_STATIC void
3275 PgstatCollectorMain(int argc, char *argv[])
3276 {
3277  int len;
3278  PgStat_Msg msg;
3279  int wr;
3280  WaitEvent event;
3281  WaitEventSet *wes;
3282 
3283  /*
3284  * Ignore all signals usually bound to some action in the postmaster,
3285  * except SIGHUP and SIGQUIT. Note we don't need a SIGUSR1 handler to
3286  * support latch operations, because we only use a local latch.
3287  */
3289  pqsignal(SIGINT, SIG_IGN);
3290  pqsignal(SIGTERM, SIG_IGN);
3296  /* Reset some signals that are accepted by postmaster but not here */
3299 
3301  init_ps_display(NULL);
3302 
3303  /*
3304  * Read in existing stats files or initialize the stats to zero.
3305  */
3306  pgStatRunningInCollector = true;
3307  pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
3308 
3309  /* Prepare to wait for our latch or data in our socket. */
3313  AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL);
3314 
3315  /*
3316  * Loop to process messages until we get SIGQUIT or detect ungraceful
3317  * death of our parent postmaster.
3318  *
3319  * For performance reasons, we don't want to do ResetLatch/WaitLatch after
3320  * every message; instead, do that only after a recv() fails to obtain a
3321  * message. (This effectively means that if backends are sending us stuff
3322  * like mad, we won't notice postmaster death until things slack off a
3323  * bit; which seems fine.) To do that, we have an inner loop that
3324  * iterates as long as recv() succeeds. We do check ConfigReloadPending
3325  * inside the inner loop, which means that such interrupts will get
3326  * serviced but the latch won't get cleared until next time there is a
3327  * break in the action.
3328  */
3329  for (;;)
3330  {
3331  /* Clear any already-pending wakeups */
3332  ResetLatch(MyLatch);
3333 
3334  /*
3335  * Quit if we get SIGQUIT from the postmaster.
3336  */
3337  if (ShutdownRequestPending)
3338  break;
3339 
3340  /*
3341  * Inner loop iterates as long as we keep getting messages, or until
3342  * ShutdownRequestPending becomes set.
3343  */
3344  while (!ShutdownRequestPending)
3345  {
3346  /*
3347  * Reload configuration if we got SIGHUP from the postmaster.
3348  */
3349  if (ConfigReloadPending)
3350  {
3351  ConfigReloadPending = false;
3352  ProcessConfigFile(PGC_SIGHUP);
3353  }
3354 
3355  /*
3356  * Write the stats file(s) if a new request has arrived that is
3357  * not satisfied by existing file(s).
3358  */
3359  if (pending_write_requests != NIL)
3360  pgstat_write_statsfiles(false, false);
3361 
3362  /*
3363  * Try to receive and process a message. This will not block,
3364  * since the socket is set to non-blocking mode.
3365  *
3366  * XXX On Windows, we have to force pgwin32_recv to cooperate,
3367  * despite the previous use of pg_set_noblock() on the socket.
3368  * This is extremely broken and should be fixed someday.
3369  */
3370 #ifdef WIN32
3371  pgwin32_noblock = 1;
3372 #endif
3373 
3374  len = recv(pgStatSock, (char *) &msg,
3375  sizeof(PgStat_Msg), 0);
3376 
3377 #ifdef WIN32
3378  pgwin32_noblock = 0;
3379 #endif
3380 
3381  if (len < 0)
3382  {
3383  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
3384  break; /* out of inner loop */
3385  ereport(ERROR,
3386  (errcode_for_socket_access(),
3387  errmsg("could not read statistics message: %m")));
3388  }
3389 
3390  /*
3391  * We ignore messages that are smaller than our common header
3392  */
3393  if (len < sizeof(PgStat_MsgHdr))
3394  continue;
3395 
3396  /*
3397  * The received length must match the length in the header
3398  */
3399  if (msg.msg_hdr.m_size != len)
3400  continue;
3401 
3402  /*
3403  * O.K. - we accept this message. Process it.
3404  */
3405  switch (msg.msg_hdr.m_type)
3406  {
3407  case PGSTAT_MTYPE_DUMMY:
3408  break;
3409 
3410  case PGSTAT_MTYPE_INQUIRY:
3411  pgstat_recv_inquiry(&msg.msg_inquiry, len);
3412  break;
3413 
3414  case PGSTAT_MTYPE_TABSTAT:
3415  pgstat_recv_tabstat(&msg.msg_tabstat, len);
3416  break;
3417 
3418  case PGSTAT_MTYPE_TABPURGE:
3419  pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
3420  break;
3421 
3422  case PGSTAT_MTYPE_DROPDB:
3423  pgstat_recv_dropdb(&msg.msg_dropdb, len);
3424  break;
3425 
3426  case PGSTAT_MTYPE_RESETCOUNTER:
3427  pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
3428  break;
3429 
3430  case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
3431  pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
3432  len);
3433  break;
3434 
3435  case PGSTAT_MTYPE_RESETSINGLECOUNTER:
3436  pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
3437  len);
3438  break;
3439 
3440  case PGSTAT_MTYPE_RESETSLRUCOUNTER:
3441  pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
3442  len);
3443  break;
3444 
3445  case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
3446  pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
3447  len);
3448  break;
3449 
3450  case PGSTAT_MTYPE_AUTOVAC_START:
3451  pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
3452  break;
3453 
3454  case PGSTAT_MTYPE_VACUUM:
3455  pgstat_recv_vacuum(&msg.msg_vacuum, len);
3456  break;
3457 
3458  case PGSTAT_MTYPE_ANALYZE:
3459  pgstat_recv_analyze(&msg.msg_analyze, len);
3460  break;
3461 
3462  case PGSTAT_MTYPE_ARCHIVER:
3463  pgstat_recv_archiver(&msg.msg_archiver, len);
3464  break;
3465 
3466  case PGSTAT_MTYPE_BGWRITER:
3467  pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
3468  break;
3469 
3470  case PGSTAT_MTYPE_CHECKPOINTER:
3471  pgstat_recv_checkpointer(&msg.msg_checkpointer, len);
3472  break;
3473 
3474  case PGSTAT_MTYPE_WAL:
3475  pgstat_recv_wal(&msg.msg_wal, len);
3476  break;
3477 
3478  case PGSTAT_MTYPE_SLRU:
3479  pgstat_recv_slru(&msg.msg_slru, len);
3480  break;
3481 
3482  case PGSTAT_MTYPE_FUNCSTAT:
3483  pgstat_recv_funcstat(&msg.msg_funcstat, len);
3484  break;
3485 
3486  case PGSTAT_MTYPE_FUNCPURGE:
3487  pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
3488  break;
3489 
3490  case PGSTAT_MTYPE_RECOVERYCONFLICT:
3491  pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
3492  len);
3493  break;
3494 
3495  case PGSTAT_MTYPE_DEADLOCK:
3496  pgstat_recv_deadlock(&msg.msg_deadlock, len);
3497  break;
3498 
3499  case PGSTAT_MTYPE_TEMPFILE:
3500  pgstat_recv_tempfile(&msg.msg_tempfile, len);
3501  break;
3502 
3503  case PGSTAT_MTYPE_CHECKSUMFAILURE:
3504  pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
3505  len);
3506  break;
3507 
3508  case PGSTAT_MTYPE_REPLSLOT:
3509  pgstat_recv_replslot(&msg.msg_replslot, len);
3510  break;
3511 
3512  case PGSTAT_MTYPE_CONNECT:
3513  pgstat_recv_connect(&msg.msg_connect, len);
3514  break;
3515 
3516  case PGSTAT_MTYPE_DISCONNECT:
3517  pgstat_recv_disconnect(&msg.msg_disconnect, len);
3518  break;
3519 
3520  default:
3521  break;
3522  }
3523  } /* end of inner message-processing loop */
3524 
3525  /* Sleep until there's something to do */
3526 #ifndef WIN32
3527  wr = WaitEventSetWait(wes, -1L, &event, 1, WAIT_EVENT_PGSTAT_MAIN);
3528 #else
3529 
3530  /*
3531  * Windows, at least in its Windows Server 2003 R2 incarnation,
3532  * sometimes loses FD_READ events. Waking up and retrying the recv()
3533  * fixes that, so don't sleep indefinitely. This is a crock of the
3534  * first water, but until somebody wants to debug exactly what's
3535  * happening there, this is the best we can do. The two-second
3536  * timeout matches our pre-9.2 behavior, and needs to be short enough
3537  * to not provoke "using stale statistics" complaints from
3538  * backend_read_statsfile.
3539  */
3540  wr = WaitEventSetWait(wes, 2 * 1000L /* msec */ , &event, 1,
3541  WAIT_EVENT_PGSTAT_MAIN);
3542 #endif
3543 
3544  /*
3545  * Emergency bailout if postmaster has died. This is to avoid the
3546  * necessity for manual cleanup of all postmaster children.
3547  */
3548  if (wr == 1 && event.events == WL_POSTMASTER_DEATH)
3549  break;
3550  } /* end of outer loop */
3551 
3552  /*
3553  * Save the final stats to reuse at next startup.
3554  */
3555  pgstat_write_statsfiles(true, true);
3556 
3557  FreeWaitEventSet(wes);
3558 
3559  exit(0);
3560 }
3561 
3562 /*
3563  * Subroutine to clear stats in a database entry
3564  *
3565  * Tables and functions hashes are initialized to empty.
3566  */
3567 static void
3568 reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
3569 {
3570  HASHCTL hash_ctl;
3571 
3572  dbentry->n_xact_commit = 0;
3573  dbentry->n_xact_rollback = 0;
3574  dbentry->n_blocks_fetched = 0;
3575  dbentry->n_blocks_hit = 0;
3576  dbentry->n_tuples_returned = 0;
3577  dbentry->n_tuples_fetched = 0;
3578  dbentry->n_tuples_inserted = 0;
3579  dbentry->n_tuples_updated = 0;
3580  dbentry->n_tuples_deleted = 0;
3581  dbentry->last_autovac_time = 0;
3582  dbentry->n_conflict_tablespace = 0;
3583  dbentry->n_conflict_lock = 0;
3584  dbentry->n_conflict_snapshot = 0;
3585  dbentry->n_conflict_bufferpin = 0;
3586  dbentry->n_conflict_startup_deadlock = 0;
3587  dbentry->n_temp_files = 0;
3588  dbentry->n_temp_bytes = 0;
3589  dbentry->n_deadlocks = 0;
3590  dbentry->n_checksum_failures = 0;
3591  dbentry->last_checksum_failure = 0;
3592  dbentry->n_block_read_time = 0;
3593  dbentry->n_block_write_time = 0;
3594  dbentry->n_sessions = 0;
3595  dbentry->total_session_time = 0;
3596  dbentry->total_active_time = 0;
3597  dbentry->total_idle_in_xact_time = 0;
3598  dbentry->n_sessions_abandoned = 0;
3599  dbentry->n_sessions_fatal = 0;
3600  dbentry->n_sessions_killed = 0;
3601 
3602  dbentry->stat_reset_timestamp = GetCurrentTimestamp();
3603  dbentry->stats_timestamp = 0;
3604 
3605  hash_ctl.keysize = sizeof(Oid);
3606  hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3607  dbentry->tables = hash_create("Per-database table",
3608  PGSTAT_TAB_HASH_SIZE,
3609  &hash_ctl,
3610  HASH_ELEM | HASH_BLOBS);
3611 
3612  hash_ctl.keysize = sizeof(Oid);
3613  hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
3614  dbentry->functions = hash_create("Per-database function",
3615  PGSTAT_FUNCTION_HASH_SIZE,
3616  &hash_ctl,
3617  HASH_ELEM | HASH_BLOBS);
3618 }
3619 
3620 /*
3621  * Lookup the hash table entry for the specified database. If no hash
3622  * table entry exists, initialize it, if the create parameter is true.
3623  * Else, return NULL.
3624  */
3625 static PgStat_StatDBEntry *
3626 pgstat_get_db_entry(Oid databaseid, bool create)
3627 {
3628  PgStat_StatDBEntry *result;
3629  bool found;
3630  HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
3631 
3632  /* Lookup or create the hash table entry for this database */
3633  result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3634  &databaseid,
3635  action, &found);
3636 
3637  if (!create && !found)
3638  return NULL;
3639 
3640  /*
3641  * If not found, initialize the new one. This creates empty hash tables
3642  * for tables and functions, too.
3643  */
3644  if (!found)
3645  reset_dbentry_counters(result);
3646 
3647  return result;
3648 }
3649 
3650 
3651 /*
3652  * Lookup the hash table entry for the specified table. If no hash
3653  * table entry exists, initialize it, if the create parameter is true.
3654  * Else, return NULL.
3655  */
3656 static PgStat_StatTabEntry *
3657 pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
3658 {
3659  PgStat_StatTabEntry *result;
3660  bool found;
3661  HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
3662 
3663  /* Lookup or create the hash table entry for this table */
3664  result = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
3665  &tableoid,
3666  action, &found);
3667 
3668  if (!create && !found)
3669  return NULL;
3670 
3671  /* If not found, initialize the new one. */
3672  if (!found)
3673  {
3674  result->numscans = 0;
3675  result->tuples_returned = 0;
3676  result->tuples_fetched = 0;
3677  result->tuples_inserted = 0;
3678  result->tuples_updated = 0;
3679  result->tuples_deleted = 0;
3680  result->tuples_hot_updated = 0;
3681  result->n_live_tuples = 0;
3682  result->n_dead_tuples = 0;
3683  result->changes_since_analyze = 0;
3684  result->inserts_since_vacuum = 0;
3685  result->blocks_fetched = 0;
3686  result->blocks_hit = 0;
3687  result->vacuum_timestamp = 0;
3688  result->vacuum_count = 0;
3689  result->autovac_vacuum_timestamp = 0;
3690  result->autovac_vacuum_count = 0;
3691  result->analyze_timestamp = 0;
3692  result->analyze_count = 0;
3693  result->autovac_analyze_timestamp = 0;
3694  result->autovac_analyze_count = 0;
3695  }
3696 
3697  return result;
3698 }
3699 
3700 
3701 /* ----------
3702  * pgstat_write_statsfiles() -
3703  * Write the global statistics file, as well as requested DB files.
3704  *
3705  * 'permanent' specifies writing to the permanent files not temporary ones.
3706  * When true (happens only when the collector is shutting down), also remove
3707  * the temporary files so that backends starting up under a new postmaster
3708  * can't read old data before the new collector is ready.
3709  *
3710  * When 'allDbs' is false, only the requested databases (listed in
3711  * pending_write_requests) will be written; otherwise, all databases
3712  * will be written.
3713  * ----------
3714  */
3715 static void
3716 pgstat_write_statsfiles(bool permanent, bool allDbs)
3717 {
3718  HASH_SEQ_STATUS hstat;
3719  PgStat_StatDBEntry *dbentry;
3720  FILE *fpout;
3721  int32 format_id;
3722  const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
3723  const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3724  int rc;
3725 
3726  elog(DEBUG2, "writing stats file \"%s\"", statfile);
3727 
3728  /*
3729  * Open the statistics temp file to write out the current values.
3730  */
3731  fpout = AllocateFile(tmpfile, PG_BINARY_W);
3732  if (fpout == NULL)
3733  {
3734  ereport(LOG,
3735  (errcode_for_file_access(),
3736  errmsg("could not open temporary statistics file \"%s\": %m",
3737  tmpfile)));
3738  return;
3739  }
3740 
3741  /*
3742  * Set the timestamp of the stats file.
3743  */
3744  globalStats.stats_timestamp = GetCurrentTimestamp();
3745 
3746  /*
3747  * Write the file header --- currently just a format ID.
3748  */
3749  format_id = PGSTAT_FILE_FORMAT_ID;
3750  rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
3751  (void) rc; /* we'll check for error with ferror */
3752 
3753  /*
3754  * Write global stats struct
3755  */
3756  rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout);
3757  (void) rc; /* we'll check for error with ferror */
3758 
3759  /*
3760  * Write archiver stats struct
3761  */
3762  rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout);
3763  (void) rc; /* we'll check for error with ferror */
3764 
3765  /*
3766  * Write WAL stats struct
3767  */
3768  rc = fwrite(&walStats, sizeof(walStats), 1, fpout);
3769  (void) rc; /* we'll check for error with ferror */
3770 
3771  /*
3772  * Write SLRU stats struct
3773  */
3774  rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
3775  (void) rc; /* we'll check for error with ferror */
3776 
3777  /*
3778  * Walk through the database table.
3779  */
3780  hash_seq_init(&hstat, pgStatDBHash);
3781  while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
3782  {
3783  /*
3784  * Write out the table and function stats for this DB into the
3785  * appropriate per-DB stat file, if required.
3786  */
3787  if (allDbs || pgstat_db_requested(dbentry->databaseid))
3788  {
3789  /* Make DB's timestamp consistent with the global stats */
3790  dbentry->stats_timestamp = globalStats.stats_timestamp;
3791 
3792  pgstat_write_db_statsfile(dbentry, permanent);
3793  }
3794 
3795  /*
3796  * Write out the DB entry. We don't write the tables or functions
3797  * pointers, since they're of no use to any other process.
3798  */
3799  fputc('D', fpout);
3800  rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
3801  (void) rc; /* we'll check for error with ferror */
3802  }
3803 
3804  /*
3805  * Write replication slot stats struct
3806  */
3807  if (replSlotStatHash)
3808  {
3809  PgStat_StatReplSlotEntry *slotent;
3810 
3811  hash_seq_init(&hstat, replSlotStatHash);
3812  while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
3813  {
3814  fputc('R', fpout);
3815  rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
3816  (void) rc; /* we'll check for error with ferror */
3817  }
3818  }
3819 
3820  /*
3821  * No more output to be done. Close the temp file and replace the old
3822  * pgstat.stat with it. The ferror() check replaces testing for error
3823  * after each individual fputc or fwrite above.
3824  */
3825  fputc('E', fpout);
3826 
3827  if (ferror(fpout))
3828  {
3829  ereport(LOG,
3831  errmsg("could not write temporary statistics file \"%s\": %m",
3832  tmpfile)));
3833  FreeFile(fpout);
3834  unlink(tmpfile);
3835  }
3836  else if (FreeFile(fpout) < 0)
3837  {
3838  ereport(LOG,
3840  errmsg("could not close temporary statistics file \"%s\": %m",
3841  tmpfile)));
3842  unlink(tmpfile);
3843  }
3844  else if (rename(tmpfile, statfile) < 0)
3845  {
3846  ereport(LOG,
3848  errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3849  tmpfile, statfile)));
3850  unlink(tmpfile);
3851  }
3852 
3853  if (permanent)
3854  unlink(pgstat_stat_filename);
3855 
3856  /*
3857  * Now throw away the list of requests. Note that requests sent after we
3858  * started the write are still waiting on the network socket.
3859  */
3860  list_free(pending_write_requests);
3861  pending_write_requests = NIL;
3862 }
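The write-temp-file-then-rename discipline used here, with a single ferror() check covering all the writes, can be sketched in a few lines of standalone C; the paths and format id below are made up and no PostgreSQL facilities are used.

/* Minimal sketch of the temp-file-plus-rename pattern for an atomic stats file swap. */
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>

int
main(void)
{
    const char *tmpfile = "stats.tmp";
    const char *statfile = "stats.stat";
    int32_t     format_id = 42;              /* stands in for a file-format identifier */
    FILE       *fp = fopen(tmpfile, "wb");

    if (fp == NULL)
        return 1;

    fwrite(&format_id, sizeof(format_id), 1, fp);
    fputc('E', fp);                           /* end-of-file marker, checked by the reader */

    if (ferror(fp))                           /* one check replaces per-write error tests */
    {
        fclose(fp);
        unlink(tmpfile);
        return 1;
    }
    if (fclose(fp) != 0 || rename(tmpfile, statfile) != 0)
    {
        unlink(tmpfile);
        return 1;
    }
    return 0;
}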
3863 
3864 /*
3865  * return the filename for a DB stat file; filename is the output buffer,
3866  * of length len.
3867  */
3868 static void
3869 get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
3870  char *filename, int len)
3871 {
3872  int printed;
3873 
3874  /* NB -- pgstat_reset_remove_files knows about the pattern this uses */
3875  printed = snprintf(filename, len, "%s/db_%u.%s",
3876  permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
3877  pgstat_stat_directory,
3878  databaseid,
3879  tempname ? "tmp" : "stat");
3880  if (printed >= len)
3881  elog(ERROR, "overlength pgstat path");
3882 }
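For illustration, a standalone C version of the same filename construction, including the truncation check; the directory strings are hard-coded stand-ins rather than the real PGSTAT_STAT_PERMANENT_DIRECTORY / pgstat_stat_directory values.

/* Sketch of building a per-database stats filename; not PostgreSQL code. */
#include <stdio.h>

static int
build_dbstat_filename(char *buf, int len, int permanent, int tempname, unsigned dboid)
{
    int printed = snprintf(buf, len, "%s/db_%u.%s",
                           permanent ? "pg_stat" : "pg_stat_tmp",
                           dboid,
                           tempname ? "tmp" : "stat");

    return printed < len;                 /* false means the path was truncated */
}

int
main(void)
{
    char path[64];

    if (build_dbstat_filename(path, sizeof(path), 0, 1, 16384))
        printf("%s\n", path);             /* e.g. pg_stat_tmp/db_16384.tmp */
    return 0;
}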
3883 
3884 /* ----------
3885  * pgstat_write_db_statsfile() -
3886  * Write the stat file for a single database.
3887  *
3888  * If writing to the permanent file (happens when the collector is
3889  * shutting down only), remove the temporary file so that backends
3890  * starting up under a new postmaster can't read the old data before
3891  * the new collector is ready.
3892  * ----------
3893  */
3894 static void
3895 pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
3896 {
3897  HASH_SEQ_STATUS tstat;
3898  HASH_SEQ_STATUS fstat;
3899  PgStat_StatTabEntry *tabentry;
3900  PgStat_StatFuncEntry *funcentry;
3901  FILE *fpout;
3902  int32 format_id;
3903  Oid dbid = dbentry->databaseid;
3904  int rc;
3905  char tmpfile[MAXPGPATH];
3906  char statfile[MAXPGPATH];
3907 
3908  get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
3909  get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);
3910 
3911  elog(DEBUG2, "writing stats file \"%s\"", statfile);
3912 
3913  /*
3914  * Open the statistics temp file to write out the current values.
3915  */
3916  fpout = AllocateFile(tmpfile, PG_BINARY_W);
3917  if (fpout == NULL)
3918  {
3919  ereport(LOG,
3921  errmsg("could not open temporary statistics file \"%s\": %m",
3922  tmpfile)));
3923  return;
3924  }
3925 
3926  /*
3927  * Write the file header --- currently just a format ID.
3928  */
3929  format_id = PGSTAT_FILE_FORMAT_ID;
3930  rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
3931  (void) rc; /* we'll check for error with ferror */
3932 
3933  /*
3934  * Walk through the database's access stats per table.
3935  */
3936  hash_seq_init(&tstat, dbentry->tables);
3937  while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
3938  {
3939  fputc('T', fpout);
3940  rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
3941  (void) rc; /* we'll check for error with ferror */
3942  }
3943 
3944  /*
3945  * Walk through the database's function stats table.
3946  */
3947  hash_seq_init(&fstat, dbentry->functions);
3948  while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
3949  {
3950  fputc('F', fpout);
3951  rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
3952  (void) rc; /* we'll check for error with ferror */
3953  }
3954 
3955  /*
3956  * No more output to be done. Close the temp file and replace the old
3957  * pgstat.stat with it. The ferror() check replaces testing for error
3958  * after each individual fputc or fwrite above.
3959  */
3960  fputc('E', fpout);
3961 
3962  if (ferror(fpout))
3963  {
3964  ereport(LOG,
3966  errmsg("could not write temporary statistics file \"%s\": %m",
3967  tmpfile)));
3968  FreeFile(fpout);
3969  unlink(tmpfile);
3970  }
3971  else if (FreeFile(fpout) < 0)
3972  {
3973  ereport(LOG,
3975  errmsg("could not close temporary statistics file \"%s\": %m",
3976  tmpfile)));
3977  unlink(tmpfile);
3978  }
3979  else if (rename(tmpfile, statfile) < 0)
3980  {
3981  ereport(LOG,
3983  errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3984  tmpfile, statfile)));
3985  unlink(tmpfile);
3986  }
3987 
3988  if (permanent)
3989  {
3990  get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
3991 
3992  elog(DEBUG2, "removing temporary stats file \"%s\"", statfile);
3993  unlink(statfile);
3994  }
3995 }
3996 
3997 /* ----------
3998  * pgstat_read_statsfiles() -
3999  *
4000  * Reads in some existing statistics collector files and returns the
4001  * databases hash table that is the top level of the data.
4002  *
4003  * If 'onlydb' is not InvalidOid, it means we only want data for that DB
4004  * plus the shared catalogs ("DB 0"). We'll still populate the DB hash
4005  * table for all databases, but we don't bother even creating table/function
4006  * hash tables for other databases.
4007  *
4008  * 'permanent' specifies reading from the permanent files not temporary ones.
4009  * When true (happens only when the collector is starting up), remove the
4010  * files after reading; the in-memory status is now authoritative, and the
4011  * files would be out of date in case somebody else reads them.
4012  *
4013  * If a 'deep' read is requested, table/function stats are read, otherwise
4014  * the table/function hash tables remain empty.
4015  * ----------
4016  */
4017 static HTAB *
4018 pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
4019 {
4020  PgStat_StatDBEntry *dbentry;
4021  PgStat_StatDBEntry dbbuf;
4022  HASHCTL hash_ctl;
4023  HTAB *dbhash;
4024  FILE *fpin;
4025  int32 format_id;
4026  bool found;
4027  const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
4028  int i;
4029  TimestampTz ts;
4030 
4031  /*
4032  * The tables will live in pgStatLocalContext.
4033  */
4034  pgstat_setup_memcxt();
4035 
4036  /*
4037  * Create the DB hashtable
4038  */
4039  hash_ctl.keysize = sizeof(Oid);
4040  hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
4041  hash_ctl.hcxt = pgStatLocalContext;
4042  dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
4043  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4044 
4045  /*
4046  * Clear out global, archiver, WAL and SLRU statistics so they start from
4047  * zero in case we can't load an existing statsfile.
4048  */
4049  memset(&globalStats, 0, sizeof(globalStats));
4050  memset(&archiverStats, 0, sizeof(archiverStats));
4051  memset(&walStats, 0, sizeof(walStats));
4052  memset(&slruStats, 0, sizeof(slruStats));
4053 
4054  /*
4055  * Set the current timestamp (will be kept only in case we can't load an
4056  * existing statsfile).
4057  */
4058  ts = GetCurrentTimestamp();
4059  globalStats.bgwriter.stat_reset_timestamp = ts;
4060  archiverStats.stat_reset_timestamp = ts;
4061  walStats.stat_reset_timestamp = ts;
4062 
4063  /*
4064  * Set the same reset timestamp for all SLRU items too.
4065  */
4066  for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
4067  slruStats[i].stat_reset_timestamp = ts;
4068 
4069  /*
4070  * Try to open the stats file. If it doesn't exist, the backends simply
4071  * return zero for anything and the collector simply starts from scratch
4072  * with empty counters.
4073  *
4074  * ENOENT is a possibility if the stats collector is not running or has
4075  * not yet written the stats file the first time. Any other failure
4076  * condition is suspicious.
4077  */
4078  if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4079  {
4080  if (errno != ENOENT)
4083  errmsg("could not open statistics file \"%s\": %m",
4084  statfile)));
4085  return dbhash;
4086  }
4087 
4088  /*
4089  * Verify it's of the expected format.
4090  */
4091  if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4092  format_id != PGSTAT_FILE_FORMAT_ID)
4093  {
4095  (errmsg("corrupted statistics file \"%s\"", statfile)));
4096  goto done;
4097  }
4098 
4099  /*
4100  * Read global stats struct
4101  */
4102  if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
4103  {
4105  (errmsg("corrupted statistics file \"%s\"", statfile)));
4106  memset(&globalStats, 0, sizeof(globalStats));
4107  goto done;
4108  }
4109 
4110  /*
4111  * In the collector, disregard the timestamp we read from the permanent
4112  * stats file; we should be willing to write a temp stats file immediately
4113  * upon the first request from any backend. This only matters if the old
4114  * file's timestamp is less than PGSTAT_STAT_INTERVAL ago, but that's not
4115  * an unusual scenario.
4116  */
4117  if (pgStatRunningInCollector)
4118  globalStats.stats_timestamp = 0;
4119 
4120  /*
4121  * Read archiver stats struct
4122  */
4123  if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats))
4124  {
4126  (errmsg("corrupted statistics file \"%s\"", statfile)));
4127  memset(&archiverStats, 0, sizeof(archiverStats));
4128  goto done;
4129  }
4130 
4131  /*
4132  * Read WAL stats struct
4133  */
4134  if (fread(&walStats, 1, sizeof(walStats), fpin) != sizeof(walStats))
4135  {
4137  (errmsg("corrupted statistics file \"%s\"", statfile)));
4138  memset(&walStats, 0, sizeof(walStats));
4139  goto done;
4140  }
4141 
4142  /*
4143  * Read SLRU stats struct
4144  */
4145  if (fread(slruStats, 1, sizeof(slruStats), fpin) != sizeof(slruStats))
4146  {
4147  ereport(pgStatRunningInCollector ? LOG : WARNING,
4148  (errmsg("corrupted statistics file \"%s\"", statfile)));
4149  memset(&slruStats, 0, sizeof(slruStats));
4150  goto done;
4151  }
4152 
4153  /*
4154  * We found an existing collector stats file. Read it and put all the
4155  * hashtable entries into place.
4156  */
4157  for (;;)
4158  {
4159  switch (fgetc(fpin))
4160  {
4161  /*
4162  * 'D' A PgStat_StatDBEntry struct describing a database
4163  * follows.
4164  */
4165  case 'D':
4166  if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
4167  fpin) != offsetof(PgStat_StatDBEntry, tables))
4168  {
4169  ereport(pgStatRunningInCollector ? LOG : WARNING,
4170  (errmsg("corrupted statistics file \"%s\"",
4171  statfile)));
4172  goto done;
4173  }
4174 
4175  /*
4176  * Add to the DB hash
4177  */
4178  dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
4179  (void *) &dbbuf.databaseid,
4180  HASH_ENTER,
4181  &found);
4182  if (found)
4183  {
4184  ereport(pgStatRunningInCollector ? LOG : WARNING,
4185  (errmsg("corrupted statistics file \"%s\"",
4186  statfile)));
4187  goto done;
4188  }
4189 
4190  memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
4191  dbentry->tables = NULL;
4192  dbentry->functions = NULL;
4193 
4194  /*
4195  * In the collector, disregard the timestamp we read from the
4196  * permanent stats file; we should be willing to write a temp
4197  * stats file immediately upon the first request from any
4198  * backend.
4199  */
4200  if (pgStatRunningInCollector)
4201  dbentry->stats_timestamp = 0;
4202 
4203  /*
4204  * Don't create tables/functions hashtables for uninteresting
4205  * databases.
4206  */
4207  if (onlydb != InvalidOid)
4208  {
4209  if (dbbuf.databaseid != onlydb &&
4210  dbbuf.databaseid != InvalidOid)
4211  break;
4212  }
4213 
4214  hash_ctl.keysize = sizeof(Oid);
4215  hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
4216  hash_ctl.hcxt = pgStatLocalContext;
4217  dbentry->tables = hash_create("Per-database table",
4218  PGSTAT_TAB_HASH_SIZE,
4219  &hash_ctl,
4220  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4221 
4222  hash_ctl.keysize = sizeof(Oid);
4223  hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
4224  hash_ctl.hcxt = pgStatLocalContext;
4225  dbentry->functions = hash_create("Per-database function",
4226  PGSTAT_FUNCTION_HASH_SIZE,
4227  &hash_ctl,
4228  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4229 
4230  /*
4231  * If requested, read the data from the database-specific
4232  * file. Otherwise we just leave the hashtables empty.
4233  */
4234  if (deep)
4235  pgstat_read_db_statsfile(dbbuf.databaseid,
4236  dbentry->tables,
4237  dbentry->functions,
4238  permanent);
4239 
4240  break;
4241 
4242  /*
4243  * 'R' A PgStat_StatReplSlotEntry struct describing a
4244  * replication slot follows.
4245  */
4246  case 'R':
4247  {
4248  PgStat_StatReplSlotEntry slotbuf;
4249  PgStat_StatReplSlotEntry *slotent;
4250 
4251  if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
4252  != sizeof(PgStat_StatReplSlotEntry))
4253  {
4254  ereport(pgStatRunningInCollector ? LOG : WARNING,
4255  (errmsg("corrupted statistics file \"%s\"",
4256  statfile)));
4257  goto done;
4258  }
4259 
4260  /* Create hash table if we don't have it already. */
4261  if (replSlotStatHash == NULL)
4262  {
4263  HASHCTL hash_ctl;
4264 
4265  hash_ctl.keysize = sizeof(NameData);
4266  hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
4267  hash_ctl.hcxt = pgStatLocalContext;
4268  replSlotStatHash = hash_create("Replication slots hash",
4269  PGSTAT_REPLSLOT_HASH_SIZE,
4270  &hash_ctl,
4271  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4272  }
4273 
4274  slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
4275  (void *) &slotbuf.slotname,
4276  HASH_ENTER, NULL);
4277  memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
4278  break;
4279  }
4280 
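 /*
  * 'E' The EOF marker of a complete stats file.
  */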
4281  case 'E':
4282  goto done;
4283 
4284  default:
4285  ereport(pgStatRunningInCollector ? LOG : WARNING,
4286  (errmsg("corrupted statistics file \"%s\"",
4287  statfile)));
4288  goto done;
4289  }
4290  }
4291 
4292 done:
4293  FreeFile(fpin);
4294 
4295  /* If requested to read the permanent file, also get rid of it. */
4296  if (permanent)
4297  {
4298  elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4299  unlink(statfile);
4300  }
4301 
4302  return dbhash;
4303 }
4304 
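 /*
  * A rough sketch of the on-disk layout that pgstat_read_statsfiles() parses
  * above (a reading aid only; the authoritative layout is whatever the
  * write side emits):
  *
  *     int32                   format_id   (PGSTAT_FILE_FORMAT_ID)
  *     PgStat_GlobalStats      globalStats
  *     PgStat_ArchiverStats    archiverStats
  *     PgStat_WalStats         walStats
  *     PgStat_SLRUStats        slruStats[SLRU_NUM_ELEMENTS]
  *     'D' + PgStat_StatDBEntry (up to offsetof(..., tables))  -- per database
  *     'R' + PgStat_StatReplSlotEntry                          -- per slot
  *     'E'                                                     -- end marker
  */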
4305 
4306 /* ----------
4307  * pgstat_read_db_statsfile() -
4308  *
4309  * Reads in the existing statistics collector file for the given database,
4310  * filling the passed-in tables and functions hash tables.
4311  *
4312  * As in pgstat_read_statsfiles, if the permanent file is requested, it is
4313  * removed after reading.
4314  *
4315  * Note: this code has the ability to skip storing per-table or per-function
4316  * data, if NULL is passed for the corresponding hashtable. That's not used
4317  * at the moment though.
4318  * ----------
4319  */
4320 static void
4321 pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
4322  bool permanent)
4323 {
4324  PgStat_StatTabEntry *tabentry;
4325  PgStat_StatTabEntry tabbuf;
4326  PgStat_StatFuncEntry funcbuf;
4327  PgStat_StatFuncEntry *funcentry;
4328  FILE *fpin;
4329  int32 format_id;
4330  bool found;
4331  char statfile[MAXPGPATH];
4332 
4333  get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
4334 
4335  /*
4336  * Try to open the stats file. If it doesn't exist, the backends simply
4337  * return zero for anything and the collector simply starts from scratch
4338  * with empty counters.
4339  *
4340  * ENOENT is a possibility if the stats collector is not running or has
4341  * not yet written the stats file the first time. Any other failure
4342  * condition is suspicious.
4343  */
4344  if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4345  {
4346  if (errno != ENOENT)
4347  ereport(pgStatRunningInCollector ? LOG : WARNING,
4348  (errcode_for_file_access(),
4349  errmsg("could not open statistics file \"%s\": %m",
4350  statfile)));
4351  return;
4352  }
4353 
4354  /*
4355  * Verify it's of the expected format.
4356  */
4357  if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4358  format_id != PGSTAT_FILE_FORMAT_ID)
4359  {
4360  ereport(pgStatRunningInCollector ? LOG : WARNING,
4361  (errmsg("corrupted statistics file \"%s\"", statfile)));
4362  goto done;
4363  }
4364 
4365  /*
4366  * We found an existing collector stats file. Read it and put all the
4367  * hashtable entries into place.
4368  */
4369  for (;;)
4370  {
4371  switch (fgetc(fpin))
4372  {
4373  /*
4374  * 'T' A PgStat_StatTabEntry follows.
4375  */
4376  case 'T':
4377  if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
4378  fpin) != sizeof(PgStat_StatTabEntry))
4379  {
4380  ereport(pgStatRunningInCollector ? LOG : WARNING,
4381  (errmsg("corrupted statistics file \"%s\"",
4382  statfile)));
4383  goto done;
4384  }
4385 
4386  /*
4387  * Skip if table data not wanted.
4388  */
4389  if (tabhash == NULL)
4390  break;
4391 
4392  tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
4393  (void *) &tabbuf.tableid,
4394  HASH_ENTER, &found);
4395 
4396  if (found)
4397  {
4398  ereport(pgStatRunningInCollector ? LOG : WARNING,
4399  (errmsg("corrupted statistics file \"%s\"",
4400  statfile)));
4401  goto done;
4402  }
4403 
4404  memcpy(tabentry, &tabbuf, sizeof(tabbuf));
4405  break;
4406 
4407  /*
4408  * 'F' A PgStat_StatFuncEntry follows.
4409  */
4410  case 'F':
4411  if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry),
4412  fpin) != sizeof(PgStat_StatFuncEntry))
4413  {
4414  ereport(pgStatRunningInCollector ? LOG : WARNING,
4415  (errmsg("corrupted statistics file \"%s\"",
4416  statfile)));
4417  goto done;
4418  }
4419 
4420  /*
4421  * Skip if function data not wanted.
4422  */
4423  if (funchash == NULL)
4424  break;
4425 
4426  funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
4427  (void *) &funcbuf.functionid,
4428  HASH_ENTER, &found);
4429 
4430  if (found)
4431  {
4432  ereport(pgStatRunningInCollector ? LOG : WARNING,
4433  (errmsg("corrupted statistics file \"%s\"",
4434  statfile)));
4435  goto done;
4436  }
4437 
4438  memcpy(funcentry, &funcbuf, sizeof(funcbuf));
4439  break;
4440 
4441  /*
4442  * 'E' The EOF marker of a complete stats file.
4443  */
4444  case 'E':
4445  goto done;
4446 
4447  default:
4448  ereport(pgStatRunningInCollector ? LOG : WARNING,
4449  (errmsg("corrupted statistics file \"%s\"",
4450  statfile)));
4451  goto done;
4452  }
4453  }
4454 
4455 done:
4456  FreeFile(fpin);
4457 
4458  if (permanent)
4459  {
4460  elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4461  unlink(statfile);
4462  }
4463 }
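 /*
  * The per-database file read above mirrors the global file's structure: a
  * format_id header followed by a stream of 'T' (table) and 'F' (function)
  * records, terminated by an 'E' marker.
  */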
4464 
4465 /* ----------
4466  * pgstat_read_db_statsfile_timestamp() -
4467  *
4468  * Attempt to determine the timestamp of the last db statfile write.
4469  * Attempt to determine the timestamp of the last db statfile write.
4470  * Returns true if successful; the caller should rely on the timestamp
4471  * stored in *ts only if the function returns true.
4471  *
4472  * This needs to be careful about handling databases for which no stats file
4473  * exists, such as databases without a stat entry or those not yet written:
4474  *
4475  * - if there's a database entry in the global file, return the corresponding
4476  * stats_timestamp value.
4477  *
4478  * - if there's no db stat entry (e.g. for a new or inactive database),
4479  * there's no stats_timestamp value, but also nothing to write so we return
4480  * the timestamp of the global statfile.
4481  * ----------
4482  */
4483 static bool
4484 pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
4485  TimestampTz *ts)
4486 {
4487  PgStat_StatDBEntry dbentry;
4488  PgStat_GlobalStats myGlobalStats;
4489  PgStat_ArchiverStats myArchiverStats;
4490  PgStat_WalStats myWalStats;
4491  PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
4492  PgStat_StatReplSlotEntry myReplSlotStats;
4493  FILE *fpin;
4494  int32 format_id;
4495  const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
4496 
4497  /*
4498  * Try to open the stats file. As above, anything but ENOENT is worthy of
4499  * complaining about.
4500  */
4501  if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4502  {
4503  if (errno != ENOENT)
4504  ereport(pgStatRunningInCollector ? LOG : WARNING,
4505  (errcode_for_file_access(),
4506  errmsg("could not open statistics file \"%s\": %m",
4507  statfile)));
4508  return false;
4509  }
4510 
4511  /*
4512  * Verify it's of the expected format.
4513  */
4514  if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4515  format_id != PGSTAT_FILE_FORMAT_ID)
4516  {
4517  ereport(pgStatRunningInCollector ? LOG : WARNING,
4518  (errmsg("corrupted statistics file \"%s\"", statfile)));
4519  FreeFile(fpin);
4520  return false;
4521  }
4522 
4523  /*
4524  * Read global stats struct
4525  */
4526  if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
4527  fpin) != sizeof(myGlobalStats))
4528  {
4529  ereport(pgStatRunningInCollector ? LOG : WARNING,
4530  (errmsg("corrupted statistics file \"%s\"", statfile)));
4531  FreeFile(fpin);
4532  return false;
4533  }
4534 
4535  /*
4536  * Read archiver stats struct
4537  */
4538  if (fread(&myArchiverStats, 1, sizeof(myArchiverStats),
4539  fpin) != sizeof(myArchiverStats))
4540  {
4541  ereport(pgStatRunningInCollector ? LOG : WARNING,
4542  (errmsg("corrupted statistics file \"%s\"", statfile)));
4543  FreeFile(fpin);
4544  return false;
4545  }
4546 
4547  /*
4548  * Read WAL stats struct
4549  */
4550  if (fread(&myWalStats, 1, sizeof(myWalStats), fpin) != sizeof(myWalStats))
4551  {
4552  ereport(pgStatRunningInCollector ? LOG : WARNING,
4553  (errmsg("corrupted statistics file \"%s\"", statfile)));
4554  FreeFile(fpin);
4555  return false;
4556  }
4557 
4558  /*
4559  * Read SLRU stats struct
4560  */
4561  if (fread(mySLRUStats, 1, sizeof(mySLRUStats), fpin) != sizeof(mySLRUStats))
4562  {
4563  ereport(pgStatRunningInCollector ? LOG : WARNING,
4564  (errmsg("corrupted statistics file \"%s\"", statfile)));
4565  FreeFile(fpin);
4566  return false;
4567  }
4568 
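 /*
  * Of everything read so far, only myGlobalStats.stats_timestamp is used
  * below; the archiver, WAL and SLRU structs are read just to advance the
  * file position to the per-database records.
  */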
4569  /* By default, we're going to return the timestamp of the global file. */
4570  *ts = myGlobalStats.stats_timestamp;
4571 
4572  /*
4573  * We found an existing collector stats file. Read it and look for a
4574  * record for the requested database. If found, use its timestamp.
4575  */
4576  for (;;)
4577  {
4578  switch (fgetc(fpin))
4579  {
4580  /*
4581  * 'D' A PgStat_StatDBEntry struct describing a database
4582  * follows.
4583  */
4584  case 'D':
4585  if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
4586  fpin) != offsetof(PgStat_StatDBEntry, tables))
4587  {
4588  ereport(pgStatRunningInCollector ? LOG : WARNING,
4589  (errmsg("corrupted statistics file \"%s\"",
4590  statfile)));
4591  FreeFile(fpin);
4592  return false;
4593  }
4594 
4595  /*
4596  * If this is the DB we're looking for, save its timestamp and
4597  * we're done.
4598  */
4599  if (dbentry.databaseid == databaseid)
4600  {
4601  *ts = dbentry.stats_timestamp;
4602  goto done;
4603  }
4604 
4605  break;
4606 
4607  /*
4608  * 'R' A PgStat_StatReplSlotEntry struct describing a
4609  * replication slot follows.
4610  */
4611  case 'R':
4612  if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
4613  != sizeof(PgStat_StatReplSlotEntry))
4614  {
4615  ereport(pgStatRunningInCollector ? LOG : WARNING,
4616  (errmsg("corrupted statistics file \"%s\"",
4617  statfile)));
4618  FreeFile(fpin);
4619  return false;
4620  }
4621  break;
4622 
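 /*
  * 'E' The EOF marker of a complete stats file.
  */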
4623  case 'E':
4624  goto done;
4625 
4626  default:
4627  {
4628  ereport(pgStatRunningInCollector ? LOG : WARNING,
4629  (errmsg("corrupted statistics file \"%s\"",
4630  statfile)));
4631  FreeFile(fpin);
4632  return false;
4633  }
4634  }
4635  }
4636 
4637 done:
4638  FreeFile(fpin);
4639  return true;
4640 }
4641 
4642 /*
4643  * If not already done, read the statistics collector stats file into
4644  * some hash tables. The results will be kept until pgstat_clear_snapshot()
4645  * is called (typically, at end of transaction).
4646  */
4647 static void
4648  backend_read_statsfile(void)
4649 {
4650  TimestampTz min_ts = 0;
4651  TimestampTz ref_ts = 0;
4652  Oid inquiry_db;
4653  int count;
4654 
4655  pgstat_assert_is_up();
4656 
4657  /* already read it? */
4658  if (pgStatDBHash)
4659  return;
4660  Assert(!pgStatRunningInCollector);
4661 
4662  /*
4663  * In a normal backend, we check staleness of the data for our own DB, and
4664  * so we send MyDatabaseId in inquiry messages. In the autovac launcher,
4665  * check staleness of the shared-catalog data, and send InvalidOid in
4666  * inquiry messages so as not to force writing unnecessary data.
4667  */
4668  if (IsAutoVacuumLauncherProcess())
4669  inquiry_db = InvalidOid;
4670  else
4671  inquiry_db = MyDatabaseId;
4672 
4673  /*
4674  * Loop until fresh enough stats file is available or we ran out of time.
4675  * The stats inquiry message is sent repeatedly in case collector drops
4676  * it; but not every single time, as that just swamps the collector.
4677  */
4678  for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++)
4679  {
4680  bool ok;
4681  TimestampTz file_ts = 0;
4682  TimestampTz cur_ts;
4683 
4684  CHECK_FOR_INTERRUPTS();
4685 
4686  ok = pgstat_read_db_statsfile_timestamp(inquiry_db, false, &file_ts);
4687 
4688  cur_ts = GetCurrentTimestamp();
4689  /* Calculate min acceptable timestamp, if we didn't already */
4690  if (count == 0 || cur_ts < ref_ts)
4691  {
4692  /*
4693  * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
4694  * msec before now. This indirectly ensures that the collector
4695  * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
4696  * an autovacuum worker, however, we want a lower delay to avoid
4697  * using stale data, so we use PGSTAT_RETRY_DELAY (since the
4698  * number of workers is low, this shouldn't be a problem).
4699  *
4700  * We don't recompute min_ts after sleeping, except in the
4701  * unlikely case that cur_ts went backwards. So we might end up
4702  * accepting a file a bit older than PGSTAT_STAT_INTERVAL. In
4703  * practice that shouldn't happen, though, as long as the sleep
4704  * time is less than PGSTAT_STAT_INTERVAL; and we don't want to
4705  * tell the collector that our cutoff time is less than what we'd
4706  * actually accept.
4707  */
4708  ref_ts = cur_ts;
4709  if (IsAutoVacuumWorkerProcess())
4710  min_ts = TimestampTzPlusMilliseconds(ref_ts,
4711  -PGSTAT_RETRY_DELAY);
4712  else
4713  min_ts = TimestampTzPlusMilliseconds(ref_ts,
4714  -PGSTAT_STAT_INTERVAL);
4715  }
4716 
4717  /*
4718  * If the file timestamp is actually newer than cur_ts, we must have
4719  * had a clock glitch (system time went backwards) or there is clock
4720  * skew between our processor and the stats collector's processor.
4721  * Accept the file, but send an inquiry message anyway to make
4722  * pgstat_recv_inquiry do a sanity check on the collector's time.
4723  */
4724  if (ok && file_ts > cur_ts)
4725  {
4726  /*
4727  * A small amount of clock skew between processors isn't terribly
4728  * surprising, but a large difference is worth logging. We
4729  * arbitrarily define "large" as 1000 msec.
4730  */
4731  if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000))
4732  {
4733  char *filetime;
4734  char *mytime;
4735 
4736  /* Copy because timestamptz_to_str returns a static buffer */
4737  filetime = pstrdup(timestamptz_to_str(file_ts));
4738  mytime = pstrdup(timestamptz_to_str(cur_ts));
4739  ereport(LOG,
4740  (errmsg("statistics collector's time %s is later than backend local time %s",
4741  filetime, mytime)));
4742  pfree(filetime);
4743  pfree(mytime);
4744  }
4745 
4746  pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
4747  break;
4748  }
4749 
4750  /* Normal acceptance case: file is not older than cutoff time */
4751  if (ok && file_ts >= min_ts)
4752  break;
4753 
4754  /* Not there or too old, so kick the collector and wait a bit */
4755  if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
4756  pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
4757 
4758  pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
4759  }
4760 
4761  if (count >= PGSTAT_POLL_LOOP_COUNT)
4762  ereport(LOG,
4763  (errmsg("using stale statistics instead of current ones "
4764  "because stats collector is not responding")));
4765 
4766  /*
4767  * Autovacuum launcher wants stats about all databases, but a shallow read
4768  * is sufficient. Regular backends want a deep read for just the tables
4769  * they can see (MyDatabaseId + shared catalogs).
4770  */
4771  if (IsAutoVacuumLauncherProcess())
4772  pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
4773  else
4774  pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
4775 }
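 /*
  * To summarize the polling protocol above: each iteration re-reads the stats
  * file's timestamp, an inquiry message is (re)sent to the collector every
  * PGSTAT_INQ_LOOP_COUNT iterations, and the backend sleeps PGSTAT_RETRY_DELAY
  * msec between attempts. After PGSTAT_POLL_LOOP_COUNT iterations it gives up
  * and falls back to whatever (possibly stale) file contents are available.
  */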
4776 
4777 
4778 /* ----------
4779  * pgstat_setup_memcxt() -
4780  *
4781  * Create pgStatLocalContext, if not already done.
4782  * ----------
4783  */
4784 static void
4785  pgstat_setup_memcxt(void)
4786 {
4787  if (!pgStatLocalContext)
4788  pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
4789  "Statistics snapshot",
4790  ALLOCSET_SMALL_SIZES);
4791 }
4792 
4793 /*
4794  * Stats should only be reported after pgstat_initialize() and before
4795  * pgstat_shutdown(). This check is put in a few central places to catch
4796  * violations of this rule more easily.
4797  */
4798 static void
4799  pgstat_assert_is_up(void)
4800 {
4801  Assert(pgstat_is_initialized && !pgstat_is_shutdown);
4802 }
4803 
4804 
4805 /* ----------
4806  * pgstat_clear_snapshot() -
4807  *
4808  * Discard any data collected in the current transaction. Any subsequent
4809  * request will cause new snapshots to be read.
4810  *
4811  * This is also invoked during transaction commit or abort to discard
4812  * the no-longer-wanted snapshot.
4813  * ----------
4814  */
4815 void
4816  pgstat_clear_snapshot(void)
4817 {
4818  pgstat_assert_is_up();
4819 
4820  /* Release memory, if any was allocated */
4821  if (pgStatLocalContext)
4822  MemoryContextDelete(pgStatLocalContext);
4823 
4824  /* Reset variables */
4825  pgStatLocalContext = NULL;
4826  pgStatDBHash = NULL;
4827  replSlotStatHash = NULL;
4828 
4829  /*
4830  * Historically the backend_status.c facilities lived in this file, and
4831  * were reset with the same function. For now keep it that way, and
4832  * forward the reset request.
4833  */
4834  pgstat_clear_backend_activity_snapshot();
4835 }
4836 
4837 
4838 /* ----------
4839  * pgstat_recv_inquiry() -
4840  *
4841  * Process stat inquiry requests.
4842  * ----------
4843  */
4844 static void
4845  pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
4846 {
4847  PgStat_StatDBEntry *dbentry;
4848 
4849  elog(DEBUG2, "received inquiry for database %u", msg->databaseid);
4850 
4851  /*
4852  * If there's already a write request for this DB, there's nothing to do.
4853  *
4854  * Note that if a request is found, we return early and skip the below
4855  * check for clock skew. This is okay, since the only way for a DB
4856  * request to be present in the list is that we have been here since the
4857  * last write round. It seems sufficient to check for clock skew once per
4858  * write round.
4859  */
4860  if (list_member_oid(pending_write_requests, msg->databaseid))
4861  return;
4862 
4863  /*
4864  * Check to see if we last wrote this database at a time >= the requested
4865  * cutoff time. If so, this is a stale request that was generated before
4866  * we updated the DB file, and we don't need to do so again.
4867  *
4868  * If the requestor's local clock time is older than stats_timestamp, we
4869  * should suspect a clock glitch, ie system time going backwards; though
4870  * the more likely explanation is just delayed message receipt. It is
4871  * worth expending a GetCurrentTimestamp call to be sure, since a large
4872  * retreat in the system clock reading could otherwise cause us to neglect
4873  * to update the stats file for a long time.
4874  */
4875  dbentry = pgstat_get_db_entry(msg->databaseid, false);
4876  if (dbentry == NULL)
4877  {
4878  /*
4879  * We have no data for this DB. Enter a write request anyway so that
4880  * the global stats will get updated. This is needed to prevent
4881  * backend_read_statsfile from waiting for data that we cannot supply,
4882  * in the case of a new DB that nobody has yet reported any stats for.
4883  * See the behavior of pgstat_read_db_statsfile_timestamp.
4884  */
4885  }
4886  else if (msg->clock_time < dbentry->stats_timestamp)
4887  {
4888  TimestampTz cur_ts = GetCurrentTimestamp();
4889 
4890  if (cur_ts < dbentry->stats_timestamp)
4891  {
4892  /*
4893  * Sure enough, time went backwards. Force a new stats file write
4894  * to get back in sync; but first, log a complaint.
4895  */
4896  char *writetime;
4897  char *mytime;
4898 
4899  /* Copy because timestamptz_to_str returns a static buffer */
4900  writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
4901  mytime = pstrdup(timestamptz_to_str(cur_ts));
4902  ereport(LOG,
4903  (errmsg("stats_timestamp %s is later than collector's time %s for database %u",
4904  writetime, mytime, dbentry->databaseid)));
4905  pfree(writetime);
4906  pfree(mytime);
4907  }
4908  else
4909  {
4910  /*
4911  * Nope, it's just an old request. Assuming msg's clock_time is
4912  * >= its cutoff_time, it must be stale, so we can ignore it.
4913  */
4914  return;
4915  }
4916  }
4917  else if (msg->cutoff_time <= dbentry->stats_timestamp)
4918  {
4919  /* Stale request, ignore it */
4920  return;
4921  }
4922 
4923  /*
4924  * We need to write this DB, so create a request.
4925  */
4926  pending_write_requests = lappend_oid(pending_write_requests,
4927  msg->databaseid);
4928 }
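 /*
  * In short, an inquiry produces a new write request unless one is already
  * pending for the same database or the request is judged stale (the database
  * was written at or after the requested cutoff). A collector clock that
  * appears to have moved backwards forces a write and is logged.
  */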
4929 
4930 
4931 /* ----------
4932  * pgstat_recv_tabstat() -
4933  *
4934  * Count what the backend has done.
4935  * ----------
4936  */
4937 static void
4938  pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
4939 {
4940  PgStat_StatDBEntry *dbentry;
4941  PgStat_StatTabEntry *tabentry;
4942  int i;
4943  bool found;
4944 
4945  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
4946 
4947  /*
4948  * Update database-wide stats.
4949  */
4950  dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
4951  dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
4952  dbentry->n_block_read_time += msg->m_block_read_time;
4953  dbentry->n_block_write_time += msg->m_block_write_time;
4954 
4955  dbentry->total_session_time += msg->m_session_time;
4956  dbentry->total_active_time += msg->m_active_time;
4957  dbentry->total_idle_in_xact_time += msg->m_idle_in_xact_time;
4958 
4959  /*
4960  * Process all table entries in the message.
4961  */
4962  for (i = 0; i < msg->m_nentries; i++)
4963  {
4964  PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);
4965 
4966  tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
4967  (void *) &(tabmsg->t_id),
4968  HASH_ENTER, &found);
4969 
4970  if (!found)
4971  {
4972  /*
4973  * If it's a new table entry, initialize counters to the values we
4974  * just got.
4975  */
4976  tabentry->numscans = tabmsg->t_counts.t_numscans;
4977  tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned;
4978  tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched;
4979  tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted;
4980  tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated;
4981  tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted;
4982  tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated;
4983  tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples;
4984  tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples;
4985  tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples;
4986  tabentry->inserts_since_vacuum = tabmsg->t_counts.t_tuples_inserted;
4987  tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched;
4988  tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit;
4989 
4990  tabentry->vacuum_timestamp = 0;
4991  tabentry->vacuum_count = 0;
4992  tabentry->autovac_vacuum_timestamp = 0;
4993  tabentry->autovac_vacuum_count = 0;
4994  tabentry->analyze_timestamp = 0;
4995  tabentry->analyze_count = 0;
4996  tabentry->autovac_analyze_timestamp = 0;
4997  tabentry->autovac_analyze_count = 0;
4998  }
4999  else
5000  {
5001  /*
5002  * Otherwise add the values to the existing entry.
5003  */
5004  tabentry->numscans += tabmsg->t_counts.t_numscans;
5005  tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned;
5006  tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
5007  tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
5008  tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated;
5009  tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
5010  tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated;
5011  /* If table was truncated, first reset the live/dead counters */
5012  if (tabmsg->t_counts.t_truncated)
5013  {
5014  tabentry->n_live_tuples = 0;
5015  tabentry->n_dead_tuples = 0;
5016  tabentry->inserts_since_vacuum = 0;
5017  }
5018  tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples;
5019  tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples;
5020  tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples;
5021  tabentry->inserts_since_vacuum += tabmsg->t_counts.t_tuples_inserted;
5022  tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
5023  tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit;
5024  }
5025 
5026  /* Clamp n_live_tuples in case of negative delta_live_tuples */
5027  tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
5028  /* Likewise for n_dead_tuples */
5029  tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
5030 
5031  /*
5032  * Add per-table stats to the per-database entry, too.
5033  */
5034  dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned;
5035  dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
5036  dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
5037  dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated;
5038  dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
5039  dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
5040  dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
5041  }
5042 }
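 /*
  * Note that a single tabstat message carries both the database-wide counters
  * and a batch of per-table entries (m_nentries of them); the loop above
  * folds each batch entry into the per-table hash and into the per-database
  * totals in one pass.
  */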
5043 
5044 
5045 /* ----------
5046  * pgstat_recv_tabpurge() -
5047  *
5048  * Arrange for dead table removal.
5049  * ----------
5050  */
5051 static void
5052  pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
5053 {
5054  PgStat_StatDBEntry *dbentry;
5055  int i;
5056 
5057  dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5058 
5059  /*
5060  * No need to purge if we don't even know the database.
5061  */
5062  if (!dbentry || !dbentry->tables)
5063  return;
5064 
5065  /*
5066  * Process all table entries in the message.
5067  */
5068  for (i = 0; i < msg->m_nentries; i++)
5069  {
5070  /* Remove from hashtable if present; we don't care if it's not. */
5071  (void) hash_search(dbentry->tables,
5072  (void *) &(msg->m_tableid[i]),
5073  HASH_REMOVE, NULL);
5074  }
5075 }
5076 
5077 
5078 /* ----------
5079  * pgstat_recv_dropdb() -
5080  *
5081  * Arrange for dead database removal
5082  * ----------
5083  */
5084 static void
5085  pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
5086 {
5087  Oid dbid = msg->m_databaseid;
5088  PgStat_StatDBEntry *dbentry;
5089 
5090  /*
5091  * Lookup the database in the hashtable.
5092  */
5093  dbentry = pgstat_get_db_entry(dbid, false);
5094 
5095  /*
5096  * If found, remove it (along with the db statfile).
5097  */
5098  if (dbentry)
5099  {
5100  char statfile[MAXPGPATH];
5101 
5102  get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
5103 
5104  elog(DEBUG2, "removing stats file \"%s\"", statfile);
5105  unlink(statfile);
5106 
5107  if (dbentry->tables != NULL)
5108  hash_destroy(dbentry->tables);
5109  if (dbentry->functions != NULL)
5110  hash_destroy(dbentry->functions);
5111 
5112  if (hash_search(pgStatDBHash,
5113  (void *) &dbid,
5114  HASH_REMOVE, NULL) == NULL)
5115  ereport(ERROR,
5116  (errmsg("database hash table corrupted during cleanup --- abort")));
5117  }
5118 }
5119 
5120 
5121 /* ----------
5122  * pgstat_recv_resetcounter() -
5123  *
5124  * Reset the statistics for the specified database.
5125  * ----------
5126  */
5127 static void
5128  pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
5129 {
5130  PgStat_StatDBEntry *dbentry;
5131 
5132  /*
5133  * Lookup the database in the hashtable. Nothing to do if not there.
5134  */
5135  dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5136 
5137  if (!dbentry)
5138  return;
5139 
5140  /*
5141  * We simply throw away all the database's table entries by recreating a
5142  * new hash table for them.
5143  */
5144  if (dbentry->tables != NULL)
5145  hash_destroy(dbentry->tables);
5146  if (dbentry->functions != NULL)
5147  hash_destroy(dbentry->functions);
5148 
5149  dbentry->tables = NULL;
5150  dbentry->functions = NULL;
5151 
5152  /*
5153  * Reset database-level stats, too. This creates empty hash tables for
5154  * tables and functions.
5155  */
5156  reset_dbentry_counters(dbentry);
5157 }
5158 
5159 /* ----------
5160  * pgstat_recv_resetsharedcounter() -
5161  *
5162  * Reset some shared statistics of the cluster.
5163  * ----------
5164  */
5165 static void
5166  pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
5167 {
5168  if (msg->m_resettarget == RESET_BGWRITER)
5169  {
5170  /* Reset the global, bgwriter and checkpointer statistics for the cluster. */
5171  memset(&globalStats, 0, sizeof(globalStats));
5172  globalStats.bgwriter.stat_reset_timestamp = GetCurrentTimestamp();
5173  }
5174  else if (msg->m_resettarget == RESET_ARCHIVER)
5175  {
5176  /* Reset the archiver statistics for the cluster. */
5177  memset(&archiverStats, 0, sizeof(archiverStats));
5178  archiverStats.stat_reset_timestamp = GetCurrentTimestamp();
5179  }
5180  else if (msg->m_resettarget == RESET_WAL)
5181  {
5182  /* Reset the WAL statistics for the cluster. */
5183  memset(&walStats, 0, sizeof(walStats));
5184  walStats.stat_reset_timestamp = GetCurrentTimestamp();
5185  }
5186 
5187  /*
5188  * Presumably the sender of this message validated the target, don't
5189  * complain here if it's not valid
5190  */
5191 }
5192 
5193 /* ----------
5194  * pgstat_recv_resetsinglecounter() -
5195  *
5196  * Reset statistics for a single object, which may belong to the current
5197  * database or be shared across all databases in the cluster.
5198  * ----------
5199  */
5200 static void
5201  pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
5202 {
5203  PgStat_StatDBEntry *dbentry;
5204 
5205  if (IsSharedRelation(msg->m_objectid))
5206  dbentry = pgstat_get_db_entry(InvalidOid, false);
5207  else
5208  dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5209 
5210  if (!dbentry)
5211  return;
5212 
5213  /* Set the reset timestamp for the whole database */
5214  dbentry->stat_reset_timestamp = GetCurrentTimestamp();
5215 
5216  /* Remove object if it exists, ignore it if not */
5217  if (msg->m_resettype == RESET_TABLE)
5218  (void) hash_search(dbentry->tables, (void *) &(msg->m_objectid),
5219  HASH_REMOVE, NULL);
5220  else if (msg->m_resettype == RESET_FUNCTION)
5221  (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
5222  HASH_REMOVE, NULL);
5223 }
5224 
5225 /* ----------
5226  * pgstat_recv_resetslrucounter() -
5227  *
5228  * Reset some SLRU statistics of the cluster.
5229  * ----------
5230  */
5231 static void
5232  pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
5233 {
5234  int i;
5235  TimestampTz ts = GetCurrentTimestamp();
5236 
5237  for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
5238  {
5239  /* reset entry with the given index, or all entries (index is -1) */
5240  if ((msg->m_index == -1) || (msg->m_index == i))
5241  {
5242  memset(&slruStats[i], 0, sizeof(slruStats[i]));
5243  slruStats[i].stat_reset_timestamp = ts;
5244  }
5245  }
5246 }
5247 
5248 /* ----------
5249  * pgstat_recv_resetreplslotcounter() -
5250  *
5251  * Reset some replication slot statistics of the cluster.
5252  * ----------
5253  */
5254 static void
5255  pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
5256  int len)
5257 {
5258  PgStat_StatReplSlotEntry *slotent;
5259  TimestampTz ts;
5260 
5261  /* Return if we don't have replication slot statistics */
5262  if (replSlotStatHash == NULL)
5263  return;
5264 
5265  ts = GetCurrentTimestamp();
5266  if (msg->clearall)
5267  {
5268  HASH_SEQ_STATUS sstat;
5269 
5270  hash_seq_init(&sstat, replSlotStatHash);
5271  while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
5272  pgstat_reset_replslot(slotent, ts);
5273  }
5274  else
5275  {
5276  /* Get the slot statistics to reset */
5277  slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
5278 
5279  /*
5280  * Nothing to do if the given slot entry is not found. This could
5281  * happen when the slot with the given name is removed and the
5282  * corresponding statistics entry is also removed before receiving the
5283  * reset message.
5284  */
5285  if (!slotent)
5286  return;
5287 
5288  /* Reset the stats for the requested replication slot */
5289  pgstat_reset_replslot(slotent, ts);
5290  }
5291 }
5292 
5293 
5294 /* ----------
5295  * pgstat_recv_autovac() -
5296  *
5297  * Process an autovacuum signaling message.
5298  * ----------
5299  */
5300 static void
5301  pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
5302 {
5303  PgStat_StatDBEntry *dbentry;
5304 
5305  /*
5306  * Store the last autovacuum time in the database's hashtable entry.
5307  */
5308  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5309 
5310  dbentry->last_autovac_time = msg->m_start_time;
5311 }
5312 
5313 /* ----------
5314  * pgstat_recv_vacuum() -
5315  *
5316  * Process a VACUUM message.
5317  * ----------
5318  */
5319 static void
5320  pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
5321 {
5322  PgStat_StatDBEntry *dbentry;
5323  PgStat_StatTabEntry *tabentry;
5324 
5325  /*
5326  * Store the data in the table's hashtable entry.
5327  */
5328  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5329 
5330  tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5331 
5332  tabentry->n_live_tuples = msg->m_live_tuples;
5333  tabentry->n_dead_tuples = msg->m_dead_tuples;
5334 
5335  /*
5336  * It is quite possible that a non-aggressive VACUUM ended up skipping
5337  * various pages; however, we'll zero the insert counter here regardless.
5338  * It's currently used only to track when we need to perform an "insert"
5339  * autovacuum, which are mainly intended to freeze newly inserted tuples.
5340  * Zeroing this may just mean we'll not try to vacuum the table again
5341  * until enough tuples have been inserted to trigger another insert
5342  * autovacuum. An anti-wraparound autovacuum will catch any persistent
5343  * stragglers.
5344  */
5345  tabentry->inserts_since_vacuum = 0;
5346 
5347  if (msg->m_autovacuum)
5348  {
5349  tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
5350  tabentry->autovac_vacuum_count++;
5351  }
5352  else
5353  {
5354  tabentry->vacuum_timestamp = msg->m_vacuumtime;
5355  tabentry->vacuum_count++;
5356  }
5357 }
5358 
5359 /* ----------
5360  * pgstat_recv_analyze() -
5361  *
5362  * Process an ANALYZE message.
5363  * ----------
5364  */
5365 static void
5366  pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
5367 {
5368  PgStat_StatDBEntry *dbentry;
5369  PgStat_StatTabEntry *tabentry;
5370 
5371  /*
5372  * Store the data in the table's hashtable entry.
5373  */
5374  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5375 
5376  tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5377 
5378  tabentry->n_live_tuples = msg->m_live_tuples;
5379  tabentry->n_dead_tuples = msg->m_dead_tuples;
5380 
5381  /*
5382  * If commanded, reset changes_since_analyze to zero. This forgets any
5383  * changes that were committed while the ANALYZE was in progress, but we
5384  * have no good way to estimate how many of those there were.
5385  */
5386  if (msg->m_resetcounter)
5387  tabentry->changes_since_analyze = 0;
5388 
5389  if (msg->m_autovacuum)
5390  {
5391  tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
5392  tabentry->autovac_analyze_count++;
5393  }
5394  else
5395  {
5396  tabentry->analyze_timestamp = msg->m_analyzetime;
5397  tabentry->analyze_count++;
5398  }
5399 }
5400 
5401 
5402 /* ----------
5403  * pgstat_recv_archiver() -
5404  *
5405  * Process an ARCHIVER message.
5406  * ----------
5407  */
5408 static void
5409  pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
5410 {
5411  if (msg->m_failed)
5412  {
5413  /* Failed archival attempt */
5414  ++archiverStats.failed_count;
5415  memcpy(archiverStats.last_failed_wal, msg->m_xlog,
5416  sizeof(archiverStats.last_failed_wal));
5417  archiverStats.last_failed_timestamp = msg->m_timestamp;
5418  }
5419  else
5420  {
5421  /* Successful archival operation */
5422  ++archiverStats.archived_count;
5423  memcpy(archiverStats.last_archived_wal, msg->m_xlog,
5424  sizeof(archiverStats.last_archived_wal));
5425  archiverStats.last_archived_timestamp = msg->m_timestamp;
5426  }
5427 }
5428 
5429 /* ----------
5430  * pgstat_recv_bgwriter() -
5431  *
5432  * Process a BGWRITER message.
5433  * ----------
5434  */
5435 static void
5436  pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
5437 {
5438  globalStats.bgwriter.buf_written_clean += msg->m_buf_written_clean;
5439  globalStats.bgwriter.maxwritten_clean += msg->m_maxwritten_clean;
5440  globalStats.bgwriter.buf_alloc += msg->m_buf_alloc;
5441 }
5442 
5443 /* ----------
5444  * pgstat_recv_checkpointer() -
5445  *
5446  * Process a CHECKPOINTER message.
5447  * ----------
5448  */
5449 static void
5450  pgstat_recv_checkpointer(PgStat_MsgCheckpointer *msg, int len)
5451 {
5452  globalStats.checkpointer.timed_checkpoints += msg->m_timed_checkpoints;
5453  globalStats.checkpointer.requested_checkpoints += msg->m_requested_checkpoints;
5454  globalStats.checkpointer.checkpoint_write_time += msg->m_checkpoint_write_time;
5455  globalStats.checkpointer.checkpoint_sync_time += msg->m_checkpoint_sync_time;
5456  globalStats.checkpointer.buf_written_checkpoints += msg->m_buf_written_checkpoints;
5457  globalStats.checkpointer.buf_written_backend += msg->m_buf_written_backend;
5458  globalStats.checkpointer.buf_fsync_backend += msg->m_buf_fsync_backend;
5459 }
5460 
5461 /* ----------
5462  * pgstat_recv_wal() -
5463  *
5464  * Process a WAL message.
5465  * ----------
5466  */
5467 static void
5468  pgstat_recv_wal(PgStat_MsgWal *msg, int len)
5469 {
5470  walStats.wal_records += msg->m_wal_records;
5471  walStats.wal_fpi += msg->m_wal_fpi;
5472  walStats.wal_bytes += msg->m_wal_bytes;
5473  walStats.wal_buffers_full += msg->m_wal_buffers_full;
5474  walStats.wal_write += msg->m_wal_write;
5475  walStats.wal_sync += msg->m_wal_sync;
5476  walStats.wal_write_time += msg->m_wal_write_time;
5477  walStats.wal_sync_time += msg->m_wal_sync_time;
5478 }
5479 
5480 /* ----------
5481  * pgstat_recv_slru() -
5482  *
5483  * Process a SLRU message.
5484  * ----------
5485  */
5486 static void
5487  pgstat_recv_slru(PgStat_MsgSLRU *msg, int len)
5488 {
5489  slruStats[msg->m_index].blocks_zeroed += msg->m_blocks_zeroed;
5490  slruStats[msg->m_index].blocks_hit += msg->m_blocks_hit;
5491  slruStats[msg->m_index].blocks_read += msg->m_blocks_read;
5492  slruStats[msg->m_index].blocks_written += msg->m_blocks_written;
5493  slruStats[msg->m_index].blocks_exists += msg->m_blocks_exists;
5494  slruStats[msg->m_index].flush += msg->m_flush;
5495  slruStats[msg->m_index].truncate += msg->m_truncate;
5496 }
5497 
5498 /* ----------
5499  * pgstat_recv_recoveryconflict() -
5500  *
5501  * Process a RECOVERYCONFLICT message.
5502  * ----------
5503  */
5504 static void
5505  pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
5506 {
5507  PgStat_StatDBEntry *dbentry;
5508 
5509  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5510 
5511  switch (msg->m_reason)
5512  {
5513  case PROCSIG_RECOVERY_CONFLICT_DATABASE:
5514 
5515  /*
5516  * Since we drop the information about the database as soon as it
5517  * replicates, there is no point in counting these conflicts.
5518  */
5519  break;
5520  case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
5521  dbentry->n_conflict_tablespace++;
5522  break;
5523  case PROCSIG_RECOVERY_CONFLICT_LOCK:
5524  dbentry->n_conflict_lock++;
5525  break;
5526  case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
5527  dbentry->n_conflict_snapshot++;
5528  break;
5529  case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
5530  dbentry->n_conflict_bufferpin++;
5531  break;
5532  case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
5533  dbentry->n_conflict_startup_deadlock++;
5534  break;
5535  }
5536 }
5537 
5538 /* ----------
5539  * pgstat_recv_deadlock() -
5540  *
5541  * Process a DEADLOCK message.
5542  * ----------
5543  */
5544 static void
5545  pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len)
5546 {
5547  PgStat_StatDBEntry *dbentry;
5548 
5549  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5550 
5551  dbentry->n_deadlocks++;
5552 }
5553 
5554 /* ----------
5555  * pgstat_recv_checksum_failure() -
5556  *
5557  * Process a CHECKSUMFAILURE message.
5558  * ----------
5559  */
5560 static void
5561  pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
5562 {
5563  PgStat_StatDBEntry *dbentry;
5564 
5565  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5566 
5567  dbentry->n_checksum_failures += msg->m_failurecount;
5568  dbentry->last_checksum_failure = msg->m_failure_time;
5569 }
5570 
5571 /* ----------
5572  * pgstat_recv_replslot() -
5573  *
5574  * Process a REPLSLOT message.
5575  * ----------
5576  */
5577 static void
5578  pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
5579 {
5580  if (msg->m_drop)
5581  {
5582  Assert(!msg->m_create);
5583 
5584  /* Remove the replication slot statistics with the given name */
5585  if (replSlotStatHash != NULL)
5586  (void) hash_search(replSlotStatHash,
5587  (void *) &(msg->m_slotname),
5588  HASH_REMOVE,
5589  NULL);
5590  }
5591  else
5592  {
5593  PgStat_StatReplSlotEntry *slotent;
5594 
5595  slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
5596  Assert(slotent);
5597 
5598  if (msg->m_create)
5599  {
5600  /*
5601  * If the message for dropping the slot with the same name gets
5602  * lost, slotent has stats for the old slot. So we initialize all
5603  * counters at slot creation.
5604  */
5605  pgstat_reset_replslot(slotent, 0);
5606  }
5607  else
5608  {
5609  /* Update the replication slot statistics */
5610  slotent->spill_txns += msg->m_spill_txns;
5611  slotent->spill_count += msg->m_spill_count;
5612  slotent->spill_bytes += msg->m_spill_bytes;
5613  slotent->stream_txns += msg->m_stream_txns;
5614  slotent->stream_count += msg->m_stream_count;
5615  slotent->stream_bytes += msg->m_stream_bytes;
5616  slotent->total_txns += msg->m_total_txns;
5617  slotent->total_bytes += msg->m_total_bytes;
5618  }
5619  }
5620 }
5621 
5622 /* ----------
5623  * pgstat_recv_connect() -
5624  *
5625  * Process a CONNECT message.
5626  * ----------
5627  */
5628 static void
5629  pgstat_recv_connect(PgStat_MsgConnect *msg, int len)
5630 {
5631  PgStat_StatDBEntry *dbentry;
5632 
5633  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5634  dbentry->n_sessions++;
5635 }
5636 
5637 /* ----------
5638  * pgstat_recv_disconnect() -
5639  *
5640  * Process a DISCONNECT message.
5641  * ----------
5642  */
5643 static void
5644  pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len)
5645 {
5646  PgStat_StatDBEntry *dbentry;
5647 
5648  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5649 
5650  switch (msg->m_cause)
5651  {
5652  case DISCONNECT_NOT_YET:
5653  case DISCONNECT_NORMAL:
5654  /* we don't collect these */
5655  break;
5656  case DISCONNECT_CLIENT_EOF:
5657  dbentry->n_sessions_abandoned++;
5658  break;
5659  case DISCONNECT_FATAL:
5660  dbentry->n_sessions_fatal++;
5661  break;
5662  case DISCONNECT_KILLED:
5663  dbentry->n_sessions_killed++;
5664  break;
5665  }
5666 }
5667 
5668 /* ----------
5669  * pgstat_recv_tempfile() -
5670  *
5671  * Process a TEMPFILE message.
5672  * ----------
5673  */
5674 static void
5675  pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
5676 {
5677  PgStat_StatDBEntry *dbentry;
5678 
5679  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5680 
5681  dbentry->n_temp_bytes += msg->m_filesize;
5682  dbentry->n_temp_files += 1;
5683 }
5684 
5685 /* ----------
5686  * pgstat_recv_funcstat() -
5687  *
5688  * Count what the backend has done.
5689  * ----------
5690  */
5691 static void
5692  pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
5693 {
5694  PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
5695  PgStat_StatDBEntry *dbentry;
5696  PgStat_StatFuncEntry *funcentry;
5697  int i;
5698  bool found;
5699 
5700  dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5701 
5702  /*
5703  * Process all function entries in the message.
5704  */
5705  for (i = 0; i < msg->m_nentries; i++, funcmsg++)
5706  {
5707  funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
5708  (void *) &(funcmsg->f_id),
5709  HASH_ENTER, &found);
5710 
5711  if (!found)
5712  {
5713  /*
5714  * If it's a new function entry, initialize counters to the values
5715  * we just got.
5716  */
5717  funcentry->f_numcalls = funcmsg->f_numcalls;
5718  funcentry->f_total_time = funcmsg->f_total_time;
5719  funcentry->f_self_time = funcmsg->f_self_time;
5720  }
5721  else
5722  {
5723  /*
5724  * Otherwise add the values to the existing entry.
5725  */
5726  funcentry->f_numcalls += funcmsg->f_numcalls;
5727  funcentry->f_total_time += funcmsg->f_total_time;
5728  funcentry->f_self_time += funcmsg->f_self_time;
5729  }
5730  }
5731 }
5732 
5733 /* ----------
5734  * pgstat_recv_funcpurge() -
5735  *
5736  * Arrange for dead function removal.
5737  * ----------
5738  */
5739 static void
5740  pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
5741 {
5742  PgStat_StatDBEntry *dbentry;
5743  int i;
5744 
5745  dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5746 
5747  /*
5748  * No need to purge if we don't even know the database.
5749  */
5750  if (!dbentry || !dbentry->functions)
5751  return;
5752 
5753  /*
5754  * Process all function entries in the message.
5755  */
5756  for (i = 0; i < msg->m_nentries; i++)
5757  {
5758  /* Remove from hashtable if present; we don't care if it's not. */
5759  (void) hash_search(dbentry->functions,
5760  (void *) &(msg->m_functionid[i]),
5761  HASH_REMOVE, NULL);
5762  }
5763 }
5764 
5765 /* ----------
5766  * pgstat_write_statsfile_needed() -
5767  *
5768  * Do we need to write out any stats files?
5769  * ----------
5770  */
5771 static bool
5772  pgstat_write_statsfile_needed(void)
5773 {
5774  if (pending_write_requests != NIL)
5775  return true;
5776 
5777  /* Everything was written recently */
5778  return false;
5779 }
5780 
5781 /* ----------
5782  * pgstat_db_requested() -
5783  *
5784  * Checks whether stats for a particular DB need to be written to a file.
5785  * ----------
5786  */
5787 static bool
5788  pgstat_db_requested(Oid databaseid)
5789 {
5790  /*
5791  * If any requests are outstanding at all, we should write the stats for
5792  * shared catalogs (the "database" with OID 0). This ensures that
5793  * backends will see up-to-date stats for shared catalogs, even though
5794  * they send inquiry messages mentioning only their own DB.
5795  */
5796  if (databaseid == InvalidOid && pending_write_requests != NIL)
5797  return true;
5798 
5799  /* Search to see if there's an open request to write this database. */
5800  if (list_member_oid(pending_write_requests, databaseid))
5801  return true;
5802 
5803  return false;
5804 }
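 /*
  * Both helpers above exist for the collector's write path. A minimal sketch
  * of the intended call pattern (the actual main loop and write routine live
  * elsewhere in this file) is:
  *
  *     if (pgstat_write_statsfile_needed())
  *         pgstat_write_statsfiles(false, false);
  *
  * with pgstat_db_requested() then consulted per database to decide whether a
  * fresh per-database file must be emitted.
  */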
5805 
5806 /* ----------
5807  * pgstat_replslot_entry
5808  *
5809  * Return the entry of replication slot stats with the given name. Return
5810  * NULL if not found and the caller didn't request to create it.
5811  *
5812  * create tells whether to create the new slot entry if it is not found.
5813  * ----------
5814  */
5815 static PgStat_StatReplSlotEntry *
5816  pgstat_get_replslot_entry(NameData name, bool create)
5817 {
5818  PgStat_StatReplSlotEntry *slotent;
5819  bool found;
5820 
5821  if (replSlotStatHash == NULL)
5822  {
5823  HASHCTL hash_ctl;
5824 
5825  * Return NULL right away if the hash table hasn't been created yet and the
5826  * caller didn't request to create the entry.
5827  * request to create the entry.
5828  */
5829  if (!create)
5830  return NULL;
5831 
5832  hash_ctl.keysize = sizeof(NameData);
5833  hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
5834  replSlotStatHash = hash_create("Replication slots hash",
5835  PGSTAT_REPLSLOT_HASH_SIZE,
5836  &hash_ctl,
5837  HASH_ELEM | HASH_BLOBS);
5838  }
5839 
5840  slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
5841  (void *) &name,
5842  create ? HASH_ENTER : HASH_FIND,
5843  &found);
5844 
5845  if (!slotent)
5846  {
5847  /* not found */
5848  Assert(!create && !found);
5849  return NULL;
5850  }
5851 
5852  /* initialize the entry */
5853  if (create && !found)
5854  {
5855  namestrcpy(&(slotent->slotname), NameStr(name));
5856  pgstat_reset_replslot(slotent, 0);
5857  }
5858 
5859  return slotent;
5860 }
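 /*
  * Typical usage, as in the reset and update paths elsewhere in this file:
  * pass create = false when a missing entry is acceptable (e.g. resetting a
  * slot whose stats were already dropped), and create = true when fresh
  * counters are about to be stored, e.g.
  *
  *     slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
  *     Assert(slotent);
  */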
5861 
5862 /* ----------
5863  * pgstat_reset_replslot
5864  *
5865  * Reset the given replication slot stats.
5866  * ----------
5867  */
5868 static void
5869  pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
5870 {
5871  /* reset only counters. Don't clear slot name */
5872  slotent->spill_txns = 0;
5873  slotent->