40 #ifdef HAVE_SYS_EPOLL_H 41 #include <sys/epoll.h> 43 #ifdef HAVE_SYS_EVENT_H 44 #include <sys/event.h> 69 #if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \ 70 defined(WAIT_USE_KQUEUE) || defined(WAIT_USE_WIN32) 72 #elif defined(HAVE_SYS_EPOLL_H) 73 #define WAIT_USE_EPOLL 74 #elif defined(HAVE_KQUEUE) 75 #define WAIT_USE_KQUEUE 76 #elif defined(HAVE_POLL) 79 #define WAIT_USE_WIN32 81 #error "no wait set implementation available" 85 #include <sys/signalfd.h> 116 #if defined(WAIT_USE_EPOLL) 119 struct epoll_event *epoll_ret_events;
120 #elif defined(WAIT_USE_KQUEUE) 123 struct kevent *kqueue_ret_events;
124 bool report_postmaster_not_running;
125 #elif defined(WAIT_USE_POLL) 127 struct pollfd *pollfds;
128 #elif defined(WAIT_USE_WIN32) 143 #define LatchWaitSetLatchPos 0 150 #ifdef WAIT_USE_EPOLL 152 static int signal_fd = -1;
155 #if defined(WAIT_USE_POLL) 157 static int selfpipe_readfd = -1;
158 static int selfpipe_writefd = -1;
161 static int selfpipe_owner_pid = 0;
165 static void sendSelfPipeByte(
void);
168 #if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL) 169 static void drain(
void);
172 #if defined(WAIT_USE_EPOLL) 174 #elif defined(WAIT_USE_KQUEUE) 176 #elif defined(WAIT_USE_POLL) 178 #elif defined(WAIT_USE_WIN32) 194 #if defined(WAIT_USE_POLL) 205 if (selfpipe_owner_pid != 0)
210 (void)
close(selfpipe_readfd);
211 (void)
close(selfpipe_writefd);
213 selfpipe_readfd = selfpipe_writefd = -1;
214 selfpipe_owner_pid = 0;
227 Assert(selfpipe_readfd == -1);
233 Assert(selfpipe_readfd == -1);
234 Assert(selfpipe_owner_pid == 0);
246 if (pipe(pipefd) < 0)
248 if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) == -1)
249 elog(
FATAL,
"fcntl(F_SETFL) failed on read-end of self-pipe: %m");
250 if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) == -1)
251 elog(
FATAL,
"fcntl(F_SETFL) failed on write-end of self-pipe: %m");
252 if (fcntl(pipefd[0], F_SETFD, FD_CLOEXEC) == -1)
253 elog(
FATAL,
"fcntl(F_SETFD) failed on read-end of self-pipe: %m");
254 if (fcntl(pipefd[1], F_SETFD, FD_CLOEXEC) == -1)
255 elog(
FATAL,
"fcntl(F_SETFD) failed on write-end of self-pipe: %m");
257 selfpipe_readfd = pipefd[0];
258 selfpipe_writefd = pipefd[1];
265 pqsignal(SIGURG, latch_sigurg_handler);
268 #ifdef WAIT_USE_EPOLL 269 sigset_t signalfd_mask;
275 sigemptyset(&signalfd_mask);
276 sigaddset(&signalfd_mask, SIGURG);
277 signal_fd = signalfd(-1, &signalfd_mask, SFD_NONBLOCK | SFD_CLOEXEC);
283 #ifdef WAIT_USE_KQUEUE 294 Assert(LatchWaitSet == NULL);
310 #if defined(WAIT_USE_POLL) 320 #if defined(WAIT_USE_POLL) 321 close(selfpipe_readfd);
322 close(selfpipe_writefd);
323 selfpipe_readfd = -1;
324 selfpipe_writefd = -1;
328 #if defined(WAIT_USE_EPOLL) 345 #if defined(WAIT_USE_POLL) 348 #elif defined(WAIT_USE_WIN32) 349 latch->event = CreateEvent(NULL, TRUE, FALSE, NULL);
350 if (latch->event == NULL)
351 elog(
ERROR,
"CreateEvent failed: error code %lu", GetLastError());
374 SECURITY_ATTRIBUTES
sa;
379 ZeroMemory(&sa,
sizeof(sa));
380 sa.nLength =
sizeof(sa);
381 sa.bInheritHandle = TRUE;
383 latch->event = CreateEvent(&sa, TRUE, FALSE, NULL);
384 if (latch->event == NULL)
385 elog(
ERROR,
"CreateEvent failed: error code %lu", GetLastError());
409 #if defined(WAIT_USE_POLL) 476 wait_event_info) == 0)
501 long timeout,
uint32 wait_event_info)
544 ret |=
event.events & (WL_LATCH_SET |
545 WL_POSTMASTER_DEATH |
621 #if defined(WAIT_USE_POLL) 630 kill(owner_pid, SIGURG);
641 handle = latch->event;
699 #if defined(WAIT_USE_EPOLL) 700 sz +=
MAXALIGN(
sizeof(
struct epoll_event) * nevents);
701 #elif defined(WAIT_USE_KQUEUE) 702 sz +=
MAXALIGN(
sizeof(
struct kevent) * nevents);
703 #elif defined(WAIT_USE_POLL) 704 sz +=
MAXALIGN(
sizeof(
struct pollfd) * nevents);
705 #elif defined(WAIT_USE_WIN32) 707 sz +=
MAXALIGN(
sizeof(HANDLE) * (nevents + 1));
718 #if defined(WAIT_USE_EPOLL) 719 set->epoll_ret_events = (
struct epoll_event *) data;
720 data +=
MAXALIGN(
sizeof(
struct epoll_event) * nevents);
721 #elif defined(WAIT_USE_KQUEUE) 722 set->kqueue_ret_events = (
struct kevent *) data;
723 data +=
MAXALIGN(
sizeof(
struct kevent) * nevents);
724 #elif defined(WAIT_USE_POLL) 725 set->pollfds = (
struct pollfd *) data;
726 data +=
MAXALIGN(
sizeof(
struct pollfd) * nevents);
727 #elif defined(WAIT_USE_WIN32) 728 set->handles = (HANDLE) data;
729 data +=
MAXALIGN(
sizeof(HANDLE) * nevents);
734 set->exit_on_postmaster_death =
false;
736 #if defined(WAIT_USE_EPOLL) 742 set->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
743 if (set->epoll_fd < 0)
748 #elif defined(WAIT_USE_KQUEUE) 754 set->kqueue_fd = kqueue();
755 if (set->kqueue_fd < 0)
760 if (fcntl(set->kqueue_fd, F_SETFD, FD_CLOEXEC) == -1)
762 int save_errno = errno;
764 close(set->kqueue_fd);
767 elog(
ERROR,
"fcntl(F_SETFD) failed on kqueue descriptor: %m");
769 set->report_postmaster_not_running =
false;
770 #elif defined(WAIT_USE_WIN32) 800 #if defined(WAIT_USE_EPOLL) 801 close(set->epoll_fd);
803 #elif defined(WAIT_USE_KQUEUE) 804 close(set->kqueue_fd);
806 #elif defined(WAIT_USE_WIN32) 809 for (cur_event = set->
events;
810 cur_event < (set->events + set->nevents);
824 WSAEventSelect(cur_event->
fd, NULL, 0);
825 WSACloseEvent(set->handles[cur_event->
pos + 1]);
868 Assert(set->nevents < set->nevents_space);
873 set->exit_on_postmaster_death =
true;
879 elog(
ERROR,
"cannot wait on a latch owned by another process");
881 elog(
ERROR,
"cannot wait on more than one latch");
883 elog(
ERROR,
"latch events only support being set");
888 elog(
ERROR,
"cannot wait on latch without a specified latch");
893 elog(
ERROR,
"cannot wait on socket event without a socket");
895 event = &
set->events[
set->nevents];
896 event->pos =
set->nevents++;
901 event->reset =
false;
907 set->latch_pos =
event->pos;
908 #if defined(WAIT_USE_POLL) 909 event->fd = selfpipe_readfd;
910 #elif defined(WAIT_USE_EPOLL) 911 event->fd = signal_fd;
914 #ifdef WAIT_USE_EPOLL 927 #if defined(WAIT_USE_EPOLL) 928 WaitEventAdjustEpoll(
set, event, EPOLL_CTL_ADD);
929 #elif defined(WAIT_USE_KQUEUE) 930 WaitEventAdjustKqueue(
set, event, 0);
931 #elif defined(WAIT_USE_POLL) 932 WaitEventAdjustPoll(
set, event);
933 #elif defined(WAIT_USE_WIN32) 934 WaitEventAdjustWin32(
set, event);
951 #if defined(WAIT_USE_KQUEUE) 957 event = &
set->events[pos];
958 #if defined(WAIT_USE_KQUEUE) 959 old_events =
event->events;
968 if (events == event->
events &&
975 elog(
ERROR,
"cannot modify latch event");
980 elog(
ERROR,
"cannot modify postmaster death event");
989 elog(
ERROR,
"cannot wait on a latch owned by another process");
999 #if defined(WAIT_USE_WIN32) 1007 #if defined(WAIT_USE_EPOLL) 1008 WaitEventAdjustEpoll(
set, event, EPOLL_CTL_MOD);
1009 #elif defined(WAIT_USE_KQUEUE) 1010 WaitEventAdjustKqueue(
set, event, old_events);
1011 #elif defined(WAIT_USE_POLL) 1012 WaitEventAdjustPoll(
set, event);
1013 #elif defined(WAIT_USE_WIN32) 1014 WaitEventAdjustWin32(
set, event);
1018 #if defined(WAIT_USE_EPOLL) 1025 struct epoll_event epoll_ev;
1029 epoll_ev.data.ptr = event;
1031 epoll_ev.
events = EPOLLERR | EPOLLHUP;
1036 Assert(set->latch != NULL);
1037 epoll_ev.events |= EPOLLIN;
1041 epoll_ev.events |= EPOLLIN;
1049 epoll_ev.events |= EPOLLIN;
1051 epoll_ev.events |= EPOLLOUT;
1059 rc = epoll_ctl(set->epoll_fd, action, event->
fd, &epoll_ev);
1070 #if defined(WAIT_USE_POLL) 1074 struct pollfd *pollfd = &
set->pollfds[
event->pos];
1076 pollfd->revents = 0;
1077 pollfd->fd =
event->fd;
1082 Assert(set->latch != NULL);
1083 pollfd->events = POLLIN;
1087 pollfd->events = POLLIN;
1094 pollfd->events |= POLLIN;
1096 pollfd->events |= POLLOUT;
1103 #if defined(WAIT_USE_KQUEUE) 1111 #define AccessWaitEvent(k_ev) (*((WaitEvent **)(&(k_ev)->udata))) 1114 WaitEventAdjustKqueueAdd(
struct kevent *k_ev,
int filter,
int action,
1117 k_ev->ident =
event->fd;
1118 k_ev->filter = filter;
1122 AccessWaitEvent(k_ev) = event;
1126 WaitEventAdjustKqueueAddPostmaster(
struct kevent *k_ev,
WaitEvent *event)
1130 k_ev->filter = EVFILT_PROC;
1131 k_ev->flags = EV_ADD;
1132 k_ev->fflags = NOTE_EXIT;
1134 AccessWaitEvent(k_ev) = event;
1138 WaitEventAdjustKqueueAddLatch(
struct kevent *k_ev,
WaitEvent *event)
1141 k_ev->ident = SIGURG;
1142 k_ev->filter = EVFILT_SIGNAL;
1143 k_ev->flags = EV_ADD;
1146 AccessWaitEvent(k_ev) = event;
1156 struct kevent k_ev[2];
1158 bool new_filt_read =
false;
1159 bool old_filt_read =
false;
1160 bool new_filt_write =
false;
1161 bool old_filt_write =
false;
1163 if (old_events == event->
events)
1178 WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event);
1183 WaitEventAdjustKqueueAddLatch(&k_ev[count++], event);
1193 old_filt_read =
true;
1194 if (event->
events & WL_SOCKET_READABLE)
1195 new_filt_read =
true;
1197 old_filt_write =
true;
1198 if (event->
events & WL_SOCKET_WRITEABLE)
1199 new_filt_write =
true;
1200 if (old_filt_read && !new_filt_read)
1201 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_DELETE,
1203 else if (!old_filt_read && new_filt_read)
1204 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_ADD,
1206 if (old_filt_write && !new_filt_write)
1207 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_DELETE,
1209 else if (!old_filt_write && new_filt_write)
1210 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_ADD,
1217 rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
1229 (errno == ESRCH || errno == EACCES))
1230 set->report_postmaster_not_running =
true;
1247 set->report_postmaster_not_running =
true;
1253 #if defined(WAIT_USE_WIN32) 1257 HANDLE *handle = &
set->handles[
event->pos + 1];
1261 Assert(set->latch != NULL);
1262 *handle =
set->latch->event;
1266 *handle = PostmasterHandle;
1270 int flags = FD_CLOSE;
1277 flags |= FD_CONNECT;
1279 if (*handle == WSA_INVALID_EVENT)
1281 *handle = WSACreateEvent();
1282 if (*handle == WSA_INVALID_EVENT)
1283 elog(
ERROR,
"failed to create event for socket: error code %u",
1286 if (WSAEventSelect(event->
fd, *handle, flags) != 0)
1287 elog(
ERROR,
"failed to set up event for socket: error code %u",
1312 int returned_events = 0;
1315 long cur_timeout = -1;
1326 Assert(timeout >= 0 && timeout <= INT_MAX);
1327 cur_timeout = timeout;
1338 while (returned_events == 0)
1369 if (set->latch && !set->latch->is_set)
1372 set->latch->maybe_sleeping =
true;
1377 if (set->latch && set->latch->is_set)
1380 occurred_events->
pos =
set->latch_pos;
1382 set->events[
set->latch_pos].user_data;
1388 set->latch->maybe_sleeping =
false;
1399 occurred_events, nevents);
1403 Assert(set->latch->maybe_sleeping);
1404 set->latch->maybe_sleeping =
false;
1410 returned_events = rc;
1413 if (returned_events == 0 && timeout >= 0)
1418 if (cur_timeout <= 0)
1428 return returned_events;
1432 #if defined(WAIT_USE_EPOLL) 1446 int returned_events = 0;
1449 struct epoll_event *cur_epoll_event;
1452 rc = epoll_wait(set->epoll_fd, set->epoll_ret_events,
1453 nevents, cur_timeout);
1481 for (cur_epoll_event = set->epoll_ret_events;
1482 cur_epoll_event < (set->epoll_ret_events + rc) &&
1483 returned_events < nevents;
1487 cur_event = (
WaitEvent *) cur_epoll_event->data.ptr;
1489 occurred_events->
pos = cur_event->
pos;
1491 occurred_events->
events = 0;
1494 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
1499 if (set->latch && set->latch->is_set)
1508 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
1523 if (set->exit_on_postmaster_death)
1536 (cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
1543 (cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)))
1549 if (occurred_events->
events != 0)
1551 occurred_events->
fd = cur_event->
fd;
1558 return returned_events;
1561 #elif defined(WAIT_USE_KQUEUE) 1574 int returned_events = 0;
1577 struct kevent *cur_kqueue_event;
1578 struct timespec timeout;
1579 struct timespec *timeout_p;
1581 if (cur_timeout < 0)
1585 timeout.tv_sec = cur_timeout / 1000;
1586 timeout.tv_nsec = (cur_timeout % 1000) * 1000000;
1587 timeout_p = &timeout;
1594 if (
unlikely(set->report_postmaster_not_running))
1596 if (set->exit_on_postmaster_death)
1604 rc = kevent(set->kqueue_fd, NULL, 0,
1605 set->kqueue_ret_events, nevents,
1634 for (cur_kqueue_event = set->kqueue_ret_events;
1635 cur_kqueue_event < (set->kqueue_ret_events + rc) &&
1636 returned_events < nevents;
1640 cur_event = AccessWaitEvent(cur_kqueue_event);
1642 occurred_events->
pos = cur_event->
pos;
1644 occurred_events->
events = 0;
1647 cur_kqueue_event->filter == EVFILT_SIGNAL)
1649 if (set->latch && set->latch->is_set)
1658 cur_kqueue_event->filter == EVFILT_PROC &&
1659 (cur_kqueue_event->fflags & NOTE_EXIT) != 0)
1666 set->report_postmaster_not_running =
true;
1668 if (set->exit_on_postmaster_death)
1680 (cur_kqueue_event->filter == EVFILT_READ))
1687 (cur_kqueue_event->filter == EVFILT_WRITE))
1693 if (occurred_events->
events != 0)
1695 occurred_events->
fd = cur_event->
fd;
1702 return returned_events;
1705 #elif defined(WAIT_USE_POLL) 1717 int returned_events = 0;
1720 struct pollfd *cur_pollfd;
1723 rc = poll(set->pollfds, set->nevents, (
int) cur_timeout);
1746 for (cur_event = set->
events, cur_pollfd = set->pollfds;
1747 cur_event < (set->events + set->nevents) &&
1748 returned_events < nevents;
1749 cur_event++, cur_pollfd++)
1752 if (cur_pollfd->revents == 0)
1755 occurred_events->
pos = cur_event->
pos;
1757 occurred_events->
events = 0;
1760 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
1765 if (set->latch && set->latch->is_set)
1774 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
1789 if (set->exit_on_postmaster_death)
1799 int errflags = POLLHUP | POLLERR | POLLNVAL;
1804 (cur_pollfd->revents & (POLLIN | errflags)))
1811 (cur_pollfd->revents & (POLLOUT | errflags)))
1817 if (occurred_events->
events != 0)
1819 occurred_events->
fd = cur_event->
fd;
1825 return returned_events;
1828 #elif defined(WAIT_USE_WIN32) 1844 int returned_events = 0;
1849 for (cur_event = set->
events;
1850 cur_event < (set->events + set->nevents);
1853 if (cur_event->reset)
1855 WaitEventAdjustWin32(
set, cur_event);
1856 cur_event->reset =
false;
1880 r = WSASend(cur_event->
fd, &buf, 1, &sent, 0, NULL, NULL);
1881 if (r == 0 || WSAGetLastError() != WSAEWOULDBLOCK)
1883 occurred_events->
pos = cur_event->
pos;
1886 occurred_events->
fd = cur_event->
fd;
1897 rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE,
1901 if (rc == WAIT_FAILED)
1902 elog(
ERROR,
"WaitForMultipleObjects() failed: error code %lu",
1904 else if (rc == WAIT_TIMEOUT)
1910 if (rc == WAIT_OBJECT_0)
1923 occurred_events->
pos = cur_event->
pos;
1925 occurred_events->
events = 0;
1933 if (!ResetEvent(set->handles[cur_event->
pos + 1]))
1934 elog(
ERROR,
"ResetEvent failed: error code %lu", GetLastError());
1936 if (set->latch && set->latch->is_set)
1955 if (set->exit_on_postmaster_death)
1965 WSANETWORKEVENTS resEvents;
1966 HANDLE handle =
set->handles[cur_event->
pos + 1];
1970 occurred_events->
fd = cur_event->
fd;
1972 ZeroMemory(&resEvents,
sizeof(resEvents));
1973 if (WSAEnumNetworkEvents(cur_event->
fd, handle, &resEvents) != 0)
1974 elog(
ERROR,
"failed to enumerate network events: error code %u",
1977 (resEvents.lNetworkEvents & FD_READ))
1992 cur_event->reset =
true;
1995 (resEvents.lNetworkEvents & FD_WRITE))
2001 (resEvents.lNetworkEvents & FD_CONNECT))
2006 if (resEvents.lNetworkEvents & FD_CLOSE)
2012 if (occurred_events->
events != 0)
2019 return returned_events;
2023 #if defined(WAIT_USE_POLL) 2033 int save_errno = errno;
2043 sendSelfPipeByte(
void)
2049 rc =
write(selfpipe_writefd, &dummy, 1);
2074 #if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL) 2090 #ifdef WAIT_USE_POLL 2091 fd = selfpipe_readfd;
2098 rc =
read(fd, buf,
sizeof(buf));
2103 else if (errno ==
EINTR)
2108 #ifdef WAIT_USE_POLL 2109 elog(
ERROR,
"read() on self-pipe failed: %m");
2111 elog(
ERROR,
"read() on signalfd failed: %m");
2118 #ifdef WAIT_USE_POLL 2119 elog(
ERROR,
"unexpected EOF on self-pipe");
2121 elog(
ERROR,
"unexpected EOF on signalfd");
2124 else if (rc <
sizeof(buf))
void InitSharedLatch(Latch *latch)
#define WL_SOCKET_WRITEABLE
void FreeWaitEventSet(WaitEventSet *set)
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
#define INSTR_TIME_GET_MILLISEC(t)
struct timeval instr_time
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#define WL_SOCKET_READABLE
void DisownLatch(Latch *latch)
void InitLatch(Latch *latch)
void SetLatch(Latch *latch)
static int fd(const char *x, int i)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
WaitEventSet * CreateWaitEventSet(MemoryContext context, int nevents)
#define StaticAssertStmt(condition, errmessage)
HANDLE pgwin32_signal_event
void pfree(void *pointer)
void pgwin32_dispatch_queued_signals(void)
#define LatchWaitSetLatchPos
void OwnLatch(Latch *latch)
sig_atomic_t maybe_sleeping
#define INSTR_TIME_SUBTRACT(x, y)
void ReserveExternalFD(void)
bool exit_on_postmaster_death
static int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, WaitEvent *occurred_events, int nevents)
#define PostmasterIsAlive()
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
static void pgstat_report_wait_end(void)
MemoryContext CurrentMemoryContext
MemoryContext TopMemoryContext
int errcode_for_socket_access(void)
int postmaster_alive_fds[2]
#define WL_POSTMASTER_DEATH
void InitializeLatchSupport(void)
bool PostmasterIsAliveInternal(void)
bool AcquireExternalFD(void)
void * MemoryContextAllocZero(MemoryContext context, Size size)
#define ereport(elevel,...)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
#define pg_memory_barrier()
#define Assert(condition)
void InitializeLatchWaitSet(void)
static void pgstat_report_wait_start(uint32 wait_event_info)
#define INSTR_TIME_SET_CURRENT(t)
void ShutdownLatchSupport(void)
void ReleaseExternalFD(void)
int errmsg(const char *fmt,...)
static WaitEventSet * LatchWaitSet
#define WL_SOCKET_CONNECTED
static volatile sig_atomic_t waiting
#define POSTMASTER_FD_WATCH
#define WL_EXIT_ON_PM_DEATH
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define PG_USED_FOR_ASSERTS_ONLY