diff --git a/ext/extconf.rb b/ext/extconf.rb index b200c2c..3c84c4f 100644 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -87,4 +87,4 @@ File.open('error_const.h', 'w') do |f| end end -create_makefile("mysql") +create_makefile("mysql") \ No newline at end of file diff --git a/ext/mysql.c b/ext/mysql.c index fc22491..2bd5904 100644 --- a/ext/mysql.c +++ b/ext/mysql.c @@ -61,7 +61,10 @@ struct mysql { MYSQL handler; char connection; char query_with_result; + char gc_disabled; char blocking; + int async_in_progress; + char busy; }; struct mysql_res { @@ -181,7 +184,7 @@ static void mysql_raise(MYSQL* m) rb_exc_raise(e); } -static VALUE mysqlres2obj(MYSQL_RES* res) +static VALUE mysqlres2obj(MYSQL_RES* res, VALUE gc_disabled) { VALUE obj; struct mysql_res* resp; @@ -191,10 +194,16 @@ static VALUE mysqlres2obj(MYSQL_RES* res) resp->res = res; resp->freed = Qfalse; rb_obj_call_init(obj, 0, NULL); +<<<<<<< HEAD:ext/mysql.c /* disabled until it can be reviewed further--rely on the normal GC for now. if (++store_result_count > GC_STORE_RESULT_LIMIT) rb_gc(); */ +======= + if (++store_result_count > GC_STORE_RESULT_LIMIT && gc_disabled == Qfalse){ + rb_gc(); + } +>>>>>>> with_async_validation:ext/mysql.c return obj; } @@ -230,10 +239,12 @@ static VALUE init(VALUE klass) mysql_init(&myp->handler); myp->connection = Qfalse; myp->query_with_result = Qtrue; + myp->gc_disabled = Qfalse; rb_obj_call_init(obj, 0, NULL); return obj; } +<<<<<<< HEAD:ext/mysql.c #ifdef HAVE_TBR typedef struct @@ -297,6 +308,58 @@ static void call_single_function_rb_thread_blocking_region(void *arg_holder_in) #endif +======= +static VALUE connection_identifier( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + return mysql_thread_id( m ); +} + +static VALUE async_in_progress( VALUE obj ) +{ + struct mysql* m = GetMysqlStruct(obj); + return ( m->async_in_progress == connection_identifier(obj) ) ? Qtrue : Qfalse; +} + +static VALUE async_in_progress_set( VALUE obj, VALUE flag ) +{ + struct mysql* m = GetMysqlStruct(obj); + m->async_in_progress = (flag == Qnil || flag == Qfalse) ? 0 : connection_identifier(obj); + return flag; +} + +static void optimize_for_async( VALUE obj ) +{ + struct mysql* m = GetMysqlStruct(obj); + my_bool was_blocking; + vio_blocking(m->handler.net.vio, 0, &was_blocking); + m->blocking = vio_is_blocking( m->handler.net.vio ); + + vio_fastsend( m->handler.net.vio ); + async_in_progress_set( obj, Qfalse ); +} + +static void schedule_connect(VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + fd_set read; + + struct timeval tv = { tv_sec: m->options.connect_timeout, tv_usec: 0 }; + + if (rb_thread_select(0, NULL, NULL, NULL, &tv) < 0) { + rb_raise(eMysql, "connect: timeout"); + } +/* + FD_ZERO(&read); + FD_SET(m->net.fd, &read); + + if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) { + rb_raise(eMysql, "connect: timeout"); + } +*/ +} + +>>>>>>> with_async_validation:ext/mysql.c /* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) /* actually gets run */ { @@ -336,19 +399,16 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) /* actually gets r if (mysql_real_connect(&myp->handler, h, u, p, pp, s) == NULL) #endif mysql_raise(&myp->handler); - + myp->handler.reconnect = 0; myp->connection = Qtrue; - my_bool was_blocking; - - vio_blocking(myp->handler.net.vio, 0, &was_blocking); - myp->blocking = vio_is_blocking( myp->handler.net.vio ); - - vio_fastsend( myp->handler.net.vio ); + optimize_for_async(obj); myp->query_with_result = Qtrue; rb_obj_call_init(obj, argc, argv); + + schedule_connect(obj); return obj; } @@ -411,6 +471,9 @@ static VALUE real_connect2(int argc, VALUE* argv, VALUE obj) mysql_raise(m); m->reconnect = 0; GetMysqlStruct(obj)->connection = Qtrue; + + optimize_for_async(obj); + schedule_connect(obj); return obj; } @@ -665,7 +728,7 @@ static VALUE list_fields(int argc, VALUE* argv, VALUE obj) res = mysql_list_fields(m, StringValuePtr(table), NILorSTRING(field)); if (res == NULL) mysql_raise(m); - return mysqlres2obj(res); + return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); } /* list_processes() */ @@ -675,7 +738,7 @@ static VALUE list_processes(VALUE obj) MYSQL_RES* res = mysql_list_processes(m); if (res == NULL) mysql_raise(m); - return mysqlres2obj(res); + return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); } /* list_tables(table=nil) */ @@ -793,8 +856,12 @@ static VALUE store_result(VALUE obj) if (res == NULL) mysql_raise(m); +<<<<<<< HEAD:ext/mysql.c return mysqlres2obj(res); +======= + return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); +>>>>>>> with_async_validation:ext/mysql.c } /* thread_id() */ @@ -810,7 +877,7 @@ static VALUE use_result(VALUE obj) MYSQL_RES* res = mysql_use_result(m); if (res == NULL) mysql_raise(m); - return mysqlres2obj(res); + return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); } static VALUE res_free(VALUE); @@ -857,7 +924,7 @@ static VALUE query(VALUE obj, VALUE sql) if (mysql_field_count(m) != 0) mysql_raise(m); } else { - VALUE robj = mysqlres2obj(res); + VALUE robj = mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled); rb_ensure(rb_yield, robj, res_free, robj); } #if MYSQL_VERSION_ID >= 40101 @@ -893,6 +960,7 @@ static VALUE socket(VALUE obj) MYSQL* m = GetHandler(obj); return INT2NUM(m->net.fd); } +<<<<<<< HEAD:ext/mysql.c /* socket_type --currently returns true or false, needs some work */ static VALUE socket_type(VALUE obj) { @@ -905,10 +973,38 @@ static VALUE socket_type(VALUE obj) else return Qnil; } +======= +>>>>>>> with_async_validation:ext/mysql.c /* blocking */ static VALUE blocking(VALUE obj){ - return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse ); + return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse ); +} + +/* is_busy */ +static VALUE is_busy(VALUE obj){ + return ( GetMysqlStruct(obj)->busy ? Qtrue : Qfalse ); +} + +static VALUE is_idle(VALUE obj){ + return ( is_busy(obj) == Qtrue ) ? Qfalse : Qtrue; +} + +/* busy(true|false) */ +static VALUE busy_set(VALUE obj, VALUE flag) +{ + if (TYPE(flag) != T_TRUE && TYPE(flag) != T_FALSE) + rb_raise(rb_eTypeError, "invalid type, required true or false."); + GetMysqlStruct(obj)->busy = flag; + return flag; +} + +static void busy( VALUE obj ){ + busy_set( obj, Qtrue ); +} + +static void idle( VALUE obj ){ + busy_set( obj, Qfalse ); } /* readable(timeout=nil) */ @@ -927,65 +1023,214 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj ) return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse ); } +/* retry */ +static VALUE retry( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + return ( vio_should_retry( m->net.vio ) == 1 ? Qtrue : Qfalse ); +} + +/* interrupted */ +static VALUE interrupted( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + return ( vio_was_interrupted( m->net.vio ) == 1 ? Qtrue : Qfalse ); +} + +/* reconnected */ +static VALUE reconnected( VALUE obj ){ + MYSQL* m = GetHandler(obj); + int current_connection_id = mysql_thread_id( m ); + mysql_ping(m); + return ( current_connection_id == mysql_thread_id( m ) ) ? Qfalse : Qtrue; +} + +/* disable_gc(true|false) */ +static VALUE disable_gc_set(VALUE obj, VALUE flag) +{ + if (TYPE(flag) != T_TRUE && TYPE(flag) != T_FALSE) + rb_raise(rb_eTypeError, "invalid type, required true or false."); + GetMysqlStruct(obj)->gc_disabled = flag; + return flag; +} + +/* gc_disabled */ +static VALUE gc_disabled( VALUE obj ){ + return GetMysqlStruct(obj)->gc_disabled ? Qtrue: Qfalse; +} + +static void validate_async_query( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + + if( async_in_progress(obj) == Qtrue ){ + async_in_progress_set(obj, Qfalse); + rb_raise(eMysql, "Query out of sequence: Each call to Mysql#send_query requires a successive Mysql#get_result."); + } +} + +static void simulate_disconnect( VALUE obj ) +{ + MYSQL* m = GetHandler(obj); + mysql_library_end(); +} + +static int begins_with_insensitive(char *candidate, char *check_for_in_upper_case) +{ + /* skip opening whitespace --tab is 11, newline is 12, cr is 15, space 32 */ + char *where_at = candidate; + while( ((*where_at >= 11 && *where_at <= 15) || (*where_at == 32)) && (where_at != 0)) + where_at++; + + char *where_at_in_test = check_for_in_upper_case; + while(*where_at_in_test) + { + int candidate_char = *where_at; + if(candidate_char == 0) + return 0; /* end of line */ + if(candidate_char >= 97 && candidate_char < 122) /* then it's upper case --lower case ify it */ + candidate_char -= 32; + if(candidate_char != *where_at_in_test) + return 0; + where_at_in_test++; + where_at++; + } + return 1; +} + /* send_query(sql) */ static VALUE send_query(VALUE obj, VALUE sql) { MYSQL* m = GetHandler(obj); - + Check_Type(sql, T_STRING); - if (GetMysqlStruct(obj)->connection == Qfalse) { - rb_raise(eMysql, "query: not connected"); + + if (GetMysqlStruct(obj)->connection == Qfalse && async_in_progress(obj) == Qtrue ) { + idle( obj ); + rb_raise(eMysql, "query: not connected"); } - if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0) - mysql_raise(m); - return Qnil; + + validate_async_query(obj); + + if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){ + idle( obj ); + mysql_raise(m); + } + + /* what about http://dev.mysql.com/doc/refman/5.0/en/implicit-commit.html and more? */ + if( + begins_with_insensitive(RSTRING_PTR(sql), "SET ") || + begins_with_insensitive(RSTRING_PTR(sql), "BEGIN") || + begins_with_insensitive(RSTRING_PTR(sql), "START TRANSACTION") || + begins_with_insensitive(RSTRING_PTR(sql), "ROLLBACK") || + begins_with_insensitive(RSTRING_PTR(sql), "LOCK ") || + begins_with_insensitive(RSTRING_PTR(sql), "UNLOCK ") || + begins_with_insensitive(RSTRING_PTR(sql), "USE ") || + begins_with_insensitive(RSTRING_PTR(sql), "COMMIT") ) + { + /* do not mark an async in progress --they used send_query for something that doesn't necessarily have a result--is this allowable? */ + async_in_progress_set( obj, Qfalse ); + } else { + async_in_progress_set( obj, Qtrue ); + } + + return Qnil; } /* get_result */ static VALUE get_result(VALUE obj) { MYSQL* m = GetHandler(obj); + + async_in_progress_set( obj, Qfalse ); + if (GetMysqlStruct(obj)->connection == Qfalse) { - rb_raise(eMysql, "query: not connected"); + idle( obj ); + rb_raise(eMysql, "query: not connected"); } - if (mysql_read_query_result(m) != 0) - mysql_raise(m); + if (mysql_read_query_result(m) != 0){ + idle( obj ); + mysql_raise(m); + } + if (GetMysqlStruct(obj)->query_with_result == Qfalse) return obj; if (mysql_field_count(m) == 0) - return Qnil; - return store_result(obj); + return Qnil; + return store_result(obj); } +<<<<<<< HEAD:ext/mysql.c static void schedule(VALUE obj, VALUE timeout) +======= +static void schedule_query(VALUE obj, VALUE timeout) +>>>>>>> with_async_validation:ext/mysql.c { MYSQL* m = GetHandler(obj); fd_set read; - + int ret; + timeout = ( NIL_P(timeout) ? m->net.read_timeout : INT2NUM(timeout) ); struct timeval tv = { tv_sec: timeout, tv_usec: 0 }; - FD_ZERO(&read); - FD_SET(m->net.fd, &read); + for(;;){ + FD_ZERO(&read); + FD_SET(m->net.fd, &read); + + ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv); + if (ret < 0) { + idle( obj ); + rb_raise(eMysql, "query: timeout"); + } + + if (ret == 0) { + continue; + } - if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) { - rb_raise(eMysql, "query: timeout"); + if (m->status == MYSQL_STATUS_READY){ + break; + } } +<<<<<<< HEAD:ext/mysql.c +======= +} + +static int should_schedule_query(){ + return rb_thread_alone() != 1; +>>>>>>> with_async_validation:ext/mysql.c } /* async_query(sql,timeout=nil) */ static VALUE async_query(int argc, VALUE* argv, VALUE obj) { +<<<<<<< HEAD:ext/mysql.c VALUE sql, timeout; +======= + MYSQL* m = GetHandler(obj); + VALUE sql, timeout; +>>>>>>> with_async_validation:ext/mysql.c - rb_scan_args(argc, argv, "11", &sql, &timeout); + rb_scan_args(argc, argv, "11", &sql, &timeout); - send_query(obj,sql); + async_in_progress_set( obj, Qfalse ); - schedule(obj, timeout); + busy(obj); - return get_result(obj); + send_query( obj, sql ); + + if ( should_schedule_query() ){ + schedule_query(obj, timeout); + } + + if (rb_block_given_p()) { + rb_yield( get_result(obj) ); + idle( obj ); + return obj; + }else{ + idle( obj ); + return get_result(obj); + } } #if MYSQL_VERSION_ID >= 40100 @@ -2020,7 +2265,7 @@ static VALUE stmt_result_metadata(VALUE obj) mysql_stmt_raise(s->stmt); return Qnil; } - return mysqlres2obj(res); + return mysqlres2obj(res, Qfalse); } /* row_seek(offset) */ @@ -2278,12 +2523,22 @@ void Init_mysql(void) rb_define_method(cMysql, "query", query, 1); rb_define_method(cMysql, "real_query", query, 1); rb_define_method(cMysql, "c_async_query", async_query, -1); + rb_define_method(cMysql, "async_in_progress?", async_in_progress, 0); + rb_define_method(cMysql, "async_in_progress=", async_in_progress_set, 1); rb_define_method(cMysql, "send_query", send_query, 1); + rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0); + rb_define_method(cMysql, "reconnected?", reconnected, 0); rb_define_method(cMysql, "get_result", get_result, 0); rb_define_method(cMysql, "readable?", readable, -1); + rb_define_method(cMysql, "retry?", retry, 0); + rb_define_method(cMysql, "interrupted?", interrupted, 0); rb_define_method(cMysql, "blocking?", blocking, 0); + rb_define_method(cMysql, "gc_disabled?", gc_disabled, 0); + rb_define_method(cMysql, "disable_gc=", disable_gc_set, 1); + rb_define_method(cMysql, "busy?", is_busy, 0); + rb_define_method(cMysql, "idle?", is_idle, 0); + rb_define_method(cMysql, "busy=", busy_set, 1); rb_define_method(cMysql, "socket", socket, 0); - rb_define_method(cMysql, "socket_type", socket_type, 0); rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "reload", reload, 0); rb_define_method(cMysql, "select_db", select_db, 1); diff --git a/test/async_query_with_block_test.rb b/test/async_query_with_block_test.rb new file mode 100644 index 0000000..7c5a216 --- /dev/null +++ b/test/async_query_with_block_test.rb @@ -0,0 +1,7 @@ +require File.dirname(__FILE__) + '/test_helper' + +m = Mysql.real_connect('localhost','root','','mysql') + +m.c_async_query( 'SELECT * FROM user' ) do |result| + puts result.inspect +end \ No newline at end of file diff --git a/test/gc_benchmark.rb b/test/gc_benchmark.rb new file mode 100644 index 0000000..e5fc7f5 --- /dev/null +++ b/test/gc_benchmark.rb @@ -0,0 +1,41 @@ +require 'rubygems' +require 'mysqlplus' +require 'benchmark' + +with_gc = Mysql.real_connect('localhost','root','','mysql') +without_gc = Mysql.real_connect('localhost','root','','mysql') +without_gc.disable_gc = true + +$gc_stats = [] + +def countable_gc? + GC.respond_to? :count +end + +def gc_counts( label, scope ) + $gc_stats << "Objects #{scope} ( #{label} ) #{GC.count}" +end + +def with_gc_counts( label ) + gc_counts( label, 'before' ) if countable_gc? + yield + gc_counts( label, 'after' ) if countable_gc? +end + +n = 1000 + +Benchmark.bmbm do |x| + x.report( 'With GC' ) do + with_gc_counts( 'With GC' ) do + n.times{ with_gc.c_async_query( 'SELECT * FROM user' ) } + end + end + GC.start + x.report( 'Without GC' ) do + with_gc_counts( 'Without GC' ) do + n.times{ without_gc.c_async_query( 'SELECT * FROM user' ) } + end + end +end + +puts $gc_stats.join( ' | ' ) \ No newline at end of file diff --git a/test/out_of_sync_test.rb b/test/out_of_sync_test.rb new file mode 100644 index 0000000..5c60a43 --- /dev/null +++ b/test/out_of_sync_test.rb @@ -0,0 +1,31 @@ +require File.dirname(__FILE__) + '/test_helper' + +m = Mysql.real_connect('localhost','root') +m.reconnect = true + +class << m + + def safe_query( query ) + begin + send_query( query ) + rescue => e + puts e.message + end + end + +end + +m.safe_query( 'select sleep(1)' ) +m.safe_query( 'select sleep(1)' )#raises +m.simulate_disconnect #fires mysql_library_end +m.safe_query( 'select sleep(1)' ) +m.safe_query( 'select sleep(1)' )#raises +m.close +m.connect('localhost','root') +m.safe_query( 'select sleep(1)' ) +m.safe_query( 'select sleep(1)' )#raises +m.simulate_disconnect +m.safe_query( 'BEGIN' ) +m.safe_query( 'select sleep(1)' ) +m.get_result() +m.safe_query( 'COMMIT' ) \ No newline at end of file diff --git a/test/reconnected_test.rb b/test/reconnected_test.rb new file mode 100644 index 0000000..f4f0154 --- /dev/null +++ b/test/reconnected_test.rb @@ -0,0 +1,18 @@ +require File.dirname(__FILE__) + '/test_helper' + +$m = Mysql.real_connect('localhost','root') +#$m.reconnect = true + +def assert_reconnected + puts $m.reconnected?().inspect + sleep 1 + yield + puts $m.reconnected?().inspect +end + +assert_reconnected do + $m.simulate_disconnect +end +assert_reconnected do + $m.close +end \ No newline at end of file