overcome a merge conflict which wasn't one

This commit is contained in:
Roger Pack 2009-04-18 22:24:31 +00:00
commit e4bb045695
6 changed files with 388 additions and 36 deletions

View File

@ -61,7 +61,10 @@ struct mysql {
MYSQL handler; MYSQL handler;
char connection; char connection;
char query_with_result; char query_with_result;
char gc_disabled;
char blocking; char blocking;
int async_in_progress;
char busy;
}; };
struct mysql_res { struct mysql_res {
@ -181,7 +184,7 @@ static void mysql_raise(MYSQL* m)
rb_exc_raise(e); rb_exc_raise(e);
} }
static VALUE mysqlres2obj(MYSQL_RES* res) static VALUE mysqlres2obj(MYSQL_RES* res, VALUE gc_disabled)
{ {
VALUE obj; VALUE obj;
struct mysql_res* resp; struct mysql_res* resp;
@ -191,10 +194,16 @@ static VALUE mysqlres2obj(MYSQL_RES* res)
resp->res = res; resp->res = res;
resp->freed = Qfalse; resp->freed = Qfalse;
rb_obj_call_init(obj, 0, NULL); rb_obj_call_init(obj, 0, NULL);
<<<<<<< HEAD:ext/mysql.c
/* disabled until it can be reviewed further--rely on the normal GC for now. /* disabled until it can be reviewed further--rely on the normal GC for now.
if (++store_result_count > GC_STORE_RESULT_LIMIT) if (++store_result_count > GC_STORE_RESULT_LIMIT)
rb_gc(); rb_gc();
*/ */
=======
if (++store_result_count > GC_STORE_RESULT_LIMIT && gc_disabled == Qfalse){
rb_gc();
}
>>>>>>> with_async_validation:ext/mysql.c
return obj; return obj;
} }
@ -230,10 +239,12 @@ static VALUE init(VALUE klass)
mysql_init(&myp->handler); mysql_init(&myp->handler);
myp->connection = Qfalse; myp->connection = Qfalse;
myp->query_with_result = Qtrue; myp->query_with_result = Qtrue;
myp->gc_disabled = Qfalse;
rb_obj_call_init(obj, 0, NULL); rb_obj_call_init(obj, 0, NULL);
return obj; return obj;
} }
<<<<<<< HEAD:ext/mysql.c
#ifdef HAVE_TBR #ifdef HAVE_TBR
typedef struct typedef struct
@ -297,6 +308,58 @@ static void call_single_function_rb_thread_blocking_region(void *arg_holder_in)
#endif #endif
=======
static VALUE connection_identifier( VALUE obj )
{
MYSQL* m = GetHandler(obj);
return mysql_thread_id( m );
}
static VALUE async_in_progress( VALUE obj )
{
struct mysql* m = GetMysqlStruct(obj);
return ( m->async_in_progress == connection_identifier(obj) ) ? Qtrue : Qfalse;
}
static VALUE async_in_progress_set( VALUE obj, VALUE flag )
{
struct mysql* m = GetMysqlStruct(obj);
m->async_in_progress = (flag == Qnil || flag == Qfalse) ? 0 : connection_identifier(obj);
return flag;
}
static void optimize_for_async( VALUE obj )
{
struct mysql* m = GetMysqlStruct(obj);
my_bool was_blocking;
vio_blocking(m->handler.net.vio, 0, &was_blocking);
m->blocking = vio_is_blocking( m->handler.net.vio );
vio_fastsend( m->handler.net.vio );
async_in_progress_set( obj, Qfalse );
}
static void schedule_connect(VALUE obj )
{
MYSQL* m = GetHandler(obj);
fd_set read;
struct timeval tv = { tv_sec: m->options.connect_timeout, tv_usec: 0 };
if (rb_thread_select(0, NULL, NULL, NULL, &tv) < 0) {
rb_raise(eMysql, "connect: timeout");
}
/*
FD_ZERO(&read);
FD_SET(m->net.fd, &read);
if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) {
rb_raise(eMysql, "connect: timeout");
}
*/
}
>>>>>>> with_async_validation:ext/mysql.c
/* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */ /* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */
static VALUE real_connect(int argc, VALUE* argv, VALUE klass) /* actually gets run */ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) /* actually gets run */
{ {
@ -340,16 +403,13 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) /* actually gets r
myp->handler.reconnect = 0; myp->handler.reconnect = 0;
myp->connection = Qtrue; myp->connection = Qtrue;
my_bool was_blocking; optimize_for_async(obj);
vio_blocking(myp->handler.net.vio, 0, &was_blocking);
myp->blocking = vio_is_blocking( myp->handler.net.vio );
vio_fastsend( myp->handler.net.vio );
myp->query_with_result = Qtrue; myp->query_with_result = Qtrue;
rb_obj_call_init(obj, argc, argv); rb_obj_call_init(obj, argc, argv);
schedule_connect(obj);
return obj; return obj;
} }
@ -412,6 +472,9 @@ static VALUE real_connect2(int argc, VALUE* argv, VALUE obj)
m->reconnect = 0; m->reconnect = 0;
GetMysqlStruct(obj)->connection = Qtrue; GetMysqlStruct(obj)->connection = Qtrue;
optimize_for_async(obj);
schedule_connect(obj);
return obj; return obj;
} }
@ -665,7 +728,7 @@ static VALUE list_fields(int argc, VALUE* argv, VALUE obj)
res = mysql_list_fields(m, StringValuePtr(table), NILorSTRING(field)); res = mysql_list_fields(m, StringValuePtr(table), NILorSTRING(field));
if (res == NULL) if (res == NULL)
mysql_raise(m); mysql_raise(m);
return mysqlres2obj(res); return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
} }
/* list_processes() */ /* list_processes() */
@ -675,7 +738,7 @@ static VALUE list_processes(VALUE obj)
MYSQL_RES* res = mysql_list_processes(m); MYSQL_RES* res = mysql_list_processes(m);
if (res == NULL) if (res == NULL)
mysql_raise(m); mysql_raise(m);
return mysqlres2obj(res); return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
} }
/* list_tables(table=nil) */ /* list_tables(table=nil) */
@ -793,8 +856,12 @@ static VALUE store_result(VALUE obj)
if (res == NULL) if (res == NULL)
mysql_raise(m); mysql_raise(m);
<<<<<<< HEAD:ext/mysql.c
return mysqlres2obj(res); return mysqlres2obj(res);
=======
return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
>>>>>>> with_async_validation:ext/mysql.c
} }
/* thread_id() */ /* thread_id() */
@ -810,7 +877,7 @@ static VALUE use_result(VALUE obj)
MYSQL_RES* res = mysql_use_result(m); MYSQL_RES* res = mysql_use_result(m);
if (res == NULL) if (res == NULL)
mysql_raise(m); mysql_raise(m);
return mysqlres2obj(res); return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
} }
static VALUE res_free(VALUE); static VALUE res_free(VALUE);
@ -857,7 +924,7 @@ static VALUE query(VALUE obj, VALUE sql)
if (mysql_field_count(m) != 0) if (mysql_field_count(m) != 0)
mysql_raise(m); mysql_raise(m);
} else { } else {
VALUE robj = mysqlres2obj(res); VALUE robj = mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
rb_ensure(rb_yield, robj, res_free, robj); rb_ensure(rb_yield, robj, res_free, robj);
} }
#if MYSQL_VERSION_ID >= 40101 #if MYSQL_VERSION_ID >= 40101
@ -893,6 +960,7 @@ static VALUE socket(VALUE obj)
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
return INT2NUM(m->net.fd); return INT2NUM(m->net.fd);
} }
<<<<<<< HEAD:ext/mysql.c
/* socket_type --currently returns true or false, needs some work */ /* socket_type --currently returns true or false, needs some work */
static VALUE socket_type(VALUE obj) static VALUE socket_type(VALUE obj)
{ {
@ -905,12 +973,40 @@ static VALUE socket_type(VALUE obj)
else else
return Qnil; return Qnil;
} }
=======
>>>>>>> with_async_validation:ext/mysql.c
/* blocking */ /* blocking */
static VALUE blocking(VALUE obj){ static VALUE blocking(VALUE obj){
return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse ); return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse );
} }
/* is_busy */
static VALUE is_busy(VALUE obj){
return ( GetMysqlStruct(obj)->busy ? Qtrue : Qfalse );
}
static VALUE is_idle(VALUE obj){
return ( is_busy(obj) == Qtrue ) ? Qfalse : Qtrue;
}
/* busy(true|false) */
static VALUE busy_set(VALUE obj, VALUE flag)
{
if (TYPE(flag) != T_TRUE && TYPE(flag) != T_FALSE)
rb_raise(rb_eTypeError, "invalid type, required true or false.");
GetMysqlStruct(obj)->busy = flag;
return flag;
}
static void busy( VALUE obj ){
busy_set( obj, Qtrue );
}
static void idle( VALUE obj ){
busy_set( obj, Qfalse );
}
/* readable(timeout=nil) */ /* readable(timeout=nil) */
static VALUE readable( int argc, VALUE* argv, VALUE obj ) static VALUE readable( int argc, VALUE* argv, VALUE obj )
{ {
@ -927,17 +1023,117 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj )
return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse ); return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse );
} }
/* retry */
static VALUE retry( VALUE obj )
{
MYSQL* m = GetHandler(obj);
return ( vio_should_retry( m->net.vio ) == 1 ? Qtrue : Qfalse );
}
/* interrupted */
static VALUE interrupted( VALUE obj )
{
MYSQL* m = GetHandler(obj);
return ( vio_was_interrupted( m->net.vio ) == 1 ? Qtrue : Qfalse );
}
/* reconnected */
static VALUE reconnected( VALUE obj ){
MYSQL* m = GetHandler(obj);
int current_connection_id = mysql_thread_id( m );
mysql_ping(m);
return ( current_connection_id == mysql_thread_id( m ) ) ? Qfalse : Qtrue;
}
/* disable_gc(true|false) */
static VALUE disable_gc_set(VALUE obj, VALUE flag)
{
if (TYPE(flag) != T_TRUE && TYPE(flag) != T_FALSE)
rb_raise(rb_eTypeError, "invalid type, required true or false.");
GetMysqlStruct(obj)->gc_disabled = flag;
return flag;
}
/* gc_disabled */
static VALUE gc_disabled( VALUE obj ){
return GetMysqlStruct(obj)->gc_disabled ? Qtrue: Qfalse;
}
static void validate_async_query( VALUE obj )
{
MYSQL* m = GetHandler(obj);
if( async_in_progress(obj) == Qtrue ){
async_in_progress_set(obj, Qfalse);
rb_raise(eMysql, "Query out of sequence: Each call to Mysql#send_query requires a successive Mysql#get_result.");
}
}
static void simulate_disconnect( VALUE obj )
{
MYSQL* m = GetHandler(obj);
mysql_library_end();
}
static int begins_with_insensitive(char *candidate, char *check_for_in_upper_case)
{
/* skip opening whitespace --tab is 11, newline is 12, cr is 15, space 32 */
char *where_at = candidate;
while( ((*where_at >= 11 && *where_at <= 15) || (*where_at == 32)) && (where_at != 0))
where_at++;
char *where_at_in_test = check_for_in_upper_case;
while(*where_at_in_test)
{
int candidate_char = *where_at;
if(candidate_char == 0)
return 0; /* end of line */
if(candidate_char >= 97 && candidate_char < 122) /* then it's upper case --lower case ify it */
candidate_char -= 32;
if(candidate_char != *where_at_in_test)
return 0;
where_at_in_test++;
where_at++;
}
return 1;
}
/* send_query(sql) */ /* send_query(sql) */
static VALUE send_query(VALUE obj, VALUE sql) static VALUE send_query(VALUE obj, VALUE sql)
{ {
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
Check_Type(sql, T_STRING); Check_Type(sql, T_STRING);
if (GetMysqlStruct(obj)->connection == Qfalse) {
if (GetMysqlStruct(obj)->connection == Qfalse && async_in_progress(obj) == Qtrue ) {
idle( obj );
rb_raise(eMysql, "query: not connected"); rb_raise(eMysql, "query: not connected");
} }
if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0)
validate_async_query(obj);
if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){
idle( obj );
mysql_raise(m); mysql_raise(m);
}
/* what about http://dev.mysql.com/doc/refman/5.0/en/implicit-commit.html and more? */
if(
begins_with_insensitive(RSTRING_PTR(sql), "SET ") ||
begins_with_insensitive(RSTRING_PTR(sql), "BEGIN") ||
begins_with_insensitive(RSTRING_PTR(sql), "START TRANSACTION") ||
begins_with_insensitive(RSTRING_PTR(sql), "ROLLBACK") ||
begins_with_insensitive(RSTRING_PTR(sql), "LOCK ") ||
begins_with_insensitive(RSTRING_PTR(sql), "UNLOCK ") ||
begins_with_insensitive(RSTRING_PTR(sql), "USE ") ||
begins_with_insensitive(RSTRING_PTR(sql), "COMMIT") )
{
/* do not mark an async in progress --they used send_query for something that doesn't necessarily have a result--is this allowable? */
async_in_progress_set( obj, Qfalse );
} else {
async_in_progress_set( obj, Qtrue );
}
return Qnil; return Qnil;
} }
@ -945,11 +1141,18 @@ static VALUE send_query(VALUE obj, VALUE sql)
static VALUE get_result(VALUE obj) static VALUE get_result(VALUE obj)
{ {
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
async_in_progress_set( obj, Qfalse );
if (GetMysqlStruct(obj)->connection == Qfalse) { if (GetMysqlStruct(obj)->connection == Qfalse) {
idle( obj );
rb_raise(eMysql, "query: not connected"); rb_raise(eMysql, "query: not connected");
} }
if (mysql_read_query_result(m) != 0) if (mysql_read_query_result(m) != 0){
idle( obj );
mysql_raise(m); mysql_raise(m);
}
if (GetMysqlStruct(obj)->query_with_result == Qfalse) if (GetMysqlStruct(obj)->query_with_result == Qfalse)
return obj; return obj;
if (mysql_field_count(m) == 0) if (mysql_field_count(m) == 0)
@ -957,35 +1160,77 @@ static VALUE get_result(VALUE obj)
return store_result(obj); return store_result(obj);
} }
<<<<<<< HEAD:ext/mysql.c
static void schedule(VALUE obj, VALUE timeout) static void schedule(VALUE obj, VALUE timeout)
=======
static void schedule_query(VALUE obj, VALUE timeout)
>>>>>>> with_async_validation:ext/mysql.c
{ {
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
fd_set read; fd_set read;
int ret;
timeout = ( NIL_P(timeout) ? m->net.read_timeout : INT2NUM(timeout) ); timeout = ( NIL_P(timeout) ? m->net.read_timeout : INT2NUM(timeout) );
struct timeval tv = { tv_sec: timeout, tv_usec: 0 }; struct timeval tv = { tv_sec: timeout, tv_usec: 0 };
for(;;){
FD_ZERO(&read); FD_ZERO(&read);
FD_SET(m->net.fd, &read); FD_SET(m->net.fd, &read);
if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) { ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv);
if (ret < 0) {
idle( obj );
rb_raise(eMysql, "query: timeout"); rb_raise(eMysql, "query: timeout");
} }
if (ret == 0) {
continue;
}
if (m->status == MYSQL_STATUS_READY){
break;
}
}
<<<<<<< HEAD:ext/mysql.c
=======
}
static int should_schedule_query(){
return rb_thread_alone() != 1;
>>>>>>> with_async_validation:ext/mysql.c
} }
/* async_query(sql,timeout=nil) */ /* async_query(sql,timeout=nil) */
static VALUE async_query(int argc, VALUE* argv, VALUE obj) static VALUE async_query(int argc, VALUE* argv, VALUE obj)
{ {
<<<<<<< HEAD:ext/mysql.c
VALUE sql, timeout; VALUE sql, timeout;
=======
MYSQL* m = GetHandler(obj);
VALUE sql, timeout;
>>>>>>> with_async_validation:ext/mysql.c
rb_scan_args(argc, argv, "11", &sql, &timeout); rb_scan_args(argc, argv, "11", &sql, &timeout);
send_query(obj,sql); async_in_progress_set( obj, Qfalse );
schedule(obj, timeout); busy(obj);
send_query( obj, sql );
if ( should_schedule_query() ){
schedule_query(obj, timeout);
}
if (rb_block_given_p()) {
rb_yield( get_result(obj) );
idle( obj );
return obj;
}else{
idle( obj );
return get_result(obj); return get_result(obj);
}
} }
#if MYSQL_VERSION_ID >= 40100 #if MYSQL_VERSION_ID >= 40100
@ -2020,7 +2265,7 @@ static VALUE stmt_result_metadata(VALUE obj)
mysql_stmt_raise(s->stmt); mysql_stmt_raise(s->stmt);
return Qnil; return Qnil;
} }
return mysqlres2obj(res); return mysqlres2obj(res, Qfalse);
} }
/* row_seek(offset) */ /* row_seek(offset) */
@ -2278,12 +2523,22 @@ void Init_mysql(void)
rb_define_method(cMysql, "query", query, 1); rb_define_method(cMysql, "query", query, 1);
rb_define_method(cMysql, "real_query", query, 1); rb_define_method(cMysql, "real_query", query, 1);
rb_define_method(cMysql, "c_async_query", async_query, -1); rb_define_method(cMysql, "c_async_query", async_query, -1);
rb_define_method(cMysql, "async_in_progress?", async_in_progress, 0);
rb_define_method(cMysql, "async_in_progress=", async_in_progress_set, 1);
rb_define_method(cMysql, "send_query", send_query, 1); rb_define_method(cMysql, "send_query", send_query, 1);
rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0);
rb_define_method(cMysql, "reconnected?", reconnected, 0);
rb_define_method(cMysql, "get_result", get_result, 0); rb_define_method(cMysql, "get_result", get_result, 0);
rb_define_method(cMysql, "readable?", readable, -1); rb_define_method(cMysql, "readable?", readable, -1);
rb_define_method(cMysql, "retry?", retry, 0);
rb_define_method(cMysql, "interrupted?", interrupted, 0);
rb_define_method(cMysql, "blocking?", blocking, 0); rb_define_method(cMysql, "blocking?", blocking, 0);
rb_define_method(cMysql, "gc_disabled?", gc_disabled, 0);
rb_define_method(cMysql, "disable_gc=", disable_gc_set, 1);
rb_define_method(cMysql, "busy?", is_busy, 0);
rb_define_method(cMysql, "idle?", is_idle, 0);
rb_define_method(cMysql, "busy=", busy_set, 1);
rb_define_method(cMysql, "socket", socket, 0); rb_define_method(cMysql, "socket", socket, 0);
rb_define_method(cMysql, "socket_type", socket_type, 0);
rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "refresh", refresh, 1);
rb_define_method(cMysql, "reload", reload, 0); rb_define_method(cMysql, "reload", reload, 0);
rb_define_method(cMysql, "select_db", select_db, 1); rb_define_method(cMysql, "select_db", select_db, 1);

View File

@ -0,0 +1,7 @@
require File.dirname(__FILE__) + '/test_helper'
m = Mysql.real_connect('localhost','root','','mysql')
m.c_async_query( 'SELECT * FROM user' ) do |result|
puts result.inspect
end

41
test/gc_benchmark.rb Normal file
View File

@ -0,0 +1,41 @@
require 'rubygems'
require 'mysqlplus'
require 'benchmark'
with_gc = Mysql.real_connect('localhost','root','','mysql')
without_gc = Mysql.real_connect('localhost','root','','mysql')
without_gc.disable_gc = true
$gc_stats = []
def countable_gc?
GC.respond_to? :count
end
def gc_counts( label, scope )
$gc_stats << "Objects #{scope} ( #{label} ) #{GC.count}"
end
def with_gc_counts( label )
gc_counts( label, 'before' ) if countable_gc?
yield
gc_counts( label, 'after' ) if countable_gc?
end
n = 1000
Benchmark.bmbm do |x|
x.report( 'With GC' ) do
with_gc_counts( 'With GC' ) do
n.times{ with_gc.c_async_query( 'SELECT * FROM user' ) }
end
end
GC.start
x.report( 'Without GC' ) do
with_gc_counts( 'Without GC' ) do
n.times{ without_gc.c_async_query( 'SELECT * FROM user' ) }
end
end
end
puts $gc_stats.join( ' | ' )

31
test/out_of_sync_test.rb Normal file
View File

@ -0,0 +1,31 @@
require File.dirname(__FILE__) + '/test_helper'
m = Mysql.real_connect('localhost','root')
m.reconnect = true
class << m
def safe_query( query )
begin
send_query( query )
rescue => e
puts e.message
end
end
end
m.safe_query( 'select sleep(1)' )
m.safe_query( 'select sleep(1)' )#raises
m.simulate_disconnect #fires mysql_library_end
m.safe_query( 'select sleep(1)' )
m.safe_query( 'select sleep(1)' )#raises
m.close
m.connect('localhost','root')
m.safe_query( 'select sleep(1)' )
m.safe_query( 'select sleep(1)' )#raises
m.simulate_disconnect
m.safe_query( 'BEGIN' )
m.safe_query( 'select sleep(1)' )
m.get_result()
m.safe_query( 'COMMIT' )

18
test/reconnected_test.rb Normal file
View File

@ -0,0 +1,18 @@
require File.dirname(__FILE__) + '/test_helper'
$m = Mysql.real_connect('localhost','root')
#$m.reconnect = true
def assert_reconnected
puts $m.reconnected?().inspect
sleep 1
yield
puts $m.reconnected?().inspect
end
assert_reconnected do
$m.simulate_disconnect
end
assert_reconnected do
$m.close
end