Skip to content

Commit

Permalink
fix: handle individual topic errors
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 committed Mar 5, 2024
1 parent 374db7a commit 4abd2ba
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
9 changes: 5 additions & 4 deletions src/publish_relay_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ pub async fn batch_subscribe_relay_topics(
metrics.relay_batch_subscribe_request(start);
}
// TODO process each error individually
// TODO retry internal failures?
// TODO retry relay internal failures?
// https://github.com/WalletConnect/notify-server/issues/395
match result {
Ok(_) => Ok(()),
Err(e) => match e {
Expand All @@ -180,7 +181,7 @@ pub async fn batch_subscribe_relay_topics(
)) => {
// FIXME figure out how to handle this properly; being unable to subscribe means a broken state
// https://walletconnect.slack.com/archives/C058RS0MH38/p1708183383748259
warn!("Subscriber limit exceeded for topic {topics:?}");
warn!("Subscriber limit exceeded for topics {topics:?}");
Ok(())
}
e => Err(e),
Expand All @@ -197,7 +198,7 @@ pub async fn batch_subscribe_relay_topics(
}

if is_permanent {
error!("Permanent error subscribing to topic, took {tries} tries: {e:?}");
error!("Permanent error batch subscribing to topics, took {tries} tries: {e:?}");

if let Some(metrics) = metrics {
// TODO make DRY with end-of-function call
Expand All @@ -208,7 +209,7 @@ pub async fn batch_subscribe_relay_topics(

let retry_in = calculate_retry_in(tries);
warn!(
"Temporary error batch subscribing to topic, retrying attempt {tries} in {retry_in:?}: {e:?}"
"Temporary error batch subscribing to topics, retrying attempt {tries} in {retry_in:?}: {e:?}"
);
sleep(retry_in).await;
}
Expand Down
26 changes: 15 additions & 11 deletions src/services/relay_renewal_job/refresh_topic_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use {
model::helpers::{get_project_topics, get_subscriber_topics},
publish_relay_message::batch_subscribe_relay_topics,
},
futures_util::{StreamExt, TryStreamExt},
futures_util::StreamExt,
relay_client::http::Client,
relay_rpc::{domain::Topic, rpc::MAX_SUBSCRIPTION_BATCH_SIZE},
relay_rpc::domain::Topic,
sqlx::PgPool,
std::{sync::Arc, time::Instant},
tokio::sync::Mutex,
Expand Down Expand Up @@ -47,7 +47,10 @@ pub async fn run(
info!("topics_count: {topics_count}");

let topic_batches = topics
.chunks(MAX_SUBSCRIPTION_BATCH_SIZE)
// Use a chunk size of 1 for now, since we don't yet have the ability to process each topic error individually
// https://github.com/WalletConnect/notify-server/issues/395
// .chunks(MAX_SUBSCRIPTION_BATCH_SIZE)
.chunks(1)
.map(|topics| topics.to_vec())
.collect::<Vec<_>>();

Expand All @@ -72,17 +75,18 @@ pub async fn run(
let result = futures_util::stream::iter(topic_batches)
.map(|topics| batch_subscribe_relay_topics(client, topics, metrics))
.buffer_unordered(REQUEST_CONCURRENCY)
.try_collect::<Vec<_>>()
.collect::<Vec<_>>()
.await;
let elapsed: u64 = start.elapsed().as_millis().try_into().unwrap();
if let Err(e) = result {
// An error here is bad, as topics will not have been renewed.
// However, this should be rare and many resubscribes will happen within 30 days so all topics should be renewed eventually.
// With <https://github.com/WalletConnect/notify-server/issues/325> we will be able to guarantee renewal much better.
error!("Failed to renew all topic subscriptions in {elapsed}ms: {e}");
} else {
info!("Success renewing all topic subscriptions in {elapsed}ms");
for result in &result {
if let Err(e) = result {
// An error here is bad, as the topics in the failed batch will not have been renewed.
// However, this should be rare and many resubscribes will happen within 30 days so all topics should be renewed eventually.
// With <https://github.com/WalletConnect/notify-server/issues/325> we will be able to guarantee renewal much better.
error!("Failed to renew some topic subscriptions: {e}");
}
}
info!("Completed topic renew job (possibly with errors) in {elapsed}ms");
*renew_all_topics_lock.lock().await = false;

if let Some(metrics) = metrics {
Expand Down

0 comments on commit 4abd2ba

Please sign in to comment.