diff --git a/controllers/scripts/csi_general/csi_pb2.sh b/controllers/scripts/csi_general/csi_pb2.sh index d485fabc4..f012dc491 100755 --- a/controllers/scripts/csi_general/csi_pb2.sh +++ b/controllers/scripts/csi_general/csi_pb2.sh @@ -12,6 +12,7 @@ cd ./proto/${PB2_DIR} curl -O https://raw.githubusercontent.com/container-storage-interface/spec/${CSI_VERSION}/csi.proto curl -O https://raw.githubusercontent.com/IBM/csi-volume-group/${VG_VERSION}/volumegroup/volumegroup.proto curl -O https://raw.githubusercontent.com/csi-addons/spec/v0.2.0/replication/replication.proto +curl -O https://raw.githubusercontent.com/csi-addons/spec/main/identity/identity.proto sed -i 's|github.com/container-storage-interface/spec/lib/go/csi/csi.proto|csi_general/csi.proto|g' replication.proto cd - diff --git a/controllers/servers/csi/controller_server_manager.py b/controllers/servers/csi/controller_server_manager.py deleted file mode 100644 index e5458403a..000000000 --- a/controllers/servers/csi/controller_server_manager.py +++ /dev/null @@ -1,57 +0,0 @@ -import os -import time -from concurrent import futures - -import grpc -from csi_general import csi_pb2_grpc, replication_pb2_grpc, volumegroup_pb2_grpc - -from controllers.common.config import config -from controllers.common.csi_logger import get_stdout_logger -from controllers.common.settings import CSI_CONTROLLER_SERVER_WORKERS -from controllers.servers.csi.addons_server import ReplicationControllerServicer -from controllers.servers.csi.csi_controller_server import CSIControllerServicer -from controllers.servers.csi.volume_group_server import VolumeGroupControllerServicer - -logger = get_stdout_logger() - - -def get_max_workers_count(): - cpu_count = (os.cpu_count() or 1) + 4 - return CSI_CONTROLLER_SERVER_WORKERS if cpu_count < CSI_CONTROLLER_SERVER_WORKERS else None - - -class ControllerServerManager: - def __init__(self, array_endpoint): - self.endpoint = array_endpoint - self.csi_servicer = CSIControllerServicer() - self.replication_servicer = ReplicationControllerServicer() - self.volume_group_servicer = VolumeGroupControllerServicer() - - def start_server(self): - max_workers = get_max_workers_count() - controller_server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) - - csi_pb2_grpc.add_ControllerServicer_to_server(self.csi_servicer, controller_server) - csi_pb2_grpc.add_IdentityServicer_to_server(self.csi_servicer, controller_server) - replication_pb2_grpc.add_ControllerServicer_to_server(self.replication_servicer, controller_server) - volumegroup_pb2_grpc.add_ControllerServicer_to_server(self.volume_group_servicer, controller_server) - - # bind the server to the port defined above - # controller_server.add_insecure_port('[::]:{}'.format(self.server_port)) - # controller_server.add_insecure_port('unix://{}'.format(self.server_port)) - controller_server.add_insecure_port(self.endpoint) - - logger.info("Controller version: {}".format(config.identity.version)) - - # start the server - logger.debug("Listening for connections on endpoint address: {}".format(self.endpoint)) - - controller_server.start() - logger.debug('Controller Server running ...') - - try: - while True: - time.sleep(60 * 60 * 60) - except KeyboardInterrupt: - controller_server.stop(0) - logger.debug('Controller Server Stopped ...') diff --git a/controllers/servers/csi/csi_addons_server/__init__.py b/controllers/servers/csi/csi_addons_server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/controllers/servers/csi/addons_server.py b/controllers/servers/csi/csi_addons_server/replication_controller_servicer.py similarity index 100% rename from controllers/servers/csi/addons_server.py rename to controllers/servers/csi/csi_addons_server/replication_controller_servicer.py diff --git a/controllers/servers/csi/main.py b/controllers/servers/csi/main.py index f17018bf0..830253f26 100644 --- a/controllers/servers/csi/main.py +++ b/controllers/servers/csi/main.py @@ -1,19 +1,70 @@ +import os from argparse import ArgumentParser +from threading import Thread +from concurrent import futures +import grpc +from concurrent import futures + +from csi_general import csi_pb2_grpc, volumegroup_pb2_grpc, identity_pb2_grpc, replication_pb2_grpc from controllers.common.csi_logger import set_log_level -from controllers.servers.csi.controller_server_manager import ControllerServerManager +from controllers.common.settings import CSI_CONTROLLER_SERVER_WORKERS +from controllers.servers.csi.server_manager import ServerManager +from controllers.servers.csi.csi_controller_server import CSIControllerServicer +from controllers.servers.csi.volume_group_server import VolumeGroupControllerServicer +from controllers.servers.csi.csi_addons_server.replication_controller_servicer import ReplicationControllerServicer def main(): parser = ArgumentParser() parser.add_argument("-e", "--csi-endpoint", dest="endpoint", help="grpc endpoint") + parser.add_argument("-a", "--csi-addons-endpoint", dest="addonsendpoint", help="CSI-Addons grpc endpoint") parser.add_argument("-l", "--loglevel", dest="loglevel", help="log level") arguments = parser.parse_args() set_log_level(arguments.loglevel) + controller_server = _create_grpc_server() + csi_addons_server = _create_grpc_server() + + csi_controller_server_manager = ServerManager(arguments.endpoint, "Controller", + _add_csi_controller_servicers(controller_server)) + csi_addons_server_manager = ServerManager(arguments.addonsendpoint, "CSI Addons", + _add_csi_addons_servicers(csi_addons_server)) + _start_servers(csi_controller_server_manager, csi_addons_server_manager) + + +def _create_grpc_server(): + max_workers = _get_max_workers_count() + return grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) + + +def _get_max_workers_count(): + cpu_count = (os.cpu_count() or 1) + 4 + return CSI_CONTROLLER_SERVER_WORKERS if cpu_count < CSI_CONTROLLER_SERVER_WORKERS else None + + +def _add_csi_controller_servicers(controller_server): + csi_servicer = CSIControllerServicer() + volume_group_servicer = VolumeGroupControllerServicer() + csi_pb2_grpc.add_ControllerServicer_to_server(csi_servicer, controller_server) + csi_pb2_grpc.add_IdentityServicer_to_server(csi_servicer, controller_server) + volumegroup_pb2_grpc.add_ControllerServicer_to_server(volume_group_servicer, controller_server) + return controller_server + + +def _add_csi_addons_servicers(csi_addons_server): + replication_servicer = ReplicationControllerServicer() + replication_pb2_grpc.add_ControllerServicer_to_server(replication_servicer, csi_addons_server) + return csi_addons_server + - server_manager = ControllerServerManager(arguments.endpoint) - server_manager.start_server() +def _start_servers(csi_controller_server_manager, csi_addons_server_manager): + servers = ( + csi_controller_server_manager.start_server, + csi_addons_server_manager.start_server) + for server_function in servers: + thread = Thread(target=server_function,) + thread.start() if __name__ == '__main__': diff --git a/controllers/servers/csi/server_manager.py b/controllers/servers/csi/server_manager.py new file mode 100644 index 000000000..0c80e4c19 --- /dev/null +++ b/controllers/servers/csi/server_manager.py @@ -0,0 +1,34 @@ +import time + +from controllers.common.config import config +from controllers.common.csi_logger import get_stdout_logger + +logger = get_stdout_logger() + + +class ServerManager: + def __init__(self, array_endpoint, server_type, grpc_server): + self.endpoint = array_endpoint + self.server_type = server_type + self.grpc_server = grpc_server + + def start_server(self): + # bind the server to the port defined above + # grpc_server.add_insecure_port('[::]:{}'.format(self.server_port)) + # grpc_server.add_insecure_port('unix://{}'.format(self.server_port)) + self.grpc_server.add_insecure_port(self.endpoint) + + logger.info("{} version: {}".format(self.server_type, config.identity.version)) + + # start the server + logger.debug("Listening for connections on endpoint address: {}".format(self.endpoint)) + + self.grpc_server.start() + logger.debug('{} Server running ...'.format(self.server_type)) + + try: + while True: + time.sleep(60 * 60 * 60) + except KeyboardInterrupt: + self.grpc_server.stop(0) + logger.debug('Controller Server Stopped ...') diff --git a/controllers/tests/controller_server/addons_server_test.py b/controllers/tests/controller_server/addons_server_test.py index 7a323ce1b..2b9c13c05 100644 --- a/controllers/tests/controller_server/addons_server_test.py +++ b/controllers/tests/controller_server/addons_server_test.py @@ -7,7 +7,7 @@ from controllers.servers.settings import PARAMETERS_SYSTEM_ID, PARAMETERS_COPY_TYPE, PARAMETERS_REPLICATION_POLICY from controllers.array_action.settings import REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR, REPLICATION_COPY_TYPE_SYNC from controllers.array_action.array_action_types import ReplicationRequest -from controllers.servers.csi.addons_server import ReplicationControllerServicer +from controllers.servers.csi.csi_addons_server.replication_controller_servicer import ReplicationControllerServicer from controllers.tests import utils from controllers.tests.common.test_settings import VOLUME_NAME, VOLUME_UID, OBJECT_INTERNAL_ID, \ OTHER_OBJECT_INTERNAL_ID, REPLICATION_NAME, SYSTEM_ID, COPY_TYPE, SECRET_USERNAME_VALUE, SECRET_PASSWORD_VALUE, \ @@ -16,7 +16,7 @@ from controllers.tests.controller_server.csi_controller_server_test import (CommonControllerTest) from controllers.tests.utils import ProtoBufMock -ADDON_SERVER_PATH = "controllers.servers.csi.addons_server" +ADDON_SERVER_PATH = "controllers.servers.csi.csi_addons_server.replication_controller_servicer" class BaseReplicationSetUp(unittest.TestCase):