From 4abd2bab7c1abbd40b4d9eb05d2513202105fb71 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 5 Mar 2024 10:16:35 -0800 Subject: [PATCH] fix: handle individual topic errors --- src/publish_relay_message.rs | 9 ++++--- .../refresh_topic_subscriptions.rs | 26 +++++++++++-------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/publish_relay_message.rs b/src/publish_relay_message.rs index c4ad570e..f504b773 100644 --- a/src/publish_relay_message.rs +++ b/src/publish_relay_message.rs @@ -171,7 +171,8 @@ pub async fn batch_subscribe_relay_topics( metrics.relay_batch_subscribe_request(start); } // TODO process each error individually - // TODO retry internal failures? + // TODO retry relay internal failures? + // https://github.com/WalletConnect/notify-server/issues/395 match result { Ok(_) => Ok(()), Err(e) => match e { @@ -180,7 +181,7 @@ pub async fn batch_subscribe_relay_topics( )) => { // FIXME figure out how to handle this properly; being unable to subscribe means a broken state // https://walletconnect.slack.com/archives/C058RS0MH38/p1708183383748259 - warn!("Subscriber limit exceeded for topic {topics:?}"); + warn!("Subscriber limit exceeded for topics {topics:?}"); Ok(()) } e => Err(e), @@ -197,7 +198,7 @@ pub async fn batch_subscribe_relay_topics( } if is_permanent { - error!("Permanent error subscribing to topic, took {tries} tries: {e:?}"); + error!("Permanent error batch subscribing to topics, took {tries} tries: {e:?}"); if let Some(metrics) = metrics { // TODO make DRY with end-of-function call @@ -208,7 +209,7 @@ pub async fn batch_subscribe_relay_topics( let retry_in = calculate_retry_in(tries); warn!( - "Temporary error batch subscribing to topic, retrying attempt {tries} in {retry_in:?}: {e:?}" + "Temporary error batch subscribing to topics, retrying attempt {tries} in {retry_in:?}: {e:?}" ); sleep(retry_in).await; } diff --git a/src/services/relay_renewal_job/refresh_topic_subscriptions.rs 
b/src/services/relay_renewal_job/refresh_topic_subscriptions.rs index 5ed7ed97..8bb8ce9d 100644 --- a/src/services/relay_renewal_job/refresh_topic_subscriptions.rs +++ b/src/services/relay_renewal_job/refresh_topic_subscriptions.rs @@ -5,9 +5,9 @@ use { model::helpers::{get_project_topics, get_subscriber_topics}, publish_relay_message::batch_subscribe_relay_topics, }, - futures_util::{StreamExt, TryStreamExt}, + futures_util::StreamExt, relay_client::http::Client, - relay_rpc::{domain::Topic, rpc::MAX_SUBSCRIPTION_BATCH_SIZE}, + relay_rpc::domain::Topic, sqlx::PgPool, std::{sync::Arc, time::Instant}, tokio::sync::Mutex, @@ -47,7 +47,10 @@ pub async fn run( info!("topics_count: {topics_count}"); let topic_batches = topics - .chunks(MAX_SUBSCRIPTION_BATCH_SIZE) + // Chunk as 1 since we don't yet have the ability to process each topic error individually + // https://github.com/WalletConnect/notify-server/issues/395 + // .chunks(MAX_SUBSCRIPTION_BATCH_SIZE) + .chunks(1) .map(|topics| topics.to_vec()) .collect::<Vec<_>>(); @@ -72,17 +75,18 @@ pub async fn run( let result = futures_util::stream::iter(topic_batches) .map(|topics| batch_subscribe_relay_topics(client, topics, metrics)) .buffer_unordered(REQUEST_CONCURRENCY) - .try_collect::<Vec<_>>() + .collect::<Vec<_>>() .await; let elapsed: u64 = start.elapsed().as_millis().try_into().unwrap(); - if let Err(e) = result { - // An error here is bad, as topics will not have been renewed. - // However, this should be rare and many resubscribes will happen within 30 days so all topics should be renewed eventually. - // With we will be able to guarantee renewal much better. - error!("Failed to renew all topic subscriptions in {elapsed}ms: {e}"); - } else { - info!("Success renewing all topic subscriptions in {elapsed}ms"); + for result in &result { + if let Err(e) = result { + // An error here is bad, as topics will not have been renewed. 
+ // However, this should be rare and many resubscribes will happen within 30 days so all topics should be renewed eventually. + // With we will be able to guarantee renewal much better. + error!("Failed to renew some topic subscriptions: {e}"); + } } + info!("Completed topic renew job (possibly with errors) in {elapsed}ms"); *renew_all_topics_lock.lock().await = false; if let Some(metrics) = metrics {