adapt most of l3
j-haacker committed Dec 12, 2024
1 parent bf4c6fe commit 515b0c7
Showing 1 changed file with 138 additions and 56 deletions.
194 changes: 138 additions & 56 deletions cryoswath/l3.py
@@ -1,3 +1,4 @@
import dask.array
import datetime
from dateutil.relativedelta import relativedelta
import geopandas as gpd
@@ -114,6 +115,110 @@ def med_iqr_cnt(data):
__all__.append("med_iqr_cnt")


def cache_l2_data(region_of_interest: str|shapely.Polygon,
start_datetime: str|pd.Timestamp,
end_datetime: str|pd.Timestamp, *,
max_elev_diff: float = 150,
timestep_months: int = 1,
window_ntimesteps: int = 3,
cache_filename: str = None,
cache_filename_extra: str = None,
crs: CRS|int = None,
reprocess: bool = False,
**l2_from_id_kwargs
) -> None:
    if window_ntimesteps % 2 == 0:
        old_window = window_ntimesteps
        window_ntimesteps += 1
        warnings.warn(f"The window should be an odd number of time steps. You asked for {old_window}, "
                      f"but it has been changed to {window_ntimesteps}.")
# ! end time step should be included.
start_datetime, end_datetime = pd.to_datetime([start_datetime, end_datetime])
# this function only makes sense for multiple months, so assume input
# was on the month scale and set end_datetime to end of month
end_datetime = end_datetime.normalize() + pd.offsets.MonthBegin() - pd.Timedelta(1, "s")
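    # e.g. end_datetime 2010-03-15 12:34 -> .normalize() -> 2010-03-15 00:00,
    # + MonthBegin -> 2010-04-01 00:00, - 1 s -> 2010-03-31 23:59:59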
if "buffer_region_by" not in locals():
        # buffer_region_by defaults to 30 km to not miss any tracks. Usually,
        # 10 km should do.
buffer_region_by = 30_000
time_buffer_months = (window_ntimesteps*timestep_months)//2
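    # e.g. the defaults window_ntimesteps=3, timestep_months=1 give a buffer of
    # (3*1)//2 = 1 month on either side of the requested period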
print("Caching l2 data for",
"the region "+region_of_interest if isinstance(region_of_interest, str) else "a custom area",
f"from {start_datetime} to {end_datetime} +-{relativedelta(months=time_buffer_months)}.")
cs_tracks = load_cs_ground_tracks(region_of_interest, start_datetime, end_datetime,
buffer_period_by=relativedelta(months=time_buffer_months),
buffer_region_by=buffer_region_by)
print("First and last available ground tracks are on",
f"{cs_tracks.index[0]} and {cs_tracks.index[-1]}, respectively.,",
f"{cs_tracks.shape[0]} tracks in total."
"\n[note] Run update_cs_ground_tracks, optionally with `full=True` or",
"`incremental=True`, if you local ground tracks store is not up to",
"date. Consider pulling the latest version from the repository.")

    # ! exclude data outside the region's total_bounds in l2.from_id (possible/logically consistent?)
print("Storing the essential L2 data in hdf5, downloading and",
"processing L1b files if not available...")
if isinstance(region_of_interest, str):
region_id = region_of_interest
region_of_interest = load_glacier_outlines(region_id, "glaciers")
else:
region_id = "_".join([f"{region_of_interest.centroid.x:.0f}", f"{region_of_interest.centroid.y:.0f}"])
if cache_filename is None:
cache_filename = region_id
if cache_filename_extra is not None:
cache_filename += "_"+cache_filename_extra
cache_fullname = os.path.join(tmp_path, cache_filename)
if crs is None:
crs = find_planar_crs(shp=region_of_interest)
else:
crs = ensure_pyproj_crs(crs)
# cutting to actual glacier outlines takes very long. if needed, implement multiprocessing.
# bbox = gpd.GeoSeries(
# shapely.box(*gpd.GeoSeries(region_of_interest, crs=4326).to_crs(crs).bounds.values[0]),
# crs=crs)
# below tries to balance a large cache file with speed. it is not meant
    # to retain data in the surroundings - this is merely needed for the
    # implicit `simplify` which would come at the cost of data if not
# buffered
bbox = gpd.GeoSeries(buffer_4326_shp(region_of_interest, 3_000), crs=4326).to_crs(crs)

# l2 backs up the cache when writing to it. however, there should not be a backup, yet. if there is, throw an error
if os.path.isfile(cache_fullname+"__backup"):
raise Exception(f"Backup exists unexpectedly at {cache_fullname+'__backup'}. This may point to a running process. If this is a relict, remove it manually.")
try:
l2.from_id(cs_tracks.index, reprocess=reprocess, save_or_return="save", cache_fullname=cache_fullname, crs=crs,
bbox=bbox, max_elev_diff=max_elev_diff,
**filter_kwargs(l2.from_id, l2_from_id_kwargs,
blacklist=["cache", "max_elev_diff", "save_or_return", "reprocess"]))
finally:
        # remove l2's cache backup. it is not needed as no more writing takes
        # place but it occupies some 10 GB of disk space.
if os.path.isfile(cache_fullname+"__backup"):
os.remove(cache_fullname+"__backup")
print("Successfully finished caching for",
"the region "+region_of_interest if isinstance(region_of_interest, str) else "a custom area",
f"from {start_datetime} to {end_datetime} +-{relativedelta(months=time_buffer_months)}.")
__all__.append("cache_l2_data")


def preallocate_zarr(
path,
bbox,
crs,
time_index,
data_vars
) -> None:
x_dummy = np.arange((bbox.bounds[0]//500+.5)*500, bbox.bounds[2], 500, dtype="int")
y_dummy = np.arange((bbox.bounds[1]//500+.5)*500, bbox.bounds[3], 500, dtype="int")
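    # x_dummy/y_dummy are the centers of a 500 m grid covering bbox, e.g.
    # bounds (1234, ..., 3600, ...) -> x_dummy = [1250, 1750, 2250, 2750, 3250]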
array_dummy = xr.DataArray(
dask.array.full(shape=(len(time_index), len(x_dummy), len(y_dummy)), fill_value=np.nan),
coords={"time": time_index, "x": x_dummy, "y": y_dummy}
)
(xr.merge([array_dummy.rename(stat) for stat in data_vars])
.rio.write_crs(crs)
.to_zarr(path, compute=False))
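    # with compute=False only the zarr metadata and coordinates are written;
    # the data chunks are filled later via to_zarr(..., region=...) calls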
__all__.append("preallocate_zarr")


def build_dataset(region_of_interest: str|shapely.Polygon,
start_datetime: str|pd.Timestamp,
end_datetime: str|pd.Timestamp, *,
@@ -148,10 +253,6 @@ def build_dataset(region_of_interest: str|shapely.Polygon,
# 10 km should do.
buffer_region_by = 30_000
time_buffer_months = (window_ntimesteps*timestep_months)//2
ext_t_axis = pd.date_range(start_datetime-pd.DateOffset(months=time_buffer_months),
end_datetime+pd.DateOffset(months=time_buffer_months),
freq=f"{timestep_months}MS",
).astype("int64")
cs_tracks = load_cs_ground_tracks(region_of_interest, start_datetime, end_datetime,
buffer_period_by=relativedelta(months=time_buffer_months),
buffer_region_by=buffer_region_by)
@@ -185,28 +286,36 @@ def build_dataset(region_of_interest: str|shapely.Polygon,
# crs=crs)
# below tries to balance a large cache file with speed. it is not meant
    # to retain data in the surroundings - this is merely needed for the
# implicit `simplify`` which would come at the cost of data if not
# implicit `simplify` which would come at the cost of data if not
# buffered
bbox = gpd.GeoSeries(buffer_4326_shp(region_of_interest, 3_000), crs=4326).to_crs(crs)
region_of_interest = gpd.GeoSeries(buffer_4326_shp(region_of_interest, 3_000), crs=4326).to_crs(crs).make_valid()

# l2 backs up the cache when writing to it. however, there should not be a backup, yet. if there is, throw an error
if os.path.isfile(cache_fullname+"__backup"):
raise Exception(f"Backup exists unexpectedly at {cache_fullname+'__backup'}. This may point to a running process. If this is a relict, remove it manually.")
try:
l2.from_id(cs_tracks.index, reprocess=reprocess, save_or_return="save", cache_fullname=cache_fullname, crs=crs,
bbox=bbox, max_elev_diff=max_elev_diff,
bbox=region_of_interest, max_elev_diff=max_elev_diff,
**filter_kwargs(l2.from_id, l2_from_id_kwargs,
blacklist=["cache", "max_elev_diff", "save_or_return", "reprocess"]))
finally:
        # remove l2's cache backup. it is not needed as no more writing takes
        # place but it occupies some 10 GB of disk space.
if os.path.isfile(cache_fullname+"__backup"):
os.remove(cache_fullname+"__backup")
ext_t_axis = pd.date_range(start_datetime-pd.DateOffset(months=time_buffer_months),
end_datetime+pd.DateOffset(months=time_buffer_months),
freq=f"{timestep_months}MS")
# strip GeoSeries-container -> shapely.Geometry
region_of_interest = region_of_interest.iloc[0]
outfilepath = build_path(region_id, timestep_months, spatial_res_meter)
if not reprocess and os.path.isfile(outfilepath):
with xr.open_dataset(outfilepath, decode_coords="all") as ds:
previously_processed_l3 = ds.load().copy()
region_of_interest = gpd.GeoSeries(region_of_interest, crs=4326).to_crs(crs).make_valid().simplify(1_000).iloc[0]
if reprocess and os.path.isdir(outfilepath):
shutil.rmtree(outfilepath)
if os.path.isdir(outfilepath):
previously_processed_l3 = xr.open_zarr(outfilepath, decode_coords="all")
else:
preallocate_zarr(outfilepath, region_of_interest, crs, ext_t_axis, agg_func_and_meta[1].keys())
ext_t_axis = ext_t_axis.astype("int64")
node_list = []
def collect_chunk_names(name, node):
nonlocal node_list
@@ -288,67 +397,40 @@ def local_closure(roll_iteration):
l3_data = l3_data.sort_index()
l3_data = l3_data.query(f"time >= '{start_datetime}' and time <= '{end_datetime}'")

# note on "erratically" renaming the data below:
# the function consumes much memory. I tried to use xarrays close
# function where possible to reduce the memory footprint. however,
# does not seem to help. xarrays cache might be the problem. see
# issue #32.

# merge with previous data if there are any
if "previously_processed_l3" in locals():
tmp = previously_processed_l3.to_dataframe().dropna(how="any")
previously_processed_l3.close()
previously_processed_l3 = pd.concat([tmp, l3_data], axis=0)
del tmp
print(l3_data.head())
print("data count of combined data", len(previously_processed_l3.index))
# there may not be duplicates. if there are, there is a bug
duplicates = previously_processed_l3.index.duplicated(keep=False)
if any(duplicates):
print(previously_processed_l3.index[duplicates])
print(previously_processed_l3.tail(30))
raise KeyError("Duplicates found in merging new with preexisting data.")
else:
previously_processed_l3 = l3_data
del l3_data
tmp = previously_processed_l3.to_xarray().sortby("x").sortby("y")
del previously_processed_l3
previously_processed_l3 = fill_missing_coords(tmp).rio.write_crs(crs)
tmp.close()
# save/backup result
tmp_backup_path = os.path.join(tmp_path, f"{region_id}_l3")
# try to write new data to file. if anything goes wrong, restore. is this sufficiently safe?
try:
if os.path.isfile(outfilepath):
shutil.move(outfilepath, tmp_backup_path)
previously_processed_l3.to_netcdf(outfilepath)
(dataframe_to_rioxr(l3_data, crs).rio.clip([region_of_interest])
.drop_vars(['spatial_ref'])
.to_zarr(outfilepath, region="auto"))#[["_median", "_iqr", "_count"]]
except Exception as err:
shutil.move(tmp_backup_path, outfilepath)
print("\n")
warnings.warn(
"Failed to write to netcdf! Restored previous state. Printing error"
+ "message below and continuing. Attempting to save to temporary file.")
"Failed to write to zarr! Attempting to dump current dataframe.")
try:
safety_net_tmp_file_path = os.path.join(tmp_path, f"tmp_l3_state__{datetime.datetime.strftime('%dT%H%M%S')}.nc")
previously_processed_l3.to_netcdf(safety_net_tmp_file_path)
safety_net_tmp_file_path = os.path.join(tmp_path,
f"tmp_l3_state__{datetime.datetime.now().strftime('%Y%m%dT%H%M%S')}.feather")
l3_data.to_feather(safety_net_tmp_file_path)
except Exception as err_inner:
print("\n")
warnings.warn(
"Failed again to save safety! There likely is a problem with the data."
+" Rethrowing errors.")
"Failed to do an emergency dump!"
+" Rethrowing errors:")
raise
else:
print("\n", "Managed to save state to", safety_net_tmp_file_path)
print("\n", "Managed to dump current dataframe to", safety_net_tmp_file_path)
print("\n", "Original error is printed below.", str(err), "\n")
else:
print(datetime.datetime.now())
print(f"processed and stored cell", chunk_name)
if os.path.isfile(tmp_backup_path):
os.remove(tmp_backup_path)
return previously_processed_l3
print(l3_data.head())
print("\n\n+++++++++++++ successfully build dataset ++++++++++++++\n\n")
return xr.open_zarr(outfilepath, decode_coords="all")
__all__.append("build_dataset")


def dataframe_to_rioxr(df, crs):
return fill_missing_coords(df.to_xarray()).rio.write_crs(crs)
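
# Usage sketch (illustrative; the column, index values and CRS are placeholders --
# in build_dataset the frame is indexed by time, x and y):
#
#     df = pd.DataFrame({"_median": [1.0, 2.0]},
#                       index=pd.MultiIndex.from_tuples([(250, 250), (750, 750)],
#                                                       names=["x", "y"]))
#     dataframe_to_rioxr(df, CRS.from_epsg(32624))
#
# fill_missing_coords presumably pads gaps in the x/y grid so the result is a
# regular raster; rio.write_crs attaches the CRS for rioxarray operations
# such as the clip in build_dataset.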


def build_path(region_of_interest, timestep_months, spatial_res_meter, aggregation_period = None):
# ! implement parsing aggregation period
if not isinstance(region_of_interest, str):
@@ -368,7 +450,7 @@ def build_path(region_of_interest, timestep_months, spatial_res_meter, aggregati
# if the trailing ".0" should be omitted, that needs to be implemented here
spatial_res_str = f"{round(spatial_res_meter/1000, 1)}km"
return os.path.join(data_path, "L3", "_".join(
[region_id, timestep_str, spatial_res_str+".nc"]))
[region_id, timestep_str, spatial_res_str+".zarr"]))
__all__.append("build_path")


