328 changes: 328 additions & 0 deletions experiment.ipynb
@@ -0,0 +1,328 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "b6eba6c5",
"metadata": {},
"outputs": [],
"source": [
"import ee\n",
"import geetools"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0f2d1935",
"metadata": {},
"outputs": [],
"source": [
"ee.Authenticate.geetools.list_user()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8cda9cb7",
"metadata": {},
"outputs": [],
"source": [
"ee.Initialize.geetools.from_user(name=\"pierrick\", project=\"ee-geetools\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ee7f0fd7",
"metadata": {},
"outputs": [],
"source": [
"ee.Initialize.geetools.project_id()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "19d7c112",
"metadata": {},
"outputs": [],
"source": [
"ee.Number(1).getInfo()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f932c8e1",
"metadata": {},
"outputs": [],
"source": [
"# initialize the Google cloud storage client to save the generateed tiff files\n",
"from google.cloud.storage.client import Client\n",
"from google.cloud.storage.bucket import Bucket\n",
"\n",
"client = Client(\n",
" project=ee.Initialize.geetools.project_id(),\n",
" credentials=ee.Initialize.geetools.get_credentials()\n",
")\n",
"list(client.list_buckets())\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "100f7815",
"metadata": {},
"outputs": [],
"source": [
"# NOAA CPC Historical Weather Data for GEE\n",
"\n",
"import re\n",
"import tempfile\n",
"from datetime import datetime as dtm\n",
"from datetime import timedelta\n",
"from pathlib import Path\n",
"from functools import partial\n",
"import argparse\n",
"\n",
"import requests\n",
"import rioxarray as rioxarray\n",
"import xarray as xr\n",
"from tqdm import tqdm\n",
"from xarray.coding.times import CFDatetimeCoder\n",
"\n",
"\n",
"# Start the process timer for performance monitoring\n",
"now = dtm.now()\n",
"\n",
"# Configuration constants\n",
"DEFAULT_ASSET_PATH = \"~/assets/cpc_noaa/cpc_daily_raw\"\n",
"\n",
"# Base URLs for NOAA CPC data servers - each contains the prefix for that data type\n",
"CPC_URLS = {\n",
" \"precip\": \"https://downloads.psl.noaa.gov/Datasets/cpc_global_precip/precip.\",\n",
" \"tmin\": \"https://downloads.psl.noaa.gov/Datasets/cpc_global_temp/tmin.\",\n",
" \"tmax\": \"https://downloads.psl.noaa.gov/Datasets/cpc_global_temp/tmax.\",\n",
"}\n",
"\n",
"print(\"=== Starting NOAA CPC Data Processing ===\")\n",
"\n",
"# Create the Earth Engine collection asset if it doesn't exist\n",
"collection = ee.Asset(DEFAULT_ASSET_PATH).expanduser()\n",
"collection.mkdir(parents=True, exist_ok=True, image_collection=True)\n",
"\n",
"# Update the collection description with README content every time to remain up to date\n",
"readme = (Path(__file__).parent / \"README.md\").read_text()\n",
"collection.setProperties(**{\"description\": \"# The CPC NOAA Daily Weather Data\"})\n",
"\n",
"# Discover available years from NOAA CPC servers\n",
"# We check each data type (precip, tmin, tmax) and find years that exist in ALL datasets\n",
"for data_type, base_url in CPC_URLS.items():\n",
" # Remove the data type prefix to get the directory listing URL\n",
" # e.g., \"precip.\" becomes \"\" to get the directory listing\n",
" response = requests.get(base_url.replace(data_type + \".\", \"\"), timeout=30)\n",
" response.raise_for_status()\n",
"\n",
" # Extract year from filenames like \"precip.1979.nc\", \"tmin.2023.nc\"\n",
" # Using regex to find 4-digit years in NetCDF filenames\n",
" pattern = rf\"{data_type}\\.(\\d{{4}})\\.nc\"\n",
" matches = re.findall(pattern, response.text)\n",
" years_for_datatype = {int(year) for year in matches}\n",
"\n",
" # Keep only years that exist in ALL datasets (intersection)\n",
" # First iteration: set the initial years\n",
" # Subsequent iterations: intersect with previous results\n",
" available_years = years_for_datatype if available_years is None else available_years\n",
" available_years &= years_for_datatype\n",
"\n",
"available_years = sorted([year for year in available_years if year < dtm.now().year])\n",
"print(f\"Processing noa_cpc data for years: {available_years}\")\n",
"\n",
"# Download and process data year by year to manage memory efficiently\n",
"# Process each available year sequentially\n",
"task_list, failed_tasks = [], []\n",
"for year in available_years:\n",
" with tempfile.TemporaryDirectory() as temp_dir:\n",
" # Download all three data files (precip, tmin, tmax) for this year\n",
" for var in [\"precip\", \"tmin\", \"tmax\"]:\n",
" local_path = Path(temp_dir) / f\"{var}.{year}.nc\"\n",
" url = f\"{CPC_URLS[var]}{year}.nc\"\n",
"\n",
" # Only download if file doesn't already exist locally\n",
" if not local_path.exists():\n",
" response = requests.get(url, timeout=300)\n",
" response.raise_for_status()\n",
" local_path.write_bytes(response.content)\n",
"\n",
" # Open the three NetCDF datasets we just downloaded\n",
" time_coder = CFDatetimeCoder(use_cftime=True) # Handle time encoding properly\n",
" precip_file = Path(temp_dir) / f\"precip.{year}.nc\"\n",
" tmin_file = Path(temp_dir) / f\"tmin.{year}.nc\"\n",
" tmax_file = Path(temp_dir) / f\"tmax.{year}.nc\"\n",
"\n",
" # Use chunking to process data efficiently - one day at a time\n",
" # The Lazy Load avoids allocating the entire year into memory at once\n",
" # Note: CPC files use \"lat\"/\"lon\" coordinates, not \"latitude\"/\"longitude\"\n",
" chunks = {\"time\": 1, \"lat\": -1, \"lon\": -1}\n",
" open = partial(xr.open_dataset, decode_times=time_coder, chunks=chunks, engine=\"netcdf4\")\n",
" ds_precip, ds_tmin, ds_tmax = open(precip_file), open(tmin_file), open(tmax_file)\n",
"\n",
" # Find the actual variable names in each dataset\n",
" # NOAA CPC uses different variable names across datasets\n",
" precip_var = next(var for var in ds_precip.data_vars if var.lower().startswith(\"precip\"))\n",
" tmin_var = next(var for var in ds_tmin.data_vars if var.lower().startswith(\"tmin\"))\n",
" tmax_var = next(var for var in ds_tmax.data_vars if var.lower().startswith(\"tmax\"))\n",
"\n",
" # Extract and rename variables to standard names for Earth Engine\n",
" ds_precip = ds_precip[precip_var].rename(\"tp\") # Total precipitation\n",
" ds_tmin = ds_tmin[tmin_var].rename(\"tmin\") # Minimum temperature\n",
" ds_tmax = ds_tmax[tmax_var].rename(\"tmax\") # Maximum temperature\n",
"\n",
" # Combine all three variables into a single dataset\n",
" # This creates a dataset with 3 data variables: tp, tmin, tmax\n",
" ds_list = [ds_precip, ds_tmin, ds_tmax]\n",
" ds_merged = xr.merge(ds_list)\n",
"\n",
" # Close individual datasets since we won't use them anymore\n",
" # This releases file handles and saves memory\n",
" [ds.close() for ds in ds_list]\n",
"\n",
" # Standardize coordinate names to longitude/latitude\n",
" # Some datasets use \"lon/lat\", others use \"longitude/latitude\"\n",
" std_keys= {\"lon\": \"longitude\", \"lat\": \"latitude\"}\n",
" std_keys= {k: v for k, v in std_keys.items() if k in ds_merged.coords}\n",
" ds_merged = ds_merged.rename(std_keys)\n",
"\n",
" # Convert longitude from [0, 360] to [-180, 180] format\n",
" # This is the standard format expected by Earth Engine\n",
" adjusted_lon = (ds_merged[\"longitude\"].values - 180) % 360 - 180\n",
" ds_merged = ds_merged.assign_coords(longitude=adjusted_lon).sortby(\"longitude\")\n",
"\n",
" # Get total number of days in this year's dataset\n",
" total_time_steps = len(ds_merged.time)\n",
"\n",
" # Process each day individually to minimize memory usage\n",
" for day_index in tqdm(range(total_time_steps), desc=f\"Processing {year} days\", unit=\"day\"):\n",
" # Extract the date for this day (cftime is a troublsome format it cannot be safely\n",
" # extracted even with pd.to_datetime)\n",
" cftime_date = ds_merged.time.values[day_index]\n",
" start_time = dtm(cftime_date.year, cftime_date.month, cftime_date.day)\n",
"\n",
" # Create a unique asset ID for this day's data\n",
" filename = f\"cpc_daily_root_{start_time:%Y%m%d}\"\n",
" asset_id = ee.Asset(DEFAULT_ASSET_PATH) / filename\n",
" if asset_id.exists():\n",
" continue\n",
"\n",
" # Extract data for just this one day (memory efficient)\n",
" day_slice = ds_merged.isel(time=day_index)\n",
"\n",
" # Convert to a DataArray with bands in the correct order\n",
" # Earth Engine expects data as a multi-band image\n",
" desired_order = [\"tp\", \"tmin\", \"tmax\"]\n",
" ds_merged_gee = day_slice[desired_order].to_array().rio.write_crs(\"epsg:4326\")\n",
"\n",
" # Export this day's data to Google Earth Engine\n",
" # Create a temporary GeoTIFF file in the same temp directory (elegant solution)\n",
" tiff_file = Path(temp_dir) / f\"cpc_daily_{start_time:%Y%m%d}.tiff\"\n",
"\n",
" # Write the data to the temporary GeoTIFF file\n",
" ds_merged_gee.rio.to_raster(tiff_file, driver=\"GTiff\", dtype=\"float32\", compress=\"lzw\")\n",
"\n",
" # Prepare metadata properties for the Earth Engine asset\n",
" properties = {\"upload_time\": int(dtm.now().timestamp() * 1000)}\n",
"\n",
" # send the file to GCP\n",
"\n",
"\n",
"# # Submit the upload task to Earth Engine\n",
"# try:\n",
"# task = ee.batch.Export.ldc.geotiff.toAsset(\n",
"# filename=str(tiff_file),\n",
"# assetId=asset_id,\n",
"# start_time=start_time,\n",
"# end_time=start_time + timedelta(days=1),\n",
"# properties=properties,\n",
"# band_names=desired_order\n",
"# )\n",
"# task.start()\n",
"# task_list.append(task)\n",
"# except Exception as e:\n",
"# logger.error(f\"Error occurred while submitting task for {start_time}: {e}\")\n",
"# failed_tasks.append(asset_id)\n",
"# continue # Exit the loop on error to avoid further issues\n",
"\n",
"# # Clean up the temporary DataArray\n",
"# ds_merged_gee.close()\n",
"\n",
"# # Close the merged dataset after processing all days for this year\n",
"# # This is crucial for Windows file handle management\n",
"# ds_merged.close()\n",
"# logger.info(f\"Completed generating tasks for year {year}, all datasets closed\")\n",
"\n",
"# # Monitor Tasks\n",
"# logger.warning(\"Do not shut down the process\")\n",
"# logger.info(f\"Waiting for {len(task_list)} tasks to complete...\")\n",
"# start_wait = dtm.now()\n",
"# TM = LDCGEETools.TaskMonitor(task_list)\n",
"# TM.run()\n",
"# elapsed_wait_time = dtm.now() - start_wait\n",
"\n",
"# # update the collection start and end time\n",
"# ic = ee.ImageCollection(collection.as_posix())\n",
"# collection.setProperties(**{\n",
"# \"system:time_start\": ic.aggregate_min(\"system:time_start\").getInfo(),\n",
"# \"system:time_end\": ic.aggregate_max(\"system:time_start\").getInfo(),\n",
"# })\n",
"\n",
"# # log the computation total time including the time spent waiting for Google\n",
"# logger.info(\"Processing completed. All tasks have been submitted for export to GEE.\")\n",
"# elapsed_time = dtm.now() - now\n",
"# logger.info(\n",
"# f\"Total processing time: {elapsed_time}\"\n",
"# f\" including {elapsed_wait_time} waiting for GEE tasks to complete.\"\n",
"# )\n",
"\n",
"# # log is some task could not be send to GEE\n",
"# if len(failed_tasks) > 0:\n",
"# logger.error(\"Some tasks failed to submit to GEE:\")\n",
"# log = \"Task ({}) failed with error: {}\"\n",
"# [logger.error(log.format(task, \"Failed to submit to GEE\")) for task in failed_tasks]\n",
"\n",
"# # log if the data are all there\n",
"# not_completed = [t for t in task_list if t.state() not in [\"COMPLETED\", \"SUCCEEDED\"]]\n",
"# if len(not_completed) > 0:\n",
"# logger.error(\"Some tasks did not complete successfully:\")\n",
"# log = \"Task ({}) failed with status: {}\"\n",
"# [logger.error(log.format(task.task_id, task.state())) for task in not_completed]\n",
"\n",
"# if len(not_completed) > 0 or len(failed_tasks) > 0:\n",
"# exit(1) # return falsy exit\n",
"\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}