diff --git a/src/service/service.ts b/src/service/service.ts index 34c4d762..0973c733 100644 --- a/src/service/service.ts +++ b/src/service/service.ts @@ -35,8 +35,6 @@ import { ITalkReqMessage, ITalkRespMessage, createTalkRequestMessage, - createTalkResponseMessage, - RequestId, } from "../message"; import { Discv5EventEmitter, ENRInput, IActiveRequest, IDiscv5Metrics, INodesResponse } from "./types"; import { AddrVotes } from "./addrVotes"; @@ -126,6 +124,11 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { private metrics?: IDiscv5Metrics; + /** + * A map of open listeners for TALKREQ messages that have been set. Used to ensure event listeners + * are cleared when the expected response is returned or the timeout period expires + */ + private talkReqListeners: Map void>; /** * Default constructor. * @param sessionService the service managing sessions underneath. @@ -152,6 +155,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { metrics.activeSessionCount.collect = () => metrics.activeSessionCount.set(discv5.sessionService.sessionsSize()); metrics.lookupCount.collect = () => metrics.lookupCount.set(this.nextLookupId - 1); } + this.talkReqListeners = new Map(); } /** @@ -306,42 +310,84 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { } /** - * Broadcast TALKREQ message to all nodes in routing table + * Broadcast TALKREQ message to all nodes in routing table and returns response */ - public async broadcastTalkReq(payload: Buffer, protocol: string | Uint8Array): Promise { - const msg = createTalkRequestMessage(payload, protocol); - for (const node of this.kadValues()) { - const sendStatus = this.sendRequest(node.nodeId, msg); - if (!sendStatus) { - log(`Failed to send TALKREQ message to node ${node.nodeId}`); - } else { - log(`Sent TALKREQ message to node ${node.nodeId}`); + public async broadcastTalkReq(payload: Buffer, protocol: string | Uint8Array, timeout = 1000): Promise { + return await new Promise((resolve, reject) => { + const msg = createTalkRequestMessage(payload, protocol); + const responseTimeout = setTimeout(() => { + const event = this.talkReqListeners.get(msg.id); + if (event) { + this.removeListener("talkRespReceived", event); + this.talkReqListeners.delete(msg.id); + } + reject("Request timed out"); + }, timeout); + const listener = (srcId: string, enr: ENR | null, res: ITalkRespMessage): void => { + if (res.id === msg.id) { + const event = this.talkReqListeners.get(msg.id); + if (event) { + this.removeListener("talkRespReceived", event); + this.talkReqListeners.delete(msg.id); + } + clearTimeout(responseTimeout); + resolve(res.response); + } + }; + this.talkReqListeners.set(msg.id, listener); + this.on("talkRespReceived", listener); + + for (const node of this.kadValues()) { + const sendStatus = this.sendRequest(node.nodeId, msg); + if (!sendStatus) { + log(`Failed to send TALKREQ message to node ${node.nodeId}`); + } else { + log(`Sent TALKREQ message to node ${node.nodeId}`); + } } - } + }); } /** - * Send TALKRESP message to requesting node + * Send TALKREQ message to dstId and returns response */ - public async sendTalkResp(srcId: NodeId, requestId: RequestId, payload: Uint8Array): Promise { - const msg = createTalkResponseMessage(requestId, payload); - const enr = this.getKadValue(srcId); - const addr = await enr?.getFullMultiaddr("udp"); - if (enr && addr) { - log(`Sending TALKRESP message to node ${enr.id}`); - try { - this.sessionService.sendResponse(addr, srcId, msg); - this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.TALKRESP] }); - } catch (e) { - log("Failed to send a TALKRESP response. Error: %s", e.message); - } - } else { - if (!addr && enr) { - log(`No ip + udp port found for node ${srcId}`); + public async sendTalkReq( + dstId: string, + payload: Buffer, + protocol: string | Uint8Array, + timeout = 1000 + ): Promise { + return await new Promise((resolve, reject) => { + const msg = createTalkRequestMessage(payload, protocol); + const responseTimeout = setTimeout(() => { + const event = this.talkReqListeners.get(msg.id); + if (event) { + this.removeListener("talkRespReceived", event); + this.talkReqListeners.delete(msg.id); + } + reject("Request timed out"); + }, timeout); + const listener = (srcId: string, enr: ENR | null, res: ITalkRespMessage): void => { + if (res.id === msg.id) { + clearTimeout(responseTimeout); + resolve(res.response); + const event = this.talkReqListeners.get(msg.id); + if (event) { + this.removeListener("talkRespReceived", event); + this.talkReqListeners.delete(msg.id); + } + } + }; + this.talkReqListeners.set(msg.id, listener); + + this.on("talkRespReceived", listener); + const sendStatus = this.sendRequest(dstId, msg); + if (!sendStatus) { + log(`Failed to send TALKREQ message to node ${dstId}`); } else { - log(`Node ${srcId} not found`); + log(`Sent TALKREQ message to node ${dstId}`); } - } + }); } /**