Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ endif()

# Link with thread library, unless we are on the Zephyr platform.
if(NOT DEFINED LF_SINGLE_THREADED OR DEFINED LF_TRACE)
if (NOT PLATFORM_ZEPHYR)
if (NOT (PLATFORM_ZEPHYR OR PLATFORM_RP2040))
find_package(Threads REQUIRED)
target_link_libraries(reactor-c PUBLIC Threads::Threads)
endif()
Expand Down
1 change: 1 addition & 0 deletions core/platform/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr")
set(PLATFORM_ZEPHYR true)
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Rp2040")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_RP2040)
set(PLATFORM_RP2040 true)
endif()

# Prepend all sources with platform
Expand Down
117 changes: 111 additions & 6 deletions core/platform/lf_rp2040_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* @author{Abhi Gundrala <gundralaa@berkeley.edu>}
*/

#if !defined(LF_SINGLE_THREADED)
#error "Only the single-threaded runtime has support for RP2040"
#endif

#include "lf_rp2040_support.h"
#include "platform.h"
#include "utils/util.h"
Expand All @@ -51,6 +47,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
static critical_section_t _lf_crit_sec;

/**
* critical section struct for atomics implementation
*/
static critical_section_t _lf_atomics_crit_sec;

/**
* binary semaphore for lf event notification
* used by external isr or second core thread.
Expand All @@ -63,13 +64,16 @@ static uint32_t _lf_num_nested_crit_sec = 0;

/**
* Initialize basic runtime infrastructure and
* synchronization structs for an single-threaded runtime.
* synchronization structs for a single-threaded runtime.
*/
void _lf_initialize_clock(void) {
// init stdio lib
// may fail, but failure may be ok/expected if printing is not needed
// (i.e. if neither USB nor UART are enabled)
stdio_init_all();
// init sync structs
critical_section_init(&_lf_crit_sec);
critical_section_init(&_lf_atomics_crit_sec);
sem_init(&_lf_sem_irq_event, 0, 1);
}

Expand Down Expand Up @@ -203,9 +207,110 @@ int lf_enable_interrupts_nested() {
*/
int _lf_single_threaded_notify_of_event() {
// notify main sleep loop of event
sem_release(&_lf_sem_irq_event);
if (sem_release(&_lf_sem_irq_event)) {
return 0;
}
return 1;
}

#else // LF_SINGLE_THREADED

#warning "Threaded runtime on RP2040 is still experimental"

/**
* @brief Get the number of cores on the host machine.
*/
int lf_available_cores() {
return 2;
}

static void *(*thread_1) (void *);
static void* thread_1_args;
static int num_create_threads_called = 0;
static semaphore_t thread_1_done;
static void* thread_1_return;

#define MAGIC_THREAD1_ID 314159

void core1_entry() {
thread_1_return = thread_1(thread_1_args);
sem_reset(&thread_1_done, 1);
}

int lf_thread_create(lf_thread_t* thread, void *(*lf_thread) (void *), void* arguments) {
// make sure this fn is only called once
if (num_create_threads_called != 0) {
return 1;
}
thread_1 = lf_thread;
thread_1_args = arguments;
num_create_threads_called += 1;
sem_init(&thread_1_done, 0, 1);
multicore_launch_core1(core1_entry);
*thread = MAGIC_THREAD1_ID;
return 0;
}

int lf_thread_join(lf_thread_t thread, void** thread_return) {
if (thread != MAGIC_THREAD1_ID) {
return 1;
}
sem_acquire_blocking(&thread_1_done);
// release in case join is called again
if (!sem_release(&thread_1_done)) {
// shouldn't be possible; lf_thread_join is only called from main thread
return 1;
}
if (thread_return) {
*thread_return = thread_1_return;
}
return 0;
}

int lf_mutex_init(lf_mutex_t* mutex) {
recursive_mutex_init(mutex);
return 0;
}

int lf_mutex_lock(lf_mutex_t* mutex) {
recursive_mutex_enter_blocking(mutex);
return 0;
}

int lf_mutex_unlock(lf_mutex_t* mutex) {
recursive_mutex_exit(mutex);
return 0;
}

int lf_cond_init(lf_cond_t* cond, lf_mutex_t* mutex) {
sem_init(&(cond->sema), 0, 1);
cond->mutex = mutex;
return 0;
}

int lf_cond_broadcast(lf_cond_t* cond) {
sem_reset(&(cond->sema), 1);
return 0;
}

int lf_cond_signal(lf_cond_t* cond) {
sem_reset(&(cond->sema), 1);
return 0;
}

int lf_cond_wait(lf_cond_t* cond) {
lf_mutex_unlock(cond->mutex);
sem_acquire_blocking(&(cond->sema));
lf_mutex_lock(cond->mutex);
return 0;
}

int _lf_cond_timedwait(lf_cond_t* cond, instant_t absolute_time_ns) {
absolute_time_t a = from_us_since_boot(absolute_time_ns / 1000);
bool acquired_permit = sem_acquire_block_until(&(cond->sema), a);
return acquired_permit ? 0 : LF_TIMEOUT;
}

#endif // LF_SINGLE_THREADED


Expand Down
50 changes: 32 additions & 18 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -1026,17 +1026,6 @@ void lf_print_snapshot(environment_t* env) {
}
}

// Start threads in the thread pool.
void start_threads(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

LF_PRINT_LOG("Starting %u worker threads in environment", env->num_workers);
for (unsigned int i = 0; i < env->num_workers; i++) {
if (lf_thread_create(&env->thread_ids[i], worker, env) != 0) {
lf_print_error_and_exit("Could not start thread-%u", i);
}
}
}

/**
* @brief Determine the number of workers.
Expand Down Expand Up @@ -1161,23 +1150,48 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
_lf_initialize_start_tag(env);

lf_print("Environment %u: ---- Spawning %d workers.",env->id, env->num_workers);
start_threads(env);

for (unsigned int j = 0; j < env->num_workers; j++) {
if (i == 0 && j == 0) {
// The first worker thread of the first environment will be
// run on the main thread, rather than creating a new thread.
// This is important for bare-metal platforms, who can't
// afford to have the main thread sit idle.
continue;
}
if (lf_thread_create(&env->thread_ids[j], worker, env) != 0) {
lf_print_error_and_exit("Could not start thread-%u", j);
}
}

// Unlock mutex and allow threads proceed
LF_MUTEX_UNLOCK(&env->mutex);
}

// main thread worker (first worker thread of first environment)
void* main_thread_exit_status = NULL;
if (num_envs > 0 && envs[0].num_workers > 0) {
environment_t *env = &envs[0];
main_thread_exit_status = worker(env);
}

for (int i = 0; i<num_envs; i++) {
// Wait for the worker threads to exit.
environment_t* env = &envs[i];
void* worker_thread_exit_status = NULL;
int ret = 0;
for (int i = 0; i < env->num_workers; i++) {
int failure = lf_thread_join(env->thread_ids[i], &worker_thread_exit_status);
if (failure) {
lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure));
}
for (int j = 0; j < env->num_workers; j++) {
if (i == 0 && j == 0) {
// main thread worker
worker_thread_exit_status = main_thread_exit_status;
} else {
int failure = lf_thread_join(env->thread_ids[j], &worker_thread_exit_status);
if (failure) {
lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure));
}
}
if (worker_thread_exit_status != NULL) {
lf_print_error("---- Worker %d reports error code %p", i, worker_thread_exit_status);
lf_print_error("---- Worker %d reports error code %p", j, worker_thread_exit_status);
ret = 1;
}
}
Expand Down
12 changes: 12 additions & 0 deletions include/core/platform/lf_rp2040_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,16 @@
#define LF_TIME_BUFFER_LENGTH 80
#define _LF_TIMEOUT 1

#ifndef LF_SINGLE_THREADED
#warning "Threaded support on rp2040 is still experimental"

typedef recursive_mutex_t lf_mutex_t;
typedef struct {
semaphore_t sema;
lf_mutex_t* mutex;
} lf_cond_t;
typedef int lf_thread_t;

#endif // LF_SINGLE_THREADED

#endif // LF_PICO_SUPPORT_H