-
Notifications
You must be signed in to change notification settings - Fork 56
Expand file tree
/
Copy pathEntityConversions.cs
More file actions
443 lines (400 loc) · 16.5 KB
/
EntityConversions.cs
File metadata and controls
443 lines (400 loc) · 16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Microsoft.DurableTask;
using Newtonsoft.Json;
using DTCore = DurableTask.Core;
using P = Microsoft.DurableTask.Protobuf;
namespace Microsoft.DurableTask;
/// <summary>
/// Utilities for converting between representations of entity history events. The older backends represent entity
/// messages as external events, using a JSON encoding defined in DT Core.
/// Starting with the DTS backend, we use explicit, separate
/// protobuf encodings for all entity-related history events.
/// </summary>
static class EntityConversions
{
// The nested message classes in this file duplicate the relevant data members from DT.Core. They let us
// translate between the data structures used in the backend and the legacy encoding of entities within
// orchestration histories, i.e. they reverse a Newtonsoft serialization performed by prior DT code.
// Using the same Newtonsoft serializer is what makes that reliable; this adds no new dependency and
// should be removed together with the original dependency.
static readonly JsonSerializerSettings ConversionSettings = new()
{
    // type names of the copied classes do not match the originals, so type name handling is turned off
    TypeNameHandling = TypeNameHandling.None,
};
/// <summary>
/// Converts an entity operation signal from its protobuf form into the legacy core event form.
/// </summary>
/// <param name="protoEvent">The proto history event carrying the signaled operation.</param>
/// <returns>The core event whose input is the JSON-serialized request message.</returns>
/// <exception cref="InvalidOperationException">The proto event has no target instance ID.</exception>
public static DTCore.History.HistoryEvent EncodeOperationSignaled(P.HistoryEvent protoEvent)
{
    P.EntityOperationSignaledEvent signal = protoEvent.EntityOperationSignaled;
    DateTime? due = signal.ScheduledTime?.ToDateTime();

    // a signal is a one-way request, so no parent instance is recorded
    RequestMessage request = new()
    {
        Operation = signal.Operation,
        IsSignal = true,
        Input = signal.Input,
        Id = signal.RequestId,
        ScheduledTime = due,
    };
    string payload = JsonConvert.SerializeObject(request, ConversionSettings);

    if (signal.TargetInstanceId is not string targetId)
    {
        throw new InvalidOperationException("missing target instance id");
    }

    return CreateEventRaisedOrSentEvent(protoEvent.EventId, EncodeEventName(due), payload, targetId);
}
/// <summary>
/// Converts an entity operation call from its protobuf form into the legacy core event form.
/// </summary>
/// <param name="protoEvent">The proto history event carrying the called operation.</param>
/// <param name="instance">The orchestration instance, recorded in the request as the parent of the call.</param>
/// <returns>The core event whose input is the JSON-serialized request message.</returns>
/// <exception cref="InvalidOperationException">
/// The instance ID or the target instance ID is missing.
/// </exception>
public static DTCore.History.HistoryEvent EncodeOperationCalled(
    P.HistoryEvent protoEvent,
    OrchestrationInstance? instance)
{
    P.EntityOperationCalledEvent call = protoEvent.EntityOperationCalled;
    DateTime? due = call.ScheduledTime?.ToDateTime();

    // the parent instance is mandatory for calls; it is checked before the target
    if (instance?.InstanceId is not string parentId)
    {
        throw new InvalidOperationException("missing instance id");
    }

    RequestMessage request = new()
    {
        Operation = call.Operation,
        IsSignal = false,
        Input = call.Input,
        Id = call.RequestId,
        ScheduledTime = due,
        ParentInstanceId = parentId,
        ParentExecutionId = instance?.ExecutionId,
    };
    string payload = JsonConvert.SerializeObject(request, ConversionSettings);

    if (call.TargetInstanceId is not string targetId)
    {
        throw new InvalidOperationException("missing target instance id");
    }

    return CreateEventRaisedOrSentEvent(protoEvent.EventId, EncodeEventName(due), payload, targetId);
}
/// <summary>
/// Converts an entity lock request from its protobuf form into the legacy core event form.
/// </summary>
/// <param name="protoEvent">The proto history event carrying the lock request.</param>
/// <param name="instance">The orchestration instance, recorded in the request as the parent.</param>
/// <returns>
/// The core event, targeted at the lock set entry selected by the request's position.
/// </returns>
/// <exception cref="InvalidOperationException">The instance ID is missing.</exception>
public static DTCore.History.HistoryEvent EncodeLockRequested(
    P.HistoryEvent protoEvent,
    OrchestrationInstance? instance)
{
    P.EntityLockRequestedEvent lockRequest = protoEvent.EntityLockRequested;

    // parse the lock set first, then validate the parent, mirroring the evaluation order of the message fields
    DTCore.Entities.EntityId[] entities = lockRequest.LockSet
        .Select(id => EntityId.FromString(id))
        .ToArray();
    string parentId = instance?.InstanceId
        ?? throw new InvalidOperationException("missing instance id");

    // a request without an operation name denotes a lock request
    RequestMessage request = new()
    {
        Operation = null,
        Id = lockRequest.CriticalSectionId,
        LockSet = entities,
        Position = lockRequest.Position,
        ParentInstanceId = parentId,
    };
    string payload = JsonConvert.SerializeObject(request, ConversionSettings);

    // the message goes to the entity at the current position within the lock set
    string targetId = lockRequest.LockSet[lockRequest.Position];
    return CreateEventRaisedOrSentEvent(protoEvent.EventId, EncodeEventName(null), payload, targetId);
}
/// <summary>
/// Converts an entity unlock message from its protobuf form into the legacy core event form.
/// </summary>
/// <param name="protoEvent">The proto history event carrying the unlock message.</param>
/// <param name="instance">The orchestration instance, recorded in the message as the parent.</param>
/// <returns>The core event, named <c>release</c>, whose input is the JSON-serialized release message.</returns>
/// <exception cref="InvalidOperationException">
/// The instance ID or the target instance ID is missing.
/// </exception>
public static DTCore.History.HistoryEvent EncodeUnlockSent(
    P.HistoryEvent protoEvent,
    OrchestrationInstance? instance)
{
    P.EntityUnlockSentEvent unlock = protoEvent.EntityUnlockSent;

    // the parent instance is validated before the target
    if (instance?.InstanceId is not string parentId)
    {
        throw new InvalidOperationException("missing instance id");
    }

    ReleaseMessage release = new()
    {
        Id = unlock.CriticalSectionId,
        ParentInstanceId = parentId,
    };
    string payload = JsonConvert.SerializeObject(release, ConversionSettings);

    if (unlock.TargetInstanceId is not string targetId)
    {
        throw new InvalidOperationException("missing target instance id");
    }

    return CreateEventRaisedOrSentEvent(protoEvent.EventId, "release", payload, targetId);
}
/// <summary>
/// Converts an entity lock grant from its protobuf form into the legacy core event form.
/// </summary>
/// <param name="protoEvent">The proto history event carrying the lock grant.</param>
/// <returns>
/// A raised event named after the critical section ID, whose input is the JSON-serialized response message.
/// </returns>
public static DTCore.History.EventRaisedEvent EncodeLockGranted(P.HistoryEvent protoEvent)
{
    P.EntityLockGrantedEvent granted = protoEvent.EntityLockGranted;

    // a lock grant is encoded as a response carrying a fixed marker result
    ResponseMessage response = new()
    {
        Result = ResponseMessage.LockAcquisitionCompletion,
    };
    string payload = JsonConvert.SerializeObject(response, ConversionSettings);

    return new DTCore.History.EventRaisedEvent(protoEvent.EventId, payload)
    {
        Name = granted.CriticalSectionId,
    };
}
/// <summary>
/// Converts an entity operation completion from its protobuf form into the legacy core event form.
/// </summary>
/// <param name="protoEvent">The proto history event carrying the completed operation.</param>
/// <returns>
/// A raised event named after the request ID, whose input is the JSON-serialized response message.
/// </returns>
public static DTCore.History.EventRaisedEvent EncodeOperationCompleted(P.HistoryEvent protoEvent)
{
    P.EntityOperationCompletedEvent completed = protoEvent.EntityOperationCompleted;

    // the operation output becomes the result of the response
    ResponseMessage response = new()
    {
        Result = completed.Output,
    };
    string payload = JsonConvert.SerializeObject(response, ConversionSettings);

    return new DTCore.History.EventRaisedEvent(protoEvent.EventId, payload)
    {
        Name = completed.RequestId,
    };
}
/// <summary>
/// Encodes an operation failed message.
/// </summary>
/// <param name="protoEvent">The proto event.</param>
/// <returns>
/// A raised event named after the request ID, whose input is the JSON-serialized response message.
/// </returns>
/// <exception cref="InvalidOperationException">The proto event carries no failure details.</exception>
public static DTCore.History.EventRaisedEvent EncodeOperationFailed(P.HistoryEvent protoEvent)
{
    P.EntityOperationFailedEvent failedEvent = protoEvent.EntityOperationFailed;

    // fail with a descriptive error instead of a NullReferenceException when the details are absent
    var failureDetails = failedEvent.FailureDetails
        ?? throw new InvalidOperationException("missing failure details");

    return new DTCore.History.EventRaisedEvent(
        protoEvent.EventId,
        JsonConvert.SerializeObject(
            new ResponseMessage()
            {
                // ErrorMessage is serialized under the name "exceptionType", which is why it receives
                // the error type, while the actual error message travels in the result.
                ErrorMessage = failureDetails.ErrorType,
                Result = failureDetails.ErrorMessage,
                FailureDetails = failureDetails.ToCore(),
            },
            ConversionSettings))
    {
        Name = failedEvent.RequestId,
    };
}
/// <summary>
/// Decodes an orchestration action that sends an entity message from an external event.
/// </summary>
/// <param name="name">
/// The name of the external event: <c>release</c> for an unlock message, or a name starting with <c>op</c>
/// (optionally followed by <c>@</c> and a scheduled time) for an operation or lock request.
/// </param>
/// <param name="input">The input of the external event, i.e. the JSON-serialized message.</param>
/// <param name="target">The target of the external event.</param>
/// <param name="sendAction">The protobuf send action which should be assigned the correct action.</param>
/// <param name="requestId">
/// Receives the ID of the decoded message (the request ID, or the critical section ID for lock and unlock
/// messages).
/// </param>
/// <exception cref="InvalidOperationException">
/// The input deserializes to null, the message has no ID, a lock request has no lock set, or the event name
/// is not recognized.
/// </exception>
internal static void DecodeEntityMessageAction(
    string name,
    string input,
    string? target,
    P.SendEntityMessageAction sendAction,
    out string requestId)
{
    // Release messages are read through RequestMessage as well: both contracts use the same
    // "id" and "parent" member names.
    RequestMessage message = JsonConvert.DeserializeObject<RequestMessage>(input, ConversionSettings)
        ?? throw new InvalidOperationException("Cannot convert null event");
    string messageId = message.Id
        ?? throw new InvalidOperationException("missing ID");

    if (name.StartsWith("op", System.StringComparison.Ordinal))
    {
        if (message.Operation == null)
        {
            // this is a lock request
            DTCore.Entities.EntityId[] lockSet = message.LockSet
                ?? throw new InvalidOperationException("missing lock set");

            sendAction.EntityLockRequested = new P.EntityLockRequestedEvent
            {
                CriticalSectionId = requestId = messageId,
                LockSet = { lockSet.Select(e => e.ToString()) },
                ParentInstanceId = message.ParentInstanceId,
                Position = message.Position,
            };
        }
        else
        {
            // this is an operation call or signal
            Timestamp? scheduledTime = null;

            // The suffix is produced with the culture-invariant round-trip ("o") format, so it is parsed
            // with the invariant culture too rather than with whatever culture the current thread has.
            if (name.Length >= 3
                && name[2] == '@'
                && DateTime.TryParse(
                    name[3..],
                    System.Globalization.CultureInfo.InvariantCulture,
                    System.Globalization.DateTimeStyles.RoundtripKind,
                    out DateTime time))
            {
                // Timestamp requires a UTC DateTime
                scheduledTime = Timestamp.FromDateTime(time.ToUniversalTime());
            }

            if (message.IsSignal)
            {
                sendAction.EntityOperationSignaled = new P.EntityOperationSignaledEvent
                {
                    RequestId = requestId = messageId,
                    Input = message.Input,
                    Operation = message.Operation,
                    ScheduledTime = scheduledTime,
                    TargetInstanceId = target,
                };
            }
            else
            {
                sendAction.EntityOperationCalled = new P.EntityOperationCalledEvent
                {
                    RequestId = requestId = messageId,
                    Input = message.Input,
                    Operation = message.Operation,
                    ScheduledTime = scheduledTime,
                    ParentInstanceId = message.ParentInstanceId,
                    TargetInstanceId = target,
                };
            }
        }
    }
    else if (name == "release")
    {
        sendAction.EntityUnlockSent = new P.EntityUnlockSentEvent
        {
            CriticalSectionId = requestId = messageId,
            TargetInstanceId = target,
            ParentInstanceId = message.ParentInstanceId,
        };
    }
    else
    {
        throw new InvalidOperationException($"Cannot convert event with name {name}");
    }
}
/// <summary>
/// Builds the external event name used for entity request messages.
/// </summary>
/// <param name="scheduledTime">The optional time at which the operation is scheduled.</param>
/// <returns>
/// <c>op</c> when no time is given; otherwise <c>op@</c> followed by the time in round-trip ("o") format.
/// </returns>
static string EncodeEventName(DateTime? scheduledTime)
{
    if (scheduledTime is DateTime due)
    {
        return $"op@{due:o}";
    }

    return "op";
}
/// <summary>
/// Creates the core history event for an entity message: a sent event when a target is known,
/// a raised event otherwise.
/// </summary>
/// <param name="eventId">The ID of the history event.</param>
/// <param name="name">The name of the event.</param>
/// <param name="input">The serialized message carried by the event.</param>
/// <param name="target">The target instance ID, or <c>null</c> if the event has no target.</param>
/// <returns>The core history event.</returns>
static DTCore.History.HistoryEvent CreateEventRaisedOrSentEvent(
    int eventId,
    string name,
    string input,
    string? target)
{
    if (target != null)
    {
        // the event is used inside a history, so it includes a target instance
        return new DTCore.History.EventSentEvent(eventId)
        {
            Name = name,
            Input = input,
            InstanceId = target,
        };
    }

    // the event is used inside a message, so it does not include a target
    return new DTCore.History.EventRaisedEvent(eventId, input)
    {
        Name = name,
    };
}
/// <summary>
/// Copied from DT Core for serialization/deserialization.
/// Modified so that only the relevant data is serialized/deserialized.
/// Represents a request sent to an entity: an operation call, an operation signal, or a lock request.
/// </summary>
[DataContract]
class RequestMessage
{
/// <summary>
/// Gets or sets the name of the operation being called (if this is an operation message) or <c>null</c>
/// (if this is a lock request).
/// </summary>
[DataMember(Name = "op")]
public string? Operation { get; set; }
/// <summary>
/// Gets or sets a value indicating whether or not this is a one-way message.
/// </summary>
[DataMember(Name = "signal", EmitDefaultValue = false)]
public bool IsSignal { get; set; }
/// <summary>
/// Gets or sets the operation input.
/// </summary>
[DataMember(Name = "input", EmitDefaultValue = false)]
public string? Input { get; set; }
/// <summary>
/// Gets or sets a unique identifier for this operation. The original data type is GUID but since
/// we use just strings in the backend, we may as well parse this as a string.
/// </summary>
[DataMember(Name = "id", IsRequired = true)]
public string? Id { get; set; }
/// <summary>
/// Gets or sets the parent instance that called this operation.
/// </summary>
[DataMember(Name = "parent", EmitDefaultValue = false)]
public string? ParentInstanceId { get; set; }
/// <summary>
/// Gets or sets the execution ID of the parent instance that called this operation.
/// </summary>
[DataMember(Name = "parentExecution", EmitDefaultValue = false)]
public string? ParentExecutionId { get; set; }
/// <summary>
/// Gets or sets an optional value, a scheduled time at which to start the operation.
/// </summary>
[DataMember(Name = "due", EmitDefaultValue = false)]
public DateTime? ScheduledTime { get; set; }
/// <summary>
/// Gets or sets the lock set, the set of locks being acquired. Is sorted,
/// contains at least one element, and has no repetitions.
/// </summary>
[DataMember(Name = "lockset", EmitDefaultValue = false)]
public DTCore.Entities.EntityId[]? LockSet { get; set; }
/// <summary>
/// Gets or sets the message number for lock requests involving multiple locks.
/// </summary>
[DataMember(Name = "pos", EmitDefaultValue = false)]
public int Position { get; set; }
}
/// <summary>
/// Copied from DT Core for serialization/deserialization.
/// Modified so that only the relevant data is serialized/deserialized.
/// Represents the response to an entity request: an operation result, an operation failure, or a lock grant.
/// </summary>
[DataContract]
class ResponseMessage
{
/// <summary>
/// The fixed result value used for responses that signal a granted lock.
/// </summary>
public const string LockAcquisitionCompletion = "Lock Acquisition Completed";
/// <summary>
/// Gets or sets the result. Holds the operation output for completed operations, the error message for
/// failed operations, and <see cref="LockAcquisitionCompletion"/> for lock grants.
/// </summary>
[DataMember(Name = "result")]
public string? Result { get; set; }
/// <summary>
/// Gets or sets the error information of a failed operation. Despite the property name, this member is
/// serialized as <c>exceptionType</c> and is filled with the error type of the failure.
/// </summary>
[DataMember(Name = "exceptionType", EmitDefaultValue = false)]
public string? ErrorMessage { get; set; }
/// <summary>
/// Gets or sets the failure details of a failed operation.
/// </summary>
[DataMember(Name = "failureDetails", EmitDefaultValue = false)]
public FailureDetails? FailureDetails { get; set; }
/// <summary>
/// Gets a value indicating whether this response represents an error, i.e. whether
/// <see cref="ErrorMessage"/> is set. Excluded from the data contract.
/// </summary>
[IgnoreDataMember]
public bool IsErrorResult => this.ErrorMessage != null;
}
/// <summary>
/// Copied from DT Core for serialization/deserialization.
/// Modified so that only the relevant data is serialized/deserialized.
/// Represents the message that is sent to an entity to release a lock.
/// </summary>
[DataContract]
class ReleaseMessage
{
/// <summary>
/// Gets or sets the ID of the parent instance that sends the release message.
/// </summary>
[DataMember(Name = "parent")]
public string? ParentInstanceId { get; set; }
/// <summary>
/// Gets or sets the message ID; the encoder fills this with the critical section ID.
/// </summary>
[DataMember(Name = "id")]
public string? Id { get; set; }
}
}