diff --git a/src/as3/mongo/db/DB.as b/src/as3/mongo/db/DB.as index 16f19d0..7cd4a99 100644 --- a/src/as3/mongo/db/DB.as +++ b/src/as3/mongo/db/DB.as @@ -17,7 +17,7 @@ package as3.mongo.db { protected var _authenticated:Signal = new Signal(DB); protected var _authenticationProblem:Signal = new Signal(DB); - protected var _connected:Signal = new Signal(DB); +// protected var _connected:Signal = new Signal(DB); protected var _connectionFailed:Signal = new Signal(DB); protected var _socketPolicyFileError:Signal = new Signal(DB); @@ -41,12 +41,12 @@ package as3.mongo.db { return _connectionFailed; } - + /* public function get connected():Signal { return _connected; } - + */ public function get authenticationProblem():Signal { return _authenticationProblem; @@ -76,12 +76,12 @@ package as3.mongo.db { return (_credentials is Credentials); } - + /* public function get isConnected():Boolean { return _isConnected; } - + */ public function get port():uint { return _port; @@ -114,14 +114,14 @@ package as3.mongo.db private function _initialize(databaseName:String, databaseHost:String, databasePort:uint):void { - _wire = new Wire(this); - _collections = new Dictionary(); _authenticationFactory = new AuthenticationFactory(); _name = databaseName; _host = databaseHost; _port = databasePort; + + _wire = new Wire(this); } public function setCredentials(dbCredentials:Credentials):void @@ -165,12 +165,12 @@ package as3.mongo.db { return wire.runCommand(command); } - + /* public function connect():void { wire.connect(); } - + */ public function insert(collectionName:String, document:Document):void { wire.insert(name, collectionName, document); diff --git a/src/as3/mongo/db/document/Document.as b/src/as3/mongo/db/document/Document.as index 26242c6..5c9e540 100644 --- a/src/as3/mongo/db/document/Document.as +++ b/src/as3/mongo/db/document/Document.as @@ -1,7 +1,7 @@ package as3.mongo.db.document { import as3.mongo.error.MongoError; - + import org.serialization.bson.ObjectID; public class Document @@ -71,12 +71,86 @@ package as3.mongo.db.document { return _values[index]; } + + /** + * ERICSOCO ADDED + */ + public function getValueByKey(key:String):* + { + var index:int = _keys.indexOf(key); + if (index == -1) { return null; } + else { return _values[index]; } + } public function put(key:String, value:*):void { + /** + * ERICSOCO ADDED + */ + var existingIndex:int = _keys.indexOf(key); + if (existingIndex != -1) { + _values[existingIndex] = value; + return; + } + const nextIndex:Number = _keys.length; _keys[nextIndex] = key; _values[nextIndex] = value; } + + /** + * ERICSOCO ADDED + */ + public function remove (key:String) :void { + var existingIndex:int = _keys.indexOf(key); + if (existingIndex != -1) { + _keys.splice(existingIndex, 1); + _values.splice(existingIndex, 1); + } + } + + /** + * ERICSOCO ADDED + */ + public function toString (tabLevel:int=0) :String { + function addTabs () :String { + var tabStr:String = ""; + for (var t:int=0; t------------------------------------// + private function beginSequence () :void { + initReplyLoader(); + doQuery(); + } + + private function initReplyLoader () :void { + replyLoader.initializeOpReplyLoader(socket); + replyLoader.LOADED.addOnce(onReplyLoaded); + } + + private function doQuery () :void { + socket.writeBytes(query.toByteArray()); + socket.flush(); + } + + private function onReplyLoaded (reply:OpReply) :void { + if (cursor) { + cursor.onReplyLoaded(reply); + } + + closeSocket(); + dispatchEvent(new Event(Event.COMPLETE)); + } + //----------------------------------------// + + + + //----------------------------------------// + private function initSocket () :void { + socket = new Socket(); + socket.addEventListener(IOErrorEvent.IO_ERROR, onSocketReady); + socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onSocketReady); + socket.addEventListener(Event.CONNECT, onSocketReady); + } + + private function onSocketReady (evt:Event) :void { + var socket:Socket = evt.target as Socket; + if (!socket) { return; } + socket.removeEventListener(IOErrorEvent.IO_ERROR, onSocketReady); + socket.removeEventListener(SecurityErrorEvent.SECURITY_ERROR, onSocketReady); + socket.removeEventListener(Event.CONNECT, onSocketReady); + + if (evt is ErrorEvent) { + dispatchEvent(evt); + return; + } + + beginSequence(); + } + + private function closeSocket () :void { + try { + socket.close(); + } catch (e:Error) {} + + socket = null; + } + //---------------------------------------// + } +} \ No newline at end of file diff --git a/src/as3/mongo/wire/Wire.as b/src/as3/mongo/wire/Wire.as index 92442b7..382ab63 100644 --- a/src/as3/mongo/wire/Wire.as +++ b/src/as3/mongo/wire/Wire.as @@ -5,6 +5,7 @@ package as3.mongo.wire import as3.mongo.error.MongoError; import as3.mongo.wire.cursor.Cursor; import as3.mongo.wire.cursor.GetMoreMessage; + import as3.mongo.wire.messages.IMessage; import as3.mongo.wire.messages.MessageFactory; import as3.mongo.wire.messages.client.FindOptions; import as3.mongo.wire.messages.client.OpDelete; @@ -16,204 +17,177 @@ package as3.mongo.wire import as3.mongo.wire.messages.database.OpReply; import as3.mongo.wire.messages.database.OpReplyLoader; + import flash.events.Event; + import flash.events.IOErrorEvent; + import flash.events.SecurityErrorEvent; import flash.net.Socket; + import flash.utils.Dictionary; import org.osflash.signals.Signal; import org.serialization.bson.Int64; - public class Wire - { - protected var _socket:Socket; - protected var _messageFactory:MessageFactory; - - protected var _db:DB; - protected var _isConnected:Boolean; - protected var _messenger:Messenger; - protected var _connector:Connector; - protected var _cursorFactory:CursorFactory; - protected var _activeCursors:Array; - - public function get connector():Connector - { - return _connector; - } - - public function get messenger():Messenger - { - return _messenger; - } - - public function get db():DB - { - return _db; - } - - public function get messageFactory():MessageFactory - { - return _messageFactory; - } - - public function get socket():Socket - { - return _socket; - } - - public function get isConnected():Boolean - { - return _isConnected; - } - - public function Wire(db:DB) - { - _initializeWire(db); - } - - private function _initializeWire(db:DB):void - { - _db = db; - _socket = new Socket(); - _messenger = new Messenger(this); - _connector = new Connector(this); - _messageFactory = new MessageFactory(); - _cursorFactory = new CursorFactory(); - _activeCursors = new Array(); - } - - public function connect():void - { - _connector.connect(); - } - - internal function setConnected(value:Boolean):void - { - _isConnected = value; - } - - public function runCommand(command:Document):Signal - { - _checkIfSocketIsConnected(); - const opQuery:OpQuery = messageFactory.makeRunCommandOpQueryMessage(_db.name, "$cmd", command); - const opReplyLoader:OpReplyLoader = new OpReplyLoader(socket); - _messenger.sendMessage(opQuery); - - _activeOpReplyLoaders.push(opReplyLoader); - opReplyLoader.LOADED.addOnce(_onOpReplyLoaded); - return opReplyLoader.LOADED; - } - - private var _activeOpReplyLoaders:Vector. = new Vector.(); - - public function findOne(collectionName:String, query:Document, returnFields:Document=null):Signal - { - _checkIfSocketIsConnected(); - const opQuery:OpQuery = messageFactory.makeFindOneOpQueryMessage(_db.name, collectionName, query, returnFields); - const findOneOpReplyLoader:FindOneOpReplyLoader = new FindOneOpReplyLoader(socket); - _messenger.sendMessage(opQuery); - - _activeOpReplyLoaders.push(findOneOpReplyLoader); - findOneOpReplyLoader.LOADED.addOnce(_onOpReplyLoaded); - return findOneOpReplyLoader.LOADED; - } - - private function _onOpReplyLoaded(document:Object):void - { - const loadedLoaders:Vector. = new Vector.(); - var loader:OpReplyLoader - for each (loader in _activeOpReplyLoaders) - { - if (loader.isLoaded) - loadedLoaders.push(loader); - } - - for each (loader in loadedLoaders) - { - if (_activeOpReplyLoaders.lastIndexOf(loader) != -1) - _activeOpReplyLoaders.splice(_activeOpReplyLoaders.lastIndexOf(loader), 1); - } - } - - private function _checkIfSocketIsConnected():void - { - if (false === socket.connected) - throw new MongoError(MongoError.SOCKET_NOT_CONNECTED); - } - - // TODO: Write integration tests for this - public function insert(dbName:String, collectionName:String, document:Document):void - { - _checkIfSocketIsConnected(); + public class Wire { + private var db:DB; + private var messageFactory:MessageFactory; + + // for requests that just push bytes through the Socket and don't wait for a reply + private var noReplySocket:Socket; + private var pendingNoReplyMessages:Vector.; + + private var activeRequestSequences:Vector.; + + + public function Wire (db:DB) { + this.db = db; + init(); + } + + + //-------------------------------------------------// + //-----no-reply requests-----// + public function insert (dbName:String, collectionName:String, document:Document) :void { const opInsert:OpInsert = messageFactory.makeSaveOpInsertMessage(dbName, collectionName, document); - _messenger.sendMessage(opInsert); + sendMessage(opInsert); } - - // TODO: Write integration tests for this - public function remove(dbName:String, collectionName:String, selector:Document):void - { - _checkIfSocketIsConnected(); + + public function remove (dbName:String, collectionName:String, selector:Document) :void { const opDelete:OpDelete = messageFactory.makeRemoveOpDeleteMessage(dbName, collectionName, selector); - _messenger.sendMessage(opDelete); + sendMessage(opDelete); } - - // TODO: Write integration tests for this - public function updateFirst(dbName:String, collectionName:String, selector:Document, document:Document):void - { - _checkIfSocketIsConnected(); + + public function updateFirst (dbName:String, collectionName:String, selector:Document, document:Document) :void { const opUpdate:OpUpdate = messageFactory.makeUpdateFirstOpUpdateMessage(dbName, collectionName, selector, document); - _messenger.sendMessage(opUpdate); + sendMessage(opUpdate); } - - // TODO: Write integration tests for this - public function update(dbName:String, collectionName:String, selector:Document, modifier:Document):void - { - _checkIfSocketIsConnected(); + + public function update (dbName:String, collectionName:String, selector:Document, modifier:Document) :void { const opUpdate:OpUpdate = messageFactory.makeUpdateOpUpdateMessage(dbName, collectionName, selector, modifier); - _messenger.sendMessage(opUpdate); + sendMessage(opUpdate); } - - // TODO: Write integration tests for this - public function upsert(dbName:String, collectionName:String, selector:Document, document:Document):void - { - _checkIfSocketIsConnected(); + + public function upsert (dbName:String, collectionName:String, selector:Document, document:Document) :void { const opUpdate:OpUpdate = messageFactory.makeUpsertOpUpdateMessage(dbName, collectionName, selector, document); - _messenger.sendMessage(opUpdate); + sendMessage(opUpdate); } - - public function find(dbName:String, collectionName:String, query:Document, options:FindOptions=null):Signal - { - if (null == options) + + //-----request sequences-----// + public function find (dbName:String, collectionName:String, query:Document, options:FindOptions=null) :Signal { + if (null == options) { options = new FindOptions(); + } - _checkIfSocketIsConnected(); - const opQuery:OpQuery = messageFactory.makeFindOpQueryMessage(dbName, collectionName, query, options); - const opReplyLoader:OpReplyLoader = new OpReplyLoader(socket); - - const cursor:Cursor = _cursorFactory.getCursor(opReplyLoader); + const opQuery:OpQuery = messageFactory.makeFindOpQueryMessage(dbName, collectionName, query, options); + const opReplyLoader:OpReplyLoader = new OpReplyLoader(); + + var cursor:Cursor = new Cursor(); cursor.getMoreMessage = new GetMoreMessage(dbName, collectionName, options, this, cursor); - _activeCursors.push(cursor); - cursor.cursorReady.addOnce(_onCursorReady); - - _messenger.sendMessage(opQuery); + + launchRequestSequence(opQuery, opReplyLoader, cursor); + return cursor.cursorReady; } + + public function findOne (collectionName:String, query:Document, returnFields:Document=null) :Signal { + const opQuery:OpQuery = messageFactory.makeFindOneOpQueryMessage(db.name, collectionName, query, returnFields); + const findOneOpReplyLoader:FindOneOpReplyLoader = new FindOneOpReplyLoader(); + launchRequestSequence(opQuery, findOneOpReplyLoader); - private function _onCursorReady(cursor:Cursor):void - { - if (-1 < _activeCursors.lastIndexOf(cursor)) - { - _activeCursors.splice(_activeCursors.lastIndexOf(cursor), 1); - } + return findOneOpReplyLoader.LOADED; } - - public function getMore(dbName:String, collectionName:String, options:FindOptions, cursorID:Int64):Signal - { - _checkIfSocketIsConnected(); - - const opGetMore:OpGetMore = messageFactory.makeGetMoreOpGetMoreMessage(dbName, collectionName, options.numberToReturn, cursorID); - const opReplyLoader:OpReplyLoader = new OpReplyLoader(socket); - _messenger.sendMessage(opGetMore); - _activeOpReplyLoaders.push(opReplyLoader); - opReplyLoader.LOADED.addOnce(_onOpReplyLoaded); + + public function getMore (dbName:String, collectionName:String, options:FindOptions, cursorID:Int64) :Signal { + const opGetMore:OpGetMore = messageFactory.makeGetMoreOpGetMoreMessage(dbName, collectionName, options.numberToReturn, cursorID); + const opReplyLoader:OpReplyLoader = new OpReplyLoader(); + launchRequestSequence(opGetMore, opReplyLoader); + + return opReplyLoader.LOADED; + } + + public function runCommand (command:Document) :Signal { + const opQuery:OpQuery = messageFactory.makeRunCommandOpQueryMessage(db.name, "$cmd", command); + const opReplyLoader:OpReplyLoader = new OpReplyLoader(); + launchRequestSequence(opQuery, opReplyLoader); + return opReplyLoader.LOADED; } + //------------------------------------------------// + + + + private function init () :void { + initSocketResources(); + messageFactory = new MessageFactory(); + activeRequestSequences = new Vector.(); + } + + private function sendMessage (message:IMessage) :void { + if (!noReplySocket.connected) { + pendingNoReplyMessages.push(message); + return; + } + + noReplySocket.writeBytes(message.toByteArray()); + noReplySocket.flush(); + } + + private function launchRequestSequence (query:IMessage, replyLoader:OpReplyLoader, cursor:Cursor=null) :void { + var rs:RequestSequence = new RequestSequence(query, replyLoader, cursor); + + rs.addEventListener(IOErrorEvent.IO_ERROR, onRequestSequenceComplete); + rs.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onRequestSequenceComplete); + rs.addEventListener(Event.COMPLETE, onRequestSequenceComplete); + + activeRequestSequences.push(rs); + rs.begin(db.host, db.port); + } + + private function onRequestSequenceComplete (evt:Event) :void { + var rs:RequestSequence = evt.target as RequestSequence; + if (!rs) { return; } + + rs.removeEventListener(IOErrorEvent.IO_ERROR, onRequestSequenceComplete); + rs.removeEventListener(SecurityErrorEvent.SECURITY_ERROR, onRequestSequenceComplete); + rs.removeEventListener(Event.COMPLETE, onRequestSequenceComplete); + + var i:int = activeRequestSequences.indexOf(rs); + if (i != -1) { + activeRequestSequences.splice(i, 1); + } + + if (evt is IOErrorEvent) { + db.connectionFailed.dispatch(db); + } else if (evt is SecurityErrorEvent) { + db.socketPolicyFileError.dispatch(db); + } + } + + private function initSocketResources () :void { + pendingNoReplyMessages = new Vector.(); + + noReplySocket = new Socket(); + noReplySocket.addEventListener(IOErrorEvent.IO_ERROR, onSocketReady); + noReplySocket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onSocketReady); + noReplySocket.addEventListener(Event.CONNECT, onSocketReady); + noReplySocket.connect(db.host, db.port); + } + + private function onSocketReady (evt:Event) :void { + noReplySocket.removeEventListener(IOErrorEvent.IO_ERROR, onSocketReady); + noReplySocket.removeEventListener(SecurityErrorEvent.SECURITY_ERROR, onSocketReady); + noReplySocket.removeEventListener(Event.CONNECT, onSocketReady); + + if (evt is IOErrorEvent) { + db.connectionFailed.dispatch(db); + return; + } else if (evt is SecurityErrorEvent) { + db.socketPolicyFileError.dispatch(db); + return; + } + + for (var i:int=0; i