Conversation
Sources/SignalProducer.swift
Outdated
| let producer = SignalProducer(signal).replayLazily(upTo: Int.max) | ||
|
|
||
| // Start the buffering immediately. | ||
| producer.start() |
Sources/SignalProducer.swift
Outdated
| let key = grouping(value) | ||
| var group: Signal<Value, Error>.Observer? | ||
| groups.modify { | ||
| group = $0[key] |
There was a problem hiding this comment.
[Nit] For multi-line modifications, it'd be better to use named parameter.
Sources/SignalProducer.swift
Outdated
| switch event { | ||
| case let .value(value): | ||
| let key = grouping(value) | ||
| var group: Signal<Value, Error>.Observer? |
There was a problem hiding this comment.
You can dodge the need of an optional by using:
let group: Signal<Value, Error>.Observer = groups.modify { groups inThere was a problem hiding this comment.
nice, didn't know about this!
| } | ||
| } | ||
| group!.send(value: value) | ||
| case let .failed(error): |
There was a problem hiding this comment.
[Nit] Blank line before the pattern.
Sources/SignalProducer.swift
Outdated
| return SignalProducer<(Key, SignalProducer<Value, Error>), Error> { observer, disposable in | ||
| let groups = Atomic<[Key: Signal<Value, Error>.Observer]>([:]) | ||
|
|
||
| self.start { event in |
There was a problem hiding this comment.
The start disposable should be added to the producer disposable.
| /// to the group to which the value belongs to (as determined by the key) | ||
| public func groupBy<Key: Hashable>(_ grouping: @escaping (Value) -> Key) -> SignalProducer<(Key, SignalProducer<Value, Error>), Error> { | ||
| return SignalProducer<(Key, SignalProducer<Value, Error>), Error> { observer, disposable in | ||
| let groups = Atomic<[Key: Signal<Value, Error>.Observer]>([:]) |
There was a problem hiding this comment.
It doesn't seem necessary to specialise it. Is it a workaround to the compiler's weirdness or something?
There was a problem hiding this comment.
I'm afraid I don't know what exactly you mean.
There was a problem hiding this comment.
I mean if you really need to write SignalProducer<(Key, SignalProducer<Value, Error>), Error> when the type parameters can be inferred.
There was a problem hiding this comment.
Oh that line. Yeah that works without explicit annotations 👍
| } | ||
|
|
||
| describe("groupBy") { | ||
| let (signal, observer) = Signal<Int, NoError>.pipe() |
|
|
||
| disposable += producer | ||
| .groupBy { $0 % 2 == 0 } | ||
| .start(Observer(value: { key, group in |
There was a problem hiding this comment.
Seems like start that takes an event action would be nicer here.
| disposable.dispose() | ||
|
|
||
| observer.send(value: 1) | ||
| expect(interrupted) == true |
There was a problem hiding this comment.
These should be checked immediately after the disposal.
| } | ||
| } | ||
|
|
||
| describe("groupBy") { |
There was a problem hiding this comment.
It would be great if there are specialized test cases than one giant one. At least this single test case apparently did not catch the undisposed upstreams, caused by this line.
Say we could use a few test cases on how the inner producers behaves with regard to an outer terminal event, and keep this test case focusing on values.
There was a problem hiding this comment.
yeah right, i've split this up into 2 separated cases and extended them to check the lifetime of the inner producers - now the cases catch the disposal issue you've found!
There was a problem hiding this comment.
Perhaps one per terminal event, hmm?
There was a problem hiding this comment.
Sure I can do that.
Should we check for all events that only this event is sent (e.g. when sending completed, also check that interrupted was not received), or can we trust that when completed is received, interrupted can not be received?
There was a problem hiding this comment.
I think asserting just the specific event is enough. The Signal contract guarantees only one would ever be sent. If multiple terminal events are sent, this would be a fault in the Signal basics and should have been caught somewhere else.
| expect(evens) == [2] | ||
| expect(odds) == [1, 3] | ||
| } | ||
| it("should terminate correctly on disposal") { |
| } else { | ||
| group.startWithValues { odds.append($0)} | ||
| } | ||
| } |
There was a problem hiding this comment.
[Nit] This close bracket could be aligned better.
Sources/SignalProducer.swift
Outdated
| /// - returns: A producer of producers amits one producer for each group and forwards | ||
| /// each value from the original producer to the inner producer corresponding | ||
| /// to the group to which the value belongs to (as determined by the key) | ||
| public func groupBy<Key: Hashable>(_ grouping: @escaping (Value) -> Key) -> SignalProducer<(Key, SignalProducer<Value, Error>), Error> { |
There was a problem hiding this comment.
By the API convention though, it should be group(by:). Would the parameter be better named classifier or something?
There was a problem hiding this comment.
I've tried that for a short moment, but on the call site that would then look like producer.group { ... } instead of producer.groupBy { ... } when using trailing closure syntax.
Not sure if thats a problem though...
There was a problem hiding this comment.
We already have skip(while:) and take(while:) working in this way.
|
@andersio all done 😃 |
| expect(oddCompleted) == true | ||
| } | ||
|
|
||
| it("should terminate correctly receiving an error event") { |
|
@mdiep ping |
|
Oops! Sorry @iv-mexx. I put this back on my todo list and will get to it this week. |
| return group | ||
| } else { | ||
| let (signal, innerObserver) = Signal<Value, Error>.pipe() | ||
| let producer = SignalProducer(signal).replayLazily(upTo: Int.max) |
There was a problem hiding this comment.
Is replaying values an essential part of group(by:)? This is my primary hesitation. In general, we try to discourage the use of replaying (which is why buffer was removed).
(You could just as easily send the Signal and not send values on the Signal until it's been sent—giving observers a chance to observe the signal.)
If you feel that it is, I'd love to see a larger code sample that demonstrates why.
There was a problem hiding this comment.
I think so, otherwise consumers could miss events without realizing it. Hence why I erred on the side of buffering. I could be convinced otherwise, but I'm not sure how to best communicate those semantics at the operator level.
There was a problem hiding this comment.
I think returning Signals instead of SignalProducers communicates the semantics.
But I think seeing an actual example of this operator will make it easier to see whether that's a viable change.
There was a problem hiding this comment.
I've posted an example how we use the groupBy in a project in the Issue: #197 (comment)
There was a problem hiding this comment.
If I'm reading this correctly, RxJS is using Hot Observables for the groups (via refCount)
There was a problem hiding this comment.
As much as I agree with @mdiep about the semantics, in my experience having buffering in this operator (specifically RxJava's Observable#groupBy) has been valuable .
There was a problem hiding this comment.
Perhaps should the buffering be an option?
There was a problem hiding this comment.
(You could just as easily send the Signal and not send values on the Signal until it's been sent—giving observers a chance to observe the signal.)
While this is the ideal case, it seems the buffering is essential, say when using with flatten(.concat), which can delay the establishment of the observation.
Edit: I'd suggest to annotate it with warnings for the potential of indefinite buffering, and perhaps offer a non-buffering variant.
Edit: If we have to find a middle ground, perhaps the emitted inner producers should buffer until & only replay values at the first time it started (future work: a configurable #?). Subsequent starts would be invalid and emit interrupted. This should satisfy the common use cases, e.g. @iv-mexx's example, which do not retain and repeatedly start the inner producers.
Sources/SignalProducer.swift
Outdated
| /// - parameters: | ||
| /// - grouping: a closure that determines the grouping key for a given value | ||
| /// | ||
| /// - returns: A producer of producers amits one producer for each group and forwards |
|
I've been thinking about this quite a bit. The replaying of values makes me pretty uncomfortable, so I've felt a bit stuck and unsure how to resolve this. But after thinking about it, I no longer think that class BluetoothManager {
private let allBeaconsSignal: Signal<CLBeacon, NoError>
private var _beacons: [(Int, Int): (Signal<_, _>, Signal<_, _>.Observer)]
public let beacons: Signal<(CLBeacon, Signal<_, _>), NoError>
private let observer: Signal<(CLBeacon, Signal<_, _>), NoError>.Observer
init() {
// I'm not exactly sure how this should all work. I was working off the given example code,
// which is tightly focused on the groupBy. But this will hopefully communicate the idea,
// which is that the class is responsible for grouping values, notifying interested parties
// that new values have come in and giving them an opportunity to subscribe to events.
let (beacons, observer) = Signal<(CLBeacon, Signal), NoError>.pipe()
allBeaconsSignal.observeValues { beacon in
let identifier = (beacon.major, beacon.minor)
if _beacons[identifier] == nil {
let (signal, observer) = Signal<_, _>.pipe()
self.observer.send(value: (beacon, signal))
}
}
}
}But if the rest of the core team feels that this does make sense to add, they can definitely override me. 😄 |
|
In lieu of the rise of unidirectional data flow in the Swift community, perhaps the class of problems intended for Closing because of inactivity. |
As per #197, this is a Port of
groupByfrom REX.I've only changed it to use
Atomicinstead of theNSRecursiveLock