@@ -1,3 +1,4 @@
+import time
 import glob
 import json
 import os.path
@@ -8,13 +9,41 @@
 import duckdb
 
 
-def advice(prefix, crawl):
+def index_download_advice(prefix, crawl):
     print('Do you need to download this index?')
     print(f' mkdir -p {prefix}/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/')
     print(f' cd {prefix}/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/')
     print(f' aws s3 sync s3://commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/ .')
 
 
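+# print each row of a result as a CDXJ index line: SURT key, 14-digit timestamp, then a JSON dict of capture fields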
+def print_row_as_cdxj(row):
+    df = row.fetchdf()
+    for ro in df.itertuples(index=False):
+        d = ro._asdict()
+        cdxjd = {
+            'url': d['url'],
+            'mime': d['content_mime_type'],
+            'status': str(d['fetch_status']),
+            'digest': 'sha1:' + d['content_digest'],
+            'length': str(d['warc_record_length']),
+            'offset': str(d['warc_record_offset']),
+            'filename': d['warc_filename'],
+        }
+
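+        # fetch_time is a datetime; drop '-', 'T', ':', spaces, and 'Z'/'+0000' to get the 14-digit CDX-style timestamp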
+        timestamp = d['fetch_time'].isoformat(sep='T')
+        timestamp = timestamp.translate(str.maketrans('', '', '-T :Z')).replace('+0000', '')
+
+        print(d['url_surtkey'], timestamp, json.dumps(cdxjd))
+
+
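+# print every column of a (one-row) result as indented key/value lines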
+def print_row_as_kv_list(row):
+    df = row.fetchdf()
+    for ro in df.itertuples(index=False):
+        d = ro._asdict()
+        for k, v in d.items():
+            print(' ', k, v)
+
+
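+# the strategies get_files() knows for locating the index parquet files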
 all_algos = ('s3_glob', 'local_files', 'ccf_local_files', 'cloudfront_glob', 'cloudfront')
 
 
@@ -28,12 +57,12 @@ def get_files(algo, crawl):
         files = glob.glob(files)
         # did we already download? we expect ~300 files of about a gigabyte each
         if len(files) < 250:
-            advice('~', crawl)
+            index_download_advice('~', crawl)
             exit(1)
     elif algo == 'ccf_local_files':
         files = glob.glob(f'/home/cc-pds/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet')
         if len(files) < 250:
-            advice('/home/cc-pds', crawl)
+            index_download_advice('/home/cc-pds', crawl)
             exit(1)
     elif algo == 'cloudfront_glob':
         # duckdb can't glob this, same reason as s3_glob above
@@ -54,12 +83,44 @@ def get_files(algo, crawl):
 
 def main(algo, crawl):
     files = get_files(algo, crawl)
-    ccindex = duckdb.read_parquet(files, hive_partitioning=True)
+    retries_left = 100
+
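+    # opening the remote parquet files can fail transiently (HTTP 403s, truncated responses), so retry for a while before giving up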
+    while True:
+        try:
+            ccindex = duckdb.read_parquet(files, hive_partitioning=True)
+            break
+        except (duckdb.HTTPException, duckdb.InvalidInputException) as e:
+            # read_parquet exception seen: HTTPException("HTTP Error: HTTP GET error on 'https://...' (HTTP 403)")
+            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
+            print('read_parquet exception seen:', repr(e), file=sys.stderr)
+            if retries_left:
+                print('sleeping for 60s', file=sys.stderr)
+                time.sleep(60)
+                retries_left -= 1
+            else:
+                raise
+
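+    # progress bar for the long scans; http_retries also lets duckdb's httpfs retry flaky requests on its own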
+    duckdb.sql('SET enable_progress_bar = true;')
+    duckdb.sql('SET http_retries = 100;')
+    #duckdb.sql("SET enable_http_logging = true;SET http_logging_output = 'duck.http.log'")
 
     print('total records for crawl:', crawl)
-    print(duckdb.sql('SELECT COUNT(*) FROM ccindex;'))
-
-    sq2 = '''
+    retries_left = 100
+    while True:
+        try:
+            print(duckdb.sql('SELECT COUNT(*) FROM ccindex;'))
+            break
+        except duckdb.InvalidInputException as e:
+            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
+            print('duckdb exception seen:', repr(e), file=sys.stderr)
+            if retries_left:
+                print('sleeping for 10s', file=sys.stderr)
+                time.sleep(10)
+                retries_left -= 1
+            else:
+                raise
+
+    sq2 = f'''
 select
 *
 from ccindex
@@ -73,7 +134,19 @@ def main(algo, crawl):
 
     row2 = duckdb.sql(sq2)
     print('our one row')
-    row2.show()
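+    # this loop draws down whatever is left of the retries_left budget from the COUNT above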
+    while True:
+        try:
+            row2.show()
+            break
+        except duckdb.InvalidInputException as e:
+            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
+            print('duckdb exception seen:', repr(e), file=sys.stderr)
+            if retries_left:
+                print('sleeping for 10s', file=sys.stderr)
+                time.sleep(10)
+                retries_left -= 1
+            else:
+                raise
 
     print('writing our one row to a local parquet file, whirlwind.parquet')
     row2.write_parquet('whirlwind.parquet')
@@ -89,29 +162,11 @@ def main(algo, crawl):
     row3.show()
 
     print('complete row:')
-    df = row3.fetchdf()
-    for row in df.itertuples(index=False):
-        d = row._asdict()
-        for k, v in d.items():
-            print(' ', k, v)
+    print_row_as_kv_list(row3)
     print('')
 
     print('equivalent to cdxj:')
-
-    cdxjd = {
-        'url': d['url'],
-        'mime': d['content_mime_type'],
-        'status': str(d['fetch_status']),
-        'digest': 'sha1:' + d['content_digest'],
-        'length': str(d['warc_record_length']),
-        'offset': str(d['warc_record_offset']),
-        'filename': d['warc_filename'],
-    }
-
-    timestamp = d['fetch_time'].isoformat(sep='T')
-    timestamp = timestamp.translate(str.maketrans('', '', '-T :Z')).replace('+0000', '')
-
-    print(d['url_surtkey'], timestamp, json.dumps(cdxjd))
+    print_row_as_cdxj(row3)
 
 
 if __name__ == '__main__':