40 #ifdef HAVE_SYS_EPOLL_H
41 #include <sys/epoll.h>
43 #ifdef HAVE_SYS_EVENT_H
44 #include <sys/event.h>
46 #ifdef HAVE_SYS_SIGNALFD_H
47 #include <sys/signalfd.h>
72 #if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \
73 defined(WAIT_USE_KQUEUE) || defined(WAIT_USE_WIN32)
75 #elif defined(HAVE_SYS_EPOLL_H)
76 #define WAIT_USE_EPOLL
77 #elif defined(HAVE_KQUEUE)
78 #define WAIT_USE_KQUEUE
79 #elif defined(HAVE_POLL)
82 #define WAIT_USE_WIN32
84 #error "no wait set implementation available"
91 #if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
92 #if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
94 #elif defined(WAIT_USE_EPOLL) && defined(HAVE_SYS_SIGNALFD_H)
95 #define WAIT_USE_SIGNALFD
97 #define WAIT_USE_SELF_PIPE
129 #if defined(WAIT_USE_EPOLL)
132 struct epoll_event *epoll_ret_events;
133 #elif defined(WAIT_USE_KQUEUE)
136 struct kevent *kqueue_ret_events;
137 bool report_postmaster_not_running;
138 #elif defined(WAIT_USE_POLL)
141 #elif defined(WAIT_USE_WIN32)
156 #define LatchWaitSetLatchPos 0
163 #ifdef WAIT_USE_SIGNALFD
165 static int signal_fd = -1;
168 #ifdef WAIT_USE_SELF_PIPE
181 #if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
182 static void drain(
void);
185 #if defined(WAIT_USE_EPOLL)
187 #elif defined(WAIT_USE_KQUEUE)
189 #elif defined(WAIT_USE_POLL)
191 #elif defined(WAIT_USE_WIN32)
196 WaitEvent *occurred_events,
int nevents);
207 #if defined(WAIT_USE_SELF_PIPE)
259 if (pipe(pipefd) < 0)
261 if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) == -1)
262 elog(
FATAL,
"fcntl(F_SETFL) failed on read-end of self-pipe: %m");
263 if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) == -1)
264 elog(
FATAL,
"fcntl(F_SETFL) failed on write-end of self-pipe: %m");
265 if (fcntl(pipefd[0], F_SETFD, FD_CLOEXEC) == -1)
266 elog(
FATAL,
"fcntl(F_SETFD) failed on read-end of self-pipe: %m");
267 if (fcntl(pipefd[1], F_SETFD, FD_CLOEXEC) == -1)
268 elog(
FATAL,
"fcntl(F_SETFD) failed on write-end of self-pipe: %m");
281 #ifdef WAIT_USE_SIGNALFD
282 sigset_t signalfd_mask;
294 (void)
close(signal_fd);
304 sigemptyset(&signalfd_mask);
305 sigaddset(&signalfd_mask, SIGURG);
306 signal_fd = signalfd(-1, &signalfd_mask, SFD_NONBLOCK | SFD_CLOEXEC);
312 #ifdef WAIT_USE_KQUEUE
339 #if defined(WAIT_USE_POLL)
349 #if defined(WAIT_USE_SELF_PIPE)
357 #if defined(WAIT_USE_SIGNALFD)
374 #if defined(WAIT_USE_SELF_PIPE)
377 #elif defined(WAIT_USE_SIGNALFD)
380 #elif defined(WAIT_USE_WIN32)
381 latch->event = CreateEvent(NULL, TRUE, FALSE, NULL);
382 if (latch->event == NULL)
383 elog(
ERROR,
"CreateEvent failed: error code %lu", GetLastError());
406 SECURITY_ATTRIBUTES
sa;
411 ZeroMemory(&
sa,
sizeof(
sa));
412 sa.nLength =
sizeof(
sa);
413 sa.bInheritHandle = TRUE;
415 latch->event = CreateEvent(&
sa, TRUE, FALSE, NULL);
416 if (latch->event == NULL)
417 elog(
ERROR,
"CreateEvent failed: error code %lu", GetLastError());
443 #if defined(WAIT_USE_SELF_PIPE)
446 #elif defined(WAIT_USE_SIGNALFD)
453 elog(
PANIC,
"latch already owned by PID %d", owner_pid);
514 wait_event_info) == 0)
539 long timeout,
uint32 wait_event_info)
659 #if defined(WAIT_USE_SELF_PIPE)
668 kill(owner_pid, SIGURG);
679 handle = latch->event;
736 #if defined(WAIT_USE_EPOLL)
737 sz +=
MAXALIGN(
sizeof(
struct epoll_event) * nevents);
738 #elif defined(WAIT_USE_KQUEUE)
739 sz +=
MAXALIGN(
sizeof(
struct kevent) * nevents);
740 #elif defined(WAIT_USE_POLL)
741 sz +=
MAXALIGN(
sizeof(
struct pollfd) * nevents);
742 #elif defined(WAIT_USE_WIN32)
744 sz +=
MAXALIGN(
sizeof(HANDLE) * (nevents + 1));
755 #if defined(WAIT_USE_EPOLL)
756 set->epoll_ret_events = (
struct epoll_event *)
data;
758 #elif defined(WAIT_USE_KQUEUE)
759 set->kqueue_ret_events = (
struct kevent *)
data;
761 #elif defined(WAIT_USE_POLL)
764 #elif defined(WAIT_USE_WIN32)
765 set->handles = (HANDLE)
data;
773 #if defined(WAIT_USE_EPOLL)
779 set->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
780 if (set->epoll_fd < 0)
785 #elif defined(WAIT_USE_KQUEUE)
791 set->kqueue_fd = kqueue();
792 if (set->kqueue_fd < 0)
797 if (fcntl(set->kqueue_fd, F_SETFD, FD_CLOEXEC) == -1)
799 int save_errno = errno;
801 close(set->kqueue_fd);
804 elog(
ERROR,
"fcntl(F_SETFD) failed on kqueue descriptor: %m");
806 set->report_postmaster_not_running =
false;
807 #elif defined(WAIT_USE_WIN32)
837 #if defined(WAIT_USE_EPOLL)
838 close(set->epoll_fd);
840 #elif defined(WAIT_USE_KQUEUE)
841 close(set->kqueue_fd);
843 #elif defined(WAIT_USE_WIN32)
846 for (cur_event = set->
events;
861 WSAEventSelect(cur_event->
fd, NULL, 0);
862 WSACloseEvent(set->handles[cur_event->
pos + 1]);
876 #if defined(WAIT_USE_EPOLL)
877 close(set->epoll_fd);
879 #elif defined(WAIT_USE_KQUEUE)
937 elog(
ERROR,
"cannot wait on a latch owned by another process");
939 elog(
ERROR,
"cannot wait on more than one latch");
941 elog(
ERROR,
"latch events only support being set");
946 elog(
ERROR,
"cannot wait on latch without a specified latch");
951 elog(
ERROR,
"cannot wait on socket event without a socket");
956 event->events = events;
957 event->user_data = user_data;
959 event->reset =
false;
966 #if defined(WAIT_USE_SELF_PIPE)
968 #elif defined(WAIT_USE_SIGNALFD)
969 event->fd = signal_fd;
972 #ifdef WAIT_USE_EPOLL
985 #if defined(WAIT_USE_EPOLL)
986 WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
987 #elif defined(WAIT_USE_KQUEUE)
988 WaitEventAdjustKqueue(set, event, 0);
989 #elif defined(WAIT_USE_POLL)
991 #elif defined(WAIT_USE_WIN32)
992 WaitEventAdjustWin32(set, event);
1009 #if defined(WAIT_USE_KQUEUE)
1013 Assert(pos < set->nevents);
1015 event = &set->
events[pos];
1016 #if defined(WAIT_USE_KQUEUE)
1017 old_events =
event->
events;
1026 if (events == event->
events &&
1033 elog(
ERROR,
"cannot modify latch event");
1038 elog(
ERROR,
"cannot modify postmaster death event");
1042 event->events = events;
1047 elog(
ERROR,
"cannot wait on a latch owned by another process");
1057 #if defined(WAIT_USE_WIN32)
1065 #if defined(WAIT_USE_EPOLL)
1066 WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
1067 #elif defined(WAIT_USE_KQUEUE)
1068 WaitEventAdjustKqueue(set, event, old_events);
1069 #elif defined(WAIT_USE_POLL)
1071 #elif defined(WAIT_USE_WIN32)
1072 WaitEventAdjustWin32(set, event);
1076 #if defined(WAIT_USE_EPOLL)
1083 struct epoll_event epoll_ev;
1087 epoll_ev.data.ptr = event;
1089 epoll_ev.
events = EPOLLERR | EPOLLHUP;
1095 epoll_ev.events |= EPOLLIN;
1099 epoll_ev.events |= EPOLLIN;
1109 epoll_ev.events |= EPOLLIN;
1111 epoll_ev.events |= EPOLLOUT;
1113 epoll_ev.events |= EPOLLRDHUP;
1121 rc = epoll_ctl(set->epoll_fd,
action, event->
fd, &epoll_ev);
1126 errmsg(
"%s() failed: %m",
1131 #if defined(WAIT_USE_POLL)
1135 struct pollfd *pollfd = &set->
pollfds[
event->pos];
1137 pollfd->revents = 0;
1138 pollfd->fd =
event->fd;
1144 pollfd->events = POLLIN;
1148 pollfd->events = POLLIN;
1157 pollfd->events |= POLLIN;
1159 pollfd->events |= POLLOUT;
1162 pollfd->events |= POLLRDHUP;
1170 #if defined(WAIT_USE_KQUEUE)
1178 #define AccessWaitEvent(k_ev) (*((WaitEvent **)(&(k_ev)->udata)))
1181 WaitEventAdjustKqueueAdd(
struct kevent *k_ev,
int filter,
int action,
1184 k_ev->ident =
event->fd;
1185 k_ev->filter = filter;
1189 AccessWaitEvent(k_ev) = event;
1193 WaitEventAdjustKqueueAddPostmaster(
struct kevent *k_ev,
WaitEvent *event)
1197 k_ev->filter = EVFILT_PROC;
1198 k_ev->flags = EV_ADD;
1199 k_ev->fflags = NOTE_EXIT;
1201 AccessWaitEvent(k_ev) = event;
1205 WaitEventAdjustKqueueAddLatch(
struct kevent *k_ev,
WaitEvent *event)
1208 k_ev->ident = SIGURG;
1209 k_ev->filter = EVFILT_SIGNAL;
1210 k_ev->flags = EV_ADD;
1213 AccessWaitEvent(k_ev) = event;
1223 struct kevent k_ev[2];
1225 bool new_filt_read =
false;
1226 bool old_filt_read =
false;
1227 bool new_filt_write =
false;
1228 bool old_filt_write =
false;
1230 if (old_events == event->
events)
1247 WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event);
1252 WaitEventAdjustKqueueAddLatch(&k_ev[count++], event);
1262 old_filt_read =
true;
1264 new_filt_read =
true;
1266 old_filt_write =
true;
1268 new_filt_write =
true;
1269 if (old_filt_read && !new_filt_read)
1270 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_DELETE,
1272 else if (!old_filt_read && new_filt_read)
1273 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_ADD,
1275 if (old_filt_write && !new_filt_write)
1276 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_DELETE,
1278 else if (!old_filt_write && new_filt_write)
1279 WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_ADD,
1289 rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
1301 (errno == ESRCH || errno == EACCES))
1302 set->report_postmaster_not_running =
true;
1306 errmsg(
"%s() failed: %m",
1318 set->report_postmaster_not_running =
true;
1324 #if defined(WAIT_USE_WIN32)
1328 HANDLE *handle = &set->handles[
event->pos + 1];
1333 *handle = set->
latch->event;
1337 *handle = PostmasterHandle;
1341 int flags = FD_CLOSE;
1348 flags |= FD_CONNECT;
1352 if (*handle == WSA_INVALID_EVENT)
1354 *handle = WSACreateEvent();
1355 if (*handle == WSA_INVALID_EVENT)
1356 elog(
ERROR,
"failed to create event for socket: error code %d",
1359 if (WSAEventSelect(event->
fd, *handle, flags) != 0)
1360 elog(
ERROR,
"failed to set up event for socket: error code %d",
1382 WaitEvent *occurred_events,
int nevents,
1385 int returned_events = 0;
1388 long cur_timeout = -1;
1399 Assert(timeout >= 0 && timeout <= INT_MAX);
1400 cur_timeout = timeout;
1413 while (returned_events == 0)
1474 occurred_events, nevents);
1485 returned_events = rc;
1488 if (returned_events == 0 && timeout >= 0)
1493 if (cur_timeout <= 0)
1503 return returned_events;
1507 #if defined(WAIT_USE_EPOLL)
1519 WaitEvent *occurred_events,
int nevents)
1521 int returned_events = 0;
1524 struct epoll_event *cur_epoll_event;
1527 rc = epoll_wait(set->epoll_fd, set->epoll_ret_events,
1539 errmsg(
"%s() failed: %m",
1555 for (cur_epoll_event = set->epoll_ret_events;
1556 cur_epoll_event < (set->epoll_ret_events + rc) &&
1557 returned_events < nevents;
1561 cur_event = (
WaitEvent *) cur_epoll_event->data.ptr;
1563 occurred_events->
pos = cur_event->
pos;
1565 occurred_events->
events = 0;
1568 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
1582 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
1612 (cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
1619 (cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)))
1626 (cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
1632 if (occurred_events->
events != 0)
1634 occurred_events->
fd = cur_event->
fd;
1641 return returned_events;
1644 #elif defined(WAIT_USE_KQUEUE)
1655 WaitEvent *occurred_events,
int nevents)
1657 int returned_events = 0;
1660 struct kevent *cur_kqueue_event;
1661 struct timespec timeout;
1662 struct timespec *timeout_p;
1664 if (cur_timeout < 0)
1668 timeout.tv_sec = cur_timeout / 1000;
1669 timeout.tv_nsec = (cur_timeout % 1000) * 1000000;
1670 timeout_p = &timeout;
1677 if (
unlikely(set->report_postmaster_not_running))
1687 rc = kevent(set->kqueue_fd, NULL, 0,
1688 set->kqueue_ret_events,
1701 errmsg(
"%s() failed: %m",
1717 for (cur_kqueue_event = set->kqueue_ret_events;
1718 cur_kqueue_event < (set->kqueue_ret_events + rc) &&
1719 returned_events < nevents;
1723 cur_event = AccessWaitEvent(cur_kqueue_event);
1725 occurred_events->
pos = cur_event->
pos;
1727 occurred_events->
events = 0;
1730 cur_kqueue_event->filter == EVFILT_SIGNAL)
1741 cur_kqueue_event->filter == EVFILT_PROC &&
1742 (cur_kqueue_event->fflags & NOTE_EXIT) != 0)
1749 set->report_postmaster_not_running =
true;
1765 (cur_kqueue_event->filter == EVFILT_READ))
1772 (cur_kqueue_event->filter == EVFILT_READ) &&
1773 (cur_kqueue_event->flags & EV_EOF))
1780 (cur_kqueue_event->filter == EVFILT_WRITE))
1786 if (occurred_events->
events != 0)
1788 occurred_events->
fd = cur_event->
fd;
1795 return returned_events;
1798 #elif defined(WAIT_USE_POLL)
1808 WaitEvent *occurred_events,
int nevents)
1810 int returned_events = 0;
1813 struct pollfd *cur_pollfd;
1827 errmsg(
"%s() failed: %m",
1840 returned_events < nevents;
1841 cur_event++, cur_pollfd++)
1844 if (cur_pollfd->revents == 0)
1847 occurred_events->
pos = cur_event->
pos;
1849 occurred_events->
events = 0;
1852 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
1866 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
1893 int errflags = POLLHUP | POLLERR | POLLNVAL;
1898 (cur_pollfd->revents & (POLLIN | errflags)))
1905 (cur_pollfd->revents & (POLLOUT | errflags)))
1913 (cur_pollfd->revents & (POLLRDHUP | errflags)))
1920 if (occurred_events->
events != 0)
1922 occurred_events->
fd = cur_event->
fd;
1928 return returned_events;
1931 #elif defined(WAIT_USE_WIN32)
1942 WaitEvent *occurred_events,
int nevents)
1944 int returned_events = 0;
1949 for (cur_event = set->
events;
1953 if (cur_event->reset)
1955 WaitEventAdjustWin32(set, cur_event);
1956 cur_event->reset =
false;
1980 r = WSASend(cur_event->
fd, &
buf, 1, &sent, 0, NULL, NULL);
1981 if (r == 0 || WSAGetLastError() != WSAEWOULDBLOCK)
1983 occurred_events->
pos = cur_event->
pos;
1986 occurred_events->
fd = cur_event->
fd;
1997 rc = WaitForMultipleObjects(set->
nevents + 1, set->handles, FALSE,
2001 if (rc == WAIT_FAILED)
2002 elog(
ERROR,
"WaitForMultipleObjects() failed: error code %lu",
2004 else if (rc == WAIT_TIMEOUT)
2010 if (rc == WAIT_OBJECT_0)
2028 occurred_events->
pos = cur_event->
pos;
2030 occurred_events->
events = 0;
2038 if (!ResetEvent(set->handles[cur_event->
pos + 1]))
2039 elog(
ERROR,
"ResetEvent failed: error code %lu", GetLastError());
2070 WSANETWORKEVENTS resEvents;
2071 HANDLE handle = set->handles[cur_event->
pos + 1];
2075 occurred_events->
fd = cur_event->
fd;
2077 ZeroMemory(&resEvents,
sizeof(resEvents));
2078 if (WSAEnumNetworkEvents(cur_event->
fd, handle, &resEvents) != 0)
2079 elog(
ERROR,
"failed to enumerate network events: error code %d",
2082 (resEvents.lNetworkEvents & FD_READ))
2100 cur_event->reset =
true;
2103 (resEvents.lNetworkEvents & FD_WRITE))
2109 (resEvents.lNetworkEvents & FD_CONNECT))
2115 (resEvents.lNetworkEvents & FD_ACCEPT))
2120 if (resEvents.lNetworkEvents & FD_CLOSE)
2126 if (occurred_events->
events != 0)
2134 if (returned_events == nevents)
2138 next_pos = cur_event->
pos + 1;
2147 count = set->
nevents - next_pos;
2148 rc = WaitForMultipleObjects(count,
2149 set->handles + 1 + next_pos,
2157 if (rc < WAIT_OBJECT_0 || rc >= WAIT_OBJECT_0 + count)
2161 cur_event = &set->
events[next_pos + (rc - WAIT_OBJECT_0)];
2164 return returned_events;
2174 #if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
2175 defined(WAIT_USE_EPOLL) || \
2176 defined(WAIT_USE_KQUEUE)
2192 #if defined(WAIT_USE_SELF_PIPE)
2202 int save_errno = errno;
2243 #if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
2259 #ifdef WAIT_USE_SELF_PIPE
2272 else if (errno ==
EINTR)
2277 #ifdef WAIT_USE_SELF_PIPE
2278 elog(
ERROR,
"read() on self-pipe failed: %m");
2280 elog(
ERROR,
"read() on signalfd failed: %m");
2287 #ifdef WAIT_USE_SELF_PIPE
2288 elog(
ERROR,
"unexpected EOF on self-pipe");
2290 elog(
ERROR,
"unexpected EOF on signalfd");
2293 else if (rc <
sizeof(
buf))
#define pg_memory_barrier()
#define PG_USED_FOR_ASSERTS_ONLY
#define StaticAssertStmt(condition, errmessage)
elog(ERROR, "%s: %s", p2, msg)
int errcode_for_socket_access(void)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void ReleaseExternalFD(void)
bool AcquireExternalFD(void)
void ReserveExternalFD(void)
#define INSTR_TIME_SET_CURRENT(t)
#define INSTR_TIME_SUBTRACT(x, y)
#define INSTR_TIME_GET_MILLISEC(t)
#define INSTR_TIME_SET_ZERO(t)
if(TABLE==NULL||TABLE_index==NULL)
static void latch_sigurg_handler(SIGNAL_ARGS)
static void sendSelfPipeByte(void)
void InitializeLatchWaitSet(void)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
#define LatchWaitSetLatchPos
static int selfpipe_readfd
void OwnLatch(Latch *latch)
void DisownLatch(Latch *latch)
void FreeWaitEventSetAfterFork(WaitEventSet *set)
static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
static int selfpipe_owner_pid
static int selfpipe_writefd
int GetNumRegisteredWaitEvents(WaitEventSet *set)
WaitEventSet * CreateWaitEventSet(MemoryContext context, int nevents)
void InitSharedLatch(Latch *latch)
void InitializeLatchSupport(void)
static WaitEventSet * LatchWaitSet
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
static int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, WaitEvent *occurred_events, int nevents)
void SetLatch(Latch *latch)
void ShutdownLatchSupport(void)
bool WaitEventSetCanReportClosed(void)
void InitLatch(Latch *latch)
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
static volatile sig_atomic_t waiting
void FreeWaitEventSet(WaitEventSet *set)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define WL_SOCKET_CONNECTED
#define WL_POSTMASTER_DEATH
#define WL_SOCKET_WRITEABLE
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
bool PostmasterIsAliveInternal(void)
#define PostmasterIsAlive()
pqsigfunc pqsignal(int signo, pqsigfunc func)
int postmaster_alive_fds[2]
#define POSTMASTER_FD_WATCH
static int fd(const char *x, int i)
void pgwin32_dispatch_queued_signals(void)
HANDLE pgwin32_signal_event
sig_atomic_t maybe_sleeping
bool exit_on_postmaster_death
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)