diff --git a/docker-compose.yaml b/docker-compose.yaml index bff4a19ca..01441426a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -114,7 +114,7 @@ services: - 5555 command: | bash -c " - avalon_listener --bind http://avalon-listener:1947 --lmdb_url http://avalon-lmdb:9090 + avalon_listener --bind http://avalon-listener:1947 --lmdb_url http://avalon-lmdb:9090 --zmq_url tcp://avalon-enclave-manager:5555 tail -f /dev/null " depends_on: diff --git a/enclave_manager/avalon_enclave_manager/work_order_kv_delegate.py b/enclave_manager/avalon_enclave_manager/work_order_kv_delegate.py index 0a2dd7bb3..8fdbbf1fe 100644 --- a/enclave_manager/avalon_enclave_manager/work_order_kv_delegate.py +++ b/enclave_manager/avalon_enclave_manager/work_order_kv_delegate.py @@ -124,49 +124,49 @@ def update_receipt(self, wo_id, wo_json_resp): the receipt. """ receipt_entry = self._kv_helper.get("wo-receipts", wo_id) - if receipt_entry: - update_type = None - if "error" in wo_json_resp and \ - wo_json_resp["error"]["code"] != \ - WorkOrderStatus.PENDING.value: - update_type = ReceiptCreateStatus.FAILED.value - else: - update_type = ReceiptCreateStatus.PROCESSED.value - receipt_obj = WorkOrderReceiptRequest() - wo_receipt = receipt_obj.update_receipt( - wo_id, - update_type, - wo_json_resp, - self.private_key - ) - updated_receipt = None - # load previous updates to receipt - updates_to_receipt = self._kv_helper.get( - "wo-receipt-updates", wo_id) - # If it is first update to receipt - if updates_to_receipt is None: - updated_receipt = [] - else: - updated_receipt = json.loads(updates_to_receipt) - # Get the last update to receipt - last_receipt = updated_receipt[len(updated_receipt) - 1] - - # If receipt updateType is completed, - # then no further update allowed - if last_receipt["updateType"] == \ - ReceiptCreateStatus.COMPLETED.value: - logger.info( - "Receipt for the workorder id %s is completed " + - "and no further updates are allowed", - wo_id) - return - updated_receipt.append(wo_receipt) - - # Since receipts_json is jrpc request updating only params object. - self._kv_helper.set("wo-receipt-updates", wo_id, json.dumps( - updated_receipt)) - logger.info("Receipt for the workorder id %s is updated to %s", - wo_id, wo_receipt) + # If receipt is not created yet, add tag "receiptUpdates" to + # Receipt entry and update it + if receipt_entry is None: + receipt_entry = { + "params": { + "receiptUpdates": [] + } + } + # load previous updates to receipt + receipt_update_entry = receipt_entry["params"]["receiptUpdates"] + update_type = None + if "error" in wo_json_resp and \ + wo_json_resp["error"]["code"] != \ + WorkOrderStatus.PENDING.value: + update_type = ReceiptCreateStatus.FAILED.value else: - logger.info("Work order receipt is not created, " + - "so skipping the update") + update_type = ReceiptCreateStatus.PROCESSED.value + receipt_obj = WorkOrderReceiptRequest() + wo_receipt = receipt_obj.update_receipt( + wo_id, + update_type, + wo_json_resp, + self.private_key + ) + + # If it is first update to receipt + if len(receipt_update_entry) > 0: + # Get the last update to receipt + last_receipt = receipt_update_entry[len(receipt_update_entry) - 1] + # If receipt updateType is completed, + # then no further update allowed + if last_receipt["updateType"] == \ + ReceiptCreateStatus.COMPLETED.value: + logger.info( + "Receipt for the workorder id %s is completed " + + "and no further updates are allowed", + wo_id) + return + receipt_update_entry.append(wo_receipt) + + # Since receipts_json is jrpc request updating only params object. + receipt_entry["receiptUpdates"] = receipt_update_entry + self._kv_helper.set("wo-receipts", wo_id, json.dumps( + receipt_entry)) + logger.info("Receipt for the workorder id %s is updated to %s", + wo_id, wo_receipt) diff --git a/listener/avalon_listener/tcs_listener.py b/listener/avalon_listener/tcs_listener.py index 2b3bed177..04cdf7f61 100755 --- a/listener/avalon_listener/tcs_listener.py +++ b/listener/avalon_listener/tcs_listener.py @@ -27,6 +27,7 @@ import sys import logging import argparse +from urllib.parse import urlparse from avalon_listener.tcs_work_order_handler import TCSWorkOrderHandler from avalon_listener.tcs_work_order_handler_sync import TCSWorkOrderHandlerSync @@ -72,8 +73,7 @@ def __init__(self, config): self.workorder_handler = TCSWorkOrderHandlerSync( self.kv_helper, config["Listener"]["max_work_order_count"], - config["Listener"]["zmq_url"], - config["Listener"]["zmq_port"]) + config["Listener"]["zmq_url"]) else: self.workorder_handler = TCSWorkOrderHandler( self.kv_helper, @@ -121,6 +121,13 @@ def parse_command_line(config, args): '--bind', help='URI to listen for requests ', type=str) parser.add_argument( '--lmdb_url', help='DB url to connect to LMDB ', type=str) + # Check if listener is running in sync work load + # execution mode then add additional argument zmq url + is_sync = config["WorkloadExecution"]["sync_workload_execution"] + if is_sync: + parser.add_argument( + '--zmq_url', + help='ZMQ url to connect to enclave manager ', type=str) options = parser.parse_args(args) @@ -150,6 +157,21 @@ def parse_command_line(config, args): logger.error("Quit : remote_storage_url is not \ present in config for Listener") sys.exit(-1) + if options.zmq_url: + if not is_sync: + logger.error("Invalid option zmq_url! It should be supported" + "in work order sync mode") + sys.exit(-1) + else: + if config.get("Listener") is None or \ + config["Listener"].get("zmq_url") is None: + logger.error("Quit : no zmq_url config found for Listener") + sys.exit(-1) + parse_res = urlparse(options.zmq_url) + if parse_res.scheme != "tcp" or parse_res.port == "": + logger.error("Invalid zmq url. It should tcp://:") + sys.exit(-1) + config["Listener"]["zmq_url"] = options.zmq_url return host_name, port diff --git a/listener/avalon_listener/tcs_work_order_handler_sync.py b/listener/avalon_listener/tcs_work_order_handler_sync.py index bdae34a88..cdca3438a 100755 --- a/listener/avalon_listener/tcs_work_order_handler_sync.py +++ b/listener/avalon_listener/tcs_work_order_handler_sync.py @@ -52,14 +52,13 @@ class TCSWorkOrderHandlerSync(TCSWorkOrderHandler): """ # ------------------------------------------------------------------------------------------------ - def __init__(self, kv_helper, max_wo_count, zmq_url, zmq_port): + def __init__(self, kv_helper, max_wo_count, zmq_url): """ Function to perform init activity Parameters: - kv_helper is a object of lmdb database """ self.zmq_url = zmq_url - self.zmq_port_number = zmq_port super(TCSWorkOrderHandlerSync, self).__init__(kv_helper, max_wo_count) # --------------------------------------------------------------------------------------------- @@ -145,11 +144,11 @@ def WorkOrderSubmit(self, **params): # ZeroMQ for sync workorder processing try: socket = context.socket(zmq.REQ) - socket.connect(self.zmq_url + self.zmq_port_number) + socket.connect(self.zmq_url) socket.send_string(wo_id, flags=0, encoding='utf-8') replymessage = socket.recv() logger.info(replymessage) - socket.disconnect(self.zmq_url + self.zmq_port_number) + socket.disconnect(self.zmq_url) except Exception as er: raise JSONRPCDispatchException( WorkOrderStatus.UNKNOWN_ERROR, diff --git a/listener/avalon_listener/tcs_workorder_receipt_handler.py b/listener/avalon_listener/tcs_workorder_receipt_handler.py index 1595de546..998b7c2ef 100644 --- a/listener/avalon_listener/tcs_workorder_receipt_handler.py +++ b/listener/avalon_listener/tcs_workorder_receipt_handler.py @@ -72,7 +72,7 @@ def WorkOrderReceiptCreate(self, **params): wo_id = params["workOrderId"] input_json_str = params["raw"] input_value = json.loads(input_json_str) - + # Check if work order id exists wo_request = self.kv_helper.get("wo-requests", wo_id) if wo_request is None: raise JSONRPCDispatchException( @@ -82,12 +82,22 @@ def WorkOrderReceiptCreate(self, **params): ) else: wo_receipt = self.kv_helper.get("wo-receipts", wo_id) - if wo_receipt is None: + wo_receipt = json.loads(wo_receipt) + if wo_receipt and "workOrderId" not in wo_receipt["params"]: status, err_msg = \ self.__validate_work_order_receipt_create_req( input_value, wo_request) if status is True: - self.kv_helper.set("wo-receipts", wo_id, input_json_str) + # If receiptUpdates doesn't exists then + # create an entry with empty array + if "receiptUpdates" not in wo_receipt["params"]: + input_value["params"]["receiptUpdates"] = [] + else: + input_value["params"]["receiptUpdates"] = \ + wo_receipt["receiptUpdates"] + self.kv_helper.set("wo-receipts", wo_id, json.dumps( + input_value + )) raise JSONRPCDispatchException( JRPCErrorCodes.SUCCESS, "Receipt created successfully" @@ -186,11 +196,14 @@ def WorkOrderReceiptUpdate(self, **params): status, err_msg = self.__validate_work_order_receipt_update_req( input_value) if status is True: + value = json.loads(value) # Load previous updates to receipt - updates_to_receipt = \ - self.kv_helper.get("wo-receipt-updates", wo_id) + if "receiptUpdates" in value["params"]: + updates_to_receipt = value["params"]["receiptUpdates"] + else: + updates_to_receipt = [] # If it is first update to receipt - if updates_to_receipt is None: + if len(updates_to_receipt) == 0: updated_receipt = [] else: updated_receipt = json.loads(updates_to_receipt) @@ -220,8 +233,9 @@ def WorkOrderReceiptUpdate(self, **params): " is not allowed" ) updated_receipt.append(input_value) - self.kv_helper.set("wo-receipt-updates", wo_id, - json.dumps(updated_receipt)) + input_value["receiptUpdates"] = updated_receipt + self.kv_helper.set("wo-receipts", wo_id, + json.dumps(input_value)) raise JSONRPCDispatchException( JRPCErrorCodes.SUCCESS, "Receipt updated successfully" @@ -377,15 +391,22 @@ def WorkOrderReceiptRetrieve(self, **params): value = self.kv_helper.get("wo-receipts", wo_id) if value: receipt = json.loads(value) - receipt_updates = self.kv_helper.get("wo-receipt-updates", wo_id) - if receipt_updates is None: + if "receiptUpdates" in receipt["params"]: + receipt_updates = receipt["params"]["receiptUpdates"] + # Remove "receiptUpdates" key from the receipt entry + # It has updates to receipt. + del receipt["params"]["receiptUpdates"] + else: + receipt_updates = [] + if len(receipt_updates) == 0: + # If there is no updates to receipt + # then current status is same as create status receipt["params"]["receiptCurrentStatus"] = \ receipt["params"]["receiptCreateStatus"] else: - receipt_updates_json = json.loads(receipt_updates) - # Get the recent update to receipt - last_receipt = receipt_updates_json[len(receipt_updates_json) - - 1] + # Get the latest update to receipt + last_receipt = receipt_updates[len(receipt_updates) + - 1] receipt["params"]["receiptCurrentStatus"] = \ last_receipt["updateType"] return receipt["params"] @@ -420,10 +441,12 @@ def WorkOrderReceiptUpdateRetrieve(self, **params): # starts from 1 update_index = input_params["updateIndex"] # Load list of updates to the receipt - receipt_updates = self.kv_helper.get("wo-receipt-updates", wo_id) + receipt_entry = self.kv_helper.get("wo-receipts", wo_id) - if receipt_updates: - receipt_updates_json = json.loads(receipt_updates) + if receipt_entry: + receipt_entry_json = json.loads(receipt_entry) + receipt_updates_json = \ + receipt_entry_json["params"]["receiptUpdates"] total_updates = len(receipt_updates_json) if update_index <= 0: raise JSONRPCDispatchException( diff --git a/listener/listener_config.toml b/listener/listener_config.toml index 3d74db61a..b2b7eb4ba 100644 --- a/listener/listener_config.toml +++ b/listener/listener_config.toml @@ -28,8 +28,7 @@ bind = "http://localhost:1947" max_work_order_count = 10 # ZMQ configurations the listener would connect to # Same as the url and port of enclave manager socket -zmq_url = "tcp://avalon-enclave-manager:" -zmq_port = '5555' +zmq_url = "tcp://localhost:5555" # ------------------------------------------------------------------ # Work load execution-settings for workload execution(synchronous/asynchronous) diff --git a/scripts/tcs_startup.sh b/scripts/tcs_startup.sh index 0acd7bac8..ee3a5fe19 100755 --- a/scripts/tcs_startup.sh +++ b/scripts/tcs_startup.sh @@ -23,19 +23,21 @@ COMPONENTS="$ENCLAVE_MANAGER" # #KV_STORAGE added if -s passed START_STOP_AVALON_SERVICES=0 # default if -s not passed LMDB_URL="http://localhost:9090" # -l default LISTENER_URL="http://localhost:1947" +ENCLAVE_ZMQ_URL="tcp://localhost:5555" # Trap handler trap 'stop_avalon_components' HUP INT QUIT ABRT ALRM TERM +is_sync_mode() +{ + return grep "sync_workload_execution" ${TCF_HOME}/listener/listener_config.toml | awk -F'=' '{print $2}' +} + start_avalon_components() { if [ $START_STOP_AVALON_SERVICES = 1 ] ; then echo "Starting Avalon KV Storage $VERSION ..." $KV_STORAGE --bind $LMDB_URL & echo "Avalon KV Storage started" - - echo "Starting Avalon Listener $VERSION ..." - $LISTENER --bind $LISTENER_URL --lmdb_url $LMDB_URL & - echo "Avalon Listener started" fi # START_STOP_AVALON_SERVICES doesn't control enclave manager. It will be @@ -44,6 +46,18 @@ start_avalon_components() python3 $ENCLAVE_MANAGER --lmdb_url $LMDB_URL & echo "Avalon Enclave Manager started" + if [ $START_STOP_AVALON_SERVICES = 1 ] ; then + echo "Starting Avalon Listener $VERSION ..." + is_sync_mode + is_sync_mode_on=$? + if [ "$is_sync_mode_on" -eq "1" ]; then + $LISTENER --bind $LISTENER_URL --lmdb_url $LMDB_URL --zmq_url $ENCLAVE_ZMQ_URL & + else + $LISTENER --bind $LISTENER_URL --lmdb_url $LMDB_URL & + fi + echo "Avalon Listener started" + fi + sleep 5s check_avalon_components diff --git a/tools/run_tests.sh b/tools/run_tests.sh index b1b337216..a3e0228d3 100755 --- a/tools/run_tests.sh +++ b/tools/run_tests.sh @@ -99,11 +99,10 @@ done #yell "Start testing echo client with reading registry from blockchain................" #yell "#------------------------------------------------------------------------------------------------" -#try $echo_client_path/echo_client.py -m "Hello world" -rs -dh yell "Start testing echo client with service uri ................" yell "#------------------------------------------------------------------------------------------------" -try $echo_client_path/echo_client.py -m "Hello world" -s "http://$LISTENER_URL:1947" -dh +try $echo_client_path/echo_client.py -m "Hello world" -s "http://$LISTENER_URL:1947" -dh -rs yell "Start testing generic client for echo workload ................" yell "#------------------------------------------------------------------------------------------------"