54#ifdef HAVE_SYS_EPOLL_H
57#ifdef HAVE_SYS_EVENT_H
60#ifdef HAVE_SYS_SIGNALFD_H
61#include <sys/signalfd.h>
87#if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \
88 defined(WAIT_USE_KQUEUE) || defined(WAIT_USE_WIN32)
90#elif defined(HAVE_SYS_EPOLL_H)
92#elif defined(HAVE_KQUEUE)
93#define WAIT_USE_KQUEUE
94#elif defined(HAVE_POLL)
99#error "no wait set implementation available"
106#if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
107#if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
109#elif defined(WAIT_USE_EPOLL) && defined(HAVE_SYS_SIGNALFD_H)
110#define WAIT_USE_SIGNALFD
112#define WAIT_USE_SELF_PIPE
146#if defined(WAIT_USE_EPOLL)
149 struct epoll_event *epoll_ret_events;
150#elif defined(WAIT_USE_KQUEUE)
153 struct kevent *kqueue_ret_events;
154 bool report_postmaster_not_running;
155#elif defined(WAIT_USE_POLL)
158#elif defined(WAIT_USE_WIN32)
174#ifdef WAIT_USE_SIGNALFD
176static int signal_fd = -1;
179#ifdef WAIT_USE_SELF_PIPE
192#if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
193static void drain(
void);
196#if defined(WAIT_USE_EPOLL)
198#elif defined(WAIT_USE_KQUEUE)
200#elif defined(WAIT_USE_POLL)
202#elif defined(WAIT_USE_WIN32)
207 WaitEvent *occurred_events,
int nevents);
214 .
name =
"WaitEventSet",
243#if defined(WAIT_USE_SELF_PIPE)
295 if (pipe(pipefd) < 0)
297 if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) == -1)
298 elog(
FATAL,
"fcntl(F_SETFL) failed on read-end of self-pipe: %m");
299 if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) == -1)
300 elog(
FATAL,
"fcntl(F_SETFL) failed on write-end of self-pipe: %m");
301 if (fcntl(pipefd[0], F_SETFD, FD_CLOEXEC) == -1)
302 elog(
FATAL,
"fcntl(F_SETFD) failed on read-end of self-pipe: %m");
303 if (fcntl(pipefd[1], F_SETFD, FD_CLOEXEC) == -1)
304 elog(
FATAL,
"fcntl(F_SETFD) failed on write-end of self-pipe: %m");
317#ifdef WAIT_USE_SIGNALFD
318 sigset_t signalfd_mask;
330 (void)
close(signal_fd);
340 sigemptyset(&signalfd_mask);
341 sigaddset(&signalfd_mask, SIGURG);
342 signal_fd = signalfd(-1, &signalfd_mask, SFD_NONBLOCK | SFD_CLOEXEC);
348#ifdef WAIT_USE_KQUEUE
379#if defined(WAIT_USE_EPOLL)
380 sz +=
MAXALIGN(
sizeof(
struct epoll_event) * nevents);
381#elif defined(WAIT_USE_KQUEUE)
382 sz +=
MAXALIGN(
sizeof(
struct kevent) * nevents);
383#elif defined(WAIT_USE_POLL)
384 sz +=
MAXALIGN(
sizeof(
struct pollfd) * nevents);
385#elif defined(WAIT_USE_WIN32)
387 sz +=
MAXALIGN(
sizeof(HANDLE) * (nevents + 1));
390 if (resowner != NULL)
401#if defined(WAIT_USE_EPOLL)
402 set->epoll_ret_events = (
struct epoll_event *)
data;
404#elif defined(WAIT_USE_KQUEUE)
405 set->kqueue_ret_events = (
struct kevent *)
data;
407#elif defined(WAIT_USE_POLL)
410#elif defined(WAIT_USE_WIN32)
411 set->handles = (HANDLE)
data;
419 if (resowner != NULL)
422 set->
owner = resowner;
425#if defined(WAIT_USE_EPOLL)
427 elog(
ERROR,
"AcquireExternalFD, for epoll_create1, failed: %m");
428 set->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
429 if (set->epoll_fd < 0)
434#elif defined(WAIT_USE_KQUEUE)
436 elog(
ERROR,
"AcquireExternalFD, for kqueue, failed: %m");
437 set->kqueue_fd = kqueue();
438 if (set->kqueue_fd < 0)
443 if (fcntl(set->kqueue_fd, F_SETFD, FD_CLOEXEC) == -1)
445 int save_errno = errno;
447 close(set->kqueue_fd);
450 elog(
ERROR,
"fcntl(F_SETFD) failed on kqueue descriptor: %m");
452 set->report_postmaster_not_running =
false;
453#elif defined(WAIT_USE_WIN32)
488#if defined(WAIT_USE_EPOLL)
489 close(set->epoll_fd);
491#elif defined(WAIT_USE_KQUEUE)
492 close(set->kqueue_fd);
494#elif defined(WAIT_USE_WIN32)
510 WSAEventSelect(cur_event->fd, NULL, 0);
511 WSACloseEvent(set->handles[cur_event->pos + 1]);
525#if defined(WAIT_USE_EPOLL)
526 close(set->epoll_fd);
528#elif defined(WAIT_USE_KQUEUE)
586 elog(
ERROR,
"cannot wait on a latch owned by another process");
588 elog(
ERROR,
"cannot wait on more than one latch");
590 elog(
ERROR,
"latch events only support being set");
595 elog(
ERROR,
"cannot wait on latch without a specified latch");
600 elog(
ERROR,
"cannot wait on socket event without a socket");
605 event->events = events;
606 event->user_data = user_data;
608 event->reset =
false;
615#if defined(WAIT_USE_SELF_PIPE)
617#elif defined(WAIT_USE_SIGNALFD)
618 event->fd = signal_fd;
634#if defined(WAIT_USE_EPOLL)
635 WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
636#elif defined(WAIT_USE_KQUEUE)
637 WaitEventAdjustKqueue(set, event, 0);
638#elif defined(WAIT_USE_POLL)
640#elif defined(WAIT_USE_WIN32)
641 WaitEventAdjustWin32(set, event);
658#if defined(WAIT_USE_KQUEUE)
662 Assert(pos < set->nevents);
664 event = &set->
events[pos];
665#if defined(WAIT_USE_KQUEUE)
666 old_events =
event->
events;
679 elog(
ERROR,
"cannot remove postmaster death event");
690 if (events == event->
events &&
695 elog(
ERROR,
"cannot modify latch event");
698 event->events = events;
703 elog(
ERROR,
"cannot wait on a latch owned by another process");
713#if defined(WAIT_USE_WIN32)
721#if defined(WAIT_USE_EPOLL)
722 WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
723#elif defined(WAIT_USE_KQUEUE)
724 WaitEventAdjustKqueue(set, event, old_events);
725#elif defined(WAIT_USE_POLL)
727#elif defined(WAIT_USE_WIN32)
728 WaitEventAdjustWin32(set, event);
732#if defined(WAIT_USE_EPOLL)
739 struct epoll_event epoll_ev;
743 epoll_ev.data.ptr = event;
745 epoll_ev.
events = EPOLLERR | EPOLLHUP;
751 epoll_ev.events |= EPOLLIN;
755 epoll_ev.events |= EPOLLIN;
765 epoll_ev.events |= EPOLLIN;
767 epoll_ev.events |= EPOLLOUT;
769 epoll_ev.events |= EPOLLRDHUP;
777 rc = epoll_ctl(set->epoll_fd,
action, event->
fd, &epoll_ev);
787#if defined(WAIT_USE_POLL)
791 struct pollfd *pollfd = &set->
pollfds[
event->pos];
794 pollfd->fd =
event->fd;
800 pollfd->events = POLLIN;
804 pollfd->events = POLLIN;
813 pollfd->events |= POLLIN;
815 pollfd->events |= POLLOUT;
818 pollfd->events |= POLLRDHUP;
826#if defined(WAIT_USE_KQUEUE)
834#define AccessWaitEvent(k_ev) (*((WaitEvent **)(&(k_ev)->udata)))
837WaitEventAdjustKqueueAdd(
struct kevent *k_ev,
int filter,
int action,
840 k_ev->ident =
event->fd;
841 k_ev->filter = filter;
845 AccessWaitEvent(k_ev) = event;
849WaitEventAdjustKqueueAddPostmaster(
struct kevent *k_ev,
WaitEvent *event)
853 k_ev->filter = EVFILT_PROC;
854 k_ev->flags = EV_ADD;
855 k_ev->fflags = NOTE_EXIT;
857 AccessWaitEvent(k_ev) = event;
861WaitEventAdjustKqueueAddLatch(
struct kevent *k_ev,
WaitEvent *event)
864 k_ev->ident = SIGURG;
865 k_ev->filter = EVFILT_SIGNAL;
866 k_ev->flags = EV_ADD;
869 AccessWaitEvent(k_ev) = event;
879 struct kevent k_ev[2];
881 bool new_filt_read =
false;
882 bool old_filt_read =
false;
883 bool new_filt_write =
false;
884 bool old_filt_write =
false;
886 if (old_events == event->
events)
903 WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event);
908 WaitEventAdjustKqueueAddLatch(&k_ev[count++], event);
918 old_filt_read =
true;
920 new_filt_read =
true;
922 old_filt_write =
true;
924 new_filt_write =
true;
925 if (old_filt_read && !new_filt_read)
926 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_DELETE,
928 else if (!old_filt_read && new_filt_read)
929 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_ADD,
931 if (old_filt_write && !new_filt_write)
932 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_DELETE,
934 else if (!old_filt_write && new_filt_write)
935 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_ADD,
945 rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
957 (errno == ESRCH || errno == EACCES))
958 set->report_postmaster_not_running =
true;
974 set->report_postmaster_not_running =
true;
980#if defined(WAIT_USE_WIN32)
986 HANDLE *handle = &set->handles[
event->pos + 1];
991 *handle = set->
latch->event;
995 *handle = PostmasterHandle;
999 int flags = FD_CLOSE;
1006 flags |= FD_CONNECT;
1010 if (*handle == WSA_INVALID_EVENT)
1012 *handle = WSACreateEvent();
1013 if (*handle == WSA_INVALID_EVENT)
1014 elog(
ERROR,
"failed to create event for socket: error code %d",
1017 if (WSAEventSelect(event->
fd, *handle, flags) != 0)
1018 elog(
ERROR,
"failed to set up event for socket: error code %d",
1040 WaitEvent *occurred_events,
int nevents,
1043 int returned_events = 0;
1046 long cur_timeout = -1;
1057 Assert(timeout >= 0 && timeout <= INT_MAX);
1058 cur_timeout = timeout;
1071 while (returned_events == 0)
1123 if (returned_events == nevents)
1141 occurred_events, nevents - returned_events);
1150 returned_events += rc;
1153 if (returned_events == 0 && timeout >= 0)
1158 if (cur_timeout <= 0)
1168 return returned_events;
1172#if defined(WAIT_USE_EPOLL)
1184 WaitEvent *occurred_events,
int nevents)
1186 int returned_events = 0;
1189 struct epoll_event *cur_epoll_event;
1192 rc = epoll_wait(set->epoll_fd, set->epoll_ret_events,
1204 errmsg(
"%s() failed: %m",
1220 for (cur_epoll_event = set->epoll_ret_events;
1221 cur_epoll_event < (set->epoll_ret_events + rc) &&
1222 returned_events < nevents;
1226 cur_event = (
WaitEvent *) cur_epoll_event->data.ptr;
1228 occurred_events->
pos = cur_event->
pos;
1230 occurred_events->
events = 0;
1233 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
1247 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
1277 (cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
1284 (cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)))
1291 (cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
1297 if (occurred_events->
events != 0)
1299 occurred_events->
fd = cur_event->
fd;
1306 return returned_events;
1309#elif defined(WAIT_USE_KQUEUE)
1320 WaitEvent *occurred_events,
int nevents)
1322 int returned_events = 0;
1325 struct kevent *cur_kqueue_event;
1326 struct timespec timeout;
1327 struct timespec *timeout_p;
1329 if (cur_timeout < 0)
1333 timeout.tv_sec = cur_timeout / 1000;
1334 timeout.tv_nsec = (cur_timeout % 1000) * 1000000;
1335 timeout_p = &timeout;
1342 if (
unlikely(set->report_postmaster_not_running))
1352 rc = kevent(set->kqueue_fd, NULL, 0,
1353 set->kqueue_ret_events,
1366 errmsg(
"%s() failed: %m",
1382 for (cur_kqueue_event = set->kqueue_ret_events;
1383 cur_kqueue_event < (set->kqueue_ret_events + rc) &&
1384 returned_events < nevents;
1388 cur_event = AccessWaitEvent(cur_kqueue_event);
1390 occurred_events->
pos = cur_event->
pos;
1392 occurred_events->
events = 0;
1395 cur_kqueue_event->filter == EVFILT_SIGNAL)
1406 cur_kqueue_event->filter == EVFILT_PROC &&
1407 (cur_kqueue_event->fflags & NOTE_EXIT) != 0)
1414 set->report_postmaster_not_running =
true;
1430 (cur_kqueue_event->filter == EVFILT_READ))
1437 (cur_kqueue_event->filter == EVFILT_READ) &&
1438 (cur_kqueue_event->flags & EV_EOF))
1445 (cur_kqueue_event->filter == EVFILT_WRITE))
1451 if (occurred_events->
events != 0)
1453 occurred_events->
fd = cur_event->
fd;
1460 return returned_events;
1463#elif defined(WAIT_USE_POLL)
1473 WaitEvent *occurred_events,
int nevents)
1475 int returned_events = 0;
1478 struct pollfd *cur_pollfd;
1492 errmsg(
"%s() failed: %m",
1505 returned_events < nevents;
1506 cur_event++, cur_pollfd++)
1509 if (cur_pollfd->revents == 0)
1512 occurred_events->
pos = cur_event->
pos;
1514 occurred_events->
events = 0;
1517 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
1531 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
1558 int errflags = POLLHUP | POLLERR | POLLNVAL;
1563 (cur_pollfd->revents & (POLLIN | errflags)))
1570 (cur_pollfd->revents & (POLLOUT | errflags)))
1578 (cur_pollfd->revents & (POLLRDHUP | errflags)))
1585 if (occurred_events->
events != 0)
1587 occurred_events->
fd = cur_event->
fd;
1593 return returned_events;
1596#elif defined(WAIT_USE_WIN32)
1607 WaitEvent *occurred_events,
int nevents)
1609 int returned_events = 0;
1614 for (cur_event = set->
events;
1618 if (cur_event->reset)
1620 WaitEventAdjustWin32(set, cur_event);
1621 cur_event->reset =
false;
1646 if (WSARecv(cur_event->
fd, &
buf, 1, &received, &flags, NULL, NULL) == 0)
1648 occurred_events->
pos = cur_event->
pos;
1651 occurred_events->
fd = cur_event->
fd;
1677 r = WSASend(cur_event->
fd, &
buf, 1, &sent, 0, NULL, NULL);
1678 if (r == 0 || WSAGetLastError() != WSAEWOULDBLOCK)
1680 occurred_events->
pos = cur_event->
pos;
1683 occurred_events->
fd = cur_event->
fd;
1694 rc = WaitForMultipleObjects(set->
nevents + 1, set->handles, FALSE,
1698 if (rc == WAIT_FAILED)
1699 elog(
ERROR,
"WaitForMultipleObjects() failed: error code %lu",
1701 else if (rc == WAIT_TIMEOUT)
1707 if (rc == WAIT_OBJECT_0)
1725 occurred_events->
pos = cur_event->
pos;
1727 occurred_events->
events = 0;
1735 if (!ResetEvent(set->handles[cur_event->
pos + 1]))
1736 elog(
ERROR,
"ResetEvent failed: error code %lu", GetLastError());
1767 WSANETWORKEVENTS resEvents;
1768 HANDLE handle = set->handles[cur_event->
pos + 1];
1772 occurred_events->
fd = cur_event->
fd;
1774 ZeroMemory(&resEvents,
sizeof(resEvents));
1775 if (WSAEnumNetworkEvents(cur_event->
fd, handle, &resEvents) != 0)
1776 elog(
ERROR,
"failed to enumerate network events: error code %d",
1779 (resEvents.lNetworkEvents & FD_READ))
1797 cur_event->reset =
true;
1800 (resEvents.lNetworkEvents & FD_WRITE))
1806 (resEvents.lNetworkEvents & FD_CONNECT))
1812 (resEvents.lNetworkEvents & FD_ACCEPT))
1817 if (resEvents.lNetworkEvents & FD_CLOSE)
1823 if (occurred_events->
events != 0)
1831 if (returned_events == nevents)
1835 next_pos = cur_event->
pos + 1;
1844 count = set->
nevents - next_pos;
1845 rc = WaitForMultipleObjects(count,
1846 set->handles + 1 + next_pos,
1854 if (rc < WAIT_OBJECT_0 || rc >= WAIT_OBJECT_0 + count)
1858 cur_event = &set->
events[next_pos + (rc - WAIT_OBJECT_0)];
1861 return returned_events;
1871#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
1872 defined(WAIT_USE_EPOLL) || \
1873 defined(WAIT_USE_KQUEUE)
1889#if defined(WAIT_USE_SELF_PIPE)
1936#if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
1952#ifdef WAIT_USE_SELF_PIPE
1965 else if (errno ==
EINTR)
1970#ifdef WAIT_USE_SELF_PIPE
1971 elog(
ERROR,
"read() on self-pipe failed: %m");
1973 elog(
ERROR,
"read() on signalfd failed: %m");
1980#ifdef WAIT_USE_SELF_PIPE
1981 elog(
ERROR,
"unexpected EOF on self-pipe");
1983 elog(
ERROR,
"unexpected EOF on signalfd");
1986 else if (rc <
sizeof(
buf))
2023#if defined(WAIT_USE_SELF_PIPE)
#define pg_memory_barrier()
#define StaticAssertDecl(condition, errmessage)
int errcode_for_socket_access(void)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void ReleaseExternalFD(void)
bool AcquireExternalFD(void)
void ReserveExternalFD(void)
Assert(PointerIsAligned(start, uint64))
#define INSTR_TIME_SET_CURRENT(t)
#define INSTR_TIME_SUBTRACT(x, y)
#define INSTR_TIME_GET_MILLISEC(t)
#define INSTR_TIME_SET_ZERO(t)
if(TABLE==NULL||TABLE_index==NULL)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext TopMemoryContext
static char buf[DEFAULT_XLOG_SEG_SIZE]
bool PostmasterIsAliveInternal(void)
#define PostmasterIsAlive()
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
int postmaster_alive_fds[2]
#define POSTMASTER_FD_WATCH
static int fd(const char *x, int i)
void ResourceOwnerForget(ResourceOwner owner, Datum value, const ResourceOwnerDesc *kind)
void ResourceOwnerRemember(ResourceOwner owner, Datum value, const ResourceOwnerDesc *kind)
void ResourceOwnerEnlarge(ResourceOwner owner)
#define RELEASE_PRIO_WAITEVENTSETS
@ RESOURCE_RELEASE_AFTER_LOCKS
void pgwin32_dispatch_queued_signals(void)
HANDLE pgwin32_signal_event
sig_atomic_t maybe_sleeping
bool exit_on_postmaster_death
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
static void latch_sigurg_handler(SIGNAL_ARGS)
static void sendSelfPipeByte(void)
static void ResourceOwnerForgetWaitEventSet(ResourceOwner owner, WaitEventSet *set)
static int selfpipe_readfd
static const ResourceOwnerDesc wait_event_set_resowner_desc
void FreeWaitEventSetAfterFork(WaitEventSet *set)
static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
static int selfpipe_owner_pid
static int selfpipe_writefd
int GetNumRegisteredWaitEvents(WaitEventSet *set)
void WakeupOtherProc(int pid)
static void ResourceOwnerRememberWaitEventSet(ResourceOwner owner, WaitEventSet *set)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
static int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, WaitEvent *occurred_events, int nevents)
static void ResOwnerReleaseWaitEventSet(Datum res)
void InitializeWaitEventSupport(void)
bool WaitEventSetCanReportClosed(void)
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
static volatile sig_atomic_t waiting
void FreeWaitEventSet(WaitEventSet *set)
WaitEventSet * CreateWaitEventSet(ResourceOwner resowner, int nevents)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define WL_SOCKET_CONNECTED
#define WL_POSTMASTER_DEATH
#define WL_SOCKET_WRITEABLE