diff --git a/client/src/lib.rs b/client/src/lib.rs index c2c5b18..bce3273 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -15,7 +15,7 @@ use proto::{ crypto, error::Error, error::Result, - message::{Broadcast, Direct, Message, Subscribe, Topic, Unsubscribe}, + message::{Broadcast, Direct, Message, Topic}, }; /// `Client` is a light wrapper around a `Sticky` connection that provides functions @@ -130,20 +130,8 @@ where /// If the connection or serialization has failed /// /// TODO IMPORTANT: see if we want this, or if we'd prefer `set_subscriptions()` - pub async fn subscribe(&self, topics: Vec) -> Result<()> { - // Lock subscribed topics here so if we're reconnecting we maintain parity - // with our list - - // Lock our topics here so we can't add them on failure - let mut topic_guard = self.0.inner.subscribed_topics.lock().await; - - // Form and send the single message - self.send_message_raw(Arc::from(Message::Subscribe(Subscribe { - topics: topics.clone(), - }))) - .await - // Only add to our topic map if the message was successful - .map(|()| topic_guard.extend(topics)) + pub async fn subscribe(&self, _topics: Vec) -> Result<()> { + todo!() } /// Sends a message to the server that asserts that this client is no longer @@ -151,20 +139,8 @@ where /// /// # Errors /// If the connection or serialization has failed - pub async fn unsubscribe(&self, topics: Vec) -> Result<()> { - // Lock subscribed topics here so if we're reconnecting we maintain parity - // with our list - - // Lock our topics here so we can't add them on failure - let mut topic_guard = self.0.inner.subscribed_topics.lock().await; - - // Form and send the single message - self.send_message_raw(Arc::from(Message::Unsubscribe(Unsubscribe { - topics: topics.clone(), - }))) - .await - // Only add to our topic map if the message was successful - .map(|()| topic_guard.extend(topics)) + pub async fn unsubscribe(&self, _topics: Vec) -> Result<()> { + todo!() } /// Sends a pre-formed message over the wire. Various functions make use diff --git a/client/src/main.rs b/client/src/main.rs index 209033a..ad4b3fe 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -2,10 +2,9 @@ use std::marker::PhantomData; use client::{Client, Config}; use proto::{ - connection::{flow::ToMarshal, protocols::quic::Quic}, + connection::{flow::UserToMarshal, protocols::quic::Quic}, crypto, error::Result, - message::Topic, }; use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS; @@ -14,11 +13,14 @@ use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme async fn main() -> Result<()> { let (signing_key, verification_key) = crypto::generate_random_keypair::()?; - let client = Client::::new(Config { + let connection_flow = UserToMarshal { verification_key, signing_key, - remote_address: "google.com:80".to_string(), - initial_subscribed_topics: vec![Topic::Global], + endpoint: "google.com:80".to_string(), + }; + + let client = Client::::new(Config { + flow: connection_flow, pd: PhantomData, }) .await?; diff --git a/proto/src/connection/flow.rs b/proto/src/connection/flow.rs index 28e13ba..92e7e61 100644 --- a/proto/src/connection/flow.rs +++ b/proto/src/connection/flow.rs @@ -16,7 +16,7 @@ use crate::{ bail, crypto::{self, DeterministicRng}, error::{Error, Result}, - message::{AuthenticateWithKey, AuthenticateWithPermit, Message, Subscribe, Topic}, + message::{AuthenticateWithKey, AuthenticateWithPermit, Message}, }; use super::protocols::Protocol; @@ -24,51 +24,55 @@ use crate::connection::protocols::Connection; /// TODO: BIDIRECTIONAL AUTHENTICATION FOR USERS<->BROKERS /// -/// The `Flow` trait implements a connection flow that takes in an endpoint, -/// signing key, and verification key and returns a connection. +/// The `Flow` trait implements a connection flow that takes in a `Flow implementation`, +/// and returns an authenticated endpoint (or an error). #[async_trait] pub trait Flow< SignatureScheme: JfSignatureScheme, ProtocolType: Protocol, ->: Send + Sync +>: Send + Sync + 'static { /// This is the meat of `Flow`. We define this for every type of connection flow we have. - async fn connect( - endpoint: String, - signing_key: &SignatureScheme::SigningKey, - verification_key: &SignatureScheme::VerificationKey, - subscribed_topics: Vec, - ) -> Result; + async fn connect(&self) -> Result; } /// This struct implements `Flow`. It defines an implementation wherein we connect /// to a marshal first, who returns the server address we should connect to, along /// with a permit. Only after that do we try connecting to the broker. -pub struct ToMarshal {} +#[derive(Clone)] +pub struct UserToMarshal> +{ + /// This is the remote address that we authenticate to. It can either be a broker + /// or a marshal. + pub endpoint: String, + + /// The underlying (public) verification key, used to authenticate with the server. Checked + /// against the stake table. + pub verification_key: SignatureScheme::VerificationKey, + + /// The underlying (private) signing key, used to sign messages to send to the server during the + /// authentication phase. + pub signing_key: SignatureScheme::SigningKey, +} #[async_trait] impl< SignatureScheme: JfSignatureScheme, ProtocolType: Protocol, - > Flow for ToMarshal + > Flow for UserToMarshal where SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize, SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize, SignatureScheme::SigningKey: CanonicalSerialize + CanonicalDeserialize, { - /// The steps on `ToMarshal`'s connection: + /// The steps on `UserToMarshal`'s connection: /// 1. Authenticate with the marshal with a signed message, who optionally /// returns a permit and a server address /// 2. Use the permit and server address to connect to the broker - async fn connect( - endpoint: String, - signing_key: &SignatureScheme::SigningKey, - verification_key: &SignatureScheme::VerificationKey, - subscribed_topics: Vec, - ) -> Result { + async fn connect(&self) -> Result { // Create the initial connection, which is unauthenticated at this point let connection = bail!( - ProtocolType::Connection::connect(endpoint).await, + ProtocolType::Connection::connect(self.endpoint.clone()).await, Connection, "failed to connect to marshal" ); @@ -85,7 +89,7 @@ where let signature = bail!( SignatureScheme::sign( &(), - signing_key, + &self.signing_key, timestamp.to_le_bytes(), &mut DeterministicRng(0), ), @@ -95,7 +99,7 @@ where // Serialize the verify key let verification_key_bytes = bail!( - crypto::serialize(verification_key), + crypto::serialize(&self.verification_key), Serialize, "failed to serialize verification key" ); @@ -195,20 +199,6 @@ where )); }; - // Send our subscribed topics to the broker - let subscribed_topics_to_broker = Message::Subscribe(Subscribe { - topics: subscribed_topics, - }); - - // Subscribe to topics with the broker - bail!( - connection - .send_message(Arc::from(subscribed_topics_to_broker)) - .await, - Connection, - "failed to send initial subscribe message to broker" - ); - Ok(connection) } } diff --git a/proto/src/connection/protocols/mod.rs b/proto/src/connection/protocols/mod.rs index 8cf39db..4a87874 100644 --- a/proto/src/connection/protocols/mod.rs +++ b/proto/src/connection/protocols/mod.rs @@ -13,13 +13,13 @@ pub mod tcp; const _: [(); 0 - (!(usize::BITS >= u64::BITS)) as usize] = []; -pub trait Protocol: Send + Sync { +pub trait Protocol: Send + Sync + 'static { type Connection: Connection; type Listener: Listener; } #[async_trait] -pub trait Connection: Send + Sync { +pub trait Connection: Send + Sync + 'static { /// Receive a single message from the connection. /// /// # Errors @@ -42,7 +42,7 @@ pub trait Connection: Send + Sync { } #[async_trait] -pub trait Listener: Send + Sync { +pub trait Listener: Send + Sync + 'static { /// Bind to the local address, returning an instance of `Self`. /// /// # Errors diff --git a/proto/src/connection/sticky.rs b/proto/src/connection/sticky.rs index c9f1dfd..2d25d2a 100644 --- a/proto/src/connection/sticky.rs +++ b/proto/src/connection/sticky.rs @@ -5,12 +5,12 @@ //! and where we can just return. Most of the errors already have //! enough context from previous bails. -use std::{collections::HashSet, marker::PhantomData, sync::Arc, time::Duration}; +use std::{marker::PhantomData, sync::Arc, time::Duration}; use jf_primitives::signatures::SignatureScheme as JfSignatureScheme; use tokio::{ - sync::{Mutex, RwLock, Semaphore}, + sync::{RwLock, Semaphore}, time::sleep, }; use tracing::error; @@ -18,7 +18,7 @@ use tracing::error; use crate::{ bail, error::{Error, Result}, - message::{Message, Topic}, + message::Message, }; use super::{ @@ -41,66 +41,44 @@ pub struct Sticky< pub inner: Arc>, } -/// `Ommer` is held exclusively by `Sticky`, wherein an `Arc` is used +/// `Inner` is held exclusively by `Sticky`, wherein an `Arc` is used /// to facilitate interior mutability. pub struct Inner< SignatureScheme: JfSignatureScheme, ProtocolType: Protocol, ConnectionFlow: Flow, > { - /// This is the remote address that we authenticate to. It can either be a broker - /// or a marshal. The authentication flow depends on the function defined in - /// `auth_flow`, so this may not be the final address (e.g. if you are asking the - /// marshal for it). - remote_address: String, - - /// The underlying (public) verification key, used to authenticate with the server. Checked against the stake - /// table. - verification_key: SignatureScheme::VerificationKey, - - /// The underlying (private) signing key, used to sign messages to send to the server during the - /// authentication phase. - signing_key: SignatureScheme::SigningKey, - - /// A list of topics we are subscribed to. This allows us to, when reconnecting, - /// easily provide the list of topics we care about. - pub subscribed_topics: Mutex>, + /// This encapsulates the underlying connection parameters that we need + /// to connect, as well as the underyling flow + flow: ConnectionFlow, /// The underlying connection, which we modify to facilitate reconnections. connection: RwLock, /// The task that runs in the background that reconnects us when we need - /// to be. This is so multiple tasks don't try doing it at the same time. + /// to be. This is so we don't spawn multiple tasks at once reconnect_semaphore: Semaphore, - pd: PhantomData<(ProtocolType, ConnectionFlow)>, + /// Phantom data that lets us use `ProtocolType`, `ConnectionFlow`, and + /// `SignatureScheme` downstream. + pd: PhantomData<(SignatureScheme, ProtocolType, ConnectionFlow)>, } -/// The configuration needed to construct a client +/// The configuration needed to construct a `Sticky` connection. pub struct Config< SignatureScheme: JfSignatureScheme, ProtocolType: Protocol, ConnectionFlow: Flow, > { - /// The verification (public) key. Sent to the server to verify - /// our identity. - pub verification_key: SignatureScheme::VerificationKey, - - /// The signing (private) key. Only used for signing the authentication - /// message sent to the server upon connection. - pub signing_key: SignatureScheme::SigningKey, - - /// The remote address(es) to connect and authenticate to. - pub remote_address: String, - - /// The topics we want to be subscribed to initially. This is needed so that - /// we can resubscribe to the same topics upon reconnection. - pub initial_subscribed_topics: Vec, + /// The (optional) state we use to add things to the connection state. + /// For example, with a `UserToMarshal` flow, we may want to send + /// the subscribed topics upon connection. + pub flow: ConnectionFlow, /// Phantom data that we pass down to `Sticky` and `StickInner`. /// Allows us to be generic over a connection method, because /// we need multiple. - pub pd: PhantomData<(ProtocolType, ConnectionFlow)>, + pub pd: PhantomData<(SignatureScheme, ProtocolType, ConnectionFlow)>, } /// This is a macro that helps with reconnections when sending @@ -121,26 +99,24 @@ macro_rules! try_with_reconnect { match operation{ Ok(res) => res, Err(err) => { - // Acquire semaphore. If another task is doing this, just return an error if $self.inner.reconnect_semaphore.try_acquire().is_ok() { // Acquire write guard, drop read guard drop(read_guard); - let mut write_guard = $self.inner.connection.write().await; - // Lock subscribed topics - let subscribed_topics = $self.inner.subscribed_topics.lock().await; - let topics:Vec = subscribed_topics.iter().cloned().collect(); + // Clone everything we need to connect + // TODO: we want to minimize cloning this. We should sign a message + // earlier. + let inner = $self.inner.clone(); + + tokio::spawn(async move{ + // Get write guard on connection so we can write to it + let mut write_guard = inner.connection.write().await; // Loop to connect and authenticate let connection = loop { - // Try to connect - match ConnectionFlow::connect( - $self.inner.remote_address.clone(), - &$self.inner.signing_key, - &$self.inner.verification_key, - topics.clone() - ) + // Try to connect with our parameters + match inner.flow.connect() .await { Ok(connection) => break connection, @@ -153,8 +129,10 @@ macro_rules! try_with_reconnect { // Set connection to new connection *write_guard = connection; + + // Drop here so other tasks can start sending messages drop(write_guard); - drop(subscribed_topics); + }); } // If somebody is already trying to reconnect, fail instantly @@ -187,13 +165,7 @@ impl< maybe_connection: Option, ) -> Result { // Extrapolate values from the underlying client configuration - let Config { - verification_key, - signing_key, - remote_address, - initial_subscribed_topics, - pd, - } = config; + let Config { flow, pd } = config; // Perform the initial connection if not provided. This is to validate // that we have correct parameters and all. @@ -206,13 +178,7 @@ impl< Some(connection) => connection, None => { bail!( - ConnectionFlow::connect( - remote_address.clone(), - &signing_key, - &verification_key, - initial_subscribed_topics.clone(), - ) - .await, + flow.connect().await, Connection, "failed to make initial connection" ) @@ -222,10 +188,7 @@ impl< // Return the slightly transformed connection. Ok(Self { inner: Arc::from(Inner { - remote_address, - signing_key, - verification_key, - subscribed_topics: Mutex::from(HashSet::from_iter(initial_subscribed_topics)), + flow, // Use the existing connection connection: RwLock::from(connection), reconnect_semaphore: Semaphore::const_new(1),