From c6d4150a5140cebbf0b8db85b5c56ef1c16e04a9 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Wed, 14 Oct 2009 14:38:44 -0400 Subject: [PATCH] Added option to disable cursor timeout on Collection#find when #find is invoked with a block. --- lib/mongo/collection.rb | 13 +++++++++- lib/mongo/message/opcodes.rb | 2 ++ lib/mongo/message/query_message.rb | 2 +- lib/mongo/query.rb | 27 ++++++++++++++----- test/messages/test_query_message.rb | 40 +++++++++++++++++++++++++++++ test/test_collection.rb | 18 +++++++++++++ test/test_query.rb | 36 ++++++++++++++++++++++++++ 7 files changed, 129 insertions(+), 9 deletions(-) create mode 100644 test/messages/test_query_message.rb create mode 100644 test/test_query.rb diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index b7451f0..7fde81c 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -100,6 +100,13 @@ module Mongo # objects missed, which were preset at both the start and # end of the query's execution. For details see # http://www.mongodb.org/display/DOCS/How+to+do+Snapshotting+in+the+Mongo+Database + # :timeout :: When +true+ (default), the returned cursor will be subject to + # the normal cursor timeout behavior of the mongod process. + # When +false+, the returned cursor will never timeout. Note + # that disabling timeout will only work when #find is invoked + # with a block. This is to prevent any inadvertant failure to + # close the cursor, as the cursor is explicitly closed when + # block code finishes. def find(selector={}, options={}) fields = options.delete(:fields) fields = ["_id"] if fields && fields.empty? @@ -112,6 +119,10 @@ module Mongo sort = options.delete(:sort) hint = options.delete(:hint) snapshot = options.delete(:snapshot) + if options[:timeout] == false && !block_given? + raise ArgumentError, "Timeout can be set to false only when #find is invoked with a block." + end + timeout = block_given? ? (options.delete(:timeout) || true) : true if hint hint = normalize_hint_fields(hint) else @@ -119,7 +130,7 @@ module Mongo end raise RuntimeError, "Unknown options [#{options.inspect}]" unless options.empty? - cursor = @db.query(self, Query.new(selector, fields, skip, limit, sort, hint, snapshot)) + cursor = @db.query(self, Query.new(selector, fields, skip, limit, sort, hint, snapshot, timeout)) if block_given? yield cursor cursor.close() diff --git a/lib/mongo/message/opcodes.rb b/lib/mongo/message/opcodes.rb index 88b9c54..235ecd7 100644 --- a/lib/mongo/message/opcodes.rb +++ b/lib/mongo/message/opcodes.rb @@ -24,4 +24,6 @@ module Mongo OP_GET_MORE = 2005 OP_DELETE = 2006 OP_KILL_CURSORS = 2007 + + OP_QUERY_NO_CURSOR_TIMEOUT = 16 end diff --git a/lib/mongo/message/query_message.rb b/lib/mongo/message/query_message.rb index ae22f92..bfcb86f 100644 --- a/lib/mongo/message/query_message.rb +++ b/lib/mongo/message/query_message.rb @@ -29,7 +29,7 @@ module Mongo super(OP_QUERY) @query = query @collection_name = collection_name - write_int(0) + write_int(@query.query_opts) write_string("#{db_name}.#{collection_name}") write_int(query.number_to_skip) write_int(query.number_to_return) diff --git a/lib/mongo/query.rb b/lib/mongo/query.rb index 65fee6f..e8392d1 100644 --- a/lib/mongo/query.rb +++ b/lib/mongo/query.rb @@ -19,8 +19,6 @@ require 'mongo/message' require 'mongo/types/code' module Mongo - - # A query against a collection. A query's selector is a hash. See the # Mongo documentation for query details. class Query @@ -56,13 +54,22 @@ module Mongo # probably will not be what you intend because key order # is not preserved. (order_by is called :sort in calls to # Collection#find.) + # :snapshot :: If true, snapshot mode will be used for this query. + # Snapshot mode assures no duplicates are returned, or + # objects missed, which were preset at both the start and + # end of the query's execution. For details see + # http://www.mongodb.org/display/DOCS/How+to+do+Snapshotting+in+the+Mongo+Database # # hint :: If not +nil+, specifies query hint fields. Must be either - # +nil+ or a hash (preferably an OrderedHash). See - # Collection#hint. - def initialize(sel={}, return_fields=nil, number_to_skip=0, number_to_return=0, order_by=nil, hint=nil, snapshot=nil) - @number_to_skip, @number_to_return, @order_by, @hint, @snapshot = - number_to_skip, number_to_return, order_by, hint, snapshot + # +nil+ or a hash (preferably an OrderedHash). See Collection#hint. + # + # timeout :: When +true+ (default), the returned cursor will be subject to + # the normal cursor timeout behavior of the mongod process. + # When +false+, the returned cursor will never timeout. Care should + # be taken to ensure that cursors with timeout disabled are properly closed. + def initialize(sel={}, return_fields=nil, number_to_skip=0, number_to_return=0, order_by=nil, hint=nil, snapshot=nil, timeout=true) + @number_to_skip, @number_to_return, @order_by, @hint, @snapshot, @timeout = + number_to_skip, number_to_return, order_by, hint, snapshot, timeout @explain = nil self.selector = sel self.fields = return_fields @@ -111,6 +118,12 @@ module Mongo @order_by || @explain || @hint || @snapshot end + # Returns an integer indicating which query options have been selected. + # See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPQUERY + def query_opts + @timeout ? 0 : OP_QUERY_NO_CURSOR_TIMEOUT + end + def to_s "find(#{@selector.inspect})" + (@order_by ? ".sort(#{@order_by.inspect})" : "") end diff --git a/test/messages/test_query_message.rb b/test/messages/test_query_message.rb new file mode 100644 index 0000000..ea481ef --- /dev/null +++ b/test/messages/test_query_message.rb @@ -0,0 +1,40 @@ +# -- +# Copyright (C) 2008-2009 10gen Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ++ + +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' +require 'test/unit' + +class TestQueryMessage < Test::Unit::TestCase + + include Mongo + + def test_timeout_opcodes + @timeout = true + @query = Query.new({}, nil, 0, 0, nil, nil, nil, @timeout) + @query_message = QueryMessage.new('db', 'collection', @query) + buf = @query_message.buf.instance_variable_get(:@buf) + assert_equal 0, buf[16] + + + @timeout = false + @query = Query.new({}, nil, 0, 0, nil, nil, nil, @timeout) + @query_message = QueryMessage.new('db', 'collection', @query) + buf = @query_message.buf.instance_variable_get(:@buf) + assert_equal 16, buf[16] + end + +end diff --git a/test/test_collection.rb b/test/test_collection.rb index fbc9a49..97945a3 100644 --- a/test/test_collection.rb +++ b/test/test_collection.rb @@ -123,6 +123,24 @@ class TestCollection < Test::Unit::TestCase assert_equal @@test.size, @@test.count end + def test_no_timeout_option + @@test.drop + + assert_raise ArgumentError, "Timeout can be set to false only when #find is invoked with a block." do + @@test.find({}, :timeout => false) + end + + @@test.find({}, :timeout => false) do |cursor| + assert_equal 0, cursor.count + end + + @@test.save("x" => 1) + @@test.save("x" => 2) + @@test.find({}, :timeout => false) do |cursor| + assert_equal 2, cursor.count + end + end + def test_find_one id = @@test.save("hello" => "world", "foo" => "bar") diff --git a/test/test_query.rb b/test/test_query.rb new file mode 100644 index 0000000..119c411 --- /dev/null +++ b/test/test_query.rb @@ -0,0 +1,36 @@ +# -- +# Copyright (C) 2008-2009 10gen Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ++ + +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' +require 'test/unit' + +class TestQuery < Test::Unit::TestCase + + include Mongo + + def test_timeout_opcodes + @timeout = true + @query = Query.new({}, nil, 0, 0, nil, nil, nil, @timeout) + assert_equal 0, @query.query_opts + + + @timeout = false + @query = Query.new({}, nil, 0, 0, nil, nil, nil, @timeout) + assert_equal 16, @query.query_opts + end + +end