PostgreSQL Source Code  git master
pgbench.c
Go to the documentation of this file.
1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * src/bin/pgbench/pgbench.c
8  * Copyright (c) 2000-2018, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29 
30 #ifdef WIN32
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
32 #endif /* WIN32 */
33 
34 #include "postgres_fe.h"
35 #include "fe_utils/conditional.h"
36 
37 #include "getopt_long.h"
38 #include "libpq-fe.h"
39 #include "portability/instr_time.h"
40 
41 #include <ctype.h>
42 #include <float.h>
43 #include <limits.h>
44 #include <math.h>
45 #include <signal.h>
46 #include <time.h>
47 #include <sys/time.h>
48 #ifdef HAVE_SYS_SELECT_H
49 #include <sys/select.h>
50 #endif
51 
52 #ifdef HAVE_SYS_RESOURCE_H
53 #include <sys/resource.h> /* for getrlimit */
54 #endif
55 
56 #ifndef M_PI
57 #define M_PI 3.14159265358979323846
58 #endif
59 
60 #include "pgbench.h"
61 
/* SQLSTATE string; the macro name identifies it as the "undefined table" error */
62 #define ERRCODE_UNDEFINED_TABLE "42P01"
63 
64 /*
65  * Hashing constants
 *
 * FNV_OFFSET_BASIS and FNV_PRIME are consumed by getHashFnv1a(); the MM2_*
 * values are consumed by getHashMurmur2().  MM2_MUL_TIMES_8 equals
 * MM2_MUL * 8 reduced modulo 2^64 and is mixed with the seed there (8 being
 * sizeof(int64)); MM2_ROT is the right-shift distance used by that function.
66  */
67 #define FNV_PRIME UINT64CONST(0x100000001b3)
68 #define FNV_OFFSET_BASIS UINT64CONST(0xcbf29ce484222325)
69 #define MM2_MUL UINT64CONST(0xc6a4a7935bd1e995)
70 #define MM2_MUL_TIMES_8 UINT64CONST(0x35253c9ade8f4ca8)
71 #define MM2_ROT 47
72 
73 /*
74  * Multi-platform pthread implementations
 *
 * Exactly one of three variants is selected at compile time:
 *  - WIN32: pthread_t is a pointer to struct win32_pthread and only
 *    pthread_create()/pthread_join() are declared here (their definitions
 *    are not part of this section);
 *  - ENABLE_THREAD_SAFETY: the platform's <pthread.h> is used;
 *  - otherwise: pthread_t is just "void *" and no thread functions exist.
75  */
76 
77 #ifdef WIN32
78 /* Use native win32 threads on Windows */
79 typedef struct win32_pthread *pthread_t;
80 typedef int pthread_attr_t;
81 
82 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
83 static int pthread_join(pthread_t th, void **thread_return);
84 #elif defined(ENABLE_THREAD_SAFETY)
85 /* Use platform-dependent pthread capability */
86 #include <pthread.h>
87 #else
88 /* No threads implementation, use none (-j 1) */
89 #define pthread_t void *
90 #endif
91 
92 
93 /********************************************************************
94  * some configurable parameters */
95 
96 /*
 * max number of clients allowed: FD_SETSIZE - 10 when FD_SETSIZE is known,
 * 1024 otherwise.
 * NOTE(review): the margin of 10 is presumably headroom for descriptors that
 * do not belong to client connections -- confirm.
 */
97 #ifdef FD_SETSIZE
98 #define MAXCLIENTS (FD_SETSIZE - 10)
99 #else
100 #define MAXCLIENTS 1024
101 #endif
102 
103 #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */
104 
105 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
106 #define DEFAULT_NXACTS 10 /* default nxacts */
107 
108 #define ZIPF_CACHE_SIZE 15 /* max number of cells in a ZipfCache */
109 
110 #define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */
111 #define MAX_ZIPFIAN_PARAM 1000 /* maximum parameter for zipfian */
112 
113 int nxacts = 0; /* number of transactions per client */
114 int duration = 0; /* duration in seconds */
115 int64 end_time = 0; /* when to stop, in microseconds, under -T */
116 
117 /*
118  * scaling factor. for example, scale = 10 will make 1000000 tuples in
119  * pgbench_accounts table.
120  */
121 int scale = 1;
122 
123 /*
124  * fillfactor. for example, fillfactor = 90 will use only 90 percent
125  * space during inserts and leave 10 percent free.
126  */
127 int fillfactor = 100;
128 
129 /*
130  * use unlogged tables?
131  */
132 bool unlogged_tables = false;
133 
134 /*
135  * log sampling rate (1.0 = log everything, 0.0 = option not given)
136  */
137 double sample_rate = 0.0;
138 
139 /*
140  * When threads are throttled to a given rate limit, this is the target delay
141  * to reach that rate in usec. 0 is the default and means no throttling.
142  */
143 int64 throttle_delay = 0;
144 
145 /*
146  * Transactions which take longer than this limit (in usec) are counted as
147  * late, and reported as such, although they are completed anyway. When
148  * throttling is enabled, execution time slots that are more than this late
149  * are skipped altogether, and counted separately.
150  */
151 int64 latency_limit = 0;
152 
153 /*
154  * tablespace selection
155  */
156 char *tablespace = NULL;
157 char *index_tablespace = NULL;
158 
159 /* random seed used when calling srandom() */
160 int64 random_seed = -1;
161 
162 /*
163  * end of configurable parameters
164  *********************************************************************/
165 
/* Rows per unit of scale factor in the branches, tellers and accounts tables */
166 #define nbranches 1 /* Makes little sense to change this. Change
167  * -s instead */
168 #define ntellers 10
169 #define naccounts 100000
170 
171 /*
172  * The scale factor at/beyond which naccounts * scale no longer fits in a
173  * signed 32bit integer, so that 64bit values are needed.
174  *
175  * Although the actual threshold is 21474, we use 20000 because it is easier to
176  * document and remember, and isn't that far away from the real threshold.
177  */
178 #define SCALE_32BIT_THRESHOLD 20000
179 
/* Run-time options and connection settings shared by the whole program */
180 bool use_log; /* log transaction latencies to a file */
181 bool use_quiet; /* quiet logging onto stderr */
182 int agg_interval; /* log aggregates instead of individual
183  * transactions */
184 bool per_script_stats = false; /* whether to collect stats per script */
185 int progress = 0; /* interval, in seconds, between thread
 * progress reports */
186 bool progress_timestamp = false; /* progress report with Unix time */
187 int nclients = 1; /* number of clients */
188 int nthreads = 1; /* number of threads */
189 bool is_connect; /* establish connection for each transaction */
190 bool is_latencies; /* report per-command latencies */
191 int main_pid; /* main process id used in log filename */
192 
193 char *pghost = "";
194 char *pgport = "";
195 char *login = NULL;
196 char *dbName;
197 char *logfile_prefix = NULL;
198 const char *progname;
199 
200 #define WSEP '@' /* weight separator */
201 
202 volatile bool timer_exceeded = false; /* flag from signal handler */
203 
204 /*
205  * Variable definitions.
206  *
207  * If a variable only has a string value, "svalue" is that value, and "value"
208  * is "not set".
209  *
210  * If the value is known, "value" contains it (in any variant); in that case
211  * "svalue" contains the string equivalent of the value, if we've had
212  * occasion to compute that, or NULL if we haven't.
213  */
214 typedef struct
215 {
216  char *name; /* variable's name */
217  char *svalue; /* its value in string form, if known */
218  PgBenchValue value; /* actual variable's value */
219 } Variable;
220 
221 #define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */
222 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
223 
224 /*
225  * Simple data structure to keep stats about something.
226  *
 * Only running aggregates are kept (count, extrema, sum and sum of squares),
 * not the individual values.  "min" and "max" carry no meaning while
 * "count" is zero.
 *
227  * XXX probably the first value should be kept and used as an offset for
228  * better numerical stability...
229  */
230 typedef struct SimpleStats
231 {
232  int64 count; /* how many values were encountered */
233  double min; /* the minimum seen */
234  double max; /* the maximum seen */
235  double sum; /* sum of values */
236  double sum2; /* sum of squared values */
237 } SimpleStats;
238 
239 /*
240  * Data structure to hold various statistics: per-thread and per-script stats
241  * are maintained and merged together.
242  */
243 typedef struct StatsData
244 {
245  time_t start_time; /* interval start time, for aggregates */
246  int64 cnt; /* number of transactions, including skipped */
247  int64 skipped; /* number of transactions skipped under --rate
248  * and --latency-limit */
251 } StatsData;
252 
253 /*
254  * Connection state machine states.
255  */
256 typedef enum
257 {
258  /*
259  * The client must first choose a script to execute. Once chosen, it can
260  * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
261  * right away (state CSTATE_START_TX).
262  */
264 
265  /*
266  * In CSTATE_START_THROTTLE state, we calculate when to begin the next
267  * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
268  * sleeps until that moment. (If throttling is not enabled, doCustom()
269  * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
270  */
273 
274  /*
275  * CSTATE_START_TX performs start-of-transaction processing. Establishes
276  * a new connection for the transaction, in --connect mode, and records
277  * the transaction start time.
278  */
280 
281  /*
282  * We loop through these states, to process each command in the script:
283  *
284  * CSTATE_START_COMMAND starts the execution of a command. On a SQL
285  * command, the command is sent to the server, and we move to
286  * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
287  * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
288  * meta-commands are executed immediately.
289  *
290  * CSTATE_SKIP_COMMAND for conditional branches which are not executed,
291  * quickly skip commands that do not need any evaluation.
292  *
293  * CSTATE_WAIT_RESULT waits until we get a result set back from the server
294  * for the current command.
295  *
296  * CSTATE_SLEEP waits until the end of \sleep.
297  *
298  * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
299  * command counter, and loops back to CSTATE_START_COMMAND state.
300  */
306 
307  /*
308  * CSTATE_END_TX performs end-of-transaction processing. Calculates
309  * latency, and logs the transaction. In --connect mode, closes the
310  * current connection. Chooses the next script to execute and starts over
311  * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
312  * more work to do.
313  */
315 
316  /*
317  * Final states. CSTATE_ABORTED means that the script execution was
318  * aborted because a command failed, CSTATE_FINISHED means success.
319  */
323 
324 /*
325  * Connection state.
 *
 * Per-client bookkeeping: the libpq connection, the position within the
 * script being run, the client's variables, timing information for the
 * current transaction, and simple counters.
326  */
327 typedef struct
328 {
329  PGconn *con; /* connection handle to DB */
330  int id; /* client No. */
331  ConnectionStateEnum state; /* state machine's current state. */
332  ConditionalStack cstack; /* enclosing conditionals state */
333 
334  int use_file; /* index in sql_script for this client */
335  int command; /* command number in script */
336 
337  /* client variables */
338  Variable *variables; /* array of variable definitions */
339  int nvariables; /* number of variables */
340  bool vars_sorted; /* are variables sorted by name? */
341 
342  /* various times about current transaction */
343  int64 txn_scheduled; /* scheduled start time of transaction (usec) */
344  int64 sleep_until; /* scheduled start time of next cmd (usec) */
345  instr_time txn_begin; /* used for measuring schedule lag times */
346  instr_time stmt_begin; /* used for measuring statement latencies */
347 
348  bool prepared[MAX_SCRIPTS]; /* whether client prepared the script;
 * one flag per possible script */
349 
350  /* per client collected stats */
351  int64 cnt; /* client transaction count, for -t */
352  int ecnt; /* error count */
353 } CState;
354 
355 /*
356  * Cache cell for zipfian_random call
 *
 * A cell is identified by the pair (n, s); the remaining numeric fields are
 * derived from that pair by zipfSetCacheCell().
357  */
358 typedef struct
359 {
360  /* cell keys */
361  double s; /* s - parameter of zipfian_random function */
362  int64 n; /* number of elements in range (max - min + 1) */
363 
364  double harmonicn; /* generalizedHarmonicNumber(n, s) */
365  double alpha; /* 1 / (1 - s) */
366  double beta; /* 0.5 ^ s */
367  double eta; /* (1 - (2/n)^(1-s)) / (1 - H(2,s) / H(n,s)) */
368 
369  uint64 last_used; /* last used logical time */
370 } ZipfCell;
371 
372 /*
373  * Zipf cache for zeta values
374  */
375 typedef struct
376 {
377  uint64 current; /* counter for LRU cache replacement algorithm */
378 
379  int nb_cells; /* number of filled cells */
380  int overflowCount; /* number of cache overflows */
382 } ZipfCache;
383 
384 /*
385  * Thread state
386  */
387 typedef struct
388 {
389  int tid; /* thread id */
390  pthread_t thread; /* thread handle */
391  CState *state; /* array of CState */
392  int nstate; /* length of state[] */
393  unsigned short random_state[3]; /* separate randomness for each thread */
394  int64 throttle_trigger; /* previous/next throttling (us) */
395  FILE *logfile; /* where to log, or NULL */
396  ZipfCache zipf_cache; /* for thread-safe zipfian random number
397  * generation */
398 
399  /* per thread collected stats */
400  instr_time start_time; /* thread start time */
403  int64 latency_late; /* executed but late transactions */
404 } TState;
405 
406 #define INVALID_THREAD ((pthread_t) 0) /* null thread handle value */
407 
408 /*
409  * queries read from files
 *
 * SQL_COMMAND and META_COMMAND are the two possible values of Command.type;
 * MAX_ARGS is the capacity of Command.argv[].
410  */
411 #define SQL_COMMAND 1
412 #define META_COMMAND 2
413 #define MAX_ARGS 10
414 
/*
 * Identifiers for the backslash meta-commands; META_NONE marks anything that
 * is not a known meta-command.
 */
415 typedef enum MetaCommand
416 {
417  META_NONE, /* not a known meta-command */
418  META_SET, /* \set */
419  META_SETSHELL, /* \setshell */
420  META_SHELL, /* \shell */
421  META_SLEEP, /* \sleep */
422  META_IF, /* \if */
423  META_ELIF, /* \elif */
424  META_ELSE, /* \else */
425  META_ENDIF /* \endif */
426 } MetaCommand;
427 
428 typedef enum QueryMode
429 {
430  QUERY_SIMPLE, /* simple query */
431  QUERY_EXTENDED, /* extended query */
432  QUERY_PREPARED, /* extended query with prepared statements */
434 } QueryMode;
435 
/* Selected query protocol, and the protocol names in QueryMode order */
436 static QueryMode querymode = QUERY_SIMPLE;
437 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
438 
/*
 * One parsed script command, either SQL or a backslash meta-command,
 * together with the time statistics collected for it.
 */
439 typedef struct
440 {
441  char *line; /* text of command line */
442  int command_num; /* unique index of this Command struct */
443  int type; /* command type (SQL_COMMAND or META_COMMAND) */
444  MetaCommand meta; /* meta command identifier, or META_NONE */
445  int argc; /* number of command words */
446  char *argv[MAX_ARGS]; /* command word list */
447  PgBenchExpr *expr; /* parsed expression, if needed */
448  SimpleStats stats; /* time spent in this command */
449 } Command;
450 
/* A script after parsing: its commands plus its selection weight and stats */
451 typedef struct ParsedScript
452 {
453  const char *desc; /* script descriptor (eg, file name) */
454  int weight; /* selection weight */
455  Command **commands; /* NULL-terminated array of Commands */
456  StatsData stats; /* total time spent in script */
457 } ParsedScript;
458 
/* Registry of all scripts to run, with a fixed capacity of MAX_SCRIPTS */
459 static ParsedScript sql_script[MAX_SCRIPTS]; /* SQL script files */
460 static int num_scripts; /* number of scripts in sql_script[] */
461 static int num_commands = 0; /* total number of Command structs */
462 static int64 total_weight = 0; /* NOTE(review): presumably the sum of the
 * scripts' weights -- confirm */
463 
464 static int debug = 0; /* debug flag */
465 
466 /* Builtin test scripts: name, human-readable description, and script text */
467 typedef struct BuiltinScript
468 {
469  const char *name; /* very short name for -b ... */
470  const char *desc; /* short description */
471  const char *script; /* actual pgbench script */
472 } BuiltinScript;
473 
475 {
476  {
477  "tpcb-like",
478  "<builtin: TPC-B (sort of)>",
479  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
480  "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
481  "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
482  "\\set delta random(-5000, 5000)\n"
483  "BEGIN;\n"
484  "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
485  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
486  "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
487  "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
488  "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
489  "END;\n"
490  },
491  {
492  "simple-update",
493  "<builtin: simple update>",
494  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
495  "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
496  "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
497  "\\set delta random(-5000, 5000)\n"
498  "BEGIN;\n"
499  "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
500  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
501  "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
502  "END;\n"
503  },
504  {
505  "select-only",
506  "<builtin: select only>",
507  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
508  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
509  }
510 };
511 
512 
513 /*
 * Function prototypes
 *
 * Forward declarations of file-local functions.  pgbench_error() takes a
 * printf-style format as its first argument, which pg_attribute_printf(1, 2)
 * lets the compiler check against the arguments that follow.
 */
514 static void setNullValue(PgBenchValue *pv);
515 static void setBoolValue(PgBenchValue *pv, bool bval);
516 static void setIntValue(PgBenchValue *pv, int64 ival);
517 static void setDoubleValue(PgBenchValue *pv, double dval);
518 static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
519 static void doLog(TState *thread, CState *st,
520  StatsData *agg, bool skipped, double latency, double lag);
521 static void processXactStats(TState *thread, CState *st, instr_time *now,
522  bool skipped, StatsData *agg);
523 static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
524 static void addScript(ParsedScript script);
525 static void *threadRun(void *arg);
526 static void setalarm(int seconds);
527 static void finishCon(CState *st);
528 
529 
530 /* callback functions for our flex lexer */
532  NULL, /* don't need get_variable functionality */
534 };
535 
536 
/*
 * usage -- print the command-line help text to stdout.
 *
 * The whole text is a single printf() format; its two %s conversions are both
 * filled with progname, and the literal percent sign in the --sampling-rate
 * line is therefore written as %%.
 */
537 static void
538 usage(void)
539 {
540  printf("%s is a benchmarking tool for PostgreSQL.\n\n"
541  "Usage:\n"
542  " %s [OPTION]... [DBNAME]\n"
543  "\nInitialization options:\n"
544  " -i, --initialize invokes initialization mode\n"
545  " -I, --init-steps=[dtgvpf]+ (default \"dtgvp\")\n"
546  " run selected initialization steps\n"
547  " -F, --fillfactor=NUM set fill factor\n"
548  " -n, --no-vacuum do not run VACUUM during initialization\n"
549  " -q, --quiet quiet logging (one message each 5 seconds)\n"
550  " -s, --scale=NUM scaling factor\n"
551  " --foreign-keys create foreign key constraints between tables\n"
552  " --index-tablespace=TABLESPACE\n"
553  " create indexes in the specified tablespace\n"
554  " --tablespace=TABLESPACE create tables in the specified tablespace\n"
555  " --unlogged-tables create tables as unlogged tables\n"
556  "\nOptions to select what to run:\n"
557  " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
558  " (use \"-b list\" to list available scripts)\n"
559  " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
560  " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
561  " (same as \"-b simple-update\")\n"
562  " -S, --select-only perform SELECT-only transactions\n"
563  " (same as \"-b select-only\")\n"
564  "\nBenchmarking options:\n"
565  " -c, --client=NUM number of concurrent database clients (default: 1)\n"
566  " -C, --connect establish new connection for each transaction\n"
567  " -D, --define=VARNAME=VALUE\n"
568  " define variable for use by custom script\n"
569  " -j, --jobs=NUM number of threads (default: 1)\n"
570  " -l, --log write transaction times to log file\n"
571  " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
572  " -M, --protocol=simple|extended|prepared\n"
573  " protocol for submitting queries (default: simple)\n"
574  " -n, --no-vacuum do not run VACUUM before tests\n"
575  " -P, --progress=NUM show thread progress report every NUM seconds\n"
576  " -r, --report-latencies report average latency per command\n"
577  " -R, --rate=NUM target rate in transactions per second\n"
578  " -s, --scale=NUM report this scale factor in output\n"
579  " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
580  " -T, --time=NUM duration of benchmark test in seconds\n"
581  " -v, --vacuum-all vacuum all four standard tables before tests\n"
582  " --aggregate-interval=NUM aggregate data over NUM seconds\n"
583  " --log-prefix=PREFIX prefix for transaction time log file\n"
584  " (default: \"pgbench_log\")\n"
585  " --progress-timestamp use Unix epoch timestamps for progress\n"
586  " --random-seed=SEED set random seed (\"time\", \"rand\", integer)\n"
587  " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
588  "\nCommon options:\n"
589  " -d, --debug print debugging output\n"
590  " -h, --host=HOSTNAME database server host or socket directory\n"
591  " -p, --port=PORT database server port number\n"
592  " -U, --username=USERNAME connect as specified database user\n"
593  " -V, --version output version information, then exit\n"
594  " -?, --help show this help, then exit\n"
595  "\n"
596  "Report bugs to <pgsql-bugs@postgresql.org>.\n",
597  progname, progname);
598 }
599 
/*
 * is_an_int -- return whether str matches "^\s*[-+]?[0-9]+$"
 *
 * Leading whitespace and a single optional sign are accepted, but they must
 * be followed by at least one decimal digit and then the end of the string.
 * Consequently an empty string, a blank-only string and a lone sign are all
 * rejected, as are strings with trailing characters of any kind.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * At least one digit is required.  The end-of-string case must fail this
	 * test as well, otherwise "" and "-" would be reported as integers.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
625 
626 
627 /*
628  * strtoint64 -- convert a string to 64-bit integer
629  *
630  * This function is a modified version of scanint8() from
631  * src/backend/utils/adt/int8.c.
632  */
633 int64
634 strtoint64(const char *str)
635 {
636  const char *ptr = str;
637  int64 result = 0;
638  int sign = 1;
639 
640  /*
641  * Do our own scan, rather than relying on sscanf which might be broken
642  * for long long.
643  */
644 
645  /* skip leading spaces */
646  while (*ptr && isspace((unsigned char) *ptr))
647  ptr++;
648 
649  /* handle sign */
650  if (*ptr == '-')
651  {
652  ptr++;
653 
654  /*
655  * Do an explicit check for INT64_MIN. Ugly though this is, it's
656  * cleaner than trying to get the loop below to handle it portably.
657  */
658  if (strncmp(ptr, "9223372036854775808", 19) == 0)
659  {
660  result = PG_INT64_MIN;
661  ptr += 19;
662  goto gotdigits;
663  }
664  sign = -1;
665  }
666  else if (*ptr == '+')
667  ptr++;
668 
669  /* require at least one digit */
670  if (!isdigit((unsigned char) *ptr))
671  fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
672 
673  /* process digits */
674  while (*ptr && isdigit((unsigned char) *ptr))
675  {
676  int64 tmp = result * 10 + (*ptr++ - '0');
677 
678  if ((tmp / 10) != result) /* overflow? */
679  fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
680  result = tmp;
681  }
682 
683 gotdigits:
684 
685  /* allow trailing whitespace, but not other trailing chars */
686  while (*ptr != '\0' && isspace((unsigned char) *ptr))
687  ptr++;
688 
689  if (*ptr != '\0')
690  fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
691 
692  return ((sign < 0) ? -result : result);
693 }
694 
695 /* random number generator: uniform distribution from min to max inclusive */
696 static int64
697 getrand(TState *thread, int64 min, int64 max)
698 {
699  /*
700  * Odd coding is so that min and max have approximately the same chance of
701  * being selected as do numbers between them.
702  *
703  * pg_erand48() is thread-safe and concurrent, which is why we use it
704  * rather than random(), which in glibc is non-reentrant, and therefore
705  * protected by a mutex, and therefore a bottleneck on machines with many
706  * CPUs.
707  */
708  return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
709 }
710 
711 /*
712  * random number generator: exponential distribution from min to max inclusive.
713  * the parameter is so that the density of probability for the last cut-off max
714  * value is exp(-parameter).
715  */
716 static int64
717 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
718 {
719  double cut,
720  uniform,
721  rand;
722 
723  /* abort if wrong parameter, but must really be checked beforehand */
724  Assert(parameter > 0.0);
725  cut = exp(-parameter);
726  /* erand in [0, 1), uniform in (0, 1] */
727  uniform = 1.0 - pg_erand48(thread->random_state);
728 
729  /*
730  * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
731  */
732  Assert((1.0 - cut) != 0.0);
733  rand = -log(cut + (1.0 - cut) * uniform) / parameter;
734  /* return int64 random number within between min and max */
735  return min + (int64) ((max - min + 1) * rand);
736 }
737 
738 /* random number generator: gaussian distribution from min to max inclusive */
739 static int64
740 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
741 {
742  double stdev;
743  double rand;
744 
745  /* abort if parameter is too low, but must really be checked beforehand */
746  Assert(parameter >= MIN_GAUSSIAN_PARAM);
747 
748  /*
749  * Get user specified random number from this loop, with -parameter <
750  * stdev <= parameter
751  *
752  * This loop is executed until the number is in the expected range.
753  *
754  * As the minimum parameter is 2.0, the probability of looping is low:
755  * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
756  * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
757  * the worst case. For a parameter value of 5.0, the looping probability
758  * is about e^{-5} * 2 / pi ~ 0.43%.
759  */
760  do
761  {
762  /*
763  * pg_erand48 generates [0,1), but for the basic version of the
764  * Box-Muller transform the two uniformly distributed random numbers
765  * are expected in (0, 1] (see
766  * http://en.wikipedia.org/wiki/Box_muller)
767  */
768  double rand1 = 1.0 - pg_erand48(thread->random_state);
769  double rand2 = 1.0 - pg_erand48(thread->random_state);
770 
771  /* Box-Muller basic form transform */
772  double var_sqrt = sqrt(-2.0 * log(rand1));
773 
774  stdev = var_sqrt * sin(2.0 * M_PI * rand2);
775 
776  /*
777  * we may try with cos, but there may be a bias induced if the
778  * previous value fails the test. To be on the safe side, let us try
779  * over.
780  */
781  }
782  while (stdev < -parameter || stdev >= parameter);
783 
784  /* stdev is in [-parameter, parameter), normalization to [0,1) */
785  rand = (stdev + parameter) / (parameter * 2.0);
786 
787  /* return int64 random number within between min and max */
788  return min + (int64) ((max - min + 1) * rand);
789 }
790 
791 /*
792  * random number generator: generate a value, such that the series of values
793  * will approximate a Poisson distribution centered on the given value.
794  */
795 static int64
796 getPoissonRand(TState *thread, int64 center)
797 {
798  /*
799  * Use inverse transform sampling to generate a value > 0, such that the
800  * expected (i.e. average) value is the given argument.
801  */
802  double uniform;
803 
804  /* erand in [0, 1), uniform in (0, 1] */
805  uniform = 1.0 - pg_erand48(thread->random_state);
806 
807  return (int64) (-log(uniform) * ((double) center) + 0.5);
808 }
809 
810 /* helper function for getZipfianRand */
811 static double
812 generalizedHarmonicNumber(int64 n, double s)
813 {
814  int i;
815  double ans = 0.0;
816 
817  for (i = n; i > 1; i--)
818  ans += pow(i, -s);
819  return ans + 1.0;
820 }
821 
822 /* set harmonicn and other parameters to cache cell */
823 static void
824 zipfSetCacheCell(ZipfCell *cell, int64 n, double s)
825 {
826  double harmonic2;
827 
828  cell->n = n;
829  cell->s = s;
830 
831  harmonic2 = generalizedHarmonicNumber(2, s);
832  cell->harmonicn = generalizedHarmonicNumber(n, s);
833 
834  cell->alpha = 1.0 / (1.0 - s);
835  cell->beta = pow(0.5, s);
836  cell->eta = (1.0 - pow(2.0 / n, 1.0 - s)) / (1.0 - harmonic2 / cell->harmonicn);
837 }
838 
839 /*
840  * search for cache cell with keys (n, s)
841  * and create new cell if it does not exist
842  */
843 static ZipfCell *
844 zipfFindOrCreateCacheCell(ZipfCache *cache, int64 n, double s)
845 {
846  int i,
847  least_recently_used = 0;
848  ZipfCell *cell;
849 
850  /* search cached cell for given parameters */
851  for (i = 0; i < cache->nb_cells; i++)
852  {
853  cell = &cache->cells[i];
854  if (cell->n == n && cell->s == s)
855  return &cache->cells[i];
856 
857  if (cell->last_used < cache->cells[least_recently_used].last_used)
858  least_recently_used = i;
859  }
860 
861  /* create new one if it does not exist */
862  if (cache->nb_cells < ZIPF_CACHE_SIZE)
863  i = cache->nb_cells++;
864  else
865  {
866  /* replace LRU cell if cache is full */
867  i = least_recently_used;
868  cache->overflowCount++;
869  }
870 
871  zipfSetCacheCell(&cache->cells[i], n, s);
872 
873  cache->cells[i].last_used = cache->current++;
874  return &cache->cells[i];
875 }
876 
877 /*
878  * Computing zipfian using rejection method, based on
879  * "Non-Uniform Random Variate Generation",
880  * Luc Devroye, p. 550-551, Springer 1986.
881  */
882 static int64
883 computeIterativeZipfian(TState *thread, int64 n, double s)
884 {
885  double b = pow(2.0, s - 1.0);
886  double x,
887  t,
888  u,
889  v;
890 
891  while (true)
892  {
893  /* random variates */
894  u = pg_erand48(thread->random_state);
895  v = pg_erand48(thread->random_state);
896 
897  x = floor(pow(u, -1.0 / (s - 1.0)));
898 
899  t = pow(1.0 + 1.0 / x, s - 1.0);
900  /* reject if too large or out of bound */
901  if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
902  break;
903  }
904  return (int64) x;
905 }
906 
907 /*
908  * Computing zipfian using harmonic numbers, based on algorithm described in
909  * "Quickly Generating Billion-Record Synthetic Databases",
910  * Jim Gray et al, SIGMOD 1994
911  */
912 static int64
913 computeHarmonicZipfian(TState *thread, int64 n, double s)
914 {
915  ZipfCell *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s);
916  double uniform = pg_erand48(thread->random_state);
917  double uz = uniform * cell->harmonicn;
918 
919  if (uz < 1.0)
920  return 1;
921  if (uz < 1.0 + cell->beta)
922  return 2;
923  return 1 + (int64) (cell->n * pow(cell->eta * uniform - cell->eta + 1.0, cell->alpha));
924 }
925 
926 /* random number generator: zipfian distribution from min to max inclusive */
927 static int64
928 getZipfianRand(TState *thread, int64 min, int64 max, double s)
929 {
930  int64 n = max - min + 1;
931 
932  /* abort if parameter is invalid */
933  Assert(s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM);
934 
935 
936  return min - 1 + ((s > 1)
937  ? computeIterativeZipfian(thread, n, s)
938  : computeHarmonicZipfian(thread, n, s));
939 }
940 
941 /*
942  * FNV-1a hash function
943  */
944 static int64
945 getHashFnv1a(int64 val, uint64 seed)
946 {
947  int64 result;
948  int i;
949 
950  result = FNV_OFFSET_BASIS ^ seed;
951  for (i = 0; i < 8; ++i)
952  {
953  int32 octet = val & 0xff;
954 
955  val = val >> 8;
956  result = result ^ octet;
957  result = result * FNV_PRIME;
958  }
959 
960  return result;
961 }
962 
963 /*
964  * Murmur2 hash function
965  *
966  * Based on original work of Austin Appleby
967  * https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp
968  */
969 static int64
970 getHashMurmur2(int64 val, uint64 seed)
971 {
972  uint64 result = seed ^ MM2_MUL_TIMES_8; /* sizeof(int64) */
973  uint64 k = (uint64) val;
974 
975  k *= MM2_MUL;
976  k ^= k >> MM2_ROT;
977  k *= MM2_MUL;
978 
979  result ^= k;
980  result *= MM2_MUL;
981 
982  result ^= result >> MM2_ROT;
983  result *= MM2_MUL;
984  result ^= result >> MM2_ROT;
985 
986  return (int64) result;
987 }
988 
989 /*
990  * Initialize the given SimpleStats struct to all zeroes
 *
 * The whole struct is cleared byte-wise with memset(), so no member needs to
 * be set individually.
991  */
992 static void
994 {
995  memset(ss, 0, sizeof(SimpleStats));
996 }
997 
998 /*
999  * Accumulate one value into a SimpleStats struct.
 *
 * While the struct is still empty (count == 0) the value becomes both min and
 * max; afterwards min and max are only moved outwards.  sum gathers the
 * values and sum2 their squares.
1000  */
1001 static void
1003 {
 /* the count == 0 tests make the first value seed min and max */
1004  if (ss->count == 0 || val < ss->min)
1005  ss->min = val;
1006  if (ss->count == 0 || val > ss->max)
1007  ss->max = val;
1008  ss->count++;
1009  ss->sum += val;
1010  ss->sum2 += val * val;
1011 }
1012 
1013 /*
1014  * Merge two SimpleStats objects
 *
 * The contents of ss are folded into acc: min and max are widened, while
 * count, sum and sum2 are added.  ss itself is only read.
 *
 * NOTE(review): only acc->count is tested for emptiness.  If ss is empty
 * (count == 0, with min and max still at their initial values) while acc is
 * not, ss->min and ss->max still take part in the comparisons and may replace
 * acc's real extrema -- confirm that callers never merge an empty ss.
1015  */
1016 static void
1018 {
1019  if (acc->count == 0 || ss->min < acc->min)
1020  acc->min = ss->min;
1021  if (acc->count == 0 || ss->max > acc->max)
1022  acc->max = ss->max;
1023  acc->count += ss->count;
1024  acc->sum += ss->sum;
1025  acc->sum2 += ss->sum2;
1026 }
1027 
1028 /*
1029  * Initialize a StatsData struct to mostly zeroes, with its start time set to
1030  * the given value.
 *
 * The transaction and skipped counters are reset to 0 and both embedded
 * SimpleStats members (latency and lag) are re-initialized.
1031  */
1032 static void
1034 {
1035  sd->start_time = start_time;
1036  sd->cnt = 0;
1037  sd->skipped = 0;
1038  initSimpleStats(&sd->latency);
1039  initSimpleStats(&sd->lag);
1040 }
1041 
1042 /*
1043  * Accumulate one additional item into the given stats object.
1044  */
1045 static void
1046 accumStats(StatsData *stats, bool skipped, double lat, double lag)
1047 {
1048  stats->cnt++;
1049 
1050  if (skipped)
1051  {
1052  /* no latency to record on skipped transactions */
1053  stats->skipped++;
1054  }
1055  else
1056  {
1057  addToSimpleStats(&stats->latency, lat);
1058 
1059  /* and possibly the same for schedule lag */
1060  if (throttle_delay)
1061  addToSimpleStats(&stats->lag, lag);
1062  }
1063 }
1064 
1065 /* call PQexec() and exit() on failure */
1066 static void
1067 executeStatement(PGconn *con, const char *sql)
1068 {
1069  PGresult *res;
1070 
1071  res = PQexec(con, sql);
1072  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1073  {
1074  fprintf(stderr, "%s", PQerrorMessage(con));
1075  exit(1);
1076  }
1077  PQclear(res);
1078 }
1079 
1080 /* call PQexec() and complain, but without exiting, on failure */
1081 static void
1082 tryExecuteStatement(PGconn *con, const char *sql)
1083 {
1084  PGresult *res;
1085 
1086  res = PQexec(con, sql);
1087  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1088  {
1089  fprintf(stderr, "%s", PQerrorMessage(con));
1090  fprintf(stderr, "(ignoring this error and continuing anyway)\n");
1091  }
1092  PQclear(res);
1093 }
1094 
1095 /* set up a connection to the backend */
/*
 * The connection parameters are taken from pghost, pgport, login, dbName and
 * progname.  If the attempt fails and libpq reports that a password is
 * needed, the user is prompted once; the password is kept in a static buffer
 * and reused by every later call.
 *
 * Returns the new connection, or NULL after printing a message on stderr if
 * the connection could not be established.
 */
1096 static PGconn *
1098 {
1099  PGconn *conn;
1100  bool new_pass;
 /* static: a password obtained once is remembered across calls */
1101  static bool have_password = false;
1102  static char password[100];
1103 
1104  /*
1105  * Start the connection. Loop until we have a password if requested by
1106  * backend.
1107  */
1108  do
1109  {
1110 #define PARAMS_ARRAY_SIZE 7
1111 
1112  const char *keywords[PARAMS_ARRAY_SIZE];
1113  const char *values[PARAMS_ARRAY_SIZE];
1114 
1115  keywords[0] = "host";
1116  values[0] = pghost;
1117  keywords[1] = "port";
1118  values[1] = pgport;
1119  keywords[2] = "user";
1120  values[2] = login;
1121  keywords[3] = "password";
1122  values[3] = have_password ? password : NULL;
1123  keywords[4] = "dbname";
1124  values[4] = dbName;
1125  keywords[5] = "fallback_application_name";
1126  values[5] = progname;
 /* both arrays end with a NULL entry */
1127  keywords[6] = NULL;
1128  values[6] = NULL;
1129 
1130  new_pass = false;
1131 
 /* the third argument is libpq's expand_dbname flag */
1132  conn = PQconnectdbParams(keywords, values, true);
1133 
1134  if (!conn)
1135  {
1136  fprintf(stderr, "connection to database \"%s\" failed\n",
1137  dbName);
1138  return NULL;
1139  }
1140 
 /* prompt at most once: have_password stays true after the first prompt */
1141  if (PQstatus(conn) == CONNECTION_BAD &&
1142  PQconnectionNeedsPassword(conn) &&
1143  !have_password)
1144  {
1145  PQfinish(conn);
1146  simple_prompt("Password: ", password, sizeof(password), false);
1147  have_password = true;
1148  new_pass = true;
1149  }
1150  } while (new_pass);
1151 
1152  /* check to see that the backend connection was successfully made */
1153  if (PQstatus(conn) == CONNECTION_BAD)
1154  {
1155  fprintf(stderr, "connection to database \"%s\" failed:\n%s",
1156  dbName, PQerrorMessage(conn));
1157  PQfinish(conn);
1158  return NULL;
1159  }
1160 
1161  return conn;
1162 }
1163 
1164 /* throw away response from backend */
/*
 * Results are fetched from state->con and cleared one after another until
 * PQgetResult() returns NULL.
 */
1165 static void
1167 {
1168  PGresult *res;
1169 
1170  do
1171  {
1172  res = PQgetResult(state->con);
1173  if (res)
1174  PQclear(res);
 /* res is only tested against NULL below, never dereferenced */
1175  } while (res);
1176 }
1177 
1178 /* qsort comparator for Variable array */
1179 static int
1180 compareVariableNames(const void *v1, const void *v2)
1181 {
1182  return strcmp(((const Variable *) v1)->name,
1183  ((const Variable *) v2)->name);
1184 }
1185 
1186 /* Locate a variable by name; returns NULL if unknown */
/*
 * The variables array is sorted lazily: if st->vars_sorted is not set, the
 * array is sorted in place here before it is searched.  Because entries may
 * move during that sort, a Variable pointer obtained before the call may
 * afterwards designate a different variable.
 */
1187 static Variable *
1189 {
1190  Variable key;
1191 
1192  /* On some versions of Solaris, bsearch of zero items dumps core */
1193  if (st->nvariables <= 0)
1194  return NULL;
1195 
1196  /* Sort if we have to */
1197  if (!st->vars_sorted)
1198  {
1199  qsort((void *) st->variables, st->nvariables, sizeof(Variable),
1201  st->vars_sorted = true;
1202  }
1203 
1204  /* Now we can search */
 /* only the name member of the search key is filled in */
1205  key.name = name;
1206  return (Variable *) bsearch((void *) &key,
1207  (void *) st->variables,
1208  st->nvariables,
1209  sizeof(Variable),
1211 }
1212 
1213 /* Get the value of a variable, in string form; returns NULL if unknown */
/*
 * The returned pointer is the variable's own svalue member, not a copy made
 * for the caller.  When the variable only has a typed value, its string form
 * is built once, stored in svalue, and returned.
 */
1214 static char *
1216 {
1217  Variable *var;
1218  char stringform[64];
1219 
1220  var = lookupVariable(st, name);
1221  if (var == NULL)
1222  return NULL; /* not found */
1223 
1224  if (var->svalue)
1225  return var->svalue; /* we have it in string form */
1226 
1227  /* We need to produce a string equivalent of the value */
1228  Assert(var->value.type != PGBT_NO_VALUE);
1229  if (var->value.type == PGBT_NULL)
1230  snprintf(stringform, sizeof(stringform), "NULL");
1231  else if (var->value.type == PGBT_BOOLEAN)
1232  snprintf(stringform, sizeof(stringform),
1233  "%s", var->value.u.bval ? "true" : "false");
1234  else if (var->value.type == PGBT_INT)
1235  snprintf(stringform, sizeof(stringform),
1236  INT64_FORMAT, var->value.u.ival);
1237  else if (var->value.type == PGBT_DOUBLE)
1238  snprintf(stringform, sizeof(stringform),
1239  "%.*g", DBL_DIG, var->value.u.dval);
1240  else /* internal error, unexpected type */
1241  Assert(0);
 /*
  * NOTE(review): if Assert() expands to nothing, an unexpected type leaves
  * stringform unwritten before it is duplicated here -- confirm.
  */
1242  var->svalue = pg_strdup(stringform);
1243  return var->svalue;
1244 }
1245 
1246 /* Try to convert variable to a value; return false on failure */
/*
 * Nothing is done if the variable already has a typed value.  Otherwise its
 * string form is tried, in this order, as: the word "null", a boolean word,
 * an integer, and finally a double.  An empty string fails without a
 * message; a string that is not a valid double fails with a message on
 * stderr.  All keyword comparisons are case-insensitive.
 */
1247 static bool
1249 {
1250  size_t slen;
1251 
1252  if (var->value.type != PGBT_NO_VALUE)
1253  return true; /* no work */
1254 
1255  slen = strlen(var->svalue);
1256 
1257  if (slen == 0)
1258  /* what should it do on ""? */
1259  return false;
1260 
1261  if (pg_strcasecmp(var->svalue, "null") == 0)
1262  {
1263  setNullValue(&var->value);
1264  }
1265 
1266  /*
1267  * accept prefixes such as y, ye, n, no... but not for "o". 0/1 are
1268  * recognized later as an int, which is converted to bool if needed.
1269  */
 /* comparing only slen characters is what makes prefixes match */
1270  else if (pg_strncasecmp(var->svalue, "true", slen) == 0 ||
1271  pg_strncasecmp(var->svalue, "yes", slen) == 0 ||
1272  pg_strcasecmp(var->svalue, "on") == 0)
1273  {
1274  setBoolValue(&var->value, true);
1275  }
 /* "off" and "of" are spelled out; a lone "o" matches neither on nor off */
1276  else if (pg_strncasecmp(var->svalue, "false", slen) == 0 ||
1277  pg_strncasecmp(var->svalue, "no", slen) == 0 ||
1278  pg_strcasecmp(var->svalue, "off") == 0 ||
1279  pg_strcasecmp(var->svalue, "of") == 0)
1280  {
1281  setBoolValue(&var->value, false);
1282  }
1283  else if (is_an_int(var->svalue))
1284  {
1285  setIntValue(&var->value, strtoint64(var->svalue));
1286  }
1287  else /* type should be double */
1288  {
1289  double dv;
1290  char xs;
1291 
 /*
  * The trailing %c catches garbage after the number: exactly one
  * conversion must succeed for the string to be accepted.
  */
1292  if (sscanf(var->svalue, "%lf%c", &dv, &xs) != 1)
1293  {
1294  fprintf(stderr,
1295  "malformed variable \"%s\" value: \"%s\"\n",
1296  var->name, var->svalue);
1297  return false;
1298  }
1299  setDoubleValue(&var->value, dv);
1300  }
1301  return true;
1302 }
1303 
1304 /*
1305  * Check whether a variable's name is allowed.
1306  *
1307  * We allow any non-ASCII character, as well as ASCII letters, digits, and
1308  * underscore.
1309  *
1310  * Keep this in sync with the definitions of variable name characters in
1311  * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
1312  * "src/bin/pgbench/exprscan.l". Also see parseVariable(), below.
1313  *
1314  * Note: this static function is copied from "src/bin/psql/variables.c"
 *
 * Returns true if the name is non-empty and consists only of allowed
 * characters, false otherwise.
1315  */
1316 static bool
1318 {
 /* bytes are examined as unsigned char */
1319  const unsigned char *ptr = (const unsigned char *) name;
1320 
1321  /* Mustn't be zero-length */
1322  if (*ptr == '\0')
1323  return false;
1324 
 /* the loop condition stops at the NUL, so strchr() never sees it */
1325  while (*ptr)
1326  {
1327  if (IS_HIGHBIT_SET(*ptr) ||
1328  strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
1329  "_0123456789", *ptr) != NULL)
1330  ptr++;
1331  else
1332  return false;
1333  }
1334 
1335  return true;
1336 }
1337 
1338 /*
1339  * Lookup a variable by name, creating it if need be.
1340  * Caller is expected to assign a value to the variable.
1341  * Returns NULL on failure (bad name).
1342  */
1343 static Variable *
1344 lookupCreateVariable(CState *st, const char *context, char *name)
1345 {
1346  Variable *var;
1347 
1348  var = lookupVariable(st, name);
1349  if (var == NULL)
1350  {
1351  Variable *newvars;
1352 
1353  /*
1354  * Check for the name only when declaring a new variable to avoid
1355  * overhead.
1356  */
1357  if (!valid_variable_name(name))
1358  {
1359  fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
1360  context, name);
1361  return NULL;
1362  }
1363 
1364  /* Create variable at the end of the array */
1365  if (st->variables)
1366  newvars = (Variable *) pg_realloc(st->variables,
1367  (st->nvariables + 1) * sizeof(Variable));
1368  else
1369  newvars = (Variable *) pg_malloc(sizeof(Variable));
1370 
1371  st->variables = newvars;
1372 
1373  var = &newvars[st->nvariables];
1374 
1375  var->name = pg_strdup(name);
1376  var->svalue = NULL;
1377  /* caller is expected to initialize remaining fields */
1378 
1379  st->nvariables++;
1380  /* we don't re-sort the array till we have to */
1381  st->vars_sorted = false;
1382  }
1383 
1384  return var;
1385 }
1386 
1387 /* Assign a string value to a variable, creating it if need be */
1388 /* Returns false on failure (bad name) */
1389 static bool
1390 putVariable(CState *st, const char *context, char *name, const char *value)
1391 {
1392  Variable *var;
1393  char *val;
1394 
1395  var = lookupCreateVariable(st, context, name);
1396  if (!var)
1397  return false;
1398 
1399  /* dup then free, in case value is pointing at this variable */
1400  val = pg_strdup(value);
1401 
1402  if (var->svalue)
1403  free(var->svalue);
1404  var->svalue = val;
1405  var->value.type = PGBT_NO_VALUE;
1406 
1407  return true;
1408 }
1409 
1410 /* Assign a value to a variable, creating it if need be */
1411 /* Returns false on failure (bad name) */
1412 static bool
1413 putVariableValue(CState *st, const char *context, char *name,
1414  const PgBenchValue *value)
1415 {
1416  Variable *var;
1417 
1418  var = lookupCreateVariable(st, context, name);
1419  if (!var)
1420  return false;
1421 
1422  if (var->svalue)
1423  free(var->svalue);
1424  var->svalue = NULL;
1425  var->value = *value;
1426 
1427  return true;
1428 }
1429 
1430 /* Assign an integer value to a variable, creating it if need be */
1431 /* Returns false on failure (bad name) */
1432 static bool
1433 putVariableInt(CState *st, const char *context, char *name, int64 value)
1434 {
1435  PgBenchValue val;
1436 
1437  setIntValue(&val, value);
1438  return putVariableValue(st, context, name, &val);
1439 }
1440 
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon.  If what follows it looks like a valid
 * variable name, return a malloc'd string containing the variable name,
 * and set *eaten to the number of characters consumed (the colon included).
 * Otherwise, return NULL; *eaten is left untouched in that case.
 *
 * Keep the set of accepted characters in sync with valid_variable_name().
 */
static char *
parseVariable(const char *sql, int *eaten)
{
    int         i = 0;
    char       *name;

    /*
     * Scan forward from the character after the colon.  The end of the
     * string must be tested explicitly: strchr() treats the terminating NUL
     * as part of the searched string, so looking up '\0' yields a non-NULL
     * result and the scan would otherwise run past the end of "sql".
     */
    do
    {
        i++;
    } while (sql[i] != '\0' &&
             (IS_HIGHBIT_SET(sql[i]) ||
              strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
                     "_0123456789", sql[i]) != NULL));
    if (i == 1)
        return NULL;            /* no valid variable name chars */

    /* i - 1 name characters plus the terminator */
    name = pg_malloc(i);
    memcpy(name, &sql[1], i - 1);
    name[i - 1] = '\0';

    *eaten = i;
    return name;
}
1471 
/*
 * Replace the "len" characters starting at "param" inside *sql by "value".
 *
 * *sql is enlarged with pg_realloc() when the value is longer than the text
 * it replaces, so *sql may change; it is not shrunk when the value is
 * shorter.  Returns a pointer to just after the inserted value, within the
 * (possibly moved) *sql.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
    int         vlen = strlen(value);

    if (vlen > len)
    {
        /* remember the position: the buffer may move when enlarged */
        size_t      pos = param - *sql;
        size_t      newsize = strlen(*sql) - len + vlen + 1;

        *sql = pg_realloc(*sql, newsize);
        param = *sql + pos;
    }

    if (vlen != len)
    {
        /* shift the rest of the string, terminator included */
        char       *rest = param + len;

        memmove(param + vlen, rest, strlen(rest) + 1);
    }

    memcpy(param, value, vlen);

    return param + vlen;
}
1491 
1492 static char *
1493 assignVariables(CState *st, char *sql)
1494 {
1495  char *p,
1496  *name,
1497  *val;
1498 
1499  p = sql;
1500  while ((p = strchr(p, ':')) != NULL)
1501  {
1502  int eaten;
1503 
1504  name = parseVariable(p, &eaten);
1505  if (name == NULL)
1506  {
1507  while (*p == ':')
1508  {
1509  p++;
1510  }
1511  continue;
1512  }
1513 
1514  val = getVariable(st, name);
1515  free(name);
1516  if (val == NULL)
1517  {
1518  p++;
1519  continue;
1520  }
1521 
1522  p = replaceVariable(&sql, p, eaten, val);
1523  }
1524 
1525  return sql;
1526 }
1527 
1528 static void
1529 getQueryParams(CState *st, const Command *command, const char **params)
1530 {
1531  int i;
1532 
1533  for (i = 0; i < command->argc - 1; i++)
1534  params[i] = getVariable(st, command->argv[i + 1]);
1535 }
1536 
/*
 * Return a constant string naming the type of the given value, for use in
 * messages.  The strings are literals and must not be freed.
 */
1537 static char *
1539 {
1540  if (pval->type == PGBT_NO_VALUE)
1541  return "none";
1542  else if (pval->type == PGBT_NULL)
1543  return "null";
1544  else if (pval->type == PGBT_INT)
1545  return "int";
1546  else if (pval->type == PGBT_DOUBLE)
1547  return "double";
1548  else if (pval->type == PGBT_BOOLEAN)
1549  return "boolean";
1550  else
1551  {
1552  /* internal error, should never get there */
1553  Assert(false);
 /* reached only for an unexpected type when the Assert does not stop us */
1554  return NULL;
1555  }
1556 }
1557 
1558 /* get a value as a boolean, or tell if there is a problem */
1559 static bool
1560 coerceToBool(PgBenchValue *pval, bool *bval)
1561 {
1562  if (pval->type == PGBT_BOOLEAN)
1563  {
1564  *bval = pval->u.bval;
1565  return true;
1566  }
1567  else /* NULL, INT or DOUBLE */
1568  {
1569  fprintf(stderr, "cannot coerce %s to boolean\n", valueTypeName(pval));
1570  *bval = false; /* suppress uninitialized-variable warnings */
1571  return false;
1572  }
1573 }
1574 
1575 /*
1576  * Return true or false from an expression for conditional purposes.
1577  * Non zero numerical values are true, zero and NULL are false.
 * A boolean value is returned as is.  Unlike coerceToBool(), no type is
 * rejected with an error message.
1578  */
1579 static bool
1581 {
1582  switch (pval->type)
1583  {
1584  case PGBT_NULL:
1585  return false;
1586  case PGBT_BOOLEAN:
1587  return pval->u.bval;
1588  case PGBT_INT:
1589  return pval->u.ival != 0;
1590  case PGBT_DOUBLE:
1591  return pval->u.dval != 0.0;
1592  default:
1593  /* internal error, unexpected type */
1594  Assert(0);
1595  return false;
1596  }
1597 }
1598 
1599 /* get a value as an int, tell if there is a problem */
1600 static bool
1601 coerceToInt(PgBenchValue *pval, int64 *ival)
1602 {
1603  if (pval->type == PGBT_INT)
1604  {
1605  *ival = pval->u.ival;
1606  return true;
1607  }
1608  else if (pval->type == PGBT_DOUBLE)
1609  {
1610  double dval = pval->u.dval;
1611 
1612  if (dval < PG_INT64_MIN || PG_INT64_MAX < dval)
1613  {
1614  fprintf(stderr, "double to int overflow for %f\n", dval);
1615  return false;
1616  }
1617  *ival = (int64) dval;
1618  return true;
1619  }
1620  else /* BOOLEAN or NULL */
1621  {
1622  fprintf(stderr, "cannot coerce %s to int\n", valueTypeName(pval));
1623  return false;
1624  }
1625 }
1626 
1627 /* get a value as a double, or tell if there is a problem */
1628 static bool
1629 coerceToDouble(PgBenchValue *pval, double *dval)
1630 {
1631  if (pval->type == PGBT_DOUBLE)
1632  {
1633  *dval = pval->u.dval;
1634  return true;
1635  }
1636  else if (pval->type == PGBT_INT)
1637  {
1638  *dval = (double) pval->u.ival;
1639  return true;
1640  }
1641  else /* BOOLEAN or NULL */
1642  {
1643  fprintf(stderr, "cannot coerce %s to double\n", valueTypeName(pval));
1644  return false;
1645  }
1646 }
1647 
1648 /* assign a null value */
1649 static void
1651 {
1652  pv->type = PGBT_NULL;
 /* the payload is zeroed as well, so a null value has a defined u.ival */
1653  pv->u.ival = 0;
1654 }
1655 
1656 /* assign a boolean value */
/* Both the type tag and the boolean payload of *pv are overwritten. */
1657 static void
1659 {
1660  pv->type = PGBT_BOOLEAN;
1661  pv->u.bval = bval;
1662 }
1663 
1664 /* assign an integer value */
1665 static void
1666 setIntValue(PgBenchValue *pv, int64 ival)
1667 {
1668  pv->type = PGBT_INT;
1669  pv->u.ival = ival;
1670 }
1671 
1672 /* assign a double value */
1673 static void
1674 setDoubleValue(PgBenchValue *pv, double dval)
1675 {
1676  pv->type = PGBT_DOUBLE;
1677  pv->u.dval = dval;
1678 }
1679 
/*
 * Tell whether a function is evaluated lazily: this is the case for
 * PGBENCH_AND, PGBENCH_OR and PGBENCH_CASE only.
 */
1680 static bool
1682 {
1683  return func == PGBENCH_AND || func == PGBENCH_OR || func == PGBENCH_CASE;
1684 }
1685 
1686 /* lazy evaluation of some functions */
/*
 * Handles PGBENCH_AND, PGBENCH_OR and PGBENCH_CASE.  The first argument is
 * always evaluated; later arguments are only evaluated when the result
 * depends on them.
 *
 * Returns false if evaluating an argument fails or if an AND/OR operand
 * cannot be coerced to boolean; otherwise returns true with the result in
 * *retval.
 */
1687 static bool
1690 {
1691  PgBenchValue a1,
1692  a2;
1693  bool ba1,
1694  ba2;
1695 
1696  Assert(isLazyFunc(func) && args != NULL && args->next != NULL);
1697 
1698  /* args points to first condition */
1699  if (!evaluateExpr(thread, st, args->expr, &a1))
1700  return false;
1701 
1702  /* second condition for AND/OR and corresponding branch for CASE */
1703  args = args->next;
1704 
1705  switch (func)
1706  {
1707  case PGBENCH_AND:
 /* a null first operand gives null; the second operand is not evaluated */
1708  if (a1.type == PGBT_NULL)
1709  {
1710  setNullValue(retval);
1711  return true;
1712  }
1713 
1714  if (!coerceToBool(&a1, &ba1))
1715  return false;
1716 
 /* false AND anything is false; the second operand is not evaluated */
1717  if (!ba1)
1718  {
1719  setBoolValue(retval, false);
1720  return true;
1721  }
1722 
1723  if (!evaluateExpr(thread, st, args->expr, &a2))
1724  return false;
1725 
1726  if (a2.type == PGBT_NULL)
1727  {
1728  setNullValue(retval);
1729  return true;
1730  }
1731  else if (!coerceToBool(&a2, &ba2))
1732  return false;
1733  else
1734  {
1735  setBoolValue(retval, ba2);
1736  return true;
1737  }
1738 
 /* not reached: every branch of the if chain above returns */
1739  return true;
1740 
1741  case PGBENCH_OR:
1742 
 /* a null first operand gives null; the second operand is not evaluated */
1743  if (a1.type == PGBT_NULL)
1744  {
1745  setNullValue(retval);
1746  return true;
1747  }
1748 
1749  if (!coerceToBool(&a1, &ba1))
1750  return false;
1751 
 /* true OR anything is true; the second operand is not evaluated */
1752  if (ba1)
1753  {
1754  setBoolValue(retval, true);
1755  return true;
1756  }
1757 
1758  if (!evaluateExpr(thread, st, args->expr, &a2))
1759  return false;
1760 
1761  if (a2.type == PGBT_NULL)
1762  {
1763  setNullValue(retval);
1764  return true;
1765  }
1766  else if (!coerceToBool(&a2, &ba2))
1767  return false;
1768  else
1769  {
1770  setBoolValue(retval, ba2);
1771  return true;
1772  }
1773 
1774  case PGBENCH_CASE:
1775  /* when true, execute branch */
1776  if (valueTruth(&a1))
1777  return evaluateExpr(thread, st, args->expr, retval);
1778 
1779  /* now args contains next condition or final else expression */
1780  args = args->next;
1781 
1782  /* final else case? */
1783  if (args->next == NULL)
1784  return evaluateExpr(thread, st, args->expr, retval);
1785 
1786  /* no, another when, proceed */
 /* the remaining arguments are handled by a recursive call */
1787  return evalLazyFunc(thread, st, PGBENCH_CASE, args, retval);
1788 
1789  default:
1790  /* internal error, cannot get here */
1791  Assert(0);
1792  break;
1793  }
1794  return false;
1795 }
1796 
1797 /* maximum number of function arguments */
1798 #define MAX_FARGS 16
1799 
1800 /*
1801  * Recursive evaluation of standard functions,
1802  * which do not require lazy evaluation.
 *
 * All arguments are evaluated first, at most MAX_FARGS of them.  If any
 * argument is null the result is null, except for PGBENCH_IS and
 * PGBENCH_DEBUG, which receive their arguments as they are.
 *
 * Returns false if evaluating an argument fails, if there are too many
 * arguments, or if the function itself reports an error on stderr;
 * otherwise returns true with the result in *retval.
1803  */
1804 static bool
1807  PgBenchValue *retval)
1808 {
1809  /* evaluate all function arguments */
1810  int nargs = 0;
1811  PgBenchValue vargs[MAX_FARGS];
1812  PgBenchExprLink *l = args;
1813  bool has_null = false;
1814 
1815  for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1816  {
1817  if (!evaluateExpr(thread, st, l->expr, &vargs[nargs]))
1818  return false;
1819  has_null |= vargs[nargs].type == PGBT_NULL;
1820  }
1821 
 /* arguments left over after the loop means more than MAX_FARGS were given */
1822  if (l != NULL)
1823  {
1824  fprintf(stderr,
1825  "too many function arguments, maximum is %d\n", MAX_FARGS);
1826  return false;
1827  }
1828 
1829  /* NULL arguments */
1830  if (has_null && func != PGBENCH_IS && func != PGBENCH_DEBUG)
1831  {
1832  setNullValue(retval);
1833  return true;
1834  }
1835 
1836  /* then evaluate function */
1837  switch (func)
1838  {
1839  /* overloaded operators */
1840  case PGBENCH_ADD:
1841  case PGBENCH_SUB:
1842  case PGBENCH_MUL:
1843  case PGBENCH_DIV:
1844  case PGBENCH_MOD:
1845  case PGBENCH_EQ:
1846  case PGBENCH_NE:
1847  case PGBENCH_LE:
1848  case PGBENCH_LT:
1849  {
1850  PgBenchValue *lval = &vargs[0],
1851  *rval = &vargs[1];
1852 
1853  Assert(nargs == 2);
1854 
1855  /* overloaded type management, double if some double */
1856  if ((lval->type == PGBT_DOUBLE ||
1857  rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1858  {
1859  double ld,
1860  rd;
1861 
1862  if (!coerceToDouble(lval, &ld) ||
1863  !coerceToDouble(rval, &rd))
1864  return false;
1865 
1866  switch (func)
1867  {
1868  case PGBENCH_ADD:
1869  setDoubleValue(retval, ld + rd);
1870  return true;
1871 
1872  case PGBENCH_SUB:
1873  setDoubleValue(retval, ld - rd);
1874  return true;
1875 
1876  case PGBENCH_MUL:
1877  setDoubleValue(retval, ld * rd);
1878  return true;
1879 
1880  case PGBENCH_DIV:
 /* unlike the integer case below, the double divisor is not tested for zero */
1881  setDoubleValue(retval, ld / rd);
1882  return true;
1883 
1884  case PGBENCH_EQ:
1885  setBoolValue(retval, ld == rd);
1886  return true;
1887 
1888  case PGBENCH_NE:
1889  setBoolValue(retval, ld != rd);
1890  return true;
1891 
1892  case PGBENCH_LE:
1893  setBoolValue(retval, ld <= rd);
1894  return true;
1895 
1896  case PGBENCH_LT:
1897  setBoolValue(retval, ld < rd);
1898  return true;
1899 
1900  default:
1901  /* cannot get here */
1902  Assert(0);
1903  }
1904  }
1905  else /* we have integer operands, or % */
1906  {
1907  int64 li,
1908  ri;
1909 
1910  if (!coerceToInt(lval, &li) ||
1911  !coerceToInt(rval, &ri))
1912  return false;
1913 
 /*
  * NOTE(review): the int64 +, - and * below are not checked for
  * overflow -- confirm whether that is acceptable here.
  */
1914  switch (func)
1915  {
1916  case PGBENCH_ADD:
1917  setIntValue(retval, li + ri);
1918  return true;
1919 
1920  case PGBENCH_SUB:
1921  setIntValue(retval, li - ri);
1922  return true;
1923 
1924  case PGBENCH_MUL:
1925  setIntValue(retval, li * ri);
1926  return true;
1927 
1928  case PGBENCH_EQ:
1929  setBoolValue(retval, li == ri);
1930  return true;
1931 
1932  case PGBENCH_NE:
1933  setBoolValue(retval, li != ri);
1934  return true;
1935 
1936  case PGBENCH_LE:
1937  setBoolValue(retval, li <= ri);
1938  return true;
1939 
1940  case PGBENCH_LT:
1941  setBoolValue(retval, li < ri);
1942  return true;
1943 
1944  case PGBENCH_DIV:
1945  case PGBENCH_MOD:
1946  if (ri == 0)
1947  {
1948  fprintf(stderr, "division by zero\n");
1949  return false;
1950  }
1951  /* special handling of -1 divisor */
1952  if (ri == -1)
1953  {
1954  if (func == PGBENCH_DIV)
1955  {
1956  /* overflow check (needed for INT64_MIN) */
1957  if (li == PG_INT64_MIN)
1958  {
1959  fprintf(stderr, "bigint out of range\n");
1960  return false;
1961  }
1962  else
1963  setIntValue(retval, -li);
1964  }
1965  else
 /* anything modulo -1 is 0, computed without using % */
1966  setIntValue(retval, 0);
1967  return true;
1968  }
1969  /* else divisor is not -1 */
1970  if (func == PGBENCH_DIV)
1971  setIntValue(retval, li / ri);
1972  else /* func == PGBENCH_MOD */
1973  setIntValue(retval, li % ri);
1974 
1975  return true;
1976 
1977  default:
1978  /* cannot get here */
1979  Assert(0);
1980  }
1981  }
1982 
1983  Assert(0);
1984  return false; /* NOTREACHED */
1985  }
1986 
1987  /* integer bitwise operators */
1988  case PGBENCH_BITAND:
1989  case PGBENCH_BITOR:
1990  case PGBENCH_BITXOR:
1991  case PGBENCH_LSHIFT:
1992  case PGBENCH_RSHIFT:
1993  {
1994  int64 li,
1995  ri;
1996 
1997  if (!coerceToInt(&vargs[0], &li) || !coerceToInt(&vargs[1], &ri))
1998  return false;
1999 
 /*
  * NOTE(review): the shift count ri is used as given; it is not checked
  * to lie within 0..63 -- confirm.
  */
2000  if (func == PGBENCH_BITAND)
2001  setIntValue(retval, li & ri);
2002  else if (func == PGBENCH_BITOR)
2003  setIntValue(retval, li | ri);
2004  else if (func == PGBENCH_BITXOR)
2005  setIntValue(retval, li ^ ri);
2006  else if (func == PGBENCH_LSHIFT)
2007  setIntValue(retval, li << ri);
2008  else if (func == PGBENCH_RSHIFT)
2009  setIntValue(retval, li >> ri);
2010  else /* cannot get here */
2011  Assert(0);
2012 
2013  return true;
2014  }
2015 
2016  /* logical operators */
2017  case PGBENCH_NOT:
2018  {
2019  bool b;
2020 
2021  if (!coerceToBool(&vargs[0], &b))
2022  return false;
2023 
2024  setBoolValue(retval, !b);
2025  return true;
2026  }
2027 
2028  /* no arguments */
2029  case PGBENCH_PI:
2030  setDoubleValue(retval, M_PI);
2031  return true;
2032 
2033  /* 1 overloaded argument */
2034  case PGBENCH_ABS:
2035  {
2036  PgBenchValue *varg = &vargs[0];
2037 
2038  Assert(nargs == 1);
2039 
2040  if (varg->type == PGBT_INT)
2041  {
2042  int64 i = varg->u.ival;
2043 
 /* NOTE(review): -i is not guarded against i == PG_INT64_MIN -- confirm */
2044  setIntValue(retval, i < 0 ? -i : i);
2045  }
2046  else
2047  {
2048  double d = varg->u.dval;
2049 
2050  Assert(varg->type == PGBT_DOUBLE);
2051  setDoubleValue(retval, d < 0.0 ? -d : d);
2052  }
2053 
2054  return true;
2055  }
2056 
2057  case PGBENCH_DEBUG:
2058  {
2059  PgBenchValue *varg = &vargs[0];
2060 
2061  Assert(nargs == 1);
2062 
2063  fprintf(stderr, "debug(script=%d,command=%d): ",
2064  st->use_file, st->command + 1);
2065 
2066  if (varg->type == PGBT_NULL)
2067  fprintf(stderr, "null\n");
2068  else if (varg->type == PGBT_BOOLEAN)
2069  fprintf(stderr, "boolean %s\n", varg->u.bval ? "true" : "false");
2070  else if (varg->type == PGBT_INT)
2071  fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
2072  else if (varg->type == PGBT_DOUBLE)
2073  fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
2074  else /* internal error, unexpected type */
2075  Assert(0);
2076 
 /* debug() returns its argument unchanged */
2077  *retval = *varg;
2078 
2079  return true;
2080  }
2081 
2082  /* 1 double argument */
2083  case PGBENCH_DOUBLE:
2084  case PGBENCH_SQRT:
2085  case PGBENCH_LN:
2086  case PGBENCH_EXP:
2087  {
2088  double dval;
2089 
2090  Assert(nargs == 1);
2091 
2092  if (!coerceToDouble(&vargs[0], &dval))
2093  return false;
2094 
2095  if (func == PGBENCH_SQRT)
2096  dval = sqrt(dval);
2097  else if (func == PGBENCH_LN)
2098  dval = log(dval);
2099  else if (func == PGBENCH_EXP)
2100  dval = exp(dval);
2101  /* else is cast: do nothing */
2102 
2103  setDoubleValue(retval, dval);
2104  return true;
2105  }
2106 
2107  /* 1 int argument */
2108  case PGBENCH_INT:
2109  {
2110  int64 ival;
2111 
2112  Assert(nargs == 1);
2113 
2114  if (!coerceToInt(&vargs[0], &ival))
2115  return false;
2116 
2117  setIntValue(retval, ival);
2118  return true;
2119  }
2120 
2121  /* variable number of arguments */
2122  case PGBENCH_LEAST:
2123  case PGBENCH_GREATEST:
2124  {
2125  bool havedouble;
2126  int i;
2127 
2128  Assert(nargs >= 1);
2129 
2130  /* need double result if any input is double */
2131  havedouble = false;
2132  for (i = 0; i < nargs; i++)
2133  {
2134  if (vargs[i].type == PGBT_DOUBLE)
2135  {
2136  havedouble = true;
2137  break;
2138  }
2139  }
2140  if (havedouble)
2141  {
2142  double extremum;
2143 
2144  if (!coerceToDouble(&vargs[0], &extremum))
2145  return false;
2146  for (i = 1; i < nargs; i++)
2147  {
2148  double dval;
2149 
2150  if (!coerceToDouble(&vargs[i], &dval))
2151  return false;
2152  if (func == PGBENCH_LEAST)
2153  extremum = Min(extremum, dval);
2154  else
2155  extremum = Max(extremum, dval);
2156  }
2157  setDoubleValue(retval, extremum);
2158  }
2159  else
2160  {
2161  int64 extremum;
2162 
2163  if (!coerceToInt(&vargs[0], &extremum))
2164  return false;
2165  for (i = 1; i < nargs; i++)
2166  {
2167  int64 ival;
2168 
2169  if (!coerceToInt(&vargs[i], &ival))
2170  return false;
2171  if (func == PGBENCH_LEAST)
2172  extremum = Min(extremum, ival);
2173  else
2174  extremum = Max(extremum, ival);
2175  }
2176  setIntValue(retval, extremum);
2177  }
2178  return true;
2179  }
2180 
2181  /* random functions */
2182  case PGBENCH_RANDOM:
2186  {
2187  int64 imin,
2188  imax;
2189 
2190  Assert(nargs >= 2);
2191 
2192  if (!coerceToInt(&vargs[0], &imin) ||
2193  !coerceToInt(&vargs[1], &imax))
2194  return false;
2195 
2196  /* check random range */
2197  if (imin > imax)
2198  {
2199  fprintf(stderr, "empty range given to random\n");
2200  return false;
2201  }
2202  else if (imax - imin < 0 || (imax - imin) + 1 < 0)
2203  {
2204  /* prevent int overflows in random functions */
2205  fprintf(stderr, "random range is too large\n");
2206  return false;
2207  }
2208 
2209  if (func == PGBENCH_RANDOM)
2210  {
2211  Assert(nargs == 2);
2212  setIntValue(retval, getrand(thread, imin, imax));
2213  }
2214  else /* gaussian & exponential */
2215  {
2216  double param;
2217 
2218  Assert(nargs == 3);
2219 
2220  if (!coerceToDouble(&vargs[2], &param))
2221  return false;
2222 
2223  if (func == PGBENCH_RANDOM_GAUSSIAN)
2224  {
2225  if (param < MIN_GAUSSIAN_PARAM)
2226  {
2227  fprintf(stderr,
2228  "gaussian parameter must be at least %f "
2229  "(not %f)\n", MIN_GAUSSIAN_PARAM, param);
2230  return false;
2231  }
2232 
2233  setIntValue(retval,
2234  getGaussianRand(thread, imin, imax, param));
2235  }
2236  else if (func == PGBENCH_RANDOM_ZIPFIAN)
2237  {
 /* same condition as the Assert in getZipfianRand() */
2238  if (param <= 0.0 || param == 1.0 || param > MAX_ZIPFIAN_PARAM)
2239  {
2240  fprintf(stderr,
2241  "zipfian parameter must be in range (0, 1) U (1, %d]"
2242  " (got %f)\n", MAX_ZIPFIAN_PARAM, param);
2243  return false;
2244  }
2245  setIntValue(retval,
2246  getZipfianRand(thread, imin, imax, param));
2247  }
2248  else /* exponential */
2249  {
2250  if (param <= 0.0)
2251  {
2252  fprintf(stderr,
2253  "exponential parameter must be greater than zero"
2254  " (got %f)\n", param);
2255  return false;
2256  }
2257 
2258  setIntValue(retval,
2259  getExponentialRand(thread, imin, imax, param));
2260  }
2261  }
2262 
2263  return true;
2264  }
2265 
2266  case PGBENCH_POW:
2267  {
2268  PgBenchValue *lval = &vargs[0];
2269  PgBenchValue *rval = &vargs[1];
2270  double ld,
2271  rd;
2272 
2273  Assert(nargs == 2);
2274 
2275  if (!coerceToDouble(lval, &ld) ||
2276  !coerceToDouble(rval, &rd))
2277  return false;
2278 
2279  setDoubleValue(retval, pow(ld, rd));
2280 
2281  return true;
2282  }
2283 
2284  case PGBENCH_IS:
2285  {
2286  Assert(nargs == 2);
2287 
2288  /*
2289  * note: this simple implementation is more permissive than
2290  * SQL
2291  */
 /* only the types and the u.bval members of the two values are compared */
2292  setBoolValue(retval,
2293  vargs[0].type == vargs[1].type &&
2294  vargs[0].u.bval == vargs[1].u.bval);
2295  return true;
2296  }
2297 
2298  /* hashing */
2299  case PGBENCH_HASH_FNV1A:
2300  case PGBENCH_HASH_MURMUR2:
2301  {
2302  int64 val,
2303  seed;
2304 
2305  Assert(nargs == 2);
2306 
2307  if (!coerceToInt(&vargs[0], &val) ||
2308  !coerceToInt(&vargs[1], &seed))
2309  return false;
2310 
2311  if (func == PGBENCH_HASH_MURMUR2)
2312  setIntValue(retval, getHashMurmur2(val, seed));
2313  else if (func == PGBENCH_HASH_FNV1A)
2314  setIntValue(retval, getHashFnv1a(val, seed));
2315  else
2316  /* cannot get here */
2317  Assert(0);
2318 
2319  return true;
2320  }
2321 
2322  default:
2323  /* cannot get here */
2324  Assert(0);
2325  /* dead code to avoid a compiler warning */
2326  return false;
2327  }
2328 }
2329 
2330 /* evaluate some function */
/*
 * Dispatches to evalLazyFunc() for the functions isLazyFunc() recognizes and
 * to evalStandardFunc() for all others; the callee's result is returned as
 * is.
 */
2331 static bool
2332 evalFunc(TState *thread, CState *st,
2334 {
2335  if (isLazyFunc(func))
2336  return evalLazyFunc(thread, st, func, args, retval);
2337  else
2338  return evalStandardFunc(thread, st, func, args, retval);
2339 }
2340 
2341 /*
2342  * Recursive evaluation of an expression in a pgbench script
2343  * using the current state of variables.
2344  * Returns whether the evaluation was ok,
2345  * the value itself is returned through the retval pointer.
2346  */
2347 static bool
2348 evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval)
2349 {
2350  switch (expr->etype)
2351  {
2352  case ENODE_CONSTANT:
2353  {
2354  *retval = expr->u.constant;
2355  return true;
2356  }
2357 
2358  case ENODE_VARIABLE:
2359  {
2360  Variable *var;
2361 
2362  if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
2363  {
2364  fprintf(stderr, "undefined variable \"%s\"\n",
2365  expr->u.variable.varname);
2366  return false;
2367  }
2368 
2369  if (!makeVariableValue(var))
2370  return false;
2371 
2372  *retval = var->value;
2373  return true;
2374  }
2375 
2376  case ENODE_FUNCTION:
2377  return evalFunc(thread, st,
2378  expr->u.function.function,
2379  expr->u.function.args,
2380  retval);
2381 
2382  default:
2383  /* internal error which should never occur */
2384  fprintf(stderr, "unexpected enode type in evaluation: %d\n",
2385  expr->etype);
2386  exit(1);
2387  }
2388 }
2389 
2390 /*
2391  * Convert command name to meta-command enum identifier
2392  */
2393 static MetaCommand
2394 getMetaCommand(const char *cmd)
2395 {
2396  MetaCommand mc;
2397 
2398  if (cmd == NULL)
2399  mc = META_NONE;
2400  else if (pg_strcasecmp(cmd, "set") == 0)
2401  mc = META_SET;
2402  else if (pg_strcasecmp(cmd, "setshell") == 0)
2403  mc = META_SETSHELL;
2404  else if (pg_strcasecmp(cmd, "shell") == 0)
2405  mc = META_SHELL;
2406  else if (pg_strcasecmp(cmd, "sleep") == 0)
2407  mc = META_SLEEP;
2408  else if (pg_strcasecmp(cmd, "if") == 0)
2409  mc = META_IF;
2410  else if (pg_strcasecmp(cmd, "elif") == 0)
2411  mc = META_ELIF;
2412  else if (pg_strcasecmp(cmd, "else") == 0)
2413  mc = META_ELSE;
2414  else if (pg_strcasecmp(cmd, "endif") == 0)
2415  mc = META_ENDIF;
2416  else
2417  mc = META_NONE;
2418  return mc;
2419 }
2420 
2421 /*
2422  * Run a shell command. The result is assigned to the variable if not NULL.
2423  * Return true if succeeded, or false on error.
2424  */
2425 static bool
2426 runShellCommand(CState *st, char *variable, char **argv, int argc)
2427 {
2428  char command[SHELL_COMMAND_SIZE];
2429  int i,
2430  len = 0;
2431  FILE *fp;
2432  char res[64];
2433  char *endptr;
2434  int retval;
2435 
2436  /*----------
2437  * Join arguments with whitespace separators. Arguments starting with
2438  * exactly one colon are treated as variables:
2439  * name - append a string "name"
2440  * :var - append a variable named 'var'
2441  * ::name - append a string ":name"
2442  *----------
2443  */
2444  for (i = 0; i < argc; i++)
2445  {
2446  char *arg;
2447  int arglen;
2448 
2449  if (argv[i][0] != ':')
2450  {
2451  arg = argv[i]; /* a string literal */
2452  }
2453  else if (argv[i][1] == ':')
2454  {
2455  arg = argv[i] + 1; /* a string literal starting with colons */
2456  }
2457  else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
2458  {
2459  fprintf(stderr, "%s: undefined variable \"%s\"\n",
2460  argv[0], argv[i]);
2461  return false;
2462  }
2463 
2464  arglen = strlen(arg);
2465  if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
2466  {
2467  fprintf(stderr, "%s: shell command is too long\n", argv[0]);
2468  return false;
2469  }
2470 
2471  if (i > 0)
2472  command[len++] = ' ';
2473  memcpy(command + len, arg, arglen);
2474  len += arglen;
2475  }
2476 
2477  command[len] = '\0';
2478 
2479  /* Fast path for non-assignment case */
2480  if (variable == NULL)
2481  {
2482  if (system(command))
2483  {
2484  if (!timer_exceeded)
2485  fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2486  return false;
2487  }
2488  return true;
2489  }
2490 
2491  /* Execute the command with pipe and read the standard output. */
2492  if ((fp = popen(command, "r")) == NULL)
2493  {
2494  fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2495  return false;
2496  }
2497  if (fgets(res, sizeof(res), fp) == NULL)
2498  {
2499  if (!timer_exceeded)
2500  fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
2501  (void) pclose(fp);
2502  return false;
2503  }
2504  if (pclose(fp) < 0)
2505  {
2506  fprintf(stderr, "%s: could not close shell command\n", argv[0]);
2507  return false;
2508  }
2509 
2510  /* Check whether the result is an integer and assign it to the variable */
2511  retval = (int) strtol(res, &endptr, 10);
2512  while (*endptr != '\0' && isspace((unsigned char) *endptr))
2513  endptr++;
2514  if (*res == '\0' || *endptr != '\0')
2515  {
2516  fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
2517  argv[0], res);
2518  return false;
2519  }
2520  if (!putVariableInt(st, "setshell", variable, retval))
2521  return false;
2522 
2523 #ifdef DEBUG
2524  printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
2525 #endif
2526  return true;
2527 }
2528 
#define MAX_PREPARE_NAME 32
/*
 * Build the name of a prepared statement as "P<file>_<state>".
 *
 * "buffer" must provide room for at least MAX_PREPARE_NAME bytes; the write
 * is bounded by that size so the name can never overrun such a buffer.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
    snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
2535 
2536 static void
2537 commandFailed(CState *st, const char *cmd, const char *message)
2538 {
2539  fprintf(stderr,
2540  "client %d aborted in command %d (%s) of script %d; %s\n",
2541  st->id, st->command, cmd, st->use_file, message);
2542 }
2543 
2544 /* return a script number with a weighted choice. */
2545 static int
2547 {
2548  int i = 0;
2549  int64 w;
2550 
2551  if (num_scripts == 1)
2552  return 0;
2553 
2554  w = getrand(thread, 0, total_weight - 1);
2555  do
2556  {
2557  w -= sql_script[i++].weight;
2558  } while (w >= 0);
2559 
2560  return i - 1;
2561 }
2562 
2563 /* Send a SQL command, using the chosen querymode */
2564 static bool
2566 {
2567  int r;
2568 
2569  if (querymode == QUERY_SIMPLE)
2570  {
2571  char *sql;
2572 
2573  sql = pg_strdup(command->argv[0]);
2574  sql = assignVariables(st, sql);
2575 
2576  if (debug)
2577  fprintf(stderr, "client %d sending %s\n", st->id, sql);
2578  r = PQsendQuery(st->con, sql);
2579  free(sql);
2580  }
2581  else if (querymode == QUERY_EXTENDED)
2582  {
2583  const char *sql = command->argv[0];
2584  const char *params[MAX_ARGS];
2585 
2586  getQueryParams(st, command, params);
2587 
2588  if (debug)
2589  fprintf(stderr, "client %d sending %s\n", st->id, sql);
2590  r = PQsendQueryParams(st->con, sql, command->argc - 1,
2591  NULL, params, NULL, NULL, 0);
2592  }
2593  else if (querymode == QUERY_PREPARED)
2594  {
2595  char name[MAX_PREPARE_NAME];
2596  const char *params[MAX_ARGS];
2597 
2598  if (!st->prepared[st->use_file])
2599  {
2600  int j;
2601  Command **commands = sql_script[st->use_file].commands;
2602 
2603  for (j = 0; commands[j] != NULL; j++)
2604  {
2605  PGresult *res;
2606  char name[MAX_PREPARE_NAME];
2607 
2608  if (commands[j]->type != SQL_COMMAND)
2609  continue;
2610  preparedStatementName(name, st->use_file, j);
2611  res = PQprepare(st->con, name,
2612  commands[j]->argv[0], commands[j]->argc - 1, NULL);
2613  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2614  fprintf(stderr, "%s", PQerrorMessage(st->con));
2615  PQclear(res);
2616  }
2617  st->prepared[st->use_file] = true;
2618  }
2619 
2620  getQueryParams(st, command, params);
2621  preparedStatementName(name, st->use_file, st->command);
2622 
2623  if (debug)
2624  fprintf(stderr, "client %d sending %s\n", st->id, name);
2625  r = PQsendQueryPrepared(st->con, name, command->argc - 1,
2626  params, NULL, NULL, 0);
2627  }
2628  else /* unknown sql mode */
2629  r = 0;
2630 
2631  if (r == 0)
2632  {
2633  if (debug)
2634  fprintf(stderr, "client %d could not send %s\n",
2635  st->id, command->argv[0]);
2636  st->ecnt++;
2637  return false;
2638  }
2639  else
2640  return true;
2641 }
2642 
2643 /*
2644  * Parse the argument to a \sleep command, and return the requested amount
2645  * of delay, in microseconds. Returns true on success, false on error.
2646  */
2647 static bool
2648 evaluateSleep(CState *st, int argc, char **argv, int *usecs)
2649 {
2650  char *var;
2651  int usec;
2652 
2653  if (*argv[1] == ':')
2654  {
2655  if ((var = getVariable(st, argv[1] + 1)) == NULL)
2656  {
2657  fprintf(stderr, "%s: undefined variable \"%s\"\n",
2658  argv[0], argv[1]);
2659  return false;
2660  }
2661  usec = atoi(var);
2662  }
2663  else
2664  usec = atoi(argv[1]);
2665 
2666  if (argc > 2)
2667  {
2668  if (pg_strcasecmp(argv[2], "ms") == 0)
2669  usec *= 1000;
2670  else if (pg_strcasecmp(argv[2], "s") == 0)
2671  usec *= 1000000;
2672  }
2673  else
2674  usec *= 1000000;
2675 
2676  *usecs = usec;
2677  return true;
2678 }
2679 
2680 /*
2681  * Advance the state machine of a connection, if possible.
 *
 * Dispatches on st->state in a loop.  Inside the switch, "break" re-enters
 * the loop with the (possibly updated) state, while "return" hands control
 * back to the caller.  The function returns when a throttle or \sleep delay
 * has not elapsed yet, when the server's result is not completely available,
 * when end-of-transaction processing is reached a second time within one
 * call, or when the client is in a final state (aborted or finished).
 *
 * NOTE(review): the embedded line numbers of this listing skip in several
 * places inside this function, so a number of statements are not shown
 * here.  Check the upstream source before relying on the control flow
 * exactly as printed.
2682  */
2683 static void
2684 doCustom(TState *thread, CState *st, StatsData *agg)
2685 {
2686  PGresult *res;
2687  Command *command;
2688  instr_time now;
 /* becomes true once CSTATE_END_TX has been processed during this call */
2689  bool end_tx_processed = false;
2690  int64 wait;
2691 
2692  /*
2693  * gettimeofday() isn't free, so we get the current timestamp lazily the
2694  * first time it's needed, and reuse the same value throughout this
2695  * function after that. This also ensures that e.g. the calculated
2696  * latency reported in the log file and in the totals are the same. Zero
2697  * means "not set yet". Reset "now" when we execute shell commands or
2698  * expressions, which might take a non-negligible amount of time, though.
2699  */
2700  INSTR_TIME_SET_ZERO(now);
2701 
2702  /*
2703  * Loop in the state machine, until we have to wait for a result from the
2704  * server (or have to sleep, for throttling or for \sleep).
2705  *
2706  * Note: In the switch-statement below, 'break' will loop back here,
2707  * meaning "continue in the state machine". Return is used to return to
2708  * the caller.
2709  */
2710  for (;;)
2711  {
2712  switch (st->state)
2713  {
2714  /*
2715  * Select transaction to run.
2716  */
2717  case CSTATE_CHOOSE_SCRIPT:
2718 
2719  st->use_file = chooseScript(thread);
2720 
2721  if (debug)
2722  fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
2723  sql_script[st->use_file].desc);
2724 
2725  if (throttle_delay > 0)
2727  else
2728  st->state = CSTATE_START_TX;
2729  /* check consistency */
2731  break;
2732 
2733  /*
2734  * Handle throttling once per transaction by sleeping.
2735  */
2736  case CSTATE_START_THROTTLE:
2737 
2738  /*
2739  * Generate a delay such that the series of delays will
2740  * approximate a Poisson distribution centered on the
2741  * throttle_delay time.
2742  *
2743  * If transactions are too slow or a given wait is shorter
2744  * than a transaction, the next transaction will start right
2745  * away.
2746  */
2747  Assert(throttle_delay > 0);
2748  wait = getPoissonRand(thread, throttle_delay);
2749 
2750  thread->throttle_trigger += wait;
2751  st->txn_scheduled = thread->throttle_trigger;
2752 
2753  /*
2754  * stop client if next transaction is beyond pgbench end of
2755  * execution
2756  */
2757  if (duration > 0 && st->txn_scheduled > end_time)
2758  {
2759  st->state = CSTATE_FINISHED;
2760  break;
2761  }
2762 
2763  /*
2764  * If --latency-limit is used, and this slot is already late
2765  * so that the transaction will miss the latency limit even if
2766  * it completed immediately, we skip this time slot and
2767  * iterate till the next slot that isn't late yet. But don't
2768  * iterate beyond the -t limit, if one is given.
2769  */
2770  if (latency_limit)
2771  {
2772  int64 now_us;
2773 
2774  if (INSTR_TIME_IS_ZERO(now))
2776  now_us = INSTR_TIME_GET_MICROSEC(now);
2777  while (thread->throttle_trigger < now_us - latency_limit &&
2778  (nxacts <= 0 || st->cnt < nxacts))
2779  {
2780  processXactStats(thread, st, &now, true, agg);
2781  /* next rendez-vous */
2782  wait = getPoissonRand(thread, throttle_delay);
2783  thread->throttle_trigger += wait;
2784  st->txn_scheduled = thread->throttle_trigger;
2785  }
2786  /* stop client if -t exceeded */
2787  if (nxacts > 0 && st->cnt >= nxacts)
2788  {
2789  st->state = CSTATE_FINISHED;
2790  break;
2791  }
2792  }
2793 
2794  st->state = CSTATE_THROTTLE;
2795  if (debug)
2796  fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
2797  st->id, wait);
2798  break;
2799 
2800  /*
2801  * Wait until it's time to start next transaction.
2802  */
2803  case CSTATE_THROTTLE:
2804  if (INSTR_TIME_IS_ZERO(now))
2806  if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
2807  return; /* Still sleeping, nothing to do here */
2808 
2809  /* Else done sleeping, start the transaction */
2810  st->state = CSTATE_START_TX;
2811  break;
2812 
2813  /* Start new transaction */
2814  case CSTATE_START_TX:
2815 
2816  /*
2817  * Establish connection on first call, or if is_connect is
2818  * true.
2819  */
2820  if (st->con == NULL)
2821  {
2822  instr_time start;
2823 
2824  if (INSTR_TIME_IS_ZERO(now))
2826  start = now;
2827  if ((st->con = doConnect()) == NULL)
2828  {
2829  fprintf(stderr, "client %d aborted while establishing connection\n",
2830  st->id);
2831  st->state = CSTATE_ABORTED;
2832  break;
2833  }
2835  INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
2836 
2837  /* Reset session-local state */
2838  memset(st->prepared, 0, sizeof(st->prepared));
2839  }
2840 
2841  /*
2842  * Record transaction start time under logging, progress or
2843  * throttling.
2844  */
2845  if (use_log || progress || throttle_delay || latency_limit ||
2846  per_script_stats)
2847  {
2848  if (INSTR_TIME_IS_ZERO(now))
2850  st->txn_begin = now;
2851 
2852  /*
2853  * When not throttling, this is also the transaction's
2854  * scheduled start time.
2855  */
2856  if (!throttle_delay)
2858  }
2859 
2860  /* Begin with the first command */
2861  st->command = 0;
2863  break;
2864 
2865  /*
2866  * Send a command to server (or execute a meta-command)
2867  */
2868  case CSTATE_START_COMMAND:
2869  command = sql_script[st->use_file].commands[st->command];
2870 
2871  /*
2872  * If we reached the end of the script, move to end-of-xact
2873  * processing.
2874  */
2875  if (command == NULL)
2876  {
2877  st->state = CSTATE_END_TX;
2878  break;
2879  }
2880 
2881  /*
2882  * Record statement start time if per-command latencies are
2883  * requested
2884  */
2885  if (is_latencies)
2886  {
2887  if (INSTR_TIME_IS_ZERO(now))
2889  st->stmt_begin = now;
2890  }
2891 
2892  if (command->type == SQL_COMMAND)
2893  {
2894  if (!sendCommand(st, command))
2895  {
2896  commandFailed(st, "SQL", "SQL command send failed");
2897  st->state = CSTATE_ABORTED;
2898  }
2899  else
2900  st->state = CSTATE_WAIT_RESULT;
2901  }
2902  else if (command->type == META_COMMAND)
2903  {
2904  int argc = command->argc,
2905  i;
2906  char **argv = command->argv;
2907 
2908  if (debug)
2909  {
2910  fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
2911  for (i = 1; i < argc; i++)
2912  fprintf(stderr, " %s", argv[i]);
2913  fprintf(stderr, "\n");
2914  }
2915 
2916  if (command->meta == META_SLEEP)
2917  {
2918  /*
2919  * A \sleep doesn't execute anything, we just get the
2920  * delay from the argument, and enter the CSTATE_SLEEP
2921  * state. (The per-command latency will be recorded
2922  * in CSTATE_SLEEP state, not here, after the delay
2923  * has elapsed.)
2924  */
2925  int usec;
2926 
2927  if (!evaluateSleep(st, argc, argv, &usec))
2928  {
2929  commandFailed(st, "sleep", "execution of meta-command failed");
2930  st->state = CSTATE_ABORTED;
2931  break;
2932  }
2933 
2934  if (INSTR_TIME_IS_ZERO(now))
2936  st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
2937  st->state = CSTATE_SLEEP;
2938  break;
2939  }
2940  else if (command->meta == META_SET ||
2941  command->meta == META_IF ||
2942  command->meta == META_ELIF)
2943  {
2944  /* backslash commands with an expression to evaluate */
2945  PgBenchExpr *expr = command->expr;
2946  PgBenchValue result;
2947 
2948  if (command->meta == META_ELIF &&
2950  {
2951  /*
2952  * elif after executed block, skip eval and wait
2953  * for endif
2954  */
2956  goto move_to_end_command;
2957  }
2958 
2959  if (!evaluateExpr(thread, st, expr, &result))
2960  {
2961  commandFailed(st, argv[0], "evaluation of meta-command failed");
2962  st->state = CSTATE_ABORTED;
2963  break;
2964  }
2965 
2966  if (command->meta == META_SET)
2967  {
2968  if (!putVariableValue(st, argv[0], argv[1], &result))
2969  {
2970  commandFailed(st, "set", "assignment of meta-command failed");
2971  st->state = CSTATE_ABORTED;
2972  break;
2973  }
2974  }
2975  else /* if and elif evaluated cases */
2976  {
2977  bool cond = valueTruth(&result);
2978 
2979  /* execute or not depending on evaluated condition */
2980  if (command->meta == META_IF)
2981  {
2983  }
2984  else /* elif */
2985  {
2986  /*
2987  * we should get here only if the "elif"
2988  * needed evaluation
2989  */
2992  }
2993  }
2994  }
2995  else if (command->meta == META_ELSE)
2996  {
2997  switch (conditional_stack_peek(st->cstack))
2998  {
2999  case IFSTATE_TRUE:
3001  break;
3002  case IFSTATE_FALSE: /* inconsistent if active */
3003  case IFSTATE_IGNORED: /* inconsistent if active */
3004  case IFSTATE_NONE: /* else without if */
3005  case IFSTATE_ELSE_TRUE: /* else after else */
3006  case IFSTATE_ELSE_FALSE: /* else after else */
3007  default:
3008  /* dead code if conditional check is ok */
3009  Assert(false);
3010  }
3011  goto move_to_end_command;
3012  }
3013  else if (command->meta == META_ENDIF)
3014  {
3017  goto move_to_end_command;
3018  }
3019  else if (command->meta == META_SETSHELL)
3020  {
 /* argv[1] names the variable to set; the shell words start at argv[2] */
3021  bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
3022 
3023  if (timer_exceeded) /* timeout */
3024  {
3025  st->state = CSTATE_FINISHED;
3026  break;
3027  }
3028  else if (!ret) /* on error */
3029  {
3030  commandFailed(st, "setshell", "execution of meta-command failed");
3031  st->state = CSTATE_ABORTED;
3032  break;
3033  }
3034  else
3035  {
3036  /* succeeded */
3037  }
3038  }
3039  else if (command->meta == META_SHELL)
3040  {
 /* no variable to assign: the shell words start at argv[1] */
3041  bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
3042 
3043  if (timer_exceeded) /* timeout */
3044  {
3045  st->state = CSTATE_FINISHED;
3046  break;
3047  }
3048  else if (!ret) /* on error */
3049  {
3050  commandFailed(st, "shell", "execution of meta-command failed");
3051  st->state = CSTATE_ABORTED;
3052  break;
3053  }
3054  else
3055  {
3056  /* succeeded */
3057  }
3058  }
3059 
3060  move_to_end_command:
3061 
3062  /*
3063  * executing the expression or shell command might take a
3064  * non-negligible amount of time, so reset 'now'
3065  */
3066  INSTR_TIME_SET_ZERO(now);
3067 
3068  st->state = CSTATE_END_COMMAND;
3069  }
3070  break;
3071 
3072  /*
3073  * non executed conditional branch
3074  */
3075  case CSTATE_SKIP_COMMAND:
3077  /* quickly skip commands until something to do... */
3078  while (true)
3079  {
3080  command = sql_script[st->use_file].commands[st->command];
3081 
3082  /* cannot reach end of script in that state */
3083  Assert(command != NULL);
3084 
3085  /*
3086  * if this is conditional related, update conditional
3087  * state
3088  */
3089  if (command->type == META_COMMAND &&
3090  (command->meta == META_IF ||
3091  command->meta == META_ELIF ||
3092  command->meta == META_ELSE ||
3093  command->meta == META_ENDIF))
3094  {
3095  switch (conditional_stack_peek(st->cstack))
3096  {
3097  case IFSTATE_FALSE:
3098  if (command->meta == META_IF || command->meta == META_ELIF)
3099  {
3100  /* we must evaluate the condition */
3102  }
3103  else if (command->meta == META_ELSE)
3104  {
3105  /* we must execute next command */
3108  st->command++;
3109  }
3110  else if (command->meta == META_ENDIF)
3111  {
3114  if (conditional_active(st->cstack))
3116 
3117  /*
3118  * else state remains in
3119  * CSTATE_SKIP_COMMAND
3120  */
3121  st->command++;
3122  }
3123  break;
3124 
3125  case IFSTATE_IGNORED:
3126  case IFSTATE_ELSE_FALSE:
3127  if (command->meta == META_IF)
3129  else if (command->meta == META_ENDIF)
3130  {
3133  if (conditional_active(st->cstack))
3135  }
3136  /* could detect "else" & "elif" after "else" */
3137  st->command++;
3138  break;
3139 
3140  case IFSTATE_NONE:
3141  case IFSTATE_TRUE:
3142  case IFSTATE_ELSE_TRUE:
3143  default:
3144 
3145  /*
3146  * inconsistent if inactive, unreachable dead
3147  * code
3148  */
3149  Assert(false);
3150  }
3151  }
3152  else
3153  {
3154  /* skip and consider next */
3155  st->command++;
3156  }
3157 
 /* leave the skip loop as soon as the state has changed */
3158  if (st->state != CSTATE_SKIP_COMMAND)
3159  break;
3160  }
3161  break;
3162 
3163  /*
3164  * Wait for the current SQL command to complete
3165  */
3166  case CSTATE_WAIT_RESULT:
3167  command = sql_script[st->use_file].commands[st->command];
3168  if (debug)
3169  fprintf(stderr, "client %d receiving\n", st->id);
3170  if (!PQconsumeInput(st->con))
3171  { /* there's something wrong */
3172  commandFailed(st, "SQL", "perhaps the backend died while processing");
3173  st->state = CSTATE_ABORTED;
3174  break;
3175  }
3176  if (PQisBusy(st->con))
3177  return; /* don't have the whole result yet */
3178 
3179  /*
3180  * Read and discard the query result;
3181  */
3182  res = PQgetResult(st->con);
3183  switch (PQresultStatus(res))
3184  {
3185  case PGRES_COMMAND_OK:
3186  case PGRES_TUPLES_OK:
3187  case PGRES_EMPTY_QUERY:
3188  /* OK */
3189  PQclear(res);
3190  discard_response(st);
3191  st->state = CSTATE_END_COMMAND;
3192  break;
3193  default:
3194  commandFailed(st, "SQL", PQerrorMessage(st->con));
3195  PQclear(res);
3196  st->state = CSTATE_ABORTED;
3197  break;
3198  }
3199  break;
3200 
3201  /*
3202  * Wait until sleep is done. This state is entered after a
3203  * \sleep metacommand. The behavior is similar to
3204  * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
3205  * instead of CSTATE_START_TX.
3206  */
3207  case CSTATE_SLEEP:
3208  if (INSTR_TIME_IS_ZERO(now))
3210  if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
3211  return; /* Still sleeping, nothing to do here */
3212  /* Else done sleeping. */
3213  st->state = CSTATE_END_COMMAND;
3214  break;
3215 
3216  /*
3217  * End of command: record stats and proceed to next command.
3218  */
3219  case CSTATE_END_COMMAND:
3220 
3221  /*
3222  * command completed: accumulate per-command execution times
3223  * in thread-local data structure, if per-command latencies
3224  * are requested.
3225  */
3226  if (is_latencies)
3227  {
3228  if (INSTR_TIME_IS_ZERO(now))
3230 
3231  /* XXX could use a mutex here, but we choose not to */
3232  command = sql_script[st->use_file].commands[st->command];
3233  addToSimpleStats(&command->stats,
3234  INSTR_TIME_GET_DOUBLE(now) -
3236  }
3237 
3238  /* Go ahead with next command, to be executed or skipped */
3239  st->command++;
3240  st->state = conditional_active(st->cstack) ?
3242  break;
3243 
3244  /*
3245  * End of transaction.
3246  */
3247  case CSTATE_END_TX:
3248 
3249  /* transaction finished: calculate latency and do log */
3250  processXactStats(thread, st, &now, false, agg);
3251 
3252  /* conditional stack must be empty */
3253  if (!conditional_stack_empty(st->cstack))
3254  {
3255  fprintf(stderr, "end of script reached within a conditional, missing \\endif\n");
3256  exit(1);
3257  }
3258 
3259  if (is_connect)
3260  {
3261  finishCon(st);
3262  INSTR_TIME_SET_ZERO(now);
3263  }
3264 
3265  if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
3266  {
3267  /* exit success */
3268  st->state = CSTATE_FINISHED;
3269  break;
3270  }
3271 
3272  /*
3273  * No transaction is underway anymore.
3274  */
3276 
3277  /*
3278  * If we paced through all commands in the script in this
3279  * loop, without returning to the caller even once, do it now.
3280  * This gives the thread a chance to process other
3281  * connections, and to do progress reporting. This can
3282  * currently only happen if the script consists entirely of
3283  * meta-commands.
3284  */
3285  if (end_tx_processed)
3286  return;
3287  else
3288  {
3289  end_tx_processed = true;
3290  break;
3291  }
3292 
3293  /*
3294  * Final states. Close the connection if it's still open.
3295  */
3296  case CSTATE_ABORTED:
3297  case CSTATE_FINISHED:
3298  finishCon(st);
3299  return;
3300  }
3301  }
3302 }
3303 
3304 /*
3305  * Print log entry after completing one transaction.
3306  *
3307  * We print Unix-epoch timestamps in the log, so that entries can be
3308  * correlated against other logs. On some platforms this could be obtained
3309  * from the instr_time reading the caller has, but rather than get entangled
3310  * with that, we just eat the cost of an extra syscall in all cases.
3311  */
3312 static void
3313 doLog(TState *thread, CState *st,
3314  StatsData *agg, bool skipped, double latency, double lag)
3315 {
3316  FILE *logfile = thread->logfile;
3317 
3318  Assert(use_log);
3319 
3320  /*
3321  * Skip the log entry if sampling is enabled and this row doesn't belong
3322  * to the random sample.
3323  */
3324  if (sample_rate != 0.0 &&
3325  pg_erand48(thread->random_state) > sample_rate)
3326  return;
3327 
3328  /* should we aggregate the results or not? */
3329  if (agg_interval > 0)
3330  {
3331  /*
3332  * Loop until we reach the interval of the current moment, and print
3333  * any empty intervals in between (this may happen with very low tps,
3334  * e.g. --rate=0.1).
3335  */
3336  time_t now = time(NULL);
3337 
3338  while (agg->start_time + agg_interval <= now)
3339  {
3340  /* print aggregated report to logfile */
3341  fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
3342  (long) agg->start_time,
3343  agg->cnt,
3344  agg->latency.sum,
3345  agg->latency.sum2,
3346  agg->latency.min,
3347  agg->latency.max);
3348  if (throttle_delay)
3349  {
3350  fprintf(logfile, " %.0f %.0f %.0f %.0f",
3351  agg->lag.sum,
3352  agg->lag.sum2,
3353  agg->lag.min,
3354  agg->lag.max);
3355  if (latency_limit)
3356  fprintf(logfile, " " INT64_FORMAT, agg->skipped);
3357  }
3358  fputc('\n', logfile);
3359 
3360  /* reset data and move to next interval */
3361  initStats(agg, agg->start_time + agg_interval);
3362  }
3363 
3364  /* accumulate the current transaction */
3365  accumStats(agg, skipped, latency, lag);
3366  }
3367  else
3368  {
3369  /* no, print raw transactions */
3370  struct timeval tv;
3371 
3372  gettimeofday(&tv, NULL);
3373  if (skipped)
3374  fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
3375  st->id, st->cnt, st->use_file,
3376  (long) tv.tv_sec, (long) tv.tv_usec);
3377  else
3378  fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
3379  st->id, st->cnt, latency, st->use_file,
3380  (long) tv.tv_sec, (long) tv.tv_usec);
3381  if (throttle_delay)
3382  fprintf(logfile, " %.0f", lag);
3383  fputc('\n', logfile);
3384  }
3385 }
3386 
3387 /*
3388  * Accumulate and report statistics at end of a transaction.
3389  *
3390  * (This is also called when a transaction is late and thus skipped.
3391  * Note that even skipped transactions are counted in the "cnt" fields.)
 *
 * For a transaction that was not skipped, and only when some detailed
 * statistic is requested, *now is set to the current time if it is still
 * zero and the latency is computed as *now minus st->txn_scheduled, in
 * microseconds.  The counters of the thread, of the client and -- with
 * per_script_stats -- of the script are then updated, and a log entry is
 * written through doLog() when use_log is set.
 *
 * NOTE(review): the first line of the parameter list and the statement
 * following the latency computation are missing from this listing (the
 * embedded line numbers skip 3394 and 3409).  As printed, "lag" is never
 * assigned after its initialization to 0.0; restore the missing lines from
 * the upstream source before compiling.
3392  */
3393 static void
3395  bool skipped, StatsData *agg)
3396 {
3397  double latency = 0.0,
3398  lag = 0.0;
 /* thread_details: per-thread stats are kept; detailed: timings are needed */
3399  bool thread_details = progress || throttle_delay || latency_limit,
3400  detailed = thread_details || use_log || per_script_stats;
3401 
3402  if (detailed && !skipped)
3403  {
3404  if (INSTR_TIME_IS_ZERO(*now))
3405  INSTR_TIME_SET_CURRENT(*now);
3406 
3407  /* compute latency & lag */
3408  latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
3410  }
3411 
3412  if (thread_details)
3413  {
3414  /* keep detailed thread stats */
3415  accumStats(&thread->stats, skipped, latency, lag);
3416 
3417  /* count transactions over the latency limit, if needed */
3418  if (latency_limit && latency > latency_limit)
3419  thread->latency_late++;
3420  }
3421  else
3422  {
3423  /* no detailed stats, just count */
3424  thread->stats.cnt++;
3425  }
3426 
3427  /* client stat is just counting */
3428  st->cnt++;
3429 
3430  if (use_log)
3431  doLog(thread, st, agg, skipped, latency, lag);
3432 
3433  /* XXX could use a mutex here, but we choose not to */
3434  if (per_script_stats)
3435  accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
3436 }
3437 
3438 
3439 /*
 * Discard connections: call finishCon() on each of the first "length"
 * entries of the "state" array.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing (the embedded line numbers skip 3441); restore
 * it from the upstream source before compiling.
 */
3440 static void
3442 {
3443  int i;
3444 
3445  for (i = 0; i < length; i++)
3446  finishCon(&state[i]);
3447 }
3448 
3449 /*
3450  * Remove old pgbench tables, if any exist
 *
 * Announces the step on stderr, then issues a single "drop table if exists"
 * statement naming all four pgbench tables on the connection "con".
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing (the embedded line numbers skip 3453); restore
 * it from the upstream source before compiling.
3451  */
3452 static void
3454 {
3455  fprintf(stderr, "dropping old tables...\n");
3456 
3457  /*
3458  * We drop all the tables in one command, so that whether there are
3459  * foreign key dependencies or not doesn't matter.
3460  */
3461  executeStatement(con, "drop table if exists "
3462  "pgbench_accounts, "
3463  "pgbench_branches, "
3464  "pgbench_history, "
3465  "pgbench_tellers");
3466 }
3467 
3468 /*
3469  * Create pgbench's standard tables
 *
 * For each entry of the DDLs table below, a "create table" statement is
 * assembled and executed on the connection "con".  The statement uses the
 * 64-bit column declarations when scale is at least SCALE_32BIT_THRESHOLD,
 * adds "unlogged" when unlogged_tables is set, adds a fillfactor clause for
 * the tables that declare one, and adds a tablespace clause when a
 * tablespace was given.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing (the embedded line numbers skip 3472); restore
 * it from the upstream source before compiling.
3470  */
3471 static void
3473 {
3474  /*
3475  * The scale factor at/beyond which 32-bit integers are insufficient for
3476  * storing TPC-B account IDs.
3477  *
3478  * Although the actual threshold is 21474, we use 20000 because it is
3479  * easier to document and remember, and isn't that far away from the real
3480  * threshold.
3481  */
3482 #define SCALE_32BIT_THRESHOLD 20000
3483 
3484  /*
3485  * Note: TPC-B requires at least 100 bytes per row, and the "filler"
3486  * fields in these table declarations were intended to comply with that.
3487  * The pgbench_accounts table complies with that because the "filler"
3488  * column is set to blank-padded empty string. But for all other tables
3489  * the columns default to NULL and so don't actually take any space. We
3490  * could fix that by giving them non-null default values. However, that
3491  * would completely break comparability of pgbench results with prior
3492  * versions. Since pgbench has never pretended to be fully TPC-B compliant
3493  * anyway, we stick with the historical behavior.
3494  */
3495  struct ddlinfo
3496  {
3497  const char *table; /* table name */
3498  const char *smcols; /* column decls if accountIDs are 32 bits */
3499  const char *bigcols; /* column decls if accountIDs are 64 bits */
 /* nonzero if the table is created with an explicit fillfactor */
3500  int declare_fillfactor;
3501  };
3502  static const struct ddlinfo DDLs[] = {
3503  {
3504  "pgbench_history",
3505  "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
3506  "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
3507  0
3508  },
3509  {
3510  "pgbench_tellers",
3511  "tid int not null,bid int,tbalance int,filler char(84)",
3512  "tid int not null,bid int,tbalance int,filler char(84)",
3513  1
3514  },
3515  {
3516  "pgbench_accounts",
3517  "aid int not null,bid int,abalance int,filler char(84)",
3518  "aid bigint not null,bid int,abalance int,filler char(84)",
3519  1
3520  },
3521  {
3522  "pgbench_branches",
3523  "bid int not null,bbalance int,filler char(88)",
3524  "bid int not null,bbalance int,filler char(88)",
3525  1
3526  }
3527  };
3528  int i;
3529 
3530  fprintf(stderr, "creating tables...\n");
3531 
3532  for (i = 0; i < lengthof(DDLs); i++)
3533  {
3534  char opts[256];
3535  char buffer[256];
3536  const struct ddlinfo *ddl = &DDLs[i];
3537  const char *cols;
3538 
3539  /* Construct new create table statement. */
3540  opts[0] = '\0';
3541  if (ddl->declare_fillfactor)
3542  snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3543  " with (fillfactor=%d)", fillfactor);
3544  if (tablespace != NULL)
3545  {
3546  char *escape_tablespace;
3547 
 /* quote the tablespace name so it can be used as an SQL identifier */
3548  escape_tablespace = PQescapeIdentifier(con, tablespace,
3549  strlen(tablespace));
3550  snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3551  " tablespace %s", escape_tablespace);
3552  PQfreemem(escape_tablespace);
3553  }
3554 
3555  cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
3556 
 /*
  * NOTE(review): both snprintf targets are 256 bytes and their results are
  * not checked, so a very long tablespace name would be truncated silently.
  */
3557  snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
3558  unlogged_tables ? " unlogged" : "",
3559  ddl->table, cols, opts);
3560 
3561  executeStatement(con, buffer);
3562  }
3563 }
3564 
3565 /*
3566  * Fill the standard tables with some data
3567  */
3568 static void
3570 {
3571  char sql[256];
3572  PGresult *res;
3573  int i;
3574  int64 k;
3575 
3576  /* used to track elapsed time and estimate of the remaining time */
3577  instr_time start,
3578  diff;
3579  double elapsed_sec,
3580  remaining_sec;
3581  int log_interval = 1;
3582 
3583  fprintf(stderr, "generating data...\n");
3584 
3585  /*
3586  * we do all of this in one transaction to enable the backend's
3587  * data-loading optimizations
3588  */
3589  executeStatement(con, "begin");
3590 
3591  /*
3592  * truncate away any old data, in one command in case there are foreign
3593  * keys
3594  */
3595  executeStatement(con, "truncate table "
3596  "pgbench_accounts, "
3597  "pgbench_branches, "
3598  "pgbench_history, "
3599  "pgbench_tellers");
3600 
3601  /*
3602  * fill branches, tellers, accounts in that order in case foreign keys
3603  * already exist
3604  */
3605  for (i = 0; i < nbranches * scale; i++)
3606  {
3607  /* "filler" column defaults to NULL */
3608  snprintf(sql, sizeof(sql),
3609  "insert into pgbench_branches(bid,bbalance) values(%d,0)",
3610  i + 1);
3611  executeStatement(con, sql);
3612  }
3613 
3614  for (i = 0; i < ntellers * scale; i++)
3615  {
3616  /* "filler" column defaults to NULL */
3617  snprintf(sql, sizeof(sql),
3618  "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
3619  i + 1, i / ntellers + 1);
3620  executeStatement(con, sql);
3621  }
3622 
3623  /*
3624  * accounts is big enough to be worth using COPY and tracking runtime
3625  */
3626  res = PQexec(con, "copy pgbench_accounts from stdin");
3627  if (PQresultStatus(res) != PGRES_COPY_IN)
3628  {
3629  fprintf(stderr, "%s", PQerrorMessage(con));
3630  exit(1);
3631  }
3632  PQclear(res);
3633 
3634  INSTR_TIME_SET_CURRENT(start);
3635 
3636  for (k = 0; k < (int64) naccounts * scale; k++)
3637  {
3638  int64 j = k + 1;
3639 
3640  /* "filler" column defaults to blank padded empty string */
3641  snprintf(sql, sizeof(sql),
3642  INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
3643  j, k / naccounts + 1, 0);
3644  if (PQputline(con, sql))
3645  {
3646  fprintf(stderr, "PQputline failed\n");
3647  exit(1);
3648  }
3649 
3650  /*
3651  * If we want to stick with the original logging, print a message each
3652  * 100k inserted rows.
3653  */
3654  if ((!use_quiet) && (j % 100000 == 0))
3655  {
3656  INSTR_TIME_SET_CURRENT(diff);
3657  INSTR_TIME_SUBTRACT(diff, start);
3658 
3659  elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3660  remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3661 
3662  fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3663  j, (int64) naccounts * scale,
3664  (int) (((int64) j * 100) / (naccounts * (int64) scale)),
3665  elapsed_sec, remaining_sec);
3666  }
3667  /* let's not call the timing for each row, but only each 100 rows */
3668  else if (use_quiet && (j % 100 == 0))
3669  {
3670  INSTR_TIME_SET_CURRENT(diff);
3671  INSTR_TIME_SUBTRACT(diff, start);
3672 
3673  elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3674  remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3675 
3676  /* have we reached the next interval (or end)? */
3677  if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
3678  {
3679  fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3680  j, (int64) naccounts * scale,
3681  (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
3682 
3683  /* skip to the next interval */
3684  log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
3685  }
3686  }
3687 
3688  }
3689  if (PQputline(con, "\\.\n"))
3690  {
3691  fprintf(stderr, "very last PQputline failed\n");
3692  exit(1);
3693  }
3694  if (PQendcopy(con))
3695  {
3696  fprintf(stderr, "PQendcopy failed\n");
3697  exit(1);
3698  }
3699 
3700  executeStatement(con, "commit");
3701 }
3702 
3703 /*
3704  * Invoke vacuum on the standard tables
3705  */
3706 static void
3708 {
3709  fprintf(stderr, "vacuuming...\n");
3710  executeStatement(con, "vacuum analyze pgbench_branches");
3711  executeStatement(con, "vacuum analyze pgbench_tellers");
3712  executeStatement(con, "vacuum analyze pgbench_accounts");
3713  executeStatement(con, "vacuum analyze pgbench_history");
3714 }
3715 
3716 /*
3717  * Create primary keys on the standard tables
3718  */
3719 static void
3721 {
3722  static const char *const DDLINDEXes[] = {
3723  "alter table pgbench_branches add primary key (bid)",
3724  "alter table pgbench_tellers add primary key (tid)",
3725  "alter table pgbench_accounts add primary key (aid)"
3726  };
3727  int i;
3728 
3729  fprintf(stderr, "creating primary keys...\n");
3730  for (i = 0; i < lengthof(DDLINDEXes); i++)
3731  {
3732  char buffer[256];
3733 
3734  strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
3735 
3736  if (index_tablespace != NULL)
3737  {
3738  char *escape_tablespace;
3739 
3740  escape_tablespace = PQescapeIdentifier(con, index_tablespace,
3741  strlen(index_tablespace));
3742  snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
3743  " using index tablespace %s", escape_tablespace);
3744  PQfreemem(escape_tablespace);
3745  }
3746 
3747  executeStatement(con, buffer);
3748  }
3749 }
3750 
3751 /*
3752  * Create foreign key constraints between the standard tables
3753  */
3754 static void
3756 {
3757  static const char *const DDLKEYs[] = {
3758  "alter table pgbench_tellers add constraint pgbench_tellers_bid_fkey foreign key (bid) references pgbench_branches",
3759  "alter table pgbench_accounts add constraint pgbench_accounts_bid_fkey foreign key (bid) references pgbench_branches",
3760  "alter table pgbench_history add constraint pgbench_history_bid_fkey foreign key (bid) references pgbench_branches",
3761  "alter table pgbench_history add constraint pgbench_history_tid_fkey foreign key (tid) references pgbench_tellers",
3762  "alter table pgbench_history add constraint pgbench_history_aid_fkey foreign key (aid) references pgbench_accounts"
3763  };
3764  int i;
3765 
3766  fprintf(stderr, "creating foreign keys...\n");
3767  for (i = 0; i < lengthof(DDLKEYs); i++)
3768  {
3769  executeStatement(con, DDLKEYs[i]);
3770  }
3771 }
3772 
/*
 * Validate an initialization-steps string
 *
 * Exits with an error message if the string is empty or contains any
 * character other than the step letters d, t, g, v, p, f or a space.
 *
 * (runInitSteps() would reject bad characters too, but initialization can
 * take a while, so it is friendlier to complain during option parsing.)
 */
static void
checkInitSteps(const char *initialize_steps)
{
	static const char allowed[] = "dtgvpf ";
	const char *cur = initialize_steps;

	/* an empty specification is never valid */
	if (*cur == '\0')
	{
		fprintf(stderr, "no initialization steps specified\n");
		exit(1);
	}

	while (*cur != '\0')
	{
		if (strchr(allowed, *cur) == NULL)
		{
			fprintf(stderr, "unrecognized initialization step \"%c\"\n",
					*cur);
			fprintf(stderr, "allowed steps are: \"d\", \"t\", \"g\", \"v\", \"p\", \"f\"\n");
			exit(1);
		}
		cur++;
	}
}
3802 
3803 /*
3804  * Invoke each initialization step in the given string
3805  */
3806 static void
3807 runInitSteps(const char *initialize_steps)
3808 {
3809  PGconn *con;
3810  const char *step;
3811 
3812  if ((con = doConnect()) == NULL)
3813  exit(1);
3814 
3815  for (step = initialize_steps; *step != '\0'; step++)
3816  {
3817  switch (*step)
3818  {
3819  case 'd':
3820  initDropTables(con);
3821  break;
3822  case 't':
3823  initCreateTables(con);
3824  break;
3825  case 'g':
3826  initGenerateData(con);
3827  break;
3828  case 'v':
3829  initVacuum(con);
3830  break;
3831  case 'p':
3832  initCreatePKeys(con);
3833  break;
3834  case 'f':
3835  initCreateFKeys(con);
3836  break;
3837  case ' ':
3838  break; /* ignore */
3839  default:
3840  fprintf(stderr, "unrecognized initialization step \"%c\"\n",
3841  *step);
3842  PQfinish(con);
3843  exit(1);
3844  }
3845  }
3846 
3847  fprintf(stderr, "done.\n");
3848  PQfinish(con);
3849 }
3850 
3851 /*
3852  * Replace :param with $n throughout the command's SQL text, which
3853  * is a modifiable string in cmd->argv[0].
3854  */
3855 static bool
3857 {
3858  char *sql,
3859  *p;
3860 
3861  /* We don't want to scribble on cmd->argv[0] until done */
3862  sql = pg_strdup(cmd->argv[0]);
3863 
3864  cmd->argc = 1;
3865 
3866  p = sql;
3867  while ((p = strchr(p, ':')) != NULL)
3868  {
3869  char var[13];
3870  char *name;
3871  int eaten;
3872 
3873  name = parseVariable(p, &eaten);
3874  if (name == NULL)
3875  {
3876  while (*p == ':')
3877  {
3878  p++;
3879  }
3880  continue;
3881  }
3882 
3883  if (cmd->argc >= MAX_ARGS)
3884  {
3885  fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n",
3886  MAX_ARGS - 1, cmd->argv[0]);
3887  pg_free(name);
3888  return false;
3889  }
3890 
3891  sprintf(var, "$%d", cmd->argc);
3892  p = replaceVariable(&sql, p, eaten, var);
3893 
3894  cmd->argv[cmd->argc] = name;
3895  cmd->argc++;
3896  }
3897 
3898  pg_free(cmd->argv[0]);
3899  cmd->argv[0] = sql;
3900  return true;
3901 }
3902 
/*
 * Simple error-printing function, might be needed by lexer
 *
 * Flushes stdout, then writes the printf-style message to stderr after
 * passing the format string through _().
 */
static void
pgbench_error(const char *fmt,...)
{
	va_list		args;

	va_start(args, fmt);
	fflush(stdout);
	vfprintf(stderr, _(fmt), args);
	va_end(args);
}
3916 
/*
 * Report a syntax error found while parsing a script, then exit(1).
 * (In practice this happens while parsing a backslash command, because we
 * don't detect syntax errors in SQL.)
 *
 * source: source of script (filename or builtin-script ID)
 * lineno: line number within script (count from 1)
 * line: whole line of backslash command, if available
 * command: backslash command name, if available
 * msg: the actual error message
 * more: optional extra message
 * column: zero-based column number, or -1 if unknown
 *
 * This function does not return.
 */
void
syntax_error(const char *source, int lineno,
			 const char *line, const char *command,
			 const char *msg, const char *more, int column)
{
	const int	have_column = (column >= 0);

	/* first line: location, message and optional details */
	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
	if (more != NULL)
		fprintf(stderr, " (%s)", more);
	/* give the column as a number only when there is no line to point into */
	if (have_column && line == NULL)
		fprintf(stderr, " at column %d", column + 1);
	if (command != NULL)
		fprintf(stderr, " in command \"%s\"", command);
	fprintf(stderr, "\n");

	/* then echo the offending line, with a caret under the error column */
	if (line != NULL)
	{
		fprintf(stderr, "%s\n", line);
		if (have_column)
			fprintf(stderr, "%*s^ error found here\n", column, "");
	}

	exit(1);
}
3956 
3957 /*
3958  * Parse a SQL command; return a Command struct, or NULL if it's a comment
3959  *
3960  * On entry, psqlscan.l has collected the command into "buf", so we don't
3961  * really need to do much here except check for comment and set up a
3962  * Command struct.
3963  */
3964 static Command *
3966 {
3967  Command *my_command;
3968  char *p;
3969  char *nlpos;
3970 
3971  /* Skip any leading whitespace, as well as "--" style comments */
3972  p = buf->data;
3973  for (;;)
3974  {
3975  if (isspace((unsigned char) *p))
3976  p++;
3977  else if (strncmp(p, "--", 2) == 0)
3978  {
3979  p = strchr(p, '\n');
3980  if (p == NULL)
3981  return NULL;
3982  p++;
3983  }
3984  else
3985  break;
3986  }
3987 
3988  /* If there's nothing but whitespace and comments, we're done */
3989  if (*p == '\0')
3990  return NULL;
3991 
3992  /* Allocate and initialize Command structure */
3993  my_command = (Command *) pg_malloc0(sizeof(Command));
3994  my_command->command_num = num_commands++;
3995  my_command->type = SQL_COMMAND;
3996  my_command->meta = META_NONE;
3997  initSimpleStats(&my_command->stats);
3998 
3999  /*
4000  * Install query text as the sole argv string. If we are using a
4001  * non-simple query mode, we'll extract parameters from it later.
4002  */
4003  my_command->argv[0] = pg_strdup(p);
4004  my_command->argc = 1;
4005 
4006  /*
4007  * If SQL command is multi-line, we only want to save the first line as
4008  * the "line" label.
4009  */
4010  nlpos = strchr(p, '\n');
4011  if (nlpos)
4012  {
4013  my_command->line = pg_malloc(nlpos - p + 1);
4014  memcpy(my_command->line, p, nlpos - p);
4015  my_command->line[nlpos - p] = '\0';
4016  }
4017  else
4018  my_command->line = pg_strdup(p);
4019 
4020  return my_command;
4021 }
4022 
4023 /*
4024  * Parse a backslash command; return a Command struct, or NULL if comment
4025  *
4026  * At call, we have scanned only the initial backslash.
4027  */
4028 static Command *
4029 process_backslash_command(PsqlScanState sstate, const char *source)
4030 {
4031  Command *my_command;
4032  PQExpBufferData word_buf;
4033  int word_offset;
4034  int offsets[MAX_ARGS]; /* offsets of argument words */
4035  int start_offset;
4036  int lineno;
4037  int j;
4038 
4039  initPQExpBuffer(&word_buf);
4040 
4041  /* Remember location of the backslash */
4042  start_offset = expr_scanner_offset(sstate) - 1;
4043  lineno = expr_scanner_get_lineno(sstate, start_offset);
4044 
4045  /* Collect first word of command */
4046  if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
4047  {
4048  termPQExpBuffer(&word_buf);
4049  return NULL;
4050  }
4051 
4052  /* Allocate and initialize Command structure */
4053  my_command = (Command *) pg_malloc0(sizeof(Command));
4054  my_command->command_num = num_commands++;
4055  my_command->type = META_COMMAND;
4056  my_command->argc = 0;
4057  initSimpleStats(&my_command->stats);
4058 
4059  /* Save first word (command name) */
4060  j = 0;
4061  offsets[j] = word_offset;
4062  my_command->argv[j++] = pg_strdup(word_buf.data);
4063  my_command->argc++;
4064 
4065  /* ... and convert it to enum form */
4066  my_command->meta = getMetaCommand(my_command->argv[0]);
4067 
4068  if (my_command->meta == META_SET ||
4069  my_command->meta == META_IF ||
4070  my_command->meta == META_ELIF)
4071  {
4073 
4074  /* For \set, collect var name */
4075  if (my_command->meta == META_SET)
4076  {
4077  if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
4078  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4079  "missing argument", NULL, -1);
4080 
4081  offsets[j] = word_offset;
4082  my_command->argv[j++] = pg_strdup(word_buf.data);
4083  my_command->argc++;
4084  }
4085 
4086  /* then for all parse the expression */
4087  yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
4088  my_command->argv[0]);
4089 
4090  if (expr_yyparse(yyscanner) != 0)
4091  {
4092  /* dead code: exit done from syntax_error called by yyerror */
4093  exit(1);
4094  }
4095 
4096  my_command->expr = expr_parse_result;
4097 
4098  /* Save line, trimming any trailing newline */
4099  my_command->line = expr_scanner_get_substring(sstate,
4100  start_offset,
4101  expr_scanner_offset(sstate),
4102  true);
4103 
4104  expr_scanner_finish(yyscanner);
4105 
4106  termPQExpBuffer(&word_buf);
4107 
4108  return my_command;
4109  }
4110 
4111  /* For all other commands, collect remaining words. */
4112  while (expr_lex_one_word(sstate, &word_buf, &word_offset))
4113  {
4114  if (j >= MAX_ARGS)
4115  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4116  "too many arguments", NULL, -1);
4117 
4118  offsets[j] = word_offset;
4119  my_command->argv[j++] = pg_strdup(word_buf.data);
4120  my_command->argc++;
4121  }
4122 
4123  /* Save line, trimming any trailing newline */
4124  my_command->line = expr_scanner_get_substring(sstate,
4125  start_offset,
4126  expr_scanner_offset(sstate),
4127  true);
4128 
4129  if (my_command->meta == META_SLEEP)
4130  {
4131  if (my_command->argc < 2)
4132  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4133  "missing argument", NULL, -1);
4134 
4135  if (my_command->argc > 3)
4136  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4137  "too many arguments", NULL,
4138  offsets[3] - start_offset);
4139 
4140  /*
4141  * Split argument into number and unit to allow "sleep 1ms" etc. We
4142  * don't have to terminate the number argument with null because it
4143  * will be parsed with atoi, which ignores trailing non-digit
4144  * characters.
4145  */
4146  if (my_command->argc == 2 && my_command->argv[1][0] != ':')
4147  {
4148  char *c = my_command->argv[1];
4149 
4150  while (isdigit((unsigned char) *c))
4151  c++;
4152  if (*c)
4153  {
4154  my_command->argv[2] = c;
4155  offsets[2] = offsets[1] + (c - my_command->argv[1]);
4156  my_command->argc = 3;
4157  }
4158  }
4159 
4160  if (my_command->argc == 3)
4161  {
4162  if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
4163  pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
4164  pg_strcasecmp(my_command->argv[2], "s") != 0)
4165  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4166  "unrecognized time unit, must be us, ms or s",
4167  my_command->argv[2], offsets[2] - start_offset);
4168  }
4169  }
4170  else if (my_command->meta == META_SETSHELL)
4171  {
4172  if (my_command->argc < 3)
4173  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4174  "missing argument", NULL, -1);
4175  }
4176  else if (my_command->meta == META_SHELL)
4177  {
4178  if (my_command->argc < 2)
4179  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4180  "missing command", NULL, -1);
4181  }
4182  else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
4183  {
4184  if (my_command->argc != 1)
4185  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4186  "unexpected argument", NULL, -1);
4187  }
4188  else
4189  {
4190  /* my_command->meta == META_NONE */
4191  syntax_error(source, lineno, my_command->line, my_command->argv[0],
4192  "invalid command", NULL, -1);
4193  }
4194 
4195  termPQExpBuffer(&word_buf);
4196 
4197  return my_command;
4198 }
4199 
/*
 * Report a problem with the \if structure of a script on stderr and exit(1).
 *
 * desc: script description; cmdn: command number to report; msg: the problem.
 */
static void
ConditionError(const char *desc, int cmdn, const char *msg)
{
	static const char report_fmt[] =
		"condition error in script \"%s\" command %d: %s\n";

	fprintf(stderr, report_fmt, desc, cmdn, msg);
	exit(1);
}
4208 
4209 /*
4210  * Partial evaluation of conditionals before recording and running the script.
4211  */
4212 static void
4214 {
4215  /* statically check conditional structure */
4217  int i;
4218 
4219  for (i = 0; ps.commands[i] != NULL; i++)
4220  {
4221  Command *cmd = ps.commands[i];
4222 
4223  if (cmd->type == META_COMMAND)
4224  {
4225  switch (cmd->meta)
4226  {
4227  case META_IF:
4229  break;
4230  case META_ELIF:
4231  if (conditional_stack_empty(cs))
4232  ConditionError(ps.desc, i + 1, "\\elif without matching \\if");
4234  ConditionError(ps.desc, i + 1, "\\elif after \\else");
4235  break;
4236  case META_ELSE:
4237  if (conditional_stack_empty(cs))
4238  ConditionError(ps.desc, i + 1, "\\else without matching \\if");
4240  ConditionError(ps.desc, i + 1, "\\else after \\else");
4242  break;
4243  case META_ENDIF:
4244  if (!conditional_stack_pop(cs))
4245  ConditionError(ps.desc, i + 1, "\\endif without matching \\if");
4246  break;
4247  default:
4248  /* ignore anything else... */
4249  break;
4250  }
4251  }
4252  }
4253  if (!conditional_stack_empty(cs))
4254  ConditionError(ps.desc, i + 1, "\\if without matching \\endif");
4256 }
4257 
4258 /*
4259  * Parse a script (either the contents of a file, or a built-in script)
4260  * and add it to the list of scripts.
4261  */
4262 static void
4263 ParseScript(const char *script, const char *desc, int weight)
4264 {
4265  ParsedScript ps;
4266  PsqlScanState sstate;
4267  PQExpBufferData line_buf;
4268  int alloc_num;
4269  int index;
4270 
4271 #define COMMANDS_ALLOC_NUM 128
4272  alloc_num = COMMANDS_ALLOC_NUM;
4273 
4274  /* Initialize all fields of ps */
4275  ps.desc = desc;
4276  ps.weight = weight;
4277  ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
4278  initStats(&ps.stats, 0);
4279 
4280  /* Prepare to parse script */
4281  sstate = psql_scan_create(&pgbench_callbacks);
4282 
4283  /*
4284  * Ideally, we'd scan scripts using the encoding and stdstrings settings
4285  * we get from a DB connection. However, without major rearrangement of
4286  * pgbench's argument parsing, we can't have a DB connection at the time
4287  * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
4288  * with any backend-safe encoding, though conceivably we could be fooled
4289  * if a script file uses a client-only encoding. We also assume that
4290  * stdstrings should be true, which is a bit riskier.
4291  */
4292  psql_scan_setup(sstate, script, strlen(script), 0, true);
4293 
4294  initPQExpBuffer(&line_buf);
4295 
4296  index = 0;
4297 
4298  for (;;)
4299  {
4300  PsqlScanResult sr;
4301  promptStatus_t prompt;
4302  Command *command;
4303 
4304  resetPQExpBuffer(&line_buf);
4305 
4306  sr = psql_scan(sstate, &line_buf, &prompt);
4307 
4308  /* If we collected a SQL command, process that */
4309  command = process_sql_command(&line_buf, desc);
4310  if (command)
4311  {
4312  ps.commands[index] = command;
4313  index++;
4314 
4315  if (index >= alloc_num)
4316  {
4317  alloc_num += COMMANDS_ALLOC_NUM;
4318  ps.commands = (Command **)
4319  pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
4320  }
4321  }
4322 
4323  /* If we reached a backslash, process that */
4324  if (sr == PSCAN_BACKSLASH)
4325  {
4326  command = process_backslash_command(sstate, desc);
4327  if (command)
4328  {
4329  ps.commands[index] = command;
4330  index++;
4331 
4332  if (index >= alloc_num)
4333  {
4334  alloc_num += COMMANDS_ALLOC_NUM;
4335  ps.commands = (Command **)
4336  pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
4337  }
4338  }
4339  }
4340 
4341  /* Done if we reached EOF */
4342  if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
4343  break;
4344  }
4345 
4346  ps.commands[index] = NULL;
4347 
4348  addScript(ps);
4349 
4350  termPQExpBuffer(&line_buf);
4351  psql_scan_finish(sstate);
4352  psql_scan_destroy(sstate);
4353 }
4354 
/*
 * Read the entire contents of file fd, and return it in a malloc'd buffer.
 *
 * The buffer will typically be larger than necessary, but we don't care
 * in this program, because we'll free it as soon as we've parsed the script.
 *
 * The result is NUL-terminated.  Reading stops at the first short fread(),
 * which means either EOF or an error; this function does not tell the two
 * apart.
 */
static char *
read_file_contents(FILE *fd)
{
	char	   *buf;
	size_t		buflen = BUFSIZ;
	size_t		used = 0;

	buf = (char *) pg_malloc(buflen);

	for (;;)
	{
		size_t		nread;

		/* there are always at least BUFSIZ free bytes at buf + used */
		nread = fread(buf + used, 1, BUFSIZ, fd);
		used += nread;
		/* If fread() read less than requested, must be EOF or error */
		if (nread < BUFSIZ)
			break;
		/* Enlarge buf so we can read some more */
		buflen += BUFSIZ;
		buf = (char *) pg_realloc(buf, buflen);
	}
	/* There is surely room for a terminator */
	buf[used] = '\0';

	return buf;
}
4388 
4389 /*
4390  * Given a file name, read it and add its script to the list.
4391  * "-" means to read stdin.
4392  * NB: filename must be storage that won't disappear.
4393  */
4394 static void
4395 process_file(const char *filename, int weight)
4396 {
4397  FILE *fd;
4398  char *buf;
4399 
4400  /* Slurp the file contents into "buf" */
4401  if (strcmp(filename, "-") == 0)
4402  fd = stdin;
4403  else if ((fd = fopen(filename, "r")) == NULL)
4404  {
4405  fprintf(stderr, "could not open file \"%s\": %s\n",
4406  filename, strerror(errno));
4407  exit(1);
4408  }
4409 
4410  buf = read_file_contents(fd);
4411 
4412  if (ferror(fd))
4413  {
4414  fprintf(stderr, "could not read file \"%s\": %s\n",
4415  filename, strerror(errno));
4416  exit(1);
4417  }
4418 
4419  if (fd != stdin)
4420  fclose(fd);
4421 
4422  ParseScript(buf, filename, weight);
4423 
4424  free(buf);
4425 }
4426 
4427 /* Parse the given builtin script and add it to the list. */
4428 static void
4429 process_builtin(const BuiltinScript *bi, int weight)
4430 {
4431  ParseScript(bi->script, bi->desc, weight);
4432 }
4433 
4434 /* show available builtin scripts */
4435 static void
4437 {
4438  int i;
4439 
4440  fprintf(stderr, "Available builtin scripts:\n");
4441  for (i = 0; i < lengthof(builtin_script); i++)
4442  fprintf(stderr, "\t%s\n", builtin_script[i].name);
4443  fprintf(stderr, "\n");
4444 }
4445 
4446 /* return builtin script "name" if unambiguous, fails if not found */
4447 static const BuiltinScript *
4448 findBuiltin(const char *name)
4449 {
4450  int i,
4451  found = 0,
4452  len = strlen(name);
4453  const BuiltinScript *result = NULL;
4454 
4455  for (i = 0; i < lengthof(builtin_script); i++)
4456  {
4457  if (strncmp(builtin_script[i].name, name, len) == 0)
4458  {
4459  result = &builtin_script[i];
4460  found++;
4461  }
4462  }
4463 
4464  /* ok, unambiguous result */
4465  if (found == 1)
4466  return result;
4467 
4468  /* error cases */
4469  if (found == 0)
4470  fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
4471  else /* found > 1 */
4472  fprintf(stderr,
4473  "ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);
4474 
4476  exit(1);
4477 }
4478 
4479 /*
4480  * Determine the weight specification from a script option (-b, -f), if any,
4481  * and return it as an integer (1 is returned if there's no weight). The
4482  * script name is returned in *script as a malloc'd string.
4483  */
4484 static int
4485 parseScriptWeight(const char *option, char **script)
4486 {
4487  char *sep;
4488  int weight;
4489 
4490  if ((sep = strrchr(option, WSEP)))
4491  {
4492  int namelen = sep - option;
4493  long wtmp;
4494  char *badp;
4495 
4496  /* generate the script name */
4497  *script = pg_malloc(namelen + 1);
4498  strncpy(*script, option, namelen);
4499  (*script)[namelen] = '\0';
4500 
4501  /* process digits of the weight spec */
4502  errno = 0;
4503  wtmp = strtol(sep + 1, &badp, 10);
4504  if (errno != 0 || badp == sep + 1 || *badp != '\0')
4505  {
4506  fprintf(stderr, "invalid weight specification: %s\n", sep);
4507  exit(1);
4508  }
4509  if (wtmp > INT_MAX || wtmp < 0)
4510  {
4511  fprintf(stderr,
4512  "weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
4513  INT_MAX, (int64) wtmp);
4514  exit(1);
4515  }
4516  weight = wtmp;
4517  }
4518  else
4519  {
4520  *script = pg_strdup(option);
4521  weight = 1;
4522  }
4523 
4524  return weight;
4525 }
4526 
4527 /* append a script to the list of scripts to process */
4528 static void
4530 {
4531  if (script.commands == NULL || script.commands[0] == NULL)
4532  {
4533  fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
4534  exit(1);
4535  }
4536 
4537  if (num_scripts >= MAX_SCRIPTS)
4538  {
4539  fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
4540  exit(1);
4541  }
4542 
4543  CheckConditional(script);
4544 
4545  sql_script[num_scripts] = script;
4546  num_scripts++;
4547 }
4548 
4549 static void
4550 printSimpleStats(const char *prefix, SimpleStats *ss)
4551 {
4552  if (ss->count > 0)
4553  {
4554  double latency = ss->sum / ss->count;
4555  double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
4556 
4557  printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
4558  printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
4559  }
4560 }
4561 
4562 /* print out results */
4563 static void
4564 printResults(TState *threads, StatsData *total, instr_time total_time,
4565  instr_time conn_total_time, int64 latency_late)
4566 {
4567  double time_include,
4568  tps_include,
4569  tps_exclude;
4570  int64 ntx = total->cnt - total->skipped;
4571  int i,
4572  totalCacheOverflows = 0;
4573 
4574  time_include = INSTR_TIME_GET_DOUBLE(total_time);
4575 
4576  /* tps is about actually executed transactions */
4577  tps_include = ntx / time_include;
4578  tps_exclude = ntx /
4579  (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
4580 
4581  /* Report test parameters. */
4582  printf("transaction type: %s\n",
4583  num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
4584  printf("scaling factor: %d\n", scale);
4585  printf("query mode: %s\n", QUERYMODE[querymode]);
4586  printf("number of clients: %d\n", nclients);
4587  printf("number of threads: %d\n", nthreads);
4588  if (duration <= 0)
4589  {
4590  printf("number of transactions per client: %d\n", nxacts);
4591  printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
4592  ntx, nxacts * nclients);
4593  }
4594  else
4595  {
4596  printf("duration: %d s\n", duration);
4597  printf("number of transactions actually processed: " INT64_FORMAT "\n",
4598  ntx);
4599  }
4600  /* Report zipfian cache overflow */
4601  for (i = 0; i < nthreads; i++)
4602  {
4603  totalCacheOverflows += threads[i].zipf_cache.overflowCount;
4604  }
4605  if (totalCacheOverflows > 0)
4606  {
4607  printf("zipfian cache array overflowed %d time(s)\n", totalCacheOverflows);
4608  }
4609 
4610  /* Remaining stats are nonsensical if we failed to execute any xacts */
4611  if (total->cnt <= 0)
4612  return;
4613 
4614  if (throttle_delay && latency_limit)
4615  printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
4616  total->skipped,
4617  100.0 * total->skipped / total->cnt);
4618 
4619  if (latency_limit)
4620  printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n",
4621  latency_limit / 1000.0, latency_late, ntx,
4622  (ntx > 0) ? 100.0 * latency_late / ntx : 0.0);
4623 
4624  if (throttle_delay || progress || latency_limit)
4625  printSimpleStats("latency", &total->latency);
4626  else
4627  {
4628  /* no measurement, show average latency computed from run time */
4629  printf("latency average = %.3f ms\n",
4630  1000.0 * time_include * nclients / total->cnt);
4631  }
4632 
4633  if (throttle_delay)
4634  {
4635  /*
4636  * Report average transaction lag under rate limit throttling. This
4637  * is the delay between scheduled and actual start times for the
4638  * transaction. The measured lag may be caused by thread/client load,
4639  * the database load, or the Poisson throttling process.
4640  */
4641  printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
4642  0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
4643  }
4644 
4645  printf("tps = %f (including connections establishing)\n", tps_include);
4646  printf("tps = %f (excluding connections establishing)\n", tps_exclude);
4647 
4648  /* Report per-script/command statistics */
4649  if (per_script_stats || is_latencies)
4650  {
4651  int i;
4652 
4653  for (i = 0; i < num_scripts; i++)
4654  {
4655  if (per_script_stats)
4656  {
4657  StatsData *sstats = &sql_script[i].stats;
4658 
4659  printf("SQL script %d: %s\n"
4660  " - weight: %d (targets %.1f%% of total)\n"
4661  " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
4662  i + 1, sql_script[i].desc,
4663  sql_script[i].weight,
4664  100.0 * sql_script[i].weight / total_weight,
4665  sstats->cnt,
4666  100.0 * sstats->cnt / total->cnt,
4667  (sstats->cnt - sstats->skipped) / time_include);
4668 
4669  if (throttle_delay && latency_limit && sstats->cnt > 0)
4670  printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
4671  sstats->skipped,
4672  100.0 * sstats->skipped / sstats->cnt);
4673 
4674  printSimpleStats(" - latency", &sstats->latency);
4675  }
4676 
4677  /* Report per-command latencies */
4678  if (is_latencies)
4679  {
4680  Command **commands;
4681 
4682  if (per_script_stats)
4683  printf(" - statement latencies in milliseconds:\n");
4684  else
4685  printf("statement latencies in milliseconds:\n");
4686 
4687  for (commands = sql_script[i].commands;
4688  *commands != NULL;
4689  commands++)
4690  {
4691  SimpleStats *cstats = &(*commands)->stats;
4692 
4693  printf(" %11.3f %s\n",
4694  (cstats->count > 0) ?
4695  1000.0 * cstats->sum / cstats->count : 0.0,
4696  (*commands)->line);
4697  }
4698  }
4699  }
4700  }
4701 }
4702 
4703 /* call srandom based on some seed. NULL triggers the default behavior. */
4704 static bool
4705 set_random_seed(const char *seed)
4706 {
4707  /* srandom expects an unsigned int */
4708  unsigned int iseed;
4709 
4710  if (seed == NULL || strcmp(seed, "time") == 0)
4711  {
4712  /* rely on current time */
4713  instr_time now;
4714 
4716  iseed = (unsigned int) INSTR_TIME_GET_MICROSEC(now);
4717  }
4718  else if (strcmp(seed, "rand") == 0)
4719  {
4720  /* use some "strong" random source */
4721 #ifdef HAVE_STRONG_RANDOM
4722  if (!pg_strong_random(&iseed, sizeof(iseed)))
4723 #endif
4724  {
4725  fprintf(stderr,
4726  "cannot seed random from a strong source, none available: "
4727  "use \"time\" or an unsigned integer value.\n");
4728  return false;
4729  }
4730  }
4731  else
4732  {
4733  /* parse seed unsigned int value */
4734  char garbage;
4735 
4736  if (sscanf(seed, "%u%c", &iseed, &garbage) != 1)
4737  {
4738  fprintf(stderr,
4739  "unrecognized random seed option \"%s\": expecting an unsigned integer, \"time\" or \"rand\"\n",
4740  seed);
4741  return false;
4742  }
4743  }
4744 
4745  if (seed != NULL)
4746  fprintf(stderr, "setting random seed to %u\n", iseed);
4747  srandom(iseed);
4748  /* no precision loss: 32 bit unsigned int cast to 64 bit int */
4749  random_seed = iseed;
4750  return true;
4751 }
4752 
4753 
4754 int
4755 main(int argc, char **argv)
4756 {
4757  static struct option long_options[] = {
4758  /* systematic long/short named options */
4759  {"builtin", required_argument, NULL, 'b'},
4760  {"client", required_argument, NULL, 'c'},
4761  {"connect", no_argument, NULL, 'C'},
4762  {"debug", no_argument, NULL, 'd'},
4763  {"define", required_argument, NULL, 'D'},
4764  {"file", required_argument, NULL, 'f'},
4765  {"fillfactor", required_argument, NULL, 'F'},
4766  {"host", required_argument, NULL, 'h'},
4767  {"initialize", no_argument, NULL, 'i'},
4768  {"init-steps", required_argument, NULL, 'I'},
4769  {"jobs", required_argument, NULL, 'j'},
4770  {"log", no_argument, NULL, 'l'},
4771  {"latency-limit", required_argument, NULL, 'L'},
4772  {"no-vacuum", no_argument, NULL, 'n'},
4773  {"port", required_argument, NULL, 'p'},
4774  {"progress", required_argument, NULL, 'P'},
4775  {"protocol", required_argument, NULL, 'M'},
4776  {"quiet", no_argument, NULL, 'q'},
4777  {"report-latencies", no_argument, NULL, 'r'},
4778  {"rate", required_argument, NULL, 'R'},
4779  {"scale", required_argument, NULL, 's'},
4780  {"select-only", no_argument, NULL, 'S'},
4781  {"skip-some-updates", no_argument, NULL, 'N'},
4782  {"time", required_argument, NULL, 'T'},
4783  {"transactions", required_argument, NULL, 't'},
4784  {"username", required_argument, NULL, 'U'},
4785  {"vacuum-all", no_argument, NULL, 'v'},
4786  /* long-named only options */
4787  {"unlogged-tables", no_argument, NULL, 1},
4788  {"tablespace", required_argument, NULL, 2},
4789  {"index-tablespace", required_argument, NULL, 3},
4790  {"sampling-rate", required_argument, NULL, 4},
4791  {"aggregate-interval", required_argument, NULL, 5},
4792  {"progress-timestamp", no_argument, NULL, 6},
4793  {"log-prefix", required_argument, NULL, 7},
4794  {"foreign-keys", no_argument, NULL, 8},
4795  {"random-seed", required_argument, NULL, 9},
4796  {NULL, 0, NULL, 0}
4797  };
4798 
4799  int c;
4800  bool is_init_mode = false; /* initialize mode? */
4801  char *initialize_steps = NULL;
4802  bool foreign_keys = false;
4803  bool is_no_vacuum = false;
4804  bool do_vacuum_accounts = false; /* vacuum accounts table? */
4805  int optindex;
4806  bool scale_given = false;
4807 
4808  bool benchmarking_option_set = false;
4809  bool initialization_option_set = false;
4810  bool internal_script_used = false;
4811 
4812  CState *state; /* status of clients */
4813  TState *threads; /* array of thread */
4814 
4815  instr_time start_time; /* start up time */
4816  instr_time total_time;
4817  instr_time conn_total_time;
4818  int64 latency_late = 0;
4819  StatsData stats;
4820  int weight;
4821 
4822  int i;
4823  int nclients_dealt;
4824 
4825 #ifdef HAVE_GETRLIMIT
4826  struct rlimit rlim;
4827 #endif
4828 
4829  PGconn *con;
4830  PGresult *res;
4831  char *env;
4832 
4833  progname = get_progname(argv[0]);
4834 
4835  if (argc > 1)
4836  {
4837  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
4838  {
4839  usage();
4840  exit(0);
4841  }
4842  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
4843  {
4844  puts("pgbench (PostgreSQL) " PG_VERSION);
4845  exit(0);
4846  }
4847  }
4848 
4849 #ifdef WIN32
4850  /* stderr is buffered on Win32. */
4851  setvbuf(stderr, NULL, _IONBF, 0);
4852 #endif
4853 
4854  if ((env = getenv("PGHOST")) != NULL && *env != '\0')
4855  pghost = env;
4856  if ((env = getenv("PGPORT")) != NULL && *env != '\0')
4857  pgport = env;
4858  else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
4859  login = env;
4860 
4861  state = (CState *) pg_malloc(sizeof(CState));
4862  memset(state, 0, sizeof(CState));
4863 
4864  /* set random seed early, because it may be used while parsing scripts. */
4865  if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED")))
4866  {
4867  fprintf(stderr, "error while setting random seed from PGBENCH_RANDOM_SEED environment variable\n");
4868  exit(1);
4869  }
4870 
4871  while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
4872  {
4873  char *script;
4874 
4875  switch (c)
4876  {
4877  case 'i':
4878  is_init_mode = true;
4879  break;
4880  case 'I':
4881  if (initialize_steps)
4882  pg_free(initialize_steps);
4883  initialize_steps = pg_strdup(optarg);
4884  checkInitSteps(initialize_steps);
4885  initialization_option_set = true;
4886  break;
4887  case 'h':
4888  pghost = pg_strdup(optarg);
4889  break;
4890  case 'n':
4891  is_no_vacuum = true;
4892  break;
4893  case 'v':
4894  benchmarking_option_set = true;
4895  do_vacuum_accounts = true;
4896  break;
4897  case 'p':
4898  pgport = pg_strdup(optarg);
4899  break;
4900  case 'd':
4901  debug++;
4902  break;
4903  case 'c':
4904  benchmarking_option_set = true;
4905  nclients = atoi(optarg);
4906  if (nclients <= 0 || nclients > MAXCLIENTS)
4907  {
4908  fprintf(stderr, "invalid number of clients: \"%s\"\n",
4909  optarg);
4910  exit(1);
4911  }
4912 #ifdef HAVE_GETRLIMIT
4913 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
4914  if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
4915 #else /* but BSD doesn't ... */
4916  if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
4917 #endif /* RLIMIT_NOFILE */
4918  {
4919  fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
4920  exit(1);
4921  }
4922  if (rlim.rlim_cur < nclients + 3)
4923  {
4924  fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
4925  nclients + 3, (long) rlim.rlim_cur);
4926  fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
4927  exit(1);
4928  }
4929 #endif /* HAVE_GETRLIMIT */
4930  break;
4931  case 'j': /* jobs */
4932  benchmarking_option_set = true;
4933  nthreads = atoi(optarg);
4934  if (nthreads <= 0)
4935  {
4936  fprintf(stderr, "invalid number of threads: \"%s\"\n",
4937  optarg);
4938  exit(1);
4939  }
4940 #ifndef ENABLE_THREAD_SAFETY
4941  if (nthreads != 1)
4942  {
4943  fprintf(stderr, "threads are not supported on this platform; use -j1\n");
4944  exit(1);
4945  }
4946 #endif /* !ENABLE_THREAD_SAFETY */
4947  break;
4948  case 'C':
4949  benchmarking_option_set = true;
4950  is_connect = true;
4951  break;
4952  case 'r':
4953  benchmarking_option_set = true;
4954  is_latencies = true;
4955  break;
4956  case 's':
4957  scale_given = true;
4958  scale = atoi(optarg);
4959  if (scale <= 0)
4960  {
4961  fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
4962  exit(1);
4963  }
4964  break;
4965  case 't':
4966  benchmarking_option_set = true;
4967  nxacts = atoi(optarg);
4968  if (nxacts <= 0)
4969  {
4970  fprintf(stderr, "invalid number of transactions: \"%s\"\n",
4971  optarg);
4972  exit(1);
4973  }
4974  break;
4975  case 'T':
4976  benchmarking_option_set = true;
4977  duration = atoi(optarg);
4978  if (duration <= 0)
4979  {
4980  fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
4981  exit(1);
4982  }
4983  break;
4984  case 'U':
4985  login = pg_strdup(optarg);
4986  break;
4987  case 'l':
4988  benchmarking_option_set = true;
4989  use_log = true;
4990  break;
4991  case 'q':
4992  initialization_option_set = true;
4993  use_quiet = true;
4994  break;
4995  case 'b':
4996  if (strcmp(optarg, "list") == 0)
4997  {
4999  exit(0);
5000  }
5001  weight = parseScriptWeight(optarg, &script);
5002  process_builtin(findBuiltin(script), weight);
5003  benchmarking_option_set = true;
5004  internal_script_used = true;
5005  break;
5006  case 'S':
5007  process_builtin(findBuiltin("select-only"), 1);
5008  benchmarking_option_set = true;
5009  internal_script_used = true;
5010  break;
5011  case 'N':
5012  process_builtin(findBuiltin("simple-update"), 1);
5013  benchmarking_option_set = true;
5014  internal_script_used = true;
5015  break;
5016  case 'f':
5017  weight = parseScriptWeight(optarg, &script);
5018  process_file(script, weight);
5019  benchmarking_option_set = true;
5020  break;
5021  case 'D':
5022  {
5023  char *p;
5024 
5025  benchmarking_option_set = true;
5026 
5027  if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
5028  {
5029  fprintf(stderr, "invalid variable definition: \"%s\"\n",
5030  optarg);
5031  exit(1);
5032  }
5033 
5034  *p++ = '\0';
5035  if (!putVariable(&state[0], "option", optarg, p))
5036  exit(1);
5037  }
5038  break;
5039  case 'F':
5040  initialization_option_set = true;
5041  fillfactor = atoi(optarg);
5042  if (fillfactor < 10 || fillfactor > 100)
5043  {
5044  fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
5045  exit(1);
5046  }
5047  break;
5048  case 'M':
5049  benchmarking_option_set = true;
5050  for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
5051  if (strcmp(optarg, QUERYMODE[querymode]) == 0)
5052  break;
5053  if (querymode >= NUM_QUERYMODE)
5054  {
5055  fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
5056  optarg);
5057  exit(1);
5058  }
5059  break;
5060  case 'P':
5061  benchmarking_option_set = true;
5062  progress = atoi(optarg);
5063  if (progress <= 0)
5064  {
5065  fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
5066  optarg);
5067  exit(1);
5068  }
5069  break;
5070  case 'R':
5071  {
5072  /* get a double from the beginning of option value */
5073  double throttle_value = atof(optarg);
5074 
5075  benchmarking_option_set = true;
5076 
5077  if (throttle_value <= 0.0)
5078  {
5079  fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
5080  exit(1);
5081  }
5082  /* Invert rate limit into a time offset */
5083  throttle_delay = (int64) (1000000.0 / throttle_value);
5084  }
5085  break;
5086  case 'L':
5087  {
5088  double limit_ms = atof(optarg);
5089 
5090  if (limit_ms <= 0.0)
5091  {
5092  fprintf(stderr, "invalid latency limit: \"%s\"\n",
5093  optarg);
5094  exit(1);
5095  }
5096  benchmarking_option_set = true;
5097  latency_limit = (int64) (limit_ms * 1000);
5098  }
5099  break;
5100  case 1: /* unlogged-tables */
5101  initialization_option_set = true;
5102  unlogged_tables = true;
5103  break;
5104  case 2: /* tablespace */
5105  initialization_option_set = true;
5106  tablespace = pg_strdup(optarg);
5107  break;
5108  case 3: /* index-tablespace */
5109  initialization_option_set = true;
5110  index_tablespace = pg_strdup(optarg);
5111  break;
5112  case 4: /* sampling-rate */
5113  benchmarking_option_set = true;
5114  sample_rate = atof(optarg);
5115  if (sample_rate <= 0.0 || sample_rate > 1.0)
5116  {
5117  fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
5118  exit(1);
5119  }
5120  break;
5121  case 5: /* aggregate-interval */
5122  benchmarking_option_set = true;
5123  agg_interval = atoi(optarg);
5124  if (agg_interval <= 0)
5125  {
5126  fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
5127  optarg);
5128  exit(1);
5129  }
5130  break;
5131  case 6: /* progress-timestamp */
5132  progress_timestamp = true;
5133  benchmarking_option_set = true;
5134  break;
5135  case 7: /* log-prefix */
5136  benchmarking_option_set = true;
5137  logfile_prefix = pg_strdup(optarg);
5138  break;
5139  case 8: /* foreign-keys */
5140  initialization_option_set = true;
5141  foreign_keys = true;
5142  break;
5143  case 9: /* random-seed */
5144  benchmarking_option_set = true;
5145  if (!set_random_seed(optarg))
5146  {
5147  fprintf(stderr, "error while setting random seed from --random-seed option\n");
5148  exit(1);
5149  }
5150  break;
5151  default:
5152  fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
5153  exit(1);
5154  break;
5155  }
5156  }
5157 
5158  /* set default script if none */
5159  if (num_scripts == 0 && !is_init_mode)
5160  {
5161  process_builtin(findBuiltin("tpcb-like"), 1);
5162  benchmarking_option_set = true;
5163  internal_script_used = true;
5164  }
5165 
5166  /* if not simple query mode, parse the script(s) to find parameters */
5167  if (querymode != QUERY_SIMPLE)
5168  {
5169  for (i = 0; i < num_scripts; i++)
5170  {
5171  Command **commands = sql_script[i].commands;
5172  int j;
5173 
5174  for (j = 0; commands[j] != NULL; j++)
5175  {
5176  if (commands[j]->type != SQL_COMMAND)
5177  continue;
5178  if (!parseQuery(commands[j]))
5179  exit(1);
5180  }
5181  }
5182  }
5183 
5184  /* compute total_weight */
5185  for (i = 0; i < num_scripts; i++)
5186  /* cannot overflow: weight is 32b, total_weight 64b */
5187  total_weight += sql_script[i].weight;
5188 
5189  if (total_weight == 0 && !is_init_mode)
5190  {
5191  fprintf(stderr, "total script weight must not be zero\n");
5192  exit(1);
5193  }
5194 
5195  /* show per script stats if several scripts are used */
5196  if (num_scripts > 1)
5197  per_script_stats = true;
5198 
5199  /*
5200  * Don't need more threads than there are clients. (This is not merely an
5201  * optimization; throttle_delay is calculated incorrectly below if some
5202  * threads have no clients assigned to them.)
5203  */
5204  if (nthreads > nclients)
5205  nthreads = nclients;
5206 
5207  /* compute a per thread delay */
5208  throttle_delay *= nthreads;
5209 
5210  if (argc > optind)
5211  dbName = argv[optind];
5212  else
5213  {
5214  if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
5215  dbName = env;
5216  else if (login != NULL && *login != '\0')
5217  dbName = login;
5218  else
5219  dbName = "";
5220  }
5221 
5222  if (is_init_mode)
5223  {
5224  if (benchmarking_option_set)
5225  {
5226  fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
5227  exit(1);
5228  }
5229 
5230  if (initialize_steps == NULL)
5231  initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
5232 
5233  if (is_no_vacuum)
5234  {
5235  /* Remove any vacuum step in initialize_steps */
5236  char *p;
5237 
5238  while ((p = strchr(initialize_steps, 'v')) != NULL)
5239  *p = ' ';
5240  }
5241 
5242  if (foreign_keys)
5243  {
5244  /* Add 'f' to end of initialize_steps, if not already there */
5245  if (strchr(initialize_steps, 'f') == NULL)
5246  {
5247  initialize_steps = (char *)
5248  pg_realloc(initialize_steps,
5249  strlen(initialize_steps) + 2);
5250  strcat(initialize_steps, "f");
5251  }
5252  }
5253 
5254  runInitSteps(initialize_steps);
5255  exit(0);
5256  }
5257  else
5258  {
5259  if (initialization_option_set)
5260  {
5261  fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
5262  exit(1);
5263  }
5264  }
5265 
5266  if (nxacts > 0 && duration > 0)
5267  {
5268  fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
5269  exit(1);
5270  }
5271 
5272  /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
5273  if (nxacts <= 0 && duration <= 0)
5274  nxacts = DEFAULT_NXACTS;
5275 
5276  /* --sampling-rate may be used only with -l */
5277  if (sample_rate > 0.0 && !use_log)
5278  {
5279  fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
5280  exit(1);
5281  }
5282 
5283  /* --sampling-rate may not be used with --aggregate-interval */
5284  if (sample_rate > 0.0 && agg_interval > 0)
5285  {
5286  fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
5287  exit(1);
5288  }
5289 
5290  if (agg_interval > 0 && !use_log)
5291  {
5292  fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
5293  exit(1);
5294  }
5295 
5296  if (!use_log && logfile_prefix)
5297  {
5298  fprintf(stderr, "log file prefix (--log-prefix) is allowed only when logging transactions (-l)\n");
5299  exit(1);
5300  }
5301 
5302  if (duration > 0 && agg_interval > duration)
5303  {
5304  fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
5305  exit(1);
5306  }
5307 
5308  if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
5309  {
5310  fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
5311  exit(1);
5312  }
5313 
5314  if (progress_timestamp && progress == 0)
5315  {
5316  fprintf(stderr, "--progress-timestamp is allowed only under --progress\n");
5317  exit(1);
5318  }
5319 
5320  /*
5321  * save main process id in the global variable because process id will be
5322  * changed after fork.
5323  */
5324  main_pid = (int) getpid();
5325 
5326  if (nclients > 1)
5327  {
5328  state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
5329  memset(state + 1, 0, sizeof(CState) * (nclients - 1));
5330 
5331  /* copy any -D switch values to all clients */
5332  for (i = 1; i < nclients; i++)
5333  {
5334  int j;
5335 
5336  state[i].id = i;
5337  for (j = 0; j < state[0].nvariables; j++)
5338  {
5339  Variable *var = &state[0].variables[j];
5340 
5341  if (var->value.type != PGBT_NO_VALUE)
5342  {
5343  if (!putVariableValue(&state[i], "startup",
5344  var->name, &var->value))
5345  exit(1);
5346  }
5347  else
5348  {
5349  if (!putVariable(&state[i], "startup",
5350  var->name, var->svalue))
5351  exit(1);
5352  }
5353  }
5354  }
5355  }
5356 
5357  /* other CState initializations */
5358  for (i = 0; i < nclients; i++)
5359  {
5360  state[i].cstack = conditional_stack_create();
5361  }
5362 
5363  if (debug)
5364  {
5365  if (duration <= 0)
5366  printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
5367  pghost, pgport, nclients, nxacts, dbName);
5368  else
5369  printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
5370  pghost, pgport, nclients, duration, dbName);
5371  }
5372 
5373  /* opening connection... */
5374  con = doConnect();
5375  if (con == NULL)
5376  exit(1);
5377 
5378  if (PQstatus(con) == CONNECTION_BAD)
5379  {
5380  fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
5381  fprintf(stderr, "%s", PQerrorMessage(con));
5382  exit(1);
5383  }
5384 
5385  if (internal_script_used)
5386  {
5387  /*
5388  * get the scaling factor that should be same as count(*) from
5389  * pgbench_branches if this is not a custom query
5390  */
5391  res = PQexec(con, "select count(*) from pgbench_branches");
5392  if (PQresultStatus(res) != PGRES_TUPLES_OK)
5393  {
5394  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
5395 
5396  fprintf(stderr, "%s", PQerrorMessage(con));
5397  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
5398  {
5399  fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
5400  }
5401 
5402  exit(1);
5403  }
5404  scale = atoi(PQgetvalue(res, 0, 0));
5405  if (scale < 0)
5406  {
5407  fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
5408  PQgetvalue(res, 0, 0));
5409  exit(1);
5410  }
5411  PQclear(res);
5412 
5413  /* warn if we override user-given -s switch */
5414  if (scale_given)
5415  fprintf(stderr,
5416  "scale option ignored, using count from pgbench_branches table (%d)\n",
5417  scale);
5418  }
5419 
5420  /*
5421  * :scale variables normally get -s or database scale, but don't override
5422  * an explicit -D switch
5423  */
5424  if (lookupVariable(&state[0], "scale") == NULL)
5425  {
5426  for (i = 0; i < nclients; i++)
5427  {
5428  if (!putVariableInt(&state[i], "startup", "scale", scale))
5429  exit(1);
5430  }
5431  }
5432 
5433  /*
5434  * Define a :client_id variable that is unique per connection. But don't
5435  * override an explicit -D switch.
5436  */
5437  if (lookupVariable(&state[0], "client_id") == NULL)
5438  {
5439  for (i = 0; i < nclients; i++)
5440  if (!putVariableInt(&state[i], "startup", "client_id", i))
5441  exit(1);
5442  }
5443 
5444  /* set default seed for hash functions */
5445  if (lookupVariable(&state[0], "default_seed") == NULL)
5446  {
5447  uint64 seed = ((uint64) (random() & 0xFFFF) << 48) |
5448  ((uint64) (random() & 0xFFFF) << 32) |
5449  ((uint64) (random() & 0xFFFF) << 16) |
5450  (uint64) (random() & 0xFFFF);
5451 
5452  for (i = 0; i < nclients; i++)
5453  if (!putVariableInt(&state[i], "startup", "default_seed", (int64) seed))
5454  exit(1);
5455  }
5456 
5457  /* set random seed unless overwritten */
5458  if (lookupVariable(&state[0], "random_seed") == NULL)
5459  {
5460  for (i = 0; i < nclients; i++)
5461  if (!putVariableInt(&state[i], "startup", "random_seed", random_seed))
5462  exit(1);
5463  }
5464 
5465  if (!is_no_vacuum)
5466  {
5467  fprintf(stderr, "starting vacuum...");
5468  tryExecuteStatement(con, "vacuum pgbench_branches");
5469  tryExecuteStatement(con, "vacuum pgbench_tellers");
5470  tryExecuteStatement(con, "truncate pgbench_history");
5471  fprintf(stderr, "end.\n");
5472 
5473  if (do_vacuum_accounts)
5474  {
5475  fprintf(stderr, "starting vacuum pgbench_accounts...");
5476  tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
5477  fprintf(stderr, "end.\n");
5478  }
5479  }
5480  PQfinish(con);
5481 
5482  /* set up thread data structures */
5483  threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
5484  nclients_dealt = 0;
5485 
5486  for (i = 0; i < nthreads; i++)
5487  {
5488  TState *thread = &threads[i];
5489 
5490  thread->tid = i;
5491  thread->state = &state[nclients_dealt];
5492  thread->nstate =
5493  (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
5494  thread->random_state[0] = random();
5495  thread->random_state[1] = random();
5496  thread->random_state[2] = random();
5497  thread->logfile = NULL; /* filled in later */
5498  thread->latency_late = 0;
5499  thread->zipf_cache.nb_cells = 0;
5500  thread->zipf_cache.current = 0;
5501  thread->zipf_cache.overflowCount = 0;
5502  initStats(&thread->stats, 0);
5503 
5504  nclients_dealt += thread->nstate;
5505  }
5506 
5507  /* all clients must be assigned to a thread */
5508  Assert(nclients_dealt == nclients);
5509 
5510  /* get start up time */
5511  INSTR_TIME_SET_CURRENT(start_time);
5512 
5513  /* set alarm if duration is specified. */
5514  if (duration > 0)
5515  setalarm(duration);
5516 
5517  /* start threads */
5518 #ifdef ENABLE_THREAD_SAFETY
5519  for (i = 0; i < nthreads; i++)
5520  {
5521  TState *thread = &threads[i];
5522 
5524 
5525  /* compute when to stop */
5526  if (duration > 0)
5527  end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
5528  (int64) 1000000 * duration;
5529 
5530  /* the first thread (i = 0) is executed by main thread */
5531  if (i > 0)
5532  {
5533  int err = pthread_create(&thread->thread, NULL, threadRun, thread);
5534 
5535  if (err != 0 || thread->thread == INVALID_THREAD)
5536  {
5537  fprintf(stderr, "could not create thread: %s\n", strerror(err));
5538  exit(1);
5539  }
5540  }
5541  else
5542  {
5543  thread->thread = INVALID_THREAD;
5544  }
5545  }
5546 #else
5547  INSTR_TIME_SET_CURRENT(threads[0].start_time);
5548  /* compute when to stop */
5549  if (duration > 0)
5550  end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
5551  (int64) 1000000 * duration;
5552  threads[0].thread = INVALID_THREAD;
5553 #endif /* ENABLE_THREAD_SAFETY */
5554 
5555  /* wait for threads and accumulate results */
5556  initStats(&stats, 0);
5557  INSTR_TIME_SET_ZERO(conn_total_time);
5558  for (i = 0; i < nthreads; i++)
5559  {
5560  TState *thread = &threads[i];
5561 
5562 #ifdef ENABLE_THREAD_SAFETY
5563  if (threads[i].thread == INVALID_THREAD)
5564  /* actually run this thread directly in the main thread */
5565  (void) threadRun(thread);
5566  else
5567  /* wait of other threads. should check that 0 is returned? */
5568  pthread_join(thread->thread, NULL);
5569 #else
5570  (void) threadRun(thread);
5571 #endif /* ENABLE_THREAD_SAFETY */
5572 
5573  /* aggregate thread level stats */
5574  mergeSimpleStats(&stats.latency, &thread->stats.latency);
5575  mergeSimpleStats(&stats.lag, &thread->stats.lag);
5576  stats.cnt += thread->stats.cnt;
5577  stats.skipped += thread->stats.skipped;
5578  latency_late += thread->latency_late;
5579  INSTR_TIME_ADD(conn_total_time, thread->conn_time);
5580  }
5581  disconnect_all(state, nclients);
5582 
5583  /*
5584  * XXX We compute results as though every client of every thread started
5585  * and finished at the same time. That model can diverge noticeably from
5586  * reality for a short benchmark run involving relatively many threads.
5587  * The first thread may process notably many transactions before the last
5588  * thread begins. Improving the model alone would bring limited benefit,
5589  * because performance during those periods of partial thread count can
5590  * easily exceed steady state performance. This is one of the many ways
5591  * short runs convey deceptive performance figures.
5592  */
5593  INSTR_TIME_SET_CURRENT(total_time);
5594  INSTR_TIME_SUBTRACT(total_time, start_time);
5595  printResults(threads, &stats, total_time, conn_total_time, latency_late);
5596 
5597  return 0;
5598 }
5599 
5600 static void *
5601 threadRun(void *arg)
5602 {
5603  TState *thread = (TState *) arg;
5604  CState *state = thread->state;
5605  instr_time start,
5606  end;
5607  int nstate = thread->nstate;
5608  int remains = nstate; /* number of remaining clients */
5609  int i;
5610 
5611  /* for reporting progress: */
5612  int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
5613  int64 last_report = thread_start;
5614  int64 next_report = last_report + (int64) progress * 1000000;
5615  StatsData last,
5616  aggs;
5617 
5618  /*
5619  * Initialize throttling rate target for all of the thread's clients. It
5620  * might be a little more accurate to reset thread->start_time here too.
5621  * The possible drift seems too small relative to typical throttle delay
5622  * times to worry about it.
5623  */
5624  INSTR_TIME_SET_CURRENT(start);
5625  thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
5626 
5627  INSTR_TIME_SET_ZERO(thread->conn_time);
5628 
5629  initStats(&aggs, time(NULL));
5630  last = aggs;
5631 
5632  /* open log file if requested */
5633  if (use_log)
5634  {
5635  char logpath[MAXPGPATH];
5636  char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
5637 
5638  if (thread->tid == 0)
5639  snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
5640  else
5641  snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
5642 
5643  thread->logfile = fopen(logpath, "w");
5644 
5645  if (thread->logfile == NULL)
5646  {
5647  fprintf(stderr, "could not open logfile \"%s\": %s\n",
5648  logpath, strerror(errno));
5649  goto done;
5650  }
5651  }
5652 
5653  if (!is_connect)
5654  {
5655  /* make connections to the database */
5656  for (i = 0; i < nstate; i++)
5657  {
5658  if ((state[i].con = doConnect()) == NULL)
5659  goto done;
5660  }
5661  }
5662 
5663  /* time after thread and connections set up */
5665  INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
5666 
5667  /* explicitly initialize the state machines */
5668  for (i = 0; i < nstate; i++)
5669  {
5670  state[i].state = CSTATE_CHOOSE_SCRIPT;
5671  }
5672 
5673  /* loop till all clients have terminated */
5674  while (remains > 0)
5675  {
5676  fd_set input_mask;
5677  int maxsock; /* max socket number to be waited for */
5678  int64 min_usec;
5679  int64 now_usec = 0; /* set this only if needed */
5680 
5681  /* identify which client sockets should be checked for input */
5682  FD_ZERO(&input_mask);
5683  maxsock = -1;
5684  min_usec = PG_INT64_MAX;
5685  for (i = 0; i < nstate; i++)
5686  {
5687  CState *st = &state[i];
5688 
5689  if (st->state == CSTATE_THROTTLE && timer_exceeded)
5690  {
5691  /* interrupt client that has not started a transaction */
5692  st->state = CSTATE_FINISHED;
5693  finishCon(st);
5694  remains--;
5695  }
5696  else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
5697  {
5698  /* a nap from the script, or under throttling */
5699  int64 this_usec;
5700 
5701  /* get current time if needed */
5702  if (now_usec == 0)
5703  {
5704  instr_time now;
5705 
5707  now_usec = INSTR_TIME_GET_MICROSEC(now);
5708  }
5709 
5710  /* min_usec should be the minimum delay across all clients */
5711  this_usec = (st->state == CSTATE_SLEEP ?
5712  st->sleep_until : st->txn_scheduled) - now_usec;
5713  if (min_usec > this_usec)
5714  min_usec = this_usec;
5715  }
5716  else if (st->state == CSTATE_WAIT_RESULT)
5717  {
5718  /*
5719  * waiting for result from server - nothing to do unless the
5720  * socket is readable
5721  */
5722  int sock = PQsocket(st->con);
5723 
5724  if (sock < 0)
5725  {
5726  fprintf(stderr, "invalid socket: %s",
5727  PQerrorMessage(st->con));
5728  goto done;
5729  }
5730 
5731  FD_SET(sock, &input_mask);
5732  if (maxsock < sock)
5733  maxsock = sock;
5734  }
5735  else if (st->state != CSTATE_ABORTED &&
5736  st->state != CSTATE_FINISHED)
5737  {
5738  /*
5739  * This client thread is ready to do something, so we don't
5740  * want to wait. No need to examine additional clients.
5741  */
5742  min_usec = 0;
5743  break;
5744  }
5745  }
5746 
5747  /* also wake up to print the next progress report on time */
5748  if (progress && min_usec > 0 && thread->tid == 0)
5749  {
5750  /* get current time if needed */
5751  if (now_usec == 0)
5752  {
5753  instr_time now;
5754 
5756  now_usec = INSTR_TIME_GET_MICROSEC(now);
5757  }
5758 
5759  if (now_usec >= next_report)
5760  min_usec = 0;
5761  else if ((next_report - now_usec) < min_usec)
5762  min_usec = next_report - now_usec;
5763  }
5764 
5765  /*
5766  * If no clients are ready to execute actions, sleep until we receive
5767  * data from the server, or a nap-time specified in the script ends,
5768  * or it's time to print a progress report. Update input_mask to show
5769  * which client(s) received data.
5770  */
5771  if (min_usec > 0)
5772  {
5773  int nsocks = 0; /* return from select(2) if called */
5774 
5775  if (min_usec != PG_INT64_MAX)
5776  {
5777  if (maxsock != -1)
5778  {
5779  struct timeval timeout;
5780 
5781  timeout.tv_sec = min_usec / 1000000;
5782  timeout.tv_usec = min_usec % 1000000;
5783  nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
5784  }
5785  else /* nothing active, simple sleep */
5786  {
5787  pg_usleep(min_usec);
5788  }
5789  }
5790  else /* no explicit delay, select without timeout */
5791  {
5792  nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
5793  }
5794 
5795  if (nsocks < 0)
5796  {
5797  if (errno == EINTR)
5798  {
5799  /* On EINTR, go back to top of loop */
5800  continue;
5801  }
5802  /* must be something wrong */
5803  fprintf(stderr, "select() failed: %s\n", strerror(errno));
5804  goto done;
5805  }
5806  }
5807  else
5808  {
5809  /* min_usec == 0, i.e. something needs to be executed */
5810 
5811  /* If we didn't call select(), don't try to read any data */
5812  FD_ZERO(&input_mask);
5813  }
5814 
5815  /* ok, advance the state machine of each connection */
5816  for (i = 0; i < nstate; i++)
5817  {
5818  CState *st = &state[i];
5819 
5820  if (st->state == CSTATE_WAIT_RESULT)
5821  {
5822  /* don't call doCustom unless data is available */
5823  int sock = PQsocket(st->con);
5824 
5825  if (sock < 0)
5826  {
5827  fprintf(stderr, "invalid socket: %s",
5828  PQerrorMessage(st->con));
5829  goto done;
5830  }
5831 
5832  if (!FD_ISSET(sock, &input_mask))
5833  continue;
5834  }
5835  else if (st->state == CSTATE_FINISHED ||
5836  st->state == CSTATE_ABORTED)
5837  {
5838  /* this client is done, no need to consider it anymore */
5839  continue;
5840  }
5841 
5842  doCustom(thread, st, &aggs);
5843 
5844  /* If doCustom changed client to finished state, reduce remains */
5845  if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
5846  remains--;
5847  }
5848 
5849  /* progress report is made by thread 0 for all threads */
5850  if (progress && thread->tid == 0)
5851  {
5852  instr_time now_time;
5853  int64 now;
5854 
5855  INSTR_TIME_SET_CURRENT(now_time);
5856  now = INSTR_TIME_GET_MICROSEC(now_time);
5857  if (now >= next_report)
5858  {
5859  /* generate and show report */
5860  StatsData cur;
5861  int64 run = now - last_report,
5862  ntx;
5863  double tps,
5864  total_run,
5865  latency,
5866  sqlat,
5867  lag,
5868  stdev;
5869  char tbuf[315];
5870 
5871  /*
5872  * Add up the statistics of all threads.
5873  *
5874  * XXX: No locking. There is no guarantee that we get an
5875  * atomic snapshot of the transaction count and latencies, so
5876  * these figures can well be off by a small amount. The
5877  * progress report's purpose is to give a quick overview of
5878  * how the test is going, so that shouldn't matter too much.
5879  * (If a read from a 64-bit integer is not atomic, you might
5880  * get a "torn" read and completely bogus latencies though!)
5881  */
5882  initStats(&cur, 0);
5883  for (i = 0; i < nthreads; i++)
5884  {
5885  mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
5886  mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
5887  cur.cnt += thread[i].stats.cnt;
5888  cur.skipped += thread[i].stats.skipped;
5889  }
5890 
5891  /* we count only actually executed transactions */
5892  ntx = (cur.cnt - cur.skipped) - (last.cnt - last.skipped);
5893  total_run = (now - thread_start) / 1000000.0;
5894  tps = 1000000.0 * ntx / run;
5895  if (ntx > 0)
5896  {
5897  latency = 0.001 * (cur.latency.sum - last.latency.sum) / ntx;
5898  sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2) / ntx;
5899  stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
5900  lag = 0.001 * (cur.lag.sum - last.lag.sum) / ntx;
5901  }
5902  else
5903  {
5904  latency = sqlat = stdev = lag = 0;
5905  }
5906 
5907  if (progress_timestamp)
5908  {
5909  /*
5910  * On some platforms the current system timestamp is
5911  * available in now_time, but rather than get entangled
5912  * with that, we just eat the cost of an extra syscall in
5913  * all cases.
5914  */
5915  struct timeval tv;
5916 
5917  gettimeofday(&tv, NULL);
5918  snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
5919  (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
5920  }
5921  else
5922  {
5923  /* round seconds are expected, but the thread may be late */
5924  snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
5925  }
5926 
5927  fprintf(stderr,
5928  "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
5929  tbuf, tps, latency, stdev);
5930 
5931  if (throttle_delay)
5932  {
5933  fprintf(stderr, ", lag %.3f ms", lag);
5934  if (latency_limit)
5935  fprintf(stderr, ", " INT64_FORMAT " skipped",
5936  cur.skipped - last.skipped);
5937  }
5938  fprintf(stderr, "\n");
5939 
5940  last = cur;
5941  last_report = now;
5942 
5943  /*
5944  * Ensure that the next report is in the future, in case
5945  * pgbench/postgres got stuck somewhere.
5946  */
5947  do
5948  {
5949  next_report += (int64) progress * 1000000;
5950  } while (now >= next_report);
5951  }
5952  }
5953  }
5954 
5955 done:
5956  INSTR_TIME_SET_CURRENT(start);
5957  disconnect_all(state, nstate);
5959  INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
5960  if (thread->logfile)
5961  {
5962  if (agg_interval > 0)
5963  {
5964  /* log aggregated but not yet reported transactions */
5965  doLog(thread, state, &aggs, false, 0, 0);
5966  }
5967  fclose(thread->logfile);
5968  thread->logfile = NULL;
5969  }
5970  return NULL;
5971 }
5972 
/*
 * Release the libpq connection held in st->con, if there is one.
 *
 * When st->con is non-NULL it is passed to PQfinish() and the field is then
 * reset to NULL, so running this again on the same state finds nothing left
 * to close.
 *
 * NOTE(review): this listing jumps from inner line 5973 to 5975, so the
 * declarator line (function name and parameter list) is not shown here.
 * The body only uses a pointer named "st" with a "con" member; take the
 * exact signature from the upstream file.
 */
5973  static void
5975  {
5976  if (st->con != NULL)
5977  {
5978  PQfinish(st->con);
5979  st->con = NULL; /* the handle must not be reused after PQfinish() */
5980  }
5981  }
5982 
5983 /*
5984  * Support for duration option: set timer_exceeded after so many seconds.
5985  */
5986 
5987 #ifndef WIN32
5988 
/*
 * Record that the run's time limit has been reached by setting the
 * timer_exceeded flag; nothing else is done here.
 *
 * NOTE(review): the listing skips inner line 5990, so the declarator line
 * (function name and parameters) is missing from this view.  Given the
 * alarm() call in the neighbouring setalarm(), this is presumably the
 * SIGALRM handler -- confirm against the upstream file.
 */
5989  static void
5991  {
5992  timer_exceeded = true;
5993  }
5994 
/*
 * Arm the timer that backs the duration option.
 *
 * seconds: delay handed unchanged to alarm().  Per the comment heading this
 * section, timer_exceeded is meant to be set once that many seconds elapse.
 *
 * This definition sits inside the "#ifndef WIN32" region, so it is only
 * compiled when WIN32 is not defined.
 *
 * NOTE(review): the listing skips inner line 5998 between "{" and the
 * alarm() call, so a statement may be missing from this view -- presumably
 * the one that installs the SIGALRM handler; confirm against upstream.
 */
5995  static void
5996  setalarm(int seconds)
5997  {
5999  alarm(seconds);
6000  }
6001