Skip to content

Commit

Permalink
comments, more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 12, 2024
1 parent 5d50bd9 commit a8b812a
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 108 deletions.
72 changes: 72 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 43 additions & 39 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! This file contains the implementation of the `Broker`, which routes messages
//! for the Push CDN.
// TODO: massive cleanup on this file
// TODO: split out this file into multiple files.
// TODO: logging

mod map;
mod state;
Expand All @@ -25,8 +26,7 @@ use proto::{
redis::{self, BrokerIdentifier},
verify_broker,
};
use slotmap::DefaultKey;
use state::ConnectionLookup;
use state::{ConnectionId, ConnectionLookup, Sender};
use tokio::{select, spawn, sync::RwLock, time::sleep};
use tracing::{error, info, warn};

Expand Down Expand Up @@ -96,27 +96,36 @@ struct Inner<
pd: PhantomData<(UserProtocolType, BrokerProtocolType, UserSignatureScheme)>,
}

/// This macro is a helper macro that lets us "send many messages", and remove
/// the actor from the local state if the message failed to send
macro_rules! send_or_remove_many {
($connections: expr, $lookup:expr, $message: expr, $position: expr) => {
// For each connection,
for connection in $connections {
// Queue a message back
if connection
.1
.queue_message($message.clone(), $position)
.is_err()
{
// If it fails, remove the connection.
get_lock!($lookup, write).remove_connection(connection.0);
};
}
};
}

/// We use this macro to help send direct messages. It just makes the code easier
/// to look at.
macro_rules! send_direct {
($lookup: expr, $key: expr, $message: expr) => {{
let connections = $lookup.read().await.get_connections_by_key(&$key).clone();
send_or_remove_many!(connections, $lookup, $message, Position::Back);
}};
}

/// We use this macro to help send broadcast messages. It just makes the code easier
/// to look at.
macro_rules! send_broadcast {
($lookup:expr, $topics: expr, $message: expr) => {{
let connections = $lookup
Expand All @@ -128,6 +137,7 @@ macro_rules! send_broadcast {
}};
}

/// This is a macro to acquire an async lock, which helps readability.
macro_rules! get_lock {
($lock :expr, $type: expr) => {
paste::item! {
Expand All @@ -136,6 +146,7 @@ macro_rules! get_lock {
};
}

// Creates and serializes a new message of the specified type with the specified data.
macro_rules! new_serialized_message {
($type: ident, $data: expr) => {
Arc::<Vec<u8>>::from(bail!(
Expand Down Expand Up @@ -487,6 +498,10 @@ where
}
}

/// This task deals with sending connected user and topic updates to other brokers. It takes advantage of
/// `SnapshotMap`, so we can send partial or full updates to the other brokers as they need it.
/// Right now, we do it every 5 seconds, or on every user connect if the number of connections is
/// sufficiently low.
async fn send_updates_task(
inner: Arc<
Inner<BrokerSignatureScheme, BrokerProtocolType, UserSignatureScheme, UserProtocolType>,
Expand All @@ -510,6 +525,8 @@ where
}
}

/// This task deals with setting the number of our connected users in `Redis`. It allows
/// the marshal to correctly choose the broker with the least amount of connections.
async fn heartbeat_task(
inner: Arc<
Inner<BrokerSignatureScheme, BrokerProtocolType, UserSignatureScheme, UserProtocolType>,
Expand Down Expand Up @@ -576,8 +593,12 @@ where
}
}

/// This is a macro that helps us send an update to other brokers. The message type depends on
/// whether it is a user update or a topic update. The recipients is the other brokers (or broker)
/// for which we want the partial/complete update, and the position refers to the position the message
/// should go in the queue.
macro_rules! send_update_to_brokers {
($lookup:expr, $message_type: ident, $data:expr, $recipients: expr, $position: ident) => {{
($self:expr, $message_type: ident, $data:expr, $recipients: expr, $position: ident) => {{
// If the data is not empty, make a message of the specified type
if !$data.is_empty() {
// Create a `Subscribe` message, which contains the full list of topics we're subscribed to
Expand All @@ -586,7 +607,7 @@ macro_rules! send_update_to_brokers {
// For each recipient, send to the destined position in the queue
send_or_remove_many!(
$recipients,
$lookup,
$self.broker_connection_lookup,
message,
Position::$position
);
Expand All @@ -606,9 +627,11 @@ where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::SigningKey: Serializable,
{
/// This is the main loop where we deal with broker connectins. On exit, the calling function
/// should remove the broker from the map.
pub async fn broker_recv_loop(
self: &Arc<Self>,
connection_id: DefaultKey,
connection_id: ConnectionId,
mut receiver: BrokerProtocolType::Receiver,
) -> Result<()> {
while let Ok(message) = receiver.recv_message().await {
Expand Down Expand Up @@ -657,9 +680,11 @@ where
Err(Error::Connection("connection closed".to_string()))
}

/// This is the main loop where we deal with user connectins. On exit, the calling function
/// should remove the user from the map.
pub async fn user_recv_loop(
self: &Arc<Self>,
connection_id: DefaultKey,
connection_id: ConnectionId,
mut receiver: UserProtocolType::Receiver,
) {
while let Ok(message) = receiver.recv_message().await {
Expand Down Expand Up @@ -703,10 +728,13 @@ where
}
}

/// This function lets us send updates to brokers on demand. We need this to ensure consistency between brokers
/// (e.g. which brokers have which users connected). We send these updates out periodically, but also
/// on every user join if the number of connected users is sufficiently small.
pub async fn send_updates_to_brokers(
self: &Arc<Self>,
full: Vec<(DefaultKey, Arc<BatchedSender<BrokerProtocolType>>)>,
partial: Vec<(DefaultKey, Arc<BatchedSender<BrokerProtocolType>>)>,
full: Vec<(ConnectionId, Sender<BrokerProtocolType>)>,
partial: Vec<(ConnectionId, Sender<BrokerProtocolType>)>,
) -> Result<()> {
// When a broker connects, we have to send:
// 1. Our snapshot to the new broker (of what topics/users we're subscribed for)
Expand All @@ -720,26 +748,14 @@ where

// Send the full connected users to interested brokers first in the queue (so that it is the correct order)
// TODO: clean up this function
send_update_to_brokers!(
self.broker_connection_lookup,
UsersConnected,
key_snapshot.snapshot,
&full,
Front
);
send_update_to_brokers!(self, UsersConnected, key_snapshot.snapshot, &full, Front);

// Send the full topics list to interested brokers first in the queue (so that it is the correct order)
send_update_to_brokers!(
self.broker_connection_lookup,
Subscribe,
topic_snapshot.snapshot,
&full,
Front
);
send_update_to_brokers!(self, Subscribe, topic_snapshot.snapshot, &full, Front);

// Send the insertion updates for keys, if any
send_update_to_brokers!(
self.broker_connection_lookup,
self,
UsersConnected,
key_snapshot.insertions,
&partial,
Expand All @@ -748,30 +764,18 @@ where

// Send the removal updates for keys, if any
send_update_to_brokers!(
self.broker_connection_lookup,
self,
UsersDisconnected,
key_snapshot.removals,
&partial,
Back
);

// Send the insertion updates for topics, if any
send_update_to_brokers!(
self.broker_connection_lookup,
Subscribe,
topic_snapshot.insertions,
&partial,
Back
);
send_update_to_brokers!(self, Subscribe, topic_snapshot.insertions, &partial, Back);

// Send the removal updates for topics, if any
send_update_to_brokers!(
self.broker_connection_lookup,
Unsubscribe,
topic_snapshot.removals,
&partial,
Back
);
send_update_to_brokers!(self, Unsubscribe, topic_snapshot.removals, &partial, Back);

Ok(())
}
Expand Down
Loading

0 comments on commit a8b812a

Please sign in to comment.