Initial GridFS refactoring
This commit is contained in:
parent
7863b37f07
commit
f158aa13af
@ -9,23 +9,40 @@ include GridFS
|
||||
db = Connection.new['benchmark-gridfs']
|
||||
sample_data = File.open(File.join(File.dirname(__FILE__), 'sample_file.pdf'), 'r').read
|
||||
GridStore.delete(db, 'mongodb.pdf')
|
||||
GridStore.delete(db, 'mongodb-new.pdf')
|
||||
|
||||
length = sample_data.length
|
||||
mb = length / 1048576.0
|
||||
|
||||
t1 = Time.now
|
||||
#RubyProf.start
|
||||
@grid = Grid.new(db)
|
||||
@grid.open('mongodb-new.pdf', 'w') do |f|
|
||||
f.write(sample_data)
|
||||
end
|
||||
puts "Write: #{mb / (Time.now - t1)} mb/s"
|
||||
|
||||
t1 = Time.now
|
||||
GridStore.open(db, 'mongodb.pdf', 'w') do |f|
|
||||
f.write(sample_data)
|
||||
end
|
||||
#result = RubyProf.stop
|
||||
puts "Write: #{mb / (Time.now - t1)} mb/s"
|
||||
#printer = RubyProf::FlatPrinter.new(result)
|
||||
#printer.print(STDOUT, 0)
|
||||
|
||||
|
||||
t1 = Time.now
|
||||
GridStore.open(db, 'mongodb.pdf', 'r') do |f|
|
||||
data = f.read
|
||||
@grid = Grid.new(db)
|
||||
data = @grid.open('mongodb-new.pdf', 'r') do |f|
|
||||
f.read
|
||||
end
|
||||
puts "Read new: #{mb / (Time.now - t1)} mb/s"
|
||||
file = db['fs.files'].find_one({:filename => 'mongodb-new.pdf'})
|
||||
p file
|
||||
puts
|
||||
p db['fs.chunks'].find({:files_id => file['_id']}, {:fields => ['files_id']}).to_a
|
||||
|
||||
t1 = Time.now
|
||||
old_data = GridStore.open(db, 'mongodb.pdf', 'r') do |f|
|
||||
f.read
|
||||
end
|
||||
puts "Read: #{mb / (Time.now - t1)} mb/s"
|
||||
|
||||
puts sample_data == old_data
|
||||
puts sample_data == data
|
||||
|
@ -58,4 +58,5 @@ require 'mongo/connection'
|
||||
require 'mongo/cursor'
|
||||
require 'mongo/db'
|
||||
require 'mongo/exceptions'
|
||||
require 'mongo/gridfs'
|
||||
require 'mongo/gridfs/grid'
|
||||
require 'mongo/gridfs/grid_io'
|
||||
|
@ -552,7 +552,7 @@ EOS
|
||||
# Note: If operating in auth mode, the client must be authorized as an admin to
|
||||
# perform this operation.
|
||||
#
|
||||
# @param [String ] new_name the new name for this collection
|
||||
# @param [String] new_name the new name for this collection
|
||||
#
|
||||
# @raise [InvalidName] if +new_name+ is an invalid collection name.
|
||||
def rename(new_name)
|
||||
|
@ -24,6 +24,9 @@ module Mongo
|
||||
# Raised when configuration options cause connections, queries, etc., to fail.
|
||||
class ConfigurationError < MongoRubyError; end
|
||||
|
||||
# Raised with fatal errors to GridFS.
|
||||
class GridError < MongoRubyError; end
|
||||
|
||||
# Raised when invalid arguments are sent to Mongo Ruby methods.
|
||||
class MongoArgumentError < MongoRubyError; end
|
||||
|
||||
|
55
lib/mongo/gridfs/grid.rb
Normal file
55
lib/mongo/gridfs/grid.rb
Normal file
@ -0,0 +1,55 @@
|
||||
# --
|
||||
# Copyright (C) 2008-2009 10gen Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# ++
|
||||
|
||||
# GridFS is a specification for storing large objects in MongoDB.
|
||||
# See the documentation for GridFS::GridStore
|
||||
#
|
||||
# @see GridFS::GridStore
|
||||
#
|
||||
# @core gridfs
|
||||
module Mongo
|
||||
class Grid
|
||||
DEFAULT_ROOT_COLLECTION = 'fs'
|
||||
|
||||
def initialize(db, root_collection=DEFAULT_ROOT_COLLECTION, opts={})
|
||||
check_params(db)
|
||||
@db = db
|
||||
@files = @db["#{root_collection}.files"]
|
||||
@chunks = @db["#{root_collection}.chunks"]
|
||||
end
|
||||
|
||||
def open(filename, mode, opts={})
|
||||
file = GridIO.new(@files, @chunks, filename, mode, opts)
|
||||
result = nil
|
||||
begin
|
||||
if block_given?
|
||||
result = yield file
|
||||
end
|
||||
ensure
|
||||
file.close
|
||||
end
|
||||
result
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def check_params(db)
|
||||
if !db.is_a?(Mongo::DB)
|
||||
raise MongoArgumentError, "db must be an instance of Mongo::DB."
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
255
lib/mongo/gridfs/grid_io.rb
Normal file
255
lib/mongo/gridfs/grid_io.rb
Normal file
@ -0,0 +1,255 @@
|
||||
# --
|
||||
# Copyright (C) 2008-2009 10gen Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# ++
|
||||
|
||||
module Mongo
|
||||
class GridIO
|
||||
DEFAULT_CHUNK_SIZE = 256 * 1024
|
||||
DEFAULT_CONTENT_TYPE = 'text/plain'
|
||||
|
||||
attr_reader :content_type
|
||||
attr_reader :chunk_size
|
||||
|
||||
# @options opts [Hash] :cond
|
||||
def initialize(files, chunks, filename, mode, opts={})
|
||||
@files = files
|
||||
@chunks = chunks
|
||||
@filename = filename
|
||||
@mode = mode
|
||||
@content_type = opts[:content_type] || DEFAULT_CONTENT_TYPE
|
||||
@chunk_size = opts[:chunk_size] || DEFAULT_CHUNK_SIZE
|
||||
@files_id = opts[:files_id] || Mongo::ObjectID.new
|
||||
|
||||
init_file(opts)
|
||||
init_mode(opts)
|
||||
end
|
||||
|
||||
# Read the data from the file. If a length if specified, will read from the
|
||||
# current file position.
|
||||
#
|
||||
# @param [Integer] length
|
||||
#
|
||||
# @return [String]
|
||||
# the data in the file
|
||||
def read(length=nil)
|
||||
return '' if length == 0
|
||||
return read_all if length.nil? && @file_position.zero?
|
||||
buf = ''
|
||||
while true
|
||||
buf << @current_chunk['data'].to_s[@chunk_position..-1]
|
||||
if buf.length >= length
|
||||
return buf[0...length]
|
||||
else
|
||||
@current_chunk = get_chunk(@current_chunk['n'] + 1)
|
||||
end
|
||||
end
|
||||
buf
|
||||
end
|
||||
|
||||
# Write the given string (binary) data to the file.
|
||||
#
|
||||
# @param [String] string
|
||||
# the data to write
|
||||
#
|
||||
# @return [Integer]
|
||||
# the number of bytes written.
|
||||
def write(string)
|
||||
raise GridError, "#{@filename} not opened for write" unless @mode[0] == ?w
|
||||
# Since Ruby 1.9.1 doesn't necessarily store one character per byte.
|
||||
if string.respond_to?(:force_encoding)
|
||||
string.force_encoding("binary")
|
||||
end
|
||||
to_write = string.length
|
||||
while (to_write > 0) do
|
||||
if @current_chunk && @chunk_position == @chunk_size
|
||||
next_chunk_number = @current_chunk['n'] + 1
|
||||
@current_chunk = create_chunk(next_chunk_number)
|
||||
end
|
||||
chunk_available = @chunk_size - @chunk_position
|
||||
step_size = (to_write > chunk_available) ? chunk_available : to_write
|
||||
@current_chunk['data'] = Binary.new(@current_chunk['data'].to_s << string[-to_write, step_size])
|
||||
@chunk_position += step_size
|
||||
to_write -= step_size
|
||||
save_chunk(@current_chunk)
|
||||
end
|
||||
string.length - to_write
|
||||
end
|
||||
|
||||
# Position the file pointer at the provided location.
|
||||
#
|
||||
# @param [Integer] pos
|
||||
# the number of bytes to advance the file pointer. this can be a negative
|
||||
# number.
|
||||
# @param [Integer] whence
|
||||
# one of IO::SEEK_CUR, IO::SEEK_END, or IO::SEEK_SET
|
||||
#
|
||||
# @return [Integer] the new file position
|
||||
def seek(pos, whence=IO::SEEK_SET)
|
||||
raise GridError, "Seek is only allowed in read mode." unless @mode == 'r'
|
||||
target_pos = case whence
|
||||
when IO::SEEK_CUR
|
||||
@file_position + pos
|
||||
when IO::SEEK_END
|
||||
@file_length + pos
|
||||
when IO::SEEK_SET
|
||||
pos
|
||||
end
|
||||
|
||||
new_chunk_number = (target_pos / @chunk_size).to_i
|
||||
if new_chunk_number != @current_chunk['n']
|
||||
save_chunk(@current_chunk) if @mode[0] == ?w
|
||||
@current_chunk = get_chunk(new_chunk_number)
|
||||
end
|
||||
@file_position = target_pos
|
||||
@chunk_position = @file_position % @chunk_size
|
||||
@file_position
|
||||
end
|
||||
|
||||
# The current position of the file.
|
||||
#
|
||||
# @return [Integer]
|
||||
def tell
|
||||
@file_position
|
||||
end
|
||||
|
||||
# Creates or updates the document storing the chunks' metadata
|
||||
# in the files collection. The file exists only after this method
|
||||
# is called.
|
||||
#
|
||||
# This method will be invoked automatically
|
||||
# on GridIO#open. Otherwise, it must be called manually.
|
||||
#
|
||||
# @return [True]
|
||||
def close
|
||||
if @mode[0] == ?w
|
||||
if @upload_date
|
||||
@files.remove('_id' => @files_id)
|
||||
else
|
||||
@upload_date = Time.now
|
||||
end
|
||||
@files.insert(to_mongo_object)
|
||||
end
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def create_chunk(n)
|
||||
chunk = OrderedHash.new
|
||||
chunk['_id'] = Mongo::ObjectID.new
|
||||
chunk['n'] = n
|
||||
chunk['files_id'] = @files_id
|
||||
chunk['data'] = ''
|
||||
@chunk_position = 0
|
||||
chunk
|
||||
end
|
||||
|
||||
def save_chunk(chunk)
|
||||
@chunks.remove('_id' => chunk['_id'])
|
||||
@chunks.insert(chunk)
|
||||
end
|
||||
|
||||
def get_chunk(n)
|
||||
chunk = @chunks.find({'files_id' => @files_id, 'n' => n}).next_document
|
||||
@chunk_position = 0
|
||||
chunk || {}
|
||||
end
|
||||
|
||||
def delete_chunks(selector)
|
||||
@chunks.remove(selector)
|
||||
end
|
||||
|
||||
def last_chunk_number
|
||||
(@file_length / @chunk_size).to_i
|
||||
end
|
||||
|
||||
# An optimized read method for reading the whole file.
|
||||
def read_all
|
||||
buf = ''
|
||||
while true
|
||||
buf << @current_chunk['data'].to_s
|
||||
break if @current_chunk['n'] == last_chunk_number
|
||||
@current_chunk = get_chunk(@current_chunk['n'] + 1)
|
||||
end
|
||||
buf
|
||||
end
|
||||
|
||||
# Initialize based on whether the supplied file exists.
|
||||
def init_file(opts)
|
||||
selector = {'filename' => @filename}
|
||||
selector.merge(opts[:criteria]) if opts[:criteria]
|
||||
doc = @files.find(selector).next_document
|
||||
if doc
|
||||
@files_id = doc['_id']
|
||||
@content_type = doc['contentType']
|
||||
@chunk_size = doc['chunkSize']
|
||||
@upload_date = doc['uploadDate']
|
||||
@aliases = doc['aliases']
|
||||
@file_length = doc['length']
|
||||
@metadata = doc['metadata']
|
||||
@md5 = doc['md5']
|
||||
else
|
||||
@files_id = Mongo::ObjectID.new
|
||||
@content_type = opts[:content_type] || DEFAULT_CONTENT_TYPE
|
||||
@chunk_size = opts[:chunk_size] || DEFAULT_CHUNK_SIZE
|
||||
@length = 0
|
||||
end
|
||||
end
|
||||
|
||||
# Validates and sets up the class for the given file mode.
|
||||
def init_mode(opts)
|
||||
case @mode
|
||||
when 'r'
|
||||
@current_chunk = get_chunk(0)
|
||||
@file_position = 0
|
||||
when 'w'
|
||||
delete_chunks({'_files_id' => })
|
||||
|
||||
@metadata = opts[:metadata] if opts[:metadata]
|
||||
@chunks.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
|
||||
@current_chunk = create_chunk(0)
|
||||
@file_position = 0
|
||||
when 'w+'
|
||||
@metadata = opts[:metadata] if opts[:metadata]
|
||||
@chunks.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
|
||||
@current_chunk = get_chunk(last_chunk_number) || create_chunk(0)
|
||||
@chunk_position = @current_chunk['data'].length
|
||||
@file_position = @length
|
||||
else
|
||||
raise GridError, "Illegal file mode #{mode}. Valid options are 'r', 'w', and 'w+'."
|
||||
end
|
||||
end
|
||||
|
||||
def to_mongo_object
|
||||
h = OrderedHash.new
|
||||
h['_id'] = @files_id
|
||||
h['filename'] = @filename
|
||||
h['contentType'] = @content_type
|
||||
h['length'] = @current_chunk ? @current_chunk['n'] * @chunk_size + @chunk_position : 0
|
||||
h['chunkSize'] = @chunk_size
|
||||
h['uploadDate'] = @upload_date
|
||||
h['aliases'] = @aliases
|
||||
h['metadata'] = @metadata
|
||||
|
||||
# Get a server-side md5.
|
||||
md5_command = OrderedHash.new
|
||||
md5_command['filemd5'] = @files_id
|
||||
md5_command['root'] = 'fs'
|
||||
h['md5'] = @files.db.command(md5_command)['md5']
|
||||
|
||||
h
|
||||
end
|
||||
end
|
||||
end
|
195
test/test_grid.rb
Normal file
195
test/test_grid.rb
Normal file
@ -0,0 +1,195 @@
|
||||
require 'test/test_helper'
|
||||
|
||||
class GridTest < Test::Unit::TestCase
|
||||
include GridFS
|
||||
|
||||
def setup
|
||||
@db ||= Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
|
||||
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test')
|
||||
@files = @db.collection('fs.files')
|
||||
@chunks = @db.collection('fs.chunks')
|
||||
end
|
||||
|
||||
def teardown
|
||||
@files.remove
|
||||
@chunks.remove
|
||||
end
|
||||
|
||||
context "When reading:" do
|
||||
setup do
|
||||
@data = "CHUNKS" * 50000
|
||||
GridStore.open(@db, 'sample', 'w') do |f|
|
||||
f.puts @data
|
||||
end
|
||||
|
||||
@grid = Grid.new(@db)
|
||||
end
|
||||
|
||||
should "contain sample data" do
|
||||
assert_equal @data, GridStore.read(@db, 'sample')
|
||||
end
|
||||
|
||||
should "read sample data" do
|
||||
data = @grid.open('sample', 'r') { |f| f.read }
|
||||
assert_equal data.length, @data.length
|
||||
end
|
||||
|
||||
should "return an empty string if length is zero" do
|
||||
data = @grid.open('sample', 'r') { |f| f.read(0) }
|
||||
assert_equal '', data
|
||||
end
|
||||
|
||||
should "return the first n bytes" do
|
||||
data = @grid.open('sample', 'r') {|f| f.read(288888) }
|
||||
assert_equal 288888, data.length
|
||||
assert_equal @data[0...288888], data
|
||||
end
|
||||
|
||||
should "return the first n bytes even with an offset" do
|
||||
data = @grid.open('sample', 'r') do |f|
|
||||
f.seek(1000)
|
||||
f.read(288888)
|
||||
end
|
||||
assert_equal 288888, data.length
|
||||
assert_equal @data[1000...289888], data
|
||||
end
|
||||
end
|
||||
|
||||
context "When writing:" do
|
||||
setup do
|
||||
@data = "BYTES" * 50000
|
||||
@grid = Grid.new(@db)
|
||||
@grid.open('sample', 'w') do |f|
|
||||
f.write @data
|
||||
end
|
||||
end
|
||||
|
||||
should "read sample data" do
|
||||
data = @grid.open('sample', 'r') { |f| f.read }
|
||||
assert_equal data.length, @data.length
|
||||
end
|
||||
|
||||
should "return the total number of bytes written" do
|
||||
data = 'a' * 300000
|
||||
assert_equal 300000, @grid.open('write', 'w') {|f| f.write(data) }
|
||||
end
|
||||
|
||||
should "more read sample data" do
|
||||
data = @grid.open('sample', 'r') { |f| f.read }
|
||||
assert_equal data.length, @data.length
|
||||
end
|
||||
|
||||
should "raise exception if not opened for write" do
|
||||
assert_raise GridError do
|
||||
@grid.open('io', 'r') { |f| f.write('hello') }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "When appending:" do
|
||||
setup do
|
||||
@data = "1"
|
||||
@grid = Grid.new(@db)
|
||||
@grid.open('sample', 'w', :chunk_size => 1000) do |f|
|
||||
f.write @data
|
||||
end
|
||||
end
|
||||
|
||||
should "add data to the file" do
|
||||
new_data = "2"
|
||||
@grid.open('sample', 'w+') do |f|
|
||||
f.write(new_data)
|
||||
end
|
||||
|
||||
all_data = @grid.open('sample', 'r') {|f| f.read }
|
||||
assert_equal @data + new_data, all_data
|
||||
end
|
||||
|
||||
should "add multi-chunk-data" do
|
||||
new_data = "2" * 5000
|
||||
|
||||
@grid.open('sample', 'w+') do |f|
|
||||
f.write(new_data)
|
||||
end
|
||||
|
||||
all_data = @grid.open('sample', 'r') {|f| f.read }
|
||||
assert_equal @data + new_data, all_data
|
||||
end
|
||||
end
|
||||
|
||||
context "When writing chunks:" do
|
||||
setup do
|
||||
data = "B" * 50000
|
||||
@grid = Grid.new(@db)
|
||||
@grid.open('sample', 'w', :chunk_size => 1000) do |f|
|
||||
f.write data
|
||||
end
|
||||
end
|
||||
|
||||
should "write the correct number of chunks" do
|
||||
file = @files.find_one({:filename => 'sample'})
|
||||
chunks = @chunks.find({'files_id' => file['_id']}).to_a
|
||||
assert_equal 50, chunks.length
|
||||
end
|
||||
end
|
||||
|
||||
context "Positioning:" do
|
||||
setup do
|
||||
data = 'hello, world' + '1' * 5000 + 'goodbye!' + '2' * 1000 + '!'
|
||||
@grid = Grid.new(@db)
|
||||
@grid.open('hello', 'w', :chunk_size => 1000) do |f|
|
||||
f.write data
|
||||
end
|
||||
end
|
||||
|
||||
should "seek within chunks" do
|
||||
@grid.open('hello', 'r') do |f|
|
||||
f.seek(0)
|
||||
assert_equal 'h', f.read(1)
|
||||
f.seek(7)
|
||||
assert_equal 'w', f.read(1)
|
||||
f.seek(4)
|
||||
assert_equal 'o', f.read(1)
|
||||
f.seek(0)
|
||||
f.seek(7, IO::SEEK_CUR)
|
||||
assert_equal 'w', f.read(1)
|
||||
f.seek(-1, IO::SEEK_CUR)
|
||||
assert_equal ' ', f.read(1)
|
||||
f.seek(-4, IO::SEEK_CUR)
|
||||
assert_equal 'l', f.read(1)
|
||||
f.seek(3, IO::SEEK_CUR)
|
||||
assert_equal ',', f.read(1)
|
||||
end
|
||||
end
|
||||
|
||||
should "seek between chunks" do
|
||||
@grid.open('hello', 'r') do |f|
|
||||
f.seek(1000)
|
||||
assert_equal '11111', f.read(5)
|
||||
|
||||
f.seek(5009)
|
||||
assert_equal '111goodbye!222', f.read(14)
|
||||
|
||||
f.seek(-1, IO::SEEK_END)
|
||||
assert_equal '!', f.read(1)
|
||||
f.seek(-6, IO::SEEK_END)
|
||||
assert_equal '2', f.read(1)
|
||||
end
|
||||
end
|
||||
|
||||
should "tell the current position" do
|
||||
@grid.open('hello', 'r') do |f|
|
||||
assert_equal 0, f.tell
|
||||
|
||||
f.seek(999)
|
||||
assert_equal 999, f.tell
|
||||
end
|
||||
end
|
||||
|
||||
should "seek only in read mode" do
|
||||
assert_raise GridError do
|
||||
@grid.open('hello', 'w+') {|f| f.seek(0) }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
36
test/test_grid_io.rb
Normal file
36
test/test_grid_io.rb
Normal file
@ -0,0 +1,36 @@
|
||||
require 'test/test_helper'
|
||||
|
||||
class GridIOTest < Test::Unit::TestCase
|
||||
include GridFS
|
||||
|
||||
def setup
|
||||
@db ||= Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
|
||||
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test')
|
||||
@files = @db.collection('fs.files')
|
||||
@chunks = @db.collection('fs.chunks')
|
||||
end
|
||||
|
||||
def teardown
|
||||
@files.remove
|
||||
@chunks.remove
|
||||
end
|
||||
|
||||
context "Options" do
|
||||
setup do
|
||||
@filename = 'test'
|
||||
@mode = 'w'
|
||||
end
|
||||
|
||||
should "set default 256k chunk size" do
|
||||
file = GridIO.new(@files, @chunks, @filename, @mode)
|
||||
assert_equal 256 * 1024, file.chunk_size
|
||||
end
|
||||
|
||||
should "set chunk size" do
|
||||
file = GridIO.new(@files, @chunks, @filename, @mode, :chunk_size => 1000)
|
||||
assert_equal 1000, file.chunk_size
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
end
|
Loading…
Reference in New Issue
Block a user