Extract thread scheduler
This commit is contained in:
parent
6884d454bc
commit
3c5dd38037
42
ext/mysql.c
42
ext/mysql.c
@ -764,7 +764,6 @@ static VALUE socket(VALUE obj)
|
||||
MYSQL* m = GetHandler(obj);
|
||||
return INT2NUM(m->net.fd);
|
||||
}
|
||||
|
||||
/* socket_type */
|
||||
static VALUE socket_type(VALUE obj)
|
||||
{
|
||||
@ -824,29 +823,20 @@ static VALUE get_result(VALUE obj)
|
||||
return store_result(obj);
|
||||
}
|
||||
|
||||
/* async_query(sql,timeout=nil) */
|
||||
static VALUE async_query(int argc, VALUE* argv, VALUE obj)
|
||||
static VALUE schedule(VALUE obj, VALUE timeout)
|
||||
{
|
||||
MYSQL* m = GetHandler(obj);
|
||||
VALUE sql, timeout;
|
||||
fd_set read;
|
||||
int ret;
|
||||
MYSQL* m = GetHandler(obj);
|
||||
fd_set read;
|
||||
int ret;
|
||||
|
||||
rb_scan_args(argc, argv, "11", &sql, &timeout);
|
||||
timeout = ( NIL_P(timeout) ? m->net.read_timeout : INT2NUM(timeout) );
|
||||
|
||||
send_query(obj,sql);
|
||||
struct timeval tv = { tv_sec: timeout, tv_usec: 0 };
|
||||
|
||||
timeout = ( NIL_P(timeout) ? m->net.read_timeout : INT2NUM(timeout) );
|
||||
FD_ZERO(&read);
|
||||
FD_SET(m->net.fd, &read);
|
||||
|
||||
VALUE args[1];
|
||||
args[0] = timeout;
|
||||
|
||||
struct timeval tv = { tv_sec: timeout, tv_usec: 0 };
|
||||
|
||||
FD_ZERO(&read);
|
||||
FD_SET(m->net.fd, &read);
|
||||
|
||||
for(;;) {
|
||||
for(;;) {
|
||||
ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv);
|
||||
if (ret < 0) {
|
||||
rb_raise(eMysql, "query: timeout");
|
||||
@ -861,6 +851,20 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/* async_query(sql,timeout=nil) */
|
||||
static VALUE async_query(int argc, VALUE* argv, VALUE obj)
|
||||
{
|
||||
MYSQL* m = GetHandler(obj);
|
||||
VALUE sql, timeout;
|
||||
|
||||
rb_scan_args(argc, argv, "11", &sql, &timeout);
|
||||
|
||||
send_query(obj,sql);
|
||||
|
||||
schedule(obj, timeout);
|
||||
|
||||
return get_result(obj);
|
||||
}
|
||||
|
||||
|
@ -3,34 +3,34 @@ require File.dirname(__FILE__) + '/test_helper'
|
||||
ThreadedMysqlTest.new( 10, "Threaded, C, very small overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = 0.005
|
||||
test.c_async_query = true
|
||||
test.query_with = :c_async_query
|
||||
test.run!
|
||||
end
|
||||
|
||||
ThreadedMysqlTest.new( 10, "Threaded, C, small overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = 0.1
|
||||
test.c_async_query = true
|
||||
test.query_with = :c_async_query
|
||||
test.run!
|
||||
end
|
||||
|
||||
ThreadedMysqlTest.new( 10, "Threaded, C, medium overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = 1
|
||||
test.c_async_query = true
|
||||
test.query_with = :c_async_query
|
||||
test.run!
|
||||
end
|
||||
|
||||
ThreadedMysqlTest.new( 10, "Threaded, C, large overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = 3
|
||||
test.c_async_query = true
|
||||
test.query_with = :c_async_query
|
||||
test.run!
|
||||
end
|
||||
|
||||
ThreadedMysqlTest.new( 10, "Threaded, C, random overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = :random
|
||||
test.c_async_query = true
|
||||
test.query_with = :c_async_query
|
||||
test.run!
|
||||
end
|
@ -3,29 +3,34 @@ require File.dirname(__FILE__) + '/test_helper'
|
||||
ThreadedMysqlTest.new( 10, "Threaded, native Ruby, very small overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = 0.005
|
||||
test.query_with = :async_query
|
||||
test.run!
|
||||
end
|
||||
|
||||
ThreadedMysqlTest.new( 10, "Threaded, native Ruby, small overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = 0.1
|
||||
test.query_with = :async_query
|
||||
test.run!
|
||||
end
|
||||
|
||||
ThreadedMysqlTest.new( 10, "Threaded, native Ruby, medium overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = 1
|
||||
test.query_with = :async_query
|
||||
test.run!
|
||||
end
|
||||
|
||||
ThreadedMysqlTest.new( 10, "Threaded, native Ruby, large overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = 3
|
||||
test.query_with = :async_query
|
||||
test.run!
|
||||
end
|
||||
|
||||
ThreadedMysqlTest.new( 10, "Threaded, native Ruby, random overhead" ) do |test|
|
||||
test.setup{ Mysql.real_connect('localhost','root') }
|
||||
test.per_query_overhead = :random
|
||||
test.query_with = :async_query
|
||||
test.run!
|
||||
end
|
@ -12,7 +12,7 @@ class MysqlTest
|
||||
:connection_signature,
|
||||
:start,
|
||||
:done,
|
||||
:c_async_query,
|
||||
:query_with,
|
||||
:per_query_overhead,
|
||||
:timeout
|
||||
|
||||
@ -20,7 +20,7 @@ class MysqlTest
|
||||
@queries = queries
|
||||
@context = context
|
||||
@done = []
|
||||
@c_async_query = false
|
||||
@query_with = :async_query
|
||||
@per_query_overhead = 3
|
||||
@timeout = 20
|
||||
yield self if block_given?
|
||||
@ -78,7 +78,7 @@ class MysqlTest
|
||||
end
|
||||
|
||||
def c_or_native_ruby_async_query
|
||||
if @c_async_query
|
||||
if @query_with == :c_async_query
|
||||
log "** using C based async_query"
|
||||
else
|
||||
log "** using native Ruby async_query"
|
||||
@ -86,9 +86,8 @@ class MysqlTest
|
||||
yield
|
||||
end
|
||||
|
||||
def c_or_native_async_query( connection, sql, timeout = nil )
|
||||
method = @c_async_query ? :c_async_query : :async_query
|
||||
connection.send( method, sql, timeout )
|
||||
def dispatch_query( connection, sql, timeout = nil )
|
||||
connection.send( @query_with, sql, timeout )
|
||||
end
|
||||
|
||||
end
|
||||
@ -188,7 +187,7 @@ class ThreadedMysqlTest < MysqlTest
|
||||
|
||||
log "sending query on connection #{conn}"
|
||||
|
||||
c_or_native_async_query( @connections[conn], "select sleep(#{@per_query_overhead})", @timeout ).each do |result|
|
||||
dispatch_query( @connections[conn], "select sleep(#{@per_query_overhead})", @timeout ).each do |result|
|
||||
log "connection #{conn} done"
|
||||
end
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user