Added synchronization

This commit is contained in:
Jim Menard 2009-01-07 11:46:38 -05:00
parent e4b19ec8f4
commit 3473d026a6

View File

@ -15,6 +15,7 @@
# ++
require 'socket'
require 'mutex_m'
require 'mongo/mongo'
require 'mongo/collection'
require 'mongo/message'
@ -61,6 +62,8 @@ module XGen
@name, @host, @port = db_name, host, port
@socket = TCPSocket.new(@host, @port)
@strict = false
@semaphore = Object.new
@semaphore.extend Mutex_m
end
# Returns an array of collection names. Each name is of the form
@ -161,23 +164,26 @@ module XGen
# Send a Query to +collection_name+ and return a Cursor over the
# results.
def query(collection_name, query)
# TODO synchronize
send_to_db(QueryMessage.new(@name, collection_name, query))
return Cursor.new(self, collection_name, query.number_to_return)
@semaphore.synchronize {
send_to_db(QueryMessage.new(@name, collection_name, query))
Cursor.new(self, collection_name, query.number_to_return)
}
end
# Remove the records that match +selector+ from +collection_name+.
# Normally called by Collection#remove or Collection#clear.
def remove_from_db(collection_name, selector)
# TODO synchronize
send_to_db(RemoveMessage.new(@name, collection_name, selector))
@semaphore.synchronize {
send_to_db(RemoveMessage.new(@name, collection_name, selector))
}
end
# Update records in +collection_name+ that match +selector+ by
# applying +obj+ as an update. Normally called by Collection#replace.
def replace_in_db(collection_name, selector, obj)
# TODO synchronize
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false))
@semaphore.synchronize {
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false))
}
end
# Alias for #replace_in_db. Normally called by Collection.modify.
@ -188,9 +194,10 @@ module XGen
# called by Collection#repsert.
def repsert_in_db(collection_name, selector, obj)
# TODO if PKInjector, inject
# TODO synchronize
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true))
obj
@semaphore.synchronize {
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true))
obj
}
end
# Return the number of records in +collection_name+ that match
@ -226,7 +233,6 @@ module XGen
# :ns :: Namespace; same as +collection_name+.
def index_information(collection_name)
sel = {:ns => full_coll_name(collection_name)}
# TODO synchronize
query(SYSTEM_INDEX_COLLECTION, Query.new(sel)).collect { |row|
h = {:name => row['name']}
raise "Name of index on return from db was nil. Coll = #{full_coll_name(collection_name)}" unless h[:name]
@ -251,15 +257,17 @@ module XGen
field_h = {}
fields.each { |f| field_h[f] = 1 }
sel[:key] = field_h
# TODO synchronize
send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, sel))
@semaphore.synchronize {
send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, sel))
}
end
# Insert +objects+ into +collection_name+. Normally called by
# Collection#insert.
def insert_into_db(collection_name, objects)
# TODO synchronize
objects.each { |o| send_to_db(InsertMessage.new(@name, collection_name, o)) }
@semaphore.synchronize {
objects.each { |o| send_to_db(InsertMessage.new(@name, collection_name, o)) }
}
end
def send_to_db(message)
@ -288,7 +296,6 @@ module XGen
end
end
# TODO synchronize
q = Query.new(selector)
q.number_to_return = 1
query(SYSTEM_COMMAND_COLLECTION, q).next_object