diff --git a/src/scriptworker/artifacts.py b/src/scriptworker/artifacts.py
index dcbd1499..7b60c634 100644
--- a/src/scriptworker/artifacts.py
+++ b/src/scriptworker/artifacts.py
@@ -61,10 +61,38 @@ def to_upload_future(target_path):
         path = os.path.join(context.config["artifact_dir"], target_path)
         content_type, content_encoding = compress_artifact_if_supported(path)
         return asyncio.ensure_future(
-            retry_create_artifact(context, path, target_path=target_path, content_type=content_type, content_encoding=content_encoding)
+            retry_create_artifact(
+                context,
+                "s3",
+                path,
+                target_path=target_path,
+                content_type=content_type,
+                content_encoding=content_encoding,
+            )
         )
 
     tasks = list(map(to_upload_future, files))
+
+    log_artifact = None
+    for artifact in ["public/logs/live_backing.log", "public/logs/chain_of_trust.json"]:
+        if artifact in files:
+            log_artifact = artifact
+            break
+
+    if log_artifact:
+        tasks.append(
+            asyncio.ensure_future(
+                retry_create_artifact(
+                    context,
+                    "reference",
+                    url=context.queue.buildUrl(
+                        "getArtifact", context.task_id, context.run_id, log_artifact,
+                    ),
+                    target_path="public/logs/live.log",
+                    content_type="text/plain; charset=utf-8",
+                )
+            )
+        )
 
     await raise_future_exceptions(tasks)
 
@@ -120,20 +148,16 @@ def guess_content_type_and_encoding(path):
     return content_type, encoding
 
 
-# retry_create_artifact {{{1
-async def retry_create_artifact(*args, **kwargs):
-    """Retry create_artifact() calls.
-
-    Args:
-        *args: the args to pass on to create_artifact
-        **kwargs: the args to pass on to create_artifact
-
-    """
-    await retry_async(create_artifact, retry_exceptions=(ScriptWorkerRetryException, aiohttp.ClientError, asyncio.TimeoutError), args=args, kwargs=kwargs)
-
-
 # create_artifact {{{1
-async def create_artifact(context, path, target_path, content_type, content_encoding, storage_type="s3", expires=None):
+async def create_s3_artifact(
+    context,
+    path,
+    target_path,
+    content_type,
+    content_encoding,
+    storage_type="s3",
+    expires=None,
+):
     """Create an artifact and upload it.
 
     This should support s3 and azure out of the box; we'll need some tweaking
@@ -147,8 +171,6 @@ async def create_artifact(context, path, target_path, content_type, content_enco
             scriptworker.artifacts.guess_content_type_and_encoding()
         content_encoding (str): Encoding (per mimetypes' library) of the artifact. None is for no encoding. Values can be found via
             scriptworker.artifacts.guess_content_type_and_encoding()
-        storage_type (str, optional): the taskcluster storage type to use.
-            Defaults to 's3'
         expires (str, optional): datestring of when the artifact expires.
             Defaults to None.
 
@@ -156,7 +178,11 @@ async def create_artifact(context, path, target_path, content_type, content_enco
         ScriptWorkerRetryException: on failure.
 
     """
-    payload = {"storageType": storage_type, "expires": expires or get_expiration_arrow(context).isoformat(), "contentType": content_type}
+    payload = {
+        "storageType": "s3",
+        "expires": expires or get_expiration_arrow(context).isoformat(),
+        "contentType": content_type,
+    }
     args = [get_task_id(context.claim_task), get_run_id(context.claim_task), target_path, payload]
 
     tc_response = await context.temp_queue.createArtifact(*args)
@@ -189,6 +215,70 @@ def _craft_artifact_put_headers(content_type, encoding=None):
     return headers
 
 
+async def create_reference_artifact(
+    context, url, target_path, content_type, expires=None
+):
+    """Create a reference artifact.
+
+    Reference artifacts don't upload any content; they make the queue redirect
+    requests for ``target_path`` to the given ``url``.
+
+    Args:
+        context (scriptworker.context.Context): the scriptworker context.
+        url (str): the url to redirect to
+        target_path (str): the artifact path, e.g. ``public/logs/live.log``
+        content_type (str): Content type (MIME type) of the artifact.
+        expires (str, optional): datestring of when the artifact expires.
+            Defaults to None.
+
+    Raises:
+        ScriptWorkerRetryException: on failure.
+
+    """
+    payload = {
+        "storageType": "reference",
+        "expires": expires or get_expiration_arrow(context).isoformat(),
+        "contentType": content_type,
+        "url": url,
+    }
+    args = [
+        get_task_id(context.claim_task),
+        get_run_id(context.claim_task),
+        target_path,
+        payload,
+    ]
+
+    await context.temp_queue.createArtifact(*args)
+
+
+# retry_create_artifact {{{1
+ARTIFACT_HANDLERS = {
+    "s3": create_s3_artifact,
+    "reference": create_reference_artifact,
+}
+
+
+async def retry_create_artifact(context, storage_type, *args, **kwargs):
+    """Retry a ``storage_type`` artifact creation call.
+
+    Args:
+        storage_type (str): the taskcluster storageType; a key in ``ARTIFACT_HANDLERS``.
+        *args, **kwargs: passed on to the matching handler, after ``context``.
+
+    """
+    handler = ARTIFACT_HANDLERS[storage_type]
+    await retry_async(
+        handler,
+        retry_exceptions=(
+            ScriptWorkerRetryException,
+            aiohttp.ClientError,
+            asyncio.TimeoutError,
+        ),
+        args=(context,) + args,
+        kwargs=kwargs,
+    )
+
+
 # get_artifact_url {{{1
 def get_artifact_url(context, task_id, path):
     """Get a TaskCluster artifact url.
@@ -234,7 +324,14 @@ def get_expiration_arrow(context):
 
 
 # download_artifacts {{{1
-async def download_artifacts(context, file_urls, parent_dir=None, session=None, download_func=download_file, valid_artifact_task_ids=None):
+async def download_artifacts(
+    context,
+    file_urls,
+    parent_dir=None,
+    session=None,
+    download_func=download_file,
+    valid_artifact_task_ids=None,
+):
     """Download artifacts in parallel after validating their URLs.
 
     Valid ``taskId``s for download include the task's dependencies and the
diff --git a/src/scriptworker/config.py b/src/scriptworker/config.py
index c477476f..9811500f 100644
--- a/src/scriptworker/config.py
+++ b/src/scriptworker/config.py
@@ -128,7 +128,7 @@ def check_config(config, path):
         else:
             value_type = type(value)
         if isinstance(DEFAULT_CONFIG[key], Mapping) and "by-cot-product" in DEFAULT_CONFIG[key]:
-            default_type = type(DEFAULT_CONFIG[key]["by-cot-product"][config["cot_product"]])
+            continue
         else:
            default_type = type(DEFAULT_CONFIG[key])
         if value_type is not default_type:
diff --git a/src/scriptworker/context.py b/src/scriptworker/context.py
index 9c2dda7c..2f67b494 100644
--- a/src/scriptworker/context.py
+++ b/src/scriptworker/context.py
@@ -124,6 +124,12 @@ def task_id(self):
         if self.claim_task:
             return self.claim_task["status"]["taskId"]
 
+    @property
+    def run_id(self):
+        """string: The running task's runId."""
+        if self.claim_task:
+            return self.claim_task["runId"]
+
     def create_queue(self, credentials):
         """Create a taskcluster queue.
 
diff --git a/tests/test_artifacts.py b/tests/test_artifacts.py
index 6a99f891..8a948617 100644
--- a/tests/test_artifacts.py
+++ b/tests/test_artifacts.py
@@ -12,7 +12,7 @@
 from scriptworker.artifacts import (
     _craft_artifact_put_headers,
     compress_artifact_if_supported,
-    create_artifact,
+    create_s3_artifact,
     download_artifacts,
     get_and_check_single_upstream_artifact_full_path,
     get_artifact_url,
@@ -74,10 +74,10 @@ def test_expiration_arrow(context):
 async def test_upload_artifacts(context):
     create_artifact_paths = []
 
-    async def foo(_, path, **kwargs):
+    async def foo(_, storage_type, path, **kwargs):
         create_artifact_paths.append(path)
 
-    with mock.patch("scriptworker.artifacts.create_artifact", new=foo):
+    with mock.patch("scriptworker.artifacts.retry_create_artifact", new=foo):
         await upload_artifacts(context, ["one", "public/two"])
 
     assert create_artifact_paths == [os.path.join(context.config["artifact_dir"], "one"), os.path.join(context.config["artifact_dir"], "public/two")]
@@ -93,7 +93,7 @@ async def mock_create_artifact(*args, **kwargs):
             raise exc("foo")
         return 0
 
-    mocker.patch("scriptworker.artifacts.create_artifact", new=mock_create_artifact)
+    mocker.patch("scriptworker.artifacts.retry_create_artifact", new=mock_create_artifact)
     with pytest.raises(ArithmeticError):
         await upload_artifacts(context, ["one", "public/two"])
 
@@ -139,7 +139,7 @@ async def test_create_artifact(context, fake_session, successful_queue):
     context.session = fake_session
     expires = arrow.utcnow().isoformat()
     context.temp_queue = successful_queue
-    await create_artifact(context, path, "public/env/one.txt", content_type="text/plain", content_encoding=None, expires=expires)
+    await create_s3_artifact(context, path, "public/env/one.txt", content_type="text/plain", content_encoding=None, expires=expires)
     assert successful_queue.info == [
         "createArtifact",
         ("taskId", "runId", "public/env/one.txt", {"storageType": "s3", "expires": expires, "contentType": "text/plain"}),
@@ -158,7 +158,7 @@ async def test_create_artifact_retry(context, fake_session_500, successful_queue
     expires = arrow.utcnow().isoformat()
     with pytest.raises(ScriptWorkerRetryException):
         context.temp_queue = successful_queue
-        await create_artifact(context, path, "public/env/one.log", content_type="text/plain", content_encoding=None, expires=expires)
+        await create_s3_artifact(context, path, "public/env/one.log", content_type="text/plain", content_encoding=None, expires=expires)
 
 
 def test_craft_artifact_put_headers():
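
For reviewers, a minimal, self-contained sketch of the dispatch-and-retry pattern this patch introduces with ARTIFACT_HANDLERS and retry_create_artifact(context, storage_type, ...). The stub handlers, the placeholder context object, and the max_attempts loop below are illustrative stand-ins only; the real code retries via retry_async and talks to the Taskcluster queue.

# sketch.py -- illustrative only, not scriptworker code
import asyncio


async def create_s3_artifact(context, path, *, target_path, content_type, content_encoding=None):
    # Stand-in for the upload handler: claim the artifact, then PUT the local file.
    print(f"s3: upload {path} -> {target_path} ({content_type})")


async def create_reference_artifact(context, *, url, target_path, content_type):
    # Stand-in for the reference handler: register a redirect from target_path to url.
    print(f"reference: {target_path} -> {url} ({content_type})")


# Map each Taskcluster storageType to the coroutine that knows how to create it.
ARTIFACT_HANDLERS = {
    "s3": create_s3_artifact,
    "reference": create_reference_artifact,
}


async def retry_create_artifact(context, storage_type, *args, max_attempts=3, **kwargs):
    # Look up the handler for the requested storageType and retry transient failures.
    handler = ARTIFACT_HANDLERS[storage_type]
    for attempt in range(1, max_attempts + 1):
        try:
            return await handler(context, *args, **kwargs)
        except (ConnectionError, asyncio.TimeoutError):
            if attempt == max_attempts:
                raise


async def main():
    context = object()  # placeholder for scriptworker.context.Context
    # Regular file upload, as upload_artifacts() does for each file.
    await retry_create_artifact(
        context, "s3", "/work/artifacts/public/two",
        target_path="public/two", content_type="text/plain",
    )
    # Reference artifact, as the new live.log redirect does.
    await retry_create_artifact(
        context, "reference",
        url="https://queue.example/api/.../public/logs/live_backing.log",
        target_path="public/logs/live.log", content_type="text/plain; charset=utf-8",
    )


asyncio.run(main())

Keeping the storage-type dispatch inside retry_create_artifact means call sites only name the storageType and its kwargs, and new storage types can be added by registering another handler in ARTIFACT_HANDLERS.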