-
Notifications
You must be signed in to change notification settings - Fork 94
Expand file tree
/
Copy pathsensitivity.py
More file actions
262 lines (226 loc) · 8.98 KB
/
Copy pathsensitivity.py
File metadata and controls
262 lines (226 loc) · 8.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""Sensitivity analysis for MCDA weight stability."""
from __future__ import annotations
import numpy as np
import xarray as xr
from xrspatial.mcda.combine import (
_check_wpm_positive,
_validate_criteria,
_validate_weights,
wlc,
wpm,
)
from xrspatial.mcda.standardize import _get_xp
from xrspatial.utils import _dask_task_name_kwargs
def sensitivity(
criteria: xr.Dataset,
weights: dict[str, float],
method: str = "one_at_a_time",
combine_method: str = "wlc",
delta: float = 0.05,
n_samples: int = 1000,
seed: int = 42,
name: str = "sensitivity",
) -> xr.DataArray | xr.Dataset:
"""Assess how weight changes affect the suitability surface.
Parameters
----------
criteria : xr.Dataset
Standardized criterion layers.
weights : dict
Base weights (must sum to ~1.0).
method : str
``"one_at_a_time"`` or ``"monte_carlo"``.
combine_method : str
Combination function: ``"wlc"`` or ``"wpm"``.
delta : float
Perturbation magnitude for one-at-a-time (default 0.05).
n_samples : int
Number of random weight vectors for Monte Carlo (default 1000).
Must be >= 1 when ``method="monte_carlo"``.
seed : int
Random seed for Monte Carlo reproducibility.
name : str
Name prefix for output arrays.
Returns
-------
xr.Dataset or xr.DataArray
For ``"one_at_a_time"``: Dataset with per-criterion DataArrays
showing the absolute score difference under perturbation.
For ``"monte_carlo"``: DataArray of per-pixel coefficient of
variation across random weight samples. Dask-backed criteria
produce a lazy dask-backed result; the sample loop runs inside
each chunk, so memory stays bounded by chunk size.
"""
combine_fn = {"wlc": wlc, "wpm": wpm}.get(combine_method)
if combine_fn is None:
raise ValueError(
f"Unknown combine_method {combine_method!r}. "
"Choose 'wlc' or 'wpm'"
)
if method == "one_at_a_time":
return _oat(criteria, weights, combine_fn, delta, name)
elif method == "monte_carlo":
if not isinstance(n_samples, (int, np.integer)) or n_samples < 1:
raise ValueError(
f"n_samples must be a positive integer >= 1, got {n_samples!r}"
)
return _monte_carlo(
criteria, weights, combine_method, int(n_samples), seed, name
)
else:
raise ValueError(
f"Unknown method {method!r}. "
"Choose 'one_at_a_time' or 'monte_carlo'"
)
def _perturb_weights(
weights: dict[str, float], target: str, delta: float
) -> tuple[dict[str, float], dict[str, float]]:
"""Create +delta and -delta weight variants for one criterion.
The target criterion's weight shifts by delta; remaining weights
are rescaled proportionally to maintain a sum of 1.0.
"""
names = list(weights.keys())
base_w = weights[target]
others = [k for k in names if k != target]
other_sum = sum(weights[k] for k in others)
# Single criterion: weight must stay at 1.0, perturbation is a no-op
if not others:
copy = dict(weights)
return copy, dict(copy)
results = []
for sign in (+1, -1):
new_target = max(0.0, min(1.0, base_w + sign * delta))
remainder = 1.0 - new_target
perturbed = {}
for k in names:
if k == target:
perturbed[k] = new_target
else:
if other_sum > 0:
perturbed[k] = weights[k] / other_sum * remainder
else:
perturbed[k] = remainder / len(others)
results.append(perturbed)
return results[0], results[1]
def _oat(criteria, weights, combine_fn, delta, name):
"""One-at-a-time sensitivity: perturb each weight +/- delta."""
result_vars = {}
for criterion in weights:
w_plus, w_minus = _perturb_weights(weights, criterion, delta)
score_plus = combine_fn(criteria, w_plus)
score_minus = combine_fn(criteria, w_minus)
diff = abs(score_plus - score_minus)
diff.name = f"{name}_{criterion}"
result_vars[criterion] = diff
return xr.Dataset(result_vars)
def _is_dask_dataset(criteria):
"""Check if any variable in the Dataset is backed by dask."""
try:
import dask.array as da
return any(
isinstance(criteria[v].data, da.Array)
for v in criteria.data_vars
)
except ImportError:
return False
def _mc_block(block, weight_matrix, combine_method):
"""Run the Monte Carlo sample loop on one in-memory block.
``block`` has shape ``(n_criteria,) + grid`` with weight columns in
the same criterion order. Welford's online algorithm keeps only the
running mean and M2 buffers alive, so peak memory per block is a
few times the block size regardless of ``n_samples``. Works on
numpy and cupy blocks alike.
"""
xp = _get_xp(block)
n_samples = weight_matrix.shape[0]
n_criteria = block.shape[0]
shape = block.shape[1:]
running_mean = xp.zeros(shape, dtype=np.float64)
running_m2 = xp.zeros(shape, dtype=np.float64)
for i in range(n_samples):
w = weight_matrix[i]
# Same per-pixel arithmetic as combine.wlc / combine.wpm (the
# canonical definitions), inlined on raw arrays so the loop
# runs inside a single dask chunk. Keep in sync with those.
if combine_method == "wpm":
score = block[0] ** float(w[0])
for j in range(1, n_criteria):
score = score * block[j] ** float(w[j])
else:
score = block[0] * float(w[0])
for j in range(1, n_criteria):
score = score + block[j] * float(w[j])
delta_val = score - running_mean
running_mean = running_mean + delta_val / (i + 1)
delta2 = score - running_mean
running_m2 = running_m2 + delta_val * delta2
variance = running_m2 / n_samples
std_dev = xp.sqrt(variance)
# Coefficient of variation
with np.errstate(divide="ignore", invalid="ignore"):
return xp.where(
running_mean != 0, std_dev / xp.abs(running_mean), 0.0
)
def _monte_carlo(criteria, weights, combine_method, n_samples, seed, name):
"""Monte Carlo sensitivity: random weight vectors, measure CV."""
rng = np.random.default_rng(seed)
n_criteria = len(weights)
criteria_names = list(weights.keys())
# Generate random weight vectors using Dirichlet distribution
# Use base weights as concentration parameters (scaled up)
alpha = np.array([weights[k] for k in criteria_names]) * n_criteria * 5
alpha = np.maximum(alpha, 0.5) # floor to avoid zero concentrations
# Draw sequentially so a given seed reproduces the sample stream of
# the previous per-iteration implementation.
weight_matrix = np.stack(
[rng.dirichlet(alpha) for _ in range(n_samples)]
)
# Validate once up front with the first sampled vector (it is
# finite and sums to 1, so only key-set mismatches can fire) the
# way the first combine call used to.
_validate_criteria(criteria)
probe = {k: float(weight_matrix[0][j])
for j, k in enumerate(criteria_names)}
_validate_weights(probe, criteria)
if combine_method == "wpm":
_check_wpm_positive(criteria)
first_var = list(criteria.data_vars)[0]
template = criteria[first_var]
# Stack criterion layers in data_vars order (the order wlc/wpm
# combine in) and permute the weight columns to match.
var_order = list(criteria.data_vars)
col = {k: j for j, k in enumerate(criteria_names)}
ordered_weights = weight_matrix[:, [col[v] for v in var_order]]
if _is_dask_dataset(criteria):
import dask.array as da
base_chunks = next(
criteria[v].data.chunks for v in var_order
if isinstance(criteria[v].data, da.Array)
)
layers = []
for v in var_order:
arr = criteria[v].data
if isinstance(arr, da.Array):
arr = arr.rechunk(base_chunks)
else:
arr = da.from_array(arr, chunks=base_chunks)
layers.append(arr)
# The sample loop runs inside one chunk function, so the graph
# stays at one task per chunk and nothing materializes the full
# criteria stack.
stacked = da.stack(layers).rechunk({0: -1})
cv_vals = da.map_blocks(
_mc_block, stacked,
weight_matrix=ordered_weights, combine_method=combine_method,
drop_axis=0, dtype=np.float64,
meta=stacked._meta.sum(axis=0),
**_dask_task_name_kwargs('xrspatial.sensitivity'),
)
else:
stacked = np.stack([criteria[v].data for v in var_order])
cv_vals = _mc_block(stacked, ordered_weights, combine_method)
result = xr.DataArray(
cv_vals, name=name, dims=template.dims,
coords=template.coords, attrs=template.attrs,
)
return result