diff --git a/.gitignore b/.gitignore index 0969727..e3b890f 100644 --- a/.gitignore +++ b/.gitignore @@ -134,6 +134,7 @@ dmypy.json dashboard/public/HITL_notes/ dashboard/public/QAQC_plots/ dashboard/public/spectrograms/ +dashboard/public/discrete/ -#mac -.DS_Store \ No newline at end of file +# mac +.DS_Store diff --git a/dashboard/.gitignore b/dashboard/.gitignore index 2a57eff..2a86afa 100644 --- a/dashboard/.gitignore +++ b/dashboard/.gitignore @@ -1,6 +1,5 @@ # Project /public/QAQC_plots/ -/public/discrete/ # Nuxt .nuxt diff --git a/dashboard/app/components/SideBar.vue b/dashboard/app/components/SideBar.vue index 1bbc301..59e463e 100644 --- a/dashboard/app/components/SideBar.vue +++ b/dashboard/app/components/SideBar.vue @@ -168,7 +168,7 @@ const accordionItems = $computed(() => { > Discrete Data -
+
// Fields in series objects used for filtering samples. type SampleFilter = Pick -// Assets that can be selected for plotting. -const selectableAssets = $computed(() => discrete.assets.filter((asset) => asset !== 'PI_REQUEST')) +// Assets that can be selected for plotting, with CTD Cast assets sorted last. +const selectableAssets = $computed(() => { + const assets = compact(discrete.assets.filter((asset) => asset !== 'PI_REQUEST')) + return orderBy(assets, [(asset) => (getAssetGroup(asset) === 'ctd-cast' ? 1 : 0)]) +}) + +function getAssetGroup(asset: string): 'normal' | 'ctd-cast' { + if (asset.startsWith('CTD Cast (')) { + return 'ctd-cast' + } + return 'normal' +} + +// Return the subset of selectable assets that belong to the same group (CTD Cast or non-CTD Cast) +// as the given asset. Used when duplicating a series for all assets of the same kind. If the asset +// is `null` or `undefined`, return all "normal" assets. +function peerAssetsFor(asset: string | null | undefined): string[] { + const group = asset != null ? getAssetGroup(asset) : 'normal' + return selectableAssets.filter((asset) => getAssetGroup(asset) === group) +} // All series with filter fields completely filled out. const series = $computed( @@ -176,6 +194,7 @@ const getSamplesForCache: Record = {} // Return all samples matching the provided filter. function getSamplesFor(filter: SampleFilter): Sample[] { const key = [filter.asset ?? '-', filter.x ?? '-', filter.y ?? '-', filter.year ?? 
'-'].join('&&') + const cached = getSamplesForCache[key] if (cached != null) { return cached @@ -452,7 +471,7 @@ const option = $computed(() => { top: '24px', left: '0', right: '32px', - bottom: '140px', + bottom: '100px', }, tooltip: { confine: true, @@ -535,7 +554,7 @@ const option = $computed(() => { { id: 'x-slider', type: 'slider', - bottom: 80, + bottom: 48, height: 22, showDetail: false, showDataShadow: false, @@ -556,6 +575,7 @@ const option = $computed(() => { chartedSeries.map((current) => [current.name, current.original.enabled]), ), show: chartedSeries.length <= 20, + type: 'scroll', bottom: 0, }, series: chartedSeries as any, @@ -624,6 +644,16 @@ function removeSeries(index?: number) { return removed } +function removeOtherSeries(index: number) { + const series = state.series[index] + if (series == null) { + return + } + + state.series = [series] + history.save(state) +} + // Move the series at the given index up one position, if possible. function moveSeriesUp(index: number) { const series = state.series[index] @@ -829,10 +859,11 @@ function duplicateSeriesForAllAssets( return } + const peers = peerAssetsFor(series.asset) let assetsToGenerate = withMatchingData ? // All assets for which we have data matching the series' `x`, `y`, and `year`. - selectableAssets.filter((current) => hasDataFor({ ...series, asset: current })) - : selectableAssets + peers.filter((current) => hasDataFor({ ...series, asset: current })) + : peers if (series.asset != null) { // If the original series has an `asset` set and it's in the list, we don't need to generate it. @@ -1053,98 +1084,101 @@ async function copyToClipboard() {
-
-
- -
- - - - - - - - + + +
+ + R + 2 + + = {{ formatValue(rSquared, 6) }} +
+
+
+ + + + - - + + + + - X Y - - - - - - - - -
- -
-
- - R - 2 - - = {{ formatValue(rSquared, 6) }} -
-
+ + history.save(state) + } + " + > + X Y +
+
+ + + + +
-

+

@@ -1322,6 +1356,18 @@ async function copyToClipboard() { onSelect: () => duplicateSeries(i), }, { type: 'separator' }, + { + icon: 'i-lucide-trash-2', + label: 'Remove', + onSelect: () => removeSeries(i), + }, + { + icon: 'i-lucide-brush-cleaning', + label: 'Remove Other Series', + disabled: state.series.length < 2, + onSelect: () => removeOtherSeries(i), + }, + { type: 'separator' }, { icon: 'i-lucide-square-stack', label: `Duplicate For All Years (${possibleYears.length})`, @@ -1344,7 +1390,9 @@ async function copyToClipboard() { { type: 'separator' }, { icon: 'i-lucide-square-stack', - label: `Duplicate For All Assets (${discrete.assets.length})`, + label: + 'Duplicate For All Assets ' + + `(${peerAssetsFor(series.asset).length})`, onSelect: () => duplicateSeriesForAllAssets(i, { withMatchingData: false }), }, @@ -1353,9 +1401,11 @@ async function copyToClipboard() { label: 'Duplicate For All Assets With Matching Data (' + uniq( - getSamplesFor(omit(series, ['asset'])).map( - (current) => current.asset, - ), + getSamplesFor(omit(series, ['asset'])) + .map((current) => current.asset) + .filter((asset) => + peerAssetsFor(series.asset).includes(asset ?? ''), + ), ).length + ')', onSelect: () => @@ -1379,7 +1429,9 @@ async function copyToClipboard() { :items="[ ...selectableAssets.map((asset) => ({ label: - `${asset} (${discrete.assetToStation[asset]})` + + (asset.includes('(') + ? 
asset + : `${asset} (${discrete.assetToStation[asset]})`) + `${getNoDataIndicator({ asset })}`, value: asset as string | null, })), diff --git a/dashboard/app/components/pages/Event.vue b/dashboard/app/components/pages/Event.vue index 9558e00..a8f762f 100644 --- a/dashboard/app/components/pages/Event.vue +++ b/dashboard/app/components/pages/Event.vue @@ -6,7 +6,7 @@ import { useStore } from '~/store' const store = useStore() const SITES_CSV_URL = - 'https://raw.githubusercontent.com/OOI-CabledArray/rca-data-tools/main/rca_data_tools/qaqc/params/sitesDictionary.csv' + 'https://raw.githubusercontent.com/OOI-CabledArray/rca-data-tools/main/rca_data_tools/qaqc/params/sitesDictionary.csv' const VARIABLE_MAP_URL = 'https://raw.githubusercontent.com/OOI-CabledArray/rca-data-tools/main/rca_data_tools/qaqc/params/variableMap.csv' @@ -47,7 +47,7 @@ function parseCSVRow(row: string): string[] { return result } -/** Strip a single layer of surrounding double-quotes if present (artifact of triple-quoting in CSV). */ +// Strip a single layer of surrounding double-quotes if present (artifact of triple-quoting in CSV). function stripQuotes(s: string): string { return s.startsWith('"') && s.endsWith('"') ? s.slice(1, -1) : s } @@ -224,25 +224,25 @@ function downloadPDF() { -

+
No images found for this selection.
Select instrument and parameters to load images.
@@ -335,7 +338,7 @@ function downloadPDF() {
-
+
Add Image Selection diff --git a/dashboard/app/discrete.ts b/dashboard/app/discrete.ts index 4399f0d..f853208 100644 --- a/dashboard/app/discrete.ts +++ b/dashboard/app/discrete.ts @@ -13,12 +13,27 @@ export const enum KnownSampleFields { } export type Sample = Readonly<{ + type: SampleType station: string - asset: string + asset?: string timestamp: ZonedDateTime data: SampleData }> +const index = [ + { + type: 'summary', + file: 'summary-samples.csv', + }, + { + type: 'ctd-cast', + file: 'ctd-cast-samples.csv', + }, +] as const + +export const sampleTypes = index.map((entry) => entry.type) +export type SampleType = (typeof sampleTypes)[number] + export type SampleData = Record export type SampleValue = string | number | null export type SampleValueType = 'text' | 'number' | 'timestamp' @@ -28,9 +43,9 @@ export type SampleSchemaFieldDefinition = { type: SampleValueType } -type CsvFile = { - name: string - content: string +type RawSampleGroup = { + type: SampleType + samples: RawSample[] } const timestampRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z$/ @@ -39,8 +54,6 @@ const timestampRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z$/ const fill = -9999999 export const useDiscrete = defineStore('discrete', () => { - // List of CSV file names. - let index = $shallowRef([]) // Parsed discrete samples. let samples = $shallowRef([]) // Inferred schema for discrete samples. 
@@ -50,22 +63,19 @@ export const useDiscrete = defineStore('discrete', () => { console.log('Loading discrete data.') const start = performance.now() - const loadedIndex = await getIndex() - const loadedRawSamples = await getRawSamples(loadedIndex) - const [loadedSamples, loadedSchema] = extractSamples(loadedRawSamples) + const groups = await Promise.all(index.map(({ type, file }) => getRawSamples(type, file))) + + const [extractedSamples, extractedSchema] = extractSamples(groups) const end = performance.now() const duration = ((end - start) / 1000).toFixed(2) - index = loadedIndex - samples = loadedSamples - schema = loadedSchema + samples = extractedSamples + schema = extractedSchema console.log( `Loaded ${samples.length} discrete samples from ${index.length} files in ${duration}s.`, ) - - return [...index] } const assetsToStation = $computed>(() => @@ -77,6 +87,10 @@ export const useDiscrete = defineStore('discrete', () => { for (const sample of samples) { const station = sample.station const asset = sample.asset + if (asset == null) { + continue + } + const assets = (mapping[station] ??= []) if (!assets.includes(asset)) { assets.push(asset) @@ -88,7 +102,6 @@ export const useDiscrete = defineStore('discrete', () => { return { load, - index: computed(() => index), samples: computed(() => samples), schema: computed(() => schema), fields: computed(() => Object.keys(schema)), @@ -104,78 +117,68 @@ export const useDiscrete = defineStore('discrete', () => { } }) -async function getIndex(): Promise { - const response = await fetch('/discrete/index.json') - const data: string[] = await response.json() - return data -} - -async function getCsvs(index: string[]): Promise { - return await Promise.all( - index.map(async (name) => { - const url = `/discrete/${name}` - const response = await fetch(url) - const content = await response.text() - return { name, content } - }), - ) -} - -async function getRawSamples(index: string[]): Promise { - const csvs = await getCsvs(index) 
- return csvs.flatMap((csv) => parseRawSamples(csv)) -} - -function parseRawSamples(csv: CsvFile): RawSample[] { - const parsed = parse(csv.content, { +async function getRawSamples(type: SampleType, file: string): Promise { + const url = `/discrete/${file}` + const response = await fetch(url) + const content = await response.text() + const parsed = parse(content, { header: true, skipEmptyLines: true, }) - return [...parsed.data] as RawSample[] + const samples = [...parsed.data] as RawSample[] + return { type, samples } } -function extractSamples(raw: RawSample[]): [Sample[], SampleSchema] { - const schema = inferSchema(raw) - let samples = raw.flatMap((raw) => { - // If there are multiple values in a raw sample's station or asset field, split them into - // multiple parsed samples for each defined station/asset. - const stations = - raw[KnownSampleFields.Station]?.split(',')?.map((current) => current.trim()) ?? [] - - return stations.flatMap((station) => { - const assets = - raw[KnownSampleFields.Asset]?.split(',')?.map((current) => current.trim()) ?? [] - return assets.flatMap((asset) => { - let timestamp: ZonedDateTime - try { - timestamp = parseAbsolute(raw[KnownSampleFields.Timestamp] ?? 'unknown', 'UTC') - } catch { - return [] - } +function extractSamples(groups: RawSampleGroup[]): [Sample[], SampleSchema] { + // Infer schema from all raw samples across all groups. + const schema = inferSchema(groups.flatMap((file) => file.samples)) + + let samples = groups.flatMap((group) => + group.samples.flatMap((raw) => { + // If there are multiple values in a raw sample's station or asset field, split them into + // multiple parsed samples for each defined station/asset. + const stations = + raw[KnownSampleFields.Station]?.split(',')?.map((current) => current.trim()) ?? [] + + return stations.flatMap((station) => { + const assets = + raw[KnownSampleFields.Asset]?.split(',')?.map((current) => current.trim()) ?? 
[] + return assets.flatMap((asset) => { + if (asset === '') { + return [] + } - station = station.split('-').map((current) => current.trim())[0] ?? '' - if (station === '') { - return [] - } + let timestamp: ZonedDateTime + try { + timestamp = parseAbsolute(raw[KnownSampleFields.Timestamp] ?? 'unknown', 'UTC') + } catch { + return [] + } - const data: SampleData = { - [KnownSampleFields.Station]: station, - [KnownSampleFields.Asset]: asset, - } - for (const [name, value] of Object.entries(raw)) { - const field = schema[name] - if (field == null || name in data) { - continue + station = station.split('-').map((current) => current.trim())[0] ?? '' + if (station === '') { + return [] } - data[name] = convertValue(value as string, field) - } + const data: SampleData = { + [KnownSampleFields.Station]: station, + [KnownSampleFields.Asset]: asset, + } + for (const [name, value] of Object.entries(raw)) { + const field = schema[name] + if (field == null || name in data) { + continue + } + + data[name] = parseValue(value as string, field) + } - return { station, asset, timestamp, data } + return { type: group.type, station, asset, timestamp, data } + }) }) - }) - }) + }), + ) samples = orderBy(samples, [(sample) => sample.asset, (sample) => sample.timestamp.toDate()]) // Freeze samples to prevent accidental mutations and improve performance in some cases. 
@@ -248,7 +251,7 @@ function inferValueType(name: string, raw: string): SampleValueType | null { return 'text' } -function convertValue(raw: string, field: SampleSchemaFieldDefinition): SampleValue { +function parseValue(raw: string, field: SampleSchemaFieldDefinition): SampleValue { raw = raw.trim() if (raw === '') { return null diff --git a/dashboard/app/store.ts b/dashboard/app/store.ts index d711fab..71651bd 100644 --- a/dashboard/app/store.ts +++ b/dashboard/app/store.ts @@ -321,7 +321,6 @@ export const useStore = defineStore('store', () => { { title: 'QAQC HITL Notes Interface', route: - // eslint-disable-next-line max-len 'https://www.appsheet.com/Account/Login?appName=RCA%20HITL%20Notes%20Interface&FullScope=False&provider=google&returnUrl=https%3A%2F%2Fwww.appsheet.com%2Fstart%2F8a3bfde2-1806-4155-847c-b10e555ddea1%3Fplatform%3Ddesktop#appName=SensorNotesTracker-59462577&vss=H4sIAAAAAAAAA6WOvQrCMBRGX0W-OU-QVRxE6lJxMR1icwvBNilNqpaQd_fWHzqr4_0u53ASrpZuZdT1BfKUlmtHEySSwmHqSUEqrL2Lg28VhMJed6-xmFYlueCHoJCRK_ExRAqQ6VuB_LdAwBpy0TaWhtk2s2x5k_yeOR4WClmgG6M-t_TMZipn3hpfj4HMkXN-yghbt7n32pnCG5Y2ug2UH3J6VTNvAQAA&view=My%20Sensors', external: true, }, diff --git a/dashboard/eslint.config.mjs b/dashboard/eslint.config.mjs index 0bcb473..49866c7 100644 --- a/dashboard/eslint.config.mjs +++ b/dashboard/eslint.config.mjs @@ -15,7 +15,7 @@ export default withNuxt([ }, rules: { 'no-undef': 'off', // Handled by TypeScript. 
- 'max-len': ['warn', 100, 2], + 'max-len': ['warn', 100, 2, { ignoreStrings: true }], '@typescript-eslint/ban-ts-comment': 'off', '@typescript-eslint/no-dynamic-delete': 'off', '@typescript-eslint/no-explicit-any': 'off', diff --git a/pyproject.toml b/pyproject.toml index 202053d..3164305 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,4 +6,6 @@ dependencies = [ "pip", "tqdm", "rca-data-tools @ git+https://github.com/OOI-CabledArray/rca-data-tools.git", + "requests>=2.32.5", + "beautifulsoup4>=4.14.2", ] diff --git a/scripts/collect_discrete_ctd_cast_samples.py b/scripts/collect_discrete_ctd_cast_samples.py new file mode 100755 index 0000000..162dd47 --- /dev/null +++ b/scripts/collect_discrete_ctd_cast_samples.py @@ -0,0 +1,680 @@ +#!/usr/bin/env python3 +""" +Parse all Sea-Bird *.btl bottle files under a given root directory and emit a single CSV/JSON file +where every line/array element represents one bottle-closure sample. + +Usage: + ./collect_discrete_ctd_cast_samples.py ROOT [--out FILE] [--format {csv,json}] [--indent N] [--verbose] + +The output format is taken from `--format` when given; otherwise it is inferred from the extension +of `--out` when it is ".csv" or ".json". + +If the extension is unrecognised, csv is used as a fallback (default: csv). +""" + +from __future__ import annotations + +import argparse +import json +import logging +import re +import sys +from csv import DictWriter +from datetime import datetime, timezone +from pathlib import Path +from textwrap import dedent +from typing import Any + +logging.basicConfig( + level=logging.INFO, + format="%(levelname)-8s %(message)s", +) +log = logging.getLogger(__name__) + +# Sea-Bird BTL column names mapping matching the discrete sample column names as closely as +# possible. +COLUMN_MAP: dict[str, str] = { + "Bottle": "_btl_bottle_number", # Internal, written as "Bottle Position". + "Date": "_btl_date", # Internal, combined with time to create "Bottle Closure Time [UTC]". 
+ "PrDM": "CTD Pressure [db]", + "DepSM": "CTD Depth [m]", + "Latitude": "CTD Latitude [deg]", + "Longitude": "CTD Longitude [deg]", + "T090C": "CTD Temperature 1 [deg C]", + "T190C": "CTD Temperature 2 [deg C]", + "C0S/m": "CTD Conductivity 1 [S/m]", + "C1S/m": "CTD Conductivity 2 [S/m]", + "Sal00": "CTD Salinity 1 [psu]", + "Sal11": "CTD Salinity 2 [psu]", + "Sbeox0V": "CTD Oxygen Voltage [V]", + "Sbeox0ML/L": "CTD Oxygen [mL/L]", + "OxsolML/L": "CTD Oxygen Saturation [mL/L]", + "FlECO-AFL": "CTD Fluorescence [mg/m^3]", + "FlSP": "CTD Fluorescence [mg/m^3]", + "CStarAt0": "CTD Beam Attenuation [1/m]", + "CStarTr0": "CTD Beam Transmission [%]", + "Ph": "CTD pH", +} + +# Rules for mapping a raw `.btl` Location/Station/Station Name/Station Number string to the +# canonical station name used in summary-samples.csv. +# +# Each entry is a (keywords, station_name) pair. All keywords in the tuple must appear +# (case-insensitively) somewhere in the location string for the rule to match. The first +# matching rule wins. More-specific rules (more keywords, or narrower terms) must therefore +# appear before less-specific ones. +# +# Station names deliberately omit the "- m " offset suffix so that multiple nearby +# cast positions all resolve to the same base name. If the matched station name has no recognised +# platform-type suffix (" Shallow Profiler", " Deep Profiler", " BEP", " Junction Box") one is +# appended automatically by `_ensure_station_suffix`. +LOCATION_STATION_RULES: list[tuple[list[str], str]] = [ + # + # Exact CTD-number codes. These must precede generic keyword rules. 
+ # + # TN326 2015: slope base junction box + (["ctd12"], "Slope Base Junction Box LJ01A"), + (["ctd13"], "Slope Base Junction Box LJ01A"), + (["ctd17"], "Slope Base Junction Box LJ01A"), + (["ctd19"], "Slope Base Junction Box LJ01A"), + # TN326 2015: slope base shallow profiler + (["ctd02"], "Slope Base Shallow Profiler"), + (["ctd14"], "Slope Base Shallow Profiler"), + # TN326 2015: axial base shallow profiler + (["ctd03"], "Axial Base Shallow Profiler"), + (["ctd04"], "Axial Base Shallow Profiler"), + (["ctd05"], "Axial Base Shallow Profiler"), + # SKQ201610S 2016: slope base junction box + (["ca_ctd_2016-02"], "Slope Base Junction Box LJ01A"), + # SKQ201610S 2016: slope base shallow profiler + (["ca_ctd_2016-03"], "Slope Base Shallow Profiler"), + (["ca_ctd_2016-06"], "Slope Base Shallow Profiler"), + # AT50-29 2024: axial base shallow profiler + (["ctd 12"], "Axial Base Shallow Profiler"), + (["ctd 13"], "Axial Base Shallow Profiler"), + # AT50-29 2024: oregon offshore shallow profiler + (["ctd 009"], "Oregon Offshore Shallow Profiler"), + (["ctd 14"], "Oregon Offshore Shallow Profiler"), + # + # Generic keyword rules. 
+ # + # Slope Base Junction Box + (["lv01a"], "Slope Base Junction Box LV01A"), + (["lj01a"], "Slope Base Junction Box LJ01A"), + (["bubble", "plume"], "Slope Base Junction Box LJ01A"), + # Slope Base Deep Profiler + (["slope", "base", "dp"], "Slope Base Deep Profiler"), + (["slope", "base", "deep"], "Slope Base Deep Profiler"), + # Slope Base Shallow Profiler + (["slope", "base"], "Slope Base Shallow Profiler"), + # Axial Base Deep Profiler + (["axial", "dp"], "Axial Base Deep Profiler"), + (["axial", "deep"], "Axial Base Deep Profiler"), + # Axial Base Shallow Profiler + (["axial"], "Axial Base Shallow Profiler"), + # Oregon Offshore Deep Profiler + (["offshore", "dp"], "Oregon Offshore Deep Profiler"), + (["offshore", "deep"], "Oregon Offshore Deep Profiler"), + (["offshore", "600"], "Oregon Offshore Deep Profiler"), + (["endurance", "dp"], "Oregon Offshore Deep Profiler"), + # Oregon Shelf BEP + (["shelf"], "Oregon Shelf BEP"), + # Oregon Offshore Shallow Profiler + (["offshore"], "Oregon Offshore Shallow Profiler"), + (["endurance"], "Oregon Offshore Shallow Profiler"), +] + + +def lookup_station(location: str) -> str | None: + """ + Return the canonical summary-samples.csv station name for a raw btl Location/Station value, + or None if no rule matches. + + Each rule in LOCATION_STATION_RULES is checked in order; the first whose keywords are all + present (case-insensitively) in `location` wins. + """ + lower = location.lower() + for keywords, station_name in LOCATION_STATION_RULES: + if all(kw in lower for kw in keywords): + return station_name + return None + + +# Mapping from raw header keys to CSV column names. 
+META_KEY_TO_COLUMN_MAP: dict[str, str] = { + "cruise": "Cruise", + "cast": "Cast", + "station": "Station", + "station_name": "Station", + "station_number": "Station", + "location": "Station", + "water_depth": "CTD Cast Water Depth [m]", + "depth": "CTD Depth [m]", + "date": "Start Time [UTC]", +} + + +def sanitise_key(column_name: str) -> str: + """ + Convert an unmapped Sea-Bird column name to a safe JSON key by replacing every run of + non-alphanumeric characters with a single underscore. + """ + return re.sub(r"[^A-Za-z0-9]+", "_", column_name).strip("_") + + +def column_to_key(column_name: str) -> str: + """Return the JSON key for a Sea-Bird column name.""" + if column_name in COLUMN_MAP: + return COLUMN_MAP[column_name] + + return sanitise_key(column_name) + + +def is_stdev_key(key: str) -> bool: + """Return `True` if `key` is a standard-deviation companion column.""" + return key.endswith(" stdev") or key.endswith("_sdev") + + +def stdev_key(measurement_key: str) -> str: + """ + Return the standard-deviation companion key for a measurement key. + + For keys that match CSV column names (contain spaces / brackets) the stdev is appended as a + suffix in a consistent way so downstream code can find it. For plain snake_case keys the _sdev + suffix is used. + """ + if " " in measurement_key or "[" in measurement_key: + return measurement_key + " stdev" + return measurement_key + "_sdev" + + +# Regex for a Sea-Bird user-comment metadata line. +META_REGEX = re.compile(r"^\*\*\s+([A-Za-z ]+?):\s*(.*?)\s*$") + +# Regex for a system header line. +SYS_REGEX = re.compile(r"^\*\s+([A-Za-z /()]+?)\s*=\s*(.*?)\s*$") + +# Month abbreviations used when parsing date strings. 
+MONTHS = { + name: number + for number, name in enumerate( + ( + "jan", + "feb", + "mar", + "apr", + "may", + "jun", + "jul", + "aug", + "sep", + "oct", + "nov", + "dec", + ), + start=1, + ) +} + + +def parse_datetime(text: str, time_text: str) -> str | None: + """ + Combine a date token like "Aug 06 2021" and a time token like "00:08:19" into an ISO-8601 UTC + string. Returns `None` on failure. + """ + try: + parts = text.split() + if len(parts) == 3: + month = MONTHS.get(parts[0].lower()) + if month is None: + return None + day = int(parts[1]) + year = int(parts[2]) + else: + return None + + hour, minute, second = (int(part) for part in time_text.split(":")) + return datetime( + year, + month, + day, + hour, + minute, + second, + tzinfo=timezone.utc, + ).isoformat() + except Exception: + return None + + +def parse_nmea_utc(value: str) -> str | None: + """ + Convert an NMEA UTC header string like "Jul 06 2015 15:59:45" to an ISO-8601 UTC string. Returns + `None` on failure. + """ + try: + # Format: "Mon DD YYYY HH:MM:SS" + nmea_parts = value.split() + if len(nmea_parts) != 4: + return None + date_str = " ".join(nmea_parts[:3]) # "Jul 06 2015" + time_str = nmea_parts[3] # "15:59:45" + return parse_datetime(date_str, time_str) + except Exception: + return None + + +def parse_nmea_coordinates(value: str) -> float | None: + """ + Convert an NMEA degree-minute string like "44 31.57 N" or "125 23.67 W" to a signed decimal + degree float. 
+ """ + try: + parts = value.split() + if len(parts) != 3: + return None + degrees = float(parts[0]) + minutes = float(parts[1]) + hemisphere = parts[2].upper() + decimal_degrees = degrees + minutes / 60.0 + if hemisphere in {"S", "W"}: + decimal_degrees = -decimal_degrees + return round(decimal_degrees, 6) + except Exception: + return None + + +def coerce_number(value: str) -> float | str: + """Try to parse `value` as float; return the original string on failure.""" + try: + return float(value) + except ValueError: + return value + + +def parse(path: Path) -> list[dict[str, Any]]: + try: + text = path.read_text(encoding="latin-1") + except OSError as exc: + log.error("Cannot read %s: %s", path, exc) + return [] + + lines = text.splitlines() + + # + # Extract header metadata values, which are values included in all samples. + # + meta: dict[str, Any] = {} + + for line in path.open(encoding="latin-1"): + # User-defined comment fields: ** Cruise: TN393 + user_comment_match = META_REGEX.match(line) + if user_comment_match: + raw_key = re.sub(r"\s+", "_", user_comment_match.group(1).strip().lower()) + value = user_comment_match.group(2).strip() + if value: + csv_key = META_KEY_TO_COLUMN_MAP.get(raw_key) + if csv_key: + # "Station" can be supplied by multiple source keys, first wins. + if csv_key not in meta: + if csv_key == "Station": + canonical = lookup_station(value) + if canonical is not None: + meta["Station"] = canonical + else: + log.warning( + "No station mapping found for location %r in %s, skipping.", + value, + path, + ) + else: + meta[csv_key] = value + + continue + + # System header fields. 
+ sys_header_match = SYS_REGEX.match(line) + if sys_header_match: + raw_key = sys_header_match.group(1).strip().lower() + value = sys_header_match.group(2).strip() + if "nmea latitude" in raw_key: + coord = parse_nmea_coordinates(value) + if coord is not None: + meta["Start Latitude [degrees]"] = coord + elif "nmea longitude" in raw_key: + coord = parse_nmea_coordinates(value) + if coord is not None: + meta["Start Longitude [degrees]"] = coord + elif "nmea utc" in raw_key: + # Only use as Start Time fallback if not already set by ** Date + if "Start Time [UTC]" not in meta: + iso_time = parse_nmea_utc(value) + meta["Start Time [UTC]"] = ( + iso_time if iso_time is not None else value + ) + elif "software version" in raw_key: + meta["software_version"] = value + continue + + # Stop scanning metadata once we hit the data section + if line.strip().startswith("Bottle") and "Date" in line: + break + + # Find column header line and parse column names. + header_line_index: int | None = None + raw_columns: list[str] = [] + + for line_index, line in enumerate(lines): + stripped = line.strip() + if stripped.startswith("Bottle") and "Date" in stripped: + # Split on whitespace. The header uses fixed-width layout but whitespace splitting is + # reliable for the column names themselves. + raw_columns = stripped.split() + header_line_index = line_index + break + + if "Station" not in meta: + return [] + + if header_line_index is None: + log.warning("No data header found in %s, skipping.", path) + return [] + + # Map Sea-Bird column names → clean JSON keys + json_keys = [column_to_key(column_name) for column_name in raw_columns] + + # + # Parse data rows. + # + # The BTL format interleaves "avg" and "sdev" rows, but we really only care about the averages. + # + # The bottle number and date appear on the avg row; the time of day + # appears in the same column position as "Date" on the sdev row. + # We combine them into a single ISO timestamp. + # + # Skip the "Position Time ..." 
unit row immediately after the header. + data_start_index = header_line_index + 2 + if data_start_index < len(lines) and "Position" in lines[data_start_index - 1]: + pass # already correct — unit row was at header_line_index + 1 + + samples: list[dict[str, Any]] = [] + row_index = data_start_index + + while row_index < len(lines): + avg_line = lines[row_index] + + # Skip blank lines + if not avg_line.strip(): + row_index += 1 + continue + + if not avg_line.strip().endswith("(avg)"): + row_index += 1 + continue + + # Peek at the next non-empty line to get the sdev row. + sdev_line = "" + sdev_search_index = row_index + 1 + while sdev_search_index < len(lines): + if lines[sdev_search_index].strip(): + if lines[sdev_search_index].strip().endswith("(sdev)"): + sdev_line = lines[sdev_search_index] + break + sdev_search_index += 1 + + # Parse the "avg" row, stripping the trailing "(avg)" marker before splitting. + avg_values_raw = avg_line.strip().rstrip("(avg)").strip() + # The first token is the bottle number, the next three tokens form the date ("Aug 06 2021"), + # then the remaining tokens are numeric values. + avg_tokens = avg_values_raw.split() + + # ---- Parse the sdev row ---------------------------------------- + sdev_tokens: list[str] = [] + if sdev_line: + sdev_values_raw = sdev_line.strip().rstrip("(sdev)").strip() + sdev_tokens = sdev_values_raw.split() + + # Reconstruct per-column token lists. + avg_column_tokens: list[str | None] = [] + sdev_column_tokens: list[str | None] = [] + + try: + # Append bottle number. + avg_column_tokens.append(avg_tokens[0]) + # No bottle number on sdev row. + sdev_column_tokens.append(None) + + # Date is 3 tokens, time is 1. + date_text = " ".join(avg_tokens[1:4]) + avg_column_tokens.append(date_text) + + time_text = sdev_tokens[0] if sdev_tokens else None + sdev_column_tokens.append(time_text) + + # Remaining numeric columns, one token each. 
+ for avg_token_index in range(4, len(avg_tokens)): + avg_column_tokens.append(avg_tokens[avg_token_index]) + for sdev_token_index in range(1, len(sdev_tokens)): + sdev_column_tokens.append(sdev_tokens[sdev_token_index]) + + except IndexError: + log.warning( + "Malformed row in %s at line %d, skipping bottle.", path, row_index + 1 + ) + row_index = sdev_search_index + 1 + continue + + # Build the sample with "Station" and "Target Asset" at the front. + station = meta["Station"] + sample: dict[str, Any] = { + "Station": station, + "Target Asset": f"CTD Cast ({station})", + **{key: value for key, value in meta.items() if key != "Station"}, + } + + # Populate measured columns. + date_part: str = "" + time_part: str = "" + + for column_index, json_key in enumerate(json_keys): + avg_text = ( + avg_column_tokens[column_index] + if column_index < len(avg_column_tokens) + else None + ) + sdev_text = ( + sdev_column_tokens[column_index] + if column_index < len(sdev_column_tokens) + else None + ) + + if json_key == "_btl_bottle_number": + sample["CTD Cast Bottle Position"] = ( + int(avg_text) if avg_text is not None else None + ) + + elif json_key == "_btl_date": + # Defer writing the datetime until we also have the time. + date_part = avg_text or "" + time_part = sdev_text or "" + + else: + # Numeric measurement, skip internal sentinel keys. + if json_key.startswith("_btl_"): + continue + if avg_text is not None: + sample[json_key] = coerce_number(avg_text) + if sdev_text is not None: + sample[stdev_key(json_key)] = coerce_number(sdev_text) + + # Combine date and time into ISO-8601 UTC → "CTD Bottle Closure Time [UTC]". + if date_part and time_part: + iso = parse_datetime(date_part, time_part) + if iso: + sample["CTD Cast Bottle Closure Time [UTC]"] = iso + else: + sample["CTD Cast Bottle Closure Time [UTC]"] = ( + f"{date_part} {time_part}" + ) + elif date_part: + sample["CTD Cast Bottle Closure Time [UTC]"] = date_part + + # Drop stdev keys before storing. 
+        sample = {key: value for key, value in sample.items() if not is_stdev_key(key)}
+        samples.append(sample)
+        row_index = sdev_search_index + 1
+
+    return samples
+
+
+def write_samples_as_json(
+    samples: list[dict[str, Any]],
+    output: Path,
+    indent: int,
+) -> None:
+    with output.open("w", encoding="utf-8") as stream:
+        json.dump(
+            samples,
+            stream,
+            indent=indent or None,
+            ensure_ascii=False,
+        )
+        stream.write("\n")
+
+
+def write_samples_as_csv(samples: list[dict[str, Any]], output: Path) -> None:
+
+    columns: dict[str, None] = {}
+    for sample in samples:
+        # Add keys from all fields, values are updated here, but ignored. Preserve insertion order
+        # by using dictionary keys as an ordered set.
+        columns.update(sample)
+
+    with output.open("w", encoding="utf-8", newline="") as stream:
+        writer = DictWriter(
+            stream,
+            fieldnames=list(columns),
+            extrasaction="ignore",
+        )
+        writer.writeheader()
+        writer.writerows(samples)
+
+
+def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="Convert all *.btl files under ROOT to a single JSON or CSV file.",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=dedent("""
+            examples:
+              # Write to the default output path (.csv).
+              ./collect_discrete_ctd_cast_samples.py ./ctd-cast-sample-source-files/
+              # Write to a JSON file (format inferred from extension).
+              ./collect_discrete_ctd_cast_samples.py ./ctd-cast-sample-source-files/ --out ctd-cast-source-files.json
+              # Override format explicitly.
+              ./collect_discrete_ctd_cast_samples.py ./ctd-cast-sample-source-files/ --format json
+              # Use compact JSON output with no indentation.
+ ./collect_discrete_ctd_cast_samples.py ./ctd-cast-sample-source-files/ --out ctd-cast-source-files.json --indent 0 + """), + ) + parser.add_argument( + "root", + type=Path, + metavar="ROOT", + help="Root directory to search recursively for *.btl files.", + ) + parser.add_argument( + "--format", + choices=["csv", "json"], + default=None, + dest="format", + help="Output format: csv or json. Takes priority over the extension of --out. (default: csv)", + ) + parser.add_argument( + "--out", + type=Path, + default=None, + metavar="FILE", + help=( + "Output file path. Format is inferred from the extension (.csv or .json) " + "when recognised, otherwise --format is used. " + "Defaults to .csv in the current directory." + ), + ) + parser.add_argument( + "--indent", + type=int, + default=2, + metavar="N", + help="JSON indentation level, use 0 for compact output. (default: 2)", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable DEBUG-level logging.", + ) + + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv) + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + root: Path = args.root + if not root.exists(): + log.error("Root directory '%s' does not exist.", root) + return 1 + + if args.format is not None: + # The `--format` was explicitly provided, and takes priority. + format = args.format + out = args.out if args.out is not None else Path(f"{root.name}.{format}") + elif args.out is not None: + # No explicit `--format`, infer from the file extension. + extension = args.out.suffix.lower()[1:] + if extension in {"csv", "json"}: + format = extension + else: + log.warning( + "Unrecognised extension '.%s' in --out, defaulting to 'csv'.", + extension, + ) + format = "csv" + out = args.out + else: + # Neither `--format` or `--out` provided, use defaults. 
+ format = "csv" + out = Path(f"{root.name}.csv") + + files = sorted(root.rglob("*.btl")) + if not files: + log.error("No *.btl files found under '%s', exiting.", root) + return 1 + + log.info("Found %s *.btl file(s) under '%s'.", len(files), root) + + samples: list[dict[str, Any]] = [] + + for file in files: + log.debug("Parsing '%s'.", file) + file_samples = parse(file) + log.info(" '%s' (%s sample(s))", file, len(file_samples)) + samples.extend(file_samples) + + log.info("Parsed %s sample(s) from %s file(s).", len(samples), len(files)) + + out.parent.mkdir(parents=True, exist_ok=True) + + match format: + case "csv": + write_samples_as_csv(samples, out) + case "json": + write_samples_as_json(samples, out, args.indent) + + log.info(f"'{out}': ({out.stat().st_size / 1024:.1f} KB)") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/collect_discrete_summary_samples.py b/scripts/collect_discrete_summary_samples.py new file mode 100644 index 0000000..29c9727 --- /dev/null +++ b/scripts/collect_discrete_summary_samples.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 +""" +Concatenate all discrete summary CSV files under a given root directory into a single CSV/JSON +file where every line/array element represents one sample. + +The header of the first file found is used as the output header. Rows from files whose header +differs from the first are still included, missing fields are written as empty and extra fields +are ignored. + +Usage: + ./collect_discrete_summary_samples.py ROOT [--out FILE] [--format {csv,json}] [--indent N] [--verbose] + +The output format is inferred from the extension of `--out` when it is ".csv" or ".json". + +If the extension is unrecognised, `--format` is used instead (default: csv). 
+""" + +from __future__ import annotations + +import argparse +import csv +import json +import logging +import sys +from pathlib import Path +from textwrap import dedent +from typing import Any + +logging.basicConfig( + level=logging.INFO, + format="%(levelname)-8s %(message)s", +) +log = logging.getLogger(__name__) + + +def parse( + path: Path, + columns: list[str] | None, +) -> tuple[list[str], list[dict[str, Any]]]: + """Read `path` as a CSV file. + + Returns a (columns, rows) tuple. If `columns` is provided (i.e. this is not the first file) it + is used as the canonical column order. Rows with a differing header are still read but mapped + onto the canonical columns. + """ + try: + text = path.read_text() + except OSError as exc: + log.error("Cannot read %s: %s", path, exc) + return columns or [], [] + + lines = text.splitlines() + if not lines: + log.warning("'%s' is empty, skipping.", path) + return columns or [], [] + + reader = csv.DictReader(lines) + + if columns is None: + columns = list(reader.fieldnames or []) + + rows: list[dict[str, Any]] = list(reader) + return columns, rows + + +def write_as_csv( + columns: list[str], + rows: list[dict[str, Any]], + output: Path, +) -> None: + with output.open("w", encoding="utf-8", newline="") as stream: + writer = csv.DictWriter(stream, fieldnames=columns, extrasaction="ignore") + writer.writeheader() + writer.writerows(rows) + + +def write_as_json( + rows: list[dict[str, Any]], + output: Path, + indent: int, +) -> None: + with output.open("w", encoding="utf-8") as stream: + json.dump(rows, stream, indent=indent or None, ensure_ascii=False) + stream.write("\n") + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Combine all discrete sample summary *.csv files under ROOT into a single CSV or JSON file.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=dedent(""" + examples: + # Write to the default output path (.csv). 
+ ./collect_discrete_summary_samples.py ./source-data/summary-sample-source-files + # Write to a JSON file (format inferred from extension). + ./collect_discrete_summary_samples.py ./source-data/summary-sample-source-files --out summary-samples.json + # Override format explicitly. + ./collect_discrete_summary_samples.py ./source-data/summary-sample-source-files --format json + # Use compact JSON output with no indentation. + ./collect_discrete_summary_samples.py ./source-data/summary-sample-source-files --out summary-samples.json --indent 0 + """), + ) + parser.add_argument( + "root", + type=Path, + metavar="ROOT", + help="Root directory to search recursively for *.csv files.", + ) + parser.add_argument( + "--format", + choices=["csv", "json"], + default=None, + dest="format", + help=( + "Output format: csv or json. Takes priority over the extension of `--out`. (default: csv)" + ), + ) + parser.add_argument( + "--out", + type=Path, + default=None, + metavar="FILE", + help=( + "Output file path. Format is inferred from the extension (.csv or .json) when " + "recognised, otherwise `--format` is used. " + "Defaults to .csv in the current directory." + ), + ) + parser.add_argument( + "--indent", + type=int, + default=2, + metavar="N", + help="JSON indentation level, use 0 for compact output. 
(default: 2)", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable DEBUG-level logging.", + ) + + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv) + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + root: Path = args.root + if not root.exists(): + log.error("Root directory '%s' does not exist.", root) + return 1 + + if args.format is not None: + fmt = args.format + out = args.out if args.out is not None else Path(f"{root.name}.{fmt}") + elif args.out is not None: + extension = args.out.suffix.lower()[1:] + if extension in {"csv", "json"}: + fmt = extension + else: + log.warning( + "Unrecognised extension '.%s' in --out, defaulting to 'csv'.", + extension, + ) + fmt = "csv" + out = args.out + else: + fmt = "csv" + out = Path(f"{root.name}.csv") + + files = sorted(root.rglob("*.csv")) + if not files: + log.warning("No *.csv files found under '%s', exiting.", root) + return 0 + + log.info("Found %d *.csv file(s) under '%s'.", len(files), root) + + columns: list[str] | None = None + rows: list[dict[str, Any]] = [] + + for file in files: + log.debug("Reading '%s'.", file) + columns, current_rows = parse(file, columns) + log.info(" '%s' (%d row(s))", file, len(current_rows)) + rows.extend(current_rows) + + log.info("Collected %d row(s) from %d file(s).", len(rows), len(files)) + + out.parent.mkdir(parents=True, exist_ok=True) + + match fmt: + case "csv": + write_as_csv(columns or [], rows, out) + case "json": + write_as_json(rows, out, args.indent) + + log.info("'%s': (%.1f KB)", out, out.stat().st_size / 1024) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/download_rawdata.py b/scripts/download_rawdata.py new file mode 100755 index 0000000..06b7f88 --- /dev/null +++ b/scripts/download_rawdata.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 +""" +Recursively mirror https://rawdata-west.oceanobservatories.org/files/ into a local 
directory.
+
+Usage:
+    ./download_rawdata.py [GLOB] [--destination DIR] [--flatten] [--workers N] [--verbose]
+
+    GLOB    Optional glob pattern for files to download, relative to the archive root,
+            e.g. "cruise_data/Cabled/*/Water_Sampling/CTD Data/*.btl"
+"""
+
+import argparse
+import fnmatch
+import logging
+import sys
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+from urllib.parse import unquote, urljoin, urlparse
+
+import requests
+from bs4 import BeautifulSoup
+from requests import Session
+from requests.exceptions import RequestException
+
+BASE_URL = "https://rawdata-west.oceanobservatories.org/files/"
+DEFAULT_DESTINATION = Path("raw")
+DEFAULT_WORKERS = 8
+CHUNK_SIZE = 1024 * 256  # 256 KiB
+RETRY_LIMIT = 3
+RETRY_BACKOFF = 2.0  # Seconds, doubles on each retry.
+
+# Default to INFO; `--verbose` raises this to DEBUG in main(). (A DEBUG default
+# would make the --verbose flag a no-op and is inconsistent with the sibling
+# collect_* scripts, which also default to INFO.)
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    datefmt="%H:%M:%S",
+)
+log = logging.getLogger(__name__)
+
+
+def is_directory_url(href: str) -> bool:
+    """Return True when the href points to a sub-directory (ends with '/')."""
+    return href.endswith("/")
+
+
+def url_to_relative_path(file_url: str) -> str:
+    """
+    Strip the BASE_URL prefix from a file URL and return the relative path string.
+
+    Percent-encoding (e.g. %20 = ' ') is decoded so callers always receive a plain text path.
+    """
+    parsed = urlparse(file_url)
+    remote_path = unquote(parsed.path)
+    prefix = "/files/"
+    if remote_path.startswith(prefix):
+        return remote_path[len(prefix) :]
+    return remote_path.lstrip("/")
+
+
+def glob_matches(relative_path: str, pattern: str | None) -> bool:
+    if pattern is None:
+        return True
+
+    return fnmatch.fnmatchcase(relative_path.lower(), pattern.lower())
+
+
+def directory_could_match(directory: str, pattern: str) -> bool:
+    """
+    Return True if descending into `directory` could ever yield a file that matches `pattern`.
+ + Walks the leading segments of the pattern against the directory's relative path. Returns + `False` only when a segment is provably incompatible so we never prune a directory that might + contain matches. A '**' in the pattern always allows further descent. + """ + relative_dir = url_to_relative_path(directory) + if not relative_dir: + return True + + pattern_parts = pattern.replace("\\", "/").split("/") + directory_parts = relative_dir.rstrip("/").split("/") + + for i, directory_segment in enumerate(directory_parts): + if i >= len(pattern_parts): + # Pattern is shallower than the directory, can't match deeper files. + return False + pattern_segment = pattern_parts[i] + if pattern_segment == "**": + # The '**' matches everything from here down. + return True + + if not fnmatch.fnmatchcase(directory_segment.lower(), pattern_segment.lower()): + return False + + return True + + +def list_directory( + session: Session, + url: str, + start_url: str, +) -> tuple[list[str], list[str]]: + """ + Fetch an Apache-style directory listing and return absolute URLs of sub-directories and absolute + URLs of downloadable files. + + The `start_url` is the root of the current crawl (used to reject links that escape back above + it). + """ + directories: list[str] = [] + files: list[str] = [] + + try: + resp = session.get(url, timeout=30) + resp.raise_for_status() + except RequestException as exc: + log.error("Failed to list %s: %s", url, exc) + return directories, files + + soup = BeautifulSoup(resp.text, "html.parser") + + for anchor in soup.find_all("a", href=True): + href: str = str(anchor["href"]) + + if ( + href in ("../", "/") + or href.startswith("?") + or "Parent Directory" in anchor.text + ): + continue + + # Resolve to absolute URL. + absolute = urljoin(url, href) + # Skip anything that escapes above the crawl root. 
+ if not absolute.startswith(start_url): + continue + + if is_directory_url(href): + directories.append(absolute) + else: + files.append(absolute) + + return directories, files + + +def crawl( + session: Session, + url: str, + glob: str | None = None, + *, + _start_url: str | None = None, +) -> list[str]: + """ + Recursively walk the remote directory tree starting at *url* and return + a flat list of all file URLs whose relative paths match *glob*. + + Early pruning: if a glob pattern is given and its leading path segments + cannot possibly match a directory's relative path, that entire subtree is + skipped without being fetched. + """ + if _start_url is None: + _start_url = url + + all_files: list[str] = [] + directories, files = list_directory(session, url, _start_url) + + for file_url in files: + relative = url_to_relative_path(file_url) + if glob_matches(relative, glob): + all_files.append(file_url) + else: + log.debug("Skipping (glob mismatch): %s", relative) + + for directory in directories: + if glob is not None and not directory_could_match(directory, glob): + log.debug("Ignoring directory '%s'.", directory) + continue + + log.debug("Entering directory '%s'.", directory) + all_files.extend(crawl(session, directory, glob, _start_url=_start_url)) + + return all_files + + +def get_local_path(file_url: str, destination: Path, flatten: bool = False) -> Path: + """Convert a remote file URL to the corresponding local Path under `destination`. + + If `flatten` is True the directory structure is collapsed and path separators are + replaced with '__', so all files land directly in `destination`. + """ + relative = url_to_relative_path(file_url) + if flatten: + return destination / relative.replace("/", "__") + return destination / relative + + +def download_file( + session: Session, + url: str, + destination: Path, + flatten: bool = False, +) -> tuple[str, str]: + """ + Download `url` to `destination`, preserving the remote directory structure. 
Returns (url,
+    status) where status is 'ok', 'skipped', or 'error'.
+    """
+    local = get_local_path(url, destination, flatten=flatten)
+
+    if local.exists():
+        try:
+            head = session.head(url, timeout=15)
+            size = int(head.headers.get("Content-Length", -1))
+            if size > 0 and local.stat().st_size == size:
+                log.debug("Skipping (already complete): %s", local)
+                return url, "skipped"
+        except Exception:
+            pass
+
+    local.parent.mkdir(
+        parents=True, exist_ok=True
+    )  # no-op for the destination dir itself when flattened
+    tmp = local.with_suffix(local.suffix + ".part")
+
+    for attempt in range(1, RETRY_LIMIT + 1):
+        try:
+            with session.get(url, stream=True, timeout=60) as resp:
+                resp.raise_for_status()
+                with open(tmp, "wb") as fh:
+                    for chunk in resp.iter_content(chunk_size=CHUNK_SIZE):
+                        if chunk:
+                            fh.write(chunk)
+            tmp.rename(local)
+            log.info("Downloaded: %s", local)
+            return url, "ok"
+        except requests.RequestException as exc:
+            log.warning(
+                "Attempt %d/%d failed for %s: %s", attempt, RETRY_LIMIT, url, exc
+            )
+            if tmp.exists():
+                tmp.unlink()
+            if attempt < RETRY_LIMIT:
+                time.sleep(RETRY_BACKOFF * attempt)
+
+    log.error("Giving up on '%s'.", url)
+    return url, "error"
+
+
+def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="Mirror the OOI raw data archive (or a subdirectory) to a local folder.",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+examples:
+  # Download CTD .btl files across every Cabled cruise.
+  ./download_rawdata.py 'cruise_data/Cabled/*/Water_Sampling/CTD Data/*.btl'
+
+  # Download summary .csv files for all Cabled cruises.
+  ./download_rawdata.py 'cruise_data/Cabled/*/Water_Sampling/*Discrete_Summary.csv'
+
+glob pattern notes:
+  Patterns are matched against the path relative to the archive root.
+  Matching is case-insensitive (fnmatch semantics).
+  *      matches any characters, including path separators
+  **     same as * when matching files; also allows unlimited directory descent during the crawl
+  ?      
matches any single character + [seq] matches any character in seq + """, + ) + parser.add_argument( + "glob", + nargs="?", + default=None, + metavar="GLOB", + help=( + "Glob pattern for files to download, relative to the archive root. " + "Supports *, **, ?, and [seq]. Defaults to all files. " + "Example: 'cruise_data/Cabled/*/Water_Sampling/CTD Data/*.btl'" + ), + ) + parser.add_argument( + "--destination", + type=Path, + default=DEFAULT_DESTINATION, + metavar="DIR", + help=f"Local destination directory (default: {DEFAULT_DESTINATION})", + ) + parser.add_argument( + "--workers", + type=int, + default=DEFAULT_WORKERS, + metavar="N", + help=f"Number of parallel download threads (default: {DEFAULT_WORKERS})", + ) + parser.add_argument( + "--flatten", + action="store_true", + help=( + "Store all files directly in the destination directory, replacing '/' in their " + "relative paths with '__' to form the filename." + ), + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable DEBUG-level logging.", + ) + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv) + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + log.info("Start URL : %s", BASE_URL) + log.info("Destination: %s", args.destination.resolve()) + log.info("Workers : %d", args.workers) + if args.glob: + log.info("Glob filter: %s", args.glob) + if args.flatten: + log.info("Flatten : enabled") + + session = requests.Session() + session.headers.update({"User-Agent": "ooi-cruise-data-mirror/1.0"}) + + log.info("Crawling remote directory tree.") + file_urls = crawl(session, BASE_URL, glob=args.glob) + log.info("Found %d file(s) to download.", len(file_urls)) + + if not file_urls: + log.warning("No files found, nothing to do.") + return 0 + + counts: dict[str, int] = {"ok": 0, "skipped": 0, "error": 0} + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + futures = { + pool.submit( + download_file, + session, + 
url, + args.destination, + args.flatten, + ): url + for url in file_urls + } + for future in as_completed(futures): + _, status = future.result() + counts[status] = counts.get(status, 0) + 1 + + log.info( + "Done. downloaded=%d skipped=%d errors=%d", + counts["ok"], + counts["skipped"], + counts["error"], + ) + + return 1 if counts["error"] > 0 else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/refresh-discrete-data.sh b/scripts/refresh-discrete-data.sh new file mode 100755 index 0000000..d883c2e --- /dev/null +++ b/scripts/refresh-discrete-data.sh @@ -0,0 +1,99 @@ +#!/usr/bin/env bash +# Download source data files from the OOI raw data archive and parses them into the discrete sample +# CSVs used by the dashboard. Writes to the following locations under `dashboard/public/`: +# - ctd-cast-samples.csv +# - summary-samples.csv +# - source-data/ctd-casts/ (for .btl files) +# - source-data/summaries/ (for *Discrete_Summary.csv files) +# +# Usage: +# ./refresh-discrete-data.sh [TYPE...] [--verbose] +# +# TYPE (optional, repeatable): +# ctd-cast Refresh CTD cast samples (ctd-cast-samples.csv) +# summary Refresh discrete summary samples (summary-samples.csv) +# +# When no TYPE is given, both are refreshed. +# +# Options forwarded to download_rawdata.py: +# --verbose Enable DEBUG-level logging in the download step. 
+ +set -euo pipefail + +SCRIPT_DIRECTORY="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +DISCRETE_DIRECTORY="$SCRIPT_DIRECTORY/../dashboard/public/discrete" + +DOWNLOAD_SCRIPT="$SCRIPT_DIRECTORY/download_rawdata.py" + +TYPES=() +DOWNLOAD_ARGS=() + +for arg in "$@"; do + case "$arg" in + ctd-cast|summary) + TYPES+=("$arg") + ;; + --verbose) + DOWNLOAD_ARGS+=("--verbose") + ;; + -h|--help) + sed -n '2,/^[^#]/{ /^#/{ s/^# \{0,1\}//; p }; /^[^#]/q }' "$0" + exit 0 + ;; + *) + echo "Unknown argument: $arg" >&2 + echo "Usage: $0 [ctd-cast] [summary] [--verbose]" >&2 + exit 1 + ;; + esac +done + +# Default to both types when none are specified. +if [ ${#TYPES[@]} -eq 0 ]; then + TYPES=(ctd-cast summary) +fi + +refresh_ctd_cast() { + local source="$DISCRETE_DIRECTORY/source-data/ctd-casts" + local output="$DISCRETE_DIRECTORY/ctd-cast-samples.csv" + local glob="cruise_data/Cabled/*/Water_Sampling/CTD Data/*.btl" + + echo "==> [ctd-cast] Downloading .btl files into '$source' ..." + python "$DOWNLOAD_SCRIPT" \ + "$glob" \ + --destination "$source" \ + --flatten \ + "${DOWNLOAD_ARGS[@]+"${DOWNLOAD_ARGS[@]}"}" + + echo "==> [ctd-cast] Parsing .btl files into '$output' ..." + python "$SCRIPT_DIRECTORY/collect_discrete_ctd_cast_samples.py" \ + "$source" \ + --out "$output" + echo "==> [ctd-cast] Done. Output written to '$output'." +} + +refresh_summary() { + local source="$DISCRETE_DIRECTORY/source-data/summaries" + local output="$DISCRETE_DIRECTORY/summary-samples.csv" + local glob="cruise_data/Cabled/*/Water_Sampling/*Discrete_Summary.csv" + + echo "==> [summary] Downloading discrete summary CSV files into '$source' ..." + python "$DOWNLOAD_SCRIPT" \ + "$glob" \ + --destination "$source" \ + --flatten \ + "${DOWNLOAD_ARGS[@]+"${DOWNLOAD_ARGS[@]}"}" + + echo "==> [summary] Collecting summary CSVs into '$output' ..." + python "$SCRIPT_DIRECTORY/collect_discrete_summary_samples.py" \ + "$source" \ + --out "$output" + echo "==> [summary] Done. Output written to '$output'." 
+} + +for type in "${TYPES[@]}"; do + case "$type" in + ctd-cast) refresh_ctd_cast ;; + summary) refresh_summary ;; + esac +done diff --git a/uv.lock b/uv.lock index 9fce587..3248e63 100644 --- a/uv.lock +++ b/uv.lock @@ -1735,15 +1735,19 @@ name = "qaqc-dashboard-environment" version = "0.1.0" source = { virtual = "." } dependencies = [ + { name = "beautifulsoup4" }, { name = "pip" }, { name = "rca-data-tools" }, + { name = "requests" }, { name = "tqdm" }, ] [package.metadata] requires-dist = [ + { name = "beautifulsoup4", specifier = ">=4.14.2" }, { name = "pip" }, { name = "rca-data-tools", git = "https://github.com/OOI-CabledArray/rca-data-tools.git" }, + { name = "requests", specifier = ">=2.32.5" }, { name = "tqdm" }, ]