Skip to content

Commit

Permalink
refactor connection flow
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 2, 2024
1 parent 98d5ece commit 0cd5c7d
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 143 deletions.
34 changes: 5 additions & 29 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use proto::{
crypto,
error::Error,
error::Result,
message::{Broadcast, Direct, Message, Subscribe, Topic, Unsubscribe},
message::{Broadcast, Direct, Message, Topic},
};

/// `Client` is a light wrapper around a `Sticky` connection that provides functions
Expand Down Expand Up @@ -130,41 +130,17 @@ where
/// If the connection or serialization has failed
///
/// TODO IMPORTANT: see if we want this, or if we'd prefer `set_subscriptions()`
pub async fn subscribe(&self, topics: Vec<Topic>) -> Result<()> {
// Lock subscribed topics here so if we're reconnecting we maintain parity
// with our list

// Lock our topics here so we can't add them on failure
let mut topic_guard = self.0.inner.subscribed_topics.lock().await;

// Form and send the single message
self.send_message_raw(Arc::from(Message::Subscribe(Subscribe {
topics: topics.clone(),
})))
.await
// Only add to our topic map if the message was successful
.map(|()| topic_guard.extend(topics))
pub async fn subscribe(&self, _topics: Vec<Topic>) -> Result<()> {
todo!()
}

/// Sends a message to the server that asserts that this client is no longer
/// interested in a specific topic
///
/// # Errors
/// If the connection or serialization has failed
pub async fn unsubscribe(&self, topics: Vec<Topic>) -> Result<()> {
// Lock subscribed topics here so if we're reconnecting we maintain parity
// with our list

// Lock our topics here so we can't add them on failure
let mut topic_guard = self.0.inner.subscribed_topics.lock().await;

// Form and send the single message
self.send_message_raw(Arc::from(Message::Unsubscribe(Unsubscribe {
topics: topics.clone(),
})))
.await
// Only add to our topic map if the message was successful
.map(|()| topic_guard.extend(topics))
pub async fn unsubscribe(&self, _topics: Vec<Topic>) -> Result<()> {
todo!()
}

/// Sends a pre-formed message over the wire. Various functions make use
Expand Down
12 changes: 7 additions & 5 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use std::marker::PhantomData;

use client::{Client, Config};
use proto::{
connection::{flow::ToMarshal, protocols::quic::Quic},
connection::{flow::UserToMarshal, protocols::quic::Quic},
crypto,
error::Result,
message::Topic,
};

use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS;
Expand All @@ -14,11 +13,14 @@ use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme
async fn main() -> Result<()> {
let (signing_key, verification_key) = crypto::generate_random_keypair::<BLS>()?;

let client = Client::<BLS, Quic, ToMarshal>::new(Config {
let connection_flow = UserToMarshal {
verification_key,
signing_key,
remote_address: "google.com:80".to_string(),
initial_subscribed_topics: vec![Topic::Global],
endpoint: "google.com:80".to_string(),
};

let client = Client::<BLS, Quic, _>::new(Config {
flow: connection_flow,
pd: PhantomData,
})
.await?;
Expand Down
62 changes: 26 additions & 36 deletions proto/src/connection/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,63 @@ use crate::{
bail,
crypto::{self, DeterministicRng},
error::{Error, Result},
message::{AuthenticateWithKey, AuthenticateWithPermit, Message, Subscribe, Topic},
message::{AuthenticateWithKey, AuthenticateWithPermit, Message},
};

use super::protocols::Protocol;
use crate::connection::protocols::Connection;

/// TODO: BIDIRECTIONAL AUTHENTICATION FOR USERS<->BROKERS
///
/// The `Flow` trait implements a connection flow that takes in an endpoint,
/// signing key, and verification key and returns a connection.
/// The `Flow` trait implements a connection flow that takes in a `Flow implementation`,
/// and returns an authenticated endpoint (or an error).
#[async_trait]
pub trait Flow<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ProtocolType: Protocol,
>: Send + Sync
>: Send + Sync + 'static
{
/// This is the meat of `Flow`. We define this for every type of connection flow we have.
async fn connect(
endpoint: String,
signing_key: &SignatureScheme::SigningKey,
verification_key: &SignatureScheme::VerificationKey,
subscribed_topics: Vec<Topic>,
) -> Result<ProtocolType::Connection>;
async fn connect(&self) -> Result<ProtocolType::Connection>;
}

/// This struct implements `Flow`. It defines an implementation wherein we connect
/// to a marshal first, who returns the server address we should connect to, along
/// with a permit. Only after that do we try connecting to the broker.
pub struct ToMarshal {}
#[derive(Clone)]
pub struct UserToMarshal<SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>>
{
/// This is the remote address that we authenticate to. It can either be a broker
/// or a marshal.
pub endpoint: String,

/// The underlying (public) verification key, used to authenticate with the server. Checked
/// against the stake table.
pub verification_key: SignatureScheme::VerificationKey,

/// The underlying (private) signing key, used to sign messages to send to the server during the
/// authentication phase.
pub signing_key: SignatureScheme::SigningKey,
}

#[async_trait]
impl<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ProtocolType: Protocol,
> Flow<SignatureScheme, ProtocolType> for ToMarshal
> Flow<SignatureScheme, ProtocolType> for UserToMarshal<SignatureScheme>
where
SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::SigningKey: CanonicalSerialize + CanonicalDeserialize,
{
/// The steps on `ToMarshal`'s connection:
/// The steps on `UserToMarshal`'s connection:
/// 1. Authenticate with the marshal with a signed message, who optionally
/// returns a permit and a server address
/// 2. Use the permit and server address to connect to the broker
async fn connect(
endpoint: String,
signing_key: &SignatureScheme::SigningKey,
verification_key: &SignatureScheme::VerificationKey,
subscribed_topics: Vec<Topic>,
) -> Result<ProtocolType::Connection> {
async fn connect(&self) -> Result<ProtocolType::Connection> {
// Create the initial connection, which is unauthenticated at this point
let connection = bail!(
ProtocolType::Connection::connect(endpoint).await,
ProtocolType::Connection::connect(self.endpoint.clone()).await,
Connection,
"failed to connect to marshal"
);
Expand All @@ -85,7 +89,7 @@ where
let signature = bail!(
SignatureScheme::sign(
&(),
signing_key,
&self.signing_key,
timestamp.to_le_bytes(),
&mut DeterministicRng(0),
),
Expand All @@ -95,7 +99,7 @@ where

// Serialize the verify key
let verification_key_bytes = bail!(
crypto::serialize(verification_key),
crypto::serialize(&self.verification_key),
Serialize,
"failed to serialize verification key"
);
Expand Down Expand Up @@ -195,20 +199,6 @@ where
));
};

// Send our subscribed topics to the broker
let subscribed_topics_to_broker = Message::Subscribe(Subscribe {
topics: subscribed_topics,
});

// Subscribe to topics with the broker
bail!(
connection
.send_message(Arc::from(subscribed_topics_to_broker))
.await,
Connection,
"failed to send initial subscribe message to broker"
);

Ok(connection)
}
}
6 changes: 3 additions & 3 deletions proto/src/connection/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ pub mod tcp;
const _: [(); 0 - (!(usize::BITS >= u64::BITS)) as usize] = [];


pub trait Protocol: Send + Sync {
pub trait Protocol: Send + Sync + 'static {
type Connection: Connection;
type Listener: Listener<Self::Connection>;
}

#[async_trait]
pub trait Connection: Send + Sync {
pub trait Connection: Send + Sync + 'static {
/// Receive a single message from the connection.
///
/// # Errors
Expand All @@ -42,7 +42,7 @@ pub trait Connection: Send + Sync {
}

#[async_trait]
pub trait Listener<ConnectionType: Connection>: Send + Sync {
pub trait Listener<ConnectionType: Connection>: Send + Sync + 'static {
/// Bind to the local address, returning an instance of `Self`.
///
/// # Errors
Expand Down
Loading

0 comments on commit 0cd5c7d

Please sign in to comment.