Skip to content

Commit

Permalink
Added global message bus client
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanKert committed Nov 18, 2022
1 parent e217262 commit c2d1f57
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 28 deletions.
13 changes: 7 additions & 6 deletions src/fiskaltrust.Launcher/Services/MessageBusPOSWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
using fiskaltrust.ifPOS.v0;
using Serilog;

namespace fiskaltrust.Launcher.Services
{
public class MessageBusPOSWrapper : ifPOS.v1.IPOS
{
private readonly MessageBusClient _client;
private readonly IPOS _queue;
private readonly ifPOS.v1.IPOS _queue;

public MessageBusPOSWrapper(MessageBusClient messageBusService, IPOS queue)
public MessageBusPOSWrapper(MessageBusClient messageBusService, ifPOS.v1.IPOS queue)
{
_client = messageBusService;
_queue = queue;
Expand Down Expand Up @@ -35,7 +36,7 @@ public string Echo(string message)

public async Task<ifPOS.v1.EchoResponse> EchoAsync(ifPOS.v1.EchoRequest message)
{
return await EchoAsync(message);
return await _queue.EchoAsync(message);
}

public string EndEcho(IAsyncResult result)
Expand All @@ -60,19 +61,19 @@ public Stream Journal(long ftJournalType, long from, long to)

public IAsyncEnumerable<ifPOS.v1.JournalResponse> JournalAsync(ifPOS.v1.JournalRequest request)
{
return JournalAsync(request);
return _queue.JournalAsync(request);
}

public ReceiptResponse Sign(ReceiptRequest data)
{
var response = Sign(data);
var response = _queue.Sign(data);
Task.Run(() => _client.PublishSignAsync(data.ftCashBoxID, response.ftQueueID, data, response)).Wait();
return response;
}

public async Task<ifPOS.v1.ReceiptResponse> SignAsync(ifPOS.v1.ReceiptRequest request)
{
var response = await SignAsync(request);
var response = await _queue.SignAsync(request);
await _client.PublishSignAsync(request.ftCashBoxID, response.ftQueueID, request, response);
return response;
}
Expand Down
85 changes: 84 additions & 1 deletion src/fiskaltrust.Launcher/Services/MessageBusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
using MQTTnet.AspNetCore;
using System.Text.Json;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Extensions.WebSocket4Net;
using Microsoft.AspNetCore.Hosting.Server;

namespace fiskaltrust.Launcher.Services
{
public class MessageBusService
public class MessageBusService : IDisposable
{
private readonly LauncherConfiguration _launcherConfiguration;
private readonly ILogger<MessageBusService> _logger;
private IManagedMqttClient _managedMqttClient;

public MessageBusService(ILogger<MessageBusService> logger, LauncherConfiguration launcherConfiguration)
{
Expand All @@ -19,6 +24,8 @@ public MessageBusService(ILogger<MessageBusService> logger, LauncherConfiguratio

public async Task<IHost> StartMQQTServer(CancellationToken token = default)
{
_managedMqttClient = await ConnectGlobalBusAsync();

var builder = Host.CreateDefaultBuilder(Array.Empty<string>());
builder.ConfigureWebHostDefaults(
webBuilder =>
Expand Down Expand Up @@ -74,6 +81,10 @@ public async Task<IHost> StartMQQTServer(CancellationToken token = default)
Console.WriteLine($"Client '{eventArgs.ClientId}' has disconnect. Accepting! ({System.Text.Json.JsonSerializer.Serialize(eventArgs)})");
await Task.CompletedTask;
};
server.InterceptingPublishAsync += async eventArgs =>
{
await _managedMqttClient.EnqueueAsync(eventArgs.ApplicationMessage);
};
});
});
});
Expand All @@ -82,5 +93,77 @@ public async Task<IHost> StartMQQTServer(CancellationToken token = default)
_logger.LogInformation("Started mqqt hosting. http://localhost:5000/mqqt ");
return app;
}

public async Task<IManagedMqttClient> ConnectGlobalBusAsync()
{
var mqttFactory = new MqttFactory().UseWebSocket4Net();
var mqttClient = mqttFactory.CreateManagedMqttClient();

var mqttClientOptions = new MqttClientOptionsBuilder()
.WithClientId(_launcherConfiguration.CashboxId + "-localbus")
.WithWebSocketServer("gateway-sandbox.fiskaltrust.eu:80/mqtt")
.Build();

var managedMqttClientOptions = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(mqttClientOptions)
.Build();

mqttClient.ApplicationMessageReceivedAsync += async e =>
{
Console.WriteLine("Received application message.");
// TODO republish on local messagebus
e.DumpToConsole();
await Task.CompletedTask;
};
mqttClient.ConnectionStateChangedAsync += async e =>
{
Console.WriteLine(JsonSerializer.Serialize(e));
await Task.CompletedTask;
};
mqttClient.ConnectedAsync += async e =>
{
Console.WriteLine("Connected: args {0}", JsonSerializer.Serialize(e));
await Task.CompletedTask;
};
mqttClient.DisconnectedAsync += async e =>
{
Console.WriteLine("Disconnected: {0}", JsonSerializer.Serialize(e));
await Task.CompletedTask;
};
mqttClient.ConnectingFailedAsync += async e =>
{
await Task.CompletedTask;
};
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build().Topic);
//await mqttClient.SubscribeAsync($"*");
await mqttClient.StartAsync(managedMqttClientOptions);
return mqttClient;
}

public void Dispose()
{
_managedMqttClient.Dispose();
}
}
}



public static class Helpers
{
public static TObject DumpToConsole<TObject>(this TObject @object)
{
var output = "NULL";
if (@object != null)
{
output = JsonSerializer.Serialize(@object, new JsonSerializerOptions
{
WriteIndented = true
});
}

Console.WriteLine($"[{@object?.GetType().Name}]:\r\n{output}");
return @object;
}
}
43 changes: 22 additions & 21 deletions src/fiskaltrust.Launcher/fiskaltrust.Launcher.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,32 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<PublishReadyToRun>true</PublishReadyToRun>
<PublishSingleFile>true</PublishSingleFile>
<!--<PublishSingleFile>true</PublishSingleFile>-->
<!-- <PublishTrimmed>true</PublishTrimmed> -->
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="fiskaltrust.interface" Version="1.3.40" />
<PackageReference Include="fiskaltrust.Middleware.Abstractions" Version="1.3.3" />
<PackageReference Include="fiskaltrust.storage.serialization" Version="2.0.0-alpha1-22020-50210" />
<PackageReference Include="fiskaltrust.Middleware.Interface.Client.Grpc" Version="1.3.40" />
<PackageReference Include="fiskaltrust.Middleware.Interface.Client.Http" Version="1.3.40" />
<PackageReference Include="Serilog.AspNetCore" Version="5.0.0" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="4.2.0" />
<PackageReference Include="Serilog.Formatting.Compact.Reader" Version="1.0.5" />
<PackageReference Include="protobuf-net.Grpc.AspNetCore" Version="1.0.152" />
<PackageReference Include="Grpc.Net.Client" Version="2.47.0" />
<PackageReference Include="System.CommandLine.Hosting" Version="0.3.0-alpha.21216.1" />
<PackageReference Include="McMaster.NETCore.Plugins" Version="1.4.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="6.0.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="SemanticVersioning" Version="2.0.2" />
<PackageReference Include="MQTTnet" Version="4.1.3.436" />
<PackageReference Include="MQTTnet.AspNetCore" Version="4.1.3.436" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.1.3.436" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="fiskaltrust.interface" Version="1.3.40" />
<PackageReference Include="fiskaltrust.Middleware.Abstractions" Version="1.3.3" />
<PackageReference Include="fiskaltrust.storage.serialization" Version="2.0.0-alpha1-22020-50210" />
<PackageReference Include="fiskaltrust.Middleware.Interface.Client.Grpc" Version="1.3.40" />
<PackageReference Include="fiskaltrust.Middleware.Interface.Client.Http" Version="1.3.40" />
<PackageReference Include="Serilog.AspNetCore" Version="5.0.0" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="4.2.0" />
<PackageReference Include="Serilog.Formatting.Compact.Reader" Version="1.0.5" />
<PackageReference Include="protobuf-net.Grpc.AspNetCore" Version="1.0.152" />
<PackageReference Include="Grpc.Net.Client" Version="2.47.0" />
<PackageReference Include="System.CommandLine.Hosting" Version="0.3.0-alpha.21216.1" />
<PackageReference Include="McMaster.NETCore.Plugins" Version="1.4.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="6.0.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="SemanticVersioning" Version="2.0.2" />
<PackageReference Include="MQTTnet" Version="4.1.3.436" />
<PackageReference Include="MQTTnet.AspNetCore" Version="4.1.3.436" />
<PackageReference Include="MQTTnet.Extensions.WebSocket4Net" Version="4.1.3.436" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.1.3.436" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../fiskaltrust.Launcher.Common/fiskaltrust.Launcher.Common.csproj" PrivateAssets="all" />
Expand Down

0 comments on commit c2d1f57

Please sign in to comment.