diff --git a/ext/mysql2_ext.c b/ext/mysql2_ext.c index ab49067..3c95092 100644 --- a/ext/mysql2_ext.c +++ b/ext/mysql2_ext.c @@ -1,5 +1,27 @@ #include "mysql2_ext.h" +/* + * non-blocking mysql_*() functions that we won't be wrapping since + * they do not appear to hit the network nor issue any interruptible + * or blocking system calls. + * + * - mysql_affected_rows() + * - mysql_error() + * - mysql_fetch_fields() + * - mysql_fetch_lengths() - calls cli_fetch_lengths or emb_fetch_lengths + * - mysql_field_count() + * - mysql_get_client_info() + * - mysql_get_client_version() + * - mysql_get_server_info() + * - mysql_get_server_version() + * - mysql_insert_id() + * - mysql_num_fields() + * - mysql_num_rows() + * - mysql_options() + * - mysql_real_escape_string() + * - mysql_ssl_set() + */ + static VALUE nogvl_connect(void *ptr) { struct nogvl_connect_args *args = ptr; @@ -106,6 +128,11 @@ static VALUE rb_mysql_client_new(int argc, VALUE * argv, VALUE klass) { } } + /* + * FIXME: mysql_init may initialize the embedded server and read + * /etc/services off disk (always blocking), so in those unlikely + * cases we should make this release the GVL + */ if (!mysql_init(args.mysql)) { // TODO: warning - not enough memory? rb_raise(cMysql2Error, "%s", mysql_error(args.mysql)); @@ -151,37 +178,58 @@ static VALUE rb_mysql_client_init(int argc, VALUE * argv, VALUE self) { void rb_mysql_client_free(void * client) { MYSQL * c = client; if (c) { + /* + * FIXME: this may send a "QUIT" message to the server and thus block + * on the socket write) + */ mysql_close(client); } } +/* + * mysql_send_query is unlikely to block since most queries are small + * enough to fit in a socket buffer, but sometimes large UPDATE and + * INSERTs will cause the process to block + */ +static VALUE nogvl_send_query(void *ptr) +{ + struct nogvl_send_query_args *args = ptr; + int rv; + const char *sql = RSTRING_PTR(args->sql); + long sql_len = RSTRING_LEN(args->sql); + + rv = mysql_send_query(args->mysql, sql, sql_len); + + return rv == 0 ? Qtrue : Qfalse; +} + static VALUE rb_mysql_client_query(int argc, VALUE * argv, VALUE self) { - MYSQL * client; + struct nogvl_send_query_args args; MYSQL_RES * result; fd_set fdset; int fd, retval; int async = 0; - VALUE sql, opts; + VALUE opts; VALUE rb_async; - if (rb_scan_args(argc, argv, "11", &sql, &opts) == 2) { + if (rb_scan_args(argc, argv, "11", &args.sql, &opts) == 2) { if ((rb_async = rb_hash_aref(opts, sym_async)) != Qnil) { async = rb_async == Qtrue ? 1 : 0; } } - Check_Type(sql, T_STRING); + Check_Type(args.sql, T_STRING); - GetMysql2Client(self, client); - if (mysql_send_query(client, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0) { - rb_raise(cMysql2Error, "%s", mysql_error(client)); + GetMysql2Client(self, args.mysql); + if (rb_thread_blocking_region(nogvl_send_query, &args, RUBY_UBF_IO, 0) == Qfalse) { + rb_raise(cMysql2Error, "%s", mysql_error(args.mysql)); return Qnil; } if (!async) { // the below code is largely from do_mysql // http://github.com/datamapper/do - fd = client->net.fd; + fd = args.mysql->net.fd; for(;;) { FD_ZERO(&fdset); FD_SET(fd, &fdset); @@ -250,17 +298,37 @@ static VALUE rb_mysql_client_socket(VALUE self) { return INT2NUM(client->net.fd); } +/* + * even though we did rb_thread_select before calling this, a large + * response can overflow the socket buffers and cause us to eventually + * block while calling mysql_read_query_result + */ +static VALUE nogvl_read_query_result(void *ptr) +{ + MYSQL * client = ptr; + my_bool res = mysql_read_query_result(client); + + return res == 0 ? Qtrue : Qfalse; +} + +/* mysql_store_result may (unlikely) read rows off the socket */ +static VALUE nogvl_store_result(void *ptr) +{ + MYSQL * client = ptr; + return (VALUE)mysql_store_result(client); +} + static VALUE rb_mysql_client_async_result(VALUE self) { MYSQL * client; MYSQL_RES * result; GetMysql2Client(self, client); - if (mysql_read_query_result(client) != 0) { + if (rb_thread_blocking_region(nogvl_read_query_result, client, RUBY_UBF_IO, 0) == Qfalse) { rb_raise(cMysql2Error, "%s", mysql_error(client)); return Qnil; } - result = mysql_store_result(client); + result = (MYSQL_RES *)rb_thread_blocking_region(nogvl_store_result, client, RUBY_UBF_IO, 0); if (result == NULL) { if (mysql_field_count(client) != 0) { rb_raise(cMysql2Error, "%s", mysql_error(client)); @@ -302,6 +370,7 @@ static VALUE rb_mysql_result_to_obj(MYSQL_RES * r) { void rb_mysql_result_free(void * wrapper) { mysql2_result_wrapper * w = wrapper; if (w && w->resultFreed != 1) { + /* FIXME: this may call flush_use_result, which can hit the socket */ mysql_free_result(w->result); w->resultFreed = 1; } @@ -315,6 +384,18 @@ void rb_mysql_result_mark(void * wrapper) { } } +/* + * for small results, this won't hit the network, but there's no + * reliable way for us to tell this so we'll always release the GVL + * to be safe + */ +static VALUE nogvl_fetch_row(void *ptr) +{ + MYSQL_RES *result = ptr; + + return (VALUE)mysql_fetch_row(result); +} + static VALUE rb_mysql_result_fetch_row(int argc, VALUE * argv, VALUE self) { VALUE rowHash, opts, block; mysql2_result_wrapper * wrapper; @@ -322,6 +403,7 @@ static VALUE rb_mysql_result_fetch_row(int argc, VALUE * argv, VALUE self) { MYSQL_FIELD * fields = NULL; unsigned int i = 0, symbolizeKeys = 0; unsigned long * fieldLengths; + void * ptr; GetMysql2Result(self, wrapper); @@ -332,7 +414,8 @@ static VALUE rb_mysql_result_fetch_row(int argc, VALUE * argv, VALUE self) { } } - row = mysql_fetch_row(wrapper->result); + ptr = wrapper->result; + row = (MYSQL_ROW)rb_thread_blocking_region(nogvl_fetch_row, ptr, RUBY_UBF_IO, 0); if (row == NULL) { return Qnil; } diff --git a/ext/mysql2_ext.h b/ext/mysql2_ext.h index e5ca91f..f117181 100644 --- a/ext/mysql2_ext.h +++ b/ext/mysql2_ext.h @@ -74,6 +74,15 @@ struct nogvl_connect_args { unsigned long client_flag; }; +/* + * used to pass all arguments to mysql_send_query while inside + * rb_thread_blocking_region + */ +struct nogvl_send_query_args { + MYSQL *mysql; + VALUE sql; +}; + /* * partial emulation of the 1.9 rb_thread_blocking_region under 1.8, * this is enough for dealing with blocking I/O functions in the