diff --git a/src/hvm.c b/src/hvm.c index 786de2d2..9ea6cede 100644 --- a/src/hvm.c +++ b/src/hvm.c @@ -124,6 +124,7 @@ static const f32 I24_MIN = (f32) (i32) ((-1u) << 23); #define G_VARS_LEN (1ul << 29) // max 536m vars #define G_RBAG_LEN (TPC * RLEN) + typedef struct Net { APair node_buf[G_NODE_LEN]; // global node buffer APort vars_buf[G_VARS_LEN]; // global vars buffer @@ -146,20 +147,6 @@ typedef struct Def { typedef struct Book Book; -// A Foreign Function -typedef struct { - char name[256]; - Port (*func)(Net*, Book*, Port); -} FFn; - -// Book of Definitions -typedef struct Book { - u32 defs_len; - Def defs_buf[0x4000]; - u32 ffns_len; - FFn ffns_buf[0x4000]; -} Book; - // Local Thread Memory typedef struct TM { u32 tid; // thread id @@ -174,6 +161,40 @@ typedef struct TM { Pair hbag_buf[HLEN]; // high-priority redexes } TM; +// Thread data +typedef struct { + Net* net; + TM* tm; + Book* book; + u32 done; + pthread_mutex_t* work_mutex; + pthread_cond_t* work_cond; + pthread_mutex_t* done_mutex; + pthread_cond_t* done_cond; +} ThreadArg; + +typedef struct { + Net* net; + Book* book; + ThreadArg* thread_arg; + pthread_t* threads; +} State; + +// A Foreign Function +typedef struct { + char name[256]; + Port (*func)(State, Port); +} FFn; + +// Book of Definitions +typedef struct Book { + u32 defs_len; + Def defs_buf[0x4000]; + u32 ffns_len; + FFn ffns_buf[0x4000]; +} Book; + + // Debugger // -------- @@ -1217,16 +1238,20 @@ void evaluator(Net* net, TM* tm, Book* book) { // Normalizer // ---------- -// Thread data -typedef struct { - Net* net; - TM* tm; - Book* book; -} ThreadArg; - void* thread_func(void* arg) { ThreadArg* data = (ThreadArg*)arg; - evaluator(data->net, data->tm, data->book); + while (TRUE) { + // Wait for main to signal that the thread should run + pthread_mutex_lock(data->work_mutex); + pthread_cond_wait(data->work_cond, data->work_mutex); + + evaluator(data->net, data->tm, data->book); + + // Signal main that the work of this thread is done + data->done = 1; + pthread_cond_signal(data->done_cond); + pthread_mutex_unlock(data->work_mutex); + } return NULL; } @@ -1238,37 +1263,41 @@ void boot_redex(Net* net, Pair redex) { // Evaluates all redexes. // TODO: cache threads to avoid spawning overhead -void normalize(Net* net, Book* book) { - // Inits thread_arg objects - ThreadArg thread_arg[TPC]; - for (u32 t = 0; t < TPC; ++t) { - thread_arg[t].net = net; - thread_arg[t].tm = tm[t]; - thread_arg[t].book = book; - } +void normalize(State state) { + u64 start_time = time64(); - // Spawns the evaluation threads - pthread_t threads[TPC]; + // Signal the threads to start. for (u32 t = 0; t < TPC; ++t) { - pthread_create(&threads[t], NULL, thread_func, &thread_arg[t]); + //printf("Signaling thread %d\n", t); + state.thread_arg[t].done = 0; + pthread_mutex_lock(state.thread_arg[t].done_mutex); + pthread_cond_signal(state.thread_arg[t].work_cond); } - // Wait for the threads to finish + // Wait for the threads to signal their finish for (u32 t = 0; t < TPC; ++t) { - pthread_join(threads[t], NULL); + while (state.thread_arg[t].done == 0) { + pthread_cond_wait(state.thread_arg[t].done_cond, state.thread_arg[t].done_mutex); + } + pthread_mutex_unlock(state.thread_arg[t].done_mutex); + //printf("Thread %d finished\n", t); } + + u64 end_time = time64(); + u64 time = (end_time - start_time) / 1000000; + printf("normalize took %llu ms\n", time); } // Util: expands a REF Port. -Port expand(Net* net, Book* book, Port port) { - Port old = vars_load(net, get_val(ROOT)); - Port got = peek(net, port); +Port expand(State state, Port port) { + Port old = vars_load(state.net, get_val(ROOT)); + Port got = peek(state.net, port); while (get_tag(got) == REF) { - boot_redex(net, new_pair(new_port(REF,get_val(got)), ROOT)); - normalize(net, book); - got = peek(net, vars_load(net, get_val(ROOT))); + boot_redex(state.net, new_pair(new_port(REF,get_val(got)), ROOT)); + normalize(state); + got = peek(state.net, vars_load(state.net, get_val(ROOT))); } - vars_create(net, get_val(ROOT), old); + vars_create(state.net, get_val(ROOT), old); return got; } @@ -1752,7 +1781,7 @@ void pretty_print_port(Net* net, Book* book, Port port) { //COMPILED_BOOK_BUF// #ifdef IO -void do_run_io(Net* net, Book* book, Port port); +void do_run_io(State state, Port port); #endif // Main @@ -1779,12 +1808,46 @@ void hvm_c(u32* book_buffer) { // Creates an initial redex that calls main boot_redex(net, new_pair(new_port(REF, 0), ROOT)); + // Inits thread_arg objects + ThreadArg thread_arg[TPC]; + pthread_mutex_t work_mutex[TPC]; + pthread_cond_t work_cond[TPC]; + pthread_mutex_t done_mutex[TPC]; + pthread_cond_t done_cond[TPC]; + for (u32 t = 0; t < TPC; ++t) { + pthread_mutex_init(&work_mutex[t], NULL); + pthread_cond_init(&work_cond[t], NULL); + pthread_mutex_init(&done_mutex[t], NULL); + pthread_cond_init(&done_cond[t], NULL); + + thread_arg[t].net = net; + thread_arg[t].tm = tm[t]; + thread_arg[t].book = book; + thread_arg[t].work_mutex = &work_mutex[t]; + thread_arg[t].work_cond = &work_cond[t]; + thread_arg[t].done_mutex = &done_mutex[t]; + thread_arg[t].done_cond = &done_cond[t]; + } + pthread_t threads[TPC]; + State state = {net, book, thread_arg, threads}; + + // Spawns the evaluation threads + for (u32 t = 0; t < TPC; ++t) { + pthread_create(&state.threads[t], NULL, thread_func, &state.thread_arg[t]); + } + #ifdef IO - do_run_io(net, book, ROOT); + do_run_io(state, ROOT); #else - normalize(net, book); + normalize(state); #endif + printf("Threads done\n"); + + for (u32 t = 0; t < TPC; ++t) { + pthread_cancel(state.threads[t]); + } + // Prints the result printf("Result: "); pretty_print_port(net, book, enter(net, ROOT)); diff --git a/src/run.c b/src/run.c index 28304508..aed0868f 100644 --- a/src/run.c +++ b/src/run.c @@ -46,32 +46,32 @@ typedef struct Bytes { // Reads back a λ-Encoded constructor from device to host. // Encoding: λt ((((t TAG) arg0) arg1) ...) -Ctr readback_ctr(Net* net, Book* book, Port port) { +Ctr readback_ctr(State state, Port port) { Ctr ctr; ctr.tag = -1; ctr.args_len = 0; // Loads root lambda - Port lam_port = expand(net, book, port); + Port lam_port = expand(state, port); if (get_tag(lam_port) != CON) return ctr; - Pair lam_node = node_load(net, get_val(lam_port)); + Pair lam_node = node_load(state.net, get_val(lam_port)); // Loads first application - Port app_port = expand(net, book, get_fst(lam_node)); + Port app_port = expand(state, get_fst(lam_node)); if (get_tag(app_port) != CON) return ctr; - Pair app_node = node_load(net, get_val(app_port)); + Pair app_node = node_load(state.net, get_val(app_port)); // Loads first argument (as the tag) - Port arg_port = expand(net, book, get_fst(app_node)); + Port arg_port = expand(state, get_fst(app_node)); if (get_tag(arg_port) != NUM) return ctr; ctr.tag = get_u24(get_val(arg_port)); // Loads remaining arguments while (TRUE) { - app_port = expand(net, book, get_snd(app_node)); + app_port = expand(state, get_snd(app_node)); if (get_tag(app_port) != CON) break; - app_node = node_load(net, get_val(app_port)); - arg_port = expand(net, book, get_fst(app_node)); + app_node = node_load(state.net, get_val(app_port)); + arg_port = expand(state, get_fst(app_node)); ctr.args_buf[ctr.args_len++] = arg_port; } @@ -81,16 +81,16 @@ Ctr readback_ctr(Net* net, Book* book, Port port) { // Reads back a tuple of at most `size` elements. Tuples are // (right-nested con nodes) (CON 1 (CON 2 (CON 3 (...)))) // The provided `port` should be `expanded` before calling. -Tup readback_tup(Net* net, Book* book, Port port, u32 size) { +Tup readback_tup(State state, Port port, u32 size) { Tup tup; tup.elem_len = 0; // Loads remaining arguments while (get_tag(port) == CON && (tup.elem_len + 1 < size)) { - Pair node = node_load(net, get_val(port)); - tup.elem_buf[tup.elem_len++] = expand(net, book, get_fst(node)); + Pair node = node_load(state.net, get_val(port)); + tup.elem_buf[tup.elem_len++] = expand(state, get_fst(node)); - port = expand(net, book, get_snd(node)); + port = expand(state, get_snd(node)); } tup.elem_buf[tup.elem_len++] = port; @@ -104,7 +104,7 @@ Tup readback_tup(Net* net, Book* book, Port port, u32 size) { // Encoding: // - λt (t NIL) // - λt (((t CONS) head) tail) -Str readback_str(Net* net, Book* book, Port port) { +Str readback_str(State state, Port port) { // Result Str str; str.text_len = 0; @@ -112,10 +112,10 @@ Str readback_str(Net* net, Book* book, Port port) { // Readback loop while (TRUE) { // Normalizes the net - normalize(net, book); + normalize(state); // Reads the λ-Encoded Ctr - Ctr ctr = readback_ctr(net, book, peek(net, port)); + Ctr ctr = readback_ctr(state, peek(state.net, port)); // Reads string layer switch (ctr.tag) { @@ -127,7 +127,7 @@ Str readback_str(Net* net, Book* book, Port port) { if (get_tag(ctr.args_buf[0]) != NUM) break; if (str.text_len >= 256) { printf("ERROR: for now, HVM can only readback strings of length <256."); break; } str.text_buf[str.text_len++] = get_u24(get_val(ctr.args_buf[0])); - boot_redex(net, new_pair(ctr.args_buf[1], ROOT)); + boot_redex(state.net, new_pair(ctr.args_buf[1], ROOT)); port = ROOT; continue; } @@ -144,7 +144,7 @@ Str readback_str(Net* net, Book* book, Port port) { // Encoding: // - λt (t NIL) // - λt (((t CONS) head) tail) -Bytes readback_bytes(Net* net, Book* book, Port port) { +Bytes readback_bytes(State state, Port port) { // Result Bytes bytes; bytes.buf = (char*) malloc(sizeof(char) * MAX_BYTES); @@ -153,10 +153,10 @@ Bytes readback_bytes(Net* net, Book* book, Port port) { // Readback loop while (TRUE) { // Normalizes the net - normalize(net, book); + normalize(state); // Reads the λ-Encoded Ctr - Ctr ctr = readback_ctr(net, book, peek(net, port)); + Ctr ctr = readback_ctr(state, peek(state.net, port)); // Reads string layer switch (ctr.tag) { @@ -168,7 +168,7 @@ Bytes readback_bytes(Net* net, Book* book, Port port) { if (get_tag(ctr.args_buf[0]) != NUM) break; if (bytes.len >= MAX_BYTES) { printf("ERROR: for now, HVM can only readback list of bytes of length <%u.", MAX_BYTES); break; } bytes.buf[bytes.len++] = get_u24(get_val(ctr.args_buf[0])); - boot_redex(net, new_pair(ctr.args_buf[1], ROOT)); + boot_redex(state.net, new_pair(ctr.args_buf[1], ROOT)); port = ROOT; continue; } @@ -281,8 +281,8 @@ FILE* readback_file(Port port) { // Reads from a file a specified number of bytes. // `argm` is a tuple of (file_descriptor, num_bytes). -Port io_read(Net* net, Book* book, Port argm) { - Tup tup = readback_tup(net, book, argm, 2); +Port io_read(State state, Port argm) { + Tup tup = readback_tup(state, argm, 2); if (tup.elem_len != 2) { fprintf(stderr, "io_read: expected 2-tuple\n"); return new_port(ERA, 0); @@ -308,7 +308,7 @@ Port io_read(Net* net, Book* book, Port argm) { } // Convert it to a port. - Port ret = inject_bytes(net, &bytes); + Port ret = inject_bytes(state.net, &bytes); free(bytes.buf); return ret; } @@ -316,15 +316,15 @@ Port io_read(Net* net, Book* book, Port argm) { // Opens a file with the provided mode. // `argm` is a tuple (CON node) of the // file name and mode as strings. -Port io_open(Net* net, Book* book, Port argm) { - Tup tup = readback_tup(net, book, argm, 2); +Port io_open(State state, Port argm) { + Tup tup = readback_tup(state, argm, 2); if (tup.elem_len != 2) { fprintf(stderr, "io_open: expected 2-tuple\n"); return new_port(ERA, 0); } - Str name = readback_str(net, book, tup.elem_buf[0]); - Str mode = readback_str(net, book, tup.elem_buf[1]); + Str name = readback_str(state, tup.elem_buf[0]); + Str mode = readback_str(state, tup.elem_buf[1]); for (u32 fd = 3; fd < sizeof(FILE_POINTERS); fd++) { if (FILE_POINTERS[fd] == NULL) { @@ -338,7 +338,7 @@ Port io_open(Net* net, Book* book, Port argm) { } // Closes a file, reclaiming the file descriptor. -Port io_close(Net* net, Book* book, Port argm) { +Port io_close(State state, Port argm) { FILE* fp = readback_file(argm); if (fp == NULL) { fprintf(stderr, "io_close: failed to close\n"); @@ -358,15 +358,15 @@ Port io_close(Net* net, Book* book, Port argm) { // Writes a list of bytes to a file. // `argm` is a tuple (CON node) of the // file descriptor and list of bytes to write. -Port io_write(Net* net, Book* book, Port argm) { - Tup tup = readback_tup(net, book, argm, 2); +Port io_write(State state, Port argm) { + Tup tup = readback_tup(state, argm, 2); if (tup.elem_len != 2) { fprintf(stderr, "io_write: expected 2-tuple\n"); return new_port(ERA, 0); } FILE* fp = readback_file(tup.elem_buf[0]); - Bytes bytes = readback_bytes(net, book, tup.elem_buf[1]); + Bytes bytes = readback_bytes(state, tup.elem_buf[1]); if (fp == NULL) { fprintf(stderr, "io_write: invalid file descriptor\n"); @@ -390,8 +390,8 @@ Port io_write(Net* net, Book* book, Port argm) { // - 0 (SEEK_SET): beginning of file // - 1 (SEEK_CUR): current position of the file pointer // - 2 (SEEK_END): end of the file -Port io_seek(Net* net, Book* book, Port argm) { - Tup tup = readback_tup(net, book, argm, 3); +Port io_seek(State state, Port argm) { + Tup tup = readback_tup(state, argm, 3); if (tup.elem_len != 3) { fprintf(stderr, "io_seek: expected 3-tuple\n"); return new_port(ERA, 0); @@ -425,7 +425,7 @@ Port io_seek(Net* net, Book* book, Port argm) { // Returns the current time as a tuple of the high // and low 24 bits of a 48-bit nanosecond timestamp. -Port io_get_time(Net* net, Book* book, Port argm) { +Port io_get_time(State state, Port argm) { // Get the current time in nanoseconds u64 time_ns = time64(); // Encode the time as a 64-bit unsigned integer @@ -433,8 +433,8 @@ Port io_get_time(Net* net, Book* book, Port argm) { u32 time_lo = (u32)(time_ns & 0xFFFFFFF); // Allocate a node to store the time u32 lps = 0; - u32 loc = node_alloc_1(net, tm[0], &lps); - node_create(net, loc, new_pair(new_port(NUM, new_u24(time_hi)), new_port(NUM, new_u24(time_lo)))); + u32 loc = node_alloc_1(state.net, tm[0], &lps); + node_create(state.net, loc, new_pair(new_port(NUM, new_u24(time_hi)), new_port(NUM, new_u24(time_lo)))); // Return the encoded time return new_port(CON, loc); } @@ -442,15 +442,15 @@ Port io_get_time(Net* net, Book* book, Port argm) { // Sleeps. // `argm` is a tuple (CON node) of the high and low // 24 bits for a 48-bit duration in nanoseconds. -Port io_sleep(Net* net, Book* book, Port argm) { - Tup tup = readback_tup(net, book, argm, 2); +Port io_sleep(State state, Port argm) { + Tup tup = readback_tup(state, argm, 2); if (tup.elem_len != 2) { - fprintf(stderr, "io_sleep: expected 3-tuple\n"); + fprintf(stderr, "io_sleep: expected 2-tuple\n"); return new_port(ERA, 0); } // Get the sleep duration node - Pair dur_node = node_load(net, get_val(argm)); + Pair dur_node = node_load(state.net, get_val(argm)); // Get the high and low 24-bit parts of the duration u32 dur_hi = get_u24(get_val(tup.elem_buf[0])); u32 dur_lo = get_u24(get_val(tup.elem_buf[1])); @@ -482,16 +482,22 @@ void book_init(Book* book) { // --------------------- // Runs an IO computation. -void do_run_io(Net* net, Book* book, Port port) { - book_init(book); +void do_run_io(State state, Port port) { + book_init(state.book); + + int count = 0; // IO loop while (TRUE) { + printf("IO loop start\n"); + count++; + // Normalizes the net - normalize(net, book); + normalize(state); + printf("IO loop normalize end\n"); // Reads the λ-Encoded Ctr - Ctr ctr = readback_ctr(net, book, peek(net, port)); + Ctr ctr = readback_ctr(state, peek(state.net, port)); // Checks if IO Magic Number is a CON if (get_tag(ctr.args_buf[0]) != CON) { @@ -499,7 +505,7 @@ void do_run_io(Net* net, Book* book, Port port) { } // Checks the IO Magic Number - Pair io_magic = node_load(net, get_val(ctr.args_buf[0])); + Pair io_magic = node_load(state.net, get_val(ctr.args_buf[0])); //printf("%08x %08x\n", get_u24(get_val(get_fst(io_magic))), get_u24(get_val(get_snd(io_magic)))); if (get_val(get_fst(io_magic)) != new_u24(IO_MAGIC_0) || get_val(get_snd(io_magic)) != new_u24(IO_MAGIC_1)) { break; @@ -507,12 +513,12 @@ void do_run_io(Net* net, Book* book, Port port) { switch (ctr.tag) { case IO_CALL: { - Str func = readback_str(net, book, ctr.args_buf[1]); + Str func = readback_str(state, ctr.args_buf[1]); FFn* ffn = NULL; // FIXME: optimize this linear search - for (u32 fid = 0; fid < book->ffns_len; ++fid) { - if (strcmp(func.text_buf, book->ffns_buf[fid].name) == 0) { - ffn = &book->ffns_buf[fid]; + for (u32 fid = 0; fid < state.book->ffns_len; ++fid) { + if (strcmp(func.text_buf, state.book->ffns_buf[fid].name) == 0) { + ffn = &state.book->ffns_buf[fid]; break; } } @@ -523,12 +529,12 @@ void do_run_io(Net* net, Book* book, Port port) { Port argm = ctr.args_buf[2]; Port cont = ctr.args_buf[3]; - Port ret = ffn->func(net, book, argm); + Port ret = ffn->func(state, argm); u32 lps = 0; - u32 loc = node_alloc_1(net, tm[0], &lps); - node_create(net, loc, new_pair(ret, ROOT)); - boot_redex(net, new_pair(new_port(CON, loc), cont)); + u32 loc = node_alloc_1(state.net, tm[0], &lps); + node_create(state.net, loc, new_pair(ret, ROOT)); + boot_redex(state.net, new_pair(new_port(CON, loc), cont)); port = ROOT; continue; }