diff --git a/meson.build b/meson.build
index 49dcaf0e..8733f97c 100644
--- a/meson.build
+++ b/meson.build
@@ -88,6 +88,7 @@ foreach ident : [
         ['copy_file_range',   '''#define _GNU_SOURCE
                                  #include <sys/syscall.h>
                                  #include <unistd.h>'''],
+        ['reallocarray',      '''#include <malloc.h>'''],
 ]
         have = cc.has_function(ident[0], args : '-D_GNU_SOURCE', prefix : ident[1])
         conf.set10('HAVE_' + ident[0].to_upper(), have)
diff --git a/src/affinity-count.c b/src/affinity-count.c
new file mode 100644
index 00000000..29f749ea
--- /dev/null
+++ b/src/affinity-count.c
@@ -0,0 +1,39 @@
+#include "affinity-count.h"
+
+#include <errno.h>
+#include <sched.h>
+#include <stddef.h>
+
+int cpus_in_affinity_mask(void) {
+        size_t n = 16;
+        int r;
+
+        for (;;) {
+                cpu_set_t *c;
+
+                c = CPU_ALLOC(n);
+                if (!c)
+                        return -ENOMEM;
+
+                if (sched_getaffinity(0, CPU_ALLOC_SIZE(n), c) >= 0) {
+                        int k;
+
+                        k = CPU_COUNT_S(CPU_ALLOC_SIZE(n), c);
+                        CPU_FREE(c);
+
+                        if (k <= 0)
+                                return -EINVAL;
+
+                        return k;
+                }
+
+                r = -errno;
+                CPU_FREE(c);
+
+                if (r != -EINVAL)
+                        return r;
+                if (n*2 < n)
+                        return -ENOMEM;
+                n *= 2;
+        }
+}
diff --git a/src/affinity-count.h b/src/affinity-count.h
new file mode 100644
index 00000000..0a07d098
--- /dev/null
+++ b/src/affinity-count.h
@@ -0,0 +1,8 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#ifndef fooaffinitycounthfoo
+#define fooaffinitycounthfoo
+
+int cpus_in_affinity_mask(void);
+
+#endif
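
Note: the retry loop above follows the pattern documented in sched_getaffinity(2) for dynamically sized CPU sets: when the kernel supports more CPUs than the allocated mask can describe, the call fails with EINVAL, so the mask is doubled until it fits. A minimal, hypothetical caller for sanity-checking the helper in isolation (not part of this patch):

#include <stdio.h>

#include "affinity-count.h"

int main(void) {
        int n;

        n = cpus_in_affinity_mask();
        if (n < 0) {
                fprintf(stderr, "Failed to count CPUs in affinity mask: %d\n", n);
                return 1;
        }

        /* Under e.g. "taskset -c 0,1" this prints 2, however many CPUs the machine has. */
        printf("%i CPU(s) in affinity mask\n", n);
        return 0;
}
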
diff --git a/src/castore.c b/src/castore.c
index c290dfd9..612b17eb 100644
--- a/src/castore.c
+++ b/src/castore.c
@@ -3,9 +3,12 @@
 #include <dirent.h>
 #include <fcntl.h>
 #include <stdio.h>
+#include <pthread.h>
+#include <sys/socket.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "affinity-count.h"
 #include "castore.h"
 #include "def.h"
 #include "dirent-util.h"
@@ -19,6 +22,8 @@
 /* #undef EBADMSG */
 /* #define EBADMSG __LINE__ */
 
+#define WORKER_THREADS_MAX 64U
+
 struct CaStore {
         char *root;
         bool is_cache:1;
@@ -34,6 +39,10 @@ struct CaStore {
 
         uint64_t n_requests;
         uint64_t n_request_bytes;
+
+        pthread_t worker_threads[WORKER_THREADS_MAX];
+        size_t n_worker_threads, n_worker_threads_max;
+        int worker_thread_socket[2];
 };
 
 struct CaStoreIterator {
@@ -47,14 +56,17 @@ struct CaStoreIterator {
 CaStore* ca_store_new(void) {
         CaStore *store;
 
-        store = new0(CaStore, 1);
+        store = new(CaStore, 1);
         if (!store)
                 return NULL;
 
-        store->digest_type = _CA_DIGEST_TYPE_INVALID;
-
-        store->compression = CA_CHUNK_COMPRESSED;
-        store->compression_type = CA_COMPRESSION_DEFAULT;
+        *store = (CaStore) {
+                .digest_type = _CA_DIGEST_TYPE_INVALID,
+                .compression = CA_CHUNK_COMPRESSED,
+                .compression_type = CA_COMPRESSION_DEFAULT,
+                .worker_thread_socket = { -1, -1 },
+                .n_worker_threads_max = (size_t) -1,
+        };
 
         return store;
 }
@@ -62,13 +74,18 @@ CaStore* ca_store_new(void) {
 CaStore *ca_store_new_cache(void) {
         CaStore *s;
 
-        s = new0(CaStore, 1);
+        s = new(CaStore, 1);
         if (!s)
                 return NULL;
 
-        s->is_cache = true;
-        s->compression = CA_CHUNK_AS_IS;
-        s->compression_type = CA_COMPRESSION_DEFAULT;
+        *s = (CaStore) {
+                .is_cache = true,
+                .compression = CA_CHUNK_AS_IS,
+                .compression_type = CA_COMPRESSION_DEFAULT,
+
+                .worker_thread_socket = { -1, -1 },
+                .n_worker_threads_max = (size_t) -1,
+        };
 
         return s;
 }
@@ -77,6 +94,8 @@ CaStore* ca_store_unref(CaStore *store) {
         if (!store)
                 return NULL;
 
+        (void) ca_store_finalize(store);
+
         if (store->is_cache && store->root)
                 (void) rm_rf(store->root, REMOVE_ROOT|REMOVE_PHYSICAL);
 
@@ -240,6 +259,203 @@ int ca_store_has(CaStore *store, const CaChunkID *chunk_id) {
         return ca_chunk_file_test(AT_FDCWD, store->root, chunk_id);
 }
 
+struct queue_entry {
+        CaChunkID chunk_id;
+        CaChunkCompression effective_compression;
+        void *data;
+        size_t size;
+};
+
+static void* worker_thread(void *p) {
+        CaStore *store = p;
+        int ret = 0, r;
+
+        assert(store);
+        assert(store->worker_thread_socket[0] >= 0);
+
+        (void) pthread_setname_np(pthread_self(), "worker-thread");
+
+        for (;;) {
+                struct queue_entry e;
+                ssize_t n;
+
+                n = recv(store->worker_thread_socket[0], &e, sizeof(e), 0);
+                if (n < 0) {
+                        r = -errno;
+                        if (r == -EINTR)
+                                continue;
+
+                        log_debug_errno(r, "Failed to read from thread pool socket: %m");
+                        return INT_TO_PTR(r);
+                }
+                if (n == 0) /* Either EOF or a zero-sized datagram (Linux doesn't let us distinguish
+                             * the two); we take both as an indication to exit the worker thread. */
+                        break;
+
+                assert(n == sizeof(e));
+
+                r = ca_chunk_file_save(
+                                AT_FDCWD, store->root,
+                                &e.chunk_id,
+                                e.effective_compression, store->compression,
+                                store->compression_type,
+                                e.data, e.size);
+                free(e.data);
+
+                if (r < 0) {
+                        log_debug_errno(r, "Failed to store chunk in store: %m");
+
+                        if (r != -EEXIST)
+                                ret = r;
+                }
+        }
+
+        return INT_TO_PTR(ret);
+}
+
+static int determine_worker_threads_max(CaStore *store) {
+        const char *e;
+        int r;
+
+        assert(store);
+
+        if (store->n_worker_threads_max != (size_t) -1)
+                return 0;
+
+        e = getenv("CASYNC_WORKER_THREADS");
+        if (e) {
+                unsigned u;
+
+                r = safe_atou(e, &u);
+                if (r < 0)
+                        log_debug_errno(r, "Failed to parse $CASYNC_WORKER_THREADS, ignoring: %s", e);
+                else if (u > WORKER_THREADS_MAX) {
+                        log_debug("$CASYNC_WORKER_THREADS out of range, clamping to %zu: %s", (size_t) WORKER_THREADS_MAX, e);
+                        store->n_worker_threads_max = WORKER_THREADS_MAX;
+                        return 0;
+                } else {
+                        store->n_worker_threads_max = u;
+                        return 0;
+                }
+        }
+
+        r = cpus_in_affinity_mask();
+        if (r < 0)
+                return log_debug_errno(r, "Failed to determine CPUs in affinity mask: %m");
+
+        store->n_worker_threads_max = MIN((size_t) r, WORKER_THREADS_MAX);
+        return 0;
+}
+
+static int start_worker_thread(CaStore *store) {
+        int r;
+
+        assert(store);
+
+        r = determine_worker_threads_max(store);
+        if (r < 0)
+                return r;
+
+        if (store->n_worker_threads >= store->n_worker_threads_max)
+                return 0;
+
+        if (store->worker_thread_socket[0] < 0)
+                if (socketpair(AF_UNIX, SOCK_SEQPACKET|SOCK_CLOEXEC, 0, store->worker_thread_socket) < 0)
+                        return -errno;
+
+        r = pthread_create(store->worker_threads + store->n_worker_threads, NULL, worker_thread, store);
+        if (r != 0)
+                return -r;
+
+        store->n_worker_threads++;
+
+        log_debug("Started store worker thread %zu.", store->n_worker_threads);
+        return 0;
+}
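
Note: the chunk queue above relies on SOCK_SEQPACKET semantics: every send() of a struct queue_entry arrives as exactly one datagram, so fixed-size records need no framing protocol, and closing the write side wakes all blocked readers with EOF. A self-contained sketch of just that mechanism (illustration only, not casync code):

#include <assert.h>
#include <sys/socket.h>
#include <unistd.h>

struct msg {
        int id;
        size_t size;
};

int main(void) {
        struct msg in = { .id = 7, .size = 42 }, out;
        int sv[2];

        /* Message boundaries are preserved: one send() == one recv(). */
        assert(socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) == 0);
        assert(send(sv[1], &in, sizeof(in), 0) == sizeof(in));
        assert(recv(sv[0], &out, sizeof(out), 0) == sizeof(out));
        assert(out.id == 7 && out.size == 42);

        /* Closing the write side makes recv() return 0 in all readers; this
         * is how ca_store_finalize() below tells the workers to exit. */
        close(sv[1]);
        assert(recv(sv[0], &out, sizeof(out), 0) == 0);

        close(sv[0]);
        return 0;
}
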
+static int submit_to_worker_thread(
+                CaStore *store,
+                const CaChunkID *chunkid,
+                CaChunkCompression effective_compression,
+                const void *p,
+                uint64_t l) {
+
+        struct queue_entry e;
+        void *copy = NULL;
+        ssize_t n;
+        int r;
+
+        assert(store);
+
+        /* If there's no need to compress or decompress, let's do things client side, since the
+         * operation is likely IO bound, not CPU bound */
+        if (store->compression == CA_CHUNK_AS_IS ||
+            store->compression == effective_compression)
+                return -ENOANO;
+
+        /* Before we submit the chunk for compression, let's see if it exists already. If so, let's
+         * return -EEXIST right away, so that the caller can count reused chunks. Note that this is
+         * currently a bit racy, as submitted but not yet processed chunks are not considered. */
+        r = ca_store_has(store, chunkid);
+        if (r < 0)
+                return r;
+        if (r > 0)
+                return -EEXIST;
+
+        /* Let's start a new worker thread each time we have a new job to process, until we have
+         * reached the maximum number of worker threads */
+        (void) start_worker_thread(store);
+
+        /* If there are no worker threads, do things client side */
+        if (store->n_worker_threads == 0 ||
+            store->worker_thread_socket[1] < 0)
+                return -ENETDOWN;
+
+        copy = memdup(p, l);
+        if (!copy)
+                return -ENOMEM;
+
+        e = (struct queue_entry) {
+                .chunk_id = *chunkid,
+                .effective_compression = effective_compression,
+                .data = copy,
+                .size = l,
+        };
+
+        n = send(store->worker_thread_socket[1], &e, sizeof(e), 0);
+        if (n < 0) {
+                free(copy);
+                return -errno;
+        }
+
+        assert(n == sizeof(e));
+        return 0;
+}
+
+int ca_store_finalize(CaStore *store) {
+        int ret = 0, r;
+        size_t i;
+
+        assert(store);
+
+        /* Trigger EOF in all worker threads */
+        store->worker_thread_socket[1] = safe_close(store->worker_thread_socket[1]);
+
+        for (i = 0; i < store->n_worker_threads; i++) {
+                void *p;
+
+                r = pthread_join(store->worker_threads[i], &p);
+                if (r != 0)
+                        ret = -r;
+                if (p != NULL)
+                        ret = PTR_TO_INT(p); /* worker threads return negative errno values */
+        }
+
+        store->n_worker_threads = 0;
+        store->worker_thread_socket[0] = safe_close(store->worker_thread_socket[0]);
+
+        /* Propagate errors we ran into while processing store requests. This is useful for callers
+         * to determine whether the worker threads ran into any problems. */
+        return ret;
+}
+
 int ca_store_put(
                 CaStore *store,
                 const CaChunkID *chunk_id,
@@ -273,6 +489,14 @@ int ca_store_put(
                 store->mkdir_done = true;
         }
 
+        r = submit_to_worker_thread(
+                        store,
+                        chunk_id,
+                        effective_compression,
+                        data, size);
+        if (r >= 0)
+                return 0;
+
         return ca_chunk_file_save(
                         AT_FDCWD, store->root,
                         chunk_id,
diff --git a/src/castore.h b/src/castore.h
index 003bb955..0d2ee5c3 100644
--- a/src/castore.h
+++ b/src/castore.h
@@ -25,6 +25,8 @@ int ca_store_get(CaStore *store, const CaChunkID *chunk_id, CaChunkCompression d
 int ca_store_has(CaStore *store, const CaChunkID *chunk_id);
 int ca_store_put(CaStore *store, const CaChunkID *chunk_id, CaChunkCompression effective_compression, const void *data, uint64_t size);
 
+int ca_store_finalize(CaStore *store);
+
 int ca_store_get_requests(CaStore *s, uint64_t *ret);
 int ca_store_get_request_bytes(CaStore *s, uint64_t *ret);
 
diff --git a/src/casync.c b/src/casync.c
index 9fee77f7..c8219205 100644
--- a/src/casync.c
+++ b/src/casync.c
@@ -1079,7 +1079,7 @@ int ca_sync_add_store_path(CaSync *s, const char *path) {
                 return r;
         }
 
-        array = realloc_multiply(s->rstores, sizeof(CaStore*), s->n_rstores+1);
+        array = reallocarray(s->rstores, sizeof(CaStore*), s->n_rstores+1);
         if (!array) {
                 ca_store_unref(store);
                 return -ENOMEM;
@@ -1110,7 +1110,7 @@ int ca_sync_add_store_remote(CaSync *s, const char *url) {
                 return r;
         }
 
-        array = realloc_multiply(s->remote_rstores, sizeof(CaRemote*), s->n_remote_rstores+1);
+        array = reallocarray(s->remote_rstores, sizeof(CaRemote*), s->n_remote_rstores+1);
         if (!array) {
                 ca_remote_unref(remote);
                 return -ENOMEM;
@@ -1145,7 +1145,7 @@ static int ca_sync_extend_seeds_array(CaSync *s) {
 
         assert(s);
 
-        new_seeds = realloc_multiply(s->seeds, sizeof(CaSeed*), s->n_seeds+1);
+        new_seeds = reallocarray(s->seeds, sizeof(CaSeed*), s->n_seeds+1);
         if (!new_seeds)
                 return -ENOMEM;
 
@@ -2058,6 +2058,14 @@ static int ca_sync_step_encode(CaSync *s) {
                 if (r < 0)
                         return r;
 
+                if (s->wstore) {
+                        /* Make sure the store ends all worker threads, and pick up any
+                         * pending errors from them */
+                        r = ca_store_finalize(s->wstore);
+                        if (r < 0)
+                                return r;
+                }
+
                 r = ca_sync_install_archive(s);
                 if (r < 0)
                         return r;
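
Note: taken together, the intended calling convention is that ca_store_put() may hand a chunk to a worker thread and return immediately, while ca_store_finalize() joins the workers and surfaces any error they hit while saving. A hypothetical caller might look like this (struct chunk and store_all() are invented for illustration, and CA_CHUNK_UNCOMPRESSED is assumed to describe the caller's chunk data):

#include <errno.h>

#include "castore.h"

struct chunk {
        CaChunkID id;
        void *data;
        size_t size;
};

static int store_all(CaStore *store, struct chunk *chunks, size_t n) {
        size_t i;
        int r;

        for (i = 0; i < n; i++) {
                /* May be queued to a worker thread; save errors from queued
                 * chunks are reported by ca_store_finalize() instead. */
                r = ca_store_put(store, &chunks[i].id, CA_CHUNK_UNCOMPRESSED,
                                 chunks[i].data, chunks[i].size);
                if (r < 0 && r != -EEXIST) /* -EEXIST: chunk already present */
                        return r;
        }

        /* Also implied by ca_store_unref(), but calling it explicitly lets
         * us propagate errors from the worker threads. */
        return ca_store_finalize(store);
}
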
diff --git a/src/meson.build b/src/meson.build
index 5b1ce782..f1084bed 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -10,6 +10,8 @@ util_sources = files('''
 '''.split())
 
 libshared_sources = files('''
+        affinity-count.c
+        affinity-count.h
         cacache.c
         cacache.h
         cachunk.c
diff --git a/src/util.c b/src/util.c
index 299e4bba..3f21e1e4 100644
--- a/src/util.c
+++ b/src/util.c
@@ -919,7 +919,7 @@ int strv_push(char ***l, char *value) {
         if (m < n)
                 return -ENOMEM;
 
-        c = realloc_multiply(*l, sizeof(char*), m);
+        c = reallocarray(*l, sizeof(char*), m);
         if (!c)
                 return -ENOMEM;
 
diff --git a/src/util.h b/src/util.h
index dc9d03e4..ebbe0063 100644
--- a/src/util.h
+++ b/src/util.h
@@ -24,10 +24,32 @@
 #include "gcc-macro.h"
 #include "log.h"
 
-#define new(t, n) ((t*) malloc((n) * sizeof(t)))
-#define new0(t, n) ((t*) calloc((n), sizeof(t)))
+/* If for some reason more than 4M are allocated on the stack, let's abort immediately. It's better than
+ * proceeding and smashing the stack limits. Note that by default RLIMIT_STACK is 8M on Linux. */
+#define ALLOCA_MAX (4U*1024U*1024U)
 
-#define newa(t, n) ((t*) alloca((n) * sizeof(t)))
+#define new(t, n) ((t*) malloc_multiply(sizeof(t), (n)))
+#define new0(t, n) ((t*) calloc((n) ?: 1, sizeof(t)))
+
+#define newa(t, n)                                                      \
+        ({                                                              \
+                size_t _n_ = n;                                         \
+                assert(!size_multiply_overflow(sizeof(t), _n_));        \
+                assert(sizeof(t)*_n_ <= ALLOCA_MAX);                    \
+                (t*) alloca(sizeof(t)*_n_);                             \
+        })
+
+#define newa0(t, n)                                                     \
+        ({                                                              \
+                size_t _n_ = n;                                         \
+                assert(!size_multiply_overflow(sizeof(t), _n_));        \
+                assert(sizeof(t)*_n_ <= ALLOCA_MAX);                    \
+                (t*) alloca0(sizeof(t)*_n_);                            \
+        })
+
+#define newdup(t, p, n) ((t*) memdup_multiply(p, sizeof(t), (n)))
+
+#define malloc0(n) (calloc(1, (n)))
 
 #define XCONCATENATE(x, y) x ## y
 #define CONCATENATE(x, y) XCONCATENATE(x, y)
@@ -63,8 +85,6 @@
                 ((_A) > (_B)) ? (_A) : (_B),    \
                 (void)0))
 
-
-
 int loop_write(int fd, const void *p, size_t l);
 int loop_write_block(int fd, const void *p, size_t l);
 ssize_t loop_read(int fd, void *p, size_t l);
@@ -330,22 +350,39 @@ char *strv_find(char **l, const char *name) _pure_;
 #define strv_contains(l, s) (!!strv_find((l), (s)))
 
 static inline bool size_multiply_overflow(size_t size, size_t need) {
-        return need != 0 && size > (SIZE_MAX / need);
+        return _unlikely_(need != 0 && size > (SIZE_MAX / need));
 }
 
 _malloc_ _alloc_(1, 2) static inline void *malloc_multiply(size_t size, size_t need) {
-        if (_unlikely_(size_multiply_overflow(size, need)))
+        if (size_multiply_overflow(size, need))
                 return NULL;
 
-        return malloc(size * need);
+        return malloc(size * need ?: 1);
 }
 
-_alloc_(2, 3) static inline void *realloc_multiply(void *p, size_t size, size_t need) {
-        if (_unlikely_(size_multiply_overflow(size, need)))
+#if !HAVE_REALLOCARRAY
+_alloc_(2, 3) static inline void *reallocarray(void *p, size_t need, size_t size) {
+        if (size_multiply_overflow(size, need))
                 return NULL;
 
-        return realloc(p, size * need);
+        return realloc(p, size * need ?: 1);
 }
+#endif
+
+_alloc_(2, 3) static inline void *memdup_multiply(const void *p, size_t size, size_t need) {
+        if (size_multiply_overflow(size, need))
+                return NULL;
+
+        return memdup(p, size * need);
+}
+
+#define free_and_replace(a, b)                  \
+        ({                                      \
+                free(a);                        \
+                (a) = (b);                      \
+                (b) = NULL;                     \
+                0;                              \
+        })
 
 #define STRV_FOREACH(s, l)                      \
         for ((s) = (l); (s) && *(s); (s)++)
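
Note: the reallocarray() fallback above mirrors the OpenBSD/glibc function it stands in for: it returns NULL on multiplication overflow instead of a silently wrapped allocation (plus, as tweaked here, a non-NULL result for zero-sized requests). A standalone illustration of the overflow behavior (assumes a glibc new enough to declare reallocarray(), otherwise the fallback above applies):

#define _GNU_SOURCE /* for glibc's reallocarray() declaration */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

int main(void) {
        int *a;

        /* Overflow-checked array growth. */
        a = reallocarray(NULL, 16, sizeof(int));
        if (!a)
                return 1;

        /* SIZE_MAX/2 ints overflow size_t: unlike realloc(a, n * sizeof(int)),
         * which would quietly wrap the product, reallocarray() returns NULL
         * and leaves the original allocation untouched. */
        if (!reallocarray(a, SIZE_MAX / 2, sizeof(int)))
                puts("multiplication overflow detected");

        free(a);
        return 0;
}
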