-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdb.py
69 lines (50 loc) · 1.98 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
'''
Wrapper around the master database, master.sqlite, which contains all raw and
intermediate data.
'''
import logging
import os
import sqlalchemy
from sqlalchemy.ext.compiler import compiles, deregister
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import Insert
from base import Base
# Module-level session slot, initialised to None. Nothing in this file assigns
# it again -- presumably callers set it after create_session(); TODO confirm.
session = None
class ExtraSession(Session):
    '''
    An SQLAlchemy Session subclass offering an extra "insert or replace" bulk
    save on top of the regular Session API.
    '''

    def bulk_save_objects_with_replace(self, objects):
        '''
        Saves ``objects`` through Session.bulk_save_objects, but with every
        INSERT compiled as INSERT OR REPLACE, so that an object whose primary
        key is already present replaces the stored row. Only works on SQLite.
        '''
        # Technique taken from
        # https://stackoverflow.com/questions/2218304/sqlalchemy-insert-ignore
        @compiles(Insert)
        def _insert_or_replace(statement, compiler, **kwargs):
            replacing = statement.prefix_with('OR REPLACE')
            return compiler.visit_insert(replacing, **kwargs)

        try:
            self.bulk_save_objects(objects)
        finally:
            # A bit hacky: deregister removes *all* visitors registered for
            # Insert, not just the one installed above. No better way is
            # known right now.
            deregister(Insert)
def _create_engine(file_name):
    '''
    Logs the database being opened and returns an SQLAlchemy engine for the
    SQLite file ``file_name``. SQL echoing is enabled when the ECHO_SQL
    environment variable is exactly '1'.
    '''
    logging.info(f'Opening database {file_name}')
    url = 'sqlite:///' + file_name
    echo_sql = os.environ.get('ECHO_SQL') == '1'
    return sqlalchemy.create_engine(url, echo=echo_sql)
def create_connection(file_name):
    '''
    Opens and returns a plain connection (no ORM session) to the arbitrary
    database stored in ``file_name``.
    '''
    engine = _create_engine(file_name)
    return engine.connect()
# Shared factory that produces ExtraSession instances; create_session() calls
# it with a ``bind`` engine for each new session.
_session_factory = sqlalchemy.orm.sessionmaker(class_=ExtraSession)
def create_session(file_name):
    '''
    Creates and returns a new ExtraSession bound to a freshly created engine
    for the database in ``file_name``. No tables are created here; see
    create_master_schema for that.
    '''
    engine = _create_engine(file_name)
    return _session_factory(bind=engine)
def create_master_schema(session):  # pylint: disable=redefined-outer-name
    '''
    Issues CREATE statements for every table registered on Base.metadata,
    using the engine behind the given session's connection.
    '''
    engine = session.connection().engine
    Base.metadata.create_all(engine)