Skip to content

Commit

Permalink
Don't restart threads every call to normalize (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
developedby committed Jun 17, 2024
1 parent 9d6d0f6 commit 6059aea
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 99 deletions.
153 changes: 108 additions & 45 deletions src/hvm.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ static const f32 I24_MIN = (f32) (i32) ((-1u) << 23);
#define G_VARS_LEN (1ul << 29) // max 536m vars
#define G_RBAG_LEN (TPC * RLEN)


typedef struct Net {
APair node_buf[G_NODE_LEN]; // global node buffer
APort vars_buf[G_VARS_LEN]; // global vars buffer
Expand All @@ -146,20 +147,6 @@ typedef struct Def {

typedef struct Book Book;

// A Foreign Function
typedef struct {
char name[256];
Port (*func)(Net*, Book*, Port);
} FFn;

// Book of Definitions
typedef struct Book {
u32 defs_len;
Def defs_buf[0x4000];
u32 ffns_len;
FFn ffns_buf[0x4000];
} Book;

// Local Thread Memory
typedef struct TM {
u32 tid; // thread id
Expand All @@ -174,6 +161,40 @@ typedef struct TM {
Pair hbag_buf[HLEN]; // high-priority redexes
} TM;

// Thread data
typedef struct {
Net* net;
TM* tm;
Book* book;
u32 done;
pthread_mutex_t* work_mutex;
pthread_cond_t* work_cond;
pthread_mutex_t* done_mutex;
pthread_cond_t* done_cond;
} ThreadArg;

typedef struct {
Net* net;
Book* book;
ThreadArg* thread_arg;
pthread_t* threads;
} State;

// A Foreign Function
typedef struct {
char name[256];
Port (*func)(State, Port);
} FFn;

// Book of Definitions
typedef struct Book {
u32 defs_len;
Def defs_buf[0x4000];
u32 ffns_len;
FFn ffns_buf[0x4000];
} Book;


// Debugger
// --------

Expand Down Expand Up @@ -1217,16 +1238,20 @@ void evaluator(Net* net, TM* tm, Book* book) {
// Normalizer
// ----------

// Thread data
typedef struct {
Net* net;
TM* tm;
Book* book;
} ThreadArg;

void* thread_func(void* arg) {
ThreadArg* data = (ThreadArg*)arg;
evaluator(data->net, data->tm, data->book);
while (TRUE) {
// Wait for main to signal that the thread should run
pthread_mutex_lock(data->work_mutex);
pthread_cond_wait(data->work_cond, data->work_mutex);

evaluator(data->net, data->tm, data->book);

// Signal main that the work of this thread is done
data->done = 1;
pthread_cond_signal(data->done_cond);
pthread_mutex_unlock(data->work_mutex);
}
return NULL;
}

Expand All @@ -1238,37 +1263,41 @@ void boot_redex(Net* net, Pair redex) {

// Evaluates all redexes.
// TODO: cache threads to avoid spawning overhead
void normalize(Net* net, Book* book) {
// Inits thread_arg objects
ThreadArg thread_arg[TPC];
for (u32 t = 0; t < TPC; ++t) {
thread_arg[t].net = net;
thread_arg[t].tm = tm[t];
thread_arg[t].book = book;
}
void normalize(State state) {
u64 start_time = time64();

// Spawns the evaluation threads
pthread_t threads[TPC];
// Signal the threads to start.
for (u32 t = 0; t < TPC; ++t) {
pthread_create(&threads[t], NULL, thread_func, &thread_arg[t]);
//printf("Signaling thread %d\n", t);
state.thread_arg[t].done = 0;
pthread_mutex_lock(state.thread_arg[t].done_mutex);
pthread_cond_signal(state.thread_arg[t].work_cond);
}

// Wait for the threads to finish
// Wait for the threads to signal their finish
for (u32 t = 0; t < TPC; ++t) {
pthread_join(threads[t], NULL);
while (state.thread_arg[t].done == 0) {
pthread_cond_wait(state.thread_arg[t].done_cond, state.thread_arg[t].done_mutex);
}
pthread_mutex_unlock(state.thread_arg[t].done_mutex);
//printf("Thread %d finished\n", t);
}

u64 end_time = time64();
u64 time = (end_time - start_time) / 1000000;
printf("normalize took %llu ms\n", time);
}

// Util: expands a REF Port.
Port expand(Net* net, Book* book, Port port) {
Port old = vars_load(net, get_val(ROOT));
Port got = peek(net, port);
Port expand(State state, Port port) {
Port old = vars_load(state.net, get_val(ROOT));
Port got = peek(state.net, port);
while (get_tag(got) == REF) {
boot_redex(net, new_pair(new_port(REF,get_val(got)), ROOT));
normalize(net, book);
got = peek(net, vars_load(net, get_val(ROOT)));
boot_redex(state.net, new_pair(new_port(REF,get_val(got)), ROOT));
normalize(state);
got = peek(state.net, vars_load(state.net, get_val(ROOT)));
}
vars_create(net, get_val(ROOT), old);
vars_create(state.net, get_val(ROOT), old);
return got;
}

Expand Down Expand Up @@ -1752,7 +1781,7 @@ void pretty_print_port(Net* net, Book* book, Port port) {
//COMPILED_BOOK_BUF//

#ifdef IO
void do_run_io(Net* net, Book* book, Port port);
void do_run_io(State state, Port port);
#endif

// Main
Expand All @@ -1779,12 +1808,46 @@ void hvm_c(u32* book_buffer) {
// Creates an initial redex that calls main
boot_redex(net, new_pair(new_port(REF, 0), ROOT));

// Inits thread_arg objects
ThreadArg thread_arg[TPC];
pthread_mutex_t work_mutex[TPC];
pthread_cond_t work_cond[TPC];
pthread_mutex_t done_mutex[TPC];
pthread_cond_t done_cond[TPC];
for (u32 t = 0; t < TPC; ++t) {
pthread_mutex_init(&work_mutex[t], NULL);
pthread_cond_init(&work_cond[t], NULL);
pthread_mutex_init(&done_mutex[t], NULL);
pthread_cond_init(&done_cond[t], NULL);

thread_arg[t].net = net;
thread_arg[t].tm = tm[t];
thread_arg[t].book = book;
thread_arg[t].work_mutex = &work_mutex[t];
thread_arg[t].work_cond = &work_cond[t];
thread_arg[t].done_mutex = &done_mutex[t];
thread_arg[t].done_cond = &done_cond[t];
}
pthread_t threads[TPC];
State state = {net, book, thread_arg, threads};

// Spawns the evaluation threads
for (u32 t = 0; t < TPC; ++t) {
pthread_create(&state.threads[t], NULL, thread_func, &state.thread_arg[t]);
}

#ifdef IO
do_run_io(net, book, ROOT);
do_run_io(state, ROOT);
#else
normalize(net, book);
normalize(state);
#endif

printf("Threads done\n");

for (u32 t = 0; t < TPC; ++t) {
pthread_cancel(state.threads[t]);
}

// Prints the result
printf("Result: ");
pretty_print_port(net, book, enter(net, ROOT));
Expand Down
Loading

0 comments on commit 6059aea

Please sign in to comment.