2008-12-17 16:49:06 +00:00
# --
2009-01-06 15:51:01 +00:00
# Copyright (C) 2008-2009 10gen Inc.
2008-11-22 01:00:51 +00:00
#
2009-02-15 13:24:14 +00:00
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
2008-11-22 01:00:51 +00:00
#
2009-02-15 13:24:14 +00:00
# http://www.apache.org/licenses/LICENSE-2.0
2008-11-22 01:00:51 +00:00
#
2009-02-15 13:24:14 +00:00
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2008-12-17 16:49:06 +00:00
# ++
2008-11-22 01:00:51 +00:00
require 'socket'
2009-01-21 16:52:43 +00:00
require 'digest/md5'
2009-01-07 16:46:38 +00:00
require 'mutex_m'
2008-11-22 01:00:51 +00:00
require 'mongo/collection'
require 'mongo/message'
require 'mongo/query'
2008-12-08 16:38:42 +00:00
require 'mongo/util/ordered_hash.rb'
2009-01-07 20:36:12 +00:00
require 'mongo/admin'
2008-11-22 01:00:51 +00:00
module XGen
module Mongo
module Driver
2008-12-17 16:43:08 +00:00
# A Mongo database.
2008-11-22 01:00:51 +00:00
class DB
2008-12-17 16:43:08 +00:00
2008-11-22 01:00:51 +00:00
SYSTEM_NAMESPACE_COLLECTION = " system.namespaces "
SYSTEM_INDEX_COLLECTION = " system.indexes "
2009-01-07 20:58:54 +00:00
SYSTEM_PROFILE_COLLECTION = " system.profile "
2009-01-21 16:26:18 +00:00
SYSTEM_USER_COLLECTION = " system.users "
2008-11-22 01:00:51 +00:00
SYSTEM_COMMAND_COLLECTION = " $cmd "
2008-12-17 16:43:08 +00:00
# Strict mode enforces collection existence checks. When +true+,
# asking for a collection that does not exist or trying to create a
# collection that already exists raises an error.
#
# Strict mode is off (+false+) by default. Its value can be changed at
# any time.
2008-12-16 22:08:15 +00:00
attr_writer :strict
# Returns the value of the +strict+ flag.
def strict? ; @strict ; end
2008-12-17 16:43:08 +00:00
# The name of the database.
attr_reader :name
2009-01-14 23:37:28 +00:00
# Host to which we are currently connected.
attr_reader :host
# Port to which we are currently connected.
attr_reader :port
# An array of [host, port] pairs.
attr_reader :nodes
2009-01-13 18:08:04 +00:00
2009-01-14 20:36:17 +00:00
# The database's socket. For internal (and Cursor) use only.
2008-12-17 16:43:08 +00:00
attr_reader :socket
2008-11-22 01:00:51 +00:00
2009-01-23 16:47:22 +00:00
def slave_ok? ; @slave_ok ; end
def auto_reconnect? ; @auto_reconnect ; end
2009-01-16 19:41:53 +00:00
# A primary key factory object (or +nil+). See the README.doc file or
# DB#new for details.
attr_reader :pk_factory
2009-01-16 21:10:52 +00:00
def pk_factory = ( pk_factory )
raise " error: can not change PK factory " if @pk_factory
@pk_factory = pk_factory
end
2009-01-23 18:30:59 +00:00
# Instances of DB are normally obtained by calling Mongo#db.
#
2008-12-17 16:43:08 +00:00
# db_name :: The database name
#
2009-01-23 18:30:59 +00:00
# nodes :: An array of [host, port] pairs. See Mongo#new, which offers
# a more flexible way of defining nodes.
2008-12-17 16:43:08 +00:00
#
2009-01-16 19:41:53 +00:00
# options :: A hash of options.
#
# Options:
#
# :strict :: If true, collections must exist to be accessed and must
# not exist to be created. See #collection and
# #create_collection.
#
# :pk :: A primary key factory object that must respond to :create_pk,
# which should take a hash and return a hash which merges the
# original hash with any primary key fields the factory wishes
# to inject. (NOTE: if the object already has a primary key,
# the factory should not inject a new key; this means that the
# object is being used in a repsert but it already exists.) The
# idea here is that when ever a record is inserted, the :pk
# object's +create_pk+ method will be called and the new hash
# returned will be inserted.
#
2009-01-23 16:47:22 +00:00
# :slave_ok :: Only used if +nodes+ contains only one host/port. If
# false, when connecting to that host/port we check to
# see if the server is the master. If it is not, an error
# is thrown.
#
# :auto_reconnect :: If the connection gets closed (for example, we
# have a server pair and saw the "not master"
# error, which closes the connection), then
# automatically try to reconnect to the master or
# to the single server we have been given. Defaults
# to +false+.
#
# When a DB object first connects to a pair, it will find the master
# instance and connect to that one. On socket error or if we recieve a
# "not master" error, we again find the master of the pair.
2009-01-16 19:41:53 +00:00
def initialize ( db_name , nodes , options = { } )
2009-01-20 20:59:07 +00:00
raise " Invalid DB name \" #{ db_name } \" (must be non-nil, non-zero-length, and can not contain \" . \" ) " if ! db_name || ( db_name && db_name . length > 0 && db_name . include? ( " . " ) )
2009-01-14 23:37:28 +00:00
@name , @nodes = db_name , nodes
2009-01-16 19:41:53 +00:00
@strict = options [ :strict ]
@pk_factory = options [ :pk ]
2009-01-23 16:47:22 +00:00
@slave_ok = options [ :slave_ok ] && @nodes . length == 1 # only OK if one node
@auto_reconnect = options [ :auto_reconnect ]
2009-01-07 16:46:38 +00:00
@semaphore = Object . new
@semaphore . extend Mutex_m
2009-02-05 20:59:44 +00:00
@socket = nil
2009-01-23 16:47:22 +00:00
connect_to_master
2009-01-14 23:37:28 +00:00
end
2009-01-23 16:47:22 +00:00
def connect_to_master
2009-01-14 23:37:28 +00:00
close if @socket
@host = @port = nil
@nodes . detect { | hp |
@host , @port = * hp
begin
@socket = TCPSocket . new ( @host , @port )
2009-02-10 18:32:40 +00:00
@socket . setsockopt ( Socket :: IPPROTO_TCP , Socket :: TCP_NODELAY , 1 )
2009-01-23 16:47:22 +00:00
# Check for master. Can't call master? because it uses mutex,
# which may already be in use during this call.
semaphore_is_locked = @semaphore . locked?
@semaphore . unlock if semaphore_is_locked
is_master = master?
@semaphore . lock if semaphore_is_locked
break if @slave_ok || is_master
2009-03-12 20:40:02 +00:00
rescue SocketError , SystemCallError , IOError = > ex
2009-01-14 23:37:28 +00:00
close if @socket
end
@socket
}
raise " error: failed to connect to any given host:port " unless @socket
2008-11-22 01:00:51 +00:00
end
2009-01-21 16:26:18 +00:00
# Returns true if +username+ has +password+ in
# +SYSTEM_USER_COLLECTION+. +name+ is username, +password+ is
# plaintext password.
def authenticate ( username , password )
doc = db_command ( :getnonce = > 1 )
raise " error retrieving nonce: #{ doc } " unless ok? ( doc )
nonce = doc [ 'nonce' ]
auth = OrderedHash . new
auth [ 'authenticate' ] = 1
auth [ 'user' ] = username
auth [ 'nonce' ] = nonce
2009-01-30 21:49:19 +00:00
auth [ 'key' ] = Digest :: MD5 . hexdigest ( " #{ nonce } #{ username } #{ hash_password ( username , password ) } " )
2009-01-21 16:26:18 +00:00
ok? ( db_command ( auth ) )
end
2009-01-21 16:52:43 +00:00
# Deauthorizes use for this database for this connection.
def logout
doc = db_command ( :logout = > 1 )
raise " error logging out: #{ doc . inspect } " unless ok? ( doc )
end
2008-12-17 16:43:08 +00:00
# Returns an array of collection names. Each name is of the form
# "database_name.collection_name".
2008-11-22 01:00:51 +00:00
def collection_names
names = collections_info . collect { | doc | doc [ 'name' ] || '' }
names . delete ( '' )
names
end
2008-12-17 16:43:08 +00:00
# Returns a cursor over query result hashes. Each hash contains a
# 'name' string and optionally an 'options' hash. If +coll_name+ is
# specified, an array of length 1 is returned.
2008-11-22 01:00:51 +00:00
def collections_info ( coll_name = nil )
selector = { }
2008-12-17 16:43:08 +00:00
selector [ :name ] = full_coll_name ( coll_name ) if coll_name
2009-01-13 20:38:46 +00:00
query ( Collection . new ( self , SYSTEM_NAMESPACE_COLLECTION ) , Query . new ( selector ) )
2008-11-22 01:00:51 +00:00
end
2008-12-16 22:08:15 +00:00
# Create a collection. If +strict+ is false, will return existing or
# new collection. If +strict+ is true, will raise an error if
2008-12-17 16:43:08 +00:00
# collection +name+ already exists.
2008-12-17 18:14:42 +00:00
#
# Options is an optional hash:
#
# :capped :: Boolean. If not specified, capped is +false+.
#
# :size :: If +capped+ is +true+, specifies the maximum number of
# bytes. If +false+, specifies the initial extent of the
# collection.
#
# :max :: Max number of records in a capped collection. Optional.
2008-11-22 01:00:51 +00:00
def create_collection ( name , options = { } )
# First check existence
2008-12-16 22:08:15 +00:00
if collection_names . include? ( full_coll_name ( name ) )
if strict?
raise " Collection #{ name } already exists. Currently in strict mode. "
else
return Collection . new ( self , name )
end
end
2008-11-22 01:00:51 +00:00
# Create new collection
2008-12-08 16:38:42 +00:00
oh = OrderedHash . new
oh [ :create ] = name
2008-12-17 18:14:42 +00:00
doc = db_command ( oh . merge ( options || { } ) )
2008-12-16 22:35:31 +00:00
ok = doc [ 'ok' ]
return Collection . new ( self , name ) if ok . kind_of? ( Numeric ) && ( ok . to_i == 1 || ok . to_i == 0 )
2008-11-22 01:00:51 +00:00
raise " Error creating collection: #{ doc . inspect } "
end
def admin
Admin . new ( self )
end
2008-12-16 22:08:15 +00:00
# Return a collection. If +strict+ is false, will return existing or
# new collection. If +strict+ is true, will raise an error if
2008-12-17 16:43:08 +00:00
# collection +name+ does not already exists.
2008-11-22 01:00:51 +00:00
def collection ( name )
2009-01-16 18:24:49 +00:00
return Collection . new ( self , name ) if ! strict? || collection_names . include? ( full_coll_name ( name ) )
raise " Collection #{ name } doesn't exist. Currently in strict mode. "
2008-11-22 01:00:51 +00:00
end
2008-12-17 16:43:08 +00:00
# Drop collection +name+. Returns +true+ on success or if the
# collection does not exist, +false+ otherwise.
2008-11-22 01:00:51 +00:00
def drop_collection ( name )
2008-12-17 16:43:08 +00:00
return true unless collection_names . include? ( full_coll_name ( name ) )
2008-12-16 22:35:31 +00:00
ok? ( db_command ( :drop = > name ) )
end
2009-01-23 18:47:27 +00:00
# Returns the error message from the most recently executed database
# operation for this connection, or +nil+ if there was no error.
#
# Note: as of this writing, errors are only detected on the db server
# for certain kinds of operations (writes). The plan is to change this
# so that all operations will set the error if needed.
def error
doc = db_command ( :getlasterror = > 1 )
raise " error retrieving last error: #{ doc } " unless ok? ( doc )
doc [ 'err' ]
end
2009-01-23 18:54:57 +00:00
# Returns +true+ if an error was caused by the most recently executed
# database operation.
2009-01-23 18:47:27 +00:00
#
# Note: as of this writing, errors are only detected on the db server
# for certain kinds of operations (writes). The plan is to change this
# so that all operations will set the error if needed.
def error?
error != nil
end
2008-12-16 22:35:31 +00:00
# Returns true if this database is a master (or is not paired with any
# other database), false if it is a slave.
def master?
doc = db_command ( :ismaster = > 1 )
is_master = doc [ 'ismaster' ]
ok? ( doc ) && is_master . kind_of? ( Numeric ) && is_master . to_i == 1
2008-11-22 01:00:51 +00:00
end
2009-01-13 18:08:04 +00:00
# Returns a string of the form "host:port" that points to the master
# database. Works even if this is the master database.
def master
doc = db_command ( :ismaster = > 1 )
is_master = doc [ 'ismaster' ]
2009-01-20 20:59:07 +00:00
raise " Error retrieving master database: #{ doc . inspect } " unless ok? ( doc ) && is_master . kind_of? ( Numeric )
2009-01-13 18:08:04 +00:00
case is_master . to_i
when 1
" #@host : #@port "
else
doc [ 'remote' ]
end
end
2008-12-17 16:43:08 +00:00
# Close the connection to the database.
2008-11-22 01:00:51 +00:00
def close
2009-02-05 14:53:10 +00:00
if @socket
s = @socket
@socket = nil
s . close
end
2009-01-14 23:37:28 +00:00
end
def connected?
@socket != nil
2008-11-22 01:00:51 +00:00
end
2009-03-04 19:28:00 +00:00
def receive_full ( length )
message = " "
while message . length < length do
chunk = @socket . recv ( length - message . length )
raise " connection closed " unless chunk . length > 0
message += chunk
end
message
end
2008-12-17 16:43:08 +00:00
# Send a MsgMessage to the database.
2008-11-22 01:00:51 +00:00
def send_message ( msg )
send_to_db ( MsgMessage . new ( msg ) )
end
2009-02-26 17:06:03 +00:00
2009-01-13 19:02:16 +00:00
# Returns a Cursor over the query results.
#
# Note that the query gets sent lazily; the cursor calls
# #send_query_message when needed. If the caller never requests an
# object from the cursor, the query never gets sent.
2009-01-13 20:38:46 +00:00
def query ( collection , query )
2009-01-13 20:51:41 +00:00
Cursor . new ( self , collection , query )
2009-01-13 19:02:16 +00:00
end
# Used by a Cursor to lazily send the query to the database.
def send_query_message ( query_message )
2009-01-07 16:46:38 +00:00
@semaphore . synchronize {
2009-01-13 19:02:16 +00:00
send_to_db ( query_message )
2009-01-07 16:46:38 +00:00
}
2008-11-22 01:00:51 +00:00
end
2008-12-17 16:43:08 +00:00
# Remove the records that match +selector+ from +collection_name+.
# Normally called by Collection#remove or Collection#clear.
2008-12-08 16:38:42 +00:00
def remove_from_db ( collection_name , selector )
2009-01-07 16:46:38 +00:00
@semaphore . synchronize {
send_to_db ( RemoveMessage . new ( @name , collection_name , selector ) )
}
2008-11-22 01:00:51 +00:00
end
2008-12-17 16:43:08 +00:00
# Update records in +collection_name+ that match +selector+ by
# applying +obj+ as an update. Normally called by Collection#replace.
2008-12-08 16:38:42 +00:00
def replace_in_db ( collection_name , selector , obj )
2009-01-07 16:46:38 +00:00
@semaphore . synchronize {
send_to_db ( UpdateMessage . new ( @name , collection_name , selector , obj , false ) )
}
2008-11-22 01:00:51 +00:00
end
2008-12-17 16:43:08 +00:00
# Alias for #replace_in_db. Normally called by Collection.modify.
2008-11-22 01:00:51 +00:00
alias_method :modify_in_db , :replace_in_db
2008-12-17 16:43:08 +00:00
# Update records in +collection_name+ that match +selector+ by
# applying +obj+ as an update. If no match, inserts (???). Normally
# called by Collection#repsert.
2008-12-08 16:38:42 +00:00
def repsert_in_db ( collection_name , selector , obj )
2009-01-07 16:46:38 +00:00
@semaphore . synchronize {
2009-01-16 19:41:53 +00:00
obj = @pk_factory . create_pk ( obj ) if @pk_factory
2009-01-07 16:46:38 +00:00
send_to_db ( UpdateMessage . new ( @name , collection_name , selector , obj , true ) )
obj
}
2008-11-22 01:00:51 +00:00
end
2008-12-17 16:43:08 +00:00
# Return the number of records in +collection_name+ that match
# +selector+. If +selector+ is +nil+ or an empty hash, returns the
# count of all records. Normally called by Collection#count.
def count ( collection_name , selector = { } )
2008-12-08 16:38:42 +00:00
oh = OrderedHash . new
oh [ :count ] = collection_name
2008-12-17 16:43:08 +00:00
oh [ :query ] = selector || { }
2008-12-08 16:38:42 +00:00
doc = db_command ( oh )
2008-12-16 22:35:31 +00:00
return doc [ 'n' ] . to_i if ok? ( doc )
2008-12-02 00:36:20 +00:00
raise " Error with count command: #{ doc . inspect } "
2008-11-22 01:00:51 +00:00
end
2009-03-13 15:03:52 +00:00
# Evaluate a JavaScript expression on MongoDB.
# +code+ should be a string or Code instance containing a JavaScript
# expression. Additional arguments will be passed to that expression
# when it is run on the server.
def eval ( code , * args )
if not code . is_a? Code
code = Code . new ( code )
end
oh = OrderedHash . new
oh [ : $eval ] = code
oh [ :args ] = args
doc = db_command ( oh )
return doc [ 'retval' ] if ok? ( doc )
raise " Error with eval command: #{ doc . inspect } "
end
2008-12-17 16:43:08 +00:00
# Drop index +name+ from +collection_name+. Normally called from
# Collection#drop_index or Collection#drop_indexes.
2008-12-08 16:38:42 +00:00
def drop_index ( collection_name , name )
oh = OrderedHash . new
oh [ :deleteIndexes ] = collection_name
oh [ :index ] = name
doc = db_command ( oh )
2008-12-16 22:35:31 +00:00
raise " Error with drop_index command: #{ doc . inspect } " unless ok? ( doc )
2008-11-22 01:00:51 +00:00
end
2008-12-17 16:43:08 +00:00
# Return an array of hashes, one for each index on +collection_name+.
# Normally called by Collection#index_information. Each hash contains:
#
# :name :: Index name
#
# :keys :: Hash whose keys are the names of the fields that make up
# the key and values are integers.
#
# :ns :: Namespace; same as +collection_name+.
2008-12-08 16:38:42 +00:00
def index_information ( collection_name )
sel = { :ns = > full_coll_name ( collection_name ) }
2009-01-13 20:38:46 +00:00
query ( Collection . new ( self , SYSTEM_INDEX_COLLECTION ) , Query . new ( sel ) ) . collect { | row |
2008-11-22 01:00:51 +00:00
h = { :name = > row [ 'name' ] }
2008-12-08 16:38:42 +00:00
raise " Name of index on return from db was nil. Coll = #{ full_coll_name ( collection_name ) } " unless h [ :name ]
2008-11-22 01:00:51 +00:00
2008-12-08 13:33:29 +00:00
h [ :keys ] = row [ 'key' ]
2008-12-08 16:38:42 +00:00
raise " Keys for index on return from db was nil. Coll = #{ full_coll_name ( collection_name ) } " unless h [ :keys ]
2008-11-22 01:00:51 +00:00
h [ :ns ] = row [ 'ns' ]
2008-12-08 16:38:42 +00:00
raise " Namespace for index on return from db was nil. Coll = #{ full_coll_name ( collection_name ) } " unless h [ :ns ]
2008-11-22 01:00:51 +00:00
h [ :ns ] . sub! ( / .* \ . / , '' )
2008-12-08 16:38:42 +00:00
raise " Error: ns != collection " unless h [ :ns ] == collection_name
2008-11-22 01:00:51 +00:00
h
}
end
2009-02-26 17:06:03 +00:00
# Create a new index on +collection_name+. +field_or_spec+
# should be either a single field name or a Array of [field name,
# direction] pairs. Directions should be specified as
# XGen::Mongo::ASCENDING or XGen::Mongo::DESCENDING. Normally called
2009-04-21 18:44:57 +00:00
# by Collection#create_index. If +unique+ is true the index will
# enforce a uniqueness constraint.
def create_index ( collection_name , field_or_spec , unique = false )
2009-02-26 17:06:03 +00:00
field_h = OrderedHash . new
if field_or_spec . is_a? String
field_h [ field_or_spec ] = 1
else
field_or_spec . each { | f | field_h [ f [ 0 ] ] = f [ 1 ] }
end
name = gen_index_name ( field_h )
sel = {
:name = > name ,
:ns = > full_coll_name ( collection_name ) ,
2009-04-21 18:44:57 +00:00
:key = > field_h ,
:unique = > unique
2009-02-26 17:06:03 +00:00
}
2009-01-07 16:46:38 +00:00
@semaphore . synchronize {
send_to_db ( InsertMessage . new ( @name , SYSTEM_INDEX_COLLECTION , sel ) )
}
2009-02-26 17:06:03 +00:00
name
2008-11-22 01:00:51 +00:00
end
2008-12-17 16:43:08 +00:00
# Insert +objects+ into +collection_name+. Normally called by
2009-01-16 21:10:52 +00:00
# Collection#insert. Returns a new array containing +objects+,
# possibly modified by @pk_factory.
2008-12-08 16:38:42 +00:00
def insert_into_db ( collection_name , objects )
2009-01-07 16:46:38 +00:00
@semaphore . synchronize {
2009-01-16 21:10:52 +00:00
objects . collect { | o |
2009-01-16 19:41:53 +00:00
o = @pk_factory . create_pk ( o ) if @pk_factory
send_to_db ( InsertMessage . new ( @name , collection_name , o ) )
2009-01-16 21:10:52 +00:00
o
2009-01-16 19:41:53 +00:00
}
2009-01-07 16:46:38 +00:00
}
2008-11-22 01:00:51 +00:00
end
def send_to_db ( message )
2009-01-23 16:47:22 +00:00
connect_to_master if ! connected? && @auto_reconnect
begin
@socket . print ( message . buf . to_s )
2009-02-05 14:53:10 +00:00
@socket . flush
2009-01-23 16:47:22 +00:00
rescue = > ex
close
raise ex
end
2008-11-22 01:00:51 +00:00
end
2008-12-08 16:38:42 +00:00
def full_coll_name ( collection_name )
" #{ @name } . #{ collection_name } "
2008-11-22 01:00:51 +00:00
end
2008-12-17 16:43:08 +00:00
# Return +true+ if +doc+ contains an 'ok' field with the value 1.
2008-12-16 22:35:31 +00:00
def ok? ( doc )
ok = doc [ 'ok' ]
ok . kind_of? ( Numeric ) && ok . to_i == 1
end
2008-12-08 16:38:42 +00:00
# DB commands need to be ordered, so selector must be an OrderedHash
# (or a Hash with only one element). What DB commands really need is
# that the "command" key be first.
2009-01-07 20:36:12 +00:00
#
# Do not call this. Intended for driver use only.
2008-11-22 01:00:51 +00:00
def db_command ( selector )
2008-12-08 16:38:42 +00:00
if ! selector . kind_of? ( OrderedHash )
if ! selector . kind_of? ( Hash ) || selector . keys . length > 1
raise " db_command must be given an OrderedHash when there is more than one key "
end
end
2008-11-22 01:00:51 +00:00
q = Query . new ( selector )
q . number_to_return = 1
2009-01-13 20:38:46 +00:00
query ( Collection . new ( self , SYSTEM_COMMAND_COLLECTION ) , q ) . next_object
2008-11-22 01:00:51 +00:00
end
2009-01-21 16:26:18 +00:00
private
2009-01-30 21:49:19 +00:00
def hash_password ( username , plaintext )
Digest :: MD5 . hexdigest ( " #{ username } :mongo: #{ plaintext } " )
2009-01-21 16:26:18 +00:00
end
2009-02-26 17:06:03 +00:00
def gen_index_name ( spec )
temp = [ ]
spec . each_pair { | field , direction |
temp = temp . push ( " #{ field } _ #{ direction } " )
}
return temp . join ( " _ " )
end
2008-11-22 01:00:51 +00:00
end
end
end
end