diff --git a/.gitignore b/.gitignore
index 0a3b76e..ed50a40 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,7 @@
 ### Mac ###
 .DS_Store
 **/.DS_Store
+
+### compiled python ###
+python/*.pyc
+python3/__pycache__
diff --git a/README.md b/README.md
index 7c263cc..3e2d8e3 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,55 @@ Both Python 2 and Python 3 scripts are available. The Python 2 scripts can be f
 To run these scripts you will need to have Python installed. You can download either Python 2 or Python 3 from [here](https://www.python.org/downloads/). If you already have Python installed, you can find out which version when you start the python interpreter. If using Python 2, we suggest you upgrade to the latest version if you don't already have it: 2.7.
 
+Note that EBI now uses HTTPS servers. This can create a problem when using Python 3 on a Mac due to an oft-missed
+installation step. Please run the "Install Certificates.command" command to ensure your Python 3 installation on
+the Mac can correctly authenticate against the servers. To do this, run the following from a terminal window, updating
+the Python version with the correct version of Python 3 that you have installed:
+open "/Applications/Python 3.6/Install Certificates.command"
+
+We have had a report from a user that when Python 3 was installed using homebrew, the following code needed to be run instead:
+```
+# install_certifi.py
+#
+# sample script to install or update a set of default Root Certificates
+# for the ssl module. Uses the certificates provided by the certifi package:
+# https://pypi.python.org/pypi/certifi
+
+import os
+import os.path
+import ssl
+import stat
+import subprocess
+import sys
+
+STAT_0o775 = ( stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
+             | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
+             | stat.S_IROTH | stat.S_IXOTH )
+
+openssl_dir, openssl_cafile = os.path.split(
+    ssl.get_default_verify_paths().openssl_cafile)
+
+print(" -- pip install --upgrade certifi")
+subprocess.check_call([sys.executable,
+    "-E", "-s", "-m", "pip", "install", "--upgrade", "certifi"])
+
+import certifi
+
+# change working directory to the default SSL directory
+os.chdir(openssl_dir)
+relpath_to_certifi_cafile = os.path.relpath(certifi.where())
+print(" -- removing any existing file or link")
+try:
+    os.remove(openssl_cafile)
+except FileNotFoundError:
+    pass
+print(" -- creating symlink to certifi certificate bundle")
+os.symlink(relpath_to_certifi_cafile, openssl_cafile)
+print(" -- setting permissions")
+os.chmod(openssl_cafile, STAT_0o775)
+print(" -- update complete")
+```
+
 ## Installing and running the scripts
 
 Download the [latest release](https://github.com/enasequence/enaBrowserTools/releases/latest) and extract it to the preferred location on your computer. You will now have the enaBrowserTools folder, containing both the python 2 and 3 option scripts. If you are using a Unix/Linux or Mac computer, we suggest you add the following aliases to your .bashrc or .bash_profile file. Where INSTALLATION_DIR is the location where you have saved the enaBrowserTools to and PYTHON_CHOICE will depend on whether you are using the Python 2 or Python 3 scripts.
@@ -107,12 +156,16 @@ optional arguments:
                         File format required. Format requested must be
                         permitted for data type selected. sequence, assembly
                         and wgs accessions: embl(default) and fasta formats.
-                        read group: submitted, fastq and sra
-                        formats. analysis group: submitted only.
+                        read group: submitted, fastq and sra formats. analysis
+                        group: submitted only.
 
-d DEST, --dest DEST Destination directory (default is current running directory) -w, --wgs Download WGS set for each assembly if available (default is false) + -e, --extract-wgs Extract WGS scaffolds for each assembly if available + (default is false) + -exp, --expanded Expand CON scaffolds when downloading embl format + (default is false) -m, --meta Download read or analysis XML in addition to data files (default is false) -i, --index Download CRAM index files with submitted CRAM files, @@ -141,7 +194,7 @@ usage: enaGroupGet [-h] [-g {sequence,wgs,assembly,read,analysis}] [-i] [-a] [-as ASPERA_SETTINGS] [-t] [-v] accession -Download data for a given study or sample +Download data for a given study or sample, or (for sequence and assembly) taxon positional arguments: accession Study or sample accession or NCBI tax ID to fetch data @@ -162,6 +215,10 @@ optional arguments: directory) -w, --wgs Download WGS set for each assembly if available (default is false) + -e, --extract-wgs Extract WGS scaffolds for each assembly if available + (default is false) + -exp, --expanded Expand CON scaffolds when downloading embl format + (default is false) -m, --meta Download read or analysis XML in addition to data files (default is false) -i, --index Download CRAM index files with submitted CRAM files, @@ -180,10 +237,10 @@ optional arguments: # Tips -From version 1.4, when downloading read data if you use the default format (that is, don't use the format option), the scripts will look for available files in the following priority: submitted, sra, fastq. +From version 1.4, when downloading read data if you use the default format (that is, don't use the format option), the scripts will look for available files in the following priority: submitted, sra, fastq. A word of advice for read formats: -- submitted: only read data submitted to ENA have files available as submitted by the user. +- submitted: only read data submitted to ENA have files available as submitted by the user. - sra: this is the NCBI SRA format, and is the format in which all NCBI/DDBJ data is mirrored to ENA. - fastq: not all submitted format files can be converted to FASTQ diff --git a/python/__init__.py b/python/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/assemblyGet.py b/python/assemblyGet.py index 2fb8f1a..792d87b 100644 --- a/python/assemblyGet.py +++ b/python/assemblyGet.py @@ -20,6 +20,7 @@ import os import sys import argparse +import gzip import xml.etree.ElementTree as ElementTree import utils @@ -31,7 +32,7 @@ PATCH = 'patch' def check_format(output_format): - if format not in [utils.EMBL_FORMAT, utils.FASTA_FORMAT]: + if output_format not in [utils.EMBL_FORMAT, utils.FASTA_FORMAT]: sys.stderr.write( 'ERROR: Invalid format. 
Please select a valid format for this accession: {0}\n'.format([utils.EMBL_FORMAT, utils.FASTA_FORMAT]) ) @@ -69,31 +70,72 @@ def parse_sequence_report(local_sequence_report): patch_list = [l.split('\t')[0] for l in lines[1:] if PATCH in l.split('\t')[3]] return (replicon_list, unlocalised_list, unplaced_list, patch_list) -def download_sequence_set(accession_list, mol_type, assembly_dir, output_format, quiet): +def extract_wgs_sequences(accession_list): + wgs_sequences = [a for a in accession_list if utils.is_wgs_sequence(a)] + other_sequences = [a for a in accession_list if not utils.is_wgs_sequence(a)] + return wgs_sequences, other_sequences + +def download_sequence_set(accession_list, mol_type, assembly_dir, output_format, expanded, quiet): failed_accessions = [] - if len(accession_list) > 0: + count = 0 + sequence_cnt = len(accession_list) + divisor = utils.get_divisor(sequence_cnt) + if sequence_cnt > 0: if not quiet: - print 'fetching sequences: ' + mol_type - target_file = os.path.join(assembly_dir, utils.get_filename(mol_type, output_format)) + print 'fetching {0} sequences: {1}'.format(sequence_cnt, mol_type) + target_file_path = os.path.join(assembly_dir, utils.get_filename(mol_type, output_format)) + target_file = open(target_file_path, 'w') for accession in accession_list: - success = sequenceGet.append_record(target_file, accession, output_format) + success = sequenceGet.write_record(target_file, accession, output_format, expanded) if not success: failed_accessions.append(accession) + else: + count += 1 + if count % divisor == 0 and not quiet: + print 'downloaded {0} of {1} sequences'.format(count, sequence_cnt) + if not quiet: + print 'downloaded {0} of {1} sequences'.format(count, sequence_cnt) + target_file.close() elif not quiet: print 'no sequences: ' + mol_type if len(failed_accessions) > 0: - print 'Failed to fetch following ' + mol_type + ', format ' + output_format - print failed_accessions.join(',') + print 'Failed to fetch following {0}, format {1}'.format(mol_type, output_format) + print ','.join(failed_accessions) -def download_sequences(sequence_report, assembly_dir, output_format, quiet): +def download_sequences(sequence_report, assembly_dir, output_format, expanded, quiet): local_sequence_report = os.path.join(assembly_dir, sequence_report) replicon_list, unlocalised_list, unplaced_list, patch_list = parse_sequence_report(local_sequence_report) - download_sequence_set(replicon_list, REPLICON, assembly_dir, output_format, quiet) - download_sequence_set(unlocalised_list, UNLOCALISED, assembly_dir, output_format, quiet) - download_sequence_set(unplaced_list, UNPLACED, assembly_dir, output_format, quiet) - download_sequence_set(patch_list, PATCH, assembly_dir, output_format, quiet) + wgs_scaffolds, other_unlocalised = extract_wgs_sequences(unlocalised_list) + wgs_unplaced, other_unplaced = extract_wgs_sequences(unplaced_list) + download_sequence_set(replicon_list, REPLICON, assembly_dir, output_format, expanded, quiet) + download_sequence_set(other_unlocalised, UNLOCALISED, assembly_dir, output_format, expanded, quiet) + download_sequence_set(other_unplaced, UNPLACED, assembly_dir, output_format, expanded, quiet) + download_sequence_set(patch_list, PATCH, assembly_dir, output_format, expanded, quiet) + wgs_scaffolds.extend(wgs_unplaced) + return wgs_scaffolds -def download_assembly(dest_dir, accession, output_format, fetch_wgs, quiet=False): +def extract_wgs_scaffolds(assembly_dir, wgs_scaffolds, wgs_set, output_format, quiet): + if not quiet: + print 'extracting 
{0} WGS scaffolds from WGS set file'.format(len(wgs_scaffolds)) + accs = [a.split('.')[0] for a in wgs_scaffolds] + wgs_file_path = os.path.join(assembly_dir, wgs_set + utils.get_wgs_file_ext(output_format)) + target_file_path = os.path.join(assembly_dir, utils.get_filename('wgs_scaffolds', output_format)) + write_line = False + target_file = open(target_file_path, 'w') + with gzip.open(wgs_file_path, 'rb') as f: + for line in f: + if utils.record_start_line(line, output_format): + if utils.extract_acc_from_line(line, output_format) in accs: + write_line = True + else: + write_line = False + target_file.flush() + if write_line: + target_file.write(line) + target_file.flush() + target_file.close() + +def download_assembly(dest_dir, accession, output_format, fetch_wgs, extract_wgs, expanded, quiet=False, handler=None): if output_format is None: output_format = utils.EMBL_FORMAT assembly_dir = os.path.join(dest_dir, accession) @@ -106,12 +148,24 @@ def download_assembly(dest_dir, accession, output_format, fetch_wgs, quiet=False has_sequence_report = False # download sequence report if sequence_report is not None: - has_sequence_report = utils.get_ftp_file(sequence_report, assembly_dir) + has_sequence_report = utils.get_ftp_file(sequence_report, assembly_dir, handler) + # parse sequence report and download sequences + wgs_scaffolds = [] + wgs_scaffold_cnt = 0 + if has_sequence_report: + wgs_scaffolds = download_sequences(sequence_report.split('/')[-1], assembly_dir, output_format, expanded, quiet) + wgs_scaffold_cnt = len(wgs_scaffolds) + if wgs_scaffold_cnt > 0: + if not quiet: + print 'Assembly contains {} WGS scaffolds, will fetch WGS set'.format(wgs_scaffold_cnt) + fetch_wgs = True + else: + fetch_wgs = True # download wgs set if needed if wgs_set is not None and fetch_wgs: if not quiet: print 'fetching wgs set' - sequenceGet.download_wgs(assembly_dir, wgs_set, output_format) - # parse sequence report and download sequences - if has_sequence_report: - download_sequences(sequence_report.split('/')[-1], assembly_dir, output_format, quiet) + sequenceGet.download_wgs(assembly_dir, wgs_set, output_format, handler) + # extract wgs scaffolds from WGS file + if wgs_scaffold_cnt > 0 and extract_wgs: + extract_wgs_scaffolds(assembly_dir, wgs_scaffolds, wgs_set, output_format, quiet) diff --git a/python/enaDataGet.py b/python/enaDataGet.py index 2e04681..ab2ba8f 100644 --- a/python/enaDataGet.py +++ b/python/enaDataGet.py @@ -25,6 +25,7 @@ import assemblyGet import readGet import utils +import traceback def set_parser(): parser = argparse.ArgumentParser(prog='enaDataGet', @@ -40,6 +41,10 @@ def set_parser(): help='Destination directory (default is current running directory)') parser.add_argument('-w', '--wgs', action='store_true', help='Download WGS set for each assembly if available (default is false)') + parser.add_argument('-e', '--extract-wgs', action='store_true', + help='Extract WGS scaffolds for each assembly if available (default is false)') + parser.add_argument('-exp', '--expanded', action='store_true', + help='Expand CON scaffolds when downloading embl format (default is false)') parser.add_argument('-m', '--meta', action='store_true', help='Download read or analysis XML in addition to data files (default is false)') parser.add_argument('-i', '--index', action='store_true', @@ -50,7 +55,10 @@ def set_parser(): parser.add_argument('-as', '--aspera-settings', default=None, help="""Use the provided settings file, will otherwise check for environment variable or default settings file 
location.""")
-    parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.4.1')
+    parser.add_argument('-r', '--redirect-handler', default=None,
+                        choices=['queue', 'file'],
+                        help="""File download progress handler. Specify an output handler to process the download progress. Default is no handler (output is printed to stdout). 'queue' redirects all output to a queue handler, such as RabbitMQ. 'file' redirects to a file handle (default is [current_file_download.log]).""")
+    parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.5.3')
     return parser
 
@@ -62,42 +70,51 @@ def set_parser():
     output_format = args.format
     dest_dir = args.dest
     fetch_wgs = args.wgs
+    extract_wgs = args.extract_wgs
+    expanded = args.expanded
     fetch_meta = args.meta
     fetch_index = args.index
     aspera = args.aspera
     aspera_settings = args.aspera_settings
+    handler = args.redirect_handler
 
     if aspera or aspera_settings is not None:
         aspera = utils.set_aspera(aspera_settings)
     try:
         if utils.is_wgs_set(accession):
+            print("Downloading WGS set")
             if output_format is not None:
                 sequenceGet.check_format(output_format)
-            sequenceGet.download_wgs(dest_dir, accession, output_format)
+            sequenceGet.download_wgs(dest_dir, accession, output_format, handler)
         elif not utils.is_available(accession):
             sys.stderr.write('ERROR: Record does not exist or is not available for accession provided\n')
             sys.exit(1)
         elif utils.is_sequence(accession):
+            print("Downloading sequence(s)")
             if output_format is not None:
                 sequenceGet.check_format(output_format)
-            sequenceGet.download_sequence(dest_dir, accession, output_format)
+            sequenceGet.download_sequence(dest_dir, accession, output_format, expanded, handler)
         elif utils.is_analysis(accession):
+            print("Downloading analysis")
             if output_format is not None:
                 readGet.check_read_format(output_format)
-            readGet.download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera)
+            readGet.download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera, handler)
         elif utils.is_run(accession) or utils.is_experiment(accession):
+            print("Downloading reads")
             if output_format is not None:
                 readGet.check_read_format(output_format)
-            readGet.download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera)
+            readGet.download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera, handler)
         elif utils.is_assembly(accession):
+            print("Downloading assembly")
             if output_format is not None:
                 assemblyGet.check_format(output_format)
-            assemblyGet.download_assembly(dest_dir, accession, output_format, fetch_wgs)
+            assemblyGet.download_assembly(dest_dir, accession, output_format, fetch_wgs, extract_wgs, expanded, handler=handler)
         else:
             sys.stderr.write('ERROR: Invalid accession provided\n')
             sys.exit(1)
         print 'Completed'
     except Exception:
+        traceback.print_exc()
         utils.print_error()
         sys.exit(1)
diff --git a/python/enaGroupGet.py b/python/enaGroupGet.py
index 4a5762f..53d59bd 100644
--- a/python/enaGroupGet.py
+++ b/python/enaGroupGet.py
@@ -25,10 +25,11 @@ import assemblyGet
 import readGet
 import utils
+import traceback
 
 def set_parser():
     parser = argparse.ArgumentParser(prog='enaGroupGet',
-                                     description = 'Download data for a given study or sample')
+                                     description = 'Download data for a given study or sample, or (for sequence and assembly) taxon')
     parser.add_argument('accession', help='Study or sample accession or NCBI tax ID to fetch data for')
     parser.add_argument('-g', '--group', default='read', choices=['sequence', 'wgs', 'assembly', 'read', 
'analysis'], @@ -42,6 +43,10 @@ def set_parser(): help='Destination directory (default is current running directory)') parser.add_argument('-w', '--wgs', action='store_true', help='Download WGS set for each assembly if available (default is false)') + parser.add_argument('-e', '--extract-wgs', action='store_true', + help='Extract WGS scaffolds for each assembly if available (default is false)') + parser.add_argument('-exp', '--expanded', action='store_true', + help='Expand CON scaffolds when downloading embl format (default is false)') parser.add_argument('-m', '--meta', action='store_true', help='Download read or analysis XML in addition to data files (default is false)') parser.add_argument('-i', '--index', action='store_true', @@ -54,7 +59,10 @@ def set_parser(): for environment variable or default settings file location.""") parser.add_argument('-t', '--subtree', action='store_true', help='Include subordinate taxa (taxon subtree) when querying with NCBI tax ID (default is false)') - parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.4.1') + parser.add_argument('-r', '--redirect-handler', default=None, + choices=['queue', 'file'], + help="""File download progress handler. Specify an output handler to process the download progress. Default is no handler (output is printed to stdout). 'queue' redirects all output to a queue handler, such as RabbitMQ. 'file' redirects to a file handle (default is [current_file_download.log]).""") + parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.5.3') return parser def download_report(group, result, accession, temp_file, subtree): @@ -66,77 +74,71 @@ def download_report(group, result, accession, temp_file, subtree): f.flush() f.close() -def download_data(group, data_accession, output_format, group_dir, fetch_wgs, fetch_meta, fetch_index, aspera): +def download_data(group, data_accession, output_format, group_dir, fetch_wgs, extract_wgs, expanded, fetch_meta, fetch_index, aspera, handler=None): if group == utils.WGS: print 'Fetching ' + data_accession[:6] - if aspera: - print 'Aspera not supported for WGS data. Using FTP...' - sequenceGet.download_wgs(group_dir, data_accession[:6], output_format) + sequenceGet.download_wgs(group_dir, data_accession[:6], output_format, handler) else: print 'Fetching ' + data_accession if group == utils.ASSEMBLY: - if aspera: - print 'Aspera not supported for assembly data. Using FTP...' 
- assemblyGet.download_assembly(group_dir, data_accession, output_format, fetch_wgs, True) + assemblyGet.download_assembly(group_dir, data_accession, output_format, fetch_wgs, extract_wgs, expanded, True, handler) elif group in [utils.READ, utils.ANALYSIS]: - readGet.download_files(data_accession, output_format, group_dir, fetch_index, fetch_meta, aspera) + readGet.download_files(data_accession, output_format, group_dir, fetch_index, fetch_meta, aspera, handler) -def download_data_group(group, accession, output_format, group_dir, fetch_wgs, fetch_meta, fetch_index, aspera, subtree): - temp_file = os.path.join(group_dir, accession + '_temp.txt') - download_report(group, utils.get_group_result(group), accession, temp_file, subtree) - f = open(temp_file) +def download_data_group(group, accession, output_format, group_dir, fetch_wgs, extract_wgs, fetch_meta, fetch_index, aspera, subtree, expanded, handler=None): + temp_file_path = os.path.join(group_dir, accession + '_temp.txt') + download_report(group, utils.get_group_result(group), accession, temp_file_path, subtree) header = True - for line in f: - if header: - header = False - continue - data_accession = line.strip() - download_data(group, data_accession, output_format, group_dir, fetch_wgs, fetch_meta, fetch_index, aspera) - f.close() - os.remove(temp_file) - -def download_sequence_group(accession, output_format, group_dir, subtree): + with open(temp_file_path) as f: + for line in f: + if header: + header = False + continue + data_accession = line.strip() + download_data(group, data_accession, output_format, group_dir, fetch_wgs, extract_wgs, expanded, fetch_meta, fetch_index, aspera, handler) + os.remove(temp_file_path) + +def download_sequence_result(dest_file, group_dir, result, accession, subtree, update_accs, expanded): + temp_file_path = os.path.join(group_dir, 'temp.txt') + download_report(utils.SEQUENCE, result, accession, temp_file_path, subtree) + header = True + with open(temp_file_path) as f: + for line in f: + if header: + header = False + continue + data_accession = line.strip() + write_record = False + if result == utils.SEQUENCE_UPDATE_RESULT: + update_accs.append(data_accession) + write_record = True + elif result == utils.SEQUENCE_RELEASE_RESULT: + if data_accession not in update_accs: + write_record = True + if write_record: + sequenceGet.write_record(dest_file, data_accession, output_format, expanded) + dest_file.flush() + os.remove(temp_file_path) + return update_accs + +def download_sequence_group(accession, output_format, group_dir, subtree, expanded): print 'Downloading sequences' update_accs = [] - dest_file = os.path.join(group_dir, utils.get_filename(accession + '_sequences', output_format)) + dest_file_path = os.path.join(group_dir, utils.get_filename(accession + '_sequences', output_format)) + dest_file = open(dest_file_path, 'w') #sequence update - temp_file = os.path.join(group_dir, 'temp.txt') - download_report(utils.SEQUENCE, utils.SEQUENCE_UPDATE_RESULT, accession, temp_file, subtree) - f = open(temp_file) - header = True - for line in f: - if header: - header = False - continue - data_accession = line.strip() - update_accs.append(data_accession) - sequenceGet.append_record(dest_file, data_accession, output_format) - f.close() - os.remove(temp_file) + update_accs = download_sequence_result(dest_file, group_dir, utils.SEQUENCE_UPDATE_RESULT, accession, subtree, update_accs, expanded) #sequence release - temp_file = os.path.join(group_dir, 'temp.txt') - download_report(utils.SEQUENCE, 
utils.SEQUENCE_RELEASE_RESULT, accession, temp_file, subtree)
-    f = open(temp_file)
-    header = True
-    for line in f:
-        if header:
-            header = False
-            continue
-        data_accession = line.strip()
-        if data_accession not in update_accs:
-            sequenceGet.append_record(dest_file, data_accession, output_format)
-    f.close()
-    os.remove(temp_file)
+    update_accs = download_sequence_result(dest_file, group_dir, utils.SEQUENCE_RELEASE_RESULT, accession, subtree, update_accs, expanded)
+    dest_file.close()
 
-def download_group(accession, group, output_format, dest_dir, fetch_wgs, fetch_meta, fetch_index, aspera, subtree):
+def download_group(accession, group, output_format, dest_dir, fetch_wgs, extract_wgs, fetch_meta, fetch_index, aspera, subtree, expanded, handler=None):
     group_dir = os.path.join(dest_dir, accession)
     utils.create_dir(group_dir)
     if group == utils.SEQUENCE:
-        if aspera:
-            print 'Aspera not supported for sequence downloads. Using FTP...'
-        download_sequence_group(accession, output_format, group_dir, subtree)
+        download_sequence_group(accession, output_format, group_dir, subtree, expanded)
     else:
-        download_data_group(group, accession, output_format, group_dir, fetch_wgs, fetch_meta, fetch_index, aspera, subtree)
+        download_data_group(group, accession, output_format, group_dir, fetch_wgs, extract_wgs, fetch_meta, fetch_index, aspera, subtree, expanded, handler)
 
 
 if __name__ == '__main__':
@@ -148,11 +150,14 @@ def download_group(accession, group, output_format, dest_dir, fetch_wgs, fetch_m
     output_format = args.format
     dest_dir = args.dest
     fetch_wgs = args.wgs
+    extract_wgs = args.extract_wgs
+    expanded = args.expanded
     fetch_meta = args.meta
     fetch_index = args.index
     aspera = args.aspera
     aspera_settings = args.aspera_settings
     subtree = args.subtree
+    handler = args.redirect_handler
 
     if aspera or aspera_settings is not None:
         aspera = utils.set_aspera(aspera_settings)
@@ -185,8 +190,9 @@ def download_group(accession, group, output_format, dest_dir, fetch_wgs, fetch_m
         if utils.is_taxid(accession) and group in ['read', 'analysis']:
             print 'Sorry, tax ID retrieval not yet supported for read and analysis'
             sys.exit(1)
-        download_group(accession, group, output_format, dest_dir, fetch_wgs, fetch_meta, fetch_index, aspera, subtree)
+        download_group(accession, group, output_format, dest_dir, fetch_wgs, extract_wgs, fetch_meta, fetch_index, aspera, subtree, expanded, handler)
         print 'Completed'
     except Exception:
+        traceback.print_exc()
         utils.print_error()
         sys.exit(1)
diff --git a/python/readGet.py b/python/readGet.py
index 8e505d7..8d67ba3 100644
--- a/python/readGet.py
+++ b/python/readGet.py
@@ -39,11 +39,11 @@ def check_analysis_format(output_format):
         )
         sys.exit(1)
 
-def attempt_file_download(file_url, dest_dir, md5, aspera):
+def attempt_file_download(file_url, dest_dir, md5, aspera, handler=None):
     if md5 is not None:
         print 'Downloading file with md5 check:' + file_url
         if aspera:
-            return utils.get_aspera_file_with_md5_check(file_url, dest_dir, md5)
+            return utils.get_aspera_file_with_md5_check(file_url, dest_dir, md5, handler)
         else:
-            return utils.get_ftp_file_with_md5_check('ftp://' + file_url, dest_dir, md5)
+            return utils.get_ftp_file_with_md5_check('ftp://' + file_url, dest_dir, md5, handler)
     print 'Downloading file:' + file_url
@@ -51,12 +51,12 @@
     if aspera:
-        return utils.get_aspera_file(file_url, dest_dir)
-    return utils.get_ftp_file('ftp://' + file_url, dest_dir)
+        return utils.get_aspera_file(file_url, dest_dir, handler)
+    return utils.get_ftp_file('ftp://' + file_url, dest_dir, handler)
 
-def download_file(file_url, dest_dir, md5, aspera):
+def download_file(file_url, dest_dir, md5, aspera, handler=None):
     if utils.file_exists(file_url, dest_dir, md5):
         return
-    success = 
attempt_file_download(file_url, dest_dir, md5, aspera) + success = attempt_file_download(file_url, dest_dir, md5, aspera, handler) if not success: - success = attempt_file_download(file_url, dest_dir, md5, aspera) + success = attempt_file_download(file_url, dest_dir, md5, aspera, handler) if not success: print 'Failed to download file after two attempts' @@ -75,7 +75,7 @@ def download_experiment_meta(run_accession, dest_dir): break download_meta(experiment_accession, dest_dir) -def download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera): +def download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera, handler=None): accession_dir = os.path.join(dest_dir, accession) utils.create_dir(accession_dir) # download experiment xml @@ -116,11 +116,11 @@ def download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, file_url = filelist[i] md5 = md5list[i] if file_url != '': - download_file(file_url, target_dir, md5, aspera) + download_file(file_url, target_dir, md5, aspera, handler) if fetch_index: for index_file in indexlist: if index_file != '': - download_file(index_file, target_dir, None, aspera) + download_file(index_file, target_dir, None, aspera, handler) if utils.is_empty_dir(target_dir): print 'Deleting directory ' + os.path.basename(target_dir) os.rmdir(target_dir) diff --git a/python/sequenceGet.py b/python/sequenceGet.py index 37df7a5..5941778 100644 --- a/python/sequenceGet.py +++ b/python/sequenceGet.py @@ -22,48 +22,50 @@ import utils -def append_record(dest_file, accession, output_format): +def write_record(dest_file, accession, output_format, expanded=False): url = utils.get_record_url(accession, output_format) - return utils.append_record(url, dest_file) + if expanded: + url = url + '&expanded=true' + return utils.write_record(url, dest_file) -def download_sequence(dest_dir, accession, output_format): +def download_sequence(dest_dir, accession, output_format, expanded, handler=None): if output_format is None: output_format = utils.EMBL_FORMAT - success = utils.download_record(dest_dir, accession, output_format) + success = utils.download_record(dest_dir, accession, output_format, expanded, handler) if not success: print 'Unable to fetch file for {0}, format {1}'.format(accession, output_format) return success -def download_wgs(dest_dir, accession, output_format): +def download_wgs(dest_dir, accession, output_format, handler=None): if utils.is_unversioned_wgs_set(accession): - return download_unversioned_wgs(dest_dir, accession, output_format) + return download_unversioned_wgs(dest_dir, accession, output_format, handler) else: - return download_versioned_wgs(dest_dir, accession, output_format) + return download_versioned_wgs(dest_dir, accession, output_format, handler) -def download_versioned_wgs(dest_dir, accession, output_format): +def download_versioned_wgs(dest_dir, accession, output_format, handler=None): prefix = accession[:6] if output_format is None: output_format = utils.EMBL_FORMAT public_set_url = utils.get_wgs_ftp_url(prefix, utils.PUBLIC, output_format) supp_set_url = utils.get_wgs_ftp_url(prefix, utils.SUPPRESSED, output_format) - success = utils.get_ftp_file(public_set_url, dest_dir) + success = utils.get_ftp_file(public_set_url, dest_dir, handler) if not success: - success = utils.get_ftp_file(supp_set_url, dest_dir) + success = utils.get_ftp_file(supp_set_url, dest_dir, handler) if not success: print 'No WGS set file available for {0}, format {1}'.format(accession, output_format) print 'Please 
contact ENA (datasubs@ebi.ac.uk) if you feel this set should be available'
 
-def download_unversioned_wgs(dest_dir, accession, output_format):
+def download_unversioned_wgs(dest_dir, accession, output_format, handler=None):
     prefix = accession[:4]
     if output_format is None:
         output_format = utils.EMBL_FORMAT
     public_set_url = utils.get_nonversioned_wgs_ftp_url(prefix, utils.PUBLIC, output_format)
     if public_set_url is not None:
-        utils.get_ftp_file(public_set_url, dest_dir)
+        utils.get_ftp_file(public_set_url, dest_dir, handler)
     else:
         supp_set_url = utils.get_nonversioned_wgs_ftp_url(prefix, utils.SUPPRESSED, output_format)
         if supp_set_url is not None:
-            utils.get_ftp_file(supp_set_url, dest_dir)
+            utils.get_ftp_file(supp_set_url, dest_dir, handler)
         else:
             print 'No WGS set file available for {0}, format {1}'.format(accession, output_format)
             print 'Please contact ENA (datasubs@ebi.ac.uk) if you feel this set should be available'
diff --git a/python/utils.py b/python/utils.py
index 7d85652..73e91ef 100644
--- a/python/utils.py
+++ b/python/utils.py
@@ -22,11 +22,16 @@
 import hashlib
 import re
 import os
-import subprocess
 import sys
 import urllib
 import urllib2
+import pexpect
+import time
+import shlex
+import json
+from datetime import datetime
 import xml.etree.ElementTree as ElementTree
+from subprocess import call, Popen, PIPE, STDOUT
 from ConfigParser import SafeConfigParser
 
@@ -35,9 +40,6 @@
 ASPERA_OPTIONS = '' # set any extra aspera options
 ASPERA_SPEED = '100M' # set aspera download speed
 
-ANON_USER = 'anon'
-ANON_PWD = 'anon'
-
 SUPPRESSED = 'suppressed'
 PUBLIC = 'public'
 
@@ -106,7 +108,7 @@
 sequence_pattern_1 = re.compile('^[A-Z]{1}[0-9]{5}(\.[0-9]+)?$')
 sequence_pattern_2 = re.compile('^[A-Z]{2}[0-9]{6}(\.[0-9]+)?$')
-sequence_pattern_3 = re.compile('^[A-Z]{4}[0-9]{8,9}(\.[0-9]+)?$')
+wgs_sequence_pattern = re.compile('^[A-Z]{4}[0-9]{8,9}(\.[0-9]+)?$')
 coding_pattern = re.compile('^[A-Z]{3}[0-9]{5}(\.[0-9]+)?$')
 wgs_prefix_pattern = re.compile('^[A-Z]{4}[0-9]{2}$')
 wgs_master_pattern = re.compile('^[A-Z]{4}[0-9]{2}[0]{6}$')
@@ -134,8 +136,10 @@ def is_available(accession):
     return (not 'entry is not found' in record.text) and (len(record.getchildren()) > 0)
 
 def is_sequence(accession):
-    return sequence_pattern_1.match(accession) or sequence_pattern_2.match(accession) \
-        or sequence_pattern_3.match(accession)
+    return sequence_pattern_1.match(accession) or sequence_pattern_2.match(accession)
+
+def is_wgs_sequence(accession):
+    return wgs_sequence_pattern.match(accession)
 
 def is_coding(accession):
     return coding_pattern.match(accession)
@@ -257,41 +261,46 @@ def get_destination_file(dest_dir, accession, output_format):
 
 def download_single_record(url, dest_file):
     urllib.urlretrieve(url, dest_file)
 
-def download_record(dest_dir, accession, output_format):
+def download_record(dest_dir, accession, output_format, expanded=False, handler=None):
     try:
         dest_file = get_destination_file(dest_dir, accession, output_format)
         url = get_record_url(accession, output_format)
+        if expanded:
+            url = url + '&expanded=true'
         download_single_record(url, dest_file)
         return True
     except Exception:
         return False
 
-def append_record(url, dest_file):
+def write_record(url, dest_file):
    try:
        response = urllib2.urlopen(url)
-        f = open(dest_file, 'a')
+        linenum = 1
        for line in response:
-            f.write(line)
-            f.flush()
-        f.close()
+            if linenum == 1 and line.startswith('Entry:'):
+                return False
+            dest_file.write(line)
+            linenum += 1
+        dest_file.flush()
        return True
    except Exception:
        return False
 
-def get_ftp_file(ftp_url, dest_dir):
+def 
get_ftp_file(ftp_url, dest_dir, handler=None): try: filename = ftp_url.split('/')[-1] dest_file = os.path.join(dest_dir, filename) - urllib.urlretrieve(ftp_url, dest_file) + response = urllib.urlopen(ftp_url) + chunk_read(response, file_handle=dest_file, handler=handler) return True except Exception: return False -def get_aspera_file(aspera_url, dest_dir): +def get_aspera_file(aspera_url, dest_dir, handler=None): try: filename = aspera_url.split('/')[-1] dest_file = os.path.join(dest_dir, filename) - asperaretrieve(aspera_url, dest_dir, dest_file) + asperaretrieve(aspera_url, dest_dir, dest_file, handler=handler) return True except Exception: return False @@ -303,12 +312,14 @@ def get_md5(filepath): hash_md5.update(chunk) return hash_md5.hexdigest() -def check_md5(filepath, expected_md5): +def check_md5(filepath, expected_md5, handler): generated_md5 = get_md5(filepath) if expected_md5 != generated_md5: - print 'MD5 mismatch for downloaded file ' + filepath + '. Deleting file' - print ('generated md5', generated_md5) - print ('expected md5', expected_md5) + if not handler: + handler=sys.stdout.write + handler('MD5 mismatch for downloaded file ' + filepath + '. Deleting file') + handler('Generated md5: %s' % generated_md5) + handler('Expected md5: %s' % expected_md5) os.remove(filepath) return False return True @@ -323,23 +334,24 @@ def file_exists(file_url, dest_dir, md5): return True return False -def get_ftp_file_with_md5_check(ftp_url, dest_dir, md5): +def get_ftp_file_with_md5_check(ftp_url, dest_dir, md5, handler=None): try: filename = ftp_url.split('/')[-1] dest_file = os.path.join(dest_dir, filename) - urllib.urlretrieve(ftp_url, dest_file) - return check_md5(dest_file, md5) + response = urllib.urlopen(ftp_url) + chunk_read(response, file_handle=dest_file, handler=handler) + return check_md5(dest_file, md5, handler=handler) except Exception as e: sys.stderr.write("Error with FTP transfer: {0}\n".format(e)) return False -def get_aspera_file_with_md5_check(aspera_url, dest_dir, md5): +def get_aspera_file_with_md5_check(aspera_url, dest_dir, md5, handler=None): try: filename = aspera_url.split('/')[-1] dest_file = os.path.join(dest_dir, filename) - success = asperaretrieve(aspera_url, dest_dir, dest_file) + success = asperaretrieve(aspera_url, dest_dir, dest_file, handler=handler) if success: - return check_md5(dest_file, md5) + return check_md5(dest_file, md5, handler=handler) return success except Exception as e: sys.stderr.write("Error with Aspera transfer: {0}\n".format(e)) @@ -349,13 +361,22 @@ def set_aspera_variables(filepath): try: parser = SafeConfigParser() parser.read(filepath) + # set and check binary location global ASPERA_BIN ASPERA_BIN = parser.get('aspera', 'ASPERA_BIN') + if not os.path.exists(ASPERA_BIN): + print 'Aspera binary ({0}) does not exist. Defaulting to FTP transfer'.format(ASPERA_BIN) + return False + if not os.access(ASPERA_BIN, os.X_OK): + print 'You do not have permissions to execute the aspera binary ({0}). Defaulting to FTP transfer'.format(ASPERA_BIN) + return False + # set and check private key location global ASPERA_PRIVATE_KEY ASPERA_PRIVATE_KEY = parser.get('aspera', 'ASPERA_PRIVATE_KEY') if not os.path.exists(ASPERA_PRIVATE_KEY): print 'Private key file ({0}) does not exist. 
Defaulting to FTP transfer'.format(ASPERA_PRIVATE_KEY) return False + # set non-file variables global ASPERA_SPEED ASPERA_SPEED = parser.get('aspera', 'ASPERA_SPEED') global ASPERA_OPTIONS @@ -384,10 +405,8 @@ def set_aspera(aspera_filepath): aspera = False return aspera -def asperaretrieve(url, dest_dir, dest_file): +def asperaretrieve(url, dest_dir, dest_file, handler=None): try: - if not os.path.exists(ASPERA_BIN) or not os.path.exists(ASPERA_PRIVATE_KEY): - raise FileNotFoundError('Aspera not available. Check your ascp binary path and your private key file is specified correctly') logdir=os.path.abspath(os.path.join(dest_dir, "logs")) create_dir(logdir) aspera_line="{bin} -QT -L {logs} -l {speed} -P33001 {aspera} -i {key} era-fasp@{file} {outdir}" @@ -400,13 +419,72 @@ def asperaretrieve(url, dest_dir, dest_file): key=ASPERA_PRIVATE_KEY, speed=ASPERA_SPEED, ) - print aspera_line - subprocess.call(aspera_line, shell=True) # this blocks so we're fine to wait and return True + _do_aspera_pexpect(url, aspera_line, handler) return True except Exception as e: sys.stderr.write("Error with Aspera transfer: {0}\n".format(e)) return False +def _do_aspera_pexpect(url, cmd, handler): + thread = pexpect.spawn(cmd, timeout=None) + cpl = thread.compile_pattern_list([pexpect.EOF, '(.+)']) + started = 0 + + while True: + i = thread.expect_list(cpl, timeout=None) + if i == 0: + if started == 0: + if handler and callable(handler): + handler("Error initiating transfer") + else: + sys.stderr.write("Error initiating transfer") + break + elif i == 1: + started = 1 + output = dict() + pexp_match = thread.match.group(1) + prev_file = '' + tokens_to_match = ["Mb/s"] + units_to_match = ["KB", "MB", "GB"] + rates_to_match = ["Kb/s", "kb/s", "Mb/s", "mb/s", "Gb/s", "gb/s"] + end_of_transfer = False + if any(tm in pexp_match.decode("utf-8") for tm in tokens_to_match): + tokens = pexp_match.decode("utf-8").split(" ") + if 'ETA' in tokens: + pct_completed = [x for x in tokens if len(x) > 1 and x[-1] == '%'] + if pct_completed: + output["pct_completed"] = pct_completed[0][:-1] + + bytes_transferred = [x for x in tokens if len(x) > 2 and x[-2:] in units_to_match] + if bytes_transferred: + output["bytes_transferred"] = bytes_transferred[0] + + transfer_rate = [x for x in tokens if len(x) > 4 and x[-4:] in rates_to_match] + if transfer_rate: + output["transfer_rate"] = transfer_rate[0] + + output["url"] = url + + if handler and callable(handler): + handler(json.dumps(output)) + else: + sys.stdout.write("%s : %s%% transferred, %s, %s\n" % (output["url"], output["pct_completed"], output["bytes_transferred"], output["transfer_rate"])) + thread.close() + +def _do_aspera_transfer(cmd, handler): + p = Popen(shlex.split(cmd), + stdout=PIPE, + stderr=STDOUT, + bufsize=1) + with p.stdout: + for line in iter(p.stdout.readline, b''): + if handler and callable(handler): + handler(line) + else: + sys.stdout.write(line), + p.wait() + pass + def get_wgs_file_ext(output_format): if output_format == EMBL_FORMAT: return WGS_EMBL_EXT @@ -436,7 +514,6 @@ def get_nonversioned_wgs_ftp_url(wgs_set, status, output_format): def get_report_from_portal(url): request = urllib2.Request(url) - request.add_header('Authorization', b'Basic ' + base64.b64encode(ANON_USER + b':' + ANON_PWD)) return urllib2.urlopen(request) def download_report_from_portal(url, dest_file): @@ -580,4 +657,77 @@ def is_empty_dir(target_dir): def print_error(): sys.stderr.write('ERROR: Something unexpected went wrong please try again.\n') - sys.stderr.write('If problem 
persists, please contact datasubs@ebi.ac.uk for assistance.\n')
+    sys.stderr.write('If problem persists, please contact datasubs@ebi.ac.uk for assistance, with the above error details.\n')
+
+def chunk_report(f, bytes_so_far, chunk_size, total_size, handler=None):
+    global start_time
+    if bytes_so_far == chunk_size:
+        start_time = time.time()
+        return
+    duration = time.time() - start_time
+    progress_size = int(bytes_so_far)
+    speed = int(progress_size / ((1024 * 1024) * duration))
+    percent = int(progress_size * 100 / total_size)
+    line = "\r%s: %0.f%%, %d MB, %d MB/s" % (f, percent, progress_size / (1024 * 1024), speed)
+
+    if handler and callable(handler):
+        handler(line)
+    else:
+        sys.stdout.write(line)
+
+    if bytes_so_far >= total_size:
+        if handler and callable(handler):
+            handler('\n')
+        else:
+            sys.stdout.write('\n')
+
+def chunk_read(response, file_handle, chunk_size=104857, tick_rate=10, report_hook=chunk_report, handler=None):
+    total_size = response.info().getheader('Content-Length')
+    bytes_so_far = 0
+
+    with open(file_handle, 'wb') as f:
+        base = os.path.basename(file_handle)
+        if total_size is None: # cannot chunk - no content length
+            f.write(response.read())
+        else:
+            tick = 0
+            total_size = int(total_size.strip())
+            while 1:
+                chunk = response.read(chunk_size)
+                bytes_so_far += len(chunk)
+
+                if not chunk:
+                    break
+
+                f.write(chunk)
+
+                if report_hook:
+                    if tick == 0 or tick % int(tick_rate) == 0:
+                        tick = 0
+                        report_hook(base, bytes_so_far, chunk_size, total_size, handler)
+                    tick += 1
+
+    return bytes_so_far
+
+def get_divisor(record_cnt):
+    if record_cnt <= 10000:
+        return 1000
+    elif record_cnt <= 50000:
+        return 5000
+    return 10000
+
+def record_start_line(line, output_format):
+    if output_format == FASTA_FORMAT:
+        return line.startswith(b'>')
+    elif output_format == EMBL_FORMAT:
+        return line.startswith(b'ID ')
+    else:
+        return False
+
+def extract_acc_from_line(line, output_format):
+    if output_format == FASTA_FORMAT:
+        return line.split(b'|')[1]
+    elif output_format == EMBL_FORMAT:
+        return line.split()[1][:-1]
+    else:
+        return ''
diff --git a/python3/assemblyGet.py b/python3/assemblyGet.py
index 65de13d..4b28f0d 100644
--- a/python3/assemblyGet.py
+++ b/python3/assemblyGet.py
@@ -20,6 +20,7 @@
 import os
 import sys
 import argparse
+import gzip
 import xml.etree.ElementTree as ElementTree
 
 import utils
@@ -70,31 +71,72 @@ def parse_sequence_report(local_sequence_report):
     patch_list = [l.split('\t')[0] for l in lines[1:] if PATCH in l.split('\t')[3]]
     return (replicon_list, unlocalised_list, unplaced_list, patch_list)
 
-def download_sequence_set(accession_list, mol_type, assembly_dir, output_format, quiet):
+def extract_wgs_sequences(accession_list):
+    wgs_sequences = [a for a in accession_list if utils.is_wgs_sequence(a)]
+    other_sequences = [a for a in accession_list if not utils.is_wgs_sequence(a)]
+    return wgs_sequences, other_sequences
+
+def download_sequence_set(accession_list, mol_type, assembly_dir, output_format, expanded, quiet):
     failed_accessions = []
-    if len(accession_list) > 0:
+    count = 0
+    sequence_cnt = len(accession_list)
+    divisor = utils.get_divisor(sequence_cnt)
+    if sequence_cnt > 0:
         if not quiet:
-            print ('fetching sequences: ' + mol_type)
-        target_file = os.path.join(assembly_dir, utils.get_filename(mol_type, output_format))
+            print ('fetching {0} sequences: {1}'.format(sequence_cnt, mol_type))
+        target_file_path = os.path.join(assembly_dir, utils.get_filename(mol_type, output_format))
+        target_file = open(target_file_path, 'wb')
         for accession in 
accession_list:
-            success = sequenceGet.append_record(target_file, accession, output_format)
+            success = sequenceGet.write_record(target_file, accession, output_format, expanded)
             if not success:
                 failed_accessions.append(accession)
+            else:
+                count += 1
+                if count % divisor == 0 and not quiet:
+                    print ('downloaded {0} of {1} sequences'.format(count, sequence_cnt))
+        if not quiet:
+            print ('downloaded {0} of {1} sequences'.format(count, sequence_cnt))
+        target_file.close()
     elif not quiet:
         print ('no sequences: ' + mol_type)
     if len(failed_accessions) > 0:
         print ('Failed to fetch following {0}, format {1}'.format(mol_type, output_format))
-        print (failed_accessions.join(','))
+        print (','.join(failed_accessions))
 
-def download_sequences(sequence_report, assembly_dir, output_format, quiet):
+def download_sequences(sequence_report, assembly_dir, output_format, expanded, quiet):
     local_sequence_report = os.path.join(assembly_dir, sequence_report)
     replicon_list, unlocalised_list, unplaced_list, patch_list = parse_sequence_report(local_sequence_report)
-    download_sequence_set(replicon_list, REPLICON, assembly_dir, output_format, quiet)
-    download_sequence_set(unlocalised_list, UNLOCALISED, assembly_dir, output_format, quiet)
-    download_sequence_set(unplaced_list, UNPLACED, assembly_dir, output_format, quiet)
-    download_sequence_set(patch_list, PATCH, assembly_dir, output_format, quiet)
+    wgs_scaffolds, other_unlocalised = extract_wgs_sequences(unlocalised_list)
+    wgs_unplaced, other_unplaced = extract_wgs_sequences(unplaced_list)
+    download_sequence_set(replicon_list, REPLICON, assembly_dir, output_format, expanded, quiet)
+    download_sequence_set(other_unlocalised, UNLOCALISED, assembly_dir, output_format, expanded, quiet)
+    download_sequence_set(other_unplaced, UNPLACED, assembly_dir, output_format, expanded, quiet)
+    download_sequence_set(patch_list, PATCH, assembly_dir, output_format, expanded, quiet)
+    wgs_scaffolds.extend(wgs_unplaced)
+    return wgs_scaffolds
 
-def download_assembly(dest_dir, accession, output_format, fetch_wgs, quiet=False):
+def extract_wgs_scaffolds(assembly_dir, wgs_scaffolds, wgs_set, output_format, quiet):
+    if not quiet:
+        print ('extracting {0} WGS scaffolds from WGS set file'.format(len(wgs_scaffolds)))
+    accs = [a.split('.')[0] for a in wgs_scaffolds]
+    wgs_file_path = os.path.join(assembly_dir, wgs_set + utils.get_wgs_file_ext(output_format))
+    target_file_path = os.path.join(assembly_dir, utils.get_filename('wgs_scaffolds', output_format))
+    write_line = False
+    target_file = open(target_file_path, 'wb')
+    with gzip.open(wgs_file_path, 'rb') as f:
+        for line in f:
+            if utils.record_start_line(line, output_format):
+                if utils.extract_acc_from_line(line, output_format) in accs:
+                    write_line = True
+                else:
+                    write_line = False
+                    target_file.flush()
+            if write_line:
+                target_file.write(line)
+    target_file.flush()
+    target_file.close()
+
+def download_assembly(dest_dir, accession, output_format, fetch_wgs, extract_wgs, expanded, quiet=False, handler=None):
     if output_format is None:
         output_format = utils.EMBL_FORMAT
     assembly_dir = os.path.join(dest_dir, accession)
@@ -107,12 +149,24 @@ def download_assembly(dest_dir, accession, output_format, fetch_wgs, quiet=False
     has_sequence_report = False
     # download sequence report
     if sequence_report is not None:
-        has_sequence_report = utils.get_ftp_file(sequence_report, assembly_dir)
+        has_sequence_report = utils.get_ftp_file(sequence_report, assembly_dir, handler)
+    # parse sequence report and download sequences
+    wgs_scaffolds = []
+    
wgs_scaffold_cnt = 0 + if has_sequence_report: + wgs_scaffolds = download_sequences(sequence_report.split('/')[-1], assembly_dir, output_format, expanded, quiet) + wgs_scaffold_cnt = len(wgs_scaffolds) + if wgs_scaffold_cnt > 0: + if not quiet: + print ('Assembly contains {} WGS scaffolds, will fetch WGS set'.format(wgs_scaffold_cnt)) + fetch_wgs = True + else: + fetch_wgs = True # download wgs set if needed if wgs_set is not None and fetch_wgs: if not quiet: print ('fetching wgs set') - sequenceGet.download_wgs(assembly_dir, wgs_set, output_format) - # parse sequence report and download sequences - if has_sequence_report: - download_sequences(sequence_report.split('/')[-1], assembly_dir, output_format, quiet) + sequenceGet.download_wgs(assembly_dir, wgs_set, output_format, handler) + # extract wgs scaffolds from WGS file + if wgs_scaffold_cnt > 0 and extract_wgs: + extract_wgs_scaffolds(assembly_dir, wgs_scaffolds, wgs_set, output_format, quiet) diff --git a/python3/enaDataGet.py b/python3/enaDataGet.py index fe2b66f..794c822 100644 --- a/python3/enaDataGet.py +++ b/python3/enaDataGet.py @@ -25,6 +25,7 @@ import assemblyGet import readGet import utils +import traceback def set_parser(): parser = argparse.ArgumentParser(prog='enaDataGet', @@ -40,6 +41,10 @@ def set_parser(): help='Destination directory (default is current running directory)') parser.add_argument('-w', '--wgs', action='store_true', help='Download WGS set for each assembly if available (default is false)') + parser.add_argument('-e', '--extract-wgs', action='store_true', + help='Extract WGS scaffolds for each assembly if available (default is false)') + parser.add_argument('-exp', '--expanded', action='store_true', + help='Expand CON scaffolds when downloading embl format (default is false)') parser.add_argument('-m', '--meta', action='store_true', help='Download read or analysis XML in addition to data files (default is false)') parser.add_argument('-i', '--index', action='store_true', @@ -50,7 +55,10 @@ def set_parser(): parser.add_argument('-as', '--aspera-settings', default=None, help="""Use the provided settings file, will otherwise check for environment variable or default settings file location.""") - parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.4.1') + parser.add_argument('-r', '--redirect-handler', default=None, + choices=['stdout', 'file'], + help="""File download progress handler. Specify an output handler to process the download progress. Default is no handler (output is swallowed). 'stdout' redirects all output to standard out. 
'file' redirects to a file handle (default is [current_file_download.log]).""")
+    parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.5.3')
     return parser
 
@@ -62,10 +70,13 @@ def set_parser():
     output_format = args.format
     dest_dir = args.dest
     fetch_wgs = args.wgs
+    extract_wgs = args.extract_wgs
+    expanded = args.expanded
     fetch_meta = args.meta
     fetch_index = args.index
     aspera = args.aspera
     aspera_settings = args.aspera_settings
+    handler = args.redirect_handler
 
     if aspera or aspera_settings is not None:
         aspera = utils.set_aspera(aspera_settings)
@@ -74,30 +85,31 @@ def set_parser():
         if utils.is_wgs_set(accession):
             if output_format is not None:
                 sequenceGet.check_format(output_format)
-            sequenceGet.download_wgs(dest_dir, accession, output_format)
+            sequenceGet.download_wgs(dest_dir, accession, output_format, handler)
         elif not utils.is_available(accession):
             sys.stderr.write('ERROR: Record does not exist or is not available for accession provided\n')
             sys.exit(1)
         elif utils.is_sequence(accession):
             if output_format is not None:
                 sequenceGet.check_format(output_format)
-            sequenceGet.download_sequence(dest_dir, accession, output_format)
+            sequenceGet.download_sequence(dest_dir, accession, output_format, expanded, handler)
         elif utils.is_analysis(accession):
             if output_format is not None:
                 readGet.check_read_format(output_format)
-            readGet.download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera)
+            readGet.download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera, handler)
         elif utils.is_run(accession) or utils.is_experiment(accession):
             if output_format is not None:
                 readGet.check_read_format(output_format)
-            readGet.download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera)
+            readGet.download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera, handler)
         elif utils.is_assembly(accession):
             if output_format is not None:
                 assemblyGet.check_format(output_format)
-            assemblyGet.download_assembly(dest_dir, accession, output_format, fetch_wgs)
+            assemblyGet.download_assembly(dest_dir, accession, output_format, fetch_wgs, extract_wgs, expanded, handler=handler)
         else:
             sys.stderr.write('ERROR: Invalid accession provided\n')
             sys.exit(1)
         print ('Completed')
     except Exception:
+        traceback.print_exc()
         utils.print_error()
         sys.exit(1)
diff --git a/python3/enaGroupGet.py b/python3/enaGroupGet.py
index d8a9ee7..5c311a9 100644
--- a/python3/enaGroupGet.py
+++ b/python3/enaGroupGet.py
@@ -25,10 +25,11 @@ import assemblyGet
 import readGet
 import utils
+import traceback
 
 def set_parser():
     parser = argparse.ArgumentParser(prog='enaGroupGet',
-                                     description = 'Download data for a given study or sample')
+                                     description = 'Download data for a given study or sample, or (for sequence and assembly) taxon')
     parser.add_argument('accession', help='Study or sample accession or NCBI tax ID to fetch data for')
     parser.add_argument('-g', '--group', default='read', choices=['sequence', 'wgs', 'assembly', 'read', 'analysis'],
@@ -42,6 +43,10 @@ def set_parser():
                         help='Destination directory (default is current running directory)')
     parser.add_argument('-w', '--wgs', action='store_true',
                         help='Download WGS set for each assembly if available (default is false)')
+    parser.add_argument('-e', '--extract-wgs', action='store_true',
+                        help='Extract WGS scaffolds for each assembly if available (default is false)')
+    parser.add_argument('-exp', '--expanded', action='store_true',
+                        help='Expand CON scaffolds when downloading embl format (default is 
false)') parser.add_argument('-m', '--meta', action='store_true', help='Download read or analysis XML in addition to data files (default is false)') parser.add_argument('-i', '--index', action='store_true', @@ -54,7 +59,7 @@ def set_parser(): for environment variable or default settings file location.""") parser.add_argument('-t', '--subtree', action='store_true', help='Include subordinate taxa (taxon subtree) when querying with NCBI tax ID (default is false)') - parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.4.1') + parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.5.3') return parser def download_report(group, result, accession, temp_file, subtree): @@ -66,77 +71,71 @@ def download_report(group, result, accession, temp_file, subtree): f.flush() f.close() -def download_data(group, data_accession, output_format, group_dir, fetch_wgs, fetch_meta, fetch_index, aspera): +def download_data(group, data_accession, output_format, group_dir, fetch_wgs, extract_wgs, expanded, fetch_meta, fetch_index, aspera): if group == utils.WGS: print ('Fetching ' + data_accession[:6]) - if aspera: - print ('Aspera not supported for WGS data. Using FTP...') sequenceGet.download_wgs(group_dir, data_accession[:6], output_format) else: print ('Fetching ' + data_accession) if group == utils.ASSEMBLY: - if aspera: - print ('Aspera not supported for assembly data. Using FTP...') - assemblyGet.download_assembly(group_dir, data_accession, output_format, fetch_wgs, True) + assemblyGet.download_assembly(group_dir, data_accession, output_format, fetch_wgs, extract_wgs, expanded, True) elif group in [utils.READ, utils.ANALYSIS]: readGet.download_files(data_accession, output_format, group_dir, fetch_index, fetch_meta, aspera) -def download_data_group(group, accession, output_format, group_dir, fetch_wgs, fetch_meta, fetch_index, aspera, subtree): - temp_file = os.path.join(group_dir, accession + '_temp.txt') - download_report(group, utils.get_group_result(group), accession, temp_file, subtree) - f = open(temp_file) +def download_data_group(group, accession, output_format, group_dir, fetch_wgs, extract_wgs, fetch_meta, fetch_index, aspera, subtree, expanded): + temp_file_path = os.path.join(group_dir, accession + '_temp.txt') + download_report(group, utils.get_group_result(group), accession, temp_file_path, subtree) header = True - for line in f: - if header: - header = False - continue - data_accession = line.strip() - download_data(group, data_accession, output_format, group_dir, fetch_wgs, fetch_meta, fetch_index, aspera) - f.close() - os.remove(temp_file) + with open(temp_file_path) as f: + for line in f: + if header: + header = False + continue + data_accession = line.strip() + download_data(group, data_accession, output_format, group_dir, fetch_wgs, extract_wgs, expanded, fetch_meta, fetch_index, aspera) + os.remove(temp_file_path) + +def download_sequence_result(dest_file, group_dir, result, accession, subtree, update_accs, expanded): + temp_file_path = os.path.join(group_dir, 'temp.txt') + download_report(utils.SEQUENCE, result, accession, temp_file_path, subtree) + header = True + with open(temp_file_path) as f: + for line in f: + if header: + header = False + continue + data_accession = line.strip() + write_record = False + if result == utils.SEQUENCE_UPDATE_RESULT: + update_accs.append(data_accession) + write_record = True + elif result == utils.SEQUENCE_RELEASE_RESULT: + if data_accession not in update_accs: + write_record = True + if write_record: + 
sequenceGet.write_record(dest_file, data_accession, output_format) + dest_file.flush() + os.remove(temp_file_path) + return update_accs -def download_sequence_group(accession, output_format, group_dir, subtree): - print('Downloading sequences') +def download_sequence_group(accession, output_format, group_dir, subtree, expanded): + print ('Downloading sequences') update_accs = [] - dest_file = os.path.join(group_dir, utils.get_filename(accession + '_sequences', output_format)) + dest_file_path = os.path.join(group_dir, utils.get_filename(accession + '_sequences', output_format)) + dest_file = open(dest_file_path, 'wb') #sequence update - temp_file = os.path.join(group_dir, 'temp.txt') - download_report(utils.SEQUENCE, utils.SEQUENCE_UPDATE_RESULT, accession, temp_file, subtree) - f = open(temp_file) - header = True - for line in f: - if header: - header = False - continue - data_accession = line.strip() - update_accs.append(data_accession) - sequenceGet.append_record(dest_file, data_accession, output_format) - f.close() - os.remove(temp_file) + update_accs = download_sequence_result(dest_file, group_dir, utils.SEQUENCE_UPDATE_RESULT, accession, subtree, update_accs, expanded) #sequence release - temp_file = os.path.join(group_dir, 'temp.txt') - download_report(utils.SEQUENCE, utils.SEQUENCE_RELEASE_RESULT, accession, temp_file, subtree) - f = open(temp_file) - header = True - for line in f: - if header: - header = False - continue - data_accession = line.strip() - if data_accession not in update_accs: - sequenceGet.append_record(dest_file, data_accession, output_format) - f.close() - os.remove(temp_file) + update_accs = download_sequence_result(dest_file, group_dir, utils.SEQUENCE_RELEASE_RESULT, accession, subtree, update_accs, expanded) + dest_file.close() -def download_group(accession, group, output_format, dest_dir, fetch_wgs, fetch_meta, fetch_index, aspera, subtree): +def download_group(accession, group, output_format, dest_dir, fetch_wgs, extract_wgs, fetch_meta, fetch_index, aspera, subtree, expanded): group_dir = os.path.join(dest_dir, accession) utils.create_dir(group_dir) if group == utils.SEQUENCE: - if aspera: - print ('Aspera not supported for sequence downloads. 
Using FTP...') - download_sequence_group(accession, output_format, group_dir, subtree) + download_sequence_group(accession, output_format, group_dir, subtree, expanded) else: - download_data_group(group, accession, output_format, group_dir, fetch_wgs, fetch_meta, fetch_index, aspera, subtree) + download_data_group(group, accession, output_format, group_dir, fetch_wgs, extract_wgs, fetch_meta, fetch_index, aspera, subtree, expanded) if __name__ == '__main__': parser = set_parser() @@ -147,6 +146,8 @@ def download_group(accession, group, output_format, dest_dir, fetch_wgs, fetch_m output_format = args.format dest_dir = args.dest fetch_wgs = args.wgs + extract_wgs = args.extract_wgs + expanded = args.expanded fetch_meta = args.meta fetch_index = args.index aspera = args.aspera @@ -184,8 +185,9 @@ def download_group(accession, group, output_format, dest_dir, fetch_wgs, fetch_m if utils.is_taxid(accession) and group in ['read', 'analysis']: print('Sorry, tax ID retrieval not yet supported for read and analysis') sys.exit(1) - download_group(accession, group, output_format, dest_dir, fetch_wgs, fetch_meta, fetch_index, aspera, subtree) + download_group(accession, group, output_format, dest_dir, fetch_wgs, extract_wgs, fetch_meta, fetch_index, aspera, subtree, expanded) print ('Completed') except Exception: + traceback.print_exc() utils.print_error() sys.exit(1) diff --git a/python3/readGet.py b/python3/readGet.py index dd62c30..a3f8597 100644 --- a/python3/readGet.py +++ b/python3/readGet.py @@ -38,11 +38,11 @@ def check_analysis_format(output_format): ) sys.exit(1) -def attempt_file_download(file_url, dest_dir, md5, aspera): +def attempt_file_download(file_url, dest_dir, md5, aspera, handler=None): if md5 is not None: print('Downloading file with md5 check:' + file_url) if aspera: - return utils.get_aspera_file_with_md5_check(file_url, dest_dir, md5) + return utils.get_aspera_file_with_md5_check(file_url, dest_dir, md5, handler) else: return utils.get_ftp_file_with_md5_check('ftp://' + file_url, dest_dir, md5) print('Downloading file:' + file_url) @@ -50,12 +50,12 @@ def attempt_file_download(file_url, dest_dir, md5, aspera): return utils.get_aspera_file(file_url, dest_dir) return utils.get_ftp_file('ftp://' + file_url, dest_dir) -def download_file(file_url, dest_dir, md5, aspera): +def download_file(file_url, dest_dir, md5, aspera, handler=None): if utils.file_exists(file_url, dest_dir, md5): return - success = attempt_file_download(file_url, dest_dir, md5, aspera) + success = attempt_file_download(file_url, dest_dir, md5, aspera, handler) if not success: - success = attempt_file_download(file_url, dest_dir, md5, aspera) + success = attempt_file_download(file_url, dest_dir, md5, aspera, handler) if not success: print('Failed to download file after two attempts') @@ -74,7 +74,7 @@ def download_experiment_meta(run_accession, dest_dir): break download_meta(experiment_accession, dest_dir) -def download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera): +def download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera, handler=None): accession_dir = os.path.join(dest_dir, accession) utils.create_dir(accession_dir) # download experiment xml @@ -114,10 +114,10 @@ def download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, file_url = filelist[i] md5 = md5list[i] if file_url != '': - download_file(file_url, target_dir, md5, aspera) + download_file(file_url, target_dir, md5, aspera, handler) for index_file in indexlist: if index_file != 
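The readGet.py hunks below thread the same optional `handler` callback through `attempt_file_download`, `download_file` and `download_files`. A minimal usage sketch (the run accession is a made-up example; passing `None` as the format falls back to the submitted/sra/fastq priority described in the README):

```
# Sketch: fetch the files for one run and route progress output to a callback.
import readGet

def progress(message):
    print('[progress] ' + str(message).strip())

# Hypothetical accession; None = default format priority (submitted, sra, fastq).
readGet.download_files('ERR000001', None, '.', False, False, False, progress)
```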
diff --git a/python3/readGet.py b/python3/readGet.py
index dd62c30..a3f8597 100644
--- a/python3/readGet.py
+++ b/python3/readGet.py
@@ -38,11 +38,11 @@ def check_analysis_format(output_format):
         )
         sys.exit(1)

-def attempt_file_download(file_url, dest_dir, md5, aspera):
+def attempt_file_download(file_url, dest_dir, md5, aspera, handler=None):
     if md5 is not None:
         print('Downloading file with md5 check:' + file_url)
         if aspera:
-            return utils.get_aspera_file_with_md5_check(file_url, dest_dir, md5)
+            return utils.get_aspera_file_with_md5_check(file_url, dest_dir, md5, handler)
         else:
-            return utils.get_ftp_file_with_md5_check('ftp://' + file_url, dest_dir, md5)
+            return utils.get_ftp_file_with_md5_check('ftp://' + file_url, dest_dir, md5, handler)
     print('Downloading file:' + file_url)
@@ -50,12 +50,12 @@
     if aspera:
-        return utils.get_aspera_file(file_url, dest_dir)
-    return utils.get_ftp_file('ftp://' + file_url, dest_dir)
+        return utils.get_aspera_file(file_url, dest_dir, handler)
+    return utils.get_ftp_file('ftp://' + file_url, dest_dir, handler)

-def download_file(file_url, dest_dir, md5, aspera):
+def download_file(file_url, dest_dir, md5, aspera, handler=None):
     if utils.file_exists(file_url, dest_dir, md5):
         return
-    success = attempt_file_download(file_url, dest_dir, md5, aspera)
+    success = attempt_file_download(file_url, dest_dir, md5, aspera, handler)
     if not success:
-        success = attempt_file_download(file_url, dest_dir, md5, aspera)
+        success = attempt_file_download(file_url, dest_dir, md5, aspera, handler)
     if not success:
         print('Failed to download file after two attempts')
@@ -74,7 +74,7 @@ def download_experiment_meta(run_accession, dest_dir):
             break
     download_meta(experiment_accession, dest_dir)

-def download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera):
+def download_files(accession, output_format, dest_dir, fetch_index, fetch_meta, aspera, handler=None):
     accession_dir = os.path.join(dest_dir, accession)
     utils.create_dir(accession_dir)
     # download experiment xml
@@ -114,10 +114,10 @@
             file_url = filelist[i]
             md5 = md5list[i]
             if file_url != '':
-                download_file(file_url, target_dir, md5, aspera)
+                download_file(file_url, target_dir, md5, aspera, handler)
         for index_file in indexlist:
             if index_file != '':
-                download_file(index_file, target_dir, None, aspera)
+                download_file(index_file, target_dir, None, aspera, handler)
         if utils.is_empty_dir(target_dir):
             print('Deleting directory ' + os.path.basename(target_dir))
             os.rmdir(target_dir)
diff --git a/python3/sequenceGet.py b/python3/sequenceGet.py
index b57b90d..5d8d5cc 100644
--- a/python3/sequenceGet.py
+++ b/python3/sequenceGet.py
@@ -23,47 +23,49 @@
 import utils

-def append_record(dest_file, accession, output_format):
+def write_record(dest_file, accession, output_format, expanded=False):
     url = utils.get_record_url(accession, output_format)
-    return utils.append_record(url, dest_file)
+    if expanded:
+        url = url + '&expanded=true'
+    return utils.write_record(url, dest_file)

-def download_sequence(dest_dir, accession, output_format):
+def download_sequence(dest_dir, accession, output_format, expanded, handler=None):
     if output_format is None:
         output_format = utils.EMBL_FORMAT
-    success = utils.download_record(dest_dir, accession, output_format)
+    success = utils.download_record(dest_dir, accession, output_format, expanded, handler)
     if not success:
         print ('Unable to fetch file for {0}, format {1}'.format(accession, output_format))

-def download_wgs(dest_dir, accession, output_format):
+def download_wgs(dest_dir, accession, output_format, handler=None):
     if utils.is_unversioned_wgs_set(accession):
-        return download_unversioned_wgs(dest_dir, accession, output_format)
+        return download_unversioned_wgs(dest_dir, accession, output_format, handler)
     else:
-        return download_versioned_wgs(dest_dir, accession, output_format)
+        return download_versioned_wgs(dest_dir, accession, output_format, handler)

-def download_versioned_wgs(dest_dir, accession, output_format):
+def download_versioned_wgs(dest_dir, accession, output_format, handler=None):
     prefix = accession[:6]
     if output_format is None:
         output_format = utils.EMBL_FORMAT
     public_set_url = utils.get_wgs_ftp_url(prefix, utils.PUBLIC, output_format)
     supp_set_url = utils.get_wgs_ftp_url(prefix, utils.SUPPRESSED, output_format)
-    success = utils.get_ftp_file(public_set_url, dest_dir)
+    success = utils.get_ftp_file(public_set_url, dest_dir, handler)
     if not success:
-        success = utils.get_ftp_file(supp_set_url, dest_dir)
+        success = utils.get_ftp_file(supp_set_url, dest_dir, handler)
     if not success:
         print ('No WGS set file available for {0}, format {1}'.format(accession, output_format))
         print ('Please contact ENA (datasubs@ebi.ac.uk) if you feel this set should be available')

-def download_unversioned_wgs(dest_dir, accession, output_format):
+def download_unversioned_wgs(dest_dir, accession, output_format, handler=None):
     prefix = accession[:4]
     if output_format is None:
         output_format = utils.EMBL_FORMAT
     public_set_url = utils.get_nonversioned_wgs_ftp_url(prefix, utils.PUBLIC, output_format)
     if public_set_url is not None:
-        utils.get_ftp_file(public_set_url, dest_dir)
+        utils.get_ftp_file(public_set_url, dest_dir, handler)
     else:
         supp_set_url = utils.get_nonversioned_wgs_ftp_url(prefix, utils.SUPPRESSED, output_format)
         if supp_set_url is not None:
-            utils.get_ftp_file(supp_set_url, dest_dir)
+            utils.get_ftp_file(supp_set_url, dest_dir, handler)
         else:
             print ('No WGS set file available for {0}, format {1}'.format(accession, output_format))
             print ('Please contact ENA (datasubs@ebi.ac.uk) if you feel this set should be available')
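`write_record` (renamed from `append_record`) now writes into an already-open binary file handle and can ask the browser for CON records expanded into real sequence. A sketch of building one combined EMBL file from a few records (the accessions are placeholders):

```
# Sketch: append several records to a single EMBL file, expanding CON entries.
import sequenceGet
import utils

accessions = ['A00145', 'A00146']  # placeholder accessions
with open('combined.embl', 'wb') as dest:
    for acc in accessions:
        if not sequenceGet.write_record(dest, acc, utils.EMBL_FORMAT, expanded=True):
            print('No record written for ' + acc)
```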
diff --git a/python3/utils.py b/python3/utils.py
index 92eba10..f545589 100644
--- a/python3/utils.py
+++ b/python3/utils.py
@@ -25,8 +25,14 @@
 import ssl
 import subprocess
 import sys
+import urllib
+import pexpect
+import time
+import shlex
+import json
 import urllib.request as urlrequest
 import xml.etree.ElementTree as ElementTree
+import urllib.error as urlerror
 from configparser import SafeConfigParser
 from http.client import HTTPSConnection
@@ -36,8 +42,6 @@
 ASPERA_OPTIONS = '' # set any extra aspera options
 ASPERA_SPEED = '100M' # set aspera download speed

-ANON_AUTH = b'anon:anon'
-
 SUPPRESSED = 'suppressed'
 PUBLIC = 'public'
@@ -68,7 +72,7 @@
 SAMPLE = 'sample'
 TAXON = 'taxon'

-VIEW_URL_BASE = 'http://www.ebi.ac.uk/ena/data/view/'
+VIEW_URL_BASE = 'https://www.ebi.ac.uk/ena/data/view/'
 XML_DISPLAY = '&display=xml'
 EMBL_DISPLAY = '&display=text'
 FASTA_DISPLAY = '&display=fasta'
@@ -83,7 +87,7 @@
 WGS_FTP_BASE = 'ftp://ftp.ebi.ac.uk/pub/databases/ena/wgs'
 WGS_FTP_DIR = 'pub/databases/ena/wgs'

-PORTAL_SEARCH_BASE = 'http://www.ebi.ac.uk/ena/portal/api/search?'
+PORTAL_SEARCH_BASE = 'https://www.ebi.ac.uk/ena/portal/api/search?'
 RUN_RESULT = 'result=read_run'
 ANALYSIS_RESULT = 'result=analysis'
 WGS_RESULT = 'result=wgs_set'
@@ -106,7 +110,7 @@
 sequence_pattern_1 = re.compile('^[A-Z]{1}[0-9]{5}(\.[0-9]+)?$')
 sequence_pattern_2 = re.compile('^[A-Z]{2}[0-9]{6}(\.[0-9]+)?$')
-sequence_pattern_3 = re.compile('^[A-Z]{4}[0-9]{8,9}(\.[0-9]+)?$')
+wgs_sequence_pattern = re.compile('^[A-Z]{4}[0-9]{8,9}(\.[0-9]+)?$')
 coding_pattern = re.compile('^[A-Z]{3}[0-9]{5}(\.[0-9]+)?$')
 wgs_prefix_pattern = re.compile('^[A-Z]{4}[0-9]{2}$')
 wgs_master_pattern = re.compile('^[A-Z]{4}[0-9]{2}[0]{6}$')
@@ -125,8 +129,10 @@
 enaBrowserTools_path = os.path.dirname(os.path.dirname(__file__))

 def is_sequence(accession):
-    return sequence_pattern_1.match(accession) or sequence_pattern_2.match(accession) \
-        or sequence_pattern_3.match(accession)
+    return sequence_pattern_1.match(accession) or sequence_pattern_2.match(accession)
+
+def is_wgs_sequence(accession):
+    return wgs_sequence_pattern.match(accession)

 def is_coding(accession):
     return coding_pattern.match(accession)
@@ -235,9 +241,17 @@ def is_available(accession):
         url = get_record_url('Taxon:{0}'.format(accession), XML_FORMAT)
     else:
         url = get_record_url(accession, XML_FORMAT)
-    response = urlrequest.urlopen(url)
-    record = ElementTree.parse(response).getroot()
-    return (not 'entry is not found' in record.text) and (len(record.getchildren()) > 0)
+    try:
+        response = urlrequest.urlopen(url)
+        record = ElementTree.parse(response).getroot()
+        return (not 'entry is not found' in record.text) and (len(record) > 0)
+    except urlerror.URLError as e:
+        if 'CERTIFICATE_VERIFY_FAILED' in str(e):
+            print ('Error verifying SSL certificate. Have you run "Install Certificates" as part of your Python3 installation?')
Have you run "Install Certificates" as part of your Python3 installation?') + print ('This is a commonly missed step in Python3 installation on a Mac.') + print ('Please run the following from a terminal window (update to your Python3 version as needed):') + print ('open "/Applications/Python 3.6/Install Certificates.command"') + raise def get_filename(base_name, output_format): if output_format == XML_FORMAT: @@ -257,43 +271,48 @@ def get_destination_file(dest_dir, accession, output_format): def download_single_record(url, dest_file): urlrequest.urlretrieve(url, dest_file) -def download_record(dest_dir, accession, output_format): +def download_record(dest_dir, accession, output_format, expanded=False): try: dest_file = get_destination_file(dest_dir, accession, output_format) url = get_record_url(accession, output_format) + if expanded: + url = url + '&expanded=true' download_single_record(url, dest_file) return True except Exception as e: print ("Error downloading read record: {0}".format(e)) return False -def append_record(url, dest_file): +def write_record(url, dest_file): try: response = urlrequest.urlopen(url) - f = open(dest_file, 'ab') + linenum = 1 for line in response: - chars = f.write(line) - f.flush() - f.close() + if linenum == 1 and line.startswith(b'Entry:'): + return False + chars = dest_file.write(line) + linenum += 1 + dest_file.flush() return True except Exception: return False -def get_ftp_file(ftp_url, dest_dir): +def get_ftp_file(ftp_url, dest_dir, handler=None): try: filename = ftp_url.split('/')[-1] dest_file = os.path.join(dest_dir, filename) - urlrequest.urlretrieve(ftp_url, dest_file) + response = urllib.urlopen(ftp_url) + chunk_read(response, file_handle=dest_file, handler=handler) return True except Exception as e: print ("Error with FTP transfer: {0}".format(e)) return False -def get_aspera_file(aspera_url, dest_dir): +def get_aspera_file(aspera_url, dest_dir, handler=None): try: filename = aspera_url.split('/')[-1] dest_file = os.path.join(dest_dir, filename) - asperaretrieve(aspera_url, dest_dir, dest_file) + asperaretrieve(aspera_url, dest_dir, dest_file, handler) return True except Exception: print ("Error with FTP transfer: {0}".format(e)) @@ -306,12 +325,14 @@ def get_md5(filepath): hash_md5.update(chunk) return hash_md5.hexdigest() -def check_md5(filepath, expected_md5): +def check_md5(filepath, expected_md5, handler): generated_md5 = get_md5(filepath) if expected_md5 != generated_md5: - print ('MD5 mismatch for downloaded file ' + filepath + '. Deleting file') - print ('generated md5', generated_md5) - print ('expected md5', expected_md5) + if not handler: + handler=sys.stdout.write + handler('MD5 mismatch for downloaded file ' + filepath + '. 
Deleting file') + handler('Generated md5: %s' % generated_md5) + handler('Expected md5: %s' % expected_md5) os.remove(filepath) return False return True @@ -326,23 +347,24 @@ def file_exists(file_url, dest_dir, md5): return True return False -def get_ftp_file_with_md5_check(ftp_url, dest_dir, md5): +def get_ftp_file_with_md5_check(ftp_url, dest_dir, md5, handler=None): try: filename = ftp_url.split('/')[-1] dest_file = os.path.join(dest_dir, filename) - urlrequest.urlretrieve(ftp_url, dest_file) - return check_md5(dest_file, md5) + response = urllib.urlopen(ftp_url) + chunk_read(response, file_handle=dest_file, handler=handler) + return check_md5(dest_file, md5, handler=handler) except Exception as e: sys.stderr.write("Error with FTP transfer: {0}\n".format(e)) return False -def get_aspera_file_with_md5_check(aspera_url, dest_dir, md5): +def get_aspera_file_with_md5_check(aspera_url, dest_dir, md5, handler=None): try: filename = aspera_url.split('/')[-1] dest_file = os.path.join(dest_dir, filename) - success = asperaretrieve(aspera_url, dest_dir, dest_file) + success = asperaretrieve(aspera_url, dest_dir, dest_file, handler=handler) if success: - return check_md5(dest_file, md5) + return check_md5(dest_file, md5, handler=handler) return False except Exception as e: sys.stderr.write("Error with Aspera transfer: {0}\n".format(e)) @@ -352,13 +374,22 @@ def set_aspera_variables(filepath): try: parser = SafeConfigParser() parser.read(filepath) + # set and check binary location global ASPERA_BIN ASPERA_BIN = parser.get('aspera', 'ASPERA_BIN') + if not os.path.exists(ASPERA_BIN): + print ('Aspera binary ({0}) does not exist. Defaulting to FTP transfer'.format(ASPERA_BIN)) + return False + if not os.access(ASPERA_BIN, os.X_OK): + print ('You do not have permissions to execute the aspera binary ({0}). Defaulting to FTP transfer'.format(ASPERA_BIN)) + return False + # set and check private key location global ASPERA_PRIVATE_KEY ASPERA_PRIVATE_KEY = parser.get('aspera', 'ASPERA_PRIVATE_KEY') if not os.path.exists(ASPERA_PRIVATE_KEY): - print('Private key file ({0}) does not exist. Defaulting to FTP transfer'.format(ASPERA_PRIVATE_KEY)) + print ('Private key file ({0}) does not exist. Defaulting to FTP transfer'.format(ASPERA_PRIVATE_KEY)) return False + # set non-file variables global ASPERA_SPEED ASPERA_SPEED = parser.get('aspera', 'ASPERA_SPEED') global ASPERA_OPTIONS @@ -387,10 +418,8 @@ def set_aspera(aspera_filepath): aspera = False return aspera -def asperaretrieve(url, dest_dir, dest_file): +def asperaretrieve(url, dest_dir, dest_file, handler=None): try: - if not os.path.exists(ASPERA_BIN) or not os.path.exists(ASPERA_PRIVATE_KEY): - raise FileNotFoundError('Aspera not available. 
Check your ascp binary path and your private key file is specified correctly') logdir=os.path.abspath(os.path.join(dest_dir, "logs")) print ('Creating', logdir) create_dir(logdir) @@ -404,13 +433,70 @@ def asperaretrieve(url, dest_dir, dest_file): key=ASPERA_PRIVATE_KEY, speed=ASPERA_SPEED, ) - print(aspera_line) - subprocess.call(aspera_line, shell=True) # this blocks so we're fine to wait and return True + _do_aspera_pexpect(aspera_line, handler) return True except Exception as e: sys.stderr.write("Error with Aspera transfer: {0}\n".format(e)) return False +def _do_aspera_pexpect(cmd, handler): + thread = pexpect.spawn(cmd, timeout=None) + cpl = thread.compile_pattern_list([pexpect.EOF, '(.+)']) + started = 0 + + while True: + i = thread.expect_list(cpl, timeout=None) + if i == 0: + if started == 0: + if handler and callable(handler): + handler("Error initiating transfer") + else: + sys.stderr.write("Error initiating transfer") + break + elif i == 1: + started = 1 + output = dict() + pexp_match = thread.match.group(1) + prev_file = '' + tokens_to_match = ["Mb/s"] + units_to_match = ["KB", "MB", "GB"] + rates_to_match = ["Kb/s", "kb/s", "Mb/s", "mb/s", "Gb/s", "gb/s"] + end_of_transfer = False + if any(tm in pexp_match.decode("utf-8") for tm in tokens_to_match): + tokens = pexp_match.decode("utf-8").split(" ") + if 'ETA' in tokens: + pct_completed = [x for x in tokens if len(x) > 1 and x[-1] == '%'] + if pct_completed: + output["pct_completed"] = pct_completed[0][:-1] + + bytes_transferred = [x for x in tokens if len(x) > 2 and x[-2:] in units_to_match] + if bytes_transferred: + output["bytes_transferred"] = bytes_transferred[0] + + transfer_rate = [x for x in tokens if len(x) > 4 and x[-4:] in rates_to_match] + if transfer_rate: + output["transfer_rate"] = transfer_rate[0] + + if handler and callable(handler): + handler(json.dumps(output)) + else: + sys.stdout.write("%s%% transferred, %s, %s\n" % (output["pct_completed"], output["bytes_transferred"], output["transfer_rate"])) + thread.close() + +def _do_aspera_transfer(cmd, handler): + p = Popen(shlex.split(cmd), + stdout=PIPE, + stderr=STDOUT, + bufsize=1) + with p.stdout: + for line in iter(p.stdout.readline, b''): + if handler and callable(handler): + handler(line) + else: + sys.stdout.write(line), + p.wait() + pass + def get_wgs_file_ext(output_format): if output_format == EMBL_FORMAT: return WGS_EMBL_EXT @@ -439,9 +525,7 @@ def get_nonversioned_wgs_ftp_url(wgs_set, status, output_format): return base_url + '/' + max(files) def get_report_from_portal(url): - userAndPass = base64.b64encode(ANON_AUTH).decode("ascii") - headers = { 'Authorization' : 'Basic %s' % userAndPass } - request = urlrequest.Request(url, headers=headers) + request = urlrequest.Request(url) gcontext = ssl.SSLContext(ssl.PROTOCOL_TLSv1) return urlrequest.urlopen(request, context=gcontext) @@ -586,4 +670,77 @@ def is_empty_dir(target_dir): def print_error(): sys.stderr.write('ERROR: Something unexpected went wrong please try again.\n') - sys.stderr.write('If problem persists, please contact datasubs@ebi.ac.uk for assistance.\n') + sys.stderr.write('If problem persists, please contact datasubs@ebi.ac.uk for assistance, with the above error details.\n') + +def chunk_report(f, bytes_so_far, chunk_size, total_size, handler=None): + global start_time + if bytes_so_far == chunk_size: + start_time = time.time() + return + duration = time.time() - start_time + progress_size = int(bytes_so_far) + speed = int(progress_size / ((1024 * 1024) * duration)) + percent = 
int(progress_size * 100 /total_size) + line = "\r%s: %0.f%%, %d MB, %d MB/s" % (f, percent, progress_size / (1024 * 1024), speed) + + if handler and callable(handler): + handler(line) + else: + sys.stdout.write(line) + + if bytes_so_far >= total_size: + if handler and callable(handler): + handler('\n') + else: + sys.stdout.write('\n') + +def chunk_read(response, file_handle, chunk_size=104857, tick_rate=10, report_hook=chunk_report, handler=None): + total_size = response.info().getheader('Content-Length').strip() + bytes_so_far = 0 + + with open(file_handle, 'wb') as f: + base = os.path.basename(file_handle) + if total_size is None: # cannot chunk - no content length + f.write(response.content) + else: + tick = 0 + total_size = int(total_size) + while 1: + chunk = response.read(chunk_size) + bytes_so_far += len(chunk) + + if not chunk: + break + + f.write(chunk) + + if report_hook: + if tick == 0 or tick % int(tick_rate) == 0: + tick = 0 + report_hook(base, bytes_so_far, chunk_size, total_size, handler) + tick += 1 + + return bytes_so_far + +def get_divisor(record_cnt): + if record_cnt <= 10000: + return 1000 + elif record_cnt <= 50000: + return 5000 + return 10000 + +def record_start_line(line, output_format): + if output_format == FASTA_FORMAT: + return line.startswith(b'>') + elif output_format == EMBL_FORMAT: + return line.startswith(b'ID ') + else: + return False + +def extract_acc_from_line(line, output_format): + if output_format == FASTA_FORMAT: + return line.split(b'|')[1] + elif output_format == EMBL_FORMAT: + return line.split()[1][:-1] + else: + return ''
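The two helpers above operate on raw bytes lines, which makes post-processing a combined download straightforward. For instance, a quick scan that lists the accessions present in a downloaded file (a sketch; gzip handling assumed for .gz WGS set files):

```
# Sketch: list the accessions found in a downloaded embl/fasta file using the
# helpers above (open in binary mode, as they compare bytes).
import gzip
import utils

def list_accessions(path, output_format=utils.EMBL_FORMAT):
    opener = gzip.open if path.endswith('.gz') else open
    accessions = []
    with opener(path, 'rb') as f:
        for line in f:
            if utils.record_start_line(line, output_format):
                accessions.append(utils.extract_acc_from_line(line, output_format).decode('ascii'))
    return accessions
```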