@@ -103,6 +103,7 @@ public HubLifetime([Range(1, 10)] byte tps, IHubController hubController,
103103 _logger . LogWarning ( "Client already registered, not sure how this happened, probably a bug" ) ;
104104 return null ;
105105 }
106+
106107 _liveControlClients = _liveControlClients . Add ( controller ) ;
107108 }
108109
@@ -180,7 +181,8 @@ private async Task ConsumeDeviceQueue(RedisValue value, CancellationToken cancel
180181 DeviceMessage message ;
181182 try
182183 {
183- message = MessagePackSerializer . Deserialize < DeviceMessage > ( ( ReadOnlyMemory < byte > ) value , cancellationToken : cancellationToken ) ;
184+ message = MessagePackSerializer . Deserialize < DeviceMessage > ( ( ReadOnlyMemory < byte > ) value ,
185+ cancellationToken : cancellationToken ) ;
184186 }
185187 catch ( Exception e )
186188 {
@@ -208,7 +210,8 @@ private async Task DeviceMessage(DeviceMessage message)
208210 await OtaInstall ( version ) ;
209211 break ;
210212 default :
211- _logger . LogError ( "Got DeviceMessage with unknown payload type: {PayloadType}" , message . Payload . GetType ( ) . Name ) ;
213+ _logger . LogError ( "Got DeviceMessage with unknown payload type: {PayloadType}" ,
214+ message . Payload . GetType ( ) . Name ) ;
212215 break ;
213216 }
214217 }
@@ -338,7 +341,7 @@ private async Task Update()
338341 foreach ( var state in _shockerStates . Values )
339342 {
340343 if ( state . ActiveUntil < now || state . ExclusiveUntil >= now ) continue ;
341-
344+
342345 commandList . Add ( new ShockerCommand
343346 {
344347 Model = FbsMapper . ToFbsModelType ( state . Model ) ,
@@ -416,7 +419,7 @@ private static DateTimeOffset CalculateActiveUntil(byte tps) =>
416419 public ValueTask Control ( IReadOnlyList < ShockerControlCommand > commands )
417420 {
418421 var shocksTransformed = new List < ShockerCommand > ( commands . Count ) ;
419-
422+
420423 foreach ( var command in commands )
421424 {
422425 if ( ! _shockerStates . TryGetValue ( command . ShockerId , out var state ) ) continue ;
@@ -469,20 +472,7 @@ public async Task<OneOf<Success, OnlineStateUpdated>> Online(Guid device, SelfOn
469472 var online = await deviceOnline . FindByIdAsync ( deviceId ) ;
470473 if ( online is null )
471474 {
472- await deviceOnline . InsertAsync ( new DeviceOnline
473- {
474- Id = device ,
475- Owner = data . Owner ,
476- FirmwareVersion = data . FirmwareVersion ,
477- Gateway = data . Gateway ,
478- ConnectedAt = data . ConnectedAt ,
479- UserAgent = data . UserAgent ,
480- BootedAt = data . BootedAt ,
481- LatencyMs = data . LatencyMs ,
482- Rssi = data . Rssi ,
483- } , Duration . DeviceKeepAliveTimeout ) ;
484-
485-
475+ await InsertNewDeviceOnline ( ) ;
486476 await _redisPubService . SendDeviceOnlineStatus ( device , true ) ;
487477 return new Success ( ) ;
488478 }
@@ -508,7 +498,20 @@ await deviceOnline.InsertAsync(new DeviceOnline
508498 sendOnlineStatusUpdate = true ;
509499 }
510500
511- await deviceOnline . UpdateAsync ( online , Duration . DeviceKeepAliveTimeout ) ;
501+ try
502+ {
503+ // This can fail, when the key TTL expires while we are updating it, we should catch and try to insert a new one
504+ await deviceOnline . UpdateAsync ( online , Duration . DeviceKeepAliveTimeout ) ;
505+ }
506+ catch ( Exception e )
507+ {
508+ _logger . LogDebug ( e , "Failed to update device online status for device [{DeviceId}], trying to insert new" ,
509+ device ) ;
510+ // If this fails, then whatever honestly, we could disconnect the client here
511+ // but I dont think this is relevant
512+ await InsertNewDeviceOnline ( ) ;
513+ sendOnlineStatusUpdate = true ; // In case of there was already a offline message sent to the clients
514+ }
512515
513516 if ( sendOnlineStatusUpdate )
514517 {
@@ -517,6 +520,22 @@ await deviceOnline.InsertAsync(new DeviceOnline
517520 }
518521
519522 return new Success ( ) ;
523+
524+ async Task InsertNewDeviceOnline ( )
525+ {
526+ await deviceOnline . InsertAsync ( new DeviceOnline
527+ {
528+ Id = device ,
529+ Owner = data . Owner ,
530+ FirmwareVersion = data . FirmwareVersion ,
531+ Gateway = data . Gateway ,
532+ ConnectedAt = data . ConnectedAt ,
533+ UserAgent = data . UserAgent ,
534+ BootedAt = data . BootedAt ,
535+ LatencyMs = data . LatencyMs ,
536+ Rssi = data . Rssi ,
537+ } , Duration . DeviceKeepAliveTimeout ) ;
538+ }
520539 }
521540
522541 private bool _disposed ;
@@ -526,7 +545,7 @@ public async ValueTask DisposeAsync()
526545 {
527546 if ( _disposed ) return ;
528547 _disposed = true ;
529-
548+
530549 await _cancellationSource . CancelAsync ( ) ;
531550
532551 if ( _deviceMsgQueue is not null )
0 commit comments