diff --git a/packets/log_header.py b/packets/log_header.py index 6eab097..5ebe2fa 100644 --- a/packets/log_header.py +++ b/packets/log_header.py @@ -47,6 +47,10 @@ def appname(self, appname: str) -> None: def packet_type(self) -> PacketType: return PacketType.LOG_HEADER + @property + def values(self): + return self._values + def add_value(self, key: str, value: str) -> None: if not isinstance(key, str): raise TypeError("key must be an str") diff --git a/protocols/cloud/cloud_protocol.py b/protocols/cloud/cloud_protocol.py index a66500e..586f7d0 100644 --- a/protocols/cloud/cloud_protocol.py +++ b/protocols/cloud/cloud_protocol.py @@ -1,6 +1,6 @@ -import re import collections import logging +import re import threading import uuid from datetime import datetime @@ -11,10 +11,10 @@ from packets.log_header import LogHeader from packets.packet import Packet from packets.packet_type import PacketType +from protocols.cloud.chunk import Chunk +from protocols.cloud.exceptions import * from protocols.cloud.scheduled_executor import ScheduledExecutor from protocols.tcp_protocol import TcpProtocol -from protocols.cloud.exceptions import * -from protocols.cloud.chunk import Chunk logger = logging.getLogger(__name__) @@ -317,7 +317,13 @@ def _do_rotate_virtual_file_id(self) -> None: self._virtual_file_id = uuid.uuid4() self._virtual_file_size = 0 + logger.debug("Composing new log header at virtual file rotation") log_header = self._compose_log_header_packet() + logger.debug( + "New log header id {} NOT stored as protocol's reconnect log header and is written (into the queue)".format( + log_header.values.get("virtualfileid"))) + + # self._reconnect_log_header = log_header super().write_packet(log_header) def connect(self) -> None: diff --git a/protocols/protocol.py b/protocols/protocol.py index af45718..e42b0b9 100644 --- a/protocols/protocol.py +++ b/protocols/protocol.py @@ -2,6 +2,8 @@ import logging import threading import time +import typing +from typing import Optional from common.events.error_event import ErrorEvent from common.exceptions import ProtocolError, SmartInspectError @@ -18,6 +20,7 @@ from packets.log_header import LogHeader from packets.packet import Packet from packets.packet_queue import PacketQueue +from packets.packet_type import PacketType from scheduler.scheduler import Scheduler from scheduler.scheduler_action import SchedulerAction from scheduler.scheduler_command import SchedulerCommand @@ -44,6 +47,7 @@ def __init__(self): self.__initialized = False self.__failed = False self.__backlog_enabled = False + self._reconnect_log_header = None def __create_options(self, options: str) -> None: try: @@ -141,15 +145,25 @@ def _compose_log_header_packet(self) -> LogHeader: return log_header - def _internal_write_log_header(self) -> None: - log_header = self._compose_log_header_packet() - logger.debug("Writing LogHeader with values %s", log_header._values.items()) - self._internal_write_packet(log_header) + # def _internal_write_log_header(self, connect_log_header: typing.Optional[LogHeader] = None) -> None: + # if connect_log_header is None and self._reconnect_log_header is None: + # log_header = self._compose_log_header_packet() + # elif connect_log_header: + # log_header = connect_log_header + # + # self._internal_write_packet(log_header) + + def _internal_write_connect_log_header(self, connect_log_header: typing.Optional[LogHeader] = None) -> None: + if connect_log_header is None: + logger.debug("Connection log header is None, composing a new one") + connect_log_header = self._compose_log_header_packet() + logger.debug("Writing log header with vf_id {}".format(connect_log_header.values.get("virtualfileid"))) + self._internal_write_packet(connect_log_header) def _internal_write_packet(self, packet: Packet): pass - def _internal_connect(self): + def _internal_connect(self, connect_log_header: Optional[LogHeader] = None): pass @property @@ -197,15 +211,28 @@ def disconnect(self) -> None: else: self._impl_disconnect() - def _impl_connect(self): + def _impl_connect(self, connect_log_header: Optional[LogHeader] = None): if self.__keep_open and not self._connected: try: try: - self._internal_connect() + self._internal_connect(connect_log_header) + logger.debug(f"{self.__class__.__name__} connected succesfully.") + + logger.debug( + "Storing successful log header with vf_id {} as reconnect log header.".format( + None if connect_log_header is None else connect_log_header.values.get("virtualfileid")) + ) + self._reconnect_log_header = connect_log_header self._connected = True self.__failed = False - logger.debug(f"{self.__class__.__name__} connected succesfully") + except Exception as exception: + logger.debug(f"{self.__class__.__name__} failed to connect.") + logger.debug( + "Storing log header with vf_id {} used at connection as reconnect log header.".format( + None if connect_log_header is None else connect_log_header.values.get("virtualfileid")) + ) + self._reconnect_log_header = connect_log_header self._reset() raise exception except Exception as exception: @@ -310,6 +337,11 @@ def __stop_scheduler(self): def __schedule_connect(self) -> None: command = SchedulerCommand() command.action = SchedulerAction.CONNECT + log_header = self._compose_log_header_packet() + logger.debug( + "Scheduling connection. with vf_id {} composed, scheduling into SchedulerQueue".format( + log_header.values["virtualfileid"])) + command.state = log_header self.__scheduler.schedule(command, SchedulerQueueEnd.TAIL) def get_caption(self) -> str: @@ -370,7 +402,10 @@ def remove_listener(self, listener: ProtocolListener): self.__listeners.remove(listener) def _internal_reconnect(self) -> bool: - self._internal_connect() + logger.debug( + "Using current reconnect log header with vf_id {} during reconnect".format( + None if self._reconnect_log_header is None else self._reconnect_log_header.values.get("virtualfileid"))) + self._internal_connect(self._reconnect_log_header) return True def _internal_disconnect(self) -> None: @@ -382,10 +417,17 @@ def __flush_queue(self) -> None: self.__forward_packet(packet, False) packet = self.__queue.pop() - def __forward_packet(self, packet: Packet, disconnect: bool) -> None: + def __forward_packet(self, packet: [Packet, LogHeader], disconnect: bool) -> None: if not self._connected: if not self.__keep_open: - self._internal_connect() + self._internal_connect(self._reconnect_log_header) + if packet.packet_type == PacketType.LOG_HEADER: + self._reconnect_log_header = packet + logger.debug( + "Updating reconnect log header - logheader packet with vf_id {} was forwarded.".format( + packet.values.get("virtualfileid")) + ) + self._connected = True self.__failed = False else: @@ -402,7 +444,7 @@ def __forward_packet(self, packet: Packet, disconnect: bool) -> None: self._connected = False self._internal_disconnect() - def __do_reconnect(self) -> None: + def __do_reconnect(self, connect_log_header: typing.Optional[LogHeader] = None) -> None: if self.__reconnect_interval > 0: tick_count = time.time() * 1000 if tick_count - self.__reconnect_tick_count < self.__reconnect_interval: @@ -412,6 +454,11 @@ def __do_reconnect(self) -> None: try: if self._internal_reconnect(): self._connected = True + logger.debug( + "Reconnect successful - storing log header with vf_id {} as current reconnect log header".format( + None if connect_log_header is None else connect_log_header.values.get("virtualfileid")) + ) + self._reconnect_log_header = connect_log_header except Exception: pass # Reconnect exceptions are not reported, diff --git a/protocols/tcp_protocol.py b/protocols/tcp_protocol.py index 41f0022..3578ac8 100644 --- a/protocols/tcp_protocol.py +++ b/protocols/tcp_protocol.py @@ -1,10 +1,12 @@ # Copyright (C) Code Partners Pty. Ltd. All rights reserved. # import logging import socket +import typing from common.exceptions import SmartInspectError from connections.builders import ConnectionsBuilder from formatters.binary_formatter import BinaryFormatter +from packets.log_header import LogHeader from packets.packet import Packet from protocols.protocol import Protocol @@ -61,7 +63,7 @@ def _send_client_banner(self) -> None: self.__stream.write(self.__CLIENT_BANNER) self.__stream.flush() - def _internal_connect(self): + def _internal_connect(self, connect_log_header: typing.Optional[LogHeader] = None): try: self.__socket = self._internal_initialize_socket() except Exception as e: @@ -71,7 +73,7 @@ def _internal_connect(self): self.__stream = self.__socket.makefile("rwb", self.__BUFFER_SIZE) self._do_handshake() - self._internal_write_log_header() + self._internal_write_connect_log_header(connect_log_header) def _internal_initialize_socket(self) -> socket.socket: socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/scheduler/scheduler.py b/scheduler/scheduler.py index 4eaf7e4..dc9d31d 100644 --- a/scheduler/scheduler.py +++ b/scheduler/scheduler.py @@ -31,8 +31,8 @@ def run(self) -> None: if not self.run_commands(count): break - from protocols.cloud.cloud_protocol import CloudProtocol - if isinstance(self.parent.protocol, CloudProtocol): + from protocols.tcp_protocol import TcpProtocol + if isinstance(self.parent.protocol, TcpProtocol): if self.consecutive_packet_write_fail_count > 0: try: @@ -65,7 +65,12 @@ def __run_command(self, command: SchedulerCommand) -> None: # noinspection PyBroadException try: if action == SchedulerAction.CONNECT: - protocol._impl_connect() + connect_log_header: LogHeader = command.state + logger.debug( + "Received CONNECT command with log header vf_id {}. Using it to connect".format( + connect_log_header.values.get("virtualfileid"))) + + protocol._impl_connect(connect_log_header) elif action == SchedulerAction.WRITE_PACKET: self.__write_packet_action(command) elif action == SchedulerAction.DISCONNECT: @@ -83,12 +88,14 @@ def __write_packet_action(self, command): protocol = self.parent.protocol protocol._impl_write_packet(packet) - from protocols.cloud.cloud_protocol import CloudProtocol - if isinstance(protocol, CloudProtocol) and protocol.failed: + from protocols.tcp_protocol import TcpProtocol + if isinstance(protocol, TcpProtocol) and protocol.failed: - if not protocol.is_reconnect_allowed(): - logging.debug("Reconnect is disabled, no need to requeue packet we failed to send") - return + from protocols.cloud.cloud_protocol import CloudProtocol + if isinstance(protocol, CloudProtocol) and protocol.failed: + if not protocol.is_reconnect_allowed(): + logging.debug("Reconnect is disabled, no need to requeue packet we failed to send") + return self.consecutive_packet_write_fail_count += 1 logging.debug("Sending packet failed, scheduling again to the head of the queue, " @@ -106,7 +113,7 @@ def __write_packet_action(self, command): class Scheduler: __BUFFER_SIZE = 0x10 - __CLOUD_PROTOCOL_BUFFER_SIZE = 0x1 + __TCP_PROTOCOL_BUFFER_SIZE = 0x1 def __init__(self, protocol): super().__init__() @@ -114,12 +121,12 @@ def __init__(self, protocol): self.__condition = threading.Condition() self.__queue = SchedulerQueue() - # if protocol is CloudProtocol - respective buffer size is set - from protocols.cloud.cloud_protocol import CloudProtocol + # if protocol is TcpProtocol - respective buffer size is set + from protocols.tcp_protocol import TcpProtocol self.__buffer: List[Optional[SchedulerCommand]] = [ [None] * self.__BUFFER_SIZE, - [None] * self.__CLOUD_PROTOCOL_BUFFER_SIZE, - ][isinstance(self.__protocol, CloudProtocol)] + [None] * self.__TCP_PROTOCOL_BUFFER_SIZE, + ][isinstance(self.__protocol, TcpProtocol)] self.__started: bool = False self.__stopped: bool = False diff --git a/scheduler/scheduler_command.py b/scheduler/scheduler_command.py index 24273f0..e2305b7 100644 --- a/scheduler/scheduler_command.py +++ b/scheduler/scheduler_command.py @@ -1,8 +1,9 @@ from typing import Optional, Union from common.protocol_command import ProtocolCommand -from scheduler.scheduler_action import SchedulerAction +from packets.log_header import LogHeader from packets.packet import Packet +from scheduler.scheduler_action import SchedulerAction class SchedulerCommand: @@ -20,7 +21,7 @@ def action(self, action: SchedulerAction) -> None: self.__action = action @property - def state(self) -> Union[ProtocolCommand, Packet, object]: + def state(self) -> Union[ProtocolCommand, Packet, LogHeader, object]: return self.__state @state.setter