Initial commit for reads from rs secondaries

This commit is contained in:
Kyle Banker 2010-11-16 15:43:59 -05:00
parent 43182b8aec
commit 1e57ca90e1
11 changed files with 338 additions and 172 deletions

View File

@ -40,6 +40,7 @@ require 'bson'
require 'mongo/util/conversions'
require 'mongo/util/support'
require 'mongo/util/core_ext'
require 'mongo/util/pool'
require 'mongo/util/server_version'
require 'mongo/collection'

View File

@ -245,10 +245,10 @@ module Mongo
def save(doc, opts={})
if doc.has_key?(:_id) || doc.has_key?('_id')
id = doc[:_id] || doc['_id']
update({:_id => id}, doc, :upsert => true, :safe => opts[:safe])
update({:_id => id}, doc, :upsert => true, :safe => opts.fetch(:safe, @safe))
id
else
insert(doc, :safe => opts[:safe])
insert(doc, :safe => opts.fetch(:safe, @safe))
end
end

View File

@ -38,8 +38,8 @@ module Mongo
MONGODB_URI_MATCHER = /(([-_.\w\d]+):([-_\w\d]+)@)?([-.\w\d]+)(:([\w\d]+))?(\/([-\d\w]+))?/
MONGODB_URI_SPEC = "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/database]"
attr_reader :logger, :size, :host, :port, :nodes, :auths, :sockets, :checked_out, :primary, :secondaries, :arbiters,
:safe
attr_reader :logger, :size, :nodes, :auths, :primary, :secondaries, :arbiters,
:safe, :primary_pool, :secondary_pools
# Counter for generating unique request ids.
@@current_request_id = 0
@ -69,7 +69,7 @@ module Mongo
# @option options [Boolean] :slave_ok (false) Must be set to +true+ when connecting
# to a single, slave node.
# @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log.
# @option options [String] :name (nil) The name of the replica set to connect to. An exception will be
# @option options [String] :rs_name (nil) The name of the replica set to connect to. An exception will be
# raised if unable to connect to a replica set with this name.
# @option options [Integer] :pool_size (1) The maximum number of socket connections that can be
# opened to the database.
@ -107,7 +107,7 @@ module Mongo
@host = @port = nil
# Replica set name
@replica_set_name = options[:name]
@replica_set_name = options[:rs_name]
# Lock for request ids.
@id_lock = Mutex.new
@ -129,15 +129,8 @@ module Mongo
# Condition variable for signal and wait
@queue = ConditionVariable.new
@sockets = []
@checked_out = []
# slave_ok can be true only if one node is specified
if @nodes.length > 1 && options[:slave_ok]
raise MongoArgumentError, "Can't specify more than one node when :slave_ok is true."
else
@slave_ok = options[:slave_ok]
end
# Cache the various node types
# when connecting to a replica set.
@ -145,8 +138,16 @@ module Mongo
@secondaries = []
@arbiters = []
# Connection pool for primay node
@primary_pool = nil
# Connection pools for each secondary node
@secondary_pools = []
# Maps sockets to pools for checkin
@pool_map = {}
@logger = options[:logger] || nil
@options = options
should_connect = options.fetch(:connect, true)
connect if should_connect
@ -174,10 +175,13 @@ module Mongo
unless nodes.length > 0 && nodes.all? {|n| n.is_a? Array}
raise MongoArgumentError, "Connection.multi requires at least one node to be specified."
end
# Block returns an array, the first element being an array of nodes and the second an array
# of authorizations for the database.
new(nil, nil, opts) do |con|
nodes.map do |node|
con.instance_variable_set(:@replica_set, true)
con.instance_variable_set(:@read_secondaries, true) if opts[:read_secondaries]
con.pair_val_to_connection(node)
end
end
@ -389,12 +393,9 @@ module Mongo
#
# @return [Boolean]
def slave_ok?
@slave_ok
@read_secondaries || @slave_ok
end
## Connections and pooling ##
# Send a message to MongoDB, adding the necessary headers.
#
# @param [Integer] operation a MongoDB opcode.
@ -404,10 +405,10 @@ module Mongo
def send_message(operation, message, log_message=nil)
begin
packed_message = add_message_headers(operation, message).to_s
socket = checkout
socket = checkout_writer
send_message_on_socket(packed_message, socket)
ensure
checkin(socket)
checkin_writer(socket)
end
end
@ -427,7 +428,7 @@ module Mongo
message_with_headers = add_message_headers(operation, message)
message_with_check = last_error_message(db_name, last_error_params)
begin
sock = checkout
sock = checkout_writer
packed_message = message_with_headers.append!(message_with_check).to_s
docs = num_received = cursor_id = ''
@safe_mutexes[sock].synchronize do
@ -435,7 +436,7 @@ module Mongo
docs, num_received, cursor_id = receive(sock)
end
ensure
checkin(sock)
checkin_writer(sock)
end
if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
@ -455,10 +456,10 @@ module Mongo
# @return [Array]
# An array whose indexes include [0] documents returned, [1] number of document received,
# and [3] a cursor_id.
def receive_message(operation, message, log_message=nil, socket=nil)
def receive_message(operation, message, log_message=nil, socket=nil, command=false)
packed_message = add_message_headers(operation, message).to_s
begin
sock = socket || checkout
sock = socket || (command ? checkout_writer : checkout_reader)
result = ''
@safe_mutexes[sock].synchronize do
@ -466,7 +467,7 @@ module Mongo
result = receive(sock)
end
ensure
checkin(sock)
command ? checkin_writer(sock) : checkin_reader(sock)
end
result
end
@ -480,33 +481,40 @@ module Mongo
# @raise [ConnectionFailure] if unable to connect to any host or port.
def connect
reset_connection
@nodes_to_try = @nodes.clone
while !connected? && !(nodes_to_try = @nodes - @nodes_tried).empty?
nodes_to_try.each do |node|
config = check_is_master(node)
if is_primary?(config)
set_primary(node)
else
set_auxillary(node, config)
end
while connecting?
node = @nodes_to_try.shift
config = check_is_master(node)
if is_primary?(config)
set_primary(node)
else
set_auxillary(node, config)
end
end
raise ConnectionFailure, "failed to connect to any given host:port" unless connected?
end
def connecting?
!(connected? && @nodes_to_try.empty?)
end
# It's possible that we defined connected as all nodes being connected???
# NOTE: Do check if this needs to be more stringent.
# Probably not since if any node raises a connection failure, all nodes will be closed.
def connected?
@host && @port
@primary_pool && @primary_pool.host && @primary_pool.port
end
# Close the connection to the database.
def close
@sockets.each do |sock|
sock.close
@primary_pool.close if @primary_pool
@primary_pool = nil
@secondary_pools.each do |pool|
pool.close
end
@host = @port = nil
@sockets.clear
@checked_out.clear
end
## Configuration helper methods
@ -583,18 +591,60 @@ module Mongo
nodes
end
# Checkout a socket for reading (i.e., a secondary node).
def checkout_reader
connect unless connected?
case @secondary_pools.size
when 0 then
checkout_writer
when 1 then
@secondary_pools[0].checkout
else
@secondary_pools.push(pool = @secondary_pools.shift)
@pool_map[socket = pool.checkout] = pool
socket
end
end
# Checkout a socket for writing (i.e., a primary node).
def checkout_writer
connect unless connected?
@primary_pool.checkout
end
# Checkin a socket used for reading.
def checkin_reader(socket)
case @secondary_pools.size
when 0 then
checkin_writer(socket)
when 1 then
@secondary_pools[0].checkin(socket)
else
@pool_map[socket].checkin(socket)
end
end
# Checkin a socket used for writing.
def checkin_writer(socket)
if @primary_pool
@primary_pool.checkin(socket)
end
end
private
# If a ConnectionFailure is raised, this method will be called
# to close the connection and reset connection values.
def reset_connection
close
@host = nil
@port = nil
@primary = nil
@secondaries = []
@arbiters = []
@nodes_tried = []
@secondaries = []
@secondary_pools = []
@arbiters = []
@nodes_tried = []
@nodes_to_try = []
end
# Primary is defined as either a master node or a slave if
@ -603,7 +653,7 @@ module Mongo
# If a primary node is discovered, we set the the @host and @port and
# apply any saved authentication.
def is_primary?(config)
config && (config['ismaster'] == 1 || config['ismaster'] == true) || @slave_ok
config && (config['ismaster'] == 1 || config['ismaster'] == true) || !@replica_set && @slave_ok
end
def check_is_master(node)
@ -621,8 +671,9 @@ module Mongo
@nodes_tried << node
if config
update_node_list(config['hosts']) if config['hosts']
if @logger
@logger.warn("MONGODB #{config['msg']}") if config['msg']
if config['msg'] && @logger
@logger.warn("MONGODB #{config['msg']}")
end
end
@ -650,8 +701,9 @@ module Mongo
# Set the specified node as primary, and
# apply any saved authentication credentials.
def set_primary(node)
@host, @port = *node
@primary = [@host, @port]
host, port = *node
@primary = [host, port]
@primary_pool = Pool.new(self, host, port)
apply_saved_authentication
end
@ -660,7 +712,9 @@ module Mongo
def set_auxillary(node, config)
if config
if config['secondary']
host, port = *node
@secondaries << node unless @secondaries.include?(node)
@secondary_pools << Pool.new(self, host, port) if @read_secondaries
elsif config['arbiterOnly']
@arbiters << node unless @arbiters.include?(node)
end
@ -686,73 +740,7 @@ module Mongo
[host, port.to_i]
end
@nodes |= new_nodes
end
# Return a socket to the pool.
def checkin(socket)
@connection_mutex.synchronize do
@checked_out.delete(socket)
@queue.signal
end
true
end
# Adds a new socket to the pool and checks it out.
#
# This method is called exclusively from #checkout;
# therefore, it runs within a mutex.
def checkout_new_socket
begin
socket = TCPSocket.new(@host, @port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
rescue => ex
raise ConnectionFailure, "Failed to connect socket: #{ex}"
end
@sockets << socket
@checked_out << socket
socket
end
# Checks out the first available socket from the pool.
#
# This method is called exclusively from #checkout;
# therefore, it runs within a mutex.
def checkout_existing_socket
socket = (@sockets - @checked_out).first
@checked_out << socket
socket
end
# Check out an existing socket or create a new socket if the maximum
# pool size has not been exceeded. Otherwise, wait for the next
# available socket.
def checkout
connect if !connected?
start_time = Time.now
loop do
if (Time.now - start_time) > @timeout
raise ConnectionTimeoutError, "could not obtain connection within " +
"#{@timeout} seconds. The max pool size is currently #{@size}; " +
"consider increasing the pool size or timeout."
end
@connection_mutex.synchronize do
socket = if @checked_out.size < @sockets.size
checkout_existing_socket
elsif @sockets.size < @size
checkout_new_socket
end
return socket if socket
# Otherwise, wait
if @logger
@logger.warn "MONGODB Waiting for available connection; #{@checked_out.size} of #{@size} connections checked out."
end
@queue.wait(@connection_mutex)
end
end
@nodes_to_try = new_nodes - @nodes_tried
end
def receive(sock)
@ -842,17 +830,17 @@ module Mongo
headers = [
# Message size.
16 + message.size,
# Unique request id.
get_request_id,
# Response id.
0,
# Opcode.
operation
].pack('VVVV')
message.prepend!(headers)
end
@ -898,7 +886,7 @@ module Mongo
end
message
end
# Low-level data for receiving data from socket.
# Unlike #receive_message_on_socket, this method immediately discards the data
# and only returns the number of bytes read.
@ -921,10 +909,10 @@ module Mongo
end
bytes_read
end
if defined?(Encoding)
BINARY_ENCODING = Encoding.find("binary")
def new_binary_string
"".force_encoding(BINARY_ENCODING)
end

View File

@ -57,13 +57,19 @@ module Mongo
@full_collection_name = "#{@collection.db.name}.#{@collection.name}"
@cache = []
@returned = 0
if @collection.name =~ /^\$cmd/ || @collection.name =~ /^system/
@command = true
else
@command = false
end
end
# Get the next document specified the cursor options.
#
# @return [Hash, Nil] the next document or Nil if no documents remain.
def next_document
refresh if @cache.length == 0#empty?# num_remaining == 0
refresh if @cache.length == 0
doc = @cache.shift
if doc && doc['$err']
@ -352,8 +358,8 @@ module Mongo
# Cursor id.
message.put_long(@cursor_id)
@logger.debug("MONGODB cursor.refresh() for cursor #{@cursor_id}") if @logger
results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_GET_MORE,
message, nil, @socket)
results, @n_received, @cursor_id = @connection.receive_message(
Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command)
@returned += @n_received
@cache += results
close_cursor_if_query_complete
@ -366,7 +372,8 @@ module Mongo
else
message = construct_query_message
@logger.debug query_log_message if @logger
results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_QUERY, message, nil, @socket)
results, @n_received, @cursor_id = @connection.receive_message(
Mongo::Constants::OP_QUERY, message, nil, @socket, @command)
@returned += @n_received
@cache += results
@query_run = true

127
lib/mongo/util/pool.rb Normal file
View File

@ -0,0 +1,127 @@
# encoding: UTF-8
# --
# Copyright (C) 2008-2010 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Pool
attr_accessor :host, :port, :size, :timeout, :safe, :checked_out
# Create a new pool of connections.
#
def initialize(connection, host, port, options={})
@connection = connection
@host, @port = host, port
# Pool size and timeout.
@size = options[:pool_size] || 1
@timeout = options[:timeout] || 5.0
# Mutex for synchronizing pool access
@connection_mutex = Mutex.new
# Global safe option. This is false by default.
@safe = options[:safe] || false
# Create a mutex when a new key, in this case a socket,
# is added to the hash.
@safe_mutexes = Hash.new { |h, k| h[k] = Mutex.new }
# Condition variable for signal and wait
@queue = ConditionVariable.new
@sockets = []
@checked_out = []
end
def close
@sockets.each do |sock|
sock.close
end
@host = @port = nil
@sockets.clear
@checked_out.clear
end
# Return a socket to the pool.
def checkin(socket)
@connection_mutex.synchronize do
@checked_out.delete(socket)
@queue.signal
end
true
end
# Adds a new socket to the pool and checks it out.
#
# This method is called exclusively from #checkout;
# therefore, it runs within a mutex.
def checkout_new_socket
begin
socket = TCPSocket.new(@host, @port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
rescue => ex
raise ConnectionFailure, "Failed to connect socket: #{ex}"
end
@sockets << socket
@checked_out << socket
socket
end
# Checks out the first available socket from the pool.
#
# This method is called exclusively from #checkout;
# therefore, it runs within a mutex.
def checkout_existing_socket
socket = (@sockets - @checked_out).first
@checked_out << socket
socket
end
# Check out an existing socket or create a new socket if the maximum
# pool size has not been exceeded. Otherwise, wait for the next
# available socket.
def checkout
@connection.connect if !@connection.connected?
start_time = Time.now
loop do
if (Time.now - start_time) > @timeout
raise ConnectionTimeoutError, "could not obtain connection within " +
"#{@timeout} seconds. The max pool size is currently #{@size}; " +
"consider increasing the pool size or timeout."
end
@connection_mutex.synchronize do
socket = if @checked_out.size < @sockets.size
checkout_existing_socket
elsif @sockets.size < @size
checkout_new_socket
end
return socket if socket
# Otherwise, wait
if @logger
@logger.warn "MONGODB Waiting for available connection; " +
"#{@checked_out.size} of #{@size} connections checked out."
end
@queue.wait(@connection_mutex)
end
end
end
end
end

View File

@ -24,8 +24,8 @@ class TestConnection < Test::Unit::TestCase
def test_connection_uri
con = Connection.from_uri("mongodb://#{host_port}")
assert_equal mongo_host, con.host
assert_equal mongo_port, con.port
assert_equal mongo_host, con.primary_pool.host
assert_equal mongo_port, con.primary_pool.port
end
def test_server_version
@ -44,8 +44,8 @@ class TestConnection < Test::Unit::TestCase
end
def test_replica_set_connection_name
assert_raise_error(Mongo::ReplicaSetConnectionError, "replSet") do
standard_connection(:name => "replica-set-foo")
assert_raise_error(Mongo::ReplicaSetConnectionError, "replica-set-foo") do
standard_connection(:rs_name => "replica-set-foo-wrong")
end
end
@ -144,12 +144,6 @@ class TestConnection < Test::Unit::TestCase
assert_equal ['bar', 27018], nodes[1]
end
def test_slave_ok_with_multiple_nodes
assert_raise MongoArgumentError do
Connection.multi([['foo', 27017], ['bar', 27018]], :connect => false, :slave_ok => true)
end
end
def test_fsync_lock
assert !@conn.locked?
@conn.lock!
@ -211,29 +205,29 @@ class TestConnection < Test::Unit::TestCase
should "release connection if an exception is raised on send_message" do
@con.stubs(:send_message_on_socket).raises(ConnectionFailure)
assert_equal 0, @con.checked_out.size
assert_equal 0, @con.primary_pool.checked_out.size
assert_raise ConnectionFailure do
@coll.insert({:test => "insert"})
end
assert_equal 0, @con.checked_out.size
assert_equal 0, @con.primary_pool.checked_out.size
end
should "release connection if an exception is raised on send_with_safe_check" do
@con.stubs(:receive).raises(ConnectionFailure)
assert_equal 0, @con.checked_out.size
assert_equal 0, @con.primary_pool.checked_out.size
assert_raise ConnectionFailure do
@coll.insert({:test => "insert"}, :safe => true)
end
assert_equal 0, @con.checked_out.size
assert_equal 0, @con.primary_pool.checked_out.size
end
should "release connection if an exception is raised on receive_message" do
@con.stubs(:receive).raises(ConnectionFailure)
assert_equal 0, @con.checked_out.size
assert_equal 0, @con.primary_pool.checked_out.size
assert_raise ConnectionFailure do
@coll.find.to_a
end
assert_equal 0, @con.checked_out.size
assert_equal 0, @con.primary_pool.checked_out.size
end
end
end

View File

@ -364,15 +364,12 @@ class DBAPITest < Test::Unit::TestCase
end
def test_array
@@coll.remove
@@coll.insert({'b' => [1, 2, 3]})
@@coll.insert({'b' => [1, 2, 3]})
rows = @@coll.find({}, {:fields => ['b']}).to_a
if @@version < "1.1.3"
assert_equal 1, rows.length
assert_equal [1, 2, 3], rows[0]['b']
else
assert_equal 2, rows.length
assert_equal [1, 2, 3], rows[1]['b']
end
assert_equal 2, rows.length
assert_equal [1, 2, 3], rows[1]['b']
end
def test_regex

View File

@ -0,0 +1,40 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo'
require 'test/unit'
require './test/test_helper'
# NOTE: This test expects a replica set of three nodes to be running
# on the local host.
class ReplicaSetQuerySecondariesTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.multi([['localhost', 27018]], :read_secondaries => true)
@db = @conn.db(MONGO_TEST_DB)
@db.drop_collection("test-sets")
@coll = @db.collection("test-sets", :safe => {:w => 2, :wtimeout => 100})
end
def test_query
@coll.save({:a => 20})
@coll.save({:a => 30})
@coll.save({:a => 40})
results = []
@coll.find.each {|r| results << r["a"]}
assert results.include?(20)
assert results.include?(30)
assert results.include?(40)
puts "Please disconnect the current master."
gets
results = []
rescue_connection_failure do
@coll.find.each {|r| results << r}
[20, 30, 40].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
end
end

View File

@ -5,7 +5,7 @@ require './test/test_helper'
# NOTE: This test expects a replica set of three nodes to be running
# on the local host.
class ReplicaPairQueryTest < Test::Unit::TestCase
class ReplicaSetQueryTest < Test::Unit::TestCase
include Mongo
def setup

View File

@ -4,7 +4,7 @@ include Mongo
class ConnectionTest < Test::Unit::TestCase
context "Initialization: " do
setup do
def new_mock_socket
def new_mock_socket(host='localhost', port=27017)
socket = Object.new
socket.stubs(:setsockopt).with(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
socket.stubs(:close)
@ -28,12 +28,12 @@ class ConnectionTest < Test::Unit::TestCase
end
should "set localhost and port to master" do
assert_equal 'localhost', @conn.host
assert_equal 27017, @conn.port
assert_equal 'localhost', @conn.primary_pool.host
assert_equal 27017, @conn.primary_pool.port
end
should "set connection pool to 1" do
assert_equal 1, @conn.size
assert_equal 1, @conn.primary_pool.size
end
should "default slave_ok to false" do
@ -43,22 +43,32 @@ class ConnectionTest < Test::Unit::TestCase
context "connecting to a replica set" do
setup do
TCPSocket.stubs(:new).returns(new_mock_socket)
@conn = Connection.new('localhost', 27017, :connect => false)
TCPSocket.stubs(:new).returns(new_mock_socket('localhost', 27017))
@conn = Connection.multi([['localhost', 27017]], :connect => false, :read_secondaries => true)
admin_db = new_mock_db
@hosts = ['localhost:27018', 'localhost:27019']
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1, 'hosts' => @hosts})
@conn.expects(:[]).with('admin').returns(admin_db)
@hosts = ['localhost:27018', 'localhost:27019', 'localhost:27020']
admin_db.stubs(:command).returns({'ok' => 1, 'ismaster' => 1, 'hosts' => @hosts}).
then.returns({'ok' => 1, 'ismaster' => 0, 'hosts' => @hosts, 'secondary' => 1}).
then.returns({'ok' => 1, 'ismaster' => 0, 'hosts' => @hosts, 'secondary' => 1}).
then.returns({'ok' => 1, 'ismaster' => 0, 'arbiterOnly' => 1})
@conn.stubs(:[]).with('admin').returns(admin_db)
@conn.connect
end
should "store the hosts returned from the ismaster command" do
@hosts.each do |host|
host, port = host.split(":")
port = port.to_i
assert @conn.nodes.include?([host, port]), "Connection doesn't include host #{host.inspect}."
end
assert_equal 'localhost', @conn.primary_pool.host
assert_equal 27017, @conn.primary_pool.port
assert_equal 'localhost', @conn.secondary_pools[0].host
assert_equal 27018, @conn.secondary_pools[0].port
assert_equal 'localhost', @conn.secondary_pools[1].host
assert_equal 27019, @conn.secondary_pools[1].port
assert_equal 2, @conn.secondary_pools.length
end
end
@ -75,13 +85,6 @@ class ConnectionTest < Test::Unit::TestCase
end
should "not store any hosts redundantly" do
assert_equal 3, @conn.nodes.size
@hosts.each do |host|
host, port = host.split(":")
port = port.to_i
assert @conn.nodes.include?([host, port]), "Connection doesn't include host #{host.inspect}."
end
end
end

9
test/unit/pool_test.rb Normal file
View File

@ -0,0 +1,9 @@
require './test/test_helper'
include Mongo
class PoolTest < Test::Unit::TestCase
context "Initialization: " do
should "do" do
end
end
end