Skip to content

Commit

Permalink
feat(rpc+synchronizer): get epoch returns errors if not staked or ver…
Browse files Browse the repository at this point in the history
…sion mismatch
  • Loading branch information
matthias-wright committed Oct 17, 2023
1 parent 2b41963 commit f433a49
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5097,6 +5097,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"fleek-crypto",
"lightning-interfaces",
"rand 0.8.5",
"reqwest",
Expand Down
5 changes: 5 additions & 0 deletions core/interfaces/src/syncronizer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use affair::Socket;
use fleek_crypto::NodePublicKey;
use infusion::c;
use lightning_types::Blake3Hash;
use tokio::sync::mpsc::Receiver;
Expand All @@ -11,6 +12,7 @@ use crate::{
ConfigProviderInterface,
Notification,
NotifierInterface,
SignerInterface,
WithStartAndShutdown,
};

Expand All @@ -25,6 +27,7 @@ pub trait SyncronizerInterface<C: Collection>:
app: ::ApplicationInterface,
blockstore_server: ::BlockStoreServerInterface,
notifier: ::NotifierInterface,
signer: ::SignerInterface,
) {
let sqr = app.sync_query();
let (tx_epoch_change, rx_epoch_change) = tokio::sync::mpsc::channel(10);
Expand All @@ -34,6 +37,7 @@ pub trait SyncronizerInterface<C: Collection>:
sqr,
blockstore_server,
rx_epoch_change,
signer.get_ed25519_pk(),
)
}

Expand All @@ -43,6 +47,7 @@ pub trait SyncronizerInterface<C: Collection>:
query_runner: c!(C::ApplicationInterface::SyncExecutor),
blockstore_server: &C::BlockStoreServerInterface,
rx_epoch_change: Receiver<Notification>,
node_public_key: NodePublicKey,
) -> anyhow::Result<Self>;

/// Returns a socket that will send accross the blake3hash of the checkpoint
Expand Down
2 changes: 2 additions & 0 deletions core/mock/src/syncronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Mutex;

use anyhow::Result;
use async_trait::async_trait;
use fleek_crypto::NodePublicKey;
use lightning_interfaces::infu_collection::{c, Collection};
use lightning_interfaces::types::Blake3Hash;
use lightning_interfaces::{
Expand Down Expand Up @@ -43,6 +44,7 @@ impl<C: Collection> SyncronizerInterface<C> for MockSyncronizer<C> {
_query_runner: c!(C::ApplicationInterface::SyncExecutor),
_blockstore_server: &C::BlockStoreServerInterface,
_rx_epoch_change: Receiver<Notification>,
_node_public_key: NodePublicKey,
) -> Result<Self> {
let (tx, rx) = oneshot::channel();
Ok(Self {
Expand Down
30 changes: 29 additions & 1 deletion core/rpc/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ use crate::eth::{
net_version,
};
use crate::server::RpcData;
use crate::types::{NodeKeyParam, PublicKeyParam};
use crate::types::{NodeKeyParam, PublicKeyParam, VersionedNodeKeyParam};
static OPEN_RPC_DOCS: &str = "../../docs/rpc/openrpc.json";

pub const RPC_VERSION: u8 = 1;

pub type Result<T> = anyhow::Result<T, Error>;

pub async fn rpc_handler(
Expand Down Expand Up @@ -120,6 +122,7 @@ impl RpcServer {
get_committee_members_handler::<C>,
)
.with_method("flk_get_epoch", get_epoch_handler::<C>)
.with_method("flk_get_epoch_testnet", get_epoch_testnet_handler::<C>)
.with_method("flk_get_epoch_info", get_epoch_info_handler::<C>)
.with_method("flk_get_total_supply", get_total_supply_handler::<C>)
.with_method(
Expand Down Expand Up @@ -361,6 +364,31 @@ pub async fn get_epoch_handler<C: Collection>(data: Data<Arc<RpcData<C>>>) -> Re
Ok(data.0.query_runner.get_epoch())
}

pub async fn get_epoch_testnet_handler<C: Collection>(
data: Data<Arc<RpcData<C>>>,
Params(params): Params<VersionedNodeKeyParam>,
) -> Result<u64> {
let Some(node_info) = data.query_runner.get_node_info(&params.public_key) else {
return Err(jsonrpc_v2::Error::Provided { code: 123, message: "Node is not staked" });
};
let min_amount = data.query_runner.get_staking_amount();
if node_info.stake.staked < min_amount.into() {
return Err(jsonrpc_v2::Error::Provided {
code: 123,
message: "Node is not staked",
});
}

if params.version != RPC_VERSION {
return Err(Error::Provided {
code: 69,
message: "Version Mismatch",
});
}

Ok(data.0.query_runner.get_epoch())
}

pub async fn get_epoch_info_handler<C: Collection>(
data: Data<Arc<RpcData<C>>>,
) -> Result<EpochInfo> {
Expand Down
6 changes: 6 additions & 0 deletions core/rpc/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ pub struct ClientKeyParam {
pub public_key: ClientPublicKey,
}

#[derive(Deserialize)]
pub struct VersionedNodeKeyParam {
pub public_key: NodePublicKey,
pub version: u8,
}

#[derive(Deserialize)]
pub struct DhtPutParam {
pub key: Vec<u8>,
Expand Down
1 change: 1 addition & 0 deletions core/syncronizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ serde_json.workspace = true
serde.workspace = true
tokio.workspace = true
rand.workspace = true
fleek-crypto.workspace = true

40 changes: 39 additions & 1 deletion core/syncronizer/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::IpAddr;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use fleek_crypto::{NodePublicKey, PublicKey};
use serde::de::DeserializeOwned;

pub async fn rpc_request<T: DeserializeOwned>(
Expand All @@ -20,6 +21,17 @@ pub async fn rpc_request<T: DeserializeOwned>(
if value.get("result").is_some() {
let value: RpcResponse<T> = serde_json::from_value(value)?;
Ok(value)
} else if value.get("error").is_some() {
let code = value
.get("error")
.unwrap()
.get("code")
.context("Failed to parse response")?;
match serde_json::from_value::<u8>(code.clone()) {
Ok(69) => Err(anyhow!("Version mismatch")).context(RequestError::VersionMismatch),
Ok(123) => Err(anyhow!("Node is not staked")).context(RequestError::NotStaked),
_ => Err(anyhow!("Failed to parse response")),
}
} else {
Err(anyhow!("Failed to parse response"))
}
Expand Down Expand Up @@ -53,3 +65,29 @@ pub fn rpc_epoch() -> serde_json::Value {
"id":1,
})
}

pub fn rpc_epoch_testnet(node_public_key: NodePublicKey) -> serde_json::Value {
serde_json::json!({
"jsonrpc": "2.0",
"method":"flk_get_epoch_testnet",
"params": {"public_key": node_public_key.to_base58(), "version": 1},
"id":1,
})
}

#[derive(Debug)]
pub enum RequestError {
NotStaked,
VersionMismatch,
}

impl std::fmt::Display for RequestError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RequestError::NotStaked => write!(f, "Node is not staked."),
RequestError::VersionMismatch => {
write!(f, "Version mismatch. Please update your binary.")
},
}
}
}
24 changes: 20 additions & 4 deletions core/syncronizer/src/syncronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::{Duration, SystemTime};

use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use fleek_crypto::NodePublicKey;
use lightning_interfaces::infu_collection::{c, Collection};
use lightning_interfaces::types::{
Blake3Hash,
Expand All @@ -27,10 +28,10 @@ use serde::de::DeserializeOwned;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::info;
use tracing::{error, info};

use crate::config::Config;
use crate::rpc::{rpc_epoch, rpc_last_epoch_hash, rpc_request};
use crate::rpc::{rpc_epoch_testnet, rpc_last_epoch_hash, rpc_request, RequestError};

pub struct Syncronizer<C: Collection> {
inner: Mutex<Option<SyncronizerInner<C>>>,
Expand All @@ -46,6 +47,7 @@ pub struct SyncronizerInner<C: Collection> {
genesis_committee: Vec<(NodeIndex, NodeInfo)>,
rpc_client: reqwest::Client,
epoch_change_delta: Duration,
node_public_key: NodePublicKey,
}

#[async_trait]
Expand Down Expand Up @@ -99,12 +101,14 @@ impl<C: Collection> SyncronizerInterface<C> for Syncronizer<C> {
query_runner: c!(C::ApplicationInterface::SyncExecutor),
blockstore_server: &C::BlockStoreServerInterface,
rx_epoch_change: Receiver<Notification>,
node_public_key: NodePublicKey,
) -> Result<Self> {
let inner = SyncronizerInner::new(
query_runner,
blockstore_server,
rx_epoch_change,
config.epoch_change_delta,
node_public_key,
);

Ok(Self {
Expand All @@ -128,6 +132,7 @@ impl<C: Collection> SyncronizerInner<C> {
blockstore_server: &C::BlockStoreServerInterface,
rx_epoch_change: Receiver<Notification>,
epoch_change_delta: Duration,
node_public_key: NodePublicKey,
) -> Self {
let mut genesis_committee = query_runner.genesis_committee();
// Shuffle this since we often hit this list in order until one responds. This will give our
Expand All @@ -143,6 +148,7 @@ impl<C: Collection> SyncronizerInner<C> {
genesis_committee,
rpc_client,
epoch_change_delta,
node_public_key,
}
}

Expand Down Expand Up @@ -199,7 +205,16 @@ impl<C: Collection> SyncronizerInner<C> {
let current_epoch = self.query_runner.get_epoch();

// Get the epoch the bootstrap nodes are at
let bootstrap_epoch = self.get_current_epoch().await?;
let bootstrap_epoch = match self.get_current_epoch().await {
Ok(bootstrap_epoch) => bootstrap_epoch,
Err(e) => {
if let Some(e) = e.downcast_ref::<RequestError>() {
// node not staked or invalid version
error!("{e:?}");
}
return Err(e);
},
};

if bootstrap_epoch <= current_epoch {
bail!("Bootstrap nodes are on the same epoch");
Expand Down Expand Up @@ -262,7 +277,8 @@ impl<C: Collection> SyncronizerInner<C> {

/// Returns the epoch the bootstrap nodes are on
async fn get_current_epoch(&self) -> Result<Epoch> {
self.ask_bootstrap_nodes(rpc_epoch().to_string()).await
self.ask_bootstrap_nodes(rpc_epoch_testnet(self.node_public_key).to_string())
.await
}
}

Expand Down

0 comments on commit f433a49

Please sign in to comment.