Simplified message sending methods names; organized Connection
This commit is contained in:
parent
3bdd3fdd53
commit
ab0d189832
@ -219,7 +219,7 @@ module Mongo
|
|||||||
BSON.serialize_cstr(message, "#{@db.name}.#{@name}")
|
BSON.serialize_cstr(message, "#{@db.name}.#{@name}")
|
||||||
message.put_int(0)
|
message.put_int(0)
|
||||||
message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*"))
|
message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*"))
|
||||||
@connection.send_message_with_operation(Mongo::Constants::OP_DELETE, message,
|
@connection.send_message(Mongo::Constants::OP_DELETE, message,
|
||||||
"db.#{@db.name}.remove(#{selector.inspect})")
|
"db.#{@db.name}.remove(#{selector.inspect})")
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -254,7 +254,7 @@ module Mongo
|
|||||||
@connection.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @db.name,
|
@connection.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @db.name,
|
||||||
"db.#{@name}.update(#{selector.inspect}, #{document.inspect})")
|
"db.#{@name}.update(#{selector.inspect}, #{document.inspect})")
|
||||||
else
|
else
|
||||||
@connection.send_message_with_operation(Mongo::Constants::OP_UPDATE, message,
|
@connection.send_message(Mongo::Constants::OP_UPDATE, message,
|
||||||
"db.#{@name}.update(#{selector.inspect}, #{document.inspect})")
|
"db.#{@name}.update(#{selector.inspect}, #{document.inspect})")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@ -481,7 +481,7 @@ EOS
|
|||||||
@connection.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @db.name,
|
@connection.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @db.name,
|
||||||
"db.#{collection_name}.insert(#{documents.inspect})")
|
"db.#{collection_name}.insert(#{documents.inspect})")
|
||||||
else
|
else
|
||||||
@connection.send_message_with_operation(Mongo::Constants::OP_INSERT, message,
|
@connection.send_message(Mongo::Constants::OP_INSERT, message,
|
||||||
"db.#{collection_name}.insert(#{documents.inspect})")
|
"db.#{collection_name}.insert(#{documents.inspect})")
|
||||||
end
|
end
|
||||||
documents.collect { |o| o[:_id] || o['_id'] }
|
documents.collect { |o| o[:_id] || o['_id'] }
|
||||||
|
@ -43,7 +43,7 @@ module Mongo
|
|||||||
#
|
#
|
||||||
# In all cases, the default host is "localhost" and the default port, is 27017.
|
# In all cases, the default host is "localhost" and the default port, is 27017.
|
||||||
#
|
#
|
||||||
# When specifying, pair_or_host, is a hash with two keys: :left and :right. Each key maps to either
|
# When specifying a pair, pair_or_host, is a hash with two keys: :left and :right. Each key maps to either
|
||||||
# * a server name, in which case port is 27017,
|
# * a server name, in which case port is 27017,
|
||||||
# * a port number, in which case the server is "localhost", or
|
# * a port number, in which case the server is "localhost", or
|
||||||
# * an array containing [server_name, port_number]
|
# * an array containing [server_name, port_number]
|
||||||
@ -86,9 +86,6 @@ module Mongo
|
|||||||
# Lock for request ids.
|
# Lock for request ids.
|
||||||
@id_lock = Mutex.new
|
@id_lock = Mutex.new
|
||||||
|
|
||||||
# Lock for checking master.
|
|
||||||
@master_lock = Mutex.new
|
|
||||||
|
|
||||||
# Pool size and timeout.
|
# Pool size and timeout.
|
||||||
@size = options[:pool_size] || 1
|
@size = options[:pool_size] || 1
|
||||||
@timeout = options[:timeout] || 1.0
|
@timeout = options[:timeout] || 1.0
|
||||||
@ -201,21 +198,6 @@ module Mongo
|
|||||||
|
|
||||||
## Connections and pooling ##
|
## Connections and pooling ##
|
||||||
|
|
||||||
# Sends a message to MongoDB.
|
|
||||||
#
|
|
||||||
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
|
|
||||||
# +message+, and an optional formatted +log_message+.
|
|
||||||
def send_message(operation, message, log_message=nil)
|
|
||||||
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
|
||||||
|
|
||||||
packed_message = pack_message(operation, message)
|
|
||||||
socket = checkout
|
|
||||||
send_message_on_socket(packed_message, socket)
|
|
||||||
checkin(socket)
|
|
||||||
end
|
|
||||||
|
|
||||||
# Sends a message to MongoDB and returns the response.
|
|
||||||
#
|
|
||||||
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
|
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
|
||||||
# +message+, an optional formatted +log_message+, and an optional
|
# +message+, an optional formatted +log_message+, and an optional
|
||||||
# socket.
|
# socket.
|
||||||
@ -240,7 +222,7 @@ module Mongo
|
|||||||
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
|
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
|
||||||
# +message+, and an optional formatted +log_message+.
|
# +message+, and an optional formatted +log_message+.
|
||||||
# Sends the message to the databse, adding the necessary headers.
|
# Sends the message to the databse, adding the necessary headers.
|
||||||
def send_message_with_operation(operation, message, log_message=nil)
|
def send_message(operation, message, log_message=nil)
|
||||||
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
||||||
packed_message = pack_message(operation, message)
|
packed_message = pack_message(operation, message)
|
||||||
socket = checkout
|
socket = checkout
|
||||||
@ -250,6 +232,10 @@ module Mongo
|
|||||||
|
|
||||||
# Sends a message to the database, waits for a response, and raises
|
# Sends a message to the database, waits for a response, and raises
|
||||||
# and exception if the operation has failed.
|
# and exception if the operation has failed.
|
||||||
|
#
|
||||||
|
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
|
||||||
|
# +message+, the +db_name+, and an optional formatted +log_message+.
|
||||||
|
# Sends the message to the databse, adding the necessary headers.
|
||||||
def send_message_with_safe_check(operation, message, db_name, log_message=nil)
|
def send_message_with_safe_check(operation, message, db_name, log_message=nil)
|
||||||
message_with_headers = add_message_headers(operation, message)
|
message_with_headers = add_message_headers(operation, message)
|
||||||
message_with_check = last_error_message(db_name)
|
message_with_check = last_error_message(db_name)
|
||||||
@ -265,8 +251,12 @@ module Mongo
|
|||||||
[docs, num_received, cursor_id]
|
[docs, num_received, cursor_id]
|
||||||
end
|
end
|
||||||
|
|
||||||
# Send a message to the database and waits for the response.
|
# Sends a message to the database and waits for the response.
|
||||||
def receive_message_with_operation(operation, message, log_message=nil, socket=nil)
|
#
|
||||||
|
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
|
||||||
|
# +message+, and an optional formatted +log_message+. This method
|
||||||
|
# also takes an options socket for internal use with #connect_to_master.
|
||||||
|
def receive_message(operation, message, log_message=nil, socket=nil)
|
||||||
message_with_headers = add_message_headers(operation, message).to_s
|
message_with_headers = add_message_headers(operation, message).to_s
|
||||||
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
||||||
sock = socket || checkout
|
sock = socket || checkout
|
||||||
@ -326,6 +316,8 @@ module Mongo
|
|||||||
@reserved_connections.clear
|
@reserved_connections.clear
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
# Get a socket from the pool, mapped to the current thread.
|
# Get a socket from the pool, mapped to the current thread.
|
||||||
def checkout
|
def checkout
|
||||||
#return @socket ||= checkout_new_socket if @size == 1
|
#return @socket ||= checkout_new_socket if @size == 1
|
||||||
@ -560,5 +552,6 @@ module Mongo
|
|||||||
a
|
a
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -201,7 +201,7 @@ module Mongo
|
|||||||
message.put_int(0)
|
message.put_int(0)
|
||||||
message.put_int(1)
|
message.put_int(1)
|
||||||
message.put_long(@cursor_id)
|
message.put_long(@cursor_id)
|
||||||
@connection.send_message_with_operation(Mongo::Constants::OP_KILL_CURSORS, message, "cursor.close()")
|
@connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, "cursor.close()")
|
||||||
end
|
end
|
||||||
@cursor_id = 0
|
@cursor_id = 0
|
||||||
@closed = true
|
@closed = true
|
||||||
@ -296,7 +296,7 @@ module Mongo
|
|||||||
|
|
||||||
# Cursor id.
|
# Cursor id.
|
||||||
message.put_long(@cursor_id)
|
message.put_long(@cursor_id)
|
||||||
results, @n_received, @cursor_id = @connection.receive_message_with_operation(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()", @socket)
|
results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()", @socket)
|
||||||
@cache += results
|
@cache += results
|
||||||
close_cursor_if_query_complete
|
close_cursor_if_query_complete
|
||||||
end
|
end
|
||||||
@ -307,7 +307,7 @@ module Mongo
|
|||||||
false
|
false
|
||||||
else
|
else
|
||||||
message = construct_query_message
|
message = construct_query_message
|
||||||
results, @n_received, @cursor_id = @connection.receive_message_with_operation(Mongo::Constants::OP_QUERY, message,
|
results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_QUERY, message,
|
||||||
(query_log_message if @connection.logger), @socket)
|
(query_log_message if @connection.logger), @socket)
|
||||||
@cache += results
|
@cache += results
|
||||||
@query_run = true
|
@query_run = true
|
||||||
|
@ -11,7 +11,7 @@ class ConnectionTest < Test::Unit::TestCase
|
|||||||
@conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
|
@conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
|
||||||
@db = @conn['testing']
|
@db = @conn['testing']
|
||||||
@coll = @db.collection('books')
|
@coll = @db.collection('books')
|
||||||
@conn.expects(:send_message_with_operation).with do |op, msg, log|
|
@conn.expects(:send_message).with do |op, msg, log|
|
||||||
op == 2001 && log.include?("db.books.update")
|
op == 2001 && log.include?("db.books.update")
|
||||||
end
|
end
|
||||||
@coll.update({}, {:title => 'Moby Dick'})
|
@coll.update({}, {:title => 'Moby Dick'})
|
||||||
@ -21,7 +21,7 @@ class ConnectionTest < Test::Unit::TestCase
|
|||||||
@conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
|
@conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
|
||||||
@db = @conn['testing']
|
@db = @conn['testing']
|
||||||
@coll = @db.collection('books')
|
@coll = @db.collection('books')
|
||||||
@conn.expects(:send_message_with_operation).with do |op, msg, log|
|
@conn.expects(:send_message).with do |op, msg, log|
|
||||||
op == 2002 && log.include?("db.books.insert")
|
op == 2002 && log.include?("db.books.insert")
|
||||||
end
|
end
|
||||||
@coll.insert({:title => 'Moby Dick'})
|
@coll.insert({:title => 'Moby Dick'})
|
||||||
|
@ -12,6 +12,11 @@ class ConnectionTest < Test::Unit::TestCase
|
|||||||
db = Object.new
|
db = Object.new
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Make a few methods public for these tests.
|
||||||
|
class Mongo::Connection
|
||||||
|
public :checkin, :checkout, :clear_stale_cached_connections!
|
||||||
|
end
|
||||||
|
|
||||||
context "Initialization: " do
|
context "Initialization: " do
|
||||||
|
|
||||||
context "given a single node" do
|
context "given a single node" do
|
||||||
|
Loading…
Reference in New Issue
Block a user