From 0ef78c46c423ae99ec61adb657da91ac6487a7d4 Mon Sep 17 00:00:00 2001 From: Jim Menard Date: Thu, 29 Jan 2009 11:23:50 -0500 Subject: [PATCH] GridStore (GridFS) implementation. --- README.rdoc | 13 ++ lib/mongo/gridfs.rb | 1 + lib/mongo/gridfs/chunk.rb | 107 +++++++++ lib/mongo/gridfs/grid_store.rb | 397 +++++++++++++++++++++++++++++++++ mongo-ruby-driver.gemspec | 2 +- tests/test_chunk.rb | 108 +++++++++ tests/test_grid_store.rb | 177 +++++++++++++++ 7 files changed, 804 insertions(+), 1 deletion(-) create mode 100644 lib/mongo/gridfs.rb create mode 100644 lib/mongo/gridfs/chunk.rb create mode 100644 lib/mongo/gridfs/grid_store.rb create mode 100644 tests/test_chunk.rb create mode 100644 tests/test_grid_store.rb diff --git a/README.rdoc b/README.rdoc index aee5944..d4c6985 100644 --- a/README.rdoc +++ b/README.rdoc @@ -25,6 +25,10 @@ A quick code sample: puts "There are #{coll.count()} records. Here they are:" coll.find().each { |doc| puts doc.inspect } +This driver also includes an implementation of a GridStore class, a Ruby +interface to Mongo's GridFS storage. NOTE: the GridStore code may be moved to +a separate project. + = Installation Install the "mongo" gem by typing @@ -54,6 +58,15 @@ be running, of course. See also the test code, especially tests/test_db_api.rb. +For the GridFS class GridStore, see the tests. + + += GridStore + +The GridStore class is a Ruby implementation of Mongo's GridFS file storage +system. An instance of GridStore is like an IO object. See the rdocs for +details. 
+ = Notes diff --git a/lib/mongo/gridfs.rb b/lib/mongo/gridfs.rb new file mode 100644 index 0000000..d6fc1b2 --- /dev/null +++ b/lib/mongo/gridfs.rb @@ -0,0 +1 @@ +require 'mongo/gridfs/grid_store' diff --git a/lib/mongo/gridfs/chunk.rb b/lib/mongo/gridfs/chunk.rb new file mode 100644 index 0000000..a500dfd --- /dev/null +++ b/lib/mongo/gridfs/chunk.rb @@ -0,0 +1,107 @@ +require 'mongo/types/binary' +require 'mongo/types/dbref' +require 'mongo/types/objectid' +require 'mongo/util/byte_buffer' +require 'mongo/util/ordered_hash' + + +module XGen + module Mongo + module GridFS + + # A chunk stores a portion of GridStore data. + # + # TODO: user-defined chunk size + class Chunk + + DEFAULT_CHUNK_SIZE = 1024 * 256 + + attr_reader :object_id, :chunk_number + attr_accessor :data + + def initialize(db_collection, mongo_object={}) + @coll = db_collection + @object_id = mongo_object['_id'] || XGen::Mongo::Driver::ObjectID.new + @chunk_number = mongo_object['cn'] || 1 + + @data = ByteBuffer.new + case mongo_object['data'] + when String + mongo_object['data'].each_byte { |b| @data.put(b) } + when ByteBuffer + @data.put_array(mongo_object['data'].to_a) + when Array + @data.put_array(mongo_object['data']) + end + @data.rewind + + @next_chunk_dbref = mongo_object['next'] + end + + def has_next? + @next_chunk_dbref + end + + def next + return @next_chunk if @next_chunk + return nil unless @next_chunk_dbref + row = @coll.find({'_id' => @next_chunk_dbref.object_id}).next_object + @next_chunk = self.class.new(@coll, row) if row + @next_chunk + end + + def next=(chunk) + @next_chunk = chunk + @next_chunk_dbref = XGen::Mongo::Driver::DBRef.new(nil, nil, @coll.db, '_chunks', chunk.object_id) + end + + def pos; @data.position; end + def pos=(pos); @data.position = pos; end + def eof?; !@data.more?; end + + def size; @data.size; end + alias_method :length, :size + + def empty? + @data.length == 0 + end + + def clear + @data.clear + end + + # Erase all data after current position. 
+        def truncate
+          if @data.position < @data.length
+            curr_data = @data
+            @data = ByteBuffer.new
+            @data.put_array(curr_data.to_a[0...curr_data.position])
+          end
+        end
+
+        def getc
+          @data.more? ? @data.get : nil
+        end
+
+        def putc(byte)
+          @data.put(byte)
+        end
+
+        def save
+          @coll.remove({'_id' => @object_id}) if @object_id
+          @coll.insert(to_mongo_object)
+        end
+
+        def to_mongo_object
+          h = OrderedHash.new
+          h['_id'] = @object_id
+          h['cn'] = @chunk_number
+          h['data'] = data
+          h['next'] = @next_chunk_dbref
+          h
+        end
+
+      end
+    end
+  end
+end
diff --git a/lib/mongo/gridfs/grid_store.rb b/lib/mongo/gridfs/grid_store.rb
new file mode 100644
index 0000000..a76f2ce
--- /dev/null
+++ b/lib/mongo/gridfs/grid_store.rb
@@ -0,0 +1,397 @@
+require 'mongo/types/dbref'
+require 'mongo/types/objectid'
+require 'mongo/util/ordered_hash'
+require 'mongo/gridfs/chunk'
+
+module XGen
+  module Mongo
+    module GridFS
+
+      # GridStore is an IO-like object that provides input and output for
+      # streams of data to Mongo. See Mongo's documentation about GridFS for
+      # storage implementation details.
+      #
+      # Example code:
+      #
+      #   GridStore.open(database, 'filename', 'w') { |f|
+      #     f.puts "Hello, world!"
+      #   }
+      #   GridStore.open(database, 'filename', 'r') { |f|
+      #     puts f.read         # => Hello, world!\n
+      #   }
+      #   GridStore.open(database, 'filename', 'w+') { |f|
+      #     f.puts "But wait, there's more!"
+      #   }
+      #   GridStore.open(database, 'filename', 'r') { |f|
+      #     puts f.read         # => Hello, world!\nBut wait, there's more!\n
+      #   }
+      class GridStore
+
+        include Enumerable
+
+        attr_accessor :filename
+
+        # Array of strings; may be +nil+
+        attr_accessor :aliases
+
+        # Default is 'text/plain'
+        attr_accessor :content_type
+
+        attr_reader :object_id, :upload_date
+
+        attr_reader :chunk_size
+
+        attr_accessor :lineno
+
+        class << self
+
+          def exist?(db, name) # true only when a '_files' document has this filename
+            !db.collection('_files').find({'filename' => name}).next_object.nil?
+ end + + def open(db, name, mode) + gs = self.new(db, name, mode) + result = nil + begin + result = yield gs if block_given? + ensure + gs.close + end + result + end + + def read(db, name, length=nil, offset=nil) + GridStore.open(db, name, 'r') { |gs| + gs.seek(offset) if offset + gs.read(length) + } + end + + def readlines(db, name, separator=$/) + GridStore.open(db, name, 'r') { |gs| + gs.readlines(separator) + } + end + + def unlink(db, *names) + names.each { |name| + gs = GridStore.new(db, name) + gs.send(:delete_chunks) + db.collection('_files').remove('_id' => gs.object_id) + } + end + alias_method :delete, :unlink + + end + + #--- + # ================================================================ + #+++ + + # Mode may only be 'r', 'w', or 'w+'. + def initialize(db, name, mode='r') + @db, @filename, @mode = db, name, mode + + doc = @db.collection('_files').find({'filename' => @filename}).next_object + if doc + @object_id = doc['_id'] + @content_type = doc['contentType'] + @chunk_size = doc['chunkSize'] + @upload_date = doc['uploadDate'] + @aliases = doc['aliases'] + @length = doc['length'] + fc_id = doc['next'] + if fc_id + coll = @db.collection('_chunks') + row = coll.find({'_id' => fc_id.object_id}).next_object + @first_chunk = row ? Chunk.new(coll, row) : nil + else + @first_chunk = nil + end + else + @upload_date = Time.new + @chunk_size = Chunk::DEFAULT_CHUNK_SIZE + @content_type = 'text/plain' + @length = 0 + end + + case mode + when 'r' + @curr_chunk = @first_chunk + when 'w' + delete_chunks + @first_chunk = @curr_chunk = nil + when 'w+' + @curr_chunk = find_last_chunk + @curr_chunk.pos = @curr_chunk.data.length if @curr_chunk + end + + @lineno = 0 + @pushback_byte = nil + end + + # Change chunk size. Can only change if the file is opened for write + # and the first chunk's size is zero. + def chunk_size=(size) + unless @mode[0] == ?w && @first_chunk == nil + raise "error: can only change chunk size if open for write and no data written." 
+          end
+          @chunk_size = size
+        end
+
+        #---
+        # ================ reading ================
+        #+++
+
+        def getc # pushed-back byte first, then the next stored byte; nil at EOF
+          if @pushback_byte
+            byte = @pushback_byte
+            @pushback_byte = nil
+            byte
+          elsif eof?
+            nil
+          else
+            if @curr_chunk.eof?
+              @curr_chunk = @curr_chunk.next
+            end
+            @curr_chunk.getc
+          end
+        end
+
+        def gets(separator=$/) # compares one byte at a time, so only single-character separators match
+          str = ''
+          byte = getc
+          return nil if byte == nil # EOF
+          while byte != nil
+            s = byte.chr
+            str << s
+            break if s == separator
+            byte = getc
+          end
+          @lineno += 1
+          str
+        end
+
+        def read(len=nil, buf=nil) # reads +len+ bytes (all remaining if nil), appending to +buf+
+          buf ||= ''
+          byte = (len == nil || len > 0) ? getc : nil # read(0) must not consume a byte
+          while byte != nil && (len == nil || len > 0)
+            buf << byte.chr
+            len -= 1 if len
+            byte = getc if (len == nil || len > 0)
+          end
+          buf
+        end
+
+        def readchar # like getc, but raises EOFError at end of data
+          byte = getc
+          raise EOFError.new if byte == nil
+          byte
+        end
+
+        def readline(separator=$/) # like gets, but raises EOFError at end of data
+          line = gets(separator) # honor the caller's separator instead of always using $/
+          raise EOFError.new if line == nil
+          line
+        end
+
+        def readlines(separator=$/)
+          read.split(separator).collect { |line| "#{line}#{separator}" }
+        end
+
+        def each
+          line = gets
+          while line
+            yield line
+            line = gets
+          end
+        end
+        alias_method :each_line, :each
+
+        def each_byte
+          byte = getc
+          while byte
+            yield byte
+            byte = getc
+          end
+        end
+
+        def ungetc(byte) # holds a single byte; a second call overwrites the first
+          @pushback_byte = byte
+        end
+
+        #---
+        # ================ writing ================
+        #+++
+
+        def putc(byte)
+          chunks = @db.collection('_chunks')
+          if @curr_chunk == nil
+            @first_chunk = @curr_chunk = Chunk.new(chunks, 'cn' => 1)
+          elsif @curr_chunk.pos == @chunk_size # current chunk is full: link and save it, start a new one
+            prev_chunk = @curr_chunk
+            @curr_chunk = Chunk.new(chunks, 'cn' => prev_chunk.chunk_number + 1)
+            prev_chunk.next = @curr_chunk
+            prev_chunk.save
+          end
+          @curr_chunk.putc(byte)
+        end
+
+        def print(*objs)
+          objs = [$_] if objs == nil || objs.empty?
+          objs.each { |obj|
+            str = obj.to_s
+            str.each_byte { |byte| putc(byte) }
+          }
+          nil
+        end
+
+        def puts(*objs)
+          if objs == nil || objs.empty?
+            putc(10)
+          else
+            print(*objs.collect{ |obj|
+              str = obj.to_s
+              str += "\n" unless str =~ /\n$/ # += builds a copy; << would mutate the caller's String
+              str
+            })
+          end
+          nil
+        end
+
+        def <<(obj)
+          write(obj.to_s)
+        end
+
+        # Writes +string+ as bytes and returns the number of bytes written.
+        def write(string)
+          raise "#@filename not opened for write" unless @mode[0] == ?w
+          count = 0
+          string.each_byte { |byte|
+            putc byte
+            count += 1
+          }
+          count
+        end
+
+        # A no-op.
+        def flush
+        end
+
+        #---
+        # ================ status ================
+        #+++
+
+        def eof # raises IOError unless the store was opened with mode 'r'
+          raise IOError.new("stream not open for reading") unless @mode[0] == ?r
+          @curr_chunk == nil || (@curr_chunk.eof? && !@curr_chunk.has_next?)
+        end
+        alias_method :eof?, :eof
+
+        #---
+        # ================ positioning ================
+        #+++
+
+        def rewind
+          if @curr_chunk != @first_chunk
+            @curr_chunk.save unless @curr_chunk == nil || @curr_chunk.empty?
+            @curr_chunk = @first_chunk # assignment; the previous `==` was a no-op comparison
+          end
+          @curr_chunk.pos = 0 if @curr_chunk # nothing to reposition when no chunk exists yet
+          @lineno = 0
+          # TODO if writing, delete all other chunks on first write
+        end
+
+        def seek(pos, whence=IO::SEEK_SET)
+#           target_pos = case whence
+#                        when IO::SEEK_CUR
+#                          tell + pos
+#                        when IO::SEEK_END
+
+#           @curr_chunk.save if @curr_chunk
+#           target_chunk_num = ((pos / @chunk_size) + 1).to_i
+#           target_chunk_pos = pos % @chunk_size
+#           if @curr_chunk == nil || @curr_chunk.chunk_number != target_chunk_num
+#           end
+#           @curr_chunk.pos = target_chunk_pos
+#           0
+          # TODO
+          raise "not yet implemented"
+        end
+
+        def tell # byte offset computed from the current chunk number and position
+          return 0 unless @curr_chunk
+          @chunk_size * (@curr_chunk.chunk_number - 1) + @curr_chunk.pos
+        end
+
+        #---
+        # ================ closing ================
+        #+++
+
+        def close
+          if @mode[0] == ?w
+            if @curr_chunk
+              @curr_chunk.truncate
+              @curr_chunk.save
+            end
+            files = @db.collection('_files')
+            if @object_id
+              files.remove('_id' => @object_id)
+            else
+              @object_id = XGen::Mongo::Driver::ObjectID.new
+            end
+            files.insert(to_mongo_object)
+          end
+          @db = nil
+        end
+
+        def closed?
+          @db == nil
+        end
+
+        #---
+        # ================ protected ================
+        #+++
+
+        protected
+
+        def to_mongo_object
+          h = OrderedHash.new
+          h['_id'] = @object_id
+          h['filename'] = @filename
+          h['contentType'] = @content_type
+          h['length'] = @curr_chunk ? (@curr_chunk.chunk_number - 1) * @chunk_size + @curr_chunk.pos : 0
+          h['chunkSize'] = @chunk_size
+          h['uploadDate'] = @upload_date
+          h['aliases'] = @aliases
+          h['next'] = XGen::Mongo::Driver::DBRef.new(nil, nil, @db, '_chunks', @first_chunk.object_id) if @first_chunk
+          h
+        end
+
+        def find_last_chunk # returns nil when the file has no chunks (e.g. new file opened 'w+')
+          chunk = @curr_chunk || @first_chunk
+          while chunk && chunk.has_next? && chunk.next # stop at a nil start or a dangling 'next' reference
+            chunk = chunk.next
+          end
+          chunk
+        end
+
+        def save_chunk(chunk)
+          chunks = @db.collection('_chunks')
+        end
+
+        def delete_chunks
+          chunk = @first_chunk
+          coll = @db.collection('_chunks')
+          while chunk
+            next_chunk = chunk.next
+            coll.remove({'_id' => chunk.object_id})
+            chunk = next_chunk
+          end
+          @first_chunk = @curr_chunk = nil
+        end
+
+      end
+    end
+  end
+end
diff --git a/mongo-ruby-driver.gemspec b/mongo-ruby-driver.gemspec
index cab37c8..5b4dc34 100644
--- a/mongo-ruby-driver.gemspec
+++ b/mongo-ruby-driver.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'mongo'
-  s.version = '0.3.3'
+  s.version = '0.4.0'
   s.platform = Gem::Platform::RUBY
   s.summary = 'Simple pure-Ruby driver for the 10gen Mongo DB'
   s.description = 'A pure-Ruby driver for the 10gen Mongo DB. For more information about Mongo, see http://www.mongodb.org.'
diff --git a/tests/test_chunk.rb b/tests/test_chunk.rb
new file mode 100644
index 0000000..f1d8762
--- /dev/null
+++ b/tests/test_chunk.rb
@@ -0,0 +1,108 @@
+$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
+require 'test/unit'
+require 'rubygems'
+require 'mongo'
+require 'mongo/gridfs'
+
+class ChunkTest < Test::Unit::TestCase
+
+  include XGen::Mongo::Driver
+  include XGen::Mongo::GridFS
+
+  def setup
+    @host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
+    @port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT
+    @db = Mongo.new(@host, @port).db('ruby-mongo-utils-test')
+
+    @files = @db.collection('_files')
+    @chunks = @db.collection('_chunks')
+    @chunks.clear
+    @files.clear
+
+    @c = Chunk.new(@chunks)
+  end
+
+  def teardown
+    if @db && @db.connected?
+      @chunks.clear
+      @files.clear
+      @db.close
+    end
+  end
+
+  def test_has_next
+    assert !@c.has_next?
+    @c.next = Chunk.new(@chunks)
+    assert @c.has_next?
+  end
+
+  def test_assign_next
+    assert !@c.has_next?
+    assert_nil @c.next
+
+    c2 = Chunk.new(@chunks)
+    @c.next = c2
+    assert_same c2, @c.next
+  end
+
+  def test_pos
+    assert_equal 0, @c.pos
+    assert @c.eof?            # since data is empty
+
+    b = ByteBuffer.new
+    3.times { |i| b.put(i) }
+    c = Chunk.new(@chunks, 'data' => b) # Chunk takes the chunks collection, as in the other tests
+    assert !c.eof?
+  end
+
+  def test_getc
+    b = ByteBuffer.new
+    3.times { |i| b.put(i) }
+    c = Chunk.new(@chunks, 'data' => b)
+
+    assert !c.eof?
+    assert_equal 0, c.getc
+    assert !c.eof?
+    assert_equal 1, c.getc
+    assert !c.eof?
+    assert_equal 2, c.getc
+    assert c.eof?
+  end
+
+  def test_putc
+    3.times { |i| @c.putc(i) }
+    @c.pos = 0
+
+    assert !@c.eof?
+    assert_equal 0, @c.getc
+    assert !@c.eof?
+    assert_equal 1, @c.getc
+    assert !@c.eof?
+    assert_equal 2, @c.getc
+    assert @c.eof?
+  end
+
+  def test_empty
+    assert @c.empty?
+    @c.putc(1)
+    assert !@c.empty?
+  end
+
+  def test_truncate
+    10.times { |i| @c.putc(i) }
+    assert_equal 10, @c.size
+    @c.pos = 3
+    @c.truncate
+    assert_equal 3, @c.size
+
+    @c.pos = 0
+    assert !@c.eof?
+    assert_equal 0, @c.getc
+    assert !@c.eof?
+    assert_equal 1, @c.getc
+    assert !@c.eof?
+    assert_equal 2, @c.getc
+    assert @c.eof?
+  end
+
+end
diff --git a/tests/test_grid_store.rb b/tests/test_grid_store.rb
new file mode 100644
index 0000000..5f19cc9
--- /dev/null
+++ b/tests/test_grid_store.rb
@@ -0,0 +1,177 @@
+$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
+require 'test/unit'
+require 'rubygems'
+require 'mongo'
+require 'mongo/gridfs'
+
+class GridStoreTest < Test::Unit::TestCase
+
+  include XGen::Mongo::Driver
+  include XGen::Mongo::GridFS
+
+  def setup
+    @host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
+    @port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT
+    @db = Mongo.new(@host, @port).db('ruby-mongo-utils-test')
+
+    @files = @db.collection('_files')
+    @chunks = @db.collection('_chunks')
+    @chunks.clear
+    @files.clear
+
+    GridStore.open(@db, 'foobar', 'w') { |f| f.write("hello, world!") }
+  end
+
+  def teardown
+    if @db && @db.connected?
+      @chunks.clear
+      @files.clear
+      @db.close
+    end
+  end
+
+  def test_small_write
+    rows = @files.find({'filename' => 'foobar'}).to_a
+    assert_not_nil rows
+    assert_equal 1, rows.length
+    row = rows[0]
+    assert_not_nil row
+
+    assert_kind_of DBRef, row['next']
+    first_chunk_id = row['next'].object_id
+    first_chunk = @chunks.find({'_id' => first_chunk_id}).next_object
+    assert_not_nil first_chunk # the file's 'next' reference must resolve to a stored chunk
+  end
+
+  def test_small_file
+    rows = @files.find({'filename' => 'foobar'}).to_a
+    assert_not_nil rows
+    assert_equal 1, rows.length
+    row = rows[0]
+    assert_not_nil row
+    assert_equal "hello, world!", GridStore.read(@db, 'foobar')
+  end
+
+  def test_overwrite
+    GridStore.open(@db, 'foobar', 'w') { |f| f.write("overwrite") }
+    assert_equal "overwrite", GridStore.read(@db, 'foobar')
+  end
+
+  def test_read_length
+    assert_equal "hello", GridStore.read(@db, 'foobar', 5)
+  end
+
+# TODO seek not yet implemented
+#   def test_read_with_offset
+#     assert_equal "world", GridStore.read(@db, 'foobar', 5, 7)
+#
assert_equal "world!", GridStore.read(@db, 'foobar', nil, 7) +# end + + def test_multi_chunk + @chunks.clear + @files.clear + + size = 512 + GridStore.open(@db, 'biggie', 'w') { |f| + f.chunk_size = size + f.write('x' * size) + f.write('y' * size) + f.write('z' * size) + } + + assert_equal 3, @chunks.count + assert_equal ('x' * size) + ('y' * size) + ('z' * size), GridStore.read(@db, 'biggie') + end + + def test_puts_and_readlines + GridStore.open(@db, 'multiline', 'w') { |f| + f.puts "line one" + f.puts "line two\n" + f.puts "line three" + } + + lines = GridStore.readlines(@db, 'multiline') + assert_equal ["line one\n", "line two\n", "line three\n"], lines + end + + def test_unlink + assert_equal 1, @files.count + assert_equal 1, @chunks.count + GridStore.unlink(@db, 'foobar') + assert_equal 0, @files.count + assert_equal 0, @chunks.count + end + + def test_append + GridStore.open(@db, 'foobar', 'w+') { |f| f.write(" how are you?") } + assert_equal 1, @chunks.count + assert_equal "hello, world! how are you?", GridStore.read(@db, 'foobar') + end + + def test_rewind_and_truncate_on_write + GridStore.open(@db, 'foobar', 'w') { |f| + f.write("some text is inserted here") + f.rewind + f.write("abc") + } + assert_equal "abc", GridStore.read(@db, 'foobar') + end + + def test_tell + GridStore.open(@db, 'foobar', 'r') { |f| + f.read(5) + assert_equal 5, f.tell + } + end + + def test_empty_block_ok + GridStore.open(@db, 'empty', 'w') + end + + def test_save_empty_file + @chunks.clear + @files.clear + GridStore.open(@db, 'empty', 'w') + assert_equal 1, @files.count + assert_equal 0, @chunks.count + end + + def test_empty_file_eof + GridStore.open(@db, 'empty', 'w') + GridStore.open(@db, 'empty', 'r') { |f| + assert f.eof? 
+ } + end + + def test_cannot_change_chunk_size_on_read + begin + GridStore.open(@db, 'foobar', 'r') { |f| f.chunk_size = 42 } + fail "should have seen error" + rescue => ex + assert_match /error: can only change chunk size/, ex.to_s + end + end + + def test_cannot_change_chunk_size_after_data_written + begin + GridStore.open(@db, 'foobar', 'w') { |f| + f.write("some text") + f.chunk_size = 42 + } + fail "should have seen error" + rescue => ex + assert_match /error: can only change chunk size/, ex.to_s + end + end + + def test_change_chunk_size + GridStore.open(@db, 'new-file', 'w') { |f| + f.chunk_size = 42 + f.write("foo") + } + GridStore.open(@db, 'new-file', 'r') { |f| + assert f.chunk_size == 42 + } + end + +end