diff --git a/.github/workflows/black.yml b/.github/workflows/black.yml deleted file mode 100644 index 0cc9761..0000000 --- a/.github/workflows/black.yml +++ /dev/null @@ -1,11 +0,0 @@ -name: Lint-black - -on: [push, pull_request] - -jobs: - lint: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - - uses: psf/black@stable diff --git a/.github/workflows/flake8.yml b/.github/workflows/flake8.yml deleted file mode 100644 index cc9b41e..0000000 --- a/.github/workflows/flake8.yml +++ /dev/null @@ -1,20 +0,0 @@ -name: Lint-flake8 - -on: [push, pull_request] - -jobs: - flake8_py3: - runs-on: ubuntu-latest - steps: - - name: Setup Python - uses: actions/setup-python@v1 - - name: Checkout PyTorch - uses: actions/checkout@master - - name: Install flake8 - run: pip install flake8 - - name: Run flake8 - uses: suo/flake8-github-action@releases/v1 - with: - checkName: 'flake8_py3' # NOTE: this needs to be the same as the job name - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml new file mode 100644 index 0000000..313a2c7 --- /dev/null +++ b/.github/workflows/pre-commit.yaml @@ -0,0 +1,11 @@ +name: pre-commit + +on: [push, pull_request] + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v3 + - uses: pre-commit/action@v3.0.0 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c448f88..775401a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,18 +2,19 @@ # See https://pre-commit.com/hooks.html for more hooks repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.3.0 + rev: v4.4.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer - id: check-yaml - id: check-added-large-files +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.0.278 + hooks: + - id: ruff + args: [ --fix, --exit-non-zero-on-fix ] - repo: https://github.com/psf/black - rev: 22.10.0 + rev: 23.7.0 hooks: - id: black language_version: python3 -- repo: https://gitlab.com/pycqa/flake8 - rev: '3.9.2' # pick a git hash / tag to point to - hooks: - - id: flake8 diff --git a/.travis.yml b/.travis.yml index 290dd18..5f434a3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -40,4 +40,3 @@ deploy: repo: OpenDataCubePipelines/tesp api-key: secure: DmJq8xtY+ANHl9GOmGjt+t7qBhTLIigHjEBUHdy8xXuNvxMRB1gRmPLYIeMkCnTIyY4YB7FMsYNo6G+9Dbn3NElQ4wkCUgjIPipZDPo9JBEUo5V2gqPgOK9Jjivm0keTxtjHMCLF+pLL591ww8vRgfHDCTQ6klJ0FJ41dEZpAMEut+ZS9gLXnKchkUvsVUc7q4uIcQZoZIh9phSPpp7fV+HKIUDUvL0UG9FXeFE/CsbU5qEUfK7VLZMNQyogEKkkDYXe+gNGJb0c3LILVgteNyNknN9GZ0cBaNl7CNYTwL7bWPegfbhZbbv23z5MoN/LXbnj8D7N8ALioODn354fMzw5MrETbL3vGEd64BWXLvhR/jHg+1LJWqV/2+ZSISEnr9Hca/JujyXYO34OyOelr4VvRaURpUL3r3UNwtfwAV9b6RbyuhSw013CZkGWQJxpWDkM5U8GC1rz64RJTRSa/yCn3aH4J6Z8sQtZqsIkONps13L5f7r4WMsRPFtgsJ/evdkgWylYXdvzl7RF/HtOuBlvAZC3/G86c2k6J9MZnCCs+adOo8kJMkEmUwN+EdJEICPeOwF53XJdYeF4jraAOkYwpKL3PGmXz9bFEQtmFIjLQ1avjv4kqMHpw8CRxSx1577BZwMbZ59umI+rDwtzId1F8c+6wLtHl16IWXjInrU= - diff --git a/README.md b/README.md index 4cff88b..f18e1b8 100644 --- a/README.md +++ b/README.md @@ -34,4 +34,4 @@ invoke it manually by running `pre-commit run` Module creation ---------------- -The tesp code is built into the ard-pipeline module. This module is used to generate ARD on NCI. Instructions and code to build this module are stored in the [dea-wagl-docker](https://bitbucket.org/geoscienceaustralia/dea-wagl-docker/src/master/ard-pipeline/) repository. \ No newline at end of file +The tesp code is built into the ard-pipeline module. This module is used to generate ARD on NCI. Instructions and code to build this module are stored in the [dea-wagl-docker](https://bitbucket.org/geoscienceaustralia/dea-wagl-docker/src/master/ard-pipeline/) repository. diff --git a/bin/ard_pbs b/bin/ard_pbs index 1c4c414..21dfd69 100755 --- a/bin/ard_pbs +++ b/bin/ard_pbs @@ -20,7 +20,7 @@ import math from wagl.tiling import scatter -PBS_RESOURCES = ("""#!/bin/bash +PBS_RESOURCES = """#!/bin/bash #PBS -P {project} #PBS -W umask=017 #PBS -q {queue} @@ -29,22 +29,22 @@ PBS_RESOURCES = ("""#!/bin/bash #PBS -l storage={filesystem_projects} #PBS -me {email} -""") +""" -NODE_TEMPLATE = ("""{pbs_resources} +NODE_TEMPLATE = """{pbs_resources} source {env} {daemon} luigi --module tesp.workflow ARDP --level1-list {scene_list} --workdir {outdir} --pkgdir {pkgdir} --yamls-dir="{yamls_dir}" --workers {workers} --parallel-scheduling -""") +""" -CLEAN_UP_TEMPLATE = (""" +CLEAN_UP_TEMPLATE = """ # clean up workdir since we don't need it afterwards, even if the processing has failed rm -rf {outdir} -""") +""" -SUMMARY_TEMPLATE = ("""{pbs_resources} +SUMMARY_TEMPLATE = """{pbs_resources} #PBS -W depend=afterany:{jobids} source {env} @@ -70,38 +70,39 @@ jq 'select(.status == "success" and .task == "Package") | {{level1, granule: .pa # compile a list of successfully packaged datasets (and their path) to pass over to indexing jq 'select(.event == "packaged dataset") | .dataset_path' batch-{batchid}-status-log.jsonl | jq -sr 'unique | .[]' > batch-{batchid}-datasets-to-index.txt -""") +""" -INDEXING_TEMPLATE = ("""{pbs_resources} +INDEXING_TEMPLATE = """{pbs_resources} #PBS -W depend=afterany:{jobid} source {env} # indexing cat batch-{batchid}-datasets-to-index.txt | parallel -j 47 -m -n 20 --line-buffer datacube dataset add --no-verify-lineage -""") +""" -ARCHIVING_TEMPLATE = ("""{pbs_resources} +ARCHIVING_TEMPLATE = """{pbs_resources} source {env} # archiving cat {archive_list} | parallel -j 47 -m -n 20 --line-buffer datacube dataset archive -""") +""" + +FMT1 = "batchid-{batchid}" +FMT2 = "jobid-{jobid}" +FMT3 = "level1-scenes-{jobid}.txt" +FMT4 = "jobid-{jobid}.bash" +FMT5 = "batch-{batchid}-summary.bash" +FMT6 = "batch-{batchid}-indexing.bash" +FMT7 = "scratch/{f_project}+gdata/{f_project}" +FMT8 = "archiving-{batchid}.bash" +DAEMON_FMT = "luigid --background --logdir {}" -FMT1 = 'batchid-{batchid}' -FMT2 = 'jobid-{jobid}' -FMT3 = 'level1-scenes-{jobid}.txt' -FMT4 = 'jobid-{jobid}.bash' -FMT5 = 'batch-{batchid}-summary.bash' -FMT6 = 'batch-{batchid}-indexing.bash' -FMT7 = 'scratch/{f_project}+gdata/{f_project}' -FMT8 = 'archiving-{batchid}.bash' -DAEMON_FMT = 'luigid --background --logdir {}' def _calc_nodes_req(granule_count, walltime, workers, hours_per_granule=1.5): - """ Provides estimation of the number of nodes required to process granule count + """Provides estimation of the number of nodes required to process granule count >>> _calc_nodes_req(400, '20:59', 28) 2 @@ -109,7 +110,7 @@ def _calc_nodes_req(granule_count, walltime, workers, hours_per_granule=1.5): 3 """ - hours, _, _ = [int(x) for x in walltime.split(':')] + hours, _, _ = (int(x) for x in walltime.split(":")) return int(math.ceil(float(hours_per_granule * granule_count) / (hours * workers))) @@ -173,7 +174,7 @@ def _filesystem_projects( fs_projects.update(_get_projects_for_path(Path(path))) # All input paths - with open(input_paths_file, 'r') as src: + with open(input_paths_file) as src: paths = [p.strip() for p in src.readlines()] for input_path in paths: @@ -185,8 +186,18 @@ def _filesystem_projects( # pylint: disable=too-many-arguments -def _submit_multiple(scattered, env, batch_logdir, batch_outdir, pkgdir, yamls_dir, - workers, pbs_resources, cleanup, test): +def _submit_multiple( + scattered, + env, + batch_logdir, + batch_outdir, + pkgdir, + yamls_dir, + workers, + pbs_resources, + cleanup, + test, +): """Submit multiple PBS formatted jobs.""" nci_job_ids = [] @@ -205,40 +216,48 @@ def _submit_multiple(scattered, env, batch_logdir, batch_outdir, pkgdir, yamls_d # write level1 data listing out_fname = pjoin(jobdir, FMT3.format(jobid=jobid)) - with open(out_fname, 'w') as src: + with open(out_fname, "w") as src: src.writelines(block) - pbs = NODE_TEMPLATE.format(pbs_resources=pbs_resources, env=env, - daemon=DAEMON_FMT.format(jobdir), - scene_list=out_fname, outdir=job_outdir, - pkgdir=pkgdir, yamls_dir=yamls_dir, workers=workers) + pbs = NODE_TEMPLATE.format( + pbs_resources=pbs_resources, + env=env, + daemon=DAEMON_FMT.format(jobdir), + scene_list=out_fname, + outdir=job_outdir, + pkgdir=pkgdir, + yamls_dir=yamls_dir, + workers=workers, + ) if cleanup: pbs += CLEAN_UP_TEMPLATE.format(outdir=job_outdir) # write pbs script out_fname = pjoin(jobdir, FMT4.format(jobid=jobid)) - with open(out_fname, 'w') as src: + with open(out_fname, "w") as src: src.write(pbs) if test: - click.echo("Mocking... Submitting Job: {} ...Mocking".format(jobid)) - click.echo("qsub {}".format(out_fname)) + click.echo(f"Mocking... Submitting Job: {jobid} ...Mocking") + click.echo(f"qsub {out_fname}") continue os.chdir(dirname(out_fname)) - click.echo("Submitting Job: {}".format(jobid)) + click.echo(f"Submitting Job: {jobid}") try: - raw_output = subprocess.check_output(['qsub', out_fname]) + raw_output = subprocess.check_output(["qsub", out_fname]) except subprocess.CalledProcessError as exc: - logging.error('qsub failed with exit code %s', str(exc.returncode)) + logging.error("qsub failed with exit code %s", str(exc.returncode)) logging.error(exc.output) raise - if hasattr(raw_output, 'decode'): - matches = re.match(r'^(?P\d+\.gadi-pbs)$', raw_output.decode('utf-8')) + if hasattr(raw_output, "decode"): + matches = re.match( + r"^(?P\d+\.gadi-pbs)$", raw_output.decode("utf-8") + ) if matches: - nci_job_ids.append(matches.groupdict()['nci_job_id']) + nci_job_ids.append(matches.groupdict()["nci_job_id"]) # return a list of the nci job ids return nci_job_ids @@ -246,33 +265,38 @@ def _submit_multiple(scattered, env, batch_logdir, batch_outdir, pkgdir, yamls_d def _submit_summary(indir, outdir, batch_id, pbs_resources, env, job_ids, test): """Summarise the jobs submitted within the batchjob.""" - jobids = ":".join([j.split('.')[0] for j in job_ids]) - pbs = SUMMARY_TEMPLATE.format(pbs_resources=pbs_resources, env=env, - indir=indir, outdir=outdir, jobids=jobids, - batchid=batch_id) + jobids = ":".join([j.split(".")[0] for j in job_ids]) + pbs = SUMMARY_TEMPLATE.format( + pbs_resources=pbs_resources, + env=env, + indir=indir, + outdir=outdir, + jobids=jobids, + batchid=batch_id, + ) out_fname = pjoin(indir, FMT5.format(batchid=batch_id)) - with open(out_fname, 'w') as src: + with open(out_fname, "w") as src: src.write(pbs) if test: - click.echo("Mocking... Submitting Summary Job for batch: {} ...Mocking".format(batch_id)) - click.echo("qsub {}".format(out_fname)) + click.echo(f"Mocking... Submitting Summary Job for batch: {batch_id} ...Mocking") + click.echo(f"qsub {out_fname}") return os.chdir(dirname(out_fname)) - click.echo("Submitting Summary Job for batch: {}".format(batch_id)) + click.echo(f"Submitting Summary Job for batch: {batch_id}") try: - raw_output = subprocess.check_output(['qsub', out_fname]) + raw_output = subprocess.check_output(["qsub", out_fname]) except subprocess.CalledProcessError as exc: - logging.error('qsub failed with exit code %s', str(exc.returncode)) + logging.error("qsub failed with exit code %s", str(exc.returncode)) logging.error(exc.output) raise - if hasattr(raw_output, 'decode'): - matches = re.match(r'^(?P\d+\.gadi-pbs)$', raw_output.decode('utf-8')) + if hasattr(raw_output, "decode"): + matches = re.match(r"^(?P\d+\.gadi-pbs)$", raw_output.decode("utf-8")) if matches: - job_id = matches.groupdict()['nci_job_id'] + job_id = matches.groupdict()["nci_job_id"] return job_id @@ -280,114 +304,172 @@ def _submit_summary(indir, outdir, batch_id, pbs_resources, env, job_ids, test): def _submit_index(indir, outdir, batch_id, pbs_resources, env, job_id, test): """Submit a job that adds datasets to a datacube index.""" if job_id: - jobid = job_id.split('.')[0] + jobid = job_id.split(".")[0] else: - jobid = '' - pbs = INDEXING_TEMPLATE.format(pbs_resources=pbs_resources, env=env, - indir=indir, outdir=outdir, jobid=jobid, - batchid=batch_id) + jobid = "" + pbs = INDEXING_TEMPLATE.format( + pbs_resources=pbs_resources, + env=env, + indir=indir, + outdir=outdir, + jobid=jobid, + batchid=batch_id, + ) out_fname = pjoin(indir, FMT6.format(batchid=batch_id)) - with open(out_fname, 'w') as src: + with open(out_fname, "w") as src: src.write(pbs) if test: - click.echo("Mocking... Submitting Indexing Job for batch: {} ...Mocking".format(batch_id)) + click.echo(f"Mocking... Submitting Indexing Job for batch: {batch_id} ...Mocking") return os.chdir(dirname(out_fname)) - click.echo("Submitting Indexing Job for batch: {}".format(batch_id)) + click.echo(f"Submitting Indexing Job for batch: {batch_id}") try: - raw_output = subprocess.check_output(['qsub', out_fname]) + raw_output = subprocess.check_output(["qsub", out_fname]) except subprocess.CalledProcessError as exc: - logging.error('qsub failed with exit code %s', str(exc.returncode)) + logging.error("qsub failed with exit code %s", str(exc.returncode)) logging.error(exc.output) raise - if hasattr(raw_output, 'decode'): - matches = re.match(r'^(?P\d+\.gadi-pbs)$', raw_output.decode('utf-8')) + if hasattr(raw_output, "decode"): + matches = re.match(r"^(?P\d+\.gadi-pbs)$", raw_output.decode("utf-8")) if matches: - job_id = matches.groupdict()['nci_job_id'] + job_id = matches.groupdict()["nci_job_id"] return job_id def _submit_archive(indir, outdir, archive_list, batch_id, pbs_resources, env, test): """Submit a job that archives datasets given a file listing UUIDs.""" - pbs = ARCHIVING_TEMPLATE.format(pbs_resources=pbs_resources, env=env, - indir=indir, outdir=outdir, - archive_list=archive_list) + pbs = ARCHIVING_TEMPLATE.format( + pbs_resources=pbs_resources, + env=env, + indir=indir, + outdir=outdir, + archive_list=archive_list, + ) out_fname = pjoin(indir, FMT8.format(batchid=batch_id)) - with open(out_fname, 'w') as src: + with open(out_fname, "w") as src: src.write(pbs) if test: - click.echo("Mocking... Submitting Archiving Job for batch: {} ...Mocking".format(batch_id)) + click.echo( + f"Mocking... Submitting Archiving Job for batch: {batch_id} ...Mocking" + ) return os.chdir(dirname(out_fname)) - click.echo("Submitting Archiving Job for batch: {}".format(batch_id)) + click.echo(f"Submitting Archiving Job for batch: {batch_id}") try: - raw_output = subprocess.check_output(['qsub', out_fname]) + raw_output = subprocess.check_output(["qsub", out_fname]) except subprocess.CalledProcessError as exc: - logging.error('qsub failed with exit code %s', str(exc.returncode)) + logging.error("qsub failed with exit code %s", str(exc.returncode)) logging.error(exc.output) raise - if hasattr(raw_output, 'decode'): - matches = re.match(r'^(?P\d+\.gadi-pbs)$', raw_output.decode('utf-8')) + if hasattr(raw_output, "decode"): + matches = re.match(r"^(?P\d+\.gadi-pbs)$", raw_output.decode("utf-8")) if matches: - job_id = matches.groupdict()['nci_job_id'] + job_id = matches.groupdict()["nci_job_id"] return job_id @click.command() -@click.option("--level1-list", type=click.Path(exists=True, readable=True), - help="The input level1 scene list.") -@click.option("--workdir", type=click.Path(file_okay=False, writable=True), - help="The base output working directory.") -@click.option("--logdir", type=click.Path(file_okay=False, writable=True), - help="The base logging and scripts output directory.") -@click.option("--pkgdir", type=click.Path(file_okay=False, writable=True), - help="The base output packaged directory.") -@click.option("--yamls-dir", type=click.Path(file_okay=False), default="", - help="The base directory for level-1 dataset documents.") -@click.option("--env", type=click.Path(exists=True, readable=True), - help="Environment script to source.") -@click.option("--workers", type=click.IntRange(1, 48), default=30, - help="The number of workers to request per node.") +@click.option( + "--level1-list", + type=click.Path(exists=True, readable=True), + help="The input level1 scene list.", +) +@click.option( + "--workdir", + type=click.Path(file_okay=False, writable=True), + help="The base output working directory.", +) +@click.option( + "--logdir", + type=click.Path(file_okay=False, writable=True), + help="The base logging and scripts output directory.", +) +@click.option( + "--pkgdir", + type=click.Path(file_okay=False, writable=True), + help="The base output packaged directory.", +) +@click.option( + "--yamls-dir", + type=click.Path(file_okay=False), + default="", + help="The base directory for level-1 dataset documents.", +) +@click.option( + "--env", + type=click.Path(exists=True, readable=True), + help="Environment script to source.", +) +@click.option( + "--workers", + type=click.IntRange(1, 48), + default=30, + help="The number of workers to request per node.", +) @click.option("--nodes", default=0, help="The number of nodes to request.") -@click.option("--memory", default=192, - help="The memory in GB to request per node.") -@click.option("--jobfs", default=50, - help="The jobfs memory in GB to request per node.") +@click.option("--memory", default=192, help="The memory in GB to request per node.") +@click.option("--jobfs", default=50, help="The jobfs memory in GB to request per node.") @click.option("--project", required=True, help="Project code to run under.") -@click.option("--queue", default='normal', - help="Queue to submit the job into, eg normal, express.") -@click.option("--walltime", default="48:00:00", - help="Job walltime in `hh:mm:ss` format.") -@click.option("--email", default="", - help="Notification email address.") -@click.option("--index-datacube-env", type=click.Path(exists=True, readable=True), - help="Datacube specific environment script to source.") -@click.option("--archive-list", type=click.Path(exists=True, readable=True), - help="UUID's of the scenes to archive. This uses the environment specified in index-datacube-env.") -@click.option("--cleanup", default=False, is_flag=True, - help=("Clean-up work directory afterwards.")) - -@click.option("--test", default=False, is_flag=True, - help=("Test job execution (Don't submit the job to the " - "PBS queue).")) +@click.option( + "--queue", default="normal", help="Queue to submit the job into, eg normal, express." +) +@click.option("--walltime", default="48:00:00", help="Job walltime in `hh:mm:ss` format.") +@click.option("--email", default="", help="Notification email address.") +@click.option( + "--index-datacube-env", + type=click.Path(exists=True, readable=True), + help="Datacube specific environment script to source.", +) +@click.option( + "--archive-list", + type=click.Path(exists=True, readable=True), + help="UUID's of the scenes to archive. This uses the environment specified in index-datacube-env.", +) +@click.option( + "--cleanup", default=False, is_flag=True, help=("Clean-up work directory afterwards.") +) +@click.option( + "--test", + default=False, + is_flag=True, + help=("Test job execution (Don't submit the job to the " "PBS queue)."), +) # pylint: disable=too-many-arguments -def main(level1_list, workdir, logdir, pkgdir, yamls_dir, env, workers, nodes, memory, - jobfs, project, queue, walltime, email, index_datacube_env, archive_list, cleanup, test): +def main( + level1_list, + workdir, + logdir, + pkgdir, + yamls_dir, + env, + workers, + nodes, + memory, + jobfs, + project, + queue, + walltime, + email, + index_datacube_env, + archive_list, + cleanup, + test, +): """ Equally partition a list of scenes across n nodes and submit n jobs into the PBS queue for ARD processing. """ - with open(level1_list, 'r') as src: + with open(level1_list) as src: scenes = src.readlines() if nodes == 0: nodes = _calc_nodes_req(len(scenes), walltime, workers) @@ -405,68 +487,110 @@ def main(level1_list, workdir, logdir, pkgdir, yamls_dir, env, workers, nodes, m logdir, workdir, pkgdir, - yamls_dir + yamls_dir, ) - fsys_projects = '+'.join([FMT7.format(f_project=f) for f in fs_projects]) + fsys_projects = "+".join([FMT7.format(f_project=f) for f in fs_projects]) # optionally set pbs email string - pbs_resources = PBS_RESOURCES.format(project=project, queue=queue, - walltime=walltime, memory=memory, - ncpus=workers, jobfs=jobfs, - filesystem_projects=fsys_projects, - email=('#PBS -M ' + email) if email else "") + pbs_resources = PBS_RESOURCES.format( + project=project, + queue=queue, + walltime=walltime, + memory=memory, + ncpus=workers, + jobfs=jobfs, + filesystem_projects=fsys_projects, + email=("#PBS -M " + email) if email else "", + ) if test: - click.echo("Mocking... Submitting Batch: {} ...Mocking".format(batchid)) + click.echo(f"Mocking... Submitting Batch: {batchid} ...Mocking") else: - click.echo("Submitting Batch: {}".format(batchid)) + click.echo(f"Submitting Batch: {batchid}") - click.echo("Executing Batch: {}".format(batchid)) - nci_job_ids = _submit_multiple(scattered, env, batch_logdir, batch_outdir, - pkgdir, yamls_dir, workers, pbs_resources, cleanup, test) + click.echo(f"Executing Batch: {batchid}") + nci_job_ids = _submit_multiple( + scattered, + env, + batch_logdir, + batch_outdir, + pkgdir, + yamls_dir, + workers, + pbs_resources, + cleanup, + test, + ) # job resources for batch summary - pbs_resources = PBS_RESOURCES.format(project=project, queue='express', - walltime="00:10:00", memory=6, - ncpus=1, jobfs=2, - filesystem_projects=''.join(fsys_projects), - email=('#PBS -M ' + email) if email else "") - - job_id = _submit_summary(batch_logdir, batch_logdir, batchid, - pbs_resources, env, nci_job_ids, test) + pbs_resources = PBS_RESOURCES.format( + project=project, + queue="express", + walltime="00:10:00", + memory=6, + ncpus=1, + jobfs=2, + filesystem_projects="".join(fsys_projects), + email=("#PBS -M " + email) if email else "", + ) + + job_id = _submit_summary( + batch_logdir, batch_logdir, batchid, pbs_resources, env, nci_job_ids, test + ) nci_job_ids.append(job_id) if index_datacube_env: - pbs_resources = PBS_RESOURCES.format(project=project, queue='normal', - walltime="00:30:00", memory=192, - ncpus=48, jobfs=20, - filesystem_projects=''.join(fsys_projects), - email=('#PBS -M ' + email) if email else "") - index_job_id = _submit_index(batch_logdir, batch_logdir, batchid, - pbs_resources, index_datacube_env, job_id, test) + pbs_resources = PBS_RESOURCES.format( + project=project, + queue="normal", + walltime="00:30:00", + memory=192, + ncpus=48, + jobfs=20, + filesystem_projects="".join(fsys_projects), + email=("#PBS -M " + email) if email else "", + ) + index_job_id = _submit_index( + batch_logdir, + batch_logdir, + batchid, + pbs_resources, + index_datacube_env, + job_id, + test, + ) nci_job_ids.append(index_job_id) if archive_list: if index_datacube_env: - pbs_resources = PBS_RESOURCES.format(project=project, queue='normal', - walltime="00:30:00", memory=192, - ncpus=48, jobfs=20, - filesystem_projects=''.join(fsys_projects), - email=('#PBS -M ' + email) if email else "") - archive_job_id = _submit_archive(batch_logdir, batch_logdir, archive_list, batchid, - pbs_resources, index_datacube_env, test) + pbs_resources = PBS_RESOURCES.format( + project=project, + queue="normal", + walltime="00:30:00", + memory=192, + ncpus=48, + jobfs=20, + filesystem_projects="".join(fsys_projects), + email=("#PBS -M " + email) if email else "", + ) + archive_job_id = _submit_archive( + batch_logdir, + batch_logdir, + archive_list, + batchid, + pbs_resources, + index_datacube_env, + test, + ) nci_job_ids.append(archive_job_id) else: - logging.error('Archive list given but --index-datacube-env not specified.') + logging.error("Archive list given but --index-datacube-env not specified.") - job_details = { - 'ardpbs_batch_id': batchid, - 'nci_job_ids': nci_job_ids - } + job_details = {"ardpbs_batch_id": batchid, "nci_job_ids": nci_job_ids} # Enable the job details to be picked up by the calling process click.echo(json.dumps(job_details)) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/bin/s2-nci-processing b/bin/s2-nci-processing deleted file mode 100755 index 81fb501..0000000 --- a/bin/s2-nci-processing +++ /dev/null @@ -1,363 +0,0 @@ -#!/usr/bin/env python3 - -import io -import os -import logging -import math -import tempfile -import zipfile -import sys -from xml.etree import ElementTree -from datetime import datetime, timedelta -from os.path import join, basename -from subprocess import Popen, PIPE, check_output, CalledProcessError -from pathlib import Path - -import click -from click_datetime import Datetime -from dateutil.parser import parse as date_parser - -from wagl.acquisition import acquisitions -from tesp.workflow import Package -from eodatasets.prepare.s2_prepare_cophub_zip import _process_datasets - - -DEFAULT_S2_AOI = '/g/data/v10/eoancillarydata/S2_extent/S2_aoi.csv' -DEFAULT_S2_L1C = '/g/data/fj7/Copernicus/Sentinel-2/MSI/L1C' -DEFAULT_WORKDIR = '/g/data/if87/datacube/002/S2_MSI_ARD/workdir' -DEFAULT_LOGDIR = '/g/data/if87/datacube/002/S2_MSI_ARD/log_dir' -DEFAULT_PKGDIR = '/g/data/if87/datacube/002/S2_MSI_ARD/packaged' - -_TODAY = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - - -def get_archive_metadata(pathname: Path): - """ Code to extract granule names from L1C Metadata, and processing baseline - - Logic has been ported from wagl.acquisition.__init__ - to avoid the overhead of caching the per measurement metadata - methods in wagl.acquisition should be refactored to export - this functionality - - returns a list of granule names - """ - archive = zipfile.ZipFile(str(pathname)) - xmlfiles = [s for s in archive.namelist() if "MTD_MSIL1C.xml" in s] - if not xmlfiles: - pattern = basename(str(pathname).replace('PRD_MSIL1C', 'MTD_SAFL1C')) - pattern = pattern.replace('.zip', '.xml') - xmlfiles = [s for s in archive.namelist() if pattern in s] - - mtd_xml = archive.read(xmlfiles[0]) - xml_root = ElementTree.XML(mtd_xml) - - search_term = './*/Product_Info/Product_Organisation/Granule_List/Granules' - grn_elements = xml_root.findall(search_term) - - # handling multi vs single granules + variants of each type - if not grn_elements: - grn_elements = xml_root.findall(search_term[:-1]) - - if grn_elements[0].findtext('IMAGE_ID'): - search_term = 'IMAGE_ID' - else: - search_term = 'IMAGE_FILE' - - # required to identify granule metadata in a multigranule archive - # in the earlier l1c products - processing_baseline = xml_root.findall('./*/Product_Info/PROCESSING_BASELINE')[0].text - - results = {} - for granule in grn_elements: - gran_id = granule.get('granuleIdentifier') - if not pathname.suffix == '.zip': - gran_path = str(pathname.parent.joinpath('GRANULE', gran_id, gran_id[:-7].replace('MSI', 'MTD') + '.xml')) - root = ElementTree.parse(gran_path).getroot() - else: - xmlzipfiles = [s for s in archive.namelist() if 'MTD_TL.xml' in s] - if not xmlzipfiles: - pattern = gran_id.replace('MSI', 'MTD') - pattern = pattern.replace('_N' + processing_baseline, '.xml') - xmlzipfiles = [s for s in archive.namelist() if pattern in s] - mtd_xml = archive.read(xmlzipfiles[0]) - root = ElementTree.XML(mtd_xml) - sensing_time = root.findall('./*/SENSING_TIME')[0].text - results[gran_id] = date_parser(sensing_time) - - return results - - -def _calc_nodes_req(granule_count, walltime, workers, hours_per_granule=1.5): - """ Provides estimation of the number of nodes required to process granule count - - >>> _calc_nodes_req(400, '20:59', 28) - 2 - >>> _calc_nodes_req(800, '20:00', 28) - 3 - """ - - hours, _, _ = [int(x) for x in walltime.split(':')] - return int(math.ceil(float(hours_per_granule * granule_count) / (hours * workers))) - - -def _level1_dataset_path_iter(root_directory: Path, *find_options, subprocess_depth=1): - """ Yields the path to level1 archives for sentinel2 - - Function splits up filesystem recursion by first identifying files up to the - subprocess_depth (in a subprocess) then separately runs find against all - directories found at that depth. - - This is done in order to reduce the upper bound on memory consumed by - the redirection of stdout. - """ - def _run_find(level1_dir: Path, *find_options): - find_options = find_options or [] - cmd = ['find', str(level1_dir)] + list(find_options) + ['-name', '*.zip', ] - in_stream = io.TextIOWrapper(Popen(cmd, stdout=PIPE).stdout, encoding='utf-8') - logging.info("calling %s", ' '.join(cmd)) - for level1_path in in_stream: - yield Path(level1_path.strip()) - - - def _get_dirs_at_depth(root, depth=1): - """ Returns a list of directories at a set depth from root""" - - def get_dirs_recurse(root, depth, max_depth=1): - results = [] - for _path in root.iterdir(): - if _path.is_dir(): - if depth == max_depth: - results.append(_path) - else: - try: - results.extend(get_dirs_recurse(_path, depth + 1, max_depth)) - except OSError as e: - logging.exception(e) - return results - - # Depth starts at one as it lists the files at the root - return get_dirs_recurse(Path(root), 1, depth) - - - # Handle base case - yield from _run_find(root_directory, '-maxdepth', str(subprocess_depth), *find_options) - # Handle seperate subprocess calls - search_dirs = _get_dirs_at_depth(root_directory, subprocess_depth) - for d in search_dirs: - yield from _run_find(root_directory / d, *find_options) - - -def get_find_options(start_date, end_date, today=_TODAY): - """ Returns the find options for subprocess call - Assumes that range is defined: start_date <= file_modified_time < end_date - - >>> get_find_options(datetime(2018, 6, 30), datetime(2018, 7, 1), datetime(2018, 10, 15)) - ['-daystart', '-mtime', '-108', '-mtime', '+106'] - >>> get_find_options(datetime(2018, 6, 30), datetime(2018, 10, 15), datetime(2018, 10, 15)) - ['-daystart', '-mtime', '-108'] - - """ - find_options = [ - '-daystart', - ] - min_in_days = (today - start_date).days + 1 # -1 since find is exclusive > - find_options.extend(['-mtime', '-{}'.format(str(min_in_days))]) - max_in_days = (today - end_date).days - if max_in_days: - # Ignore upper bound if end_date is today - find_options.extend(['-mtime', '+' + str(max_in_days)]) - - return find_options - - -@click.group() -def cli(): - pass - - -@cli.command('process-level2') -@click.option('--level1-root', default=DEFAULT_S2_L1C, type=str, - help="Folder containing Sentinel-2 level-1 datasets.") -@click.option('--s2-aoi', default=DEFAULT_S2_AOI, type=str, - help="List of MGRS tiles of interest.") -@click.option('--start-date', type=Datetime(format='%Y-%m-%d'), - help="Start of date range to process.") -@click.option('--end-date', type=Datetime(format='%Y-%m-%d'), - help="End of date range to process.") -@click.option('--pkgdir', default=DEFAULT_PKGDIR, type=click.Path(file_okay=False, writable=True), - help="The base output packaged directory.") -@click.option("--workdir", default=DEFAULT_WORKDIR, type=click.Path(file_okay=False, writable=True), - help="The base output working directory.") -@click.option("--logdir", default=DEFAULT_LOGDIR, type=click.Path(file_okay=False, writable=True), - help="The base logging and scripts output directory.") -@click.option("--env", type=click.Path(exists=True, readable=True), - help="Environment script to source.") -@click.option("--workers", type=click.IntRange(1, 32), default=28, - help="The number of workers to request per node.") -@click.option("--memory", default=256, - help="The memory in GB to request per node.") -@click.option("--jobfs", default=60, - help="The jobfs memory in GB to request per node.") -@click.option("--project", required=True, help="Project code to run under.") -@click.option("--queue", default='normalbw', - help="Queue to submit the job into, e.g. normalbw, expressbw.") -@click.option("--walltime", default="48:00:00", - help="Job walltime in `hh:mm:ss` format.") -@click.option("--email", default="", - help="Notification email address.") -@click.option("--test", default=False, is_flag=True, - help="Test job execution (Don't submit the job to the PBS queue).") -def process_level2(level1_root, s2_aoi, start_date, end_date, pkgdir, workdir, logdir, env, - workers, memory, jobfs, project, queue, walltime, email, test): - - click.echo(' '.join(sys.argv)) - - logging.basicConfig(format='%(asctime)s %(levelname)s (%(pathname)s:%(lineno)s) %(message)s', level=logging.INFO) - - # Read area of interest list - with open(s2_aoi) as csv: - tile_ids = {'T' + tile.strip() for tile in csv} - - find_options = [] - - if start_date or end_date: - start_date = start_date or datetime(_TODAY.year, 1, 1) - end_date = end_date or _TODAY - - find_options = get_find_options(start_date, end_date) - - def filter_granule_worker(out_stream): - count = 0 - for level1_dataset in _level1_dataset_path_iter(Path(level1_root), *find_options): - try: - container = acquisitions(str(level1_dataset)) - except Exception as e: - logging.warning('encountered unexpected error for %s: %s', str(level1_dataset), e) - logging.exception(e) - continue - - granule_md = get_archive_metadata(level1_dataset) - - for granule, sensing_date in granule_md.items(): - tile_id = granule.split('_')[-2] - if tile_id not in tile_ids: - logging.debug('granule %s with MGRS tile ID %s outside AOI', granule, tile_id) - continue - - ymd = sensing_date.strftime('%Y-%m-%d') - package = Package( - level1=str(level1_dataset), - workdir='', - granule=granule, - pkgdir=join(pkgdir, ymd) - ) - if package.output().exists(): - logging.debug('granule %s already processed', granule) - continue - - logging.debug('level1 dataset %s needs to be processed', level1_dataset) - print(level1_dataset, file=out_stream) - count += len(granule_md.keys()) # To handle multigranule files - break - return out_stream, count - - with tempfile.NamedTemporaryFile(mode="w+") as out_stream: - _, granule_count = filter_granule_worker(out_stream) - out_stream.flush() - - if granule_count == 0: - logging.info("no granules to process.") - return - - num_nodes = _calc_nodes_req(granule_count, walltime, workers) - assert num_nodes > 0, "cannot ask for {} nodes".format(num_nodes) - assert num_nodes <= 200, "number of nodes to request {} is too high".format(num_nodes) - - ard_cmd = ['ard_pbs', '--level1-list', out_stream.name, '--workdir', workdir, - '--logdir', logdir, '--pkgdir', pkgdir, '--env', env, - '--workers', str(workers), '--nodes', str(num_nodes), '--memory', str(memory), - '--jobfs', str(jobfs), '--project', project, '--queue', queue, '--walltime', walltime] - - if email: - ard_cmd += ['--email', email] - if test: - ard_cmd += ['--test'] - - logging.info("calling %s", ' '.join(ard_cmd)) - raw_output = None - try: - raw_output = check_output(ard_cmd) - except CalledProcessError as exc: - logging.error('ard_pbs failed with exit code %s', str(exc.returncode)) - logging.exception(exc.output) - raise - finally: - if raw_output: - # Echo job info, can be useful to track job submission - click.echo(raw_output.decode('utf-8')) - - -@cli.command('generate-level1') -@click.option('--level1-root', - type=click.Path(exists=True, readable=True), - help='directory to write yamls to') -@click.option('--output-dir', - type=click.Path(exists=True, writable=True), - callback=lambda ctx, param, value: Path(value), - help='directory to write yamls to') -@click.option('--start-date', type=Datetime('%Y-%m-%d'), - default=(_TODAY.year, 1, 1), - help='Start of date range for level 1 generation') -@click.option('--end-date', type=Datetime('%Y-%m-%d'), - default=_TODAY, - help='End of date range for level 1 generation') -@click.option('--copy-parent-dir-count', type=int, default=0) -@click.option('--retries', type=int, default=3) -@click.option('--checksum/--no-checksum', default=False) -@click.option('--dry-run', default=False, is_flag=True) -@click.option('--log-level', default="WARNING", type=str, - help="Set a log level. e.g. WARNING INFO") -def generate_level1(level1_root: Path, output_dir: Path, start_date: datetime, - end_date: datetime, copy_parent_dir_count: int, - retries: int, checksum: bool, dry_run: bool, - log_level: str): - click.echo(' '.join(sys.argv)) - try: - logging.basicConfig(level=log_level) - except: - logging.basicConfig(level="WARNING") - logging.warning("Log level defaulting to warning.") - logging.info('this is an info level test') - # run a find command to generate a list of level1 documents - find_options = get_find_options(start_date, end_date) - - for level1_dataset in _level1_dataset_path_iter(Path(level1_root), *find_options): - # Include the parent directories of the source file; yaml files are broken up into - # 5 degree by 5 degree geographies like level1 datasets - # There is no reason for this breakup - yaml_output_dir = output_dir / '/'.join(level1_dataset.parts[-(1 + copy_parent_dir_count):-1]) - os.makedirs(yaml_output_dir, exist_ok=True) - for i in range(retries): - try: - # This is to avoid skipping due to Dataset creation time being older than start date - # Note if the difference is more than 7 days it will still skip - start_date = start_date - timedelta(7) - if dry_run: - click.echo( - 'Processing: datasets: {}, outdir: {}, checksum: {}, start_date: {}'.format( - str(level1_dataset), str(yaml_output_dir), str(checksum), str(start_date) - ) - ) - else: - logging.info('Processing archive: %s', str(level1_dataset)) - _process_datasets(yaml_output_dir, (level1_dataset, ), checksum, start_date) - break - except Exception as e: - logging.error('Issue processing archive: %s', str(level1_dataset)) - logging.exception(e) - else: - logging.error('Skipping: %s', str(level1_dataset)) - - -if __name__ == '__main__': - cli() diff --git a/bin/search_s2 b/bin/search_s2 index e98d7bd..80429b7 100644 --- a/bin/search_s2 +++ b/bin/search_s2 @@ -5,7 +5,7 @@ Query available S2 L1C data for an area of interest of target input zip archives example usage: - + python search_s2.py --bounds 111 156 -45 -8 --date_start 01/11/2015 --date_stop 01/12/2015 --product s2a_level1c_granule --config ~/.aws_datacube.conf --output /g/data/v10/tmp/output.txt """ @@ -39,7 +39,7 @@ def main(output, bounds, date_start, date_stop, config, product): filename = product+'_'+str(bounds[0])+'_'+str(bounds[1])+'_'+str(bounds[2])+'_'+str(bounds[3])+'_'+str(date_start).replace(' ', '')+'_'+str(date_stop).replace(' ', '')+'.txt' outfile = os.path.join(output, filename) - + query = datacube.api.query.Query(product=product, time=Range(date_start, date_stop), longitude=(bounds[0], bounds[1]), latitude=(bounds[2], bounds[3])) logging.info("Search results output to %s", outfile) data = dc.index.datasets.search_eager(**query.search_terms) @@ -53,4 +53,4 @@ def main(output, bounds, date_start, date_stop, config, product): for item in unique: search_result.write("%s\n" % item) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/pyproject.toml b/pyproject.toml index 7d7084f..bd21d60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,28 @@ [build-system] requires = ["setuptools>=42", "wheel", "setuptools_scm[toml]>=3.4"] + +[tool.ruff] + +target-version = "py38" + +# Match the old configured flake8 line length +line-length = 120 + +# Which checkers to enable? +select = [ + "E", # pycodestyle + "F", # pyflakes + "UP", # pyupgrade + "N", # pep8-naming +] + + +[tool.ruff.per-file-ignores] + +# Allow the (not-previously-linted) PBS script to continue having long lines +"bin/ard_pbs" = ["E501"] + [tool.black] line-length = 90 target-version = ['py36', 'py37', 'py38'] diff --git a/setup.py b/setup.py index 713cfbf..a3976da 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,6 @@ "shapely", "structlog", "checksumdir", - "eodatasets", "eodatasets3>=0.19.2", "eugl", "wagl", @@ -35,7 +34,6 @@ test=["pytest", "pytest-flake8", "deepdiff", "flake8", "pep8-naming"] ), dependency_links=[ - "git+https://github.com/GeoscienceAustralia/eo-datasets.git@eodatasets-0.12#egg=eodatasets", "git+https://github.com/GeoscienceAustralia/wagl@develop#egg=wagl", "git+https://github.com/OpenDataCubePipelines/eugl.git@master#egg=eugl", ], @@ -43,7 +41,6 @@ "bin/s2package", "bin/ard_pbs", "bin/search_s2", - "bin/s2-nci-processing", "bin/batch_summary", ], include_package_data=True, diff --git a/tesp/checksum.py b/tesp/checksum.py index 7c22215..48b2d8b 100644 --- a/tesp/checksum.py +++ b/tesp/checksum.py @@ -2,7 +2,7 @@ from pathlib import Path import argparse -from eodatasets.verify import PackageChecksum +from eodatasets3.verify import PackageChecksum def checksum(out_fname): diff --git a/tesp/contrast.py b/tesp/contrast.py index a862e13..06d7f9b 100644 --- a/tesp/contrast.py +++ b/tesp/contrast.py @@ -55,7 +55,6 @@ def quicklook(fname, out_fname, src_min, src_max, out_min=0, out_max=255): The output datatype will be `UInt8`. """ with rasterio.open(fname) as ds: - # no data locations nulls = numpy.zeros((ds.height, ds.width), dtype="bool") for band in range(1, 4): diff --git a/tesp/html_geojson.py b/tesp/html_geojson.py index e157d74..e3206cd 100644 --- a/tesp/html_geojson.py +++ b/tesp/html_geojson.py @@ -1,9 +1,7 @@ -# coding=utf-8 """ Execution method for creation of map.html and bounding geojson: python html_geojson.py ALLBANDS_20m.contiguity.img """ -from __future__ import absolute_import import os import logging import json @@ -118,7 +116,7 @@ def html_map(contiguity_fname, html_out_fname, json_out_fname): def style_function(*args): return {"fillColor": None, "color": "#0000ff"} - with open(json_out_fname, "r") as src: + with open(json_out_fname) as src: # Manual handling of json load for multi-version support of folium _geojson = json.load(src) GeoJson(_geojson, name="bounds.geojson", style_function=style_function).add_to(m) diff --git a/tesp/luigi_db_utils.py b/tesp/luigi_db_utils.py index 75a42cb..66160e5 100644 --- a/tesp/luigi_db_utils.py +++ b/tesp/luigi_db_utils.py @@ -48,11 +48,11 @@ def retrieve_status(fname, task_name): how="left", left_on="id", right_on="task_id", - suffixes=["_{}".format(task_name), "_events"], + suffixes=[f"_{task_name}", "_events"], ) # final status for each DataStandardisation Task - final_status = status.drop_duplicates("id_{}".format(task_name), keep="last") + final_status = status.drop_duplicates(f"id_{task_name}", keep="last") # get the DONE, FAILED & PENDING Tasks # (if the task status is PENDING: @@ -65,16 +65,16 @@ def retrieve_status(fname, task_name): running = final_status[final_status.event_name == "RUNNING"] l1_done = done.merge( - l1_granules, how="left", right_on="task_id", left_on="id_{}".format(task_name) + l1_granules, how="left", right_on="task_id", left_on=f"id_{task_name}" ) l1_fail = fail.merge( - l1_granules, how="left", right_on="task_id", left_on="id_{}".format(task_name) + l1_granules, how="left", right_on="task_id", left_on=f"id_{task_name}" ) l1_pending = pending.merge( - l1_granules, how="left", right_on="task_id", left_on="id_{}".format(task_name) + l1_granules, how="left", right_on="task_id", left_on=f"id_{task_name}" ) l1_running = running.merge( - l1_granules, how="left", right_on="task_id", left_on="id_{}".format(task_name) + l1_granules, how="left", right_on="task_id", left_on=f"id_{task_name}" ) return l1_done, l1_fail, l1_pending, l1_running diff --git a/tesp/package.py b/tesp/package.py index 6ea0c53..130cc04 100644 --- a/tesp/package.py +++ b/tesp/package.py @@ -71,9 +71,9 @@ def package_non_standard(outdir, granule): granule_group = fid[granule.name] try: - wagl_path, *ancil_paths = [ + wagl_path, *ancil_paths = ( pth for pth in find(granule_group, "SCALAR") if "METADATA" in pth - ] + ) except ValueError: raise ValueError("No nbar metadata found in granule") @@ -96,7 +96,7 @@ def package_non_standard(outdir, granule): eodatasets3.wagl._read_fmask_doc(da, granule.fmask_doc) with rasterio.open(fmask_img) as ds: - fmask_layer = "/{}/OA_FMASK/oa_fmask".format(granule.name) + fmask_layer = f"/{granule.name}/OA_FMASK/oa_fmask" data = ds.read(1) fmask_ds = f.create_dataset( fmask_layer, data=data, compression="lzf", shuffle=True @@ -127,7 +127,7 @@ def package_non_standard(outdir, granule): grid_spec, pathname, fmask_ds[:], - layer="/{}".format(fmask_layer), + layer=f"/{fmask_layer}", nodata=no_data, expand_valid_data=False, ) @@ -147,7 +147,7 @@ def package_non_standard(outdir, granule): if "STANDARDISED-PRODUCTS" in str(ds_path): product_group = ds_path.parent.name elif "INTERPOLATED-ATMOSPHERIC-COEFFICIENTS" in str(ds_path): - product_group = "oa_{}".format(ds_path.parent.name) + product_group = f"oa_{ds_path.parent.name}" else: product_group = "oa" @@ -193,7 +193,7 @@ def package_non_standard(outdir, granule): grid_spec, pathname, out_ds[:], - layer="/{}".format(out_ds.name), + layer=f"/{out_ds.name}", nodata=no_data, expand_valid_data=include, ) @@ -206,7 +206,7 @@ def package_non_standard(outdir, granule): grid_spec, pathname, ds[:], - layer="/{}".format(ds.name), + layer=f"/{ds.name}", nodata=no_data, expand_valid_data=include, ) diff --git a/tesp/prepare.py b/tesp/prepare.py index caf1a54..64c3ec0 100644 --- a/tesp/prepare.py +++ b/tesp/prepare.py @@ -1,18 +1,18 @@ +import tempfile from pathlib import Path +from typing import Dict -from eodatasets.prepare.ls_usgs_l1_prepare import prepare_dataset as landsat_prepare -from eodatasets.prepare.s2_prepare_cophub_zip import ( - prepare_dataset as sentinel_2_zip_prepare, -) -from eodatasets.prepare.s2_l1c_aws_pds_prepare import ( - prepare_dataset as sentinel_2_aws_pds_prepare, -) +from eodatasets3 import serialise +from eodatasets3.prepare.landsat_l1_prepare import prepare_and_write as ls_prepare +from eodatasets3.prepare.sentinel_l1_prepare import prepare_and_write as s2_prepare +from wagl.acquisition import Acquisition -def extract_level1_metadata(acq): +def extract_level1_metadata(acq: Acquisition) -> Dict: """ Factory method for selecting a level1 metadata script + Returns the serialisable yaml document(s). Dict, or list of dicts. """ # Optional (not installed yet on Travis) # pytest: disable=import-error @@ -22,14 +22,26 @@ def extract_level1_metadata(acq): ) from wagl.acquisition.landsat import LandsatAcquisition - if isinstance(acq, _Sentinel2SinergiseAcquisition): - return sentinel_2_aws_pds_prepare(Path(acq.pathname)) - elif isinstance(acq, Sentinel2Acquisition): - return sentinel_2_zip_prepare(Path(acq.pathname)) - elif isinstance(acq, LandsatAcquisition): - return landsat_prepare(Path(acq.pathname)) + with tempfile.TemporaryDirectory() as tmpdir: + yaml_path = Path(tmpdir) / "level1.yaml" - raise NotImplementedError( - "No level-1 YAML generation defined for target acquisition " - "and no yaml_dir defined for level-1 metadata" - ) + if isinstance(acq, _Sentinel2SinergiseAcquisition): + dataset_doc, path = s2_prepare( + acq.pathname, yaml_path, producer="sinergise.com", embed_location=True + ) + elif isinstance(acq, Sentinel2Acquisition): + dataset_doc, path = s2_prepare( + acq.pathname, yaml_path, producer="esa.int", embed_location=True + ) + elif isinstance(acq, LandsatAcquisition): + uuid, path = ls_prepare( + acq.pathname, yaml_path, producer="usgs.gov", embed_location=True + ) + dataset_doc = serialise.from_path(path, skip_validation=True) + else: + raise NotImplementedError( + "No level-1 YAML generation defined for target acquisition " + "and no yaml_dir defined for level-1 metadata" + ) + + return serialise.to_doc(dataset_doc) diff --git a/tesp/sensitivity.py b/tesp/sensitivity.py index 64fab2e..f1c6009 100644 --- a/tesp/sensitivity.py +++ b/tesp/sensitivity.py @@ -316,10 +316,8 @@ def unpack_dataset(product_group, product_name, band): # human readable band name band_name = dataset.attrs["alias"] - out_file = pjoin(outdir, "{}_{}.tif".format(product_name, band_name)) - count_file = pjoin( - outdir, "{}_{}_valid_pixel_count.tif".format(product_name, band_name) - ) + out_file = pjoin(outdir, f"{product_name}_{band_name}.tif") + count_file = pjoin(outdir, f"{product_name}_{band_name}_valid_pixel_count.tif") nodata = dataset.attrs.get("no_data_value") geobox = GriddedGeoBox.from_dataset(dataset) @@ -611,7 +609,6 @@ def merge_images(left, right, target): with h5py.File(target) as target_fid, h5py.File( left_filename, "r" ) as left_fid, h5py.File(right_filename, "r") as right_fid: - if len(left_fid) != 1 or len(right_fid) != 1: raise ValueError("multiple granules not supported") @@ -644,7 +641,6 @@ def reflectance(granule): def experiment_summary(l2_path, fmask_path, granule, tag, settings): - with rasterio.open(fmask_path) as mask_file: fmask = mask_file.read(1) @@ -665,7 +661,6 @@ def mask_invalid(img): ] with h5py.File(l2_path) as h5: - dataset = h5[granule] date = dataset[GroupName.ATMOSPHERIC_INPUTS_GRP.value].attrs[ "acquisition-datetime" diff --git a/tesp/workflow.py b/tesp/workflow.py index 417333e..7029baa 100644 --- a/tesp/workflow.py +++ b/tesp/workflow.py @@ -111,15 +111,9 @@ def output(self): if not self.platform_id().startswith("SENTINEL"): return None - prob_out_fname = pjoin( - self.workdir, "{}.prob.s2cloudless.tif".format(self.granule) - ) - mask_out_fname = pjoin( - self.workdir, "{}.mask.s2cloudless.tif".format(self.granule) - ) - metadata_out_fname = pjoin( - self.workdir, "{}.s2cloudless.yaml".format(self.granule) - ) + prob_out_fname = pjoin(self.workdir, f"{self.granule}.prob.s2cloudless.tif") + mask_out_fname = pjoin(self.workdir, f"{self.granule}.mask.s2cloudless.tif") + metadata_out_fname = pjoin(self.workdir, f"{self.granule}.s2cloudless.yaml") out_fnames = { "cloud_prob": luigi.LocalTarget(prob_out_fname), @@ -193,8 +187,8 @@ class RunFmask(luigi.Task): acq_parser_hint = luigi.OptionalParameter(default="") def output(self): - out_fname1 = pjoin(self.workdir, "{}.fmask.img".format(self.granule)) - out_fname2 = pjoin(self.workdir, "{}.fmask.yaml".format(self.granule)) + out_fname1 = pjoin(self.workdir, f"{self.granule}.fmask.img") + out_fname2 = pjoin(self.workdir, f"{self.granule}.fmask.yaml") out_fnames = { "image": luigi.LocalTarget(out_fname1), @@ -331,7 +325,7 @@ def output(self): # create a text file to act as a completion target # this could be changed to be a database record parent_dir = Path(self.workdir).parent - out_fname = parent_dir.joinpath("{}.completed".format(self.granule)) + out_fname = parent_dir.joinpath(f"{self.granule}.completed") return luigi.LocalTarget(str(out_fname)) @@ -386,7 +380,7 @@ def search_for_external_level1_metadata() -> Optional[Path]: s2cloudless_mask_fname = None s2cloudless_metadata_fname = None - tesp_doc_fname = Path(self.workdir) / "{}.tesp.yaml".format(self.granule) + tesp_doc_fname = Path(self.workdir) / f"{self.granule}.tesp.yaml" with tesp_doc_fname.open("w") as src: yaml.safe_dump(_get_tesp_metadata(), src) @@ -403,7 +397,6 @@ def search_for_external_level1_metadata() -> Optional[Path]: tesp_doc_path=tesp_doc_fname, level1_metadata_path=search_for_external_level1_metadata(), ): - if self.non_standard_packaging: ds_id, md_path = package_non_standard(Path(self.pkgdir), eods_granule) else: @@ -443,7 +436,7 @@ def search_for_external_level1_metadata() -> Optional[Path]: def list_packages(workdir, acq_parser_hint, pkgdir, yamls_dir): def worker(level1): - work_root = pjoin(workdir, "{}.ARD".format(basename(level1))) + work_root = pjoin(workdir, f"{basename(level1)}.ARD") result = [] for granule in preliminary_acquisitions_data(level1, acq_parser_hint): diff --git a/tesp/yaml_merge.py b/tesp/yaml_merge.py index 2b37ce7..d749999 100644 --- a/tesp/yaml_merge.py +++ b/tesp/yaml_merge.py @@ -1,9 +1,7 @@ -# coding=utf-8 """ Preparation code supporting merge of target Analysis Ready Data yaml metadata document and the source Level 1 yaml """ # nopep8 -from __future__ import absolute_import import os import uuid @@ -63,7 +61,6 @@ def merge_metadata( # for Landsat, from_dt and to_dt in ARD-METADATA is populated from max and min timedelta values if platform == "LANDSAT": - # pylint: disable=too-many-function-args def interpret_landsat_temporal_extent(): """ @@ -80,10 +77,10 @@ def interpret_landsat_temporal_extent(): ) level2_extent = { - "center_dt": "{}Z".format(center_dt), + "center_dt": f"{center_dt}Z", "coord": level1_tags["extent"]["coord"], - "from_dt": "{}Z".format(from_dt), - "to_dt": "{}Z".format(to_dt), + "from_dt": f"{from_dt}Z", + "to_dt": f"{to_dt}Z", } return level2_extent