diff --git a/software/script/chameleon_cli_main.py b/software/script/chameleon_cli_main.py index a7153f4b..7cd22de2 100755 --- a/software/script/chameleon_cli_main.py +++ b/software/script/chameleon_cli_main.py @@ -73,7 +73,7 @@ def get_prompt(self): :return: current cmd prompt """ - device_string = f"{colorama.Fore.GREEN}USB" if self.device_com.isOpen( + device_string = f"{colorama.Fore.GREEN}USB" if self.device_com.is_open( ) else f"{colorama.Fore.RED}Offline" status = f"[{device_string}{colorama.Style.RESET_ALL}] chameleon --> " return status diff --git a/software/script/chameleon_cli_unit.py b/software/script/chameleon_cli_unit.py index 1ac759ad..48b1c0bf 100644 --- a/software/script/chameleon_cli_unit.py +++ b/software/script/chameleon_cli_unit.py @@ -118,7 +118,7 @@ def args_parser(self) -> ArgumentParserNoExit or None: raise NotImplementedError("Please implement this") def before_exec(self, args: argparse.Namespace): - ret = self.device_com.isOpen() + ret = self.device_com.is_open() if ret: return True else: @@ -182,7 +182,8 @@ def on_exec(self, args: argparse.Namespace): class HWConnect(BaseCLIUnit): def args_parser(self) -> ArgumentParserNoExit or None: parser = ArgumentParserNoExit() - parser.add_argument('-p', '--port', type=str, required=False) + parser.add_argument('-p', '--port', type=str, required=False, help='Serial port name/path') + parser.add_argument('-t', '--tcp', type=str, required=False, help='TCP server to connect to (host:port)') return parser def before_exec(self, args: argparse.Namespace): @@ -190,7 +191,7 @@ def before_exec(self, args: argparse.Namespace): def on_exec(self, args: argparse.Namespace): try: - if args.port is None: # Chameleon auto-detect if no port is supplied + if args.port is None and args.tcp is None: # Chameleon auto-detect if no port is supplied platform_name = uname().release if 'Microsoft' in platform_name: path = os.environ["PATH"].split(os.pathsep) @@ -221,9 +222,13 @@ def on_exec(self, args: argparse.Namespace): args.port = port.device break if args.port is None: # If no chameleon was found, exit - print("Chameleon not found, please connect the device or try connecting manually with the -p flag.") + print("Chameleon not found, please connect the device or try connecting manually with the -p or -t flag.") return - self.device_com.open(args.port) + + if args.tcp: + self.device_com.open(chameleon_com.ChameleonTCPTransport(args.tcp)) + else: + self.device_com.open(chameleon_com.ChameleonSerialTransport(args.port)) print(" { Chameleon connected } ") except Exception as e: print(f"Chameleon Connect fail: {str(e)}") diff --git a/software/script/chameleon_com.py b/software/script/chameleon_com.py index 1acd0ee2..4d369624 100644 --- a/software/script/chameleon_com.py +++ b/software/script/chameleon_com.py @@ -4,6 +4,7 @@ import time import serial import chameleon_status +import socket class NotOpenException(Exception): @@ -35,6 +36,79 @@ def __init__(self, cmd, status, data): self.data: bytearray = data +class ChameleonTransport: + def __init__(self): + pass + + def is_open(self): + return False + + def open(self): + raise NotImplementedError('Please implement this') + + def close(self): + pass + + def write(self, data): + raise NotImplementedError('Please implement this') + + def read(self): + raise NotImplementedError('Please implement this') + + +class ChameleonTCPTransport(ChameleonTransport): + def __init__(self, connection_string: str): + self.connection_string = connection_string + self.socket = None + + def is_open(self): + return self.socket is not None + + def open(self): + host, _, port = self.connection_string.rpartition(':') + self.socket = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM) + self.socket.connect((host, int(port))) + + def close(self): + self.socket.close() + self.socket = None + + def write(self, data): + self.socket.send(data) + + def read(self, bufsize=1): # Bufsize matching serial + return self.socket.recv(bufsize) + + +class ChameleonSerialTransport(ChameleonTransport): + def __init__(self, port): + self.port = port + self.serial_instance = None + + def is_open(self): + return self.serial_instance is not None and self.serial_instance.isOpen() + + def open(self): + self.serial_instance = serial.Serial(port=self.port, baudrate=115200) + + try: + self.serial_instance.dtr = 1 # must make dtr enable + except Exception: + # not all serial support dtr, e.g. virtual serial over BLE + pass + self.serial_instance.timeout = 0 # do not block + + def close(self): + self.serial_instance.close() + self.serial_instance = None + + def write(self, data): + self.serial_instance.write(data) + + def read(self, size=1): + return self.serial_instance.read(size=size) + + class ChameleonCom: """ Chameleon device base class @@ -51,37 +125,33 @@ def __init__(self): self.send_data_queue = queue.Queue() self.wait_response_map = {} self.event_closing = threading.Event() + self.transport: ChameleonTransport | None = None - def isOpen(self): + def is_open(self): """ Chameleon is connected and init. :return: """ - return self.serial_instance is not None and self.serial_instance.isOpen() + return self.transport is not None and self.transport.is_open() - def open(self, port): + def open(self, transport): """ Open chameleon port to communication And init some variables :param port: com port, comXXX or ttyXXX :return: """ - if not self.isOpen(): + if not self.is_open(): error = None try: - # open serial port - self.serial_instance = serial.Serial(port=port, baudrate=115200) + self.transport = transport + self.transport.open() except Exception as e: error = e finally: if error is not None: raise OpenFailException(error) - try: - self.serial_instance.dtr = 1 # must make dtr enable - except Exception: - # not all serial support dtr, e.g. virtual serial over BLE - pass - self.serial_instance.timeout = 0 # do not block + # clear variable self.send_data_queue.queue.clear() self.wait_response_map.clear() @@ -97,7 +167,7 @@ def check_open(self): :return: """ - if not self.isOpen(): + if not self.is_open(): raise NotOpenException("Please call open() function to start device.") @staticmethod @@ -121,11 +191,11 @@ def close(self): """ self.event_closing.set() try: - self.serial_instance.close() + self.transport.close() except Exception: pass finally: - self.serial_instance = None + self.transport = None self.wait_response_map.clear() self.send_data_queue.queue.clear() @@ -140,10 +210,10 @@ def thread_data_receive(self): data_status = 0x0000 data_length = 0x0000 - while self.isOpen(): + while self.is_open(): # receive try: - data_bytes = self.serial_instance.read() + data_bytes = self.transport.read() except Exception as e: if not self.event_closing.is_set(): print(f"Serial Error {e}, thread for receiver exit.") @@ -216,7 +286,7 @@ def thread_data_transfer(self): SubThread to transfer data to chameleon device :return: """ - while self.isOpen(): + while self.is_open(): # get a task from queue(if exists) if self.send_data_queue.empty(): time.sleep(0.001) @@ -237,7 +307,7 @@ def thread_data_transfer(self): self.wait_response_map[task_cmd]['is_timeout'] = False try: # send to device - self.serial_instance.write(task['frame']) + self.transport.write(task['frame']) except Exception as e: print(f"Serial Error {e}, thread for transfer exit.") self.close() @@ -253,7 +323,7 @@ def thread_check_timeout(self): Check task timeout :return: """ - while self.isOpen(): + while self.is_open(): for task_cmd in self.wait_response_map.keys(): if time.time() > self.wait_response_map[task_cmd]['end_time']: if 'callback' in self.wait_response_map[task_cmd]: