-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgwas_mtsl.py
177 lines (162 loc) · 5.81 KB
/
gwas_mtsl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Helper module for performing MTSL GWAS inside of MTMLSEM model.
Note that this has little to do with MTMLSEM, it merely fits the classical LMM
model of the kind:
Y = X B + E + U,
where Y and X are deterministic data matrices, B is a matrix of regression
coefficients, E and U are matrices random matrices with U being the random
effect matrix, that takes genetic kinship between individuals into an account.
"""
from itertools import combinations, product
from tqdm import tqdm
import pandas as pd
import numpy as np
from utils import translate_names, unique_mapping
def gwas_lmm(Model, y: list[str], phenos, genes, desc='', init_args=None,
fit_args=None, dropna=True, verbose=True):
"""
Multi-trait single-locus GWAS via linear (possibly mixed) model.
Parameters
----------
Model : class
semopy class.
y : list[str]
List of phenotype names.
phenos : pd.DataFrame
Phenotypes + possibly other variables.
genes : pd.DataFrame
Genotypes/SNPs.
desc : str, optional
Extra model description. The default is ''.
init_args : dict, optional
Extra arguments for Model constructor. The default is None.
fit_args : dict, optional
Extra arguments for Model fit method (e.g., k). The default is None.
dropna : bool, optional
If True, then NaN rows are dropped for each gene test. The default is
True.
Returns
-------
pd.DataFrame
GWAS results.
"""
if init_args is None:
init_args = dict()
if fit_args is None:
fit_args = dict()
res = list()
desc= desc + '\n{} ~ snp'.format(', '.join(y))
for a, b in combinations(y, 2):
desc += f'\n{a} ~~ {b}'
m = Model(desc, **init_args)
phenos = phenos.copy()
it = genes.iteritems()
if verbose:
it = tqdm(list(it))
for name, gene in it:
chr, pos = translate_names(name)
phenos['snp'] = gene.values
if dropna:
data = phenos.dropna()
try:
r = m.fit(data, clean_slate=True, **fit_args)
if type(r) is not tuple:
succ = r.success
fun = r.fun
else:
succ = r[0].success & r[1].success
fun = r[1].fun
except np.linalg.LinAlgError:
succ = False
if not succ:
t = [name, chr, pos, float('nan')] + [1.0] * len(y)
t += [float('nan')] * len(y)
res.append(t)
else:
ins = m.inspect()
ins = ins[(ins['rval'] == 'snp') & (ins['op'] == '~')]
pvals = list()
ests = list()
for _, row in ins.iterrows():
pvals.append(row['p-value'])
ests.append(row['Estimate'])
res.append([name, chr, pos, fun] + pvals + ests)
cols = ['SNP', 'chr', 'pos'] + [f'{p}_p-value' for p in y] + [f'{p}_b'
for p in y]
return pd.DataFrame(res, columns=cols)
def gwas_w(lt):
gs, lt = lt
mod, y, phenos, genes, desc, init_args, fit_args = lt
return gwas(mod, y, phenos, genes[gs], desc, init_args, fit_args,
verbose=False)
def gwas(Model, y: list[str], phenos, genes, desc='', init_args=None,
fit_args=None, num_processes=-1, chunk_size=1000, verbose=True):
"""
Multi-trait single-locus GWAS with multiprocessing support.
Parameters
----------
Model : class
semopy class.
y : list[str]
List of phenotype names.
phenos : pd.DataFrame
Phenotypes + possibly other variables.
genes : pd.DataFrame
Genotypes/SNPs.
desc : str, optional
Extra model description. The default is ''.
init_args : dict, optional
Extra arguments for Model constructor. The default is None.
fit_args : dict, optional
Extra arguments for Model fit method (e.g., k). The default is None.
num_processes : int, optional
Number of processes to run. If -1, then it is selected to number of
avaialable CPU cores minus 1. "None" is the same as 1. The default is
-1.
chunk_size : int, optional
Number of SNPs to be sent onto a single process. The default is 1000.
verbose : bool, optional
If False, then no progress bar will be printed. The default is True.
Returns
-------
pd.DataFrame
GWAS results.
"""
from multiprocessing import Pool, cpu_count
from tqdm.contrib.concurrent import process_map
if num_processes == -1:
num_processes = cpu_count() - 1
if num_processes in (None, 0, 1):
return gwas_lmm(Model, y, phenos, genes, desc, init_args, fit_args,
verbose=verbose)
# We rule out duplicate SNPs to ease the computational burden:
unique = unique_mapping(genes)
genes = genes[list(unique.keys())]
result = None
lt = list(genes.columns)
lt2 = [lt[i:i+chunk_size] for i in range(0, len(lt), chunk_size)]
lt = (Model, y, phenos, genes, desc, init_args, fit_args)
prod = product(lt2, lt)
if not verbose:
with Pool(num_processes) as p:
for t in p.map(gwas_w, prod):
if result is None:
result = t
else:
result = pd.concat([result, t])
else:
for t in process_map(gwas_w, list(prod)):
if result is None:
result = t
else:
result = pd.concat([result, t])
dups = list()
for _, row in result.iterrows():
for dup in unique[row['SNP']]:
row = row.copy()
row['SNP'] = dup
dups.append(row)
for dup in dups:
result = result.append(dup)
return result.sort_values(['chr', 'pos'])