From f158aa13af154583a93c2fb0f379f5bd2310600a Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Fri, 12 Feb 2010 18:03:07 -0500 Subject: [PATCH] Initial GridFS refactoring --- bin/gridstore_benchmark | 31 +++- lib/mongo.rb | 3 +- lib/mongo/collection.rb | 2 +- lib/mongo/exceptions.rb | 3 + lib/mongo/gridfs/grid.rb | 55 +++++++ lib/mongo/gridfs/grid_io.rb | 255 +++++++++++++++++++++++++++++++++ lib/mongo/gridfs/grid_store.rb | 4 +- test/test_grid.rb | 195 +++++++++++++++++++++++++ test/test_grid_io.rb | 36 +++++ 9 files changed, 573 insertions(+), 11 deletions(-) create mode 100644 lib/mongo/gridfs/grid.rb create mode 100644 lib/mongo/gridfs/grid_io.rb create mode 100644 test/test_grid.rb create mode 100644 test/test_grid_io.rb diff --git a/bin/gridstore_benchmark b/bin/gridstore_benchmark index 49c3e2f..6892c5c 100755 --- a/bin/gridstore_benchmark +++ b/bin/gridstore_benchmark @@ -9,23 +9,40 @@ include GridFS db = Connection.new['benchmark-gridfs'] sample_data = File.open(File.join(File.dirname(__FILE__), 'sample_file.pdf'), 'r').read GridStore.delete(db, 'mongodb.pdf') +GridStore.delete(db, 'mongodb-new.pdf') length = sample_data.length mb = length / 1048576.0 t1 = Time.now -#RubyProf.start +@grid = Grid.new(db) +@grid.open('mongodb-new.pdf', 'w') do |f| + f.write(sample_data) +end +puts "Write: #{mb / (Time.now - t1)} mb/s" + +t1 = Time.now GridStore.open(db, 'mongodb.pdf', 'w') do |f| f.write(sample_data) end -#result = RubyProf.stop puts "Write: #{mb / (Time.now - t1)} mb/s" -#printer = RubyProf::FlatPrinter.new(result) -#printer.print(STDOUT, 0) - t1 = Time.now -GridStore.open(db, 'mongodb.pdf', 'r') do |f| - data = f.read +@grid = Grid.new(db) +data = @grid.open('mongodb-new.pdf', 'r') do |f| + f.read +end +puts "Read new: #{mb / (Time.now - t1)} mb/s" +file = db['fs.files'].find_one({:filename => 'mongodb-new.pdf'}) +p file +puts +p db['fs.chunks'].find({:files_id => file['_id']}, {:fields => ['files_id']}).to_a + +t1 = Time.now +old_data = GridStore.open(db, 
'mongodb.pdf', 'r') do |f| + f.read end puts "Read: #{mb / (Time.now - t1)} mb/s" + +puts sample_data == old_data +puts sample_data == data diff --git a/lib/mongo.rb b/lib/mongo.rb index 53dd30e..fcc965c 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -58,4 +58,5 @@ require 'mongo/connection' require 'mongo/cursor' require 'mongo/db' require 'mongo/exceptions' -require 'mongo/gridfs' +require 'mongo/gridfs/grid' +require 'mongo/gridfs/grid_io' diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index 7e69251..462226f 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -552,7 +552,7 @@ EOS # Note: If operating in auth mode, the client must be authorized as an admin to # perform this operation. # - # @param [String ] new_name the new name for this collection + # @param [String] new_name the new name for this collection # # @raise [InvalidName] if +new_name+ is an invalid collection name. def rename(new_name) diff --git a/lib/mongo/exceptions.rb b/lib/mongo/exceptions.rb index 03b3013..5d1c900 100644 --- a/lib/mongo/exceptions.rb +++ b/lib/mongo/exceptions.rb @@ -24,6 +24,9 @@ module Mongo # Raised when configuration options cause connections, queries, etc., to fail. class ConfigurationError < MongoRubyError; end + # Raised with fatal errors to GridFS. + class GridError < MongoRubyError; end + # Raised when invalid arguments are sent to Mongo Ruby methods. class MongoArgumentError < MongoRubyError; end diff --git a/lib/mongo/gridfs/grid.rb b/lib/mongo/gridfs/grid.rb new file mode 100644 index 0000000..1436258 --- /dev/null +++ b/lib/mongo/gridfs/grid.rb @@ -0,0 +1,55 @@ +# -- +# Copyright (C) 2008-2009 10gen Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++

# GridFS is a specification for storing large objects in MongoDB.
# See the documentation for GridFS::GridStore
#
# @see GridFS::GridStore
#
# @core gridfs
module Mongo

  # Opens GridFS files that live under one root collection of a database.
  # The file metadata is kept in "<root>.files" and the file contents in
  # "<root>.chunks".
  class Grid
    DEFAULT_ROOT_COLLECTION = 'fs'

    # @param [Mongo::DB] db the database holding the GridFS collections.
    # @param [String] root_collection prefix of the two GridFS collections.
    # @param [Hash] opts accepted but not read by this method.
    #
    # @raise [MongoArgumentError] if +db+ is not a Mongo::DB.
    def initialize(db, root_collection=DEFAULT_ROOT_COLLECTION, opts={})
      check_params(db)
      @db     = db
      @files  = db["#{root_collection}.files"]
      @chunks = db["#{root_collection}.chunks"]
    end

    # Build a GridIO for +filename+, hand it to the block (if one is given)
    # and close it afterwards, even when the block raises.
    #
    # @param [String] filename name of the file to open.
    # @param [String] mode file mode, passed through to GridIO.
    # @param [Hash] opts options passed through to GridIO.
    #
    # @return [Object, nil] the value of the block, or nil without a block.
    def open(filename, mode, opts={})
      io = GridIO.new(@files, @chunks, filename, mode, opts)
      begin
        block_given? ? yield(io) : nil
      ensure
        io.close
      end
    end

    private

    # Reject anything that is not a Mongo::DB.
    def check_params(db)
      return if db.is_a?(Mongo::DB)
      raise MongoArgumentError, "db must be an instance of Mongo::DB."
    end
  end
end

# --
# Copyright (C) 2008-2009 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++

module Mongo

  # IO-like object for reading or writing one GridFS file. File metadata is
  # kept as a single document in the files collection; the content is split
  # into documents of at most +chunk_size+ bytes in the chunks collection,
  # numbered by their 'n' field.
  class GridIO
    DEFAULT_CHUNK_SIZE   = 256 * 1024
    DEFAULT_CONTENT_TYPE = 'text/plain'

    attr_reader :content_type
    attr_reader :chunk_size

    # Look up (or prepare to create) the file called +filename+.
    #
    # @param [Object] files the collection holding file metadata documents.
    # @param [Object] chunks the collection holding the file's chunks.
    # @param [String] filename name of the file.
    # @param [String] mode 'r' (read), 'w' (overwrite) or 'w+' (append).
    #
    # @option opts [Hash] :criteria extra selector fields used, together with
    #   the filename, to find an existing file.
    # @option opts [String] :content_type ('text/plain') used for new files.
    # @option opts [Integer] :chunk_size (DEFAULT_CHUNK_SIZE) used for new files.
    # @option opts [Object] :files_id the _id to give a new file.
    # @option opts [Object] :metadata stored with the file ('w' and 'w+' only).
    #
    # @raise [GridError] if +mode+ is not one of the supported modes.
    def initialize(files, chunks, filename, mode, opts={})
      @files    = files
      @chunks   = chunks
      @filename = filename
      @mode     = mode

      init_file(opts)
      init_mode(opts)
    end

    # Read data starting at the current file position. Without a length,
    # everything up to the end of the file is returned; with a length, at
    # most that many bytes.
    #
    # NOTE(review): reading deliberately leaves the file position where it
    # was. The SEEK_CUR assertions in test/test_grid.rb depend on that;
    # confirm before switching to the advancing behaviour of ::IO#read.
    #
    # @param [Integer] length
    #
    # @return [String]
    #   the data read; an empty string at (or past) the end of the file.
    def read(length=nil)
      return '' if length == 0

      remaining = @file_length.to_i - @file_position
      remaining = length if length && length < remaining

      buf    = ''
      chunk  = @current_chunk
      offset = @chunk_position
      while remaining > 0
        data = chunk['data'].to_s
        if offset >= data.length
          # Current chunk is exhausted: move on, or stop if the store has no
          # further chunk (truncated or missing data).
          break if chunk['n'].nil?
          chunk = find_chunk(chunk['n'] + 1)
          break if chunk.nil?
          offset = 0
          next
        end
        piece = data[offset, remaining]
        buf << piece
        offset    += piece.length
        remaining -= piece.length
      end
      buf
    end

    # Write the given string (binary) data to the file.
    #
    # @param [String] string
    #   the data to write
    #
    # @return [Integer]
    #   the number of bytes written.
    #
    # @raise [GridError] if the file was not opened with 'w' or 'w+'.
    def write(string)
      raise GridError, "#{@filename} not opened for write" unless @mode[0, 1] == 'w'

      # Ruby 1.9 doesn't necessarily store one character per byte, so work on
      # a binary-encoded copy; the copy keeps the caller's string (which may
      # be frozen) untouched.
      data = string.dup
      data.force_encoding("binary") if data.respond_to?(:force_encoding)

      to_write = data.length
      while to_write > 0
        if @current_chunk && @chunk_position == @chunk_size
          @current_chunk = create_chunk(@current_chunk['n'] + 1)
        end
        step_size = [to_write, @chunk_size - @chunk_position].min
        @current_chunk['data'] = Binary.new(@current_chunk['data'].to_s << data[-to_write, step_size])
        @chunk_position += step_size
        @file_position  += step_size
        to_write        -= step_size
        save_chunk(@current_chunk)
      end
      data.length - to_write
    end

    # Position the file pointer at the provided location.
    #
    # @param [Integer] pos
    #   the number of bytes to advance the file pointer. this can be a negative
    #   number.
    # @param [Integer] whence
    #   one of IO::SEEK_CUR, IO::SEEK_END, or IO::SEEK_SET
    #
    # @return [Integer] the new file position
    #
    # @raise [GridError] if the file is not open in read mode, +whence+ is
    #   not one of the three constants, or the target position is negative.
    def seek(pos, whence=IO::SEEK_SET)
      raise GridError, "Seek is only allowed in read mode." unless @mode == 'r'

      target_pos = case whence
                   when IO::SEEK_CUR then @file_position + pos
                   when IO::SEEK_END then @file_length + pos
                   when IO::SEEK_SET then pos
                   else
                     raise GridError, "Invalid whence value #{whence.inspect}."
                   end
      raise GridError, "Cannot seek to negative position #{target_pos}." if target_pos < 0

      new_chunk_number = (target_pos / @chunk_size).to_i
      if new_chunk_number != @current_chunk['n']
        @current_chunk = get_chunk(new_chunk_number)
      end
      @file_position  = target_pos
      @chunk_position = @file_position % @chunk_size
      @file_position
    end

    # The current position of the file.
    #
    # @return [Integer]
    def tell
      @file_position
    end

    # Creates or updates the document storing the chunks' metadata
    # in the files collection. The file exists only after this method
    # is called.
    #
    # Grid#open invokes this method automatically. When a GridIO is
    # created directly, it must be called manually.
    #
    # @return [True]
    def close
      if @mode[0, 1] == 'w'
        if @upload_date
          # The file already existed: replace its metadata document.
          @files.remove('_id' => @files_id)
        else
          @upload_date = Time.now
        end
        @files.insert(to_mongo_object)
      end
      true
    end

    private

    # Build an empty, unsaved chunk numbered +n+ and make it the write target.
    def create_chunk(n)
      chunk             = OrderedHash.new
      chunk['_id']      = Mongo::ObjectID.new
      chunk['n']        = n
      chunk['files_id'] = @files_id
      chunk['data']     = ''
      @chunk_position   = 0
      chunk
    end

    # Persist +chunk+, replacing any stored version with the same _id.
    def save_chunk(chunk)
      @chunks.remove('_id' => chunk['_id'])
      @chunks.insert(chunk)
    end

    # Fetch chunk +n+ of this file, or nil if it is not stored.
    def find_chunk(n)
      @chunks.find({'files_id' => @files_id, 'n' => n}).next_document
    end

    # Fetch chunk +n+ and reset the in-chunk position. A missing chunk is
    # represented by an empty hash.
    def get_chunk(n)
      chunk = find_chunk(n)
      @chunk_position = 0
      chunk || {}
    end

    def delete_chunks(selector)
      @chunks.remove(selector)
    end

    # Number of the chunk that contains the byte at offset +@file_length+,
    # i.e. the chunk an append continues in.
    def last_chunk_number
      (@file_length / @chunk_size).to_i
    end

    # Root collection name for the server-side md5 command, derived from the
    # files collection's name ("<root>.files").
    # NOTE(review): assumes the collection responds to #name; falls back to
    # the previously hard-coded 'fs' otherwise.
    def root_collection_name
      return 'fs' unless @files.respond_to?(:name)
      @files.name.to_s.sub(/\.files\z/, '')
    end

    # Initialize based on whether the supplied file exists.
    def init_file(opts)
      selector = {'filename' => @filename}
      selector = selector.merge(opts[:criteria]) if opts[:criteria]
      doc = @files.find(selector).next_document
      if doc
        @files_id     = doc['_id']
        @content_type = doc['contentType']
        @chunk_size   = doc['chunkSize']
        @upload_date  = doc['uploadDate']
        @aliases      = doc['aliases']
        @file_length  = doc['length']
        @metadata     = doc['metadata']
        @md5          = doc['md5']
      else
        @files_id     = opts[:files_id] || Mongo::ObjectID.new
        @content_type = opts[:content_type] || DEFAULT_CONTENT_TYPE
        @chunk_size   = opts[:chunk_size] || DEFAULT_CHUNK_SIZE
        @file_length  = 0
      end
    end

    # Validates and sets up the class for the given file mode.
    def init_mode(opts)
      case @mode
      when 'r'
        @current_chunk = get_chunk(0)
        @file_position = 0
      when 'w'
        # Overwriting: drop whatever chunks the file had before.
        delete_chunks('files_id' => @files_id)

        @metadata = opts[:metadata] if opts[:metadata]
        @chunks.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
        @current_chunk = create_chunk(0)
        @file_position = 0
      when 'w+'
        @metadata = opts[:metadata] if opts[:metadata]
        @chunks.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
        # Continue in the last stored chunk. When the file is new, or its
        # length is an exact multiple of the chunk size, that chunk does not
        # exist yet and is created with the same number.
        chunk_number    = last_chunk_number
        @current_chunk  = find_chunk(chunk_number) || create_chunk(chunk_number)
        @chunk_position = @current_chunk['data'].to_s.length
        @file_position  = @file_length
      else
        raise GridError, "Illegal file mode #{@mode}. Valid options are 'r', 'w', and 'w+'."
      end
    end

    # Build the metadata document stored in the files collection.
    def to_mongo_object
      h                = OrderedHash.new
      h['_id']         = @files_id
      h['filename']    = @filename
      h['contentType'] = @content_type
      h['length']      = @current_chunk ? @current_chunk['n'] * @chunk_size + @chunk_position : 0
      h['chunkSize']   = @chunk_size
      h['uploadDate']  = @upload_date
      h['aliases']     = @aliases
      h['metadata']    = @metadata

      # Get a server-side md5.
      md5_command            = OrderedHash.new
      md5_command['filemd5'] = @files_id
      md5_command['root']    = root_collection_name
      h['md5'] = @files.db.command(md5_command)['md5']

      h
    end
  end
end
# test/test_grid.rb
#
# Tests for Mongo::Grid. They use the 'ruby-mongo-test' database of the
# server named by MONGO_RUBY_DRIVER_HOST / MONGO_RUBY_DRIVER_PORT
# (default 'localhost' and Connection::DEFAULT_PORT).
require 'test/test_helper'

class GridTest < Test::Unit::TestCase
  include GridFS

  # Connect once per test case instance and grab the default GridFS collections.
  def setup
    @db ||= Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
      ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test')
    @files = @db.collection('fs.files')
    @chunks = @db.collection('fs.chunks')
  end

  # Empty both collections so tests do not see each other's files.
  def teardown
    @files.remove
    @chunks.remove
  end

  context "When reading:" do
    setup do
      # The sample file is written with the older GridStore API and then
      # read back through Grid.
      @data = "CHUNKS" * 50000
      GridStore.open(@db, 'sample', 'w') do |f|
        f.puts @data
      end

      @grid = Grid.new(@db)
    end

    should "contain sample data" do
      assert_equal @data, GridStore.read(@db, 'sample')
    end

    # Only the length is compared here, not the content.
    should "read sample data" do
      data = @grid.open('sample', 'r') { |f| f.read }
      assert_equal data.length, @data.length
    end

    should "return an empty string if length is zero" do
      data = @grid.open('sample', 'r') { |f| f.read(0) }
      assert_equal '', data
    end

    # 288888 is larger than the 256k chunk size asserted in GridIOTest.
    should "return the first n bytes" do
      data = @grid.open('sample', 'r') {|f| f.read(288888) }
      assert_equal 288888, data.length
      assert_equal @data[0...288888], data
    end

    should "return the first n bytes even with an offset" do
      data = @grid.open('sample', 'r') do |f|
        f.seek(1000)
        f.read(288888)
      end
      assert_equal 288888, data.length
      assert_equal @data[1000...289888], data
    end
  end

  context "When writing:" do
    setup do
      @data = "BYTES" * 50000
      @grid = Grid.new(@db)
      @grid.open('sample', 'w') do |f|
        f.write @data
      end
    end

    # Only the length is compared here, not the content.
    should "read sample data" do
      data = @grid.open('sample', 'r') { |f| f.read }
      assert_equal data.length, @data.length
    end

    # Grid#open returns the block's value, here the result of GridIO#write.
    should "return the total number of bytes written" do
      data = 'a' * 300000
      assert_equal 300000, @grid.open('write', 'w') {|f| f.write(data) }
    end

    # Same assertion as "read sample data" above.
    should "more read sample data" do
      data = @grid.open('sample', 'r') { |f| f.read }
      assert_equal data.length, @data.length
    end

    should "raise exception if not opened for write" do
      assert_raise GridError do
        @grid.open('io', 'r') { |f| f.write('hello') }
      end
    end
  end

  context "When appending:" do
    setup do
      @data = "1"
      @grid = Grid.new(@db)
      @grid.open('sample', 'w', :chunk_size => 1000) do |f|
        f.write @data
      end
    end

    should "add data to the file" do
      new_data = "2"
      @grid.open('sample', 'w+') do |f|
        f.write(new_data)
      end

      all_data = @grid.open('sample', 'r') {|f| f.read }
      assert_equal @data + new_data, all_data
    end

    # 5000 appended bytes with a 1000-byte chunk size span several chunks.
    should "add multi-chunk-data" do
      new_data = "2" * 5000

      @grid.open('sample', 'w+') do |f|
        f.write(new_data)
      end

      all_data = @grid.open('sample', 'r') {|f| f.read }
      assert_equal @data + new_data, all_data
    end
  end

  context "When writing chunks:" do
    setup do
      data = "B" * 50000
      @grid = Grid.new(@db)
      @grid.open('sample', 'w', :chunk_size => 1000) do |f|
        f.write data
      end
    end

    # 50000 bytes / 1000 bytes per chunk = 50 chunks.
    should "write the correct number of chunks" do
      file = @files.find_one({:filename => 'sample'})
      chunks = @chunks.find({'files_id' => file['_id']}).to_a
      assert_equal 50, chunks.length
    end
  end

  context "Positioning:" do
    setup do
      # Layout: 'hello, world' at 0-11, '1' at 12-5011, 'goodbye!' at
      # 5012-5019, '2' at 5020-6019 and a final '!' at 6020.
      data = 'hello, world' + '1' * 5000 + 'goodbye!' + '2' * 1000 + '!'
      @grid = Grid.new(@db)
      @grid.open('hello', 'w', :chunk_size => 1000) do |f|
        f.write data
      end
    end

    # The SEEK_CUR steps below only add up if read leaves the file position
    # unchanged: after reading 'w' at offset 7, seek(-1) is expected to land
    # on offset 6 (' '), then seek(-4) on offset 2 ('l').
    should "seek within chunks" do
      @grid.open('hello', 'r') do |f|
        f.seek(0)
        assert_equal 'h', f.read(1)
        f.seek(7)
        assert_equal 'w', f.read(1)
        f.seek(4)
        assert_equal 'o', f.read(1)
        f.seek(0)
        f.seek(7, IO::SEEK_CUR)
        assert_equal 'w', f.read(1)
        f.seek(-1, IO::SEEK_CUR)
        assert_equal ' ', f.read(1)
        f.seek(-4, IO::SEEK_CUR)
        assert_equal 'l', f.read(1)
        f.seek(3, IO::SEEK_CUR)
        assert_equal ',', f.read(1)
      end
    end

    should "seek between chunks" do
      @grid.open('hello', 'r') do |f|
        f.seek(1000)
        assert_equal '11111', f.read(5)

        f.seek(5009)
        assert_equal '111goodbye!222', f.read(14)

        f.seek(-1, IO::SEEK_END)
        assert_equal '!', f.read(1)
        f.seek(-6, IO::SEEK_END)
        assert_equal '2', f.read(1)
      end
    end

    should "tell the current position" do
      @grid.open('hello', 'r') do |f|
        assert_equal 0, f.tell

        f.seek(999)
        assert_equal 999, f.tell
      end
    end

    should "seek only in read mode" do
      assert_raise GridError do
        @grid.open('hello', 'w+') {|f| f.seek(0) }
      end
    end
  end
end

# test/test_grid_io.rb
#
# Tests for the options accepted by Mongo::GridIO, using the same database
# and collections as GridTest.
require 'test/test_helper'

class GridIOTest < Test::Unit::TestCase
  include GridFS

  # Connect once per test case instance and grab the default GridFS collections.
  def setup
    @db ||= Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
      ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test')
    @files = @db.collection('fs.files')
    @chunks = @db.collection('fs.chunks')
  end

  # Empty both collections so tests do not see each other's files.
  def teardown
    @files.remove
    @chunks.remove
  end

  context "Options" do
    setup do
      @filename = 'test'
      @mode = 'w'
    end

    should "set default 256k chunk size" do
      file = GridIO.new(@files, @chunks, @filename, @mode)
      assert_equal 256 * 1024, file.chunk_size
    end

    should "set chunk size" do
      file = GridIO.new(@files, @chunks, @filename, @mode, :chunk_size => 1000)
      assert_equal 1000, file.chunk_size
    end

  end

end