PostgreSQL Source Code  git master
pgbench.c
Go to the documentation of this file.
1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * src/bin/pgbench/pgbench.c
8  * Copyright (c) 2000-2018, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29 
30 #ifdef WIN32
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
32 #endif /* ! WIN32 */
33 
34 #include "postgres_fe.h"
35 
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "portability/instr_time.h"
39 
40 #include <ctype.h>
41 #include <float.h>
42 #include <limits.h>
43 #include <math.h>
44 #include <signal.h>
45 #include <time.h>
46 #include <sys/time.h>
47 #ifdef HAVE_SYS_SELECT_H
48 #include <sys/select.h>
49 #endif
50 
51 #ifdef HAVE_SYS_RESOURCE_H
52 #include <sys/resource.h> /* for getrlimit */
53 #endif
54 
55 #ifndef M_PI
56 #define M_PI 3.14159265358979323846
57 #endif
58 
59 #include "pgbench.h"
60 
61 #define ERRCODE_UNDEFINED_TABLE "42P01"
62 
63 /*
64  * Multi-platform pthread implementations
65  */
66 
67 #ifdef WIN32
68 /* Use native win32 threads on Windows */
69 typedef struct win32_pthread *pthread_t;
70 typedef int pthread_attr_t;
71 
72 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
73 static int pthread_join(pthread_t th, void **thread_return);
74 #elif defined(ENABLE_THREAD_SAFETY)
75 /* Use platform-dependent pthread capability */
76 #include <pthread.h>
77 #else
78 /* No threads implementation, use none (-j 1) */
79 #define pthread_t void *
80 #endif
81 
82 
83 /********************************************************************
84  * some configurable parameters */
85 
86 /* max number of clients allowed */
87 #ifdef FD_SETSIZE
88 #define MAXCLIENTS (FD_SETSIZE - 10)
89 #else
90 #define MAXCLIENTS 1024
91 #endif
92 
93 #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */
94 
95 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
96 #define DEFAULT_NXACTS 10 /* default nxacts */
97 
98 #define ZIPF_CACHE_SIZE 15 /* cache cells number */
99 
100 #define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */
101 #define MAX_ZIPFIAN_PARAM 1000 /* maximum parameter for zipfian */
102 
103 int nxacts = 0; /* number of transactions per client */
104 int duration = 0; /* duration in seconds */
105 int64 end_time = 0; /* when to stop in micro seconds, under -T */
106 
107 /*
108  * scaling factor. for example, scale = 10 will make 1000000 tuples in
109  * pgbench_accounts table.
110  */
111 int scale = 1;
112 
113 /*
114  * fillfactor. for example, fillfactor = 90 will use only 90 percent
115  * space during inserts and leave 10 percent free.
116  */
117 int fillfactor = 100;
118 
119 /*
120  * use unlogged tables?
121  */
122 bool unlogged_tables = false;
123 
124 /*
125  * log sampling rate (1.0 = log everything, 0.0 = option not given)
126  */
127 double sample_rate = 0.0;
128 
129 /*
130  * When threads are throttled to a given rate limit, this is the target delay
131  * to reach that rate in usec. 0 is the default and means no throttling.
132  */
133 int64 throttle_delay = 0;
134 
135 /*
136  * Transactions which take longer than this limit (in usec) are counted as
137  * late, and reported as such, although they are completed anyway. When
138  * throttling is enabled, execution time slots that are more than this late
139  * are skipped altogether, and counted separately.
140  */
141 int64 latency_limit = 0;
142 
143 /*
144  * tablespace selection
145  */
146 char *tablespace = NULL;
147 char *index_tablespace = NULL;
148 
149 /*
150  * end of configurable parameters
151  *********************************************************************/
152 
153 #define nbranches 1 /* Makes little sense to change this. Change
154  * -s instead */
155 #define ntellers 10
156 #define naccounts 100000
157 
/*
 * The scale factor at/beyond which 32-bit integers are too narrow to hold
 * every account ID: pgbench_accounts holds naccounts * scale rows, and that
 * exceeds INT32_MAX (2147483647) once scale reaches about 21475
 * (INT32_MAX / naccounts is roughly 21474.8).
 *
 * We use 20000 rather than the exact threshold because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
165 #define SCALE_32BIT_THRESHOLD 20000
166 
167 bool use_log; /* log transaction latencies to a file */
168 bool use_quiet; /* quiet logging onto stderr */
169 int agg_interval; /* log aggregates instead of individual
170  * transactions */
171 bool per_script_stats = false; /* whether to collect stats per script */
172 int progress = 0; /* thread progress report every this seconds */
173 bool progress_timestamp = false; /* progress report with Unix time */
174 int nclients = 1; /* number of clients */
175 int nthreads = 1; /* number of threads */
176 bool is_connect; /* establish connection for each transaction */
177 bool is_latencies; /* report per-command latencies */
178 int main_pid; /* main process id used in log filename */
179 
180 char *pghost = "";
181 char *pgport = "";
182 char *login = NULL;
183 char *dbName;
184 char *logfile_prefix = NULL;
185 const char *progname;
186 
187 #define WSEP '@' /* weight separator */
188 
189 volatile bool timer_exceeded = false; /* flag from signal handler */
190 
/*
 * Variable definitions.
 *
 * A variable can be known in string form, in typed form, or both:
 *
 * - If a variable only has a string value, "svalue" is that value and
 *   "value" is "not set".
 * - If the typed value is known, "value" contains it (in any variant).  In
 *   that case "svalue" holds its string equivalent if we've had occasion to
 *   compute that, or NULL if we haven't.
 */
typedef struct
{
	char	   *name;			/* variable's name */
	char	   *svalue;			/* its value in string form, if known, else NULL */
	PgBenchValue value;			/* actual variable's value (may be "not set") */
} Variable;
207 
208 #define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */
209 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
210 
/*
 * Simple data structure to keep stats about something.
 *
 * Values are folded in one at a time: count, min and max track the values
 * seen, sum accumulates the values and sum2 their squares, from which a mean
 * and variance can be derived.  min and max are only meaningful once
 * count > 0; the accumulation code uses count == 0 to mean "no value yet".
 *
 * XXX probably the first value should be kept and used as an offset for
 * better numerical stability...
 */
typedef struct SimpleStats
{
	int64		count;			/* how many values were encountered */
	double		min;			/* the minimum seen (valid if count > 0) */
	double		max;			/* the maximum seen (valid if count > 0) */
	double		sum;			/* sum of values */
	double		sum2;			/* sum of squared values */
} SimpleStats;
225 
226 /*
227  * Data structure to hold various statistics: per-thread and per-script stats
228  * are maintained and merged together.
229  */
230 typedef struct StatsData
231 {
232  time_t start_time; /* interval start time, for aggregates */
233  int64 cnt; /* number of transactions, including skipped */
234  int64 skipped; /* number of transactions skipped under --rate
235  * and --latency-limit */
238 } StatsData;
239 
240 /*
241  * Connection state machine states.
242  */
243 typedef enum
244 {
245  /*
246  * The client must first choose a script to execute. Once chosen, it can
247  * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
248  * right away (state CSTATE_START_TX).
249  */
251 
252  /*
253  * In CSTATE_START_THROTTLE state, we calculate when to begin the next
254  * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
255  * sleeps until that moment. (If throttling is not enabled, doCustom()
256  * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
257  */
260 
261  /*
262  * CSTATE_START_TX performs start-of-transaction processing. Establishes
263  * a new connection for the transaction, in --connect mode, and records
264  * the transaction start time.
265  */
267 
268  /*
269  * We loop through these states, to process each command in the script:
270  *
271  * CSTATE_START_COMMAND starts the execution of a command. On a SQL
272  * command, the command is sent to the server, and we move to
273  * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
274  * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
275  * meta-commands are executed immediately.
276  *
277  * CSTATE_WAIT_RESULT waits until we get a result set back from the server
278  * for the current command.
279  *
280  * CSTATE_SLEEP waits until the end of \sleep.
281  *
282  * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
283  * command counter, and loops back to CSTATE_START_COMMAND state.
284  */
289 
290  /*
291  * CSTATE_END_TX performs end-of-transaction processing. Calculates
292  * latency, and logs the transaction. In --connect mode, closes the
293  * current connection. Chooses the next script to execute and starts over
294  * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
295  * more work to do.
296  */
298 
299  /*
300  * Final states. CSTATE_ABORTED means that the script execution was
301  * aborted because a command failed, CSTATE_FINISHED means success.
302  */
306 
307 /*
308  * Connection state.
309  */
310 typedef struct
311 {
312  PGconn *con; /* connection handle to DB */
313  int id; /* client No. */
314  ConnectionStateEnum state; /* state machine's current state. */
315 
316  int use_file; /* index in sql_script for this client */
317  int command; /* command number in script */
318 
319  /* client variables */
320  Variable *variables; /* array of variable definitions */
321  int nvariables; /* number of variables */
322  bool vars_sorted; /* are variables sorted by name? */
323 
324  /* various times about current transaction */
325  int64 txn_scheduled; /* scheduled start time of transaction (usec) */
326  int64 sleep_until; /* scheduled start time of next cmd (usec) */
327  instr_time txn_begin; /* used for measuring schedule lag times */
328  instr_time stmt_begin; /* used for measuring statement latencies */
329 
330  bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
331 
332  /* per client collected stats */
333  int64 cnt; /* client transaction count, for -t */
334  int ecnt; /* error count */
335 } CState;
336 
/*
 * Cache cell for the harmonic-number zipfian generator.
 *
 * (s, n) are the lookup keys used by zipfFindOrCreateCacheCell; the other
 * numeric fields are derived from them once by zipfSetCacheCell and then
 * reused by computeHarmonicZipfian for every draw with the same keys.
 */
typedef struct
{
	/* cell keys */
	double		s;				/* s - parameter of zipfian random function */
	int64		n;				/* number of elements in range (max - min + 1) */

	double		harmonicn;		/* generalizedHarmonicNumber(n, s) */
	double		alpha;			/* 1 / (1 - s) */
	double		beta;			/* 0.5 ^ s */
	double		eta;			/* derived from n, s and harmonicn; see
								 * zipfSetCacheCell */

	uint64		last_used;		/* last used logical time (ZipfCache.current
								 * stamp), for LRU replacement */
} ZipfCell;
353 
354 /*
355  * Zipf cache for zeta values
356  */
357 typedef struct
358 {
359  uint64 current; /* counter for LRU cache replacement algorithm */
360 
361  int nb_cells; /* number of filled cells */
362  int overflowCount; /* number of cache overflows */
364 } ZipfCache;
365 
366 /*
367  * Thread state
368  */
369 typedef struct
370 {
371  int tid; /* thread id */
372  pthread_t thread; /* thread handle */
373  CState *state; /* array of CState */
374  int nstate; /* length of state[] */
375  unsigned short random_state[3]; /* separate randomness for each thread */
376  int64 throttle_trigger; /* previous/next throttling (us) */
377  FILE *logfile; /* where to log, or NULL */
378  ZipfCache zipf_cache; /* for thread-safe zipfian random number
379  * generation */
380 
381  /* per thread collected stats */
382  instr_time start_time; /* thread start time */
385  int64 latency_late; /* executed but late transactions */
386 } TState;
387 
388 #define INVALID_THREAD ((pthread_t) 0)
389 
390 /*
391  * queries read from files
392  */
393 #define SQL_COMMAND 1
394 #define META_COMMAND 2
395 #define MAX_ARGS 10
396 
397 typedef enum MetaCommand
398 {
399  META_NONE, /* not a known meta-command */
400  META_SET, /* \set */
401  META_SETSHELL, /* \setshell */
402  META_SHELL, /* \shell */
403  META_SLEEP /* \sleep */
404 } MetaCommand;
405 
406 typedef enum QueryMode
407 {
408  QUERY_SIMPLE, /* simple query */
409  QUERY_EXTENDED, /* extended query */
410  QUERY_PREPARED, /* extended query with prepared statements */
412 } QueryMode;
413 
414 static QueryMode querymode = QUERY_SIMPLE;
415 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
416 
/*
 * One parsed command of a script: either an SQL statement (SQL_COMMAND) or a
 * backslash meta-command such as \set or \sleep (META_COMMAND).
 */
typedef struct
{
	char	   *line;			/* text of command line */
	int			command_num;	/* unique index of this Command struct */
	int			type;			/* command type (SQL_COMMAND or META_COMMAND) */
	MetaCommand meta;			/* meta command identifier, or META_NONE */
	int			argc;			/* number of command words (at most MAX_ARGS) */
	char	   *argv[MAX_ARGS]; /* command word list */
	PgBenchExpr *expr;			/* parsed expression, if needed */
	SimpleStats stats;			/* time spent in this command */
} Command;
428 
429 typedef struct ParsedScript
430 {
431  const char *desc; /* script descriptor (eg, file name) */
432  int weight; /* selection weight */
433  Command **commands; /* NULL-terminated array of Commands */
434  StatsData stats; /* total time spent in script */
435 } ParsedScript;
436 
437 static ParsedScript sql_script[MAX_SCRIPTS]; /* SQL script files */
438 static int num_scripts; /* number of scripts in sql_script[] */
439 static int num_commands = 0; /* total number of Command structs */
440 static int64 total_weight = 0;
441 
442 static int debug = 0; /* debug flag */
443 
444 /* Builtin test scripts */
445 typedef struct BuiltinScript
446 {
447  const char *name; /* very short name for -b ... */
448  const char *desc; /* short description */
449  const char *script; /* actual pgbench script */
450 } BuiltinScript;
451 
453 {
454  {
455  "tpcb-like",
456  "<builtin: TPC-B (sort of)>",
457  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
458  "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
459  "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
460  "\\set delta random(-5000, 5000)\n"
461  "BEGIN;\n"
462  "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
463  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
464  "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
465  "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
466  "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
467  "END;\n"
468  },
469  {
470  "simple-update",
471  "<builtin: simple update>",
472  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
473  "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
474  "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
475  "\\set delta random(-5000, 5000)\n"
476  "BEGIN;\n"
477  "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
478  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
479  "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
480  "END;\n"
481  },
482  {
483  "select-only",
484  "<builtin: select only>",
485  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
486  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
487  }
488 };
489 
490 
491 /* Function prototypes */
492 static void setNullValue(PgBenchValue *pv);
493 static void setBoolValue(PgBenchValue *pv, bool bval);
494 static void setIntValue(PgBenchValue *pv, int64 ival);
495 static void setDoubleValue(PgBenchValue *pv, double dval);
496 static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
497 static void doLog(TState *thread, CState *st,
498  StatsData *agg, bool skipped, double latency, double lag);
499 static void processXactStats(TState *thread, CState *st, instr_time *now,
500  bool skipped, StatsData *agg);
501 static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
502 static void addScript(ParsedScript script);
503 static void *threadRun(void *arg);
504 static void setalarm(int seconds);
505 
506 
507 /* callback functions for our flex lexer */
509  NULL, /* don't need get_variable functionality */
511 };
512 
513 
514 static void
515 usage(void)
516 {
517  printf("%s is a benchmarking tool for PostgreSQL.\n\n"
518  "Usage:\n"
519  " %s [OPTION]... [DBNAME]\n"
520  "\nInitialization options:\n"
521  " -i, --initialize invokes initialization mode\n"
522  " -I, --init-steps=[dtgvpf]+ (default \"dtgvp\")\n"
523  " run selected initialization steps\n"
524  " -F, --fillfactor=NUM set fill factor\n"
525  " -n, --no-vacuum do not run VACUUM during initialization\n"
526  " -q, --quiet quiet logging (one message each 5 seconds)\n"
527  " -s, --scale=NUM scaling factor\n"
528  " --foreign-keys create foreign key constraints between tables\n"
529  " --index-tablespace=TABLESPACE\n"
530  " create indexes in the specified tablespace\n"
531  " --tablespace=TABLESPACE create tables in the specified tablespace\n"
532  " --unlogged-tables create tables as unlogged tables\n"
533  "\nOptions to select what to run:\n"
534  " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
535  " (use \"-b list\" to list available scripts)\n"
536  " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
537  " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
538  " (same as \"-b simple-update\")\n"
539  " -S, --select-only perform SELECT-only transactions\n"
540  " (same as \"-b select-only\")\n"
541  "\nBenchmarking options:\n"
542  " -c, --client=NUM number of concurrent database clients (default: 1)\n"
543  " -C, --connect establish new connection for each transaction\n"
544  " -D, --define=VARNAME=VALUE\n"
545  " define variable for use by custom script\n"
546  " -j, --jobs=NUM number of threads (default: 1)\n"
547  " -l, --log write transaction times to log file\n"
548  " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
549  " -M, --protocol=simple|extended|prepared\n"
550  " protocol for submitting queries (default: simple)\n"
551  " -n, --no-vacuum do not run VACUUM before tests\n"
552  " -P, --progress=NUM show thread progress report every NUM seconds\n"
553  " -r, --report-latencies report average latency per command\n"
554  " -R, --rate=NUM target rate in transactions per second\n"
555  " -s, --scale=NUM report this scale factor in output\n"
556  " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
557  " -T, --time=NUM duration of benchmark test in seconds\n"
558  " -v, --vacuum-all vacuum all four standard tables before tests\n"
559  " --aggregate-interval=NUM aggregate data over NUM seconds\n"
560  " --log-prefix=PREFIX prefix for transaction time log file\n"
561  " (default: \"pgbench_log\")\n"
562  " --progress-timestamp use Unix epoch timestamps for progress\n"
563  " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
564  "\nCommon options:\n"
565  " -d, --debug print debugging output\n"
566  " -h, --host=HOSTNAME database server host or socket directory\n"
567  " -p, --port=PORT database server port number\n"
568  " -U, --username=USERNAME connect as specified database user\n"
569  " -V, --version output version information, then exit\n"
570  " -?, --help show this help, then exit\n"
571  "\n"
572  "Report bugs to <pgsql-bugs@postgresql.org>.\n",
573  progname, progname);
574 }
575 
/*
 * is_an_int -- return whether str matches "^\s*[-+]?[0-9]+$"
 *
 * Leading whitespace and one optional sign are accepted, followed by one or
 * more decimal digits and nothing else (trailing whitespace is rejected).
 * An empty string, a lone sign, or whitespace only is not an integer.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * At least one digit is required.  isdigit('\0') is false, so this also
	 * rejects "", "+", "-" and all-whitespace input; a "*ptr &&" guard here
	 * would wrongly let those through to the end-of-string test below.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
601 
602 
603 /*
604  * strtoint64 -- convert a string to 64-bit integer
605  *
606  * This function is a modified version of scanint8() from
607  * src/backend/utils/adt/int8.c.
608  */
609 int64
610 strtoint64(const char *str)
611 {
612  const char *ptr = str;
613  int64 result = 0;
614  int sign = 1;
615 
616  /*
617  * Do our own scan, rather than relying on sscanf which might be broken
618  * for long long.
619  */
620 
621  /* skip leading spaces */
622  while (*ptr && isspace((unsigned char) *ptr))
623  ptr++;
624 
625  /* handle sign */
626  if (*ptr == '-')
627  {
628  ptr++;
629 
630  /*
631  * Do an explicit check for INT64_MIN. Ugly though this is, it's
632  * cleaner than trying to get the loop below to handle it portably.
633  */
634  if (strncmp(ptr, "9223372036854775808", 19) == 0)
635  {
636  result = PG_INT64_MIN;
637  ptr += 19;
638  goto gotdigits;
639  }
640  sign = -1;
641  }
642  else if (*ptr == '+')
643  ptr++;
644 
645  /* require at least one digit */
646  if (!isdigit((unsigned char) *ptr))
647  fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
648 
649  /* process digits */
650  while (*ptr && isdigit((unsigned char) *ptr))
651  {
652  int64 tmp = result * 10 + (*ptr++ - '0');
653 
654  if ((tmp / 10) != result) /* overflow? */
655  fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
656  result = tmp;
657  }
658 
659 gotdigits:
660 
661  /* allow trailing whitespace, but not other trailing chars */
662  while (*ptr != '\0' && isspace((unsigned char) *ptr))
663  ptr++;
664 
665  if (*ptr != '\0')
666  fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
667 
668  return ((sign < 0) ? -result : result);
669 }
670 
671 /* random number generator: uniform distribution from min to max inclusive */
672 static int64
673 getrand(TState *thread, int64 min, int64 max)
674 {
675  /*
676  * Odd coding is so that min and max have approximately the same chance of
677  * being selected as do numbers between them.
678  *
679  * pg_erand48() is thread-safe and concurrent, which is why we use it
680  * rather than random(), which in glibc is non-reentrant, and therefore
681  * protected by a mutex, and therefore a bottleneck on machines with many
682  * CPUs.
683  */
684  return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
685 }
686 
687 /*
688  * random number generator: exponential distribution from min to max inclusive.
689  * the parameter is so that the density of probability for the last cut-off max
690  * value is exp(-parameter).
691  */
692 static int64
693 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
694 {
695  double cut,
696  uniform,
697  rand;
698 
699  /* abort if wrong parameter, but must really be checked beforehand */
700  Assert(parameter > 0.0);
701  cut = exp(-parameter);
702  /* erand in [0, 1), uniform in (0, 1] */
703  uniform = 1.0 - pg_erand48(thread->random_state);
704 
705  /*
706  * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
707  */
708  Assert((1.0 - cut) != 0.0);
709  rand = -log(cut + (1.0 - cut) * uniform) / parameter;
710  /* return int64 random number within between min and max */
711  return min + (int64) ((max - min + 1) * rand);
712 }
713 
714 /* random number generator: gaussian distribution from min to max inclusive */
715 static int64
716 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
717 {
718  double stdev;
719  double rand;
720 
721  /* abort if parameter is too low, but must really be checked beforehand */
722  Assert(parameter >= MIN_GAUSSIAN_PARAM);
723 
724  /*
725  * Get user specified random number from this loop, with -parameter <
726  * stdev <= parameter
727  *
728  * This loop is executed until the number is in the expected range.
729  *
730  * As the minimum parameter is 2.0, the probability of looping is low:
731  * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
732  * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
733  * the worst case. For a parameter value of 5.0, the looping probability
734  * is about e^{-5} * 2 / pi ~ 0.43%.
735  */
736  do
737  {
738  /*
739  * pg_erand48 generates [0,1), but for the basic version of the
740  * Box-Muller transform the two uniformly distributed random numbers
741  * are expected in (0, 1] (see
742  * http://en.wikipedia.org/wiki/Box_muller)
743  */
744  double rand1 = 1.0 - pg_erand48(thread->random_state);
745  double rand2 = 1.0 - pg_erand48(thread->random_state);
746 
747  /* Box-Muller basic form transform */
748  double var_sqrt = sqrt(-2.0 * log(rand1));
749 
750  stdev = var_sqrt * sin(2.0 * M_PI * rand2);
751 
752  /*
753  * we may try with cos, but there may be a bias induced if the
754  * previous value fails the test. To be on the safe side, let us try
755  * over.
756  */
757  }
758  while (stdev < -parameter || stdev >= parameter);
759 
760  /* stdev is in [-parameter, parameter), normalization to [0,1) */
761  rand = (stdev + parameter) / (parameter * 2.0);
762 
763  /* return int64 random number within between min and max */
764  return min + (int64) ((max - min + 1) * rand);
765 }
766 
767 /*
768  * random number generator: generate a value, such that the series of values
769  * will approximate a Poisson distribution centered on the given value.
770  */
771 static int64
772 getPoissonRand(TState *thread, int64 center)
773 {
774  /*
775  * Use inverse transform sampling to generate a value > 0, such that the
776  * expected (i.e. average) value is the given argument.
777  */
778  double uniform;
779 
780  /* erand in [0, 1), uniform in (0, 1] */
781  uniform = 1.0 - pg_erand48(thread->random_state);
782 
783  return (int64) (-log(uniform) * ((double) center) + 0.5);
784 }
785 
786 /* helper function for getZipfianRand */
787 static double
788 generalizedHarmonicNumber(int64 n, double s)
789 {
790  int i;
791  double ans = 0.0;
792 
793  for (i = n; i > 1; i--)
794  ans += pow(i, -s);
795  return ans + 1.0;
796 }
797 
798 /* set harmonicn and other parameters to cache cell */
799 static void
800 zipfSetCacheCell(ZipfCell * cell, int64 n, double s)
801 {
802  double harmonic2;
803 
804  cell->n = n;
805  cell->s = s;
806 
807  harmonic2 = generalizedHarmonicNumber(2, s);
808  cell->harmonicn = generalizedHarmonicNumber(n, s);
809 
810  cell->alpha = 1.0 / (1.0 - s);
811  cell->beta = pow(0.5, s);
812  cell->eta = (1.0 - pow(2.0 / n, 1.0 - s)) / (1.0 - harmonic2 / cell->harmonicn);
813 }
814 
815 /*
816  * search for cache cell with keys (n, s)
817  * and create new cell if it does not exist
818  */
819 static ZipfCell *
820 zipfFindOrCreateCacheCell(ZipfCache * cache, int64 n, double s)
821 {
822  int i,
823  least_recently_used = 0;
824  ZipfCell *cell;
825 
826  /* search cached cell for given parameters */
827  for (i = 0; i < cache->nb_cells; i++)
828  {
829  cell = &cache->cells[i];
830  if (cell->n == n && cell->s == s)
831  return &cache->cells[i];
832 
833  if (cell->last_used < cache->cells[least_recently_used].last_used)
834  least_recently_used = i;
835  }
836 
837  /* create new one if it does not exist */
838  if (cache->nb_cells < ZIPF_CACHE_SIZE)
839  i = cache->nb_cells++;
840  else
841  {
842  /* replace LRU cell if cache is full */
843  i = least_recently_used;
844  cache->overflowCount++;
845  }
846 
847  zipfSetCacheCell(&cache->cells[i], n, s);
848 
849  cache->cells[i].last_used = cache->current++;
850  return &cache->cells[i];
851 }
852 
853 /*
854  * Computing zipfian using rejection method, based on
855  * "Non-Uniform Random Variate Generation",
856  * Luc Devroye, p. 550-551, Springer 1986.
857  */
858 static int64
859 computeIterativeZipfian(TState *thread, int64 n, double s)
860 {
861  double b = pow(2.0, s - 1.0);
862  double x,
863  t,
864  u,
865  v;
866 
867  while (true)
868  {
869  /* random variates */
870  u = pg_erand48(thread->random_state);
871  v = pg_erand48(thread->random_state);
872 
873  x = floor(pow(u, -1.0 / (s - 1.0)));
874 
875  t = pow(1.0 + 1.0 / x, s - 1.0);
876  /* reject if too large or out of bound */
877  if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
878  break;
879  }
880  return (int64) x;
881 }
882 
883 /*
884  * Computing zipfian using harmonic numbers, based on algorithm described in
885  * "Quickly Generating Billion-Record Synthetic Databases",
886  * Jim Gray et al, SIGMOD 1994
887  */
888 static int64
889 computeHarmonicZipfian(TState *thread, int64 n, double s)
890 {
891  ZipfCell *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s);
892  double uniform = pg_erand48(thread->random_state);
893  double uz = uniform * cell->harmonicn;
894 
895  if (uz < 1.0)
896  return 1;
897  if (uz < 1.0 + cell->beta)
898  return 2;
899  return 1 + (int64) (cell->n * pow(cell->eta * uniform - cell->eta + 1.0, cell->alpha));
900 }
901 
902 /* random number generator: zipfian distribution from min to max inclusive */
903 static int64
904 getZipfianRand(TState *thread, int64 min, int64 max, double s)
905 {
906  int64 n = max - min + 1;
907 
908  /* abort if parameter is invalid */
909  Assert(s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM);
910 
911 
912  return min - 1 + ((s > 1)
913  ? computeIterativeZipfian(thread, n, s)
914  : computeHarmonicZipfian(thread, n, s));
915 }
916 
917 /*
918  * Initialize the given SimpleStats struct to all zeroes
919  */
920 static void
922 {
923  memset(ss, 0, sizeof(SimpleStats));
924 }
925 
926 /*
927  * Accumulate one value into a SimpleStats struct.
928  */
929 static void
931 {
932  if (ss->count == 0 || val < ss->min)
933  ss->min = val;
934  if (ss->count == 0 || val > ss->max)
935  ss->max = val;
936  ss->count++;
937  ss->sum += val;
938  ss->sum2 += val * val;
939 }
940 
941 /*
942  * Merge two SimpleStats objects
943  */
944 static void
946 {
947  if (acc->count == 0 || ss->min < acc->min)
948  acc->min = ss->min;
949  if (acc->count == 0 || ss->max > acc->max)
950  acc->max = ss->max;
951  acc->count += ss->count;
952  acc->sum += ss->sum;
953  acc->sum2 += ss->sum2;
954 }
955 
956 /*
957  * Initialize a StatsData struct to mostly zeroes, with its start time set to
958  * the given value.
959  */
960 static void
962 {
963  sd->start_time = start_time;
964  sd->cnt = 0;
965  sd->skipped = 0;
966  initSimpleStats(&sd->latency);
967  initSimpleStats(&sd->lag);
968 }
969 
970 /*
971  * Accumulate one additional item into the given stats object.
972  */
973 static void
974 accumStats(StatsData *stats, bool skipped, double lat, double lag)
975 {
976  stats->cnt++;
977 
978  if (skipped)
979  {
980  /* no latency to record on skipped transactions */
981  stats->skipped++;
982  }
983  else
984  {
985  addToSimpleStats(&stats->latency, lat);
986 
987  /* and possibly the same for schedule lag */
988  if (throttle_delay)
989  addToSimpleStats(&stats->lag, lag);
990  }
991 }
992 
993 /* call PQexec() and exit() on failure */
994 static void
995 executeStatement(PGconn *con, const char *sql)
996 {
997  PGresult *res;
998 
999  res = PQexec(con, sql);
1000  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1001  {
1002  fprintf(stderr, "%s", PQerrorMessage(con));
1003  exit(1);
1004  }
1005  PQclear(res);
1006 }
1007 
1008 /* call PQexec() and complain, but without exiting, on failure */
1009 static void
1010 tryExecuteStatement(PGconn *con, const char *sql)
1011 {
1012  PGresult *res;
1013 
1014  res = PQexec(con, sql);
1015  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1016  {
1017  fprintf(stderr, "%s", PQerrorMessage(con));
1018  fprintf(stderr, "(ignoring this error and continuing anyway)\n");
1019  }
1020  PQclear(res);
1021 }
1022 
1023 /* set up a connection to the backend */
1024 static PGconn *
1026 {
1027  PGconn *conn;
1028  bool new_pass;
1029  static bool have_password = false;
1030  static char password[100];
1031 
1032  /*
1033  * Start the connection. Loop until we have a password if requested by
1034  * backend.
1035  */
1036  do
1037  {
1038 #define PARAMS_ARRAY_SIZE 7
1039 
1040  const char *keywords[PARAMS_ARRAY_SIZE];
1041  const char *values[PARAMS_ARRAY_SIZE];
1042 
1043  keywords[0] = "host";
1044  values[0] = pghost;
1045  keywords[1] = "port";
1046  values[1] = pgport;
1047  keywords[2] = "user";
1048  values[2] = login;
1049  keywords[3] = "password";
1050  values[3] = have_password ? password : NULL;
1051  keywords[4] = "dbname";
1052  values[4] = dbName;
1053  keywords[5] = "fallback_application_name";
1054  values[5] = progname;
1055  keywords[6] = NULL;
1056  values[6] = NULL;
1057 
1058  new_pass = false;
1059 
1060  conn = PQconnectdbParams(keywords, values, true);
1061 
1062  if (!conn)
1063  {
1064  fprintf(stderr, "connection to database \"%s\" failed\n",
1065  dbName);
1066  return NULL;
1067  }
1068 
1069  if (PQstatus(conn) == CONNECTION_BAD &&
1070  PQconnectionNeedsPassword(conn) &&
1071  !have_password)
1072  {
1073  PQfinish(conn);
1074  simple_prompt("Password: ", password, sizeof(password), false);
1075  have_password = true;
1076  new_pass = true;
1077  }
1078  } while (new_pass);
1079 
1080  /* check to see that the backend connection was successfully made */
1081  if (PQstatus(conn) == CONNECTION_BAD)
1082  {
1083  fprintf(stderr, "connection to database \"%s\" failed:\n%s",
1084  dbName, PQerrorMessage(conn));
1085  PQfinish(conn);
1086  return NULL;
1087  }
1088 
1089  return conn;
1090 }
1091 
1092 /* throw away response from backend */
1093 static void
1095 {
1096  PGresult *res;
1097 
1098  do
1099  {
1100  res = PQgetResult(state->con);
1101  if (res)
1102  PQclear(res);
1103  } while (res);
1104 }
1105 
1106 /* qsort comparator for Variable array */
1107 static int
1108 compareVariableNames(const void *v1, const void *v2)
1109 {
1110  return strcmp(((const Variable *) v1)->name,
1111  ((const Variable *) v2)->name);
1112 }
1113 
1114 /* Locate a variable by name; returns NULL if unknown */
1115 static Variable *
1117 {
1118  Variable key;
1119 
1120  /* On some versions of Solaris, bsearch of zero items dumps core */
1121  if (st->nvariables <= 0)
1122  return NULL;
1123 
1124  /* Sort if we have to */
1125  if (!st->vars_sorted)
1126  {
1127  qsort((void *) st->variables, st->nvariables, sizeof(Variable),
1129  st->vars_sorted = true;
1130  }
1131 
1132  /* Now we can search */
1133  key.name = name;
1134  return (Variable *) bsearch((void *) &key,
1135  (void *) st->variables,
1136  st->nvariables,
1137  sizeof(Variable),
1139 }
1140 
1141 /* Get the value of a variable, in string form; returns NULL if unknown */
1142 static char *
1144 {
1145  Variable *var;
1146  char stringform[64];
1147 
1148  var = lookupVariable(st, name);
1149  if (var == NULL)
1150  return NULL; /* not found */
1151 
1152  if (var->svalue)
1153  return var->svalue; /* we have it in string form */
1154 
1155  /* We need to produce a string equivalent of the value */
1156  Assert(var->value.type != PGBT_NO_VALUE);
1157  if (var->value.type == PGBT_NULL)
1158  snprintf(stringform, sizeof(stringform), "NULL");
1159  else if (var->value.type == PGBT_BOOLEAN)
1160  snprintf(stringform, sizeof(stringform),
1161  "%s", var->value.u.bval ? "true" : "false");
1162  else if (var->value.type == PGBT_INT)
1163  snprintf(stringform, sizeof(stringform),
1164  INT64_FORMAT, var->value.u.ival);
1165  else if (var->value.type == PGBT_DOUBLE)
1166  snprintf(stringform, sizeof(stringform),
1167  "%.*g", DBL_DIG, var->value.u.dval);
1168  else /* internal error, unexpected type */
1169  Assert(0);
1170  var->svalue = pg_strdup(stringform);
1171  return var->svalue;
1172 }
1173 
1174 /* Try to convert variable to a value; return false on failure */
1175 static bool
1177 {
1178  size_t slen;
1179 
1180  if (var->value.type != PGBT_NO_VALUE)
1181  return true; /* no work */
1182 
1183  slen = strlen(var->svalue);
1184 
1185  if (slen == 0)
1186  /* what should it do on ""? */
1187  return false;
1188 
1189  if (pg_strcasecmp(var->svalue, "null") == 0)
1190  {
1191  setNullValue(&var->value);
1192  }
1193  /*
1194  * accept prefixes such as y, ye, n, no... but not for "o".
1195  * 0/1 are recognized later as an int, which is converted
1196  * to bool if needed.
1197  */
1198  else if (pg_strncasecmp(var->svalue, "true", slen) == 0 ||
1199  pg_strncasecmp(var->svalue, "yes", slen) == 0 ||
1200  pg_strcasecmp(var->svalue, "on") == 0)
1201  {
1202  setBoolValue(&var->value, true);
1203  }
1204  else if (pg_strncasecmp(var->svalue, "false", slen) == 0 ||
1205  pg_strncasecmp(var->svalue, "no", slen) == 0 ||
1206  pg_strcasecmp(var->svalue, "off") == 0 ||
1207  pg_strcasecmp(var->svalue, "of") == 0)
1208  {
1209  setBoolValue(&var->value, false);
1210  }
1211  else if (is_an_int(var->svalue))
1212  {
1213  setIntValue(&var->value, strtoint64(var->svalue));
1214  }
1215  else /* type should be double */
1216  {
1217  double dv;
1218  char xs;
1219 
1220  if (sscanf(var->svalue, "%lf%c", &dv, &xs) != 1)
1221  {
1222  fprintf(stderr,
1223  "malformed variable \"%s\" value: \"%s\"\n",
1224  var->name, var->svalue);
1225  return false;
1226  }
1227  setDoubleValue(&var->value, dv);
1228  }
1229  return true;
1230 }
1231 
/*
 * Check whether a variable's name is allowed.
 *
 * We allow any non-ASCII character, as well as ASCII letters, digits, and
 * underscore.  The name must not be empty.
 *
 * Keep this in sync with the definitions of variable name characters in
 * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
 * "src/bin/pgbench/exprscan.l". Also see parseVariable(), below.
 *
 * Note: this static function is copied from "src/bin/psql/variables.c"
 */
static bool
valid_variable_name(const char *name)
{
	const unsigned char *ptr;

	/* Mustn't be zero-length */
	if (*name == '\0')
		return false;

	for (ptr = (const unsigned char *) name; *ptr != '\0'; ptr++)
	{
		if (!IS_HIGHBIT_SET(*ptr) &&
			strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				   "_0123456789", *ptr) == NULL)
			return false;
	}

	return true;
}
1265 
1266 /*
1267  * Lookup a variable by name, creating it if need be.
1268  * Caller is expected to assign a value to the variable.
1269  * Returns NULL on failure (bad name).
1270  */
1271 static Variable *
1272 lookupCreateVariable(CState *st, const char *context, char *name)
1273 {
1274  Variable *var;
1275 
1276  var = lookupVariable(st, name);
1277  if (var == NULL)
1278  {
1279  Variable *newvars;
1280 
1281  /*
1282  * Check for the name only when declaring a new variable to avoid
1283  * overhead.
1284  */
1285  if (!valid_variable_name(name))
1286  {
1287  fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
1288  context, name);
1289  return NULL;
1290  }
1291 
1292  /* Create variable at the end of the array */
1293  if (st->variables)
1294  newvars = (Variable *) pg_realloc(st->variables,
1295  (st->nvariables + 1) * sizeof(Variable));
1296  else
1297  newvars = (Variable *) pg_malloc(sizeof(Variable));
1298 
1299  st->variables = newvars;
1300 
1301  var = &newvars[st->nvariables];
1302 
1303  var->name = pg_strdup(name);
1304  var->svalue = NULL;
1305  /* caller is expected to initialize remaining fields */
1306 
1307  st->nvariables++;
1308  /* we don't re-sort the array till we have to */
1309  st->vars_sorted = false;
1310  }
1311 
1312  return var;
1313 }
1314 
1315 /* Assign a string value to a variable, creating it if need be */
1316 /* Returns false on failure (bad name) */
1317 static bool
1318 putVariable(CState *st, const char *context, char *name, const char *value)
1319 {
1320  Variable *var;
1321  char *val;
1322 
1323  var = lookupCreateVariable(st, context, name);
1324  if (!var)
1325  return false;
1326 
1327  /* dup then free, in case value is pointing at this variable */
1328  val = pg_strdup(value);
1329 
1330  if (var->svalue)
1331  free(var->svalue);
1332  var->svalue = val;
1333  var->value.type = PGBT_NO_VALUE;
1334 
1335  return true;
1336 }
1337 
1338 /* Assign a value to a variable, creating it if need be */
1339 /* Returns false on failure (bad name) */
1340 static bool
1341 putVariableValue(CState *st, const char *context, char *name,
1342  const PgBenchValue *value)
1343 {
1344  Variable *var;
1345 
1346  var = lookupCreateVariable(st, context, name);
1347  if (!var)
1348  return false;
1349 
1350  if (var->svalue)
1351  free(var->svalue);
1352  var->svalue = NULL;
1353  var->value = *value;
1354 
1355  return true;
1356 }
1357 
1358 /* Assign an integer value to a variable, creating it if need be */
1359 /* Returns false on failure (bad name) */
1360 static bool
1361 putVariableInt(CState *st, const char *context, char *name, int64 value)
1362 {
1363  PgBenchValue val;
1364 
1365  setIntValue(&val, value);
1366  return putVariableValue(st, context, name, &val);
1367 }
1368 
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon. If what follows it looks like a valid
 * variable name, return a malloc'd string containing the variable name,
 * and set *eaten to the number of characters consumed (colon included).
 * Otherwise, return NULL.
 *
 * The accepted characters are the same set valid_variable_name() accepts.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			i = 0;
	char	   *name;

	/*
	 * strchr() considers the terminating NUL part of the string it searches,
	 * so without the explicit '\0' test a name that runs to the end of "sql"
	 * would make this loop read past the terminator.
	 */
	do
	{
		i++;
	} while (sql[i] != '\0' &&
			 (IS_HIGHBIT_SET(sql[i]) ||
			  strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
					 "_0123456789", sql[i]) != NULL));
	if (i == 1)
		return NULL;			/* no valid variable name chars */

	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
1399 
/*
 * Replace the "len" characters at "param" inside the malloc'd string *sql
 * with "value".
 *
 * *sql is reallocated (and may move) when the replacement is longer.
 * Returns a pointer just past the inserted text within the possibly new
 * buffer.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vallen = strlen(value);
	char	   *tail;

	if (vallen > len)
	{
		size_t		pos = param - *sql;

		*sql = pg_realloc(*sql, strlen(*sql) + (vallen - len) + 1);
		param = *sql + pos;		/* buffer may have moved */
	}

	/* slide the remainder (terminator included) to its new position */
	tail = param + len;
	if (vallen != len)
		memmove(param + vallen, tail, strlen(tail) + 1);
	memcpy(param, value, vallen);

	return param + vallen;
}
1419 
1420 static char *
1421 assignVariables(CState *st, char *sql)
1422 {
1423  char *p,
1424  *name,
1425  *val;
1426 
1427  p = sql;
1428  while ((p = strchr(p, ':')) != NULL)
1429  {
1430  int eaten;
1431 
1432  name = parseVariable(p, &eaten);
1433  if (name == NULL)
1434  {
1435  while (*p == ':')
1436  {
1437  p++;
1438  }
1439  continue;
1440  }
1441 
1442  val = getVariable(st, name);
1443  free(name);
1444  if (val == NULL)
1445  {
1446  p++;
1447  continue;
1448  }
1449 
1450  p = replaceVariable(&sql, p, eaten, val);
1451  }
1452 
1453  return sql;
1454 }
1455 
1456 static void
1457 getQueryParams(CState *st, const Command *command, const char **params)
1458 {
1459  int i;
1460 
1461  for (i = 0; i < command->argc - 1; i++)
1462  params[i] = getVariable(st, command->argv[i + 1]);
1463 }
1464 
1465 static char *
1467 {
1468  if (pval->type == PGBT_NO_VALUE)
1469  return "none";
1470  else if (pval->type == PGBT_NULL)
1471  return "null";
1472  else if (pval->type == PGBT_INT)
1473  return "int";
1474  else if (pval->type == PGBT_DOUBLE)
1475  return "double";
1476  else if (pval->type == PGBT_BOOLEAN)
1477  return "boolean";
1478  else
1479  {
1480  /* internal error, should never get there */
1481  Assert(false);
1482  return NULL;
1483  }
1484 }
1485 
1486 /* get a value as a boolean, or tell if there is a problem */
1487 static bool
1488 coerceToBool(PgBenchValue *pval, bool *bval)
1489 {
1490  if (pval->type == PGBT_BOOLEAN)
1491  {
1492  *bval = pval->u.bval;
1493  return true;
1494  }
1495  else /* NULL, INT or DOUBLE */
1496  {
1497  fprintf(stderr, "cannot coerce %s to boolean\n", valueTypeName(pval));
1498  return false;
1499  }
1500 }
1501 
1502 /*
1503  * Return true or false from an expression for conditional purposes.
1504  * Non zero numerical values are true, zero and NULL are false.
1505  */
1506 static bool
1508 {
1509  switch (pval->type)
1510  {
1511  case PGBT_NULL:
1512  return false;
1513  case PGBT_BOOLEAN:
1514  return pval->u.bval;
1515  case PGBT_INT:
1516  return pval->u.ival != 0;
1517  case PGBT_DOUBLE:
1518  return pval->u.dval != 0.0;
1519  default:
1520  /* internal error, unexpected type */
1521  Assert(0);
1522  return false;
1523  }
1524 }
1525 
1526 /* get a value as an int, tell if there is a problem */
1527 static bool
1528 coerceToInt(PgBenchValue *pval, int64 *ival)
1529 {
1530  if (pval->type == PGBT_INT)
1531  {
1532  *ival = pval->u.ival;
1533  return true;
1534  }
1535  else if (pval->type == PGBT_DOUBLE)
1536  {
1537  double dval = pval->u.dval;
1538 
1539  if (dval < PG_INT64_MIN || PG_INT64_MAX < dval)
1540  {
1541  fprintf(stderr, "double to int overflow for %f\n", dval);
1542  return false;
1543  }
1544  *ival = (int64) dval;
1545  return true;
1546  }
1547  else /* BOOLEAN or NULL */
1548  {
1549  fprintf(stderr, "cannot coerce %s to int\n", valueTypeName(pval));
1550  return false;
1551  }
1552 }
1553 
1554 /* get a value as a double, or tell if there is a problem */
1555 static bool
1556 coerceToDouble(PgBenchValue *pval, double *dval)
1557 {
1558  if (pval->type == PGBT_DOUBLE)
1559  {
1560  *dval = pval->u.dval;
1561  return true;
1562  }
1563  else if (pval->type == PGBT_INT)
1564  {
1565  *dval = (double) pval->u.ival;
1566  return true;
1567  }
1568  else /* BOOLEAN or NULL */
1569  {
1570  fprintf(stderr, "cannot coerce %s to double\n", valueTypeName(pval));
1571  return false;
1572  }
1573 }
1574 
1575 /* assign a null value */
1576 static void
1578 {
1579  pv->type = PGBT_NULL;
1580  pv->u.ival = 0;
1581 }
1582 
1583 /* assign a boolean value */
1584 static void
1586 {
1587  pv->type = PGBT_BOOLEAN;
1588  pv->u.bval = bval;
1589 }
1590 /* assign an integer value */
1591 static void
1592 setIntValue(PgBenchValue *pv, int64 ival)
1593 {
1594  pv->type = PGBT_INT;
1595  pv->u.ival = ival;
1596 }
1597 
1598 /* assign a double value */
1599 static void
1600 setDoubleValue(PgBenchValue *pv, double dval)
1601 {
1602  pv->type = PGBT_DOUBLE;
1603  pv->u.dval = dval;
1604 }
1605 
1606 static bool isLazyFunc(PgBenchFunction func)
1607 {
1608  return func == PGBENCH_AND || func == PGBENCH_OR || func == PGBENCH_CASE;
1609 }
1610 
1611 /* lazy evaluation of some functions */
1612 static bool
1615 {
1616  PgBenchValue a1, a2;
1617  bool ba1, ba2;
1618 
1619  Assert(isLazyFunc(func) && args != NULL && args->next != NULL);
1620 
1621  /* args points to first condition */
1622  if (!evaluateExpr(thread, st, args->expr, &a1))
1623  return false;
1624 
1625  /* second condition for AND/OR and corresponding branch for CASE */
1626  args = args->next;
1627 
1628  switch (func)
1629  {
1630  case PGBENCH_AND:
1631  if (a1.type == PGBT_NULL)
1632  {
1633  setNullValue(retval);
1634  return true;
1635  }
1636 
1637  if (!coerceToBool(&a1, &ba1))
1638  return false;
1639 
1640  if (!ba1)
1641  {
1642  setBoolValue(retval, false);
1643  return true;
1644  }
1645 
1646  if (!evaluateExpr(thread, st, args->expr, &a2))
1647  return false;
1648 
1649  if (a2.type == PGBT_NULL)
1650  {
1651  setNullValue(retval);
1652  return true;
1653  }
1654  else if (!coerceToBool(&a2, &ba2))
1655  return false;
1656  else
1657  {
1658  setBoolValue(retval, ba2);
1659  return true;
1660  }
1661 
1662  return true;
1663 
1664  case PGBENCH_OR:
1665 
1666  if (a1.type == PGBT_NULL)
1667  {
1668  setNullValue(retval);
1669  return true;
1670  }
1671 
1672  if (!coerceToBool(&a1, &ba1))
1673  return false;
1674 
1675  if (ba1)
1676  {
1677  setBoolValue(retval, true);
1678  return true;
1679  }
1680 
1681  if (!evaluateExpr(thread, st, args->expr, &a2))
1682  return false;
1683 
1684  if (a2.type == PGBT_NULL)
1685  {
1686  setNullValue(retval);
1687  return true;
1688  }
1689  else if (!coerceToBool(&a2, &ba2))
1690  return false;
1691  else
1692  {
1693  setBoolValue(retval, ba2);
1694  return true;
1695  }
1696 
1697  case PGBENCH_CASE:
1698  /* when true, execute branch */
1699  if (valueTruth(&a1))
1700  return evaluateExpr(thread, st, args->expr, retval);
1701 
1702  /* now args contains next condition or final else expression */
1703  args = args->next;
1704 
1705  /* final else case? */
1706  if (args->next == NULL)
1707  return evaluateExpr(thread, st, args->expr, retval);
1708 
1709  /* no, another when, proceed */
1710  return evalLazyFunc(thread, st, PGBENCH_CASE, args, retval);
1711 
1712  default:
1713  /* internal error, cannot get here */
1714  Assert(0);
1715  break;
1716  }
1717  return false;
1718 }
1719 
/*
 * Maximum number of function arguments: evalStandardFunc() sizes its
 * on-stack argument array with this, and a call with more arguments fails
 * with "too many function arguments".
 */
#define MAX_FARGS 16
1722 
1723 /*
1724  * Recursive evaluation of standard functions,
1725  * which do not require lazy evaluation.
1726  */
1727 static bool
1729  TState *thread, CState *st,
1731 {
1732  /* evaluate all function arguments */
1733  int nargs = 0;
1734  PgBenchValue vargs[MAX_FARGS];
1735  PgBenchExprLink *l = args;
1736  bool has_null = false;
1737 
1738  for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1739  {
1740  if (!evaluateExpr(thread, st, l->expr, &vargs[nargs]))
1741  return false;
1742  has_null |= vargs[nargs].type == PGBT_NULL;
1743  }
1744 
1745  if (l != NULL)
1746  {
1747  fprintf(stderr,
1748  "too many function arguments, maximum is %d\n", MAX_FARGS);
1749  return false;
1750  }
1751 
1752  /* NULL arguments */
1753  if (has_null && func != PGBENCH_IS && func != PGBENCH_DEBUG)
1754  {
1755  setNullValue(retval);
1756  return true;
1757  }
1758 
1759  /* then evaluate function */
1760  switch (func)
1761  {
1762  /* overloaded operators */
1763  case PGBENCH_ADD:
1764  case PGBENCH_SUB:
1765  case PGBENCH_MUL:
1766  case PGBENCH_DIV:
1767  case PGBENCH_MOD:
1768  case PGBENCH_EQ:
1769  case PGBENCH_NE:
1770  case PGBENCH_LE:
1771  case PGBENCH_LT:
1772  {
1773  PgBenchValue *lval = &vargs[0],
1774  *rval = &vargs[1];
1775 
1776  Assert(nargs == 2);
1777 
1778  /* overloaded type management, double if some double */
1779  if ((lval->type == PGBT_DOUBLE ||
1780  rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1781  {
1782  double ld,
1783  rd;
1784 
1785  if (!coerceToDouble(lval, &ld) ||
1786  !coerceToDouble(rval, &rd))
1787  return false;
1788 
1789  switch (func)
1790  {
1791  case PGBENCH_ADD:
1792  setDoubleValue(retval, ld + rd);
1793  return true;
1794 
1795  case PGBENCH_SUB:
1796  setDoubleValue(retval, ld - rd);
1797  return true;
1798 
1799  case PGBENCH_MUL:
1800  setDoubleValue(retval, ld * rd);
1801  return true;
1802 
1803  case PGBENCH_DIV:
1804  setDoubleValue(retval, ld / rd);
1805  return true;
1806 
1807  case PGBENCH_EQ:
1808  setBoolValue(retval, ld == rd);
1809  return true;
1810 
1811  case PGBENCH_NE:
1812  setBoolValue(retval, ld != rd);
1813  return true;
1814 
1815  case PGBENCH_LE:
1816  setBoolValue(retval, ld <= rd);
1817  return true;
1818 
1819  case PGBENCH_LT:
1820  setBoolValue(retval, ld < rd);
1821  return true;
1822 
1823  default:
1824  /* cannot get here */
1825  Assert(0);
1826  }
1827  }
1828  else /* we have integer operands, or % */
1829  {
1830  int64 li,
1831  ri;
1832 
1833  if (!coerceToInt(lval, &li) ||
1834  !coerceToInt(rval, &ri))
1835  return false;
1836 
1837  switch (func)
1838  {
1839  case PGBENCH_ADD:
1840  setIntValue(retval, li + ri);
1841  return true;
1842 
1843  case PGBENCH_SUB:
1844  setIntValue(retval, li - ri);
1845  return true;
1846 
1847  case PGBENCH_MUL:
1848  setIntValue(retval, li * ri);
1849  return true;
1850 
1851  case PGBENCH_EQ:
1852  setBoolValue(retval, li == ri);
1853  return true;
1854 
1855  case PGBENCH_NE:
1856  setBoolValue(retval, li != ri);
1857  return true;
1858 
1859  case PGBENCH_LE:
1860  setBoolValue(retval, li <= ri);
1861  return true;
1862 
1863  case PGBENCH_LT:
1864  setBoolValue(retval, li < ri);
1865  return true;
1866 
1867  case PGBENCH_DIV:
1868  case PGBENCH_MOD:
1869  if (ri == 0)
1870  {
1871  fprintf(stderr, "division by zero\n");
1872  return false;
1873  }
1874  /* special handling of -1 divisor */
1875  if (ri == -1)
1876  {
1877  if (func == PGBENCH_DIV)
1878  {
1879  /* overflow check (needed for INT64_MIN) */
1880  if (li == PG_INT64_MIN)
1881  {
1882  fprintf(stderr, "bigint out of range\n");
1883  return false;
1884  }
1885  else
1886  setIntValue(retval, -li);
1887  }
1888  else
1889  setIntValue(retval, 0);
1890  return true;
1891  }
1892  /* else divisor is not -1 */
1893  if (func == PGBENCH_DIV)
1894  setIntValue(retval, li / ri);
1895  else /* func == PGBENCH_MOD */
1896  setIntValue(retval, li % ri);
1897 
1898  return true;
1899 
1900  default:
1901  /* cannot get here */
1902  Assert(0);
1903  }
1904  }
1905  }
1906 
1907  /* integer bitwise operators */
1908  case PGBENCH_BITAND:
1909  case PGBENCH_BITOR:
1910  case PGBENCH_BITXOR:
1911  case PGBENCH_LSHIFT:
1912  case PGBENCH_RSHIFT:
1913  {
1914  int64 li, ri;
1915 
1916  if (!coerceToInt(&vargs[0], &li) || !coerceToInt(&vargs[1], &ri))
1917  return false;
1918 
1919  if (func == PGBENCH_BITAND)
1920  setIntValue(retval, li & ri);
1921  else if (func == PGBENCH_BITOR)
1922  setIntValue(retval, li | ri);
1923  else if (func == PGBENCH_BITXOR)
1924  setIntValue(retval, li ^ ri);
1925  else if (func == PGBENCH_LSHIFT)
1926  setIntValue(retval, li << ri);
1927  else if (func == PGBENCH_RSHIFT)
1928  setIntValue(retval, li >> ri);
1929  else /* cannot get here */
1930  Assert(0);
1931 
1932  return true;
1933  }
1934 
1935  /* logical operators */
1936  case PGBENCH_NOT:
1937  {
1938  bool b;
1939  if (!coerceToBool(&vargs[0], &b))
1940  return false;
1941 
1942  setBoolValue(retval, !b);
1943  return true;
1944  }
1945 
1946  /* no arguments */
1947  case PGBENCH_PI:
1948  setDoubleValue(retval, M_PI);
1949  return true;
1950 
1951  /* 1 overloaded argument */
1952  case PGBENCH_ABS:
1953  {
1954  PgBenchValue *varg = &vargs[0];
1955 
1956  Assert(nargs == 1);
1957 
1958  if (varg->type == PGBT_INT)
1959  {
1960  int64 i = varg->u.ival;
1961 
1962  setIntValue(retval, i < 0 ? -i : i);
1963  }
1964  else
1965  {
1966  double d = varg->u.dval;
1967 
1968  Assert(varg->type == PGBT_DOUBLE);
1969  setDoubleValue(retval, d < 0.0 ? -d : d);
1970  }
1971 
1972  return true;
1973  }
1974 
1975  case PGBENCH_DEBUG:
1976  {
1977  PgBenchValue *varg = &vargs[0];
1978 
1979  Assert(nargs == 1);
1980 
1981  fprintf(stderr, "debug(script=%d,command=%d): ",
1982  st->use_file, st->command + 1);
1983 
1984  if (varg->type == PGBT_NULL)
1985  fprintf(stderr, "null\n");
1986  else if (varg->type == PGBT_BOOLEAN)
1987  fprintf(stderr, "boolean %s\n", varg->u.bval ? "true" : "false");
1988  else if (varg->type == PGBT_INT)
1989  fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
1990  else if (varg->type == PGBT_DOUBLE)
1991  fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
1992  else /* internal error, unexpected type */
1993  Assert(0);
1994 
1995  *retval = *varg;
1996 
1997  return true;
1998  }
1999 
2000  /* 1 double argument */
2001  case PGBENCH_DOUBLE:
2002  case PGBENCH_SQRT:
2003  case PGBENCH_LN:
2004  case PGBENCH_EXP:
2005  {
2006  double dval;
2007 
2008  Assert(nargs == 1);
2009 
2010  if (!coerceToDouble(&vargs[0], &dval))
2011  return false;
2012 
2013  if (func == PGBENCH_SQRT)
2014  dval = sqrt(dval);
2015  else if (func == PGBENCH_LN)
2016  dval = log(dval);
2017  else if (func == PGBENCH_EXP)
2018  dval = exp(dval);
2019  /* else is cast: do nothing */
2020 
2021  setDoubleValue(retval, dval);
2022  return true;
2023  }
2024 
2025  /* 1 int argument */
2026  case PGBENCH_INT:
2027  {
2028  int64 ival;
2029 
2030  Assert(nargs == 1);
2031 
2032  if (!coerceToInt(&vargs[0], &ival))
2033  return false;
2034 
2035  setIntValue(retval, ival);
2036  return true;
2037  }
2038 
2039  /* variable number of arguments */
2040  case PGBENCH_LEAST:
2041  case PGBENCH_GREATEST:
2042  {
2043  bool havedouble;
2044  int i;
2045 
2046  Assert(nargs >= 1);
2047 
2048  /* need double result if any input is double */
2049  havedouble = false;
2050  for (i = 0; i < nargs; i++)
2051  {
2052  if (vargs[i].type == PGBT_DOUBLE)
2053  {
2054  havedouble = true;
2055  break;
2056  }
2057  }
2058  if (havedouble)
2059  {
2060  double extremum;
2061 
2062  if (!coerceToDouble(&vargs[0], &extremum))
2063  return false;
2064  for (i = 1; i < nargs; i++)
2065  {
2066  double dval;
2067 
2068  if (!coerceToDouble(&vargs[i], &dval))
2069  return false;
2070  if (func == PGBENCH_LEAST)
2071  extremum = Min(extremum, dval);
2072  else
2073  extremum = Max(extremum, dval);
2074  }
2075  setDoubleValue(retval, extremum);
2076  }
2077  else
2078  {
2079  int64 extremum;
2080 
2081  if (!coerceToInt(&vargs[0], &extremum))
2082  return false;
2083  for (i = 1; i < nargs; i++)
2084  {
2085  int64 ival;
2086 
2087  if (!coerceToInt(&vargs[i], &ival))
2088  return false;
2089  if (func == PGBENCH_LEAST)
2090  extremum = Min(extremum, ival);
2091  else
2092  extremum = Max(extremum, ival);
2093  }
2094  setIntValue(retval, extremum);
2095  }
2096  return true;
2097  }
2098 
2099  /* random functions */
2100  case PGBENCH_RANDOM:
2104  {
2105  int64 imin,
2106  imax;
2107 
2108  Assert(nargs >= 2);
2109 
2110  if (!coerceToInt(&vargs[0], &imin) ||
2111  !coerceToInt(&vargs[1], &imax))
2112  return false;
2113 
2114  /* check random range */
2115  if (imin > imax)
2116  {
2117  fprintf(stderr, "empty range given to random\n");
2118  return false;
2119  }
2120  else if (imax - imin < 0 || (imax - imin) + 1 < 0)
2121  {
2122  /* prevent int overflows in random functions */
2123  fprintf(stderr, "random range is too large\n");
2124  return false;
2125  }
2126 
2127  if (func == PGBENCH_RANDOM)
2128  {
2129  Assert(nargs == 2);
2130  setIntValue(retval, getrand(thread, imin, imax));
2131  }
2132  else /* gaussian & exponential */
2133  {
2134  double param;
2135 
2136  Assert(nargs == 3);
2137 
2138  if (!coerceToDouble(&vargs[2], &param))
2139  return false;
2140 
2141  if (func == PGBENCH_RANDOM_GAUSSIAN)
2142  {
2143  if (param < MIN_GAUSSIAN_PARAM)
2144  {
2145  fprintf(stderr,
2146  "gaussian parameter must be at least %f "
2147  "(not %f)\n", MIN_GAUSSIAN_PARAM, param);
2148  return false;
2149  }
2150 
2151  setIntValue(retval,
2152  getGaussianRand(thread, imin, imax, param));
2153  }
2154  else if (func == PGBENCH_RANDOM_ZIPFIAN)
2155  {
2156  if (param <= 0.0 || param == 1.0 || param > MAX_ZIPFIAN_PARAM)
2157  {
2158  fprintf(stderr,
2159  "zipfian parameter must be in range (0, 1) U (1, %d]"
2160  " (got %f)\n", MAX_ZIPFIAN_PARAM, param);
2161  return false;
2162  }
2163  setIntValue(retval,
2164  getZipfianRand(thread, imin, imax, param));
2165  }
2166  else /* exponential */
2167  {
2168  if (param <= 0.0)
2169  {
2170  fprintf(stderr,
2171  "exponential parameter must be greater than zero"
2172  " (got %f)\n", param);
2173  return false;
2174  }
2175 
2176  setIntValue(retval,
2177  getExponentialRand(thread, imin, imax, param));
2178  }
2179  }
2180 
2181  return true;
2182  }
2183 
2184  case PGBENCH_POW:
2185  {
2186  PgBenchValue *lval = &vargs[0];
2187  PgBenchValue *rval = &vargs[1];
2188  double ld,
2189  rd;
2190 
2191  Assert(nargs == 2);
2192 
2193  if (!coerceToDouble(lval, &ld) ||
2194  !coerceToDouble(rval, &rd))
2195  return false;
2196 
2197  setDoubleValue(retval, pow(ld, rd));
2198 
2199  return true;
2200  }
2201 
2202  case PGBENCH_IS:
2203  {
2204  Assert(nargs == 2);
2205  /* note: this simple implementation is more permissive than SQL */
2206  setBoolValue(retval,
2207  vargs[0].type == vargs[1].type &&
2208  vargs[0].u.bval == vargs[1].u.bval);
2209  return true;
2210  }
2211 
2212  default:
2213  /* cannot get here */
2214  Assert(0);
2215  /* dead code to avoid a compiler warning */
2216  return false;
2217  }
2218 }
2219 
2220 /* evaluate some function */
2221 static bool
2222 evalFunc(TState *thread, CState *st,
2224 {
2225  if (isLazyFunc(func))
2226  return evalLazyFunc(thread, st, func, args, retval);
2227  else
2228  return evalStandardFunc(thread, st, func, args, retval);
2229 }
2230 
2231 /*
2232  * Recursive evaluation of an expression in a pgbench script
2233  * using the current state of variables.
2234  * Returns whether the evaluation was ok,
2235  * the value itself is returned through the retval pointer.
2236  */
2237 static bool
2238 evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval)
2239 {
2240  switch (expr->etype)
2241  {
2242  case ENODE_CONSTANT:
2243  {
2244  *retval = expr->u.constant;
2245  return true;
2246  }
2247 
2248  case ENODE_VARIABLE:
2249  {
2250  Variable *var;
2251 
2252  if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
2253  {
2254  fprintf(stderr, "undefined variable \"%s\"\n",
2255  expr->u.variable.varname);
2256  return false;
2257  }
2258 
2259  if (!makeVariableValue(var))
2260  return false;
2261 
2262  *retval = var->value;
2263  return true;
2264  }
2265 
2266  case ENODE_FUNCTION:
2267  return evalFunc(thread, st,
2268  expr->u.function.function,
2269  expr->u.function.args,
2270  retval);
2271 
2272  default:
2273  /* internal error which should never occur */
2274  fprintf(stderr, "unexpected enode type in evaluation: %d\n",
2275  expr->etype);
2276  exit(1);
2277  }
2278 }
2279 
2280 /*
2281  * Convert command name to meta-command enum identifier
2282  */
2283 static MetaCommand
2284 getMetaCommand(const char *cmd)
2285 {
2286  MetaCommand mc;
2287 
2288  if (cmd == NULL)
2289  mc = META_NONE;
2290  else if (pg_strcasecmp(cmd, "set") == 0)
2291  mc = META_SET;
2292  else if (pg_strcasecmp(cmd, "setshell") == 0)
2293  mc = META_SETSHELL;
2294  else if (pg_strcasecmp(cmd, "shell") == 0)
2295  mc = META_SHELL;
2296  else if (pg_strcasecmp(cmd, "sleep") == 0)
2297  mc = META_SLEEP;
2298  else
2299  mc = META_NONE;
2300  return mc;
2301 }
2302 
2303 /*
2304  * Run a shell command. The result is assigned to the variable if not NULL.
2305  * Return true if succeeded, or false on error.
2306  */
2307 static bool
2308 runShellCommand(CState *st, char *variable, char **argv, int argc)
2309 {
2310  char command[SHELL_COMMAND_SIZE];
2311  int i,
2312  len = 0;
2313  FILE *fp;
2314  char res[64];
2315  char *endptr;
2316  int retval;
2317 
2318  /*----------
2319  * Join arguments with whitespace separators. Arguments starting with
2320  * exactly one colon are treated as variables:
2321  * name - append a string "name"
2322  * :var - append a variable named 'var'
2323  * ::name - append a string ":name"
2324  *----------
2325  */
2326  for (i = 0; i < argc; i++)
2327  {
2328  char *arg;
2329  int arglen;
2330 
2331  if (argv[i][0] != ':')
2332  {
2333  arg = argv[i]; /* a string literal */
2334  }
2335  else if (argv[i][1] == ':')
2336  {
2337  arg = argv[i] + 1; /* a string literal starting with colons */
2338  }
2339  else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
2340  {
2341  fprintf(stderr, "%s: undefined variable \"%s\"\n",
2342  argv[0], argv[i]);
2343  return false;
2344  }
2345 
2346  arglen = strlen(arg);
2347  if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
2348  {
2349  fprintf(stderr, "%s: shell command is too long\n", argv[0]);
2350  return false;
2351  }
2352 
2353  if (i > 0)
2354  command[len++] = ' ';
2355  memcpy(command + len, arg, arglen);
2356  len += arglen;
2357  }
2358 
2359  command[len] = '\0';
2360 
2361  /* Fast path for non-assignment case */
2362  if (variable == NULL)
2363  {
2364  if (system(command))
2365  {
2366  if (!timer_exceeded)
2367  fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2368  return false;
2369  }
2370  return true;
2371  }
2372 
2373  /* Execute the command with pipe and read the standard output. */
2374  if ((fp = popen(command, "r")) == NULL)
2375  {
2376  fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2377  return false;
2378  }
2379  if (fgets(res, sizeof(res), fp) == NULL)
2380  {
2381  if (!timer_exceeded)
2382  fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
2383  (void) pclose(fp);
2384  return false;
2385  }
2386  if (pclose(fp) < 0)
2387  {
2388  fprintf(stderr, "%s: could not close shell command\n", argv[0]);
2389  return false;
2390  }
2391 
2392  /* Check whether the result is an integer and assign it to the variable */
2393  retval = (int) strtol(res, &endptr, 10);
2394  while (*endptr != '\0' && isspace((unsigned char) *endptr))
2395  endptr++;
2396  if (*res == '\0' || *endptr != '\0')
2397  {
2398  fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
2399  argv[0], res);
2400  return false;
2401  }
2402  if (!putVariableInt(st, "setshell", variable, retval))
2403  return false;
2404 
2405 #ifdef DEBUG
2406  printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
2407 #endif
2408  return true;
2409 }
2410 
/*
 * Size (including the terminating NUL) of the buffers that hold names built
 * by preparedStatementName().  Assuming 32-bit int, "P%d_%d" needs at most
 * 24 characters plus the NUL.
 */
#define MAX_PREPARE_NAME 32

/*
 * Build into "buffer" the name of the prepared statement for command "state"
 * of script "file", in the form "P<file>_<state>".  "buffer" must have room
 * for MAX_PREPARE_NAME bytes; the write is bounded to that size.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
2417 
2418 static void
2419 commandFailed(CState *st, const char *message)
2420 {
2421  fprintf(stderr,
2422  "client %d aborted in command %d of script %d; %s\n",
2423  st->id, st->command, st->use_file, message);
2424 }
2425 
2426 /* return a script number with a weighted choice. */
2427 static int
2429 {
2430  int i = 0;
2431  int64 w;
2432 
2433  if (num_scripts == 1)
2434  return 0;
2435 
2436  w = getrand(thread, 0, total_weight - 1);
2437  do
2438  {
2439  w -= sql_script[i++].weight;
2440  } while (w >= 0);
2441 
2442  return i - 1;
2443 }
2444 
2445 /* Send a SQL command, using the chosen querymode */
2446 static bool
2448 {
2449  int r;
2450 
2451  if (querymode == QUERY_SIMPLE)
2452  {
2453  char *sql;
2454 
2455  sql = pg_strdup(command->argv[0]);
2456  sql = assignVariables(st, sql);
2457 
2458  if (debug)
2459  fprintf(stderr, "client %d sending %s\n", st->id, sql);
2460  r = PQsendQuery(st->con, sql);
2461  free(sql);
2462  }
2463  else if (querymode == QUERY_EXTENDED)
2464  {
2465  const char *sql = command->argv[0];
2466  const char *params[MAX_ARGS];
2467 
2468  getQueryParams(st, command, params);
2469 
2470  if (debug)
2471  fprintf(stderr, "client %d sending %s\n", st->id, sql);
2472  r = PQsendQueryParams(st->con, sql, command->argc - 1,
2473  NULL, params, NULL, NULL, 0);
2474  }
2475  else if (querymode == QUERY_PREPARED)
2476  {
2477  char name[MAX_PREPARE_NAME];
2478  const char *params[MAX_ARGS];
2479 
2480  if (!st->prepared[st->use_file])
2481  {
2482  int j;
2483  Command **commands = sql_script[st->use_file].commands;
2484 
2485  for (j = 0; commands[j] != NULL; j++)
2486  {
2487  PGresult *res;
2488  char name[MAX_PREPARE_NAME];
2489 
2490  if (commands[j]->type != SQL_COMMAND)
2491  continue;
2492  preparedStatementName(name, st->use_file, j);
2493  res = PQprepare(st->con, name,
2494  commands[j]->argv[0], commands[j]->argc - 1, NULL);
2495  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2496  fprintf(stderr, "%s", PQerrorMessage(st->con));
2497  PQclear(res);
2498  }
2499  st->prepared[st->use_file] = true;
2500  }
2501 
2502  getQueryParams(st, command, params);
2503  preparedStatementName(name, st->use_file, st->command);
2504 
2505  if (debug)
2506  fprintf(stderr, "client %d sending %s\n", st->id, name);
2507  r = PQsendQueryPrepared(st->con, name, command->argc - 1,
2508  params, NULL, NULL, 0);
2509  }
2510  else /* unknown sql mode */
2511  r = 0;
2512 
2513  if (r == 0)
2514  {
2515  if (debug)
2516  fprintf(stderr, "client %d could not send %s\n",
2517  st->id, command->argv[0]);
2518  st->ecnt++;
2519  return false;
2520  }
2521  else
2522  return true;
2523 }
2524 
2525 /*
2526  * Parse the argument to a \sleep command, and return the requested amount
2527  * of delay, in microseconds. Returns true on success, false on error.
2528  */
2529 static bool
2530 evaluateSleep(CState *st, int argc, char **argv, int *usecs)
2531 {
2532  char *var;
2533  int usec;
2534 
2535  if (*argv[1] == ':')
2536  {
2537  if ((var = getVariable(st, argv[1] + 1)) == NULL)
2538  {
2539  fprintf(stderr, "%s: undefined variable \"%s\"\n",
2540  argv[0], argv[1]);
2541  return false;
2542  }
2543  usec = atoi(var);
2544  }
2545  else
2546  usec = atoi(argv[1]);
2547 
2548  if (argc > 2)
2549  {
2550  if (pg_strcasecmp(argv[2], "ms") == 0)
2551  usec *= 1000;
2552  else if (pg_strcasecmp(argv[2], "s") == 0)
2553  usec *= 1000000;
2554  }
2555  else
2556  usec *= 1000000;
2557 
2558  *usecs = usec;
2559  return true;
2560 }
2561 
2562 /*
2563  * Advance the state machine of a connection, if possible.
2564  */
2565 static void
2566 doCustom(TState *thread, CState *st, StatsData *agg)
2567 {
2568  PGresult *res;
2569  Command *command;
2570  instr_time now;
2571  bool end_tx_processed = false;
2572  int64 wait;
2573 
2574  /*
2575  * gettimeofday() isn't free, so we get the current timestamp lazily the
2576  * first time it's needed, and reuse the same value throughout this
2577  * function after that. This also ensures that e.g. the calculated
2578  * latency reported in the log file and in the totals are the same. Zero
2579  * means "not set yet". Reset "now" when we execute shell commands or
2580  * expressions, which might take a non-negligible amount of time, though.
2581  */
2582  INSTR_TIME_SET_ZERO(now);
2583 
2584  /*
2585  * Loop in the state machine, until we have to wait for a result from the
2586  * server (or have to sleep, for throttling or for \sleep).
2587  *
2588  * Note: In the switch-statement below, 'break' will loop back here,
2589  * meaning "continue in the state machine". Return is used to return to
2590  * the caller.
2591  */
2592  for (;;)
2593  {
2594  switch (st->state)
2595  {
2596  /*
2597  * Select transaction to run.
2598  */
2599  case CSTATE_CHOOSE_SCRIPT:
2600 
2601  st->use_file = chooseScript(thread);
2602 
2603  if (debug)
2604  fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
2605  sql_script[st->use_file].desc);
2606 
2607  if (throttle_delay > 0)
2609  else
2610  st->state = CSTATE_START_TX;
2611  break;
2612 
2613  /*
2614  * Handle throttling once per transaction by sleeping.
2615  */
2616  case CSTATE_START_THROTTLE:
2617 
2618  /*
2619  * Generate a delay such that the series of delays will
2620  * approximate a Poisson distribution centered on the
2621  * throttle_delay time.
2622  *
2623  * If transactions are too slow or a given wait is shorter
2624  * than a transaction, the next transaction will start right
2625  * away.
2626  */
2627  Assert(throttle_delay > 0);
2628  wait = getPoissonRand(thread, throttle_delay);
2629 
2630  thread->throttle_trigger += wait;
2631  st->txn_scheduled = thread->throttle_trigger;
2632 
2633  /*
2634  * stop client if next transaction is beyond pgbench end of
2635  * execution
2636  */
2637  if (duration > 0 && st->txn_scheduled > end_time)
2638  {
2639  st->state = CSTATE_FINISHED;
2640  break;
2641  }
2642 
2643  /*
2644  * If --latency-limit is used, and this slot is already late
2645  * so that the transaction will miss the latency limit even if
2646  * it completed immediately, we skip this time slot and
2647  * iterate till the next slot that isn't late yet. But don't
2648  * iterate beyond the -t limit, if one is given.
2649  */
2650  if (latency_limit)
2651  {
2652  int64 now_us;
2653 
2654  if (INSTR_TIME_IS_ZERO(now))
2656  now_us = INSTR_TIME_GET_MICROSEC(now);
2657  while (thread->throttle_trigger < now_us - latency_limit &&
2658  (nxacts <= 0 || st->cnt < nxacts))
2659  {
2660  processXactStats(thread, st, &now, true, agg);
2661  /* next rendez-vous */
2662  wait = getPoissonRand(thread, throttle_delay);
2663  thread->throttle_trigger += wait;
2664  st->txn_scheduled = thread->throttle_trigger;
2665  }
2666  /* stop client if -t exceeded */
2667  if (nxacts > 0 && st->cnt >= nxacts)
2668  {
2669  st->state = CSTATE_FINISHED;
2670  break;
2671  }
2672  }
2673 
2674  st->state = CSTATE_THROTTLE;
2675  if (debug)
2676  fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
2677  st->id, wait);
2678  break;
2679 
2680  /*
2681  * Wait until it's time to start next transaction.
2682  */
2683  case CSTATE_THROTTLE:
2684  if (INSTR_TIME_IS_ZERO(now))
2686  if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
2687  return; /* Still sleeping, nothing to do here */
2688 
2689  /* Else done sleeping, start the transaction */
2690  st->state = CSTATE_START_TX;
2691  break;
2692 
2693  /* Start new transaction */
2694  case CSTATE_START_TX:
2695 
2696  /*
2697  * Establish connection on first call, or if is_connect is
2698  * true.
2699  */
2700  if (st->con == NULL)
2701  {
2702  instr_time start;
2703 
2704  if (INSTR_TIME_IS_ZERO(now))
2706  start = now;
2707  if ((st->con = doConnect()) == NULL)
2708  {
2709  fprintf(stderr, "client %d aborted while establishing connection\n",
2710  st->id);
2711  st->state = CSTATE_ABORTED;
2712  break;
2713  }
2715  INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
2716 
2717  /* Reset session-local state */
2718  memset(st->prepared, 0, sizeof(st->prepared));
2719  }
2720 
2721  /*
2722  * Record transaction start time under logging, progress or
2723  * throttling.
2724  */
2725  if (use_log || progress || throttle_delay || latency_limit ||
2726  per_script_stats)
2727  {
2728  if (INSTR_TIME_IS_ZERO(now))
2730  st->txn_begin = now;
2731 
2732  /*
2733  * When not throttling, this is also the transaction's
2734  * scheduled start time.
2735  */
2736  if (!throttle_delay)
2738  }
2739 
2740  /* Begin with the first command */
2741  st->command = 0;
2743  break;
2744 
2745  /*
2746  * Send a command to server (or execute a meta-command)
2747  */
2748  case CSTATE_START_COMMAND:
2749  command = sql_script[st->use_file].commands[st->command];
2750 
2751  /*
2752  * If we reached the end of the script, move to end-of-xact
2753  * processing.
2754  */
2755  if (command == NULL)
2756  {
2757  st->state = CSTATE_END_TX;
2758  break;
2759  }
2760 
2761  /*
2762  * Record statement start time if per-command latencies are
2763  * requested
2764  */
2765  if (is_latencies)
2766  {
2767  if (INSTR_TIME_IS_ZERO(now))
2769  st->stmt_begin = now;
2770  }
2771 
2772  if (command->type == SQL_COMMAND)
2773  {
2774  if (!sendCommand(st, command))
2775  {
2776  commandFailed(st, "SQL command send failed");
2777  st->state = CSTATE_ABORTED;
2778  }
2779  else
2780  st->state = CSTATE_WAIT_RESULT;
2781  }
2782  else if (command->type == META_COMMAND)
2783  {
2784  int argc = command->argc,
2785  i;
2786  char **argv = command->argv;
2787 
2788  if (debug)
2789  {
2790  fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
2791  for (i = 1; i < argc; i++)
2792  fprintf(stderr, " %s", argv[i]);
2793  fprintf(stderr, "\n");
2794  }
2795 
2796  if (command->meta == META_SLEEP)
2797  {
2798  /*
2799  * A \sleep doesn't execute anything, we just get the
2800  * delay from the argument, and enter the CSTATE_SLEEP
2801  * state. (The per-command latency will be recorded
2802  * in CSTATE_SLEEP state, not here, after the delay
2803  * has elapsed.)
2804  */
2805  int usec;
2806 
2807  if (!evaluateSleep(st, argc, argv, &usec))
2808  {
2809  commandFailed(st, "execution of meta-command 'sleep' failed");
2810  st->state = CSTATE_ABORTED;
2811  break;
2812  }
2813 
2814  if (INSTR_TIME_IS_ZERO(now))
2816  st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
2817  st->state = CSTATE_SLEEP;
2818  break;
2819  }
2820  else
2821  {
2822  if (command->meta == META_SET)
2823  {
2824  PgBenchExpr *expr = command->expr;
2825  PgBenchValue result;
2826 
2827  if (!evaluateExpr(thread, st, expr, &result))
2828  {
2829  commandFailed(st, "evaluation of meta-command 'set' failed");
2830  st->state = CSTATE_ABORTED;
2831  break;
2832  }
2833 
2834  if (!putVariableValue(st, argv[0], argv[1], &result))
2835  {
2836  commandFailed(st, "assignment of meta-command 'set' failed");
2837  st->state = CSTATE_ABORTED;
2838  break;
2839  }
2840  }
2841  else if (command->meta == META_SETSHELL)
2842  {
2843  bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
2844 
2845  if (timer_exceeded) /* timeout */
2846  {
2847  st->state = CSTATE_FINISHED;
2848  break;
2849  }
2850  else if (!ret) /* on error */
2851  {
2852  commandFailed(st, "execution of meta-command 'setshell' failed");
2853  st->state = CSTATE_ABORTED;
2854  break;
2855  }
2856  else
2857  {
2858  /* succeeded */
2859  }
2860  }
2861  else if (command->meta == META_SHELL)
2862  {
2863  bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
2864 
2865  if (timer_exceeded) /* timeout */
2866  {
2867  st->state = CSTATE_FINISHED;
2868  break;
2869  }
2870  else if (!ret) /* on error */
2871  {
2872  commandFailed(st, "execution of meta-command 'shell' failed");
2873  st->state = CSTATE_ABORTED;
2874  break;
2875  }
2876  else
2877  {
2878  /* succeeded */
2879  }
2880  }
2881 
2882  /*
2883  * executing the expression or shell command might
2884  * take a non-negligible amount of time, so reset
2885  * 'now'
2886  */
2887  INSTR_TIME_SET_ZERO(now);
2888 
2889  st->state = CSTATE_END_COMMAND;
2890  }
2891  }
2892  break;
2893 
2894  /*
2895  * Wait for the current SQL command to complete
2896  */
2897  case CSTATE_WAIT_RESULT:
2898  command = sql_script[st->use_file].commands[st->command];
2899  if (debug)
2900  fprintf(stderr, "client %d receiving\n", st->id);
2901  if (!PQconsumeInput(st->con))
2902  { /* there's something wrong */
2903  commandFailed(st, "perhaps the backend died while processing");
2904  st->state = CSTATE_ABORTED;
2905  break;
2906  }
2907  if (PQisBusy(st->con))
2908  return; /* don't have the whole result yet */
2909 
2910  /*
2911  * Read and discard the query result;
2912  */
2913  res = PQgetResult(st->con);
2914  switch (PQresultStatus(res))
2915  {
2916  case PGRES_COMMAND_OK:
2917  case PGRES_TUPLES_OK:
2918  case PGRES_EMPTY_QUERY:
2919  /* OK */
2920  PQclear(res);
2921  discard_response(st);
2922  st->state = CSTATE_END_COMMAND;
2923  break;
2924  default:
2925  commandFailed(st, PQerrorMessage(st->con));
2926  PQclear(res);
2927  st->state = CSTATE_ABORTED;
2928  break;
2929  }
2930  break;
2931 
2932  /*
2933  * Wait until sleep is done. This state is entered after a
2934  * \sleep metacommand. The behavior is similar to
2935  * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
2936  * instead of CSTATE_START_TX.
2937  */
2938  case CSTATE_SLEEP:
2939  if (INSTR_TIME_IS_ZERO(now))
2941  if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
2942  return; /* Still sleeping, nothing to do here */
2943  /* Else done sleeping. */
2944  st->state = CSTATE_END_COMMAND;
2945  break;
2946 
2947  /*
2948  * End of command: record stats and proceed to next command.
2949  */
2950  case CSTATE_END_COMMAND:
2951 
2952  /*
2953  * command completed: accumulate per-command execution times
2954  * in thread-local data structure, if per-command latencies
2955  * are requested.
2956  */
2957  if (is_latencies)
2958  {
2959  if (INSTR_TIME_IS_ZERO(now))
2961 
2962  /* XXX could use a mutex here, but we choose not to */
2963  command = sql_script[st->use_file].commands[st->command];
2964  addToSimpleStats(&command->stats,
2965  INSTR_TIME_GET_DOUBLE(now) -
2967  }
2968 
2969  /* Go ahead with next command */
2970  st->command++;
2972  break;
2973 
2974  /*
2975  * End of transaction.
2976  */
2977  case CSTATE_END_TX:
2978 
2979  /* transaction finished: calculate latency and do log */
2980  processXactStats(thread, st, &now, false, agg);
2981 
2982  if (is_connect)
2983  {
2984  PQfinish(st->con);
2985  st->con = NULL;
2986  INSTR_TIME_SET_ZERO(now);
2987  }
2988 
2989  if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
2990  {
2991  /* exit success */
2992  st->state = CSTATE_FINISHED;
2993  break;
2994  }
2995 
2996  /*
2997  * No transaction is underway anymore.
2998  */
3000 
3001  /*
3002  * If we paced through all commands in the script in this
3003  * loop, without returning to the caller even once, do it now.
3004  * This gives the thread a chance to process other
3005  * connections, and to do progress reporting. This can
3006  * currently only happen if the script consists entirely of
3007  * meta-commands.
3008  */
3009  if (end_tx_processed)
3010  return;
3011  else
3012  {
3013  end_tx_processed = true;
3014  break;
3015  }
3016 
3017  /*
3018  * Final states. Close the connection if it's still open.
3019  */
3020  case CSTATE_ABORTED:
3021  case CSTATE_FINISHED:
3022  if (st->con != NULL)
3023  {
3024  PQfinish(st->con);
3025  st->con = NULL;
3026  }
3027  return;
3028  }
3029  }
3030 }
3031 
3032 /*
3033  * Print log entry after completing one transaction.
3034  *
3035  * We print Unix-epoch timestamps in the log, so that entries can be
3036  * correlated against other logs. On some platforms this could be obtained
3037  * from the instr_time reading the caller has, but rather than get entangled
3038  * with that, we just eat the cost of an extra syscall in all cases.
3039  */
3040 static void
3041 doLog(TState *thread, CState *st,
3042  StatsData *agg, bool skipped, double latency, double lag)
3043 {
3044  FILE *logfile = thread->logfile;
3045 
3046  Assert(use_log);
3047 
3048  /*
3049  * Skip the log entry if sampling is enabled and this row doesn't belong
3050  * to the random sample.
3051  */
3052  if (sample_rate != 0.0 &&
3053  pg_erand48(thread->random_state) > sample_rate)
3054  return;
3055 
3056  /* should we aggregate the results or not? */
3057  if (agg_interval > 0)
3058  {
3059  /*
3060  * Loop until we reach the interval of the current moment, and print
3061  * any empty intervals in between (this may happen with very low tps,
3062  * e.g. --rate=0.1).
3063  */
3064  time_t now = time(NULL);
3065 
3066  while (agg->start_time + agg_interval <= now)
3067  {
3068  /* print aggregated report to logfile */
3069  fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
3070  (long) agg->start_time,
3071  agg->cnt,
3072  agg->latency.sum,
3073  agg->latency.sum2,
3074  agg->latency.min,
3075  agg->latency.max);
3076  if (throttle_delay)
3077  {
3078  fprintf(logfile, " %.0f %.0f %.0f %.0f",
3079  agg->lag.sum,
3080  agg->lag.sum2,
3081  agg->lag.min,
3082  agg->lag.max);
3083  if (latency_limit)
3084  fprintf(logfile, " " INT64_FORMAT, agg->skipped);
3085  }
3086  fputc('\n', logfile);
3087 
3088  /* reset data and move to next interval */
3089  initStats(agg, agg->start_time + agg_interval);
3090  }
3091 
3092  /* accumulate the current transaction */
3093  accumStats(agg, skipped, latency, lag);
3094  }
3095  else
3096  {
3097  /* no, print raw transactions */
3098  struct timeval tv;
3099 
3100  gettimeofday(&tv, NULL);
3101  if (skipped)
3102  fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
3103  st->id, st->cnt, st->use_file,
3104  (long) tv.tv_sec, (long) tv.tv_usec);
3105  else
3106  fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
3107  st->id, st->cnt, latency, st->use_file,
3108  (long) tv.tv_sec, (long) tv.tv_usec);
3109  if (throttle_delay)
3110  fprintf(logfile, " %.0f", lag);
3111  fputc('\n', logfile);
3112  }
3113 }
3114 
3115 /*
3116  * Accumulate and report statistics at end of a transaction.
3117  *
3118  * (This is also called when a transaction is late and thus skipped.
3119  * Note that even skipped transactions are counted in the "cnt" fields.)
3120  */
3121 static void
3123  bool skipped, StatsData *agg)
3124 {
3125  double latency = 0.0,
3126  lag = 0.0;
3127  bool thread_details = progress || throttle_delay || latency_limit,
3128  detailed = thread_details || use_log || per_script_stats;
3129 
3130  if (detailed && !skipped)
3131  {
3132  if (INSTR_TIME_IS_ZERO(*now))
3133  INSTR_TIME_SET_CURRENT(*now);
3134 
3135  /* compute latency & lag */
3136  latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
3138  }
3139 
3140  if (thread_details)
3141  {
3142  /* keep detailed thread stats */
3143  accumStats(&thread->stats, skipped, latency, lag);
3144 
3145  /* count transactions over the latency limit, if needed */
3146  if (latency_limit && latency > latency_limit)
3147  thread->latency_late++;
3148  }
3149  else
3150  {
3151  /* no detailed stats, just count */
3152  thread->stats.cnt++;
3153  }
3154 
3155  /* client stat is just counting */
3156  st->cnt++;
3157 
3158  if (use_log)
3159  doLog(thread, st, agg, skipped, latency, lag);
3160 
3161  /* XXX could use a mutex here, but we choose not to */
3162  if (per_script_stats)
3163  accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
3164 }
3165 
3166 
3167 /*
 * Close every open connection in state[0 .. length-1] with PQfinish() and
 * reset that slot's con pointer to NULL, so the slot reads as disconnected
 * afterwards.  Slots whose con is already NULL are left alone.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is absent from this listing.  The body uses "state" (indexed as an array
 * of structs with a "con" member) and the int-compared "length"; restore
 * the declarator from the canonical pgbench.c.
 */
3168 static void
3170 {
3171  int i;
3172 
3173  for (i = 0; i < length; i++)
3174  {
3175  if (state[i].con)
3176  {
3177  PQfinish(state[i].con);
3178  state[i].con = NULL;
3179  }
3180  }
3181 }
3182 
3183 /*
3184  * Remove old pgbench tables, if any exist
3185  */
3186 static void
3188 {
3189  fprintf(stderr, "dropping old tables...\n");
3190 
3191  /*
3192  * We drop all the tables in one command, so that whether there are
3193  * foreign key dependencies or not doesn't matter.
3194  */
3195  executeStatement(con, "drop table if exists "
3196  "pgbench_accounts, "
3197  "pgbench_branches, "
3198  "pgbench_history, "
3199  "pgbench_tellers");
3200 }
3201 
3202 /*
3203  * Create pgbench's standard tables
3204  */
3205 static void
3207 {
3208  /*
3209  * The scale factor at/beyond which 32-bit integers are insufficient for
3210  * storing TPC-B account IDs.
3211  *
3212  * Although the actual threshold is 21474, we use 20000 because it is
3213  * easier to document and remember, and isn't that far away from the real
3214  * threshold.
3215  */
3216 #define SCALE_32BIT_THRESHOLD 20000
3217 
3218  /*
3219  * Note: TPC-B requires at least 100 bytes per row, and the "filler"
3220  * fields in these table declarations were intended to comply with that.
3221  * The pgbench_accounts table complies with that because the "filler"
3222  * column is set to blank-padded empty string. But for all other tables
3223  * the columns default to NULL and so don't actually take any space. We
3224  * could fix that by giving them non-null default values. However, that
3225  * would completely break comparability of pgbench results with prior
3226  * versions. Since pgbench has never pretended to be fully TPC-B compliant
3227  * anyway, we stick with the historical behavior.
3228  */
3229  struct ddlinfo
3230  {
3231  const char *table; /* table name */
3232  const char *smcols; /* column decls if accountIDs are 32 bits */
3233  const char *bigcols; /* column decls if accountIDs are 64 bits */
3234  int declare_fillfactor;
3235  };
3236  static const struct ddlinfo DDLs[] = {
3237  {
3238  "pgbench_history",
3239  "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
3240  "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
3241  0
3242  },
3243  {
3244  "pgbench_tellers",
3245  "tid int not null,bid int,tbalance int,filler char(84)",
3246  "tid int not null,bid int,tbalance int,filler char(84)",
3247  1
3248  },
3249  {
3250  "pgbench_accounts",
3251  "aid int not null,bid int,abalance int,filler char(84)",
3252  "aid bigint not null,bid int,abalance int,filler char(84)",
3253  1
3254  },
3255  {
3256  "pgbench_branches",
3257  "bid int not null,bbalance int,filler char(88)",
3258  "bid int not null,bbalance int,filler char(88)",
3259  1
3260  }
3261  };
3262  int i;
3263 
3264  fprintf(stderr, "creating tables...\n");
3265 
3266  for (i = 0; i < lengthof(DDLs); i++)
3267  {
3268  char opts[256];
3269  char buffer[256];
3270  const struct ddlinfo *ddl = &DDLs[i];
3271  const char *cols;
3272 
3273  /* Construct new create table statement. */
3274  opts[0] = '\0';
3275  if (ddl->declare_fillfactor)
3276  snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3277  " with (fillfactor=%d)", fillfactor);
3278  if (tablespace != NULL)
3279  {
3280  char *escape_tablespace;
3281 
3282  escape_tablespace = PQescapeIdentifier(con, tablespace,
3283  strlen(tablespace));
3284  snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3285  " tablespace %s", escape_tablespace);
3286  PQfreemem(escape_tablespace);
3287  }
3288 
3289  cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
3290 
3291  snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
3292  unlogged_tables ? " unlogged" : "",
3293  ddl->table, cols, opts);
3294 
3295  executeStatement(con, buffer);
3296  }
3297 }
3298 
3299 /*
3300  * Fill the standard tables with some data
3301  */
3302 static void
3304 {
3305  char sql[256];
3306  PGresult *res;
3307  int i;
3308  int64 k;
3309 
3310  /* used to track elapsed time and estimate of the remaining time */
3311  instr_time start,
3312  diff;
3313  double elapsed_sec,
3314  remaining_sec;
3315  int log_interval = 1;
3316 
3317  fprintf(stderr, "generating data...\n");
3318 
3319  /*
3320  * we do all of this in one transaction to enable the backend's
3321  * data-loading optimizations
3322  */
3323  executeStatement(con, "begin");
3324 
3325  /*
3326  * truncate away any old data, in one command in case there are foreign
3327  * keys
3328  */
3329  executeStatement(con, "truncate table "
3330  "pgbench_accounts, "
3331  "pgbench_branches, "
3332  "pgbench_history, "
3333  "pgbench_tellers");
3334 
3335  /*
3336  * fill branches, tellers, accounts in that order in case foreign keys
3337  * already exist
3338  */
3339  for (i = 0; i < nbranches * scale; i++)
3340  {
3341  /* "filler" column defaults to NULL */
3342  snprintf(sql, sizeof(sql),
3343  "insert into pgbench_branches(bid,bbalance) values(%d,0)",
3344  i + 1);
3345  executeStatement(con, sql);
3346  }
3347 
3348  for (i = 0; i < ntellers * scale; i++)
3349  {
3350  /* "filler" column defaults to NULL */
3351  snprintf(sql, sizeof(sql),
3352  "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
3353  i + 1, i / ntellers + 1);
3354  executeStatement(con, sql);
3355  }
3356 
3357  /*
3358  * accounts is big enough to be worth using COPY and tracking runtime
3359  */
3360  res = PQexec(con, "copy pgbench_accounts from stdin");
3361  if (PQresultStatus(res) != PGRES_COPY_IN)
3362  {
3363  fprintf(stderr, "%s", PQerrorMessage(con));
3364  exit(1);
3365  }
3366  PQclear(res);
3367 
3368  INSTR_TIME_SET_CURRENT(start);
3369 
3370  for (k = 0; k < (int64) naccounts * scale; k++)
3371  {
3372  int64 j = k + 1;
3373 
3374  /* "filler" column defaults to blank padded empty string */
3375  snprintf(sql, sizeof(sql),
3376  INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
3377  j, k / naccounts + 1, 0);
3378  if (PQputline(con, sql))
3379  {
3380  fprintf(stderr, "PQputline failed\n");
3381  exit(1);
3382  }
3383 
3384  /*
3385  * If we want to stick with the original logging, print a message each
3386  * 100k inserted rows.
3387  */
3388  if ((!use_quiet) && (j % 100000 == 0))
3389  {
3390  INSTR_TIME_SET_CURRENT(diff);
3391  INSTR_TIME_SUBTRACT(diff, start);
3392 
3393  elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3394  remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3395 
3396  fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3397  j, (int64) naccounts * scale,
3398  (int) (((int64) j * 100) / (naccounts * (int64) scale)),
3399  elapsed_sec, remaining_sec);
3400  }
3401  /* let's not call the timing for each row, but only each 100 rows */
3402  else if (use_quiet && (j % 100 == 0))
3403  {
3404  INSTR_TIME_SET_CURRENT(diff);
3405  INSTR_TIME_SUBTRACT(diff, start);
3406 
3407  elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3408  remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3409 
3410  /* have we reached the next interval (or end)? */
3411  if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
3412  {
3413  fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3414  j, (int64) naccounts * scale,
3415  (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
3416 
3417  /* skip to the next interval */
3418  log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
3419  }
3420  }
3421 
3422  }
3423  if (PQputline(con, "\\.\n"))
3424  {
3425  fprintf(stderr, "very last PQputline failed\n");
3426  exit(1);
3427  }
3428  if (PQendcopy(con))
3429  {
3430  fprintf(stderr, "PQendcopy failed\n");
3431  exit(1);
3432  }
3433 
3434  executeStatement(con, "commit");
3435 }
3436 
3437 /*
3438  * Invoke vacuum on the standard tables
3439  */
3440 static void
3442 {
3443  fprintf(stderr, "vacuuming...\n");
3444  executeStatement(con, "vacuum analyze pgbench_branches");
3445  executeStatement(con, "vacuum analyze pgbench_tellers");
3446  executeStatement(con, "vacuum analyze pgbench_accounts");
3447  executeStatement(con, "vacuum analyze pgbench_history");
3448 }
3449 
3450 /*
3451  * Create primary keys on the standard tables
3452  */
3453 static void
3455 {
3456  static const char *const DDLINDEXes[] = {
3457  "alter table pgbench_branches add primary key (bid)",
3458  "alter table pgbench_tellers add primary key (tid)",
3459  "alter table pgbench_accounts add primary key (aid)"
3460  };
3461  int i;
3462 
3463  fprintf(stderr, "creating primary keys...\n");
3464  for (i = 0; i < lengthof(DDLINDEXes); i++)
3465  {
3466  char buffer[256];
3467 
3468  strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
3469 
3470  if (index_tablespace != NULL)
3471  {
3472  char *escape_tablespace;
3473 
3474  escape_tablespace = PQescapeIdentifier(con, index_tablespace,
3475  strlen(index_tablespace));
3476  snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
3477  " using index tablespace %s", escape_tablespace);
3478  PQfreemem(escape_tablespace);
3479  }
3480 
3481  executeStatement(con, buffer);
3482  }
3483 }
3484 
3485 /*
3486  * Create foreign key constraints between the standard tables
3487  */
3488 static void
3490 {
3491  static const char *const DDLKEYs[] = {
3492  "alter table pgbench_tellers add constraint pgbench_tellers_bid_fkey foreign key (bid) references pgbench_branches",
3493  "alter table pgbench_accounts add constraint pgbench_accounts_bid_fkey foreign key (bid) references pgbench_branches",
3494  "alter table pgbench_history add constraint pgbench_history_bid_fkey foreign key (bid) references pgbench_branches",
3495  "alter table pgbench_history add constraint pgbench_history_tid_fkey foreign key (tid) references pgbench_tellers",
3496  "alter table pgbench_history add constraint pgbench_history_aid_fkey foreign key (aid) references pgbench_accounts"
3497  };
3498  int i;
3499 
3500  fprintf(stderr, "creating foreign keys...\n");
3501  for (i = 0; i < lengthof(DDLKEYs); i++)
3502  {
3503  executeStatement(con, DDLKEYs[i]);
3504  }
3505 }
3506 
/*
 * Validate an initialization-steps string.
 *
 * Every character must be one of d, t, g, v, p, f or a space, and the
 * string must not be empty.  On violation a message is printed to stderr
 * and the program exits with status 1.
 *
 * (runInitSteps() would reject bad characters too, but since initialization
 * can take a while, it is friendlier to complain during option parsing.)
 */
static void
checkInitSteps(const char *initialize_steps)
{
	static const char valid_steps[] = "dtgvpf ";
	const char *p = initialize_steps;

	if (*p == '\0')
	{
		fprintf(stderr, "no initialization steps specified\n");
		exit(1);
	}

	while (*p != '\0')
	{
		if (strchr(valid_steps, *p) == NULL)
		{
			fprintf(stderr, "unrecognized initialization step \"%c\"\n",
					*p);
			fprintf(stderr, "allowed steps are: \"d\", \"t\", \"g\", \"v\", \"p\", \"f\"\n");
			exit(1);
		}
		p++;
	}
}
3536 
3537 /*
3538  * Invoke each initialization step in the given string
3539  */
3540 static void
3541 runInitSteps(const char *initialize_steps)
3542 {
3543  PGconn *con;
3544  const char *step;
3545 
3546  if ((con = doConnect()) == NULL)
3547  exit(1);
3548 
3549  for (step = initialize_steps; *step != '\0'; step++)
3550  {
3551  switch (*step)
3552  {
3553  case 'd':
3554  initDropTables(con);
3555  break;
3556  case 't':
3557  initCreateTables(con);
3558  break;
3559  case 'g':
3560  initGenerateData(con);
3561  break;
3562  case 'v':
3563  initVacuum(con);
3564  break;
3565  case 'p':
3566  initCreatePKeys(con);
3567  break;
3568  case 'f':
3569  initCreateFKeys(con);
3570  break;
3571  case ' ':
3572  break; /* ignore */
3573  default:
3574  fprintf(stderr, "unrecognized initialization step \"%c\"\n",
3575  *step);
3576  PQfinish(con);
3577  exit(1);
3578  }
3579  }
3580 
3581  fprintf(stderr, "done.\n");
3582  PQfinish(con);
3583 }
3584 
3585 /*
3586  * Replace :param with $n throughout the command's SQL text, which
3587  * is a modifiable string in cmd->argv[0].
3588  */
3589 static bool
3591 {
3592  char *sql,
3593  *p;
3594 
3595  /* We don't want to scribble on cmd->argv[0] until done */
3596  sql = pg_strdup(cmd->argv[0]);
3597 
3598  cmd->argc = 1;
3599 
3600  p = sql;
3601  while ((p = strchr(p, ':')) != NULL)
3602  {
3603  char var[12];
3604  char *name;
3605  int eaten;
3606 
3607  name = parseVariable(p, &eaten);
3608  if (name == NULL)
3609  {
3610  while (*p == ':')
3611  {
3612  p++;
3613  }
3614  continue;
3615  }
3616 
3617  if (cmd->argc >= MAX_ARGS)
3618  {
3619  fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n",
3620  MAX_ARGS - 1, cmd->argv[0]);
3621  pg_free(name);
3622  return false;
3623  }
3624 
3625  sprintf(var, "$%d", cmd->argc);
3626  p = replaceVariable(&sql, p, eaten, var);
3627 
3628  cmd->argv[cmd->argc] = name;
3629  cmd->argc++;
3630  }
3631 
3632  pg_free(cmd->argv[0]);
3633  cmd->argv[0] = sql;
3634  return true;
3635 }
3636 
/*
 * printf-style error reporter (possibly used by the lexer).
 *
 * Flushes stdout first so the message is not interleaved out of order with
 * pending normal output, then writes fmt (passed through _() for message
 * translation) and its arguments to stderr.  The caller supplies any
 * trailing newline.
 */
static void
pgbench_error(const char *fmt,...)
{
	va_list		args;

	fflush(stdout);

	va_start(args, fmt);
	vfprintf(stderr, _(fmt), args);
	va_end(args);
}
3650 
/*
 * Report a syntax error found while parsing a script, then exit(1).
 * In practice this concerns backslash commands, since SQL syntax is not
 * checked here.
 *
 * source: source of script (filename or builtin-script ID)
 * lineno: line number within script (count from 1)
 * line: whole line of backslash command, or NULL if not available
 * command: backslash command name, or NULL if not available
 * msg: the actual error message
 * more: optional extra detail, or NULL
 * column: zero-based column number, or -1 if unknown
 *
 * When "line" is given it is echoed with a caret under "column"; without
 * it, the (1-based) column is mentioned in the message text instead.
 */
void
syntax_error(const char *source, int lineno,
			 const char *line, const char *command,
			 const char *msg, const char *more, int column)
{
	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
	if (more)
		fprintf(stderr, " (%s)", more);
	/* nothing to point a caret at, so state the column in words */
	if (line == NULL && column >= 0)
		fprintf(stderr, " at column %d", column + 1);
	if (command)
		fprintf(stderr, " in command \"%s\"", command);
	fputc('\n', stderr);

	if (line != NULL)
	{
		fprintf(stderr, "%s\n", line);
		/* pad with "column" spaces so the caret lands under the error */
		if (column >= 0)
			fprintf(stderr, "%*s^ error found here\n", column, "");
	}

	exit(1);
}
3690 
3691 /*
3692  * Parse a SQL command; return a Command struct, or NULL if it's a comment
3693  *
3694  * On entry, psqlscan.l has collected the command into "buf", so we don't
3695  * really need to do much here except check for comment and set up a
3696  * Command struct.
3697  */
3698 static Command *
3700 {
3701  Command *my_command;
3702  char *p;
3703  char *nlpos;
3704 
3705  /* Skip any leading whitespace, as well as "--" style comments */
3706  p = buf->data;
3707  for (;;)
3708  {
3709  if (isspace((unsigned char) *p))
3710  p++;
3711  else if (strncmp(p, "--", 2) == 0)
3712  {
3713  p = strchr(p, '\n');
3714  if (p == NULL)
3715  return NULL;
3716  p++;
3717  }
3718  else
3719  break;
3720  }
3721 
3722  /* If there's nothing but whitespace and comments, we're done */
3723  if (*p == '\0')
3724  return NULL;
3725 
3726  /* Allocate and initialize Command structure */
3727  my_command = (Command *) pg_malloc0(sizeof(Command));
3728  my_command->command_num = num_commands++;
3729  my_command->type = SQL_COMMAND;
3730  my_command->meta = META_NONE;
3731  initSimpleStats(&my_command->stats);
3732 
3733  /*
3734  * Install query text as the sole argv string. If we are using a
3735  * non-simple query mode, we'll extract parameters from it later.
3736  */
3737  my_command->argv[0] = pg_strdup(p);
3738  my_command->argc = 1;
3739 
3740  /*
3741  * If SQL command is multi-line, we only want to save the first line as
3742  * the "line" label.
3743  */
3744  nlpos = strchr(p, '\n');
3745  if (nlpos)
3746  {
3747  my_command->line = pg_malloc(nlpos - p + 1);
3748  memcpy(my_command->line, p, nlpos - p);
3749  my_command->line[nlpos - p] = '\0';
3750  }
3751  else
3752  my_command->line = pg_strdup(p);
3753 
3754  return my_command;
3755 }
3756 
/*
 * Parse a backslash command; return a Command struct, or NULL if comment
 *
 * At call, we have scanned only the initial backslash.
 *
 * "source" names the script and is only passed on to error reports.  NULL
 * is returned when no word follows the backslash.  Every syntax problem is
 * reported through syntax_error(), which exits with status 1.
 *
 * The first word becomes argv[0] and is mapped to my_command->meta via
 * getMetaCommand().  For \set, argv[1] is the variable name and the rest of
 * the line is parsed as an expression into my_command->expr; for all other
 * commands each further word is stored in argv[1..], and argument counts
 * are then checked per command type.
 */
static Command *
process_backslash_command(PsqlScanState sstate, const char *source)
{
	Command    *my_command;
	PQExpBufferData word_buf;
	int			word_offset;
	int			offsets[MAX_ARGS];	/* offsets of argument words */
	int			start_offset;
	int			lineno;
	int			j;

	initPQExpBuffer(&word_buf);

	/* Remember location of the backslash */
	start_offset = expr_scanner_offset(sstate) - 1;
	lineno = expr_scanner_get_lineno(sstate, start_offset);

	/* Collect first word of command */
	if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
	{
		termPQExpBuffer(&word_buf);
		return NULL;
	}

	/* Allocate and initialize Command structure */
	my_command = (Command *) pg_malloc0(sizeof(Command));
	my_command->command_num = num_commands++;
	my_command->type = META_COMMAND;
	my_command->argc = 0;
	initSimpleStats(&my_command->stats);

	/* Save first word (command name) */
	j = 0;
	offsets[j] = word_offset;
	my_command->argv[j++] = pg_strdup(word_buf.data);
	my_command->argc++;

	/* ... and convert it to enum form */
	my_command->meta = getMetaCommand(my_command->argv[0]);

	if (my_command->meta == META_SET)
	{
		/* For \set, collect var name, then lex the expression. */
		/*
		 * NOTE(review): yyscanner is used below but its declaration is not
		 * visible in this listing; confirm it is declared in this block.
		 */

		/*
		 * my_command->line is only filled in further down, so this error is
		 * reported without echoing the source line (assuming pg_malloc0
		 * zero-filled it).
		 */
		if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
			syntax_error(source, lineno, my_command->line, my_command->argv[0],
						 "missing argument", NULL, -1);

		offsets[j] = word_offset;
		my_command->argv[j++] = pg_strdup(word_buf.data);
		my_command->argc++;

		/* hand the rest of the line to the expression parser */
		yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
									  my_command->argv[0]);

		if (expr_yyparse(yyscanner) != 0)
		{
			/* dead code: exit done from syntax_error called by yyerror */
			exit(1);
		}

		my_command->expr = expr_parse_result;

		/* Save line, trimming any trailing newline */
		my_command->line = expr_scanner_get_substring(sstate,
													  start_offset,
													  expr_scanner_offset(sstate),
													  true);

		expr_scanner_finish(yyscanner);

		termPQExpBuffer(&word_buf);

		/* \set is fully handled; the argument checks below don't apply */
		return my_command;
	}

	/* For all other commands, collect remaining words. */
	while (expr_lex_one_word(sstate, &word_buf, &word_offset))
	{
		/*
		 * offsets[] has MAX_ARGS slots.  my_command->line is not set yet, so
		 * this error carries no source-line echo.
		 */
		if (j >= MAX_ARGS)
			syntax_error(source, lineno, my_command->line, my_command->argv[0],
						 "too many arguments", NULL, -1);

		offsets[j] = word_offset;
		my_command->argv[j++] = pg_strdup(word_buf.data);
		my_command->argc++;
	}

	/* Save line, trimming any trailing newline */
	my_command->line = expr_scanner_get_substring(sstate,
												  start_offset,
												  expr_scanner_offset(sstate),
												  true);

	/* Per-command argument validation; failures exit via syntax_error() */
	if (my_command->meta == META_SLEEP)
	{
		if (my_command->argc < 2)
			syntax_error(source, lineno, my_command->line, my_command->argv[0],
						 "missing argument", NULL, -1);

		/* column of the first surplus word, relative to the backslash */
		if (my_command->argc > 3)
			syntax_error(source, lineno, my_command->line, my_command->argv[0],
						 "too many arguments", NULL,
						 offsets[3] - start_offset);

		/*
		 * Split argument into number and unit to allow "sleep 1ms" etc. We
		 * don't have to terminate the number argument with null because it
		 * will be parsed with atoi, which ignores trailing non-digit
		 * characters.
		 */
		if (my_command->argc == 2 && my_command->argv[1][0] != ':')
		{
			char	   *c = my_command->argv[1];

			while (isdigit((unsigned char) *c))
				c++;
			if (*c)
			{
				/* argv[2] points into argv[1]'s storage, not a new string */
				my_command->argv[2] = c;
				offsets[2] = offsets[1] + (c - my_command->argv[1]);
				my_command->argc = 3;
			}
		}

		if (my_command->argc == 3)
		{
			if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
				pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
				pg_strcasecmp(my_command->argv[2], "s") != 0)
				syntax_error(source, lineno, my_command->line, my_command->argv[0],
							 "unrecognized time unit, must be us, ms or s",
							 my_command->argv[2], offsets[2] - start_offset);
		}
	}
	else if (my_command->meta == META_SETSHELL)
	{
		/* needs at least a variable name and a command word */
		if (my_command->argc < 3)
			syntax_error(source, lineno, my_command->line, my_command->argv[0],
						 "missing argument", NULL, -1);
	}
	else if (my_command->meta == META_SHELL)
	{
		if (my_command->argc < 2)
			syntax_error(source, lineno, my_command->line, my_command->argv[0],
						 "missing command", NULL, -1);
	}
	else
	{
		/* my_command->meta == META_NONE */
		syntax_error(source, lineno, my_command->line, my_command->argv[0],
					 "invalid command", NULL, -1);
	}

	termPQExpBuffer(&word_buf);

	return my_command;
}
3921 
3922 /*
3923  * Parse a script (either the contents of a file, or a built-in script)
3924  * and add it to the list of scripts.
3925  */
3926 static void
3927 ParseScript(const char *script, const char *desc, int weight)
3928 {
3929  ParsedScript ps;
3930  PsqlScanState sstate;
3931  PQExpBufferData line_buf;
3932  int alloc_num;
3933  int index;
3934 
3935 #define COMMANDS_ALLOC_NUM 128
3936  alloc_num = COMMANDS_ALLOC_NUM;
3937 
3938  /* Initialize all fields of ps */
3939  ps.desc = desc;
3940  ps.weight = weight;
3941  ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
3942  initStats(&ps.stats, 0);
3943 
3944  /* Prepare to parse script */
3945  sstate = psql_scan_create(&pgbench_callbacks);
3946 
3947  /*
3948  * Ideally, we'd scan scripts using the encoding and stdstrings settings
3949  * we get from a DB connection. However, without major rearrangement of
3950  * pgbench's argument parsing, we can't have a DB connection at the time
3951  * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
3952  * with any backend-safe encoding, though conceivably we could be fooled
3953  * if a script file uses a client-only encoding. We also assume that
3954  * stdstrings should be true, which is a bit riskier.
3955  */
3956  psql_scan_setup(sstate, script, strlen(script), 0, true);
3957 
3958  initPQExpBuffer(&line_buf);
3959 
3960  index = 0;
3961 
3962  for (;;)
3963  {
3964  PsqlScanResult sr;
3965  promptStatus_t prompt;
3966  Command *command;
3967 
3968  resetPQExpBuffer(&line_buf);
3969 
3970  sr = psql_scan(sstate, &line_buf, &prompt);
3971 
3972  /* If we collected a SQL command, process that */
3973  command = process_sql_command(&line_buf, desc);
3974  if (command)
3975  {
3976  ps.commands[index] = command;
3977  index++;
3978 
3979  if (index >= alloc_num)
3980  {
3981  alloc_num += COMMANDS_ALLOC_NUM;
3982  ps.commands = (Command **)
3983  pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3984  }
3985  }
3986 
3987  /* If we reached a backslash, process that */
3988  if (sr == PSCAN_BACKSLASH)
3989  {
3990  command = process_backslash_command(sstate, desc);
3991  if (command)
3992  {
3993  ps.commands[index] = command;
3994  index++;
3995 
3996  if (index >= alloc_num)
3997  {
3998  alloc_num += COMMANDS_ALLOC_NUM;
3999  ps.commands = (Command **)
4000  pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
4001  }
4002  }
4003  }
4004 
4005  /* Done if we reached EOF */
4006  if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
4007  break;
4008  }
4009 
4010  ps.commands[index] = NULL;
4011 
4012  addScript(ps);
4013 
4014  termPQExpBuffer(&line_buf);
4015  psql_scan_finish(sstate);
4016  psql_scan_destroy(sstate);
4017 }
4018 
4019 /*
4020  * Read the entire contents of file fd, and return it in a malloc'd buffer.
4021  *
4022  * The buffer will typically be larger than necessary, but we don't care
4023  * in this program, because we'll free it as soon as we've parsed the script.
4024  */
4025 static char *
4027 {
4028  char *buf;
4029  size_t buflen = BUFSIZ;
4030  size_t used = 0;
4031 
4032  buf = (char *) pg_malloc(buflen);
4033 
4034  for (;;)
4035  {
4036  size_t nread;
4037 
4038  nread = fread(buf + used, 1, BUFSIZ, fd);
4039  used += nread;
4040  /* If fread() read less than requested, must be EOF or error */
4041  if (nread < BUFSIZ)
4042  break;
4043  /* Enlarge buf so we can read some more */
4044  buflen += BUFSIZ;
4045  buf = (char *) pg_realloc(buf, buflen);
4046  }
4047  /* There is surely room for a terminator */
4048  buf[used] = '\0';
4049 
4050  return buf;
4051 }
4052 
/*
 * Read the script in "filename" (standard input when it is "-") and add it
 * to the script list with the given weight.
 *
 * Exits with status 1 if the file cannot be opened or a read error occurs.
 * NB: filename must be storage that won't disappear, because it is passed
 * on to ParseScript() as the script's description.
 */
static void
process_file(const char *filename, int weight)
{
	int			from_stdin = (strcmp(filename, "-") == 0);
	FILE	   *fd = from_stdin ? stdin : fopen(filename, "r");
	char	   *buf;

	if (fd == NULL)
	{
		fprintf(stderr, "could not open file \"%s\": %s\n",
				filename, strerror(errno));
		exit(1);
	}

	/* slurp everything; a short read is diagnosed via ferror() */
	buf = read_file_contents(fd);
	if (ferror(fd))
	{
		fprintf(stderr, "could not read file \"%s\": %s\n",
				filename, strerror(errno));
		exit(1);
	}

	/* never close stdin, only files we opened ourselves */
	if (!from_stdin)
		fclose(fd);

	ParseScript(buf, filename, weight);

	free(buf);
}
4090 
4091 /* Parse the given builtin script and add it to the list. */
4092 static void
4093 process_builtin(const BuiltinScript *bi, int weight)
4094 {
4095  ParseScript(bi->script, bi->desc, weight);
4096 }
4097 
4098 /* show available builtin scripts */
4099 static void
4101 {
4102  int i;
4103 
4104  fprintf(stderr, "Available builtin scripts:\n");
4105  for (i = 0; i < lengthof(builtin_script); i++)
4106  fprintf(stderr, "\t%s\n", builtin_script[i].name);
4107  fprintf(stderr, "\n");
4108 }
4109 
/*
 * Look up a builtin script by full name or by prefix.
 *
 * Returns the single builtin_script entry whose name begins with "name".
 * If no entry matches, or more than one does, an error is printed to stderr
 * and the process exits with status 1, so NULL is never returned.
 *
 * NOTE(review): matching is prefix-only, so an exact name that is also a
 * prefix of another builtin's name would be reported as ambiguous, and an
 * empty name matches every entry.
 */
static const BuiltinScript *
findBuiltin(const char *name)
{
	int			i,
				found = 0,
				len = strlen(name);
	const BuiltinScript *result = NULL;

	/* count prefix matches, remembering the last one seen */
	for (i = 0; i < lengthof(builtin_script); i++)
	{
		if (strncmp(builtin_script[i].name, name, len) == 0)
		{
			result = &builtin_script[i];
			found++;
		}
	}

	/* ok, unambiguous result */
	if (found == 1)
		return result;

	/* error cases */
	if (found == 0)
		fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
	else						/* found > 1 */
		fprintf(stderr,
				"ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);

	exit(1);
}
4142 
4143 /*
4144  * Determine the weight specification from a script option (-b, -f), if any,
4145  * and return it as an integer (1 is returned if there's no weight). The
4146  * script name is returned in *script as a malloc'd string.
4147  */
4148 static int
4149 parseScriptWeight(const char *option, char **script)
4150 {
4151  char *sep;
4152  int weight;
4153 
4154  if ((sep = strrchr(option, WSEP)))
4155  {
4156  int namelen = sep - option;
4157  long wtmp;
4158  char *badp;
4159 
4160  /* generate the script name */
4161  *script = pg_malloc(namelen + 1);
4162  strncpy(*script, option, namelen);
4163  (*script)[namelen] = '\0';
4164 
4165  /* process digits of the weight spec */
4166  errno = 0;
4167  wtmp = strtol(sep + 1, &badp, 10);
4168  if (errno != 0 || badp == sep + 1 || *badp != '\0')
4169  {
4170  fprintf(stderr, "invalid weight specification: %s\n", sep);
4171  exit(1);
4172  }
4173  if (wtmp > INT_MAX || wtmp < 0)
4174  {
4175  fprintf(stderr,
4176  "weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
4177  INT_MAX, (int64) wtmp);
4178  exit(1);
4179  }
4180  weight = wtmp;
4181  }
4182  else
4183  {
4184  *script = pg_strdup(option);
4185  weight = 1;
4186  }
4187 
4188  return weight;
4189 }
4190 
4191 /* append a script to the list of scripts to process */
4192 static void
4194 {
4195  if (script.commands == NULL || script.commands[0] == NULL)
4196  {
4197  fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
4198  exit(1);
4199  }
4200 
4201  if (num_scripts >= MAX_SCRIPTS)
4202  {
4203  fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
4204  exit(1);
4205  }
4206 
4207  sql_script[num_scripts] = script;
4208  num_scripts++;
4209 }
4210 
4211 static void
4212 printSimpleStats(const char *prefix, SimpleStats *ss)
4213 {
4214  if (ss->count > 0)
4215  {
4216  double latency = ss->sum / ss->count;
4217  double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
4218 
4219  printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
4220  printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
4221  }
4222 }
4223 
4224 /* print out results */
4225 static void
4226 printResults(TState *threads, StatsData *total, instr_time total_time,
4227  instr_time conn_total_time, int64 latency_late)
4228 {
4229  double time_include,
4230  tps_include,
4231  tps_exclude;
4232  int64 ntx = total->cnt - total->skipped;
4233  int i,
4234  totalCacheOverflows = 0;
4235 
4236  time_include = INSTR_TIME_GET_DOUBLE(total_time);
4237 
4238  /* tps is about actually executed transactions */
4239  tps_include = ntx / time_include;
4240  tps_exclude = ntx /
4241  (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
4242 
4243  /* Report test parameters. */
4244  printf("transaction type: %s\n",
4245  num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
4246  printf("scaling factor: %d\n", scale);
4247  printf("query mode: %s\n", QUERYMODE[querymode]);
4248  printf("number of clients: %d\n", nclients);
4249  printf("number of threads: %d\n", nthreads);
4250  if (duration <= 0)
4251  {
4252  printf("number of transactions per client: %d\n", nxacts);
4253  printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
4254  ntx, nxacts * nclients);
4255  }
4256  else
4257  {
4258  printf("duration: %d s\n", duration);
4259  printf("number of transactions actually processed: " INT64_FORMAT "\n",
4260  ntx);
4261  }
4262  /* Report zipfian cache overflow */
4263  for (i = 0; i < nthreads; i++)
4264  {
4265  totalCacheOverflows += threads[i].zipf_cache.overflowCount;
4266  }
4267  if (totalCacheOverflows > 0)
4268  {
4269  printf("zipfian cache array overflowed %d time(s)\n", totalCacheOverflows);
4270  }
4271 
4272  /* Remaining stats are nonsensical if we failed to execute any xacts */
4273  if (total->cnt <= 0)
4274  return;
4275 
4276  if (throttle_delay && latency_limit)
4277  printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
4278  total->skipped,
4279  100.0 * total->skipped / total->cnt);
4280 
4281  if (latency_limit)
4282  printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n",
4283  latency_limit / 1000.0, latency_late, ntx,
4284  (ntx > 0) ? 100.0 * latency_late / ntx : 0.0);
4285 
4286  if (throttle_delay || progress || latency_limit)
4287  printSimpleStats("latency", &total->latency);
4288  else
4289  {
4290  /* no measurement, show average latency computed from run time */
4291  printf("latency average = %.3f ms\n",
4292  1000.0 * time_include * nclients / total->cnt);
4293  }
4294 
4295  if (throttle_delay)
4296  {
4297  /*
4298  * Report average transaction lag under rate limit throttling. This
4299  * is the delay between scheduled and actual start times for the
4300  * transaction. The measured lag may be caused by thread/client load,
4301  * the database load, or the Poisson throttling process.
4302  */
4303  printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
4304  0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
4305  }
4306 
4307  printf("tps = %f (including connections establishing)\n", tps_include);
4308  printf("tps = %f (excluding connections establishing)\n", tps_exclude);
4309 
4310  /* Report per-script/command statistics */
4311  if (per_script_stats || is_latencies)
4312  {
4313  int i;
4314 
4315  for (i = 0; i < num_scripts; i++)
4316  {
4317  if (per_script_stats)
4318  {
4319  StatsData *sstats = &sql_script[i].stats;
4320 
4321  printf("SQL script %d: %s\n"
4322  " - weight: %d (targets %.1f%% of total)\n"
4323  " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
4324  i + 1, sql_script[i].desc,
4325  sql_script[i].weight,
4326  100.0 * sql_script[i].weight / total_weight,
4327  sstats->cnt,
4328  100.0 * sstats->cnt / total->cnt,
4329  (sstats->cnt - sstats->skipped) / time_include);
4330 
4331  if (throttle_delay && latency_limit && sstats->cnt > 0)
4332  printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
4333  sstats->skipped,
4334  100.0 * sstats->skipped / sstats->cnt);
4335 
4336  printSimpleStats(" - latency", &sstats->latency);
4337  }
4338 
4339  /* Report per-command latencies */
4340  if (is_latencies)
4341  {
4342  Command **commands;
4343 
4344  if (per_script_stats)
4345  printf(" - statement latencies in milliseconds:\n");
4346  else
4347  printf("statement latencies in milliseconds:\n");
4348 
4349  for (commands = sql_script[i].commands;
4350  *commands != NULL;
4351  commands++)
4352  {
4353  SimpleStats *cstats = &(*commands)->stats;
4354 
4355  printf(" %11.3f %s\n",
4356  (cstats->count > 0) ?
4357  1000.0 * cstats->sum / cstats->count : 0.0,
4358  (*commands)->line);
4359  }
4360  }
4361  }
4362  }
4363 }
4364 
4365 
4366 int
4367 main(int argc, char **argv)
4368 {
4369  static struct option long_options[] = {
4370  /* systematic long/short named options */
4371  {"builtin", required_argument, NULL, 'b'},
4372  {"client", required_argument, NULL, 'c'},
4373  {"connect", no_argument, NULL, 'C'},
4374  {"debug", no_argument, NULL, 'd'},
4375  {"define", required_argument, NULL, 'D'},
4376  {"file", required_argument, NULL, 'f'},
4377  {"fillfactor", required_argument, NULL, 'F'},
4378  {"host", required_argument, NULL, 'h'},
4379  {"initialize", no_argument, NULL, 'i'},
4380  {"init-steps", required_argument, NULL, 'I'},
4381  {"jobs", required_argument, NULL, 'j'},
4382  {"log", no_argument, NULL, 'l'},
4383  {"latency-limit", required_argument, NULL, 'L'},
4384  {"no-vacuum", no_argument, NULL, 'n'},
4385  {"port", required_argument, NULL, 'p'},
4386  {"progress", required_argument, NULL, 'P'},
4387  {"protocol", required_argument, NULL, 'M'},
4388  {"quiet", no_argument, NULL, 'q'},
4389  {"report-latencies", no_argument, NULL, 'r'},
4390  {"rate", required_argument, NULL, 'R'},
4391  {"scale", required_argument, NULL, 's'},
4392  {"select-only", no_argument, NULL, 'S'},
4393  {"skip-some-updates", no_argument, NULL, 'N'},
4394  {"time", required_argument, NULL, 'T'},
4395  {"transactions", required_argument, NULL, 't'},
4396  {"username", required_argument, NULL, 'U'},
4397  {"vacuum-all", no_argument, NULL, 'v'},
4398  /* long-named only options */
4399  {"unlogged-tables", no_argument, NULL, 1},
4400  {"tablespace", required_argument, NULL, 2},
4401  {"index-tablespace", required_argument, NULL, 3},
4402  {"sampling-rate", required_argument, NULL, 4},
4403  {"aggregate-interval", required_argument, NULL, 5},
4404  {"progress-timestamp", no_argument, NULL, 6},
4405  {"log-prefix", required_argument, NULL, 7},
4406  {"foreign-keys", no_argument, NULL, 8},
4407  {NULL, 0, NULL, 0}
4408  };
4409 
4410  int c;
4411  bool is_init_mode = false; /* initialize mode? */
4412  char *initialize_steps = NULL;
4413  bool foreign_keys = false;
4414  bool is_no_vacuum = false;
4415  bool do_vacuum_accounts = false; /* vacuum accounts table? */
4416  int optindex;
4417  bool scale_given = false;
4418 
4419  bool benchmarking_option_set = false;
4420  bool initialization_option_set = false;
4421  bool internal_script_used = false;
4422 
4423  CState *state; /* status of clients */
4424  TState *threads; /* array of thread */
4425 
4426  instr_time start_time; /* start up time */
4427  instr_time total_time;
4428  instr_time conn_total_time;
4429  int64 latency_late = 0;
4430  StatsData stats;
4431  int weight;
4432 
4433  int i;
4434  int nclients_dealt;
4435 
4436 #ifdef HAVE_GETRLIMIT
4437  struct rlimit rlim;
4438 #endif
4439 
4440  PGconn *con;
4441  PGresult *res;
4442  char *env;
4443 
4444  progname = get_progname(argv[0]);
4445 
4446  if (argc > 1)
4447  {
4448  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
4449  {
4450  usage();
4451  exit(0);
4452  }
4453  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
4454  {
4455  puts("pgbench (PostgreSQL) " PG_VERSION);
4456  exit(0);
4457  }
4458  }
4459 
4460 #ifdef WIN32
4461  /* stderr is buffered on Win32. */
4462  setvbuf(stderr, NULL, _IONBF, 0);
4463 #endif
4464 
4465  if ((env = getenv("PGHOST")) != NULL && *env != '\0')
4466  pghost = env;
4467  if ((env = getenv("PGPORT")) != NULL && *env != '\0')
4468  pgport = env;
4469  else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
4470  login = env;
4471 
4472  state = (CState *) pg_malloc(sizeof(CState));
4473  memset(state, 0, sizeof(CState));
4474 
4475  while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
4476  {
4477  char *script;
4478 
4479  switch (c)
4480  {
4481  case 'i':
4482  is_init_mode = true;
4483  break;
4484  case 'I':
4485  if (initialize_steps)
4486  pg_free(initialize_steps);
4487  initialize_steps = pg_strdup(optarg);
4488  checkInitSteps(initialize_steps);
4489  initialization_option_set = true;
4490  break;
4491  case 'h':
4492  pghost = pg_strdup(optarg);
4493  break;
4494  case 'n':
4495  is_no_vacuum = true;
4496  break;
4497  case 'v':
4498  benchmarking_option_set = true;
4499  do_vacuum_accounts = true;
4500  break;
4501  case 'p':
4502  pgport = pg_strdup(optarg);
4503  break;
4504  case 'd':
4505  debug++;
4506  break;
4507  case 'c':
4508  benchmarking_option_set = true;
4509  nclients = atoi(optarg);
4510  if (nclients <= 0 || nclients > MAXCLIENTS)
4511  {
4512  fprintf(stderr, "invalid number of clients: \"%s\"\n",
4513  optarg);
4514  exit(1);
4515  }
4516 #ifdef HAVE_GETRLIMIT
4517 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
4518  if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
4519 #else /* but BSD doesn't ... */
4520  if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
4521 #endif /* RLIMIT_NOFILE */
4522  {
4523  fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
4524  exit(1);
4525  }
4526  if (rlim.rlim_cur < nclients + 3)
4527  {
4528  fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
4529  nclients + 3, (long) rlim.rlim_cur);
4530  fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
4531  exit(1);
4532  }
4533 #endif /* HAVE_GETRLIMIT */
4534  break;
4535  case 'j': /* jobs */
4536  benchmarking_option_set = true;
4537  nthreads = atoi(optarg);
4538  if (nthreads <= 0)
4539  {
4540  fprintf(stderr, "invalid number of threads: \"%s\"\n",
4541  optarg);
4542  exit(1);
4543  }
4544 #ifndef ENABLE_THREAD_SAFETY
4545  if (nthreads != 1)
4546  {
4547  fprintf(stderr, "threads are not supported on this platform; use -j1\n");
4548  exit(1);
4549  }
4550 #endif /* !ENABLE_THREAD_SAFETY */
4551  break;
4552  case 'C':
4553  benchmarking_option_set = true;
4554  is_connect = true;
4555  break;
4556  case 'r':
4557  benchmarking_option_set = true;
4558  is_latencies = true;
4559  break;
4560  case 's':
4561  scale_given = true;
4562  scale = atoi(optarg);
4563  if (scale <= 0)
4564  {
4565  fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
4566  exit(1);
4567  }
4568  break;
4569  case 't':
4570  benchmarking_option_set = true;
4571  nxacts = atoi(optarg);
4572  if (nxacts <= 0)
4573  {
4574  fprintf(stderr, "invalid number of transactions: \"%s\"\n",
4575  optarg);
4576  exit(1);
4577  }
4578  break;
4579  case 'T':
4580  benchmarking_option_set = true;
4581  duration = atoi(optarg);
4582  if (duration <= 0)
4583  {
4584  fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
4585  exit(1);
4586  }
4587  break;
4588  case 'U':
4589  login = pg_strdup(optarg);
4590  break;
4591  case 'l':
4592  benchmarking_option_set = true;
4593  use_log = true;
4594  break;
4595  case 'q':
4596  initialization_option_set = true;
4597  use_quiet = true;
4598  break;
4599  case 'b':
4600  if (strcmp(optarg, "list") == 0)
4601  {
4603  exit(0);
4604  }
4605  weight = parseScriptWeight(optarg, &script);
4606  process_builtin(findBuiltin(script), weight);
4607  benchmarking_option_set = true;
4608  internal_script_used = true;
4609  break;
4610  case 'S':
4611  process_builtin(findBuiltin("select-only"), 1);
4612  benchmarking_option_set = true;
4613  internal_script_used = true;
4614  break;
4615  case 'N':
4616  process_builtin(findBuiltin("simple-update"), 1);
4617  benchmarking_option_set = true;
4618  internal_script_used = true;
4619  break;
4620  case 'f':
4621  weight = parseScriptWeight(optarg, &script);
4622  process_file(script, weight);
4623  benchmarking_option_set = true;
4624  break;
4625  case 'D':
4626  {
4627  char *p;
4628 
4629  benchmarking_option_set = true;
4630 
4631  if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
4632  {
4633  fprintf(stderr, "invalid variable definition: \"%s\"\n",
4634  optarg);
4635  exit(1);
4636  }
4637 
4638  *p++ = '\0';
4639  if (!putVariable(&state[0], "option", optarg, p))
4640  exit(1);
4641  }
4642  break;
4643  case 'F':
4644  initialization_option_set = true;
4645  fillfactor = atoi(optarg);
4646  if (fillfactor < 10 || fillfactor > 100)
4647  {
4648  fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
4649  exit(1);
4650  }
4651  break;
4652  case 'M':
4653  benchmarking_option_set = true;
4654  for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
4655  if (strcmp(optarg, QUERYMODE[querymode]) == 0)
4656  break;
4657  if (querymode >= NUM_QUERYMODE)
4658  {
4659  fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
4660  optarg);
4661  exit(1);
4662  }
4663  break;
4664  case 'P':
4665  benchmarking_option_set = true;
4666  progress = atoi(optarg);
4667  if (progress <= 0)
4668  {
4669  fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
4670  optarg);
4671  exit(1);
4672  }
4673  break;
4674  case 'R':
4675  {
4676  /* get a double from the beginning of option value */
4677  double throttle_value = atof(optarg);
4678 
4679  benchmarking_option_set = true;
4680 
4681  if (throttle_value <= 0.0)
4682  {
4683  fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
4684  exit(1);
4685  }
4686  /* Invert rate limit into a time offset */
4687  throttle_delay = (int64) (1000000.0 / throttle_value);
4688  }
4689  break;
4690  case 'L':
4691  {
4692  double limit_ms = atof(optarg);
4693 
4694  if (limit_ms <= 0.0)
4695  {
4696  fprintf(stderr, "invalid latency limit: \"%s\"\n",
4697  optarg);
4698  exit(1);
4699  }
4700  benchmarking_option_set = true;
4701  latency_limit = (int64) (limit_ms * 1000);
4702  }
4703  break;
4704  case 1: /* unlogged-tables */
4705  initialization_option_set = true;
4706  unlogged_tables = true;
4707  break;
4708  case 2: /* tablespace */
4709  initialization_option_set = true;
4710  tablespace = pg_strdup(optarg);
4711  break;
4712  case 3: /* index-tablespace */
4713  initialization_option_set = true;
4714  index_tablespace = pg_strdup(optarg);
4715  break;
4716  case 4: /* sampling-rate */
4717  benchmarking_option_set = true;
4718  sample_rate = atof(optarg);
4719  if (sample_rate <= 0.0 || sample_rate > 1.0)
4720  {
4721  fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
4722  exit(1);
4723  }
4724  break;
4725  case 5: /* aggregate-interval */
4726  benchmarking_option_set = true;
4727  agg_interval = atoi(optarg);
4728  if (agg_interval <= 0)
4729  {
4730  fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
4731  optarg);
4732  exit(1);
4733  }
4734  break;
4735  case 6: /* progress-timestamp */
4736  progress_timestamp = true;
4737  benchmarking_option_set = true;
4738  break;
4739  case 7: /* log-prefix */
4740  benchmarking_option_set = true;
4741  logfile_prefix = pg_strdup(optarg);
4742  break;
4743  case 8: /* foreign-keys */
4744  initialization_option_set = true;
4745  foreign_keys = true;
4746  break;
4747  default:
4748  fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
4749  exit(1);
4750  break;
4751  }
4752  }
4753 
4754  /* set default script if none */
4755  if (num_scripts == 0 && !is_init_mode)
4756  {
4757  process_builtin(findBuiltin("tpcb-like"), 1);
4758  benchmarking_option_set = true;
4759  internal_script_used = true;
4760  }
4761 
4762  /* if not simple query mode, parse the script(s) to find parameters */
4763  if (querymode != QUERY_SIMPLE)
4764  {
4765  for (i = 0; i < num_scripts; i++)
4766  {
4767  Command **commands = sql_script[i].commands;
4768  int j;
4769 
4770  for (j = 0; commands[j] != NULL; j++)
4771  {
4772  if (commands[j]->type != SQL_COMMAND)
4773  continue;
4774  if (!parseQuery(commands[j]))
4775  exit(1);
4776  }
4777  }
4778  }
4779 
4780  /* compute total_weight */
4781  for (i = 0; i < num_scripts; i++)
4782  /* cannot overflow: weight is 32b, total_weight 64b */
4783  total_weight += sql_script[i].weight;
4784 
4785  if (total_weight == 0 && !is_init_mode)
4786  {
4787  fprintf(stderr, "total script weight must not be zero\n");
4788  exit(1);
4789  }
4790 
4791  /* show per script stats if several scripts are used */
4792  if (num_scripts > 1)
4793  per_script_stats = true;
4794 
4795  /*
4796  * Don't need more threads than there are clients. (This is not merely an
4797  * optimization; throttle_delay is calculated incorrectly below if some
4798  * threads have no clients assigned to them.)
4799  */
4800  if (nthreads > nclients)
4801  nthreads = nclients;
4802 
4803  /* compute a per thread delay */
4804  throttle_delay *= nthreads;
4805 
4806  if (argc > optind)
4807  dbName = argv[optind];
4808  else
4809  {
4810  if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
4811  dbName = env;
4812  else if (login != NULL && *login != '\0')
4813  dbName = login;
4814  else
4815  dbName = "";
4816  }
4817 
4818  if (is_init_mode)
4819  {
4820  if (benchmarking_option_set)
4821  {
4822  fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
4823  exit(1);
4824  }
4825 
4826  if (initialize_steps == NULL)
4827  initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
4828 
4829  if (is_no_vacuum)
4830  {
4831  /* Remove any vacuum step in initialize_steps */
4832  char *p;
4833 
4834  while ((p = strchr(initialize_steps, 'v')) != NULL)
4835  *p = ' ';
4836  }
4837 
4838  if (foreign_keys)
4839  {
4840  /* Add 'f' to end of initialize_steps, if not already there */
4841  if (strchr(initialize_steps, 'f') == NULL)
4842  {
4843  initialize_steps = (char *)
4844  pg_realloc(initialize_steps,
4845  strlen(initialize_steps) + 2);
4846  strcat(initialize_steps, "f");
4847  }
4848  }
4849 
4850  runInitSteps(initialize_steps);
4851  exit(0);
4852  }
4853  else
4854  {
4855  if (initialization_option_set)
4856  {
4857  fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
4858  exit(1);
4859  }
4860  }
4861 
4862  if (nxacts > 0 && duration > 0)
4863  {
4864  fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
4865  exit(1);
4866  }
4867 
4868  /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
4869  if (nxacts <= 0 && duration <= 0)
4870  nxacts = DEFAULT_NXACTS;
4871 
4872  /* --sampling-rate may be used only with -l */
4873  if (sample_rate > 0.0 && !use_log)
4874  {
4875  fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
4876  exit(1);
4877  }
4878 
4879  /* --sampling-rate may not be used with --aggregate-interval */
4880  if (sample_rate > 0.0 && agg_interval > 0)
4881  {
4882  fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
4883  exit(1);
4884  }
4885 
4886  if (agg_interval > 0 && !use_log)
4887  {
4888  fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
4889  exit(1);
4890  }
4891 
4892  if (!use_log && logfile_prefix)
4893  {
4894  fprintf(stderr, "log file prefix (--log-prefix) is allowed only when logging transactions (-l)\n");
4895  exit(1);
4896  }
4897 
4898  if (duration > 0 && agg_interval > duration)
4899  {
4900  fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
4901  exit(1);
4902  }
4903 
4904  if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
4905  {
4906  fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
4907  exit(1);
4908  }
4909 
4910  if (progress_timestamp && progress == 0)
4911  {
4912  fprintf(stderr, "--progress-timestamp is allowed only under --progress\n");
4913  exit(1);
4914  }
4915 
4916  /*
4917  * save main process id in the global variable because process id will be
4918  * changed after fork.
4919  */
4920  main_pid = (int) getpid();
4921 
4922  if (nclients > 1)
4923  {
4924  state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
4925  memset(state + 1, 0, sizeof(CState) * (nclients - 1));
4926 
4927  /* copy any -D switch values to all clients */
4928  for (i = 1; i < nclients; i++)
4929  {
4930  int j;
4931 
4932  state[i].id = i;
4933  for (j = 0; j < state[0].nvariables; j++)
4934  {
4935  Variable *var = &state[0].variables[j];
4936 
4937  if (var->value.type != PGBT_NO_VALUE)
4938  {
4939  if (!putVariableValue(&state[i], "startup",
4940  var->name, &var->value))
4941  exit(1);
4942  }
4943  else
4944  {
4945  if (!putVariable(&state[i], "startup",
4946  var->name, var->svalue))
4947  exit(1);
4948  }
4949  }
4950  }
4951  }
4952 
4953  if (debug)
4954  {
4955  if (duration <= 0)
4956  printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
4957  pghost, pgport, nclients, nxacts, dbName);
4958  else
4959  printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
4960  pghost, pgport, nclients, duration, dbName);
4961  }
4962 
4963  /* opening connection... */
4964  con = doConnect();
4965  if (con == NULL)
4966  exit(1);
4967 
4968  if (PQstatus(con) == CONNECTION_BAD)
4969  {
4970  fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
4971  fprintf(stderr, "%s", PQerrorMessage(con));
4972  exit(1);
4973  }
4974 
4975  if (internal_script_used)
4976  {
4977  /*
4978  * get the scaling factor that should be same as count(*) from
4979  * pgbench_branches if this is not a custom query
4980  */
4981  res = PQexec(con, "select count(*) from pgbench_branches");
4982  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4983  {
4984  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
4985 
4986  fprintf(stderr, "%s", PQerrorMessage(con));
4987  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
4988  {
4989  fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
4990  }
4991 
4992  exit(1);
4993  }
4994  scale = atoi(PQgetvalue(res, 0, 0));
4995  if (scale < 0)
4996  {
4997  fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
4998  PQgetvalue(res, 0, 0));
4999  exit(1);
5000  }
5001  PQclear(res);
5002 
5003  /* warn if we override user-given -s switch */
5004  if (scale_given)
5005  fprintf(stderr,
5006  "scale option ignored, using count from pgbench_branches table (%d)\n",
5007  scale);
5008  }
5009 
5010  /*
5011  * :scale variables normally get -s or database scale, but don't override
5012  * an explicit -D switch
5013  */
5014  if (lookupVariable(&state[0], "scale") == NULL)
5015  {
5016  for (i = 0; i < nclients; i++)
5017  {
5018  if (!putVariableInt(&state[i], "startup", "scale", scale))
5019  exit(1);
5020  }
5021  }
5022 
5023  /*
5024  * Define a :client_id variable that is unique per connection. But don't
5025  * override an explicit -D switch.
5026  */
5027  if (lookupVariable(&state[0], "client_id") == NULL)
5028  {
5029  for (i = 0; i < nclients; i++)
5030  {
5031  if (!putVariableInt(&state[i], "startup", "client_id", i))
5032  exit(1);
5033  }
5034  }
5035 
5036  if (!is_no_vacuum)
5037  {
5038  fprintf(stderr, "starting vacuum...");
5039  tryExecuteStatement(con, "vacuum pgbench_branches");
5040  tryExecuteStatement(con, "vacuum pgbench_tellers");
5041  tryExecuteStatement(con, "truncate pgbench_history");
5042  fprintf(stderr, "end.\n");
5043 
5044  if (do_vacuum_accounts)
5045  {
5046  fprintf(stderr, "starting vacuum pgbench_accounts...");
5047  tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
5048  fprintf(stderr, "end.\n");
5049  }
5050  }
5051  PQfinish(con);
5052 
5053  /* set random seed */
5054  INSTR_TIME_SET_CURRENT(start_time);
5055  srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
5056 
5057  /* set up thread data structures */
5058  threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
5059  nclients_dealt = 0;
5060 
5061  for (i = 0; i < nthreads; i++)
5062  {
5063  TState *thread = &threads[i];
5064 
5065  thread->tid = i;
5066  thread->state = &state[nclients_dealt];
5067  thread->nstate =
5068  (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
5069  thread->random_state[0] = random();
5070  thread->random_state[1] = random();
5071  thread->random_state[2] = random();
5072  thread->logfile = NULL; /* filled in later */
5073  thread->latency_late = 0;
5074  thread->zipf_cache.nb_cells = 0;
5075  thread->zipf_cache.current = 0;
5076  thread->zipf_cache.overflowCount = 0;
5077  initStats(&thread->stats, 0);
5078 
5079  nclients_dealt += thread->nstate;
5080  }
5081 
5082  /* all clients must be assigned to a thread */
5083  Assert(nclients_dealt == nclients);
5084 
5085  /* get start up time */
5086  INSTR_TIME_SET_CURRENT(start_time);
5087 
5088  /* set alarm if duration is specified. */
5089  if (duration > 0)
5090  setalarm(duration);
5091 
5092  /* start threads */
5093 #ifdef ENABLE_THREAD_SAFETY
5094  for (i = 0; i < nthreads; i++)
5095  {
5096  TState *thread = &threads[i];
5097 
5099 
5100  /* compute when to stop */
5101  if (duration > 0)
5102  end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
5103  (int64) 1000000 * duration;
5104 
5105  /* the first thread (i = 0) is executed by main thread */
5106  if (i > 0)
5107  {
5108  int err = pthread_create(&thread->thread, NULL, threadRun, thread);
5109 
5110  if (err != 0 || thread->thread == INVALID_THREAD)
5111  {
5112  fprintf(stderr, "could not create thread: %s\n", strerror(err));
5113  exit(1);
5114  }
5115  }
5116  else
5117  {
5118  thread->thread = INVALID_THREAD;
5119  }
5120  }
5121 #else
5122  INSTR_TIME_SET_CURRENT(threads[0].start_time);
5123  /* compute when to stop */
5124  if (duration > 0)
5125  end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
5126  (int64) 1000000 * duration;
5127  threads[0].thread = INVALID_THREAD;
5128 #endif /* ENABLE_THREAD_SAFETY */
5129 
5130  /* wait for threads and accumulate results */
5131  initStats(&stats, 0);
5132  INSTR_TIME_SET_ZERO(conn_total_time);
5133  for (i = 0; i < nthreads; i++)
5134  {
5135  TState *thread = &threads[i];
5136 
5137 #ifdef ENABLE_THREAD_SAFETY
5138  if (threads[i].thread == INVALID_THREAD)
5139  /* actually run this thread directly in the main thread */
5140  (void) threadRun(thread);
5141  else
5142  /* wait of other threads. should check that 0 is returned? */
5143  pthread_join(thread->thread, NULL);
5144 #else
5145  (void) threadRun(thread);
5146 #endif /* ENABLE_THREAD_SAFETY */
5147 
5148  /* aggregate thread level stats */
5149  mergeSimpleStats(&stats.latency, &thread->stats.latency);
5150  mergeSimpleStats(&stats.lag, &thread->stats.lag);
5151  stats.cnt += thread->stats.cnt;
5152  stats.skipped += thread->stats.skipped;
5153  latency_late += thread->latency_late;
5154  INSTR_TIME_ADD(conn_total_time, thread->conn_time);
5155  }
5156  disconnect_all(state, nclients);
5157 
5158  /*
5159  * XXX We compute results as though every client of every thread started
5160  * and finished at the same time. That model can diverge noticeably from
5161  * reality for a short benchmark run involving relatively many threads.
5162  * The first thread may process notably many transactions before the last
5163  * thread begins. Improving the model alone would bring limited benefit,
5164  * because performance during those periods of partial thread count can
5165  * easily exceed steady state performance. This is one of the many ways
5166  * short runs convey deceptive performance figures.
5167  */
5168  INSTR_TIME_SET_CURRENT(total_time);
5169  INSTR_TIME_SUBTRACT(total_time, start_time);
5170  printResults(threads, &stats, total_time, conn_total_time, latency_late);
5171 
5172  return 0;
5173 }
5174 
5175 static void *
5176 threadRun(void *arg)
5177 {
5178  TState *thread = (TState *) arg;
5179  CState *state = thread->state;
5180  instr_time start,
5181  end;
5182  int nstate = thread->nstate;
5183  int remains = nstate; /* number of remaining clients */
5184  int i;
5185 
5186  /* for reporting progress: */
5187  int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
5188  int64 last_report = thread_start;
5189  int64 next_report = last_report + (int64) progress * 1000000;
5190  StatsData last,
5191  aggs;
5192 
5193  /*
5194  * Initialize throttling rate target for all of the thread's clients. It
5195  * might be a little more accurate to reset thread->start_time here too.
5196  * The possible drift seems too small relative to typical throttle delay
5197  * times to worry about it.
5198  */
5199  INSTR_TIME_SET_CURRENT(start);
5200  thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
5201 
5202  INSTR_TIME_SET_ZERO(thread->conn_time);
5203 
5204  initStats(&aggs, time(NULL));
5205  last = aggs;
5206 
5207  /* open log file if requested */
5208  if (use_log)
5209  {
5210  char logpath[MAXPGPATH];
5211  char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
5212 
5213  if (thread->tid == 0)
5214  snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
5215  else
5216  snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
5217 
5218  thread->logfile = fopen(logpath, "w");
5219 
5220  if (thread->logfile == NULL)
5221  {
5222  fprintf(stderr, "could not open logfile \"%s\": %s\n",
5223  logpath, strerror(errno));
5224  goto done;
5225  }
5226  }
5227 
5228  if (!is_connect)
5229  {
5230  /* make connections to the database */
5231  for (i = 0; i < nstate; i++)
5232  {
5233  if ((state[i].con = doConnect()) == NULL)
5234  goto done;
5235  }
5236  }
5237 
5238  /* time after thread and connections set up */
5240  INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
5241 
5242  /* explicitly initialize the state machines */
5243  for (i = 0; i < nstate; i++)
5244  {
5245  state[i].state = CSTATE_CHOOSE_SCRIPT;
5246  }
5247 
5248  /* loop till all clients have terminated */
5249  while (remains > 0)
5250  {
5251  fd_set input_mask;
5252  int maxsock; /* max socket number to be waited for */
5253  int64 min_usec;
5254  int64 now_usec = 0; /* set this only if needed */
5255 
5256  /* identify which client sockets should be checked for input */
5257  FD_ZERO(&input_mask);
5258  maxsock = -1;
5259  min_usec = PG_INT64_MAX;
5260  for (i = 0; i < nstate; i++)
5261  {
5262  CState *st = &state[i];
5263 
5264  if (st->state == CSTATE_THROTTLE && timer_exceeded)
5265  {
5266  /* interrupt client that has not started a transaction */
5267  st->state = CSTATE_FINISHED;
5268  PQfinish(st->con);
5269  st->con = NULL;
5270  remains--;
5271  }
5272  else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
5273  {
5274  /* a nap from the script, or under throttling */
5275  int64 this_usec;
5276 
5277  /* get current time if needed */
5278  if (now_usec == 0)
5279  {
5280  instr_time now;
5281 
5283  now_usec = INSTR_TIME_GET_MICROSEC(now);
5284  }
5285 
5286  /* min_usec should be the minimum delay across all clients */
5287  this_usec = (st->state == CSTATE_SLEEP ?
5288  st->sleep_until : st->txn_scheduled) - now_usec;
5289  if (min_usec > this_usec)
5290  min_usec = this_usec;
5291  }
5292  else if (st->state == CSTATE_WAIT_RESULT)
5293  {
5294  /*
5295  * waiting for result from server - nothing to do unless the
5296  * socket is readable
5297  */
5298  int sock = PQsocket(st->con);
5299 
5300  if (sock < 0)
5301  {
5302  fprintf(stderr, "invalid socket: %s",
5303  PQerrorMessage(st->con));
5304  goto done;
5305  }
5306 
5307  FD_SET(sock, &input_mask);
5308  if (maxsock < sock)
5309  maxsock = sock;
5310  }
5311  else if (st->state != CSTATE_ABORTED &&
5312  st->state != CSTATE_FINISHED)
5313  {
5314  /*
5315  * This client thread is ready to do something, so we don't
5316  * want to wait. No need to examine additional clients.
5317  */
5318  min_usec = 0;
5319  break;
5320  }
5321  }
5322 
5323  /* also wake up to print the next progress report on time */
5324  if (progress && min_usec > 0 && thread->tid == 0)
5325  {
5326  /* get current time if needed */
5327  if (now_usec == 0)
5328  {
5329  instr_time now;
5330 
5332  now_usec = INSTR_TIME_GET_MICROSEC(now);
5333  }
5334 
5335  if (now_usec >= next_report)
5336  min_usec = 0;
5337  else if ((next_report - now_usec) < min_usec)
5338  min_usec = next_report - now_usec;
5339  }
5340 
5341  /*
5342  * If no clients are ready to execute actions, sleep until we receive
5343  * data from the server, or a nap-time specified in the script ends,
5344  * or it's time to print a progress report. Update input_mask to show
5345  * which client(s) received data.
5346  */
5347  if (min_usec > 0)
5348  {
5349  int nsocks = 0; /* return from select(2) if called */
5350 
5351  if (min_usec != PG_INT64_MAX)
5352  {
5353  if (maxsock != -1)
5354  {
5355  struct timeval timeout;
5356 
5357  timeout.tv_sec = min_usec / 1000000;
5358  timeout.tv_usec = min_usec % 1000000;
5359  nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
5360  }
5361  else /* nothing active, simple sleep */
5362  {
5363  pg_usleep(min_usec);
5364  }
5365  }
5366  else /* no explicit delay, select without timeout */
5367  {
5368  nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
5369  }
5370 
5371  if (nsocks < 0)
5372  {
5373  if (errno == EINTR)
5374  {
5375  /* On EINTR, go back to top of loop */
5376  continue;
5377  }
5378  /* must be something wrong */
5379  fprintf(stderr, "select() failed: %s\n", strerror(errno));
5380  goto done;
5381  }
5382  }
5383  else
5384  {
5385  /* min_usec == 0, i.e. something needs to be executed */
5386 
5387  /* If we didn't call select(), don't try to read any data */
5388  FD_ZERO(&input_mask);
5389  }
5390 
5391  /* ok, advance the state machine of each connection */
5392  for (i = 0; i < nstate; i++)
5393  {
5394  CState *st = &state[i];
5395 
5396  if (st->state == CSTATE_WAIT_RESULT)
5397  {
5398  /* don't call doCustom unless data is available */
5399  int sock = PQsocket(st->con);
5400 
5401  if (sock < 0)
5402  {
5403  fprintf(stderr, "invalid socket: %s",
5404  PQerrorMessage(st->con));
5405  goto done;
5406  }
5407 
5408  if (!FD_ISSET(sock, &input_mask))
5409  continue;
5410  }
5411  else if (st->state == CSTATE_FINISHED ||
5412  st->state == CSTATE_ABORTED)
5413  {
5414  /* this client is done, no need to consider it anymore */
5415  continue;
5416  }
5417 
5418  doCustom(thread, st, &aggs);
5419 
5420  /* If doCustom changed client to finished state, reduce remains */
5421  if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
5422  remains--;
5423  }
5424 
5425  /* progress report is made by thread 0 for all threads */
5426  if (progress && thread->tid == 0)
5427  {
5428  instr_time now_time;
5429  int64 now;
5430 
5431  INSTR_TIME_SET_CURRENT(now_time);
5432  now = INSTR_TIME_GET_MICROSEC(now_time);
5433  if (now >= next_report)
5434  {
5435  /* generate and show report */
5436  StatsData cur;
5437  int64 run = now - last_report,
5438  ntx;
5439  double tps,
5440  total_run,
5441  latency,
5442  sqlat,
5443  lag,
5444  stdev;
5445  char tbuf[64];
5446 
5447  /*
5448  * Add up the statistics of all threads.
5449  *
5450  * XXX: No locking. There is no guarantee that we get an
5451  * atomic snapshot of the transaction count and latencies, so
5452  * these figures can well be off by a small amount. The
5453  * progress report's purpose is to give a quick overview of
5454  * how the test is going, so that shouldn't matter too much.
5455  * (If a read from a 64-bit integer is not atomic, you might
5456  * get a "torn" read and completely bogus latencies though!)
5457  */
5458  initStats(&cur, 0);
5459  for (i = 0; i < nthreads; i++)
5460  {
5461  mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
5462  mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
5463  cur.cnt += thread[i].stats.cnt;
5464  cur.skipped += thread[i].stats.skipped;
5465  }
5466 
5467  /* we count only actually executed transactions */
5468  ntx = (cur.cnt - cur.skipped) - (last.cnt - last.skipped);
5469  total_run = (now - thread_start) / 1000000.0;
5470  tps = 1000000.0 * ntx / run;
5471  if (ntx > 0)
5472  {
5473  latency = 0.001 * (cur.latency.sum - last.latency.sum) / ntx;
5474  sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2) / ntx;
5475  stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
5476  lag = 0.001 * (cur.lag.sum - last.lag.sum) / ntx;
5477  }
5478  else
5479  {
5480  latency = sqlat = stdev = lag = 0;
5481  }
5482 
5483  if (progress_timestamp)
5484  {
5485  /*
5486  * On some platforms the current system timestamp is
5487  * available in now_time, but rather than get entangled
5488  * with that, we just eat the cost of an extra syscall in
5489  * all cases.
5490  */
5491  struct timeval tv;
5492 
5493  gettimeofday(&tv, NULL);
5494  snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
5495  (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
5496  }
5497  else
5498  {
5499  /* round seconds are expected, but the thread may be late */
5500  snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
5501  }
5502 
5503  fprintf(stderr,
5504  "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
5505  tbuf, tps, latency, stdev);
5506 
5507  if (throttle_delay)
5508  {
5509  fprintf(stderr, ", lag %.3f ms", lag);
5510  if (latency_limit)
5511  fprintf(stderr, ", " INT64_FORMAT " skipped",
5512  cur.skipped - last.skipped);
5513  }
5514  fprintf(stderr, "\n");
5515 
5516  last = cur;
5517  last_report = now;
5518 
5519  /*
5520  * Ensure that the next report is in the future, in case
5521  * pgbench/postgres got stuck somewhere.
5522  */
5523  do
5524  {
5525  next_report += (int64) progress * 1000000;
5526  } while (now >= next_report);
5527  }
5528  }
5529  }
5530 
5531 done:
5532  INSTR_TIME_SET_CURRENT(start);
5533  disconnect_all(state, nstate);
5535  INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
5536  if (thread->logfile)
5537  {
5538  if (agg_interval > 0)
5539  {
5540  /* log aggregated but not yet reported transactions */
5541  doLog(thread, state, &aggs, false, 0, 0);
5542  }
5543  fclose(thread->logfile);
5544  thread->logfile = NULL;
5545  }
5546  return NULL;
5547 }
5548 
5549 /*
5550  * Support for duration option: set timer_exceeded after so many seconds.
5551  */
5552 
5553 #ifndef WIN32
5554 
5555 static void
5557 {
5558  timer_exceeded = true;
5559 }
5560 
5561 static void
5562 setalarm(int seconds)
5563 {
5565  alarm(seconds);
5566 }
5567 
5568 #else /* WIN32 */
5569 
5570 static VOID CALLBACK
5571 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
5572 {
5573  timer_exceeded = true;
5574 }
5575 
5576 static void
5577 setalarm(int seconds)
5578 {
5579  HANDLE queue;
5580  HANDLE timer;
5581 
5582  /* This function will be called at most once, so we can cheat a bit. */
5583  queue = CreateTimerQueue();
5584  if (seconds > ((DWORD) -1) / 1000 ||
5585  !CreateTimerQueueTimer(&timer, queue,
5586  win32_timer_callback, NULL, seconds * 1000, 0,
5587  WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
5588  {
5589  fprintf(stderr, "failed to set timer\n");
5590  exit(1);
5591  }
5592 }
5593 
5594 /* partial pthread implementation for Windows */
5595 
5596 typedef struct win32_pthread
5597 {
5598  HANDLE handle;
5599  void *(*routine) (void *);
5600  void *arg;
5601  void *result;
5602 } win32_pthread;
5603 
5604 static unsigned __stdcall
5605 win32_pthread_run(void *arg)
5606 {
5607  win32_pthread *th = (win32_pthread *) arg;
5608 
5609  th->result = th->routine(th->arg);
5610 
5611  return 0;
5612 }
5613 
5614 static int
5615 pthread_create(pthread_t *thread,
5616  pthread_attr_t *attr,
5617  void *(*start_routine) (void *),
5618  void *arg)
5619 {
5620  int save_errno;
5621  win32_pthread *th;
5622 
5623  th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
5624  th->routine = start_routine;
5625  th->arg = arg;
5626  th->result = NULL;
5627 
5628  th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
5629  if (th->handle == NULL)
5630  {
5631  save_errno = errno;
5632  free(th);
5633  return save_errno;
5634  }
5635 
5636  *thread = th;
5637  return 0;
5638 }
5639 
5640 static int
5641 pthread_join(pthread_t th, void **thread_return)
5642 {
5643  if (th == NULL || th->handle == NULL)
5644  return errno = EINVAL;
5645 
5646  if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
5647  {
5648  _dosmaperr(GetLastError());
5649  return errno;
5650  }
5651 
5652  if (thread_return)
5653  *thread_return = th->result;
5654 
5655  CloseHandle(th->handle);
5656  free(th);
5657  return 0;
5658 }
5659 
5660 #endif /* WIN32 */
static void checkInitSteps(const char *initialize_steps)
Definition: pgbench.c:3515
static void printResults(TState *threads, StatsData *total, instr_time total_time, instr_time conn_total_time, int64 latency_late)
Definition: pgbench.c:4226
time_t start_time
Definition: pgbench.c:232
static void initDropTables(PGconn *con)
Definition: pgbench.c:3187
static char password[100]
Definition: streamutil.c:45
static void printSimpleStats(const char *prefix, SimpleStats *ss)
Definition: pgbench.c:4212
int length(const List *list)
Definition: list.c:1333
static bool coerceToBool(PgBenchValue *pval, bool *bval)
Definition: pgbench.c:1488
PgBenchValue constant
Definition: pgbench.h:112
static const BuiltinScript * findBuiltin(const char *name)
Definition: pgbench.c:4112
static void initVacuum(PGconn *con)
Definition: pgbench.c:3441
static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *)
Definition: pgbench.c:2238
int64 throttle_trigger
Definition: pgbench.c:376
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1941
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6115
PsqlScanResult
Definition: psqlscan.h:30
void expr_scanner_finish(yyscan_t yyscanner)
int gettimeofday(struct timeval *tp, struct timezone *tzp)
Definition: gettimeofday.c:105
double sample_rate
Definition: pgbench.c:127
static Variable * lookupVariable(CState *st, char *name)
Definition: pgbench.c:1116
double harmonicn
Definition: pgbench.c:346
double alpha
Definition: pgbench.c:347
#define PG_INT64_MAX
Definition: c.h:392
int main(int argc, char **argv)
Definition: pgbench.c:4367
static struct @130 value
static int64 getrand(TState *thread, int64 min, int64 max)
Definition: pgbench.c:673
#define ERRCODE_UNDEFINED_TABLE
Definition: pgbench.c:61
int type
Definition: pgbench.c:421
#define INVALID_THREAD
Definition: pgbench.c:388
int command_num
Definition: pgbench.c:420
int expr_yyparse(yyscan_t yyscanner)
ConnectionStateEnum
Definition: pgbench.c:243
int id
Definition: pgbench.c:313
const char * name
Definition: pgbench.c:447
static void ParseScript(const char *script, const char *desc, int weight)
Definition: pgbench.c:3927
static int chooseScript(TState *thread)
Definition: pgbench.c:2428
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1234
static void discard_response(CState *state)
Definition: pgbench.c:1094
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3118
static int num_scripts
Definition: pgbench.c:438
char * line
Definition: pgbench.c:419
unsigned short random_state[3]
Definition: pgbench.c:375
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static void listAvailableScripts(void)
Definition: pgbench.c:4100
static Command * process_sql_command(PQExpBuffer buf, const char *source)
Definition: pgbench.c:3699
int64 n
Definition: pgbench.c:344
char * name
Definition: pgbench.c:203
int nclients
Definition: pgbench.c:174
#define ZIPF_CACHE_SIZE
Definition: pgbench.c:98
const char * get_progname(const char *argv0)
Definition: path.c:453
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:128
int nb_cells
Definition: pgbench.c:361
static ZipfCell * zipfFindOrCreateCacheCell(ZipfCache *cache, int64 n, double s)
Definition: pgbench.c:820
PsqlScanState psql_scan_create(const PsqlScanCallbacks *callbacks)
struct BuiltinScript BuiltinScript
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:57
static int compareVariableNames(const void *v1, const void *v2)
Definition: pgbench.c:1108
long random(void)
Definition: random.c:22
static void static void addScript(ParsedScript script)
Definition: pgbench.c:4193
static int parseScriptWeight(const char *option, char **script)
Definition: pgbench.c:4149
static const PsqlScanCallbacks pgbench_callbacks
Definition: pgbench.c:508
#define DEFAULT_INIT_STEPS
Definition: pgbench.c:93
yyscan_t expr_scanner_init(PsqlScanState state, const char *source, int lineno, int start_offset, const char *command)
struct timeval instr_time
Definition: instr_time.h:147
#define Min(x, y)
Definition: c.h:826
#define COMMANDS_ALLOC_NUM
static void preparedStatementName(char *buffer, int file, int state)
Definition: pgbench.c:2413
void syntax_error(const char *source, int lineno, const char *line, const char *command, const char *msg, const char *more, int column)
Definition: pgbench.c:3664
MetaCommand
Definition: pgbench.c:397
struct PgBenchExpr::@40::@41 variable
union PgBenchValue::@39 u
int64 latency_late
Definition: pgbench.c:385
PgBenchExpr * expr
Definition: pgbench.c:425
void _dosmaperr(unsigned long)
Definition: win32error.c:171
SimpleStats lag
Definition: pgbench.c:237
struct ParsedScript ParsedScript
struct cursor * cur
Definition: ecpg.c:28
StatsData stats
Definition: pgbench.c:434
static void initStats(StatsData *sd, time_t start_time)
Definition: pgbench.c:961
double sum
Definition: pgbench.c:222
#define INSTR_TIME_ACCUM_DIFF(x, y, z)
Definition: instr_time.h:179
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3638
int weight
Definition: pgbench.c:432
int scale
Definition: pgbench.c:111
static void addToSimpleStats(SimpleStats *ss, double val)
Definition: pgbench.c:930
#define INSTR_TIME_SET_ZERO(t)
Definition: instr_time.h:151
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
bool use_log
Definition: pgbench.c:167
static char * getVariable(CState *st, char *name)
Definition: pgbench.c:1143
static bool putVariable(CState *st, const char *context, char *name, const char *value)
Definition: pgbench.c:1318
static FILE * logfile
Definition: pg_regress.c:100
SimpleStats latency
Definition: pgbench.c:236
#define INSTR_TIME_GET_DOUBLE(t)
Definition: instr_time.h:196
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
#define lengthof(array)
Definition: c.h:610
static bool coerceToDouble(PgBenchValue *pval, double *dval)
Definition: pgbench.c:1556
#define MAX_SCRIPTS
Definition: pgbench.c:208
static void setDoubleValue(PgBenchValue *pv, double dval)
Definition: pgbench.c:1600
uint64 current
Definition: pgbench.c:359
#define LOG_STEP_SECONDS
Definition: pgbench.c:95
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
#define MIN_GAUSSIAN_PARAM
Definition: pgbench.c:100
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define WSEP
Definition: pgbench.c:187
static void process_file(const char *filename, int weight)
Definition: pgbench.c:4059
bool per_script_stats
Definition: pgbench.c:171
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2647
char * argv[MAX_ARGS]
Definition: pgbench.c:424
bool use_quiet
Definition: pgbench.c:168
int duration
Definition: pgbench.c:104
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:536
static bool valid_variable_name(const char *name)
Definition: pgbench.c:1245
#define INSTR_TIME_IS_ZERO(t)
Definition: instr_time.h:149
static time_t start_time
Definition: pg_ctl.c:95
#define pg_attribute_printf(f, a)
Definition: c.h:131
#define MAX_ARGS
Definition: pgbench.c:395
static void executeStatement(PGconn *con, const char *sql)
Definition: pgbench.c:995
#define SCALE_32BIT_THRESHOLD
Definition: pgbench.c:165
Definition: type.h:89
static int64 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
Definition: pgbench.c:716
bool is_connect
Definition: pgbench.c:176
Variable * variables
Definition: pgbench.c:320
int PQputline(PGconn *conn, const char *s)
Definition: fe-exec.c:2540
int nstate
Definition: pgbench.c:374
static const char * QUERYMODE[]
Definition: pgbench.c:415
int pg_strncasecmp(const char *s1, const char *s2, size_t n)
Definition: pgstrcasecmp.c:69
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1183
void pg_usleep(long microsec)
Definition: signal.c:53
#define MAX_FARGS
Definition: pgbench.c:1721
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3525
int64 strtoint64(const char *str)
Definition: pgbench.c:610
#define SHELL_COMMAND_SIZE
Definition: pgbench.c:209
int64 end_time
Definition: pgbench.c:105
#define nbranches
Definition: pgbench.c:153
#define required_argument
Definition: getopt_long.h:25
FILE * logfile
Definition: pgbench.c:377
static void mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
Definition: pgbench.c:945
double s
Definition: pgbench.c:343
static void setBoolValue(PgBenchValue *pv, bool bval)
Definition: pgbench.c:1585
int optind
Definition: getopt.c:51
#define IS_HIGHBIT_SET(ch)
Definition: c.h:963
instr_time stmt_begin
Definition: pgbench.c:328
int64 cnt
Definition: pgbench.c:233
CState * state
Definition: pgbench.c:373
int expr_scanner_get_lineno(PsqlScanState state, int offset)
static void zipfSetCacheCell(ZipfCell *cell, int64 n, double s)
Definition: pgbench.c:800
int fillfactor
Definition: pgbench.c:117
#define M_PI
Definition: pgbench.c:56
#define INSTR_TIME_SUBTRACT(x, y)
Definition: instr_time.h:167
#define META_COMMAND
Definition: pgbench.c:394
static bool evalFunc(TState *thread, CState *st, PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
Definition: pgbench.c:2222
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static int debug
Definition: pgbench.c:442
PGconn * conn
Definition: streamutil.c:46
static char * valueTypeName(PgBenchValue *pval)
Definition: pgbench.c:1466
bool vars_sorted
Definition: pgbench.c:322
#define MAXPGPATH
ZipfCache zipf_cache
Definition: pgbench.c:378
static void setNullValue(PgBenchValue *pv)
Definition: pgbench.c:1577
static void commandFailed(CState *st, const char *message)
Definition: pgbench.c:2419
static void doCustom(TState *thread, CState *st, StatsData *agg)
Definition: pgbench.c:2566
static char * assignVariables(CState *st, char *sql)
Definition: pgbench.c:1421
char sign
Definition: informix.c:693
static void initGenerateData(PGconn *con)
Definition: pgbench.c:3303
QueryMode
Definition: pgbench.c:406
char * svalue
Definition: pgbench.c:204
char * c
static bool isLazyFunc(PgBenchFunction func)
Definition: pgbench.c:1606
int nxacts
Definition: pgbench.c:103
static char * buf
Definition: pg_test_fsync.c:67
static bool makeVariableValue(Variable *var)
Definition: pgbench.c:1176
static bool evalLazyFunc(TState *thread, CState *st, PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
Definition: pgbench.c:1613