forked from nbrinckm/quakeledger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheventdataprovider.py
174 lines (140 loc) · 5.04 KB
/
eventdataprovider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python3
"""
Classes to load the data from the database
"""
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from eventdb import Site, DeaggregationData, Event
def read_from_sql(query):
    """
    Utility function to load all the data from a sql query
    and to fill the None values with nans.

    The query has to provide a ``statement`` attribute and a
    ``session.bind`` attribute; both are handed over to
    ``pandas.read_sql``.

    Returns the result as a pandas dataframe.
    """
    dataframe = pd.read_sql(query.statement, query.session.bind)
    # float("nan") is used instead of pd.np.nan: the pandas.np alias was
    # deprecated in pandas 1.0 and is not available in current releases,
    # so accessing it raises an AttributeError there.
    dataframe.fillna(float("nan"), inplace=True)
    return dataframe
class SiteProvider:
    """
    Class to provide access to the site data
    """

    def __init__(self, query):
        # Query for the sites; it is executed on each get_nearest call.
        self.query = query

    def get_nearest(self, lon, lat):
        """
        Returns the site that is the nearest to the given position.

        All sites of the query are loaded into a dataframe first,
        then the nearest row is selected and returned.
        """
        sites = read_from_sql(self.query)
        return SiteProvider._find_nearest(sites, lon, lat)

    @staticmethod
    def _find_nearest(sites, lon, lat):
        """
        Returns the row of the sites dataframe with the smallest
        squared distance to the given lon and lat.

        The distance is computed directly on the lon and lat columns
        (plain euclidean distance on the coordinate values).
        """
        squared_dists = (sites.lon - lon) ** 2 + (sites.lat - lat) ** 2
        # idxmin returns an index *label*, so the row must be looked up
        # with .loc. The positional .iloc only gives the same row as long
        # as the dataframe has a default 0..n-1 index.
        nearest_idx = squared_dists.idxmin()
        return sites.loc[nearest_idx]
class DeaggregationProvider:
    """
    Gives access to the deaggregation data
    """

    def __init__(self, query):
        self.query = query

    def get_all_for_site_and_poe(self, site, poe):
        """
        Loads the deaggregation data that belongs to the given site
        and to the given probability for 50 years.
        Only the sid of the site is used for the lookup.
        """
        site_id = site.sid
        return self._get_all_for_sid_and_poe(site_id, poe)

    def _get_all_for_sid_and_poe(self, sid, poe):
        # Restrict to the site id, then to the poe50y value and
        # keep the rows in the order of their ids.
        selection = (
            self.query
            .filter_by(sid=sid)
            .filter_by(poe50y=poe)
            .order_by(DeaggregationData.id)
        )
        return read_from_sql(selection)
class EventProvider:
    """
    Gives access to the event data.
    The filters and the ordering are collected on the query,
    which is executed once the results are requested.
    """

    def __init__(self, query):
        self.query = query

    def add_filter_type(self, etype, arg_probability):
        """
        Restricts the events to the given type of the catalog.
        For stochastic events the probability must be larger than
        the given one; the deaggregation type selects the stochastic
        events without looking at the probability.
        Any other type leaves the query untouched.
        Returns nothing
        """
        if etype == "deaggregation":
            self.query = self.query.filter_by(type_="stochastic")
            return
        if etype not in ("expert", "observed", "stochastic"):
            return
        self.query = self.query.filter_by(type_=etype)
        if etype == "stochastic":
            self.query = self.query.filter(Event.probability > arg_probability)

    def add_filter_magnitude(self, mmin, mmax):
        """
        Restricts the events to magnitudes between mmin and mmax
        (both bounds included)
        Returns nothing
        """
        with_lower_bound = self.query.filter(Event.magnitude >= mmin)
        self.query = with_lower_bound.filter(Event.magnitude <= mmax)

    def add_filter_spatial(self, lonmin, lonmax, latmin, latmax, zmin, zmax):
        """
        Restricts the events to the given box of longitude,
        latitude and depth (all bounds included)
        Returns nothing
        """
        bounds = (
            (Event.longitude, lonmin, lonmax),
            (Event.latitude, latmin, latmax),
            (Event.depth, zmin, zmax),
        )
        restricted = self.query
        for column, lower, upper in bounds:
            restricted = restricted.filter(column >= lower)
            restricted = restricted.filter(column <= upper)
        self.query = restricted

    def add_ordering_magnitude_desc(self):
        """
        Orders the events by magnitude, the highest ones first
        Returns nothing
        """
        descending_magnitude = Event.magnitude.desc()
        self.query = self.query.order_by(descending_magnitude)

    def get_results(self):
        """
        Executes the query and returns the events as a pandas dataframe
        """
        return read_from_sql(self.query)
class DataProvider:
    """
    Entry point that hands out the more specific
    data providers. All of them query the session
    that was given to this class.
    """

    def __init__(self, session):
        self.session = session

    def create_provider_for_sites(self):
        """Returns a SiteProvider working on a query for the sites"""
        site_query = self.session.query(Site)
        return SiteProvider(site_query)

    def create_provider_for_mean_deagg(self):
        """Returns a DeaggregationProvider working on a query for the deaggregation data"""
        deaggregation_query = self.session.query(DeaggregationData)
        return DeaggregationProvider(deaggregation_query)

    def create_provider_for_events(self):
        """Returns an EventProvider working on a query for the events"""
        event_query = self.session.query(Event)
        return EventProvider(event_query)
class Database:
    """
    Class to store the data for the database connect.
    Should be used with a "with-Statement" to give the
    access to the data provider.
    """

    def __init__(self, sql_connection_string):
        self.sql_connection_string = sql_connection_string
        self.session = None
        # Engine created in __enter__; it is kept so that __exit__
        # can dispose its connection pool.
        self._engine = None

    def __enter__(self):
        """
        Creates the engine and the session for the connection string
        and returns a DataProvider that works on this session.
        """
        self._engine = create_engine(self.sql_connection_string)
        session_maker = sessionmaker(bind=self._engine)
        self.session = session_maker()
        return DataProvider(self.session)

    def __exit__(self, exception_type, exception_value, trackback):
        """
        Closes the session and disposes the engine.
        Exceptions raised inside of the with block are not suppressed.
        """
        try:
            if self.session is not None:
                self.session.close()
        finally:
            # Closing the session only hands the connection back to the
            # pool of the engine; the pool itself must be disposed to
            # release the underlying database connections.
            if self._engine is not None:
                self._engine.dispose()
                self._engine = None

    @classmethod
    def from_local_sql_db(cls, folder, filename):
        """
        Creates the connection string using a local sqlite database
        Then creates the database instance
        and returns that

        The folder may be given with or without a trailing path
        separator.
        """
        # Local import: os is not part of the imports of this module.
        import os

        separators = ("/", os.sep)
        # Only insert a separator if neither side provides one, so
        # that inputs that already worked give the very same path.
        needs_separator = (
            bool(folder)
            and not folder.endswith(separators)
            and not filename.startswith(separators)
        )
        if needs_separator:
            path = folder + os.sep + filename
        else:
            path = folder + filename
        return cls("sqlite:///" + path)