Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .mailmap
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ Joe Marsh Rossney <17361029+jmarshrossney@users.noreply.github.com>
Joe Marsh Rossney <17361029+jmarshrossney@users.noreply.github.com> <17361029+marshrossney@users.noreply.github.com>
Joseph Abram <joseph.abram@metoffice.gov.uk> J-J-Abram <joseph.abram@metoffice.gov.uk>
Joseph Abram <joseph.abram@metoffice.gov.uk> J-J-Abram <98320699+J-J-Abram@users.noreply.github.com>
David Rundle <david.rundle@metoffice.gov.uk> david-rundle <david.rundle@metoffice.gov.uk>
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ below:
- Dimitrios Theodorakis (Met Office, UK)
- Joseph Abram (Met Office, UK)
- James Frost (Met Office, UK)
- David Rundle (Met Office, UK)
<!-- end-shortlog -->

(All contributors are identifiable with email addresses in the version control
Expand Down
15 changes: 14 additions & 1 deletion metomi/rose/apps/rose_arch.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,18 @@ def _run_target_setup(
)
)
target.status = target.ST_BAD

target.compress_cores = self._get_conf(config, t_node,
"compress-cores", default="1")
if not target.compress_cores.isdigit() or \
int(target.compress_cores) < 0:
raise ConfigValueError(
[t_key, "compress-cores"],
target.compress_cores,
ValueError("compress-cores must be a 0 (automatic) or \
a positive integer")
)

rename_format = self._get_conf(config, t_node, "rename-format")
if rename_format:
rename_parser_str = self._get_conf(config, t_node, "rename-parser")
Expand Down Expand Up @@ -398,7 +410,8 @@ def _run_target_update(cls, dao, app_runner, compress_manager, target):
# Compress sources
if target.compress_scheme:
handler = compress_manager.get_handler(target.compress_scheme)
handler.compress_sources(target, work_dir)
compress_args = {"cores": target.compress_cores}
handler.compress_sources(target, work_dir, **compress_args)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried a very simple check

mode=rose_arch

[arch]
# rose-app.conf
command-format=cp %(sources)s %(target)s
target-prefix=/home/users/tim.pillinger/cylc-src/rose-apps/arch/archive/
source-prefix=/home/users/tim.pillinger/cylc-src/rose-apps/arch/source/

[arch:world.out]
source='world.out'

[arch:gunzipme.gz]
source='gunzipme.out'

[arch:targunzipme.tar.gz]
source='targunzipme.out'
export CYLC_WORKFLOW_ID='hippo'
export CYLC_TASK_ID='task-run'
export CYLC_TASK_NAME='task-run'
export CYLC_TASK_CYCLE_POINT='task-run'
export CYLC_TASK_LOG_ROOT="${HERE}/log"

echo Running app from "${HERE}/app"
rose task-run --config="${HERE}/app"

and got an error:

[FAIL] RoseArchGzip.compress_sources() got an unexpected keyword argument 'threads'

I think that you need to add the threads argument to rose_arch_gzip.py and possibly other items in that folder. You may want to consider emitting a warning if threads != 1 and program_is_single_thread:

times[1] = time() # transformed time
# Run archive command
sources = []
Expand Down
32 changes: 29 additions & 3 deletions metomi/rose/apps/rose_arch_compressions/rose_arch_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ class RoseArchTarGzip:

"""Compress archive sources in tar."""

SCHEMES = ["pax", "pax.gz", "tar", "tar.gz", "tgz"]
SCHEME_FORMATS = {"pax": tarfile.PAX_FORMAT, "pax.gz": tarfile.PAX_FORMAT}
SCHEMES = ["pax", "pax.gz", "pax.zst", "pax.xz",
"tar", "tar.gz", "tgz", "tar.zst", "tar.xz", "txz"]
SCHEME_FORMATS = {"pax": tarfile.PAX_FORMAT,
"pax.gz": tarfile.PAX_FORMAT,
"pax.zst": tarfile.PAX_FORMAT,
"pax.xz": tarfile.PAX_FORMAT}
GZIP_EXTS = ["pax.gz", "tar.gz", "tgz"]
ZSTD_EXTS = ["pax.zst", "tar.zst"]
XZ_EXTS = ["pax.xz", "tar.xz", "txz"]

def __init__(self, app_runner, *args, **kwargs):
self.app_runner = app_runner

def compress_sources(self, target, work_dir):
def compress_sources(self, target, work_dir, cores="1"):
"""Create a tar archive of all files in target.

Use work_dir to dump results.
Expand Down Expand Up @@ -70,3 +76,23 @@ def compress_sources(self, target, work_dir):
command = "gzip -c '%s' >'%s'" % (tar_name, gz_name)
self.app_runner.popen.run_simple(command, shell=True)
self.app_runner.fs_util.delete(tar_name)

if target.compress_scheme in self.ZSTD_EXTS:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if target.compress_scheme in self.ZSTD_EXTS:
elif target.compress_scheme in self.ZSTD_EXTS:

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed thanks!

fdsec, zst_name = mkstemp(
suffix="." + target.compress_scheme, dir=work_dir
)
os.close(fdsec)
target.work_source_path = zst_name
command = f"zstd --rm -T{cores} -c '{tar_name}' >'{zst_name}'"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the high likelihood of this command being run directly on Cylc servers, we will have to be careful with this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @oliver-sanders . I agree and I hope that the default being to not use this is a good trade-off between exposing useful functionality and maintaining order!

I have updated the documentation to note that this should be used with caution on shared resources.

self.app_runner.popen.run_simple(command, shell=True)
self.app_runner.fs_util.delete(tar_name)

if target.compress_scheme in self.XZ_EXTS:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if target.compress_scheme in self.XZ_EXTS:
elif target.compress_scheme in self.XZ_EXTS:

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed thanks!

fdsec, xz_name = mkstemp(
suffix="." + target.compress_scheme, dir=work_dir
)
os.close(fdsec)
target.work_source_path = xz_name
command = "xz -c '%s' >'%s'" % (tar_name, xz_name)
self.app_runner.popen.run_simple(command, shell=True)
self.app_runner.fs_util.delete(tar_name)
49 changes: 49 additions & 0 deletions metomi/rose/apps/rose_arch_compressions/rose_arch_xz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (C) British Crown (Met Office) & Contributors.
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Compress archive sources using xz."""


import os


class RoseArchXz:

    """Compress archive sources in xz."""

    SCHEMES = ["xz"]

    def __init__(self, app_runner, *args, **kwargs):
        self.app_runner = app_runner

    def compress_sources(self, target, work_dir, cores="1"):
        """xz each source in target.

        Use work_dir to dump results.

        Args:
            target: the archive target. Each value of ``target.sources``
                has ``name`` and ``path`` attributes, and
                ``target.compress_scheme`` is used as the file extension.
            work_dir: directory in which compressed copies are written.
            cores: accepted so that the caller can pass the same keyword
                arguments to every compression handler. This handler
                does not use it.

        Each compressed source has its ``path`` re-pointed at the
        compressed copy in work_dir. Sources whose path already ends
        with the scheme extension are left untouched.

        """
        # Local import keeps this change self-contained.
        from shlex import quote

        suffix = "." + target.compress_scheme
        for source in target.sources.values():
            if source.path.endswith(suffix):
                continue  # assume already done
            work_path_xz = os.path.join(work_dir, source.name + suffix)
            self.app_runner.fs_util.makedirs(
                self.app_runner.fs_util.dirname(work_path_xz)
            )
            # Quote both paths: the command is run through a shell.
            command = "xz -c %s >%s" % (
                quote(source.path),
                quote(work_path_xz),
            )
            self.app_runner.popen.run_simple(command, shell=True)
            source.path = work_path_xz
48 changes: 48 additions & 0 deletions metomi/rose/apps/rose_arch_compressions/rose_arch_zstd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (C) British Crown (Met Office) & Contributors.
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Compress archive sources using zstd."""


import os


class RoseArchZstd:

    """Compress archive sources in zstd."""

    SCHEMES = ["zst", "zstd"]

    def __init__(self, app_runner, *args, **kwargs):
        self.app_runner = app_runner

    def compress_sources(self, target, work_dir, cores="1"):
        """zstd each source in target.

        Use work_dir to dump results.

        Args:
            target: the archive target. Each value of ``target.sources``
                has ``name`` and ``path`` attributes, and
                ``target.compress_scheme`` is used as the file extension.
            work_dir: directory in which compressed copies are written.
            cores: value passed to the ``-T`` option of zstd, i.e. the
                number of worker threads.

        Each compressed source has its ``path`` re-pointed at the
        compressed copy in work_dir. Sources whose path already ends
        with the scheme extension are left untouched.

        """
        # Local import keeps this change self-contained.
        from shlex import quote

        suffix = "." + target.compress_scheme
        for source in target.sources.values():
            if source.path.endswith(suffix):
                continue  # assume already done
            work_path_zst = os.path.join(work_dir, source.name + suffix)
            self.app_runner.fs_util.makedirs(
                self.app_runner.fs_util.dirname(work_path_zst)
            )
            # The command is run through a shell, so quote every
            # interpolated value. "--rm" is deliberately not passed:
            # output goes to stdout ("-c") and the original source
            # files must not be deleted by the archiver.
            command = "zstd -T%s -c %s > %s" % (
                quote(str(cores)),
                quote(source.path),
                quote(work_path_zst),
            )
            self.app_runner.popen.run_simple(command, shell=True)
            source.path = work_path_zst
84 changes: 63 additions & 21 deletions sphinx/api/built-in/rose_arch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The application provides some useful functionalities:
source files and the return code of archive command. In a retry, it
would only redo targets that did not succeed in the previous attempts.
* Rename source files.
* Tar-Gzip or Gzip source files before sending them to the archive.
* Tar and/or compress source files before sending them to the archive.


Invocation
Expand Down Expand Up @@ -124,8 +124,8 @@ on:
[arch:(black-box/)]
source=cats.txt dogs.txt

Zipping files
^^^^^^^^^^^^^
Compressing files
^^^^^^^^^^^^^^^^^
There are multiple ways of specifying that you want your archive to be
compressed:

Expand All @@ -147,9 +147,9 @@ not recognized by rose arch as an extension to be compressed.)

For more details see :rose:conf:`rose_arch[arch]compress`

Zipping directories
^^^^^^^^^^^^^^^^^^^
You can tar and zip entire directories - as with single files Rose Arch will
Compressing directories
^^^^^^^^^^^^^^^^^^^^^^^
You can tar and compress entire directories - as with single files Rose Arch will
attempt to infer archive and compression from ``[arch:TARGET.extension]`` if it
can:

Expand Down Expand Up @@ -206,6 +206,22 @@ with names in the form ``data_001.txt``:
rename-parser=^//some//path//data_(?P<serial_number>[0-9]{3})(?P<name_tail>.*)$
rename-format=hello/%(cycle)s-%(name_head)s%(name_tail)s

Using Multiple Cores for Compression (zstd only)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

When using ``zstd``, the number of CPU cores to use for compression is controlled using the
``compress-cores`` setting. This is useful for large files
where multi-threaded compression can significantly improve throughput.

.. code-block:: rose

[arch:large-data.tar.zst]
compress=tar.zst
compress-cores=8
source=large-data/*

In this example, ``zstd`` will use 8 CPU cores.

Output
------

Expand Down Expand Up @@ -245,7 +261,6 @@ taken to run the archive command and the return code of the archive
command. For a source line, the third column contains the original name of
the source.


Configuration
-------------

Expand All @@ -262,7 +277,7 @@ Configuration
and ``%(target)s`` for substitution of the sources and the target
respectively.

.. rose:conf:: compress=pax|tar|pax.gz|tar.gz|tgz|gz
.. rose:conf:: compress=pax|tar|pax.gz|tar.gz|tgz|gz|pax.xz|tar.xz|txz|xz|pax.zst|tar.zst|zst

If specified, compress source files using this scheme before sending them to the
archive. If not set Rose Arch will attempt to set a compression scheme
Expand All @@ -272,19 +287,46 @@ Configuration

Each compression scheme works slightly differently:

+------------------+-----------------------------------------------+
|Compression Scheme|Behaviour |
+------------------+-----------------------------------------------+
|``pax`` or ``tar``|Sources will be placed in a TAR archive before |
| |being sent to the target. |
+------------------+-----------------------------------------------+
|``pax.gz``, |Sources will be placed in a TAR-GZIP file |
|``tar.gz`` or |before being sent to the target. |
|``tgz`` | |
+------------------+-----------------------------------------------+
|``gz`` |Each source file will be compressed by GZIP |
| |before being sent to the target. |
+------------------+-----------------------------------------------+
+-----------------------+-----------------------------------------------+
|Compression Scheme |Behaviour |
+-----------------------+-----------------------------------------------+
|``pax`` or ``tar`` |Sources will be placed in a TAR archive before |
| |being sent to the target. |
+-----------------------+-----------------------------------------------+
|``pax.{gz,xz,zst}`` or |Sources will be placed in a TAR archive and be |
|``tar.{gz,xz,zst}`` or |compressed using the corresponding compressor |
|``tgz`` or ``txz``     |before being sent to the target.               |
+-----------------------+-----------------------------------------------+
|``gz`` |Each source file will be compressed by gzip |
| |before being sent to the target. |
+-----------------------+-----------------------------------------------+
|``xz`` |Each source file will be compressed by xz |
| |before being sent to the target. |
+-----------------------+-----------------------------------------------+
|``zst`` or ``zstd``    |Each source file will be compressed by zstd    |
| |before being sent to the target. |
+-----------------------+-----------------------------------------------+

.. rose:conf:: compress-cores=0|1|2|...

Specify the number of CPU cores to use for compression. This setting
is optional and defaults to ``1`` (single-threaded compression).

* ``0``: Let the compression tool automatically determine the number of
cores to use.
* A positive integer: Specifies the exact number of cores to use for
compression.

This setting is currently only supported by ``zstd``.

Example:

.. code-block:: rose

[arch:example.tar.zst]
compress=tar.zst
compress-cores=4
source=example/*

.. rose:conf:: rename-format

Expand Down
57 changes: 57 additions & 0 deletions t/rose-task-run/46-app-arch-zstd.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env bash
#-------------------------------------------------------------------------------
# Copyright (C) British Crown (Met Office) & Contributors.
#
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test "rose_arch" built-in application, archive with zstd compression.
#-------------------------------------------------------------------------------
. "$(dirname "$0")/test_header"


#-------------------------------------------------------------------------------
# Four checks follow: install, play, job status grep and archive listing.
tests 4
#-------------------------------------------------------------------------------
# Run the suite, and wait for it to complete
# (the Cylc and Rose configuration path variables are blanked first).
export CYLC_CONF_PATH=
export ROSE_CONF_PATH=

# NOTE(review): get_reg presumably sets FLOW and FLOW_RUN_DIR, which are used
# below - confirm against test_header.
get_reg
run_pass "${TEST_KEY_BASE}-install" \
cylc install \
"${TEST_SOURCE_DIR}/${TEST_KEY_BASE}" \
--workflow-name="${FLOW}" \
--no-run-name
run_pass "${TEST_KEY_BASE}-play" \
cylc play \
"${FLOW}" \
--abort-if-any-task-fails \
--host=localhost \
--no-detach \
--debug
#-------------------------------------------------------------------------------
# The archive task must have recorded a successful exit in its job status file.
TEST_KEY="${TEST_KEY_BASE}-job.status"
file_grep "${TEST_KEY}-archive-01" \
'CYLC_JOB_EXIT=SUCCEEDED' \
"${FLOW_RUN_DIR}/log/job/1/archive/01/job.status"
# List the archived files (sorted) and compare the listing with the expected
# names given in the here-document.
TEST_KEY="${TEST_KEY_BASE}-find"
(cd "${FLOW_RUN_DIR}/share/backup" && find . -type f) | sort >"${TEST_KEY}.out"
file_cmp "${TEST_KEY}.out" "${TEST_KEY}.out" <<'__FIND__'
./archive.d/2016.txt.zst
./archive.d/whatever.tar.zst
__FIND__

exit 0
16 changes: 16 additions & 0 deletions t/rose-task-run/46-app-arch-zstd/app/archive/rose-app.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
mode=rose_arch

[file:$ROSE_SUITE_DIR/share/backup/archive.d/]
mode=mkdir

[arch]
command-format=cp -pr %(sources)s %(target)s
target-prefix=$ROSE_SUITE_DIR/share/backup/

[arch:archive.d/2016.txt.zst]
source-prefix=work/1/$ROSE_TASK_NAME/
source=2016.txt.zst

[arch:archive.d/whatever.tar.zst]
source-prefix=work/1/$ROSE_TASK_NAME/
source=whatever.tar.zst
Loading
Loading