Validate async queries
This commit is contained in:
parent
c186184809
commit
85141abf09
155
ext/mysql.c
155
ext/mysql.c
@ -61,6 +61,7 @@ struct mysql {
|
|||||||
char connection;
|
char connection;
|
||||||
char query_with_result;
|
char query_with_result;
|
||||||
char blocking;
|
char blocking;
|
||||||
|
int async_in_progress;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct mysql_res {
|
struct mysql_res {
|
||||||
@ -231,6 +232,57 @@ static VALUE init(VALUE klass)
|
|||||||
return obj;
|
return obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static VALUE connection_identifier( VALUE obj )
|
||||||
|
{
|
||||||
|
MYSQL* m = GetHandler(obj);
|
||||||
|
return mysql_thread_id( m );
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE async_in_progress( VALUE obj )
|
||||||
|
{
|
||||||
|
struct mysql* m = GetMysqlStruct(obj);
|
||||||
|
return ( m->async_in_progress == connection_identifier(obj) ) ? Qtrue : Qfalse;
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE async_in_progress_set( VALUE obj, VALUE flag )
|
||||||
|
{
|
||||||
|
struct mysql* m = GetMysqlStruct(obj);
|
||||||
|
m->async_in_progress = (flag == Qnil || flag == Qfalse) ? 0 : connection_identifier(obj);
|
||||||
|
return flag;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void optimize_for_async( VALUE obj )
|
||||||
|
{
|
||||||
|
struct mysql* m = GetMysqlStruct(obj);
|
||||||
|
my_bool was_blocking;
|
||||||
|
vio_blocking(m->handler.net.vio, 0, &was_blocking);
|
||||||
|
m->blocking = vio_is_blocking( m->handler.net.vio );
|
||||||
|
|
||||||
|
vio_fastsend( m->handler.net.vio );
|
||||||
|
async_in_progress_set( obj, Qfalse );
|
||||||
|
/*last_connection_identifier(obj);*/
|
||||||
|
}
|
||||||
|
|
||||||
|
static void schedule_connect(VALUE obj )
|
||||||
|
{
|
||||||
|
MYSQL* m = GetHandler(obj);
|
||||||
|
fd_set read;
|
||||||
|
|
||||||
|
struct timeval tv = { tv_sec: m->options.connect_timeout, tv_usec: 0 };
|
||||||
|
|
||||||
|
if (rb_thread_select(0, NULL, NULL, NULL, &tv) < 0) {
|
||||||
|
rb_raise(eMysql, "connect: timeout");
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
FD_ZERO(&read);
|
||||||
|
FD_SET(m->net.fd, &read);
|
||||||
|
|
||||||
|
if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) {
|
||||||
|
rb_raise(eMysql, "connect: timeout");
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
/* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */
|
/* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */
|
||||||
static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
|
static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
|
||||||
{
|
{
|
||||||
@ -270,16 +322,13 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
|
|||||||
myp->handler.reconnect = 0;
|
myp->handler.reconnect = 0;
|
||||||
myp->connection = Qtrue;
|
myp->connection = Qtrue;
|
||||||
|
|
||||||
my_bool was_blocking;
|
optimize_for_async(obj);
|
||||||
|
|
||||||
vio_blocking(myp->handler.net.vio, 0, &was_blocking);
|
|
||||||
myp->blocking = vio_is_blocking( myp->handler.net.vio );
|
|
||||||
|
|
||||||
vio_fastsend( myp->handler.net.vio );
|
|
||||||
|
|
||||||
myp->query_with_result = Qtrue;
|
myp->query_with_result = Qtrue;
|
||||||
rb_obj_call_init(obj, argc, argv);
|
rb_obj_call_init(obj, argc, argv);
|
||||||
|
|
||||||
|
schedule_connect(obj);
|
||||||
|
|
||||||
return obj;
|
return obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -342,6 +391,9 @@ static VALUE real_connect2(int argc, VALUE* argv, VALUE obj)
|
|||||||
m->reconnect = 0;
|
m->reconnect = 0;
|
||||||
GetMysqlStruct(obj)->connection = Qtrue;
|
GetMysqlStruct(obj)->connection = Qtrue;
|
||||||
|
|
||||||
|
optimize_for_async(obj);
|
||||||
|
schedule_connect(obj);
|
||||||
|
|
||||||
return obj;
|
return obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -764,13 +816,6 @@ static VALUE socket(VALUE obj)
|
|||||||
MYSQL* m = GetHandler(obj);
|
MYSQL* m = GetHandler(obj);
|
||||||
return INT2NUM(m->net.fd);
|
return INT2NUM(m->net.fd);
|
||||||
}
|
}
|
||||||
/* socket_type */
|
|
||||||
static VALUE socket_type(VALUE obj)
|
|
||||||
{
|
|
||||||
MYSQL* m = GetHandler(obj);
|
|
||||||
VALUE description = vio_description( m->net.vio );
|
|
||||||
return NILorSTRING( description );
|
|
||||||
}
|
|
||||||
|
|
||||||
/* blocking */
|
/* blocking */
|
||||||
static VALUE blocking(VALUE obj){
|
static VALUE blocking(VALUE obj){
|
||||||
@ -793,17 +838,80 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj )
|
|||||||
return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse );
|
return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ready */
|
||||||
|
static VALUE ready( VALUE obj )
|
||||||
|
{
|
||||||
|
MYSQL* m = GetHandler(obj);
|
||||||
|
return ( m->status == MYSQL_STATUS_READY ) ? Qtrue : Qfalse;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* retry */
|
||||||
|
static VALUE retry( VALUE obj )
|
||||||
|
{
|
||||||
|
MYSQL* m = GetHandler(obj);
|
||||||
|
return ( vio_should_retry( m->net.vio ) == 1 ? Qtrue : Qfalse );
|
||||||
|
}
|
||||||
|
|
||||||
|
/* interrupted */
|
||||||
|
static VALUE interrupted( VALUE obj )
|
||||||
|
{
|
||||||
|
MYSQL* m = GetHandler(obj);
|
||||||
|
return ( vio_was_interrupted( m->net.vio ) == 1 ? Qtrue : Qfalse );
|
||||||
|
}
|
||||||
|
|
||||||
|
static void validate_async_query( VALUE obj )
|
||||||
|
{
|
||||||
|
MYSQL* m = GetHandler(obj);
|
||||||
|
|
||||||
|
if( async_in_progress(obj) == Qtrue ){
|
||||||
|
async_in_progress_set(obj, Qfalse);
|
||||||
|
rb_raise(eMysql, "Query out of sequence: Each call to Mysql#send_query requires a successive Mysql#get_result.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
static VALUE connection_dropped( VALUE obj )
|
||||||
|
{
|
||||||
|
MYSQL* m = GetHandler(obj);
|
||||||
|
if( (mysql_errno(m) == CR_SERVER_LOST ) || ( mysql_errno(m) == CR_SERVER_GONE_ERROR ) || ( mysql_errno(m) == ER_SERVER_SHUTDOWN) ){
|
||||||
|
return Qtrue;
|
||||||
|
}else{
|
||||||
|
return Qfalse;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
static void async_reconnect( VALUE obj )
|
||||||
|
{
|
||||||
|
connect(obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE simulate_disconnect( VALUE obj )
|
||||||
|
{
|
||||||
|
MYSQL* m = GetHandler(obj);
|
||||||
|
mysql_library_end();
|
||||||
|
return Qnil;
|
||||||
|
}
|
||||||
|
|
||||||
/* send_query(sql) */
|
/* send_query(sql) */
|
||||||
static VALUE send_query(VALUE obj, VALUE sql)
|
static VALUE send_query(VALUE obj, VALUE sql)
|
||||||
{
|
{
|
||||||
MYSQL* m = GetHandler(obj);
|
MYSQL* m = GetHandler(obj);
|
||||||
|
|
||||||
Check_Type(sql, T_STRING);
|
Check_Type(sql, T_STRING);
|
||||||
if (GetMysqlStruct(obj)->connection == Qfalse) {
|
|
||||||
|
if (GetMysqlStruct(obj)->connection == Qfalse && async_in_progress(obj) == Qtrue ) {
|
||||||
rb_raise(eMysql, "query: not connected");
|
rb_raise(eMysql, "query: not connected");
|
||||||
}
|
}
|
||||||
if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0)
|
|
||||||
|
validate_async_query(obj);
|
||||||
|
|
||||||
|
if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){
|
||||||
mysql_raise(m);
|
mysql_raise(m);
|
||||||
|
/*( connection_dropped(obj) == Qtrue ) ? async_reconnect(obj) : mysql_raise(m);*/
|
||||||
|
}
|
||||||
|
|
||||||
|
async_in_progress_set( obj, Qtrue );
|
||||||
return Qnil;
|
return Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -820,10 +928,11 @@ static VALUE get_result(VALUE obj)
|
|||||||
return obj;
|
return obj;
|
||||||
if (mysql_field_count(m) == 0)
|
if (mysql_field_count(m) == 0)
|
||||||
return Qnil;
|
return Qnil;
|
||||||
|
async_in_progress_set( obj, Qfalse );
|
||||||
return store_result(obj);
|
return store_result(obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE schedule(VALUE obj, VALUE timeout)
|
static void schedule_query(VALUE obj, VALUE timeout)
|
||||||
{
|
{
|
||||||
MYSQL* m = GetHandler(obj);
|
MYSQL* m = GetHandler(obj);
|
||||||
fd_set read;
|
fd_set read;
|
||||||
@ -838,7 +947,6 @@ static VALUE schedule(VALUE obj, VALUE timeout)
|
|||||||
if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) {
|
if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) {
|
||||||
rb_raise(eMysql, "query: timeout");
|
rb_raise(eMysql, "query: timeout");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* async_query(sql,timeout=nil) */
|
/* async_query(sql,timeout=nil) */
|
||||||
@ -849,9 +957,13 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj)
|
|||||||
|
|
||||||
rb_scan_args(argc, argv, "11", &sql, &timeout);
|
rb_scan_args(argc, argv, "11", &sql, &timeout);
|
||||||
|
|
||||||
|
/*last_connection_identifier( obj );*/
|
||||||
|
|
||||||
|
async_in_progress_set( obj, Qfalse );
|
||||||
|
|
||||||
send_query(obj,sql);
|
send_query(obj,sql);
|
||||||
|
|
||||||
schedule(obj, timeout);
|
schedule_query(obj, timeout);
|
||||||
|
|
||||||
return get_result(obj);
|
return get_result(obj);
|
||||||
}
|
}
|
||||||
@ -2144,12 +2256,17 @@ void Init_mysql(void)
|
|||||||
rb_define_method(cMysql, "query", query, 1);
|
rb_define_method(cMysql, "query", query, 1);
|
||||||
rb_define_method(cMysql, "real_query", query, 1);
|
rb_define_method(cMysql, "real_query", query, 1);
|
||||||
rb_define_method(cMysql, "c_async_query", async_query, -1);
|
rb_define_method(cMysql, "c_async_query", async_query, -1);
|
||||||
|
rb_define_method(cMysql, "async_in_progress?", async_in_progress, 0);
|
||||||
|
rb_define_method(cMysql, "async_in_progress=", async_in_progress_set, 1);
|
||||||
rb_define_method(cMysql, "send_query", send_query, 1);
|
rb_define_method(cMysql, "send_query", send_query, 1);
|
||||||
|
rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0);
|
||||||
rb_define_method(cMysql, "get_result", get_result, 0);
|
rb_define_method(cMysql, "get_result", get_result, 0);
|
||||||
rb_define_method(cMysql, "readable?", readable, -1);
|
rb_define_method(cMysql, "readable?", readable, -1);
|
||||||
|
rb_define_method(cMysql, "retry?", retry, 0);
|
||||||
|
rb_define_method(cMysql, "ready?", ready, 0);
|
||||||
|
rb_define_method(cMysql, "interrupted?", interrupted, 0);
|
||||||
rb_define_method(cMysql, "blocking?", blocking, 0);
|
rb_define_method(cMysql, "blocking?", blocking, 0);
|
||||||
rb_define_method(cMysql, "socket", socket, 0);
|
rb_define_method(cMysql, "socket", socket, 0);
|
||||||
rb_define_method(cMysql, "socket_type", socket_type, 0);
|
|
||||||
rb_define_method(cMysql, "refresh", refresh, 1);
|
rb_define_method(cMysql, "refresh", refresh, 1);
|
||||||
rb_define_method(cMysql, "reload", reload, 0);
|
rb_define_method(cMysql, "reload", reload, 0);
|
||||||
rb_define_method(cMysql, "select_db", select_db, 1);
|
rb_define_method(cMysql, "select_db", select_db, 1);
|
||||||
|
26
test/out_of_sync_test.rb
Normal file
26
test/out_of_sync_test.rb
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
require File.dirname(__FILE__) + '/test_helper'
|
||||||
|
|
||||||
|
m = Mysql.real_connect('localhost','root')
|
||||||
|
m.reconnect = true
|
||||||
|
|
||||||
|
class << m
|
||||||
|
|
||||||
|
def safe_query( query )
|
||||||
|
begin
|
||||||
|
send_query( query )
|
||||||
|
rescue => e
|
||||||
|
puts e.message
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
m.safe_query( 'select sleep(1)' )
|
||||||
|
m.safe_query( 'select sleep(1)' )#raises
|
||||||
|
m.simulate_disconnect #fires mysql_library_end
|
||||||
|
m.safe_query( 'select sleep(1)' )
|
||||||
|
m.safe_query( 'select sleep(1)' )#raises
|
||||||
|
m.close
|
||||||
|
m.connect('localhost','root')
|
||||||
|
m.safe_query( 'select sleep(1)' )
|
||||||
|
m.safe_query( 'select sleep(1)' )#raises
|
Loading…
Reference in New Issue
Block a user