PostgreSQL Source Code  git master
test.c File Reference
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "test_shm_mq.h"
Include dependency graph for test.c:

Go to the source code of this file.

Functions

 PG_FUNCTION_INFO_V1 (test_shm_mq)
 
 PG_FUNCTION_INFO_V1 (test_shm_mq_pipelined)
 
void _PG_init (void)
 
static void verify_message (Size origlen, char *origdata, Size newlen, char *newdata)
 
Datum test_shm_mq (PG_FUNCTION_ARGS)
 
Datum test_shm_mq_pipelined (PG_FUNCTION_ARGS)
 

Variables

 PG_MODULE_MAGIC
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

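Definition at line 56 of file auth_delay.c. (Note: the listing below is the auth_delay.c definition, not code from test.c, and the References list that follows spans the _PG_init() definitions of many modules.)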

57 {
58  /* Define custom GUC variables */
59  DefineCustomIntVariable("auth_delay.milliseconds",
60  "Milliseconds to delay before reporting authentication failure",
61  NULL,
62  &auth_delay_milliseconds,
63  0,
64  0, INT_MAX / 1000,
65  PGC_SIGHUP,
66  GUC_UNIT_MS,
67  NULL,
68  NULL,
69  NULL);
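70  /* Install Hooks */
71  original_client_auth_hook = ClientAuthentication_hook;
72  ClientAuthentication_hook = auth_delay_checks;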
73 }
ClientAuthentication_hook_type ClientAuthentication_hook
Definition: auth.c:234
static void auth_delay_checks(Port *port, int status)
Definition: auth_delay.c:35
static int auth_delay_milliseconds
Definition: auth_delay.c:26
static ClientAuthentication_hook_type original_client_auth_hook
Definition: auth_delay.c:29
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:9212
#define GUC_UNIT_MS
Definition: guc.h:224
@ PGC_SIGHUP
Definition: guc.h:72

References AccessExclusiveLock, add_int_reloption(), add_reloption_kind(), apw_start_leader_worker(), AssertVariableIsOfType, auth_delay_checks(), auth_delay_milliseconds, auto_explain_log_analyze, auto_explain_log_buffers, auto_explain_log_format, auto_explain_log_level, auto_explain_log_min_duration, auto_explain_log_nested_statements, auto_explain_log_settings, auto_explain_log_timing, auto_explain_log_triggers, auto_explain_log_verbose, auto_explain_log_wal, auto_explain_sample_rate, autoprewarm, autoprewarm_interval, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, bl_relopt_kind, bl_relopt_tab, buf, check_password(), check_password_hook, ClientAuthentication_hook, create_reloptions_table(), DEFAULT_BLOOM_BITS, DEFAULT_BLOOM_LENGTH, DefineCustomBoolVariable(), DefineCustomEnumVariable(), DefineCustomIntVariable(), DefineCustomRealVariable(), DefineCustomStringVariable(), delay_execution_planner(), dummy_object_relabel(), EAN13_index, EAN13_range, elog, EmitWarningsOnPlaceholders(), EnableQueryId(), HASHCTL::entrysize, ereport, errcode(), errmsg(), ERROR, ExecutorCheckPerms_hook, ExecutorEnd_hook, ExecutorFinish_hook, ExecutorRun_hook, ExecutorStart_hook, explain_ExecutorEnd(), explain_ExecutorFinish(), explain_ExecutorRun(), explain_ExecutorStart(), EXPLAIN_FORMAT_TEXT, find_rendezvous_variable(), format_options, gettext_noop, GUC_LIST_INPUT, GUC_NOT_IN_SAMPLE, GUC_UNIT_MS, GUC_UNIT_S, HASH_BLOBS, hash_create(), HASH_ELEM, hstoreCheckKeyLen, hstoreCheckKeyLen_p, hstoreCheckValLen, hstoreCheckValLen_p, hstorePairs, hstorePairs_p, hstoreUniquePairs, hstoreUniquePairs_p, hstoreUpgrade, 
hstoreUpgrade_p, i, INDEX_MAX_KEYS, Int32GetDatum, ISBN_index, ISBN_range, ISMN_index, ISMN_range, ISN_DEBUG, ISSN_index, ISSN_range, IsUnderPostmaster, HASHCTL::keysize, load_external_function(), LOG, loglevel_options, MAX_BLOOM_BITS, MAX_BLOOM_LENGTH, MAXALIGN, MemoryContextStrdup(), next_exec_check_perms_hook, next_object_access_hook, next_ProcessUtility_hook, object_access_hook, relopt_parse_elt::offset, offsetof, openssl_tls_init_hook, relopt_parse_elt::optname, relopt_parse_elt::opttype, original_client_auth_hook, pg_bindtextdomain(), PGC_POSTMASTER, PGC_SIGHUP, PGC_SUSET, PGC_USERSET, pgfdw_application_name, pgss_ExecutorEnd(), pgss_ExecutorFinish(), pgss_ExecutorRun(), pgss_ExecutorStart(), pgss_max, pgss_memsize(), pgss_planner(), pgss_post_parse_analyze(), pgss_ProcessUtility(), pgss_save, pgss_shmem_startup(), pgss_track, pgss_track_planning, PGSS_TRACK_TOP, pgss_track_utility, planner_hook, plperl_held_interp, plperl_init_interp(), plperl_interp_hash, plperl_on_init, plperl_on_plperl_init, plperl_on_plperlu_init, plperl_opmask, plperl_proc_hash, plperl_use_strict, plpgsql_check_asserts, plpgsql_extra_checks_check_hook(), plpgsql_extra_errors_assign_hook(), plpgsql_extra_errors_string, plpgsql_extra_warnings_assign_hook(), plpgsql_extra_warnings_string, plpgsql_HashTableInit(), plpgsql_plugin_ptr, plpgsql_print_strict_params, PLPGSQL_RESOLVE_ERROR, plpgsql_subxact_cb(), plpgsql_variable_conflict, plpgsql_xact_cb(), plpython_version_bitmask, plpython_version_bitmask_ptr, pltcl_AlertNotifier(), pltcl_CreateFileHandler(), pltcl_DeleteFileHandler(), pltcl_FinalizeNotifier(), pltcl_hold_interp, pltcl_InitNotifier(), pltcl_interp_htab, pltcl_pm_init_done, pltcl_proc_htab, pltcl_ServiceModeHook(), pltcl_SetTimer(), pltcl_start_proc, pltcl_WaitForEvent(), pltclu_start_proc, PLy_elog_impl(), PLy_elog_impl_p, PLyObject_AsString, PLyObject_AsString_p, PLyUnicode_FromStringAndSize, post_parse_analyze_hook, post_planning_lock_id, PQWalReceiverFunctions, 
prev_check_password_hook, prev_ExecutorEnd, prev_ExecutorFinish, prev_ExecutorRun, prev_ExecutorStart, prev_planner_hook, prev_post_parse_analyze_hook, prev_ProcessUtility, prev_row_security_policy_hook_permissive, prev_row_security_policy_hook_restrictive, prev_shmem_startup_hook, process_shared_preload_libraries_in_progress, ProcessUtility_hook, register_label_provider(), RegisterBackgroundWorker(), RegisterSubXactCallback(), RegisterXactCallback(), RELOPT_TYPE_INT, RequestAddinShmemSpace(), RequestNamedLWLockTranche(), row_security_policy_hook_permissive, row_security_policy_hook_restrictive, sepgsql_avc_init(), sepgsql_context_info, sepgsql_debug_audit, sepgsql_exec_check_perms(), sepgsql_init_client_label(), SEPGSQL_LABEL_TAG, SEPGSQL_MODE_DISABLED, sepgsql_object_access(), sepgsql_object_relabel(), sepgsql_permissive, sepgsql_set_mode(), sepgsql_utility_command(), set_rot13(), shmem_startup_hook, similarity_threshold, snprintf, sprintf, ssl_passphrase, strict_word_similarity_threshold, test_rls_hooks_permissive(), test_rls_hooks_restrictive(), TEXTDOMAIN, TopMemoryContext, track_options, UPC_index, UPC_range, variable_conflict_options, WalReceiverFunctions, and word_similarity_threshold.

◆ PG_FUNCTION_INFO_V1() [1/2]

PG_FUNCTION_INFO_V1 ( test_shm_mq  )

◆ PG_FUNCTION_INFO_V1() [2/2]

PG_FUNCTION_INFO_V1 ( test_shm_mq_pipelined  )

◆ test_shm_mq()

Datum test_shm_mq ( PG_FUNCTION_ARGS  )

Definition at line 41 of file test.c.

42 {
43  int64 queue_size = PG_GETARG_INT64(0);
44  text *message = PG_GETARG_TEXT_PP(1);
45  char *message_contents = VARDATA_ANY(message);
46  int message_size = VARSIZE_ANY_EXHDR(message);
47  int32 loop_count = PG_GETARG_INT32(2);
48  int32 nworkers = PG_GETARG_INT32(3);
49  dsm_segment *seg;
50  shm_mq_handle *outqh;
51  shm_mq_handle *inqh;
52  shm_mq_result res;
53  Size len;
54  void *data;
55 
56  /* A negative loopcount is nonsensical. */
57  if (loop_count < 0)
58  ereport(ERROR,
59  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
60  errmsg("repeat count size must be an integer value greater than or equal to zero")));
61 
62  /*
63  * Since this test sends data using the blocking interfaces, it cannot
64  * send data to itself. Therefore, a minimum of 1 worker is required. Of
65  * course, a negative worker count is nonsensical.
66  */
67  if (nworkers <= 0)
68  ereport(ERROR,
69  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
70  errmsg("number of workers must be an integer value greater than zero")));
71 
72  /* Set up dynamic shared memory segment and background workers. */
73  test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
74 
75  /* Send the initial message. */
76  res = shm_mq_send(outqh, message_size, message_contents, false, true);
77  if (res != SHM_MQ_SUCCESS)
78  ereport(ERROR,
79  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
80  errmsg("could not send message")));
81 
82  /*
83  * Receive a message and send it back out again. Do this a number of
84  * times equal to the loop count.
85  */
86  for (;;)
87  {
88  /* Receive a message. */
89  res = shm_mq_receive(inqh, &len, &data, false);
90  if (res != SHM_MQ_SUCCESS)
91  ereport(ERROR,
92  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
93  errmsg("could not receive message")));
94 
95  /* If this is supposed to be the last iteration, stop here. */
96  if (--loop_count <= 0)
97  break;
98 
99  /* Send it back out. */
100  res = shm_mq_send(outqh, len, data, false, true);
101  if (res != SHM_MQ_SUCCESS)
102  ereport(ERROR,
103  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
104  errmsg("could not send message")));
105  }
106 
107  /*
108  * Finally, check that we got back the same message from the last
109  * iteration that we originally sent.
110  */
111  verify_message(message_size, message_contents, len, data);
112 
113  /* Clean up. */
114  dsm_detach(seg);
115 
116  PG_RETURN_VOID();
117 }
signed int int32
Definition: c.h:429
size_t Size
Definition: c.h:540
void dsm_detach(dsm_segment *seg)
Definition: dsm.c:772
int errcode(int sqlerrcode)
Definition: elog.c:698
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_GETARG_INT64(n)
Definition: fmgr.h:283
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
const void size_t len
const void * data
#define VARDATA_ANY(PTR)
Definition: postgres.h:361
#define VARSIZE_ANY_EXHDR(PTR)
Definition: postgres.h:354
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
Definition: setup.c:48
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:574
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:330
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
Definition: c.h:622
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
Definition: test.c:252

References data, dsm_detach(), ereport, errcode(), errmsg(), ERROR, len, PG_GETARG_INT32, PG_GETARG_INT64, PG_GETARG_TEXT_PP, PG_RETURN_VOID, res, shm_mq_receive(), shm_mq_send(), SHM_MQ_SUCCESS, test_shm_mq_setup(), VARDATA_ANY, VARSIZE_ANY_EXHDR, and verify_message().

◆ test_shm_mq_pipelined()

Datum test_shm_mq_pipelined ( PG_FUNCTION_ARGS  )

Definition at line 130 of file test.c.

131 {
132  int64 queue_size = PG_GETARG_INT64(0);
133  text *message = PG_GETARG_TEXT_PP(1);
134  char *message_contents = VARDATA_ANY(message);
135  int message_size = VARSIZE_ANY_EXHDR(message);
136  int32 loop_count = PG_GETARG_INT32(2);
137  int32 nworkers = PG_GETARG_INT32(3);
138  bool verify = PG_GETARG_BOOL(4);
139  int32 send_count = 0;
140  int32 receive_count = 0;
141  dsm_segment *seg;
142  shm_mq_handle *outqh;
143  shm_mq_handle *inqh;
144  shm_mq_result res;
145  Size len;
146  void *data;
147 
148  /* A negative loopcount is nonsensical. */
149  if (loop_count < 0)
150  ereport(ERROR,
151  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
152  errmsg("repeat count size must be an integer value greater than or equal to zero")));
153 
154  /*
155  * Using the nonblocking interfaces, we can even send data to ourselves,
156  * so the minimum number of workers for this test is zero.
157  */
158  if (nworkers < 0)
159  ereport(ERROR,
160  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
161  errmsg("number of workers must be an integer value greater than or equal to zero")));
162 
163  /* Set up dynamic shared memory segment and background workers. */
164  test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
165 
166  /* Main loop. */
167  for (;;)
168  {
169  bool wait = true;
170 
171  /*
172  * If we haven't yet sent the message the requisite number of times,
173  * try again to send it now. Note that when shm_mq_send() returns
174  * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
175  * same message size and contents; that's not an issue here because
176  * we're sending the same message every time.
177  */
178  if (send_count < loop_count)
179  {
180  res = shm_mq_send(outqh, message_size, message_contents, true,
181  true);
182  if (res == SHM_MQ_SUCCESS)
183  {
184  ++send_count;
185  wait = false;
186  }
187  else if (res == SHM_MQ_DETACHED)
188  ereport(ERROR,
189  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190  errmsg("could not send message")));
191  }
192 
193  /*
194  * If we haven't yet received the message the requisite number of
195  * times, try to receive it again now.
196  */
197  if (receive_count < loop_count)
198  {
199  res = shm_mq_receive(inqh, &len, &data, true);
200  if (res == SHM_MQ_SUCCESS)
201  {
202  ++receive_count;
203  /* Verifying every time is slow, so it's optional. */
204  if (verify)
205  verify_message(message_size, message_contents, len, data);
206  wait = false;
207  }
208  else if (res == SHM_MQ_DETACHED)
209  ereport(ERROR,
210  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
211  errmsg("could not receive message")));
212  }
213  else
214  {
215  /*
216  * Otherwise, we've received the message enough times. This
217  * shouldn't happen unless we've also sent it enough times.
218  */
219  if (send_count != receive_count)
220  ereport(ERROR,
221  (errcode(ERRCODE_INTERNAL_ERROR),
222  errmsg("message sent %d times, but received %d times",
223  send_count, receive_count)));
224  break;
225  }
226 
227  if (wait)
228  {
229  /*
230  * If we made no progress, wait for one of the other processes to
231  * which we are connected to set our latch, indicating that they
232  * have read or written data and therefore there may now be work
233  * for us to do.
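234  */
235  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
236  PG_WAIT_EXTENSION);
237  ResetLatch(MyLatch);
238  CHECK_FOR_INTERRUPTS();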
239  }
240  }
241 
242  /* Clean up. */
243  dsm_detach(seg);
244 
245  PG_RETURN_VOID();
246 }
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
struct Latch * MyLatch
Definition: globals.c:57
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23

References CHECK_FOR_INTERRUPTS, data, dsm_detach(), ereport, errcode(), errmsg(), ERROR, len, MyLatch, PG_GETARG_BOOL, PG_GETARG_INT32, PG_GETARG_INT64, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PG_WAIT_EXTENSION, res, ResetLatch(), SHM_MQ_DETACHED, shm_mq_receive(), shm_mq_send(), SHM_MQ_SUCCESS, test_shm_mq_setup(), VARDATA_ANY, VARSIZE_ANY_EXHDR, verify_message(), WaitLatch(), WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

◆ verify_message()

static void verify_message ( Size  origlen,
char *  origdata,
Size  newlen,
char *  newdata 
)
static

Definition at line 252 of file test.c.

253 {
254  Size i;
255 
256  if (origlen != newlen)
257  ereport(ERROR,
258  (errmsg("message corrupted"),
259  errdetail("The original message was %zu bytes but the final message is %zu bytes.",
260  origlen, newlen)));
261 
262  for (i = 0; i < origlen; ++i)
263  if (origdata[i] != newdata[i])
264  ereport(ERROR,
265  (errmsg("message corrupted"),
266  errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
267 }
int errdetail(const char *fmt,...)
Definition: elog.c:1042
int i
Definition: isn.c:73

References ereport, errdetail(), errmsg(), ERROR, and i.

Referenced by test_shm_mq(), and test_shm_mq_pipelined().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 22 of file test.c.