PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
pgbench.c
Go to the documentation of this file.
1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * src/bin/pgbench/pgbench.c
8  * Copyright (c) 2000-2017, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29 
30 #ifdef WIN32
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
32 #endif /* ! WIN32 */
33 
34 #include "postgres_fe.h"
35 
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "portability/instr_time.h"
39 
40 #include <ctype.h>
41 #include <float.h>
42 #include <limits.h>
43 #include <math.h>
44 #include <signal.h>
45 #include <time.h>
46 #include <sys/time.h>
47 #ifdef HAVE_SYS_SELECT_H
48 #include <sys/select.h>
49 #endif
50 
51 #ifdef HAVE_SYS_RESOURCE_H
52 #include <sys/resource.h> /* for getrlimit */
53 #endif
54 
55 #ifndef M_PI
56 #define M_PI 3.14159265358979323846
57 #endif
58 
59 #include "pgbench.h"
60 
61 #define ERRCODE_UNDEFINED_TABLE "42P01"
62 
63 /*
64  * Multi-platform pthread implementations
65  */
66 
67 #ifdef WIN32
68 /* Use native win32 threads on Windows */
69 typedef struct win32_pthread *pthread_t;
70 typedef int pthread_attr_t;
71 
72 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
73 static int pthread_join(pthread_t th, void **thread_return);
74 #elif defined(ENABLE_THREAD_SAFETY)
75 /* Use platform-dependent pthread capability */
76 #include <pthread.h>
77 #else
78 /* No threads implementation, use none (-j 1) */
79 #define pthread_t void *
80 #endif
81 
82 
83 /********************************************************************
84  * some configurable parameters */
85 
86 /* max number of clients allowed */
87 #ifdef FD_SETSIZE
88 #define MAXCLIENTS (FD_SETSIZE - 10)
89 #else
90 #define MAXCLIENTS 1024
91 #endif
92 
93 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
94 #define DEFAULT_NXACTS 10 /* default nxacts */
95 
96 #define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */
97 
98 int nxacts = 0; /* number of transactions per client */
99 int duration = 0; /* duration in seconds */
100 int64 end_time = 0; /* when to stop in micro seconds, under -T */
101 
102 /*
103  * scaling factor. for example, scale = 10 will make 1000000 tuples in
104  * pgbench_accounts table.
105  */
106 int scale = 1;
107 
108 /*
109  * fillfactor. for example, fillfactor = 90 will use only 90 percent
110  * space during inserts and leave 10 percent free.
111  */
112 int fillfactor = 100;
113 
114 /*
115  * create foreign key constraints on the tables?
116  */
117 int foreign_keys = 0;
118 
119 /*
120  * use unlogged tables?
121  */
123 
124 /*
125  * log sampling rate (1.0 = log everything, 0.0 = option not given)
126  */
127 double sample_rate = 0.0;
128 
129 /*
130  * When threads are throttled to a given rate limit, this is the target delay
131  * to reach that rate in usec. 0 is the default and means no throttling.
132  */
133 int64 throttle_delay = 0;
134 
135 /*
136  * Transactions which take longer than this limit (in usec) are counted as
137  * late, and reported as such, although they are completed anyway. When
138  * throttling is enabled, execution time slots that are more than this late
139  * are skipped altogether, and counted separately.
140  */
141 int64 latency_limit = 0;
142 
143 /*
144  * tablespace selection
145  */
146 char *tablespace = NULL;
148 
149 /*
150  * end of configurable parameters
151  *********************************************************************/
152 
153 #define nbranches 1 /* Makes little sense to change this. Change
154  * -s instead */
155 #define ntellers 10
156 #define naccounts 100000
157 
158 /*
159  * The scale factor at/beyond which 32bit integers are incapable of storing
160  * 64bit values.
161  *
162  * Although the actual threshold is 21474, we use 20000 because it is easier to
163  * document and remember, and isn't that far away from the real threshold.
164  */
165 #define SCALE_32BIT_THRESHOLD 20000
166 
167 bool use_log; /* log transaction latencies to a file */
168 bool use_quiet; /* quiet logging onto stderr */
169 int agg_interval; /* log aggregates instead of individual
170  * transactions */
171 bool per_script_stats = false; /* whether to collect stats per script */
172 int progress = 0; /* thread progress report every this seconds */
173 bool progress_timestamp = false; /* progress report with Unix time */
174 int nclients = 1; /* number of clients */
175 int nthreads = 1; /* number of threads */
176 bool is_connect; /* establish connection for each transaction */
177 bool is_latencies; /* report per-command latencies */
178 int main_pid; /* main process id used in log filename */
179 
180 char *pghost = "";
181 char *pgport = "";
182 char *login = NULL;
183 char *dbName;
185 const char *progname;
186 
187 #define WSEP '@' /* weight separator */
188 
189 volatile bool timer_exceeded = false; /* flag from signal handler */
190 
191 /*
192  * Variable definitions. If a variable has a string value, "value" is that
193  * value, is_numeric is false, and num_value is undefined. If the value is
194  * known to be numeric, is_numeric is true and num_value contains the value
195  * (in any permitted numeric variant). In this case "value" contains the
196  * string equivalent of the number, if we've had occasion to compute that,
197  * or NULL if we haven't.
198  */
199 typedef struct
200 {
201  char *name; /* variable's name */
202  char *value; /* its value in string form, if known */
203  bool is_numeric; /* is numeric value known? */
204  PgBenchValue num_value; /* variable's value in numeric form */
205 } Variable;
206 
207 #define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */
208 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
209 
210 /*
211  * Simple data structure to keep stats about something.
212  *
213  * XXX probably the first value should be kept and used as an offset for
214  * better numerical stability...
215  */
216 typedef struct SimpleStats
217 {
218  int64 count; /* how many values were encountered */
219  double min; /* the minimum seen */
220  double max; /* the maximum seen */
221  double sum; /* sum of values */
222  double sum2; /* sum of squared values */
223 } SimpleStats;
224 
225 /*
226  * Data structure to hold various statistics: per-thread and per-script stats
227  * are maintained and merged together.
228  */
229 typedef struct StatsData
230 {
231  time_t start_time; /* interval start time, for aggregates */
232  int64 cnt; /* number of transactions */
233  int64 skipped; /* number of transactions skipped under --rate
234  * and --latency-limit */
237 } StatsData;
238 
239 /*
240  * Connection state machine states.
241  */
242 typedef enum
243 {
244  /*
245  * The client must first choose a script to execute. Once chosen, it can
246  * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
247  * right away (state CSTATE_START_TX).
248  */
250 
251  /*
252  * In CSTATE_START_THROTTLE state, we calculate when to begin the next
253  * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
254  * sleeps until that moment. (If throttling is not enabled, doCustom()
255  * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
256  */
259 
260  /*
261  * CSTATE_START_TX performs start-of-transaction processing. Establishes
262  * a new connection for the transaction, in --connect mode, and records
263  * the transaction start time.
264  */
266 
267  /*
268  * We loop through these states, to process each command in the script:
269  *
270  * CSTATE_START_COMMAND starts the execution of a command. On a SQL
271  * command, the command is sent to the server, and we move to
272  * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
273  * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
274  * meta-commands are executed immediately.
275  *
276  * CSTATE_WAIT_RESULT waits until we get a result set back from the server
277  * for the current command.
278  *
279  * CSTATE_SLEEP waits until the end of \sleep.
280  *
281  * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
282  * command counter, and loops back to CSTATE_START_COMMAND state.
283  */
288 
289  /*
290  * CSTATE_END_TX performs end-of-transaction processing. Calculates
291  * latency, and logs the transaction. In --connect mode, closes the
292  * current connection. Chooses the next script to execute and starts over
293  * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
294  * more work to do.
295  */
297 
298  /*
299  * Final states. CSTATE_ABORTED means that the script execution was
300  * aborted because a command failed, CSTATE_FINISHED means success.
301  */
305 
306 /*
307  * Connection state.
308  */
309 typedef struct
310 {
311  PGconn *con; /* connection handle to DB */
312  int id; /* client No. */
313  ConnectionStateEnum state; /* state machine's current state. */
314 
315  int use_file; /* index in sql_script for this client */
316  int command; /* command number in script */
317 
318  /* client variables */
319  Variable *variables; /* array of variable definitions */
320  int nvariables; /* number of variables */
321  bool vars_sorted; /* are variables sorted by name? */
322 
323  /* various times about current transaction */
324  int64 txn_scheduled; /* scheduled start time of transaction (usec) */
325  int64 sleep_until; /* scheduled start time of next cmd (usec) */
326  instr_time txn_begin; /* used for measuring schedule lag times */
327  instr_time stmt_begin; /* used for measuring statement latencies */
328 
329  bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
330 
331  /* per client collected stats */
332  int64 cnt; /* transaction count */
333  int ecnt; /* error count */
334 } CState;
335 
336 /*
337  * Thread state
338  */
339 typedef struct
340 {
341  int tid; /* thread id */
342  pthread_t thread; /* thread handle */
343  CState *state; /* array of CState */
344  int nstate; /* length of state[] */
345  unsigned short random_state[3]; /* separate randomness for each thread */
346  int64 throttle_trigger; /* previous/next throttling (us) */
347  FILE *logfile; /* where to log, or NULL */
348 
349  /* per thread collected stats */
350  instr_time start_time; /* thread start time */
353  int64 latency_late; /* executed but late transactions */
354 } TState;
355 
356 #define INVALID_THREAD ((pthread_t) 0)
357 
358 /*
359  * queries read from files
360  */
361 #define SQL_COMMAND 1
362 #define META_COMMAND 2
363 #define MAX_ARGS 10
364 
365 typedef enum QueryMode
366 {
367  QUERY_SIMPLE, /* simple query */
368  QUERY_EXTENDED, /* extended query */
369  QUERY_PREPARED, /* extended query with prepared statements */
371 } QueryMode;
372 
374 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
375 
376 typedef struct
377 {
378  char *line; /* text of command line */
379  int command_num; /* unique index of this Command struct */
380  int type; /* command type (SQL_COMMAND or META_COMMAND) */
381  int argc; /* number of command words */
382  char *argv[MAX_ARGS]; /* command word list */
383  PgBenchExpr *expr; /* parsed expression, if needed */
384  SimpleStats stats; /* time spent in this command */
385 } Command;
386 
387 typedef struct ParsedScript
388 {
389  const char *desc; /* script descriptor (eg, file name) */
390  int weight; /* selection weight */
391  Command **commands; /* NULL-terminated array of Commands */
392  StatsData stats; /* total time spent in script */
393 } ParsedScript;
394 
395 static ParsedScript sql_script[MAX_SCRIPTS]; /* SQL script files */
396 static int num_scripts; /* number of scripts in sql_script[] */
397 static int num_commands = 0; /* total number of Command structs */
398 static int64 total_weight = 0;
399 
400 static int debug = 0; /* debug flag */
401 
402 /* Builtin test scripts */
403 typedef struct BuiltinScript
404 {
405  const char *name; /* very short name for -b ... */
406  const char *desc; /* short description */
407  const char *script; /* actual pgbench script */
408 } BuiltinScript;
409 
411 {
412  {
413  "tpcb-like",
414  "<builtin: TPC-B (sort of)>",
415  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
416  "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
417  "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
418  "\\set delta random(-5000, 5000)\n"
419  "BEGIN;\n"
420  "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
421  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
422  "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
423  "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
424  "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
425  "END;\n"
426  },
427  {
428  "simple-update",
429  "<builtin: simple update>",
430  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
431  "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
432  "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
433  "\\set delta random(-5000, 5000)\n"
434  "BEGIN;\n"
435  "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
436  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
437  "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
438  "END;\n"
439  },
440  {
441  "select-only",
442  "<builtin: select only>",
443  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
444  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
445  }
446 };
447 
448 
449 /* Function prototypes */
450 static void setIntValue(PgBenchValue *pv, int64 ival);
451 static void setDoubleValue(PgBenchValue *pv, double dval);
452 static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
453 static void doLog(TState *thread, CState *st,
454  StatsData *agg, bool skipped, double latency, double lag);
455 static void processXactStats(TState *thread, CState *st, instr_time *now,
456  bool skipped, StatsData *agg);
457 static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
458 static void addScript(ParsedScript script);
459 static void *threadRun(void *arg);
460 static void setalarm(int seconds);
461 
462 
463 /* callback functions for our flex lexer */
465  NULL, /* don't need get_variable functionality */
467 };
468 
469 
470 static void
471 usage(void)
472 {
473  printf("%s is a benchmarking tool for PostgreSQL.\n\n"
474  "Usage:\n"
475  " %s [OPTION]... [DBNAME]\n"
476  "\nInitialization options:\n"
477  " -i, --initialize invokes initialization mode\n"
478  " -F, --fillfactor=NUM set fill factor\n"
479  " -n, --no-vacuum do not run VACUUM after initialization\n"
480  " -q, --quiet quiet logging (one message each 5 seconds)\n"
481  " -s, --scale=NUM scaling factor\n"
482  " --foreign-keys create foreign key constraints between tables\n"
483  " --index-tablespace=TABLESPACE\n"
484  " create indexes in the specified tablespace\n"
485  " --tablespace=TABLESPACE create tables in the specified tablespace\n"
486  " --unlogged-tables create tables as unlogged tables\n"
487  "\nOptions to select what to run:\n"
488  " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
489  " (use \"-b list\" to list available scripts)\n"
490  " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
491  " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
492  " (same as \"-b simple-update\")\n"
493  " -S, --select-only perform SELECT-only transactions\n"
494  " (same as \"-b select-only\")\n"
495  "\nBenchmarking options:\n"
496  " -c, --client=NUM number of concurrent database clients (default: 1)\n"
497  " -C, --connect establish new connection for each transaction\n"
498  " -D, --define=VARNAME=VALUE\n"
499  " define variable for use by custom script\n"
500  " -j, --jobs=NUM number of threads (default: 1)\n"
501  " -l, --log write transaction times to log file\n"
502  " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
503  " -M, --protocol=simple|extended|prepared\n"
504  " protocol for submitting queries (default: simple)\n"
505  " -n, --no-vacuum do not run VACUUM before tests\n"
506  " -P, --progress=NUM show thread progress report every NUM seconds\n"
507  " -r, --report-latencies report average latency per command\n"
508  " -R, --rate=NUM target rate in transactions per second\n"
509  " -s, --scale=NUM report this scale factor in output\n"
510  " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
511  " -T, --time=NUM duration of benchmark test in seconds\n"
512  " -v, --vacuum-all vacuum all four standard tables before tests\n"
513  " --aggregate-interval=NUM aggregate data over NUM seconds\n"
514  " --progress-timestamp use Unix epoch timestamps for progress\n"
515  " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
516  " --log-prefix=PREFIX prefix for transaction time log file\n"
517  " (default: \"pgbench_log\")\n"
518  "\nCommon options:\n"
519  " -d, --debug print debugging output\n"
520  " -h, --host=HOSTNAME database server host or socket directory\n"
521  " -p, --port=PORT database server port number\n"
522  " -U, --username=USERNAME connect as specified database user\n"
523  " -V, --version output version information, then exit\n"
524  " -?, --help show this help, then exit\n"
525  "\n"
526  "Report bugs to <pgsql-bugs@postgresql.org>.\n",
527  progname, progname);
528 }
529 
530 /* return whether str matches "^\s*[-+]?[0-9]+$" */
531 static bool
532 is_an_int(const char *str)
533 {
534  const char *ptr = str;
535 
536  /* skip leading spaces; cast is consistent with strtoint64 */
537  while (*ptr && isspace((unsigned char) *ptr))
538  ptr++;
539 
540  /* skip sign */
541  if (*ptr == '+' || *ptr == '-')
542  ptr++;
543 
544  /* at least one digit */
545  if (*ptr && !isdigit((unsigned char) *ptr))
546  return false;
547 
548  /* eat all digits */
549  while (*ptr && isdigit((unsigned char) *ptr))
550  ptr++;
551 
552  /* must have reached end of string */
553  return *ptr == '\0';
554 }
555 
556 
557 /*
558  * strtoint64 -- convert a string to 64-bit integer
559  *
560  * This function is a modified version of scanint8() from
561  * src/backend/utils/adt/int8.c.
562  */
563 int64
564 strtoint64(const char *str)
565 {
566  const char *ptr = str;
567  int64 result = 0;
568  int sign = 1;
569 
570  /*
571  * Do our own scan, rather than relying on sscanf which might be broken
572  * for long long.
573  */
574 
575  /* skip leading spaces */
576  while (*ptr && isspace((unsigned char) *ptr))
577  ptr++;
578 
579  /* handle sign */
580  if (*ptr == '-')
581  {
582  ptr++;
583 
584  /*
585  * Do an explicit check for INT64_MIN. Ugly though this is, it's
586  * cleaner than trying to get the loop below to handle it portably.
587  */
588  if (strncmp(ptr, "9223372036854775808", 19) == 0)
589  {
590  result = PG_INT64_MIN;
591  ptr += 19;
592  goto gotdigits;
593  }
594  sign = -1;
595  }
596  else if (*ptr == '+')
597  ptr++;
598 
599  /* require at least one digit */
600  if (!isdigit((unsigned char) *ptr))
601  fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
602 
603  /* process digits */
604  while (*ptr && isdigit((unsigned char) *ptr))
605  {
606  int64 tmp = result * 10 + (*ptr++ - '0');
607 
608  if ((tmp / 10) != result) /* overflow? */
609  fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
610  result = tmp;
611  }
612 
613 gotdigits:
614 
615  /* allow trailing whitespace, but not other trailing chars */
616  while (*ptr != '\0' && isspace((unsigned char) *ptr))
617  ptr++;
618 
619  if (*ptr != '\0')
620  fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
621 
622  return ((sign < 0) ? -result : result);
623 }
624 
625 /* random number generator: uniform distribution from min to max inclusive */
626 static int64
627 getrand(TState *thread, int64 min, int64 max)
628 {
629  /*
630  * Odd coding is so that min and max have approximately the same chance of
631  * being selected as do numbers between them.
632  *
633  * pg_erand48() is thread-safe and concurrent, which is why we use it
634  * rather than random(), which in glibc is non-reentrant, and therefore
635  * protected by a mutex, and therefore a bottleneck on machines with many
636  * CPUs.
637  */
638  return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
639 }
640 
641 /*
642  * random number generator: exponential distribution from min to max inclusive.
643  * the parameter is so that the density of probability for the last cut-off max
644  * value is exp(-parameter).
645  */
646 static int64
647 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
648 {
649  double cut,
650  uniform,
651  rand;
652 
653  /* abort if wrong parameter, but must really be checked beforehand */
654  Assert(parameter > 0.0);
655  cut = exp(-parameter);
656  /* erand in [0, 1), uniform in (0, 1] */
657  uniform = 1.0 - pg_erand48(thread->random_state);
658 
659  /*
660  * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
661  */
662  Assert((1.0 - cut) != 0.0);
663  rand = -log(cut + (1.0 - cut) * uniform) / parameter;
664  /* return int64 random number within between min and max */
665  return min + (int64) ((max - min + 1) * rand);
666 }
667 
668 /* random number generator: gaussian distribution from min to max inclusive */
669 static int64
670 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
671 {
672  double stdev;
673  double rand;
674 
675  /* abort if parameter is too low, but must really be checked beforehand */
676  Assert(parameter >= MIN_GAUSSIAN_PARAM);
677 
678  /*
679  * Get user specified random number from this loop, with -parameter <
680  * stdev <= parameter
681  *
682  * This loop is executed until the number is in the expected range.
683  *
684  * As the minimum parameter is 2.0, the probability of looping is low:
685  * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
686  * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
687  * the worst case. For a parameter value of 5.0, the looping probability
688  * is about e^{-5} * 2 / pi ~ 0.43%.
689  */
690  do
691  {
692  /*
693  * pg_erand48 generates [0,1), but for the basic version of the
694  * Box-Muller transform the two uniformly distributed random numbers
695  * are expected in (0, 1] (see
696  * http://en.wikipedia.org/wiki/Box_muller)
697  */
698  double rand1 = 1.0 - pg_erand48(thread->random_state);
699  double rand2 = 1.0 - pg_erand48(thread->random_state);
700 
701  /* Box-Muller basic form transform */
702  double var_sqrt = sqrt(-2.0 * log(rand1));
703 
704  stdev = var_sqrt * sin(2.0 * M_PI * rand2);
705 
706  /*
707  * we may try with cos, but there may be a bias induced if the
708  * previous value fails the test. To be on the safe side, let us try
709  * over.
710  */
711  }
712  while (stdev < -parameter || stdev >= parameter);
713 
714  /* stdev is in [-parameter, parameter), normalization to [0,1) */
715  rand = (stdev + parameter) / (parameter * 2.0);
716 
717  /* return int64 random number within between min and max */
718  return min + (int64) ((max - min + 1) * rand);
719 }
720 
721 /*
722  * random number generator: generate a value, such that the series of values
723  * will approximate a Poisson distribution centered on the given value.
724  */
725 static int64
726 getPoissonRand(TState *thread, int64 center)
727 {
728  /*
729  * Use inverse transform sampling to generate a value > 0, such that the
730  * expected (i.e. average) value is the given argument.
731  */
732  double uniform;
733 
734  /* erand in [0, 1), uniform in (0, 1] */
735  uniform = 1.0 - pg_erand48(thread->random_state);
736 
737  return (int64) (-log(uniform) * ((double) center) + 0.5);
738 }
739 
740 /*
741  * Initialize the given SimpleStats struct to all zeroes
742  */
743 static void
745 {
746  memset(ss, 0, sizeof(SimpleStats));
747 }
748 
749 /*
750  * Accumulate one value into a SimpleStats struct.
751  */
752 static void
754 {
755  if (ss->count == 0 || val < ss->min)
756  ss->min = val;
757  if (ss->count == 0 || val > ss->max)
758  ss->max = val;
759  ss->count++;
760  ss->sum += val;
761  ss->sum2 += val * val;
762 }
763 
764 /*
765  * Merge two SimpleStats objects
766  */
767 static void
769 {
770  if (acc->count == 0 || ss->min < acc->min)
771  acc->min = ss->min;
772  if (acc->count == 0 || ss->max > acc->max)
773  acc->max = ss->max;
774  acc->count += ss->count;
775  acc->sum += ss->sum;
776  acc->sum2 += ss->sum2;
777 }
778 
779 /*
780  * Initialize a StatsData struct to mostly zeroes, with its start time set to
781  * the given value.
782  */
783 static void
785 {
786  sd->start_time = start_time;
787  sd->cnt = 0;
788  sd->skipped = 0;
789  initSimpleStats(&sd->latency);
790  initSimpleStats(&sd->lag);
791 }
792 
793 /*
794  * Accumulate one additional item into the given stats object.
795  */
796 static void
797 accumStats(StatsData *stats, bool skipped, double lat, double lag)
798 {
799  stats->cnt++;
800 
801  if (skipped)
802  {
803  /* no latency to record on skipped transactions */
804  stats->skipped++;
805  }
806  else
807  {
808  addToSimpleStats(&stats->latency, lat);
809 
810  /* and possibly the same for schedule lag */
811  if (throttle_delay)
812  addToSimpleStats(&stats->lag, lag);
813  }
814 }
815 
816 /* call PQexec() and exit() on failure */
817 static void
818 executeStatement(PGconn *con, const char *sql)
819 {
820  PGresult *res;
821 
822  res = PQexec(con, sql);
823  if (PQresultStatus(res) != PGRES_COMMAND_OK)
824  {
825  fprintf(stderr, "%s", PQerrorMessage(con));
826  exit(1);
827  }
828  PQclear(res);
829 }
830 
831 /* call PQexec() and complain, but without exiting, on failure */
832 static void
833 tryExecuteStatement(PGconn *con, const char *sql)
834 {
835  PGresult *res;
836 
837  res = PQexec(con, sql);
838  if (PQresultStatus(res) != PGRES_COMMAND_OK)
839  {
840  fprintf(stderr, "%s", PQerrorMessage(con));
841  fprintf(stderr, "(ignoring this error and continuing anyway)\n");
842  }
843  PQclear(res);
844 }
845 
846 /* set up a connection to the backend */
847 static PGconn *
849 {
850  PGconn *conn;
851  bool new_pass;
852  static bool have_password = false;
853  static char password[100];
854 
855  /*
856  * Start the connection. Loop until we have a password if requested by
857  * backend.
858  */
859  do
860  {
861 #define PARAMS_ARRAY_SIZE 7
862 
863  const char *keywords[PARAMS_ARRAY_SIZE];
864  const char *values[PARAMS_ARRAY_SIZE];
865 
866  keywords[0] = "host";
867  values[0] = pghost;
868  keywords[1] = "port";
869  values[1] = pgport;
870  keywords[2] = "user";
871  values[2] = login;
872  keywords[3] = "password";
873  values[3] = have_password ? password : NULL;
874  keywords[4] = "dbname";
875  values[4] = dbName;
876  keywords[5] = "fallback_application_name";
877  values[5] = progname;
878  keywords[6] = NULL;
879  values[6] = NULL;
880 
881  new_pass = false;
882 
883  conn = PQconnectdbParams(keywords, values, true);
884 
885  if (!conn)
886  {
887  fprintf(stderr, "connection to database \"%s\" failed\n",
888  dbName);
889  return NULL;
890  }
891 
892  if (PQstatus(conn) == CONNECTION_BAD &&
894  !have_password)
895  {
896  PQfinish(conn);
897  simple_prompt("Password: ", password, sizeof(password), false);
898  have_password = true;
899  new_pass = true;
900  }
901  } while (new_pass);
902 
903  /* check to see that the backend connection was successfully made */
904  if (PQstatus(conn) == CONNECTION_BAD)
905  {
906  fprintf(stderr, "connection to database \"%s\" failed:\n%s",
907  dbName, PQerrorMessage(conn));
908  PQfinish(conn);
909  return NULL;
910  }
911 
912  return conn;
913 }
914 
915 /* throw away response from backend */
916 static void
918 {
919  PGresult *res;
920 
921  do
922  {
923  res = PQgetResult(state->con);
924  if (res)
925  PQclear(res);
926  } while (res);
927 }
928 
929 /* qsort comparator for Variable array */
930 static int
931 compareVariableNames(const void *v1, const void *v2)
932 {
933  return strcmp(((const Variable *) v1)->name,
934  ((const Variable *) v2)->name);
935 }
936 
937 /* Locate a variable by name; returns NULL if unknown */
938 static Variable *
940 {
941  Variable key;
942 
943  /* On some versions of Solaris, bsearch of zero items dumps core */
944  if (st->nvariables <= 0)
945  return NULL;
946 
947  /* Sort if we have to */
948  if (!st->vars_sorted)
949  {
950  qsort((void *) st->variables, st->nvariables, sizeof(Variable),
952  st->vars_sorted = true;
953  }
954 
955  /* Now we can search */
956  key.name = name;
957  return (Variable *) bsearch((void *) &key,
958  (void *) st->variables,
959  st->nvariables,
960  sizeof(Variable),
962 }
963 
964 /* Get the value of a variable, in string form; returns NULL if unknown */
965 static char *
967 {
968  Variable *var;
969  char stringform[64];
970 
971  var = lookupVariable(st, name);
972  if (var == NULL)
973  return NULL; /* not found */
974 
975  if (var->value)
976  return var->value; /* we have it in string form */
977 
978  /* We need to produce a string equivalent of the numeric value */
979  Assert(var->is_numeric);
980  if (var->num_value.type == PGBT_INT)
981  snprintf(stringform, sizeof(stringform),
982  INT64_FORMAT, var->num_value.u.ival);
983  else
984  {
985  Assert(var->num_value.type == PGBT_DOUBLE);
986  snprintf(stringform, sizeof(stringform),
987  "%.*g", DBL_DIG, var->num_value.u.dval);
988  }
989  var->value = pg_strdup(stringform);
990  return var->value;
991 }
992 
993 /* Try to convert variable to numeric form; return false on failure */
994 static bool
996 {
997  if (var->is_numeric)
998  return true; /* no work */
999 
1000  if (is_an_int(var->value))
1001  {
1002  setIntValue(&var->num_value, strtoint64(var->value));
1003  var->is_numeric = true;
1004  }
1005  else /* type should be double */
1006  {
1007  double dv;
1008  char xs;
1009 
1010  if (sscanf(var->value, "%lf%c", &dv, &xs) != 1)
1011  {
1012  fprintf(stderr,
1013  "malformed variable \"%s\" value: \"%s\"\n",
1014  var->name, var->value);
1015  return false;
1016  }
1017  setDoubleValue(&var->num_value, dv);
1018  var->is_numeric = true;
1019  }
1020  return true;
1021 }
1022 
1023 /* check whether the name consists of alphabets, numerals and underscores. */
1024 static bool
1026 {
1027  int i;
1028 
1029  for (i = 0; name[i] != '\0'; i++)
1030  {
1031  if (!isalnum((unsigned char) name[i]) && name[i] != '_')
1032  return false;
1033  }
1034 
1035  return (i > 0); /* must be non-empty */
1036 }
1037 
1038 /*
1039  * Lookup a variable by name, creating it if need be.
1040  * Caller is expected to assign a value to the variable.
1041  * Returns NULL on failure (bad name).
1042  */
1043 static Variable *
1044 lookupCreateVariable(CState *st, const char *context, char *name)
1045 {
1046  Variable *var;
1047 
1048  var = lookupVariable(st, name);
1049  if (var == NULL)
1050  {
1051  Variable *newvars;
1052 
1053  /*
1054  * Check for the name only when declaring a new variable to avoid
1055  * overhead.
1056  */
1057  if (!isLegalVariableName(name))
1058  {
1059  fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
1060  context, name);
1061  return NULL;
1062  }
1063 
1064  /* Create variable at the end of the array */
1065  if (st->variables)
1066  newvars = (Variable *) pg_realloc(st->variables,
1067  (st->nvariables + 1) * sizeof(Variable));
1068  else
1069  newvars = (Variable *) pg_malloc(sizeof(Variable));
1070 
1071  st->variables = newvars;
1072 
1073  var = &newvars[st->nvariables];
1074 
1075  var->name = pg_strdup(name);
1076  var->value = NULL;
1077  /* caller is expected to initialize remaining fields */
1078 
1079  st->nvariables++;
1080  /* we don't re-sort the array till we have to */
1081  st->vars_sorted = false;
1082  }
1083 
1084  return var;
1085 }
1086 
1087 /* Assign a string value to a variable, creating it if need be */
1088 /* Returns false on failure (bad name) */
1089 static bool
1090 putVariable(CState *st, const char *context, char *name, const char *value)
1091 {
1092  Variable *var;
1093  char *val;
1094 
1095  var = lookupCreateVariable(st, context, name);
1096  if (!var)
1097  return false;
1098 
1099  /* dup then free, in case value is pointing at this variable */
1100  val = pg_strdup(value);
1101 
1102  if (var->value)
1103  free(var->value);
1104  var->value = val;
1105  var->is_numeric = false;
1106 
1107  return true;
1108 }
1109 
1110 /* Assign a numeric value to a variable, creating it if need be */
1111 /* Returns false on failure (bad name) */
1112 static bool
1113 putVariableNumber(CState *st, const char *context, char *name,
1114  const PgBenchValue *value)
1115 {
1116  Variable *var;
1117 
1118  var = lookupCreateVariable(st, context, name);
1119  if (!var)
1120  return false;
1121 
1122  if (var->value)
1123  free(var->value);
1124  var->value = NULL;
1125  var->is_numeric = true;
1126  var->num_value = *value;
1127 
1128  return true;
1129 }
1130 
1131 /* Assign an integer value to a variable, creating it if need be */
1132 /* Returns false on failure (bad name) */
1133 static bool
1134 putVariableInt(CState *st, const char *context, char *name, int64 value)
1135 {
1136  PgBenchValue val;
1137 
1138  setIntValue(&val, value);
1139  return putVariableNumber(st, context, name, &val);
1140 }
1141 
1142 static char *
1143 parseVariable(const char *sql, int *eaten)
1144 {
1145  int i = 0;
1146  char *name;
1147 
1148  do
1149  {
1150  i++;
1151  } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
1152  if (i == 1)
1153  return NULL;
1154 
1155  name = pg_malloc(i);
1156  memcpy(name, &sql[1], i - 1);
1157  name[i - 1] = '\0';
1158 
1159  *eaten = i;
1160  return name;
1161 }
1162 
1163 static char *
1164 replaceVariable(char **sql, char *param, int len, char *value)
1165 {
1166  int valueln = strlen(value);
1167 
1168  if (valueln > len)
1169  {
1170  size_t offset = param - *sql;
1171 
1172  *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
1173  param = *sql + offset;
1174  }
1175 
1176  if (valueln != len)
1177  memmove(param + valueln, param + len, strlen(param + len) + 1);
1178  memcpy(param, value, valueln);
1179 
1180  return param + valueln;
1181 }
1182 
1183 static char *
1184 assignVariables(CState *st, char *sql)
1185 {
1186  char *p,
1187  *name,
1188  *val;
1189 
1190  p = sql;
1191  while ((p = strchr(p, ':')) != NULL)
1192  {
1193  int eaten;
1194 
1195  name = parseVariable(p, &eaten);
1196  if (name == NULL)
1197  {
1198  while (*p == ':')
1199  {
1200  p++;
1201  }
1202  continue;
1203  }
1204 
1205  val = getVariable(st, name);
1206  free(name);
1207  if (val == NULL)
1208  {
1209  p++;
1210  continue;
1211  }
1212 
1213  p = replaceVariable(&sql, p, eaten, val);
1214  }
1215 
1216  return sql;
1217 }
1218 
1219 static void
1220 getQueryParams(CState *st, const Command *command, const char **params)
1221 {
1222  int i;
1223 
1224  for (i = 0; i < command->argc - 1; i++)
1225  params[i] = getVariable(st, command->argv[i + 1]);
1226 }
1227 
1228 /* get a value as an int, tell if there is a problem */
1229 static bool
1230 coerceToInt(PgBenchValue *pval, int64 *ival)
1231 {
1232  if (pval->type == PGBT_INT)
1233  {
1234  *ival = pval->u.ival;
1235  return true;
1236  }
1237  else
1238  {
1239  double dval = pval->u.dval;
1240 
1241  Assert(pval->type == PGBT_DOUBLE);
1242  if (dval < PG_INT64_MIN || PG_INT64_MAX < dval)
1243  {
1244  fprintf(stderr, "double to int overflow for %f\n", dval);
1245  return false;
1246  }
1247  *ival = (int64) dval;
1248  return true;
1249  }
1250 }
1251 
1252 /* get a value as a double, or tell if there is a problem */
1253 static bool
1254 coerceToDouble(PgBenchValue *pval, double *dval)
1255 {
1256  if (pval->type == PGBT_DOUBLE)
1257  {
1258  *dval = pval->u.dval;
1259  return true;
1260  }
1261  else
1262  {
1263  Assert(pval->type == PGBT_INT);
1264  *dval = (double) pval->u.ival;
1265  return true;
1266  }
1267 }
1268 
1269 /* assign an integer value */
1270 static void
1271 setIntValue(PgBenchValue *pv, int64 ival)
1272 {
1273  pv->type = PGBT_INT;
1274  pv->u.ival = ival;
1275 }
1276 
1277 /* assign a double value */
1278 static void
1279 setDoubleValue(PgBenchValue *pv, double dval)
1280 {
1281  pv->type = PGBT_DOUBLE;
1282  pv->u.dval = dval;
1283 }
1284 
1285 /* maximum number of function arguments */
1286 #define MAX_FARGS 16
1287 
1288 /*
1289  * Recursive evaluation of functions
1290  */
1291 static bool
1292 evalFunc(TState *thread, CState *st,
1294 {
1295  /* evaluate all function arguments */
1296  int nargs = 0;
1297  PgBenchValue vargs[MAX_FARGS];
1298  PgBenchExprLink *l = args;
1299 
1300  for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1301  if (!evaluateExpr(thread, st, l->expr, &vargs[nargs]))
1302  return false;
1303 
1304  if (l != NULL)
1305  {
1306  fprintf(stderr,
1307  "too many function arguments, maximum is %d\n", MAX_FARGS);
1308  return false;
1309  }
1310 
1311  /* then evaluate function */
1312  switch (func)
1313  {
1314  /* overloaded operators */
1315  case PGBENCH_ADD:
1316  case PGBENCH_SUB:
1317  case PGBENCH_MUL:
1318  case PGBENCH_DIV:
1319  case PGBENCH_MOD:
1320  {
1321  PgBenchValue *lval = &vargs[0],
1322  *rval = &vargs[1];
1323 
1324  Assert(nargs == 2);
1325 
1326  /* overloaded type management, double if some double */
1327  if ((lval->type == PGBT_DOUBLE ||
1328  rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1329  {
1330  double ld,
1331  rd;
1332 
1333  if (!coerceToDouble(lval, &ld) ||
1334  !coerceToDouble(rval, &rd))
1335  return false;
1336 
1337  switch (func)
1338  {
1339  case PGBENCH_ADD:
1340  setDoubleValue(retval, ld + rd);
1341  return true;
1342 
1343  case PGBENCH_SUB:
1344  setDoubleValue(retval, ld - rd);
1345  return true;
1346 
1347  case PGBENCH_MUL:
1348  setDoubleValue(retval, ld * rd);
1349  return true;
1350 
1351  case PGBENCH_DIV:
1352  setDoubleValue(retval, ld / rd);
1353  return true;
1354 
1355  default:
1356  /* cannot get here */
1357  Assert(0);
1358  }
1359  }
1360  else /* we have integer operands, or % */
1361  {
1362  int64 li,
1363  ri;
1364 
1365  if (!coerceToInt(lval, &li) ||
1366  !coerceToInt(rval, &ri))
1367  return false;
1368 
1369  switch (func)
1370  {
1371  case PGBENCH_ADD:
1372  setIntValue(retval, li + ri);
1373  return true;
1374 
1375  case PGBENCH_SUB:
1376  setIntValue(retval, li - ri);
1377  return true;
1378 
1379  case PGBENCH_MUL:
1380  setIntValue(retval, li * ri);
1381  return true;
1382 
1383  case PGBENCH_DIV:
1384  case PGBENCH_MOD:
1385  if (ri == 0)
1386  {
1387  fprintf(stderr, "division by zero\n");
1388  return false;
1389  }
1390  /* special handling of -1 divisor */
1391  if (ri == -1)
1392  {
1393  if (func == PGBENCH_DIV)
1394  {
1395  /* overflow check (needed for INT64_MIN) */
1396  if (li == PG_INT64_MIN)
1397  {
1398  fprintf(stderr, "bigint out of range\n");
1399  return false;
1400  }
1401  else
1402  setIntValue(retval, -li);
1403  }
1404  else
1405  setIntValue(retval, 0);
1406  return true;
1407  }
1408  /* else divisor is not -1 */
1409  if (func == PGBENCH_DIV)
1410  setIntValue(retval, li / ri);
1411  else /* func == PGBENCH_MOD */
1412  setIntValue(retval, li % ri);
1413 
1414  return true;
1415 
1416  default:
1417  /* cannot get here */
1418  Assert(0);
1419  }
1420  }
1421  }
1422 
1423  /* no arguments */
1424  case PGBENCH_PI:
1425  setDoubleValue(retval, M_PI);
1426  return true;
1427 
1428  /* 1 overloaded argument */
1429  case PGBENCH_ABS:
1430  {
1431  PgBenchValue *varg = &vargs[0];
1432 
1433  Assert(nargs == 1);
1434 
1435  if (varg->type == PGBT_INT)
1436  {
1437  int64 i = varg->u.ival;
1438 
1439  setIntValue(retval, i < 0 ? -i : i);
1440  }
1441  else
1442  {
1443  double d = varg->u.dval;
1444 
1445  Assert(varg->type == PGBT_DOUBLE);
1446  setDoubleValue(retval, d < 0.0 ? -d : d);
1447  }
1448 
1449  return true;
1450  }
1451 
1452  case PGBENCH_DEBUG:
1453  {
1454  PgBenchValue *varg = &vargs[0];
1455 
1456  Assert(nargs == 1);
1457 
1458  fprintf(stderr, "debug(script=%d,command=%d): ",
1459  st->use_file, st->command + 1);
1460 
1461  if (varg->type == PGBT_INT)
1462  fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
1463  else
1464  {
1465  Assert(varg->type == PGBT_DOUBLE);
1466  fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
1467  }
1468 
1469  *retval = *varg;
1470 
1471  return true;
1472  }
1473 
1474  /* 1 double argument */
1475  case PGBENCH_DOUBLE:
1476  case PGBENCH_SQRT:
1477  {
1478  double dval;
1479 
1480  Assert(nargs == 1);
1481 
1482  if (!coerceToDouble(&vargs[0], &dval))
1483  return false;
1484 
1485  if (func == PGBENCH_SQRT)
1486  dval = sqrt(dval);
1487 
1488  setDoubleValue(retval, dval);
1489  return true;
1490  }
1491 
1492  /* 1 int argument */
1493  case PGBENCH_INT:
1494  {
1495  int64 ival;
1496 
1497  Assert(nargs == 1);
1498 
1499  if (!coerceToInt(&vargs[0], &ival))
1500  return false;
1501 
1502  setIntValue(retval, ival);
1503  return true;
1504  }
1505 
1506  /* variable number of arguments */
1507  case PGBENCH_LEAST:
1508  case PGBENCH_GREATEST:
1509  {
1510  bool havedouble;
1511  int i;
1512 
1513  Assert(nargs >= 1);
1514 
1515  /* need double result if any input is double */
1516  havedouble = false;
1517  for (i = 0; i < nargs; i++)
1518  {
1519  if (vargs[i].type == PGBT_DOUBLE)
1520  {
1521  havedouble = true;
1522  break;
1523  }
1524  }
1525  if (havedouble)
1526  {
1527  double extremum;
1528 
1529  if (!coerceToDouble(&vargs[0], &extremum))
1530  return false;
1531  for (i = 1; i < nargs; i++)
1532  {
1533  double dval;
1534 
1535  if (!coerceToDouble(&vargs[i], &dval))
1536  return false;
1537  if (func == PGBENCH_LEAST)
1538  extremum = Min(extremum, dval);
1539  else
1540  extremum = Max(extremum, dval);
1541  }
1542  setDoubleValue(retval, extremum);
1543  }
1544  else
1545  {
1546  int64 extremum;
1547 
1548  if (!coerceToInt(&vargs[0], &extremum))
1549  return false;
1550  for (i = 1; i < nargs; i++)
1551  {
1552  int64 ival;
1553 
1554  if (!coerceToInt(&vargs[i], &ival))
1555  return false;
1556  if (func == PGBENCH_LEAST)
1557  extremum = Min(extremum, ival);
1558  else
1559  extremum = Max(extremum, ival);
1560  }
1561  setIntValue(retval, extremum);
1562  }
1563  return true;
1564  }
1565 
1566  /* random functions */
1567  case PGBENCH_RANDOM:
1570  {
1571  int64 imin,
1572  imax;
1573 
1574  Assert(nargs >= 2);
1575 
1576  if (!coerceToInt(&vargs[0], &imin) ||
1577  !coerceToInt(&vargs[1], &imax))
1578  return false;
1579 
1580  /* check random range */
1581  if (imin > imax)
1582  {
1583  fprintf(stderr, "empty range given to random\n");
1584  return false;
1585  }
1586  else if (imax - imin < 0 || (imax - imin) + 1 < 0)
1587  {
1588  /* prevent int overflows in random functions */
1589  fprintf(stderr, "random range is too large\n");
1590  return false;
1591  }
1592 
1593  if (func == PGBENCH_RANDOM)
1594  {
1595  Assert(nargs == 2);
1596  setIntValue(retval, getrand(thread, imin, imax));
1597  }
1598  else /* gaussian & exponential */
1599  {
1600  double param;
1601 
1602  Assert(nargs == 3);
1603 
1604  if (!coerceToDouble(&vargs[2], &param))
1605  return false;
1606 
1607  if (func == PGBENCH_RANDOM_GAUSSIAN)
1608  {
1609  if (param < MIN_GAUSSIAN_PARAM)
1610  {
1611  fprintf(stderr,
1612  "gaussian parameter must be at least %f "
1613  "(not %f)\n", MIN_GAUSSIAN_PARAM, param);
1614  return false;
1615  }
1616 
1617  setIntValue(retval,
1618  getGaussianRand(thread, imin, imax, param));
1619  }
1620  else /* exponential */
1621  {
1622  if (param <= 0.0)
1623  {
1624  fprintf(stderr,
1625  "exponential parameter must be greater than zero"
1626  " (got %f)\n", param);
1627  return false;
1628  }
1629 
1630  setIntValue(retval,
1631  getExponentialRand(thread, imin, imax, param));
1632  }
1633  }
1634 
1635  return true;
1636  }
1637 
1638  default:
1639  /* cannot get here */
1640  Assert(0);
1641  /* dead code to avoid a compiler warning */
1642  return false;
1643  }
1644 }
1645 
1646 /*
1647  * Recursive evaluation of an expression in a pgbench script
1648  * using the current state of variables.
1649  * Returns whether the evaluation was ok,
1650  * the value itself is returned through the retval pointer.
1651  */
1652 static bool
1653 evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval)
1654 {
1655  switch (expr->etype)
1656  {
1657  case ENODE_CONSTANT:
1658  {
1659  *retval = expr->u.constant;
1660  return true;
1661  }
1662 
1663  case ENODE_VARIABLE:
1664  {
1665  Variable *var;
1666 
1667  if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
1668  {
1669  fprintf(stderr, "undefined variable \"%s\"\n",
1670  expr->u.variable.varname);
1671  return false;
1672  }
1673 
1674  if (!makeVariableNumeric(var))
1675  return false;
1676 
1677  *retval = var->num_value;
1678  return true;
1679  }
1680 
1681  case ENODE_FUNCTION:
1682  return evalFunc(thread, st,
1683  expr->u.function.function,
1684  expr->u.function.args,
1685  retval);
1686 
1687  default:
1688  /* internal error which should never occur */
1689  fprintf(stderr, "unexpected enode type in evaluation: %d\n",
1690  expr->etype);
1691  exit(1);
1692  }
1693 }
1694 
1695 /*
1696  * Run a shell command. The result is assigned to the variable if not NULL.
1697  * Return true if succeeded, or false on error.
1698  */
1699 static bool
1700 runShellCommand(CState *st, char *variable, char **argv, int argc)
1701 {
1702  char command[SHELL_COMMAND_SIZE];
1703  int i,
1704  len = 0;
1705  FILE *fp;
1706  char res[64];
1707  char *endptr;
1708  int retval;
1709 
1710  /*----------
1711  * Join arguments with whitespace separators. Arguments starting with
1712  * exactly one colon are treated as variables:
1713  * name - append a string "name"
1714  * :var - append a variable named 'var'
1715  * ::name - append a string ":name"
1716  *----------
1717  */
1718  for (i = 0; i < argc; i++)
1719  {
1720  char *arg;
1721  int arglen;
1722 
1723  if (argv[i][0] != ':')
1724  {
1725  arg = argv[i]; /* a string literal */
1726  }
1727  else if (argv[i][1] == ':')
1728  {
1729  arg = argv[i] + 1; /* a string literal starting with colons */
1730  }
1731  else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1732  {
1733  fprintf(stderr, "%s: undefined variable \"%s\"\n",
1734  argv[0], argv[i]);
1735  return false;
1736  }
1737 
1738  arglen = strlen(arg);
1739  if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1740  {
1741  fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1742  return false;
1743  }
1744 
1745  if (i > 0)
1746  command[len++] = ' ';
1747  memcpy(command + len, arg, arglen);
1748  len += arglen;
1749  }
1750 
1751  command[len] = '\0';
1752 
1753  /* Fast path for non-assignment case */
1754  if (variable == NULL)
1755  {
1756  if (system(command))
1757  {
1758  if (!timer_exceeded)
1759  fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1760  return false;
1761  }
1762  return true;
1763  }
1764 
1765  /* Execute the command with pipe and read the standard output. */
1766  if ((fp = popen(command, "r")) == NULL)
1767  {
1768  fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1769  return false;
1770  }
1771  if (fgets(res, sizeof(res), fp) == NULL)
1772  {
1773  if (!timer_exceeded)
1774  fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1775  (void) pclose(fp);
1776  return false;
1777  }
1778  if (pclose(fp) < 0)
1779  {
1780  fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1781  return false;
1782  }
1783 
1784  /* Check whether the result is an integer and assign it to the variable */
1785  retval = (int) strtol(res, &endptr, 10);
1786  while (*endptr != '\0' && isspace((unsigned char) *endptr))
1787  endptr++;
1788  if (*res == '\0' || *endptr != '\0')
1789  {
1790  fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1791  argv[0], res);
1792  return false;
1793  }
1794  if (!putVariableInt(st, "setshell", variable, retval))
1795  return false;
1796 
1797 #ifdef DEBUG
1798  printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
1799 #endif
1800  return true;
1801 }
1802 
1803 #define MAX_PREPARE_NAME 32
1804 static void
1805 preparedStatementName(char *buffer, int file, int state)
1806 {
1807  sprintf(buffer, "P%d_%d", file, state);
1808 }
1809 
1810 static void
1811 commandFailed(CState *st, char *message)
1812 {
1813  fprintf(stderr,
1814  "client %d aborted in command %d of script %d; %s\n",
1815  st->id, st->command, st->use_file, message);
1816 }
1817 
1818 /* return a script number with a weighted choice. */
1819 static int
1821 {
1822  int i = 0;
1823  int64 w;
1824 
1825  if (num_scripts == 1)
1826  return 0;
1827 
1828  w = getrand(thread, 0, total_weight - 1);
1829  do
1830  {
1831  w -= sql_script[i++].weight;
1832  } while (w >= 0);
1833 
1834  return i - 1;
1835 }
1836 
1837 /* Send a SQL command, using the chosen querymode */
1838 static bool
1840 {
1841  int r;
1842 
1843  if (querymode == QUERY_SIMPLE)
1844  {
1845  char *sql;
1846 
1847  sql = pg_strdup(command->argv[0]);
1848  sql = assignVariables(st, sql);
1849 
1850  if (debug)
1851  fprintf(stderr, "client %d sending %s\n", st->id, sql);
1852  r = PQsendQuery(st->con, sql);
1853  free(sql);
1854  }
1855  else if (querymode == QUERY_EXTENDED)
1856  {
1857  const char *sql = command->argv[0];
1858  const char *params[MAX_ARGS];
1859 
1860  getQueryParams(st, command, params);
1861 
1862  if (debug)
1863  fprintf(stderr, "client %d sending %s\n", st->id, sql);
1864  r = PQsendQueryParams(st->con, sql, command->argc - 1,
1865  NULL, params, NULL, NULL, 0);
1866  }
1867  else if (querymode == QUERY_PREPARED)
1868  {
1869  char name[MAX_PREPARE_NAME];
1870  const char *params[MAX_ARGS];
1871 
1872  if (!st->prepared[st->use_file])
1873  {
1874  int j;
1875  Command **commands = sql_script[st->use_file].commands;
1876 
1877  for (j = 0; commands[j] != NULL; j++)
1878  {
1879  PGresult *res;
1880  char name[MAX_PREPARE_NAME];
1881 
1882  if (commands[j]->type != SQL_COMMAND)
1883  continue;
1884  preparedStatementName(name, st->use_file, j);
1885  res = PQprepare(st->con, name,
1886  commands[j]->argv[0], commands[j]->argc - 1, NULL);
1887  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1888  fprintf(stderr, "%s", PQerrorMessage(st->con));
1889  PQclear(res);
1890  }
1891  st->prepared[st->use_file] = true;
1892  }
1893 
1894  getQueryParams(st, command, params);
1895  preparedStatementName(name, st->use_file, st->command);
1896 
1897  if (debug)
1898  fprintf(stderr, "client %d sending %s\n", st->id, name);
1899  r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1900  params, NULL, NULL, 0);
1901  }
1902  else /* unknown sql mode */
1903  r = 0;
1904 
1905  if (r == 0)
1906  {
1907  if (debug)
1908  fprintf(stderr, "client %d could not send %s\n",
1909  st->id, command->argv[0]);
1910  st->ecnt++;
1911  return false;
1912  }
1913  else
1914  return true;
1915 }
1916 
1917 /*
1918  * Parse the argument to a \sleep command, and return the requested amount
1919  * of delay, in microseconds. Returns true on success, false on error.
1920  */
1921 static bool
1922 evaluateSleep(CState *st, int argc, char **argv, int *usecs)
1923 {
1924  char *var;
1925  int usec;
1926 
1927  if (*argv[1] == ':')
1928  {
1929  if ((var = getVariable(st, argv[1] + 1)) == NULL)
1930  {
1931  fprintf(stderr, "%s: undefined variable \"%s\"\n",
1932  argv[0], argv[1]);
1933  return false;
1934  }
1935  usec = atoi(var);
1936  }
1937  else
1938  usec = atoi(argv[1]);
1939 
1940  if (argc > 2)
1941  {
1942  if (pg_strcasecmp(argv[2], "ms") == 0)
1943  usec *= 1000;
1944  else if (pg_strcasecmp(argv[2], "s") == 0)
1945  usec *= 1000000;
1946  }
1947  else
1948  usec *= 1000000;
1949 
1950  *usecs = usec;
1951  return true;
1952 }
1953 
1954 /*
1955  * Advance the state machine of a connection, if possible.
1956  */
1957 static void
1958 doCustom(TState *thread, CState *st, StatsData *agg)
1959 {
1960  PGresult *res;
1961  Command *command;
1962  instr_time now;
1963  bool end_tx_processed = false;
1964  int64 wait;
1965 
1966  /*
1967  * gettimeofday() isn't free, so we get the current timestamp lazily the
1968  * first time it's needed, and reuse the same value throughout this
1969  * function after that. This also ensures that e.g. the calculated
1970  * latency reported in the log file and in the totals are the same. Zero
1971  * means "not set yet". Reset "now" when we execute shell commands or
1972  * expressions, which might take a non-negligible amount of time, though.
1973  */
1974  INSTR_TIME_SET_ZERO(now);
1975 
1976  /*
1977  * Loop in the state machine, until we have to wait for a result from the
1978  * server (or have to sleep, for throttling or for \sleep).
1979  *
1980  * Note: In the switch-statement below, 'break' will loop back here,
1981  * meaning "continue in the state machine". Return is used to return to
1982  * the caller.
1983  */
1984  for (;;)
1985  {
1986  switch (st->state)
1987  {
1988  /*
1989  * Select transaction to run.
1990  */
1991  case CSTATE_CHOOSE_SCRIPT:
1992 
1993  st->use_file = chooseScript(thread);
1994 
1995  if (debug)
1996  fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
1997  sql_script[st->use_file].desc);
1998 
1999  if (throttle_delay > 0)
2001  else
2002  st->state = CSTATE_START_TX;
2003  break;
2004 
2005  /*
2006  * Handle throttling once per transaction by sleeping.
2007  */
2008  case CSTATE_START_THROTTLE:
2009 
2010  /*
2011  * Generate a delay such that the series of delays will
2012  * approximate a Poisson distribution centered on the
2013  * throttle_delay time.
2014  *
2015  * If transactions are too slow or a given wait is shorter
2016  * than a transaction, the next transaction will start right
2017  * away.
2018  */
2019  Assert(throttle_delay > 0);
2020  wait = getPoissonRand(thread, throttle_delay);
2021 
2022  thread->throttle_trigger += wait;
2023  st->txn_scheduled = thread->throttle_trigger;
2024 
2025  /*
2026  * stop client if next transaction is beyond pgbench end of
2027  * execution
2028  */
2029  if (duration > 0 && st->txn_scheduled > end_time)
2030  {
2031  st->state = CSTATE_FINISHED;
2032  break;
2033  }
2034 
2035  /*
2036  * If this --latency-limit is used, and this slot is already
2037  * late so that the transaction will miss the latency limit
2038  * even if it completed immediately, we skip this time slot
2039  * and iterate till the next slot that isn't late yet.
2040  */
2041  if (latency_limit)
2042  {
2043  int64 now_us;
2044 
2045  if (INSTR_TIME_IS_ZERO(now))
2047  now_us = INSTR_TIME_GET_MICROSEC(now);
2048  while (thread->throttle_trigger < now_us - latency_limit)
2049  {
2050  processXactStats(thread, st, &now, true, agg);
2051  /* next rendez-vous */
2052  wait = getPoissonRand(thread, throttle_delay);
2053  thread->throttle_trigger += wait;
2054  st->txn_scheduled = thread->throttle_trigger;
2055  }
2056  }
2057 
2058  st->state = CSTATE_THROTTLE;
2059  if (debug)
2060  fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
2061  st->id, wait);
2062  break;
2063 
2064  /*
2065  * Wait until it's time to start next transaction.
2066  */
2067  case CSTATE_THROTTLE:
2068  if (INSTR_TIME_IS_ZERO(now))
2070  if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
2071  return; /* Still sleeping, nothing to do here */
2072 
2073  /* Else done sleeping, start the transaction */
2074  st->state = CSTATE_START_TX;
2075  break;
2076 
2077  /* Start new transaction */
2078  case CSTATE_START_TX:
2079 
2080  /*
2081  * Establish connection on first call, or if is_connect is
2082  * true.
2083  */
2084  if (st->con == NULL)
2085  {
2086  instr_time start;
2087 
2088  if (INSTR_TIME_IS_ZERO(now))
2090  start = now;
2091  if ((st->con = doConnect()) == NULL)
2092  {
2093  fprintf(stderr, "client %d aborted while establishing connection\n",
2094  st->id);
2095  st->state = CSTATE_ABORTED;
2096  break;
2097  }
2099  INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
2100 
2101  /* Reset session-local state */
2102  memset(st->prepared, 0, sizeof(st->prepared));
2103  }
2104 
2105  /*
2106  * Record transaction start time under logging, progress or
2107  * throttling.
2108  */
2109  if (use_log || progress || throttle_delay || latency_limit ||
2110  per_script_stats)
2111  {
2112  if (INSTR_TIME_IS_ZERO(now))
2114  st->txn_begin = now;
2115 
2116  /*
2117  * When not throttling, this is also the transaction's
2118  * scheduled start time.
2119  */
2120  if (!throttle_delay)
2122  }
2123 
2124  /* Begin with the first command */
2125  st->command = 0;
2127  break;
2128 
2129  /*
2130  * Send a command to server (or execute a meta-command)
2131  */
2132  case CSTATE_START_COMMAND:
2133  command = sql_script[st->use_file].commands[st->command];
2134 
2135  /*
2136  * If we reached the end of the script, move to end-of-xact
2137  * processing.
2138  */
2139  if (command == NULL)
2140  {
2141  st->state = CSTATE_END_TX;
2142  break;
2143  }
2144 
2145  /*
2146  * Record statement start time if per-command latencies are
2147  * requested
2148  */
2149  if (is_latencies)
2150  {
2151  if (INSTR_TIME_IS_ZERO(now))
2153  st->stmt_begin = now;
2154  }
2155 
2156  if (command->type == SQL_COMMAND)
2157  {
2158  if (!sendCommand(st, command))
2159  {
2160  /*
2161  * Failed. Stay in CSTATE_START_COMMAND state, to
2162  * retry. ??? What the point or retrying? Should
2163  * rather abort?
2164  */
2165  return;
2166  }
2167  else
2168  st->state = CSTATE_WAIT_RESULT;
2169  }
2170  else if (command->type == META_COMMAND)
2171  {
2172  int argc = command->argc,
2173  i;
2174  char **argv = command->argv;
2175 
2176  if (debug)
2177  {
2178  fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
2179  for (i = 1; i < argc; i++)
2180  fprintf(stderr, " %s", argv[i]);
2181  fprintf(stderr, "\n");
2182  }
2183 
2184  if (pg_strcasecmp(argv[0], "sleep") == 0)
2185  {
2186  /*
2187  * A \sleep doesn't execute anything, we just get the
2188  * delay from the argument, and enter the CSTATE_SLEEP
2189  * state. (The per-command latency will be recorded
2190  * in CSTATE_SLEEP state, not here, after the delay
2191  * has elapsed.)
2192  */
2193  int usec;
2194 
2195  if (!evaluateSleep(st, argc, argv, &usec))
2196  {
2197  commandFailed(st, "execution of meta-command 'sleep' failed");
2198  st->state = CSTATE_ABORTED;
2199  break;
2200  }
2201 
2202  if (INSTR_TIME_IS_ZERO(now))
2204  st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
2205  st->state = CSTATE_SLEEP;
2206  break;
2207  }
2208  else
2209  {
2210  if (pg_strcasecmp(argv[0], "set") == 0)
2211  {
2212  PgBenchExpr *expr = command->expr;
2213  PgBenchValue result;
2214 
2215  if (!evaluateExpr(thread, st, expr, &result))
2216  {
2217  commandFailed(st, "evaluation of meta-command 'set' failed");
2218  st->state = CSTATE_ABORTED;
2219  break;
2220  }
2221 
2222  if (!putVariableNumber(st, argv[0], argv[1], &result))
2223  {
2224  commandFailed(st, "assignment of meta-command 'set' failed");
2225  st->state = CSTATE_ABORTED;
2226  break;
2227  }
2228  }
2229  else if (pg_strcasecmp(argv[0], "setshell") == 0)
2230  {
2231  bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
2232 
2233  if (timer_exceeded) /* timeout */
2234  {
2235  st->state = CSTATE_FINISHED;
2236  break;
2237  }
2238  else if (!ret) /* on error */
2239  {
2240  commandFailed(st, "execution of meta-command 'setshell' failed");
2241  st->state = CSTATE_ABORTED;
2242  break;
2243  }
2244  else
2245  {
2246  /* succeeded */
2247  }
2248  }
2249  else if (pg_strcasecmp(argv[0], "shell") == 0)
2250  {
2251  bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
2252 
2253  if (timer_exceeded) /* timeout */
2254  {
2255  st->state = CSTATE_FINISHED;
2256  break;
2257  }
2258  else if (!ret) /* on error */
2259  {
2260  commandFailed(st, "execution of meta-command 'shell' failed");
2261  st->state = CSTATE_ABORTED;
2262  break;
2263  }
2264  else
2265  {
2266  /* succeeded */
2267  }
2268  }
2269 
2270  /*
2271  * executing the expression or shell command might
2272  * take a non-negligible amount of time, so reset
2273  * 'now'
2274  */
2275  INSTR_TIME_SET_ZERO(now);
2276 
2277  st->state = CSTATE_END_COMMAND;
2278  }
2279  }
2280  break;
2281 
2282  /*
2283  * Wait for the current SQL command to complete
2284  */
2285  case CSTATE_WAIT_RESULT:
2286  command = sql_script[st->use_file].commands[st->command];
2287  if (debug)
2288  fprintf(stderr, "client %d receiving\n", st->id);
2289  if (!PQconsumeInput(st->con))
2290  { /* there's something wrong */
2291  commandFailed(st, "perhaps the backend died while processing");
2292  st->state = CSTATE_ABORTED;
2293  break;
2294  }
2295  if (PQisBusy(st->con))
2296  return; /* don't have the whole result yet */
2297 
2298  /*
2299  * Read and discard the query result;
2300  */
2301  res = PQgetResult(st->con);
2302  switch (PQresultStatus(res))
2303  {
2304  case PGRES_COMMAND_OK:
2305  case PGRES_TUPLES_OK:
2306  case PGRES_EMPTY_QUERY:
2307  /* OK */
2308  PQclear(res);
2309  discard_response(st);
2310  st->state = CSTATE_END_COMMAND;
2311  break;
2312  default:
2313  commandFailed(st, PQerrorMessage(st->con));
2314  PQclear(res);
2315  st->state = CSTATE_ABORTED;
2316  break;
2317  }
2318  break;
2319 
2320  /*
2321  * Wait until sleep is done. This state is entered after a
2322  * \sleep metacommand. The behavior is similar to
2323  * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
2324  * instead of CSTATE_START_TX.
2325  */
2326  case CSTATE_SLEEP:
2327  if (INSTR_TIME_IS_ZERO(now))
2329  if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
2330  return; /* Still sleeping, nothing to do here */
2331  /* Else done sleeping. */
2332  st->state = CSTATE_END_COMMAND;
2333  break;
2334 
2335  /*
2336  * End of command: record stats and proceed to next command.
2337  */
2338  case CSTATE_END_COMMAND:
2339 
2340  /*
2341  * command completed: accumulate per-command execution times
2342  * in thread-local data structure, if per-command latencies
2343  * are requested.
2344  */
2345  if (is_latencies)
2346  {
2347  if (INSTR_TIME_IS_ZERO(now))
2349 
2350  /* XXX could use a mutex here, but we choose not to */
2351  command = sql_script[st->use_file].commands[st->command];
2352  addToSimpleStats(&command->stats,
2353  INSTR_TIME_GET_DOUBLE(now) -
2355  }
2356 
2357  /* Go ahead with next command */
2358  st->command++;
2360  break;
2361 
2362  /*
2363  * End of transaction.
2364  */
2365  case CSTATE_END_TX:
2366 
2367  /*
2368  * transaction finished: calculate latency and log the
2369  * transaction
2370  */
2371  if (progress || throttle_delay || latency_limit ||
2372  per_script_stats || use_log)
2373  processXactStats(thread, st, &now, false, agg);
2374  else
2375  thread->stats.cnt++;
2376 
2377  if (is_connect)
2378  {
2379  PQfinish(st->con);
2380  st->con = NULL;
2381  INSTR_TIME_SET_ZERO(now);
2382  }
2383 
2384  ++st->cnt;
2385  if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
2386  {
2387  /* exit success */
2388  st->state = CSTATE_FINISHED;
2389  break;
2390  }
2391 
2392  /*
2393  * No transaction is underway anymore.
2394  */
2396 
2397  /*
2398  * If we paced through all commands in the script in this
2399  * loop, without returning to the caller even once, do it now.
2400  * This gives the thread a chance to process other
2401  * connections, and to do progress reporting. This can
2402  * currently only happen if the script consists entirely of
2403  * meta-commands.
2404  */
2405  if (end_tx_processed)
2406  return;
2407  else
2408  {
2409  end_tx_processed = true;
2410  break;
2411  }
2412 
2413  /*
2414  * Final states. Close the connection if it's still open.
2415  */
2416  case CSTATE_ABORTED:
2417  case CSTATE_FINISHED:
2418  if (st->con != NULL)
2419  {
2420  PQfinish(st->con);
2421  st->con = NULL;
2422  }
2423  return;
2424  }
2425  }
2426 }
2427 
2428 /*
2429  * Print log entry after completing one transaction.
2430  *
2431  * We print Unix-epoch timestamps in the log, so that entries can be
2432  * correlated against other logs. On some platforms this could be obtained
2433  * from the instr_time reading the caller has, but rather than get entangled
2434  * with that, we just eat the cost of an extra syscall in all cases.
2435  */
2436 static void
2437 doLog(TState *thread, CState *st,
2438  StatsData *agg, bool skipped, double latency, double lag)
2439 {
2440  FILE *logfile = thread->logfile;
2441 
2442  Assert(use_log);
2443 
2444  /*
2445  * Skip the log entry if sampling is enabled and this row doesn't belong
2446  * to the random sample.
2447  */
2448  if (sample_rate != 0.0 &&
2449  pg_erand48(thread->random_state) > sample_rate)
2450  return;
2451 
2452  /* should we aggregate the results or not? */
2453  if (agg_interval > 0)
2454  {
2455  /*
2456  * Loop until we reach the interval of the current moment, and print
2457  * any empty intervals in between (this may happen with very low tps,
2458  * e.g. --rate=0.1).
2459  */
2460  time_t now = time(NULL);
2461 
2462  while (agg->start_time + agg_interval <= now)
2463  {
2464  /* print aggregated report to logfile */
2465  fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
2466  (long) agg->start_time,
2467  agg->cnt,
2468  agg->latency.sum,
2469  agg->latency.sum2,
2470  agg->latency.min,
2471  agg->latency.max);
2472  if (throttle_delay)
2473  {
2474  fprintf(logfile, " %.0f %.0f %.0f %.0f",
2475  agg->lag.sum,
2476  agg->lag.sum2,
2477  agg->lag.min,
2478  agg->lag.max);
2479  if (latency_limit)
2480  fprintf(logfile, " " INT64_FORMAT, agg->skipped);
2481  }
2482  fputc('\n', logfile);
2483 
2484  /* reset data and move to next interval */
2485  initStats(agg, agg->start_time + agg_interval);
2486  }
2487 
2488  /* accumulate the current transaction */
2489  accumStats(agg, skipped, latency, lag);
2490  }
2491  else
2492  {
2493  /* no, print raw transactions */
2494  struct timeval tv;
2495 
2496  gettimeofday(&tv, NULL);
2497  if (skipped)
2498  fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
2499  st->id, st->cnt, st->use_file,
2500  (long) tv.tv_sec, (long) tv.tv_usec);
2501  else
2502  fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
2503  st->id, st->cnt, latency, st->use_file,
2504  (long) tv.tv_sec, (long) tv.tv_usec);
2505  if (throttle_delay)
2506  fprintf(logfile, " %.0f", lag);
2507  fputc('\n', logfile);
2508  }
2509 }
2510 
2511 /*
2512  * Accumulate and report statistics at end of a transaction.
2513  *
2514  * (This is also called when a transaction is late and thus skipped.)
2515  */
2516 static void
2518  bool skipped, StatsData *agg)
2519 {
2520  double latency = 0.0,
2521  lag = 0.0;
2522 
2523  if ((!skipped) && INSTR_TIME_IS_ZERO(*now))
2524  INSTR_TIME_SET_CURRENT(*now);
2525 
2526  if (!skipped)
2527  {
2528  /* compute latency & lag */
2529  latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
2531  }
2532 
2533  if (progress || throttle_delay || latency_limit)
2534  {
2535  accumStats(&thread->stats, skipped, latency, lag);
2536 
2537  /* count transactions over the latency limit, if needed */
2538  if (latency_limit && latency > latency_limit)
2539  thread->latency_late++;
2540  }
2541  else
2542  thread->stats.cnt++;
2543 
2544  if (use_log)
2545  doLog(thread, st, agg, skipped, latency, lag);
2546 
2547  /* XXX could use a mutex here, but we choose not to */
2548  if (per_script_stats)
2549  accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
2550 }
2551 
2552 
2553 /* discard connections */
2554 static void
2556 {
2557  int i;
2558 
2559  for (i = 0; i < length; i++)
2560  {
2561  if (state[i].con)
2562  {
2563  PQfinish(state[i].con);
2564  state[i].con = NULL;
2565  }
2566  }
2567 }
2568 
2569 /* create tables and setup data */
2570 static void
2571 init(bool is_no_vacuum)
2572 {
2573 /*
2574  * The scale factor at/beyond which 32-bit integers are insufficient for
2575  * storing TPC-B account IDs.
2576  *
2577  * Although the actual threshold is 21474, we use 20000 because it is easier to
2578  * document and remember, and isn't that far away from the real threshold.
2579  */
2580 #define SCALE_32BIT_THRESHOLD 20000
2581 
2582  /*
2583  * Note: TPC-B requires at least 100 bytes per row, and the "filler"
2584  * fields in these table declarations were intended to comply with that.
2585  * The pgbench_accounts table complies with that because the "filler"
2586  * column is set to blank-padded empty string. But for all other tables
2587  * the columns default to NULL and so don't actually take any space. We
2588  * could fix that by giving them non-null default values. However, that
2589  * would completely break comparability of pgbench results with prior
2590  * versions. Since pgbench has never pretended to be fully TPC-B compliant
2591  * anyway, we stick with the historical behavior.
2592  */
2593  struct ddlinfo
2594  {
2595  const char *table; /* table name */
2596  const char *smcols; /* column decls if accountIDs are 32 bits */
2597  const char *bigcols; /* column decls if accountIDs are 64 bits */
2598  int declare_fillfactor;
2599  };
2600  static const struct ddlinfo DDLs[] = {
2601  {
2602  "pgbench_history",
2603  "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
2604  "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
2605  0
2606  },
2607  {
2608  "pgbench_tellers",
2609  "tid int not null,bid int,tbalance int,filler char(84)",
2610  "tid int not null,bid int,tbalance int,filler char(84)",
2611  1
2612  },
2613  {
2614  "pgbench_accounts",
2615  "aid int not null,bid int,abalance int,filler char(84)",
2616  "aid bigint not null,bid int,abalance int,filler char(84)",
2617  1
2618  },
2619  {
2620  "pgbench_branches",
2621  "bid int not null,bbalance int,filler char(88)",
2622  "bid int not null,bbalance int,filler char(88)",
2623  1
2624  }
2625  };
2626  static const char *const DDLINDEXes[] = {
2627  "alter table pgbench_branches add primary key (bid)",
2628  "alter table pgbench_tellers add primary key (tid)",
2629  "alter table pgbench_accounts add primary key (aid)"
2630  };
2631  static const char *const DDLKEYs[] = {
2632  "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
2633  "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
2634  "alter table pgbench_history add foreign key (bid) references pgbench_branches",
2635  "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
2636  "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
2637  };
2638 
2639  PGconn *con;
2640  PGresult *res;
2641  char sql[256];
2642  int i;
2643  int64 k;
2644 
2645  /* used to track elapsed time and estimate of the remaining time */
2646  instr_time start,
2647  diff;
2648  double elapsed_sec,
2649  remaining_sec;
2650  int log_interval = 1;
2651 
2652  if ((con = doConnect()) == NULL)
2653  exit(1);
2654 
2655  for (i = 0; i < lengthof(DDLs); i++)
2656  {
2657  char opts[256];
2658  char buffer[256];
2659  const struct ddlinfo *ddl = &DDLs[i];
2660  const char *cols;
2661 
2662  /* Remove old table, if it exists. */
2663  snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
2664  executeStatement(con, buffer);
2665 
2666  /* Construct new create table statement. */
2667  opts[0] = '\0';
2668  if (ddl->declare_fillfactor)
2669  snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
2670  " with (fillfactor=%d)", fillfactor);
2671  if (tablespace != NULL)
2672  {
2673  char *escape_tablespace;
2674 
2675  escape_tablespace = PQescapeIdentifier(con, tablespace,
2676  strlen(tablespace));
2677  snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
2678  " tablespace %s", escape_tablespace);
2679  PQfreemem(escape_tablespace);
2680  }
2681 
2682  cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
2683 
2684  snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
2685  unlogged_tables ? " unlogged" : "",
2686  ddl->table, cols, opts);
2687 
2688  executeStatement(con, buffer);
2689  }
2690 
2691  executeStatement(con, "begin");
2692 
2693  for (i = 0; i < nbranches * scale; i++)
2694  {
2695  /* "filler" column defaults to NULL */
2696  snprintf(sql, sizeof(sql),
2697  "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2698  i + 1);
2699  executeStatement(con, sql);
2700  }
2701 
2702  for (i = 0; i < ntellers * scale; i++)
2703  {
2704  /* "filler" column defaults to NULL */
2705  snprintf(sql, sizeof(sql),
2706  "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2707  i + 1, i / ntellers + 1);
2708  executeStatement(con, sql);
2709  }
2710 
2711  executeStatement(con, "commit");
2712 
2713  /*
2714  * fill the pgbench_accounts table with some data
2715  */
2716  fprintf(stderr, "creating tables...\n");
2717 
2718  executeStatement(con, "begin");
2719  executeStatement(con, "truncate pgbench_accounts");
2720 
2721  res = PQexec(con, "copy pgbench_accounts from stdin");
2722  if (PQresultStatus(res) != PGRES_COPY_IN)
2723  {
2724  fprintf(stderr, "%s", PQerrorMessage(con));
2725  exit(1);
2726  }
2727  PQclear(res);
2728 
2729  INSTR_TIME_SET_CURRENT(start);
2730 
2731  for (k = 0; k < (int64) naccounts * scale; k++)
2732  {
2733  int64 j = k + 1;
2734 
2735  /* "filler" column defaults to blank padded empty string */
2736  snprintf(sql, sizeof(sql),
2737  INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2738  j, k / naccounts + 1, 0);
2739  if (PQputline(con, sql))
2740  {
2741  fprintf(stderr, "PQputline failed\n");
2742  exit(1);
2743  }
2744 
2745  /*
2746  * If we want to stick with the original logging, print a message each
2747  * 100k inserted rows.
2748  */
2749  if ((!use_quiet) && (j % 100000 == 0))
2750  {
2751  INSTR_TIME_SET_CURRENT(diff);
2752  INSTR_TIME_SUBTRACT(diff, start);
2753 
2754  elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2755  remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2756 
2757  fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2758  j, (int64) naccounts * scale,
2759  (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2760  elapsed_sec, remaining_sec);
2761  }
2762  /* let's not call the timing for each row, but only each 100 rows */
2763  else if (use_quiet && (j % 100 == 0))
2764  {
2765  INSTR_TIME_SET_CURRENT(diff);
2766  INSTR_TIME_SUBTRACT(diff, start);
2767 
2768  elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2769  remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2770 
2771  /* have we reached the next interval (or end)? */
2772  if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2773  {
2774  fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2775  j, (int64) naccounts * scale,
2776  (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2777 
2778  /* skip to the next interval */
2779  log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
2780  }
2781  }
2782 
2783  }
2784  if (PQputline(con, "\\.\n"))
2785  {
2786  fprintf(stderr, "very last PQputline failed\n");
2787  exit(1);
2788  }
2789  if (PQendcopy(con))
2790  {
2791  fprintf(stderr, "PQendcopy failed\n");
2792  exit(1);
2793  }
2794  executeStatement(con, "commit");
2795 
2796  /* vacuum */
2797  if (!is_no_vacuum)
2798  {
2799  fprintf(stderr, "vacuum...\n");
2800  executeStatement(con, "vacuum analyze pgbench_branches");
2801  executeStatement(con, "vacuum analyze pgbench_tellers");
2802  executeStatement(con, "vacuum analyze pgbench_accounts");
2803  executeStatement(con, "vacuum analyze pgbench_history");
2804  }
2805 
2806  /*
2807  * create indexes
2808  */
2809  fprintf(stderr, "set primary keys...\n");
2810  for (i = 0; i < lengthof(DDLINDEXes); i++)
2811  {
2812  char buffer[256];
2813 
2814  strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2815 
2816  if (index_tablespace != NULL)
2817  {
2818  char *escape_tablespace;
2819 
2820  escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2821  strlen(index_tablespace));
2822  snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2823  " using index tablespace %s", escape_tablespace);
2824  PQfreemem(escape_tablespace);
2825  }
2826 
2827  executeStatement(con, buffer);
2828  }
2829 
2830  /*
2831  * create foreign keys
2832  */
2833  if (foreign_keys)
2834  {
2835  fprintf(stderr, "set foreign keys...\n");
2836  for (i = 0; i < lengthof(DDLKEYs); i++)
2837  {
2838  executeStatement(con, DDLKEYs[i]);
2839  }
2840  }
2841 
2842  fprintf(stderr, "done.\n");
2843  PQfinish(con);
2844 }
2845 
2846 /*
2847  * Parse the raw sql and replace :param to $n.
2848  */
2849 static bool
2850 parseQuery(Command *cmd, const char *raw_sql)
2851 {
2852  char *sql,
2853  *p;
2854 
2855  sql = pg_strdup(raw_sql);
2856  cmd->argc = 1;
2857 
2858  p = sql;
2859  while ((p = strchr(p, ':')) != NULL)
2860  {
2861  char var[12];
2862  char *name;
2863  int eaten;
2864 
2865  name = parseVariable(p, &eaten);
2866  if (name == NULL)
2867  {
2868  while (*p == ':')
2869  {
2870  p++;
2871  }
2872  continue;
2873  }
2874 
2875  if (cmd->argc >= MAX_ARGS)
2876  {
2877  fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2878  pg_free(name);
2879  return false;
2880  }
2881 
2882  sprintf(var, "$%d", cmd->argc);
2883  p = replaceVariable(&sql, p, eaten, var);
2884 
2885  cmd->argv[cmd->argc] = name;
2886  cmd->argc++;
2887  }
2888 
2889  cmd->argv[0] = sql;
2890  return true;
2891 }
2892 
2893 /*
2894  * Simple error-printing function, might be needed by lexer
2895  */
2896 static void
2897 pgbench_error(const char *fmt,...)
2898 {
2899  va_list ap;
2900 
2901  fflush(stdout);
2902  va_start(ap, fmt);
2903  vfprintf(stderr, _(fmt), ap);
2904  va_end(ap);
2905 }
2906 
2907 /*
2908  * syntax error while parsing a script (in practice, while parsing a
2909  * backslash command, because we don't detect syntax errors in SQL)
2910  *
2911  * source: source of script (filename or builtin-script ID)
2912  * lineno: line number within script (count from 1)
2913  * line: whole line of backslash command, if available
2914  * command: backslash command name, if available
2915  * msg: the actual error message
2916  * more: optional extra message
2917  * column: zero-based column number, or -1 if unknown
2918  */
2919 void
2920 syntax_error(const char *source, int lineno,
2921  const char *line, const char *command,
2922  const char *msg, const char *more, int column)
2923 {
2924  fprintf(stderr, "%s:%d: %s", source, lineno, msg);
2925  if (more != NULL)
2926  fprintf(stderr, " (%s)", more);
2927  if (column >= 0 && line == NULL)
2928  fprintf(stderr, " at column %d", column + 1);
2929  if (command != NULL)
2930  fprintf(stderr, " in command \"%s\"", command);
2931  fprintf(stderr, "\n");
2932  if (line != NULL)
2933  {
2934  fprintf(stderr, "%s\n", line);
2935  if (column >= 0)
2936  {
2937  int i;
2938 
2939  for (i = 0; i < column; i++)
2940  fprintf(stderr, " ");
2941  fprintf(stderr, "^ error found here\n");
2942  }
2943  }
2944  exit(1);
2945 }
2946 
2947 /*
2948  * Parse a SQL command; return a Command struct, or NULL if it's a comment
2949  *
2950  * On entry, psqlscan.l has collected the command into "buf", so we don't
2951  * really need to do much here except check for comment and set up a
2952  * Command struct.
2953  */
2954 static Command *
2956 {
2957  Command *my_command;
2958  char *p;
2959  char *nlpos;
2960 
2961  /* Skip any leading whitespace, as well as "--" style comments */
2962  p = buf->data;
2963  for (;;)
2964  {
2965  if (isspace((unsigned char) *p))
2966  p++;
2967  else if (strncmp(p, "--", 2) == 0)
2968  {
2969  p = strchr(p, '\n');
2970  if (p == NULL)
2971  return NULL;
2972  p++;
2973  }
2974  else
2975  break;
2976  }
2977 
2978  /* If there's nothing but whitespace and comments, we're done */
2979  if (*p == '\0')
2980  return NULL;
2981 
2982  /* Allocate and initialize Command structure */
2983  my_command = (Command *) pg_malloc0(sizeof(Command));
2984  my_command->command_num = num_commands++;
2985  my_command->type = SQL_COMMAND;
2986  my_command->argc = 0;
2987  initSimpleStats(&my_command->stats);
2988 
2989  /*
2990  * If SQL command is multi-line, we only want to save the first line as
2991  * the "line" label.
2992  */
2993  nlpos = strchr(p, '\n');
2994  if (nlpos)
2995  {
2996  my_command->line = pg_malloc(nlpos - p + 1);
2997  memcpy(my_command->line, p, nlpos - p);
2998  my_command->line[nlpos - p] = '\0';
2999  }
3000  else
3001  my_command->line = pg_strdup(p);
3002 
3003  switch (querymode)
3004  {
3005  case QUERY_SIMPLE:
3006  my_command->argv[0] = pg_strdup(p);
3007  my_command->argc++;
3008  break;
3009  case QUERY_EXTENDED:
3010  case QUERY_PREPARED:
3011  if (!parseQuery(my_command, p))
3012  exit(1);
3013  break;
3014  default:
3015  exit(1);
3016  }
3017 
3018  return my_command;
3019 }
3020 
3021 /*
3022  * Parse a backslash command; return a Command struct, or NULL if comment
3023  *
3024  * At call, we have scanned only the initial backslash.
3025  */
3026 static Command *
3027 process_backslash_command(PsqlScanState sstate, const char *source)
3028 {
3029  Command *my_command;
3030  PQExpBufferData word_buf;
3031  int word_offset;
3032  int offsets[MAX_ARGS]; /* offsets of argument words */
3033  int start_offset,
3034  end_offset;
3035  int lineno;
3036  int j;
3037 
3038  initPQExpBuffer(&word_buf);
3039 
3040  /* Remember location of the backslash */
3041  start_offset = expr_scanner_offset(sstate) - 1;
3042  lineno = expr_scanner_get_lineno(sstate, start_offset);
3043 
3044  /* Collect first word of command */
3045  if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
3046  {
3047  termPQExpBuffer(&word_buf);
3048  return NULL;
3049  }
3050 
3051  /* Allocate and initialize Command structure */
3052  my_command = (Command *) pg_malloc0(sizeof(Command));
3053  my_command->command_num = num_commands++;
3054  my_command->type = META_COMMAND;
3055  my_command->argc = 0;
3056  initSimpleStats(&my_command->stats);
3057 
3058  /* Save first word (command name) */
3059  j = 0;
3060  offsets[j] = word_offset;
3061  my_command->argv[j++] = pg_strdup(word_buf.data);
3062  my_command->argc++;
3063 
3064  if (pg_strcasecmp(my_command->argv[0], "set") == 0)
3065  {
3066  /* For \set, collect var name, then lex the expression. */
3068 
3069  if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
3070  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3071  "missing argument", NULL, -1);
3072 
3073  offsets[j] = word_offset;
3074  my_command->argv[j++] = pg_strdup(word_buf.data);
3075  my_command->argc++;
3076 
3077  yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
3078  my_command->argv[0]);
3079 
3080  if (expr_yyparse(yyscanner) != 0)
3081  {
3082  /* dead code: exit done from syntax_error called by yyerror */
3083  exit(1);
3084  }
3085 
3086  my_command->expr = expr_parse_result;
3087 
3088  /* Get location of the ending newline */
3089  end_offset = expr_scanner_offset(sstate) - 1;
3090 
3091  /* Save line */
3092  my_command->line = expr_scanner_get_substring(sstate,
3093  start_offset,
3094  end_offset);
3095 
3096  expr_scanner_finish(yyscanner);
3097 
3098  termPQExpBuffer(&word_buf);
3099 
3100  return my_command;
3101  }
3102 
3103  /* For all other commands, collect remaining words. */
3104  while (expr_lex_one_word(sstate, &word_buf, &word_offset))
3105  {
3106  if (j >= MAX_ARGS)
3107  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3108  "too many arguments", NULL, -1);
3109 
3110  offsets[j] = word_offset;
3111  my_command->argv[j++] = pg_strdup(word_buf.data);
3112  my_command->argc++;
3113  }
3114 
3115  /* Get location of the ending newline */
3116  end_offset = expr_scanner_offset(sstate) - 1;
3117 
3118  /* Save line */
3119  my_command->line = expr_scanner_get_substring(sstate,
3120  start_offset,
3121  end_offset);
3122 
3123  if (pg_strcasecmp(my_command->argv[0], "sleep") == 0)
3124  {
3125  if (my_command->argc < 2)
3126  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3127  "missing argument", NULL, -1);
3128 
3129  if (my_command->argc > 3)
3130  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3131  "too many arguments", NULL,
3132  offsets[3] - start_offset);
3133 
3134  /*
3135  * Split argument into number and unit to allow "sleep 1ms" etc. We
3136  * don't have to terminate the number argument with null because it
3137  * will be parsed with atoi, which ignores trailing non-digit
3138  * characters.
3139  */
3140  if (my_command->argc == 2 && my_command->argv[1][0] != ':')
3141  {
3142  char *c = my_command->argv[1];
3143 
3144  while (isdigit((unsigned char) *c))
3145  c++;
3146  if (*c)
3147  {
3148  my_command->argv[2] = c;
3149  offsets[2] = offsets[1] + (c - my_command->argv[1]);
3150  my_command->argc = 3;
3151  }
3152  }
3153 
3154  if (my_command->argc == 3)
3155  {
3156  if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
3157  pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
3158  pg_strcasecmp(my_command->argv[2], "s") != 0)
3159  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3160  "unrecognized time unit, must be us, ms or s",
3161  my_command->argv[2], offsets[2] - start_offset);
3162  }
3163  }
3164  else if (pg_strcasecmp(my_command->argv[0], "setshell") == 0)
3165  {
3166  if (my_command->argc < 3)
3167  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3168  "missing argument", NULL, -1);
3169  }
3170  else if (pg_strcasecmp(my_command->argv[0], "shell") == 0)
3171  {
3172  if (my_command->argc < 2)
3173  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3174  "missing command", NULL, -1);
3175  }
3176  else
3177  {
3178  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3179  "invalid command", NULL, -1);
3180  }
3181 
3182  termPQExpBuffer(&word_buf);
3183 
3184  return my_command;
3185 }
3186 
3187 /*
3188  * Parse a script (either the contents of a file, or a built-in script)
3189  * and add it to the list of scripts.
3190  */
3191 static void
3192 ParseScript(const char *script, const char *desc, int weight)
3193 {
3194  ParsedScript ps;
3195  PsqlScanState sstate;
3196  PQExpBufferData line_buf;
3197  int alloc_num;
3198  int index;
3199 
3200 #define COMMANDS_ALLOC_NUM 128
3201  alloc_num = COMMANDS_ALLOC_NUM;
3202 
3203  /* Initialize all fields of ps */
3204  ps.desc = desc;
3205  ps.weight = weight;
3206  ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
3207  initStats(&ps.stats, 0);
3208 
3209  /* Prepare to parse script */
3211 
3212  /*
3213  * Ideally, we'd scan scripts using the encoding and stdstrings settings
3214  * we get from a DB connection. However, without major rearrangement of
3215  * pgbench's argument parsing, we can't have a DB connection at the time
3216  * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
3217  * with any backend-safe encoding, though conceivably we could be fooled
3218  * if a script file uses a client-only encoding. We also assume that
3219  * stdstrings should be true, which is a bit riskier.
3220  */
3221  psql_scan_setup(sstate, script, strlen(script), 0, true);
3222 
3223  initPQExpBuffer(&line_buf);
3224 
3225  index = 0;
3226 
3227  for (;;)
3228  {
3229  PsqlScanResult sr;
3230  promptStatus_t prompt;
3231  Command *command;
3232 
3233  resetPQExpBuffer(&line_buf);
3234 
3235  sr = psql_scan(sstate, &line_buf, &prompt);
3236 
3237  /* If we collected a SQL command, process that */
3238  command = process_sql_command(&line_buf, desc);
3239  if (command)
3240  {
3241  ps.commands[index] = command;
3242  index++;
3243 
3244  if (index >= alloc_num)
3245  {
3246  alloc_num += COMMANDS_ALLOC_NUM;
3247  ps.commands = (Command **)
3248  pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3249  }
3250  }
3251 
3252  /* If we reached a backslash, process that */
3253  if (sr == PSCAN_BACKSLASH)
3254  {
3255  command = process_backslash_command(sstate, desc);
3256  if (command)
3257  {
3258  ps.commands[index] = command;
3259  index++;
3260 
3261  if (index >= alloc_num)
3262  {
3263  alloc_num += COMMANDS_ALLOC_NUM;
3264  ps.commands = (Command **)
3265  pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3266  }
3267  }
3268  }
3269 
3270  /* Done if we reached EOF */
3271  if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
3272  break;
3273  }
3274 
3275  ps.commands[index] = NULL;
3276 
3277  addScript(ps);
3278 
3279  termPQExpBuffer(&line_buf);
3280  psql_scan_finish(sstate);
3281  psql_scan_destroy(sstate);
3282 }
3283 
3284 /*
3285  * Read the entire contents of file fd, and return it in a malloc'd buffer.
3286  *
3287  * The buffer will typically be larger than necessary, but we don't care
3288  * in this program, because we'll free it as soon as we've parsed the script.
3289  */
3290 static char *
3292 {
3293  char *buf;
3294  size_t buflen = BUFSIZ;
3295  size_t used = 0;
3296 
3297  buf = (char *) pg_malloc(buflen);
3298 
3299  for (;;)
3300  {
3301  size_t nread;
3302 
3303  nread = fread(buf + used, 1, BUFSIZ, fd);
3304  used += nread;
3305  /* If fread() read less than requested, must be EOF or error */
3306  if (nread < BUFSIZ)
3307  break;
3308  /* Enlarge buf so we can read some more */
3309  buflen += BUFSIZ;
3310  buf = (char *) pg_realloc(buf, buflen);
3311  }
3312  /* There is surely room for a terminator */
3313  buf[used] = '\0';
3314 
3315  return buf;
3316 }
3317 
3318 /*
3319  * Given a file name, read it and add its script to the list.
3320  * "-" means to read stdin.
3321  * NB: filename must be storage that won't disappear.
3322  */
3323 static void
3324 process_file(const char *filename, int weight)
3325 {
3326  FILE *fd;
3327  char *buf;
3328 
3329  /* Slurp the file contents into "buf" */
3330  if (strcmp(filename, "-") == 0)
3331  fd = stdin;
3332  else if ((fd = fopen(filename, "r")) == NULL)
3333  {
3334  fprintf(stderr, "could not open file \"%s\": %s\n",
3335  filename, strerror(errno));
3336  exit(1);
3337  }
3338 
3339  buf = read_file_contents(fd);
3340 
3341  if (ferror(fd))
3342  {
3343  fprintf(stderr, "could not read file \"%s\": %s\n",
3344  filename, strerror(errno));
3345  exit(1);
3346  }
3347 
3348  if (fd != stdin)
3349  fclose(fd);
3350 
3351  ParseScript(buf, filename, weight);
3352 
3353  free(buf);
3354 }
3355 
3356 /* Parse the given builtin script and add it to the list. */
3357 static void
3358 process_builtin(const BuiltinScript *bi, int weight)
3359 {
3360  ParseScript(bi->script, bi->desc, weight);
3361 }
3362 
3363 /* show available builtin scripts */
3364 static void
3366 {
3367  int i;
3368 
3369  fprintf(stderr, "Available builtin scripts:\n");
3370  for (i = 0; i < lengthof(builtin_script); i++)
3371  fprintf(stderr, "\t%s\n", builtin_script[i].name);
3372  fprintf(stderr, "\n");
3373 }
3374 
3375 /* return builtin script "name" if unambiguous, fails if not found */
3376 static const BuiltinScript *
3377 findBuiltin(const char *name)
3378 {
3379  int i,
3380  found = 0,
3381  len = strlen(name);
3382  const BuiltinScript *result = NULL;
3383 
3384  for (i = 0; i < lengthof(builtin_script); i++)
3385  {
3386  if (strncmp(builtin_script[i].name, name, len) == 0)
3387  {
3388  result = &builtin_script[i];
3389  found++;
3390  }
3391  }
3392 
3393  /* ok, unambiguous result */
3394  if (found == 1)
3395  return result;
3396 
3397  /* error cases */
3398  if (found == 0)
3399  fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
3400  else /* found > 1 */
3401  fprintf(stderr,
3402  "ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);
3403 
3405  exit(1);
3406 }
3407 
3408 /*
3409  * Determine the weight specification from a script option (-b, -f), if any,
3410  * and return it as an integer (1 is returned if there's no weight). The
3411  * script name is returned in *script as a malloc'd string.
3412  */
3413 static int
3414 parseScriptWeight(const char *option, char **script)
3415 {
3416  char *sep;
3417  int weight;
3418 
3419  if ((sep = strrchr(option, WSEP)))
3420  {
3421  int namelen = sep - option;
3422  long wtmp;
3423  char *badp;
3424 
3425  /* generate the script name */
3426  *script = pg_malloc(namelen + 1);
3427  strncpy(*script, option, namelen);
3428  (*script)[namelen] = '\0';
3429 
3430  /* process digits of the weight spec */
3431  errno = 0;
3432  wtmp = strtol(sep + 1, &badp, 10);
3433  if (errno != 0 || badp == sep + 1 || *badp != '\0')
3434  {
3435  fprintf(stderr, "invalid weight specification: %s\n", sep);
3436  exit(1);
3437  }
3438  if (wtmp > INT_MAX || wtmp < 0)
3439  {
3440  fprintf(stderr,
3441  "weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
3442  INT_MAX, (int64) wtmp);
3443  exit(1);
3444  }
3445  weight = wtmp;
3446  }
3447  else
3448  {
3449  *script = pg_strdup(option);
3450  weight = 1;
3451  }
3452 
3453  return weight;
3454 }
3455 
3456 /* append a script to the list of scripts to process */
3457 static void
3459 {
3460  if (script.commands == NULL || script.commands[0] == NULL)
3461  {
3462  fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
3463  exit(1);
3464  }
3465 
3466  if (num_scripts >= MAX_SCRIPTS)
3467  {
3468  fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
3469  exit(1);
3470  }
3471 
3472  sql_script[num_scripts] = script;
3473  num_scripts++;
3474 }
3475 
3476 static void
3477 printSimpleStats(char *prefix, SimpleStats *ss)
3478 {
3479  /* print NaN if no transactions where executed */
3480  double latency = ss->sum / ss->count;
3481  double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
3482 
3483  printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
3484  printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
3485 }
3486 
3487 /* print out results */
3488 static void
3489 printResults(TState *threads, StatsData *total, instr_time total_time,
3490  instr_time conn_total_time, int latency_late)
3491 {
3492  double time_include,
3493  tps_include,
3494  tps_exclude;
3495 
3496  time_include = INSTR_TIME_GET_DOUBLE(total_time);
3497  tps_include = total->cnt / time_include;
3498  tps_exclude = total->cnt / (time_include -
3499  (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
3500 
3501  /* Report test parameters. */
3502  printf("transaction type: %s\n",
3503  num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
3504  printf("scaling factor: %d\n", scale);
3505  printf("query mode: %s\n", QUERYMODE[querymode]);
3506  printf("number of clients: %d\n", nclients);
3507  printf("number of threads: %d\n", nthreads);
3508  if (duration <= 0)
3509  {
3510  printf("number of transactions per client: %d\n", nxacts);
3511  printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
3512  total->cnt, nxacts * nclients);
3513  }
3514  else
3515  {
3516  printf("duration: %d s\n", duration);
3517  printf("number of transactions actually processed: " INT64_FORMAT "\n",
3518  total->cnt);
3519  }
3520 
3521  /* Remaining stats are nonsensical if we failed to execute any xacts */
3522  if (total->cnt <= 0)
3523  return;
3524 
3525  if (throttle_delay && latency_limit)
3526  printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
3527  total->skipped,
3528  100.0 * total->skipped / (total->skipped + total->cnt));
3529 
3530  if (latency_limit)
3531  printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
3532  latency_limit / 1000.0, latency_late,
3533  100.0 * latency_late / (total->skipped + total->cnt));
3534 
3535  if (throttle_delay || progress || latency_limit)
3536  printSimpleStats("latency", &total->latency);
3537  else
3538  {
3539  /* no measurement, show average latency computed from run time */
3540  printf("latency average = %.3f ms\n",
3541  1000.0 * time_include * nclients / total->cnt);
3542  }
3543 
3544  if (throttle_delay)
3545  {
3546  /*
3547  * Report average transaction lag under rate limit throttling. This
3548  * is the delay between scheduled and actual start times for the
3549  * transaction. The measured lag may be caused by thread/client load,
3550  * the database load, or the Poisson throttling process.
3551  */
3552  printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
3553  0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
3554  }
3555 
3556  printf("tps = %f (including connections establishing)\n", tps_include);
3557  printf("tps = %f (excluding connections establishing)\n", tps_exclude);
3558 
3559  /* Report per-script/command statistics */
3560  if (per_script_stats || latency_limit || is_latencies)
3561  {
3562  int i;
3563 
3564  for (i = 0; i < num_scripts; i++)
3565  {
3566  if (num_scripts > 1)
3567  printf("SQL script %d: %s\n"
3568  " - weight: %d (targets %.1f%% of total)\n"
3569  " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
3570  i + 1, sql_script[i].desc,
3571  sql_script[i].weight,
3572  100.0 * sql_script[i].weight / total_weight,
3573  sql_script[i].stats.cnt,
3574  100.0 * sql_script[i].stats.cnt / total->cnt,
3575  sql_script[i].stats.cnt / time_include);
3576  else
3577  printf("script statistics:\n");
3578 
3579  if (latency_limit)
3580  printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
3581  sql_script[i].stats.skipped,
3582  100.0 * sql_script[i].stats.skipped /
3583  (sql_script[i].stats.skipped + sql_script[i].stats.cnt));
3584 
3585  if (num_scripts > 1)
3586  printSimpleStats(" - latency", &sql_script[i].stats.latency);
3587 
3588  /* Report per-command latencies */
3589  if (is_latencies)
3590  {
3591  Command **commands;
3592 
3593  printf(" - statement latencies in milliseconds:\n");
3594 
3595  for (commands = sql_script[i].commands;
3596  *commands != NULL;
3597  commands++)
3598  printf(" %11.3f %s\n",
3599  1000.0 * (*commands)->stats.sum /
3600  (*commands)->stats.count,
3601  (*commands)->line);
3602  }
3603  }
3604  }
3605 }
3606 
3607 
3608 int
3609 main(int argc, char **argv)
3610 {
3611  static struct option long_options[] = {
3612  /* systematic long/short named options */
3613  {"tpc-b", no_argument, NULL, 'b'},
3614  {"client", required_argument, NULL, 'c'},
3615  {"connect", no_argument, NULL, 'C'},
3616  {"debug", no_argument, NULL, 'd'},
3617  {"define", required_argument, NULL, 'D'},
3618  {"file", required_argument, NULL, 'f'},
3619  {"fillfactor", required_argument, NULL, 'F'},
3620  {"host", required_argument, NULL, 'h'},
3621  {"initialize", no_argument, NULL, 'i'},
3622  {"jobs", required_argument, NULL, 'j'},
3623  {"log", no_argument, NULL, 'l'},
3624  {"latency-limit", required_argument, NULL, 'L'},
3625  {"no-vacuum", no_argument, NULL, 'n'},
3626  {"port", required_argument, NULL, 'p'},
3627  {"progress", required_argument, NULL, 'P'},
3628  {"protocol", required_argument, NULL, 'M'},
3629  {"quiet", no_argument, NULL, 'q'},
3630  {"report-latencies", no_argument, NULL, 'r'},
3631  {"rate", required_argument, NULL, 'R'},
3632  {"scale", required_argument, NULL, 's'},
3633  {"select-only", no_argument, NULL, 'S'},
3634  {"skip-some-updates", no_argument, NULL, 'N'},
3635  {"time", required_argument, NULL, 'T'},
3636  {"transactions", required_argument, NULL, 't'},
3637  {"username", required_argument, NULL, 'U'},
3638  {"vacuum-all", no_argument, NULL, 'v'},
3639  /* long-named only options */
3640  {"foreign-keys", no_argument, &foreign_keys, 1},
3641  {"index-tablespace", required_argument, NULL, 3},
3642  {"tablespace", required_argument, NULL, 2},
3643  {"unlogged-tables", no_argument, &unlogged_tables, 1},
3644  {"sampling-rate", required_argument, NULL, 4},
3645  {"aggregate-interval", required_argument, NULL, 5},
3646  {"progress-timestamp", no_argument, NULL, 6},
3647  {"log-prefix", required_argument, NULL, 7},
3648  {NULL, 0, NULL, 0}
3649  };
3650 
3651  int c;
3652  int is_init_mode = 0; /* initialize mode? */
3653  int is_no_vacuum = 0; /* no vacuum at all before testing? */
3654  int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
3655  int optindex;
3656  bool scale_given = false;
3657 
3658  bool benchmarking_option_set = false;
3659  bool initialization_option_set = false;
3660  bool internal_script_used = false;
3661 
3662  CState *state; /* status of clients */
3663  TState *threads; /* array of thread */
3664 
3665  instr_time start_time; /* start up time */
3666  instr_time total_time;
3667  instr_time conn_total_time;
3668  int64 latency_late = 0;
3669  StatsData stats;
3670  int weight;
3671 
3672  int i;
3673  int nclients_dealt;
3674 
3675 #ifdef HAVE_GETRLIMIT
3676  struct rlimit rlim;
3677 #endif
3678 
3679  PGconn *con;
3680  PGresult *res;
3681  char *env;
3682 
3683  progname = get_progname(argv[0]);
3684 
3685  if (argc > 1)
3686  {
3687  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
3688  {
3689  usage();
3690  exit(0);
3691  }
3692  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
3693  {
3694  puts("pgbench (PostgreSQL) " PG_VERSION);
3695  exit(0);
3696  }
3697  }
3698 
3699 #ifdef WIN32
3700  /* stderr is buffered on Win32. */
3701  setvbuf(stderr, NULL, _IONBF, 0);
3702 #endif
3703 
3704  if ((env = getenv("PGHOST")) != NULL && *env != '\0')
3705  pghost = env;
3706  if ((env = getenv("PGPORT")) != NULL && *env != '\0')
3707  pgport = env;
3708  else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
3709  login = env;
3710 
3711  state = (CState *) pg_malloc(sizeof(CState));
3712  memset(state, 0, sizeof(CState));
3713 
3714  while ((c = getopt_long(argc, argv, "ih:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
3715  {
3716  char *script;
3717 
3718  switch (c)
3719  {
3720  case 'i':
3721  is_init_mode++;
3722  break;
3723  case 'h':
3724  pghost = pg_strdup(optarg);
3725  break;
3726  case 'n':
3727  is_no_vacuum++;
3728  break;
3729  case 'v':
3730  do_vacuum_accounts++;
3731  break;
3732  case 'p':
3733  pgport = pg_strdup(optarg);
3734  break;
3735  case 'd':
3736  debug++;
3737  break;
3738  case 'c':
3739  benchmarking_option_set = true;
3740  nclients = atoi(optarg);
3741  if (nclients <= 0 || nclients > MAXCLIENTS)
3742  {
3743  fprintf(stderr, "invalid number of clients: \"%s\"\n",
3744  optarg);
3745  exit(1);
3746  }
3747 #ifdef HAVE_GETRLIMIT
3748 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
3749  if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
3750 #else /* but BSD doesn't ... */
3751  if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
3752 #endif /* RLIMIT_NOFILE */
3753  {
3754  fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
3755  exit(1);
3756  }
3757  if (rlim.rlim_cur < nclients + 3)
3758  {
3759  fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
3760  nclients + 3, (long) rlim.rlim_cur);
3761  fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
3762  exit(1);
3763  }
3764 #endif /* HAVE_GETRLIMIT */
3765  break;
3766  case 'j': /* jobs */
3767  benchmarking_option_set = true;
3768  nthreads = atoi(optarg);
3769  if (nthreads <= 0)
3770  {
3771  fprintf(stderr, "invalid number of threads: \"%s\"\n",
3772  optarg);
3773  exit(1);
3774  }
3775 #ifndef ENABLE_THREAD_SAFETY
3776  if (nthreads != 1)
3777  {
3778  fprintf(stderr, "threads are not supported on this platform; use -j1\n");
3779  exit(1);
3780  }
3781 #endif /* !ENABLE_THREAD_SAFETY */
3782  break;
3783  case 'C':
3784  benchmarking_option_set = true;
3785  is_connect = true;
3786  break;
3787  case 'r':
3788  benchmarking_option_set = true;
3789  per_script_stats = true;
3790  is_latencies = true;
3791  break;
3792  case 's':
3793  scale_given = true;
3794  scale = atoi(optarg);
3795  if (scale <= 0)
3796  {
3797  fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
3798  exit(1);
3799  }
3800  break;
3801  case 't':
3802  benchmarking_option_set = true;
3803  if (duration > 0)
3804  {
3805  fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
3806  exit(1);
3807  }
3808  nxacts = atoi(optarg);
3809  if (nxacts <= 0)
3810  {
3811  fprintf(stderr, "invalid number of transactions: \"%s\"\n",
3812  optarg);
3813  exit(1);
3814  }
3815  break;
3816  case 'T':
3817  benchmarking_option_set = true;
3818  if (nxacts > 0)
3819  {
3820  fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
3821  exit(1);
3822  }
3823  duration = atoi(optarg);
3824  if (duration <= 0)
3825  {
3826  fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
3827  exit(1);
3828  }
3829  break;
3830  case 'U':
3831  login = pg_strdup(optarg);
3832  break;
3833  case 'l':
3834  benchmarking_option_set = true;
3835  use_log = true;
3836  break;
3837  case 'q':
3838  initialization_option_set = true;
3839  use_quiet = true;
3840  break;
3841 
3842  case 'b':
3843  if (strcmp(optarg, "list") == 0)
3844  {
3846  exit(0);
3847  }
3848 
3849  weight = parseScriptWeight(optarg, &script);
3850  process_builtin(findBuiltin(script), weight);
3851  benchmarking_option_set = true;
3852  internal_script_used = true;
3853  break;
3854 
3855  case 'S':
3856  process_builtin(findBuiltin("select-only"), 1);
3857  benchmarking_option_set = true;
3858  internal_script_used = true;
3859  break;
3860  case 'N':
3861  process_builtin(findBuiltin("simple-update"), 1);
3862  benchmarking_option_set = true;
3863  internal_script_used = true;
3864  break;
3865  case 'f':
3866  weight = parseScriptWeight(optarg, &script);
3867  process_file(script, weight);
3868  benchmarking_option_set = true;
3869  break;
3870  case 'D':
3871  {
3872  char *p;
3873 
3874  benchmarking_option_set = true;
3875 
3876  if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
3877  {
3878  fprintf(stderr, "invalid variable definition: \"%s\"\n",
3879  optarg);
3880  exit(1);
3881  }
3882 
3883  *p++ = '\0';
3884  if (!putVariable(&state[0], "option", optarg, p))
3885  exit(1);
3886  }
3887  break;
3888  case 'F':
3889  initialization_option_set = true;
3890  fillfactor = atoi(optarg);
3891  if (fillfactor < 10 || fillfactor > 100)
3892  {
3893  fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3894  exit(1);
3895  }
3896  break;
3897  case 'M':
3898  benchmarking_option_set = true;
3899  if (num_scripts > 0)
3900  {
3901  fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f or -b)\n");
3902  exit(1);
3903  }
3904  for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3905  if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3906  break;
3907  if (querymode >= NUM_QUERYMODE)
3908  {
3909  fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3910  optarg);
3911  exit(1);
3912  }
3913  break;
3914  case 'P':
3915  benchmarking_option_set = true;
3916  progress = atoi(optarg);
3917  if (progress <= 0)
3918  {
3919  fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3920  optarg);
3921  exit(1);
3922  }
3923  break;
3924  case 'R':
3925  {
3926  /* get a double from the beginning of option value */
3927  double throttle_value = atof(optarg);
3928 
3929  benchmarking_option_set = true;
3930 
3931  if (throttle_value <= 0.0)
3932  {
3933  fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3934  exit(1);
3935  }
3936  /* Invert rate limit into a time offset */
3937  throttle_delay = (int64) (1000000.0 / throttle_value);
3938  }
3939  break;
3940  case 'L':
3941  {
3942  double limit_ms = atof(optarg);
3943 
3944  if (limit_ms <= 0.0)
3945  {
3946  fprintf(stderr, "invalid latency limit: \"%s\"\n",
3947  optarg);
3948  exit(1);
3949  }
3950  benchmarking_option_set = true;
3951  latency_limit = (int64) (limit_ms * 1000);
3952  }
3953  break;
3954  case 0:
3955  /* This covers long options which take no argument. */
3956  if (foreign_keys || unlogged_tables)
3957  initialization_option_set = true;
3958  break;
3959  case 2: /* tablespace */
3960  initialization_option_set = true;
3961  tablespace = pg_strdup(optarg);
3962  break;
3963  case 3: /* index-tablespace */
3964  initialization_option_set = true;
3965  index_tablespace = pg_strdup(optarg);
3966  break;
3967  case 4:
3968  benchmarking_option_set = true;
3969  sample_rate = atof(optarg);
3970  if (sample_rate <= 0.0 || sample_rate > 1.0)
3971  {
3972  fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3973  exit(1);
3974  }
3975  break;
3976  case 5:
3977  benchmarking_option_set = true;
3978  agg_interval = atoi(optarg);
3979  if (agg_interval <= 0)
3980  {
3981  fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
3982  optarg);
3983  exit(1);
3984  }
3985  break;
3986  case 6:
3987  progress_timestamp = true;
3988  benchmarking_option_set = true;
3989  break;
3990  case 7:
3991  benchmarking_option_set = true;
3992  logfile_prefix = pg_strdup(optarg);
3993  break;
3994  default:
3995  fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3996  exit(1);
3997  break;
3998  }
3999  }
4000 
4001  /* set default script if none */
4002  if (num_scripts == 0 && !is_init_mode)
4003  {
4004  process_builtin(findBuiltin("tpcb-like"), 1);
4005  benchmarking_option_set = true;
4006  internal_script_used = true;
4007  }
4008 
4009  /* compute total_weight */
4010  for (i = 0; i < num_scripts; i++)
4011  /* cannot overflow: weight is 32b, total_weight 64b */
4012  total_weight += sql_script[i].weight;
4013 
4014  if (total_weight == 0 && !is_init_mode)
4015  {
4016  fprintf(stderr, "total script weight must not be zero\n");
4017  exit(1);
4018  }
4019 
4020  /* show per script stats if several scripts are used */
4021  if (num_scripts > 1)
4022  per_script_stats = true;
4023 
4024  /*
4025  * Don't need more threads than there are clients. (This is not merely an
4026  * optimization; throttle_delay is calculated incorrectly below if some
4027  * threads have no clients assigned to them.)
4028  */
4029  if (nthreads > nclients)
4030  nthreads = nclients;
4031 
4032  /* compute a per thread delay */
4033  throttle_delay *= nthreads;
4034 
4035  if (argc > optind)
4036  dbName = argv[optind];
4037  else
4038  {
4039  if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
4040  dbName = env;
4041  else if (login != NULL && *login != '\0')
4042  dbName = login;
4043  else
4044  dbName = "";
4045  }
4046 
4047  if (is_init_mode)
4048  {
4049  if (benchmarking_option_set)
4050  {
4051  fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
4052  exit(1);
4053  }
4054 
4055  init(is_no_vacuum);
4056  exit(0);
4057  }
4058  else
4059  {
4060  if (initialization_option_set)
4061  {
4062  fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
4063  exit(1);
4064  }
4065  }
4066 
4067  /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
4068  if (nxacts <= 0 && duration <= 0)
4069  nxacts = DEFAULT_NXACTS;
4070 
4071  /* --sampling-rate may be used only with -l */
4072  if (sample_rate > 0.0 && !use_log)
4073  {
4074  fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
4075  exit(1);
4076  }
4077 
4078  /* --sampling-rate may not be used with --aggregate-interval */
4079  if (sample_rate > 0.0 && agg_interval > 0)
4080  {
4081  fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
4082  exit(1);
4083  }
4084 
4085  if (agg_interval > 0 && !use_log)
4086  {
4087  fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
4088  exit(1);
4089  }
4090 
4091  if (!use_log && logfile_prefix)
4092  {
4093  fprintf(stderr, "log file prefix (--log-prefix) is allowed only when logging transactions (-l)\n");
4094  exit(1);
4095  }
4096 
4097  if (duration > 0 && agg_interval > duration)
4098  {
4099  fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
4100  exit(1);
4101  }
4102 
4103  if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
4104  {
4105  fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
4106  exit(1);
4107  }
4108 
4109  /*
4110  * save main process id in the global variable because process id will be
4111  * changed after fork.
4112  */
4113  main_pid = (int) getpid();
4114 
4115  if (nclients > 1)
4116  {
4117  state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
4118  memset(state + 1, 0, sizeof(CState) * (nclients - 1));
4119 
4120  /* copy any -D switch values to all clients */
4121  for (i = 1; i < nclients; i++)
4122  {
4123  int j;
4124 
4125  state[i].id = i;
4126  for (j = 0; j < state[0].nvariables; j++)
4127  {
4128  Variable *var = &state[0].variables[j];
4129 
4130  if (var->is_numeric)
4131  {
4132  if (!putVariableNumber(&state[i], "startup",
4133  var->name, &var->num_value))
4134  exit(1);
4135  }
4136  else
4137  {
4138  if (!putVariable(&state[i], "startup",
4139  var->name, var->value))
4140  exit(1);
4141  }
4142  }
4143  }
4144  }
4145 
4146  if (debug)
4147  {
4148  if (duration <= 0)
4149  printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
4150  pghost, pgport, nclients, nxacts, dbName);
4151  else
4152  printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
4153  pghost, pgport, nclients, duration, dbName);
4154  }
4155 
4156  /* opening connection... */
4157  con = doConnect();
4158  if (con == NULL)
4159  exit(1);
4160 
4161  if (PQstatus(con) == CONNECTION_BAD)
4162  {
4163  fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
4164  fprintf(stderr, "%s", PQerrorMessage(con));
4165  exit(1);
4166  }
4167 
4168  if (internal_script_used)
4169  {
4170  /*
4171  * get the scaling factor that should be same as count(*) from
4172  * pgbench_branches if this is not a custom query
4173  */
4174  res = PQexec(con, "select count(*) from pgbench_branches");
4175  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4176  {
4177  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
4178 
4179  fprintf(stderr, "%s", PQerrorMessage(con));
4180  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
4181  {
4182  fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
4183  }
4184 
4185  exit(1);
4186  }
4187  scale = atoi(PQgetvalue(res, 0, 0));
4188  if (scale < 0)
4189  {
4190  fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
4191  PQgetvalue(res, 0, 0));
4192  exit(1);
4193  }
4194  PQclear(res);
4195 
4196  /* warn if we override user-given -s switch */
4197  if (scale_given)
4198  fprintf(stderr,
4199  "scale option ignored, using count from pgbench_branches table (%d)\n",
4200  scale);
4201  }
4202 
4203  /*
4204  * :scale variables normally get -s or database scale, but don't override
4205  * an explicit -D switch
4206  */
4207  if (lookupVariable(&state[0], "scale") == NULL)
4208  {
4209  for (i = 0; i < nclients; i++)
4210  {
4211  if (!putVariableInt(&state[i], "startup", "scale", scale))
4212  exit(1);
4213  }
4214  }
4215 
4216  /*
4217  * Define a :client_id variable that is unique per connection. But don't
4218  * override an explicit -D switch.
4219  */
4220  if (lookupVariable(&state[0], "client_id") == NULL)
4221  {
4222  for (i = 0; i < nclients; i++)
4223  {
4224  if (!putVariableInt(&state[i], "startup", "client_id", i))
4225  exit(1);
4226  }
4227  }
4228 
4229  if (!is_no_vacuum)
4230  {
4231  fprintf(stderr, "starting vacuum...");
4232  tryExecuteStatement(con, "vacuum pgbench_branches");
4233  tryExecuteStatement(con, "vacuum pgbench_tellers");
4234  tryExecuteStatement(con, "truncate pgbench_history");
4235  fprintf(stderr, "end.\n");
4236 
4237  if (do_vacuum_accounts)
4238  {
4239  fprintf(stderr, "starting vacuum pgbench_accounts...");
4240  tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
4241  fprintf(stderr, "end.\n");
4242  }
4243  }
4244  PQfinish(con);
4245 
4246  /* set random seed */
4247  INSTR_TIME_SET_CURRENT(start_time);
4248  srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
4249 
4250  /* set up thread data structures */
4251  threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
4252  nclients_dealt = 0;
4253 
4254  for (i = 0; i < nthreads; i++)
4255  {
4256  TState *thread = &threads[i];
4257 
4258  thread->tid = i;
4259  thread->state = &state[nclients_dealt];
4260  thread->nstate =
4261  (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
4262  thread->random_state[0] = random();
4263  thread->random_state[1] = random();
4264  thread->random_state[2] = random();
4265  thread->logfile = NULL; /* filled in later */
4266  thread->latency_late = 0;
4267  initStats(&thread->stats, 0);
4268 
4269  nclients_dealt += thread->nstate;
4270  }
4271 
4272  /* all clients must be assigned to a thread */
4273  Assert(nclients_dealt == nclients);
4274 
4275  /* get start up time */
4276  INSTR_TIME_SET_CURRENT(start_time);
4277 
4278  /* set alarm if duration is specified. */
4279  if (duration > 0)
4280  setalarm(duration);
4281 
4282  /* start threads */
4283 #ifdef ENABLE_THREAD_SAFETY
4284  for (i = 0; i < nthreads; i++)
4285  {
4286  TState *thread = &threads[i];
4287 
4289 
4290  /* compute when to stop */
4291  if (duration > 0)
4292  end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
4293  (int64) 1000000 *duration;
4294 
4295  /* the first thread (i = 0) is executed by main thread */
4296  if (i > 0)
4297  {
4298  int err = pthread_create(&thread->thread, NULL, threadRun, thread);
4299 
4300  if (err != 0 || thread->thread == INVALID_THREAD)
4301  {
4302  fprintf(stderr, "could not create thread: %s\n", strerror(err));
4303  exit(1);
4304  }
4305  }
4306  else
4307  {
4308  thread->thread = INVALID_THREAD;
4309  }
4310  }
4311 #else
4312  INSTR_TIME_SET_CURRENT(threads[0].start_time);
4313  /* compute when to stop */
4314  if (duration > 0)
4315  end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
4316  (int64) 1000000 *duration;
4317  threads[0].thread = INVALID_THREAD;
4318 #endif /* ENABLE_THREAD_SAFETY */
4319 
4320  /* wait for threads and accumulate results */
4321  initStats(&stats, 0);
4322  INSTR_TIME_SET_ZERO(conn_total_time);
4323  for (i = 0; i < nthreads; i++)
4324  {
4325  TState *thread = &threads[i];
4326 
4327 #ifdef ENABLE_THREAD_SAFETY
4328  if (threads[i].thread == INVALID_THREAD)
4329  /* actually run this thread directly in the main thread */
4330  (void) threadRun(thread);
4331  else
4332  /* wait of other threads. should check that 0 is returned? */
4333  pthread_join(thread->thread, NULL);
4334 #else
4335  (void) threadRun(thread);
4336 #endif /* ENABLE_THREAD_SAFETY */
4337 
4338  /* aggregate thread level stats */
4339  mergeSimpleStats(&stats.latency, &thread->stats.latency);
4340  mergeSimpleStats(&stats.lag, &thread->stats.lag);
4341  stats.cnt += thread->stats.cnt;
4342  stats.skipped += thread->stats.skipped;
4343  latency_late += thread->latency_late;
4344  INSTR_TIME_ADD(conn_total_time, thread->conn_time);
4345  }
4346  disconnect_all(state, nclients);
4347 
4348  /*
4349  * XXX We compute results as though every client of every thread started
4350  * and finished at the same time. That model can diverge noticeably from
4351  * reality for a short benchmark run involving relatively many threads.
4352  * The first thread may process notably many transactions before the last
4353  * thread begins. Improving the model alone would bring limited benefit,
4354  * because performance during those periods of partial thread count can
4355  * easily exceed steady state performance. This is one of the many ways
4356  * short runs convey deceptive performance figures.
4357  */
4358  INSTR_TIME_SET_CURRENT(total_time);
4359  INSTR_TIME_SUBTRACT(total_time, start_time);
4360  printResults(threads, &stats, total_time, conn_total_time, latency_late);
4361 
4362  return 0;
4363 }
4364 
4365 static void *
4367 {
4368  TState *thread = (TState *) arg;
4369  CState *state = thread->state;
4370  instr_time start,
4371  end;
4372  int nstate = thread->nstate;
4373  int remains = nstate; /* number of remaining clients */
4374  int i;
4375 
4376  /* for reporting progress: */
4377  int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
4378  int64 last_report = thread_start;
4379  int64 next_report = last_report + (int64) progress * 1000000;
4380  StatsData last,
4381  aggs;
4382 
4383  /*
4384  * Initialize throttling rate target for all of the thread's clients. It
4385  * might be a little more accurate to reset thread->start_time here too.
4386  * The possible drift seems too small relative to typical throttle delay
4387  * times to worry about it.
4388  */
4389  INSTR_TIME_SET_CURRENT(start);
4390  thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
4391 
4392  INSTR_TIME_SET_ZERO(thread->conn_time);
4393 
4394  initStats(&aggs, time(NULL));
4395  last = aggs;
4396 
4397  /* open log file if requested */
4398  if (use_log)
4399  {
4400  char logpath[MAXPGPATH];
4401  char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
4402 
4403  if (thread->tid == 0)
4404  snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
4405  else
4406  snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
4407 
4408  thread->logfile = fopen(logpath, "w");
4409 
4410  if (thread->logfile == NULL)
4411  {
4412  fprintf(stderr, "could not open logfile \"%s\": %s\n",
4413  logpath, strerror(errno));
4414  goto done;
4415  }
4416  }
4417 
4418  if (!is_connect)
4419  {
4420  /* make connections to the database */
4421  for (i = 0; i < nstate; i++)
4422  {
4423  if ((state[i].con = doConnect()) == NULL)
4424  goto done;
4425  }
4426  }
4427 
4428  /* time after thread and connections set up */
4430  INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
4431 
4432  /* explicitly initialize the state machines */
4433  for (i = 0; i < nstate; i++)
4434  {
4435  state[i].state = CSTATE_CHOOSE_SCRIPT;
4436  }
4437 
4438  /* loop till all clients have terminated */
4439  while (remains > 0)
4440  {
4441  fd_set input_mask;
4442  int maxsock; /* max socket number to be waited for */
4443  int64 min_usec;
4444  int64 now_usec = 0; /* set this only if needed */
4445 
4446  /* identify which client sockets should be checked for input */
4447  FD_ZERO(&input_mask);
4448  maxsock = -1;
4449  min_usec = PG_INT64_MAX;
4450  for (i = 0; i < nstate; i++)
4451  {
4452  CState *st = &state[i];
4453 
4454  if (st->state == CSTATE_THROTTLE && timer_exceeded)
4455  {
4456  /* interrupt client that has not started a transaction */
4457  st->state = CSTATE_FINISHED;
4458  PQfinish(st->con);
4459  st->con = NULL;
4460  remains--;
4461  }
4462  else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
4463  {
4464  /* a nap from the script, or under throttling */
4465  int64 this_usec;
4466 
4467  /* get current time if needed */
4468  if (now_usec == 0)
4469  {
4470  instr_time now;
4471 
4473  now_usec = INSTR_TIME_GET_MICROSEC(now);
4474  }
4475 
4476  /* min_usec should be the minimum delay across all clients */
4477  this_usec = (st->state == CSTATE_SLEEP ?
4478  st->sleep_until : st->txn_scheduled) - now_usec;
4479  if (min_usec > this_usec)
4480  min_usec = this_usec;
4481  }
4482  else if (st->state == CSTATE_WAIT_RESULT)
4483  {
4484  /*
4485  * waiting for result from server - nothing to do unless the
4486  * socket is readable
4487  */
4488  int sock = PQsocket(st->con);
4489 
4490  if (sock < 0)
4491  {
4492  fprintf(stderr, "invalid socket: %s",
4493  PQerrorMessage(st->con));
4494  goto done;
4495  }
4496 
4497  FD_SET(sock, &input_mask);
4498  if (maxsock < sock)
4499  maxsock = sock;
4500  }
4501  else if (st->state != CSTATE_ABORTED &&
4502  st->state != CSTATE_FINISHED)
4503  {
4504  /*
4505  * This client thread is ready to do something, so we don't
4506  * want to wait. No need to examine additional clients.
4507  */
4508  min_usec = 0;
4509  break;
4510  }
4511  }
4512 
4513  /* also wake up to print the next progress report on time */
4514  if (progress && min_usec > 0 && thread->tid == 0)
4515  {
4516  /* get current time if needed */
4517  if (now_usec == 0)
4518  {
4519  instr_time now;
4520 
4522  now_usec = INSTR_TIME_GET_MICROSEC(now);
4523  }
4524 
4525  if (now_usec >= next_report)
4526  min_usec = 0;
4527  else if ((next_report - now_usec) < min_usec)
4528  min_usec = next_report - now_usec;
4529  }
4530 
4531  /*
4532  * If no clients are ready to execute actions, sleep until we receive
4533  * data from the server, or a nap-time specified in the script ends,
4534  * or it's time to print a progress report. Update input_mask to show
4535  * which client(s) received data.
4536  */
4537  if (min_usec > 0 && maxsock != -1)
4538  {
4539  int nsocks; /* return from select(2) */
4540 
4541  if (min_usec != PG_INT64_MAX)
4542  {
4543  struct timeval timeout;
4544 
4545  timeout.tv_sec = min_usec / 1000000;
4546  timeout.tv_usec = min_usec % 1000000;
4547  nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
4548  }
4549  else
4550  nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
4551  if (nsocks < 0)
4552  {
4553  if (errno == EINTR)
4554  {
4555  /* On EINTR, go back to top of loop */
4556  continue;
4557  }
4558  /* must be something wrong */
4559  fprintf(stderr, "select() failed: %s\n", strerror(errno));
4560  goto done;
4561  }
4562  }
4563  else
4564  {
4565  /* If we didn't call select(), don't try to read any data */
4566  FD_ZERO(&input_mask);
4567  }
4568 
4569  /* ok, advance the state machine of each connection */
4570  for (i = 0; i < nstate; i++)
4571  {
4572  CState *st = &state[i];
4573 
4574  if (st->state == CSTATE_WAIT_RESULT)
4575  {
4576  /* don't call doCustom unless data is available */
4577  int sock = PQsocket(st->con);
4578 
4579  if (sock < 0)
4580  {
4581  fprintf(stderr, "invalid socket: %s",
4582  PQerrorMessage(st->con));
4583  goto done;
4584  }
4585 
4586  if (!FD_ISSET(sock, &input_mask))
4587  continue;
4588  }
4589  else if (st->state == CSTATE_FINISHED ||
4590  st->state == CSTATE_ABORTED)
4591  {
4592  /* this client is done, no need to consider it anymore */
4593  continue;
4594  }
4595 
4596  doCustom(thread, st, &aggs);
4597 
4598  /* If doCustom changed client to finished state, reduce remains */
4599  if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
4600  remains--;
4601  }
4602 
4603  /* progress report is made by thread 0 for all threads */
4604  if (progress && thread->tid == 0)
4605  {
4606  instr_time now_time;
4607  int64 now;
4608 
4609  INSTR_TIME_SET_CURRENT(now_time);
4610  now = INSTR_TIME_GET_MICROSEC(now_time);
4611  if (now >= next_report)
4612  {
4613  /* generate and show report */
4614  StatsData cur;
4615  int64 run = now - last_report;
4616  double tps,
4617  total_run,
4618  latency,
4619  sqlat,
4620  lag,
4621  stdev;
4622  char tbuf[64];
4623 
4624  /*
4625  * Add up the statistics of all threads.
4626  *
4627  * XXX: No locking. There is no guarantee that we get an
4628  * atomic snapshot of the transaction count and latencies, so
4629  * these figures can well be off by a small amount. The
4630  * progress is report's purpose is to give a quick overview of
4631  * how the test is going, so that shouldn't matter too much.
4632  * (If a read from a 64-bit integer is not atomic, you might
4633  * get a "torn" read and completely bogus latencies though!)
4634  */
4635  initStats(&cur, 0);
4636  for (i = 0; i < nthreads; i++)
4637  {
4638  mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
4639  mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
4640  cur.cnt += thread[i].stats.cnt;
4641  cur.skipped += thread[i].stats.skipped;
4642  }
4643 
4644  total_run = (now - thread_start) / 1000000.0;
4645  tps = 1000000.0 * (cur.cnt - last.cnt) / run;
4646  latency = 0.001 * (cur.latency.sum - last.latency.sum) /
4647  (cur.cnt - last.cnt);
4648  sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
4649  / (cur.cnt - last.cnt);
4650  stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
4651  lag = 0.001 * (cur.lag.sum - last.lag.sum) /
4652  (cur.cnt - last.cnt);
4653 
4654  if (progress_timestamp)
4655  sprintf(tbuf, "%.03f s",
4656  INSTR_TIME_GET_MILLISEC(now_time) / 1000.0);
4657  else
4658  sprintf(tbuf, "%.1f s", total_run);
4659 
4660  fprintf(stderr,
4661  "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
4662  tbuf, tps, latency, stdev);
4663 
4664  if (throttle_delay)
4665  {
4666  fprintf(stderr, ", lag %.3f ms", lag);
4667  if (latency_limit)
4668  fprintf(stderr, ", " INT64_FORMAT " skipped",
4669  cur.skipped - last.skipped);
4670  }
4671  fprintf(stderr, "\n");
4672 
4673  last = cur;
4674  last_report = now;
4675 
4676  /*
4677  * Ensure that the next report is in the future, in case
4678  * pgbench/postgres got stuck somewhere.
4679  */
4680  do
4681  {
4682  next_report += (int64) progress *1000000;
4683  } while (now >= next_report);
4684  }
4685  }
4686  }
4687 
4688 done:
4689  INSTR_TIME_SET_CURRENT(start);
4690  disconnect_all(state, nstate);
4692  INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
4693  if (thread->logfile)
4694  {
4695  if (agg_interval > 0)
4696  {
4697  /* log aggregated but not yet reported transactions */
4698  doLog(thread, state, &aggs, false, 0, 0);
4699  }
4700  fclose(thread->logfile);
4701  thread->logfile = NULL;
4702  }
4703  return NULL;
4704 }
4705 
4706 /*
4707  * Support for duration option: set timer_exceeded after so many seconds.
4708  */
4709 
4710 #ifndef WIN32
4711 
4712 static void
4714 {
4715  timer_exceeded = true;
4716 }
4717 
4718 static void
4719 setalarm(int seconds)
4720 {
4722  alarm(seconds);
4723 }
4724 
4725 #else /* WIN32 */
4726 
4727 static VOID CALLBACK
4728 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
4729 {
4730  timer_exceeded = true;
4731 }
4732 
4733 static void
4734 setalarm(int seconds)
4735 {
4736  HANDLE queue;
4737  HANDLE timer;
4738 
4739  /* This function will be called at most once, so we can cheat a bit. */
4740  queue = CreateTimerQueue();
4741  if (seconds > ((DWORD) -1) / 1000 ||
4742  !CreateTimerQueueTimer(&timer, queue,
4743  win32_timer_callback, NULL, seconds * 1000, 0,
4744  WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
4745  {
4746  fprintf(stderr, "failed to set timer\n");
4747  exit(1);
4748  }
4749 }
4750 
4751 /* partial pthread implementation for Windows */
4752 
4753 typedef struct win32_pthread
4754 {
4755  HANDLE handle;
4756  void *(*routine) (void *);
4757  void *arg;
4758  void *result;
4759 } win32_pthread;
4760 
4761 static unsigned __stdcall
4762 win32_pthread_run(void *arg)
4763 {
4764  win32_pthread *th = (win32_pthread *) arg;
4765 
4766  th->result = th->routine(th->arg);
4767 
4768  return 0;
4769 }
4770 
4771 static int
4772 pthread_create(pthread_t *thread,
4773  pthread_attr_t *attr,
4774  void *(*start_routine) (void *),
4775  void *arg)
4776 {
4777  int save_errno;
4778  win32_pthread *th;
4779 
4780  th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
4781  th->routine = start_routine;
4782  th->arg = arg;
4783  th->result = NULL;
4784 
4785  th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
4786  if (th->handle == NULL)
4787  {
4788  save_errno = errno;
4789  free(th);
4790  return save_errno;
4791  }
4792 
4793  *thread = th;
4794  return 0;
4795 }
4796 
4797 static int
4798 pthread_join(pthread_t th, void **thread_return)
4799 {
4800  if (th == NULL || th->handle == NULL)
4801  return errno = EINVAL;
4802 
4803  if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
4804  {
4805  _dosmaperr(GetLastError());
4806  return errno;
4807  }
4808 
4809  if (thread_return)
4810  *thread_return = th->result;
4811 
4812  CloseHandle(th->handle);
4813  free(th);
4814  return 0;
4815 }
4816 
4817 #endif /* WIN32 */
time_t start_time
Definition: pgbench.c:231
static char password[100]
Definition: streamutil.c:41
int length(const List *list)
Definition: list.c:1271
PgBenchValue constant
Definition: pgbench.h:90
static const BuiltinScript * findBuiltin(const char *name)
Definition: pgbench.c:3377
static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *)
Definition: pgbench.c:1653
static struct @76 value
int64 throttle_trigger
Definition: pgbench.c:346
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1890
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5959
int unlogged_tables
Definition: pgbench.c:122
PsqlScanResult
Definition: psqlscan.h:30
void expr_scanner_finish(yyscan_t yyscanner)
int gettimeofday(struct timeval *tp, struct timezone *tzp)
Definition: gettimeofday.c:105
double sample_rate
Definition: pgbench.c:127
static Variable * lookupVariable(CState *st, char *name)
Definition: pgbench.c:939
static bool putVariableNumber(CState *st, const char *context, char *name, const PgBenchValue *value)
Definition: pgbench.c:1113
#define PG_INT64_MAX
Definition: c.h:340
int main(int argc, char **argv)
Definition: pgbench.c:3609
static int64 getrand(TState *thread, int64 min, int64 max)
Definition: pgbench.c:627
#define ERRCODE_UNDEFINED_TABLE
Definition: pgbench.c:61
int type
Definition: pgbench.c:380
#define INVALID_THREAD
Definition: pgbench.c:356
int command_num
Definition: pgbench.c:379
int expr_yyparse(yyscan_t yyscanner)
ConnectionStateEnum
Definition: pgbench.c:242
int id
Definition: pgbench.c:312
const char * name
Definition: pgbench.c:405
static void ParseScript(const char *script, const char *desc, int weight)
Definition: pgbench.c:3192
static int chooseScript(TState *thread)
Definition: pgbench.c:1820
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1183
static void discard_response(CState *state)
Definition: pgbench.c:917
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
static int num_scripts
Definition: pgbench.c:396
char * line
Definition: pgbench.c:378
unsigned short random_state[3]
Definition: pgbench.c:345
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static void listAvailableScripts(void)
Definition: pgbench.c:3365
static Command * process_sql_command(PQExpBuffer buf, const char *source)
Definition: pgbench.c:2955
char * name
Definition: pgbench.c:201
int nclients
Definition: pgbench.c:174
const char * get_progname(const char *argv0)
Definition: path.c:453
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:128
PsqlScanState psql_scan_create(const PsqlScanCallbacks *callbacks)
struct BuiltinScript BuiltinScript
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:57
static int compareVariableNames(const void *v1, const void *v2)
Definition: pgbench.c:931
long random(void)
Definition: random.c:22
static void static void addScript(ParsedScript script)
Definition: pgbench.c:3458
static int parseScriptWeight(const char *option, char **script)
Definition: pgbench.c:3414
static const PsqlScanCallbacks pgbench_callbacks
Definition: pgbench.c:464
yyscan_t expr_scanner_init(PsqlScanState state, const char *source, int lineno, int start_offset, const char *command)
#define INSTR_TIME_GET_MILLISEC(t)
Definition: instr_time.h:199
struct timeval instr_time
Definition: instr_time.h:147
#define Min(x, y)
Definition: c.h:802
#define COMMANDS_ALLOC_NUM
static void preparedStatementName(char *buffer, int file, int state)
Definition: pgbench.c:1805
void syntax_error(const char *source, int lineno, const char *line, const char *command, const char *msg, const char *more, int column)
Definition: pgbench.c:2920
int64 latency_late
Definition: pgbench.c:353
PgBenchExpr * expr
Definition: pgbench.c:383
SimpleStats lag
Definition: pgbench.c:236
struct ParsedScript ParsedScript
struct cursor * cur
Definition: ecpg.c:28
StatsData stats
Definition: pgbench.c:392
static void initStats(StatsData *sd, time_t start_time)
Definition: pgbench.c:784
double sum
Definition: pgbench.c:221
#define INSTR_TIME_ACCUM_DIFF(x, y, z)
Definition: instr_time.h:179
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3516
int weight
Definition: pgbench.c:390
int scale
Definition: pgbench.c:106
static void addToSimpleStats(SimpleStats *ss, double val)
Definition: pgbench.c:753
#define select(n, r, w, e, timeout)
Definition: win32.h:384
#define INSTR_TIME_SET_ZERO(t)
Definition: instr_time.h:151
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
bool use_log
Definition: pgbench.c:167
static char * getVariable(CState *st, char *name)
Definition: pgbench.c:966
static bool putVariable(CState *st, const char *context, char *name, const char *value)
Definition: pgbench.c:1090
static FILE * logfile
Definition: pg_regress.c:97
SimpleStats latency
Definition: pgbench.c:235
#define INSTR_TIME_GET_DOUBLE(t)
Definition: instr_time.h:196
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
#define lengthof(array)
Definition: c.h:558
static bool coerceToDouble(PgBenchValue *pval, double *dval)
Definition: pgbench.c:1254
#define MAX_SCRIPTS
Definition: pgbench.c:207
static void setDoubleValue(PgBenchValue *pv, double dval)
Definition: pgbench.c:1279
#define LOG_STEP_SECONDS
Definition: pgbench.c:93
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:53
#define MIN_GAUSSIAN_PARAM
Definition: pgbench.c:96
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define WSEP
Definition: pgbench.c:187
static void process_file(const char *filename, int weight)
Definition: pgbench.c:3324
bool per_script_stats
Definition: pgbench.c:171
union PgBenchValue::@32 u
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
char * argv[MAX_ARGS]
Definition: pgbench.c:382
bool use_quiet
Definition: pgbench.c:168
int duration
Definition: pgbench.c:99
static void commandFailed(CState *st, char *message)
Definition: pgbench.c:1811
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:470
#define INSTR_TIME_IS_ZERO(t)
Definition: instr_time.h:149
static time_t start_time
Definition: pg_ctl.c:91
#define pg_attribute_printf(f, a)
Definition: c.h:634
#define MAX_ARGS
Definition: pgbench.c:363
static void executeStatement(PGconn *con, const char *sql)
Definition: pgbench.c:818
static bool parseQuery(Command *cmd, const char *raw_sql)
Definition: pgbench.c:2850
static bool makeVariableNumeric(Variable *var)
Definition: pgbench.c:995
static bool isLegalVariableName(const char *name)
Definition: pgbench.c:1025
#define SCALE_32BIT_THRESHOLD
Definition: pgbench.c:165
Definition: type.h:90
static int64 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
Definition: pgbench.c:670
bool is_connect
Definition: pgbench.c:176
Variable * variables
Definition: pgbench.c:319
int PQputline(PGconn *conn, const char *s)
Definition: fe-exec.c:2489
int nstate
Definition: pgbench.c:344
static const char * QUERYMODE[]
Definition: pgbench.c:374
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1132
#define MAX_FARGS
Definition: pgbench.c:1286
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3474
int64 strtoint64(const char *str)
Definition: pgbench.c:564
#define SHELL_COMMAND_SIZE
Definition: pgbench.c:208
int64 end_time
Definition: pgbench.c:100
#define nbranches
Definition: pgbench.c:153
#define required_argument
Definition: getopt_long.h:25
FILE * logfile
Definition: pgbench.c:347
static void mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
Definition: pgbench.c:768
int foreign_keys
Definition: pgbench.c:117
int optind
Definition: getopt.c:51
instr_time stmt_begin
Definition: pgbench.c:327
int64 cnt
Definition: pgbench.c:232
CState * state
Definition: pgbench.c:343
int expr_scanner_get_lineno(PsqlScanState state, int offset)
static void printSimpleStats(char *prefix, SimpleStats *ss)
Definition: pgbench.c:3477
int fillfactor
Definition: pgbench.c:112
#define M_PI
Definition: pgbench.c:56
#define INSTR_TIME_SUBTRACT(x, y)
Definition: instr_time.h:167
#define META_COMMAND
Definition: pgbench.c:362
static bool evalFunc(TState *thread, CState *st, PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
Definition: pgbench.c:1292
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static int debug
Definition: pgbench.c:400
PGconn * conn
Definition: streamutil.c:42
bool vars_sorted
Definition: pgbench.c:321
#define MAXPGPATH
static void doCustom(TState *thread, CState *st, StatsData *agg)
Definition: pgbench.c:1958
static char * assignVariables(CState *st, char *sql)
Definition: pgbench.c:1184
char sign
Definition: informix.c:693
QueryMode
Definition: pgbench.c:365
char * c
int nxacts
Definition: pgbench.c:98
static char * buf
Definition: pg_test_fsync.c:65
#define INSTR_TIME_ADD(x, y)
Definition: instr_time.h:155
#define memmove(d, s, c)
Definition: c.h:1058
#define PG_INT64_MIN
Definition: c.h:339
static void disconnect_all(CState *state, int length)
Definition: pgbench.c:2555
const char * desc
Definition: pgbench.c:389
PgBenchExpr * expr_parse_result
bool is_numeric
Definition: pgbench.c:203
void simple_prompt(const char *prompt, char *destination, size_t destlen, bool echo)
Definition: sprompt.c:37
char * tablespace
Definition: pgbench.c:146
static void usage(void)
Definition: pgbench.c:471
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
int64 cnt
Definition: pgbench.c:332
#define PARAMS_ARRAY_SIZE
instr_time start_time
Definition: pgbench.c:350
char * index_tablespace
Definition: pgbench.c:147
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
int nthreads
Definition: pgbench.c:175
int64 txn_scheduled
Definition: pgbench.c:324
char * value
Definition: pgbench.c:202
int expr_scanner_offset(PsqlScanState state)
static ParsedScript sql_script[MAX_SCRIPTS]
Definition: pgbench.c:395
static bool sendCommand(CState *st, Command *command)
Definition: pgbench.c:1839
static Variable * lookupCreateVariable(CState *st, const char *context, char *name)
Definition: pgbench.c:1044
static bool is_an_int(const char *str)
Definition: pgbench.c:532
static char * replaceVariable(char **sql, char *param, int len, char *value)
Definition: pgbench.c:1164
int64 count
Definition: pgbench.c:218
int64 skipped
Definition: pgbench.c:233
int ecnt
Definition: pgbench.c:333
static int64 total_weight
Definition: pgbench.c:398
char * expr_scanner_get_substring(PsqlScanState state, int start_offset, int end_offset)
char * pghost
Definition: pgbench.c:180
int progress
Definition: pgbench.c:172
static void doLog(TState *thread, CState *st, StatsData *agg, bool skipped, double latency, double lag)
Definition: pgbench.c:2437
#define CppAsString2(x)
Definition: c.h:1007
#define no_argument
Definition: getopt_long.h:24
instr_time txn_begin
Definition: pgbench.c:326
static bool putVariableInt(CState *st, const char *context, char *name, int64 value)
Definition: pgbench.c:1134
#define MAX_PREPARE_NAME
Definition: pgbench.c:1803
#define ntellers
Definition: pgbench.c:155
typedef VOID(WINAPI *PgGetSystemTimeFn)(LPFILETIME)
int argc
Definition: pgbench.c:381
#define SQL_COMMAND
Definition: pgbench.c:361
double max
Definition: pgbench.c:220
static const BuiltinScript builtin_script[]
Definition: pgbench.c:410
#define EINTR
Definition: win32.h:295
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1631
int command
Definition: pgbench.c:316
static void * threadRun(void *arg)
Definition: pgbench.c:4366
static void accumStats(StatsData *stats, bool skipped, double lat, double lag)
Definition: pgbench.c:797
void psql_scan_destroy(PsqlScanState state)
SimpleStats stats
Definition: pgbench.c:384
#define MAXCLIENTS
Definition: pgbench.c:90
#define naccounts
Definition: pgbench.c:156
double pg_erand48(unsigned short xseed[3])
Definition: erand48.c:79
ConnectionStateEnum state
Definition: pgbench.c:313
const char * desc
Definition: pgbench.c:406
static bool runShellCommand(CState *st, char *variable, char **argv, int argc)
Definition: pgbench.c:1700
double dval
Definition: pgbench.h:47
static void getQueryParams(CState *st, const Command *command, const char **params)
Definition: pgbench.c:1220
static int64 getPoissonRand(TState *thread, int64 center)
Definition: pgbench.c:726
void PQclear(PGresult *res)
Definition: fe-exec.c:650
int64 sleep_until
Definition: pgbench.c:325
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define free(a)
Definition: header.h:60
static void printResults(TState *threads, StatsData *total, instr_time total_time, instr_time conn_total_time, int latency_late)
Definition: pgbench.c:3489
bool progress_timestamp
Definition: pgbench.c:173
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static bool evaluateSleep(CState *st, int argc, char **argv, int *usecs)
Definition: pgbench.c:1922
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:5827
static bool have_password
Definition: streamutil.c:40
#define Max(x, y)
Definition: c.h:796
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2522
PGconn * con
Definition: pgbench.c:311
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2658
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:226
void * yyscan_t
Definition: psqlscan_int.h:60
static PGconn * doConnect(void)
Definition: pgbench.c:848
#define Assert(condition)
Definition: c.h:671
void _dosmaperr(unsigned long)
Definition: win32error.c:171
double sum2
Definition: pgbench.c:222
Definition: regguts.h:298
const char * progname
Definition: pgbench.c:185
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1681
char * logfile_prefix
Definition: pgbench.c:184
#define INSTR_TIME_GET_MICROSEC(t)
Definition: instr_time.h:202
static void setIntValue(PgBenchValue *pv, int64 ival)
Definition: pgbench.c:1271
enum _promptStatus promptStatus_t
static char * read_file_contents(FILE *fd)
Definition: pgbench.c:3291
volatile bool timer_exceeded
Definition: pgbench.c:189
Command ** commands
Definition: pgbench.c:391
struct SimpleStats SimpleStats
static void init(bool is_no_vacuum)
Definition: pgbench.c:2571
static QueryMode querymode
Definition: pgbench.c:373
void pg_free(void *ptr)
Definition: fe_memutils.c:105
static char * parseVariable(const char *sql, int *eaten)
Definition: pgbench.c:1143
int64 latency_limit
Definition: pgbench.c:141
#define INSTR_TIME_SET_CURRENT(t)
Definition: instr_time.h:153
#define INT64_FORMAT
Definition: c.h:312
const char * name
Definition: encode.c:521
static void pgbench_error(const char *fmt,...) pg_attribute_printf(1
Definition: pgbench.c:2897
PsqlScanResult psql_scan(PsqlScanState state, PQExpBuffer query_buf, promptStatus_t *prompt)
pthread_t thread
Definition: pgbench.c:342
const char * script
Definition: pgbench.c:407
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1325
static Command * process_backslash_command(PsqlScanState sstate, const char *source)
Definition: pgbench.c:3027
PgBenchFunction function
Definition: pgbench.h:97
int main_pid
Definition: pgbench.c:178
int nvariables
Definition: pgbench.c:320
static core_yyscan_t yyscanner
Definition: pl_scanner.c:208
static Datum values[MAXATTR]
Definition: bootstrap.c:162
static void initSimpleStats(SimpleStats *ss)
Definition: pgbench.c:744
static char * filename
Definition: pg_dumpall.c:80
int PQconnectionNeedsPassword(const PGconn *conn)
Definition: fe-connect.c:5993
char * dbName
Definition: pgbench.c:183
int64 throttle_delay
Definition: pgbench.c:133
char * optarg
Definition: getopt.c:53
static void handle_sig_alarm(SIGNAL_ARGS)
Definition: pgbench.c:4713
PgBenchValueType type
Definition: pgbench.h:43
static int64 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
Definition: pgbench.c:647
StatsData stats
Definition: pgbench.c:352
int i
const char * strerror(int errnum)
Definition: strerror.c:19
static void process_builtin(const BuiltinScript *bi, int weight)
Definition: pgbench.c:3358
instr_time conn_time
Definition: pgbench.c:351
static bool coerceToInt(PgBenchValue *pval, int64 *ival)
Definition: pgbench.c:1230
void psql_scan_setup(PsqlScanState state, const char *line, int line_len, int encoding, bool std_strings)
bool prepared[MAX_SCRIPTS]
Definition: pgbench.c:329
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
static void processXactStats(TState *thread, CState *st, instr_time *now, bool skipped, StatsData *agg)
Definition: pgbench.c:2517
void srandom(unsigned int seed)
Definition: srandom.c:22
void * arg
int tid
Definition: pgbench.c:341
void psql_scan_finish(PsqlScanState state)
static void tryExecuteStatement(PGconn *con, const char *sql)
Definition: pgbench.c:833
static int num_commands
Definition: pgbench.c:397
#define DEFAULT_NXACTS
Definition: pgbench.c:94
#define pthread_t
Definition: pgbench.c:79
#define qsort(a, b, c, d)
Definition: port.h:440
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:145
PgBenchFunction
Definition: pgbench.h:61
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:5906
static void setalarm(int seconds)
Definition: pgbench.c:4719
int64 ival
Definition: pgbench.h:46
bool expr_lex_one_word(PsqlScanState state, PQExpBuffer word_buf, int *offset)
struct StatsData StatsData
int use_file
Definition: pgbench.c:315
#define SIGALRM
Definition: win32.h:202
#define _(x)
Definition: elog.c:84
void PQfreemem(void *ptr)
Definition: fe-exec.c:3200
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:5977
char * pgport
Definition: pgbench.c:181
long val
Definition: informix.c:689
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
bool is_latencies
Definition: pgbench.c:177
struct PgBenchExpr::@33::@34 variable
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:89
double min
Definition: pgbench.c:219
char * login
Definition: pgbench.c:182
int agg_interval
Definition: pgbench.c:169
PgBenchValue num_value
Definition: pgbench.c:204
union PgBenchExpr::@33 u
PgBenchExprType etype
Definition: pgbench.h:87