diff --git a/src/index.ts b/src/index.ts index 2ac3d21e..d3e732bc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ import { randomUUID, sanitizeFunctionArguments, } from "./utils.js"; +import { execLocally, setupFn } from "./local.js"; const withDelay = (dt: Date, delay: Duration): Date => new Date(dt.getTime() + parseDuration(delay)!); @@ -31,39 +32,6 @@ function getHTTPClient(): HTTPClient | undefined { export const deferEnabled = () => !!getEnv("DEFER_TOKEN"); -async function execLocally( - id: string, - fn: any, - args: any -): Promise { - let state: client.ExecutionState = "succeed"; - let originalResult: any; - try { - originalResult = await fn(...args); - } catch (error) { - const e = error as Error; - state = "failed"; - originalResult = { - name: e.name, - message: e.message, - cause: e.cause, - stack: e.stack, - }; - } - - let result: any; - try { - result = JSON.parse(JSON.stringify(originalResult || "")); - } catch (error) { - const e = error as Error; - throw new DeferError(`cannot serialize function return: ${e.message}`); - } - - const response = { id, state, result }; - __database.set(id, response); - - return response; -} async function enqueue( func: DeferredFunction, @@ -105,7 +73,7 @@ async function enqueue( const id = randomUUID(); __database.set(id, { id: id, state: "started" }); - execLocally(id, originalFunction, functionArguments); + execLocally(id, func, functionArguments) return { id }; } @@ -139,6 +107,8 @@ export type Concurrency = Range<0, 51>; export type NextRouteString = `/api/${string}`; export interface Manifest { + id: string; + name: string; version: number; cron?: string; retry?: RetryPolicy; @@ -249,12 +219,16 @@ export function defer( wrapped.__fn = fn; wrapped.__metadata = { + id: randomUUID(), + name: fn.name, version: INTERNAL_VERSION, retry: parseRetryPolicy(config), concurrency: config?.concurrency, maxDuration: config?.maxDuration, }; + setupFn(wrapped.__metadata) + return wrapped; } @@ -271,6 +245,8 @@ defer.cron = function ( wrapped.__fn = fn; wrapped.__metadata = { + id: randomUUID(), + name: fn.name, version: INTERNAL_VERSION, retry: parseRetryPolicy(config), cron: cronExpr, @@ -361,7 +337,7 @@ export function awaitResult( } else { const id = randomUUID(); __database.set(id, { id: id, state: "started" }); - response = await execLocally(id, originalFunction, functionArguments); + response = await execLocally(id, fn, functionArguments); } if (response.state === "failed") { diff --git a/src/local.ts b/src/local.ts new file mode 100644 index 00000000..4334fb20 --- /dev/null +++ b/src/local.ts @@ -0,0 +1,76 @@ +import { ExecutionState, FetchExecutionResponse } from "./client.js"; +import { DeferError } from "./errors.js"; +import { + DeferredFunction, + __database, + DeferableFunction, + Manifest, +} from "./index"; +import { Queue } from "./queue.js"; + +type Invocation = { + id: string; + func: DeferredFunction + args: any + oncomplete: ((result: FetchExecutionResponse) => void) | undefined +}; +type FunctionConfiguration = Manifest & { + queue: Queue>; +}; +const fns: Record> = {}; + +export function setupFn(metadata: Manifest) { + fns[metadata.id] = { + ...metadata, + queue: new Queue>(invoke, metadata.concurrency), + }; +} + +export function execLocally( + id: string, + func: DeferredFunction, + args: any +): Promise> { + return new Promise(resolve => { + fns[func.__metadata.id]!.queue.push({ + id, + func, + args, + oncomplete: resolve + }); + }) +} + +async function invoke(invocation: Invocation) { + __database.set(invocation.id, { id: invocation.id, state: "started" }); + let state: ExecutionState = "succeed"; + let originalResult: any; + try { + originalResult = await invocation.func.__fn(...invocation.args); + } catch (error) { + const e = error as Error; + state = "failed"; + originalResult = { + name: e.name, + message: e.message, + cause: e.cause, + stack: e.stack, + }; + + console.error('Error in deferred function ' + invocation.func.__fn.name + ' (invocation ' + invocation.id + '):\n', e) + } + + let result: any; + try { + result = JSON.parse(JSON.stringify(originalResult || "")); + } catch (error) { + const e = error as Error; + throw new DeferError(`cannot serialize function return: ${e.message}`); + } + + const response = { id: invocation.id, state, result } + __database.set(invocation.id, response); + + invocation.oncomplete?.(response) + +} \ No newline at end of file diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 00000000..bdbba603 --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,33 @@ +import { Concurrency } from "."; + +export class Queue extends Array { + private running = 0; + + constructor(public invoker: (item: T) => Promise, public concurrency?: Concurrency) { + super(); + Object.setPrototypeOf(this, Queue.prototype) + } + + override push(...items: T[]): number { + const len = super.push(...items); + this.next(); + return len; + } + + private next() { + if (!this.concurrency || this.running < this.concurrency) { + this.dequeue(); + } + } + + private dequeue() { + const item = this.shift(); + if (item) { + ++this.running; + this.invoker(item).finally(() => { + --this.running; + this.next(); + }); + } + } +}