Skip to content

Commit

Permalink
More protection against running out of memory (all formats read side).
Browse files Browse the repository at this point in the history
These have been found by using a malloc replacement (added to the
Makefile plus header include using CFLAGS="-g -include malloc_fuzz.h")
so that the Nth memory allocation will always fail.  We then loop from
1 upwards until we succeed, checking every place for a proper handling
of the error.

Eg:

    for i in `seq 1 20000`
    do
        echo $i
        FUZZ=$i ./test/test_view -@4 -B in.bam 2>&1
        test "$?" ">" 1 && break
    done

Changes include

- bgzf MT reading now sets mt->command = CLOSE and has additional
  calls to shut down the queue on memory errors.
  This is checked for in bgzf_check_EOF to avoid deadlock.
- bgzf now has a pool_create() error check.
- Distinguish between errors vs would-block in cram_decode_slice_mt
- Make cram_drain_rqueue cope with failed jobs.
- Improve cram_close to cope with shutting down after failed
  initialisation, such as rqueue not being initialised.
- Check for realloc failure in refs_load_fai.
- Fix segfault in cram_decode_slice if cram_get_ref failed.
- Removed abort in load_hfile_plugins.  It now returns -1 and is
  checked for in find_scheme_handler.
- Documented return values in hts_getline
- Attempt to propagate memory error in kgetline2 to file pointer, but
  this only works for hgets (not gets).  Fundamentally this is
  unsolvable with the current API.
- Fixed memory tear-down in sam_parse_worker so it doesn't claim to
  have allocated more than it managed.
- sam_set_thread_pool now copes with failure to create the queue.
- Tabix index_load now checks calloc return.
- Extra allocation return checks in hts_tpool_process_init and
  hts_tpool_init.
- Check ksprintf worked in vcf fix_chromosome.

Co-Authored-By: Rob Davies <[email protected]>
  • Loading branch information
jkbonfield and daviesrob committed Aug 18, 2020
1 parent 773b6ed commit 8ae25df
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 21 deletions.
39 changes: 28 additions & 11 deletions bgzf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1530,10 +1530,7 @@ static void *bgzf_mt_reader(void *vp) {
pthread_mutex_lock(&mt->job_pool_m);
bgzf_job *j = pool_alloc(mt->job_pool);
pthread_mutex_unlock(&mt->job_pool_m);
if (!j) {
hts_tpool_process_destroy(mt->out_queue);
return NULL;
}
if (!j) goto err;
j->errcode = 0;
j->comp_len = 0;
j->uncomp_len = 0;
Expand All @@ -1545,8 +1542,7 @@ static void *bgzf_mt_reader(void *vp) {
if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_decode_func, j,
job_cleanup, job_cleanup, 0) < 0) {
job_cleanup(j);
hts_tpool_process_destroy(mt->out_queue);
return NULL;
goto err;
}

// Check for command
Expand Down Expand Up @@ -1658,6 +1654,14 @@ static void *bgzf_mt_reader(void *vp) {
return NULL;
}
}

err:
pthread_mutex_lock(&mt->command_m);
mt->command = CLOSE;
pthread_cond_signal(&mt->command_c);
pthread_mutex_unlock(&mt->command_m);
hts_tpool_process_destroy(mt->out_queue);
return NULL;
}

int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
Expand All @@ -1674,13 +1678,13 @@ int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
mt->n_threads = hts_tpool_size(pool);
if (!qsize)
qsize = mt->n_threads*2;
if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0))) {
free(mt);
return -1;
}
if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0)))
goto err;
hts_tpool_process_ref_incr(mt->out_queue);

mt->job_pool = pool_create(sizeof(bgzf_job));
if (!mt->job_pool)
goto err;

pthread_mutex_init(&mt->job_pool_m, NULL);
pthread_mutex_init(&mt->command_m, NULL);
Expand All @@ -1694,6 +1698,11 @@ int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
fp->is_write ? bgzf_mt_writer : bgzf_mt_reader, fp);

return 0;

err:
free(mt);
fp->mt = NULL;
return -1;
}

int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
Expand Down Expand Up @@ -2030,17 +2039,25 @@ int bgzf_check_EOF(BGZF *fp) {
// fp->mt->command state transitions should be:
// NONE -> HAS_EOF -> HAS_EOF_DONE -> NONE
// (HAS_EOF -> HAS_EOF_DONE happens in bgzf_mt_reader thread)
fp->mt->command = HAS_EOF;
if (fp->mt->command != CLOSE)
fp->mt->command = HAS_EOF;
pthread_cond_signal(&fp->mt->command_c);
hts_tpool_wake_dispatch(fp->mt->out_queue);
do {
if (fp->mt->command == CLOSE) {
// possible error in bgzf_mt_reader
pthread_mutex_unlock(&fp->mt->command_m);
return 0;
}
pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
switch (fp->mt->command) {
case HAS_EOF_DONE: break;
case HAS_EOF:
// Resend signal intended for bgzf_mt_reader()
pthread_cond_signal(&fp->mt->command_c);
break;
case CLOSE:
continue;
default:
abort(); // Should not get to any other state
}
Expand Down
11 changes: 9 additions & 2 deletions cram/cram_decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -2247,7 +2247,7 @@ int cram_decode_slice(cram_fd *fd, cram_container *c, cram_slice *s,
pthread_mutex_lock(&fd->ref_lock);
pthread_mutex_lock(&fd->refs->lock);
if ((fd->required_fields & SAM_SEQ) &&
ref_id < fd->refs->nref &&
ref_id < fd->refs->nref && fd->refs->ref_id &&
s->ref_end > fd->refs->ref_id[ref_id]->length) {
s->ref_end = fd->refs->ref_id[ref_id]->length;
}
Expand Down Expand Up @@ -2802,13 +2802,18 @@ int cram_decode_slice_mt(cram_fd *fd, cram_container *c, cram_slice *s,

nonblock = hts_tpool_process_sz(fd->rqueue) ? 1 : 0;

int saved_errno = errno;
errno = 0;
if (-1 == hts_tpool_dispatch2(fd->pool, fd->rqueue, cram_decode_slice_thread,
j, nonblock)) {
/* Would block */
if (errno != EAGAIN)
return -1;
fd->job_pending = j;
} else {
fd->job_pending = NULL;
}
errno = saved_errno;

// flush too
return 0;
Expand Down Expand Up @@ -3350,12 +3355,14 @@ int cram_get_bam_seq(cram_fd *fd, bam_seq_t **bam) {
void cram_drain_rqueue(cram_fd *fd) {
cram_container *lc = NULL;

if (!fd->pool)
if (!fd->pool || !fd->rqueue)
return;

// drain queue of any in-flight decode jobs
while (!hts_tpool_process_empty(fd->rqueue)) {
hts_tpool_result *r = hts_tpool_next_result_wait(fd->rqueue);
if (!r)
break;
cram_decode_job *j = (cram_decode_job *)hts_tpool_result_data(r);
if (j->c->slice == j->s)
j->c->slice = NULL;
Expand Down
8 changes: 6 additions & 2 deletions cram/cram_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -1913,10 +1913,14 @@ static refs_t *refs_load_fai(refs_t *r_orig, const char *fn, int is_err) {
}

if (id >= id_alloc) {
ref_entry **new_refs;
int x;

id_alloc = id_alloc ?id_alloc*2 : 16;
r->ref_id = realloc(r->ref_id, id_alloc * sizeof(*r->ref_id));
new_refs = realloc(r->ref_id, id_alloc * sizeof(*r->ref_id));
if (!new_refs)
goto err;
r->ref_id = new_refs;

for (x = id; x < id_alloc; x++)
r->ref_id[x] = NULL;
Expand Down Expand Up @@ -4622,7 +4626,7 @@ int cram_close(cram_fd *fd) {
if (fd->mode != 'w')
cram_drain_rqueue(fd);

if (fd->pool && fd->eof >= 0) {
if (fd->pool && fd->eof >= 0 && fd->rqueue) {
hts_tpool_process_flush(fd->rqueue);

if (0 != cram_flush_result(fd))
Expand Down
26 changes: 22 additions & 4 deletions hfile.c
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,11 @@ void hfile_add_scheme_handler(const char *scheme,
return;
}
khint_t k = kh_put(scheme_string, schemes, scheme, &absent);
if (absent < 0) {
hts_log_warning("Couldn't register scheme handler for %s : %s",
scheme, strerror(errno));
return;
}
if (absent || priority(handler) > priority(kh_value(schemes, k))) {
kh_value(schemes, k) = handler;
}
Expand All @@ -1014,7 +1019,10 @@ static int init_add_plugin(void *obj, int (*init)(struct hFILE_plugin *),
const char *pluginname)
{
struct hFILE_plugin_list *p = malloc (sizeof (struct hFILE_plugin_list));
if (p == NULL) abort();
if (p == NULL) {
hts_log_debug("Failed to allocate memory for plugin \"%s\"", pluginname);
return -1;
}

p->plugin.api_version = 1;
p->plugin.obj = obj;
Expand All @@ -1035,15 +1043,20 @@ static int init_add_plugin(void *obj, int (*init)(struct hFILE_plugin *),
return 0;
}

static void load_hfile_plugins()
/*
* Returns 0 on success,
* <0 on failure
*/
static int load_hfile_plugins()
{
static const struct hFILE_scheme_handler
data = { hopen_mem, hfile_always_local, "built-in", 80 },
file = { hopen_fd_fileuri, hfile_always_local, "built-in", 80 },
preload = { hopen_preload, is_preload_url_remote, "built-in", 80 };

schemes = kh_init(scheme_string);
if (schemes == NULL) abort();
if (schemes == NULL)
return -1;

hfile_add_scheme_handler("data", &data);
hfile_add_scheme_handler("file", &file);
Expand Down Expand Up @@ -1085,6 +1098,8 @@ static void load_hfile_plugins()
// carry on; then eventually when the program exits, we'll merely close
// down the plugins uncleanly, as if we had aborted.
(void) atexit(hfile_exit);

return 0;
}

/* A filename like "foo:bar" in which we don't recognise the scheme is
Expand Down Expand Up @@ -1118,7 +1133,10 @@ static const struct hFILE_scheme_handler *find_scheme_handler(const char *s)
scheme[i] = '\0';

pthread_mutex_lock(&plugins_lock);
if (! schemes) load_hfile_plugins();
if (!schemes && load_hfile_plugins() < 0) {
pthread_mutex_unlock(&plugins_lock);
return NULL;
}
pthread_mutex_unlock(&plugins_lock);

khint_t k = kh_get(scheme_string, schemes, scheme);
Expand Down
9 changes: 9 additions & 0 deletions htslib/hts.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,17 @@ const char *hts_format_file_extension(const htsFormat *format);
HTSLIB_EXPORT
int hts_set_opt(htsFile *fp, enum hts_fmt_option opt, ...);

/*!
@abstract Read a line (and its \n or \r\n terminator) from a file
@param fp The file handle
@param delimiter Unused, but must be '\n' (or KS_SEP_LINE)
@param str The line (not including the terminator) is written here
@return Length of the string read;
-1 on end-of-file; <= -2 on error
*/
HTSLIB_EXPORT
int hts_getline(htsFile *fp, int delimiter, kstring_t *str);

HTSLIB_EXPORT
char **hts_readlines(const char *fn, int *_n);
/*!
Expand Down
13 changes: 12 additions & 1 deletion kstring.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,19 @@ int kgetline2(kstring_t *s, kgets_func2 *fgets_fn, void *fp)

while (s->l == l0 || s->s[s->l-1] != '\n') {
if (s->m - s->l < 200) {
if (ks_resize(s, s->m + 200) < 0)
// We return EOF for both EOF and error and the caller
// needs to check for errors in fp, and we haven't
// even got there yet.
//
// The only way of propagating memory errors is to
// deliberately call something that we know triggers
// an error so fp is also set. This works for
// hgets, but not for gets where reading <= 0 bytes
// isn't an error.
if (ks_resize(s, s->m + 200) < 0) {
fgets_fn(s->s + s->l, 0, fp);
return EOF;
}
}
ssize_t len = fgets_fn(s->s + s->l, s->m - s->l, fp);
if (len <= 0) break;
Expand Down
5 changes: 5 additions & 0 deletions sam.c
Original file line number Diff line number Diff line change
Expand Up @@ -2480,6 +2480,7 @@ static void *sam_parse_worker(void *arg) {
gb->abams *= 2;
b = (bam1_t *)realloc(gb->bams, gb->abams*sizeof(bam1_t));
if (!b) {
gb->abams /= 2;
sam_state_err(fd, ENOMEM);
goto err;
}
Expand Down Expand Up @@ -2877,6 +2878,10 @@ int sam_set_thread_pool(htsFile *fp, htsThreadPool *p) {
if (!qsize)
qsize = 2*hts_tpool_size(fd->p);
fd->q = hts_tpool_process_init(fd->p, qsize, 0);
if (!fd->q) {
sam_state_destroy(fp);
return -1;
}

if (fp->format.compression == bgzf)
return bgzf_thread_pool(fp->fp.bgzf, p->pool, p->qsize);
Expand Down
2 changes: 2 additions & 0 deletions tbx.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ static tbx_t *index_load(const char *fn, const char *fnidx, int flags)
char *nm, *p;
uint32_t l_meta, l_nm;
tbx = (tbx_t*)calloc(1, sizeof(tbx_t));
if (!tbx)
return NULL;
tbx->idx = hts_idx_load3(fn, fnidx, HTS_FMT_TBI, flags);
if ( !tbx->idx )
{
Expand Down
8 changes: 8 additions & 0 deletions thread_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ void *hts_tpool_result_data(hts_tpool_result *r) {
*/
hts_tpool_process *hts_tpool_process_init(hts_tpool *p, int qsize, int in_only) {
hts_tpool_process *q = malloc(sizeof(*q));
if (!q)
return NULL;

pthread_cond_init(&q->output_avail_c, NULL);
pthread_cond_init(&q->input_not_full_c, NULL);
Expand Down Expand Up @@ -696,6 +698,8 @@ static void wake_next_worker(hts_tpool_process *q, int locked) {
hts_tpool *hts_tpool_init(int n) {
int i;
hts_tpool *p = malloc(sizeof(*p));
if (!p)
return NULL;
p->tsize = n;
p->njobs = 0;
p->nwaiting = 0;
Expand All @@ -705,6 +709,10 @@ hts_tpool *hts_tpool_init(int n) {
p->n_count = 0;
p->n_running = 0;
p->t = malloc(n * sizeof(p->t[0]));
if (!p->t) {
free(p);
return NULL;
}

pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
Expand Down
3 changes: 2 additions & 1 deletion vcf.c
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,8 @@ static khint_t fix_chromosome(const bcf_hdr_t *h, vdict_t *d, const char *p) {
kstring_t tmp = {0,0,0};
khint_t k;
int l;
ksprintf(&tmp, "##contig=<ID=%s>", p);
if (ksprintf(&tmp, "##contig=<ID=%s>", p) < 0)
return kh_end(d);
bcf_hrec_t *hrec = bcf_hdr_parse_line(h,tmp.s,&l);
free(tmp.s);
int res = hrec ? bcf_hdr_add_hrec((bcf_hdr_t*)h, hrec) : -1;
Expand Down

0 comments on commit 8ae25df

Please sign in to comment.