forked from doitintl/iris3
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplugin.py
210 lines (173 loc) · 6.95 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import logging
import pkgutil
import re
import typing
from abc import ABCMeta, abstractmethod
from googleapiclient import discovery
from googleapiclient import errors
from util import gcp_utils, config_utils
from util.config_utils import (
is_copying_labels_from_project,
iris_prefix,
specific_prefix,
)
from util.utils import methods, cls_by_name, log_time, timed_lru_cache
# Name of the package that holds the plugin modules. Plugin.init() uses it both as the
# dotted-import prefix ("plugins.<module>") and as the directory that
# pkgutil.iter_modules() scans, so as a relative path it depends on the current
# working directory.
PLUGINS_MODULE = "plugins"
class Plugin(metaclass=ABCMeta):
    """Abstract base class for plugins that label GCP resources of one type.

    The class also acts as the registry of plugin instances (see ``plugins`` and
    ``init()``), and gives each instance a Google API client plus a batch HTTP request
    that subclasses can fill and then send with ``do_batch()``.
    """

    # NOTE(review): this pattern is applied with re.match() in _project_labels(), so
    # only the start of the ID is anchored and trailing illegal characters are not
    # rejected. Left as-is because tightening it could reject IDs that pass today.
    __proj_regex = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])?")
    # Matches every character that has to be replaced in a label value
    # (see legalize_value() inside __iris_labels()).
    __illegal_label_char_regex = re.compile(r"[^\w\d_-]")
    # Underlying API max is 1000; avoid off-by-one errors
    # We send a batch when _BATCH_SIZE or more tasks are in it.
    _BATCH_SIZE = 990

    # For a class to know its subclasses and their instances is generally bad.
    # We could create a separate PluginManager but let's not get too Java-ish.
    # Maps plugin class name -> plugin instance; filled by init().
    plugins: typing.Dict[str, "Plugin"] = {}

    def __init__(self):
        """Build the Google API client for this plugin and start an empty batch."""
        self._google_client = discovery.build(*self.discovery_api())
        self.__init_batch_req()

    @classmethod
    @abstractmethod
    def discovery_api(cls) -> typing.Tuple[str, str]:
        """Return the positional arguments for ``googleapiclient.discovery.build``.

        :return: a 2-tuple that is unpacked into ``discovery.build(...)`` in
            ``__init__`` (i.e. the API service name and its version)
        """
        pass

    @classmethod
    def relabel_on_cron(cls) -> bool:
        """
        We must minimize labeling on cron because it is costly.
        Return True if that is needed.
        When is_labeled_on_creation is False, we also label on cron
        """
        return False

    @classmethod
    def is_labeled_on_creation(cls) -> bool:
        """
        Only a few classes are labeled on creation, and these classes should override
        this method.
        """
        return True

    @timed_lru_cache(seconds=600, maxsize=512)
    def _project_labels(self, project_id) -> typing.Dict:
        """Return the labels of a project, or an empty dict if they cannot be read.

        :param project_id: ID of the project whose labels are wanted
        :return: the "labels" entry of the project; ``{}`` when the project has no such
            entry or when the lookup raises ``HttpError`` (which is logged)
        :raises AssertionError: if ``project_id`` does not start like a legal project ID
        """
        assert self.__proj_regex.match(
            project_id
        ), f"Project ID is illegal: {project_id}"
        try:
            proj = gcp_utils.get_project(project_id)
            return proj.get("labels", {})
        except errors.HttpError as e:
            logging.exception(f"Failing to get labels for project {project_id}: {e}")
            return {}

    def __iris_labels(self, gcp_object) -> typing.Dict[str, str]:
        """Compute the labels that this plugin itself generates for a GCP object.

        Each function returned by ``methods(self, "_gcp_")`` (presumably this plugin's
        methods whose names start with that prefix) contributes one label: the key is
        the configured prefix followed by the function name without ``_gcp_``, and the
        value is the function's result for ``gcp_object``, made legal as a label value.

        :param gcp_object: the GCP object to compute labels for
        :return: dict of label key -> label value
        """
        func_name_pfx = "_gcp_"

        def legalize_value(s):
            """
            Only hyphens (-), underscores (_), lowercase characters,
            and numbers are allowed in label values. International characters are
            allowed.
            """
            # Replace every illegal character in one pass, then lowercase and truncate.
            return self.__illegal_label_char_regex.sub("_", s).lower()[:62]

        def value(func, gcp_obj):
            return legalize_value(func(gcp_obj))

        def key(func) -> str:
            resource_type = type(self).__name__
            general_pfx = iris_prefix()
            assert general_pfx is not None
            # A prefix configured for this resource type wins over the general one.
            specific_pfx = specific_prefix(resource_type)
            pfx = specific_pfx if specific_pfx is not None else general_pfx
            # An empty prefix means the key is just the bare function-name suffix.
            pfx_full = (pfx + "_") if pfx else ""
            return pfx_full + func.__name__[len(func_name_pfx) :]

        return {key(f): value(f, gcp_object) for f in methods(self, func_name_pfx)}

    def __batch_callback(self, request_id, response, exception):
        """Callback given to the batch HTTP request; logs the exception, if any.

        :param request_id: unused
        :param response: unused
        :param exception: the error to log, or None when there is nothing to report
        """
        if exception is not None:
            # The exception is passed as the %s argument; without it the logging
            # module would emit the literal "%s" instead of the error text.
            logging.error(
                "in __batch_callback(), %s",
                exception,
                exc_info=exception,
            )

    def do_batch(self):
        """In do_label, we loop over all objects. But for efficiency, we do not process
        them all at once, but rather gather objects and process them in batches of
        self._BATCH_SIZE as we loop; then process the remaining at the end of the loop.

        Any exception raised while executing the batch is logged rather than
        propagated, and a fresh, empty batch is started afterwards.
        """
        try:
            self._batch.execute()
        except Exception as e:
            logging.exception(e)

        self.__init_batch_req()

    @abstractmethod
    def label_all(self, project_id):
        """Label all objects of a type in a given project"""
        pass

    @abstractmethod
    def get_gcp_object(self, log_data):
        """Parse logging data to get a GCP object"""
        pass

    @abstractmethod
    def label_resource(self, gcp_object: typing.Dict, project_id: str):
        """Label a single new object based on its description that comes from a log
        line"""
        pass

    @abstractmethod
    def api_name(self):
        """NOTE(review): not used in this class; presumably the name of the GCP API
        whose log entries this plugin handles -- confirm against the callers."""
        pass

    @abstractmethod
    def method_names(self):
        """NOTE(review): not used in this class; presumably the API method names that
        should trigger labeling -- confirm against the callers."""
        pass

    @classmethod
    @log_time
    def init(cls):
        """Instantiate every enabled plugin and register it in ``Plugin.plugins``.

        Scans the ``PLUGINS_MODULE`` directory for modules; for each one that
        ``config_utils.is_plugin_enabled`` accepts, the class named like the module
        (title-cased) is imported, instantiated and stored under its class name.

        :raises AssertionError: if a module name is not lower-case, or if no plugin
            ends up registered
        """

        def load_plugin_class(name) -> typing.Type:
            # Check the naming convention before importing anything.
            assert name == name.lower(), name
            module_name = PLUGINS_MODULE + "." + name
            __import__(module_name)
            # By convention the plugin class is the module name, title-cased.
            return cls_by_name(module_name + "." + name.title())

        for _, module, _ in pkgutil.iter_modules([PLUGINS_MODULE]):
            if config_utils.is_plugin_enabled(module):
                plugin_class = load_plugin_class(module)
                instance = plugin_class()
                Plugin.plugins[plugin_class.__name__] = instance

        assert Plugin.plugins, "No plugins defined"

    @staticmethod
    def get_plugin(plugin_name: str) -> typing.Optional["Plugin"]:
        """Return the registered plugin instance with that class name, or None."""
        return Plugin.plugins.get(plugin_name)

    def _build_labels(self, gcp_object, project_id):
        """
        :return dict including original labels, project labels (if the system is
        configured to add those) and new labels. But if that would result in no change,
        or if labeling of this object is blocked, return None
        """
        original_labels = gcp_object.get("labels", {})

        # Ask first whether labeling is blocked, so that the project-label lookup and
        # the label computation are not done for an object that will be skipped anyway.
        if self.block_labeling(gcp_object, original_labels):
            return None

        project_labels = (
            self._project_labels(project_id) if is_copying_labels_from_project() else {}
        )
        iris_labels = self.__iris_labels(gcp_object)
        # Later dicts win on key clashes: iris labels over project labels over originals.
        all_labels = {**original_labels, **project_labels, **iris_labels}

        if all_labels == original_labels:
            # Skip labeling because no change
            return None

        labels = {"labels": all_labels}
        # Pass the fingerprint along only when the object has a non-empty one.
        fingerprint = gcp_object.get("labelFingerprint", "")
        if fingerprint:
            labels["labelFingerprint"] = fingerprint
        return labels

    def _name_after_slash(self, gcp_object):
        """Return the part of the object's "name" that follows its last "/"."""
        return self.__name(gcp_object, separator="/")

    def _name_no_separator(self, gcp_object):
        """Return the object's "name" unchanged."""
        return self.__name(gcp_object, separator="")

    def __name(self, gcp_object, separator=""):
        """Return the object's "name", optionally cut down to what follows a separator.

        :param gcp_object: mapping that is expected to have a "name" entry
        :param separator: if non-empty, only the text after its last occurrence is
            returned (the whole name if it does not occur)
        :return: the name, or None (after logging) if there is no "name" entry
        """
        try:
            name = gcp_object["name"]
        except KeyError as e:
            logging.exception(e)
            return None

        if separator:
            # rfind() gives -1 when the separator is absent, so the slice then starts
            # at 0 and keeps the whole name.
            name = name[name.rfind(separator) + 1 :]
        return name

    def __init_batch_req(self):
        """Reset the counter and start a new, empty batch HTTP request."""
        # NOTE(review): counter is not read in this class; presumably subclasses use it
        # to count the requests added to the current batch -- confirm.
        self.counter = 0
        self._batch = self._google_client.new_batch_http_request(
            callback=self.__batch_callback
        )

    def block_labeling(self, block_labeling, original_labels):
        """Override and return True if this object must not be labeled (for example,
        GKE objects).

        :param block_labeling: despite its name, this receives the GCP object
            (see the call in _build_labels)
        :param original_labels: the labels that the object currently has
        :return: False in this base implementation, i.e. labeling is never blocked
        """
        return False