Skip to content

Commit 5f8d066

Browse files
committed
Creates PoC pipeline to estimate disk usage of vcf_to_bq on Dataflow.
The pipeline uses the raw file size and the raw+encoded sizes of a short snippet at the beginning of each VCF file to estimate the encoded size of the full file. The major blocking bug is that when the snippets are read from VCFs in an encoded format, lines are read more than once.
1 parent 60f36ef commit 5f8d066

File tree

7 files changed

+344
-30
lines changed

7 files changed

+344
-30
lines changed
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
# Copyright 2018 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""A source for estimating the size of VCF files when processed by vcf_to_bq."""
16+
17+
from __future__ import absolute_import
18+
19+
from typing import Iterable, List, Tuple # pylint: disable=unused-import
20+
import logging
21+
22+
import apache_beam as beam
23+
from apache_beam import coders
24+
from apache_beam import transforms
25+
from apache_beam.io import filebasedsource
26+
from apache_beam.io import range_trackers # pylint: disable=unused-import
27+
from apache_beam.io import filesystem
28+
from apache_beam.io import filesystems
29+
from apache_beam.io import iobase
30+
31+
from gcp_variant_transforms.beam_io import vcfio
32+
33+
34+
def _get_file_sizes(file_pattern):
  # type: (str) -> List[Tuple[str, int]]
  """Returns (path, size in bytes) for every file matching `file_pattern`.

  Compressed files are still included in the result, but an error is logged
  because their on-disk size makes the downstream estimate inaccurate.
  """
  matched = filesystems.FileSystems.match([file_pattern])[0]
  sizes = []
  for metadata in matched.metadata_list:
    compression = filesystem.CompressionTypes.detect_compression_type(
        metadata.path)
    if compression != filesystem.CompressionTypes.UNCOMPRESSED:
      logging.error("VCF file %s is compressed; disk requirement estimator "
                    "will not be accurate.", metadata.path)
    sizes.append((metadata.path, metadata.size_in_bytes,))
  return sizes
46+
47+
48+
def _convert_variants_to_bytesize(variant):
  # type: (vcfio.Variant) -> int
  """Returns the estimated encoded byte size of a single `vcfio.Variant`."""
  variant_coder = coders.registry.get_coder(vcfio.Variant)
  return variant_coder.estimate_size(variant)
51+
52+
53+
class FileSizeInfo(object):
  """Raw and encoded size measurements for a single VCF file."""

  def __init__(self, name, raw_file_size, encoded_file_size=None):
    # type: (str, int, int) -> None
    self.name = name
    self.raw_size = raw_file_size
    self.encoded_size = encoded_file_size  # Optional, useful for SumFn.

  def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size):
    # type: (int, int) -> None
    """Estimates a VCF file's encoded (byte) size from sampled Variants.

    Given `self.raw_size` and measurements of several VCF lines from the
    file, estimate how much disk the file will take after expansion due to
    encoding lines as `vcfio.Variant` objects. The result is stored in
    `self.encoded_size`.

    This is a simple ratio problem, solving for encoded_file_size which is
    the only unknown:
      encoded_sample_size / raw_sample_size = encoded_file_size / raw_file_size
    """
    if raw_sample_size == 0:
      # No usable sample: zero out both sizes as an in-band error state so
      # we never divide by zero and the file drops out of the totals.
      logging.warning("File %s appears to have no valid Variant lines. File "
                      "will be ignored for size estimation.", self.name)
      self.raw_size = 0
      self.encoded_size = 0
      return
    self.encoded_size = (
        self.raw_size * encoded_sample_size / raw_sample_size)
82+
83+
84+
class FileSizeInfoSumFn(beam.CombineFn):
  """Combiner Function to sum up the size fields of FileSizeInfo objects.

  The accumulator is a plain `(raw_sum, encoded_sum)` integer pair; the
  output collapses the summed sizes into one cumulative `FileSizeInfo`.

  NOTE(review): an earlier docstring said inputs arrive as
  `(file_path, FileSizeInfo)` tuples, but `add_input` consumes a bare
  `FileSizeInfo` -- verify against the pipeline wiring.

  Example: [FileSizeInfo(a, b), FileSizeInfo(c, d)] -> FileSizeInfo(a+c, b+d)
  """

  def create_accumulator(self):
    # type: () -> Tuple[int, int]
    return (0, 0)  # (raw, encoded) sums

  def add_input(self, accumulator, file_size_info):
    # type: (Tuple[int, int], FileSizeInfo) -> Tuple[int, int]
    # Tuple parameter unpacking in `def` signatures is Python 2-only syntax
    # (removed by PEP 3113); unpack explicitly for Python 3 compatibility.
    raw, encoded = accumulator
    return raw + file_size_info.raw_size, encoded + file_size_info.encoded_size

  def merge_accumulators(self, accumulators):
    # type: (Iterable[Tuple[int, int]]) -> Tuple[int, int]
    raw, encoded = zip(*accumulators)
    return sum(raw), sum(encoded)

  def extract_output(self, accumulator):
    # type: (Tuple[int, int]) -> FileSizeInfo
    raw, encoded = accumulator
    return FileSizeInfo("cumulative", raw, encoded)
109+
110+
111+
class _EstimateVcfSizeSource(filebasedsource.FileBasedSource):
  """A source for estimating the encoded size of a VCF file in `vcf_to_bq`.

  For each input file this source reads up to `sample_size` variants from the
  start of the file, measuring both the raw byte size of each sampled line
  and the size of the same line encoded as a `vcfio.Variant`, then
  extrapolates the raw:encoded ratio over the whole file size to yield one
  `FileSizeInfo` per file.

  Lines that are malformed are skipped.

  Parses VCF files (version 4) using PyVCF library.

  NOTE(review): a second, independent read channel is opened to measure raw
  line sizes; for compressed inputs the raw reader and the parser are known
  to fall out of step (lines read more than once) -- TODO confirm and fix.
  """

  DEFAULT_VCF_READ_BUFFER_SIZE = 65536  # 64kB

  def __init__(self,
               file_pattern,
               sample_size,
               compression_type=filesystem.CompressionTypes.AUTO,
               validate=True,
               vcf_parser_type=vcfio.VcfParserType.PYVCF):
    # type: (str, int, str, bool, vcfio.VcfParserType) -> None
    super(_EstimateVcfSizeSource, self).__init__(
        file_pattern,
        compression_type=compression_type,
        validate=validate,
        splittable=False)  # Unsplittable: each file is sampled from its head.
    self._compression_type = compression_type
    self._sample_size = sample_size
    self._vcf_parser_type = vcf_parser_type

  def read_records(
      self,
      file_name,  # type: str
      range_tracker  # type: range_trackers.UnsplittableRangeTracker
  ):
    # type: (...) -> Iterable[FileSizeInfo]
    """Yields a single `FileSizeInfo` with size estimates for `file_name`."""
    vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type)
    record_iterator = vcf_parser_class(
        file_name,
        range_tracker,
        self._pattern,
        self._compression_type,
        allow_malformed_records=True,
        representative_header_lines=None,
        buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE,
        skip_header_lines=0)

    _, raw_file_size = _get_file_sizes(file_name)[0]

    # Open a distinct channel to read lines as raw bytestrings, so each
    # parsed record can be paired with its on-disk size.
    with filesystems.FileSystems.open(
        file_name, self._compression_type) as raw_reader:
      raw_record = raw_reader.readline()
      while raw_record and raw_record.startswith('#'):
        # Skip headers, assume header size is negligible.
        raw_record = raw_reader.readline()

      count, raw_size, encoded_size = 0, 0, 0
      for encoded_record in record_iterator:
        if count >= self._sample_size:
          break
        if not isinstance(encoded_record, vcfio.Variant):
          logging.error(
              "Skipping VCF line that could not be decoded as a "
              "`vcfio.Variant` in file %s: %s", file_name, raw_record)
          # Bug fix: advance the raw reader past the malformed line before
          # continuing, otherwise the raw and encoded streams desynchronize
          # and subsequent raw sizes are attributed to the wrong records.
          raw_record = raw_reader.readline()
          continue

        raw_size += len(raw_record)
        encoded_size += _convert_variants_to_bytesize(encoded_record)
        count += 1
        raw_record = raw_reader.readline()  # Increment raw iterator.

    file_size_info = FileSizeInfo(file_name, raw_file_size)
    file_size_info.estimate_encoded_file_size(raw_size, encoded_size)
    yield file_size_info
186+
187+
188+
class EstimateVcfSize(transforms.PTransform):
  """A PTransform that estimates VCF disk usage from a sample of lines.

  For every file matching `file_pattern`, reads the first `sample_size`
  well-formed lines and emits a `FileSizeInfo` estimating the file's size
  once its lines are encoded as `Variant` objects.

  Parses VCF files (version 4) using PyVCF library.
  """

  def __init__(
      self,
      file_pattern,  # type: str
      sample_size,  # type: int
      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
      validate=True,  # type: bool
      **kwargs  # type: **str
      ):
    # type: (...) -> None
    """Initialize the :class:`EstimateVcfSize` transform.

    Args:
      file_pattern: The file path to read from either as a single file or a
        glob pattern.
      sample_size: The number of lines that should be read from the file.
      compression_type: Used to handle compressed input files.
        Typical value is :attr:`CompressionTypes.AUTO
        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
        underlying file_path's extension will be used to detect the
        compression.
      validate: Flag to verify that the files exist during the pipeline
        creation time.
    """
    super(EstimateVcfSize, self).__init__(**kwargs)
    self._source = _EstimateVcfSizeSource(
        file_pattern, sample_size, compression_type, validate=validate)

  def expand(self, pvalue):
    read_transform = iobase.Read(self._source)
    return pvalue.pipeline | read_transform

gcp_variant_transforms/beam_io/vcf_file_size_io_test.py

Whitespace-only changes.

gcp_variant_transforms/beam_io/vcfio.py

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from __future__ import absolute_import
2121

22-
from typing import Any, Iterable, List, Tuple # pylint: disable=unused-import
22+
from typing import Any, Iterable, List, Tuple, Type # pylint: disable=unused-import
2323
from functools import partial
2424
import enum
2525

@@ -29,9 +29,9 @@
2929
from apache_beam.io import filesystems
3030
from apache_beam.io import range_trackers # pylint: disable=unused-import
3131
from apache_beam.io import textio
32-
from apache_beam.io.filesystem import CompressionTypes
33-
from apache_beam.io.iobase import Read
34-
from apache_beam.transforms import PTransform
32+
from apache_beam.io import filesystem
33+
from apache_beam.io import iobase
34+
from apache_beam import transforms
3535

3636
from gcp_variant_transforms.beam_io import vcf_parser
3737

@@ -55,6 +55,16 @@ class VcfParserType(enum.Enum):
5555
PYVCF = 0
5656
NUCLEUS = 1
5757

58+
def get_vcf_parser(vcf_parser_type):
  # type: (VcfParserType) -> Type[vcf_parser.VcfParser]
  """Maps a `VcfParserType` enum value to its parser implementation class."""
  if vcf_parser_type == VcfParserType.PYVCF:
    return vcf_parser.PyVcfParser
  if vcf_parser_type == VcfParserType.NUCLEUS:
    return vcf_parser.NucleusParser
  raise ValueError(
      'Unrecognized _vcf_parser_type: %s.' % str(vcf_parser_type))
67+
5868

5969
class _ToVcfRecordCoder(coders.Coder):
6070
"""Coder for encoding :class:`Variant` objects as VCF text lines."""
@@ -192,7 +202,7 @@ class _VcfSource(filebasedsource.FileBasedSource):
192202
def __init__(self,
193203
file_pattern, # type: str
194204
representative_header_lines=None, # type: List[str]
195-
compression_type=CompressionTypes.AUTO, # type: str
205+
compression_type=filesystem.CompressionTypes.AUTO, # type: str
196206
buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE, # type: int
197207
validate=True, # type: bool
198208
allow_malformed_records=False, # type: bool
@@ -213,14 +223,7 @@ def read_records(self,
213223
range_tracker # type: range_trackers.OffsetRangeTracker
214224
):
215225
# type: (...) -> Iterable[MalformedVcfRecord]
216-
vcf_parser_class = None
217-
if self._vcf_parser_type == VcfParserType.PYVCF:
218-
vcf_parser_class = vcf_parser.PyVcfParser
219-
elif self._vcf_parser_type == VcfParserType.NUCLEUS:
220-
vcf_parser_class = vcf_parser.NucleusParser
221-
else:
222-
raise ValueError(
223-
'Unrecognized _vcf_parser_type: %s.' % str(self._vcf_parser_type))
226+
vcf_parser_class = get_vcf_parser(self._vcf_parser_type)
224227
record_iterator = vcf_parser_class(
225228
file_name,
226229
range_tracker,
@@ -235,7 +238,8 @@ def read_records(self,
235238
for record in record_iterator:
236239
yield record
237240

238-
class ReadFromVcf(PTransform):
241+
242+
class ReadFromVcf(transforms.PTransform):
239243
"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading VCF
240244
files.
241245
@@ -249,7 +253,7 @@ def __init__(
249253
self,
250254
file_pattern=None, # type: str
251255
representative_header_lines=None, # type: List[str]
252-
compression_type=CompressionTypes.AUTO, # type: str
256+
compression_type=filesystem.CompressionTypes.AUTO, # type: str
253257
validate=True, # type: bool
254258
allow_malformed_records=False, # type: bool
255259
vcf_parser_type=VcfParserType.PYVCF, # type: int
@@ -280,7 +284,7 @@ def __init__(
280284
vcf_parser_type=vcf_parser_type)
281285

282286
def expand(self, pvalue):
283-
return pvalue.pipeline | Read(self._source)
287+
return pvalue.pipeline | iobase.Read(self._source)
284288

285289

286290
def _create_vcf_source(
@@ -292,7 +296,7 @@ def _create_vcf_source(
292296
allow_malformed_records=allow_malformed_records)
293297

294298

295-
class ReadAllFromVcf(PTransform):
299+
class ReadAllFromVcf(transforms.PTransform):
296300
"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading a
297301
:class:`~apache_beam.pvalue.PCollection` of VCF files.
298302
@@ -310,7 +314,7 @@ def __init__(
310314
self,
311315
representative_header_lines=None, # type: List[str]
312316
desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE, # type: int
313-
compression_type=CompressionTypes.AUTO, # type: str
317+
compression_type=filesystem.CompressionTypes.AUTO, # type: str
314318
allow_malformed_records=False, # type: bool
315319
**kwargs # type: **str
316320
):
@@ -339,21 +343,21 @@ def __init__(
339343
allow_malformed_records=allow_malformed_records)
340344
self._read_all_files = filebasedsource.ReadAllFiles(
341345
True, # splittable
342-
CompressionTypes.AUTO, desired_bundle_size,
346+
filesystem.CompressionTypes.AUTO, desired_bundle_size,
343347
0, # min_bundle_size
344348
source_from_file)
345349

346350
def expand(self, pvalue):
347351
return pvalue | 'ReadAllFiles' >> self._read_all_files
348352

349353

350-
class WriteToVcf(PTransform):
354+
class WriteToVcf(transforms.PTransform):
351355
"""A PTransform for writing to VCF files."""
352356

353357
def __init__(self,
354358
file_path,
355359
num_shards=1,
356-
compression_type=CompressionTypes.AUTO,
360+
compression_type=filesystem.CompressionTypes.AUTO,
357361
headers=None):
358362
# type: (str, int, str, List[str]) -> None
359363
"""Initialize a WriteToVcf PTransform.
@@ -404,7 +408,7 @@ def process(self, (file_path, variants), *args, **kwargs):
404408
file_to_write.write(self._coder.encode(variant))
405409

406410

407-
class WriteVcfDataLines(PTransform):
411+
class WriteVcfDataLines(transforms.PTransform):
408412
"""A PTransform for writing VCF data lines.
409413
410414
This PTransform takes PCollection<`file_path`, `variants`> as input, and

0 commit comments

Comments
 (0)