streaming for new gridfs api
This commit is contained in:
parent
83f2bdf472
commit
6a09b141fd
|
@ -18,13 +18,13 @@ module Mongo
|
||||||
|
|
||||||
# WARNING: This class is part of a new, experimental GridFS API. Subject to change.
|
# WARNING: This class is part of a new, experimental GridFS API. Subject to change.
|
||||||
class Grid
|
class Grid
|
||||||
DEFAULT_BUCKET_NAME = 'fs'
|
DEFAULT_FS_NAME = 'fs'
|
||||||
|
|
||||||
def initialize(db, bucket_name=DEFAULT_BUCKET_NAME)
|
def initialize(db, fs_name=DEFAULT_FS_NAME)
|
||||||
check_params(db)
|
check_params(db)
|
||||||
@db = db
|
@db = db
|
||||||
@files = @db["#{bucket_name}.files"]
|
@files = @db["#{fs_name}.files"]
|
||||||
@chunks = @db["#{bucket_name}.chunks"]
|
@chunks = @db["#{fs_name}.chunks"]
|
||||||
|
|
||||||
@chunks.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
|
@chunks.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
|
||||||
end
|
end
|
||||||
|
|
|
@ -19,7 +19,7 @@ module Mongo
|
||||||
# WARNING: This class is part of a new, experimental GridFS API. Subject to change.
|
# WARNING: This class is part of a new, experimental GridFS API. Subject to change.
|
||||||
class GridFileSystem < Grid
|
class GridFileSystem < Grid
|
||||||
|
|
||||||
def initialize(db, bucket_name=DEFAULT_BUCKET_NAME)
|
def initialize(db, fs_name=DEFAULT_FS_NAME)
|
||||||
super
|
super
|
||||||
|
|
||||||
@files.create_index([['filename', 1], ['uploadDate', -1]])
|
@files.create_index([['filename', 1], ['uploadDate', -1]])
|
||||||
|
|
|
@ -48,20 +48,15 @@ module Mongo
|
||||||
# @return [String]
|
# @return [String]
|
||||||
# the data in the file
|
# the data in the file
|
||||||
def read(length=nil)
|
def read(length=nil)
|
||||||
return '' if length == 0
|
if length == 0
|
||||||
return read_all if length.nil? && @file_position.zero?
|
return ''
|
||||||
buf = ''
|
elsif length.nil? && @file_position.zero?
|
||||||
while true
|
read_all
|
||||||
buf << @current_chunk['data'].to_s[@chunk_position..-1]
|
else
|
||||||
if buf.length >= length
|
read_length(length)
|
||||||
return buf[0...length]
|
|
||||||
else
|
|
||||||
@current_chunk = get_chunk(@current_chunk['n'] + 1)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
buf
|
|
||||||
end
|
end
|
||||||
alias :data :read
|
alias_method :data, :read
|
||||||
|
|
||||||
# Write the given string (binary) data to the file.
|
# Write the given string (binary) data to the file.
|
||||||
#
|
#
|
||||||
|
@ -70,26 +65,17 @@ module Mongo
|
||||||
#
|
#
|
||||||
# @return [Integer]
|
# @return [Integer]
|
||||||
# the number of bytes written.
|
# the number of bytes written.
|
||||||
def write(string)
|
def write(io)
|
||||||
raise GridError, "#{@filename} not opened for write" unless @mode[0] == ?w
|
raise GridError, "file not opened for write" unless @mode[0] == ?w
|
||||||
# Since Ruby 1.9.1 doesn't necessarily store one character per byte.
|
if io.is_a? String
|
||||||
if string.respond_to?(:force_encoding)
|
write_string(io)
|
||||||
string.force_encoding("binary")
|
else
|
||||||
end
|
length = 0
|
||||||
to_write = string.length
|
while(string = io.read(@chunk_size))
|
||||||
while (to_write > 0) do
|
length += write_string(string)
|
||||||
if @current_chunk && @chunk_position == @chunk_size
|
|
||||||
next_chunk_number = @current_chunk['n'] + 1
|
|
||||||
@current_chunk = create_chunk(next_chunk_number)
|
|
||||||
end
|
end
|
||||||
chunk_available = @chunk_size - @chunk_position
|
length
|
||||||
step_size = (to_write > chunk_available) ? chunk_available : to_write
|
|
||||||
@current_chunk['data'] = Binary.new((@current_chunk['data'].to_s << string[-to_write, step_size]).unpack("c*"))
|
|
||||||
@chunk_position += step_size
|
|
||||||
to_write -= step_size
|
|
||||||
save_chunk(@current_chunk)
|
|
||||||
end
|
end
|
||||||
string.length - to_write
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Position the file pointer at the provided location.
|
# Position the file pointer at the provided location.
|
||||||
|
@ -129,12 +115,12 @@ module Mongo
|
||||||
@file_position
|
@file_position
|
||||||
end
|
end
|
||||||
|
|
||||||
# Creates or updates the document storing the chunks' metadata
|
# Creates or updates the document from the files collection that
|
||||||
# in the files collection. The file exists only after this method
|
# stores the chunks' metadata. The file becomes available only after
|
||||||
# is called.
|
# this method has been called.
|
||||||
#
|
#
|
||||||
# This method will be invoked automatically
|
# This method will be invoked automatically when
|
||||||
# on GridIO#open. Otherwise, it must be called manually.
|
# on GridIO#open is passed a block. Otherwise, it must be called manually.
|
||||||
#
|
#
|
||||||
# @return [True]
|
# @return [True]
|
||||||
def close
|
def close
|
||||||
|
@ -170,7 +156,12 @@ module Mongo
|
||||||
def get_chunk(n)
|
def get_chunk(n)
|
||||||
chunk = @chunks.find({'files_id' => @files_id, 'n' => n}).next_document
|
chunk = @chunks.find({'files_id' => @files_id, 'n' => n}).next_document
|
||||||
@chunk_position = 0
|
@chunk_position = 0
|
||||||
chunk || {}
|
chunk
|
||||||
|
end
|
||||||
|
|
||||||
|
def get_chunk_for_read(n)
|
||||||
|
chunk = get_chunk(n)
|
||||||
|
return nil unless chunk
|
||||||
end
|
end
|
||||||
|
|
||||||
def last_chunk_number
|
def last_chunk_number
|
||||||
|
@ -188,6 +179,58 @@ module Mongo
|
||||||
buf
|
buf
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def read_length(length)
|
||||||
|
cache_chunk_data
|
||||||
|
remaining = (@file_length - @file_position)
|
||||||
|
to_read = length > remaining ? remaining : length
|
||||||
|
return nil unless remaining > 0
|
||||||
|
|
||||||
|
buf = ''
|
||||||
|
while to_read > 0
|
||||||
|
if @chunk_position == @chunk_data_length
|
||||||
|
@current_chunk = get_chunk(@current_chunk['n'] + 1)
|
||||||
|
cache_chunk_data
|
||||||
|
end
|
||||||
|
chunk_remainder = @chunk_data_length - @chunk_position
|
||||||
|
size = (to_read >= chunk_remainder) ? chunk_remainder : to_read
|
||||||
|
buf << @current_chunk_data[@chunk_position, size]
|
||||||
|
to_read -= size
|
||||||
|
@chunk_position += size
|
||||||
|
@file_position += size
|
||||||
|
end
|
||||||
|
buf
|
||||||
|
end
|
||||||
|
|
||||||
|
def cache_chunk_data
|
||||||
|
@current_chunk_data = @current_chunk['data'].to_s
|
||||||
|
if @current_chunk_data.respond_to?(:force_encoding)
|
||||||
|
@current_chunk_data.force_encoding("binary")
|
||||||
|
end
|
||||||
|
@chunk_data_length = @current_chunk['data'].length
|
||||||
|
end
|
||||||
|
|
||||||
|
def write_string(string)
|
||||||
|
# Since Ruby 1.9.1 doesn't necessarily store one character per byte.
|
||||||
|
if string.respond_to?(:force_encoding)
|
||||||
|
string.force_encoding("binary")
|
||||||
|
end
|
||||||
|
|
||||||
|
to_write = string.length
|
||||||
|
while (to_write > 0) do
|
||||||
|
if @current_chunk && @chunk_position == @chunk_size
|
||||||
|
next_chunk_number = @current_chunk['n'] + 1
|
||||||
|
@current_chunk = create_chunk(next_chunk_number)
|
||||||
|
end
|
||||||
|
chunk_available = @chunk_size - @chunk_position
|
||||||
|
step_size = (to_write > chunk_available) ? chunk_available : to_write
|
||||||
|
@current_chunk['data'] = Binary.new((@current_chunk['data'].to_s << string[-to_write, step_size]).unpack("c*"))
|
||||||
|
@chunk_position += step_size
|
||||||
|
to_write -= step_size
|
||||||
|
save_chunk(@current_chunk)
|
||||||
|
end
|
||||||
|
string.length - to_write
|
||||||
|
end
|
||||||
|
|
||||||
# Initialize based on whether the supplied file exists.
|
# Initialize based on whether the supplied file exists.
|
||||||
def init_read(filesystem, opts)
|
def init_read(filesystem, opts)
|
||||||
if filesystem
|
if filesystem
|
||||||
|
|
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
This is pretty small data
|
|
@ -5,8 +5,8 @@ class GridTest < Test::Unit::TestCase
|
||||||
def setup
|
def setup
|
||||||
@db ||= Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
|
@db ||= Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
|
||||||
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test')
|
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test')
|
||||||
@files = @db.collection('test-bucket.files')
|
@files = @db.collection('test-fs.files')
|
||||||
@chunks = @db.collection('test-bucket.chunks')
|
@chunks = @db.collection('test-fs.chunks')
|
||||||
end
|
end
|
||||||
|
|
||||||
def teardown
|
def teardown
|
||||||
|
@ -17,7 +17,7 @@ class GridTest < Test::Unit::TestCase
|
||||||
context "A basic grid-stored file" do
|
context "A basic grid-stored file" do
|
||||||
setup do
|
setup do
|
||||||
@data = "GRIDDATA" * 50000
|
@data = "GRIDDATA" * 50000
|
||||||
@grid = Grid.new(@db, 'test-bucket')
|
@grid = Grid.new(@db, 'test-fs')
|
||||||
@id = @grid.put(@data, 'sample', :metadata => {'app' => 'photos'})
|
@id = @grid.put(@data, 'sample', :metadata => {'app' => 'photos'})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -44,4 +44,38 @@ class GridTest < Test::Unit::TestCase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def read_and_write_stream(filename, read_length, opts={})
|
||||||
|
io = File.open(File.join(File.dirname(__FILE__), 'data', filename), 'r')
|
||||||
|
id = @grid.put(io, filename + read_length.to_s, opts)
|
||||||
|
file = @grid.get(id)
|
||||||
|
io.rewind
|
||||||
|
data = io.read
|
||||||
|
read_data = ""
|
||||||
|
while(chunk = file.read(read_length))
|
||||||
|
read_data << chunk
|
||||||
|
end
|
||||||
|
assert_equal data.length, read_data.length
|
||||||
|
end
|
||||||
|
|
||||||
|
context "Streaming: " do
|
||||||
|
setup do
|
||||||
|
@grid = Grid.new(@db, 'test-fs')
|
||||||
|
end
|
||||||
|
|
||||||
|
should "put and get a small io object with a small chunk size" do
|
||||||
|
read_and_write_stream('small_data.txt', 1, :chunk_size => 2)
|
||||||
|
end
|
||||||
|
|
||||||
|
should "put and get a small io object" do
|
||||||
|
read_and_write_stream('small_data.txt', 1)
|
||||||
|
end
|
||||||
|
|
||||||
|
should "put and get a large io object when reading smaller than the chunk size" do
|
||||||
|
read_and_write_stream('sample_file.pdf', 256 * 1024)
|
||||||
|
end
|
||||||
|
|
||||||
|
should "put and get a large io object when reading larger than the chunk size" do
|
||||||
|
read_and_write_stream('sample_file.pdf', 300 * 1024)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -143,12 +143,12 @@ class GridTest < Test::Unit::TestCase
|
||||||
f.seek(0)
|
f.seek(0)
|
||||||
f.seek(7, IO::SEEK_CUR)
|
f.seek(7, IO::SEEK_CUR)
|
||||||
assert_equal 'w', f.read(1)
|
assert_equal 'w', f.read(1)
|
||||||
f.seek(-1, IO::SEEK_CUR)
|
f.seek(-2, IO::SEEK_CUR)
|
||||||
assert_equal ' ', f.read(1)
|
assert_equal ' ', f.read(1)
|
||||||
f.seek(-4, IO::SEEK_CUR)
|
f.seek(-4, IO::SEEK_CUR)
|
||||||
assert_equal 'l', f.read(1)
|
assert_equal 'l', f.read(1)
|
||||||
f.seek(3, IO::SEEK_CUR)
|
f.seek(3, IO::SEEK_CUR)
|
||||||
assert_equal ',', f.read(1)
|
assert_equal 'w', f.read(1)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue