diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index 80203d6..da88bd7 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -137,6 +137,12 @@ int unlogged_tables = 0; double sample_rate = 0.0; /* + * When threads are throttled to a given rate limit, this is the target delay + * to reach that rate in usec. 0 is the default and means no throttling. + */ +int64 throttle_delay = 0; + +/* * tablespace selection */ char *tablespace = NULL; @@ -200,11 +206,13 @@ typedef struct int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ + bool throttling; /* whether nap is for throttling */ int64 until; /* napping until (usec) */ Variable *variables; /* array of variable definitions */ int nvariables; instr_time txn_begin; /* used for measuring transaction latencies */ instr_time stmt_begin; /* used for measuring statement latencies */ + bool throttled; /* whether current transaction was throttled */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; @@ -222,6 +230,10 @@ typedef struct instr_time *exec_elapsed; /* time spent executing cmds (per Command) */ int *exec_count; /* number of cmd executions (per Command) */ unsigned short random_state[3]; /* separate randomness for each thread */ + int64 throttle_trigger; /* previous/next throttling (us) */ + int64 throttle_lag; /* total transaction lag behind throttling */ + int64 throttle_lag_max; /* max transaction lag */ + } TState; #define INVALID_THREAD ((pthread_t) 0) @@ -230,6 +242,8 @@ typedef struct { instr_time conn_time; int xacts; + int64 throttle_lag; + int64 throttle_lag_max; } TResult; /* @@ -353,6 +367,7 @@ usage(void) " -n, --no-vacuum do not run VACUUM before tests\n" " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n" " -r, --report-latencies report average latency per command\n" + " -R, --rate SPEC target rate in transactions per second\n" " -s, --scale=NUM report this scale factor in output\n" " -S, --select-only perform SELECT-only transactions\n" " -t, --transactions number of transactions each client runs " @@ -895,17 +910,58 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa { PGresult *res; Command **commands; + bool do_throttle = false; top: commands = sql_files[st->use_file]; + /* handle throttling once per transaction by inserting a sleep. + * this is simpler than doing it at the end. + */ + if (throttle_delay && ! st->throttled) + { + /* compute delay to approximate a Poisson distribution + * 1000000 => 13.8 .. 0 multiplier + * 100000 => 11.5 .. 0 + * 10000 => 9.2 .. 0 + * 1000 => 6.9 .. 0 + * if transactions are too slow or a given wait shorter than + * a transaction, the next transaction will start right away. + */ + int64 wait = (int64) + throttle_delay * -log(getrand(thread, 1, 1000)/1000.0); + + thread->throttle_trigger += wait; + + st->until = thread->throttle_trigger; + st->sleeping = 1; + st->throttling = true; + st->throttled = true; + if (debug) + fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n", + st->id, wait); + } + if (st->sleeping) { /* are we sleeping? */ instr_time now; + int64 now_us; INSTR_TIME_SET_CURRENT(now); - if (st->until <= INSTR_TIME_GET_MICROSEC(now)) + now_us = INSTR_TIME_GET_MICROSEC(now); + if (st->until <= now_us) + { st->sleeping = 0; /* Done sleeping, go ahead with next command */ + if (st->throttling) + { + /* measure lag of throttled transaction */ + int64 lag = now_us - st->until; + thread->throttle_lag += lag; + if (lag > thread->throttle_lag_max) + thread->throttle_lag_max = lag; + st->throttling = false; + } + } else return true; /* Still sleeping, nothing to do here */ } @@ -1034,7 +1090,7 @@ top: * This is more than we really ought to know about * instr_time */ - fprintf(logfile, "%d %d %.0f %d %ld %ld\n", + fprintf(logfile, "%d %d %.0f %d %ld.%06ld\n", st->id, st->cnt, usec, st->use_file, (long) now.tv_sec, (long) now.tv_usec); #else @@ -1043,7 +1099,7 @@ top: * On Windows, instr_time doesn't provide a timestamp * anyway */ - fprintf(logfile, "%d %d %.0f %d 0 0\n", + fprintf(logfile, "%d %d %.0f %d 0.0\n", st->id, st->cnt, usec, st->use_file); #endif } @@ -1092,6 +1148,13 @@ top: st->state = 0; st->use_file = (int) getrand(thread, 0, num_files - 1); commands = sql_files[st->use_file]; + st->throttled = false; + /* no transaction is underway, there is nothing to listen any more. + * under throttling, a sleep is going to be inserted, and then + * some SQL command will set listen back to 1. + */ + st->listen = 0; + do_throttle = (throttle_delay>0); } } @@ -1110,6 +1173,12 @@ top: INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); } + if (do_throttle) { + /* delay throttling after reopenning the connection */ + do_throttle = false; + goto top; + } + /* Record transaction start time if logging is enabled */ if (logfile && st->state == 0) INSTR_TIME_SET_CURRENT(st->txn_begin); @@ -2014,7 +2083,8 @@ process_builtin(char *tb) static void printResults(int ttype, int normal_xacts, int nclients, TState *threads, int nthreads, - instr_time total_time, instr_time conn_total_time) + instr_time total_time, instr_time conn_total_time, + int64 throttle_lag, int64 throttle_lag_max) { double time_include, tps_include, @@ -2052,6 +2122,18 @@ printResults(int ttype, int normal_xacts, int nclients, printf("number of transactions actually processed: %d\n", normal_xacts); } + + if (throttle_delay) + { + /* Report average transaction lag under throttling, i.e. the delay + between scheduled and actual start times for the transaction. + The measured lag may be linked to the thread/client load, + the database load, or the Poisson throttling process. + */ + printf("average transaction lag: %.3f ms (max %.3f ms)\n", + 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max); + } + printf("tps = %f (including connections establishing)\n", tps_include); printf("tps = %f (excluding connections establishing)\n", tps_exclude); @@ -2136,6 +2218,7 @@ main(int argc, char **argv) {"unlogged-tables", no_argument, &unlogged_tables, 1}, {"sampling-rate", required_argument, NULL, 4}, {"aggregate-interval", required_argument, NULL, 5}, + {"rate", required_argument, NULL, 'R'}, {NULL, 0, NULL, 0} }; @@ -2158,6 +2241,8 @@ main(int argc, char **argv) instr_time total_time; instr_time conn_total_time; int total_xacts; + int64 throttle_lag = 0; + int64 throttle_lag_max = 0; int i; @@ -2202,7 +2287,7 @@ main(int argc, char **argv) state = (CState *) pg_malloc(sizeof(CState)); memset(state, 0, sizeof(CState)); - while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:R:", long_options, &optindex)) != -1) { switch (c) { @@ -2357,6 +2442,19 @@ main(int argc, char **argv) exit(1); } break; + case 'R': + { + /* get a double from the beginning of option value */ + double throttle_value = atof(optarg); + if (throttle_value <= 0.0) + { + fprintf(stderr, "invalid rate limit: %s\n", optarg); + exit(1); + } + /* Invert rate limit into a time offset */ + throttle_delay = (int64) (1000000.0 / throttle_value); + } + break; case 0: /* This covers long options which take no argument. */ break; @@ -2394,6 +2492,9 @@ main(int argc, char **argv) } } + /* compute a per thread delay */ + throttle_delay *= nthreads; + if (argc > optind) dbName = argv[optind]; else @@ -2706,6 +2807,9 @@ main(int argc, char **argv) TResult *r = (TResult *) ret; total_xacts += r->xacts; + throttle_lag += r->throttle_lag; + if (r->throttle_lag_max > throttle_lag_max) + throttle_lag_max = r->throttle_lag_max; INSTR_TIME_ADD(conn_total_time, r->conn_time); free(ret); } @@ -2716,7 +2820,7 @@ main(int argc, char **argv) INSTR_TIME_SET_CURRENT(total_time); INSTR_TIME_SUBTRACT(total_time, start_time); printResults(ttype, total_xacts, nclients, threads, nthreads, - total_time, conn_total_time); + total_time, conn_total_time, throttle_lag, throttle_lag_max); return 0; } @@ -2736,6 +2840,15 @@ threadRun(void *arg) AggVals aggs; + /* SHOULD take actual thread start time when the thread is running? */ + /* INSTR_TIME_SET_CURRENT(thread->start_time); */ + + /* throttling for all thread's clients */ + INSTR_TIME_SET_CURRENT(start); + thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start); + thread->throttle_lag = 0; + thread->throttle_lag_max = 0; + result = pg_malloc(sizeof(TResult)); INSTR_TIME_SET_ZERO(result->conn_time); @@ -2811,25 +2924,38 @@ threadRun(void *arg) Command **commands = sql_files[st->use_file]; int sock; - if (st->sleeping) + if (st->con == NULL) { - int this_usec; - - if (min_usec == INT64_MAX) + continue; + } + else if (st->sleeping) + { + if (st->throttling && timer_exceeded) { - instr_time now; - - INSTR_TIME_SET_CURRENT(now); - now_usec = INSTR_TIME_GET_MICROSEC(now); + /* interrupt client which has not started a transaction */ + remains--; + st->sleeping = 0; + st->throttling = false; + PQfinish(st->con); + st->con = NULL; + continue; } + else /* just a nap from the script */ + { + int this_usec; - this_usec = st->until - now_usec; - if (min_usec > this_usec) - min_usec = this_usec; - } - else if (st->con == NULL) - { - continue; + if (min_usec == INT64_MAX) + { + instr_time now; + + INSTR_TIME_SET_CURRENT(now); + now_usec = INSTR_TIME_GET_MICROSEC(now); + } + + this_usec = st->until - now_usec; + if (min_usec > this_usec) + min_usec = this_usec; + } } else if (commands[st->state]->type == META_COMMAND) { @@ -2904,6 +3030,8 @@ done: result->xacts = 0; for (i = 0; i < nstate; i++) result->xacts += state[i].cnt; + result->throttle_lag = thread->throttle_lag; + result->throttle_lag_max = thread->throttle_lag_max; INSTR_TIME_SET_CURRENT(end); INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); if (logfile) diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml index a7f41e1..a5fd218 100644 --- a/doc/src/sgml/pgbench.sgml +++ b/doc/src/sgml/pgbench.sgml @@ -408,6 +408,27 @@ pgbench options dbname + rate + rate + + + Execute transactions targeting the specified rate instead of + running as fast as possible (the default). The rate is given in + transactions per second. If the targeted rate is + above the maximum possible rate these transactions can execute at, + the rate limit won't have any impact on results. + + The rate is targeted by starting transactions along a + Poisson-distributed event time line. When a rate limit is + active, the average and maximum transaction lag time + (the delay between the scheduled and actual transaction start times) + are reported in ms. High values indicate that the database + could not handle the scheduled load at some time. + + + + + scale_factor scale_factor