Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix thread syncronization problems #2204

Merged
merged 6 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ if (MSVC)
endif()

if (${CMAKE_C_COMPILER_ID} STREQUAL "GNU" OR ${CMAKE_C_COMPILER_ID} STREQUAL "Clang")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -pipe -W -Wall -Wnonnull -Wshadow -Wformat -Wundef -Wno-unused-parameter -Wmissing-prototypes -Wno-unknown-pragmas -Wno-int-conversion -Werror=implicit-function-declaration -D_GNU_SOURCE -std=c99")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -pipe -W -Wall -Wnonnull -Wshadow -Wformat -Wundef -Wno-unused-parameter -Wmissing-prototypes -Wno-unknown-pragmas -Wno-int-conversion -Werror=implicit-function-declaration -D_GNU_SOURCE -DRBT_IMPLICIT_LOCKING=1 -std=c99")
add_link_options(-Wl,-z,now)
endif()
if(${CMAKE_SYSTEM_NAME} STREQUAL "FreeBSD")
Expand Down
26 changes: 11 additions & 15 deletions src/OVAL/probes/SEAP/seap-command.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ static SEXP_t *__SEAP_cmd_sync_handler (SEXP_t *res, void *arg)
h->args = res;
(void) pthread_mutex_lock (&h->mtx);
h->signaled = 1;
(void) pthread_cond_signal (&h->cond);
(void) pthread_cond_broadcast (&h->cond);
(void) pthread_mutex_unlock (&h->mtx);

return (NULL);
Expand Down Expand Up @@ -322,9 +322,6 @@ SEXP_t *SEAP_cmd_exec (SEAP_CTX_t *ctx,
h.args = NULL;
h.signaled = 0;

if (pthread_mutex_lock (&(h.mtx)) != 0)
abort ();

rec = SEAP_cmdrec_new ();
rec->code = cmdptr->id;
rec->func = &__SEAP_cmd_sync_handler;
Expand Down Expand Up @@ -377,8 +374,6 @@ SEXP_t *SEAP_cmd_exec (SEAP_CTX_t *ctx,
timeout.tv_nsec = 0;
*/
for (;;) {
pthread_mutex_unlock(&h.mtx);

if (SEAP_packet_recv(ctx, sd, &packet_rcv) != 0) {
dD("FAIL: ctx=%p, sd=%d, errno=%u, %s.", ctx, sd, errno, strerror(errno));
SEAP_packet_free(packet);
Expand Down Expand Up @@ -407,21 +402,23 @@ SEXP_t *SEAP_cmd_exec (SEAP_CTX_t *ctx,
}

/* Morbo: THIS IS NOT HOW SYCHNRONIZATION WORKS! */
if (h.signaled)
if (h.signaled) {
h.signaled = 0;
break;
}
}
} else {
/*
* Someone else does receiving of events for us.
* Just wait for the condition to be signaled.
*/
if (pthread_cond_wait(&h.cond, &h.mtx) != 0) {
/*
* Fatal error - don't know how to handle
* this so let's just call abort()...
*/
abort();
}
pthread_mutex_lock(&h.mtx);
while (!h.signaled) {
pthread_cond_wait(&h.cond, &h.mtx);
}
// This might not be needed, but still
h.signaled = 0;
pthread_mutex_unlock(&h.mtx);
}

dD("cond return: h.args=%p", h.args);
Expand All @@ -436,7 +433,6 @@ SEXP_t *SEAP_cmd_exec (SEAP_CTX_t *ctx,
/*
* SEAP_cmdtbl_del(dsc->cmd_w_table, rec);
*/
pthread_mutex_unlock (&(h.mtx));
pthread_cond_destroy (&(h.cond));
pthread_mutex_destroy (&(h.mtx));
SEAP_packet_free(packet);
Expand Down
9 changes: 7 additions & 2 deletions src/OVAL/probes/SEAP/seap-packet.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,14 @@ static int SEAP_packet_sexp2msg (SEXP_t *sexp_msg, SEAP_msg_t *seap_msg)
_A(attr_i >= (SEXP_list_length (sexp_msg) - 4)/2);

seap_msg->attrs_cnt = attr_i;
void *new_attrs = realloc(seap_msg->attrs, sizeof(SEAP_attr_t) * seap_msg->attrs_cnt);
if (new_attrs != NULL || seap_msg->attrs_cnt == 0)
if (seap_msg->attrs_cnt == 0) {
free(seap_msg->attrs);
seap_msg->attrs = NULL;
} else {
void *new_attrs = realloc(seap_msg->attrs, sizeof(SEAP_attr_t) * seap_msg->attrs_cnt);
seap_msg->attrs = new_attrs;
}

seap_msg->sexp = SEXP_list_last (sexp_msg);

return (0);
Expand Down
5 changes: 3 additions & 2 deletions src/OVAL/probes/probe/icache.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ const char* thread_name = "icache_worker";
pair = &pair_mem;
dD("icache worker ready");

switch (errno = pthread_barrier_wait(&OSCAP_GSYM(th_barrier)))
switch (errno = pthread_barrier_wait(cache->th_barrier))
{
case 0:
case PTHREAD_BARRIER_SERIAL_THREAD:
Expand Down Expand Up @@ -309,7 +309,7 @@ const char* thread_name = "icache_worker";
return (NULL);
}

probe_icache_t *probe_icache_new(void)
probe_icache_t *probe_icache_new(pthread_barrier_t *th_barrier)
{
probe_icache_t *cache = malloc(sizeof(probe_icache_t));
cache->tree = rbt_i64_new();
Expand All @@ -323,6 +323,7 @@ probe_icache_t *probe_icache_new(void)
cache->queue_end = 0;
cache->queue_cnt = 0;
cache->queue_max = PROBE_IQUEUE_CAPACITY;
cache->th_barrier = th_barrier;

if (pthread_cond_init(&cache->queue_notempty, NULL) != 0) {
dE("Can't initialize icache queue condition variable (notempty): %u, %s",
Expand Down
4 changes: 3 additions & 1 deletion src/OVAL/probes/probe/icache.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <stddef.h>
#include <sexp.h>
#include "../SEAP/generic/rbt/rbt.h"
#include "common/compat_pthread_barrier.h"

#ifndef PROBE_IQUEUE_CAPACITY
#define PROBE_IQUEUE_CAPACITY 1024
Expand All @@ -41,6 +42,7 @@ typedef struct {
typedef struct {
rbt_t *tree; /* XXX: rewrite to extensible or linear hashing */
pthread_t thid;
pthread_barrier_t *th_barrier;

pthread_mutex_t queue_mutex;
pthread_cond_t queue_notempty;
Expand All @@ -58,7 +60,7 @@ typedef struct {
uint16_t count;
} probe_citem_t;

probe_icache_t *probe_icache_new(void);
probe_icache_t *probe_icache_new(pthread_barrier_t *th_barrier);
int probe_icache_add(probe_icache_t *cache, SEXP_t *cobj, SEXP_t *item);
int probe_icache_nop(probe_icache_t *cache);
void probe_icache_free(probe_icache_t *cache);
Expand Down
2 changes: 1 addition & 1 deletion src/OVAL/probes/probe/input_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void *probe_input_handler(void *arg)

pthread_cleanup_push(pthread_attr_cleanup_handler, (void *)&pth_attr);

switch (errno = pthread_barrier_wait(&OSCAP_GSYM(th_barrier)))
switch (errno = pthread_barrier_wait(probe->th_barrier))
{
case 0:
case PTHREAD_BARRIER_SERIAL_THREAD:
Expand Down
3 changes: 1 addition & 2 deletions src/OVAL/probes/probe/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ typedef struct {

pthread_t th_input;
pthread_t th_signal;
pthread_barrier_t *th_barrier;

rbt_t *workers;
uint32_t max_threads;
Expand Down Expand Up @@ -105,6 +106,4 @@ typedef enum {
PROBE_OFFLINE_ALL = 0x0f
} probe_offline_flags;

extern pthread_barrier_t OSCAP_GSYM(th_barrier);

#endif /* PROBE_H */
11 changes: 7 additions & 4 deletions src/OVAL/probes/probe/probe_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ bool OSCAP_GSYM(varref_handling) = true;
char **OSCAP_GSYM(no_varref_ents) = NULL;
size_t OSCAP_GSYM(no_varref_ents_cnt) = 0;

pthread_barrier_t OSCAP_GSYM(th_barrier);

extern probe_ncache_t *OSCAP_GSYM(ncache);

static int probe_optecmp(char **a, char **b)
Expand Down Expand Up @@ -191,8 +189,10 @@ void *probe_common_main(void *arg)

dD("probe_common_main started");

pthread_barrier_t barrier;

const unsigned thread_count = 2; // input and icache threads
if ((errno = pthread_barrier_init(&OSCAP_GSYM(th_barrier), NULL, thread_count)) != 0) {
if ((errno = pthread_barrier_init(&barrier, NULL, thread_count)) != 0) {
fail(errno, "pthread_barrier_init", __LINE__ - 6);
}

Expand All @@ -218,9 +218,10 @@ void *probe_common_main(void *arg)
* Initialize result & name caching
*/
probe.rcache = probe_rcache_new();
probe.icache = probe_icache_new();
probe.icache = probe_icache_new(&barrier);
probe_ncache_clear(OSCAP_GSYM(ncache));
probe.ncache = OSCAP_GSYM(ncache);
probe.th_barrier = &barrier;

/*
* Initialize probe option handlers
Expand Down Expand Up @@ -266,5 +267,7 @@ void *probe_common_main(void *arg)

pthread_cleanup_pop(1);

pthread_barrier_destroy(&barrier);

return NULL;
}
11 changes: 11 additions & 0 deletions src/OVAL/probes/probe/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ static void preload_libraries_before_chroot()
pthread_join(t, NULL);
}

static void pthread_pair_cleanup_handler(void *arg)
{
probe_pwpair_t *pair = (probe_pwpair_t *)arg;
dW("Probe worker thread finished unxpectedly, trying to avoid deadlock now...");
SEAP_replyerr(pair->probe->SEAP_ctx, pair->probe->sd, pair->pth->msg, -100);
}

void *probe_worker_runfn(void *arg)
{
dD("probe_worker_runfn has started");
Expand All @@ -88,6 +95,8 @@ void *probe_worker_runfn(void *arg)
# endif
#endif
dD("handling SEAP message ID %u", pair->pth->sid);
pthread_cleanup_push(pthread_pair_cleanup_handler, (void *)pair);

//
probe_ret = -1;
probe_res = pair->pth->msg_handler(pair->probe, pair->pth->msg, &probe_ret);
Expand Down Expand Up @@ -172,6 +181,8 @@ void *probe_worker_runfn(void *arg)
free(pair);
pthread_detach(pthread_self());

pthread_cleanup_pop(0);

dD("probe_worker_runfn has finished");
return (NULL);
}
Expand Down
5 changes: 0 additions & 5 deletions src/common/oscap_pcre.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ oscap_pcre_t *oscap_pcre_compile(const char *pattern, oscap_pcre_options_t optio
int errno;
PCRE2_SIZE erroffset2;
res->re_ctx = NULL;
dD("pcre2_compile_8: patt=%s", pattern);
res->re = pcre2_compile_8((PCRE2_SPTR)pattern, PCRE2_ZERO_TERMINATED, _oscap_pcre_opts_to_pcre(options), &errno, &erroffset2, NULL);
if (res->re == NULL) {
PCRE2_UCHAR8 errmsg[PCRE2_ERR_BUF_SIZE];
Expand Down Expand Up @@ -139,13 +138,9 @@ int oscap_pcre_exec(const oscap_pcre_t *opcre, const char *subject,
// The ovecsize is multiplied by 3 in the code for compatibility with PCRE1
int ovecsize2 = ovecsize/3;
pcre2_match_data_8 *mdata = pcre2_match_data_create_8(ovecsize2, NULL);
dD("pcre2_match_8: subj=%s", subject);
rc = pcre2_match_8(opcre->re, (PCRE2_SPTR8)subject, length, startoffset, _oscap_pcre_opts_to_pcre(options), mdata, opcre->re_ctx);
dD("pcre2_match_8: rc=%d, ", rc);
if (rc > PCRE2_ERROR_NOMATCH) {
PCRE2_SIZE *ovecp = pcre2_get_ovector_pointer_8(mdata);
uint32_t ovecp_count = pcre2_get_ovector_count_8(mdata);
dD("pcre2_match_8: pcre2_get_ovector_count_8=%d", ovecp_count);
for (int i = 0; i < rc; i++) {
if (i < ovecsize2) {
ovector[i*2] = ovecp[i*2];
Expand Down
Loading