diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c new file mode 100644 index 08095a9..6564057 *** a/contrib/pgbench/pgbench.c --- b/contrib/pgbench/pgbench.c *************** int unlogged_tables = 0; *** 137,142 **** --- 137,148 ---- double sample_rate = 0.0; /* + * When threads are throttled to a given rate limit, this is the target delay + * to reach that rate in usec. 0 is the default and means no throttling. + */ + int64 throttle_delay = 0; + + /* * tablespace selection */ char *tablespace = NULL; *************** typedef struct *** 200,210 **** --- 206,218 ---- int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ + bool throttling; /* whether nap is for throttling */ int64 until; /* napping until (usec) */ Variable *variables; /* array of variable definitions */ int nvariables; instr_time txn_begin; /* used for measuring transaction latencies */ instr_time stmt_begin; /* used for measuring statement latencies */ + bool is_throttled; /* whether transaction should be throttled */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; *************** typedef struct *** 222,227 **** --- 230,238 ---- instr_time *exec_elapsed; /* time spent executing cmds (per Command) */ int *exec_count; /* number of cmd executions (per Command) */ unsigned short random_state[3]; /* separate randomness for each thread */ + int64 throttle_trigger; /* previous/next throttling (us) */ + int64 throttle_lag; /* total transaction lag behind throttling */ + int64 throttle_lag_max; /* max transaction lag */ } TState; #define INVALID_THREAD ((pthread_t) 0) *************** typedef struct *** 230,235 **** --- 241,248 ---- { instr_time conn_time; int xacts; + int64 throttle_lag; + int64 throttle_lag_max; } TResult; /* *************** usage(void) *** 353,358 **** --- 366,372 ---- " -n, --no-vacuum do not run VACUUM before tests\n" " -N, --skip-some-updates skip updates of 
pgbench_tellers and pgbench_branches\n" " -r, --report-latencies report average latency per command\n" + " -R, --rate SPEC target rate in transactions per second\n" " -s, --scale=NUM report this scale factor in output\n" " -S, --select-only perform SELECT-only transactions\n" " -t, --transactions number of transactions each client runs " *************** doCustom(TState *thread, CState *st, ins *** 900,916 **** { PGresult *res; Command **commands; top: commands = sql_files[st->use_file]; if (st->sleeping) { /* are we sleeping? */ instr_time now; INSTR_TIME_SET_CURRENT(now); ! if (st->until <= INSTR_TIME_GET_MICROSEC(now)) st->sleeping = 0; /* Done sleeping, go ahead with next command */ else return true; /* Still sleeping, nothing to do here */ } --- 914,973 ---- { PGresult *res; Command **commands; + bool trans_needs_throttle = false; top: commands = sql_files[st->use_file]; + /* + * Handle throttling once per transaction by sleeping. It is simpler + * to do this here rather than at the end, because so much complicated + * logic happens below when statements finish. + */ + if (throttle_delay && ! st->is_throttled) + { + /* + * Use inverse transform sampling to randomly generate a delay, such + * that the delays are exponentially distributed with mean throttle_delay + * and transaction start times therefore follow a Poisson process. + * + * If transactions are too slow or a given wait is shorter than + * a transaction, the next transaction will start right away. + */ + int64 wait = (int64) + throttle_delay * -log(getrand(thread, 1, 1000)/1000.0); + + thread->throttle_trigger += wait; + + st->until = thread->throttle_trigger; + st->sleeping = 1; + st->throttling = true; + st->is_throttled = true; + if (debug) + fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n", + st->id, wait); + } + if (st->sleeping) { /* are we sleeping? */ instr_time now; + int64 now_us; INSTR_TIME_SET_CURRENT(now); ! now_us = INSTR_TIME_GET_MICROSEC(now); ! if (st->until <= now_us) ! 
{ st->sleeping = 0; /* Done sleeping, go ahead with next command */ + if (st->throttling) + { + /* Measure lag of throttled transaction relative to target */ + int64 lag = now_us - st->until; + thread->throttle_lag += lag; + if (lag > thread->throttle_lag_max) + thread->throttle_lag_max = lag; + st->throttling = false; + } + } else return true; /* Still sleeping, nothing to do here */ } *************** top: *** 1097,1102 **** --- 1154,1168 ---- st->state = 0; st->use_file = (int) getrand(thread, 0, num_files - 1); commands = sql_files[st->use_file]; + st->is_throttled = false; + /* + * No transaction is underway anymore, which means there is nothing + * to listen to right now. When throttling rate limits are active, + * a sleep will happen next, as the next transaction starts. And + * then in any case the next SQL command will set listen back to 1. + */ + st->listen = 0; + trans_needs_throttle = (throttle_delay>0); } } *************** top: *** 1115,1120 **** --- 1181,1195 ---- INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); } + /* + * Jump back so the throttling delay is applied before the next transaction. + * The flag is set only when a transaction ends, so never before the first. + */ + if (trans_needs_throttle) { + trans_needs_throttle = false; + goto top; + } + /* Record transaction start time if logging is enabled */ if (logfile && st->state == 0) INSTR_TIME_SET_CURRENT(st->txn_begin); *************** process_builtin(char *tb) *** 2019,2025 **** static void printResults(int ttype, int normal_xacts, int nclients, TState *threads, int nthreads, ! instr_time total_time, instr_time conn_total_time) { double time_include, tps_include, --- 2094,2101 ---- static void printResults(int ttype, int normal_xacts, int nclients, TState *threads, int nthreads, ! instr_time total_time, instr_time conn_total_time, ! 
int64 throttle_lag, int64 throttle_lag_max) { double time_include, tps_include, *************** printResults(int ttype, int normal_xacts *** 2057,2062 **** --- 2133,2151 ---- printf("number of transactions actually processed: %d\n", normal_xacts); } + + if (throttle_delay) + { + /* + * Report average transaction lag under rate limit throttling. This + * is the delay between scheduled and actual start times for the + * transaction. The measured lag may be caused by thread/client load, + * the database load, or the Poisson throttling process. + */ + printf("average rate limit lag: %.3f ms (max %.3f ms)\n", + 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max); + } + printf("tps = %f (including connections establishing)\n", tps_include); printf("tps = %f (excluding connections establishing)\n", tps_exclude); *************** main(int argc, char **argv) *** 2141,2146 **** --- 2230,2236 ---- {"unlogged-tables", no_argument, &unlogged_tables, 1}, {"sampling-rate", required_argument, NULL, 4}, {"aggregate-interval", required_argument, NULL, 5}, + {"rate", required_argument, NULL, 'R'}, {NULL, 0, NULL, 0} }; *************** main(int argc, char **argv) *** 2163,2168 **** --- 2253,2260 ---- instr_time total_time; instr_time conn_total_time; int total_xacts; + int64 throttle_lag = 0; + int64 throttle_lag_max = 0; int i; *************** main(int argc, char **argv) *** 2207,2213 **** state = (CState *) pg_malloc(sizeof(CState)); memset(state, 0, sizeof(CState)); ! while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1) { switch (c) { --- 2299,2305 ---- state = (CState *) pg_malloc(sizeof(CState)); memset(state, 0, sizeof(CState)); ! 
while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:R:", long_options, &optindex)) != -1) { switch (c) { *************** main(int argc, char **argv) *** 2362,2367 **** --- 2454,2472 ---- exit(1); } break; + case 'R': + { + /* get a double from the beginning of option value */ + double throttle_value = atof(optarg); + if (throttle_value <= 0.0) + { + fprintf(stderr, "invalid rate limit: %s\n", optarg); + exit(1); + } + /* Invert rate limit into a time offset */ + throttle_delay = (int64) (1000000.0 / throttle_value); + } + break; case 0: /* This covers long options which take no argument. */ break; *************** main(int argc, char **argv) *** 2399,2404 **** --- 2504,2512 ---- } } + /* compute a per thread delay */ + throttle_delay *= nthreads; + if (argc > optind) dbName = argv[optind]; else *************** main(int argc, char **argv) *** 2711,2716 **** --- 2819,2827 ---- TResult *r = (TResult *) ret; total_xacts += r->xacts; + throttle_lag += r->throttle_lag; + if (r->throttle_lag_max > throttle_lag_max) + throttle_lag_max = r->throttle_lag_max; INSTR_TIME_ADD(conn_total_time, r->conn_time); free(ret); } *************** main(int argc, char **argv) *** 2721,2727 **** INSTR_TIME_SET_CURRENT(total_time); INSTR_TIME_SUBTRACT(total_time, start_time); printResults(ttype, total_xacts, nclients, threads, nthreads, ! total_time, conn_total_time); return 0; } --- 2832,2838 ---- INSTR_TIME_SET_CURRENT(total_time); INSTR_TIME_SUBTRACT(total_time, start_time); printResults(ttype, total_xacts, nclients, threads, nthreads, ! total_time, conn_total_time, throttle_lag, throttle_lag_max); return 0; } *************** threadRun(void *arg) *** 2741,2746 **** --- 2852,2868 ---- AggVals aggs; + /* + * Initialize throttling rate target for all of the thread's clients. It + * might be a little more accurate to reset thread->start_time here too. + * The possible drift seems too small relative to typical throttle delay + * times to worry about it. 
+ */ + INSTR_TIME_SET_CURRENT(start); + thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start); + thread->throttle_lag = 0; + thread->throttle_lag_max = 0; + result = pg_malloc(sizeof(TResult)); INSTR_TIME_SET_ZERO(result->conn_time); *************** threadRun(void *arg) *** 2816,2840 **** Command **commands = sql_files[st->use_file]; int sock; ! if (st->sleeping) { ! int this_usec; ! ! if (min_usec == INT64_MAX) { ! instr_time now; ! ! INSTR_TIME_SET_CURRENT(now); ! now_usec = INSTR_TIME_GET_MICROSEC(now); } ! this_usec = st->until - now_usec; ! if (min_usec > this_usec) ! min_usec = this_usec; ! } ! else if (st->con == NULL) ! { ! continue; } else if (commands[st->state]->type == META_COMMAND) { --- 2938,2975 ---- Command **commands = sql_files[st->use_file]; int sock; ! if (st->con == NULL) { ! continue; ! } ! else if (st->sleeping) ! { ! if (st->throttling && timer_exceeded) { ! /* interrupt client which has not started a transaction */ ! remains--; ! st->sleeping = 0; ! st->throttling = false; ! PQfinish(st->con); ! st->con = NULL; ! continue; } + else /* just a nap from the script */ + { + int this_usec; ! if (min_usec == INT64_MAX) ! { ! instr_time now; ! ! INSTR_TIME_SET_CURRENT(now); ! now_usec = INSTR_TIME_GET_MICROSEC(now); ! } ! ! this_usec = st->until - now_usec; ! if (min_usec > this_usec) ! min_usec = this_usec; ! 
} } else if (commands[st->state]->type == META_COMMAND) { *************** done: *** 2909,2914 **** --- 3044,3051 ---- result->xacts = 0; for (i = 0; i < nstate; i++) result->xacts += state[i].cnt; + result->throttle_lag = thread->throttle_lag; + result->throttle_lag_max = thread->throttle_lag_max; INSTR_TIME_SET_CURRENT(end); INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); if (logfile) diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml new file mode 100644 index a7f41e1..a6ab1bc *** a/doc/src/sgml/pgbench.sgml --- b/doc/src/sgml/pgbench.sgml *************** pgbench options< *** 408,413 **** --- 408,434 ---- + rate + rate + + + Execute transactions targeting the specified rate instead of + running as fast as possible (the default). The rate is given in + transactions per second. If the targeted rate is + above the maximum possible rate these transactions can execute at, + the rate limit won't have any impact on results. + + The rate is targeted by starting transactions along a + Poisson-distributed event time line. When a rate limit is + active, the average and maximum transaction lag time + (the delay between the scheduled and actual transaction start times) + are reported in ms. High values indicate that the database + could not handle the scheduled load at some time. + + + + + scale_factor scale_factor