Skip to content

Commit

Permalink
signature scheme trait
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 14, 2024
1 parent b8ebca1 commit ba6d56d
Show file tree
Hide file tree
Showing 28 changed files with 363 additions and 429 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 3 additions & 15 deletions broker/src/handlers/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@
use std::{sync::Arc, time::Duration};

use proto::{
authenticate_with_broker,
connection::{
authenticate_with_broker, connection::{
auth::broker::BrokerAuth,
batch::{BatchedSender, Position},
protocols::{Protocol, Receiver},
},
crypto::{Scheme, Serializable},
error::{Error, Result},
message::Message,
verify_broker, BrokerProtocol,
}, crypto::signature::SignatureScheme, error::{Error, Result}, message::Message, verify_broker, BrokerProtocol
};
use tracing::{error, info};

Expand All @@ -22,14 +17,7 @@ use crate::{

use crate::metrics;

impl<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme>
Inner<BrokerSignatureScheme, UserSignatureScheme>
where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
{
impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> Inner<BrokerScheme, UserScheme> {
/// This function is the callback for handling a broker (private) connection.
pub async fn handle_broker_connection(
self: Arc<Self>,
Expand Down
25 changes: 6 additions & 19 deletions broker/src/handlers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use proto::{
auth::broker::BrokerAuth,
batch::{BatchedSender, Position},
protocols::{Protocol, Receiver},
},
crypto::{Scheme, Serializable},
message::Message,
UserProtocol,
}, crypto::signature::SignatureScheme, message::Message, UserProtocol
};
use slotmap::Key;
use tracing::info;
Expand All @@ -25,13 +22,8 @@ use crate::{
get_lock, send_broadcast, send_direct, send_or_remove_many, state::ConnectionId, Inner,
};

impl<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme>
Inner<BrokerSignatureScheme, UserSignatureScheme>
where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme>
Inner<BrokerScheme, UserScheme>
{
/// This function handles a user (public) connection.
pub async fn handle_user_connection(
Expand All @@ -40,14 +32,9 @@ where
<UserProtocol as Protocol>::Sender,
<UserProtocol as Protocol>::Receiver,
),
) where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
{
) {
// Verify (authenticate) the connection
let Ok((verification_key, topics)) = BrokerAuth::<UserSignatureScheme>::verify_user(
let Ok((public_key, topics)) = BrokerAuth::<UserScheme>::verify_user(
&mut connection,
&self.identity,
&mut self.discovery_client.clone(),
Expand All @@ -74,7 +61,7 @@ where

// Add the user for their key
get_lock!(self.user_connection_lookup, write)
.subscribe_connection_id_to_keys(connection_id, vec![verification_key]);
.subscribe_connection_id_to_keys(connection_id, vec![public_key]);

info!("received connection from user {:?}", connection_id.data());

Expand Down
36 changes: 13 additions & 23 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ use std::{
};

mod metrics;
use proto::metrics as proto_metrics;
use proto::{
crypto::signature::{KeyPair, SignatureScheme},
metrics as proto_metrics,
};

// TODO: figure out if we should use Tokio's here
use proto::{
bail,
connection::protocols::Protocol,
crypto::{KeyPair, Scheme, Serializable},
discovery::{BrokerIdentifier, DiscoveryClient},
error::{Error, Result},
parse_socket_address, BrokerProtocol, DiscoveryClientType, UserProtocol,
Expand All @@ -36,7 +38,7 @@ use crate::metrics::RUNNING_SINCE;

/// The broker's configuration. We need this when we create a new one.
/// TODO: clean up these generics. could be a generic type that implements both
pub struct Config<BrokerSignatureScheme: Scheme> {
pub struct Config<BrokerScheme: SignatureScheme> {
/// The user (public) advertise address: what the marshals send to users upon authentication.
/// Users connect to us with this address.
pub public_advertise_address: String,
Expand All @@ -60,7 +62,7 @@ pub struct Config<BrokerSignatureScheme: Scheme> {
/// The discovery endpoint. We use this to maintain consistency between brokers and marshals.
pub discovery_endpoint: String,

pub keypair: KeyPair<BrokerSignatureScheme>,
pub keypair: KeyPair<BrokerScheme>,

/// An optional TLS cert path
pub maybe_tls_cert_path: Option<String>,
Expand All @@ -69,11 +71,7 @@ pub struct Config<BrokerSignatureScheme: Scheme> {
}

/// The broker `Inner` that we use to share common data between broker tasks.
struct Inner<
// TODO: clean these up with some sort of generic trick or something
BrokerSignatureScheme: Scheme,
UserSignatureScheme: Scheme,
> {
struct Inner<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> {
/// A broker identifier that we can use to establish uniqueness among brokers.
identity: BrokerIdentifier,

Expand All @@ -82,7 +80,7 @@ struct Inner<

/// The underlying (public) verification key, used to authenticate with the server. Checked
/// against the stake table.
keypair: KeyPair<BrokerSignatureScheme>,
keypair: KeyPair<BrokerScheme>,

/// The set of all broker identities we see. Mapped against the brokers we see in `Discovery`
/// so that we don't connect multiple times.
Expand All @@ -98,15 +96,14 @@ struct Inner<
/// types.
user_connection_lookup: RwLock<ConnectionLookup<UserProtocol>>,

// connected_keys: LoggedSet<UserSignatureScheme::VerificationKey>,
/// The `PhantomData` that we need to be generic over protocol types.
pd: PhantomData<UserSignatureScheme>,
pd: PhantomData<UserScheme>,
}

/// The main `Broker` struct. We instantiate this when we want to run a broker.
pub struct Broker<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme> {
pub struct Broker<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> {
/// The broker's `Inner`. We clone this and pass it around when needed.
inner: Arc<Inner<BrokerSignatureScheme, UserSignatureScheme>>,
inner: Arc<Inner<BrokerScheme, UserScheme>>,

/// The public (user -> broker) listener
user_listener: <UserProtocol as Protocol>::Listener,
Expand All @@ -118,21 +115,14 @@ pub struct Broker<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme> {
metrics_bind_address: Option<SocketAddr>,
}

impl<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme>
Broker<BrokerSignatureScheme, UserSignatureScheme>
where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
{
impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> Broker<BrokerScheme, UserScheme> {
/// Create a new `Broker` from a `Config`
///
/// # Errors
/// - If we fail to create the `Discovery` client
/// - If we fail to bind to our public endpoint
/// - If we fail to bind to our private endpoint
pub async fn new(config: Config<BrokerSignatureScheme>) -> Result<Self> {
pub async fn new(config: Config<BrokerScheme>) -> Result<Self> {
// Extrapolate values from the underlying broker configuration
let Config {
public_advertise_address,
Expand Down
14 changes: 8 additions & 6 deletions broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
use broker::{Broker, Config};
use clap::Parser;
use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS;
use jf_primitives::signatures::{
bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme,
};
use local_ip_address::local_ip;
use proto::{
bail,
crypto::{generate_random_keypair, DeterministicRng},
crypto::{rng::DeterministicRng, signature::KeyPair},
error::{Error, Result},
};

Expand Down Expand Up @@ -56,7 +58,7 @@ async fn main() -> Result<()> {
let private_ip_address = bail!(local_ip(), Connection, "failed to get local IP address");

// Create deterministic keys for brokers (for now, obviously)
let (signing_key, verification_key) = generate_random_keypair::<BLS, _>(DeterministicRng(0))?;
let (private_key, public_key) = BLS::key_gen(&(), &mut DeterministicRng(0)).unwrap();

let broker_config = Config {
// Public addresses: explicitly defined advertise address, bind address is on every interface
Expand All @@ -74,9 +76,9 @@ async fn main() -> Result<()> {

discovery_endpoint: args.discovery_endpoint,

keypair: proto::crypto::KeyPair {
verification_key,
signing_key,
keypair: KeyPair {
public_key,
private_key,
},

// TODO: clap this
Expand Down
11 changes: 3 additions & 8 deletions broker/src/tasks/broker_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use proto::{
connection::protocols::{Listener, Protocol},
crypto::{Scheme, Serializable},
crypto::signature::SignatureScheme,
BrokerProtocol,
};
use tokio::spawn;
Expand All @@ -13,13 +13,8 @@ use tracing::warn;

use crate::Inner;

impl<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme>
Inner<BrokerSignatureScheme, UserSignatureScheme>
where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme>
Inner<BrokerScheme, UserScheme>
{
/// Runs the broker listener task in a loop.
pub async fn run_broker_listener_task(
Expand Down
15 changes: 3 additions & 12 deletions broker/src/tasks/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,15 @@
use std::{sync::Arc, time::Duration};

use proto::{
connection::protocols::Protocol,
crypto::{Scheme, Serializable},
discovery::DiscoveryClient,
BrokerProtocol,
connection::protocols::Protocol, crypto::signature::SignatureScheme,
discovery::DiscoveryClient, BrokerProtocol,
};
use tokio::{spawn, time::sleep};
use tracing::error;

use crate::{get_lock, Inner};

impl<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme>
Inner<BrokerSignatureScheme, UserSignatureScheme>
where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
{
impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> Inner<BrokerScheme, UserScheme> {
/// This task deals with setting the number of our connected users in Redis or the embedded db. It allows
/// the marshal to correctly choose the broker with the least amount of connections.
pub async fn run_heartbeat_task(self: Arc<Self>) {
Expand Down
16 changes: 2 additions & 14 deletions broker/src/tasks/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ use crate::{
};
use crate::{new_serialized_message, send_or_remove_many};
use proto::{
bail,
connection::batch::Position,
crypto::{Scheme, Serializable},
error::{Error, Result},
message::Message,
BrokerProtocol,
bail, connection::batch::Position, crypto::signature::SignatureScheme, error::{Error, Result}, message::Message, BrokerProtocol
};
use tokio::time::sleep;
use tracing::error;
Expand All @@ -41,14 +36,7 @@ macro_rules! send_update_to_brokers {
}};
}

impl<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme>
Inner<BrokerSignatureScheme, UserSignatureScheme>
where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
{
impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> Inner<BrokerScheme, UserScheme> {
/// This task deals with sending connected user and topic updates to other brokers. It takes advantage of
/// `SnapshotMap`, so we can send partial or full updates to the other brokers as they need it.
/// Right now, we do it every 5 seconds, or on every user connect if the number of connections is
Expand Down
11 changes: 2 additions & 9 deletions broker/src/tasks/user_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,15 @@ use std::sync::Arc;

use proto::{
connection::protocols::{Listener, Protocol},
crypto::{Scheme, Serializable},
crypto::signature::SignatureScheme,
UserProtocol,
};
use tokio::spawn;
use tracing::warn;

use crate::Inner;

impl<BrokerSignatureScheme: Scheme, UserSignatureScheme: Scheme>
Inner<BrokerSignatureScheme, UserSignatureScheme>
where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
{
impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> Inner<BrokerScheme, UserScheme> {
// We run the user listener task in a loop, accepting and handling new connections as needed.
pub async fn run_user_listener_task(
self: Arc<Self>,
Expand Down
Loading

0 comments on commit ba6d56d

Please sign in to comment.