Skip to content

Commit 553aefd

Browse files
committed
Split performStream into functions handling stream of futures
1 parent c4b0282 commit 553aefd

9 files changed

Lines changed: 292 additions & 294 deletions

File tree

.travis.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
language: node_js
22
node_js:
33
- "node"
4-
- "8"
54

65
after_success:
7-
- "npm run codecov"
6+
- "npm run codecov"

src/behavior.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ import { combine, isPlaceholder } from "./index";
33
import { State, Reactive, Time, BListener, Parent, SListener } from "./common";
44
import { Future, BehaviorFuture } from "./future";
55
import * as F from "./future";
6-
import { Stream } from "./stream";
6+
import {
7+
Stream,
8+
FlatFutureOrdered,
9+
FlatFutureLatest,
10+
FlatFuture
11+
} from "./stream";
712
import { tick, getTime } from "./clock";
813
import { sample, Now } from "./now";
914

@@ -732,3 +737,29 @@ export function format(
732737
): Behavior<string> {
733738
return new FormatBehavior(strings, behaviors);
734739
}
740+
741+
export const flatFutureFrom = <A>(
742+
stream: Stream<Future<A>>
743+
): Behavior<Stream<A>> => fromFunction(() => new FlatFuture(stream));
744+
745+
export function flatFuture<A>(stream: Stream<Future<A>>): Now<Stream<A>> {
746+
return sample(flatFutureFrom(stream));
747+
}
748+
749+
export const flatFutureOrderedFrom = <A>(
750+
stream: Stream<Future<A>>
751+
): Behavior<Stream<A>> => fromFunction(() => new FlatFutureOrdered(stream));
752+
753+
export function flatFutureOrdered<A>(
754+
stream: Stream<Future<A>>
755+
): Now<Stream<A>> {
756+
return sample(flatFutureOrderedFrom(stream));
757+
}
758+
759+
export const flatFutureLatestFrom = <A>(
760+
stream: Stream<Future<A>>
761+
): Behavior<Stream<A>> => fromFunction(() => new FlatFutureLatest(stream));
762+
763+
export function flatFutureLatest<A>(stream: Stream<Future<A>>): Now<Stream<A>> {
764+
return sample(flatFutureLatestFrom(stream));
765+
}

src/now.ts

Lines changed: 7 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { IO, runIO } from "@funkia/io";
22
import { placeholder } from "./placeholder";
3-
import { Time, SListener } from "./common";
4-
import { Future, fromPromise, mapCbFuture } from "./future";
5-
import { Node } from "./datastructures";
3+
import { Time } from "./common";
4+
import { Future, fromPromise, mapCbFuture, sinkFuture } from "./future";
65
import { Behavior } from "./behavior";
7-
import { ActiveStream, Stream, mapCbStream, isStream } from "./stream";
6+
import { Stream, mapCbStream, isStream } from "./stream";
87
import { tick } from "./clock";
98

109
export type MapNowTuple<A> = { [K in keyof A]: Now<A[K]> };
@@ -104,11 +103,11 @@ export function sample<A>(b: Behavior<A>): Now<A> {
104103
}
105104

106105
export class PerformNow<A> extends Now<A> {
107-
constructor(private cb: () => A) {
106+
constructor(private _run: () => A) {
108107
super();
109108
}
110109
run(): A {
111-
return this.cb();
110+
return this._run();
112111
}
113112
}
114113

@@ -124,9 +123,9 @@ export function performIO<A>(comp: IO<A>): Now<Future<A>> {
124123
return perform(() => fromPromise(runIO(comp)));
125124
}
126125

127-
export function performStream<A>(s: Stream<IO<A>>): Now<Stream<A>> {
126+
export function performStream<A>(s: Stream<IO<A>>): Now<Stream<Future<A>>> {
128127
return perform(() =>
129-
mapCbStream<IO<A>, A>((io, cb) => runIO(io).then(cb), s)
128+
mapCbStream<IO<A>, Future<A>>((io, cb) => cb(fromPromise(runIO(io))), s)
130129
);
131130
}
132131

@@ -157,91 +156,6 @@ export function performMap<A, B>(
157156
);
158157
}
159158

160-
class PerformIOLatestStream<A> extends ActiveStream<A>
161-
implements SListener<IO<A>> {
162-
private node: Node<this> = new Node(this);
163-
constructor(s: Stream<IO<A>>) {
164-
super();
165-
s.addListener(this.node, tick());
166-
}
167-
next: number = 0;
168-
newest: number = 0;
169-
running: number = 0;
170-
pushS(_t: number, io: IO<A>): void {
171-
const time = ++this.next;
172-
this.running++;
173-
runIO(io).then((a: A) => {
174-
this.running--;
175-
if (time > this.newest) {
176-
const t = tick();
177-
if (this.running === 0) {
178-
this.next = 0;
179-
this.newest = 0;
180-
} else {
181-
this.newest = time;
182-
}
183-
this.pushSToChildren(t, a);
184-
}
185-
});
186-
}
187-
}
188-
189-
export class PerformStreamLatestNow<A> extends Now<Stream<A>> {
190-
constructor(private s: Stream<IO<A>>) {
191-
super();
192-
}
193-
run(): Stream<A> {
194-
return new PerformIOLatestStream(this.s);
195-
}
196-
}
197-
198-
export function performStreamLatest<A>(s: Stream<IO<A>>): Now<Stream<A>> {
199-
return perform(() => new PerformIOLatestStream(s));
200-
}
201-
202-
class PerformIOStreamOrdered<A> extends ActiveStream<A> {
203-
private node: Node<this> = new Node(this);
204-
constructor(s: Stream<IO<A>>) {
205-
super();
206-
s.addListener(this.node, tick());
207-
}
208-
nextId: number = 0;
209-
next: number = 0;
210-
buffer: { value: A }[] = []; // Object-wrapper to support a result as undefined
211-
pushS(_t: number, io: IO<A>): void {
212-
const id = this.nextId++;
213-
runIO(io).then((a: A) => {
214-
if (id === this.next) {
215-
this.buffer[0] = { value: a };
216-
this.pushFromBuffer();
217-
} else {
218-
this.buffer[id - this.next] = { value: a };
219-
}
220-
});
221-
}
222-
pushFromBuffer(): void {
223-
while (this.buffer[0] !== undefined) {
224-
const t = tick();
225-
const { value } = this.buffer.shift();
226-
this.pushSToChildren(t, value);
227-
this.next++;
228-
}
229-
}
230-
}
231-
232-
export class PerformStreamOrderedNow<A> extends Now<Stream<A>> {
233-
constructor(private s: Stream<IO<A>>) {
234-
super();
235-
}
236-
run(): Stream<A> {
237-
return new PerformIOStreamOrdered(this.s);
238-
}
239-
}
240-
241-
export function performStreamOrdered<A>(s: Stream<IO<A>>): Now<Stream<A>> {
242-
return new PerformStreamOrderedNow(s);
243-
}
244-
245159
export function plan<A>(future: Future<Now<A>>): Now<Future<A>> {
246160
return performMap<Now<A>, A>(runNow, future);
247161
}

src/stream.ts

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import {
1010
accum
1111
} from "./behavior";
1212
import { tick } from "./clock";
13-
import { Now, sample } from "./now";
13+
import { Now, sample, perform } from "./now";
14+
import { Future } from ".";
1415

1516
/**
1617
* A stream is a list of occurrences over time. Each occurrence
@@ -477,3 +478,70 @@ export function mapCbStream<A, B>(
477478
): Stream<B> {
478479
return new PerformCbStream(cb, stream);
479480
}
481+
482+
export class FlatFuture<A> extends Stream<A> {
483+
constructor(stream: Stream<Future<A>>) {
484+
super();
485+
this.parents = cons(stream);
486+
}
487+
pushS(_t: number, fut: Future<A>): void {
488+
fut.subscribe((a) => this.pushSToChildren(tick(), a));
489+
}
490+
}
491+
492+
export class FlatFutureOrdered<A> extends Stream<A> {
493+
constructor(stream: Stream<Future<A>>) {
494+
super();
495+
this.parents = cons(stream);
496+
}
497+
nextId: number = 0;
498+
next: number = 0;
499+
buffer: { value: A }[] = []; // Object-wrapper to support a result as undefined
500+
pushS(_t: number, fut: Future<A>): void {
501+
const id = this.nextId++;
502+
fut.subscribe((a: A) => {
503+
if (id === this.next) {
504+
this.buffer[0] = { value: a };
505+
this.pushFromBuffer();
506+
} else {
507+
this.buffer[id - this.next] = { value: a };
508+
}
509+
});
510+
}
511+
pushFromBuffer(): void {
512+
while (this.buffer[0] !== undefined) {
513+
const t = tick();
514+
const { value } = this.buffer.shift();
515+
this.pushSToChildren(t, value);
516+
this.next++;
517+
}
518+
}
519+
}
520+
521+
export class FlatFutureLatest<A> extends Stream<A>
522+
implements SListener<Future<A>> {
523+
constructor(stream: Stream<Future<A>>) {
524+
super();
525+
this.parents = cons(stream);
526+
}
527+
next: number = 0;
528+
newest: number = 0;
529+
running: number = 0;
530+
pushS(_t: number, fut: Future<A>): void {
531+
const time = ++this.next;
532+
this.running++;
533+
fut.subscribe((a: A) => {
534+
this.running--;
535+
if (time > this.newest) {
536+
const t = tick();
537+
if (this.running === 0) {
538+
this.next = 0;
539+
this.newest = 0;
540+
} else {
541+
this.newest = time;
542+
}
543+
this.pushSToChildren(t, a);
544+
}
545+
});
546+
}
547+
}

src/testing.ts

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import {
88
ScanStream,
99
CombineStream,
1010
SnapshotStream,
11-
isStream
11+
isStream,
12+
FlatFuture,
13+
FlatFutureOrdered,
14+
FlatFutureLatest
1215
} from "./stream";
1316
import {
1417
Behavior,
@@ -35,8 +38,6 @@ import {
3538
FlatMapNow,
3639
PerformNow,
3740
PerformMapNow,
38-
PerformStreamLatestNow,
39-
PerformStreamOrderedNow,
4041
Now,
4142
MapNow,
4243
InstantNow
@@ -218,6 +219,40 @@ DelayStream.prototype.model = function<A>(this: DelayStream<A>) {
218219
return s.map(({ time, value }) => ({ time: time + this.ms, value }));
219220
};
220221

222+
const flatFuture = <A>(o: Occurrence<Future<A>>) => {
223+
const { time, value } = o.value.model();
224+
return time === "infinity" ? [] : [{ time: Math.max(o.time, time), value }];
225+
};
226+
227+
FlatFuture.prototype.model = function<A>(this: FlatFuture<A>) {
228+
return (this.parents.value as Stream<Future<A>>)
229+
.model()
230+
.flatMap(flatFuture)
231+
.sort((o, p) => o.time - p.time); // FIXME: Should use stable sort here
232+
};
233+
234+
FlatFutureOrdered.prototype.model = function<A>(this: FlatFutureOrdered<A>) {
235+
return (this.parents.value as Stream<Future<A>>)
236+
.model()
237+
.flatMap(flatFuture)
238+
.reduce((acc, o) => {
239+
const last = acc.length === 0 ? -Infinity : acc[acc.length - 1].time;
240+
return acc.concat([{ time: Math.max(last, o.time), value: o.value }]);
241+
}, []);
242+
};
243+
244+
FlatFutureLatest.prototype.model = function<A>(this: FlatFutureLatest<A>) {
245+
return (this.parents.value as Stream<Future<A>>)
246+
.model()
247+
.flatMap(flatFuture)
248+
.reduceRight<Occurrence<A>[]>((acc, o) => {
249+
const last = acc.length === 0 ? Infinity : acc[0].time;
250+
return last < o.time
251+
? acc
252+
: [{ time: o.time, value: o.value }].concat(acc);
253+
}, []);
254+
};
255+
221256
class TestStream<A> extends Stream<A> {
222257
constructor(private streamModel: StreamModel<A>) {
223258
super();
@@ -400,22 +435,6 @@ PerformMapNow.prototype.model = function<A, B>(
400435
return { value, mocks };
401436
};
402437

403-
PerformStreamLatestNow.prototype.model = function<A>(
404-
this: PerformStreamLatestNow<A>,
405-
[value, ...mocks]: any[],
406-
_t: Time
407-
): NowModel<A> {
408-
return { value, mocks };
409-
};
410-
411-
PerformStreamOrderedNow.prototype.model = function<A>(
412-
this: PerformStreamOrderedNow<A>,
413-
[value, ...mocks]: any[],
414-
_t: Time
415-
): NowModel<A> {
416-
return { value, mocks };
417-
};
418-
419438
/**
420439
* Test run a now computation without executing its side-effects.
421440
* @param now The now computation to test.

0 commit comments

Comments
 (0)