tweetset_loader.py
from elasticsearch_dsl.connections import connections
from elasticsearch import helpers
from elasticsearch_dsl import Search, Index
from glob import glob
import gzip
import logging
import json
import argparse
from datetime import datetime
from time import sleep
import os
import math
import multiprocessing
import os.path
from pyspark.sql import SparkSession
from models import TweetIndex, to_tweet, DatasetIndex, to_dataset, DatasetDocument, TweetDocument, get_tweets_index_name
from utils import groupby_size, read_json, short_uid, groupby_date, create_extract_path
from spark_utils import *
from shutil import copy, copyfileobj
import re
log = logging.getLogger(__name__)
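# Shared Elasticsearch connection for the loader; the host is taken from the ES_HOST
# environment variable and defaults to 'elasticsearch'.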
connections.create_connection(hosts=[os.environ.get('ES_HOST', 'elasticsearch')], timeout=90, sniff_on_start=True,
sniff_on_connection_fail=True,
retry_on_timeout=True, maxsize=25)
def find_files(path):
"""
    Returns (.json/.jsonl files, .json.gz/.jsonl.gz files, .txt files) found in path.
"""
json_filepaths = glob('{}/*.json'.format(path))
json_filepaths.extend(glob('{}/*.jsonl'.format(path)))
dataset_filepath = os.path.join(path, 'dataset.json')
if dataset_filepath in json_filepaths:
json_filepaths.remove(dataset_filepath)
gz_filepaths = glob('{}/*.json.gz'.format(path))
gz_filepaths.extend(glob('{}/*.jsonl.gz'.format(path)))
return (json_filepaths,
gz_filepaths,
glob('{}/*.txt'.format(path)))
def create_json_extracts(json_files, dataset_id, by_date=False, to_size=None):
'''
Concatenates gzipped JSON files to reduce the number of extracts. Either concatenates by date (using the timestamp in the filename) or up to a specified file-size limit.
:param json_files: a list of files to concatenate
:param dataset_id: 6-character unique ID for this dataset
:param by_date: set to True in order to concatenate by date
:param to_size: provide a size (in bytes) to concatenate up to a limit size
'''
if by_date:
grouped_files = groupby_date(json_files)
elif to_size:
grouped_files = groupby_size(json_files, to_size)
# If neither parameter supplied, just copy files over as is
else:
return copy_json(json_files, dataset_id)
json_extract_dir = create_extract_path(dataset_id)
log.info(f'Copying and concatenating {len(json_files)} JSON files to {len(grouped_files)} files at {json_extract_dir}.')
# Create new file for each grouped file and concatenate contents
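    # Concatenated gzip members still form a valid gzip stream, so each grouped
    # file can be read back as a single .jsonl.gz.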
for name, files in grouped_files.items():
with open(os.path.join(json_extract_dir, f'tweet-{name}.jsonl.gz'), 'wb') as outfile:
# Iterate over the files to be copied
for file in files:
# Uses copyfileobj for fast copying
with open(file, 'rb') as infile:
copyfileobj(infile, outfile)
return
def copy_json(json_files, dataset_id):
'''
Copies JSON (zipped and unzipped) files to the path for full TS extracts.
    :param json_files: list of files to be copied
:param dataset_id: 6-character unique ID for this dataset
'''
json_extract_dir = create_extract_path(dataset_id)
if not os.path.isdir(json_extract_dir):
os.makedirs(json_extract_dir)
log.info(f'Copying {len(json_files)} JSON files to {json_extract_dir}.')
for file in json_files:
copy(file, json_extract_dir)
return
def create_dataset_params(dataset_id):
'''Creates the dataset_params file required by tweetset_server.py to be associated with each full extract.
:param dataset_id: Id for the dataset being loaded.'''
full_dataset_path = os.environ.get('PATH_TO_EXTRACTS')
dataset_params_path = os.path.join(full_dataset_path, dataset_id, 'dataset_params.json')
# minimal dataset parameters expected for full extract
dataset_params = {"tweet_type_original": "true",
"tweet_type_quote": "true",
"tweet_type_retweet": "true",
"tweet_type_reply": "true",
"source_dataset": dataset_id,
"dataset_name": f"full-extract-of-{dataset_id}"}
with open(dataset_params_path, 'w') as f:
json.dump(dataset_params, f)
return
def count_normal_lines(filepath):
    with open(filepath) as file:
        return sum(1 for _ in file)
def count_gz_lines(filepath):
    with gzip.open(filepath) as file:
        return sum(1 for _ in file)
def count_lines(json_files, json_gz_files, txt_files, threads=6):
total_lines = 0
pool = multiprocessing.Pool(threads)
total_lines += sum(pool.imap_unordered(count_normal_lines, json_files))
total_lines += sum(pool.imap_unordered(count_gz_lines, json_gz_files))
total_lines += sum(pool.imap_unordered(count_normal_lines, txt_files))
return total_lines
def count_files(json_files, json_gz_files, txt_files):
return len(json_files) + len(json_gz_files) + len(txt_files)
def tweet_iter(json_files, json_gz_files, txt_files, limit=None, total_tweets=None):
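    """
    Yields parsed tweets one at a time from the plain and gzipped JSON-lines files,
    logging progress every 10,000 tweets and stopping once `limit` is reached.
    """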
counter = 0
for filepath in json_files:
with open(filepath) as file:
for line in file:
if counter % 10000 == 0:
log.info('{:,} of {:,} tweets'.format(counter, limit or total_tweets or 0))
if counter == limit:
break
counter += 1
yield json.loads(line)
for filepath in json_gz_files:
with gzip.open(filepath, 'rt') as file:
for line in file:
if counter % 10000 == 0:
log.info('{:,} of {:,} tweets'.format(counter, limit or total_tweets or 0))
if counter == limit:
break
counter += 1
yield json.loads(line)
# TODO: Handle hydration
def delete_tweet_index(dataset_identifier):
_, __, existing_index_name = get_tweet_index_state(dataset_identifier)
if existing_index_name:
TweetIndex(existing_index_name).delete(ignore=404)
log.info('Deleted tweets from {}'.format(dataset_identifier))
def get_tweet_index_state(dataset_identifier):
"""
Returns alias name, new index name, existing index name or none
"""
alias_name = get_tweets_index_name(dataset_identifier)
index_name_a = '{}-a'.format(alias_name)
index_name_b = '{}-b'.format(alias_name)
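    # A dataset's tweets live in either <alias>-a or <alias>-b; the loader writes to
    # whichever suffix is free and swaps the alias over once indexing succeeds.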
existing_index = None
new_index = index_name_a
# If a exists then a is existing and b is new
if Index(index_name_a).exists():
existing_index = index_name_a
new_index = index_name_b
# Else if b exists then a is new and b is existing
elif Index(index_name_b).exists():
existing_index = index_name_b
# Handle legacy cases where no existing alias
elif Index(alias_name).exists():
existing_index = alias_name
# Else neither exists and a is new
return alias_name, new_index, existing_index
def update_dataset_stats(dataset):
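    """
    Recomputes a dataset's tweet count and first/last tweet timestamps by aggregating
    over its tweets index, then saves the updated dataset document.
    """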
search = Search(index=get_tweets_index_name(dataset.meta.id)).extra(track_total_hits=True)
search = search.query('term', dataset_id=dataset.meta.id)[0:0]
search.aggs.metric('created_at_min', 'min', field='created_at')
search.aggs.metric('created_at_max', 'max', field='created_at')
search_response = search.execute()
dataset.first_tweet_created_at = datetime.utcfromtimestamp(
search_response.aggregations.created_at_min.value / 1000.0)
dataset.last_tweet_created_at = datetime.utcfromtimestamp(
search_response.aggregations.created_at_max.value / 1000.0)
dataset.tweet_count = search_response.hits.total.value
dataset.save()
def clean_tweet_dict(tweet_dict):
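    """
    Flattens an elasticsearch_dsl document dict for the es-hadoop writer: keeps the
    _source fields, adds the document _id as tweet_id, and serializes created_at to ISO format.
    """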
new_tweet_dict = tweet_dict['_source']
new_tweet_dict['created_at'] = tweet_dict['_source']['created_at'].isoformat()
new_tweet_dict['tweet_id'] = tweet_dict['_id']
return new_tweet_dict
def shard_count(tweet_count, store_tweet=True):
# In testing, 500K tweets (storing tweet) = 615MB
# Thus, 32.5 million tweets per shard to have a max shard size of 40GB
# In testing, 500k tweets (not storing tweet) = 145MB
# Thus, 138 million tweets per shard to have a max shard size of 40GB
tweets_per_shard = 32500000 if store_tweet else 138000000
return math.ceil(float(tweet_count) / tweets_per_shard) or 1
if __name__ == '__main__':
parser = argparse.ArgumentParser('tweetset_loader')
parser.add_argument('--debug', action='store_true')
# Subparsers
subparsers = parser.add_subparsers(dest='command', help='command help')
update_parser = subparsers.add_parser('update', help='update dataset metadata (including statistics)')
update_parser.add_argument('dataset_identifier', help='identifier (a UUID) for the dataset')
update_parser.add_argument('path', help='path of dataset')
update_parser.add_argument('--filename', help='filename of dataset file', default='dataset.json')
delete_parser = subparsers.add_parser('delete', help='delete dataset and tweets')
delete_parser.add_argument('dataset_identifier', help='identifier (a UUID) for the dataset')
reload_parser = subparsers.add_parser('reload', help='reload the tweets in a dataset')
reload_parser.add_argument('dataset_identifier', help='identifier (a UUID) for the dataset')
reload_parser.add_argument('path', help='path of the directory containing the tweet files')
reload_parser.add_argument('--limit', type=int, help='limit the number of tweets to load')
    reload_parser.add_argument('--skip-count', action='store_true', help='skip counting the tweets')
reload_parser.add_argument('--store-tweet', action='store_true', help='store the entire tweet')
reload_parser.add_argument('--replicas', type=int, default='1', help='number of replicas to make of this dataset')
reload_parser.add_argument('--threads', type=int, default='2', help='number of loading threads')
reload_parser.add_argument('--chunk-size', type=int, default='500', help='size of indexing chunk')
spark_reload_parser = subparsers.add_parser('spark-reload', help='reload the tweets in a dataset using spark')
spark_reload_parser.add_argument('dataset_identifier', help='identifier (a UUID) for the dataset')
spark_reload_parser.add_argument('path', help='path of the directory containing the tweet files')
    spark_reload_parser.add_argument('--skip-count', action='store_true', help='skip counting the tweets')
spark_reload_parser.add_argument('--store-tweet', action='store_true', help='store the entire tweet')
spark_reload_parser.add_argument('--replicas', type=int, default='1',
help='number of replicas to make of this dataset')
dataset_parser = subparsers.add_parser('create', help='create a dataset and add tweets')
dataset_parser.add_argument('path', help='path of dataset')
dataset_parser.add_argument('--filename', help='filename of dataset file', default='dataset.json')
dataset_parser.add_argument('--limit', type=int, help='limit the number of tweets to load')
    dataset_parser.add_argument('--skip-count', action='store_true', help='skip counting the tweets')
dataset_parser.add_argument('--store-tweet', action='store_true', help='store the entire tweet')
dataset_parser.add_argument('--shards', type=int, help='number of shards for this dataset')
dataset_parser.add_argument('--replicas', type=int, default='1', help='number of replicas to make of this dataset')
dataset_parser.add_argument('--threads', type=int, default='2', help='number of loading threads')
dataset_parser.add_argument('--chunk-size', type=int, default='500', help='size of indexing chunk')
dataset_parser.add_argument('--dataset-identifier', help='identifier (a UUID) for the dataset')
spark_create_parser = subparsers.add_parser('spark-create', help='create a dataset and add tweets using spark')
spark_create_parser.add_argument('path', help='path of dataset')
spark_create_parser.add_argument('--filename', help='filename of dataset file', default='dataset.json')
    spark_create_parser.add_argument('--skip-count', action='store_true', help='skip counting the tweets')
spark_create_parser.add_argument('--store-tweet', action='store_true', help='store the entire tweet')
spark_create_parser.add_argument('--shards', type=int, help='number of shards for this dataset')
spark_create_parser.add_argument('--replicas', type=int, default='1',
help='number of replicas to make of this dataset')
spark_create_parser.add_argument('--dataset-identifier', help='identifier (a UUID) for the dataset')
subparsers.add_parser('clear', help='delete all indexes')
args = parser.parse_args()
# Logging
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO
)
if not args.command:
parser.print_help()
quit()
    # Create indexes if they don't exist
dataset_index = DatasetIndex()
dataset_index.document(DatasetDocument)
dataset_index.create(ignore=400)
dataset_id = args.dataset_identifier if hasattr(args, 'dataset_identifier') else None
tweet_index = None
dataset = None
if args.command == 'delete':
dataset = DatasetDocument.get(dataset_id)
if not dataset:
            raise Exception('{} not found'.format(dataset_id))
dataset.delete()
log.info('Deleted {}'.format(dataset.meta.id))
delete_tweet_index(dataset_id)
if args.command == 'update':
dataset = to_dataset(read_json(os.path.join(args.path, args.filename)),
dataset_id=dataset_id)
dataset.save()
update_dataset_stats(dataset)
log.info('Updated dataset {}'.format(dataset_id))
print('Updated dataset {}'.format(dataset_id))
if args.command in ('create', 'spark-create'):
if dataset_id is None:
dataset_id = short_uid(6, exists_func=lambda uid: DatasetDocument.get(uid, ignore=404) is not None)
dataset = to_dataset(read_json(os.path.join(args.path, args.filename)),
dataset_id=dataset_id)
dataset.save()
log.info('Created {}'.format(dataset_id))
print('Dataset id is {}'.format(dataset_id))
if args.command in ('create', 'reload'):
store_tweet = os.environ.get('STORE_TWEET', 'false').lower() == 'true' or args.store_tweet
if store_tweet:
log.info('Storing tweet')
dataset = DatasetDocument.get(dataset_id)
if not dataset:
raise Exception('{} not found'.format(dataset_id))
filepaths = find_files(args.path)
file_count = count_files(*filepaths)
tweet_count = 0
if not args.skip_count:
log.info('Counting tweets in %s files.', file_count)
tweet_count = count_lines(*filepaths)
log.info('{:,} total tweets'.format(tweet_count))
shards = (args.shards if hasattr(args, 'shards') else None) or shard_count(tweet_count, store_tweet)
log.info('Using %s shards and %s replicas for index.', shards, args.replicas)
alias_name, new_index_name, existing_index_name = get_tweet_index_state(dataset_id)
log.debug('Index name is %s', new_index_name)
tweet_index = TweetIndex(new_index_name, shards=shards, replicas=0, refresh_interval=-1)
tweet_index.document(TweetDocument)
tweet_index.create()
log.debug('Indexing using %s threads', args.threads)
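        # Stream tweets straight from disk into Elasticsearch: the generator below converts
        # each JSON line to an indexable document dict, and parallel_bulk indexes them in
        # chunks across the worker threads.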
for success, info in helpers.parallel_bulk(connections.get_connection(),
(to_tweet(tweet_json, dataset_id, new_index_name,
store_tweet=store_tweet).to_dict(
include_meta=True) for
tweet_json in
tweet_iter(*filepaths, limit=args.limit,
total_tweets=tweet_count)),
thread_count=args.threads, chunk_size=args.chunk_size):
log.debug('Success: %s. %s', success, info)
if args.command in ('spark-create', 'spark-reload'):
store_tweet = os.environ.get('STORE_TWEET', 'false').lower() == 'true' or args.store_tweet
if store_tweet:
log.info('Storing tweet')
dataset = DatasetDocument.get(dataset_id)
if not dataset:
raise Exception('{} not found'.format(dataset_id))
filepaths = find_files(args.path)
file_count = count_files(*filepaths)
tweet_count = 0
if not args.skip_count:
log.info('Counting tweets in %s files.', file_count)
tweet_count = count_lines(*filepaths)
log.info('{:,} total tweets'.format(tweet_count))
tweets_per_shard = 32500000 if store_tweet else 138000000
shards = (args.shards if hasattr(args, 'shards') else None) or max(shard_count(tweet_count, store_tweet), 4)
log.info('Using %s shards and %s replicas for index.', shards, args.replicas)
alias_name, new_index_name, existing_index_name = get_tweet_index_state(dataset_id)
log.debug('Index name is %s', new_index_name)
tweet_index = TweetIndex(new_index_name, shards=shards, replicas=0, refresh_interval=-1)
tweet_index.document(TweetDocument)
tweet_index.create()
spark = SparkSession.builder.appName('TweetSets').getOrCreate()
# Make Spark v3 use the v2 time parser
        # TODO: update the Spark SQL code to use the new time parser
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
# Set UTC as the time zone
spark.conf.set('spark.sql.session.timeZone', 'UTC')
# List of JSON files to load (for Spark, loads only *.json and *.gz)
filepath_list = []
filepath_list.extend(filepaths[0])
filepath_list.extend(filepaths[1])
# path at which to store full extracts
full_dataset_path = os.environ.get('PATH_TO_EXTRACTS')
full_dataset_path = os.path.join(full_dataset_path, dataset_id)
if not os.path.isdir(full_dataset_path):
os.mkdir(full_dataset_path)
try:
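            # Settings for the elasticsearch-hadoop connector: write into the newly created
            # index and use each document's tweet_id as its Elasticsearch _id.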
es_conf = {"es.nodes": os.environ.get('ES_HOST', 'elasticsearch'),
"es.port": "9200",
"es.index.auto.create": "false",
"es.mapping.id": "tweet_id",
"es.resource": "{}/_doc".format(new_index_name)}
def to_tweet_dict(tweet_str):
return clean_tweet_dict(
to_tweet(json.loads(tweet_str), dataset_id, '', store_tweet=True).to_dict(include_meta=True))
# Calculate number of initial partitions, based on size of dataset
num_partitions = compute_read_partitions(filepath_list)
# read dataset as RDD (for loading into Elasticsearch)
tweets_str_rdd = spark.sparkContext.textFile(','.join(filepath_list))
# repartition or coalesce for improved performance
tweets_str_rdd = apply_partitions(tweets_str_rdd, num_partitions, filepath_list)
            # Transform each JSON line into (tweet_id, document) pairs as required by EsOutputFormat
tweets_rdd = tweets_str_rdd.map(to_tweet_dict).map(lambda row: (row['tweet_id'], row))
log.info('Saving tweets to Elasticsearch.')
tweets_rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_conf)
# Calculate number of partitions per extract, given number of tweets
# If not supplied, all partitions default to 1
# This number determines how many files will be created by the Spark DataFrameWriter
file_partitions = compute_output_partitions(tweet_count)
# Load Spark schema and SQL for preparing TweetSets docs for Elasticsearch
ts_schema = load_schema('./tweetsets_schema.json')
ts_sql = load_sql('./tweetsets_sql_exp.sql')
df = make_spark_df(spark,
schema=ts_schema,
sql=ts_sql,
path_to_dataset=filepath_list,
dataset_id=dataset_id,
num_partitions=num_partitions)
            # Create and save tweet IDs
tweet_ids_path = os.path.join(full_dataset_path, 'tweet-ids')
log.info(f'Saving tweet IDs to {tweet_ids_path}.')
tweet_ids = extract_tweet_ids(df)
save_to_csv(tweet_ids, tweet_ids_path, num_partitions=file_partitions['ids'])
# Create and save tweet CSV
tweet_csv_path = os.path.join(full_dataset_path, 'tweet-csv')
log.info(f'Saving CSV extracts to {tweet_csv_path}.')
# Setting the escape character to the double quote. Otherwise, it causes problems for applications reading the CSV.
tweet_csv = extract_csv(df)
save_to_csv(tweet_csv, tweet_csv_path, num_partitions=file_partitions['csv'])
# Create and save mentions
mentions_path = os.path.join(full_dataset_path, 'tweet-mentions')
log.info(f'Saving tweet mentions to {mentions_path}.')
mentions_nodes, mentions_edges = extract_mentions(df, spark)
save_to_csv(mentions_nodes,
path_to_extract=os.path.join(mentions_path, 'nodes'),
num_partitions=file_partitions['mentions-nodes']
)
save_to_csv(mentions_edges,
path_to_extract=os.path.join(mentions_path, 'edges'),
num_partitions=file_partitions['mentions-edges']
)
mentions_agg = extract_agg_mentions(df, spark)
save_to_csv(mentions_agg,
path_to_extract=os.path.join(mentions_path, 'agg-mentions'),
num_partitions=file_partitions['mentions-agg']
)
# Create and save user counts
users_path = os.path.join(full_dataset_path, 'tweet-users')
users = extract_agg_users(df, spark)
save_to_csv(users,
path_to_extract=users_path,
num_partitions=file_partitions['users']
)
finally:
spark.stop()
# Copy full JSON tweet files to extracts directory
create_json_extracts(filepath_list, dataset_id, by_date=True)
# Create dataset params file
create_dataset_params(dataset_id)
if args.command in ('create', 'reload', 'spark-create', 'spark-reload'):
log.debug('Setting replicas and refresh interval')
tweet_index.put_settings(body={
'number_of_replicas': args.replicas, 'refresh_interval': '1s'})
# Delete existing index
if existing_index_name:
log.debug('Deleting existing index %s', existing_index_name)
TweetIndex(existing_index_name).delete()
# Add aliases
log.debug('Adding alias %s to %s', alias_name, new_index_name)
tweet_index.put_alias(name=alias_name)
        # Wait briefly for the new index to refresh, then recompute the dataset's tweet count and date range
sleep(5)
update_dataset_stats(dataset)
if args.command == 'clear':
search = DatasetDocument.search()
for dataset in search.execute():
delete_tweet_index(dataset.meta.id)
dataset_index.delete()
log.info("Deleted indexes")