forked from dotnet/performance
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupload.py
93 lines (81 loc) · 4.53 KB
/
upload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from random import randint
import uuid
from azure.storage.blob import BlobClient, ContentSettings
from azure.storage.queue import QueueClient, TextBase64EncodePolicy
from azure.core.exceptions import ResourceExistsError, ClientAuthenticationError
from azure.identity import DefaultAzureCredential, ClientAssertionCredential, CertificateCredential
from traceback import format_exc
from glob import glob
from performance.common import retry_on_exception, RunCommand, helixpayload, base64_to_bytes, extension
from performance.constants import TENANT_ID, ARC_CLIENT_ID, CERT_CLIENT_ID
import os
import json
from logging import getLogger
class QueueMessage:
container_name: str
blob_name: str
def __init__(self, container: str, name: str):
self.container_name = container
self.blob_name = name
def get_unique_name(filename: str, unique_id: str) -> str:
newname = "{0}-{1}".format(unique_id, os.path.basename(filename))
if len(newname) > 1024:
newname = "{0}-perf-lab-report.json".format(randint(1000, 9999))
return newname
def upload(globpath: str, container: str, queue: str, sas_token_env: str, storage_account_uri: str):
try:
credential = None
try:
dac = DefaultAzureCredential()
credential = ClientAssertionCredential(TENANT_ID, ARC_CLIENT_ID, lambda: dac.get_token("api://AzureADTokenExchange/.default").token)
credential.get_token("https://storage.azure.com/.default")
except ClientAuthenticationError as ex:
credential = None
getLogger().info("Unable to use managed identity. Falling back to certificate.")
cmd_line = [(os.path.join(str(helixpayload()), 'certhelper', "CertHelper%s" % extension()))]
cert_helper = RunCommand(cmd_line, None, True, False, 0)
cert_helper.run()
for cert in cert_helper.stdout.splitlines():
credential = CertificateCredential(TENANT_ID, CERT_CLIENT_ID, certificate_data=base64_to_bytes(cert))
try:
credential.get_token("https://storage.azure.com/.default")
except ClientAuthenticationError as ex:
credential = None
continue
if credential is None:
getLogger().error("Unable to authenticate with managed identity or certificates.")
getLogger().info("Falling back to environment variable.")
credential = os.getenv(sas_token_env)
files = glob(globpath, recursive=True)
any_upload_or_queue_failed = False
for infile in files:
blob_name = get_unique_name(infile, os.getenv('HELIX_WORKITEM_ID') or str(uuid.uuid4()))
getLogger().info("uploading {}".format(infile))
blob_client = BlobClient(account_url=storage_account_uri.format('blob'), container_name=container, blob_name=blob_name, credential=credential)
upload_succeded = False
with open(infile, "rb") as data:
try:
retry_on_exception(lambda: blob_client.upload_blob(data, blob_type="BlockBlob", content_settings=ContentSettings(content_type="application/json")), raise_exceptions=[ResourceExistsError])
upload_succeded = True
except Exception as ex:
any_upload_or_queue_failed = True
getLogger().error("upload failed")
getLogger().error('{0}: {1}'.format(type(ex), str(ex)))
if upload_succeded:
if queue is not None:
try:
queue_client = QueueClient(account_url=storage_account_uri.format('queue'), queue_name=queue, credential=credential, message_encode_policy=TextBase64EncodePolicy())
message = QueueMessage(container, blob_name)
retry_on_exception(lambda: queue_client.send_message(json.dumps(message.__dict__)))
getLogger().info("upload and queue complete")
except Exception as ex:
any_upload_or_queue_failed = True
getLogger().error("queue failed")
getLogger().error('{0}: {1}'.format(type(ex), str(ex)))
else:
getLogger().info("upload complete")
return any_upload_or_queue_failed # 0 (False) if all uploads and queues succeeded, 1 (True) otherwise
except Exception as ex:
getLogger().error('{0}: {1}'.format(type(ex), str(ex)))
getLogger().error(format_exc())
return 1