Skip to content

Commit

Permalink
feat(handshake): limit number of connections
Browse files Browse the repository at this point in the history
  • Loading branch information
kckeiks committed Oct 17, 2023
1 parent c3e8809 commit 04414d9
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 2 deletions.
1 change: 1 addition & 0 deletions core/e2e/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ fn build_config(
udp_address: ([127, 0, 0, 1], ports.handshake.webrtc).into(),
})],
http_address: ([127, 0, 0, 1], ports.handshake.http).into(),
..Default::default()
});

config.inject::<ServiceExecutor<FinalTypes>>(ServiceExecutorConfig {
Expand Down
13 changes: 12 additions & 1 deletion core/handshake/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct HandshakeConfig {
#[serde(rename = "transport")]
pub transports: Vec<TransportConfig>,
pub http_address: SocketAddr,
pub max_client_connection_limit: usize,
pub max_global_connection_limit: usize,
}

impl Default for HandshakeConfig {
Expand All @@ -60,6 +62,8 @@ impl Default for HandshakeConfig {
TransportConfig::Tcp(Default::default()),
],
http_address: ([0, 0, 0, 0], 4220).into(),
max_client_connection_limit: 254,
max_global_connection_limit: 10_000,
}
}
}
Expand All @@ -72,7 +76,14 @@ impl<C: Collection> HandshakeInterface<C> for Handshake<C> {
) -> anyhow::Result<Self> {
let shutdown = ShutdownNotifier::default();
let (_, sk) = signer.get_sk();
let state = StateRef::new(CONNECTION_TIMEOUT, shutdown.waiter(), sk, provider);
let state = StateRef::new(
CONNECTION_TIMEOUT,
shutdown.waiter(),
sk,
provider,
config.max_client_connection_limit,
config.max_global_connection_limit,
);

Ok(Self {
status: Mutex::new(Some(Run {
Expand Down
41 changes: 41 additions & 0 deletions core/handshake/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ impl<P: ExecutorProviderInterface> StateRef<P> {
waiter: ShutdownWaiter,
sk: NodeSecretKey,
provider: P,
max_client_connection_limit: usize,
max_global_connection_limit: usize,
) -> Self {
let gc = Gc::default();

Expand All @@ -44,6 +46,9 @@ impl<P: ExecutorProviderInterface> StateRef<P> {
shutdown: waiter,
provider,
connections: Default::default(),
connection_count: Default::default(),
max_client_connection_limit,
max_global_connection_limit,
gc_sender: gc.sender(),
}));

Expand All @@ -63,6 +68,9 @@ pub struct StateData<P: ExecutorProviderInterface> {
pub provider: P,
/// Map each connection to the information we have about it.
pub connections: DashMap<u64, Connection<P::Handle>, fxhash::FxBuildHasher>,
pub connection_count: DashMap<ClientPublicKey, usize>,
pub max_client_connection_limit: usize,
pub max_global_connection_limit: usize,
pub gc_sender: GcSender,
}

Expand Down Expand Up @@ -172,6 +180,9 @@ impl<P: ExecutorProviderInterface> StateData<P> {
};

self.connections.insert(connection_id, connection);
self.connection_count
.entry(pk)
.and_modify(|count| *count += 1);

// Let the service know we got the connection.
handle.connected(fn_sdk::internal::OnConnectedArgs {
Expand Down Expand Up @@ -276,6 +287,12 @@ impl<P: ExecutorProviderInterface> StateData<P> {
frame: schema::HandshakeRequestFrame,
sender: StaticSender,
) -> Option<ProcessHandshakeResult> {
if let schema::HandshakeRequestFrame::Handshake { pk, .. } = &frame {
if self.check_connection_limits(pk) {
return None;
}
}

match frame {
schema::HandshakeRequestFrame::Handshake {
retry: Some(conn_id),
Expand Down Expand Up @@ -444,6 +461,8 @@ impl<P: ExecutorProviderInterface> StateData<P> {
return;
};

self.update_client_connection_count(connection.pk);

for sender in connection.senders.into_iter().flatten() {
sender.terminate(schema::TerminationReason::ServiceTerminated);
}
Expand Down Expand Up @@ -482,12 +501,34 @@ impl<P: ExecutorProviderInterface> StateData<P> {
return;
}

self.update_client_connection_count(connection.pk);

let connection = entry.remove();
connection
.handle
.disconnected(fn_sdk::internal::OnDisconnectedArgs { connection_id });
}
}

#[inline]
fn update_client_connection_count(&self, pk: ClientPublicKey) {
if let Entry::Occupied(mut entry) = self.connection_count.entry(pk) {
if *entry.get() > 1 {
*entry.get_mut() -= 1;
} else {
entry.remove();
}
}
}

#[inline]
fn check_connection_limits(&self, pk: &ClientPublicKey) -> bool {
self.connection_count
.get(pk)
.map(|count| *count.value() >= self.max_client_connection_limit)
.unwrap_or(false)
|| self.connections.len() >= self.max_global_connection_limit
}
}

impl<H: ServiceHandleInterface> Connection<H> {
Expand Down
4 changes: 3 additions & 1 deletion core/handshake/tests/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ async fn demo() -> anyhow::Result<()> {
"type": "AsyncWorker"
},
],
"http_addr": "0.0.0.0:4210"
"http_addr": "0.0.0.0:4210",
"max_client_connection_limit": 254,
"max_global_connection_limit": 10_000,
},
"service-executor": {
"services": [0]
Expand Down

0 comments on commit 04414d9

Please sign in to comment.