From 85141abf097a375520ca5cf17df4bccc48523d3e Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Thu, 2 Oct 2008 01:56:10 +0100 Subject: [PATCH 01/13] Validate async queries --- ext/extconf.rb | 2 +- ext/mysql.c | 171 ++++++++++++++++++++++++++++++++------- lib/mysqlplus.rb | 2 +- test/out_of_sync_test.rb | 26 ++++++ 4 files changed, 172 insertions(+), 29 deletions(-) create mode 100644 test/out_of_sync_test.rb diff --git a/ext/extconf.rb b/ext/extconf.rb index d14738e..73bef6d 100644 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -71,4 +71,4 @@ File.open('error_const.h', 'w') do |f| end end -create_makefile("mysql") +create_makefile("mysql") \ No newline at end of file diff --git a/ext/mysql.c b/ext/mysql.c index b804723..c246f68 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -61,6 +61,7 @@ struct mysql { char connection; char query_with_result; char blocking; + int async_in_progress; }; struct mysql_res { @@ -231,6 +232,57 @@ static VALUE init(VALUE klass) return obj; } +static VALUE connection_identifier( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + return mysql_thread_id( m ); +} + +static VALUE async_in_progress( VALUE obj ) +{ + struct mysql* m = GetMysqlStruct(obj); + return ( m->async_in_progress == connection_identifier(obj) ) ? Qtrue : Qfalse; +} + +static VALUE async_in_progress_set( VALUE obj, VALUE flag ) +{ + struct mysql* m = GetMysqlStruct(obj); + m->async_in_progress = (flag == Qnil || flag == Qfalse) ? 0 : connection_identifier(obj); + return flag; +} + +static void optimize_for_async( VALUE obj ) +{ + struct mysql* m = GetMysqlStruct(obj); + my_bool was_blocking; + vio_blocking(m->handler.net.vio, 0, &was_blocking); + m->blocking = vio_is_blocking( m->handler.net.vio ); + + vio_fastsend( m->handler.net.vio ); + async_in_progress_set( obj, Qfalse ); + /*last_connection_identifier(obj);*/ +} + +static void schedule_connect(VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + fd_set read; + + struct timeval tv = { tv_sec: m->options.connect_timeout, tv_usec: 0 }; + + if (rb_thread_select(0, NULL, NULL, NULL, &tv) < 0) { + rb_raise(eMysql, "connect: timeout"); + } +/* + FD_ZERO(&read); + FD_SET(m->net.fd, &read); + + if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) { + rb_raise(eMysql, "connect: timeout"); + } +*/ +} + /* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) { @@ -266,19 +318,16 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) if (mysql_real_connect(&myp->handler, h, u, p, pp, s) == NULL) #endif mysql_raise(&myp->handler); - + myp->handler.reconnect = 0; myp->connection = Qtrue; - my_bool was_blocking; - - vio_blocking(myp->handler.net.vio, 0, &was_blocking); - myp->blocking = vio_is_blocking( myp->handler.net.vio ); - - vio_fastsend( myp->handler.net.vio ); + optimize_for_async(obj); myp->query_with_result = Qtrue; rb_obj_call_init(obj, argc, argv); + + schedule_connect(obj); return obj; } @@ -341,6 +390,9 @@ static VALUE real_connect2(int argc, VALUE* argv, VALUE obj) mysql_raise(m); m->reconnect = 0; GetMysqlStruct(obj)->connection = Qtrue; + + optimize_for_async(obj); + schedule_connect(obj); return obj; } @@ -764,13 +816,6 @@ static VALUE socket(VALUE obj) MYSQL* m = GetHandler(obj); return INT2NUM(m->net.fd); } -/* socket_type */ -static VALUE socket_type(VALUE obj) -{ - MYSQL* m = GetHandler(obj); - VALUE description = vio_description( m->net.vio ); - return NILorSTRING( description ); -} /* blocking */ static VALUE blocking(VALUE obj){ @@ -793,18 +838,81 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj ) return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse ); } +/* ready */ +static VALUE ready( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + return ( m->status == MYSQL_STATUS_READY ) ? Qtrue : Qfalse; +} + +/* retry */ +static VALUE retry( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + return ( vio_should_retry( m->net.vio ) == 1 ? Qtrue : Qfalse ); +} + +/* interrupted */ +static VALUE interrupted( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + return ( vio_was_interrupted( m->net.vio ) == 1 ? Qtrue : Qfalse ); +} + +static void validate_async_query( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + + if( async_in_progress(obj) == Qtrue ){ + async_in_progress_set(obj, Qfalse); + rb_raise(eMysql, "Query out of sequence: Each call to Mysql#send_query requires a successive Mysql#get_result."); + } +} + +/* +static VALUE connection_dropped( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + if( (mysql_errno(m) == CR_SERVER_LOST ) || ( mysql_errno(m) == CR_SERVER_GONE_ERROR ) || ( mysql_errno(m) == ER_SERVER_SHUTDOWN) ){ + return Qtrue; + }else{ + return Qfalse; + } +} +*/ + +static void async_reconnect( VALUE obj ) +{ + connect(obj); +} + +static VALUE simulate_disconnect( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + mysql_library_end(); + return Qnil; +} + /* send_query(sql) */ static VALUE send_query(VALUE obj, VALUE sql) { MYSQL* m = GetHandler(obj); - + Check_Type(sql, T_STRING); - if (GetMysqlStruct(obj)->connection == Qfalse) { - rb_raise(eMysql, "query: not connected"); + + if (GetMysqlStruct(obj)->connection == Qfalse && async_in_progress(obj) == Qtrue ) { + rb_raise(eMysql, "query: not connected"); } - if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0) - mysql_raise(m); - return Qnil; + + validate_async_query(obj); + + if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){ + mysql_raise(m); + /*( connection_dropped(obj) == Qtrue ) ? async_reconnect(obj) : mysql_raise(m);*/ + } + + async_in_progress_set( obj, Qtrue ); + return Qnil; } /* get_result */ @@ -814,16 +922,17 @@ static VALUE get_result(VALUE obj) if (GetMysqlStruct(obj)->connection == Qfalse) { rb_raise(eMysql, "query: not connected"); } - if (mysql_read_query_result(m) != 0) - mysql_raise(m); + if (mysql_read_query_result(m) != 0) + mysql_raise(m); if (GetMysqlStruct(obj)->query_with_result == Qfalse) return obj; if (mysql_field_count(m) == 0) - return Qnil; + return Qnil; + async_in_progress_set( obj, Qfalse ); return store_result(obj); } -static VALUE schedule(VALUE obj, VALUE timeout) +static void schedule_query(VALUE obj, VALUE timeout) { MYSQL* m = GetHandler(obj); fd_set read; @@ -838,7 +947,6 @@ static VALUE schedule(VALUE obj, VALUE timeout) if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) { rb_raise(eMysql, "query: timeout"); } - } /* async_query(sql,timeout=nil) */ @@ -849,9 +957,13 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj) rb_scan_args(argc, argv, "11", &sql, &timeout); + /*last_connection_identifier( obj );*/ + + async_in_progress_set( obj, Qfalse ); + send_query(obj,sql); - schedule(obj, timeout); + schedule_query(obj, timeout); return get_result(obj); } @@ -2144,12 +2256,17 @@ void Init_mysql(void) rb_define_method(cMysql, "query", query, 1); rb_define_method(cMysql, "real_query", query, 1); rb_define_method(cMysql, "c_async_query", async_query, -1); + rb_define_method(cMysql, "async_in_progress?", async_in_progress, 0); + rb_define_method(cMysql, "async_in_progress=", async_in_progress_set, 1); rb_define_method(cMysql, "send_query", send_query, 1); + rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0); rb_define_method(cMysql, "get_result", get_result, 0); rb_define_method(cMysql, "readable?", readable, -1); + rb_define_method(cMysql, "retry?", retry, 0); + rb_define_method(cMysql, "ready?", ready, 0); + rb_define_method(cMysql, "interrupted?", interrupted, 0); rb_define_method(cMysql, "blocking?", blocking, 0); rb_define_method(cMysql, "socket", socket, 0); - rb_define_method(cMysql, "socket_type", socket_type, 0); rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "reload", reload, 0); rb_define_method(cMysql, "select_db", select_db, 1); diff --git a/lib/mysqlplus.rb b/lib/mysqlplus.rb index 95fa6c4..3b7faaa 100644 --- a/lib/mysqlplus.rb +++ b/lib/mysqlplus.rb @@ -16,4 +16,4 @@ class Mysql::Result each_hash { |row| rows << row } rows end -end +end \ No newline at end of file diff --git a/test/out_of_sync_test.rb b/test/out_of_sync_test.rb new file mode 100644 index 0000000..f11db88 --- /dev/null +++ b/test/out_of_sync_test.rb @@ -0,0 +1,26 @@ +require File.dirname(__FILE__) + '/test_helper' + +m = Mysql.real_connect('localhost','root') +m.reconnect = true + +class << m + + def safe_query( query ) + begin + send_query( query ) + rescue => e + puts e.message + end + end + +end + +m.safe_query( 'select sleep(1)' ) +m.safe_query( 'select sleep(1)' )#raises +m.simulate_disconnect #fires mysql_library_end +m.safe_query( 'select sleep(1)' ) +m.safe_query( 'select sleep(1)' )#raises +m.close +m.connect('localhost','root') +m.safe_query( 'select sleep(1)' ) +m.safe_query( 'select sleep(1)' )#raises \ No newline at end of file From 4e5967bd24d832a26e89de6aa578205c3dde2a0c Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Mon, 6 Oct 2008 23:32:43 +0100 Subject: [PATCH 02/13] Do not schedule && retrieve the result if #query_with_result is false --- ext/mysql.c | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/ext/mysql.c b/ext/mysql.c index c246f68..8dccba0 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -260,7 +260,6 @@ static void optimize_for_async( VALUE obj ) vio_fastsend( m->handler.net.vio ); async_in_progress_set( obj, Qfalse ); - /*last_connection_identifier(obj);*/ } static void schedule_connect(VALUE obj ) @@ -838,13 +837,6 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj ) return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse ); } -/* ready */ -static VALUE ready( VALUE obj ) -{ - MYSQL* m = GetHandler(obj); - return ( m->status == MYSQL_STATUS_READY ) ? Qtrue : Qfalse; -} - /* retry */ static VALUE retry( VALUE obj ) { @@ -924,11 +916,13 @@ static VALUE get_result(VALUE obj) } if (mysql_read_query_result(m) != 0) mysql_raise(m); + + async_in_progress_set( obj, Qfalse ); + if (GetMysqlStruct(obj)->query_with_result == Qfalse) return obj; if (mysql_field_count(m) == 0) - return Qnil; - async_in_progress_set( obj, Qfalse ); + return Qnil; return store_result(obj); } @@ -957,15 +951,17 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj) rb_scan_args(argc, argv, "11", &sql, &timeout); - /*last_connection_identifier( obj );*/ - async_in_progress_set( obj, Qfalse ); send_query(obj,sql); - schedule_query(obj, timeout); - - return get_result(obj); + if (GetMysqlStruct(obj)->query_with_result == Qfalse){ + async_in_progress_set( obj, Qfalse ); + return obj; + } else { + schedule_query(obj, timeout); + return get_result(obj); + } } #if MYSQL_VERSION_ID >= 40100 @@ -2263,7 +2259,6 @@ void Init_mysql(void) rb_define_method(cMysql, "get_result", get_result, 0); rb_define_method(cMysql, "readable?", readable, -1); rb_define_method(cMysql, "retry?", retry, 0); - rb_define_method(cMysql, "ready?", ready, 0); rb_define_method(cMysql, "interrupted?", interrupted, 0); rb_define_method(cMysql, "blocking?", blocking, 0); rb_define_method(cMysql, "socket", socket, 0); From f57c2a65761cb2cbf5cccca231fed86c9ad0cbfc Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Wed, 8 Oct 2008 05:17:01 +0100 Subject: [PATCH 03/13] Let #c_async_query support a block syntax that yields the result; ensure async in progress is negated by #get_result --- ext/mysql.c | 48 ++++++++++------------------- test/async_query_with_block_test.rb | 7 +++++ 2 files changed, 23 insertions(+), 32 deletions(-) create mode 100644 test/async_query_with_block_test.rb diff --git a/ext/mysql.c b/ext/mysql.c index 8dccba0..bfbc27e 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -861,23 +861,6 @@ static void validate_async_query( VALUE obj ) } } -/* -static VALUE connection_dropped( VALUE obj ) -{ - MYSQL* m = GetHandler(obj); - if( (mysql_errno(m) == CR_SERVER_LOST ) || ( mysql_errno(m) == CR_SERVER_GONE_ERROR ) || ( mysql_errno(m) == ER_SERVER_SHUTDOWN) ){ - return Qtrue; - }else{ - return Qfalse; - } -} -*/ - -static void async_reconnect( VALUE obj ) -{ - connect(obj); -} - static VALUE simulate_disconnect( VALUE obj ) { MYSQL* m = GetHandler(obj); @@ -900,7 +883,6 @@ static VALUE send_query(VALUE obj, VALUE sql) if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){ mysql_raise(m); - /*( connection_dropped(obj) == Qtrue ) ? async_reconnect(obj) : mysql_raise(m);*/ } async_in_progress_set( obj, Qtrue ); @@ -911,13 +893,14 @@ static VALUE send_query(VALUE obj, VALUE sql) static VALUE get_result(VALUE obj) { MYSQL* m = GetHandler(obj); + + async_in_progress_set( obj, Qfalse ); + if (GetMysqlStruct(obj)->connection == Qfalse) { rb_raise(eMysql, "query: not connected"); } if (mysql_read_query_result(m) != 0) mysql_raise(m); - - async_in_progress_set( obj, Qfalse ); if (GetMysqlStruct(obj)->query_with_result == Qfalse) return obj; @@ -946,22 +929,23 @@ static void schedule_query(VALUE obj, VALUE timeout) /* async_query(sql,timeout=nil) */ static VALUE async_query(int argc, VALUE* argv, VALUE obj) { - MYSQL* m = GetHandler(obj); - VALUE sql, timeout; + MYSQL* m = GetHandler(obj); + VALUE sql, timeout; - rb_scan_args(argc, argv, "11", &sql, &timeout); + rb_scan_args(argc, argv, "11", &sql, &timeout); - async_in_progress_set( obj, Qfalse ); - - send_query(obj,sql); - - if (GetMysqlStruct(obj)->query_with_result == Qfalse){ async_in_progress_set( obj, Qfalse ); - return obj; - } else { + + send_query(obj,sql); + schedule_query(obj, timeout); - return get_result(obj); - } + + if (rb_block_given_p()) { + rb_yield( get_result(obj) ); + return obj; + }else{ + return get_result(obj); + } } #if MYSQL_VERSION_ID >= 40100 diff --git a/test/async_query_with_block_test.rb b/test/async_query_with_block_test.rb new file mode 100644 index 0000000..06e1369 --- /dev/null +++ b/test/async_query_with_block_test.rb @@ -0,0 +1,7 @@ +require File.dirname(__FILE__) + '/test_helper' + +m = Mysql.real_connect('localhost','root') + +m.c_async_query( 'SELECT SLEEP(1)' ) do |result| + puts result.inspect +end \ No newline at end of file From 8e6f300f9db75f07c03049efd00a6b91fe16538d Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Wed, 8 Oct 2008 17:09:47 +0100 Subject: [PATCH 04/13] Introduce Mysql#reconnected? --- ext/mysql.c | 9 +++++++++ test/async_query_with_block_test.rb | 4 ++-- test/reconnected_test.rb | 18 ++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 test/reconnected_test.rb diff --git a/ext/mysql.c b/ext/mysql.c index bfbc27e..4b1fb44 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -851,6 +851,14 @@ static VALUE interrupted( VALUE obj ) return ( vio_was_interrupted( m->net.vio ) == 1 ? Qtrue : Qfalse ); } +/* reconnected */ +static VALUE reconnected( VALUE obj ){ + MYSQL* m = GetHandler(obj); + int current_connection_id = mysql_thread_id( m ); + mysql_ping(m); + return ( current_connection_id == mysql_thread_id( m ) ) ? Qfalse : Qtrue; +} + static void validate_async_query( VALUE obj ) { MYSQL* m = GetHandler(obj); @@ -2240,6 +2248,7 @@ void Init_mysql(void) rb_define_method(cMysql, "async_in_progress=", async_in_progress_set, 1); rb_define_method(cMysql, "send_query", send_query, 1); rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0); + rb_define_method(cMysql, "reconnected?", reconnected, 0); rb_define_method(cMysql, "get_result", get_result, 0); rb_define_method(cMysql, "readable?", readable, -1); rb_define_method(cMysql, "retry?", retry, 0); diff --git a/test/async_query_with_block_test.rb b/test/async_query_with_block_test.rb index 06e1369..7c5a216 100644 --- a/test/async_query_with_block_test.rb +++ b/test/async_query_with_block_test.rb @@ -1,7 +1,7 @@ require File.dirname(__FILE__) + '/test_helper' -m = Mysql.real_connect('localhost','root') +m = Mysql.real_connect('localhost','root','','mysql') -m.c_async_query( 'SELECT SLEEP(1)' ) do |result| +m.c_async_query( 'SELECT * FROM user' ) do |result| puts result.inspect end \ No newline at end of file diff --git a/test/reconnected_test.rb b/test/reconnected_test.rb new file mode 100644 index 0000000..f4f0154 --- /dev/null +++ b/test/reconnected_test.rb @@ -0,0 +1,18 @@ +require File.dirname(__FILE__) + '/test_helper' + +$m = Mysql.real_connect('localhost','root') +#$m.reconnect = true + +def assert_reconnected + puts $m.reconnected?().inspect + sleep 1 + yield + puts $m.reconnected?().inspect +end + +assert_reconnected do + $m.simulate_disconnect +end +assert_reconnected do + $m.close +end \ No newline at end of file From 35d2545c17f385a9cf524bfea39606524d75c009 Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Wed, 8 Oct 2008 22:11:14 +0100 Subject: [PATCH 05/13] Selective enable || disable GC for result retrieval with Mysql#disable_gc = true|false --- ext/mysql.c | 37 ++++++++++++++++++++++++++++--------- test/gc_benchmark.rb | 18 ++++++++++++++++++ 2 files changed, 46 insertions(+), 9 deletions(-) create mode 100644 test/gc_benchmark.rb diff --git a/ext/mysql.c b/ext/mysql.c index 4b1fb44..b14cae1 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -60,6 +60,7 @@ struct mysql { MYSQL handler; char connection; char query_with_result; + char gc_disabled; char blocking; int async_in_progress; }; @@ -181,7 +182,7 @@ static void mysql_raise(MYSQL* m) rb_exc_raise(e); } -static VALUE mysqlres2obj(MYSQL_RES* res) +static VALUE mysqlres2obj(MYSQL_RES* res, VALUE gc_disabled) { VALUE obj; struct mysql_res* resp; @@ -191,8 +192,9 @@ static VALUE mysqlres2obj(MYSQL_RES* res) resp->res = res; resp->freed = Qfalse; rb_obj_call_init(obj, 0, NULL); - if (++store_result_count > GC_STORE_RESULT_LIMIT) - rb_gc(); + if (++store_result_count > GC_STORE_RESULT_LIMIT && gc_disabled == Qfalse){ + rb_gc(); + } return obj; } @@ -228,6 +230,7 @@ static VALUE init(VALUE klass) mysql_init(&myp->handler); myp->connection = Qfalse; myp->query_with_result = Qtrue; + myp->gc_disabled = Qfalse; rb_obj_call_init(obj, 0, NULL); return obj; } @@ -646,7 +649,7 @@ static VALUE list_fields(int argc, VALUE* argv, VALUE obj) res = mysql_list_fields(m, StringValuePtr(table), NILorSTRING(field)); if (res == NULL) mysql_raise(m); - return mysqlres2obj(res); + return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); } /* list_processes() */ @@ -656,7 +659,7 @@ static VALUE list_processes(VALUE obj) MYSQL_RES* res = mysql_list_processes(m); if (res == NULL) mysql_raise(m); - return mysqlres2obj(res); + return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); } /* list_tables(table=nil) */ @@ -750,7 +753,7 @@ static VALUE store_result(VALUE obj) MYSQL_RES* res = mysql_store_result(m); if (res == NULL) mysql_raise(m); - return mysqlres2obj(res); + return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); } /* thread_id() */ @@ -766,7 +769,7 @@ static VALUE use_result(VALUE obj) MYSQL_RES* res = mysql_use_result(m); if (res == NULL) mysql_raise(m); - return mysqlres2obj(res); + return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); } static VALUE res_free(VALUE); @@ -788,7 +791,7 @@ static VALUE query(VALUE obj, VALUE sql) if (mysql_field_count(m) != 0) mysql_raise(m); } else { - VALUE robj = mysqlres2obj(res); + VALUE robj = mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); rb_ensure(rb_yield, robj, res_free, robj); } #if MYSQL_VERSION_ID >= 40101 @@ -859,6 +862,20 @@ static VALUE reconnected( VALUE obj ){ return ( current_connection_id == mysql_thread_id( m ) ) ? Qfalse : Qtrue; } +/* disable_gc(true|false) */ +static VALUE disable_gc_set(VALUE obj, VALUE flag) +{ + if (TYPE(flag) != T_TRUE && TYPE(flag) != T_FALSE) + rb_raise(rb_eTypeError, "invalid type, required true or false."); + GetMysqlStruct(obj)->gc_disabled = flag; + return flag; +} + +/* gc_disabled */ +static VALUE gc_disabled( VALUE obj ){ + return GetMysqlStruct(obj)->gc_disabled ? Qtrue: Qfalse; +} + static void validate_async_query( VALUE obj ) { MYSQL* m = GetHandler(obj); @@ -1986,7 +2003,7 @@ static VALUE stmt_result_metadata(VALUE obj) mysql_stmt_raise(s->stmt); return Qnil; } - return mysqlres2obj(res); + return mysqlres2obj(res, Qfalse); } /* row_seek(offset) */ @@ -2254,6 +2271,8 @@ void Init_mysql(void) rb_define_method(cMysql, "retry?", retry, 0); rb_define_method(cMysql, "interrupted?", interrupted, 0); rb_define_method(cMysql, "blocking?", blocking, 0); + rb_define_method(cMysql, "gc_disabled?", gc_disabled, 0); + rb_define_method(cMysql, "disable_gc=", disable_gc_set, 1); rb_define_method(cMysql, "socket", socket, 0); rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "reload", reload, 0); diff --git a/test/gc_benchmark.rb b/test/gc_benchmark.rb new file mode 100644 index 0000000..a2c859f --- /dev/null +++ b/test/gc_benchmark.rb @@ -0,0 +1,18 @@ +require 'rubygems' +require 'mysqlplus' +require 'benchmark' + +with_gc = Mysql.real_connect('localhost','root','','mysql') +without_gc = Mysql.real_connect('localhost','root','','mysql') +without_gc.disable_gc = true + +n = 1000 +Benchmark.bm do |x| + x.report( 'With GC' ) do + n.times{ with_gc.c_async_query( 'SELECT * FROM user' ) } + end + GC.start + x.report( 'Without GC' ) do + n.times{ without_gc.c_async_query( 'SELECT * FROM user' ) } + end +end \ No newline at end of file From a9fea270f57544ce5afa6dc9eb2019dea17c7f48 Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Thu, 9 Oct 2008 02:01:57 +0100 Subject: [PATCH 06/13] Better GC stats --- test/gc_benchmark.rb | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/test/gc_benchmark.rb b/test/gc_benchmark.rb index a2c859f..0f3b397 100644 --- a/test/gc_benchmark.rb +++ b/test/gc_benchmark.rb @@ -6,13 +6,36 @@ with_gc = Mysql.real_connect('localhost','root','','mysql') without_gc = Mysql.real_connect('localhost','root','','mysql') without_gc.disable_gc = true +$gc_stats = [] + +def countable_gc? + GC.respond_to? :count +end + +def gc_counts( label, scope ) + $gc_stats << "GC #{scope} ( #{label} ) #{GC.count}" +end + +def with_gc_counts( label ) + gc_counts( label, 'before' ) if countable_gc? + yield + gc_counts( label, 'after' ) if countable_gc? +end + n = 1000 + Benchmark.bm do |x| x.report( 'With GC' ) do - n.times{ with_gc.c_async_query( 'SELECT * FROM user' ) } + with_gc_counts( 'With GC' ) do + n.times{ with_gc.c_async_query( 'SELECT * FROM user' ) } + end end GC.start x.report( 'Without GC' ) do - n.times{ without_gc.c_async_query( 'SELECT * FROM user' ) } + with_gc_counts( 'Without GC' ) do + n.times{ without_gc.c_async_query( 'SELECT * FROM user' ) } + end end -end \ No newline at end of file +end + +puts $gc_stats.join( ' | ' ) \ No newline at end of file From e81e145c15e693850dd6180ae6547ae74068a6eb Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Thu, 9 Oct 2008 02:13:40 +0100 Subject: [PATCH 07/13] Prefer Objects to GC as label --- test/gc_benchmark.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/gc_benchmark.rb b/test/gc_benchmark.rb index 0f3b397..e5fc7f5 100644 --- a/test/gc_benchmark.rb +++ b/test/gc_benchmark.rb @@ -13,7 +13,7 @@ def countable_gc? end def gc_counts( label, scope ) - $gc_stats << "GC #{scope} ( #{label} ) #{GC.count}" + $gc_stats << "Objects #{scope} ( #{label} ) #{GC.count}" end def with_gc_counts( label ) @@ -24,7 +24,7 @@ end n = 1000 -Benchmark.bm do |x| +Benchmark.bmbm do |x| x.report( 'With GC' ) do with_gc_counts( 'With GC' ) do n.times{ with_gc.c_async_query( 'SELECT * FROM user' ) } From 9d66a3b71e91688e4b4a90d056dbc8bbbb88dc7f Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Sun, 19 Oct 2008 23:04:33 +0100 Subject: [PATCH 08/13] Piggy back schedule loop on MYSQL_STATUS_READY --- ext/mysql.c | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/ext/mysql.c b/ext/mysql.c index b14cae1..61b4b05 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -938,16 +938,28 @@ static void schedule_query(VALUE obj, VALUE timeout) { MYSQL* m = GetHandler(obj); fd_set read; - + int ret; + timeout = ( NIL_P(timeout) ? m->net.read_timeout : INT2NUM(timeout) ); struct timeval tv = { tv_sec: timeout, tv_usec: 0 }; - FD_ZERO(&read); - FD_SET(m->net.fd, &read); + for(;;){ + FD_ZERO(&read); + FD_SET(m->net.fd, &read); + + ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv); + if (ret < 0) { + rb_raise(eMysql, "query: timeout"); + } + + if (ret == 0) { + continue; + } - if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) { - rb_raise(eMysql, "query: timeout"); + if (m->status == MYSQL_STATUS_READY){ + break; + } } } From 1af85383ebc32f7f189f37b91e5932a7c398a034 Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Sun, 19 Oct 2008 23:17:42 +0100 Subject: [PATCH 09/13] Applied Roger's async validation patch, with a minimal test case --- ext/mysql.c | 46 ++++++++++++++++++++++++++++++++++++---- test/out_of_sync_test.rb | 7 +++++- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/ext/mysql.c b/ext/mysql.c index 61b4b05..ce5f412 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -888,9 +888,32 @@ static void validate_async_query( VALUE obj ) static VALUE simulate_disconnect( VALUE obj ) { - MYSQL* m = GetHandler(obj); - mysql_library_end(); - return Qnil; + MYSQL* m = GetHandler(obj); + mysql_library_end(); + return Qnil; +} + +static int begins_with_insensitive(char *candidate, char *check_for_in_upper_case) +{ + /* skip opening whitespace --tab is 11, newline is 12, cr is 15, space 32 */ + char *where_at = candidate; + while( ((*where_at >= 11 && *where_at <= 15) || (*where_at == 32)) && (where_at != 0)) + where_at++; + + char *where_at_in_test = check_for_in_upper_case; + while(*where_at_in_test) + { + int candidate_char = *where_at; + if(candidate_char == 0) + return 0; /* end of line */ + if(candidate_char >= 97 && candidate_char < 122) /* then it's upper case --lower case ify it */ + candidate_char -= 32; + if(candidate_char != *where_at_in_test) + return 0; + where_at_in_test++; + where_at++; + } + return 1; } /* send_query(sql) */ @@ -910,7 +933,22 @@ static VALUE send_query(VALUE obj, VALUE sql) mysql_raise(m); } - async_in_progress_set( obj, Qtrue ); + /* what about http://dev.mysql.com/doc/refman/5.0/en/implicit-commit.html and more? */ + if( + begins_with_insensitive(RSTRING_PTR(sql), "SET ") || + begins_with_insensitive(RSTRING_PTR(sql), "BEGIN") || + begins_with_insensitive(RSTRING_PTR(sql), "START TRANSACTION") || + begins_with_insensitive(RSTRING_PTR(sql), "ROLLBACK") || + begins_with_insensitive(RSTRING_PTR(sql), "LOCK ") || + begins_with_insensitive(RSTRING_PTR(sql), "UNLOCK ") || + begins_with_insensitive(RSTRING_PTR(sql), "USE ") || + begins_with_insensitive(RSTRING_PTR(sql), "COMMIT") ) + { + /* do not mark an async in progress --they used send_query for something that doesn't necessarily have a result--is this allowable? */ + async_in_progress_set( obj, Qfalse ); + } else { + async_in_progress_set( obj, Qtrue ); + } return Qnil; } diff --git a/test/out_of_sync_test.rb b/test/out_of_sync_test.rb index f11db88..5c60a43 100644 --- a/test/out_of_sync_test.rb +++ b/test/out_of_sync_test.rb @@ -23,4 +23,9 @@ m.safe_query( 'select sleep(1)' )#raises m.close m.connect('localhost','root') m.safe_query( 'select sleep(1)' ) -m.safe_query( 'select sleep(1)' )#raises \ No newline at end of file +m.safe_query( 'select sleep(1)' )#raises +m.simulate_disconnect +m.safe_query( 'BEGIN' ) +m.safe_query( 'select sleep(1)' ) +m.get_result() +m.safe_query( 'COMMIT' ) \ No newline at end of file From 48a61dd627a6498e9c612a8e6d21d98c44d3a96d Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Mon, 20 Oct 2008 01:46:03 +0100 Subject: [PATCH 10/13] Introduce Mysql#idle? && Mysql#busy? as Threaded connection pool helpers for Mysql#c_async_query --- ext/mysql.c | 48 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/ext/mysql.c b/ext/mysql.c index ce5f412..a9ba048 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -63,6 +63,7 @@ struct mysql { char gc_disabled; char blocking; int async_in_progress; + char busy; }; struct mysql_res { @@ -821,7 +822,33 @@ static VALUE socket(VALUE obj) /* blocking */ static VALUE blocking(VALUE obj){ - return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse ); + return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse ); +} + +/* is_busy */ +static VALUE is_busy(VALUE obj){ + return ( GetMysqlStruct(obj)->busy ? Qtrue : Qfalse ); +} + +static VALUE is_idle(VALUE obj){ + return ( is_busy(obj) == Qtrue ) ? Qfalse : Qtrue; +} + +/* busy(true|false) */ +static VALUE busy_set(VALUE obj, VALUE flag) +{ + if (TYPE(flag) != T_TRUE && TYPE(flag) != T_FALSE) + rb_raise(rb_eTypeError, "invalid type, required true or false."); + GetMysqlStruct(obj)->busy = flag; + return flag; +} + +static void busy( VALUE obj ){ + busy_set( obj, Qtrue ); +} + +static void idle( VALUE obj ){ + busy_set( obj, Qfalse ); } /* readable(timeout=nil) */ @@ -924,12 +951,14 @@ static VALUE send_query(VALUE obj, VALUE sql) Check_Type(sql, T_STRING); if (GetMysqlStruct(obj)->connection == Qfalse && async_in_progress(obj) == Qtrue ) { + idle( obj ); rb_raise(eMysql, "query: not connected"); } validate_async_query(obj); if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){ + idle( obj ); mysql_raise(m); } @@ -960,10 +989,13 @@ static VALUE get_result(VALUE obj) async_in_progress_set( obj, Qfalse ); if (GetMysqlStruct(obj)->connection == Qfalse) { - rb_raise(eMysql, "query: not connected"); + idle( obj ); + rb_raise(eMysql, "query: not connected"); } - if (mysql_read_query_result(m) != 0) - mysql_raise(m); + if (mysql_read_query_result(m) != 0){ + idle( obj ); + mysql_raise(m); + } if (GetMysqlStruct(obj)->query_with_result == Qfalse) return obj; @@ -988,6 +1020,7 @@ static void schedule_query(VALUE obj, VALUE timeout) ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv); if (ret < 0) { + idle( obj ); rb_raise(eMysql, "query: timeout"); } @@ -1011,14 +1044,18 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj) async_in_progress_set( obj, Qfalse ); + busy(obj); + send_query(obj,sql); schedule_query(obj, timeout); if (rb_block_given_p()) { rb_yield( get_result(obj) ); + idle( obj ); return obj; }else{ + idle( obj ); return get_result(obj); } } @@ -2323,6 +2360,9 @@ void Init_mysql(void) rb_define_method(cMysql, "blocking?", blocking, 0); rb_define_method(cMysql, "gc_disabled?", gc_disabled, 0); rb_define_method(cMysql, "disable_gc=", disable_gc_set, 1); + rb_define_method(cMysql, "busy?", is_busy, 0); + rb_define_method(cMysql, "idle?", is_idle, 0); + rb_define_method(cMysql, "busy=", busy_set, 1); rb_define_method(cMysql, "socket", socket, 0); rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "reload", reload, 0); From 4640893f27f55c67ab0836438c54e324791cf641 Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Fri, 31 Oct 2008 15:09:33 +0000 Subject: [PATCH 11/13] Do not schedule in a single threaded environment --- ext/mysql.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/ext/mysql.c b/ext/mysql.c index a9ba048..98656b8 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -913,11 +913,10 @@ static void validate_async_query( VALUE obj ) } } -static VALUE simulate_disconnect( VALUE obj ) +static void simulate_disconnect( VALUE obj ) { MYSQL* m = GetHandler(obj); mysql_library_end(); - return Qnil; } static int begins_with_insensitive(char *candidate, char *check_for_in_upper_case) @@ -947,7 +946,7 @@ static int begins_with_insensitive(char *candidate, char *check_for_in_upper_cas static VALUE send_query(VALUE obj, VALUE sql) { MYSQL* m = GetHandler(obj); - + Check_Type(sql, T_STRING); if (GetMysqlStruct(obj)->connection == Qfalse && async_in_progress(obj) == Qtrue ) { @@ -978,6 +977,7 @@ static VALUE send_query(VALUE obj, VALUE sql) } else { async_in_progress_set( obj, Qtrue ); } + return Qnil; } @@ -1001,7 +1001,7 @@ static VALUE get_result(VALUE obj) return obj; if (mysql_field_count(m) == 0) return Qnil; - return store_result(obj); + return store_result(obj); } static void schedule_query(VALUE obj, VALUE timeout) @@ -1034,6 +1034,10 @@ static void schedule_query(VALUE obj, VALUE timeout) } } +static int should_schedule_query(){ + return ( ( rb_thread_main() == rb_thread_current() ) && rb_thread_alone() ); +} + /* async_query(sql,timeout=nil) */ static VALUE async_query(int argc, VALUE* argv, VALUE obj) { @@ -1046,10 +1050,12 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj) busy(obj); - send_query(obj,sql); + send_query( obj, sql ); + + if ( should_schedule_query() != 1 ){ + schedule_query(obj, timeout); + } - schedule_query(obj, timeout); - if (rb_block_given_p()) { rb_yield( get_result(obj) ); idle( obj ); From 57712a64e67d30277978808e128f40cc251fa8ea Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Fri, 31 Oct 2008 15:48:28 +0000 Subject: [PATCH 12/13] Conditional schedule cleanup --- ext/mysql.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ext/mysql.c b/ext/mysql.c index 98656b8..84375c0 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -1035,7 +1035,7 @@ static void schedule_query(VALUE obj, VALUE timeout) } static int should_schedule_query(){ - return ( ( rb_thread_main() == rb_thread_current() ) && rb_thread_alone() ); + return ( ( rb_thread_main() == rb_thread_current() ) && rb_thread_alone() ) != 1; } /* async_query(sql,timeout=nil) */ @@ -1052,7 +1052,7 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj) send_query( obj, sql ); - if ( should_schedule_query() != 1 ){ + if ( should_schedule_query() ){ schedule_query(obj, timeout); } From 79f742908dd9ce1c4b26548a96967fd20aacd4a2 Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Sat, 1 Nov 2008 17:12:03 +0000 Subject: [PATCH 13/13] Use rb_thread_alone exclusively to determine if the query should be Thread scheduled --- ext/mysql.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/mysql.c b/ext/mysql.c index 84375c0..222e224 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -1035,7 +1035,7 @@ static void schedule_query(VALUE obj, VALUE timeout) } static int should_schedule_query(){ - return ( ( rb_thread_main() == rb_thread_current() ) && rb_thread_alone() ) != 1; + return rb_thread_alone() != 1; } /* async_query(sql,timeout=nil) */