RUBY-261 Cursor#close hits secondary when appropriate.

This commit is contained in:
Kyle Banker 2011-05-10 14:21:23 -04:00
parent cb54c6b295
commit 604d0f60ed
5 changed files with 45 additions and 12 deletions

View File

@ -391,7 +391,7 @@ module Mongo
if safe
@connection.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @db.name, nil, safe)
else
@connection.send_message(Mongo::Constants::OP_UPDATE, message, nil)
@connection.send_message(Mongo::Constants::OP_UPDATE, message)
end
end
end
@ -874,7 +874,7 @@ module Mongo
if safe
@connection.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @db.name, nil, safe)
else
@connection.send_message(Mongo::Constants::OP_INSERT, message, nil)
@connection.send_message(Mongo::Constants::OP_INSERT, message)
end
end
documents.collect { |o| o[:_id] || o['_id'] }

View File

@ -381,15 +381,36 @@ module Mongo
# @param [Integer] operation a MongoDB opcode.
# @param [BSON::ByteBuffer] message a message to send to the database.
#
# @option opts [Symbol] :connection (:writer) The connection to which
# this message should be sent. Valid options are :writer and :reader.
#
# @return [Integer] number of bytes sent
def send_message(operation, message, log_message=nil)
def send_message(operation, message, opts={})
if opts.is_a?(String)
warn "Connection#send_message no longer takes a string log message. " +
"Logging is now handled within the Collection and Cursor classes."
opts = {}
end
connection = opts.fetch(:connection, :writer)
begin
add_message_headers(message, operation)
packed_message = message.to_s
socket = checkout_writer
if connection == :writer
socket = checkout_writer
else
socket = checkout_reader
end
send_message_on_socket(packed_message, socket)
ensure
checkin_writer(socket)
if connection == :writer
checkin_writer(socket)
else
checkin_reader(socket)
end
end
end

View File

@ -284,7 +284,7 @@ module Mongo
message.put_int(1)
message.put_long(@cursor_id)
@logger.debug("MONGODB cursor.close #{@cursor_id}") if @logger
@connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, nil)
@connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, :connection => :reader)
end
@cursor_id = 0
@closed = true

View File

@ -31,7 +31,7 @@ class ReplicaSetQuerySecondariesTest < Test::Unit::TestCase
end
def test_query_secondaries
@coll = @db.collection("test-sets", :safe => {:w => 3, :wtimeout => 10000})
@coll = @db.collection("test-sets", :safe => {:w => 3, :wtimeout => 20000})
@coll.save({:a => 20})
@coll.save({:a => 30})
@coll.save({:a => 40})
@ -70,7 +70,7 @@ class ReplicaSetQuerySecondariesTest < Test::Unit::TestCase
end
def test_kill_secondary
@coll = @db.collection("test-sets", {:safe => {:w => 3, :wtimeout => 10000}})
@coll = @db.collection("test-sets", {:safe => {:w => 3, :wtimeout => 20000}})
@coll.save({:a => 20})
@coll.save({:a => 30})
assert_equal 2, @coll.find.to_a.length
@ -90,7 +90,19 @@ class ReplicaSetQuerySecondariesTest < Test::Unit::TestCase
assert_equal 2, length
end
new_read_pool_port = @conn.read_pool.port
assert old_read_pool != new_read_pool
assert old_read_pool_port != new_read_pool_port
end
def test_write_lots_of_data
@coll = @db.collection("test-sets", {:safe => {:w => 2}})
6000.times do |n|
@coll.save({:a => n})
end
cursor = @coll.find()
cursor.next
cursor.close
end
end

View File

@ -23,8 +23,8 @@ class ReplSetManager
@path = File.join(File.expand_path(File.dirname(__FILE__)), "data")
@arbiter_count = opts[:arbiter_count] || 2
@secondary_count = opts[:secondary_count] || 1
@passive_count = opts[:passive_count] || 1
@secondary_count = opts[:secondary_count] || 2
@passive_count = opts[:passive_count] || 0
@primary_count = 1
@count = @primary_count + @passive_count + @arbiter_count + @secondary_count
@ -41,7 +41,7 @@ class ReplSetManager
system("killall mongod")
n = 0
(@primary_count + @secondary_count).times do |n|
(@primary_count + @secondary_count).times do
init_node(n)
n += 1
end