diff --git a/ext/mysql.c b/ext/mysql.c index 79480f5..739d352 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -60,6 +60,7 @@ struct mysql { MYSQL handler; char connection; char query_with_result; + char blocking; }; struct mysql_res { @@ -268,6 +269,11 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) myp->handler.reconnect = 0; myp->connection = Qtrue; + + my_bool was_blocking; + vio_blocking(myp->handler.net.vio, 0, &was_blocking); + myp->blocking = vio_is_blocking( myp->handler.net.vio ); + myp->query_with_result = Qtrue; rb_obj_call_init(obj, argc, argv); @@ -756,13 +762,31 @@ static VALUE socket(VALUE obj) return INT2NUM(m->net.fd); } -/* send_query(sql,timeout=nil) */ -static VALUE send_query(int argc, VALUE* argv, VALUE obj) +/* blocking */ +static VALUE blocking(VALUE obj){ + return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse ); +} + +/* readable(timeout=nil) */ +static VALUE readable( int argc, VALUE* argv, VALUE obj ) { MYSQL* m = GetHandler(obj); - VALUE sql, timeout; - rb_scan_args(argc, argv, "11", &sql, &timeout); + VALUE timeout; + + rb_scan_args(argc, argv, "01", &timeout); + + if ( NIL_P( timeout ) ){ + timeout = m->net.read_timeout; + } + + return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse ); +} + +/* send_query(sql) */ +static VALUE send_query(VALUE obj, VALUE sql) +{ + MYSQL* m = GetHandler(obj); Check_Type(sql, T_STRING); if (GetMysqlStruct(obj)->connection == Qfalse) { @@ -789,17 +813,46 @@ static VALUE get_result(VALUE obj) return store_result(obj); } -/* async_query */ -/* -comment it out until I figure out how it works -static VALUE async_query(VALUE obj, VALUE sql) +/* async_query(sql,timeout=nil) */ +static VALUE async_query(int argc, VALUE* argv, VALUE obj) { + MYSQL* m = GetHandler(obj); + VALUE sql, timeout; + fd_set read; + int ret; + + rb_scan_args(argc, argv, "11", &sql, &timeout); + send_query(obj,sql); - rb_io_wait_readable(socket(obj)); + + if (NIL_P(timeout)) { + timeout = m->net.read_timeout; + } + + VALUE args[1]; + args[0] = timeout; + + struct timeval tv = { tv_sec: timeout, tv_usec: 0 }; + + for(;;) { + FD_ZERO(&read); + FD_SET(m->net.fd, &read); + ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv); + if (ret < 0) { + rb_sys_fail(0); + } + + if (ret == 0) { + continue; + } + + if (readable(1, (VALUE *)args, obj) == Qtrue) { + break; + } + } + return get_result(obj); } -*/ - #if MYSQL_VERSION_ID >= 40100 /* server_version() */ @@ -2090,9 +2143,11 @@ void Init_mysql(void) #endif rb_define_method(cMysql, "query", query, 1); rb_define_method(cMysql, "real_query", query, 1); - /*rb_define_method(cMysql, "async_query", async_query, 1);*/ - rb_define_method(cMysql, "send_query", send_query, -1); + rb_define_method(cMysql, "async_query", async_query, -1); + rb_define_method(cMysql, "send_query", send_query, 1); rb_define_method(cMysql, "get_result", get_result, 0); + rb_define_method(cMysql, "readable?", readable, -1); + rb_define_method(cMysql, "blocking?", blocking, 0); rb_define_method(cMysql, "socket", socket, 0); rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "reload", reload, 0); diff --git a/lib/mysqlplus.rb b/lib/mysqlplus.rb index 474eeb3..48fa44a 100644 --- a/lib/mysqlplus.rb +++ b/lib/mysqlplus.rb @@ -1,11 +1,17 @@ require 'mysql' class Mysql - def async_query(sql) + + alias_method :c_async_query, :async_query + + def async_query(sql, timeout = nil) + puts "** Blocking ? #{blocking?().inspect}" if ENV['MYSQL_BLOCKING_STATUS'] == '1' + return c_async_query(sql, timeout) if ENV['MYSQL_C_ASYNC_QUERY'] == '1' send_query(sql) select [ (@sockets ||= {})[socket] ||= IO.new(socket) ], nil, nil, nil get_result end + end class Mysql::Result diff --git a/test/evented_test.rb b/test/evented_test.rb index ec8ad3d..0155db4 100644 --- a/test/evented_test.rb +++ b/test/evented_test.rb @@ -2,5 +2,12 @@ require File.dirname(__FILE__) + '/test_helper' EventedMysqlTest.new( 10 ) do |test| test.setup{ Mysql.real_connect('localhost','root') } + test.log_blocking_status = true + test.run! +end + +EventedMysqlTest.new( 10 ) do |test| + test.setup{ Mysql.real_connect('localhost','root') } + test.c_async_query = true test.run! end \ No newline at end of file diff --git a/test/test_helper.rb b/test/test_helper.rb index 3f5078c..4e5c96e 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -10,11 +10,14 @@ class MysqlTest :connections, :connection_signature, :start, - :done + :done, + :c_async_query, + :log_blocking_status def initialize( queries ) @queries = queries @done = [] + @c_async_query = false yield self if block_given? end @@ -24,7 +27,12 @@ class MysqlTest end def run! - raise NotImplemented + c_or_native_ruby_async_query do + with_blocking_status do + prepare + yield + end + end end def prepare @@ -49,6 +57,28 @@ class MysqlTest Time.now - @start end + protected + + def c_or_native_ruby_async_query + if @c_async_query + ENV['MYSQL_C_ASYNC_QUERY'] = '1' + log "** using C based async_query" + else + ENV['MYSQL_C_ASYNC_QUERY'] = '0' + log "** using native Ruby async_query" + end + yield + end + + def with_blocking_status + if @log_blocking_status + ENV['MYSQL_BLOCKING_STATUS'] = '1' + else + ENV['MYSQL_BLOCKING_STATUS'] = '0' + end + yield + end + end class EventedMysqlTest < MysqlTest @@ -73,18 +103,22 @@ class EventedMysqlTest < MysqlTest end def run! - prepare - - loop do - result = select( @sockets,nil,nil,nil ) - if result - result.first.each do |conn| - @connections[conn].get_result.each{|res| log( "Result for socket #{conn.fileno} : #{res}" ) } - @done << nil - teardown if done? - end + super do + catch :END_EVENT_LOOP do + loop do + result = select( @sockets,nil,nil,nil ) + if result + result.first.each do |conn| + @connections[conn].get_result.each{|res| log( "Result for socket #{conn.fileno} : #{res}" ) } + @done << nil + if done? + teardown + end + end + end + end end - end + end end def prepare @@ -95,7 +129,7 @@ class EventedMysqlTest < MysqlTest def teardown log "done" - exit + throw :END_EVENT_LOOP end protected @@ -126,11 +160,11 @@ class ThreadedMysqlTest < MysqlTest end def run! - prepare - - with_logging "waiting on threads" do - @threads.each{|t| t.join } - end + super do + with_logging "waiting on threads" do + @threads.each{|t| t.join } + end + end end def prepare diff --git a/test/threaded_test.rb b/test/threaded_test.rb index 7a18a26..122c817 100644 --- a/test/threaded_test.rb +++ b/test/threaded_test.rb @@ -2,5 +2,12 @@ require File.dirname(__FILE__) + '/test_helper' ThreadedMysqlTest.new( 10 ) do |test| test.setup{ Mysql.real_connect('localhost','root') } + test.log_blocking_status = true + test.run! +end + +ThreadedMysqlTest.new( 10 ) do |test| + test.setup{ Mysql.real_connect('localhost','root') } + test.c_async_query = true test.run! end \ No newline at end of file