1
1
#!/usr/bin/env python
2
+
2
3
import sys
3
4
import time
4
5
import argparse
5
6
import psycopg2 as pg
7
+ import os
6
8
import row_processor as Processor
7
9
import six
8
10
import json
12
14
('Posts' , 'ViewCount' ): "NULLIF(%(ViewCount)s, '')::int"
13
15
}
14
16
17
# Last progress percentage reported for the download in flight.
# None means no download is being tracked, so the next call to
# show_progress() prints the "total size" header first.
file_part = None


def show_progress(block_num, block_size, total_size):
    """Display the total size of the file to download and the progress in percent.

    The signature matches the ``reporthook`` callback of ``urlretrieve``.

    :param block_num: number of blocks transferred so far.
    :param block_size: size of one block, in bytes.
    :param total_size: total size of the file in bytes; a value <= 0 means
                       the size is unknown, in which case nothing is printed.
    """
    global file_part

    if total_size <= 0:
        # Without a known size no percentage can be computed; the original
        # code re-printed the header and a blank line on every call here.
        return

    if file_part is None:
        # First call for this download: print a human readable total size.
        suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
        suffixIndex = 0
        pp_size = float(total_size)
        # Stop at the largest known unit instead of indexing past the list.
        while pp_size > 1024 and suffixIndex < len(suffixes) - 1:
            suffixIndex += 1
            pp_size = pp_size / 1024.0
        six.print_('Total file size is: {0:.1f} {1}'
                   .format(pp_size, suffixes[suffixIndex]))
        six.print_("0 % of the file downloaded ...\r", end="", flush=True)
        file_part = 0

    downloaded = block_num * block_size
    if downloaded < total_size:
        percent = 100 * downloaded / total_size
        # Only refresh the line when progress moved by more than one point.
        if percent - file_part > 1:
            file_part = percent
            six.print_("{0} % of the file downloaded ...\r".format(int(percent)),
                       end="", flush=True)
    else:
        # Download finished: reset the state for the next download and
        # terminate the progress line.
        file_part = None
        six.print_("")
45
+
46
+
47
def _quoteConnValue(value):
    """Return `value` formatted as a libpq connection-string parameter value.

    Plain values are returned unchanged. Empty values and values containing
    whitespace, single quotes or backslashes are wrapped in single quotes
    with ``'`` and ``\\`` escaped by a backslash.
    """
    text = '{}'.format(value)
    needsQuoting = (text == '') or any(
        ch.isspace() or ch in "'\\" for ch in text)
    if not needsQuoting:
        return text
    return "'" + text.replace('\\', '\\\\').replace("'", "\\'") + "'"


def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword):
    """Build the keyword/value connection string handed to `pg.connect`.

    :param dbname: name of the database (always included).
    :param mbHost: host name, or None to omit the `host` keyword.
    :param mbPort: port, or None to omit the `port` keyword.
    :param mbUsername: user name, or None to omit the `user` keyword.
    :param mbPassword: password, or None to omit the `password` keyword.
    :return: a string such as ``dbname=so port=5432 host=localhost``.
    """
    dbConnectionParam = "dbname={}".format(_quoteConnValue(dbname))

    # Keyword order is kept as: dbname, port, host, user, password.
    optionalParams = (
        ('port', mbPort),
        ('host', mbHost),
        ('user', mbUsername),
        ('password', mbPassword),
    )
    for keyword, value in optionalParams:
        if value is not None:
            dbConnectionParam += ' {}={}'.format(keyword, _quoteConnValue(value))

    return dbConnectionParam
64
+
65
+
15
66
def _makeDefValues(keys):
    """Returns a dictionary containing None for all keys."""
    # dict.fromkeys defaults every value to None.
    return dict.fromkeys(keys)
69
+
18
70
19
71
def _createMogrificationTemplate(table, keys, insertJson):
    """Return the template string for mogrification for the given keys.

    Each key becomes a ``%(key)s`` placeholder unless `specialRules` holds an
    entry for ``(table, key)``, in which case that entry is used verbatim.
    When `insertJson` is true a trailing ``%(jsonfield)s`` placeholder is
    appended.
    """
    table_keys = ', '.join(
        specialRules.get((table, k), '%(' + k + ')s') for k in keys
    )
    jsonSuffix = ', %(jsonfield)s' if insertJson else ''
    return '(' + table_keys + jsonSuffix + ')'
28
80
81
+
29
82
def _createCmdTuple (cursor , keys , templ , attribs , insertJson ):
30
83
"""Use the cursor to mogrify a tuple of data.
31
84
The passed data in `attribs` is augmented with default data (NULLs) and the
@@ -37,14 +90,14 @@ def _createCmdTuple(cursor, keys, templ, attribs, insertJson):
37
90
defs .update (attribs )
38
91
39
92
if insertJson :
40
- dict_attribs = { }
93
+ dict_attribs = {}
41
94
for name , value in attribs .items ():
42
95
dict_attribs [name ] = value
43
96
defs ['jsonfield' ] = json .dumps (dict_attribs )
44
97
45
- values_to_insert = cursor .mogrify (templ , defs )
46
98
return cursor .mogrify (templ , defs )
47
99
100
+
48
101
def _getTableKeys (table ):
49
102
"""Return an array of the keys for a given table"""
50
103
keys = None
@@ -131,26 +184,27 @@ def _getTableKeys(table):
131
184
]
132
185
elif table == 'PostHistory' :
133
186
keys = [
134
- 'Id' ,
135
- 'PostHistoryTypeId' ,
136
- 'PostId' ,
137
- 'RevisionGUID' ,
138
- 'CreationDate' ,
139
- 'UserId' ,
140
- 'Text'
187
+ 'Id'
188
+ , 'PostHistoryTypeId'
189
+ , 'PostId'
190
+ , 'RevisionGUID'
191
+ , 'CreationDate'
192
+ , 'UserId'
193
+ , 'Text'
141
194
]
142
195
elif table == 'Comments' :
143
196
keys = [
144
- 'Id' ,
145
- 'PostId' ,
146
- 'Score' ,
147
- 'Text' ,
148
- 'CreationDate' ,
149
- 'UserId' ,
197
+ 'Id'
198
+ , 'PostId'
199
+ , 'Score'
200
+ , 'Text'
201
+ , 'CreationDate'
202
+ , 'UserId'
150
203
]
151
204
return keys
152
205
153
- def handleTable (table , insertJson , createFk , dbname , mbDbFile , mbHost , mbPort , mbUsername , mbPassword ):
206
+
207
+ def handleTable (table , insertJson , createFk , mbDbFile , dbConnectionParam ):
154
208
"""Handle the table including the post/pre processing."""
155
209
keys = _getTableKeys (table )
156
210
dbFile = mbDbFile if mbDbFile is not None else table + '.xml'
@@ -165,23 +219,6 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
165
219
six .print_ ("Could not load pre/post/fk sql. Are you running from the correct path?" , file = sys .stderr )
166
220
sys .exit (- 1 )
167
221
168
- dbConnectionParam = "dbname={}" .format (dbname )
169
-
170
- if mbPort is not None :
171
- dbConnectionParam += ' port={}' .format (mbPort )
172
-
173
- if mbHost is not None :
174
- dbConnectionParam += ' host={}' .format (mbHost )
175
-
176
- # TODO Is the escaping done here correct?
177
- if mbUsername is not None :
178
- dbConnectionParam += ' user={}' .format (mbUsername )
179
-
180
- # TODO Is the escaping done here correct?
181
- if mbPassword is not None :
182
- dbConnectionParam += ' password={}' .format (mbPassword )
183
-
184
-
185
222
try :
186
223
with pg .connect (dbConnectionParam ) as conn :
187
224
with conn .cursor () as cur :
@@ -199,32 +236,32 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
199
236
six .print_ ('Processing data ...' )
200
237
for rows in Processor .batch (Processor .parse (xml ), 500 ):
201
238
valuesStr = ',\n ' .join (
202
- [ _createCmdTuple (cur , keys , tmpl , row_attribs , insertJson ).decode ('utf-8' )
203
- for row_attribs in rows
204
- ]
205
- )
239
+ [ _createCmdTuple (cur , keys , tmpl , row_attribs , insertJson ).decode ('utf-8' )
240
+ for row_attribs in rows
241
+ ]
242
+ )
206
243
if len (valuesStr ) > 0 :
207
244
cmd = 'INSERT INTO ' + table + \
208
245
' VALUES\n ' + valuesStr + ';'
209
246
cur .execute (cmd )
210
247
conn .commit ()
211
- six .print_ ('Table {0} processing took {1:.1f} seconds' .format (table , time .time () - start_time ))
248
+ six .print_ ('Table \' {0}\' processing took {1:.1f} seconds' .format (table , time .time () - start_time ))
212
249
213
250
# Post-processing (creation of indexes)
214
251
start_time = time .time ()
215
252
six .print_ ('Post processing ...' )
216
253
if post != '' :
217
254
cur .execute (post )
218
255
conn .commit ()
219
- six .print_ ('Post processing took {} seconds' .format (time .time () - start_time ))
256
+ six .print_ ('Post processing took {0:.1f } seconds' .format (time .time () - start_time ))
220
257
if createFk :
221
258
# fk-processing (creation of foreign keys)
222
259
start_time = time .time ()
223
- six .print_ ('fk processing ...' )
260
+ six .print_ ('Foreign Key processing ...' )
224
261
if post != '' :
225
262
cur .execute (fk )
226
263
conn .commit ()
227
- six .print_ ('fk processing took {} seconds' .format (time .time () - start_time ))
264
+ six .print_ ('Foreign Key processing took {0:.1f } seconds' .format (time .time () - start_time ))
228
265
229
266
except IOError as e :
230
267
six .print_ ("Could not read from file {}." .format (dbFile ), file = sys .stderr )
@@ -237,80 +274,184 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
237
274
six .print_ ("Warning from the database." , file = sys .stderr )
238
275
six .print_ ("pg.Warning: {0}" .format (str (w )), file = sys .stderr )
239
276
277
+
278
def moveTableToSchema(table, schemaName, dbConnectionParam):
    """Create `schemaName` if needed and move `table` into it.

    Database errors and warnings are reported on stderr and not re-raised.

    :param table: name of the table to move.
    :param schemaName: name of the target schema.
    :param dbConnectionParam: connection string passed to `pg.connect`.
    """
    # NOTE(review): both names are concatenated into the SQL text as-is
    # (unquoted); callers should only pass trusted identifiers.
    conn = None
    try:
        conn = pg.connect(dbConnectionParam)
        # Using the connection as a context manager only ends the
        # transaction; the connection itself is closed in `finally`.
        with conn:
            with conn.cursor() as cur:
                # create the schema
                cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schemaName + ';')
                conn.commit()
                # move the table to the right schema
                cur.execute('ALTER TABLE ' + table + ' SET SCHEMA ' + schemaName + ';')
                conn.commit()
    except pg.Error as e:
        six.print_("Error in dealing with the database.", file=sys.stderr)
        six.print_("pg.Error ({0}): {1}".format(e.pgcode, e.pgerror), file=sys.stderr)
        six.print_(str(e), file=sys.stderr)
    except pg.Warning as w:
        six.print_("Warning from the database.", file=sys.stderr)
        six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)
    finally:
        if conn is not None:
            conn.close()
295
+
240
296
#############################################################
241
297
298
+
242
299
# Command line interface. Two modes are supported by the script below:
#   * '-f FILE -t TABLE' loads a single XML file into one table;
#   * '-s PROJECT' loads every table of a StackExchange project archive.
parser = argparse.ArgumentParser()

parser.add_argument(
    '-t', '--table',
    help='The table to work on.',
    choices=['Users', 'Badges', 'Posts', 'Tags', 'Votes', 'PostLinks', 'PostHistory', 'Comments'],
    default=None,
)

parser.add_argument(
    '-d', '--dbname',
    help='Name of database to create the table in. The database must exist.',
    default='stackoverflow',
)

parser.add_argument(
    '-f', '--file',
    help='Name of the file to extract data from.',
    default=None,
)

parser.add_argument(
    '-s', '--so-project',
    help='StackExchange project to load.',
    default=None,
)

parser.add_argument(
    '--archive-url',
    help='URL of the archive directory to retrieve.',
    default='https://ia800107.us.archive.org/27/items/stackexchange',
)

parser.add_argument(
    '-k', '--keep-archive',
    help='Will preserve the downloaded archive instead of deleting it.',
    action='store_true',
    default=False,
)

parser.add_argument(
    '-u', '--username',
    help='Username for the database.',
    default=None,
)

parser.add_argument(
    '-p', '--password',
    help='Password for the database.',
    default=None,
)

parser.add_argument(
    '-P', '--port',
    help='Port to connect with the database on.',
    default=None,
)

parser.add_argument(
    '-H', '--host',
    help='Hostname for the database.',
    default=None,
)

parser.add_argument(
    '--with-post-body',
    help='Import the posts with the post body. Only used if importing Posts.xml',
    action='store_true',
    default=False,
)

parser.add_argument(
    '-j', '--insert-json',
    help='Insert raw data as JSON.',
    action='store_true',
    default=False,
)

parser.add_argument(
    '-n', '--schema-name',
    help='Use specific schema.',
    default='public',
)

parser.add_argument(
    '--foreign-keys',
    help='Create foreign keys.',
    action='store_true',
    default=False,
)
295
374
296
375
args = parser.parse_args()

try:
    # Python 2/3 compatibility
    input = raw_input
except NameError:
    pass

dbConnectionParam = buildConnectionString(args.dbname, args.host, args.port, args.username, args.password)

# load given file in table
if args.file and args.table:
    table = args.table

    if table == 'Posts':
        # If the user has not explicitly asked for loading the body, we replace it with NULL
        if not args.with_post_body:
            specialRules[('Posts', 'Body')] = 'NULL'

    choice = input('This will drop the {} table. Are you sure [y/n]?'.format(table))
    if len(choice) > 0 and choice[0].lower() == 'y':
        handleTable(table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam)
        # Only move the table when it was actually (re)loaded; a cancelled
        # run must leave the database untouched.
        if args.schema_name != 'public':
            moveTableToSchema(table, args.schema_name, dbConnectionParam)
    else:
        six.print_("Cancelled.")
    sys.exit(0)

# load a project
elif args.so_project:
    # Imported here so single-table mode does not need libarchive.
    import libarchive
    import tempfile

    filepath = None
    temp_dir = None
    if args.file:
        # Use the archive given on the command line instead of downloading.
        filepath = args.file
        url = filepath
    else:
        # download the 7z archive in tempdir
        file_name = args.so_project + '.stackexchange.com.7z'
        url = '{0}/{1}'.format(args.archive_url, file_name)
        temp_dir = tempfile.mkdtemp(prefix='so_')
        filepath = os.path.join(temp_dir, file_name)
        six.print_('Downloading the archive in {0}'.format(filepath))
        six.print_('please be patient ...')
        try:
            six.moves.urllib.request.urlretrieve(url, filepath, show_progress)
        except Exception as e:
            six.print_('Error: impossible to download the {0} archive ({1})'.format(url, e),
                       file=sys.stderr)
            sys.exit(1)

    try:
        # The per-table XML files are read from (and later removed from)
        # the current working directory below.
        libarchive.extract_file(filepath)
    except Exception as e:
        six.print_('Error: impossible to extract the {0} archive ({1})'.format(url, e),
                   file=sys.stderr)
        sys.exit(1)

    tables = ['Tags', 'Users', 'Badges', 'Posts', 'Comments',
              'Votes', 'PostLinks', 'PostHistory']

    for table in tables:
        six.print_('Load {0}.xml file'.format(table))
        handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam)
        # remove file
        os.remove(table + '.xml')

    if not args.keep_archive:
        os.remove(filepath)
        if temp_dir:
            # remove the archive and the temporary directory
            os.rmdir(temp_dir)
        else:
            six.print_("Archive '{0}' deleted".format(filepath))

    if args.schema_name != 'public':
        for table in tables:
            moveTableToSchema(table, args.schema_name, dbConnectionParam)
    sys.exit(0)

else:
    six.print_("Error: you must either use '-f' and '-t' arguments or the '-s' argument.",
               file=sys.stderr)
    parser.print_help()
    # Invalid invocation: report failure to the calling shell.
    sys.exit(2)
0 commit comments