Merge branch 'master' into 19_full_thread_no_gil

This commit is contained in:
Roger Pack 2008-09-08 12:17:20 -06:00
commit 16d9c043c2
13 changed files with 599 additions and 81 deletions

3
README
View File

@ -8,7 +8,7 @@ An enhanced MySQL database driver. With support for async operations and threade
on OSX with the default mysql installed: on OSX with the default mysql installed:
sudo gem install mysqlplus-0.1.0.gem -- --with-mysql-config=/usr/local/mysql/bin/mysql_config sudo gem install mysqlplus-0.1.0.gem -- --with-mysql-config
on OSX, with mysql installed by macports: on OSX, with mysql installed by macports:
@ -26,6 +26,7 @@ An enhanced MySQL database driver. With support for async operations and threade
Aman Gupta, for help in threading support and improved tests Aman Gupta, for help in threading support and improved tests
Tomita Masahiro--since this is a fork of his already excellent mysql lib [http://www.tmtm.org/en/mysql/ruby]. Tomita Masahiro--since this is a fork of his already excellent mysql lib [http://www.tmtm.org/en/mysql/ruby].
Roger Pack, for helping in the file descriptor hunt :) Roger Pack, for helping in the file descriptor hunt :)
Lourens Naude for 1.9 integration help.
=== License === License
Ruby License, http://www.ruby-lang.org/en/LICENSE.txt. Ruby License, http://www.ruby-lang.org/en/LICENSE.txt.

52
Rakefile Normal file
View File

@ -0,0 +1,52 @@
require 'rake'
require 'rake/testtask'
desc 'Default: build and install.'
task :default => [:build]

desc 'Run performance tests.'
Rake::TestTask.new(:test) do |test_task|
  # An empty load path makes the tests exercise the installed gem rather
  # than the working copy.
  test_task.libs = []
  test_task.pattern = 'test/*_test.rb'
  test_task.verbose = true
end

# Package the gem, then install it against the local MySQL client library.
task :build do
  configure
  install
end
# Builds the gem package from mysqlplus.gemspec and echoes whatever
# `gem build` printed.
def configure
  puts '** building gem'
  build_output = `gem build mysqlplus.gemspec`
  puts build_output
end
# Installs the freshly built gem with sudo, passing the detected
# mysql_config path through to the extension build, and echoes the
# installer output.
def install
  puts '** installing gem'
  config_path = mysql_config
  puts "** using mysql_config: #{config_path}"
  install_output = `sudo gem install mysqlplus-#{version}.gem -- --with-mysql-config=#{config_path}`
  puts install_output
end
# Evaluates mysqlplus.gemspec (read from the current directory) and
# memoizes the resulting object for later calls.
def gem_spec
  return @gem_spec if @gem_spec

  spec_source = IO.read( 'mysqlplus.gemspec' )
  @gem_spec = eval( spec_source )
end
# Version of the gem as a String, taken from the loaded gemspec.
def version
  spec_version = gem_spec.version
  spec_version.to_s
end
# Candidate executable names for the MySQL configuration helper, in the
# order they are tried.
def mysql_configs
  ['mysql_config', 'mysql_config5']
end
# Returns the path of the first mysql_config candidate that can be located.
#
# The lookup output is stripped so the trailing newline from the shell does
# not leak into the path. When no candidate is found a RuntimeError is
# raised; previously the method fell through and returned the candidate
# list itself, which then ended up interpolated into the install command.
def mysql_config
  mysql_configs.each do |config|
    path = mysql_config!( config ).to_s.strip
    return path unless path.empty?
  end
  raise "mysql_config not found in PATH (tried: #{mysql_configs.join(', ')})"
end
# Asks the shell (`which`) where the given executable lives and returns the
# raw command output.
def mysql_config!( config )
  `which #{config}`
end

View File

@ -31,6 +31,10 @@ else
exit 1 exit 1
end end
# Define HAVE_TBR only when both the rb_thread_blocking_region function and
# the RB_UBF_DFL macro (from ruby.h) are available.
flags << "-DHAVE_TBR" if have_func('rb_thread_blocking_region') && have_macro('RB_UBF_DFL', 'ruby.h')
# make mysql constant # make mysql constant
File.open("conftest.c", "w") do |f| File.open("conftest.c", "w") do |f|
f.puts src f.puts src

View File

@ -60,6 +60,7 @@ struct mysql {
MYSQL handler; MYSQL handler;
char connection; char connection;
char query_with_result; char query_with_result;
char blocking;
}; };
struct mysql_res { struct mysql_res {
@ -268,6 +269,11 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
myp->handler.reconnect = 0; myp->handler.reconnect = 0;
myp->connection = Qtrue; myp->connection = Qtrue;
my_bool was_blocking;
vio_blocking(myp->handler.net.vio, 0, &was_blocking);
myp->blocking = vio_is_blocking( myp->handler.net.vio );
myp->query_with_result = Qtrue; myp->query_with_result = Qtrue;
rb_obj_call_init(obj, argc, argv); rb_obj_call_init(obj, argc, argv);
@ -779,13 +785,35 @@ static VALUE query(VALUE obj, VALUE sql)
static VALUE socket(VALUE obj) static VALUE socket(VALUE obj)
{ {
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
return INT2NUM(vio_fd(m->net.vio)); return INT2NUM(m->net.fd);
} }
/* send_query */ /* blocking */
static VALUE blocking(VALUE obj)
{
    /* Expose the blocking flag stored on the connection struct as a Ruby boolean. */
    if (GetMysqlStruct(obj)->blocking)
        return Qtrue;
    return Qfalse;
}
/* readable(timeout=nil)
 *
 * Polls the connection's VIO with vio_poll_read and returns Qtrue when that
 * call returns 0, Qfalse otherwise. When no argument (or nil) is given, the
 * handle's net.read_timeout is used as the timeout.
 *
 * NOTE(review): `timeout` is a Ruby VALUE, yet it is assigned the raw C value
 * of net.read_timeout and is then passed through INT2NUM (C integer -> VALUE)
 * on its way into vio_poll_read. NUM2INT/NUM2UINT on the Ruby argument looks
 * like what was intended -- confirm before changing. async_query in this file
 * forwards its own timeout through args[0] in the same representation, so the
 * two would have to be corrected together.
 */
static VALUE readable( int argc, VALUE* argv, VALUE obj )
{
MYSQL* m = GetHandler(obj);
VALUE timeout;
/* "01": zero required arguments, one optional (the timeout). */
rb_scan_args(argc, argv, "01", &timeout);
if ( NIL_P( timeout ) ){
/* Fall back to the connection's configured read timeout. */
timeout = m->net.read_timeout;
}
return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse );
}
/* send_query(sql) */
static VALUE send_query(VALUE obj, VALUE sql) static VALUE send_query(VALUE obj, VALUE sql)
{ {
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
Check_Type(sql, T_STRING); Check_Type(sql, T_STRING);
if (GetMysqlStruct(obj)->connection == Qfalse) { if (GetMysqlStruct(obj)->connection == Qfalse) {
rb_raise(eMysql, "query: not connected"); rb_raise(eMysql, "query: not connected");
@ -811,17 +839,46 @@ static VALUE get_result(VALUE obj)
return store_result(obj); return store_result(obj);
} }
/* async_query */ /* async_query(sql,timeout=nil) */
/* static VALUE async_query(int argc, VALUE* argv, VALUE obj)
comment it out until I figure out how it works
static VALUE async_query(VALUE obj, VALUE sql)
{ {
MYSQL* m = GetHandler(obj);
VALUE sql, timeout;
fd_set read;
int ret;
rb_scan_args(argc, argv, "11", &sql, &timeout);
send_query(obj,sql); send_query(obj,sql);
rb_io_wait_readable(socket(obj));
if (NIL_P(timeout)) {
timeout = m->net.read_timeout;
}
VALUE args[1];
args[0] = timeout;
struct timeval tv = { tv_sec: timeout, tv_usec: 0 };
for(;;) {
FD_ZERO(&read);
FD_SET(m->net.fd, &read);
ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv);
if (ret < 0) {
rb_sys_fail(0);
}
if (ret == 0) {
continue;
}
if (readable(1, (VALUE *)args, obj) == Qtrue) {
break;
}
}
return get_result(obj); return get_result(obj);
} }
*/
#if MYSQL_VERSION_ID >= 40100 #if MYSQL_VERSION_ID >= 40100
/* server_version() */ /* server_version() */
@ -1090,6 +1147,80 @@ static VALUE fetch_row(VALUE obj)
return ary; return ary;
} }
/* process_all_hashes (internal)
 *
 * Converts every row still pending on the result set into a Hash keyed by
 * column name ("table.column" keys when with_table is not Qfalse).
 *
 *   build_array - non-zero: collect the hashes into an Array and return it.
 *   yield       - non-zero: rb_yield each hash as it is built.
 *
 * Returns the Array when build_array is set (empty if there were no rows),
 * obj when only yield is set and at least one row was processed, and Qnil
 * otherwise. The column-name arrays are cached on obj in the "colname" /
 * "tblcolname" instance variables.
 */
static VALUE process_all_hashes(VALUE obj, VALUE with_table, int build_array, int yield)
{
    MYSQL_RES* res = GetMysqlRes(obj);
    unsigned int n = mysql_num_fields(res);
    /* Initialised so the variable never holds garbage when build_array is 0. */
    VALUE ary = Qnil;
    if (build_array)
        ary = rb_ary_new();

    /* Fetch the first row up front so an empty result can return early. */
    MYSQL_ROW row = mysql_fetch_row(res);
    if (row == NULL) {
        return build_array ? ary : Qnil;
    }

    MYSQL_FIELD* fields = mysql_fetch_fields(res);
    unsigned int i;
    VALUE hash;
    VALUE colname;

    if (with_table == Qfalse) {
        colname = rb_iv_get(obj, "colname");
        if (colname == Qnil) {
            /* Build and cache the frozen list of plain column names. */
            colname = rb_ary_new2(n);
            for (i=0; i<n; i++) {
                VALUE s = rb_tainted_str_new2(fields[i].name);
                rb_obj_freeze(s);
                rb_ary_store(colname, i, s);
            }
            rb_obj_freeze(colname);
            rb_iv_set(obj, "colname", colname);
        }
    } else {
        colname = rb_iv_get(obj, "tblcolname");
        if (colname == Qnil) {
            /* Build and cache the frozen list of "table.column" names. */
            colname = rb_ary_new2(n);
            for (i=0; i<n; i++) {
                /* +1 accounts for the '.' separator. */
                int len = strlen(fields[i].table)+strlen(fields[i].name)+1;
                VALUE s = rb_tainted_str_new(NULL, len);
                snprintf(RSTRING_PTR(s), len+1, "%s.%s", fields[i].table, fields[i].name);
                rb_obj_freeze(s);
                rb_ary_store(colname, i, s);
            }
            rb_obj_freeze(colname);
            rb_iv_set(obj, "tblcolname", colname);
        }
    }

    unsigned long* lengths = NULL;
    while (row != NULL)
    {
        hash = rb_hash_new();
        lengths = mysql_fetch_lengths(res);
        for (i=0; i<n; i++) {
            /* NULL column values become nil. */
            rb_hash_aset(hash, rb_ary_entry(colname, i), row[i]? rb_tainted_str_new(row[i], lengths[i]): Qnil);
        }
        if (build_array)
            rb_ary_push(ary, hash);
        if (yield)
            rb_yield(hash);
        row = mysql_fetch_row(res);
    }

    /* pass back appropriate return values */
    if (build_array)
        return ary;
    if (yield)
        return obj;
    /* Neither mode requested: return a defined value instead of falling off the end. */
    return Qnil;
}
/* fetch_hash2 (internal) */ /* fetch_hash2 (internal) */
static VALUE fetch_hash2(VALUE obj, VALUE with_table) static VALUE fetch_hash2(VALUE obj, VALUE with_table)
{ {
@ -1105,7 +1236,7 @@ static VALUE fetch_hash2(VALUE obj, VALUE with_table)
return Qnil; return Qnil;
hash = rb_hash_new(); hash = rb_hash_new();
if (with_table == Qnil || with_table == Qfalse) { if (with_table == Qfalse) {
colname = rb_iv_get(obj, "colname"); colname = rb_iv_get(obj, "colname");
if (colname == Qnil) { if (colname == Qnil) {
colname = rb_ary_new2(n); colname = rb_ary_new2(n);
@ -1222,16 +1353,26 @@ static VALUE each(VALUE obj)
static VALUE each_hash(int argc, VALUE* argv, VALUE obj) static VALUE each_hash(int argc, VALUE* argv, VALUE obj)
{ {
VALUE with_table; VALUE with_table;
VALUE hash;
check_free(obj); check_free(obj);
rb_scan_args(argc, argv, "01", &with_table); rb_scan_args(argc, argv, "01", &with_table);
if (with_table == Qnil) if (with_table == Qnil)
with_table = Qfalse; with_table = Qfalse;
while ((hash = fetch_hash2(obj, with_table)) != Qnil) process_all_hashes(obj, with_table, 0, 1);
rb_yield(hash);
return obj; return obj;
} }
/* all_hashes(with_table=false)
 *
 * Returns an Array holding one Hash per row of the result set. A missing or
 * nil with_table argument is treated as false.
 */
static VALUE all_hashes(int argc, VALUE* argv, VALUE obj)
{
    VALUE with_table;

    check_free(obj);
    rb_scan_args(argc, argv, "01", &with_table);
    return process_all_hashes(obj, NIL_P(with_table) ? Qfalse : with_table, 1, 0);
}
/*------------------------------- /*-------------------------------
* Mysql::Field object method * Mysql::Field object method
*/ */
@ -2026,9 +2167,11 @@ void Init_mysql(void)
#endif #endif
rb_define_method(cMysql, "query", query, 1); rb_define_method(cMysql, "query", query, 1);
rb_define_method(cMysql, "real_query", query, 1); rb_define_method(cMysql, "real_query", query, 1);
/*rb_define_method(cMysql, "async_query", async_query, 1);*/ rb_define_method(cMysql, "c_async_query", async_query, -1);
rb_define_method(cMysql, "send_query", send_query, 1); rb_define_method(cMysql, "send_query", send_query, 1);
rb_define_method(cMysql, "get_result", get_result, 0); rb_define_method(cMysql, "get_result", get_result, 0);
rb_define_method(cMysql, "readable?", readable, -1);
rb_define_method(cMysql, "blocking?", blocking, 0);
rb_define_method(cMysql, "socket", socket, 0); rb_define_method(cMysql, "socket", socket, 0);
rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "refresh", refresh, 1);
rb_define_method(cMysql, "reload", reload, 0); rb_define_method(cMysql, "reload", reload, 0);
@ -2181,6 +2324,7 @@ void Init_mysql(void)
rb_define_method(cMysqlRes, "row_tell", row_tell, 0); rb_define_method(cMysqlRes, "row_tell", row_tell, 0);
rb_define_method(cMysqlRes, "each", each, 0); rb_define_method(cMysqlRes, "each", each, 0);
rb_define_method(cMysqlRes, "each_hash", each_hash, -1); rb_define_method(cMysqlRes, "each_hash", each_hash, -1);
rb_define_method(cMysqlRes, "all_hashes", all_hashes, -1);
/* MysqlField object method */ /* MysqlField object method */
rb_define_method(cMysqlField, "name", field_name, 0); rb_define_method(cMysqlField, "name", field_name, 0);

View File

@ -1,9 +1,19 @@
require 'mysql' require 'mysql'
class Mysql class Mysql
def async_query(sql)
def async_query(sql, timeout = nil)
send_query(sql) send_query(sql)
select [ (@sockets ||= {})[socket] ||= IO.new(socket) ], nil, nil, nil select [ (@sockets ||= {})[socket] ||= IO.new(socket) ], nil, nil, nil
get_result get_result
end end
end
class Mysql::Result
# Collects every row yielded by each_hash into an Array and returns it.
#
# NOTE(review): the C extension in this commit also registers an
# all_hashes method on the result class; confirm which definition is meant
# to win once this file is loaded.
def all_hashes
  collected = []
  each_hash do |row_hash|
    collected.push( row_hash )
  end
  collected
end
end end

View File

@ -12,12 +12,14 @@ Gem::Specification.new do |s|
s.files = [ s.files = [
"mysqlplus.gemspec", "mysqlplus.gemspec",
"README", "README",
"Rakefile",
"lib/mysqlplus.rb", "lib/mysqlplus.rb",
"test/test_threaded.rb", "test/test_helper.rb",
"test/test_evented.rb", "test/threaded_test.rb",
"test/evented_test.rb",
"ext/error_const.h",
"ext/extconf.rb", "ext/extconf.rb",
"ext/mysql.c", "ext/mysql.c"
"ext/Makefile"
] ]
s.rdoc_options = ["--main", "README"] s.rdoc_options = ["--main", "README"]
s.extra_rdoc_files = ["README"] s.extra_rdoc_files = ["README"]

25
test/evented_test.rb Normal file
View File

@ -0,0 +1,25 @@
require File.dirname(__FILE__) + '/test_helper'
# Runs the evented benchmark once per overhead profile, in this order.
[
  [ "Evented, small overhead",  0.1 ],
  [ "Evented, medium overhead", 1 ],
  [ "Evented, large overhead",  3 ],
  [ "Evented, random overhead", :random ]
].each do |title, overhead|
  EventedMysqlTest.new( 10, title ) do |test|
    test.setup { Mysql.real_connect('localhost','root') }
    test.per_query_overhead = overhead
    test.run!
  end
end

42
test/test_all_hashes.rb Normal file
View File

@ -0,0 +1,42 @@
# shows the effect of using .all_hashes instead of looping on each hash
# run it by substituting a 'long' [many row] query for the query variable and toggling use_the_all_hashes_method here at the top
# note that we load all the rows first, then run .all_hashes on the result [to see more easily the effect of all hashes]
# on my machine and a 200_000 row table, it took 3.38s versus 3.65s
require 'mysqlplus'
# Toggle to compare Mysql::Result#all_hashes against a manual each_hash loop.
use_the_all_hashes_method = true

$count = 5
$start = Time.now

# One connection per worker thread.
$connections = Array.new( $count ) do
  Mysql.real_connect('localhost','root', '', 'local_leadgen_dev')
end
puts 'connection pool ready'

$threads = Array.new( $count ) do |i|
  Thread.new do
    query = "select * from campus_zips"
    puts "sending query on connection #{i}"
    result = $connections[i].async_query(query)
    saved =
      if use_the_all_hashes_method
        result.all_hashes
      else
        rows = []
        result.each_hash { |row| rows << row }
        rows
      end
    result.free
  end
end

puts 'waiting on threads'
$threads.each { |thread| thread.join }
puts Time.now - $start

View File

@ -1,31 +0,0 @@
require 'mysqlplus'
# Fires one `select sleep(1)` per connection, then waits on all sockets with
# select and prints each result as it becomes readable. Exits after printing
# the elapsed time once every connection has reported.
@count = 10
@connections = {}
@count.times do
  connection = Mysql.real_connect('localhost','root',nil)
  @connections[IO.new(connection.socket)] = connection
end
@sockets = @connections.keys
@done = 0
@t = Time.now

@connections.each_value { |connection| connection.send_query('select sleep(1)') }

loop do
  ready = select(@sockets,nil,nil,nil)
  next unless ready

  ready.first.each do |io|
    @connections[io].get_result.each { |row| p row }
    @done += 1
    next unless @done == @count

    puts Time.now - @t
    exit
  end
end

198
test/test_helper.rb Normal file
View File

@ -0,0 +1,198 @@
require 'rubygems'
require 'mysqlplus'
# Base harness for the mysqlplus performance scripts. Subclasses supply
# +prepare+ (issue the queries) and, where needed, +teardown+; this class
# provides configuration, timing and logging.
class MysqlTest

  # Raised by the template hooks a subclass is expected to override.
  class NotImplemented < StandardError
  end

  attr_accessor :queries,
                :context,
                :connections,
                :connection_signature,
                :start,
                :done,
                :c_async_query

  # Only the reader is generated: the writer is hand-written below, and
  # generating it here as well would trigger a "method redefined" warning.
  attr_reader :per_query_overhead

  # queries - number of queries/connections to run.
  # context - label printed before the run ('' suppresses the banner).
  # Yields the new instance when a block is given so it can be configured
  # and run in place.
  def initialize( queries, context = '' )
    @queries = queries
    @context = context
    @done = []
    @c_async_query = false
    @per_query_overhead = 3
    yield self if block_given?
  end

  # Starts the clock and remembers the block used to open each connection.
  def setup( &block )
    @start = Time.now
    @connection_signature = block
  end

  # Prints the banner, calls +prepare+, then yields to the subclass' run body.
  def run!
    c_or_native_ruby_async_query do
      present_context if context?
      prepare
      yield
    end
  end

  # Accepts a number of seconds, or :random to pick a random fraction of a
  # second once, at assignment time.
  def per_query_overhead=( overhead )
    @per_query_overhead = ( overhead == :random ) ? rand() : overhead
  end

  protected

  # Template hook: issue the queries.
  def prepare
    raise NotImplemented
  end

  # Template hook: finish the run.
  def teardown
    raise NotImplemented
  end

  def log( message, prefix = '' )
    puts "[#{timestamp}] #{prefix} #{message}"
  end

  # Logs a Start/End pair around the given block.
  def with_logging( message )
    log( message, 'Start' )
    yield
    log( message, 'End' )
  end

  # Seconds elapsed since +setup+. If something logs before +setup+ has
  # run, the clock starts at that first call instead of raising on a nil
  # start time.
  def timestamp
    @start ||= Time.now
    Time.now - @start
  end

  def context?
    @context != ''
  end

  def present_context
    log "#############################################"
    log "# #{@context}"
    log "#############################################"
  end

  # Announces which async_query implementation is in use, then yields.
  def c_or_native_ruby_async_query
    if @c_async_query
      log "** using C based async_query"
    else
      log "** using native Ruby async_query"
    end
    yield
  end

  # Dispatches to the C (c_async_query) or Ruby (async_query) implementation
  # on the given connection.
  def c_or_native_async_query( connection, sql, timeout = nil )
    method = @c_async_query ? :c_async_query : :async_query
    connection.send( method, sql, timeout )
  end

end
# Evented variant: sends every query up front with send_query, then waits on
# all connection sockets with select and collects results as each socket
# becomes readable.
class EventedMysqlTest < MysqlTest

  attr_accessor :sockets

  def initialize( queries, context = '' )
    @sockets = []
    @connections = {}
    super( queries, context )
  end

  # Opens one connection per query, keyed by an IO wrapping its socket
  # descriptor so the pool can be handed to select.
  def setup( &block )
    super( &block )
    with_logging 'Setup connection pool' do
      @queries.times do
        connection = @connection_signature.call
        @connections[ IO.new(connection.socket) ] = connection
      end
      # Computed once after the pool is complete rather than on every pass.
      @sockets = @connections.keys
    end
  end

  # Event loop: runs until +teardown+ throws :END_EVENT_LOOP.
  def run!
    super do
      catch :END_EVENT_LOOP do
        loop do
          result = select( @sockets,nil,nil,nil )
          next unless result

          result.first.each do |conn|
            @connections[conn].get_result.each{|res| log( "Result for socket #{conn.fileno} : #{res}" ) }
            @done << nil
            teardown if done?
          end
        end
      end
    end
  end

  protected

  # Sends the sleep query on every connection without waiting for results.
  def prepare
    @connections.each_value do |conn|
      conn.send_query( "select sleep(#{@per_query_overhead})" )
    end
  end

  def teardown
    log "done"
    throw :END_EVENT_LOOP
  end

  # True once one result has been collected per query.
  def done?
    @done.size == @queries
  end

end
# Threaded variant: one thread per connection, each running a blocking-style
# async query; run! joins them all.
class ThreadedMysqlTest < MysqlTest

  attr_accessor :threads

  def initialize( queries, context = '' )
    @connections = []
    @threads = []
    super( queries, context )
  end

  # Opens one connection per query using the block handed to setup.
  def setup( &block )
    super( &block )
    with_logging "Setup connection pool" do
      @queries.times { @connections << @connection_signature.call }
    end
  end

  # Waits for every worker thread started by +prepare+.
  def run!
    super do
      with_logging "waiting on threads" do
        @threads.each { |thread| thread.join }
      end
    end
  end

  protected

  # Starts one thread per connection; each sends the sleep query through the
  # configured async_query implementation and logs once per returned row.
  def prepare
    with_logging "prepare" do
      @queries.times do |index|
        @threads << Thread.new do
          log "sending query on connection #{index}"
          rows = c_or_native_async_query( @connections[index], "select sleep(#{@per_query_overhead})" )
          rows.each do |_row|
            log "connection #{index} done"
          end
        end
      end
    end
  end

end

View File

@ -0,0 +1,48 @@
# This is an example of using Mysql::ResultSet#use_result [see docs for what that does]
# this function is useful for those who have large query results and want to be able to parse them
# as they come in, instead of having to wait for the query to finish before doing parsing
# for me, running this on a query with 200_000 lines decreases total time to create an array of results
# from .82s to .62s
# you can experiment with it by changing the query here to be a long one, and toggling the do_the_use_query_optimization variable
# this also has the interesting property of 'freeing' Ruby to do thread changes mid-query.
require 'mysqlplus'
# Toggle to compare streaming rows via use_result against the default path.
do_the_use_query_optimization = true

$count = 5
$start = Time.now

# One connection per worker thread.
$connections = Array.new( $count ) do
  Mysql.real_connect('localhost','root', '', 'local_leadgen_dev')
end
puts 'connection pool ready'

$threads = Array.new( $count ) do |i|
  Thread.new do
    puts "sending query on connection #{i}"
    conn = $connections[i]
    saved = []
    query = "select * from campus_zips"
    unless do_the_use_query_optimization
      conn.async_query(query).each_hash { |row| saved << row }
    else
      # Turn off automatic result handling so use_result can be called on
      # what async_query returns.
      conn.query_with_result=false
      pending = conn.async_query(query)
      streamed = pending.use_result
      streamed.each_hash { |row| saved << row }
      streamed.free
    end
  end
end

puts 'waiting on threads'
$threads.each { |thread| thread.join }
puts Time.now - $start

View File

@ -1,30 +0,0 @@
require 'mysqlplus'
# Opens a pool of connections, runs `select sleep(3)` on each from its own
# thread, and prints the total elapsed time.
$count = 10
$start = Time.now

$connections = Array.new( $count ) { Mysql.real_connect('localhost','root') }
puts 'connection pool ready'

$threads = Array.new( $count ) do |i|
  Thread.new do
    puts "sending query on connection #{i}"
    rows = $connections[i].async_query("select sleep(3)")
    rows.each do |_row|
      puts "connection #{i} done"
    end
  end
end

puts 'waiting on threads'
$threads.each { |thread| thread.join }
puts Time.now - $start

53
test/threaded_test.rb Normal file
View File

@ -0,0 +1,53 @@
require File.dirname(__FILE__) + '/test_helper'
# Runs the threaded benchmark for each overhead profile, first with the
# native Ruby async_query and then with the C implementation.
# Each entry: [ title, per-query overhead, use the C async_query? ]
[
  [ "Threaded, native Ruby, small overhead",  0.1,     false ],
  [ "Threaded, native Ruby, medium overhead", 1,       false ],
  [ "Threaded, native Ruby, large overhead",  3,       false ],
  [ "Threaded, native Ruby, random overhead", :random, false ],
  [ "Threaded, C, small overhead",            0.1,     true ],
  [ "Threaded, C, medium overhead",           1,       true ],
  [ "Threaded, C, large overhead",            3,       true ],
  [ "Threaded, C, random overhead",           :random, true ]
].each do |title, overhead, use_c|
  ThreadedMysqlTest.new( 10, title ) do |test|
    test.setup { Mysql.real_connect('localhost','root') }
    test.per_query_overhead = overhead
    # The native runs leave the flag at its default (false).
    test.c_async_query = true if use_c
    test.run!
  end
end