2008-12-17 16:49:06 +00:00
# --
2009-01-06 15:51:01 +00:00
# Copyright (C) 2008-2009 10gen Inc.
2008-11-22 01:00:51 +00:00
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License, version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
# for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2008-12-17 16:49:06 +00:00
# ++
2008-11-22 01:00:51 +00:00
require 'socket'
2009-01-07 16:46:38 +00:00
require 'mutex_m'
2008-11-22 01:00:51 +00:00
require 'mongo/collection'
require 'mongo/message'
require 'mongo/query'
2008-12-08 16:38:42 +00:00
require 'mongo/util/ordered_hash.rb'
2009-01-07 20:36:12 +00:00
require 'mongo/admin'
2008-11-22 01:00:51 +00:00
module XGen
  module Mongo
    module Driver

      # A Mongo database.
      class DB

        SYSTEM_NAMESPACE_COLLECTION = "system.namespaces"
        SYSTEM_INDEX_COLLECTION = "system.indexes"
        SYSTEM_PROFILE_COLLECTION = "system.profile"
        SYSTEM_COMMAND_COLLECTION = "$cmd"

        # Strict mode enforces collection existence checks. When +true+,
        # asking for a collection that does not exist or trying to create a
        # collection that already exists raises an error.
        #
        # Strict mode is off (+false+) by default. Its value can be changed at
        # any time.
        attr_writer :strict
        # Returns the value of the +strict+ flag.
        def strict?; @strict; end

        # The name of the database.
        attr_reader :name
        # Host to which we are currently connected.
        attr_reader :host
        # Port to which we are currently connected.
        attr_reader :port
        # An array of [host, port] pairs.
        attr_reader :nodes

        # The database's socket. For internal (and Cursor) use only.
        attr_reader :socket

        # A primary key factory object (or +nil+). See the README.doc file or
        # DB#new for details.
        attr_reader :pk_factory

        # Installs the primary key factory. A factory can only be installed
        # once; raises an error if one is already set.
        def pk_factory=(pk_factory)
          raise "error: can not change PK factory" if @pk_factory
          @pk_factory = pk_factory
        end

        # db_name :: The database name. Must be non-nil, non-empty, and must
        #            not contain ".".
        #
        # nodes :: An array of [host, port] pairs.
        #
        # options :: A hash of options.
        #
        # Options:
        #
        # :strict :: If true, collections must exist to be accessed and must
        #            not exist to be created. See #collection and
        #            #create_collection.
        #
        # :pk :: A primary key factory object that must respond to :create_pk,
        #        which should take a hash and return a hash which merges the
        #        original hash with any primary key fields the factory wishes
        #        to inject. (NOTE: if the object already has a primary key,
        #        the factory should not inject a new key; this means that the
        #        object is being used in a repsert but it already exists.) The
        #        idea here is that whenever a record is inserted, the :pk
        #        object's +create_pk+ method will be called and the new hash
        #        returned will be inserted.
        #
        # When a DB object first connects, it tries the first node. If that
        # fails, it keeps trying to connect to the remaining nodes until it
        # successfully connects. Raises an error if +db_name+ is invalid or if
        # no node can be reached.
        def initialize(db_name, nodes, options={})
          # Zero-length names are rejected as well, as the message promises.
          if db_name.nil? || db_name.empty? || db_name.include?(".")
            raise "Invalid DB name \"#{db_name}\" (must be non-nil, non-zero-length, and can not contain \".\")"
          end
          @name, @nodes = db_name, nodes
          @strict = options[:strict]
          @pk_factory = options[:pk]
          @semaphore = Object.new
          @semaphore.extend Mutex_m
          connect_to_first_available_host
        end

        # Walks +nodes+ in order and keeps the first one that accepts a TCP
        # connection AND answers the +ismaster+ command with an 'ok' of 1.
        # A node that refuses the connection, raises, or returns a non-ok
        # reply is closed and skipped. On success +host+, +port+ and +socket+
        # describe the chosen node; if every node fails, +host+ and +port+ are
        # reset to +nil+ and an error is raised.
        def connect_to_first_available_host
          close
          @host = @port = nil
          @nodes.each { |hp|
            @host, @port = *hp
            begin
              @socket = TCPSocket.new(@host, @port)
              return if ok?(db_command(:ismaster => 1)) # success
            rescue
              # Unreachable node or failed command: fall through to the next.
            end
            close
          }
          @host = @port = nil
          raise "error: failed to connect to any given host:port"
        end

        # Returns an array of collection names. Each name is of the form
        # "database_name.collection_name".
        def collection_names
          names = collections_info.collect { |doc| doc['name'] || '' }
          names.delete('')
          names
        end

        # Returns a cursor over query result hashes from the
        # system.namespaces collection. Each hash contains a 'name' string and
        # optionally an 'options' hash. If +coll_name+ is specified, the query
        # is restricted to that collection's full namespace name.
        def collections_info(coll_name=nil)
          selector = {}
          selector[:name] = full_coll_name(coll_name) if coll_name
          query(Collection.new(self, SYSTEM_NAMESPACE_COLLECTION), Query.new(selector))
        end

        # Create a collection. If +strict+ is false, will return existing or
        # new collection. If +strict+ is true, will raise an error if
        # collection +name+ already exists.
        #
        # Options is an optional hash:
        #
        # :capped :: Boolean. If not specified, capped is +false+.
        #
        # :size :: If +capped+ is +true+, specifies the maximum number of
        #          bytes. If +false+, specifies the initial extent of the
        #          collection.
        #
        # :max :: Max number of records in a capped collection. Optional.
        def create_collection(name, options={})
          # First check existence
          if collection_names.include?(full_coll_name(name))
            if strict?
              raise "Collection #{name} already exists. Currently in strict mode."
            else
              return Collection.new(self, name)
            end
          end

          # Create new collection. The command key must come first, hence the
          # OrderedHash.
          oh = OrderedHash.new
          oh[:create] = name
          doc = db_command(oh.merge(options || {}))
          ok = doc['ok']
          # NOTE(review): an 'ok' of 0 is also treated as success here --
          # presumably to tolerate a concurrent create; confirm intent.
          return Collection.new(self, name) if ok.kind_of?(Numeric) && (ok.to_i == 1 || ok.to_i == 0)
          raise "Error creating collection: #{doc.inspect}"
        end

        # Returns an Admin object wrapping this database.
        def admin
          Admin.new(self)
        end

        # Return a collection. If +strict+ is false, will return existing or
        # new collection. If +strict+ is true, will raise an error if
        # collection +name+ does not already exist.
        def collection(name)
          return Collection.new(self, name) if !strict? || collection_names.include?(full_coll_name(name))
          raise "Collection #{name} doesn't exist. Currently in strict mode."
        end

        # Drop collection +name+. Returns +true+ on success or if the
        # collection does not exist, +false+ otherwise.
        def drop_collection(name)
          return true unless collection_names.include?(full_coll_name(name))

          coll = collection(name)
          coll.drop_indexes # Mongo requires that we drop indexes manually
          ok?(db_command(:drop => name))
        end

        # Returns true if this database is a master (or is not paired with any
        # other database), false if it is a slave.
        def master?
          doc = db_command(:ismaster => 1)
          is_master = doc['ismaster']
          ok?(doc) && is_master.kind_of?(Numeric) && is_master.to_i == 1
        end

        # Returns a string of the form "host:port" that points to the master
        # database. Works even if this is the master database. Raises an error
        # if the ismaster command fails.
        def master
          doc = db_command(:ismaster => 1)
          is_master = doc['ismaster']
          raise "Error retrieving master database: #{doc.inspect}" unless ok?(doc) && is_master.kind_of?(Numeric)
          case is_master.to_i
          when 1
            "#@host:#@port"
          else
            doc['remote']
          end
        end

        # Switches our socket to the master database. If we are already the
        # master, no change is made. Raises an error if the master's address
        # can not be parsed as "host:port".
        def switch_to_master
          master_str = master
          return if master_str == "#@host:#@port"

          # Check the match explicitly; a nil or malformed address must not
          # fall back to stale $1/$2 values.
          md = /(.+):(\d+)/.match(master_str.to_s)
          raise "Error parsing master host:port: #{master_str.inspect}" unless md
          new_host, new_port = md[1], md[2].to_i

          @semaphore.synchronize {
            @host, @port = new_host, new_port
            close
            @socket = TCPSocket.new(@host, @port)
          }
        end

        # Close the connection to the database.
        def close
          @socket.close if @socket
          @socket = nil
        end

        # Returns true if we currently hold a socket.
        def connected?
          @socket != nil
        end

        # Send a MsgMessage to the database.
        def send_message(msg)
          send_to_db(MsgMessage.new(msg))
        end

        # Returns a Cursor over the query results.
        #
        # Note that the query gets sent lazily; the cursor calls
        # #send_query_message when needed. If the caller never requests an
        # object from the cursor, the query never gets sent.
        def query(collection, query)
          Cursor.new(self, collection, query)
        end

        # Used by a Cursor to lazily send the query to the database.
        def send_query_message(query_message)
          @semaphore.synchronize {
            send_to_db(query_message)
          }
        end

        # Remove the records that match +selector+ from +collection_name+.
        # Normally called by Collection#remove or Collection#clear.
        def remove_from_db(collection_name, selector)
          @semaphore.synchronize {
            send_to_db(RemoveMessage.new(@name, collection_name, selector))
          }
        end

        # Update records in +collection_name+ that match +selector+ by
        # applying +obj+ as an update. Normally called by Collection#replace.
        def replace_in_db(collection_name, selector, obj)
          @semaphore.synchronize {
            send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false))
          }
        end

        # Alias for #replace_in_db. Normally called by Collection.modify.
        alias_method :modify_in_db, :replace_in_db

        # Update records in +collection_name+ that match +selector+ by
        # applying +obj+ as an update. If no match, inserts (???). Normally
        # called by Collection#repsert. Returns +obj+, possibly modified by
        # the primary key factory.
        def repsert_in_db(collection_name, selector, obj)
          @semaphore.synchronize {
            obj = @pk_factory.create_pk(obj) if @pk_factory
            send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true))
            obj
          }
        end

        # Return the number of records in +collection_name+ that match
        # +selector+. If +selector+ is +nil+ or an empty hash, returns the
        # count of all records. Normally called by Collection#count.
        def count(collection_name, selector={})
          oh = OrderedHash.new
          oh[:count] = collection_name
          oh[:query] = selector || {}
          doc = db_command(oh)
          return doc['n'].to_i if ok?(doc)
          raise "Error with count command: #{doc.inspect}"
        end

        # Drop index +name+ from +collection_name+. Normally called from
        # Collection#drop_index or Collection#drop_indexes.
        def drop_index(collection_name, name)
          oh = OrderedHash.new
          oh[:deleteIndexes] = collection_name
          oh[:index] = name
          doc = db_command(oh)
          raise "Error with drop_index command: #{doc.inspect}" unless ok?(doc)
        end

        # Return an array of hashes, one for each index on +collection_name+.
        # Normally called by Collection#index_information. Each hash contains:
        #
        # :name :: Index name
        #
        # :keys :: Hash whose keys are the names of the fields that make up
        #          the key and values are integers.
        #
        # :ns :: Namespace; same as +collection_name+.
        def index_information(collection_name)
          sel = {:ns => full_coll_name(collection_name)}
          query(Collection.new(self, SYSTEM_INDEX_COLLECTION), Query.new(sel)).collect { |row|
            h = {:name => row['name']}
            raise "Name of index on return from db was nil. Coll = #{full_coll_name(collection_name)}" unless h[:name]

            h[:keys] = row['key']
            raise "Keys for index on return from db was nil. Coll = #{full_coll_name(collection_name)}" unless h[:keys]

            h[:ns] = row['ns']
            raise "Namespace for index on return from db was nil. Coll = #{full_coll_name(collection_name)}" unless h[:ns]
            # Strip only the leading "<db>." prefix. DB names can not contain
            # a dot (see #initialize) but collection names can, so everything
            # after the first dot is the collection name.
            h[:ns] = h[:ns].sub(/\A[^.]*\./, '')
            raise "Error: ns != collection" unless h[:ns] == collection_name

            h
          }
        end

        # Create a new index on +collection_name+ named +index_name+. +fields+
        # should be an array of field names. Normally called by
        # Collection#create_index.
        def create_index(collection_name, index_name, fields)
          sel = {:name => index_name, :ns => full_coll_name(collection_name)}
          field_h = {}
          fields.each { |f| field_h[f] = 1 }
          sel[:key] = field_h
          @semaphore.synchronize {
            send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, sel))
          }
        end

        # Insert +objects+ into +collection_name+. Normally called by
        # Collection#insert. Returns a new array containing +objects+,
        # possibly modified by @pk_factory.
        def insert_into_db(collection_name, objects)
          @semaphore.synchronize {
            objects.collect { |o|
              o = @pk_factory.create_pk(o) if @pk_factory
              send_to_db(InsertMessage.new(@name, collection_name, o))
              o
            }
          }
        end

        # Writes the serialized +message+ to the socket. Callers that share
        # the socket wrap this in @semaphore.synchronize.
        def send_to_db(message)
          @socket.print(message.buf.to_s)
        end

        # Returns the full namespace name "db_name.collection_name".
        def full_coll_name(collection_name)
          "#{@name}.#{collection_name}"
        end

        # Return +true+ if +doc+ contains an 'ok' field with the value 1.
        def ok?(doc)
          ok = doc['ok']
          ok.kind_of?(Numeric) && ok.to_i == 1
        end

        # DB commands need to be ordered, so selector must be an OrderedHash
        # (or a Hash with only one element). What DB commands really need is
        # that the "command" key be first.
        #
        # Do not call this. Intended for driver use only.
        def db_command(selector)
          if !selector.kind_of?(OrderedHash)
            if !selector.kind_of?(Hash) || selector.keys.length > 1
              raise "db_command must be given an OrderedHash when there is more than one key"
            end
          end

          q = Query.new(selector)
          q.number_to_return = 1
          query(Collection.new(self, SYSTEM_COMMAND_COLLECTION), q).next_object
        end
      end
    end
  end
end