Skip to content

Commit

Permalink
Added string routing capability and renamed to ([Message|Raw])RoutedB…
Browse files Browse the repository at this point in the history
…y... to alingn with semantic.
  • Loading branch information
lsfera committed Aug 19, 2024
1 parent cebdd98 commit 6cbcf1e
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public static IServiceCollection AddBlumchen<T>(
IWorkerOptionsBuilder MinimalWorkerOptions(IServiceProvider provider, IWorkerOptionsBuilder builder)
=> builder.Subscription(optionsBuilder => consumerFn(provider, optionsBuilder)
.ConnectionString(connectionString)
.DataSource(NpgsqlDataSource.Create(connectionString)));
.DataSource(new NpgsqlDataSourceBuilder(connectionString)
.UseLoggerFactory(provider.GetService<ILoggerFactory>()).Build()));


}
Expand Down
2 changes: 1 addition & 1 deletion src/Blumchen/Ensure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public bool IsValid(T value, params string[] parameters)
}
}

internal class RawUrnTrait<T,TR>(): Validable<T>(v => v is ICollection { Count: > 0 }, $"`{nameof(RawUrnAttribute)}` missing on `{typeof(TR).Name}` message type");
internal class RawUrnTrait<T,TR>(): Validable<T>(v => v is ICollection { Count: > 0 }, $"`{nameof(RawRoutedByUrnAttribute)}` missing on `{typeof(TR).Name}` message type");
internal class NullTrait<T>(): Validable<T>(v => v is null, $"`{{0}}` method on {nameof(OptionsBuilder)} called more then once");
internal class NotNullTrait<T>(): Validable<T>(v => v is not null, $"`{{0}}` method not called on {nameof(OptionsBuilder)}");
internal class NotEmptyTrait<T>(): Validable<T>(v => v is ICollection { Count: > 0 }, $"No `{{0}}` method called on {nameof(OptionsBuilder)}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@

namespace Blumchen.Serialization;

public interface IRouted
{
string Route { get; }
}

[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class MessageUrnAttribute:
Attribute
public class MessageRoutedByUrnAttribute(string route):
Attribute, IRouted
{
/// <summary>
/// </summary>
/// <param name="urn">The urn value to use for this message type.</param>
public MessageUrnAttribute(string urn)
public string Route { get; } = Format(route);

private static string Format(string urn)
{
ArgumentException.ThrowIfNullOrEmpty(urn, nameof(urn));

if (urn.StartsWith(MessageUrn.Prefix))
throw new ArgumentException($"Value should not contain the default prefix '{MessageUrn.Prefix}'.", nameof(urn));

Urn = FormatUrn(urn);
return FormatUrn(urn).AbsoluteUri;
}

public Uri Urn { get; }

private static Uri FormatUrn(string urn)
{
var fullValue = MessageUrn.Prefix + urn;
Expand All @@ -32,30 +34,44 @@ private static Uri FormatUrn(string urn)
}
}

[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
public class RawUrnAttribute(string urn): MessageUrnAttribute(urn);

public static class MessageUrn
internal static class MessageUrn
{
public const string Prefix = "urn:message:";

private static readonly ConcurrentDictionary<Type, ICached> Cache = new();



public static string ForTypeString(Type type) =>
Cache.GetOrAdd(type,t =>
Cache.GetOrAdd(type, t =>
{
var attribute = Attribute.GetCustomAttribute(t, typeof(MessageUrnAttribute)) as MessageUrnAttribute ??
var attribute = Attribute.GetCustomAttribute(t, typeof(MessageRoutedByUrnAttribute)) as MessageRoutedByUrnAttribute ??
throw new NotSupportedException($"Attribute not defined fot type '{type}'");
return new Cached(attribute.Urn, attribute.Urn.ToString());
return new Cached(attribute.Route);
}).UrnString;


private interface ICached
{
Uri Urn { get; }
string UrnString { get; }
}

private record Cached(Uri Urn, string UrnString): ICached;
private record Cached(string UrnString): ICached;
}

[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
public class RawRoutedByUrnAttribute(string route): MessageRoutedByUrnAttribute(route);

[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
public class RawRoutedByStringAttribute(string name): Attribute, IRouted
{
private static string Format(string name)
{
if(string.IsNullOrWhiteSpace(name))
throw new FormatException($"Invalid {nameof(name)}: {name}.");

return name;
}
public string Route { get; } = Format(name);
}



10 changes: 6 additions & 4 deletions src/Blumchen/Subscriber/OptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,12 @@ private OptionsBuilder ConsumesRaw<T, TU>(IMessageHandler<TU> handler,
IReplicationJsonBMapper dataMapper) where T : class where TU : class
{
var urns = typeof(T)
.GetCustomAttributes(typeof(RawUrnAttribute), false)
.OfType<RawUrnAttribute>()
.Select(attribute => attribute.Urn).ToList();
Ensure.RawUrn<IEnumerable<Uri>,T>(urns, nameof(NamingPolicy));
.GetCustomAttributes(typeof(RawRoutedByUrnAttribute), false)
.Union(typeof(T)
.GetCustomAttributes(typeof(RawRoutedByStringAttribute), false))
.OfType<IRouted>()
.Select(attribute => attribute.Route).ToList();
Ensure.RawUrn<IEnumerable<string>,T>(urns, nameof(NamingPolicy));

var methodInfo = handler
.GetType()
Expand Down
8 changes: 4 additions & 4 deletions src/Publisher/Contracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@
namespace Publisher;
public interface IContract{}

[MessageUrn("user-created:v1")]
[MessageRoutedByUrn("user-created:v1")]
internal record UserCreated(
Guid Id,
string Name = "Created"
):IContract;

[MessageUrn("user-deleted:v1")]
[MessageRoutedByUrn("user-deleted:v1")]
internal record UserDeleted(
Guid Id,
string Name = "Deleted"
): IContract;

[MessageUrn("user-modified:v1")]
[MessageRoutedByUrn("user-modified:v1")]
internal record UserModified(
Guid Id,
string Name = "Modified"
): IContract;

[MessageUrn("user-subscribed:v1")]
[MessageRoutedByUrn("user-subscribed:v1")]
internal record UserSubscribed(
Guid Id,
string Name = "Subscribed"
Expand Down
6 changes: 3 additions & 3 deletions src/Subscriber/Contracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@

namespace Subscriber
{
[MessageUrn("user-created:v1")]
[MessageRoutedByUrn("user-created:v1")]
public record UserCreatedContract(
Guid Id,
string Name
);

[RawUrn("user-deleted:v1")]
[RawRoutedByUrn("user-deleted:v1")]
public class MessageObjects;


[RawUrn("user-modified:v1")]
[RawRoutedByUrn("user-modified:v1")]
internal class MessageString;

[JsonSourceGenerationOptions(WriteIndented = true)]
Expand Down
6 changes: 3 additions & 3 deletions src/SubscriberWorker/Contracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@

namespace SubscriberWorker
{
[MessageUrn("user-created:v1")]
[MessageRoutedByUrn("user-created:v1")]
public record UserCreatedContract(
Guid Id,
string Name
);

[MessageUrn("user-deleted:v1")]
[MessageRoutedByUrn("user-deleted:v1")]
public record UserDeletedContract(
Guid Id,
string Name
);

[MessageUrn("user-modified:v1")] //subscription ignored
[MessageRoutedByUrn("user-modified:v1")] //subscription ignored
public record UserModifiedContract(
Guid Id,
string Name = "Modified"
Expand Down
4 changes: 2 additions & 2 deletions src/Tests/DatabaseFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Tests;
public abstract class DatabaseFixture(ITestOutputHelper output): IAsyncLifetime
{
protected ITestOutputHelper Output { get; } = output;
protected readonly Func<CancellationTokenSource> TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(2));
protected readonly Func<CancellationTokenSource> TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(3));
protected class TestMessageHandler<T>(Action<string> log, JsonTypeInfo info): IMessageHandler<T> where T : class
{
public async Task Handle(T value)
Expand All @@ -39,7 +39,7 @@ protected class TestHandler<T>(ILogger<TestHandler<T>> logger): IMessageHandler<
{
public Task Handle(T value)
{
logger.LogTrace(value.ToString());
logger.LogTrace($"Message consumed:{value}");
return Task.CompletedTask;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Tests/PublisherContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

namespace Tests;

[MessageUrn("user-created:v1")]
[MessageRoutedByUrn("user-created:v1")]
internal record PublisherUserCreated(
Guid Id,
string Name
);

[MessageUrn("user-deleted:v1")]
[MessageRoutedByUrn("user-deleted:v1")]
internal record PublisherUserDeleted(
Guid Id,
string Name
Expand Down
18 changes: 6 additions & 12 deletions src/Tests/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@ namespace Tests;

internal static class ServiceCollectionExtensions
{
internal static IServiceCollection AddXunitLogging(this IServiceCollection services, ITestOutputHelper output) =>
services
.AddLogging(loggingBuilder =>
{
loggingBuilder
.AddFilter("Microsoft", LogLevel.Warning)
.AddFilter("System", LogLevel.Warning)
.AddFilter("Npgsql", LogLevel.Information)
.AddFilter("Blumchen", LogLevel.Trace)
.AddFilter("SubscriberWorker", LogLevel.Debug)
.AddXunit(output);
});
internal static IServiceCollection AddXunitLogging(this IServiceCollection services, ITestOutputHelper output)
=> services.AddLogging(loggingBuilder =>
loggingBuilder
.AddFilter("Tests", LogLevel.Trace)
.AddXunit(output)
);
}
15 changes: 12 additions & 3 deletions src/Tests/ServiceWorker..cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Xunit.Abstractions;
using SubscriberOptionsBuilder = Blumchen.Subscriber.OptionsBuilder;
using PublisherOptionsBuilder = Blumchen.Publisher.OptionsBuilder;
using Microsoft.Extensions.Logging;

namespace Tests
{
Expand Down Expand Up @@ -35,9 +36,7 @@ public async Task ConsumesRawObject() => await Consumes<object>(
var handler = services.GetRequiredService<TestHandler<object>>();
return opts.ConsumesRawObject<DecoratedContract>(handler);
});



[Fact]
public async Task ConsumesJson_without_shared_kernel() => await Consumes<SubscriberUserCreated>(
(services, builder) => builder
Expand All @@ -62,13 +61,23 @@ await Consumes<PublisherUserCreated>(
);
}

[Fact]
public async Task ConsumesRawString_from_FQNNaming()
{
await Consumes<string>(
(services, builder) => builder
.ConsumesRawString<DecoratedContract>(services.GetRequiredService<TestHandler<string>>()
), new FQNNamingPolicy()
);
}

private async Task Consumes<T>(
Func<IServiceProvider,
IConsumes,SubscriberOptionsBuilder> consumesFn,
INamingPolicy? namingPolicy = default
) where T : class
{
var ct = new CancellationTokenSource(TimeSpan.FromSeconds(3));
var ct = TimeoutTokenSource();

var options = await new PublisherOptionsBuilder()
.JsonContext(PublisherContext.Default)
Expand Down
7 changes: 5 additions & 2 deletions src/Tests/SubscriberContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Tests;

[MessageUrn("user-created:v1")]
[MessageRoutedByUrn("user-created:v1")]
internal record SubscriberUserCreated(
Guid Id,
string Name
Expand All @@ -19,5 +19,8 @@ string Name
[JsonSerializable(typeof(SubscriberUserCreated))]
internal partial class SubscriberContext: JsonSerializerContext;

[RawUrn("user-created:v1")]
[RawRoutedByUrn("user-created:v1")]
[RawRoutedByString("Tests.PublisherUserCreated")]
internal class DecoratedContract;


8 changes: 4 additions & 4 deletions src/UnitTests/Contracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@

namespace UnitTests
{
[MessageUrn("user-created:v1")]
[MessageRoutedByUrn("user-created:v1")]
public record UserCreatedContract(
Guid Id,
string Name
);

[MessageUrn("user-registered:v1")]
[MessageRoutedByUrn("user-registered:v1")]
public record UserRegisteredContract(
Guid Id,
string Name
);

[RawUrn("user-deleted:v1")]
[RawRoutedByUrn("user-deleted:v1")]
public class MessageObjects;


[RawUrn("user-modified:v1")]
[RawRoutedByUrn("user-modified:v1")]
internal class MessageString;

public class InvalidMessage;
Expand Down
4 changes: 2 additions & 2 deletions src/UnitTests/subscriber_options_builder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void with_typed_raw_consumer_of_object_requires_RawUrn_decoration()
var messageHandler = Substitute.For<IMessageHandler<object>>();
var exception = Record.Exception(() => _builder(ValidConnectionString).ConsumesRawObject<InvalidMessage>(messageHandler).Build());
Assert.IsType<ConfigurationException>(exception);
Assert.Equal("`RawUrnAttribute` missing on `InvalidMessage` message type", exception.Message);
Assert.Equal($"`{nameof(RawRoutedByUrnAttribute)}` missing on `InvalidMessage` message type", exception.Message);
}

[Fact]
Expand All @@ -135,7 +135,7 @@ public void with_typed_raw_consumer_of_string_requires_RawUrn_decoration()
var messageHandler = Substitute.For<IMessageHandler<string>>();
var exception = Record.Exception(() => _builder(ValidConnectionString).ConsumesRawString<InvalidMessage>(messageHandler).Build());
Assert.IsType<ConfigurationException>(exception);
Assert.Equal("`RawUrnAttribute` missing on `InvalidMessage` message type", exception.Message);
Assert.Equal($"`{nameof(RawRoutedByUrnAttribute)}` missing on `InvalidMessage` message type", exception.Message);
}

[Fact]
Expand Down

0 comments on commit 6cbcf1e

Please sign in to comment.