RUBY-242 check BSON size on a per-connection basis.

This commit is contained in:
Kyle Banker 2011-08-25 14:57:24 -04:00
parent 9b42265d10
commit 3c127984a3
20 changed files with 108 additions and 131 deletions

View File

@ -20,11 +20,13 @@
#include "bson_buffer.h"
#define INITIAL_BUFFER_SIZE 256
#define DEFAULT_MAX_SIZE 4 * 1024 * 1024
struct bson_buffer {
char* buffer;
int size;
int position;
int max_size;
};
/* Allocate and return a new buffer.
@ -43,10 +45,19 @@ bson_buffer_t bson_buffer_new(void) {
free(buffer);
return NULL;
}
buffer->max_size = DEFAULT_MAX_SIZE;
return buffer;
}
void bson_buffer_set_max_size(bson_buffer_t buffer, int max_size) {
buffer->max_size = max_size;
}
int bson_buffer_get_max_size(bson_buffer_t buffer) {
return buffer->max_size;
}
/* Free the memory allocated for `buffer`.
* Return non-zero on failure. */
int bson_buffer_free(bson_buffer_t buffer) {

View File

@ -29,6 +29,10 @@ typedef int bson_buffer_position;
* Return NULL on allocation failure. */
bson_buffer_t bson_buffer_new(void);
/* Set the max size for this buffer.
* Note: this is not a hard limit. */
void bson_buffer_set_max_size(bson_buffer_t buffer, int max_size);
/* Free the memory allocated for `buffer`.
* Return non-zero on failure. */
int bson_buffer_free(bson_buffer_t buffer);

View File

@ -599,17 +599,22 @@ static void write_doc(bson_buffer_t buffer, VALUE hash, VALUE check_keys, VALUE
length = bson_buffer_get_position(buffer) - start_position;
// make sure that length doesn't exceed 4MB
if (length > max_bson_size) {
if (length > bson_buffer_get_max_size(buffer)) {
bson_buffer_free(buffer);
rb_raise(InvalidDocument, "Document too large: BSON documents are limited to %d bytes.", max_bson_size);
rb_raise(InvalidDocument,
"Document too large: This BSON document is limited to %d bytes.",
bson_buffer_get_max_size(buffer));
return;
}
SAFE_WRITE_AT_POS(buffer, length_location, (const char*)&length, 4);
}
static VALUE method_serialize(VALUE self, VALUE doc, VALUE check_keys, VALUE move_id) {
static VALUE method_serialize(VALUE self, VALUE doc, VALUE check_keys,
VALUE move_id, VALUE max_size) {
VALUE result;
bson_buffer_t buffer = bson_buffer_new();
bson_buffer_set_max_size(buffer, FIX2INT(max_size));
if (buffer == NULL) {
rb_raise(rb_eNoMemError, "failed to allocate memory in buffer.c");
}
@ -973,7 +978,7 @@ void Init_cbson() {
CBson = rb_define_module("CBson");
ext_version = rb_str_new2(VERSION);
rb_define_const(CBson, "VERSION", ext_version);
rb_define_module_function(CBson, "serialize", method_serialize, 3);
rb_define_module_function(CBson, "serialize", method_serialize, 4);
rb_define_module_function(CBson, "deserialize", method_deserialize, 1);
rb_define_module_function(CBson, "max_bson_size", method_max_bson_size, 0);
rb_define_module_function(CBson, "update_max_bson_size", method_update_max_bson_size, 1);

Binary file not shown.

View File

@ -69,7 +69,8 @@ public class RubyBSONEncoder extends BSONEncoder {
private static final BigInteger LONG_MIN = BigInteger.valueOf(-MAX - 1);
public RubyBSONEncoder(Ruby runtime, boolean check_keys, boolean move_id){
public RubyBSONEncoder(Ruby runtime, boolean check_keys, boolean move_id, int max_bson_size){
_max_bson_size = max_bson_size;
_check_keys = check_keys;
_move_id = move_id;
_runtime = runtime;

View File

@ -20,8 +20,8 @@
module BSON
class BSON_C
def self.serialize(obj, check_keys=false, move_id=false)
ByteBuffer.new(CBson.serialize(obj, check_keys, move_id))
def self.serialize(obj, check_keys=false, move_id=false, max_bson_size=BSON::DEFAULT_MAX_BSON_SIZE)
ByteBuffer.new(CBson.serialize(obj, check_keys, move_id, max_bson_size))
end
def self.deserialize(buf=nil)

View File

@ -4,9 +4,9 @@ module BSON
# TODO: Pool or cache instances of RubyBSONEncoder so that
# we don't create a new one on each call to #serialize.
def self.serialize(obj, check_keys=false, move_id=false)
def self.serialize(obj, check_keys=false, move_id=false, max_bson_size=BSON::DEFAULT_MAX_BSON_SIZE)
raise InvalidDocument, "BSON_JAVA.serialize takes a Hash" unless obj.is_a?(Hash)
enc = Java::OrgJbson::RubyBSONEncoder.new(JRuby.runtime, check_keys, move_id)
enc = Java::OrgJbson::RubyBSONEncoder.new(JRuby.runtime, check_keys, move_id, max_bson_size)
ByteBuffer.new(enc.encode(obj))
end

View File

@ -46,8 +46,8 @@ module BSON
NUMBER_LONG = 18
MAXKEY = 127
def initialize
@buf = ByteBuffer.new
def initialize(max_bson_size=BSON::DEFAULT_MAX_BSON_SIZE)
@buf = ByteBuffer.new('', max_bson_size)
@encoder = BSON_RUBY
end
@ -105,8 +105,8 @@ module BSON
# Serializes an object.
# Implemented to ensure an API compatible with BSON extension.
def self.serialize(obj, check_keys=false, move_id=false)
new.serialize(obj, check_keys, move_id)
def self.serialize(obj, check_keys=false, move_id=false, max_bson_size=BSON::DEFAULT_MAX_BSON_SIZE)
new(max_bson_size).serialize(obj, check_keys, move_id)
end
def self.deserialize(buf=nil)
@ -137,8 +137,9 @@ module BSON
end
serialize_eoo_element(@buf)
if @buf.size > @@max_bson_size
raise InvalidDocument, "Document is too large (#{@buf.size}). BSON documents are limited to #{@@max_bson_size} bytes."
if @buf.size > @buf.max_size
raise InvalidDocument, "Document is too large (#{@buf.size}). " +
"This BSON document is limited to #{@buf.max_size} bytes."
end
@buf.put_int(@buf.size, 0)
@buf

View File

@ -20,9 +20,9 @@
module BSON
class ByteBuffer
attr_reader :order
attr_reader :order, :max_size
def initialize(initial_data="")
def initialize(initial_data="", max_size=BSON::DEFAULT_MAX_BSON_SIZE)
@str = case initial_data
when String then
if initial_data.respond_to?(:force_encoding)
@ -40,6 +40,7 @@ module BSON
@order = :little_endian
@int_pack_order = 'V'
@double_pack_order = 'E'
@max_size = max_size
end
if RUBY_VERSION >= '1.9'

View File

@ -357,7 +357,7 @@ module Mongo
message = BSON::ByteBuffer.new("\0\0\0\0")
BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@name}")
message.put_int(0)
message.put_binary(BSON::BSON_CODER.serialize(selector, false, true).to_s)
message.put_binary(BSON::BSON_CODER.serialize(selector, false, true, @connection.max_bson_size).to_s)
@connection.instrument(:remove, :database => @db.name, :collection => @name, :selector => selector) do
if safe
@ -404,7 +404,7 @@ module Mongo
update_options += 2 if opts[:multi]
message.put_int(update_options)
message.put_binary(BSON::BSON_CODER.serialize(selector, false, true).to_s)
message.put_binary(BSON::BSON_CODER.serialize(document, false, true).to_s)
message.put_binary(BSON::BSON_CODER.serialize(document, false, true, @connection.max_bson_size).to_s)
@connection.instrument(:update, :database => @db.name, :collection => @name, :selector => selector, :document => document) do
if safe
@ -897,7 +897,7 @@ module Mongo
message = BSON::ByteBuffer.new("\0\0\0\0")
BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{collection_name}")
documents.each do |doc|
message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true).to_s)
message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true, @connection.max_bson_size).to_s)
end
raise InvalidOperation, "Exceeded maximum insert size of 16,000,000 bytes" if message.size > 16_000_000

View File

@ -512,12 +512,11 @@ module Mongo
@read_primary = false
end
@max_bson_size = config['maxBsonObjectSize'] || Mongo::DEFAULT_MAX_BSON_SIZE
set_primary(@host_to_try)
end
if connected?
BSON::BSON_CODER.update_max_bson_size(self)
else
if !connected?
raise ConnectionFailure, "Failed to connect to a master node at #{@host_to_try[0]}:#{@host_to_try[1]}"
end
end
@ -571,8 +570,7 @@ module Mongo
#
# @return [Integer]
def max_bson_size
config = self['admin'].command({:ismaster => 1})
config['maxBsonObjectSize'] || Mongo::DEFAULT_MAX_BSON_SIZE
@max_bson_size
end
# Checkout a socket for reading (i.e., a secondary node).
@ -619,7 +617,7 @@ module Mongo
@logger.fatal "MONGODB [FATAL] #{msg}"
else
@logger.info "MONGODB [INFO] #{msg}"
end
end
end
# Execute the block and log the operation described by name
@ -635,6 +633,9 @@ module Mongo
# Generic initialization code.
def setup(opts)
# Default maximum BSON object size
@max_bson_size = Mongo::DEFAULT_MAX_BSON_SIZE
# Authentication objects
@auths = opts.fetch(:auths, [])

View File

@ -8,7 +8,8 @@ module Mongo
if data.is_a?(String)
self.host, self.port = split_nodes(data)
else
self.host, self.port = data
self.host = data[0]
self.port = data[1].nil? ? Connection::DEFAULT_PORT : data[1].to_i
end
self.address = "#{host}:#{port}"
end
@ -134,7 +135,7 @@ module Mongo
def split_nodes(host_string)
data = host_string.split(":")
host = data[0]
port = data[1].to_i || Connection::DEFAULT_PORT
port = data[1].nil? ? Connection::DEFAULT_PORT : data[1].to_i
[host, port]
end

View File

@ -87,8 +87,10 @@ module Mongo
# The list of seed nodes
@seeds = args
@nodes = @seeds.dup
# The members of the replica set, stored as instances of Mongo::Node.
@nodes = []
@members = []
# Connection pool for primary node
@primary = nil
@ -139,7 +141,6 @@ module Mongo
manager.connect
update_config(manager)
#BSON::BSON_CODER.update_max_bson_size(self)
initiate_auto_refresh
if @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
@ -164,6 +165,7 @@ module Mongo
@manager = manager
@hosts = manager.hosts
@nodes = manager.nodes
@max_bson_size = manager.max_bson_size
end
# If ismaster doesn't match our current view
@ -210,6 +212,11 @@ module Mongo
super
end
def nodes
warn "DEPRECATED"
@seeds
end
# Determine whether we're reading from a primary node. If false,
# this connection connects to a secondary node and @read_secondaries is true.
#

View File

@ -2,7 +2,7 @@ module Mongo
class PoolManager
attr_reader :connection, :seeds, :arbiters, :primary, :secondaries,
:primary_pool, :read_pool, :secondary_pools, :hosts, :nodes
:primary_pool, :read_pool, :secondary_pools, :hosts, :nodes, :max_bson_size
def initialize(connection, seeds)
@connection = connection
@ -12,10 +12,10 @@ module Mongo
def connect
initialize_data
nodes = connect_to_members
initialize_pools(nodes)
update_seed_list(nodes)
@nodes = nodes
members = connect_to_members
initialize_pools(members)
update_seed_list(members)
@members = members
end
# Ensure that the view of the replica set is current by
@ -106,34 +106,34 @@ module Mongo
@secondaries = []
@secondary_pools = []
@hosts = []
@nodes = []
@members = []
end
# Connect to each member of the replica set
# as reported by the given seed node, and return
# as a list of Mongo::Node objects.
def connect_to_members
nodes = []
members = []
seed = get_valid_seed_node
seed.node_list.each do |host|
node = Mongo::Node.new(self.connection, host)
if node.connect && node.set_config
nodes << node
members << node
end
end
if nodes.empty?
if members.empty?
raise ConnectionFailure, "Failed to connect to any given member."
end
nodes
members
end
# Initialize the connection pools for the primary and secondary nodes.
def initialize_pools(nodes)
nodes.each do |member|
def initialize_pools(members)
members.each do |member|
@hosts << member.host_string
if member.primary?
@ -151,7 +151,9 @@ module Mongo
end
end
@arbiters = nodes.first.arbiters
@max_bson_size = members.first.config['maxBsonObjectSize'] ||
Mongo::DEFAULT_MAX_BSON_SIZE
@arbiters = members.first.arbiters
choose_read_pool
end
@ -203,8 +205,8 @@ module Mongo
"#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}"
end
def update_seed_list(nodes)
@seeds = nodes.map { |n| n.host_port }
def update_seed_list(members)
@seeds = members.map { |n| n.host_port }
end
end

View File

@ -1,7 +1,7 @@
require './test/test_helper'
class TestCollection < Test::Unit::TestCase
@@connection ||= standard_connection(:op_timeout => 2)
@@connection ||= standard_connection(:op_timeout => 10)
@@db = @@connection.db(MONGO_TEST_DB)
@@test = @@db.collection("test")
@@version = @@connection.server_version

View File

@ -13,7 +13,7 @@ class TestConnection < Test::Unit::TestCase
end
def teardown
@conn[MONGO_TEST_DB].get_last_error
@conn.close
end
def test_connection_failure
@ -162,8 +162,8 @@ class TestConnection < Test::Unit::TestCase
end
def test_nodes
db = Connection.multi([['foo', 27017], ['bar', 27018]], :connect => false)
nodes = db.nodes
conn = Connection.multi([['foo', 27017], ['bar', 27018]], :connect => false)
nodes = conn.nodes
assert_equal 2, nodes.length
assert_equal ['foo', 27017], nodes[0]
assert_equal ['bar', 27018], nodes[1]
@ -191,35 +191,32 @@ class TestConnection < Test::Unit::TestCase
end
def test_max_bson_size_value
conn = standard_connection(:connect => false)
admin_db = Object.new
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1, 'maxBsonObjectSize' => 15_000_000})
conn.expects(:[]).with('admin').returns(admin_db)
conn.connect
assert_equal 15_000_000, conn.max_bson_size
conn = standard_connection
if conn.server_version > "1.7.2"
assert_equal conn['admin'].command({:ismaster => 1})['maxBsonObjectSize'], conn.max_bson_size
end
conn.connect
assert_equal BSON::BSON_CODER.max_bson_size, conn.max_bson_size
doc = {'n' => 'a' * (BSON_CODER.max_bson_size - 11)}
doc = {'n' => 'a' * (conn.max_bson_size)}
assert_raise InvalidDocument do
assert BSON::BSON_CODER.serialize(doc)
end
limit = 7 * 1024 * 1024
conn.stubs(:max_bson_size).returns(limit)
conn.connect
assert_equal limit, conn.max_bson_size
assert_equal limit, BSON::BSON_CODER.max_bson_size
doc = {'n' => 'a' * ((limit) - 11)}
assert_raise_error InvalidDocument, "limited to #{limit}" do
assert BSON::BSON_CODER.serialize(doc)
assert BSON::BSON_CODER.serialize(doc, false, true, @conn.max_bson_size)
end
end
def test_max_bson_size_with_old_mongod
def test_max_bson_size_with_no_reported_max_size
conn = standard_connection(:connect => false)
admin_db = Object.new
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}).twice
conn.expects(:[]).with('admin').returns(admin_db).twice
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1})
conn.expects(:[]).with('admin').returns(admin_db)
conn.connect
assert_equal Mongo::DEFAULT_MAX_BSON_SIZE, BSON::BSON_CODER.max_bson_size
@ -253,6 +250,10 @@ class TestConnection < Test::Unit::TestCase
@conn.add_auth(@auth['db_name'], @auth['username'], @auth['password'])
end
teardown do
@conn.clear_auths
end
should "save the authentication" do
assert_equal @auth, @conn.auths[0]
end
@ -315,7 +316,7 @@ class TestConnection < Test::Unit::TestCase
fake_socket = Mocha::Mock.new
fake_socket.stubs(:close).raises(IOError.new)
fake_socket.stub_everything
TCPSocket.expects(:new).returns(fake_socket)
TCPSocket.stubs(:new).returns(fake_socket)
@con.primary_pool.checkout_new_socket
assert_equal [], @con.primary_pool.close

View File

@ -156,7 +156,8 @@ class DBTest < Test::Unit::TestCase
assert Mongo::Connection.from_uri("mongodb://spongebob:squarepants@#{host_port}/#{@@db.name}")
assert_raise Mongo::AuthenticationError do
Mongo::Connection.from_uri("mongodb://wrong:info@#{host_port}/#{@@db.name}")
con = Mongo::Connection.from_uri("mongodb://wrong:info@#{host_port}/#{@@db.name}")
con['test']['foo'].find_one
end
end

View File

@ -9,8 +9,8 @@ class ConnectionTest < Test::Unit::TestCase
TCPSocket.stubs(:new).returns(new_mock_socket)
admin_db = new_mock_db
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}).twice
@conn.expects(:[]).with('admin').returns(admin_db).twice
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1})
@conn.expects(:[]).with('admin').returns(admin_db)
@conn.connect
end
@ -52,8 +52,8 @@ class ConnectionTest < Test::Unit::TestCase
@conn = Connection.from_uri("mongodb://localhost", :connect => false)
admin_db = new_mock_db
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}).twice
@conn.expects(:[]).with('admin').returns(admin_db).twice
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1})
@conn.expects(:[]).with('admin').returns(admin_db)
@conn.connect
end

View File

@ -20,7 +20,7 @@ class NodeTest < Test::Unit::TestCase
assert_equal "power.level.com:#{Connection::DEFAULT_PORT}", node.address
end
should "load a node from a stirng" do
should "load a node from a string" do
node = Node.new(@connection, 'localhost:1234')
assert_equal 'localhost', node.host
assert_equal 1234, node.port

View File

@ -1,59 +0,0 @@
require './test/test_helper'
include Mongo
class ReplSetConnectionTest < Test::Unit::TestCase
context "Initialization: " do
context "connecting to a replica set" do
setup do
TCPSocket.stubs(:new).returns(new_mock_socket('localhost', 27017))
@conn = ReplSetConnection.new(['localhost', 27017], :connect => false, :read_secondary => true)
admin_db = new_mock_db
@hosts = ['localhost:27018', 'localhost:27019', 'localhost:27020']
admin_db.stubs(:command).returns({'ok' => 1, 'ismaster' => 1, 'hosts' => @hosts}).
then.returns({'ok' => 1, 'ismaster' => 0, 'hosts' => @hosts, 'secondary' => 1}).
then.returns({'ok' => 1, 'ismaster' => 0, 'hosts' => @hosts, 'secondary' => 1}).
then.returns({'ok' => 1, 'ismaster' => 0, 'arbiterOnly' => 1})
@conn.stubs(:[]).with('admin').returns(admin_db)
@conn.connect
end
should "store the hosts returned from the ismaster command" do
assert_equal 'localhost', @conn.primary_pool.host
assert_equal 27017, @conn.primary_pool.port
assert_equal 'localhost', @conn.secondary_pools[0].host
assert_equal 27018, @conn.secondary_pools[0].port
assert_equal 'localhost', @conn.secondary_pools[1].host
assert_equal 27019, @conn.secondary_pools[1].port
assert_equal 2, @conn.secondary_pools.length
end
end
context "connecting to a replica set and providing seed nodes" do
setup do
TCPSocket.stubs(:new).returns(new_mock_socket)
@conn = ReplSetConnection.new(['localhost', 27017], ['localhost', 27019], :connect => false)
admin_db = new_mock_db
@hosts = ['localhost:27017', 'localhost:27018', 'localhost:27019']
admin_db.stubs(:command).returns({'ok' => 1, 'ismaster' => 1, 'hosts' => @hosts})
@conn.stubs(:[]).with('admin').returns(admin_db)
@conn.connect
end
end
context "initializing with a mongodb uri" do
should "parse a uri specifying multiple nodes" do
@conn = Connection.from_uri("mongodb://localhost:27017,mydb.com:27018", :connect => false)
assert_equal ['localhost', 27017], @conn.nodes[0]
assert_equal ['mydb.com', 27018], @conn.nodes[1]
end
end
end
end