-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathidentity_controller_servicer.py
78 lines (61 loc) · 3.3 KB
/
identity_controller_servicer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import grpc
from csi_general import identity_pb2 as pb2
from csi_general import identity_pb2_grpc as pb2_grpc
from controllers.common.config import config as common_config
from controllers.servers.csi.decorators import csi_method
from controllers.servers.csi.exception_handler import build_error_response
from controllers.common.csi_logger import get_stdout_logger
# Module-level stdout logger used by the servicer defined in this file.
logger = get_stdout_logger()
class IdentityControllerServicer(pb2_grpc.IdentityServicer):
    """gRPC servicer for the identity service generated in ``identity_pb2_grpc``.

    Implements the ``GetIdentity``, ``GetCapabilities`` and ``Probe`` RPCs.
    The private ``_get_*_capability`` helpers each build one
    ``pb2.Capability`` message advertised by ``GetCapabilities``.
    """

    @csi_method(error_response_type=pb2.GetIdentityResponse)
    def GetIdentity(self, request, context):
        """Return the plugin name and vendor version taken from the common config.

        If either value is empty, an error response with status ``INTERNAL``
        is built via ``build_error_response`` and returned instead.
        """
        identity_config = common_config.identity
        plugin_name = identity_config.name
        plugin_version = identity_config.version

        if plugin_name and plugin_version:
            return pb2.GetIdentityResponse(name=plugin_name, vendor_version=plugin_version)

        message = "plugin name or version cannot be empty"
        return build_error_response(
            message,
            context,
            grpc.StatusCode.INTERNAL,
            pb2.GetIdentityResponse,
        )

    def GetCapabilities(self, request, context):
        """Return a ``GetCapabilitiesResponse`` listing every advertised capability.

        Logs an info line before and after the response is assembled.
        """
        logger.info("GetCapabilities")

        # The tuple order is the order in which capabilities appear in the response.
        capability_builders = (
            self._get_replication_capability,
            self._get_controller_capability,
            self._get_network_fence_capability,
            self._get_volume_group_capability,
            self._get_vg_limit_volume_to_one_vg_capability,
            self._get_vg_disable_deleting_volumes_when_vg_deleted_capability,
        )
        advertised = [build() for build in capability_builders]
        response = pb2.GetCapabilitiesResponse(capabilities=advertised)

        logger.info("finished GetCapabilities")
        return response

    def _get_replication_capability(self):
        """Build the ``VOLUME_REPLICATION`` volume-replication capability."""
        replication = pb2.Capability.VolumeReplication
        enum_value = replication.Type.Value("VOLUME_REPLICATION")
        return pb2.Capability(volume_replication=replication(type=enum_value))

    def _get_controller_capability(self):
        """Build the ``CONTROLLER_SERVICE`` service capability."""
        service = pb2.Capability.Service
        enum_value = service.Type.Value("CONTROLLER_SERVICE")
        return pb2.Capability(service=service(type=enum_value))

    def _get_network_fence_capability(self):
        """Build the ``NETWORK_FENCE`` network-fence capability."""
        network_fence = pb2.Capability.NetworkFence
        enum_value = network_fence.Type.Value("NETWORK_FENCE")
        return pb2.Capability(network_fence=network_fence(type=enum_value))

    def _get_volume_group_capability(self):
        """Build the ``VOLUME_GROUP`` volume-group capability."""
        volume_group = pb2.Capability.VolumeGroup
        enum_value = volume_group.Type.Value("VOLUME_GROUP")
        return pb2.Capability(volume_group=volume_group(type=enum_value))

    def _get_vg_limit_volume_to_one_vg_capability(self):
        """Build the ``LIMIT_VOLUME_TO_ONE_VG`` volume-group capability."""
        volume_group = pb2.Capability.VolumeGroup
        enum_value = volume_group.Type.Value("LIMIT_VOLUME_TO_ONE_VG")
        return pb2.Capability(volume_group=volume_group(type=enum_value))

    def _get_vg_disable_deleting_volumes_when_vg_deleted_capability(self):
        """Build the ``DO_NOT_ALLOW_VG_TO_DELETE_VOLUMES`` volume-group capability."""
        volume_group = pb2.Capability.VolumeGroup
        enum_value = volume_group.Type.Value("DO_NOT_ALLOW_VG_TO_DELETE_VOLUMES")
        return pb2.Capability(volume_group=volume_group(type=enum_value))

    def Probe(self, request, context):
        """Set the call's status code to ``OK`` and return an empty ``ProbeResponse``."""
        ok_status = grpc.StatusCode.OK
        context.set_code(ok_status)
        return pb2.ProbeResponse()