Skip to content

Commit

Permalink
fix: use batch subscribe and remove 4050 (#388)
Browse files Browse the repository at this point in the history
* fix: use batch subscribe and remove 4050

* fix: handle individual topic errors

* fix: remove error catch case that is not expected
  • Loading branch information
chris13524 authored Mar 5, 2024
1 parent b398692 commit a3917e1
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 202 deletions.
55 changes: 55 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ pub struct Metrics {
relay_subscribe_failures: Counter<u64>,
relay_subscribe_latency: Histogram<u64>,
relay_subscribe_request_latency: Histogram<u64>,
relay_batch_subscribes: Counter<u64>,
relay_batch_subscribe_failures: Counter<u64>,
relay_batch_subscribe_latency: Histogram<u64>,
relay_batch_subscribe_request_latency: Histogram<u64>,
postgres_queries: Counter<u64>,
postgres_query_latency: Histogram<u64>,
keys_server_requests: Counter<u64>,
Expand Down Expand Up @@ -136,6 +140,28 @@ impl Metrics {
.with_description("The latency subscribing to relay topics")
.init();

let relay_batch_subscribes: Counter<u64> = meter
.u64_counter("relay_batch_subscribes")
.with_description(
"The number of batch subscribes to relay topics (not including retries)",
)
.init();

let relay_batch_subscribe_failures: Counter<u64> = meter
.u64_counter("relay_batch_subscribe_failures")
.with_description("The number of failures to batch subscribe to relay topics")
.init();

let relay_batch_subscribe_latency: Histogram<u64> = meter
.u64_histogram("relay_batch_subscribe_latency")
.with_description("The latency batch subscribing to relay topics w/ built-in retry")
.init();

let relay_batch_subscribe_request_latency: Histogram<u64> = meter
.u64_histogram("relay_batch_subscribe_request_latency")
.with_description("The latency batch subscribing to relay topics")
.init();

let postgres_queries: Counter<u64> = meter
.u64_counter("postgres_queries")
.with_description("The number of Postgres queries executed")
Expand Down Expand Up @@ -222,6 +248,10 @@ impl Metrics {
relay_subscribe_failures,
relay_subscribe_latency,
relay_subscribe_request_latency,
relay_batch_subscribes,
relay_batch_subscribe_failures,
relay_batch_subscribe_latency,
relay_batch_subscribe_request_latency,
postgres_queries,
postgres_query_latency,
keys_server_requests,
Expand Down Expand Up @@ -338,6 +368,31 @@ impl Metrics {
.record(&ctx, elapsed.as_millis() as u64, &[]);
}

pub fn relay_batch_subscribe(&self, success: bool, start: Instant) {
let elapsed = start.elapsed();

let ctx = Context::current();
let attributes = [KeyValue::new("success", success.to_string())];
self.relay_batch_subscribes.add(&ctx, 1, &attributes);
self.relay_batch_subscribe_latency
.record(&ctx, elapsed.as_millis() as u64, &attributes);
}

pub fn relay_batch_subscribe_failure(&self, is_permanent: bool) {
let ctx = Context::current();
let attributes = [KeyValue::new("is_permanent", is_permanent.to_string())];
self.relay_batch_subscribe_failures
.add(&ctx, 1, &attributes);
}

pub fn relay_batch_subscribe_request(&self, start: Instant) {
let elapsed = start.elapsed();

let ctx = Context::current();
self.relay_batch_subscribe_request_latency
.record(&ctx, elapsed.as_millis() as u64, &[]);
}

pub fn postgres_query(&self, query_name: &'static str, start: Instant) {
let elapsed = start.elapsed();

Expand Down
99 changes: 57 additions & 42 deletions src/publish_relay_message.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use {
crate::{
metrics::Metrics,
spec::{NOTIFY_NOOP_TAG, NOTIFY_NOOP_TTL},
},
crate::metrics::Metrics,
relay_client::{error::Error, http::Client},
relay_rpc::{
domain::Topic,
rpc::{self, msg_id::get_message_id, Publish, PublishError, SubscriptionError},
},
std::{
sync::{Arc, OnceLock},
time::{Duration, Instant},
},
std::time::{Duration, Instant},
tokio::time::sleep,
tracing::{error, info, instrument, warn},
};
Expand All @@ -33,7 +27,7 @@ pub async fn publish_relay_message(
info!("publish_relay_message");
let start = Instant::now();

let client_publish_call = || async {
let call = || async {
let start = Instant::now();
let result = relay_client
.publish(
Expand Down Expand Up @@ -63,7 +57,7 @@ pub async fn publish_relay_message(
};

let mut tries = 0;
while let Err(e) = client_publish_call().await {
while let Err(e) = call().await {
tries += 1;
let is_permanent = tries >= 10;
if let Some(metrics) = metrics {
Expand Down Expand Up @@ -104,30 +98,17 @@ pub async fn subscribe_relay_topic(
info!("subscribe_relay_topic");
let start = Instant::now();

let client_publish_call = || async {
let call = || async {
let start = Instant::now();
let result = relay_client.subscribe_blocking(topic.clone()).await;
if let Some(metrics) = metrics {
metrics.relay_subscribe_request(start);
}
match result {
Ok(_) => Ok(()),
Err(e) => match e {
Error::Response(rpc::Error::Handler(
SubscriptionError::SubscriberLimitExceeded,
)) => {
// FIXME figure out how to handle this properly; being unable to subscribe means a broken state
// https://walletconnect.slack.com/archives/C058RS0MH38/p1708183383748259
warn!("Subscriber limit exceeded for topic {topic}");
Ok(())
}
e => Err(e),
},
}
result
};

let mut tries = 0;
while let Err(e) = client_publish_call().await {
while let Err(e) = call().await {
tries += 1;
let is_permanent = tries >= 10;
if let Some(metrics) = metrics {
Expand Down Expand Up @@ -162,25 +143,59 @@ pub async fn subscribe_relay_topic(
}

#[instrument(skip(relay_client, metrics))]
pub async fn extend_subscription_ttl(
pub async fn batch_subscribe_relay_topics(
relay_client: &Client,
topic: Topic,
topics: Vec<Topic>,
metrics: Option<&Metrics>,
) -> Result<(), Error<PublishError>> {
info!("extend_subscription_ttl");

// Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime
static LOCK: OnceLock<Arc<str>> = OnceLock::new();
let message = LOCK.get_or_init(|| "".into()).clone();

let publish = Publish {
topic,
message,
tag: NOTIFY_NOOP_TAG,
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32,
prompt: false,
) -> Result<(), Error<SubscriptionError>> {
info!("batch_subscribe_relay_topic");
let start = Instant::now();

let call = || async {
let start = Instant::now();
let result = relay_client.batch_subscribe_blocking(topics.clone()).await;
if let Some(metrics) = metrics {
metrics.relay_batch_subscribe_request(start);
}
// TODO process each error individually
// TODO retry relay internal failures?
// https://github.com/WalletConnect/notify-server/issues/395
result
};
publish_relay_message(relay_client, &publish, metrics).await

let mut tries = 0;
while let Err(e) = call().await {
tries += 1;
let is_permanent = tries >= 10;
if let Some(metrics) = metrics {
metrics.relay_batch_subscribe_failure(is_permanent);
}

if is_permanent {
error!("Permanent error batch subscribing to topics, took {tries} tries: {e:?}");

if let Some(metrics) = metrics {
// TODO make DRY with end-of-function call
metrics.relay_batch_subscribe(false, start);
}
return Err(e);
}

let retry_in = calculate_retry_in(tries);
warn!(
"Temporary error batch subscribing to topics, retrying attempt {tries} in {retry_in:?}: {e:?}"
);
sleep(retry_in).await;
}

if let Some(metrics) = metrics {
metrics.relay_batch_subscribe(true, start);
}

// Sleep to account for some replication lag. Without this, the subscription may not be active on all nodes
sleep(Duration::from_millis(250)).await;

Ok(())
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use {
},
error::NotifyServerError,
model::helpers::{get_project_by_topic, get_welcome_notification, upsert_subscriber},
publish_relay_message::{
extend_subscription_ttl, publish_relay_message, subscribe_relay_topic,
},
publish_relay_message::{publish_relay_message, subscribe_relay_topic},
rate_limit::{self, Clock, RateLimitError},
registry::storage::redis::Redis,
rpc::{decode_key, derive_key, JsonRpcResponse, NotifySubscribe, ResponseAuth},
Expand Down Expand Up @@ -275,10 +273,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
info!("Finished publishing subscribe response");
}

extend_subscription_ttl(&state.relay_client, notify_topic, state.metrics.as_ref())
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?

// TODO do in same txn as upsert_subscriber()
if subscriber.inserted {
let welcome_notification =
Expand Down
5 changes: 1 addition & 4 deletions src/services/public_http_server/handlers/subscribe_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
error::NotifyServerError,
model::helpers::upsert_project,
publish_relay_message::{extend_subscription_ttl, subscribe_relay_topic},
publish_relay_message::subscribe_relay_topic,
rate_limit::{self, Clock, RateLimitError},
registry::{extractor::AuthedProjectId, storage::redis::Redis},
state::AppState,
Expand Down Expand Up @@ -103,9 +103,6 @@ pub async fn handler(
info!("Subscribing to project topic: {topic}");
subscribe_relay_topic(&state.relay_client, &topic, state.metrics.as_ref()).await?;

info!("Extending subscription TTL");
extend_subscription_ttl(&state.relay_client, topic.clone(), state.metrics.as_ref()).await?;

info!("Successfully subscribed to project topic: {topic}");
Ok(Json(SubscribeTopicResponseBody {
authentication_key: project.authentication_public_key,
Expand Down
49 changes: 21 additions & 28 deletions src/services/relay_renewal_job/refresh_topic_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use {
error::NotifyServerError,
metrics::Metrics,
model::helpers::{get_project_topics, get_subscriber_topics},
publish_relay_message::{extend_subscription_ttl, subscribe_relay_topic},
publish_relay_message::batch_subscribe_relay_topics,
},
futures_util::{StreamExt, TryFutureExt, TryStreamExt},
futures_util::StreamExt,
relay_client::http::Client,
relay_rpc::domain::Topic,
sqlx::PgPool,
Expand Down Expand Up @@ -46,6 +46,14 @@ pub async fn run(
let topics_count = topics.len();
info!("topics_count: {topics_count}");

let topic_batches = topics
// Chunk as 1 since we don't yet have the ability to process each topic error individually
// https://github.com/WalletConnect/notify-server/issues/395
// .chunks(MAX_SUBSCRIPTION_BATCH_SIZE)
.chunks(1)
.map(|topics| topics.to_vec())
.collect::<Vec<_>>();

// Limit concurrency to avoid overwhelming the relay with requests.
const REQUEST_CONCURRENCY: usize = 25;

Expand All @@ -64,36 +72,21 @@ pub async fn run(
let client = &client;
let metrics = metrics.as_ref();

// Using `batch_subscription` was removed in https://github.com/WalletConnect/notify-server/pull/359
// We can't really use this right now because we are also extending the topic TTL which could take longer than the 5m TTL
let result = futures_util::stream::iter(topics)
.map(|topic| async move {
// Subscribe a second time as the initial subscription above may have expired
subscribe_relay_topic(client, &topic, metrics)
.map_ok(|_| ())
.map_err(NotifyServerError::from)
.and_then(|_| {
// Subscribing only guarantees 5m TTL, so we always need to extend it.
extend_subscription_ttl(client, topic.clone(), metrics)
.map_ok(|_| ())
.map_err(Into::into)
})
.await
})
// Above we want to resubscribe as quickly as possible so use a high concurrency value
// But here we prefer stability and are OK with a lower value
let result = futures_util::stream::iter(topic_batches)
.map(|topics| batch_subscribe_relay_topics(client, topics, metrics))
.buffer_unordered(REQUEST_CONCURRENCY)
.try_collect::<Vec<_>>()
.collect::<Vec<_>>()
.await;
let elapsed: u64 = start.elapsed().as_millis().try_into().unwrap();
if let Err(e) = result {
// An error here is bad, as topics will not have been renewed.
// However, this should be rare and many resubscribes will happen within 30 days so all topics should be renewed eventually.
// With <https://github.com/WalletConnect/notify-server/issues/325> we will be able to guarantee renewal much better.
error!("Failed to renew all topic subscriptions in {elapsed}ms: {e}");
} else {
info!("Success renewing all topic subscriptions in {elapsed}ms");
for result in &result {
if let Err(e) = result {
// An error here is bad, as topics will not have been renewed.
// However, this should be rare and many resubscribes will happen within 30 days so all topics should be renewed eventually.
// With <https://github.com/WalletConnect/notify-server/issues/325> we will be able to guarantee renewal much better.
error!("Failed to renew some topic subscriptions: {e}");
}
}
info!("Completed topic renew job (possibly with errors) in {elapsed}ms");
*renew_all_topics_lock.lock().await = false;

if let Some(metrics) = metrics {
Expand Down
2 changes: 0 additions & 2 deletions src/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub const NOTIFY_WATCH_SUBSCRIPTIONS_TAG: u32 = 4010;
pub const NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG: u32 = 4011;
pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TAG: u32 = 4012;
pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG: u32 = 4013;
pub const NOTIFY_NOOP_TAG: u32 = 4050;
pub const NOTIFY_GET_NOTIFICATIONS_TAG: u32 = 4014;
pub const NOTIFY_GET_NOTIFICATIONS_RESPONSE_TAG: u32 = 4015;

Expand Down Expand Up @@ -56,7 +55,6 @@ pub const NOTIFY_WATCH_SUBSCRIPTIONS_TTL: Duration = T300;
pub const NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TTL: Duration = T300;
pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TTL: Duration = T300;
pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TTL: Duration = T300;
pub const NOTIFY_NOOP_TTL: Duration = T300;
pub const NOTIFY_GET_NOTIFICATIONS_TTL: Duration = T300;
pub const NOTIFY_GET_NOTIFICATIONS_RESPONSE_TTL: Duration = T300;

Expand Down
Loading

0 comments on commit a3917e1

Please sign in to comment.