133 changes: 115 additions & 18 deletions src/scriptworker/artifacts.py
@@ -61,10 +61,38 @@ def to_upload_future(target_path):
path = os.path.join(context.config["artifact_dir"], target_path)
content_type, content_encoding = compress_artifact_if_supported(path)
return asyncio.ensure_future(
retry_create_artifact(context, path, target_path=target_path, content_type=content_type, content_encoding=content_encoding)
retry_create_artifact(
context,
"s3",
path,
target_path=target_path,
content_type=content_type,
content_encoding=content_encoding,
)
)

tasks = list(map(to_upload_future, files))

log_artifact = None
for artifact in ["public/logs/live_backing.log", "public/logs/chain_of_trust.json"]:
if artifact in files:
log_artifact = artifact
break

if log_artifact:
tasks.append(
asyncio.ensure_future(
retry_create_artifact(
context,
"reference",
url=context.queue.buildUrl(
"getArtifact", context.task_id, context.run_id, log_artifact,
),
target_path="public/logs/live.log",
content_type="text/plain; charset=utf-8",
)
)
)
await raise_future_exceptions(tasks)
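# Illustrative note (not part of this change): the block above publishes
# public/logs/live.log as a "reference" artifact whose URL points at whichever
# log was actually uploaded (preferring live_backing.log, otherwise
# chain_of_trust.json), so consumers keep a stable live.log path.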


@@ -120,20 +148,16 @@ def guess_content_type_and_encoding(path):
return content_type, encoding


# retry_create_artifact {{{1
async def retry_create_artifact(*args, **kwargs):
"""Retry create_artifact() calls.

Args:
*args: the args to pass on to create_artifact
**kwargs: the args to pass on to create_artifact

"""
await retry_async(create_artifact, retry_exceptions=(ScriptWorkerRetryException, aiohttp.ClientError, asyncio.TimeoutError), args=args, kwargs=kwargs)


# create_artifact {{{1
async def create_artifact(context, path, target_path, content_type, content_encoding, storage_type="s3", expires=None):
async def create_s3_artifact(
context,
path,
target_path,
content_type,
content_encoding,
storage_type="s3",
expires=None,
):
"""Create an artifact and upload it.

This should support s3 and azure out of the box; we'll need some tweaking
@@ -147,16 +171,18 @@ async def create_artifact(context, path, target_path, content_type, content_enco
scriptworker.artifacts.guess_content_type_and_encoding()
content_encoding (str): Encoding (per mimetypes' library) of the artifact. None is for no encoding. Values can
be found via scriptworker.artifacts.guess_content_type_and_encoding()
storage_type (str, optional): the taskcluster storage type to use.
Defaults to 's3'
expires (str, optional): datestring of when the artifact expires.
Defaults to None.

Raises:
ScriptWorkerRetryException: on failure.

"""
payload = {"storageType": storage_type, "expires": expires or get_expiration_arrow(context).isoformat(), "contentType": content_type}
payload = {
"storageType": "s3",
"expires": expires or get_expiration_arrow(context).isoformat(),
"contentType": content_type,
}
args = [get_task_id(context.claim_task), get_run_id(context.claim_task), target_path, payload]

tc_response = await context.temp_queue.createArtifact(*args)
@@ -189,6 +215,70 @@ def _craft_artifact_put_headers(content_type, encoding=None):
return headers


async def create_reference_artifact(
context, url, target_path, content_type, expires=None
):
"""Create an artifact and upload it.

This should support s3 and azure out of the box; we'll need some tweaking
if we want to support redirect/error artifacts.

Args:
context (scriptworker.context.Context): the scriptworker context.
url (str): the url to redirect to
target_path (str): the artifact path to create, e.g. public/logs/live.log
content_type (str): Content type (MIME type) of the artifact.
expires (str, optional): datestring of when the artifact expires.
Defaults to None.

Raises:
ScriptWorkerRetryException: on failure.

"""
payload = {
"storageType": "reference",
"expires": expires or get_expiration_arrow(context).isoformat(),
"contentType": content_type,
"url": url,
}
args = [
get_task_id(context.claim_task),
get_run_id(context.claim_task),
target_path,
payload,
]

await context.temp_queue.createArtifact(*args)


# retry_create_artifact {{{1
ARTIFACT_HANDLERS = {
"s3": create_s3_artifact,
"reference": create_reference_artifact,
}


async def retry_create_artifact(context, storage_type, *args, **kwargs):
"""Retry create_artifact() calls.

Args:
*args: the args to pass on to create_artifact
**kwargs: the args to pass on to create_artifact

"""
handler = ARTIFACT_HANDLERS[storage_type]
await retry_async(
handler,
retry_exceptions=(
ScriptWorkerRetryException,
aiohttp.ClientError,
asyncio.TimeoutError,
),
args=(context,) + args,
kwargs=kwargs,
)
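# --- Illustrative sketch, not part of this change ---------------------------
# The dispatch table means a new storage type only needs another coroutine
# registered in ARTIFACT_HANDLERS.  A hypothetical "error" handler could look
# like the following; the payload fields "reason" and "message" are an
# assumption based on the queue's error-artifact schema, not something this
# patch adds or relies on.
#
# async def create_error_artifact(context, target_path, reason, message, expires=None):
#     payload = {
#         "storageType": "error",
#         "expires": expires or get_expiration_arrow(context).isoformat(),
#         "reason": reason,
#         "message": message,
#     }
#     await context.temp_queue.createArtifact(
#         get_task_id(context.claim_task), get_run_id(context.claim_task), target_path, payload
#     )
#
# ARTIFACT_HANDLERS["error"] = create_error_artifact
# -----------------------------------------------------------------------------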


# get_artifact_url {{{1
def get_artifact_url(context, task_id, path):
"""Get a TaskCluster artifact url.
Expand Down Expand Up @@ -234,7 +324,14 @@ def get_expiration_arrow(context):


# download_artifacts {{{1
async def download_artifacts(context, file_urls, parent_dir=None, session=None, download_func=download_file, valid_artifact_task_ids=None):
async def download_artifacts(
context,
file_urls,
parent_dir=None,
session=None,
download_func=download_file,
valid_artifact_task_ids=None,
):
"""Download artifacts in parallel after validating their URLs.

Valid ``taskId``s for download include the task's dependencies and the
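For orientation, the two storage types above differ only in the payload handed to queue.createArtifact: for "s3" the queue hands back an upload URL that the worker then PUTs the file to, while "reference" only records a URL that the queue redirects to when the artifact is fetched. A minimal sketch of the two payload shapes, assuming a scriptworker context like the one used throughout the module (the expiry literal is illustrative; the real code uses get_expiration_arrow(context).isoformat()):

def example_payloads(context, expires="2025-01-01T00:00:00+00:00"):
    # Illustrative only: shapes of the createArtifact payloads built above.
    s3_payload = {
        "storageType": "s3",
        "expires": expires,
        "contentType": "text/plain; charset=utf-8",
    }
    # A reference artifact stores the URL; the queue redirects to it on fetch.
    # Here it points at the backing log, as upload_artifacts() does above.
    reference_payload = {
        "storageType": "reference",
        "expires": expires,
        "contentType": "text/plain; charset=utf-8",
        "url": context.queue.buildUrl(
            "getArtifact", context.task_id, context.run_id, "public/logs/live_backing.log"
        ),
    }
    return s3_payload, reference_payload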
2 changes: 1 addition & 1 deletion src/scriptworker/config.py
@@ -128,7 +128,7 @@ def check_config(config, path):
else:
value_type = type(value)
if isinstance(DEFAULT_CONFIG[key], Mapping) and "by-cot-product" in DEFAULT_CONFIG[key]:
default_type = type(DEFAULT_CONFIG[key]["by-cot-product"][config["cot_product"]])
continue
else:
default_type = type(DEFAULT_CONFIG[key])
if value_type is not default_type:
6 changes: 6 additions & 0 deletions src/scriptworker/context.py
@@ -124,6 +124,12 @@ def task_id(self):
if self.claim_task:
return self.claim_task["status"]["taskId"]

@property
def run_id(self):
"""string: The running task's taskId."""
if self.claim_task:
return self.claim_task["runId"]

def create_queue(self, credentials):
"""Create a taskcluster queue.

12 changes: 6 additions & 6 deletions tests/test_artifacts.py
@@ -12,7 +12,7 @@
from scriptworker.artifacts import (
_craft_artifact_put_headers,
compress_artifact_if_supported,
create_artifact,
create_s3_artifact,
download_artifacts,
get_and_check_single_upstream_artifact_full_path,
get_artifact_url,
@@ -74,10 +74,10 @@ def test_expiration_arrow(context):
async def test_upload_artifacts(context):
create_artifact_paths = []

async def foo(_, path, **kwargs):
async def foo(_, storage_type, path, **kwargs):
create_artifact_paths.append(path)

with mock.patch("scriptworker.artifacts.create_artifact", new=foo):
with mock.patch("scriptworker.artifacts.retry_create_artifact", new=foo):
await upload_artifacts(context, ["one", "public/two"])

assert create_artifact_paths == [os.path.join(context.config["artifact_dir"], "one"), os.path.join(context.config["artifact_dir"], "public/two")]
@@ -93,7 +93,7 @@ async def mock_create_artifact(*args, **kwargs):
raise exc("foo")
return 0

mocker.patch("scriptworker.artifacts.create_artifact", new=mock_create_artifact)
mocker.patch("scriptworker.artifacts.retry_create_artifact", new=mock_create_artifact)
with pytest.raises(ArithmeticError):
await upload_artifacts(context, ["one", "public/two"])

@@ -139,7 +139,7 @@ async def test_create_artifact(context, fake_session, successful_queue):
context.session = fake_session
expires = arrow.utcnow().isoformat()
context.temp_queue = successful_queue
await create_artifact(context, path, "public/env/one.txt", content_type="text/plain", content_encoding=None, expires=expires)
await create_s3_artifact(context, path, "public/env/one.txt", content_type="text/plain", content_encoding=None, expires=expires)
assert successful_queue.info == [
"createArtifact",
("taskId", "runId", "public/env/one.txt", {"storageType": "s3", "expires": expires, "contentType": "text/plain"}),
@@ -158,7 +158,7 @@ async def test_create_artifact_retry(context, fake_session_500, successful_queue
expires = arrow.utcnow().isoformat()
with pytest.raises(ScriptWorkerRetryException):
context.temp_queue = successful_queue
await create_artifact(context, path, "public/env/one.log", content_type="text/plain", content_encoding=None, expires=expires)
await create_s3_artifact(context, path, "public/env/one.log", content_type="text/plain", content_encoding=None, expires=expires)


def test_craft_artifact_put_headers():