diff --git a/src/swell/suites/3dfgat_atmos/flow.cylc b/src/swell/suites/3dfgat_atmos/flow.cylc index b4d238308..85ce51923 100644 --- a/src/swell/suites/3dfgat_atmos/flow.cylc +++ b/src/swell/suites/3dfgat_atmos/flow.cylc @@ -21,6 +21,9 @@ initial cycle point = {{start_cycle_point}} final cycle point = {{final_cycle_point}} runahead limit = {{runahead_limit}} + [[queues]] + [[[default]]] + limit = {{concurrent_limit}} [[graph]] R1 = """ @@ -58,13 +61,17 @@ {% if cycling_varbc %} # Cycling VarBC is active, biases from the previous cycle will be used - RunJediVariationalExecutable-{{model_component}}[-PT6H] => GetObservations-{{model_component}} + RunJediVariationalExecutable-{{model_component}}[-PT6H] => GetObservationsStart {% else %} # Cycling VarBC is inactive, static bias files will be used - GetObservations-{{model_component}} + GetObservationsStart {% endif %} + {% for observation in models[model_component]['observations'] %} + GetObservationsStart => GetObservations-{{observation}}-{{model_component}} => GetObservationsFinish + {% endfor %} + # Perform staging that is cycle dependent StageJediCycle-{{model_component}} @@ -73,10 +80,9 @@ CloneJedi[^] => StageJediCycle-{{model_component}} StageJediCycle-{{model_component}} => RunJediVariationalExecutable-{{model_component}} GetBackgroundGeosExperiment-{{model_component}}? | GetBackground-{{model_component}} => RunJediVariationalExecutable-{{model_component}} - GetObsNotInR2d2-{{model_component}}: fail? => GetObservations-{{model_component}} - GetObsNotInR2d2-{{model_component}}? | GetObservations-{{model_component}} => RunJediVariationalExecutable-{{model_component}} + GetObsNotInR2d2-{{model_component}}: fail? => GetObservationsStart + GetObsNotInR2d2-{{model_component}}? | GetObservationsFinish => RenderJediObservations-{{model_component}} GenerateObservingSystemRecords-{{model_component}} => RenderJediObservations-{{model_component}} - GetObservations-{{model_component}} => RenderJediObservations-{{model_component}} RenderJediObservations-{{model_component}} => RunJediVariationalExecutable-{{model_component}} @@ -152,8 +158,25 @@ [[GetBackgroundGeosExperiment-{{model_component}} ]] script = "swell task GetBackgroundGeosExperiment $config -d $datetime -m {{model_component}}" + [[GetObservationsStart]] + script = "true" + + [[GetObservationsFinish]] + script = "true" + + # --------------------------------------------------------------------------------------------- + [[GetObservations-{{model_component}}]] - script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + + # --------------------------------------------------------------------------------------------- + + {% for observation in models[model_component]['observations'] %} + [[GetObservations-{{observation}}-{{model_component}}]] + script = "swell task GetObservations $config -d $datetime -m {{model_component}} -a {{observation}}" + inherit = GetObservations-{{model_component}} + {% endfor %} + # --------------------------------------------------------------------------------------------- [[RenderJediObservations-{{model_component}}]] script = "swell task RenderJediObservations $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/3dfgat_marine_cycle/flow.cylc b/src/swell/suites/3dfgat_marine_cycle/flow.cylc index fa1e956f8..2ce3c7c99 100644 --- a/src/swell/suites/3dfgat_marine_cycle/flow.cylc +++ b/src/swell/suites/3dfgat_marine_cycle/flow.cylc @@ -21,6 +21,9 @@ initial cycle point = {{start_cycle_point}} final cycle point = {{final_cycle_point}} runahead limit = {{runahead_limit}} + [[queues]] + [[[default]]] + limit = {{concurrent_limit}} [[graph]] R1 = """ @@ -76,7 +79,13 @@ LinkCoupledGeosOutput-{{model_component}} => GenerateBClimatology-{{model_component}} # Data assimilation preperation - GetObservations-{{model_component}} => RenderJediObservations-{{model_component}} + StageJediCycle-{{model_component}} => RunJediFgatExecutable-{{model_component}} + + GenerateBClimatology-{{model_component}} => RunJediFgatExecutable-{{model_component}} + {% for observation in models[model_component]['observations'] %} + GetObservations-{{observation}}-{{model_component}} => RenderJediObservations-{{model_component}} + {% endfor %} + RenderJediObservations-{{model_component}} => RunJediFgatExecutable-{{model_component}} LinkCoupledGeosOutput-{{model_component}} => RunJediFgatExecutable-{{model_component}} @@ -188,8 +197,19 @@ [[StageJediCycle-{{model_component}}]] script = "swell task StageJedi $config -d $datetime -m {{model_component}}" + # --------------------------------------------------------------------------------------------- + [[GetObservations-{{model_component}}]] - script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + + # --------------------------------------------------------------------------------------------- + + {% for observation in models[model_component]['observations'] %} + [[GetObservations-{{observation}}-{{model_component}}]] + script = "swell task GetObservations $config -d $datetime -m {{model_component}} -a {{observation}}" + inherit = GetObservations-{{model_component}} + {% endfor %} + # --------------------------------------------------------------------------------------------- [[GenerateBClimatology-{{model_component}}]] script = "swell task GenerateBClimatology $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/3dvar_atmos/flow.cylc b/src/swell/suites/3dvar_atmos/flow.cylc index e0861d645..8d8c81a3a 100644 --- a/src/swell/suites/3dvar_atmos/flow.cylc +++ b/src/swell/suites/3dvar_atmos/flow.cylc @@ -21,6 +21,9 @@ initial cycle point = {{start_cycle_point}} final cycle point = {{final_cycle_point}} runahead limit = {{runahead_limit}} + [[queues]] + [[[default]]] + limit = {{concurrent_limit}} [[graph]] R1 = """ @@ -58,11 +61,11 @@ {% if cycling_varbc %} # Cycling VarBC is active, biases from the previous cycle will be used - RunJediVariationalExecutable-{{model_component}}[-PT6H] => GetObservations-{{model_component}} + RunJediVariationalExecutable-{{model_component}}[-PT6H] => GetObservationsStart {% else %} # Cycling VarBC is inactive, static bias files will be used - GetObsNotInR2d2-{{model_component}}: fail? => GetObservations-{{model_component}} + GetObsNotInR2d2-{{model_component}}: fail? => GetObservationsStart {% endif %} # Perform staging that is cycle dependent @@ -73,7 +76,10 @@ CloneJedi[^] => StageJediCycle-{{model_component}} StageJediCycle-{{model_component}} => RunJediVariationalExecutable-{{model_component}} GetBackgroundGeosExperiment-{{model_component}}? | GetBackground-{{model_component}} => RunJediVariationalExecutable-{{model_component}} - GetObsNotInR2d2-{{model_component}}? | GetObservations-{{model_component}} => RenderJediObservations-{{model_component}} + {% for observation in models[model_component]['observations'] %} + GetObservationsStart => GetObservations-{{observation}}-{{model_component}} => GetObservationsFinish + {% endfor %} + GetObsNotInR2d2-{{model_component}}? | GetObservationsFinish => RenderJediObservations-{{model_component}} RenderJediObservations-{{model_component}} => RunJediVariationalExecutable-{{model_component}} GenerateObservingSystemRecords-{{model_component}} => RunJediVariationalExecutable-{{model_component}} @@ -129,6 +135,12 @@ --{{key}} = {{value}} {%- endfor %} + [[GetObservationsStart]] + script = "true" + + [[GetObservationsFinish]] + script = "true" + {% for model_component in model_components %} [[CloneGeosMksi-{{model_component}}]] @@ -149,9 +161,18 @@ [[GetBackgroundGeosExperiment-{{model_component}} ]] script = "swell task GetBackgroundGeosExperiment $config -d $datetime -m {{model_component}}" + # --------------------------------------------------------------------------------------------- + [[GetObservations-{{model_component}}]] script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + {% for observation in models[model_component]['observations'] %} + [[GetObservations-{{observation}}-{{model_component}}]] + script = "swell task GetObservations $config -d $datetime -m {{model_component}} -a {{observation}}" + inherit = GetObservations-{{model_component}} + {% endfor %} + # --------------------------------------------------------------------------------------------- + [[RenderJediObservations-{{model_component}}]] script = "swell task RenderJediObservations $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/3dvar_marine/flow.cylc b/src/swell/suites/3dvar_marine/flow.cylc index 9b1d063fd..a9a4d7a3f 100644 --- a/src/swell/suites/3dvar_marine/flow.cylc +++ b/src/swell/suites/3dvar_marine/flow.cylc @@ -21,6 +21,9 @@ initial cycle point = {{start_cycle_point}} final cycle point = {{final_cycle_point}} runahead limit = {{runahead_limit}} + [[queues]] + [[[default]]] + limit = {{concurrent_limit}} [[graph]] R1 = """ @@ -60,7 +63,9 @@ GetBackground-{{model_component}} => RunJediVariationalExecutable-{{model_component}} GenerateBClimatology-{{model_component}} => RunJediVariationalExecutable-{{model_component}} - GetObservations-{{model_component}} => RenderJediObservations-{{model_component}} + {% for observation in models[model_component]['observations'] %} + GetObservations-{{observation}}-{{model_component}} => RenderJediObservations-{{model_component}} + {% endfor %} RenderJediObservations-{{model_component}} => RunJediVariationalExecutable-{{model_component}} # EvaObservations @@ -125,8 +130,17 @@ [[ GetBackground-{{model_component}} ]] script = "swell task GetBackground $config -d $datetime -m {{model_component}}" + # --------------------------------------------------------------------------------------------- + [[GetObservations-{{model_component}}]] - script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + + {% for observation in models[model_component]['observations'] %} + [[GetObservations-{{observation}}-{{model_component}}]] + script = "swell task GetObservations $config -d $datetime -m {{model_component}} -a {{observation}}" + inherit = GetObservations-{{model_component}} + {% endfor %} + # --------------------------------------------------------------------------------------------- [[GenerateBClimatology-{{model_component}}]] script = "swell task GenerateBClimatology $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/3dvar_marine_cycle/flow.cylc b/src/swell/suites/3dvar_marine_cycle/flow.cylc index d16910113..bf7d57c93 100644 --- a/src/swell/suites/3dvar_marine_cycle/flow.cylc +++ b/src/swell/suites/3dvar_marine_cycle/flow.cylc @@ -20,6 +20,9 @@ initial cycle point = {{start_cycle_point}} final cycle point = {{final_cycle_point}} + [[queues]] + [[[default]]] + limit = {{concurrent_limit}} [[graph]] R1 = """ @@ -78,7 +81,9 @@ StageJediCycle-{{model_component}} => RunJediVariationalExecutable-{{model_component}} GenerateBClimatology-{{model_component}} => RunJediVariationalExecutable-{{model_component}} - GetObservations-{{model_component}} => RenderJediObservations-{{model_component}} + {% for observation in models[model_component]['observations'] %} + GetObservations-{{observation}}-{{model_component}} => RenderJediObservations-{{model_component}} + {% endfor %} RenderJediObservations-{{model_component}} => RunJediVariationalExecutable-{{model_component}} # Run analysis diagnostics @@ -187,8 +192,19 @@ [[StageJediCycle-{{model_component}}]] script = "swell task StageJedi $config -d $datetime -m {{model_component}}" + # --------------------------------------------------------------------------------------------- + [[GetObservations-{{model_component}}]] - script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + + # --------------------------------------------------------------------------------------------- + + {% for observation in models[model_component]['observations'] %} + [[GetObservations-{{observation}}-{{model_component}}]] + script = "swell task GetObservations $config -d $datetime -m {{model_component}} -a {{observation}}" + inherit = GetObservations-{{model_component}} + {% endfor %} + # --------------------------------------------------------------------------------------------- [[GenerateBClimatology-{{model_component}}]] script = "swell task GenerateBClimatology $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/hofx/flow.cylc b/src/swell/suites/hofx/flow.cylc index 666399b9d..bcb3bc31c 100644 --- a/src/swell/suites/hofx/flow.cylc +++ b/src/swell/suites/hofx/flow.cylc @@ -21,6 +21,9 @@ initial cycle point = {{start_cycle_point}} final cycle point = {{final_cycle_point}} runahead limit = {{runahead_limit}} + [[queues]] + [[[default]]] + limit = {{concurrent_limit}} [[graph]] R1 = """ @@ -55,7 +58,7 @@ GetBackgroundGeosExperiment-{{model_component}} :fail? => GetBackground-{{model_component}} # Get observations - GetObsNotInR2d2-{{model_component}}: fail? => GetObservations-{{model_component}} + GetObsNotInR2d2-{{model_component}}: fail? => GetObservationsStart # Perform staging that is cycle dependent StageJediCycle-{{model_component}} @@ -65,7 +68,10 @@ CloneJedi[^] => StageJediCycle-{{model_component}} StageJediCycle-{{model_component}} => RunJediHofxExecutable-{{model_component}} GetBackgroundGeosExperiment-{{model_component}}? | GetBackground-{{model_component}} => RunJediHofxExecutable-{{model_component}} - GetObsNotInR2d2-{{model_component}}? | GetObservations-{{model_component}} => RenderJediObservations-{{model_component}} + {% for observation in models[model_component]['observations'] %} + GetObservationsStart => GetObservations-{{observation}}-{{model_component}} => GetObservationsFinish + {% endfor %} + GetObsNotInR2d2-{{model_component}}? | GetObservationsFinish => RenderJediObservations-{{model_component}} RenderJediObservations-{{model_component}} => RunJediHofxExecutable-{{model_component}} GenerateObservingSystemRecords-{{model_component}} => RunJediHofxExecutable-{{model_component}} @@ -114,6 +120,12 @@ --{{key}} = {{value}} {%- endfor %} + [[GetObservationsStart]] + script = "true" + + [[GetObservationsFinish]] + script = "true" + {% for model_component in model_components %} [[CloneGeosMksi-{{model_component}}]] @@ -131,8 +143,19 @@ [[GetBackgroundGeosExperiment-{{model_component}} ]] script = "swell task GetBackgroundGeosExperiment $config -d $datetime -m {{model_component}}" + # --------------------------------------------------------------------------------------------- + [[GetObservations-{{model_component}}]] - script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + + # --------------------------------------------------------------------------------------------- + + {% for observation in models[model_component]['observations'] %} + [[GetObservations-{{observation}}-{{model_component}}]] + script = "swell task GetObservations $config -d $datetime -m {{model_component}} -a {{observation}}" + inherit = GetObservations-{{model_component}} + {% endfor %} + # --------------------------------------------------------------------------------------------- [[GetObsNotInR2d2-{{model_component}}]] script = "swell task GetObsNotInR2d2 $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/hofx_cf/flow.cylc b/src/swell/suites/hofx_cf/flow.cylc index 562482ecf..29afcbdac 100644 --- a/src/swell/suites/hofx_cf/flow.cylc +++ b/src/swell/suites/hofx_cf/flow.cylc @@ -21,6 +21,9 @@ initial cycle point = {{start_cycle_point}} final cycle point = {{final_cycle_point}} runahead limit = {{runahead_limit}} + [[queues]] + [[[default]]] + limit = {{concurrent_limit}} [[graph]] R1 = """ @@ -48,8 +51,9 @@ # Get background GetBackground-{{model_component}} - # Get observations - GetObservations-{{model_component}} + {% for observation in models[model_component]['observations'] %} + GetObservations-{{observation}}-{{model_component}} => RenderJediObservations-{{model_component}} + {% endfor %} # Perform staging that is cycle dependent StageJediCycle-{{model_component}} @@ -59,7 +63,6 @@ StageJediCycle-{{model_component}} => RunJediHofxExecutable-{{model_component}} GetBackground-{{model_component}} => RunJediHofxExecutable-{{model_component}} - GetObservations-{{model_component}} => RenderJediObservations-{{model_component}} RenderJediObservations-{{model_component}} => RunJediHofxExecutable-{{model_component}} # EvaObservations @@ -115,8 +118,19 @@ [[GetBackground-{{model_component}}]] script = "swell task GetBackground $config -d $datetime -m {{model_component}}" + # --------------------------------------------------------------------------------------------- + [[GetObservations-{{model_component}}]] script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + + # --------------------------------------------------------------------------------------------- + + {% for observation in models[model_component]['observations'] %} + [[GetObservations-{{observation}}-{{model_component}}]] + script = "swell task GetObservations $config -d $datetime -m {{model_component}} -a {{observation}}" + inherit = GetObservations-{{model_component}} + {% endfor %} + # --------------------------------------------------------------------------------------------- [[RenderJediObservations-{{model_component}}]] script = "swell task RenderJediObservations $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/localensembleda/flow.cylc b/src/swell/suites/localensembleda/flow.cylc index 8675bccba..144b636b9 100644 --- a/src/swell/suites/localensembleda/flow.cylc +++ b/src/swell/suites/localensembleda/flow.cylc @@ -21,6 +21,9 @@ initial cycle point = {{start_cycle_point}} final cycle point = {{final_cycle_point}} runahead limit = {{runahead_limit}} + [[queues]] + [[[default]]] + limit = {{concurrent_limit}} [[graph]] R1 = """ @@ -51,9 +54,13 @@ # Perform staging that is cycle dependent BuildJediByLinking[^]? | BuildJedi[^] => StageJediCycle-{{model_component}} => sync_point - GetObsNotInR2d2-{{model_component}}: fail? => GetObservations-{{model_component}} + GetObsNotInR2d2-{{model_component}}: fail? => GetObservationsStart - GetObsNotInR2d2-{{model_component}}? | GetObservations-{{model_component}} => RenderJediObservations-{{model_component}} + {% for observation in models[model_component]['observations'] %} + GetObservationsStart => GetObservations-{{observation}}-{{model_component}} => GetObservationsFinish + {% endfor %} + + GetObsNotInR2d2-{{model_component}}? | GetObservationsFinish => RenderJediObservations-{{model_component}} RenderJediObservations-{{model_component}} => sync_point @@ -133,6 +140,12 @@ --{{key}} = {{value}} {%- endfor %} + [[GetObservationsStart]] + script = "true" + + [[GetObservationsFinish]] + script = "true" + {% for model_component in model_components %} [[CloneGeosMksi-{{model_component}}]] @@ -171,8 +184,19 @@ --{{key}} = {{value}} {%- endfor %} + # --------------------------------------------------------------------------------------------- + [[GetObservations-{{model_component}}]] - script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + script = "swell task GetObservations $config -d $datetime -m {{model_component}}" + + # --------------------------------------------------------------------------------------------- + + {% for observation in models[model_component]['observations'] %} + [[GetObservations-{{observation}}-{{model_component}}]] + script = "swell task GetObservations $config -d $datetime -m {{model_component}} -a {{observation}}" + inherit = GetObservations-{{model_component}} + {% endfor %} + # --------------------------------------------------------------------------------------------- [[GetObsNotInR2d2-{{model_component}}]] script = "swell task GetObsNotInR2d2 $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/suite_questions.py b/src/swell/suites/suite_questions.py index cc2824591..ad93fbd5e 100644 --- a/src/swell/suites/suite_questions.py +++ b/src/swell/suites/suite_questions.py @@ -41,7 +41,8 @@ class SuiteQuestions(QuestionContainer, Enum): qd.final_cycle_point(), qd.model_components(), qd.runahead_limit(), - qd.r2d2_experiment_id(), + qd.concurrent_limit(), + qd.r2d2_experiment_id() ] ) diff --git a/src/swell/swell.py b/src/swell/swell.py index 3812b4770..43c1cc948 100644 --- a/src/swell/swell.py +++ b/src/swell/swell.py @@ -82,6 +82,10 @@ def swell_driver() -> None: or for task-model combinations. """ +additional_parameter_help = """ +Additional option to specify parameters to task, context-dependent on individual task. +""" + # -------------------------------------------------------------------------------------------------- @@ -184,12 +188,15 @@ def launch( @click.argument('config') @click.option('-d', '--datetime', 'datetime', default=None, help=datetime_help) @click.option('-m', '--model', 'model', default=None, help=model_help) +@click.option('-a', '--additional-parameter', 'additional_parameter', + default=None, help=additional_parameter_help) @click.option('-p', '--ensemblePacket', 'ensemblePacket', default=None, help=ensemble_help) def task( task: str, config: str, datetime: Optional[str], model: Optional[str], + additional_parameter: Optional[str], ensemblePacket: Optional[str] ) -> None: """ @@ -202,7 +209,7 @@ def task( config (str): Path to the configuration file for the task.\n """ - task_wrapper(task, config, datetime, model, ensemblePacket) + task_wrapper(task, config, datetime, model, additional_parameter, ensemblePacket) # -------------------------------------------------------------------------------------------------- diff --git a/src/swell/tasks/base/task_base.py b/src/swell/tasks/base/task_base.py index 109b3af74..07663e9f1 100644 --- a/src/swell/tasks/base/task_base.py +++ b/src/swell/tasks/base/task_base.py @@ -41,6 +41,7 @@ def __init__( datetime_input: Optional[str], model: str, ensemblePacket: Optional[str], + additional_parameter: Optional[str], task_name: str ) -> None: @@ -62,6 +63,10 @@ def __init__( if datetime_input is not None: self.__datetime__ = Datetime(datetime_input) + # Keep copy of additional parameter + # --------------------------------- + self.__additional_parameter__ = additional_parameter + # Keep copy of ensemblePacket # --------------------------- self.__ensemble_packet__ = ensemblePacket @@ -173,6 +178,11 @@ def get_model(self) -> str: # ---------------------------------------------------------------------------------------------- + def get_parameter(self) -> str: + return self.__additional_parameter__ + + # ---------------------------------------------------------------------------------------------- + def get_model_components(self) -> Union[str, list]: return self.__model_components__ @@ -274,6 +284,7 @@ def create_task( config: str, datetime: Union[str, dt, None], model: str, + additional_parameter: Optional[str], ensemblePacket: Optional[str] ) -> taskBase: @@ -303,7 +314,7 @@ def create_task( factory_logger.info(f'Using module swell.tasks.{task_lower}') # Return task object - return task_class(config, datetime, model, ensemblePacket, task) + return task_class(config, datetime, model, ensemblePacket, additional_parameter, task) # -------------------------------------------------------------------------------------------------- @@ -334,13 +345,15 @@ def task_wrapper( config: str, datetime: Union[str, dt, None], model: Optional[str], + additional_parameter: Optional[str], ensemblePacket: Optional[str] ) -> None: # Create the object constrc_start = time.perf_counter() creator = taskFactory() - task_object = creator.create_task(task, config, datetime, model, ensemblePacket) + task_object = creator.create_task(task, config, datetime, model, additional_parameter, + ensemblePacket) constrc_final = time.perf_counter() constrc_time = f'Constructed in {constrc_final - constrc_start:0.4f} seconds' diff --git a/src/swell/tasks/get_observations.py b/src/swell/tasks/get_observations.py index f8a8a1abd..e053615e8 100644 --- a/src/swell/tasks/get_observations.py +++ b/src/swell/tasks/get_observations.py @@ -98,6 +98,13 @@ def execute(self) -> None: "tlapse" files need to be fetched. """ + # Get the observation from the environment + # ---------------------------------------- + observation = self.get_parameter() + + if observation is None: + raise Exception('No observation specified. Specify observation ' + 'using `swell task GetObservations -a ...`') # Load R2D2 credentials # --------------------- load_r2d2_credentials(self.logger, self.platform()) @@ -106,7 +113,6 @@ def execute(self) -> None: # ------------ obs_experiment = self.config.obs_experiment() background_time_offset = self.config.background_time_offset() - observations = self.config.observations() window_length = self.config.window_length() crtm_coeff_dir = self.config.crtm_coeff_dir(None) window_length = self.config.window_length() @@ -151,180 +157,176 @@ def execute(self) -> None: # Read observation ioda names ioda_names_list = get_ioda_names_list() - # Loop over observation operators - # ------------------------------- - for observation in observations: - - # Open the observation operator dictionary - # ---------------------------------------- - observation_dict = self.jedi_rendering.render_interface_observations(observation) - - # Get the set obs providers for each observation - # ---------------------------------------------- - obs_provider = get_provider_for_observation(observation, ioda_names_list, self.logger) - - # Fetch observation files - # ----------------------- - combine_input_files = [] - # Here, we are fetching - for obs_num, obs_time in enumerate(obs_list_dto): - obs_window_begin = dt.strftime(obs_time, datetime_formats['iso_format']) - target_file = os.path.join(self.cycle_dir(), f'{observation}.{obs_num}.nc4') - combine_input_files.append(target_file) - - fetch_criteria = { - 'item': 'observation', # Required for r2d2 v3 - 'provider': obs_provider, # What we registered with - 'observation_type': observation, # From filename - 'file_extension': 'nc4', - 'window_start': obs_window_begin, # From filename timestamp - 'window_length': obs_window_length, # From filename - 'target_file': target_file, # Where to save - } - - try: - r2d2.fetch(**fetch_criteria) - self.logger.info(f"Successfully fetched {target_file}") - except Exception: - self.logger.info( - f"Failed to fetch {target_file}. " - "Fetch empty observation instead." - ) - - # fetch empty obs - r2d2.fetch( - item='observation', - provider='empty_provider', - observation_type='empty_type', - file_extension='nc4', - window_start='19700101T030000Z', - window_length='PT6H', - target_file=target_file, - ) - - # Check how many of the combine_input_files exist in the cycle directory. - # If all of them are missing proceed without creating an observation input - # file since bias correction files still need to be propagated to the next cycle - # for cycling VarBC. - # ----------------------------------------------------------------------- - if not any([os.path.exists(f) for f in combine_input_files]): - self.logger.info(f'None of the {observation} files exist for this cycle!') - else: - jedi_obs_file = observation_dict['obs space']['obsdatain']['engine']['obsfile'] - self.logger.info(f'Processing observation file {jedi_obs_file}') - # If obs_list_dto has one member, then just rename the file - # --------------------------------------------------------- - if len(obs_list_dto) == 1: - os.rename(combine_input_files[0], jedi_obs_file) - else: - self.read_and_combine(combine_input_files, jedi_obs_file) - # Change permission - os.chmod(jedi_obs_file, 0o644) - - # Otherwise there is only work to do if the observation operator has bias correction - # ---------------------------------------------------------------------------------- - if 'obs bias' not in observation_dict: - continue + # Open the observation operator dictionary + # ---------------------------------------- + observation_dict = self.jedi_rendering.render_interface_observations(observation) + + # Get the set obs providers for each observation + # ---------------------------------------------- + obs_provider = get_provider_for_observation(observation, ioda_names_list, self.logger) + + # Fetch observation files + # ----------------------- + combine_input_files = [] + # Here, we are fetching + for obs_num, obs_time in enumerate(obs_list_dto): + obs_window_begin = dt.strftime(obs_time, datetime_formats['iso_format']) + target_file = os.path.join(self.cycle_dir(), f'{observation}.{obs_num}.nc4') + combine_input_files.append(target_file) + + fetch_criteria = { + 'item': 'observation', # Required for r2d2 v3 + 'provider': obs_provider, # What we registered with + 'observation_type': observation, # From filename + 'file_extension': 'nc4', + 'window_start': obs_window_begin, # From filename timestamp + 'window_length': obs_window_length, # From filename + 'target_file': target_file, # Where to save + } + + try: + r2d2.fetch(**fetch_criteria) + self.logger.info(f"Successfully fetched {target_file}") + except Exception: + self.logger.info( + f"Failed to fetch {target_file}. " + "Fetch empty observation instead." + ) - # Satellite and aircraft bias correction (coeff and cov) files - # ----------------------------------------------- - target_bccoef = observation_dict['obs bias']['input file'] - target_bccovr = observation_dict['obs bias']['covariance']['prior']['input file'] - - # We assume fetch is required unless we are cycling VarBC - fetch_required = True - - if cycling_varbc: - if self.cycle_time_dto() == self.start_cycle_point_dto(): - self.logger.info(f'Process bias file {target_bccoef} for the first cycle') - self.logger.info(f'Process bias file {target_bccovr} for the first cycle') - else: - self.logger.info(f'Using bias files from the previous cycle') - previous_bias_coef = self.previous_cycle_bias(target_bccoef, window_length) - previous_bias_covr = self.previous_cycle_bias(target_bccovr, window_length) - # Link the previous bias file to the current cycle directory - self.logger.info(f'Linking {previous_bias_coef} to {target_bccoef}') - self.geos.linker(previous_bias_coef, target_bccoef, dst_dir=self.cycle_dir()) - self.logger.info(f'Linking {previous_bias_covr} to {target_bccovr}') - self.geos.linker(previous_bias_covr, target_bccovr, dst_dir=self.cycle_dir()) - fetch_required = False - - # Determine the bias file extension and map to R2D2 file_type enum - if observation == 'aircraft_temperature': - bias_file_ext = 'acftbias' - bias_file_type = 'obsbias_coefficients' # Official JCSDA enum - bias_err_type = 'obsbias_coeff_errors' # Official JCSDA enum - # TODO: Do we want to use bias corrections for winds? - # TODO: Confirm for extension and err_type. Bias files exist for aircraft_wind - elif observation == 'aircraft_wind': - bias_file_type = None # Option A: Skip - # Option B: Enable (if bias files should be used for aircraft_wind) - # bias_file_ext = 'acftbias' - # bias_coef_type = 'obsbias_coefficients' - # bias_err_type = 'obsbias_coeff_errors' + # fetch empty obs + r2d2.fetch( + item='observation', + provider='empty_provider', + observation_type='empty_type', + file_extension='nc4', + window_start='19700101T030000Z', + window_length='PT6H', + target_file=target_file, + ) + + # Check how many of the combine_input_files exist in the cycle directory. + # If all of them are missing proceed without creating an observation input + # file since bias correction files still need to be propagated to the next cycle + # for cycling VarBC. + # ----------------------------------------------------------------------- + if not any([os.path.exists(f) for f in combine_input_files]): + self.logger.info(f'None of the {observation} files exist for this cycle!') + else: + jedi_obs_file = observation_dict['obs space']['obsdatain']['engine']['obsfile'] + self.logger.info(f'Processing observation file {jedi_obs_file}') + # If obs_list_dto has one member, then just rename the file + # --------------------------------------------------------- + if len(obs_list_dto) == 1: + os.rename(combine_input_files[0], jedi_obs_file) else: - # Satellite observations - bias_file_ext = 'satbias' - bias_file_type = 'satbias' # Official JCSDA enum - bias_err_type = 'obsbias_coeff_errors' # Official JCSDA enum - - # This will skip the fetch if we are cycling VarBC - if bias_file_type is not None: - if fetch_required: - # Fetch coefficients file (.acftbias or .satbias) - self.logger.info(f'Processing bias file {target_bccoef}') - r2d2.fetch( - item='bias_correction', - target_file=target_bccoef, - model=r2d2_model, - experiment=obs_experiment, - provider='gsi', - observation_type=observation, - file_extension=bias_file_ext, - file_type=bias_file_type, - date=background_time_iso - ) - - r2d2.fetch( - item='bias_correction', - target_file=target_bccovr, - model=r2d2_model, - experiment=obs_experiment, - provider='gsi', - observation_type=observation, - file_extension=bias_file_ext + '_cov', - file_type=bias_err_type, # obsbias_coeff_errors Official JCSDA enum - date=background_time_iso - ) - # Change permission - os.chmod(target_bccoef, 0o644) - os.chmod(target_bccovr, 0o644) - - # Skip time lapse part for aircraft observations - # ---------------------------------------------- - if observation == 'aircraft_temperature' or observation == 'aircraft_wind': - continue + self.read_and_combine(combine_input_files, jedi_obs_file) + # Change permission + os.chmod(jedi_obs_file, 0o644) + + # Otherwise there is only work to do if the observation operator has bias correction + # ---------------------------------------------------------------------------------- + if 'obs bias' not in observation_dict: + return - # Satellite time lapse - # -------------------- - for target_file in self.get_tlapse_files(observation_dict): + # Satellite and aircraft bias correction (coeff and cov) files + # ----------------------------------------------- + target_bccoef = observation_dict['obs bias']['input file'] + target_bccovr = observation_dict['obs bias']['covariance']['prior']['input file'] - self.logger.info(f'Processing satellite time lapse file {target_file}') + # We assume fetch is required unless we are cycling VarBC + fetch_required = True + if cycling_varbc: + if self.cycle_time_dto() == self.start_cycle_point_dto(): + self.logger.info(f'Process bias file {target_bccoef} for the first cycle') + self.logger.info(f'Process bias file {target_bccovr} for the first cycle') + else: + self.logger.info(f'Using bias files from the previous cycle') + previous_bias_coef = self.previous_cycle_bias(target_bccoef, window_length) + previous_bias_covr = self.previous_cycle_bias(target_bccovr, window_length) + # Link the previous bias file to the current cycle directory + self.logger.info(f'Linking {previous_bias_coef} to {target_bccoef}') + self.geos.linker(previous_bias_coef, target_bccoef, dst_dir=self.cycle_dir()) + self.logger.info(f'Linking {previous_bias_covr} to {target_bccovr}') + self.geos.linker(previous_bias_covr, target_bccovr, dst_dir=self.cycle_dir()) + fetch_required = False + + # Determine the bias file extension and map to R2D2 file_type enum + if observation == 'aircraft_temperature': + bias_file_ext = 'acftbias' + bias_file_type = 'obsbias_coefficients' # Official JCSDA enum + bias_err_type = 'obsbias_coeff_errors' # Official JCSDA enum + # TODO: Do we want to use bias corrections for winds? + # TODO: Confirm for extension and err_type. Bias files exist for aircraft_wind + elif observation == 'aircraft_wind': + bias_file_type = None # Option A: Skip + # Option B: Enable (if bias files should be used for aircraft_wind) + # bias_file_ext = 'acftbias' + # bias_coef_type = 'obsbias_coefficients' + # bias_err_type = 'obsbias_coeff_errors' + else: + # Satellite observations + bias_file_ext = 'satbias' + bias_file_type = 'satbias' # Official JCSDA enum + bias_err_type = 'obsbias_coeff_errors' # Official JCSDA enum + + # This will skip the fetch if we are cycling VarBC + if bias_file_type is not None: + if fetch_required: + # Fetch coefficients file (.acftbias or .satbias) + self.logger.info(f'Processing bias file {target_bccoef}') r2d2.fetch( item='bias_correction', - target_file=target_file, + target_file=target_bccoef, model=r2d2_model, experiment=obs_experiment, provider='gsi', observation_type=observation, - file_extension='tlapse', - file_type='obsbias_tlapse', # Official JCSDA enum + file_extension=bias_file_ext, + file_type=bias_file_type, date=background_time_iso ) - # Change permission - os.chmod(target_file, 0o644) + r2d2.fetch( + item='bias_correction', + target_file=target_bccovr, + model=r2d2_model, + experiment=obs_experiment, + provider='gsi', + observation_type=observation, + file_extension=bias_file_ext + '_cov', + file_type=bias_err_type, # obsbias_coeff_errors Official JCSDA enum + date=background_time_iso + ) + # Change permission + os.chmod(target_bccoef, 0o644) + os.chmod(target_bccovr, 0o644) + + # Skip time lapse part for aircraft observations + # ---------------------------------------------- + if observation == 'aircraft_temperature' or observation == 'aircraft_wind': + return + + # Satellite time lapse + # -------------------- + for target_file in self.get_tlapse_files(observation_dict): + + self.logger.info(f'Processing satellite time lapse file {target_file}') + + r2d2.fetch( + item='bias_correction', + target_file=target_file, + model=r2d2_model, + experiment=obs_experiment, + provider='gsi', + observation_type=observation, + file_extension='tlapse', + file_type='obsbias_tlapse', # Official JCSDA enum + date=background_time_iso + ) + + # Change permission + os.chmod(target_file, 0o644) # ---------------------------------------------------------------------------------------------- diff --git a/src/swell/utilities/question_defaults.py b/src/swell/utilities/question_defaults.py index 2441c6f45..fc773bab1 100644 --- a/src/swell/utilities/question_defaults.py +++ b/src/swell/utilities/question_defaults.py @@ -34,6 +34,16 @@ class comparison_experiment_paths(SuiteQuestion): # -------------------------------------------------------------------------------------------------- + @dataclass + class concurrent_limit(SuiteQuestion): + default_value: int = 10 + question_name: str = "concurrent_limit" + ask_question: bool = False + prompt: str = "Cylc parameter for how many jobs can run concurrently." + widget_type: WType = WType.INTEGER + + # -------------------------------------------------------------------------------------------------- + @dataclass class cycle_times(SuiteQuestion): default_value: str = "defer_to_model"