Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions LiveControlGateway/Controllers/HubControllerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,13 @@ protected override async Task UnregisterConnection()
/// <summary>
/// Keep the hub online
/// </summary>
protected async Task SelfOnline(ulong uptimeMs, ushort? latency = null, int? rssi = null)
protected async Task<bool> SelfOnline(ulong uptimeMs, ushort? latency = null, int? rssi = null)
{
var bootedAt = GetBootedAtFromUptimeMs(uptimeMs);
if (!bootedAt.HasValue)
{
Logger.LogDebug("Client attempted to abuse reported boot time, uptime indicated that hub [{HubId}] booted prior to 2024", CurrentHub.Id);
await DisposeAsync();
return;
return false;
}

Logger.LogDebug("Received keep alive from hub [{HubId}]", CurrentHub.Id);
Expand All @@ -205,6 +204,8 @@ protected async Task SelfOnline(ulong uptimeMs, ushort? latency = null, int? rss
LatencyMs = latency,
Rssi = rssi
});

return true;
}

/// <inheritdoc />
Expand Down
13 changes: 9 additions & 4 deletions LiveControlGateway/Controllers/HubV1Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ ILogger<HubV1Controller> logger
private IUserHub HcOwner => _userHubContext.Clients.User(CurrentHub.Owner.ToString());

/// <inheritdoc />
protected override async Task Handle(HubToGatewayMessage data)
protected override async Task<bool> Handle(HubToGatewayMessage data)
{
if(data.Payload == null) return;
if(!data.Payload.HasValue) return false;
var payload = data.Payload.Value;

await using var scope = ServiceProvider.CreateAsyncScope();
Expand All @@ -67,7 +67,10 @@ protected override async Task Handle(HubToGatewayMessage data)
switch (payload.Kind)
{
case HubToGatewayMessagePayload.ItemKind.KeepAlive:
await SelfOnline(payload.KeepAlive.Uptime);
if (!await SelfOnline(payload.KeepAlive.Uptime))
{
return false;
}
break;

case HubToGatewayMessagePayload.ItemKind.OtaInstallStarted:
Expand Down Expand Up @@ -157,8 +160,10 @@ await otaService.Error(CurrentHub.Id, payload.BootStatus.OtaUpdateId, false,
case HubToGatewayMessagePayload.ItemKind.NONE:
default:
Logger.LogWarning("Payload kind not defined [{Kind}]", payload.Kind);
break;
return false;
}

return true;
}

/// <inheritdoc />
Expand Down
13 changes: 9 additions & 4 deletions LiveControlGateway/Controllers/HubV2Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ await QueueMessage(new GatewayToHubMessage
private IUserHub HcOwner => _userHubContext.Clients.User(CurrentHub.Owner.ToString());

/// <inheritdoc />
protected override async Task Handle(HubToGatewayMessage data)
protected override async Task<bool> Handle(HubToGatewayMessage data)
{
var payload = data.Payload;

Expand All @@ -96,12 +96,15 @@ protected override async Task Handle(HubToGatewayMessage data)
if (_pingTimestamp == 0)
{
// TODO: Kick or warn client.
return;
return false;
}

_latencyMs = (ushort)Math.Min(Stopwatch.GetElapsedTime(_pingTimestamp).TotalMilliseconds, ushort.MaxValue); // If someone has a ping higher than 65 seconds, they are messing with us. Cap it to 65 seconds
_pingTimestamp = 0;
await SelfOnline(payload.Pong.Uptime, _latencyMs, payload.Pong.Rssi);
if (!await SelfOnline(payload.Pong.Uptime, _latencyMs, payload.Pong.Rssi))
{
return false;
}
break;

case HubToGatewayMessagePayload.ItemKind.OtaUpdateStarted:
Expand Down Expand Up @@ -190,8 +193,10 @@ await otaService.Error(CurrentHub.Id, payload.BootStatus.OtaUpdateId, false,
case HubToGatewayMessagePayload.ItemKind.NONE:
default:
Logger.LogWarning("Payload kind not defined [{Kind}]", payload.Kind);
break;
return false;
}

return true;
}

/// <inheritdoc />
Expand Down
97 changes: 36 additions & 61 deletions LiveControlGateway/Websocket/FlatbuffersWebsocketBaseController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,85 +43,60 @@ protected override Task SendWebSocketMessage(TOut message, WebSocket websocket,
/// Handle the incoming message
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
protected abstract Task Handle(TIn data);
/// <returns>false if message is invalid</returns>
protected abstract Task<bool> Handle(TIn data);

/// <inheritdoc />
protected override async Task Logic()
{
while (!LinkedToken.IsCancellationRequested)
try
{
try
while (!LinkedToken.IsCancellationRequested)
{
if (WebSocket is null or { State: WebSocketState.Aborted }) return;
var message =
await FlatbufferWebSocketUtils.ReceiveFullMessageAsyncNonAlloc(WebSocket,
_incomingSerializer, LinkedToken);

// All is good, normal message, deserialize and handle
if (message.IsT0)
{
try

bool ok = await message.Match(
Handle,
async _ =>
{
var serverMessage = message.AsT0;
await Handle(serverMessage);
}
catch (Exception e)
await WebSocket.CloseAsync(WebSocketCloseStatus.ProtocolError, "Invalid flatbuffers message", LinkedToken);
return false;
},
async _ =>
{
Logger.LogError(e, "Error while handling device message");
}
}

// Deserialization failed, log and continue
else if (message.IsT1)
{
Logger.LogWarning(message.AsT1.Exception, "Deserialization failed for websocket message");
}
if (WebSocket.State != WebSocketState.Open)
{
Logger.LogTrace("Client sent closure, but connection state is not open");
return false;
}

// Device sent closure, close connection
else if (message.IsT2)
{
if (WebSocket.State != WebSocketState.Open)
{
Logger.LogTrace("Client sent closure, but connection state is not open");
break;
}
await WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal close", LinkedToken);

try
{
await WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal close",
LinkedToken);
}
catch (OperationCanceledException e)
{
Logger.LogError(e, "Error during close handshake");
}
Logger.LogInformation("Closing websocket connection");
return false;
});

Logger.LogInformation("Closing websocket connection");
break;
}
else
{
throw new NotImplementedException(); // message.T? is not implemented
}
}
catch (OperationCanceledException)
{
Logger.LogInformation("WebSocket connection terminated due to close or shutdown");
break;
}
catch (WebSocketException e)
{
if (e.WebSocketErrorCode != WebSocketError.ConnectionClosedPrematurely)
Logger.LogError(e, "Error in receive loop, websocket exception");
if (!ok) break;
}
catch (Exception ex)
}
catch (OperationCanceledException)
{
Logger.LogInformation("WebSocket connection terminated due to close or shutdown");
}
catch (WebSocketException e)
{
if (e.WebSocketErrorCode != WebSocketError.ConnectionClosedPrematurely)
{
Logger.LogError(ex, "Exception while processing websocket request");
Logger.LogError(e, "Error in receive loop, websocket exception");
}
}

await Close.CancelAsync();
catch (Exception ex)
{
Logger.LogError(ex, "Exception while processing websocket request");
}
}

}