From a851e53679edf1063565cf659d99c000860cff51 Mon Sep 17 00:00:00 2001 From: Sergey Udachin Date: Mon, 12 Feb 2024 15:15:59 +0300 Subject: [PATCH 1/6] bugfix: fix virtualfile_id rotation consistency in async mode --- protocols/cloud/cloud_protocol.py | 7 +++--- protocols/protocol.py | 42 +++++++++++++++++++++++-------- protocols/tcp_protocol.py | 6 +++-- scheduler/scheduler.py | 29 +++++++++++---------- 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/protocols/cloud/cloud_protocol.py b/protocols/cloud/cloud_protocol.py index a66500e..05505a9 100644 --- a/protocols/cloud/cloud_protocol.py +++ b/protocols/cloud/cloud_protocol.py @@ -1,6 +1,6 @@ -import re import collections import logging +import re import threading import uuid from datetime import datetime @@ -11,10 +11,10 @@ from packets.log_header import LogHeader from packets.packet import Packet from packets.packet_type import PacketType +from protocols.cloud.chunk import Chunk +from protocols.cloud.exceptions import * from protocols.cloud.scheduled_executor import ScheduledExecutor from protocols.tcp_protocol import TcpProtocol -from protocols.cloud.exceptions import * -from protocols.cloud.chunk import Chunk logger = logging.getLogger(__name__) @@ -318,6 +318,7 @@ def _do_rotate_virtual_file_id(self) -> None: self._virtual_file_size = 0 log_header = self._compose_log_header_packet() + self._reconnect_log_header = log_header super().write_packet(log_header) def connect(self) -> None: diff --git a/protocols/protocol.py b/protocols/protocol.py index af45718..72379ea 100644 --- a/protocols/protocol.py +++ b/protocols/protocol.py @@ -2,6 +2,8 @@ import logging import threading import time +import typing +from typing import Optional from common.events.error_event import ErrorEvent from common.exceptions import ProtocolError, SmartInspectError @@ -18,6 +20,7 @@ from packets.log_header import LogHeader from packets.packet import Packet from packets.packet_queue import PacketQueue +from packets.packet_type import PacketType from scheduler.scheduler import Scheduler from scheduler.scheduler_action import SchedulerAction from scheduler.scheduler_command import SchedulerCommand @@ -44,6 +47,7 @@ def __init__(self): self.__initialized = False self.__failed = False self.__backlog_enabled = False + self._reconnect_log_header = None def __create_options(self, options: str) -> None: try: @@ -141,15 +145,23 @@ def _compose_log_header_packet(self) -> LogHeader: return log_header - def _internal_write_log_header(self) -> None: - log_header = self._compose_log_header_packet() - logger.debug("Writing LogHeader with values %s", log_header._values.items()) - self._internal_write_packet(log_header) + # def _internal_write_log_header(self, connect_log_header: typing.Optional[LogHeader] = None) -> None: + # if connect_log_header is None and self._reconnect_log_header is None: + # log_header = self._compose_log_header_packet() + # elif connect_log_header: + # log_header = connect_log_header + # + # self._internal_write_packet(log_header) + + def _internal_write_connect_log_header(self, connect_log_header: typing.Optional[LogHeader] = None) -> None: + if connect_log_header is None: + connect_log_header = self._compose_log_header_packet() + self._internal_write_packet(connect_log_header) def _internal_write_packet(self, packet: Packet): pass - def _internal_connect(self): + def _internal_connect(self, connect_log_header: Optional[LogHeader] = None): pass @property @@ -197,15 +209,17 @@ def disconnect(self) -> None: else: self._impl_disconnect() - def _impl_connect(self): + def _impl_connect(self, connect_log_header: Optional[LogHeader] = None): if self.__keep_open and not self._connected: try: try: - self._internal_connect() + self._internal_connect(connect_log_header) + self._reconnect_log_header = connect_log_header self._connected = True self.__failed = False logger.debug(f"{self.__class__.__name__} connected succesfully") except Exception as exception: + self._reconnect_log_header = connect_log_header self._reset() raise exception except Exception as exception: @@ -310,6 +324,9 @@ def __stop_scheduler(self): def __schedule_connect(self) -> None: command = SchedulerCommand() command.action = SchedulerAction.CONNECT + + log_header = self._compose_log_header_packet() + command.state = log_header self.__scheduler.schedule(command, SchedulerQueueEnd.TAIL) def get_caption(self) -> str: @@ -370,7 +387,7 @@ def remove_listener(self, listener: ProtocolListener): self.__listeners.remove(listener) def _internal_reconnect(self) -> bool: - self._internal_connect() + self._internal_connect(self._reconnect_log_header) return True def _internal_disconnect(self) -> None: @@ -385,7 +402,9 @@ def __flush_queue(self) -> None: def __forward_packet(self, packet: Packet, disconnect: bool) -> None: if not self._connected: if not self.__keep_open: - self._internal_connect() + self._internal_connect(self._reconnect_log_header) + if packet.packet_type == PacketType.LOG_HEADER: + self._reconnect_log_header = packet self._connected = True self.__failed = False else: @@ -402,7 +421,7 @@ def __forward_packet(self, packet: Packet, disconnect: bool) -> None: self._connected = False self._internal_disconnect() - def __do_reconnect(self) -> None: + def __do_reconnect(self, connect_log_header: typing.Optional[LogHeader] = None) -> None: if self.__reconnect_interval > 0: tick_count = time.time() * 1000 if tick_count - self.__reconnect_tick_count < self.__reconnect_interval: @@ -412,7 +431,8 @@ def __do_reconnect(self) -> None: try: if self._internal_reconnect(): self._connected = True - except Exception: + self._reconnect_log_header = connect_log_header + except Exception as e: pass # Reconnect exceptions are not reported, # but we need to record that the last connection attempt diff --git a/protocols/tcp_protocol.py b/protocols/tcp_protocol.py index 41f0022..3578ac8 100644 --- a/protocols/tcp_protocol.py +++ b/protocols/tcp_protocol.py @@ -1,10 +1,12 @@ # Copyright (C) Code Partners Pty. Ltd. All rights reserved. # import logging import socket +import typing from common.exceptions import SmartInspectError from connections.builders import ConnectionsBuilder from formatters.binary_formatter import BinaryFormatter +from packets.log_header import LogHeader from packets.packet import Packet from protocols.protocol import Protocol @@ -61,7 +63,7 @@ def _send_client_banner(self) -> None: self.__stream.write(self.__CLIENT_BANNER) self.__stream.flush() - def _internal_connect(self): + def _internal_connect(self, connect_log_header: typing.Optional[LogHeader] = None): try: self.__socket = self._internal_initialize_socket() except Exception as e: @@ -71,7 +73,7 @@ def _internal_connect(self): self.__stream = self.__socket.makefile("rwb", self.__BUFFER_SIZE) self._do_handshake() - self._internal_write_log_header() + self._internal_write_connect_log_header(connect_log_header) def _internal_initialize_socket(self) -> socket.socket: socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/scheduler/scheduler.py b/scheduler/scheduler.py index 4eaf7e4..1bc55e3 100644 --- a/scheduler/scheduler.py +++ b/scheduler/scheduler.py @@ -31,8 +31,8 @@ def run(self) -> None: if not self.run_commands(count): break - from protocols.cloud.cloud_protocol import CloudProtocol - if isinstance(self.parent.protocol, CloudProtocol): + from protocols.tcp_protocol import TcpProtocol + if isinstance(self.parent.protocol, TcpProtocol): if self.consecutive_packet_write_fail_count > 0: try: @@ -65,7 +65,8 @@ def __run_command(self, command: SchedulerCommand) -> None: # noinspection PyBroadException try: if action == SchedulerAction.CONNECT: - protocol._impl_connect() + connect_log_header = command.state + protocol._impl_connect(connect_log_header) elif action == SchedulerAction.WRITE_PACKET: self.__write_packet_action(command) elif action == SchedulerAction.DISCONNECT: @@ -83,12 +84,14 @@ def __write_packet_action(self, command): protocol = self.parent.protocol protocol._impl_write_packet(packet) - from protocols.cloud.cloud_protocol import CloudProtocol - if isinstance(protocol, CloudProtocol) and protocol.failed: + from protocols.tcp_protocol import TcpProtocol + if isinstance(protocol, TcpProtocol) and protocol.failed: - if not protocol.is_reconnect_allowed(): - logging.debug("Reconnect is disabled, no need to requeue packet we failed to send") - return + from protocols.cloud.cloud_protocol import CloudProtocol + if isinstance(protocol, CloudProtocol) and protocol.failed: + if not protocol.is_reconnect_allowed(): + logging.debug("Reconnect is disabled, no need to requeue packet we failed to send") + return self.consecutive_packet_write_fail_count += 1 logging.debug("Sending packet failed, scheduling again to the head of the queue, " @@ -106,7 +109,7 @@ def __write_packet_action(self, command): class Scheduler: __BUFFER_SIZE = 0x10 - __CLOUD_PROTOCOL_BUFFER_SIZE = 0x1 + __TCP_PROTOCOL_BUFFER_SIZE = 0x1 def __init__(self, protocol): super().__init__() @@ -114,12 +117,12 @@ def __init__(self, protocol): self.__condition = threading.Condition() self.__queue = SchedulerQueue() - # if protocol is CloudProtocol - respective buffer size is set - from protocols.cloud.cloud_protocol import CloudProtocol + # if protocol is TcpProtocol - respective buffer size is set + from protocols.tcp_protocol import TcpProtocol self.__buffer: List[Optional[SchedulerCommand]] = [ [None] * self.__BUFFER_SIZE, - [None] * self.__CLOUD_PROTOCOL_BUFFER_SIZE, - ][isinstance(self.__protocol, CloudProtocol)] + [None] * self.__TCP_PROTOCOL_BUFFER_SIZE, + ][isinstance(self.__protocol, TcpProtocol)] self.__started: bool = False self.__stopped: bool = False From 6e7ecc9fbfca6e3a8bdbab16c8383aca9e400575 Mon Sep 17 00:00:00 2001 From: Sergey Udachin Date: Tue, 13 Feb 2024 14:10:40 +0300 Subject: [PATCH 2/6] chore: add logging --- protocols/cloud/cloud_protocol.py | 1 + 1 file changed, 1 insertion(+) diff --git a/protocols/cloud/cloud_protocol.py b/protocols/cloud/cloud_protocol.py index 05505a9..549eb69 100644 --- a/protocols/cloud/cloud_protocol.py +++ b/protocols/cloud/cloud_protocol.py @@ -317,6 +317,7 @@ def _do_rotate_virtual_file_id(self) -> None: self._virtual_file_id = uuid.uuid4() self._virtual_file_size = 0 + logger.debug("Composing new log header at virtual file rotation") log_header = self._compose_log_header_packet() self._reconnect_log_header = log_header super().write_packet(log_header) From 529ab9c14edff6d64dde509da8f5c4f12ebe7e59 Mon Sep 17 00:00:00 2001 From: Sergey Udachin Date: Tue, 13 Feb 2024 14:13:31 +0300 Subject: [PATCH 3/6] chore: add logging --- protocols/protocol.py | 28 ++++++++++++++++++++++++++-- scheduler/scheduler.py | 3 +++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/protocols/protocol.py b/protocols/protocol.py index 72379ea..6b9fb6a 100644 --- a/protocols/protocol.py +++ b/protocols/protocol.py @@ -155,7 +155,9 @@ def _compose_log_header_packet(self) -> LogHeader: def _internal_write_connect_log_header(self, connect_log_header: typing.Optional[LogHeader] = None) -> None: if connect_log_header is None: + logger.debug("Connection log header is None, composing a new one") connect_log_header = self._compose_log_header_packet() + logger.debug("Writing log header with id {}".format(id(connect_log_header))) self._internal_write_packet(connect_log_header) def _internal_write_packet(self, packet: Packet): @@ -214,11 +216,21 @@ def _impl_connect(self, connect_log_header: Optional[LogHeader] = None): try: try: self._internal_connect(connect_log_header) + logger.debug(f"{self.__class__.__name__} connected succesfully.") + logger.debug( + "Storing successful log header with id {} as reconnect log header.".format( + id(connect_log_header)) + ) self._reconnect_log_header = connect_log_header self._connected = True self.__failed = False - logger.debug(f"{self.__class__.__name__} connected succesfully") + except Exception as exception: + logger.debug(f"{self.__class__.__name__} failed to connect.") + logger.debug( + "Storing log header with id {} used at connection as reconnect log header.".format( + id(connect_log_header)) + ) self._reconnect_log_header = connect_log_header self._reset() raise exception @@ -324,8 +336,10 @@ def __stop_scheduler(self): def __schedule_connect(self) -> None: command = SchedulerCommand() command.action = SchedulerAction.CONNECT - log_header = self._compose_log_header_packet() + logger.debug( + "Scheduling connection. Log header with id {} composed, scheduling into SchedulerQueue".format( + id(log_header))) command.state = log_header self.__scheduler.schedule(command, SchedulerQueueEnd.TAIL) @@ -387,6 +401,8 @@ def remove_listener(self, listener: ProtocolListener): self.__listeners.remove(listener) def _internal_reconnect(self) -> bool: + logger.debug( + "Using current reconnect log header with id {} during reconnect".format(id(self._reconnect_log_header))) self._internal_connect(self._reconnect_log_header) return True @@ -405,6 +421,10 @@ def __forward_packet(self, packet: Packet, disconnect: bool) -> None: self._internal_connect(self._reconnect_log_header) if packet.packet_type == PacketType.LOG_HEADER: self._reconnect_log_header = packet + logger.debug( + "Updating reconnect log header - logheader packet with id {} was forwarded.".format( + id(packet)) + ) self._connected = True self.__failed = False else: @@ -431,6 +451,10 @@ def __do_reconnect(self, connect_log_header: typing.Optional[LogHeader] = None) try: if self._internal_reconnect(): self._connected = True + logger.debug( + "Reconnect successful - storing log header with id {} as current reconnect log header".format( + connect_log_header) + ) self._reconnect_log_header = connect_log_header except Exception as e: pass diff --git a/scheduler/scheduler.py b/scheduler/scheduler.py index 1bc55e3..e14dc92 100644 --- a/scheduler/scheduler.py +++ b/scheduler/scheduler.py @@ -66,6 +66,9 @@ def __run_command(self, command: SchedulerCommand) -> None: try: if action == SchedulerAction.CONNECT: connect_log_header = command.state + logger.debug( + "Received CONNECT command with log header id {}. Using it to connect".format( + id(connect_log_header))) protocol._impl_connect(connect_log_header) elif action == SchedulerAction.WRITE_PACKET: self.__write_packet_action(command) From fb625ceb27d151818e7dd54870696fb37090d4a3 Mon Sep 17 00:00:00 2001 From: Sergey Udachin Date: Tue, 13 Feb 2024 14:14:22 +0300 Subject: [PATCH 4/6] fix: reconnect_log_header need not be updated at log_header compose time --- protocols/cloud/cloud_protocol.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/protocols/cloud/cloud_protocol.py b/protocols/cloud/cloud_protocol.py index 549eb69..bbbeaef 100644 --- a/protocols/cloud/cloud_protocol.py +++ b/protocols/cloud/cloud_protocol.py @@ -319,7 +319,10 @@ def _do_rotate_virtual_file_id(self) -> None: logger.debug("Composing new log header at virtual file rotation") log_header = self._compose_log_header_packet() - self._reconnect_log_header = log_header + logger.debug( + "New log header id {} NOT stored as protocol's reconnect log header and is written (into the queue)".format( + id(log_header))) + # self._reconnect_log_header = log_header super().write_packet(log_header) def connect(self) -> None: From 098ba6b59cc1259bc3b511465da11b91808d1239 Mon Sep 17 00:00:00 2001 From: Sergey Udachin Date: Tue, 13 Feb 2024 16:08:18 +0300 Subject: [PATCH 5/6] feat: add `values` property getter to read values for debugging purposes --- packets/log_header.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packets/log_header.py b/packets/log_header.py index 6eab097..5ebe2fa 100644 --- a/packets/log_header.py +++ b/packets/log_header.py @@ -47,6 +47,10 @@ def appname(self, appname: str) -> None: def packet_type(self) -> PacketType: return PacketType.LOG_HEADER + @property + def values(self): + return self._values + def add_value(self, key: str, value: str) -> None: if not isinstance(key, str): raise TypeError("key must be an str") From 9ad4c6ae6cb5cb551a06014357838ca8dc845a99 Mon Sep 17 00:00:00 2001 From: Sergey Udachin Date: Tue, 13 Feb 2024 16:09:45 +0300 Subject: [PATCH 6/6] chore: improve logs readability --- protocols/cloud/cloud_protocol.py | 3 ++- protocols/protocol.py | 31 +++++++++++++++++-------------- scheduler/scheduler.py | 7 ++++--- scheduler/scheduler_command.py | 5 +++-- 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/protocols/cloud/cloud_protocol.py b/protocols/cloud/cloud_protocol.py index bbbeaef..586f7d0 100644 --- a/protocols/cloud/cloud_protocol.py +++ b/protocols/cloud/cloud_protocol.py @@ -321,7 +321,8 @@ def _do_rotate_virtual_file_id(self) -> None: log_header = self._compose_log_header_packet() logger.debug( "New log header id {} NOT stored as protocol's reconnect log header and is written (into the queue)".format( - id(log_header))) + log_header.values.get("virtualfileid"))) + # self._reconnect_log_header = log_header super().write_packet(log_header) diff --git a/protocols/protocol.py b/protocols/protocol.py index 6b9fb6a..e42b0b9 100644 --- a/protocols/protocol.py +++ b/protocols/protocol.py @@ -157,7 +157,7 @@ def _internal_write_connect_log_header(self, connect_log_header: typing.Optional if connect_log_header is None: logger.debug("Connection log header is None, composing a new one") connect_log_header = self._compose_log_header_packet() - logger.debug("Writing log header with id {}".format(id(connect_log_header))) + logger.debug("Writing log header with vf_id {}".format(connect_log_header.values.get("virtualfileid"))) self._internal_write_packet(connect_log_header) def _internal_write_packet(self, packet: Packet): @@ -217,9 +217,10 @@ def _impl_connect(self, connect_log_header: Optional[LogHeader] = None): try: self._internal_connect(connect_log_header) logger.debug(f"{self.__class__.__name__} connected succesfully.") + logger.debug( - "Storing successful log header with id {} as reconnect log header.".format( - id(connect_log_header)) + "Storing successful log header with vf_id {} as reconnect log header.".format( + None if connect_log_header is None else connect_log_header.values.get("virtualfileid")) ) self._reconnect_log_header = connect_log_header self._connected = True @@ -228,8 +229,8 @@ def _impl_connect(self, connect_log_header: Optional[LogHeader] = None): except Exception as exception: logger.debug(f"{self.__class__.__name__} failed to connect.") logger.debug( - "Storing log header with id {} used at connection as reconnect log header.".format( - id(connect_log_header)) + "Storing log header with vf_id {} used at connection as reconnect log header.".format( + None if connect_log_header is None else connect_log_header.values.get("virtualfileid")) ) self._reconnect_log_header = connect_log_header self._reset() @@ -338,8 +339,8 @@ def __schedule_connect(self) -> None: command.action = SchedulerAction.CONNECT log_header = self._compose_log_header_packet() logger.debug( - "Scheduling connection. Log header with id {} composed, scheduling into SchedulerQueue".format( - id(log_header))) + "Scheduling connection. with vf_id {} composed, scheduling into SchedulerQueue".format( + log_header.values["virtualfileid"])) command.state = log_header self.__scheduler.schedule(command, SchedulerQueueEnd.TAIL) @@ -402,7 +403,8 @@ def remove_listener(self, listener: ProtocolListener): def _internal_reconnect(self) -> bool: logger.debug( - "Using current reconnect log header with id {} during reconnect".format(id(self._reconnect_log_header))) + "Using current reconnect log header with vf_id {} during reconnect".format( + None if self._reconnect_log_header is None else self._reconnect_log_header.values.get("virtualfileid"))) self._internal_connect(self._reconnect_log_header) return True @@ -415,16 +417,17 @@ def __flush_queue(self) -> None: self.__forward_packet(packet, False) packet = self.__queue.pop() - def __forward_packet(self, packet: Packet, disconnect: bool) -> None: + def __forward_packet(self, packet: [Packet, LogHeader], disconnect: bool) -> None: if not self._connected: if not self.__keep_open: self._internal_connect(self._reconnect_log_header) if packet.packet_type == PacketType.LOG_HEADER: self._reconnect_log_header = packet logger.debug( - "Updating reconnect log header - logheader packet with id {} was forwarded.".format( - id(packet)) + "Updating reconnect log header - logheader packet with vf_id {} was forwarded.".format( + packet.values.get("virtualfileid")) ) + self._connected = True self.__failed = False else: @@ -452,11 +455,11 @@ def __do_reconnect(self, connect_log_header: typing.Optional[LogHeader] = None) if self._internal_reconnect(): self._connected = True logger.debug( - "Reconnect successful - storing log header with id {} as current reconnect log header".format( - connect_log_header) + "Reconnect successful - storing log header with vf_id {} as current reconnect log header".format( + None if connect_log_header is None else connect_log_header.values.get("virtualfileid")) ) self._reconnect_log_header = connect_log_header - except Exception as e: + except Exception: pass # Reconnect exceptions are not reported, # but we need to record that the last connection attempt diff --git a/scheduler/scheduler.py b/scheduler/scheduler.py index e14dc92..dc9d31d 100644 --- a/scheduler/scheduler.py +++ b/scheduler/scheduler.py @@ -65,10 +65,11 @@ def __run_command(self, command: SchedulerCommand) -> None: # noinspection PyBroadException try: if action == SchedulerAction.CONNECT: - connect_log_header = command.state + connect_log_header: LogHeader = command.state logger.debug( - "Received CONNECT command with log header id {}. Using it to connect".format( - id(connect_log_header))) + "Received CONNECT command with log header vf_id {}. Using it to connect".format( + connect_log_header.values.get("virtualfileid"))) + protocol._impl_connect(connect_log_header) elif action == SchedulerAction.WRITE_PACKET: self.__write_packet_action(command) diff --git a/scheduler/scheduler_command.py b/scheduler/scheduler_command.py index 24273f0..e2305b7 100644 --- a/scheduler/scheduler_command.py +++ b/scheduler/scheduler_command.py @@ -1,8 +1,9 @@ from typing import Optional, Union from common.protocol_command import ProtocolCommand -from scheduler.scheduler_action import SchedulerAction +from packets.log_header import LogHeader from packets.packet import Packet +from scheduler.scheduler_action import SchedulerAction class SchedulerCommand: @@ -20,7 +21,7 @@ def action(self, action: SchedulerAction) -> None: self.__action = action @property - def state(self) -> Union[ProtocolCommand, Packet, object]: + def state(self) -> Union[ProtocolCommand, Packet, LogHeader, object]: return self.__state @state.setter