Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

download and load a complete stackexchange project #9

Merged
merged 9 commits into from
May 2, 2019
38 changes: 28 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
## Dependencies

- [`lxml`](http://lxml.de/installation.html)
- [`psychopg2`](http://initd.org/psycopg/docs/install.html)
- [`psycopg2`](http://initd.org/psycopg/docs/install.html)

## Usage

@@ -18,14 +18,14 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
`Badges.xml`, `Votes.xml`, `Posts.xml`, `Users.xml`, `Tags.xml`.
- In some old dumps, the cases in the filenames are different.
- Execute in the current folder (in parallel, if desired):
- `python load_into_pg.py Badges`
- `python load_into_pg.py Posts`
- `python load_into_pg.py Tags` (not present in earliest dumps)
- `python load_into_pg.py Users`
- `python load_into_pg.py Votes`
- `python load_into_pg.py PostLinks`
- `python load_into_pg.py PostHistory`
- `python load_into_pg.py Comments`
- `python load_into_pg.py -t Badges`
- `python load_into_pg.py -t Posts`
- `python load_into_pg.py -t Tags` (not present in earliest dumps)
- `python load_into_pg.py -t Users`
- `python load_into_pg.py -t Votes`
- `python load_into_pg.py -t PostLinks`
- `python load_into_pg.py -t PostHistory`
- `python load_into_pg.py -t Comments`
- Finally, after all the initial tables have been created:
- `psql stackoverflow < ./sql/final_post.sql`
- If you used a different database name, make sure to use that instead of
@@ -34,7 +34,25 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
- `psql stackoverflow < ./sql/optional_post.sql`
- Again, remember to user the correct database name here, if not `stackoverflow`.

## Caveats
## Loading a complete stackexchange project

You can use the script to download a given stackexchange compressed file from
[archive.org](https://ia800107.us.archive.org/27/items/stackexchange/) and load
all the tables at once, using the `-s` switch.

You will need the `urllib` and `libarchive` modules.

If you give a schema name using the `-n` switch, all the tables will be moved
to the given schema. This schema will be created in the script.

To load the _dba.stackexchange.com_ project in the `dba` schema, you would execute:
`./load_into_pg.py -s dba -n dba`

The paths are not changed in the final scripts `sql/final_post.sql` and
`sql/optional_post.sql`. To run them, first set the _search_path_ to your
schema name: `SET search_path TO <myschema>;`

## Caveats and TODOs

- It prepares some indexes and views which may not be necessary for your analysis.
- The `Body` field in `Posts` table is NOT populated by default. You have to use `--with-post-body` argument to include it.
193 changes: 161 additions & 32 deletions load_into_pg.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
import time
import argparse
import psycopg2 as pg
import os
import row_processor as Processor
import six
import json
@@ -12,6 +13,51 @@
('Posts', 'ViewCount'): "NULLIF(%(ViewCount)s, '')::int"
}

# part of the file already downloaded
file_part = None

def show_progress(block_num, block_size, total_size):
"""Display the total size of the file to download and the progess in percent"""
global file_part
if file_part is None:
suffixes=['B','KB','MB','GB','TB']
suffixIndex = 0
pp_size = total_size
while pp_size > 1024:
suffixIndex += 1 #increment the index of the suffix
pp_size = pp_size/1024.0 #apply the division
six.print_('Total file size is: {0:.1f} {1}'.format(pp_size,suffixes[suffixIndex]))
six.print_("0 % of the file downloaded ...\r", end="", flush=True)
file_part = 0

downloaded = block_num * block_size
if downloaded < total_size:
percent = 100 * downloaded / total_size
if percent - file_part > 1:
file_part = percent
six.print_("{0} % of the file downloaded ...\r".format(int(percent)), end="", flush=True)
else:
file_part = None
six.print_("")

def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword):
dbConnectionParam = "dbname={}".format(dbname)

if mbPort is not None:
dbConnectionParam += ' port={}'.format(mbPort)

if mbHost is not None:
dbConnectionParam += ' host={}'.format(mbHost)

# TODO Is the escaping done here correct?
if mbUsername is not None:
dbConnectionParam += ' user={}'.format(mbUsername)

# TODO Is the escaping done here correct?
if mbPassword is not None:
dbConnectionParam += ' password={}'.format(mbPassword)
return dbConnectionParam

def _makeDefValues(keys):
"""Returns a dictionary containing None for all keys."""
return dict(( (k, None) for k in keys ))
@@ -150,7 +196,7 @@ def _getTableKeys(table):
]
return keys

def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPassword):
def handleTable(table, insertJson, createFk, mbDbFile, dbConnectionParam):
"""Handle the table including the post/pre processing."""
keys = _getTableKeys(table)
dbFile = mbDbFile if mbDbFile is not None else table + '.xml'
@@ -165,23 +211,6 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
six.print_("Could not load pre/post/fk sql. Are you running from the correct path?", file=sys.stderr)
sys.exit(-1)

dbConnectionParam = "dbname={}".format(dbname)

if mbPort is not None:
dbConnectionParam += ' port={}'.format(mbPort)

if mbHost is not None:
dbConnectionParam += ' host={}'.format(mbHost)

# TODO Is the escaping done here correct?
if mbUsername is not None:
dbConnectionParam += ' user={}'.format(mbUsername)

# TODO Is the escaping done here correct?
if mbPassword is not None:
dbConnectionParam += ' password={}'.format(mbPassword)


try:
with pg.connect(dbConnectionParam) as conn:
with conn.cursor() as cur:
@@ -208,23 +237,23 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
' VALUES\n' + valuesStr + ';'
cur.execute(cmd)
conn.commit()
six.print_('Table {0} processing took {1:.1f} seconds'.format(table, time.time() - start_time))
six.print_('Table \'{0}\' processing took {1:.1f} seconds'.format(table, time.time() - start_time))

# Post-processing (creation of indexes)
start_time = time.time()
six.print_('Post processing ...')
if post != '':
cur.execute(post)
conn.commit()
six.print_('Post processing took {} seconds'.format(time.time() - start_time))
six.print_('Post processing took {0:.1f} seconds'.format(time.time() - start_time))
if createFk:
# fk-processing (creation of foreign keys)
start_time = time.time()
six.print_('fk processing ...')
if post != '':
cur.execute(fk)
conn.commit()
six.print_('fk processing took {} seconds'.format(time.time() - start_time))
six.print_('fk processing took {0:.1f} seconds'.format(time.time() - start_time))

except IOError as e:
six.print_("Could not read from file {}.".format(dbFile), file=sys.stderr)
@@ -237,12 +266,32 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
six.print_("Warning from the database.", file=sys.stderr)
six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)


def moveTableToSchema(table, schemaName, dbConnectionParam):
try:
with pg.connect(dbConnectionParam) as conn:
with conn.cursor() as cur:
# create the schema
cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schemaName + ';')
conn.commit()
# move the table to the right schema
cur.execute('ALTER TABLE '+table+' SET SCHEMA ' + schemaName + ';')
conn.commit()
except pg.Error as e:
six.print_("Error in dealing with the database.", file=sys.stderr)
six.print_("pg.Error ({0}): {1}".format(e.pgcode, e.pgerror), file=sys.stderr)
six.print_(str(e), file=sys.stderr)
except pg.Warning as w:
six.print_("Warning from the database.", file=sys.stderr)
six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)

#############################################################

parser = argparse.ArgumentParser()
parser.add_argument( 'table'
parser.add_argument( '-t', '--table'
, help = 'The table to work on.'
, choices = ['Users', 'Badges', 'Posts', 'Tags', 'Votes', 'PostLinks', 'PostHistory', 'Comments']
, default = None
)

parser.add_argument( '-d', '--dbname'
@@ -255,6 +304,22 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
, default = None
)

parser.add_argument( '-s', '--so-project'
, help = 'StackExchange project to load.'
, default = None
)

parser.add_argument( '--archive-url'
, help = 'URL of the archive directory to retrieve.'
, default = 'https://ia800107.us.archive.org/27/items/stackexchange'
)

parser.add_argument( '-k', '--keep-archive'
, help = 'Will preserve the downloaded archive instead of deleting it.'
, action = 'store_true'
, default = False
)

parser.add_argument( '-u', '--username'
, help = 'Username for the database.'
, default = None
@@ -287,6 +352,11 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
, default = False
)

parser.add_argument( '-n', '--schema-name'
, help = 'Use specific schema.'
, default = 'public'
)

parser.add_argument( '--foreign-keys'
, help = 'Create foreign keys.'
, action = 'store_true'
@@ -295,22 +365,81 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m

args = parser.parse_args()

table = args.table

try:
# Python 2/3 compatibility
input = raw_input
except NameError:
pass

dbConnectionParam = buildConnectionString(args.dbname, args.host, args.port, args.username, args.password)

# load given file in table
if args.file and args.table:
table = args.table

if table == 'Posts':
# If the user has not explicitly asked for loading the body, we replace it with NULL
if not args.with_post_body:
specialRules[('Posts', 'Body')] = 'NULL'

choice = input('This will drop the {} table. Are you sure [y/n]?'.format(table))
if len(choice) > 0 and choice[0].lower() == 'y':
handleTable(table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam)
else:
six.print_("Cancelled.")
if args.schema_name != 'public':
moveTableToSchema(table, args.schema_name, dbConnectionParam)
exit(0)

# load a project
elif args.so_project:
import libarchive
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you verify that you are using libarchive-c library instead of libarchive?

I will add this to the README.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am indeed using libarchive-c (in version 2.8).

import tempfile

filepath = None
if args.file:
filepath = args.file
url = filepath
else:
# download the 7z archive in tempdir
file_name = args.so_project + '.stackexchange.com.7z'
url = '{0}/{1}'.format(args.archive_url, file_name)
temp_dir = tempfile.gettempdir()
if temp_dir == 'None':
six.print_('WARNING: Could not find temporary directory. Use current directory instead.')
temp_dir = os.getcwd()
filepath = os.path.join(temp_dir, file_name)
six.print_('Downloading the archive in {0}'.format(filepath))
six.print_('please be patient ...')
try:
six.moves.urllib.request.urlretrieve(url, filepath, show_progress)
except Exception as e:
six.print_('Error: impossible to download the {0} archive ({1})'.format(url, e))
exit(1)

try:
libarchive.extract_file(filepath)
except Exception as e:
six.print_('Error: impossible to extract the {0} archive ({1})'.format(url, e))
exit(1)

tables = [ 'Tags', 'Users', 'Badges', 'Posts', 'Comments', 'Votes', 'PostLinks', 'PostHistory' ]

for table in tables:
six.print_('Load {0}.xml file'.format(table))
handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam)
# remove file
os.remove(table+'.xml')

if not args.keep_archive:
# remove archive
os.remove(filepath)

if table == 'Posts':
# If the user has not explicitly asked for loading the body, we replace it with NULL
if not args.with_post_body:
specialRules[('Posts', 'Body')] = 'NULL'
if args.schema_name != 'public':
for table in tables:
moveTableToSchema(table, args.schema_name, dbConnectionParam)
exit(0)

choice = input('This will drop the {} table. Are you sure [y/n]? '.format(table))
if len(choice) > 0 and choice[0].lower() == 'y':
handleTable(table, args.insert_json, args.foreign_keys, args.dbname, args.file, args.host, args.port, args.username, args.password)
else:
six.print_("Cancelled.")
six.print_("Error: you must either use '-f' and '-t' arguments or the '-s' argument.")
parser.print_help()
2 changes: 1 addition & 1 deletion sql/Votes_pre.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
DROP TABLE IF EXISTS Votes CASCADE;
CREATE TABLE Votes (
Id int PRIMARY KEY ,
PostId int not NULL ,
PostId int , -- not NULL ,
VoteTypeId int not NULL ,
UserId int ,
CreationDate timestamp not NULL ,