Merge branch 'master' of git@github.com:oldmoe/mysqlplus

This commit is contained in:
Roger Pack 2008-09-08 09:46:05 -06:00
commit b9285dbcb7
5 changed files with 142 additions and 33 deletions

View File

@ -60,6 +60,7 @@ struct mysql {
MYSQL handler; MYSQL handler;
char connection; char connection;
char query_with_result; char query_with_result;
char blocking;
}; };
struct mysql_res { struct mysql_res {
@ -268,6 +269,11 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
myp->handler.reconnect = 0; myp->handler.reconnect = 0;
myp->connection = Qtrue; myp->connection = Qtrue;
my_bool was_blocking;
vio_blocking(myp->handler.net.vio, 0, &was_blocking);
myp->blocking = vio_is_blocking( myp->handler.net.vio );
myp->query_with_result = Qtrue; myp->query_with_result = Qtrue;
rb_obj_call_init(obj, argc, argv); rb_obj_call_init(obj, argc, argv);
@ -756,13 +762,31 @@ static VALUE socket(VALUE obj)
return INT2NUM(m->net.fd); return INT2NUM(m->net.fd);
} }
/* send_query(sql,timeout=nil) */ /* blocking */
static VALUE send_query(int argc, VALUE* argv, VALUE obj) static VALUE blocking(VALUE obj){
return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse );
}
/* readable(timeout=nil) */
static VALUE readable( int argc, VALUE* argv, VALUE obj )
{ {
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
VALUE sql, timeout;
rb_scan_args(argc, argv, "11", &sql, &timeout); VALUE timeout;
rb_scan_args(argc, argv, "01", &timeout);
if ( NIL_P( timeout ) ){
timeout = m->net.read_timeout;
}
return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse );
}
/* send_query(sql) */
static VALUE send_query(VALUE obj, VALUE sql)
{
MYSQL* m = GetHandler(obj);
Check_Type(sql, T_STRING); Check_Type(sql, T_STRING);
if (GetMysqlStruct(obj)->connection == Qfalse) { if (GetMysqlStruct(obj)->connection == Qfalse) {
@ -789,17 +813,46 @@ static VALUE get_result(VALUE obj)
return store_result(obj); return store_result(obj);
} }
/* async_query */ /* async_query(sql,timeout=nil) */
/* static VALUE async_query(int argc, VALUE* argv, VALUE obj)
comment it out until I figure out how it works
static VALUE async_query(VALUE obj, VALUE sql)
{ {
MYSQL* m = GetHandler(obj);
VALUE sql, timeout;
fd_set read;
int ret;
rb_scan_args(argc, argv, "11", &sql, &timeout);
send_query(obj,sql); send_query(obj,sql);
rb_io_wait_readable(socket(obj));
if (NIL_P(timeout)) {
timeout = m->net.read_timeout;
}
VALUE args[1];
args[0] = timeout;
struct timeval tv = { tv_sec: timeout, tv_usec: 0 };
for(;;) {
FD_ZERO(&read);
FD_SET(m->net.fd, &read);
ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv);
if (ret < 0) {
rb_sys_fail(0);
}
if (ret == 0) {
continue;
}
if (readable(1, (VALUE *)args, obj) == Qtrue) {
break;
}
}
return get_result(obj); return get_result(obj);
} }
*/
#if MYSQL_VERSION_ID >= 40100 #if MYSQL_VERSION_ID >= 40100
/* server_version() */ /* server_version() */
@ -2090,9 +2143,11 @@ void Init_mysql(void)
#endif #endif
rb_define_method(cMysql, "query", query, 1); rb_define_method(cMysql, "query", query, 1);
rb_define_method(cMysql, "real_query", query, 1); rb_define_method(cMysql, "real_query", query, 1);
/*rb_define_method(cMysql, "async_query", async_query, 1);*/ rb_define_method(cMysql, "async_query", async_query, -1);
rb_define_method(cMysql, "send_query", send_query, -1); rb_define_method(cMysql, "send_query", send_query, 1);
rb_define_method(cMysql, "get_result", get_result, 0); rb_define_method(cMysql, "get_result", get_result, 0);
rb_define_method(cMysql, "readable?", readable, -1);
rb_define_method(cMysql, "blocking?", blocking, 0);
rb_define_method(cMysql, "socket", socket, 0); rb_define_method(cMysql, "socket", socket, 0);
rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "refresh", refresh, 1);
rb_define_method(cMysql, "reload", reload, 0); rb_define_method(cMysql, "reload", reload, 0);

View File

@ -1,11 +1,17 @@
require 'mysql' require 'mysql'
class Mysql class Mysql
def async_query(sql)
alias_method :c_async_query, :async_query
def async_query(sql, timeout = nil)
puts "** Blocking ? #{blocking?().inspect}" if ENV['MYSQL_BLOCKING_STATUS'] == '1'
return c_async_query(sql, timeout) if ENV['MYSQL_C_ASYNC_QUERY'] == '1'
send_query(sql) send_query(sql)
select [ (@sockets ||= {})[socket] ||= IO.new(socket) ], nil, nil, nil select [ (@sockets ||= {})[socket] ||= IO.new(socket) ], nil, nil, nil
get_result get_result
end end
end end
class Mysql::Result class Mysql::Result

View File

@ -2,5 +2,12 @@ require File.dirname(__FILE__) + '/test_helper'
EventedMysqlTest.new( 10 ) do |test| EventedMysqlTest.new( 10 ) do |test|
test.setup{ Mysql.real_connect('localhost','root') } test.setup{ Mysql.real_connect('localhost','root') }
test.log_blocking_status = true
test.run!
end
EventedMysqlTest.new( 10 ) do |test|
test.setup{ Mysql.real_connect('localhost','root') }
test.c_async_query = true
test.run! test.run!
end end

View File

@ -10,11 +10,14 @@ class MysqlTest
:connections, :connections,
:connection_signature, :connection_signature,
:start, :start,
:done :done,
:c_async_query,
:log_blocking_status
def initialize( queries ) def initialize( queries )
@queries = queries @queries = queries
@done = [] @done = []
@c_async_query = false
yield self if block_given? yield self if block_given?
end end
@ -24,7 +27,12 @@ class MysqlTest
end end
def run! def run!
raise NotImplemented c_or_native_ruby_async_query do
with_blocking_status do
prepare
yield
end
end
end end
def prepare def prepare
@ -49,6 +57,28 @@ class MysqlTest
Time.now - @start Time.now - @start
end end
protected
def c_or_native_ruby_async_query
if @c_async_query
ENV['MYSQL_C_ASYNC_QUERY'] = '1'
log "** using C based async_query"
else
ENV['MYSQL_C_ASYNC_QUERY'] = '0'
log "** using native Ruby async_query"
end
yield
end
def with_blocking_status
if @log_blocking_status
ENV['MYSQL_BLOCKING_STATUS'] = '1'
else
ENV['MYSQL_BLOCKING_STATUS'] = '0'
end
yield
end
end end
class EventedMysqlTest < MysqlTest class EventedMysqlTest < MysqlTest
@ -73,18 +103,22 @@ class EventedMysqlTest < MysqlTest
end end
def run! def run!
prepare super do
catch :END_EVENT_LOOP do
loop do loop do
result = select( @sockets,nil,nil,nil ) result = select( @sockets,nil,nil,nil )
if result if result
result.first.each do |conn| result.first.each do |conn|
@connections[conn].get_result.each{|res| log( "Result for socket #{conn.fileno} : #{res}" ) } @connections[conn].get_result.each{|res| log( "Result for socket #{conn.fileno} : #{res}" ) }
@done << nil @done << nil
teardown if done? if done?
end teardown
end
end
end
end
end end
end end
end end
def prepare def prepare
@ -95,7 +129,7 @@ class EventedMysqlTest < MysqlTest
def teardown def teardown
log "done" log "done"
exit throw :END_EVENT_LOOP
end end
protected protected
@ -126,11 +160,11 @@ class ThreadedMysqlTest < MysqlTest
end end
def run! def run!
prepare super do
with_logging "waiting on threads" do
with_logging "waiting on threads" do @threads.each{|t| t.join }
@threads.each{|t| t.join } end
end end
end end
def prepare def prepare

View File

@ -2,5 +2,12 @@ require File.dirname(__FILE__) + '/test_helper'
ThreadedMysqlTest.new( 10 ) do |test| ThreadedMysqlTest.new( 10 ) do |test|
test.setup{ Mysql.real_connect('localhost','root') } test.setup{ Mysql.real_connect('localhost','root') }
test.log_blocking_status = true
test.run!
end
ThreadedMysqlTest.new( 10 ) do |test|
test.setup{ Mysql.real_connect('localhost','root') }
test.c_async_query = true
test.run! test.run!
end end