RACMulticastConnection.m 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. //
  2. // RACMulticastConnection.m
  3. // ReactiveObjC
  4. //
  5. // Created by Josh Abernathy on 4/11/12.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACMulticastConnection.h"
  9. #import "RACMulticastConnection+Private.h"
  10. #import "RACDisposable.h"
  11. #import "RACSerialDisposable.h"
  12. #import "RACSubject.h"
  13. #import <libkern/OSAtomic.h>
  @interface RACMulticastConnection () {
      // The subject that `-connect` subscribes to `sourceSignal`.
      RACSubject *_signal;

      // Connection flag; starts out as `0`.
      //
      // A caller that wants to connect must attempt to atomically swap this
      // value from `0` to `1`:
      //
      //  - If the swap succeeds, that caller is responsible for subscribing
      //    `_signal` to `sourceSignal` and storing the returned disposable in
      //    `serialDisposable`.
      //  - If the swap fails, `sourceSignal` has already been connected and
      //    the caller has no action to take.
      volatile int32_t _hasConnected;
  }

  // The underlying signal that `_signal` is subscribed to upon connection.
  @property (nonatomic, strong, readonly) RACSignal *sourceSignal;

  // Container for the disposable produced by subscribing to `sourceSignal`.
  // This is the object handed back to callers of `-connect`.
  @property (strong) RACSerialDisposable *serialDisposable;

  @end
  @implementation RACMulticastConnection

  #pragma mark Lifecycle

  // Creates a connection that will, once connected, subscribe `subject` to
  // `source`.
  //
  // source  - The signal to subscribe to on connection. Asserted to be non-nil.
  // subject - The subject that receives the events of `source` and backs the
  //           `signal` property. Asserted to be non-nil.
  //
  // Returns the initialized connection, or nil if the superclass initializer
  // fails.
  - (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
      NSCParameterAssert(source != nil);
      NSCParameterAssert(subject != nil);

      self = [super init];
      // Bail out before touching any ivar: writing an ivar through a nil `self`
      // dereferences a null pointer.
      if (self == nil) return nil;

      _sourceSignal = source;
      _serialDisposable = [[RACSerialDisposable alloc] init];
      _signal = subject;

      return self;
  }

  #pragma mark Connecting

  // Connects to the source signal at most once.
  //
  // Only the caller that wins the atomic `0` -> `1` swap of `_hasConnected`
  // performs the subscription; every other call skips straight to the return.
  //
  // Returns the connection's serial disposable. The same object is returned on
  // every call, whether or not this call performed the subscription.
  - (RACDisposable *)connect {
      BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);

      if (shouldConnect) {
          self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
      }

      return self.serialDisposable;
  }

  // Returns a signal that connects this connection when it is subscribed to and
  // disposes of the connection once its subscriber count drops back to zero.
  //
  // The returned signal is named "[<name of self.signal>] -autoconnect".
  - (RACSignal *)autoconnect {
      // Shared by every run of the creation block below, so it counts the
      // subscriptions made through the returned signal that are not yet disposed.
      __block volatile int32_t subscriberCount = 0;

      return [[RACSignal
          createSignal:^(id<RACSubscriber> subscriber) {
              OSAtomicIncrement32Barrier(&subscriberCount);

              // Subscribe to the subject before connecting.
              // NOTE(review): presumably so this subscriber does not miss events
              // sent while connecting -- confirm against RACSignal semantics.
              RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
              RACDisposable *connectionDisposable = [self connect];

              return [RACDisposable disposableWithBlock:^{
                  [subscriptionDisposable dispose];

                  // Tear down the connection only when the count reaches zero.
                  if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
                      [connectionDisposable dispose];
                  }
              }];
          }]
          setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
  }

  @end