GridStore (GridFS) implementation.

This commit is contained in:
Jim Menard 2009-01-29 11:23:50 -05:00
parent 8a6d0cebda
commit 0ef78c46c4
7 changed files with 804 additions and 1 deletions

View File

@ -25,6 +25,10 @@ A quick code sample:
puts "There are #{coll.count()} records. Here they are:"
coll.find().each { |doc| puts doc.inspect }
This driver also includes an implementation of a GridStore class, a Ruby
interface to Mongo's GridFS storage. NOTE: the GridStore code may be moved to
a separate project.
= Installation
Install the "mongo" gem by typing
@ -54,6 +58,15 @@ be running, of course.
See also the test code, especially tests/test_db_api.rb.
For the GridFS class GridStore, see the tests.
= GridStore
The GridStore class is a Ruby implementation of Mongo's GridFS file storage
system. An instance of GridStore is like an IO object. See the rdocs for
details.
= Notes

1
lib/mongo/gridfs.rb Normal file
View File

@ -0,0 +1 @@
require 'mongo/gridfs/grid_store'

107
lib/mongo/gridfs/chunk.rb Normal file
View File

@ -0,0 +1,107 @@
require 'mongo/types/binary'
require 'mongo/types/dbref'
require 'mongo/types/objectid'
require 'mongo/util/byte_buffer'
require 'mongo/util/ordered_hash'
module XGen
module Mongo
module GridFS
# A chunk stores a portion of GridStore data.
#
# TODO: user-defined chunk size
class Chunk
DEFAULT_CHUNK_SIZE = 1024 * 256
attr_reader :object_id, :chunk_number
attr_accessor :data
def initialize(db_collection, mongo_object={})
@coll = db_collection
@object_id = mongo_object['_id'] || XGen::Mongo::Driver::ObjectID.new
@chunk_number = mongo_object['cn'] || 1
@data = ByteBuffer.new
case mongo_object['data']
when String
mongo_object['data'].each_byte { |b| @data.put(b) }
when ByteBuffer
@data.put_array(mongo_object['data'].to_a)
when Array
@data.put_array(mongo_object['data'])
end
@data.rewind
@next_chunk_dbref = mongo_object['next']
end
def has_next?
@next_chunk_dbref
end
def next
return @next_chunk if @next_chunk
return nil unless @next_chunk_dbref
row = @coll.find({'_id' => @next_chunk_dbref.object_id}).next_object
@next_chunk = self.class.new(@coll, row) if row
@next_chunk
end
def next=(chunk)
@next_chunk = chunk
@next_chunk_dbref = XGen::Mongo::Driver::DBRef.new(nil, nil, @coll.db, '_chunks', chunk.object_id)
end
def pos; @data.position; end
def pos=(pos); @data.position = pos; end
def eof?; !@data.more?; end
def size; @data.size; end
alias_method :length, :size
def empty?
@data.length == 0
end
def clear
@data.clear
end
# Erase all data after current position.
def truncate
if @data.position < @data.length
curr_data = @data
@data = ByteBuffer.new
@data.put_array(curr_data.to_a[0...curr_data.position])
end
end
def getc
@data.more? ? @data.get : nil
end
def putc(byte)
@data.put(byte)
end
def save
@coll.remove({'_id' => @object_id}) if @object_id
@coll.insert(to_mongo_object)
end
def to_mongo_object
h = OrderedHash.new
h['_id'] = @object_id
h['cn'] = @chunk_number
h['data'] = data
h['next'] = @next_chunk_dbref
h
end
end
end
end
end

View File

@ -0,0 +1,397 @@
require 'mongo/types/dbref'
require 'mongo/types/objectid'
require 'mongo/util/ordered_hash'
require 'mongo/gridfs/chunk'
module XGen
module Mongo
module GridFS
# GridStore is an IO-like object that provides input and output for
# streams of data to Mongo. See Mongo's documentation about GridFS for
# storage implementation details.
#
# Example code:
#
# GridStore.open(database, 'filename', 'w') { |f|
# f.puts "Hello, world!"
# }
# GridStore.open(database, 'filename, 'r') { |f|
# puts f.read # => Hello, world!\n
# }
# GridStore.open(database, 'filename', 'w+") { |f|
# f.puts "But wait, there's more!"
# }
# GridStore.open(database, 'filename, 'r') { |f|
# puts f.read # => Hello, world!\nBut wait, there's more!\n
# }
class GridStore
include Enumerable
attr_accessor :filename
# Array of strings; may be +nil+
attr_accessor :aliases
# Default is 'text/plain'
attr_accessor :content_type
attr_reader :object_id, :upload_date
attr_reader :chunk_size
attr_accessor :lineno
class << self
def exist?(db, name)
db.collection('_files').find({'filename' => name}).next_object.nil?
end
def open(db, name, mode)
gs = self.new(db, name, mode)
result = nil
begin
result = yield gs if block_given?
ensure
gs.close
end
result
end
def read(db, name, length=nil, offset=nil)
GridStore.open(db, name, 'r') { |gs|
gs.seek(offset) if offset
gs.read(length)
}
end
def readlines(db, name, separator=$/)
GridStore.open(db, name, 'r') { |gs|
gs.readlines(separator)
}
end
def unlink(db, *names)
names.each { |name|
gs = GridStore.new(db, name)
gs.send(:delete_chunks)
db.collection('_files').remove('_id' => gs.object_id)
}
end
alias_method :delete, :unlink
end
#---
# ================================================================
#+++
# Mode may only be 'r', 'w', or 'w+'.
def initialize(db, name, mode='r')
@db, @filename, @mode = db, name, mode
doc = @db.collection('_files').find({'filename' => @filename}).next_object
if doc
@object_id = doc['_id']
@content_type = doc['contentType']
@chunk_size = doc['chunkSize']
@upload_date = doc['uploadDate']
@aliases = doc['aliases']
@length = doc['length']
fc_id = doc['next']
if fc_id
coll = @db.collection('_chunks')
row = coll.find({'_id' => fc_id.object_id}).next_object
@first_chunk = row ? Chunk.new(coll, row) : nil
else
@first_chunk = nil
end
else
@upload_date = Time.new
@chunk_size = Chunk::DEFAULT_CHUNK_SIZE
@content_type = 'text/plain'
@length = 0
end
case mode
when 'r'
@curr_chunk = @first_chunk
when 'w'
delete_chunks
@first_chunk = @curr_chunk = nil
when 'w+'
@curr_chunk = find_last_chunk
@curr_chunk.pos = @curr_chunk.data.length if @curr_chunk
end
@lineno = 0
@pushback_byte = nil
end
# Change chunk size. Can only change if the file is opened for write
# and the first chunk's size is zero.
def chunk_size=(size)
unless @mode[0] == ?w && @first_chunk == nil
raise "error: can only change chunk size if open for write and no data written."
end
@chunk_size = size
end
#---
# ================ reading ================
#+++
def getc
if @pushback_byte
byte = @pushback_byte
@pushback_byte = nil
byte
elsif eof?
nil
else
if @curr_chunk.eof?
@curr_chunk = @curr_chunk.next
end
@curr_chunk.getc
end
end
def gets(separator=$/)
str = ''
byte = getc
return nil if byte == nil # EOF
while byte != nil
s = byte.chr
str << s
break if s == separator
byte = getc
end
@lineno += 1
str
end
def read(len=nil, buf=nil)
buf ||= ''
byte = getc
while byte != nil && (len == nil || len > 0)
buf << byte.chr
len -= 1 if len
byte = getc if (len == nil || len > 0)
end
buf
end
def readchar
byte = getc
raise EOFError.new if byte == nil
byte
end
def readline(separator=$/)
line = gets
raise EOFError.new if line == nil
line
end
def readlines(separator=$/)
read.split(separator).collect { |line| "#{line}#{separator}" }
end
def each
line = gets
while line
yield line
line = gets
end
end
alias_method :each_line, :each
def each_byte
byte = getc
while byte
yield byte
byte = getc
end
end
def ungetc(byte)
@pushback_byte = byte
end
#---
# ================ writing ================
#+++
def putc(byte)
chunks = @db.collection('_chunks')
if @curr_chunk == nil
@first_chunk = @curr_chunk = Chunk.new(chunks, 'cn' => 1)
elsif @curr_chunk.pos == @chunk_size
prev_chunk = @curr_chunk
@curr_chunk = Chunk.new(chunks, 'cn' => prev_chunk.chunk_number + 1)
prev_chunk.next = @curr_chunk
prev_chunk.save
end
@curr_chunk.putc(byte)
end
def print(*objs)
objs = [$_] if objs == nil || objs.empty?
objs.each { |obj|
str = obj.to_s
str.each_byte { |byte| putc(byte) }
}
nil
end
def puts(*objs)
if objs == nil || objs.empty?
putc(10)
else
print(*objs.collect{ |obj|
str = obj.to_s
str << "\n" unless str =~ /\n$/
str
})
end
nil
end
def <<(obj)
write(obj.to_s)
end
# Writes +string+ as bytes and returns the number of bytes written.
def write(string)
raise "#@filename not opened for write" unless @mode[0] == ?w
count = 0
string.each_byte { |byte|
putc byte
count += 1
}
count
end
# A no-op.
def flush
end
#---
# ================ status ================
#+++
def eof
raise IOError.new("stream not open for reading") unless @mode[0] == ?r
@curr_chunk == nil || (@curr_chunk.eof? && !@curr_chunk.has_next?)
end
alias_method :eof?, :eof
#---
# ================ positioning ================
#+++
def rewind
if @curr_chunk != @first_chunk
@curr_chunk.save unless @curr_chunk == nil || @curr_chunk.empty?
@curr_chunk == @first_chunk
end
@curr_chunk.pos = 0
@lineno = 0
# TODO if writing, delete all other chunks on first write
end
def seek(pos, whence=IO::SEEK_SET)
# target_pos = case whence
# when IO::SEEK_CUR
# tell + pos
# when IO::SEEK_END
# @curr_chunk.save if @curr_chunk
# target_chunk_num = ((pos / @chunk_size) + 1).to_i
# target_chunk_pos = pos % @chunk_size
# if @curr_chunk == nil || @curr_chunk.chunk_number != target_chunk_num
# end
# @curr_chunk.pos = target_chunk_pos
# 0
# TODO
raise "not yet implemented"
end
def tell
return 0 unless @curr_chunk
@chunk_size * (@curr_chunk.chunk_number - 1) + @curr_chunk.pos
end
#---
# ================ closing ================
#+++
def close
if @mode[0] == ?w
if @curr_chunk
@curr_chunk.truncate
@curr_chunk.save
end
files = @db.collection('_files')
if @object_id
files.remove('_id' => @object_id)
else
@object_id = XGen::Mongo::Driver::ObjectID.new
end
files.insert(to_mongo_object)
end
@db = nil
end
def closed?
@db == nil
end
#---
# ================ protected ================
#+++
protected
def to_mongo_object
h = OrderedHash.new
h['_id'] = @object_id
h['filename'] = @filename
h['contentType'] = @content_type
h['length'] = @curr_chunk ? (@curr_chunk.chunk_number - 1) * @chunk_size + @curr_chunk.pos : 0
h['chunkSize'] = @chunk_size
h['uploadDate'] = @upload_date
h['aliases'] = @aliases
h['next'] = XGen::Mongo::Driver::DBRef.new(nil, nil, @db, '_chunks', @first_chunk.object_id) if @first_chunk
h
end
def find_last_chunk
chunk = @curr_chunk || @first_chunk
while chunk.has_next?
chunk = chunk.next
end
chunk
end
def save_chunk(chunk)
chunks = @db.collection('_chunks')
end
def delete_chunks
chunk = @first_chunk
coll = @db.collection('_chunks')
while chunk
next_chunk = chunk.next
coll.remove({'_id' => chunk.object_id})
chunk = next_chunk
end
@first_chunk = @curr_chunk = nil
end
end
end
end
end

View File

@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'mongo'
s.version = '0.3.3'
s.version = '0.4.0'
s.platform = Gem::Platform::RUBY
s.summary = 'Simple pure-Ruby driver for the 10gen Mongo DB'
s.description = 'A pure-Ruby driver for the 10gen Mongo DB. For more information about Mongo, see http://www.mongodb.org.'

108
tests/test_chunk.rb Normal file
View File

@ -0,0 +1,108 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'test/unit'
require 'rubygems'
require 'mongo'
require 'mongo/gridfs'
class ChunkTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include XGen::Mongo::GridFS
def setup
@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT
@db = Mongo.new(@host, @port).db('ruby-mongo-utils-test')
@files = @db.collection('_files')
@chunks = @db.collection('_chunks')
@chunks.clear
@files.clear
@c = Chunk.new(@chunks)
end
def teardown
if @db && @db.connected?
@chunks.clear
@files.clear
@db.close
end
end
def test_has_next
assert !@c.has_next?
@c.next = Chunk.new(@chunks)
assert @c.has_next?
end
def test_assign_next
assert !@c.has_next?
assert_nil @c.next
c2 = Chunk.new(@chunks)
@c.next = c2
assert_same c2, @c.next
end
def test_pos
assert_equal 0, @c.pos
assert @c.eof? # since data is empty
b = ByteBuffer.new
3.times { |i| b.put(i) }
c = Chunk.new(@db, 'data' => b)
assert !c.eof?
end
def test_getc
b = ByteBuffer.new
3.times { |i| b.put(i) }
c = Chunk.new(@chunks, 'data' => b)
assert !c.eof?
assert_equal 0, c.getc
assert !c.eof?
assert_equal 1, c.getc
assert !c.eof?
assert_equal 2, c.getc
assert c.eof?
end
def test_putc
3.times { |i| @c.putc(i) }
@c.pos = 0
assert !@c.eof?
assert_equal 0, @c.getc
assert !@c.eof?
assert_equal 1, @c.getc
assert !@c.eof?
assert_equal 2, @c.getc
assert @c.eof?
end
def test_empty
assert @c.empty?
@c.putc(1)
assert !@c.empty?
end
def test_truncate
10.times { |i| @c.putc(i) }
assert_equal 10, @c.size
@c.pos = 3
@c.truncate
assert_equal 3, @c.size
@c.pos = 0
assert !@c.eof?
assert_equal 0, @c.getc
assert !@c.eof?
assert_equal 1, @c.getc
assert !@c.eof?
assert_equal 2, @c.getc
assert @c.eof?
end
end

177
tests/test_grid_store.rb Normal file
View File

@ -0,0 +1,177 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'test/unit'
require 'rubygems'
require 'mongo'
require 'mongo/gridfs'
class GridStoreTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include XGen::Mongo::GridFS
def setup
@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT
@db = Mongo.new(@host, @port).db('ruby-mongo-utils-test')
@files = @db.collection('_files')
@chunks = @db.collection('_chunks')
@chunks.clear
@files.clear
GridStore.open(@db, 'foobar', 'w') { |f| f.write("hello, world!") }
end
def teardown
if @db && @db.connected?
@chunks.clear
@files.clear
@db.close
end
end
def test_small_write
rows = @files.find({'filename' => 'foobar'}).to_a
assert_not_nil rows
assert_equal 1, rows.length
assert_equal 1, rows.length
row = rows[0]
assert_not_nil row
assert_kind_of DBRef, row['next']
first_chunk_id = row['next'].object_id
first_chunk = @chunks.find({'_id' => first_chunk_id}).next_object
end
def test_small_file
rows = @files.find({'filename' => 'foobar'}).to_a
assert_not_nil rows
assert_equal 1, rows.length
row = rows[0]
assert_not_nil row
assert_equal "hello, world!", GridStore.read(@db, 'foobar')
end
def test_overwrite
GridStore.open(@db, 'foobar', 'w') { |f| f.write("overwrite") }
assert_equal "overwrite", GridStore.read(@db, 'foobar')
end
def test_read_length
assert_equal "hello", GridStore.read(@db, 'foobar', 5)
end
# TODO seek not yet implemented
# def test_read_with_offset
# assert_equal "world", GridStore.read(@db, 'foobar', 5, 7)
# assert_equal "world!", GridStore.read(@db, 'foobar', nil, 7)
# end
def test_multi_chunk
@chunks.clear
@files.clear
size = 512
GridStore.open(@db, 'biggie', 'w') { |f|
f.chunk_size = size
f.write('x' * size)
f.write('y' * size)
f.write('z' * size)
}
assert_equal 3, @chunks.count
assert_equal ('x' * size) + ('y' * size) + ('z' * size), GridStore.read(@db, 'biggie')
end
def test_puts_and_readlines
GridStore.open(@db, 'multiline', 'w') { |f|
f.puts "line one"
f.puts "line two\n"
f.puts "line three"
}
lines = GridStore.readlines(@db, 'multiline')
assert_equal ["line one\n", "line two\n", "line three\n"], lines
end
def test_unlink
assert_equal 1, @files.count
assert_equal 1, @chunks.count
GridStore.unlink(@db, 'foobar')
assert_equal 0, @files.count
assert_equal 0, @chunks.count
end
def test_append
GridStore.open(@db, 'foobar', 'w+') { |f| f.write(" how are you?") }
assert_equal 1, @chunks.count
assert_equal "hello, world! how are you?", GridStore.read(@db, 'foobar')
end
def test_rewind_and_truncate_on_write
GridStore.open(@db, 'foobar', 'w') { |f|
f.write("some text is inserted here")
f.rewind
f.write("abc")
}
assert_equal "abc", GridStore.read(@db, 'foobar')
end
def test_tell
GridStore.open(@db, 'foobar', 'r') { |f|
f.read(5)
assert_equal 5, f.tell
}
end
def test_empty_block_ok
GridStore.open(@db, 'empty', 'w')
end
def test_save_empty_file
@chunks.clear
@files.clear
GridStore.open(@db, 'empty', 'w')
assert_equal 1, @files.count
assert_equal 0, @chunks.count
end
def test_empty_file_eof
GridStore.open(@db, 'empty', 'w')
GridStore.open(@db, 'empty', 'r') { |f|
assert f.eof?
}
end
def test_cannot_change_chunk_size_on_read
begin
GridStore.open(@db, 'foobar', 'r') { |f| f.chunk_size = 42 }
fail "should have seen error"
rescue => ex
assert_match /error: can only change chunk size/, ex.to_s
end
end
def test_cannot_change_chunk_size_after_data_written
begin
GridStore.open(@db, 'foobar', 'w') { |f|
f.write("some text")
f.chunk_size = 42
}
fail "should have seen error"
rescue => ex
assert_match /error: can only change chunk size/, ex.to_s
end
end
def test_change_chunk_size
GridStore.open(@db, 'new-file', 'w') { |f|
f.chunk_size = 42
f.write("foo")
}
GridStore.open(@db, 'new-file', 'r') { |f|
assert f.chunk_size == 42
}
end
end