added safe more for gridfs
This commit is contained in:
parent
89fe06250e
commit
1d0cc7aae2
@ -15,7 +15,7 @@ mb = length / 1048576.0
|
|||||||
|
|
||||||
t1 = Time.now
|
t1 = Time.now
|
||||||
@grid = Grid.new(db)
|
@grid = Grid.new(db)
|
||||||
@id = @grid.put(sample_data, 'mongodb-new.pdf')
|
@id = @grid.put(sample_data, 'mongodb-new.pdf', :safe => true)
|
||||||
puts "Write: #{mb / (Time.now - t1)} mb/s"
|
puts "Write: #{mb / (Time.now - t1)} mb/s"
|
||||||
|
|
||||||
t1 = Time.now
|
t1 = Time.now
|
||||||
|
@ -21,7 +21,8 @@ module Mongo
|
|||||||
DEFAULT_FS_NAME = 'fs'
|
DEFAULT_FS_NAME = 'fs'
|
||||||
|
|
||||||
def initialize(db, fs_name=DEFAULT_FS_NAME)
|
def initialize(db, fs_name=DEFAULT_FS_NAME)
|
||||||
check_params(db)
|
raise MongoArgumentError, "db must be a Mongo::DB." unless db.is_a?(Mongo::DB)
|
||||||
|
|
||||||
@db = db
|
@db = db
|
||||||
@files = @db["#{fs_name}.files"]
|
@files = @db["#{fs_name}.files"]
|
||||||
@chunks = @db["#{fs_name}.chunks"]
|
@chunks = @db["#{fs_name}.chunks"]
|
||||||
@ -53,11 +54,5 @@ module Mongo
|
|||||||
def default_grid_io_opts
|
def default_grid_io_opts
|
||||||
{:fs_name => @fs_name}
|
{:fs_name => @fs_name}
|
||||||
end
|
end
|
||||||
|
|
||||||
def check_params(db)
|
|
||||||
if !db.is_a?(Mongo::DB)
|
|
||||||
raise MongoArgumentError, "db must be an instance of Mongo::DB."
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -19,8 +19,13 @@ module Mongo
|
|||||||
# WARNING: This class is part of a new, experimental GridFS API. Subject to change.
|
# WARNING: This class is part of a new, experimental GridFS API. Subject to change.
|
||||||
class GridFileSystem < Grid
|
class GridFileSystem < Grid
|
||||||
|
|
||||||
def initialize(db, fs_name=DEFAULT_FS_NAME)
|
def initialize(db, fs_name=Grid::DEFAULT_FS_NAME)
|
||||||
super
|
raise MongoArgumentError, "db must be a Mongo::DB." unless db.is_a?(Mongo::DB)
|
||||||
|
|
||||||
|
@db = db
|
||||||
|
@files = @db["#{fs_name}.files"]
|
||||||
|
@chunks = @db["#{fs_name}.chunks"]
|
||||||
|
@fs_name = fs_name
|
||||||
|
|
||||||
@files.create_index([['filename', 1], ['uploadDate', -1]])
|
@files.create_index([['filename', 1], ['uploadDate', -1]])
|
||||||
@default_query_opts = {:sort => [['filename', 1], ['uploadDate', -1]], :limit => 1}
|
@default_query_opts = {:sort => [['filename', 1], ['uploadDate', -1]], :limit => 1}
|
||||||
@ -39,25 +44,17 @@ module Mongo
|
|||||||
result
|
result
|
||||||
end
|
end
|
||||||
|
|
||||||
def put(data, filename, opts={})
|
def delete(filename)
|
||||||
opts.merge!(default_grid_io_opts(filename))
|
files = @files.find({'filename' => filename}, :fields => ['_id'])
|
||||||
file = GridIO.new(@files, @chunks, filename, 'w', opts)
|
files.each do |file|
|
||||||
file.write(data)
|
@files.remove({'_id' => file['_id']})
|
||||||
file.close
|
@chunks.remove({'files_id' => file['_id']})
|
||||||
file.files_id
|
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
alias_method :unlink, :delete
|
||||||
|
|
||||||
def get(filename, opts={})
|
def remove_previous_versions
|
||||||
opts.merge!(default_grid_io_opts(filename))
|
ids = @files.find({'filename' => filename}, :sort => [['filename', 1]])
|
||||||
GridIO.new(@files, @chunks, filename, 'r', opts)
|
|
||||||
end
|
|
||||||
|
|
||||||
def delete(filename, opts={})
|
|
||||||
ids = @files.find({'filename' => filename}, ['_id'])
|
|
||||||
ids.each do |id|
|
|
||||||
@files.remove({'_id' => id})
|
|
||||||
@chunks.remove('files_id' => id)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -14,6 +14,8 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
# ++
|
# ++
|
||||||
|
|
||||||
|
require 'digest/md5'
|
||||||
|
|
||||||
module Mongo
|
module Mongo
|
||||||
|
|
||||||
# WARNING: This is part of a new, experimental GridFS API. Subject to change.
|
# WARNING: This is part of a new, experimental GridFS API. Subject to change.
|
||||||
@ -21,7 +23,8 @@ module Mongo
|
|||||||
DEFAULT_CHUNK_SIZE = 256 * 1024
|
DEFAULT_CHUNK_SIZE = 256 * 1024
|
||||||
DEFAULT_CONTENT_TYPE = 'binary/octet-stream'
|
DEFAULT_CONTENT_TYPE = 'binary/octet-stream'
|
||||||
|
|
||||||
attr_reader :content_type, :chunk_size, :upload_date, :files_id, :filename, :metadata
|
attr_reader :content_type, :chunk_size, :upload_date, :files_id, :filename,
|
||||||
|
:metadata, :server_md5, :client_md5
|
||||||
|
|
||||||
def initialize(files, chunks, filename, mode, opts={})
|
def initialize(files, chunks, filename, mode, opts={})
|
||||||
@files = files
|
@files = files
|
||||||
@ -31,6 +34,8 @@ module Mongo
|
|||||||
@query = opts[:query] || {}
|
@query = opts[:query] || {}
|
||||||
@query_opts = opts[:query_opts] || {}
|
@query_opts = opts[:query_opts] || {}
|
||||||
@fs_name = opts[:fs_name] || Grid::DEFAULT_FS_NAME
|
@fs_name = opts[:fs_name] || Grid::DEFAULT_FS_NAME
|
||||||
|
@safe = opts[:safe] || false
|
||||||
|
@local_md5 = Digest::MD5.new if @safe
|
||||||
|
|
||||||
case @mode
|
case @mode
|
||||||
when 'r' then init_read(opts)
|
when 'r' then init_read(opts)
|
||||||
@ -68,12 +73,22 @@ module Mongo
|
|||||||
def write(io)
|
def write(io)
|
||||||
raise GridError, "file not opened for write" unless @mode[0] == ?w
|
raise GridError, "file not opened for write" unless @mode[0] == ?w
|
||||||
if io.is_a? String
|
if io.is_a? String
|
||||||
|
if @safe
|
||||||
|
@local_md5.update(io)
|
||||||
|
end
|
||||||
write_string(io)
|
write_string(io)
|
||||||
else
|
else
|
||||||
length = 0
|
length = 0
|
||||||
|
if @safe
|
||||||
|
while(string = io.read(@chunk_size))
|
||||||
|
@local_md5.update(string)
|
||||||
|
length += write_string(string)
|
||||||
|
end
|
||||||
|
else
|
||||||
while(string = io.read(@chunk_size))
|
while(string = io.read(@chunk_size))
|
||||||
length += write_string(string)
|
length += write_string(string)
|
||||||
end
|
end
|
||||||
|
end
|
||||||
length
|
length
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@ -266,14 +281,24 @@ module Mongo
|
|||||||
h['uploadDate'] = @upload_date
|
h['uploadDate'] = @upload_date
|
||||||
h['aliases'] = @aliases
|
h['aliases'] = @aliases
|
||||||
h['metadata'] = @metadata
|
h['metadata'] = @metadata
|
||||||
|
h['md5'] = get_md5
|
||||||
|
h
|
||||||
|
end
|
||||||
|
|
||||||
# Get a server-side md5.
|
# Get a server-side md5 and validate against the client if running in safe mode.
|
||||||
|
def get_md5
|
||||||
md5_command = OrderedHash.new
|
md5_command = OrderedHash.new
|
||||||
md5_command['filemd5'] = @files_id
|
md5_command['filemd5'] = @files_id
|
||||||
md5_command['root'] = @fs_name
|
md5_command['root'] = @fs_name
|
||||||
h['md5'] = @files.db.command(md5_command)['md5']
|
@server_md5 = @files.db.command(md5_command)['md5']
|
||||||
|
if @safe
|
||||||
h
|
@client_md5 = @local_md5.hexdigest
|
||||||
|
if @local_md5 != @server_md5
|
||||||
|
raise @local_md5 != @server_md5GridError, "File on server failed MD5 check"
|
||||||
|
end
|
||||||
|
else
|
||||||
|
@server_md5
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -105,6 +105,29 @@ class GridFileSystemTest < Test::Unit::TestCase
|
|||||||
should "contain the new data" do
|
should "contain the new data" do
|
||||||
assert_equal @new_data, @new.read, "Expected DATA"
|
assert_equal @new_data, @new.read, "Expected DATA"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "and on a second overwrite" do
|
||||||
|
setup do
|
||||||
|
sleep(2)
|
||||||
|
new_data = "NEW" * 1000
|
||||||
|
@grid.open('sample', 'w') do |f|
|
||||||
|
f.write new_data
|
||||||
|
end
|
||||||
|
|
||||||
|
@ids = @db['fs.files'].find({'filename' => 'sample'}).map {|file| file['_id']}
|
||||||
|
end
|
||||||
|
|
||||||
|
should "write a third version of the file" do
|
||||||
|
assert_equal 3, @db['fs.files'].find({'filename' => 'sample'}).count
|
||||||
|
assert_equal 3, @db['fs.chunks'].find({'files_id' => {'$in' => @ids}}).count
|
||||||
|
end
|
||||||
|
|
||||||
|
should "remove all versions and their data on delete" do
|
||||||
|
@grid.delete('sample')
|
||||||
|
assert_equal 0, @db['fs.files'].find({'filename' => 'sample'}).count
|
||||||
|
assert_equal 0, @db['fs.chunks'].find({'files_id' => {'$in' => @ids}}).count
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -9,6 +9,7 @@ class GridIOTest < Test::Unit::TestCase
|
|||||||
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test')
|
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test')
|
||||||
@files = @db.collection('fs.files')
|
@files = @db.collection('fs.files')
|
||||||
@chunks = @db.collection('fs.chunks')
|
@chunks = @db.collection('fs.chunks')
|
||||||
|
@chunks.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
|
||||||
end
|
end
|
||||||
|
|
||||||
teardown do
|
teardown do
|
||||||
@ -32,6 +33,24 @@ class GridIOTest < Test::Unit::TestCase
|
|||||||
assert_equal 1000, file.chunk_size
|
assert_equal 1000, file.chunk_size
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "Grid MD5 check" do
|
||||||
|
|
||||||
|
should "run in safe mode" do
|
||||||
|
file = GridIO.new(@files, @chunks, 'smallfile', 'w', :safe => true)
|
||||||
|
file.write("DATA" * 100)
|
||||||
|
assert file.close
|
||||||
|
assert_equal file.server_md5, file.client_md5
|
||||||
|
end
|
||||||
|
|
||||||
|
should "validate with a large file" do
|
||||||
|
io = File.open(File.join(File.dirname(__FILE__), 'data', 'sample_file.pdf'), 'r')
|
||||||
|
file = GridIO.new(@files, @chunks, 'bigfile', 'w', :safe => true)
|
||||||
|
file.write(io)
|
||||||
|
assert file.close
|
||||||
|
assert_equal file.server_md5, file.client_md5
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user