@@ -4,6 +4,7 @@
 https://github.com/toddwschneider/nyc-taxi-data
 """
 import concurrent.futures
+import gzip
 import io
 import multiprocessing
 from collections import OrderedDict
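Note: the new gzip import is what allows the compressed release asset to be decompressed while it streams. gzip.open accepts an existing file-like object, so it can wrap the HTTP response directly; a minimal sketch of the idea, with a placeholder URL:

    import gzip
    from urllib.request import urlopen

    # gzip.open can wrap any readable file object, so the response body
    # is decompressed incrementally instead of being downloaded first
    response = urlopen("https://example.com/data.csv.gz")  # placeholder URL
    with gzip.open(response, 'rb') as stream:
        print(stream.readline())  # first line of the decompressed CSV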
@@ -92,10 +93,10 @@ def parse_row(row: OrderedDict):
 
     return Point("taxi-trip-data") \
         .tag("dispatching_base_num", row['dispatching_base_num']) \
-        .tag("PULocationID", row['PULocationID']) \
-        .tag("DOLocationID", row['DOLocationID']) \
+        .tag("PULocationID", row['PUlocationID']) \
+        .tag("DOLocationID", row['DOlocationID']) \
         .tag("SR_Flag", row['SR_Flag']) \
-        .field("dropoff_datetime", row['dropoff_datetime']) \
+        .field("dropoff_datetime", row['dropOff_datetime']) \
         .time(row['pickup_datetime']) \
         .to_line_protocol()
 
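Note: the new key casing (PUlocationID, DOlocationID, dropOff_datetime) appears to mirror the CSV header casing of the FHV files in the replacement data source, while the tag and field names written to InfluxDB keep the old spelling. A rough sketch of what parse_row now produces, with a made-up row:

    from influxdb_client import Point

    # hypothetical row, keyed with the new CSV header casing
    row = {'dispatching_base_num': 'B00008', 'pickup_datetime': '2019-01-01 00:30:00',
           'dropOff_datetime': '2019-01-01 00:45:00', 'PUlocationID': '264',
           'DOlocationID': '264', 'SR_Flag': ''}

    line = Point("taxi-trip-data") \
        .tag("dispatching_base_num", row['dispatching_base_num']) \
        .field("dropoff_datetime", row['dropOff_datetime']) \
        .time(row['pickup_datetime']) \
        .to_line_protocol()
    print(line)  # one record encoded as LineProtocol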
@@ -113,7 +114,7 @@ def parse_rows(rows, total_size):
     counter_.value += len(_parsed_rows)
     if counter_.value % 10_000 == 0:
         print('{0:8}{1}'.format(counter_.value, ' - {0:.2f} %'
-                                .format(100 * float(progress_.value) / float(int(total_size))) if total_size else ""))
+                                .format(float(progress_.value) / float(int(total_size))) if total_size else ""))
         pass
 
     queue_.put(_parsed_rows)
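Note: total_size is now a fixed placeholder rather than the Content-Length header, so the printed percentage is only indicative, and dropping the 100 * factor rescales the figure accordingly. The conditional inside str.format is easy to misread, so here is the same expression in isolation with made-up counter values:

    # the percent suffix is only rendered when total_size is truthy
    counter_value, progress_value, total_size = 10_000, 1_200_000, 23143223
    print('{0:8}{1}'.format(counter_value, ' - {0:.2f} %'
                            .format(float(progress_value) / float(int(total_size))) if total_size else ""))
    # prints:    10000 - 0.05 %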
@@ -141,80 +142,80 @@ def init_counter(counter, progress, queue):
     progress_ = Value('i', 0)
     startTime = datetime.now()
 
-    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
-    # url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
+    url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz"
 
     """
     Open URL and stream data
     """
     response = urlopen(url)
-    if response.headers:
-        content_length = response.headers['Content-length']
-    io_wrapper = ProgressTextIOWrapper(response)
-    io_wrapper.progress = progress_
+    # we can't get the content length from the response because the length of the gzip stream is unknown,
+    # so we set it to this fixed value, just for the progress display
+    content_length = 23143223
 
     """
-    Start writer as a new process
+    Open GZIP stream
     """
-    writer = InfluxDBWriter(queue_)
-    writer.start()
+    with gzip.open(response, 'rb') as stream:
+        io_wrapper = ProgressTextIOWrapper(stream, encoding='utf-8')
+        io_wrapper.progress = progress_
 
-    """
-    Create process pool for parallel encoding into LineProtocol
-    """
-    cpu_count = multiprocessing.cpu_count()
-    with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
-                                                initargs=(counter_, progress_, queue_)) as executor:
         """
-        Converts incoming HTTP stream into sequence of LineProtocol
+        Start writer as a new process
         """
-        data = rx \
-            .from_iterable(DictReader(io_wrapper)) \
-            .pipe(ops.buffer_with_count(10_000),
-                  # Parse 10_000 rows into LineProtocol on subprocess
-                  ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+        writer = InfluxDBWriter(queue_)
+        writer.start()
 
         """
-        Write data into InfluxDB
+        Create process pool for parallel encoding into LineProtocol
         """
-        data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
-
-        """
-        Terminate Writer
-        """
-        queue_.put(None)
-        queue_.join()
+        cpu_count = multiprocessing.cpu_count()
+        with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
+                                                    initargs=(counter_, progress_, queue_)) as executor:
+            """
+            Converts the incoming HTTP stream into a sequence of LineProtocol
+            """
+            data = rx \
+                .from_iterable(DictReader(io_wrapper)) \
+                .pipe(ops.buffer_with_count(10_000),
+                      # Parse 10_000 rows into LineProtocol on a subprocess
+                      ops.map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+
+            """
+            Write data into InfluxDB
+            """
+            data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
 
-    print()
-    print(f'Import finished in: {datetime.now() - startTime}')
-    print()
-
-    """
-    Querying 10 pickups from dispatching 'B00008'
-    """
-    query = 'from(bucket:"my-bucket")' \
-            '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
-            '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
-            '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
-            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
-            '|> rename(columns: {_time: "pickup_datetime"})' \
-            '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
-
-    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
-    result = client.query_api().query(query=query)
+        """
+        Terminate Writer
+        """
+        queue_.put(None)
+        queue_.join()
 
-    """
-    Processing results
-    """
-    print()
-    print("=== Querying 10 pickups from dispatching 'B00008' ===")
-    print()
-    for table in result:
-        for record in table.records:
-            print(
-                f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
+        print()
+        print(f'Import finished in: {datetime.now() - startTime}')
+        print()
 
-    """
-    Close client
-    """
-    client.close()
+    """
+    Querying 10 pickups from dispatching 'B00008'
+    """
+    query = 'from(bucket:"my-bucket")' \
+            '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
+            '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
+            '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
+            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+            '|> rename(columns: {_time: "pickup_datetime"})' \
+            '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
+
+    with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
+        result = client.query_api().query(query=query)
+
+        """
+        Processing results
+        """
+        print()
+        print("=== Querying 10 pickups from dispatching 'B00008' ===")
+        print()
+        for table in result:
+            for record in table.records:
+                print(
+                    f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
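Note: the restructured pipeline keeps the original fan-out idea: buffer_with_count groups the CSV rows into batches of 10,000 and each batch is submitted to the process pool. A self-contained sketch of that pattern with stand-in data and a stand-in parser (the names and the future-resolving ops.map step are illustrative, not part of the example):

    import concurrent.futures

    import rx
    from rx import operators as ops


    def parse_batch(rows):
        # stand-in for parse_rows: pretend to encode a batch of rows
        return [f'measurement value={row}' for row in rows]


    if __name__ == '__main__':
        with concurrent.futures.ProcessPoolExecutor() as executor:
            rx.from_iterable(range(25)).pipe(
                ops.buffer_with_count(10),                                 # batch the stream: 10, 10, 5
                ops.map(lambda rows: executor.submit(parse_batch, rows)),  # encode each batch on a subprocess
                ops.map(lambda future: future.result())                    # wait for the encoded batch
            ).subscribe(on_next=lambda batch: print(len(batch)))           # -> 10 10 5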
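Note: the query section now opens InfluxDBClient with a with statement instead of calling client.close() at the end, so the client is closed even if the query raises. A minimal equivalent, reusing the example's placeholder connection values:

    from influxdb_client import InfluxDBClient

    with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # __exit__ closes the client, replacing the removed explicit client.close()
        tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -1h) |> limit(n: 1)')
        for table in tables:
            for record in table.records:
                print(record.values)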