diff --git a/README.md b/README.md
index 26ccbf4..36e9b2b 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,8 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
 ## Dependencies
 
  - [`lxml`](http://lxml.de/installation.html)
- - [`psychopg2`](http://initd.org/psycopg/docs/install.html)
+ - [`psycopg2`](http://initd.org/psycopg/docs/install.html)
+ - [`libarchive-c`](https://pypi.org/project/libarchive-c/)
 
 ## Usage
 
@@ -18,14 +19,14 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
    `Badges.xml`, `Votes.xml`, `Posts.xml`, `Users.xml`, `Tags.xml`.
    - In some old dumps, the cases in the filenames are different.
  - Execute in the current folder (in parallel, if desired):
-   - `python load_into_pg.py Badges`
-   - `python load_into_pg.py Posts`
-   - `python load_into_pg.py Tags` (not present in earliest dumps)
-   - `python load_into_pg.py Users`
-   - `python load_into_pg.py Votes`
-   - `python load_into_pg.py PostLinks`
-   - `python load_into_pg.py PostHistory`
-   - `python load_into_pg.py Comments`
+   - `python load_into_pg.py -t Badges`
+   - `python load_into_pg.py -t Posts`
+   - `python load_into_pg.py -t Tags` (not present in earliest dumps)
+   - `python load_into_pg.py -t Users`
+   - `python load_into_pg.py -t Votes`
+   - `python load_into_pg.py -t PostLinks`
+   - `python load_into_pg.py -t PostHistory`
+   - `python load_into_pg.py -t Comments`
 - Finally, after all the initial tables have been created:
   - `psql stackoverflow < ./sql/final_post.sql`
 - If you used a different database name, make sure to use that instead of
@@ -34,7 +35,25 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
   - `psql stackoverflow < ./sql/optional_post.sql`
 - Again, remember to use the correct database name here, if not `stackoverflow`.
 
-## Caveats
+## Loading a complete Stack Exchange project
+
+You can use the script to download the compressed dump of a given Stack Exchange
+project from [archive.org](https://ia800107.us.archive.org/27/items/stackexchange/)
+and load all of its tables at once, using the `-s` switch.
+
+You will need the `libarchive-c` module; the download itself uses the
+standard-library `urllib` (accessed through `six`).
+
+If you give a schema name using the `-n` switch, all the tables will be moved
+to that schema. The schema is created by the script if it does not exist.
+
+To load the _dba.stackexchange.com_ project into the `dba` schema, you would execute:
+`./load_into_pg.py -s dba -n dba`
+
+The table references in the final scripts `sql/final_post.sql` and
+`sql/optional_post.sql` are not schema-qualified. To run them, first set the
+_search_path_ to your schema name: `SET search_path TO <schema_name>;`
+
+## Caveats and TODOs
 
  - It prepares some indexes and views which may not be necessary for your analysis.
 - The `Body` field in `Posts` table is NOT populated by default. You have to use `--with-post-body` argument to include it.
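At its core, the new `-s` flow described above is a download-then-extract step. A minimal runnable sketch of just that step (assuming the `libarchive-c` and `six` packages; the `fetch_and_extract` helper name is illustrative only, the real code in `load_into_pg.py` below adds progress reporting and error handling):

```python
import os
import tempfile

import libarchive  # provided by the libarchive-c package
from six.moves.urllib.request import urlretrieve


def fetch_and_extract(project, archive_url):
    """Download <project>.stackexchange.com.7z and unpack its XML files
    into the current working directory (illustrative helper)."""
    file_name = project + '.stackexchange.com.7z'
    url = '{0}/{1}'.format(archive_url, file_name)
    temp_dir = tempfile.mkdtemp(prefix='so_')
    filepath = os.path.join(temp_dir, file_name)
    urlretrieve(url, filepath)         # fetch the 7z dump from archive.org
    libarchive.extract_file(filepath)  # unpack Badges.xml, Posts.xml, ...
    return filepath, temp_dir


# e.g. fetch_and_extract('dba', 'https://ia800107.us.archive.org/27/items/stackexchange')
```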
diff --git a/load_into_pg.py b/load_into_pg.py
index 66b651d..d7c3158 100755
--- a/load_into_pg.py
+++ b/load_into_pg.py
@@ -1,8 +1,10 @@
 #!/usr/bin/env python
+
 import sys
 import time
 import argparse
 import psycopg2 as pg
+import os
 import row_processor as Processor
 import six
 import json
@@ -12,20 +14,71 @@
     ('Posts', 'ViewCount'): "NULLIF(%(ViewCount)s, '')::int"
 }
 
+# part of the file already downloaded
+file_part = None
+
+
+def show_progress(block_num, block_size, total_size):
+    """Display the total size of the file to download and the progress in percent"""
+    global file_part
+    if file_part is None:
+        suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
+        suffixIndex = 0
+        pp_size = total_size
+        while pp_size > 1024:
+            suffixIndex += 1  # Increment the index of the suffix
+            pp_size = pp_size / 1024.0  # Apply the division
+        six.print_('Total file size is: {0:.1f} {1}'
+                   .format(pp_size, suffixes[suffixIndex]))
+        six.print_("0 % of the file downloaded ...\r", end="", flush=True)
+        file_part = 0
+
+    downloaded = block_num * block_size
+    if downloaded < total_size:
+        percent = 100 * downloaded / total_size
+        if percent - file_part > 1:
+            file_part = percent
+            six.print_("{0} % of the file downloaded ...\r".format(int(percent)), end="", flush=True)
+    else:
+        file_part = None
+        six.print_("")
+
+
+def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword):
+    dbConnectionParam = "dbname={}".format(dbname)
+
+    if mbPort is not None:
+        dbConnectionParam += ' port={}'.format(mbPort)
+
+    if mbHost is not None:
+        dbConnectionParam += ' host={}'.format(mbHost)
+
+    # TODO Is the escaping done here correct?
+    if mbUsername is not None:
+        dbConnectionParam += ' user={}'.format(mbUsername)
+
+    # TODO Is the escaping done here correct?
+    if mbPassword is not None:
+        dbConnectionParam += ' password={}'.format(mbPassword)
+    return dbConnectionParam
+
+
 def _makeDefValues(keys):
     """Returns a dictionary containing None for all keys."""
-    return dict(( (k, None) for k in keys ))
+    return dict(((k, None) for k in keys))
+
 
 def _createMogrificationTemplate(table, keys, insertJson):
     """Return the template string for mogrification for the given keys."""
-    table_keys = ', '.join( [ '%(' + k + ')s' if (table, k) not in specialRules
-                                else specialRules[table, k]
-                              for k in keys ])
+    table_keys = ', '.join(['%(' + k + ')s' if (table, k) not in specialRules
+                            else specialRules[table, k]
+                            for k in keys])
     if insertJson:
         return ('(' + table_keys + ', %(jsonfield)s' + ')')
     else:
         return ('(' + table_keys + ')')
+
 
 def _createCmdTuple(cursor, keys, templ, attribs, insertJson):
     """Use the cursor to mogrify a tuple of data.
     The passed data in `attribs` is augmented with default data (NULLs) and the
@@ -37,14 +90,14 @@ def _createCmdTuple(cursor, keys, templ, attribs, insertJson):
     defs.update(attribs)
 
     if insertJson:
-        dict_attribs = { }
+        dict_attribs = {}
         for name, value in attribs.items():
             dict_attribs[name] = value
         defs['jsonfield'] = json.dumps(dict_attribs)
 
-    values_to_insert = cursor.mogrify(templ, defs)
     return cursor.mogrify(templ, defs)
 
+
 def _getTableKeys(table):
     """Return an array of the keys for a given table"""
     keys = None
@@ -131,26 +184,27 @@ def _getTableKeys(table):
         ]
     elif table == 'PostHistory':
         keys = [
-            'Id',
-            'PostHistoryTypeId',
-            'PostId',
-            'RevisionGUID',
-            'CreationDate',
-            'UserId',
-            'Text'
+            'Id'
+            , 'PostHistoryTypeId'
+            , 'PostId'
+            , 'RevisionGUID'
+            , 'CreationDate'
+            , 'UserId'
+            , 'Text'
         ]
     elif table == 'Comments':
         keys = [
-            'Id',
-            'PostId',
-            'Score',
-            'Text',
-            'CreationDate',
-            'UserId',
+            'Id'
+            , 'PostId'
+            , 'Score'
+            , 'Text'
+            , 'CreationDate'
+            , 'UserId'
         ]
     return keys
 
-def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPassword):
+
+def handleTable(table, insertJson, createFk, mbDbFile, dbConnectionParam):
     """Handle the table including the post/pre processing."""
     keys = _getTableKeys(table)
     dbFile = mbDbFile if mbDbFile is not None else table + '.xml'
@@ -165,23 +219,6 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
         six.print_("Could not load pre/post/fk sql. Are you running from the correct path?", file=sys.stderr)
         sys.exit(-1)
 
-    dbConnectionParam = "dbname={}".format(dbname)
-
-    if mbPort is not None:
-        dbConnectionParam += ' port={}'.format(mbPort)
-
-    if mbHost is not None:
-        dbConnectionParam += ' host={}'.format(mbHost)
-
-    # TODO Is the escaping done here correct?
-    if mbUsername is not None:
-        dbConnectionParam += ' user={}'.format(mbUsername)
-
-    # TODO Is the escaping done here correct?
-    if mbPassword is not None:
-        dbConnectionParam += ' password={}'.format(mbPassword)
-
-
     try:
         with pg.connect(dbConnectionParam) as conn:
             with conn.cursor() as cur:
@@ -199,16 +236,16 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
                     six.print_('Processing data ...')
                     for rows in Processor.batch(Processor.parse(xml), 500):
                         valuesStr = ',\n'.join(
-                            [ _createCmdTuple(cur, keys, tmpl, row_attribs, insertJson).decode('utf-8')
-                              for row_attribs in rows
-                            ]
-                        )
+                            [_createCmdTuple(cur, keys, tmpl, row_attribs, insertJson).decode('utf-8')
+                             for row_attribs in rows
+                             ]
+                            )
                         if len(valuesStr) > 0:
                             cmd = 'INSERT INTO ' + table + \
                                   ' VALUES\n' + valuesStr + ';'
                             cur.execute(cmd)
                             conn.commit()
-                    six.print_('Table {0} processing took {1:.1f} seconds'.format(table, time.time() - start_time))
+                    six.print_('Table \'{0}\' processing took {1:.1f} seconds'.format(table, time.time() - start_time))
 
                 # Post-processing (creation of indexes)
                 start_time = time.time()
@@ -216,15 +253,15 @@
                 if post != '':
                     cur.execute(post)
                     conn.commit()
-                six.print_('Post processing took {} seconds'.format(time.time() - start_time))
+                six.print_('Post processing took {0:.1f} seconds'.format(time.time() - start_time))
 
                 if createFk:
                     # fk-processing (creation of foreign keys)
                     start_time = time.time()
-                    six.print_('fk processing ...')
+                    six.print_('Foreign Key processing ...')
                     if fk != '':
                         cur.execute(fk)
                         conn.commit()
-                    six.print_('fk processing took {} seconds'.format(time.time() - start_time))
+                    six.print_('Foreign Key processing took {0:.1f} seconds'.format(time.time() - start_time))
 
     except IOError as e:
         six.print_("Could not read from file {}.".format(dbFile), file=sys.stderr)
@@ -237,80 +274,184 @@
         six.print_("Warning from the database.", file=sys.stderr)
         six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)
 
+
+def moveTableToSchema(table, schemaName, dbConnectionParam):
+    try:
+        with pg.connect(dbConnectionParam) as conn:
+            with conn.cursor() as cur:
+                # create the schema if it does not exist yet
+                cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schemaName + ';')
+                conn.commit()
+                # move the table to the target schema
+                cur.execute('ALTER TABLE ' + table + ' SET SCHEMA ' + schemaName + ';')
+                conn.commit()
+    except pg.Error as e:
+        six.print_("Error in dealing with the database.", file=sys.stderr)
+        six.print_("pg.Error ({0}): {1}".format(e.pgcode, e.pgerror), file=sys.stderr)
+        six.print_(str(e), file=sys.stderr)
+    except pg.Warning as w:
+        six.print_("Warning from the database.", file=sys.stderr)
+        six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)
+
 #############################################################
+
 parser = argparse.ArgumentParser()
-parser.add_argument( 'table'
+parser.add_argument('-t', '--table'
                    , help = 'The table to work on.'
                    , choices = ['Users', 'Badges', 'Posts', 'Tags', 'Votes', 'PostLinks', 'PostHistory', 'Comments']
+                   , default = None
                    )
 
-parser.add_argument( '-d', '--dbname'
+parser.add_argument('-d', '--dbname'
                    , help = 'Name of database to create the table in. The database must exist.'
                    , default = 'stackoverflow'
                    )
 
-parser.add_argument( '-f', '--file'
+parser.add_argument('-f', '--file'
                    , help = 'Name of the file to extract data from.'
                    , default = None
                    )
 
-parser.add_argument( '-u', '--username'
+parser.add_argument('-s', '--so-project'
+                   , help = 'StackExchange project to load.'
+                   , default = None
+                   )
+
+parser.add_argument('--archive-url'
+                   , help = 'URL of the archive directory to retrieve.'
+                   , default = 'https://ia800107.us.archive.org/27/items/stackexchange'
+                   )
+
+parser.add_argument('-k', '--keep-archive'
+                   , help = 'Will preserve the downloaded archive instead of deleting it.'
+                   , action = 'store_true'
+                   , default = False
+                   )
+
+parser.add_argument('-u', '--username'
                    , help = 'Username for the database.'
                    , default = None
                    )
 
-parser.add_argument( '-p', '--password'
+parser.add_argument('-p', '--password'
                    , help = 'Password for the database.'
                    , default = None
                    )
 
-parser.add_argument( '-P', '--port'
+parser.add_argument('-P', '--port'
                    , help = 'Port to connect with the database on.'
                    , default = None
                    )
 
-parser.add_argument( '-H', '--host'
+parser.add_argument('-H', '--host'
                    , help = 'Hostname for the database.'
                    , default = None
                    )
 
-parser.add_argument( '--with-post-body'
-                   , help = 'Import the posts with the post body. Only used if importing Posts.xml'
-                   , action = 'store_true'
+parser.add_argument('--with-post-body'
+                   , help = 'Import the posts with the post body. Only used if importing Posts.xml'
+                   , action = 'store_true'
                    , default = False
                    )
 
-parser.add_argument( '-j', '--insert-json'
+parser.add_argument('-j', '--insert-json'
                    , help = 'Insert raw data as JSON.'
-                   , action = 'store_true'
+                   , action = 'store_true'
                    , default = False
                    )
 
-parser.add_argument( '--foreign-keys'
+parser.add_argument('-n', '--schema-name'
+                   , help = 'Use specific schema.'
+                   , default = 'public'
+                   )
+
+parser.add_argument('--foreign-keys'
                    , help = 'Create foreign keys.'
-                   , action = 'store_true'
+                   , action = 'store_true'
                    , default = False
                    )
 
 args = parser.parse_args()
 
-table = args.table
-
 try:
     # Python 2/3 compatibility
     input = raw_input
 except NameError:
     pass
 
+dbConnectionParam = buildConnectionString(args.dbname, args.host, args.port, args.username, args.password)
+
+# load a single table, either from the given file (-f) or from <table>.xml
+if args.table:
+    table = args.table
 
-if table == 'Posts':
-    # If the user has not explicitly asked for loading the body, we replace it with NULL
-    if not args.with_post_body:
-        specialRules[('Posts', 'Body')] = 'NULL'
+    if table == 'Posts':
+        # If the user has not explicitly asked for loading the body, we replace it with NULL
+        if not args.with_post_body:
+            specialRules[('Posts', 'Body')] = 'NULL'
+
+    choice = input('This will drop the {} table. Are you sure [y/n]? '.format(table))
+    if len(choice) > 0 and choice[0].lower() == 'y':
+        handleTable(table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam)
+        # only relocate the table if it was actually (re)created
+        if args.schema_name != 'public':
+            moveTableToSchema(table, args.schema_name, dbConnectionParam)
+    else:
+        six.print_("Cancelled.")
+    exit(0)
+
+# download and load all the tables of a project
+elif args.so_project:
+    import libarchive
+    import tempfile
+
+    filepath = None
+    temp_dir = None
+    if args.file:
+        filepath = args.file
+        url = filepath
+    else:
+        # download the 7z archive into a temporary directory
+        file_name = args.so_project + '.stackexchange.com.7z'
+        url = '{0}/{1}'.format(args.archive_url, file_name)
+        temp_dir = tempfile.mkdtemp(prefix='so_')
+        filepath = os.path.join(temp_dir, file_name)
+        six.print_('Downloading the archive to {0},'.format(filepath))
+        six.print_('please be patient ...')
+        try:
+            six.moves.urllib.request.urlretrieve(url, filepath, show_progress)
+        except Exception as e:
+            six.print_('Error: could not download the {0} archive ({1})'.format(url, e))
+            exit(1)
+
+    try:
+        libarchive.extract_file(filepath)
+    except Exception as e:
+        six.print_('Error: could not extract the {0} archive ({1})'.format(url, e))
+        exit(1)
+
+    tables = ['Tags', 'Users', 'Badges', 'Posts', 'Comments',
+              'Votes', 'PostLinks', 'PostHistory']
+
+    for table in tables:
+        six.print_('Loading the {0}.xml file'.format(table))
+        handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam)
+        # remove the extracted XML file once loaded
+        os.remove(table + '.xml')
+
+    if not args.keep_archive:
+        os.remove(filepath)
+        if temp_dir:
+            # remove the now-empty temporary directory as well
+            os.rmdir(temp_dir)
+        six.print_("Archive '{0}' deleted".format(filepath))
+    else:
+        six.print_("Archive '{0}' kept".format(filepath))
+
+    if args.schema_name != 'public':
+        for table in tables:
+            moveTableToSchema(table, args.schema_name, dbConnectionParam)
+    exit(0)
 
-choice = input('This will drop the {} table. Are you sure [y/n]? '.format(table))
-if len(choice) > 0 and choice[0].lower() == 'y':
-    handleTable(table, args.insert_json, args.foreign_keys, args.dbname, args.file, args.host, args.port, args.username, args.password)
 else:
-    six.print_("Cancelled.")
+    six.print_("Error: you must specify either the '-t' argument (with an optional '-f') or the '-s' argument.")
+    parser.print_help()
diff --git a/sql/Votes_pre.sql b/sql/Votes_pre.sql
index 29aebe0..3ed0b53 100644
--- a/sql/Votes_pre.sql
+++ b/sql/Votes_pre.sql
@@ -1,7 +1,7 @@
 DROP TABLE IF EXISTS Votes CASCADE;
 CREATE TABLE Votes (
     Id            int PRIMARY KEY ,
-    PostId        int not NULL ,
+    PostId        int , -- not NULL ,
     VoteTypeId    int not NULL ,
     UserId        int ,
     CreationDate  timestamp not NULL ,
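For context on the loader these changes refactor: `handleTable` builds one multi-row `INSERT` per batch of 500 rows by mogrifying each row dict against a `%(name)s` template. A minimal sketch of that technique, assuming a hypothetical `demo(Id int, Name text)` table and connection string:

```python
import psycopg2 as pg

# Batched INSERT via cursor.mogrify(), as in handleTable(): each row dict
# is rendered against a template, and the rendered tuples are joined into
# a single INSERT statement.
rows = [{'Id': 1, 'Name': 'alice'}, {'Id': 2, 'Name': 'bob'}]  # hypothetical data
templ = '(%(Id)s, %(Name)s)'

with pg.connect('dbname=stackoverflow') as conn:  # assumed connection string
    with conn.cursor() as cur:
        valuesStr = ',\n'.join(
            cur.mogrify(templ, row).decode('utf-8') for row in rows
        )
        if valuesStr:
            cur.execute('INSERT INTO demo VALUES\n' + valuesStr + ';')
    conn.commit()
```

Mogrifying on the client keeps quoting and escaping in psycopg2's hands while still sending only one statement per batch, which is why the loop in `handleTable` commits once per 500-row chunk.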