wrap mysql_real_connect with rb_thread_blocking_region

mysql_real_connect() is a blocking function that issues
blocking-but-interruptible connect(), read(), and write() system
calls.  So we'll allow other threads in the VM to run while
calling it since it can block indefinitely.

This introduces a rb_thread_blocking_region() wrapper for 1.8
which ensures any received signals can be handled gracefully
while inside blocking function calls.
This commit is contained in:
Eric Wong 2010-05-05 11:57:37 -07:00
parent 33e0b9ea51
commit fa213c9892
3 changed files with 85 additions and 23 deletions

View File

@ -1,6 +1,9 @@
# encoding: UTF-8 # encoding: UTF-8
require 'mkmf' require 'mkmf'
# 1.9-only
have_func('rb_thread_blocking_region')
# borrowed from mysqlplus # borrowed from mysqlplus
# http://github.com/oldmoe/mysqlplus/blob/master/ext/extconf.rb # http://github.com/oldmoe/mysqlplus/blob/master/ext/extconf.rb
dirs = ENV['PATH'].split(':') + %w[ dirs = ENV['PATH'].split(':') + %w[

View File

@ -1,54 +1,73 @@
#include "mysql2_ext.h" #include "mysql2_ext.h"
static VALUE nogvl_connect(void *ptr)
{
struct nogvl_connect_args *args = ptr;
MYSQL *client;
client = mysql_real_connect(args->mysql, args->host,
args->user, args->passwd,
args->db, args->port, args->unix_socket,
args->client_flag);
return client ? Qtrue : Qfalse;
}
/* Mysql2::Client */ /* Mysql2::Client */
static VALUE rb_mysql_client_new(int argc, VALUE * argv, VALUE klass) { static VALUE rb_mysql_client_new(int argc, VALUE * argv, VALUE klass) {
MYSQL * client; struct nogvl_connect_args args = {
.host = "localhost",
.user = NULL,
.passwd = NULL,
.db = NULL,
.port = 3306,
.unix_socket = NULL,
.client_flag = 0
};
VALUE obj, opts; VALUE obj, opts;
VALUE rb_host, rb_socket, rb_port, rb_database, VALUE rb_host, rb_socket, rb_port, rb_database,
rb_username, rb_password, rb_reconnect, rb_username, rb_password, rb_reconnect,
rb_connect_timeout; rb_connect_timeout;
VALUE rb_ssl_client_key, rb_ssl_client_cert, rb_ssl_ca_cert, VALUE rb_ssl_client_key, rb_ssl_client_cert, rb_ssl_ca_cert,
rb_ssl_ca_path, rb_ssl_cipher; rb_ssl_ca_path, rb_ssl_cipher;
char *host = "localhost", *socket = NULL, *username = NULL,
*password = NULL, *database = NULL;
char *ssl_client_key = NULL, *ssl_client_cert = NULL, *ssl_ca_cert = NULL, char *ssl_client_key = NULL, *ssl_client_cert = NULL, *ssl_ca_cert = NULL,
*ssl_ca_path = NULL, *ssl_cipher = NULL; *ssl_ca_path = NULL, *ssl_cipher = NULL;
unsigned int port = 3306, connect_timeout = 0; unsigned int connect_timeout = 0;
my_bool reconnect = 1; my_bool reconnect = 1;
obj = Data_Make_Struct(klass, MYSQL, NULL, rb_mysql_client_free, client); obj = Data_Make_Struct(klass, MYSQL, NULL, rb_mysql_client_free, args.mysql);
if (rb_scan_args(argc, argv, "01", &opts) == 1) { if (rb_scan_args(argc, argv, "01", &opts) == 1) {
Check_Type(opts, T_HASH); Check_Type(opts, T_HASH);
if ((rb_host = rb_hash_aref(opts, sym_host)) != Qnil) { if ((rb_host = rb_hash_aref(opts, sym_host)) != Qnil) {
Check_Type(rb_host, T_STRING); Check_Type(rb_host, T_STRING);
host = RSTRING_PTR(rb_host); args.host = RSTRING_PTR(rb_host);
} }
if ((rb_socket = rb_hash_aref(opts, sym_socket)) != Qnil) { if ((rb_socket = rb_hash_aref(opts, sym_socket)) != Qnil) {
Check_Type(rb_socket, T_STRING); Check_Type(rb_socket, T_STRING);
socket = RSTRING_PTR(rb_socket); args.unix_socket = RSTRING_PTR(rb_socket);
} }
if ((rb_port = rb_hash_aref(opts, sym_port)) != Qnil) { if ((rb_port = rb_hash_aref(opts, sym_port)) != Qnil) {
Check_Type(rb_port, T_FIXNUM); Check_Type(rb_port, T_FIXNUM);
port = FIX2INT(rb_port); args.port = FIX2INT(rb_port);
} }
if ((rb_username = rb_hash_aref(opts, sym_username)) != Qnil) { if ((rb_username = rb_hash_aref(opts, sym_username)) != Qnil) {
Check_Type(rb_username, T_STRING); Check_Type(rb_username, T_STRING);
username = RSTRING_PTR(rb_username); args.user = RSTRING_PTR(rb_username);
} }
if ((rb_password = rb_hash_aref(opts, sym_password)) != Qnil) { if ((rb_password = rb_hash_aref(opts, sym_password)) != Qnil) {
Check_Type(rb_password, T_STRING); Check_Type(rb_password, T_STRING);
password = RSTRING_PTR(rb_password); args.passwd = RSTRING_PTR(rb_password);
} }
if ((rb_database = rb_hash_aref(opts, sym_database)) != Qnil) { if ((rb_database = rb_hash_aref(opts, sym_database)) != Qnil) {
Check_Type(rb_database, T_STRING); Check_Type(rb_database, T_STRING);
database = RSTRING_PTR(rb_database); args.db = RSTRING_PTR(rb_database);
} }
if ((rb_reconnect = rb_hash_aref(opts, sym_reconnect)) != Qnil) { if ((rb_reconnect = rb_hash_aref(opts, sym_reconnect)) != Qnil) {
@ -87,37 +106,37 @@ static VALUE rb_mysql_client_new(int argc, VALUE * argv, VALUE klass) {
} }
} }
if (!mysql_init(client)) { if (!mysql_init(args.mysql)) {
// TODO: warning - not enough memory? // TODO: warning - not enough memory?
rb_raise(cMysql2Error, "%s", mysql_error(client)); rb_raise(cMysql2Error, "%s", mysql_error(args.mysql));
return Qnil; return Qnil;
} }
// set default reconnect behavior // set default reconnect behavior
if (mysql_options(client, MYSQL_OPT_RECONNECT, &reconnect) != 0) { if (mysql_options(args.mysql, MYSQL_OPT_RECONNECT, &reconnect) != 0) {
// TODO: warning - unable to set reconnect behavior // TODO: warning - unable to set reconnect behavior
rb_warn("%s\n", mysql_error(client)); rb_warn("%s\n", mysql_error(args.mysql));
} }
// set default connection timeout behavior // set default connection timeout behavior
if (connect_timeout != 0 && mysql_options(client, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout) != 0) { if (connect_timeout != 0 && mysql_options(args.mysql, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout) != 0) {
// TODO: warning - unable to set connection timeout // TODO: warning - unable to set connection timeout
rb_warn("%s\n", mysql_error(client)); rb_warn("%s\n", mysql_error(args.mysql));
} }
// force the encoding to utf8 // force the encoding to utf8
if (mysql_options(client, MYSQL_SET_CHARSET_NAME, "utf8") != 0) { if (mysql_options(args.mysql, MYSQL_SET_CHARSET_NAME, "utf8") != 0) {
// TODO: warning - unable to set charset // TODO: warning - unable to set charset
rb_warn("%s\n", mysql_error(client)); rb_warn("%s\n", mysql_error(args.mysql));
} }
if (ssl_ca_cert != NULL || ssl_client_key != NULL) { if (ssl_ca_cert != NULL || ssl_client_key != NULL) {
mysql_ssl_set(client, ssl_client_key, ssl_client_cert, ssl_ca_cert, ssl_ca_path, ssl_cipher); mysql_ssl_set(args.mysql, ssl_client_key, ssl_client_cert, ssl_ca_cert, ssl_ca_path, ssl_cipher);
} }
if (mysql_real_connect(client, host, username, password, database, port, socket, 0) == NULL) { if (rb_thread_blocking_region(nogvl_connect, &args, RUBY_UBF_IO, 0) == Qfalse) {
// unable to connect // unable to connect
rb_raise(cMysql2Error, "%s", mysql_error(client)); rb_raise(cMysql2Error, "%s", mysql_error(args.mysql));
return Qnil; return Qnil;
} }

View File

@ -58,3 +58,43 @@ static VALUE rb_mysql_result_fetch_row(int argc, VALUE * argv, VALUE self);
static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self); static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self);
void rb_mysql_result_free(void * wrapper); void rb_mysql_result_free(void * wrapper);
void rb_mysql_result_mark(void * wrapper); void rb_mysql_result_mark(void * wrapper);
/*
* used to pass all arguments to mysql_real_connect while inside
* rb_thread_blocking_region
*/
struct nogvl_connect_args {
MYSQL *mysql;
const char *host;
const char *user;
const char *passwd;
const char *db;
unsigned int port;
const char *unix_socket;
unsigned long client_flag;
};
/*
* partial emulation of the 1.9 rb_thread_blocking_region under 1.8,
* this is enough for dealing with blocking I/O functions in the
* presence of threads.
*/
#ifndef HAVE_RB_THREAD_BLOCKING_REGION
# include <rubysig.h>
# define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
typedef void rb_unblock_function_t(void *);
typedef VALUE rb_blocking_function_t(void *);
static VALUE
rb_thread_blocking_region(
rb_blocking_function_t *func, void *data1,
rb_unblock_function_t *ubf, void *data2)
{
VALUE rv;
TRAP_BEG;
rv = func(data1);
TRAP_END;
return rv;
}
#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */