PostgreSQL Source Code  git master
latch.c File Reference
#include "postgres.h"
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <unistd.h>
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "portability/instr_time.h"
#include "postmaster/postmaster.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
Include dependency graph for latch.c:

Go to the source code of this file.

Data Structures

struct  WaitEventSet
 

Functions

static void sendSelfPipeByte (void)
 
static void drainSelfPipe (void)
 
static int WaitEventSetWaitBlock (WaitEventSet *set, int cur_timeout, WaitEvent *occurred_events, int nevents)
 
void InitializeLatchSupport (void)
 
void InitLatch (Latch *latch)
 
void InitSharedLatch (Latch *latch)
 
void OwnLatch (Latch *latch)
 
void DisownLatch (Latch *latch)
 
int WaitLatch (Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
 
int WaitLatchOrSocket (Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
 
void SetLatch (Latch *latch)
 
void ResetLatch (Latch *latch)
 
WaitEventSet * CreateWaitEventSet (MemoryContext context, int nevents)
 
void FreeWaitEventSet (WaitEventSet *set)
 
int AddWaitEventToSet (WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
 
void ModifyWaitEvent (WaitEventSet *set, int pos, uint32 events, Latch *latch)
 
int WaitEventSetWait (WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
 
void latch_sigusr1_handler (void)
 

Variables

static volatile sig_atomic_t waiting = false
 
static int selfpipe_readfd = -1
 
static int selfpipe_writefd = -1
 
static int selfpipe_owner_pid = 0
 

Function Documentation

◆ AddWaitEventToSet()

int AddWaitEventToSet ( WaitEventSet *  set,
uint32  events,
pgsocket  fd,
Latch *  latch,
void *  user_data 
)

Definition at line 760 of file latch.c.

References Assert, elog, ERROR, WaitEventSet::events, fd(), WaitEventSet::latch, MyProcPid, Latch::owner_pid, PGINVALID_SOCKET, postmaster_alive_fds, POSTMASTER_FD_WATCH, selfpipe_readfd, WaitEvent::user_data, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_SOCKET_MASK.

Referenced by ConditionVariablePrepareToSleep(), pq_init(), SysLoggerMain(), and WaitLatchOrSocket().

762 {
763  WaitEvent *event;
764 
765  /* not enough space */
766  Assert(set->nevents < set->nevents_space);
767 
768  if (events == WL_EXIT_ON_PM_DEATH)
769  {
770  events = WL_POSTMASTER_DEATH;
771  set->exit_on_postmaster_death = true;
772  }
773 
774  if (latch)
775  {
776  if (latch->owner_pid != MyProcPid)
777  elog(ERROR, "cannot wait on a latch owned by another process");
778  if (set->latch)
779  elog(ERROR, "cannot wait on more than one latch");
780  if ((events & WL_LATCH_SET) != WL_LATCH_SET)
781  elog(ERROR, "latch events only support being set");
782  }
783  else
784  {
785  if (events & WL_LATCH_SET)
786  elog(ERROR, "cannot wait on latch without a specified latch");
787  }
788 
789  /* waiting for socket readiness without a socket indicates a bug */
790  if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
791  elog(ERROR, "cannot wait on socket event without a socket");
792 
793  event = &set->events[set->nevents];
794  event->pos = set->nevents++;
795  event->fd = fd;
796  event->events = events;
797  event->user_data = user_data;
798 #ifdef WIN32
799  event->reset = false;
800 #endif
801 
802  if (events == WL_LATCH_SET)
803  {
804  set->latch = latch;
805  set->latch_pos = event->pos;
806 #ifndef WIN32
807  event->fd = selfpipe_readfd;
808 #endif
809  }
810  else if (events == WL_POSTMASTER_DEATH)
811  {
812 #ifndef WIN32
813  event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
814 #endif
815  }
816 
817  /* perform wait primitive specific initialization, if needed */
818 #if defined(WAIT_USE_EPOLL)
819  WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
820 #elif defined(WAIT_USE_KQUEUE)
821  WaitEventAdjustKqueue(set, event, 0);
822 #elif defined(WAIT_USE_POLL)
823  WaitEventAdjustPoll(set, event);
824 #elif defined(WAIT_USE_WIN32)
825  WaitEventAdjustWin32(set, event);
826 #endif
827 
828  return event->pos;
829 }
int MyProcPid
Definition: globals.c:40
#define WL_SOCKET_MASK
Definition: latch.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define ERROR
Definition: elog.h:43
static int selfpipe_readfd
Definition: latch.c:137
int nevents
Definition: latch.c:84
int postmaster_alive_fds[2]
Definition: postmaster.c:557
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define PGINVALID_SOCKET
Definition: port.h:33
#define Assert(condition)
Definition: c.h:738
int nevents_space
Definition: latch.c:85
int owner_pid
Definition: latch.h:114
#define elog(elevel,...)
Definition: elog.h:214
Latch * latch
Definition: latch.c:99
#define WL_LATCH_SET
Definition: latch.h:124
#define POSTMASTER_FD_WATCH
Definition: postmaster.h:42
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ CreateWaitEventSet()

WaitEventSet* CreateWaitEventSet ( MemoryContext  context,
int  nevents 
)

Definition at line 563 of file latch.c.

References AcquireExternalFD(), close, elog, ERROR, MAXALIGN, MemoryContextAllocZero(), WaitEventSet::nevents, pgwin32_signal_event, ReleaseExternalFD(), and StaticAssertStmt.

Referenced by ConditionVariablePrepareToSleep(), pq_init(), SysLoggerMain(), and WaitLatchOrSocket().

564 {
565  WaitEventSet *set;
566  char *data;
567  Size sz = 0;
568 
569  /*
570  * Use MAXALIGN size/alignment to guarantee that later uses of memory are
571  * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
572  * platforms, but earlier allocations like WaitEventSet and WaitEvent
573  * might not be sized to guarantee that when purely using sizeof().
574  */
575  sz += MAXALIGN(sizeof(WaitEventSet));
576  sz += MAXALIGN(sizeof(WaitEvent) * nevents);
577 
578 #if defined(WAIT_USE_EPOLL)
579  sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
580 #elif defined(WAIT_USE_KQUEUE)
581  sz += MAXALIGN(sizeof(struct kevent) * nevents);
582 #elif defined(WAIT_USE_POLL)
583  sz += MAXALIGN(sizeof(struct pollfd) * nevents);
584 #elif defined(WAIT_USE_WIN32)
585  /* need space for the pgwin32_signal_event */
586  sz += MAXALIGN(sizeof(HANDLE) * (nevents + 1));
587 #endif
588 
589  data = (char *) MemoryContextAllocZero(context, sz);
590 
591  set = (WaitEventSet *) data;
592  data += MAXALIGN(sizeof(WaitEventSet));
593 
594  set->events = (WaitEvent *) data;
595  data += MAXALIGN(sizeof(WaitEvent) * nevents);
596 
597 #if defined(WAIT_USE_EPOLL)
598  set->epoll_ret_events = (struct epoll_event *) data;
599  data += MAXALIGN(sizeof(struct epoll_event) * nevents);
600 #elif defined(WAIT_USE_KQUEUE)
601  set->kqueue_ret_events = (struct kevent *) data;
602  data += MAXALIGN(sizeof(struct kevent) * nevents);
603 #elif defined(WAIT_USE_POLL)
604  set->pollfds = (struct pollfd *) data;
605  data += MAXALIGN(sizeof(struct pollfd) * nevents);
606 #elif defined(WAIT_USE_WIN32)
607  set->handles = (HANDLE) data;
608  data += MAXALIGN(sizeof(HANDLE) * nevents);
609 #endif
610 
611  set->latch = NULL;
612  set->nevents_space = nevents;
613  set->exit_on_postmaster_death = false;
614 
615 #if defined(WAIT_USE_EPOLL)
616  if (!AcquireExternalFD())
617  {
618  /* treat this as though epoll_create1 itself returned EMFILE */
619  elog(ERROR, "epoll_create1 failed: %m");
620  }
621 #ifdef EPOLL_CLOEXEC
622  set->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
623  if (set->epoll_fd < 0)
624  {
625  ReleaseExternalFD();
626  elog(ERROR, "epoll_create1 failed: %m");
627  }
628 #else
629  /* cope with ancient glibc lacking epoll_create1 (e.g., RHEL5) */
630  set->epoll_fd = epoll_create(nevents);
631  if (set->epoll_fd < 0)
632  {
633  ReleaseExternalFD();
634  elog(ERROR, "epoll_create failed: %m");
635  }
636  if (fcntl(set->epoll_fd, F_SETFD, FD_CLOEXEC) == -1)
637  {
638  int save_errno = errno;
639 
640  close(set->epoll_fd);
641  ReleaseExternalFD();
642  errno = save_errno;
643  elog(ERROR, "fcntl(F_SETFD) failed on epoll descriptor: %m");
644  }
645 #endif /* EPOLL_CLOEXEC */
646 #elif defined(WAIT_USE_KQUEUE)
647  if (!AcquireExternalFD())
648  {
649  /* treat this as though kqueue itself returned EMFILE */
650  elog(ERROR, "kqueue failed: %m");
651  }
652  set->kqueue_fd = kqueue();
653  if (set->kqueue_fd < 0)
654  {
655  ReleaseExternalFD();
656  elog(ERROR, "kqueue failed: %m");
657  }
658  if (fcntl(set->kqueue_fd, F_SETFD, FD_CLOEXEC) == -1)
659  {
660  int save_errno = errno;
661 
662  close(set->kqueue_fd);
663  ReleaseExternalFD();
664  errno = save_errno;
665  elog(ERROR, "fcntl(F_SETFD) failed on kqueue descriptor: %m");
666  }
667  set->report_postmaster_not_running = false;
668 #elif defined(WAIT_USE_WIN32)
669 
670  /*
671  * To handle signals while waiting, we need to add a win32 specific event.
672  * We accounted for the additional event at the top of this routine. See
673  * port/win32/signal.c for more details.
674  *
675  * Note: pgwin32_signal_event should be first to ensure that it will be
676  * reported when multiple events are set. We want to guarantee that
677  * pending signals are serviced.
678  */
679  set->handles[0] = pgwin32_signal_event;
680  StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
681 #endif
682 
683  return set;
684 }
#define StaticAssertStmt(condition, errmessage)
Definition: c.h:852
HANDLE pgwin32_signal_event
Definition: signal.c:27
#define ERROR
Definition: elog.h:43
bool AcquireExternalFD(void)
Definition: fd.c:1045
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
size_t Size
Definition: c.h:466
#define MAXALIGN(LEN)
Definition: c.h:691
void ReleaseExternalFD(void)
Definition: fd.c:1098
#define elog(elevel,...)
Definition: elog.h:214
#define close(a)
Definition: win32.h:12

◆ DisownLatch()

void DisownLatch ( Latch *  latch)

Definition at line 337 of file latch.c.

References Assert, Latch::is_shared, MyProcPid, and Latch::owner_pid.

Referenced by AuxiliaryProcKill(), ProcKill(), and StartupXLOG().

338 {
339  Assert(latch->is_shared);
340  Assert(latch->owner_pid == MyProcPid);
341 
342  latch->owner_pid = 0;
343 }
int MyProcPid
Definition: globals.c:40
bool is_shared
Definition: latch.h:113
#define Assert(condition)
Definition: c.h:738
int owner_pid
Definition: latch.h:114

◆ drainSelfPipe()

static void drainSelfPipe ( void  )
static

Definition at line 1915 of file latch.c.

References buf, EAGAIN, EINTR, elog, ERROR, EWOULDBLOCK, read, selfpipe_readfd, and waiting.

Referenced by WaitEventSetWait().

1916 {
1917  /*
1918  * There shouldn't normally be more than one byte in the pipe, or maybe a
1919  * few bytes if multiple processes run SetLatch at the same instant.
1920  */
1921  char buf[16];
1922  int rc;
1923 
1924  for (;;)
1925  {
1926  rc = read(selfpipe_readfd, buf, sizeof(buf));
1927  if (rc < 0)
1928  {
1929  if (errno == EAGAIN || errno == EWOULDBLOCK)
1930  break; /* the pipe is empty */
1931  else if (errno == EINTR)
1932  continue; /* retry */
1933  else
1934  {
1935  waiting = false;
1936  elog(ERROR, "read() on self-pipe failed: %m");
1937  }
1938  }
1939  else if (rc == 0)
1940  {
1941  waiting = false;
1942  elog(ERROR, "unexpected EOF on self-pipe");
1943  }
1944  else if (rc < sizeof(buf))
1945  {
1946  /* we successfully drained the pipe; no need to read() again */
1947  break;
1948  }
1949  /* else buffer wasn't big enough, so read again */
1950  }
1951 }
#define EAGAIN
Definition: win32_port.h:321
#define ERROR
Definition: elog.h:43
static int selfpipe_readfd
Definition: latch.c:137
static char * buf
Definition: pg_test_fsync.c:67
#define elog(elevel,...)
Definition: elog.h:214
#define EWOULDBLOCK
Definition: win32_port.h:329
#define EINTR
Definition: win32_port.h:323
static volatile sig_atomic_t waiting
Definition: latch.c:134
#define read(a, b, c)
Definition: win32.h:13

◆ FreeWaitEventSet()

void FreeWaitEventSet ( WaitEventSet *  set)

Definition at line 696 of file latch.c.

References close, WaitEvent::events, WaitEvent::fd, pfree(), WaitEvent::pos, ReleaseExternalFD(), WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by WaitLatchOrSocket().

697 {
698 #if defined(WAIT_USE_EPOLL)
699  close(set->epoll_fd);
700  ReleaseExternalFD();
701 #elif defined(WAIT_USE_KQUEUE)
702  close(set->kqueue_fd);
703  ReleaseExternalFD();
704 #elif defined(WAIT_USE_WIN32)
705  WaitEvent *cur_event;
706 
707  for (cur_event = set->events;
708  cur_event < (set->events + set->nevents);
709  cur_event++)
710  {
711  if (cur_event->events & WL_LATCH_SET)
712  {
713  /* uses the latch's HANDLE */
714  }
715  else if (cur_event->events & WL_POSTMASTER_DEATH)
716  {
717  /* uses PostmasterHandle */
718  }
719  else
720  {
721  /* Clean up the event object we created for the socket */
722  WSAEventSelect(cur_event->fd, NULL, 0);
723  WSACloseEvent(set->handles[cur_event->pos + 1]);
724  }
725  }
726 #endif
727 
728  pfree(set);
729 }
pgsocket fd
Definition: latch.h:145
int pos
Definition: latch.h:143
void pfree(void *pointer)
Definition: mcxt.c:1056
uint32 events
Definition: latch.h:144
int nevents
Definition: latch.c:84
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
WaitEvent * events
Definition: latch.c:91
void ReleaseExternalFD(void)
Definition: fd.c:1098
#define close(a)
Definition: win32.h:12
#define WL_LATCH_SET
Definition: latch.h:124

◆ InitializeLatchSupport()

void InitializeLatchSupport ( void  )

Definition at line 168 of file latch.c.

References Assert, close, elog, FATAL, IsUnderPostmaster, MyProcPid, ReleaseExternalFD(), ReserveExternalFD(), selfpipe_owner_pid, selfpipe_readfd, and selfpipe_writefd.

Referenced by InitPostmasterChild(), and InitStandaloneProcess().

169 {
170 #ifndef WIN32
171  int pipefd[2];
172 
173  if (IsUnderPostmaster)
174  {
175  /*
176  * We might have inherited connections to a self-pipe created by the
177  * postmaster. It's critical that child processes create their own
178  * self-pipes, of course, and we really want them to close the
179  * inherited FDs for safety's sake.
180  */
181  if (selfpipe_owner_pid != 0)
182  {
183  /* Assert we go through here but once in a child process */
185  /* Release postmaster's pipe FDs; ignore any error */
186  (void) close(selfpipe_readfd);
187  (void) close(selfpipe_writefd);
188  /* Clean up, just for safety's sake; we'll set these below */
190  selfpipe_owner_pid = 0;
191  /* Keep fd.c's accounting straight */
194  }
195  else
196  {
197  /*
198  * Postmaster didn't create a self-pipe ... or else we're in an
199  * EXEC_BACKEND build, in which case it doesn't matter since the
200  * postmaster's pipe FDs were closed by the action of FD_CLOEXEC.
201  * fd.c won't have state to clean up, either.
202  */
203  Assert(selfpipe_readfd == -1);
204  }
205  }
206  else
207  {
208  /* In postmaster or standalone backend, assert we do this but once */
209  Assert(selfpipe_readfd == -1);
210  Assert(selfpipe_owner_pid == 0);
211  }
212 
213  /*
214  * Set up the self-pipe that allows a signal handler to wake up the
215  * poll()/epoll_wait() in WaitLatch. Make the write-end non-blocking, so
216  * that SetLatch won't block if the event has already been set many times
217  * filling the kernel buffer. Make the read-end non-blocking too, so that
218  * we can easily clear the pipe by reading until EAGAIN or EWOULDBLOCK.
219  * Also, make both FDs close-on-exec, since we surely do not want any
220  * child processes messing with them.
221  */
222  if (pipe(pipefd) < 0)
223  elog(FATAL, "pipe() failed: %m");
224  if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) == -1)
225  elog(FATAL, "fcntl(F_SETFL) failed on read-end of self-pipe: %m");
226  if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) == -1)
227  elog(FATAL, "fcntl(F_SETFL) failed on write-end of self-pipe: %m");
228  if (fcntl(pipefd[0], F_SETFD, FD_CLOEXEC) == -1)
229  elog(FATAL, "fcntl(F_SETFD) failed on read-end of self-pipe: %m");
230  if (fcntl(pipefd[1], F_SETFD, FD_CLOEXEC) == -1)
231  elog(FATAL, "fcntl(F_SETFD) failed on write-end of self-pipe: %m");
232 
233  selfpipe_readfd = pipefd[0];
234  selfpipe_writefd = pipefd[1];
235  selfpipe_owner_pid = MyProcPid;
236 
237  /* Tell fd.c about these two long-lived FDs */
238  ReserveExternalFD();
239  ReserveExternalFD();
240 #else
241  /* currently, nothing to do here for Windows */
242 #endif
243 }
int MyProcPid
Definition: globals.c:40
static int selfpipe_writefd
Definition: latch.c:138
#define FATAL
Definition: elog.h:52
static int selfpipe_readfd
Definition: latch.c:137
void ReserveExternalFD(void)
Definition: fd.c:1080
bool IsUnderPostmaster
Definition: globals.c:109
#define Assert(condition)
Definition: c.h:738
void ReleaseExternalFD(void)
Definition: fd.c:1098
#define elog(elevel,...)
Definition: elog.h:214
#define close(a)
Definition: win32.h:12
static int selfpipe_owner_pid
Definition: latch.c:141

◆ InitLatch()

void InitLatch ( Latch *  latch)

Definition at line 249 of file latch.c.

References Assert, elog, ERROR, Latch::is_set, Latch::is_shared, MyProcPid, Latch::owner_pid, selfpipe_owner_pid, and selfpipe_readfd.

Referenced by InitPostmasterChild(), and InitStandaloneProcess().

250 {
251  latch->is_set = false;
252  latch->owner_pid = MyProcPid;
253  latch->is_shared = false;
254 
255 #ifndef WIN32
256  /* Assert InitializeLatchSupport has been called in this process */
257  Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid);
258 #else
259  latch->event = CreateEvent(NULL, TRUE, FALSE, NULL);
260  if (latch->event == NULL)
261  elog(ERROR, "CreateEvent failed: error code %lu", GetLastError());
262 #endif /* WIN32 */
263 }
int MyProcPid
Definition: globals.c:40
bool is_shared
Definition: latch.h:113
#define ERROR
Definition: elog.h:43
static int selfpipe_readfd
Definition: latch.c:137
#define Assert(condition)
Definition: c.h:738
int owner_pid
Definition: latch.h:114
sig_atomic_t is_set
Definition: latch.h:112
#define elog(elevel,...)
Definition: elog.h:214
static int selfpipe_owner_pid
Definition: latch.c:141

◆ InitSharedLatch()

void InitSharedLatch ( Latch *  latch)

Definition at line 281 of file latch.c.

References elog, ERROR, Latch::is_set, Latch::is_shared, and Latch::owner_pid.

Referenced by InitProcGlobal(), and XLOGShmemInit().

282 {
283 #ifdef WIN32
284  SECURITY_ATTRIBUTES sa;
285 
286  /*
287  * Set up security attributes to specify that the events are inherited.
288  */
289  ZeroMemory(&sa, sizeof(sa));
290  sa.nLength = sizeof(sa);
291  sa.bInheritHandle = TRUE;
292 
293  latch->event = CreateEvent(&sa, TRUE, FALSE, NULL);
294  if (latch->event == NULL)
295  elog(ERROR, "CreateEvent failed: error code %lu", GetLastError());
296 #endif
297 
298  latch->is_set = false;
299  latch->owner_pid = 0;
300  latch->is_shared = true;
301 }
bool is_shared
Definition: latch.h:113
#define ERROR
Definition: elog.h:43
int owner_pid
Definition: latch.h:114
sig_atomic_t is_set
Definition: latch.h:112
#define elog(elevel,...)
Definition: elog.h:214

◆ latch_sigusr1_handler()

void latch_sigusr1_handler ( void  )

Definition at line 1866 of file latch.c.

References sendSelfPipeByte(), and waiting.

Referenced by bgworker_sigusr1_handler(), and procsignal_sigusr1_handler().

1867 {
1868  if (waiting)
1869  sendSelfPipeByte();
1870 }
static void sendSelfPipeByte(void)
Definition: latch.c:1876
static volatile sig_atomic_t waiting
Definition: latch.c:134

◆ ModifyWaitEvent()

void ModifyWaitEvent ( WaitEventSet *  set,
int  pos,
uint32  events,
Latch *  latch 
)

Definition at line 838 of file latch.c.

References generate_unaccent_rules::action, Assert, elog, ereport, errcode_for_socket_access(), errmsg(), ERROR, WaitEventSet::events, WaitEvent::events, WaitEvent::fd, WaitEventSet::latch, WaitEventSet::nevents, PGINVALID_SOCKET, PostmasterIsAlive, PostmasterPid, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_CONNECTED, WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by secure_read(), secure_write(), SwitchBackToLocalLatch(), and SwitchToSharedLatch().

839 {
840  WaitEvent *event;
841 #if defined(WAIT_USE_KQUEUE)
842  int old_events;
843 #endif
844 
845  Assert(pos < set->nevents);
846 
847  event = &set->events[pos];
848 #if defined(WAIT_USE_KQUEUE)
849  old_events = event->events;
850 #endif
851 
852  /*
853  * If neither the event mask nor the associated latch changes, return
854  * early. That's an important optimization for some sockets, where
855  * ModifyWaitEvent is frequently used to switch from waiting for reads to
856  * waiting on writes.
857  */
858  if (events == event->events &&
859  (!(event->events & WL_LATCH_SET) || set->latch == latch))
860  return;
861 
862  if (event->events & WL_LATCH_SET &&
863  events != event->events)
864  {
865  /* we could allow to disable latch events for a while */
866  elog(ERROR, "cannot modify latch event");
867  }
868 
869  if (event->events & WL_POSTMASTER_DEATH)
870  {
871  elog(ERROR, "cannot modify postmaster death event");
872  }
873 
874  /* FIXME: validate event mask */
875  event->events = events;
876 
877  if (events == WL_LATCH_SET)
878  {
879  set->latch = latch;
880  }
881 
882 #if defined(WAIT_USE_EPOLL)
883  WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
884 #elif defined(WAIT_USE_KQUEUE)
885  WaitEventAdjustKqueue(set, event, old_events);
886 #elif defined(WAIT_USE_POLL)
887  WaitEventAdjustPoll(set, event);
888 #elif defined(WAIT_USE_WIN32)
889  WaitEventAdjustWin32(set, event);
890 #endif
891 }
#define ERROR
Definition: elog.h:43
uint32 events
Definition: latch.h:144
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define Assert(condition)
Definition: c.h:738
#define elog(elevel,...)
Definition: elog.h:214
Latch * latch
Definition: latch.c:99
#define WL_LATCH_SET
Definition: latch.h:124

◆ OwnLatch()

void OwnLatch ( Latch *  latch)

Definition at line 317 of file latch.c.

References Assert, elog, ERROR, Latch::is_shared, MyProcPid, Latch::owner_pid, selfpipe_owner_pid, and selfpipe_readfd.

Referenced by InitAuxiliaryProcess(), InitProcess(), and StartupXLOG().

318 {
319  /* Sanity checks */
320  Assert(latch->is_shared);
321 
322 #ifndef WIN32
323  /* Assert InitializeLatchSupport has been called in this process */
324  Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid);
325 #endif
326 
327  if (latch->owner_pid != 0)
328  elog(ERROR, "latch already owned");
329 
330  latch->owner_pid = MyProcPid;
331 }
int MyProcPid
Definition: globals.c:40
bool is_shared
Definition: latch.h:113
#define ERROR
Definition: elog.h:43
static int selfpipe_readfd
Definition: latch.c:137
#define Assert(condition)
Definition: c.h:738
int owner_pid
Definition: latch.h:114
#define elog(elevel,...)
Definition: elog.h:214
static int selfpipe_owner_pid
Definition: latch.c:141

◆ ResetLatch()

void ResetLatch ( Latch *  latch)

Definition at line 540 of file latch.c.

References Assert, Latch::is_set, MyProcPid, Latch::owner_pid, and pg_memory_barrier.

Referenced by ApplyLauncherMain(), autoprewarm_main(), AutoVacLauncherMain(), BackgroundWriterMain(), CheckpointerMain(), ConditionVariableTimedSleep(), copy_read_data(), gather_readnext(), initialize_worker_spi(), libpqrcv_connect(), libpqrcv_PQgetResult(), logicalrep_worker_stop(), LogicalRepApplyLoop(), mq_putmessage(), pg_promote(), pg_sleep(), pgarch_MainLoop(), pgfdw_get_cleanup_result(), pgfdw_get_result(), PgstatCollectorMain(), ProcSleep(), ProcWaitForSignal(), recoveryApplyDelay(), secure_read(), secure_write(), shm_mq_receive_bytes(), shm_mq_send_bytes(), shm_mq_wait_internal(), SyncRepWaitForLSN(), SysLoggerMain(), test_shm_mq_pipelined(), throttle(), wait_for_relation_state_change(), wait_for_worker_state_change(), wait_for_workers_to_become_ready(), WaitForBackgroundWorkerShutdown(), WaitForBackgroundWorkerStartup(), WaitForParallelWorkersToAttach(), WaitForParallelWorkersToFinish(), WaitForProcSignalBarrier(), WaitForReplicationWorkerAttach(), WaitForWALToBecomeAvailable(), WalRcvWaitForStartPosition(), WalReceiverMain(), WalSndLoop(), WalSndWaitForWal(), WalSndWriteData(), and WalWriterMain().

541 {
542  /* Only the owner should reset the latch */
543  Assert(latch->owner_pid == MyProcPid);
544 
545  latch->is_set = false;
546 
547  /*
548  * Ensure that the write to is_set gets flushed to main memory before we
549  * examine any flag variables. Otherwise a concurrent SetLatch might
550  * falsely conclude that it needn't signal us, even though we have missed
551  * seeing some flag updates that SetLatch was supposed to inform us of.
552  */
553  pg_memory_barrier();
554 }
int MyProcPid
Definition: globals.c:40
#define pg_memory_barrier()
Definition: atomics.h:145
#define Assert(condition)
Definition: c.h:738
int owner_pid
Definition: latch.h:114
sig_atomic_t is_set
Definition: latch.h:112

◆ sendSelfPipeByte()

static void sendSelfPipeByte ( void  )
static

Definition at line 1876 of file latch.c.

References EAGAIN, EINTR, EWOULDBLOCK, selfpipe_writefd, and write.

Referenced by latch_sigusr1_handler(), and SetLatch().

1877 {
1878  int rc;
1879  char dummy = 0;
1880 
1881 retry:
1882  rc = write(selfpipe_writefd, &dummy, 1);
1883  if (rc < 0)
1884  {
1885  /* If interrupted by signal, just retry */
1886  if (errno == EINTR)
1887  goto retry;
1888 
1889  /*
1890  * If the pipe is full, we don't need to retry, the data that's there
1891  * already is enough to wake up WaitLatch.
1892  */
1893  if (errno == EAGAIN || errno == EWOULDBLOCK)
1894  return;
1895 
1896  /*
1897  * Oops, the write() failed for some other reason. We might be in a
1898  * signal handler, so it's not safe to elog(). We have no choice but
1899  * silently ignore the error.
1900  */
1901  return;
1902  }
1903 }
static int selfpipe_writefd
Definition: latch.c:138
#define EAGAIN
Definition: win32_port.h:321
#define write(a, b, c)
Definition: win32.h:14
#define EWOULDBLOCK
Definition: win32_port.h:329
#define EINTR
Definition: win32_port.h:323

◆ SetLatch()

void SetLatch ( Latch *  latch)

Definition at line 457 of file latch.c.

References Latch::is_set, kill, MyProcPid, Latch::owner_pid, pg_memory_barrier, sendSelfPipeByte(), SIGUSR1, and waiting.

Referenced by apw_sighup_handler(), apw_sigterm_handler(), avl_sigusr2_handler(), CheckDeadLockAlert(), ConditionVariableBroadcast(), ConditionVariableSignal(), die(), ForwardSyncRequest(), handle_sig_alarm(), handle_sigterm(), HandleCatchupInterrupt(), HandleNotifyInterrupt(), HandleParallelMessageInterrupt(), IdleInTransactionSessionTimeoutHandler(), logicalrep_worker_wakeup_ptr(), pgarch_waken(), pgarch_waken_stop(), ProcessClientReadInterrupt(), ProcessClientWriteInterrupt(), ProcSendSignal(), procsignal_sigusr1_handler(), ProcWakeup(), RecoveryConflictInterrupt(), ReqCheckpointHandler(), RequestXLogStreaming(), shm_mq_detach_internal(), shm_mq_inc_bytes_read(), shm_mq_send_bytes(), shm_mq_sendv(), shm_mq_set_receiver(), shm_mq_set_sender(), sigHupHandler(), SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), sigUsr1Handler(), StatementCancelHandler(), StrategyGetBuffer(), SwitchBackToLocalLatch(), SwitchToSharedLatch(), SyncRepWakeQueue(), test_shm_mq_main(), WakeupRecovery(), WalRcvForceReply(), WalRcvShutdownHandler(), WalSndLastCycleHandler(), WalSndWaitForWal(), WalSndWakeup(), WalSndWriteData(), worker_spi_sighup(), worker_spi_sigterm(), write_syslogger_file(), and XLogSetAsyncXactLSN().

458 {
459 #ifndef WIN32
460  pid_t owner_pid;
461 #else
462  HANDLE handle;
463 #endif
464 
465  /*
466  * The memory barrier has to be placed here to ensure that any flag
467  * variables possibly changed by this process have been flushed to main
468  * memory, before we check/set is_set.
469  */
470  pg_memory_barrier();
471 
472  /* Quick exit if already set */
473  if (latch->is_set)
474  return;
475 
476  latch->is_set = true;
477 
478 #ifndef WIN32
479 
480  /*
481  * See if anyone's waiting for the latch. It can be the current process if
482  * we're in a signal handler. We use the self-pipe to wake up the
483  * poll()/epoll_wait() in that case. If it's another process, send a
484  * signal.
485  *
486  * Fetch owner_pid only once, in case the latch is concurrently getting
487  * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
488  * guaranteed to be true! In practice, the effective range of pid_t fits
489  * in a 32 bit integer, and so should be atomic. In the worst case, we
490  * might end up signaling the wrong process. Even then, you're very
491  * unlucky if a process with that bogus pid exists and belongs to
492  * Postgres; and PG database processes should handle excess SIGUSR1
493  * interrupts without a problem anyhow.
494  *
495  * Another sort of race condition that's possible here is for a new
496  * process to own the latch immediately after we look, so we don't signal
497  * it. This is okay so long as all callers of ResetLatch/WaitLatch follow
498  * the standard coding convention of waiting at the bottom of their loops,
499  * not the top, so that they'll correctly process latch-setting events
500  * that happen before they enter the loop.
501  */
502  owner_pid = latch->owner_pid;
503  if (owner_pid == 0)
504  return;
505  else if (owner_pid == MyProcPid)
506  {
507  if (waiting)
508  sendSelfPipeByte();
509  }
510  else
511  kill(owner_pid, SIGUSR1);
512 #else
513 
514  /*
515  * See if anyone's waiting for the latch. It can be the current process if
516  * we're in a signal handler.
517  *
518  * Use a local variable here just in case somebody changes the event field
519  * concurrently (which really should not happen).
520  */
521  handle = latch->event;
522  if (handle)
523  {
524  SetEvent(handle);
525 
526  /*
527  * Note that we silently ignore any errors. We might be in a signal
528  * handler or other critical path where it's not safe to call elog().
529  */
530  }
531 #endif
532 
533 }
int MyProcPid
Definition: globals.c:40
#define SIGUSR1
Definition: win32_port.h:165
#define kill(pid, sig)
Definition: win32_port.h:426
static void sendSelfPipeByte(void)
Definition: latch.c:1876
#define pg_memory_barrier()
Definition: atomics.h:145
int owner_pid
Definition: latch.h:114
sig_atomic_t is_set
Definition: latch.h:112
static volatile sig_atomic_t waiting
Definition: latch.c:134

◆ WaitEventSetWait()

int WaitEventSetWait ( WaitEventSet set,
long  timeout,
WaitEvent occurred_events,
int  nevents,
uint32  wait_event_info 
)

Definition at line 1167 of file latch.c.

References Assert, buf, drainSelfPipe(), EINTR, elog, ereport, errcode_for_socket_access(), errmsg(), ERROR, WaitEvent::events, WaitEvent::fd, INSTR_TIME_GET_MILLISEC, INSTR_TIME_SET_CURRENT, INSTR_TIME_SUBTRACT, WaitEventSet::nevents, PGINVALID_SOCKET, pgstat_report_wait_end(), pgstat_report_wait_start(), pgwin32_dispatch_queued_signals(), WaitEvent::pos, PostmasterIsAliveInternal(), proc_exit(), start_time, unlikely, WaitEvent::user_data, WaitEventSetWaitBlock(), waiting, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_CONNECTED, WL_SOCKET_MASK, WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by ConditionVariableTimedSleep(), secure_read(), secure_write(), SysLoggerMain(), and WaitLatchOrSocket().

1170 {
1171  int returned_events = 0;
1172  instr_time start_time;
1173  instr_time cur_time;
1174  long cur_timeout = -1;
1175 
1176  Assert(nevents > 0);
1177 
1178  /*
1179  * Initialize timeout if requested. We must record the current time so
1180  * that we can determine the remaining timeout if interrupted.
1181  */
1182  if (timeout >= 0)
1183  {
1184  INSTR_TIME_SET_CURRENT(start_time);
1185  Assert(timeout >= 0 && timeout <= INT_MAX);
1186  cur_timeout = timeout;
1187  }
1188 
1189  pgstat_report_wait_start(wait_event_info);
1190 
1191 #ifndef WIN32
1192  waiting = true;
1193 #else
1194  /* Ensure that signals are serviced even if latch is already set */
1195  pgwin32_dispatch_queued_signals();
1196 #endif
1197  while (returned_events == 0)
1198  {
1199  int rc;
1200 
1201  /*
1202  * Check if the latch is set already. If so, leave the loop
1203  * immediately, avoid blocking again. We don't attempt to report any
1204  * other events that might also be satisfied.
1205  *
1206  * If someone sets the latch between this and the
1207  * WaitEventSetWaitBlock() below, the setter will write a byte to the
1208  * pipe (or signal us and the signal handler will do that), and the
1209  * readiness routine will return immediately.
1210  *
1211  * On unix, If there's a pending byte in the self pipe, we'll notice
1212  * whenever blocking. Only clearing the pipe in that case avoids
1213  * having to drain it every time WaitLatchOrSocket() is used. Should
1214  * the pipe-buffer fill up we're still ok, because the pipe is in
1215  * nonblocking mode. It's unlikely for that to happen, because the
1216  * self pipe isn't filled unless we're blocking (waiting = true), or
1217  * from inside a signal handler in latch_sigusr1_handler().
1218  *
1219  * On windows, we'll also notice if there's a pending event for the
1220  * latch when blocking, but there's no danger of anything filling up,
1221  * as "Setting an event that is already set has no effect.".
1222  *
1223  * Note: we assume that the kernel calls involved in latch management
1224  * will provide adequate synchronization on machines with weak memory
1225  * ordering, so that we cannot miss seeing is_set if a notification
1226  * has already been queued.
1227  */
1228  if (set->latch && set->latch->is_set)
1229  {
1230  occurred_events->fd = PGINVALID_SOCKET;
1231  occurred_events->pos = set->latch_pos;
1232  occurred_events->user_data =
1233  set->events[set->latch_pos].user_data;
1234  occurred_events->events = WL_LATCH_SET;
1235  occurred_events++;
1236  returned_events++;
1237 
1238  break;
1239  }
1240 
1241  /*
1242  * Wait for events using the readiness primitive chosen at the top of
1243  * this file. If -1 is returned, a timeout has occurred, if 0 we have
1244  * to retry, everything >= 1 is the number of returned events.
1245  */
1246  rc = WaitEventSetWaitBlock(set, cur_timeout,
1247  occurred_events, nevents);
1248 
1249  if (rc == -1)
1250  break; /* timeout occurred */
1251  else
1252  returned_events = rc;
1253 
1254  /* If we're not done, update cur_timeout for next iteration */
1255  if (returned_events == 0 && timeout >= 0)
1256  {
1257  INSTR_TIME_SET_CURRENT(cur_time);
1258  INSTR_TIME_SUBTRACT(cur_time, start_time);
1259  cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
1260  if (cur_timeout <= 0)
1261  break;
1262  }
1263  }
1264 #ifndef WIN32
1265  waiting = false;
1266 #endif
1267 
1268  pgstat_report_wait_end();
1269 
1270  return returned_events;
1271 }
pgsocket fd
Definition: latch.h:145
int pos
Definition: latch.h:143
#define INSTR_TIME_GET_MILLISEC(t)
Definition: instr_time.h:202
struct timeval instr_time
Definition: instr_time.h:150
static time_t start_time
Definition: pg_ctl.c:99
void pgwin32_dispatch_queued_signals(void)
Definition: signal.c:108
#define INSTR_TIME_SUBTRACT(x, y)
Definition: instr_time.h:170
uint32 events
Definition: latch.h:144
static int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, WaitEvent *occurred_events, int nevents)
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1330
#define PGINVALID_SOCKET
Definition: port.h:33
#define Assert(condition)
Definition: c.h:738
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1306
#define INSTR_TIME_SET_CURRENT(t)
Definition: instr_time.h:156
void * user_data
Definition: latch.h:146
sig_atomic_t is_set
Definition: latch.h:112
Latch * latch
Definition: latch.c:99
#define WL_LATCH_SET
Definition: latch.h:124
static volatile sig_atomic_t waiting
Definition: latch.c:134

◆ WaitEventSetWaitBlock()

static int WaitEventSetWaitBlock ( WaitEventSet set,
int  cur_timeout,
WaitEvent occurred_events,
int  nevents 
)
inline static

Referenced by WaitEventSetWait().

◆ WaitLatch()

◆ WaitLatchOrSocket()

int WaitLatchOrSocket ( Latch latch,
int  wakeEvents,
pgsocket  sock,
long  timeout,
uint32  wait_event_info 
)

Definition at line 390 of file latch.c.

References AddWaitEventToSet(), Assert, CreateWaitEventSet(), CurrentMemoryContext, FreeWaitEventSet(), IsUnderPostmaster, PGINVALID_SOCKET, WaitEventSetWait(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_MASK, and WL_TIMEOUT.

Referenced by be_tls_open_server(), copy_read_data(), libpqrcv_connect(), libpqrcv_PQgetResult(), LogicalRepApplyLoop(), pgfdw_get_cleanup_result(), pgfdw_get_result(), PgstatCollectorMain(), read_or_wait(), secure_open_gssapi(), WaitLatch(), WalReceiverMain(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

392 {
393  int ret = 0;
394  int rc;
395  WaitEvent event;
396  WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
397 
398  if (wakeEvents & WL_TIMEOUT)
399  Assert(timeout >= 0);
400  else
401  timeout = -1;
402 
403  if (wakeEvents & WL_LATCH_SET)
404  AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
405  latch, NULL);
406 
407  /* Postmaster-managed callers must handle postmaster death somehow. */
408  Assert(!IsUnderPostmaster ||
409  (wakeEvents & WL_EXIT_ON_PM_DEATH) ||
410  (wakeEvents & WL_POSTMASTER_DEATH));
411 
412  if ((wakeEvents & WL_POSTMASTER_DEATH) && IsUnderPostmaster)
413  AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
414  NULL, NULL);
415 
416  if ((wakeEvents & WL_EXIT_ON_PM_DEATH) && IsUnderPostmaster)
417  AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
418  NULL, NULL);
419 
420  if (wakeEvents & WL_SOCKET_MASK)
421  {
422  int ev;
423 
424  ev = wakeEvents & WL_SOCKET_MASK;
425  AddWaitEventToSet(set, ev, sock, NULL, NULL);
426  }
427 
428  rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
429 
430  if (rc == 0)
431  ret |= WL_TIMEOUT;
432  else
433  {
434  ret |= event.events & (WL_LATCH_SET |
435  WL_POSTMASTER_DEATH |
436  WL_SOCKET_MASK);
437  }
438 
439  FreeWaitEventSet(set);
440 
441  return ret;
442 }
void FreeWaitEventSet(WaitEventSet *set)
Definition: latch.c:696
#define WL_TIMEOUT
Definition: latch.h:127
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
Definition: latch.c:760
#define WL_SOCKET_MASK
Definition: latch.h:137
WaitEventSet * CreateWaitEventSet(MemoryContext context, int nevents)
Definition: latch.c:563
bool IsUnderPostmaster
Definition: globals.c:109
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define PGINVALID_SOCKET
Definition: port.h:33
#define Assert(condition)
Definition: c.h:738
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1167

Variable Documentation

◆ selfpipe_owner_pid

int selfpipe_owner_pid = 0
static

Definition at line 141 of file latch.c.

Referenced by InitializeLatchSupport(), InitLatch(), and OwnLatch().

◆ selfpipe_readfd

int selfpipe_readfd = -1
static

◆ selfpipe_writefd

int selfpipe_writefd = -1
static

Definition at line 138 of file latch.c.

Referenced by InitializeLatchSupport(), and sendSelfPipeByte().

◆ waiting

volatile sig_atomic_t waiting = false
static