Skip to content

Commit

Permalink
Added EnableSubscriptionAutoHeal - see #27
Browse files Browse the repository at this point in the history
  • Loading branch information
lsfera committed Aug 1, 2024
1 parent e56c3c1 commit cb460d1
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 42 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ services:
- "wal_level=logical"
- "-c"
- "wal_compression=on"
- "-c"
- "max_slot_wal_keep_size=1"
pgadmin:
container_name: pgadmin_container
image: dpage/pgadmin4
Expand Down
14 changes: 7 additions & 7 deletions src/Blumchen/DependencyInjection/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await options.ResiliencePipeline.ExecuteAsync(async token =>
await options.OuterPipeline.ExecuteAsync(async token =>
await options.InnerPipeline.ExecuteAsync(async ct =>
{
await using var subscription = new Subscription();
await using var cursor = subscription.Subscribe(options.SubscriberOptions, ct: token)
.GetAsyncEnumerator(token);
Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested)
await using var cursor = subscription.Subscribe(options.SubscriberOptions, ct)
.GetAsyncEnumerator(ct);
Notify(logger, LogLevel.Information, "{WorkerName} started", WorkerName);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested)
Notify(logger, LogLevel.Trace, "{cursor.Current} processed", cursor.Current);

}, stoppingToken).ConfigureAwait(false);
}, token).ConfigureAwait(false), stoppingToken).ConfigureAwait(false);
Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
}

Expand Down
42 changes: 36 additions & 6 deletions src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,67 @@
using Blumchen.Subscriber;
using Blumchen.Subscriptions.Management;
using Npgsql;
using Npgsql.Replication;
using Polly;

namespace Blumchen.DependencyInjection;

public record WorkerOptions(ResiliencePipeline ResiliencePipeline, ISubscriberOptions SubscriberOptions);
public record WorkerOptions(
ISubscriberOptions SubscriberOptions,
ResiliencePipeline OuterPipeline,
ResiliencePipeline InnerPipeline);

public interface IWorkerOptionsBuilder
{
IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline);
IWorkerOptionsBuilder Subscription(Func<OptionsBuilder, OptionsBuilder>? builder);
WorkerOptions Build();
IWorkerOptionsBuilder EnableSubscriptionAutoHeal();
}

internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder
{
private ResiliencePipeline? _resiliencePipeline = default;
private ResiliencePipeline? _outerPipeline = default;
private Func<string, string, ResiliencePipeline>? _innerPipelineFn = default;
private Func<OptionsBuilder, OptionsBuilder>? _builder;

public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline)
{
_resiliencePipeline = resiliencePipeline;
_outerPipeline = resiliencePipeline;
return this;
}public IWorkerOptionsBuilder Subscription(Func<OptionsBuilder, OptionsBuilder>? builder)
{
_builder = builder;
return this;
return this;
}

public WorkerOptions Build()
{
ArgumentNullException.ThrowIfNull(_resiliencePipeline);
ArgumentNullException.ThrowIfNull(_outerPipeline);
ArgumentNullException.ThrowIfNull(_builder);
return new(_resiliencePipeline, _builder(new OptionsBuilder()).Build());
var subscriberOptions = _builder(new OptionsBuilder()).Build();
return new(subscriberOptions, _outerPipeline,
_innerPipelineFn?.Invoke(subscriberOptions.ReplicationOptions.SlotName,subscriberOptions.ConnectionStringBuilder.ConnectionString) ??
ResiliencePipeline.Empty
);
}

public IWorkerOptionsBuilder EnableSubscriptionAutoHeal()
{
_innerPipelineFn = (replicationSlotName, connectionString) => new ResiliencePipelineBuilder().AddRetry(new()
{
ShouldHandle =
new PredicateBuilder().Handle<PostgresException>(exception =>
exception.SqlState.Equals("55000", StringComparison.OrdinalIgnoreCase)),
MaxRetryAttempts = int.MaxValue,
OnRetry = async args =>
{
await using var conn = new LogicalReplicationConnection(connectionString);
await conn.Open(args.Context.CancellationToken);
await conn.ReCreate(replicationSlotName, args.Context.CancellationToken).ConfigureAwait(false);
},
}).Build();
return this;
}
}

2 changes: 1 addition & 1 deletion src/Blumchen/Subscriber/OptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ internal ISubscriberOptions Build()

if (_typeRegistry.Count > 0)
{
Ensure.NotNull<INamingPolicy?>(_namingPolicy, $"{nameof(NamingPolicy)}");
Ensure.NotNull(_namingPolicy, $"{nameof(NamingPolicy)}");
if (_jsonSerializerContext != null)
{
var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy);
Expand Down
56 changes: 30 additions & 26 deletions src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,15 @@ CancellationToken ct
{
(Subscription.CreateStyle.Never,_) => new None(),
(Subscription.CreateStyle.WhenNotExists,true) => new AlreadyExists(),
(Subscription.CreateStyle.WhenNotExists,false) => await Create(connection, slotName, ct).ConfigureAwait(false),
(Subscription.CreateStyle.AlwaysRecreate,true) => await ReCreate(connection, slotName, ct).ConfigureAwait(false),
(Subscription.CreateStyle.AlwaysRecreate, false) => await Create(connection, slotName, ct).ConfigureAwait(false),
(Subscription.CreateStyle.WhenNotExists,false) => await connection.Create(slotName, ct).ConfigureAwait(false),
(Subscription.CreateStyle.AlwaysRecreate,true) => await connection.ReCreate(slotName, ct).ConfigureAwait(false),
(Subscription.CreateStyle.AlwaysRecreate, false) => await connection.Create(slotName, ct).ConfigureAwait(false),

_ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle))
};

static async Task<CreateReplicationSlotResult> ReCreate(
LogicalReplicationConnection connection,
string slotName,
CancellationToken ct)
{
await connection.DropReplicationSlot(slotName, true, ct).ConfigureAwait(false);
return await Create(connection, slotName, ct).ConfigureAwait(false);
}

static async Task<CreateReplicationSlotResult> Create(
LogicalReplicationConnection connection,
string slotName,
CancellationToken ct)
{
var result = await connection.CreatePgOutputReplicationSlot(
slotName,
slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export,
cancellationToken: ct
).ConfigureAwait(false);

return new Created(result.SnapshotName!, result.ConsistentPoint);
}
}

public record ReplicationSlotOptions(
string SlotName = $"{TableDescriptorBuilder.MessageTable.DefaultName}_slot",
Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists,
Expand All @@ -74,3 +52,29 @@ public record AlreadyExists: CreateReplicationSlotResult;
public record Created(string SnapshotName, NpgsqlLogSequenceNumber LogSequenceNumber): CreateReplicationSlotResult;
}
}

public static class LogicalReplicationConnectionExtensions
{
internal static async Task<ReplicationSlotManagement.CreateReplicationSlotResult> Create(
this LogicalReplicationConnection connection,
string slotName,
CancellationToken ct)
{
var result = await connection.CreatePgOutputReplicationSlot(
slotName,
slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export,
cancellationToken: ct
).ConfigureAwait(false);

return new Created(result.SnapshotName!, result.ConsistentPoint);
}

public static async Task<ReplicationSlotManagement.CreateReplicationSlotResult> ReCreate(
this LogicalReplicationConnection connection,
string slotName,
CancellationToken ct)
{
await connection.DropReplicationSlot(slotName, true, ct).ConfigureAwait(false);
return await connection.Create(slotName, ct).ConfigureAwait(false);
}
}
2 changes: 1 addition & 1 deletion src/Blumchen/Subscriptions/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ internal async IAsyncEnumerable<IEnvelope> Subscribe(
await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false);
var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct)
.ConfigureAwait(false);
IReplicationDataMapper replicationDataMapper = new ReplicationDataMapper(registry);
var replicationDataMapper = new ReplicationDataMapper(registry);
PgOutputReplicationSlot slot;

if (result is not Created created)
Expand Down
2 changes: 1 addition & 1 deletion src/Subscriber/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
await using var cursor1 = cursor.ConfigureAwait(false);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested)
if(logger.IsEnabled(LogLevel.Trace))
logger.LogTrace($"{cursor.Current} processed");
logger.LogTrace("{message} processed", cursor.Current);
}
catch (Exception e)
{
Expand Down
2 changes: 2 additions & 0 deletions src/SubscriberWorker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
)
.ResiliencyPipeline(
provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("default"))
.EnableSubscriptionAutoHeal()
)
.AddBlumchen<HandleImpl2>((provider, workerOptions) =>
workerOptions
Expand All @@ -94,6 +95,7 @@
))
.ResiliencyPipeline(
provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("default"))
.EnableSubscriptionAutoHeal()
);

await builder
Expand Down

0 comments on commit cb460d1

Please sign in to comment.