Commit dc6edc7

wip testing

1 parent 3b1afc2 commit dc6edc7

5 files changed, +148 -100 lines changed

README.md

+7-3
@@ -41,9 +41,9 @@ python -m src.run_algo --help
 
 ### Implementing a runtime
 
-The algorithm works generall in three phases:
+The algorithm works generally in three phases:
 
-1. Get the prior data for that year and any number of prior years.
+1. Get the prior data for that year.
 2. Run the label propagation algorithm
 3. Update the posterior data for that year
 

@@ -57,13 +57,17 @@ MaybeSparseMatrix = Union[np.ndarray, sp.spmatrix]
 get_data(
     year: int,
     logger: logging.Logger
-) -> Tuple[MaybeSparseMatrix, np.ndarray, np.ndarray]:
+) -> Iterable[Tuple[MaybeSparseMatrix, np.ndarray, np.ndarray]]:
 ```
 
 This function accepts a year and a logger and returns a tuple of the following:
 - The adjacency matrix
 - The auids
 - The prior for the auids
+wrapped in an iterable. This is because the graph may be disconnected and
+you may want to process it in pieces. However, you could also process the
+entire graph at once, in which case the iterable would contain only one element.
+
 
 The second function you need to implement is
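
To make this contract concrete, here is a minimal sketch of a backend `get_data`. The toy component (three hard-coded auids, a 3-node path graph, a uniform 0.5 prior) is illustrative only and appears nowhere in this commit:

```python
import logging
from typing import Iterable, Tuple, Union

import numpy as np
import scipy.sparse as sp

MaybeSparseMatrix = Union[np.ndarray, sp.spmatrix]


def get_data(
    year: int, logger: logging.Logger
) -> Iterable[Tuple[MaybeSparseMatrix, np.ndarray, np.ndarray]]:
    """Yield (adjacency matrix, auids, prior) for each connected component."""
    # Toy component: three authors on a path graph with a uniform 0.5 prior.
    auids = np.array([101, 102, 103])
    A = sp.csr_matrix(
        np.array(
            [
                [0, 1, 0],
                [1, 0, 1],
                [0, 1, 0],
            ],
            dtype=bool,
        )
    )
    prior = np.full(len(auids), 0.5, dtype=np.float32)
    logger.info("yielding a single component for year %d", year)
    yield A, auids, prior
```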

requirements/requirements.txt

+1
@@ -2,4 +2,5 @@ numpy
 scipy
 scikit-learn
 pandas
+pandarallel
 pyarrow

src/backend/elsevier.py

+6-2
@@ -27,9 +27,13 @@
 import scipy.sparse as sparse
 
 
-def get_data(year: int, logger: logging.Logger) -> Tuple[sparse.csr_matrix, np.ndarray, np.ndarray]:
+def get_data(
+    year: int, logger: logging.Logger
+) -> Tuple[sparse.csr_matrix, np.ndarray, np.ndarray]:
     raise NotImplementedError("Not implemented in the Elsevier backend.")
 
 
-def update_posterior(auids: np.ndarray, posterior: np.ndarray, year: int) -> None:
+def update_posterior(
+    auids: np.ndarray, posterior: np.ndarray, year: int, logger: logging.Logger
+) -> None:
     raise NotImplementedError("Not implemented in the Elsevier backend.")
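
For reference, a hedged sketch of what an implemented `update_posterior` could look like. The parquet layout (an auid index and a `score` column at a path like `POSTIEOR_DATA_PATH` in the SciServer backend) is inferred from code elsewhere in this commit, not confirmed:

```python
import logging

import numpy as np
import pandas as pd

# Assumed to mirror POSTIEOR_DATA_PATH in src/backend/sciserver.py.
POSTERIOR_DATA_PATH = "./data/posterior_y_{year}.parquet"


def update_posterior(
    auids: np.ndarray, posterior: np.ndarray, year: int, logger: logging.Logger
) -> None:
    # Persist one score per auid so it can later be reindexed on auid
    # when retrieving previous posteriors.
    frame = pd.DataFrame({"score": posterior}, index=pd.Index(auids, name="auid"))
    path = POSTERIOR_DATA_PATH.format(year=year)
    logger.info("writing %d posterior scores to %s", len(auids), path)
    frame.to_parquet(path)
```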

src/backend/sciserver.py

+78-80
@@ -32,21 +32,25 @@
 import numpy as np
 import pandas as pd
 import scipy.sparse as sparse
+
 try:
     from pandarallel import pandarallel
+
     pandarallel.initialize(progress_bar=True)
-    parallel_apply = True
+    PARALLEL_APPLY = True
 except ImportError:
-    warnings.warn("pandarallel not installed, parallel processing will not be available.")
-    parallel_apply = False
-
+    warnings.warn(
+        "pandarallel not installed, parallel processing will not be available."
+    )
+    PARALLEL_APPLY = False
 
 import src.utils.log_time as log_time
 
 MIN_ARR_SIZE_FOR_CACHE = 10_000
 
 POSTIEOR_DATA_PATH = "./data/posterior_y_{year}.parquet"
 
+
 def default_combine_posterior_prior_y_func(arrs: List[np.ndarray]) -> np.ndarray:
     """Default function for combining the posterior for years t-1..t-n and the prior for year t.
@@ -195,21 +199,21 @@ def build_adjacency_matrix(
 
 def calculate_prior_y_from_eids(
     auids: np.ndarray,
-    auid_eids: pd.Series, # auid:int -> eids:List[int]
-    eid_score: pd.Series, # eid:int -> score:float
+    auid_eids: pd.Series,  # auid:int -> eids:List[int]
+    eid_score: pd.Series,  # eid:int -> score:float
     agg_score_func: Callable[[np.ndarray], float] = np.mean,
 ) -> np.ndarray:
 
     selected_eids = auid_eids[auids]
 
-    if len(selected_eids) > MIN_ARR_SIZE_FOR_CACHE and parallel_apply:
+    if len(selected_eids) > MIN_ARR_SIZE_FOR_CACHE or PARALLEL_APPLY:
         y = selected_eids.parallel_apply(
             lambda eids: agg_score_func(eid_score[eids])
         ).astype(eid_score.dtype)
     else:
-        y = selected_eids.apply(
-            lambda eids: agg_score_func(eid_score[eids])
-        ).astype(eid_score.dtype)
+        y = selected_eids.apply(lambda eids: agg_score_func(eid_score[eids])).astype(
+            eid_score.dtype
+        )
 
     return y
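
A small usage sketch of `calculate_prior_y_from_eids` with made-up inputs (the auids, eids, and scores are illustrative); with the default `np.mean`, each author's prior is the mean score of their papers:

```python
import numpy as np
import pandas as pd

# Hypothetical toy inputs: two authors (auids) and three scored papers (eids).
auid_eids = pd.Series({1: [10, 11], 2: [11, 12]})   # auid -> list of eids
eid_score = pd.Series({10: 0.2, 11: 0.4, 12: 0.9})  # eid -> score

y = calculate_prior_y_from_eids(
    np.array([1, 2]), auid_eids, eid_score, agg_score_func=np.mean
)
# y -> [0.3, 0.65]: mean paper score per author
```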

@@ -247,68 +251,43 @@ def get_previous_posterior(
     return post_s.reindex(auids).values
 
 
-# def calculate_prior_y(
-#     auids: np.ndarray,
-#     auid_eids: pd.Series,
-#     eid_score: pd.Series,
-#     year: int,
-#     prior_y_aggregate_eid_score_func: Callable[[np.ndarray], float] = np.mean,
-#     combine_posterior_prior_y_func: Callable[
-#         [List[np.ndarray]], np.ndarray
-#     ] = default_combine_posterior_prior_y_func,
-#     posterior_y_missing_value: float = 0.5,
-# ) -> np.ndarray:
-
-#     # get all of eids for each auid
-#     selected_eids = auid_eids[auids]
-
-#     prior_y = selected_eids.apply(
-#         lambda eids: prior_y_aggregate_eid_score_func(eid_score[eids])
-#     )
-
-#     # TODO: support an arbitrary number of years
-#     posterior_y_path = f"./data/posterior_y_{year}.parquet"
-#     if os.path.exists(posterior_y_path):
-#         posterior_y_dframe = pd.read_parquet(posterior_y_path)
-
-#         known_auids = posterior_y_dframe.index.values
-#         new_auids = set(auids) - set(known_auids)
-#         posterior_y_t_minus_1 = pd.Series(
-#             data=posterior_y_dframe["score"].values,
-#             index=known_auids,
-#         )
-#         del posterior_y_dframe
-
-#         # if there are new ids tat we haven't seen before, we need to add them
-#         # with the default value.
-#         if new_auids:
-#             for auid in new_auids:
-#                 posterior_y_t_minus_1[auid] = posterior_y_missing_value
-#             posterior_y_t_minus_1.sort_index(inplace=True)
-
-#     # There is a chance that there are less auids in the prior_y than in the
-#     # posterior_y. If that is the case, we need to limit the calculation
-#     # to the auids that are in both.
-#     if len(posterior_y_t_minus_1) > 0:
-#         posterior_matched = posterior_y_t_minus_1.index.intersection(prior_y.index)
-#         posterior_y_t_minus_1 = posterior_y_t_minus_1[posterior_matched]
-#         prior_y = combine_posterior_prior_y_func(
-#             np.stack([prior_y, posterior_y_t_minus_1], axis=1),
-#         )
-
-#     return prior_y
-
-
 def get_data(
     year: int,
     logger: logging.Logger = None,
     prior_y_aggregate_eid_score_func: Callable[[np.ndarray], float] = np.mean,
     n_years_lookback: int = 1,
-    combine_posterior_prior_y_func: Callable[[List[np.ndarray]], np.ndarray] = default_combine_posterior_prior_y_func,
+    combine_posterior_prior_y_func: Callable[
+        [List[np.ndarray]], np.ndarray
+    ] = default_combine_posterior_prior_y_func,
     adj_mat_dtype: np.dtype = bool,
     numeric_types: np.dtype = np.float32,
     operate_on_subgraphs_separately: bool = False,
 ) -> Iterable[Tuple[sparse.csr_matrix, np.ndarray, np.ndarray]]:
+    """The get_data function for the SciServer backend.
+
+    Args:
+        year (int): The year to get the data for.
+        logger (logging.Logger): The logger to use. Defaults to None.
+        prior_y_aggregate_eid_score_func (Callable[[np.ndarray], float]): A function
+            that takes an array of scores and returns a single score. Defaults to
+            np.mean.
+        n_years_lookback (int): The number of years to look back when getting the
+            previous posterior. Defaults to 1.
+        combine_posterior_prior_y_func (Callable[[List[np.ndarray]], np.ndarray]): A
+            function that takes a list of arrays and returns a single array. Defaults
+            to default_combine_posterior_prior_y_func.
+        adj_mat_dtype (np.dtype): The data type of the adjacency matrix. Defaults to
+            bool.
+        numeric_types (np.dtype): The data type of the numeric values. Defaults to
+            np.float32.
+        operate_on_subgraphs_separately (bool): Whether to operate on subgraphs
+            separately. Defaults to False.
+
+    Yields:
+        Iterable[Tuple[sparse.csr_matrix, np.ndarray, np.ndarray]]: An iterable with
+            the adjacency matrix, the auids and the prior_y for the given year.
+
+    """
 
     os.makedirs("./data/cache", exist_ok=True)

@@ -346,22 +325,34 @@ def get_data(
     for i, auids in enumerate(auids_iter):
 
         if len(auids) > MIN_ARR_SIZE_FOR_CACHE:
-            #TODO: This section might be too hard to read
+            # TODO: This section might be too hard to read
            logger.info(f"n auids: {len(auids)}, looking for cached adjacency matrix")
-            if os.path.exists(f"./data/cache/{'iter_'*operate_on_subgraphs_separately}adjacency_matrix_{year}_{i}.npz"):
+            if os.path.exists(
+                f"./data/cache/{'iter_'*operate_on_subgraphs_separately}adjacency_matrix_{year}_{i}.npz"
+            ):
                 logger.info("Found cached adjacency matrix, loading...")
                 with log_time.LogTime(f"Loading adjacency matrix {i}", logger):
-                    A = sparse.load_npz(f"./data/cache/{'iter_'*operate_on_subgraphs_separately}adjacency_matrix_{year}_{i}.npz")
-                    auids = np.load(f"./data/cache/{'iter_'*operate_on_subgraphs_separately}auids_{year}_{i}.npy")
+                    A = sparse.load_npz(
+                        f"./data/cache/{'iter_'*operate_on_subgraphs_separately}adjacency_matrix_{year}_{i}.npz"
+                    )
+                    auids = np.load(
+                        f"./data/cache/{'iter_'*operate_on_subgraphs_separately}auids_{year}_{i}.npy"
+                    )
             else:
                 logger.info("No cached adjacency matrix found, building...")
                 with log_time.LogTime(f"Building adjacency matrix {i}", logger):
                     auids, A = adj_mat_func(auids)
                 with log_time.LogTime(f"Caching adjacency matrix {i}", logger):
                     logger.info(f"Saving adjacency matrix to cache")
-                    sparse.save_npz(f"./data/cache/{'iter_'*operate_on_subgraphs_separately}adjacency_matrix_{year}_{i}.npz", A)
+                    sparse.save_npz(
+                        f"./data/cache/{'iter_'*operate_on_subgraphs_separately}adjacency_matrix_{year}_{i}.npz",
+                        A,
+                    )
                     logger.info(f"Saving auids to cache")
-                    np.save(f"./data/cache/{'iter_'*operate_on_subgraphs_separately}auids_{year}_{i}.npy", auids)
+                    np.save(
+                        f"./data/cache/{'iter_'*operate_on_subgraphs_separately}auids_{year}_{i}.npy",
+                        auids,
+                    )
         else:
             with log_time.LogTime(f"Building adjacency matrix {i}", logger):
                 auids, A = adj_mat_func(auids)
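
A side note on the long f-strings above: `'iter_' * operate_on_subgraphs_separately` relies on Python's string-by-bool multiplication (`'iter_' * True == 'iter_'`, `'iter_' * False == ''`). As the TODO admits, this is hard to read; one hypothetical refactor (`_cache_path` is not in the repository) would be:

```python
def _cache_path(kind: str, year: int, i: int, ext: str, per_subgraph: bool) -> str:
    # Equivalent to the "'iter_' * bool" trick above, but explicit.
    prefix = "iter_" if per_subgraph else ""
    return f"./data/cache/{prefix}{kind}_{year}_{i}.{ext}"

# _cache_path("adjacency_matrix", 2020, 0, "npz", True)
# -> './data/cache/iter_adjacency_matrix_2020_0.npz'
```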
@@ -374,18 +365,25 @@ def get_data(
             prior_y_aggregate_eid_score_func,
         )
 
-        with log_time.LogTime(f"Retrieving posteriors for previous {n_years_lookback} years", logger):
-            previous_posteriors = list(filter(
-                lambda x: x is not None,
-                map(
-                    lambda year: get_previous_posterior(auids, year),
-                    range(year - 1, year - n_years_lookback - 1, -1)
-                ),
-            ))
-
-        with log_time.LogTime(f"Combining posteriors for previous {n_years_lookback} years", logger):
-            prior_y = combine_posterior_prior_y_func([prior_y_eids] + previous_posteriors)
+        with log_time.LogTime(
+            f"Retrieving posteriors for previous {n_years_lookback} years", logger
+        ):
+            previous_posteriors = list(
+                filter(
+                    lambda x: x is not None,
+                    map(
+                        lambda year: get_previous_posterior(auids, year),
+                        range(year - 1, year - n_years_lookback - 1, -1),
+                    ),
+                )
+            )
 
+        with log_time.LogTime(
+            f"Combining posteriors for previous {n_years_lookback} years", logger
+        ):
+            prior_y = combine_posterior_prior_y_func(
+                [prior_y_eids] + previous_posteriors
+            )
 
         yield A, auids, prior_y.astype(numeric_types)
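
Putting the three phases from the README together, a minimal driver could look like the sketch below. The propagation step is a one-line stand-in for the real label propagation algorithm, and the sketch assumes the SciServer backend also exposes an `update_posterior` with the signature shown in `elsevier.py`; both are assumptions, not code from this commit:

```python
import logging

import numpy as np
import scipy.sparse as sparse

from src.backend import sciserver  # backend choice is illustrative

logger = logging.getLogger(__name__)


def one_propagation_step(A: sparse.csr_matrix, y: np.ndarray) -> np.ndarray:
    # Stand-in for the real label propagation: average neighbor scores once.
    deg = np.maximum(np.asarray(A.sum(axis=1)).ravel(), 1)
    return (A @ y) / deg


def run_year(year: int) -> None:
    # Phase 1: get the prior data for the year, one (sub)graph at a time.
    for A, auids, prior_y in sciserver.get_data(year, logger=logger):
        # Phase 2: run the label propagation algorithm (placeholder here).
        posterior = one_propagation_step(A, prior_y)
        # Phase 3: persist the posterior so it can seed a later year's prior.
        sciserver.update_posterior(auids, posterior, year, logger)
```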
