1
- #!/usr/bin/env python
1
+ #!/usr/bin/env python3
2
2
import sys
3
3
import time
4
4
import argparse
5
5
import psycopg2 as pg
6
+ import os
6
7
import row_processor as Processor
7
8
import six
8
9
import json
12
13
('Posts' , 'ViewCount' ): "NULLIF(%(ViewCount)s, '')::int"
13
14
}
14
15
16
+ # part of the file already downloaded
17
+ file_part = None
18
+
19
def show_progress(block_num, block_size, total_size):
    """Display the total size of the file to download and the progress in percent.

    The signature matches the ``reporthook`` callback of
    ``urllib.request.urlretrieve``.

    State is kept in the module-level ``file_part``: it is ``None`` before the
    first call of a download, then holds the last percentage that was printed,
    and is reset to ``None`` once the download is complete.

    Args:
        block_num: number of blocks transferred so far.
        block_size: size of one block, in bytes.
        total_size: total size of the file in bytes; a negative value means
            the size is unknown.
    """
    global file_part

    # urlretrieve may pass -1 when the server does not report a file size.
    # No percentage can be computed then, and the code below would print a
    # bogus "-1.0 B" header plus an empty line for every single block.
    if total_size < 0:
        return

    if file_part is None:
        # First call for this download: print the human-readable total size.
        suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
        suffixIndex = 0
        pp_size = total_size
        # Stop at the last suffix so that huge sizes cannot index past the list.
        while pp_size > 1024 and suffixIndex < len(suffixes) - 1:
            suffixIndex += 1
            pp_size = pp_size / 1024.0
        six.print_('Total file size is: {0:.1f} {1}'.format(pp_size, suffixes[suffixIndex]))
        six.print_("0 % of the file downloaded ...\r ", end="", flush=True)
        file_part = 0

    downloaded = block_num * block_size
    if downloaded < total_size:
        percent = 100 * downloaded / total_size
        # Only refresh the line when progress advanced by more than one point.
        if percent - file_part > 1:
            file_part = percent
            six.print_("{0} % of the file downloaded ...\r ".format(int(percent)), end="", flush=True)
    else:
        # Download finished: reset the state and terminate the progress line.
        file_part = None
        six.print_("")
42
+
43
def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword):
    """Build a libpq keyword/value connection string.

    ``dbname`` is always emitted; the other settings are emitted only when
    they are not ``None``, in the order port, host, user, password.

    Values that are empty or contain whitespace, a single quote or a backslash
    are wrapped in single quotes with ``'`` and ``\\`` backslash-escaped, as
    the libpq connection-string syntax requires. Plain values are emitted
    unchanged.

    Args:
        dbname: name of the database.
        mbHost: host name, or None.
        mbPort: port, or None.
        mbUsername: user name, or None.
        mbPassword: password, or None.

    Returns:
        The connection string, e.g. ``"dbname=so port=5432 host=localhost"``.
    """
    def _quote(value):
        # Quote only when needed so that simple values keep their plain form.
        text = '{}'.format(value)
        if text and not any(ch.isspace() or ch in "'\\" for ch in text):
            return text
        # Backslashes first, otherwise the ones added for quotes get doubled.
        return "'" + text.replace('\\', '\\\\').replace("'", "\\'") + "'"

    pieces = ['dbname=' + _quote(dbname)]
    optional = (
        ('port', mbPort),
        ('host', mbHost),
        ('user', mbUsername),
        ('password', mbPassword),
    )
    for keyword, value in optional:
        if value is not None:
            pieces.append(keyword + '=' + _quote(value))
    return ' '.join(pieces)
60
+
15
61
def _makeDefValues (keys ):
16
62
"""Returns a dictionary containing None for all keys."""
17
63
return dict (( (k , None ) for k in keys ))
@@ -150,7 +196,7 @@ def _getTableKeys(table):
150
196
]
151
197
return keys
152
198
153
- def handleTable (table , insertJson , createFk , dbname , mbDbFile , mbHost , mbPort , mbUsername , mbPassword ):
199
+ def handleTable (table , insertJson , createFk , mbDbFile , dbConnectionParam ):
154
200
"""Handle the table including the post/pre processing."""
155
201
keys = _getTableKeys (table )
156
202
dbFile = mbDbFile if mbDbFile is not None else table + '.xml'
@@ -165,23 +211,6 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
165
211
six .print_ ("Could not load pre/post/fk sql. Are you running from the correct path?" , file = sys .stderr )
166
212
sys .exit (- 1 )
167
213
168
- dbConnectionParam = "dbname={}" .format (dbname )
169
-
170
- if mbPort is not None :
171
- dbConnectionParam += ' port={}' .format (mbPort )
172
-
173
- if mbHost is not None :
174
- dbConnectionParam += ' host={}' .format (mbHost )
175
-
176
- # TODO Is the escaping done here correct?
177
- if mbUsername is not None :
178
- dbConnectionParam += ' user={}' .format (mbUsername )
179
-
180
- # TODO Is the escaping done here correct?
181
- if mbPassword is not None :
182
- dbConnectionParam += ' password={}' .format (mbPassword )
183
-
184
-
185
214
try :
186
215
with pg .connect (dbConnectionParam ) as conn :
187
216
with conn .cursor () as cur :
@@ -208,7 +237,7 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
208
237
' VALUES\n ' + valuesStr + ';'
209
238
cur .execute (cmd )
210
239
conn .commit ()
211
- six .print_ ('Table {0} processing took {1:.1f} seconds' .format (table , time .time () - start_time ))
240
+ six .print_ ('Table \' {0}\' processing took {1:.1f} seconds' .format (table , time .time () - start_time ))
212
241
213
242
# Post-processing (creation of indexes)
214
243
start_time = time .time ()
@@ -237,12 +266,32 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
237
266
six .print_ ("Warning from the database." , file = sys .stderr )
238
267
six .print_ ("pg.Warning: {0}" .format (str (w )), file = sys .stderr )
239
268
269
+
270
def moveTableToSchema(table, schemaName, dbConnectionParam):
    """Create the schema if it does not exist and move the table into it.

    Database errors and warnings are reported on stderr and not re-raised.

    Args:
        table: name of the table to move.
        schemaName: name of the target schema.
        dbConnectionParam: connection string passed to ``pg.connect``.
    """
    # NOTE(review): both names are concatenated into the SQL text unquoted,
    # so callers must only pass trusted identifiers.
    conn = None
    try:
        conn = pg.connect(dbConnectionParam)
        with conn:
            with conn.cursor() as cur:
                # create the schema
                cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schemaName + ';')
                # committed separately so the schema persists even if the
                # move below fails
                conn.commit()
                # move the table to the right schema
                cur.execute('ALTER TABLE ' + table + ' SET SCHEMA ' + schemaName + ';')
                conn.commit()
    except pg.Error as e:
        six.print_("Error in dealing with the database.", file=sys.stderr)
        six.print_("pg.Error ({0}): {1}".format(e.pgcode, e.pgerror), file=sys.stderr)
        six.print_(str(e), file=sys.stderr)
    except pg.Warning as w:
        six.print_("Warning from the database.", file=sys.stderr)
        six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)
    finally:
        # Leaving the connection's ``with`` block only ends the transaction;
        # the connection itself has to be closed explicitly.
        if conn is not None:
            conn.close()
287
+
240
288
#############################################################
241
289
242
290
parser = argparse .ArgumentParser ()
243
- parser .add_argument ( 'table'
291
+ parser .add_argument ( '-t' , '-- table'
244
292
, help = 'The table to work on.'
245
293
, choices = ['Users' , 'Badges' , 'Posts' , 'Tags' , 'Votes' , 'PostLinks' , 'PostHistory' , 'Comments' ]
294
+ , default = None
246
295
)
247
296
248
297
parser .add_argument ( '-d' , '--dbname'
@@ -255,6 +304,22 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
255
304
, default = None
256
305
)
257
306
307
+ parser .add_argument ( '-s' , '--so-project'
308
+ , help = 'stackexchange project to load.'
309
+ , default = None
310
+ )
311
+
312
+ parser .add_argument ( '--archive-url'
313
+ , help = 'URL of the archive directory to retrieve.'
314
+ , default = 'https://ia800107.us.archive.org/27/items/stackexchange'
315
+ )
316
+
317
+ parser .add_argument ( '-k' , '--keep-archive'
318
+ , help = 'should we keep the downloaded archive.'
319
+ , action = 'store_true'
320
+ , default = False
321
+ )
322
+
258
323
parser .add_argument ( '-u' , '--username'
259
324
, help = 'Username for the database.'
260
325
, default = None
@@ -287,6 +352,11 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
287
352
, default = False
288
353
)
289
354
355
+ parser .add_argument ( '-n' , '--schema-name'
356
+ , help = 'Use specific schema.'
357
+ , default = 'public'
358
+ )
359
+
290
360
parser .add_argument ( '--foreign-keys'
291
361
, help = 'Create foreign keys.'
292
362
, action = 'store_true'
@@ -295,22 +365,71 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
295
365
296
366
args = parser.parse_args()

try:
    # Python 2/3 compatibility
    input = raw_input
except NameError:
    pass

dbConnectionParam = buildConnectionString(args.dbname, args.host, args.port, args.username, args.password)

# load given file in table
if args.file and args.table:
    table = args.table

    if table == 'Posts':
        # If the user has not explicitly asked for loading the body, we replace it with NULL
        if not args.with_post_body:
            specialRules[('Posts', 'Body')] = 'NULL'

    choice = input('This will drop the {} table. Are you sure [y/n]?'.format(table))
    if len(choice) > 0 and choice[0].lower() == 'y':
        handleTable(table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam)
        # Only touch the database when the user confirmed: a cancelled run
        # must not move an existing table to another schema.
        if args.schema_name != 'public':
            moveTableToSchema(table, args.schema_name, dbConnectionParam)
    else:
        six.print_("Cancelled.")
    # sys.exit rather than the site-provided exit(), which is not guaranteed
    # to exist (e.g. when running with python -S).
    sys.exit(0)

# load a project
elif args.so_project:
    import tempfile
    import urllib.request
    import libarchive

    # download the 7z archive in the system temporary directory
    file_name = args.so_project + '.stackexchange.com.7z'
    url = '{0}/{1}'.format(args.archive_url, file_name)
    filepath = os.path.join(tempfile.gettempdir(), file_name)
    six.print_('Downloading the archive, please be patient ...')
    try:
        urllib.request.urlretrieve(url, filepath, show_progress)
    except Exception as e:
        six.print_('Error: impossible to download the {0} archive ({1})'.format(url, e), file=sys.stderr)
        sys.exit(1)

    try:
        # NOTE(review): presumably extracts into the current working
        # directory, where the per-table .xml files are read and removed
        # below - confirm against the libarchive binding in use.
        libarchive.extract_file(filepath)
    except Exception as e:
        six.print_('Error: impossible to extract the {0} archive ({1})'.format(url, e), file=sys.stderr)
        sys.exit(1)

    tables = ['Tags', 'Users', 'Badges', 'Posts', 'Comments', 'Votes', 'PostLinks', 'PostHistory']

    for table in tables:
        six.print_('Load {0}.xml file'.format(table))
        # Pass no explicit file so that handleTable falls back to
        # '<table>.xml'; forwarding args.file here would load the same file
        # into every table.
        handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam)
        # remove file
        os.remove(table + '.xml')

    if not args.keep_archive:
        # remove archive
        os.remove(filepath)

    if args.schema_name != 'public':
        for table in tables:
            moveTableToSchema(table, args.schema_name, dbConnectionParam)
    sys.exit(0)

else:
    # Usage error: report on stderr and exit with a non-zero status.
    six.print_("Error: you must either use '-f' and '-t' arguments or the '-s' argument.", file=sys.stderr)
    parser.print_help(sys.stderr)
    sys.exit(1)
0 commit comments