Skip to content

Commit c60832a

Browse files
Update DF service functions for uploads, etc. (#48)
* Add private method to scrub user payload during call operation, refactored from main call() function * Add private method to handle paging during main call() function * Add private method to handle error CdpError parsing in main call() function * Add new helper method to encode values for HTTP submission * Add new helper method to expand file paths in local user OS * Add private method to handle new alternative call() process which handles functions that the CDP Control Plane redirects to another URL to complete a file upload * Move private method to handle standard call()s with unchanged functionality * Simplify existing call() method to use these refactored methods to improve readability given new branching behavior requirements * Modify DF import custom flow and custom flow version to use the new redirect call and cleanly upload files to the control plane * Add Squelch for 409 error with uploads where the file already exists * Prevent exceptions when errors are squelched. * Add dataviz functions to the dw module. Signed-off-by: Andre Araujo <[email protected]> Signed-off-by: Daniel Chaffelson <[email protected]> Co-authored-by: Daniel Chaffelson <[email protected]>
1 parent 4ecedfd commit c60832a

File tree

3 files changed

+214
-46
lines changed

3 files changed

+214
-46
lines changed

src/cdpy/common.py

Lines changed: 107 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from urllib3.exceptions import InsecureRequestWarning
1717
from json import JSONDecodeError
1818
from typing import Union
19+
import urllib.parse as urlparse
20+
from os import path
1921

2022
import cdpcli
2123
from cdpcli import VERSION as CDPCLI_VERSION
@@ -457,6 +459,20 @@ def _get_path(obj, path):
457459
return None
458460
return value
459461

462+
@staticmethod
463+
def encode_value(value):
464+
if value:
465+
return urlparse.quote(value)
466+
return None
467+
468+
def expand_file_path(self, file_path):
469+
if path.exists(file_path):
470+
return path.expandvars(path.expanduser(file_path))
471+
else:
472+
self.throw_error(
473+
CdpError('Path [{}] not found'.format(file_path))
474+
)
475+
460476
def wait_for_state(self, describe_func, params: dict, field: Union[str, None, list] = 'status',
461477
state: Union[list, str, None] = None, delay: int = 15, timeout: int = 3600,
462478
ignore_failures: bool = False):
@@ -538,8 +554,85 @@ def wait_for_state(self, describe_func, params: dict, field: Union[str, None, li
538554
CdpError("Timeout waiting for function {0} with params [{1}] to return field {2} with state {3}"
539555
.format(describe_func.__name__, str(params), field, str(state))))
540556

557+
def _scrub_inputs(self, inputs):
558+
# Used in main call() function
559+
logging.debug("Scrubbing inputs in payload")
560+
# Remove unused submission values as the API rejects them
561+
payload = {x: y for x, y in inputs.items() if y is not None}
562+
# Remove and issue warning for empty string submission values as the API rejects them
563+
_ = [self.throw_warning(
564+
CdpWarning('Removing empty string arg %s from submission' % x))
565+
for x, y in payload.items() if y == '']
566+
payload = {x: y for x, y in payload.items() if y != ''}
567+
return payload
568+
569+
def _handle_paging(self, response, call_function, payload):
570+
# Used in main call() function
571+
while 'nextToken' in response:
572+
token = response.pop('nextToken')
573+
next_page = call_function(
574+
**payload, startingToken=token, pageSize=self.DEFAULT_PAGE_SIZE)
575+
for key in next_page.keys():
576+
if isinstance(next_page[key], str):
577+
response[key] = next_page[key]
578+
elif isinstance(next_page[key], list):
579+
response[key] += (next_page[key])
580+
return response
581+
582+
def _handle_call_errors(self, err, squelch):
583+
# Used in main call() function
584+
# Note that the cascade of behaviors here is designed to be convenient for Ansible module development
585+
parsed_err = CdpError(err)
586+
if self.debug:
587+
log = self.get_log()
588+
parsed_err.update(sdk_out=log, sdk_out_lines=log.splitlines())
589+
if self.strict_errors is True:
590+
self.throw_error(parsed_err)
591+
if isinstance(err, ClientError):
592+
if squelch is not None:
593+
for item in squelch:
594+
if item.value in str(parsed_err.__getattribute__(item.field)):
595+
warning = item.warning if item.warning is not None else str(parsed_err.violations)
596+
self.throw_warning(CdpWarning(warning))
597+
return item.default
598+
return parsed_err
599+
600+
def _handle_redirect_call(self, client, call_function, payload, headers):
601+
# cdpcli/extensions/redirect.py
602+
http, resp = client.make_api_call(
603+
client.meta.method_to_api_mapping[call_function],
604+
payload,
605+
allow_redirects=False
606+
)
607+
if not http.is_redirect:
608+
self.throw_error(CdpError("Redirect headers supplied but no redirect URL from API call"))
609+
redirect_url = http.headers.get('Location', None)
610+
611+
if redirect_url is not None:
612+
with open(self.expand_file_path(payload['file']), 'rb') as f:
613+
http, full_response = client.make_request(
614+
operation_name=client.meta.method_to_api_mapping[call_function],
615+
method='post',
616+
url_path=redirect_url,
617+
headers=self._scrub_inputs(inputs=headers),
618+
body=f
619+
)
620+
else:
621+
self.throw_error(CdpError("Redirect call attempted but redirect URL was empty"))
622+
return full_response
623+
624+
def _handle_std_call(self, client, call_function, payload):
625+
func_to_call = getattr(client, call_function)
626+
raw_response = func_to_call(**payload)
627+
if raw_response is not None and 'nextToken' in raw_response:
628+
logging.debug("Found paged results in %s" % call_function)
629+
full_response = self._handle_paging(raw_response, func_to_call, payload)
630+
else:
631+
full_response = raw_response
632+
return full_response
633+
541634
def call(self, svc: str, func: str, ret_field: str = None, squelch: ['Squelch'] = None, ret_error: bool = False,
542-
**kwargs: Union[dict, bool, str, list]) -> Union[list, dict, 'CdpError']:
635+
redirect_headers: dict = None, **kwargs: Union[dict, bool, str, list]) -> Union[list, dict, 'CdpError']:
543636
"""
544637
Wraps the call to an underlying CDP CLI Service, handles common errors, and parses output
545638
@@ -550,61 +643,34 @@ def call(self, svc: str, func: str, ret_field: str = None, squelch: ['Squelch']
550643
squelch (list(Squelch)): list of Descriptions of Error squelching options
551644
ret_error (bool): Whether to return the error object if generated,
552645
defaults to False and raise instead
553-
**kwargs (dict): Keyword Args to be supplied to the Function, eg userId
646+
redirect_headers (dict): Dict of http submission headers for the call, triggers redirected upload call.
647+
**kwargs (dict): Keyword Args to be supplied to the Function, e.g. userId
554648
555649
Returns (dict, list, None): Output of CDP CLI Call
556650
"""
557651
try:
558-
call_function = getattr(self._client(service=svc, parameters=kwargs), func)
559652
if self.scrub_inputs:
560-
# Remove unused submission values as the API rejects them
561-
payload = {x: y for x, y in kwargs.items() if y is not None}
562-
# Remove and issue warning for empty string submission values as the API rejects them
563-
_ = [self.throw_warning(
564-
CdpWarning('Removing empty string arg %s from submission' % x))
565-
for x, y in payload.items() if y == '']
566-
payload = {x: y for x, y in payload.items() if y != ''}
653+
payload = self._scrub_inputs(inputs=kwargs)
567654
else:
568655
payload = kwargs
569656

570-
resp = call_function(**payload)
657+
svc_client = self._client(service=svc, parameters=payload)
571658

572-
if 'nextToken' in resp:
573-
while 'nextToken' in resp:
574-
logging.debug("Found paged results in %s" % func)
575-
token = resp.pop('nextToken')
576-
next_page = call_function(
577-
**payload, startingToken=token, pageSize=self.DEFAULT_PAGE_SIZE)
578-
for key in next_page.keys():
579-
if isinstance(next_page[key], str):
580-
resp[key] = next_page[key]
581-
elif isinstance(next_page[key], list):
582-
resp[key] += (next_page[key])
659+
if redirect_headers is not None:
660+
full_response = self._handle_redirect_call(svc_client, func, payload, redirect_headers)
661+
else:
662+
full_response = self._handle_std_call(svc_client, func, payload)
583663

584664
if ret_field is not None:
585-
if not resp:
665+
if not full_response:
586666
self.throw_warning(CdpWarning('Call Response is empty, cannot return child field %s' % ret_field))
587667
else:
588-
return resp[ret_field]
589-
590-
return resp
668+
return full_response[ret_field]
669+
return full_response
591670

592671
except Exception as err:
593-
# Note that the cascade of behaviors here is designed to be convenient for Ansible module development
594-
parsed_err = CdpError(err)
595-
if self.debug:
596-
log = self.get_log()
597-
parsed_err.update(sdk_out=log, sdk_out_lines=log.splitlines())
598-
if self.strict_errors is True:
599-
self.throw_error(parsed_err)
600-
if isinstance(err, ClientError):
601-
if squelch is not None:
602-
for item in squelch:
603-
if item.value in str(parsed_err.__getattribute__(item.field)):
604-
warning = item.warning if item.warning is not None else str(parsed_err.violations)
605-
self.throw_warning(CdpWarning(warning))
606-
return item.default
607-
if ret_error is True:
672+
parsed_err = self._handle_call_errors(err, squelch)
673+
if ret_error is True or not isinstance(parsed_err, CdpError):
608674
return parsed_err
609675
self.throw_error(parsed_err)
610676

src/cdpy/df.py

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from cdpy.common import CdpSdkBase, Squelch, CdpError, CdpWarning
44
from cdpcli.extensions.df.createdeployment import CreateDeploymentOperationCaller
55

6-
ENTITLEMENT_DISABLED='DataFlow not enabled on CDP Tenant'
6+
ENTITLEMENT_DISABLED = 'DataFlow not enabled on CDP Tenant'
7+
78

89
class CdpyDf(CdpSdkBase):
910
def __init__(self, *args, **kwargs):
@@ -156,7 +157,7 @@ def list_readyflows(self, name=None):
156157
Squelch(value='NOT_FOUND',
157158
warning='No ReadyFlows found within your CDP Tenant'),
158159
Squelch(value='PATH_DISABLED',
159-
warning=ENTITLEMENT_DISABLED)
160+
warning=ENTITLEMENT_DISABLED)
160161
],
161162
)
162163
if name is not None:
@@ -198,7 +199,7 @@ def import_readyflow(self, def_crn):
198199
Squelch(value='NOT_FOUND',
199200
warning='No ReadyFlow Definition with crn %s found' % def_crn),
200201
Squelch(value='PATH_DISABLED',
201-
warning=ENTITLEMENT_DISABLED)
202+
warning=ENTITLEMENT_DISABLED)
202203
],
203204
readyflowCrn=def_crn
204205
)
@@ -211,7 +212,7 @@ def delete_added_readyflow(self, def_crn):
211212
Squelch(value='NOT_FOUND',
212213
warning='No ReadyFlow Definition with crn %s found' % def_crn),
213214
Squelch(value='PATH_DISABLED',
214-
warning=ENTITLEMENT_DISABLED)
215+
warning=ENTITLEMENT_DISABLED)
215216
],
216217
readyflowCrn=def_crn
217218
)
@@ -224,7 +225,7 @@ def describe_added_readyflow(self, def_crn, sort_versions=True):
224225
Squelch(value='NOT_FOUND',
225226
warning='No ReadyFlow Definition with crn %s found' % def_crn),
226227
Squelch(value='PATH_DISABLED',
227-
warning=ENTITLEMENT_DISABLED)
228+
warning=ENTITLEMENT_DISABLED)
228229
],
229230
readyflowCrn=def_crn
230231
)
@@ -251,6 +252,54 @@ def describe_customflow(self, def_crn, sort_versions=True):
251252
out['versions'] = sorted(result['versions'], key=lambda d: d['version'], reverse=True)
252253
return out
253254

255+
def import_customflow(self, def_file, name, description=None, comments=None):
256+
# cdpcli/extensions/df/__init__.py: DfExtension._df_upload_flow
257+
258+
return self.sdk.call(
259+
svc='df', func='import_flow_definition', squelch=[
260+
Squelch(value='PATH_DISABLED', warning=ENTITLEMENT_DISABLED),
261+
Squelch(field='status_code', value='409')
262+
],
263+
redirect_headers={
264+
'Content-Type': 'application/json',
265+
'Flow-Definition-Name': self.sdk.encode_value(name),
266+
'Flow-Definition-Description': self.sdk.encode_value(description),
267+
'Flow-Definition-Comments': self.sdk.encode_value(comments)
268+
},
269+
name=name,
270+
file=def_file,
271+
description=description,
272+
comments=comments
273+
)
274+
275+
def import_customflow_version(self, def_crn, def_file, comments=None):
276+
self.sdk.validate_crn(def_crn, 'flow')
277+
return self.sdk.call(
278+
svc='df', func='import_flow_definition_version', ret_field='.', squelch=[
279+
Squelch(value='PATH_DISABLED', warning=ENTITLEMENT_DISABLED),
280+
Squelch(field='status_code', value='409')
281+
],
282+
redirect_headers={
283+
'Content-Type': 'application/json',
284+
'Flow-Definition-Comments': self.sdk.encode_value(comments)
285+
},
286+
flowCrn=def_crn,
287+
file=def_file,
288+
comments=comments
289+
)
290+
291+
def delete_customflow(self, def_crn):
292+
self.sdk.validate_crn(def_crn, 'flow')
293+
return self.sdk.call(
294+
svc='df', func='delete_flow', ret_field='flow', squelch=[
295+
Squelch(value='NOT_FOUND',
296+
warning='No Flow Definition with crn %s found' % def_crn),
297+
Squelch(value='PATH_DISABLED',
298+
warning=ENTITLEMENT_DISABLED)
299+
],
300+
flowCrn=def_crn
301+
)
302+
254303
def get_version_crn_from_flow_definition(self, flow_name, version=None):
255304
summary_list = self.list_flow_definitions(name=flow_name)
256305
if summary_list:

src/cdpy/dw.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,18 @@ def describe_dbc(self, cluster_id, dbc_id):
6363
dbcId=dbc_id
6464
)
6565

66+
def describe_data_visualization(self, cluster_id, data_viz_id):
67+
return self.sdk.call(
68+
svc='dw', func='describe_data_visualization', ret_field='dataVisualization', squelch=[
69+
Squelch('NOT_FOUND'),
70+
Squelch('not found', field='violations'),
71+
Squelch('INVALID_ARGUMENT'),
72+
Squelch(value='PATH_DISABLED', warning=ENTITLEMENT_DISABLED)
73+
],
74+
clusterId=cluster_id,
75+
dataVisualizationId=data_viz_id,
76+
)
77+
6678
def list_clusters(self, env_crn=None):
6779
resp = self.sdk.call(
6880
svc='dw', func='list_clusters', ret_field='clusters', squelch=[
@@ -74,6 +86,15 @@ def list_clusters(self, env_crn=None):
7486
return [x for x in resp if env_crn == x['environmentCrn']]
7587
return resp
7688

89+
def list_data_visualizations(self, cluster_id):
90+
return self.sdk.call(
91+
svc='dw', func='list_data_visualizations', ret_field='dataVisualizations', squelch=[
92+
Squelch(value='NOT_FOUND', default=list()),
93+
Squelch(value='PATH_DISABLED', warning=ENTITLEMENT_DISABLED, default=list())
94+
],
95+
clusterId=cluster_id
96+
)
97+
7798
def gather_clusters(self, env_crn=None):
7899
self.sdk.validate_crn(env_crn)
79100
clusters = self.list_clusters(env_crn=env_crn)
@@ -107,6 +128,17 @@ def create_cluster(self, env_crn: str, overlay: bool, aws_public_subnets: list =
107128
]
108129
)
109130

131+
def create_data_visualization(self, cluster_id: str, name: str, config: dict = None):
132+
return self.sdk.call(
133+
svc='dw', func='create_data_visualization', ret_field='dataVisualizationId',
134+
squelch=[
135+
Squelch(value='PATH_DISABLED', warning=ENTITLEMENT_DISABLED)
136+
],
137+
clusterId=cluster_id,
138+
name=name,
139+
config=config,
140+
)
141+
110142
def delete_cluster(self, cluster_id: str, force: bool = False):
111143
return self.sdk.call(
112144
svc='dw', func='delete_cluster', squelch=[
@@ -116,6 +148,27 @@ def delete_cluster(self, cluster_id: str, force: bool = False):
116148
clusterId=cluster_id, force=force
117149
)
118150

151+
def delete_data_visualization(self, cluster_id: str, data_viz_id: str):
152+
return self.sdk.call(
153+
svc='dw', func='delete_data_visualization', squelch=[
154+
Squelch('NOT_FOUND'),
155+
Squelch(value='PATH_DISABLED', warning=ENTITLEMENT_DISABLED)
156+
],
157+
clusterId=cluster_id,
158+
dataVisualizationId=data_viz_id,
159+
)
160+
161+
def update_data_visualization(self, cluster_id: str, data_viz_id: str, config: dict):
162+
return self.sdk.call(
163+
svc='dw', func='update_data_visualization', squelch=[
164+
Squelch('NOT_FOUND'),
165+
Squelch(value='PATH_DISABLED', warning=ENTITLEMENT_DISABLED)
166+
],
167+
clusterId=cluster_id,
168+
dataVisualizationId=data_viz_id,
169+
config=config,
170+
)
171+
119172
def create_vw(self, cluster_id:str, dbc_id:str, vw_type:str, name:str, template:str = None,
120173
autoscaling_min_cluster:int = None, autoscaling_max_cluster:int = None,
121174
common_configs:dict = None, application_configs:dict = None, ldap_groups:list = None,

0 commit comments

Comments
 (0)