Skip to content

Commit

Permalink
Fix and Improve Streaming Rpc (#47)
Browse files Browse the repository at this point in the history
* 🐛 Fix streaming rpc

* ⚡ Add Reconnect
  • Loading branch information
GabrielePicco authored Nov 12, 2023
1 parent d690879 commit 8ad2c2c
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 176 deletions.
2 changes: 1 addition & 1 deletion SharedBuildProperties.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Product>Solana.Unity</Product>
<Version>2.6.1.2</Version>
<Version>2.6.1.3</Version>
<Copyright>Copyright 2022 &#169; Magicblock Labs</Copyright>
<Authors>Magicblock Labs</Authors>
<PublisherName>Magicblock Labs</PublisherName>
Expand Down
5 changes: 4 additions & 1 deletion src/Solana.Unity.Rpc/Core/Sockets/IWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ internal interface IWebSocket : IDisposable
Task CloseAsync(CancellationToken cancellationToken);
Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken);
Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken);
Task<WebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken);

public abstract event WebSocketMessageEventHandler OnMessage;
public delegate void WebSocketMessageEventHandler(byte[] data);
public event EventHandler<WebSocketState> ConnectionStateChangedEvent;
}
}
86 changes: 13 additions & 73 deletions src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected StreamingRpcClient(string url, object logger, IWebSocket socket = defa
_logger = logger;
_sem = new SemaphoreSlim(1, 1);
_connectionStats = new ConnectionStats();
ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state);
}

/// <summary>
Expand All @@ -70,7 +71,7 @@ public async Task ConnectAsync()
if (ClientSocket.State != WebSocketState.Open)
{
await ClientSocket.ConnectAsync(NodeAddress, CancellationToken.None);
_ = StartListening();
ClientSocket.OnMessage += DispatchMessage;
ConnectionStateChangedEvent?.Invoke(this, State);
}
}
Expand All @@ -80,6 +81,16 @@ public async Task ConnectAsync()
}
}

private void DispatchMessage(byte[] message)
{
HandleNewMessage(new Memory<byte>(message));
_connectionStats.AddReceived((uint)message.Length);
if (ClientSocket.State != WebSocketState.Open && ClientSocket.State != WebSocketState.Connecting)
{
ConnectionStateChangedEvent?.Invoke(this, State);
}
}

/// <inheritdoc cref="IStreamingRpcClient.DisconnectAsync"/>
public async Task DisconnectAsync()
{
Expand All @@ -94,6 +105,7 @@ public async Task DisconnectAsync()
//and will also notify when there is a non-user triggered disconnection event

// handle disconnection cleanup
ClientSocket.OnMessage -= DispatchMessage;
ClientSocket.Dispose();
ClientSocket = new WebSocketWrapper();
CleanupSubscriptions();
Expand All @@ -105,78 +117,6 @@ public async Task DisconnectAsync()
}
}

/// <summary>
/// Starts listeing to new messages.
/// </summary>
/// <returns>Returns the task representing the asynchronous task.</returns>
private async Task StartListening()
{
while (ClientSocket.State is WebSocketState.Open or WebSocketState.Connecting)
{
try
{
await ReadNextMessage();
}
catch (Exception e)
{
if (_logger != null)
{
Console.WriteLine($"Exception trying to read next message: {e.Message}");
}
}
}

if (_logger != null)
{
Console.WriteLine($"Stopped reading messages. ClientSocket.State changed to {ClientSocket.State}");
}
ConnectionStateChangedEvent?.Invoke(this, State);
}

/// <summary>
/// Reads the next message from the socket.
/// </summary>
/// <param name="cancellationToken">The cancelation token.</param>
/// <returns>Returns the task representing the asynchronous task.</returns>
private async Task ReadNextMessage(CancellationToken cancellationToken = default)
{
var buffer = new byte[32768];
Memory<byte> mem = new(buffer);
WebSocketReceiveResult result = await ClientSocket.ReceiveAsync(mem, cancellationToken);
int count = result.Count;

if (result.MessageType == WebSocketMessageType.Close)
{
await ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken);
}
else
{
if (!result.EndOfMessage)
{
MemoryStream ms = new MemoryStream();
ms.Write(mem.Span.ToArray(), 0, mem.Span.Length);


while (!result.EndOfMessage)
{
result = await ClientSocket.ReceiveAsync(mem, cancellationToken).ConfigureAwait(false);

var memSlice = mem.Slice(0, result.Count).Span.ToArray();
ms.Write(memSlice, 0, memSlice.Length);
count += result.Count;
}

mem = new Memory<byte>(ms.ToArray());
}
else
{
mem = mem.Slice(0, count);
}
_connectionStats.AddReceived((uint)count);
HandleNewMessage(mem);
}
}

/// <summary>
/// Handless a new message payload.
/// </summary>
Expand Down
17 changes: 16 additions & 1 deletion src/Solana.Unity.Rpc/Core/Sockets/SubscriptionState.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Solana.Unity.Rpc.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -29,6 +30,11 @@ public abstract class SubscriptionState
/// The current state of the subscription.
/// </summary>
public SubscriptionStatus State { get; protected set; }

/// <summary>
/// The JsonRpcRequest for this subscription.
/// </summary>
internal JsonRpcRequest Request;

/// <summary>
/// The last error message.
Expand Down Expand Up @@ -95,6 +101,15 @@ internal void ChangeState(SubscriptionStatus newState, string error = null, stri

/// <inheritdoc cref="Unsubscribe"/>
public async Task UnsubscribeAsync() => await _rpcClient.UnsubscribeAsync(this).ConfigureAwait(false);

/// <summary>
/// Set the request for this subscription.
/// </summary>
/// <param name="request"></param>
public void SetRequest(JsonRpcRequest request)
{
Request = request;
}
}

/// <summary>
Expand Down
63 changes: 42 additions & 21 deletions src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using NativeWebSocket;
using System;
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -13,8 +12,10 @@ internal class WebSocketWrapper : IWebSocket
private NativeWebSocket.IWebSocket webSocket;

public WebSocketCloseStatus? CloseStatus => WebSocketCloseStatus.NormalClosure;

public string CloseStatusDescription => "Not implemented";

private TaskCompletionSource<bool> _webSocketConnectionTask = new();

public WebSocketState State
{
Expand All @@ -33,37 +34,57 @@ public WebSocketState State
}
}

public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken)
=> webSocket.Close();
public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription,
CancellationToken cancellationToken)
{
return webSocket.Close();
}

public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
{
webSocket = WebSocket.Create(uri.AbsoluteUri);
webSocket.OnOpen += () =>
{
_webSocketConnectionTask.TrySetResult(true);
webSocket.OnMessage += MessageReceived;
ConnectionStateChangedEvent?.Invoke(this, State);
};
webSocket.OnClose += _ =>
{
webSocket.OnMessage -= MessageReceived;
ConnectionStateChangedEvent?.Invoke(this, State);
};
return webSocket.Connect();
}

private void MessageReceived(byte[] message)
{
OnMessage?.Invoke(message);
}

public Task CloseAsync(CancellationToken cancellationToken)
=> webSocket.Close();

public Task<WebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
TaskCompletionSource<WebSocketReceiveResult> receiveMessageTask = new();
public event IWebSocket.WebSocketMessageEventHandler OnMessage;
public event EventHandler<WebSocketState> ConnectionStateChangedEvent;

void WebSocketOnOnMessage(byte[] bytes)
public Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
{
if (webSocket.State == NativeWebSocket.WebSocketState.Connecting)
{
bytes.CopyTo(buffer);
WebSocketReceiveResult webSocketReceiveResult = new(bytes.Length, WebSocketMessageType.Text, true);
MainThreadUtil.Run(() => receiveMessageTask.SetResult(webSocketReceiveResult));
webSocket.OnMessage -= WebSocketOnOnMessage;
Console.WriteLine("Message received");
return _webSocketConnectionTask.Task.ContinueWith(_ =>
{
if (webSocket.State != NativeWebSocket.WebSocketState.Open)
{
throw new WebSocketException(WebSocketError.InvalidState, "WebSocket is not connected.");
}
return webSocket.Send(buffer.ToArray());
}, cancellationToken).Unwrap();
}
if (webSocket.State != NativeWebSocket.WebSocketState.Open)
{
throw new WebSocketException(WebSocketError.InvalidState, "WebSocket is not connected.");
}
webSocket.OnMessage += WebSocketOnOnMessage;
return receiveMessageTask.Task;
}

public Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage,
CancellationToken cancellationToken)
{
return webSocket.Send(buffer.ToArray());
}

Expand Down
Loading

0 comments on commit 8ad2c2c

Please sign in to comment.