-
Notifications
You must be signed in to change notification settings - Fork 411
Expand file tree
/
Copy pathStreamUtil.cs
More file actions
54 lines (49 loc) · 2.17 KB
/
StreamUtil.cs
File metadata and controls
54 lines (49 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
namespace Docker.DotNet.Models
{
internal static class StreamUtil
{
internal static async Task MonitorStreamAsync(Task<Stream> streamTask, DockerClient client, CancellationToken cancellationToken, IProgress<string> progress)
{
var tcs = new TaskCompletionSource<string>();
using (var stream = await streamTask)
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
string line;
while ((line = await await Task.WhenAny(reader.ReadLineAsync(), tcs.Task)) != null)
{
progress.Report(line);
}
}
}
internal static async Task MonitorStreamForMessagesAsync<T>(Task<Stream> streamTask, DockerClient client, CancellationToken cancellationToken, IProgress<T> progress)
{
var tcs = new TaskCompletionSource<bool>();
using (var stream = await streamTask)
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
using (var jsonReader = new JsonTextReader(reader) { SupportMultipleContent = true })
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
while (await await Task.WhenAny(jsonReader.ReadAsync(cancellationToken), tcs.Task))
{
var ev = await client.JsonSerializer.Deserialize<T>(jsonReader, cancellationToken);
progress.Report(ev);
}
}
}
internal static async Task MonitorResponseForMessagesAsync<T>(Task<HttpResponseMessage> responseTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
{
using (var response = await responseTask)
{
await MonitorStreamForMessagesAsync(response.Content.ReadAsStreamAsync(), client, cancel, progress);
}
}
}
}