123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- //
- // RACMulticastConnection.m
- // ReactiveObjC
- //
- // Created by Josh Abernathy on 4/11/12.
- // Copyright (c) 2012 GitHub, Inc. All rights reserved.
- //
- #import "RACMulticastConnection.h"
- #import "RACMulticastConnection+Private.h"
- #import "RACDisposable.h"
- #import "RACSerialDisposable.h"
- #import "RACSubject.h"
- #import <libkern/OSAtomic.h>
- @interface RACMulticastConnection () {
- RACSubject *_signal;
- // When connecting, a caller should attempt to atomically swap the value of this
- // from `0` to `1`.
- //
- // If the swap is successful the caller is resposible for subscribing `_signal`
- // to `sourceSignal` and storing the returned disposable in `serialDisposable`.
- //
- // If the swap is unsuccessful it means that `_sourceSignal` has already been
- // connected and the caller has no action to take.
- int32_t volatile _hasConnected;
- }
- @property (nonatomic, readonly, strong) RACSignal *sourceSignal;
- @property (strong) RACSerialDisposable *serialDisposable;
- @end
- @implementation RACMulticastConnection
- #pragma mark Lifecycle
- - (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
- NSCParameterAssert(source != nil);
- NSCParameterAssert(subject != nil);
- self = [super init];
- _sourceSignal = source;
- _serialDisposable = [[RACSerialDisposable alloc] init];
- _signal = subject;
-
- return self;
- }
- #pragma mark Connecting
- - (RACDisposable *)connect {
- BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
- if (shouldConnect) {
- self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
- }
- return self.serialDisposable;
- }
- - (RACSignal *)autoconnect {
- __block volatile int32_t subscriberCount = 0;
- return [[RACSignal
- createSignal:^(id<RACSubscriber> subscriber) {
- OSAtomicIncrement32Barrier(&subscriberCount);
- RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
- RACDisposable *connectionDisposable = [self connect];
- return [RACDisposable disposableWithBlock:^{
- [subscriptionDisposable dispose];
- if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
- [connectionDisposable dispose];
- }
- }];
- }]
- setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
- }
- @end
|