//
//  RACSubscriber.m
//  ReactiveObjC
//
//  Created by Josh Abernathy on 3/1/12.
//  Copyright (c) 2012 GitHub, Inc. All rights reserved.
//
  8. #import "RACSubscriber.h"
  9. #import "RACSubscriber+Private.h"
  10. #import <ReactiveObjC/RACEXTScope.h>
  11. #import "RACCompoundDisposable.h"
  // Private class extension: the event callbacks and the disposable that the
  // implementation below manages.
  @interface RACSubscriber ()

  // Event callbacks. Only read or write these inside `@synchronized (self)`.

  // Invoked by -sendNext: with the delivered value.
  @property (copy, nonatomic) void (^next)(id x);

  // Invoked by -sendError: with the delivered error.
  @property (copy, nonatomic) void (^error)(NSError *e);

  // Invoked by -sendCompleted.
  @property (copy, nonatomic) void (^completed)(void);

  // Compound disposable created in -init. It holds a disposable whose block
  // clears the three callbacks above, plus any disposables registered through
  // -didSubscribeWithDisposable:.
  @property (readonly, strong, nonatomic) RACCompoundDisposable *disposable;

  @end
  @implementation RACSubscriber

  #pragma mark Lifecycle

  // Creates a subscriber that forwards events to the given blocks.
  //
  // next      - Called from -sendNext: with each value.
  // error     - Called from -sendError: with the error.
  // completed - Called from -sendCompleted.
  //
  // Each block is copied. The -send… methods skip a nil callback, so passing nil
  // for any of them simply ignores that kind of event.
  //
  // Returns the new subscriber, or nil if initialization failed.
  + (instancetype)subscriberWithNext:(void (^)(id x))next
                               error:(void (^)(NSError *error))error
                           completed:(void (^)(void))completed {
      RACSubscriber *subscriber = [[self alloc] init];
      // `->` on a nil object is a null dereference, unlike a message send.
      if (subscriber == nil) return nil;

      subscriber->_next = [next copy];
      subscriber->_error = [error copy];
      subscriber->_completed = [completed copy];

      return subscriber;
  }

  // Sets up the compound disposable and seeds it with a disposable whose block
  // clears all three callbacks.
  - (instancetype)init {
      self = [super init];
      // Bail out before touching ivars: writing `_disposable` through a nil
      // `self` would dereference a null pointer.
      if (self == nil) return nil;

      // The block ends up stored (via `_disposable`) on self, so it must not
      // retain self or the two would keep each other alive.
      // NOTE(review): an unsafe reference rather than a weak one presumably lets
      // the block still reach self when disposal is triggered from -dealloc;
      // confirm against RACEXTScope.
      @unsafeify(self);

      RACDisposable *selfDisposable = [RACDisposable disposableWithBlock:^{
          @strongify(self);

          @synchronized (self) {
              self.next = nil;
              self.error = nil;
              self.completed = nil;
          }
      }];

      _disposable = [RACCompoundDisposable compoundDisposable];
      [_disposable addDisposable:selfDisposable];

      return self;
  }

  // Disposes of the compound disposable when the subscriber goes away.
  - (void)dealloc {
      // Direct ivar access: accessors should not be messaged on an object that is
      // being torn down.
      [_disposable dispose];
  }

  #pragma mark RACSubscriber

  // Delivers `value` to the `next` callback, if one is still set.
  //
  // The callback is looked up and invoked while synchronized on self.
  - (void)sendNext:(id)value {
      @synchronized (self) {
          void (^nextBlock)(id) = [self.next copy];
          if (nextBlock == nil) return;

          nextBlock(value);
      }
  }

  // Disposes of this subscriber's disposable, then delivers `e` to the `error`
  // callback if one was set when the call began.
  - (void)sendError:(NSError *)e {
      @synchronized (self) {
          // Capture the callback before disposing: the disposable installed in
          // -init clears the callback properties when its block runs.
          void (^errorBlock)(NSError *) = [self.error copy];
          [self.disposable dispose];

          if (errorBlock == nil) return;
          errorBlock(e);
      }
  }

  // Disposes of this subscriber's disposable, then invokes the `completed`
  // callback if one was set when the call began.
  - (void)sendCompleted {
      @synchronized (self) {
          // Capture the callback before disposing, for the same reason as in
          // -sendError:.
          void (^completedBlock)(void) = [self.completed copy];
          [self.disposable dispose];

          if (completedBlock == nil) return;
          completedBlock();
      }
  }

  // Adds `otherDisposable` to this subscriber's compound disposable, and arranges
  // for it to be removed again once `otherDisposable` itself is disposed.
  //
  // Does nothing if `otherDisposable` has already been disposed.
  - (void)didSubscribeWithDisposable:(RACCompoundDisposable *)otherDisposable {
      if (otherDisposable.disposed) return;

      RACCompoundDisposable *selfDisposable = self.disposable;
      [selfDisposable addDisposable:otherDisposable];

      // The cleanup block below is added to `otherDisposable` itself, so a
      // retaining capture of `otherDisposable` would create a retain cycle.
      @unsafeify(otherDisposable);

      // If this subscription terminates, purge its disposable to avoid unbounded
      // memory growth.
      [otherDisposable addDisposable:[RACDisposable disposableWithBlock:^{
          @strongify(otherDisposable);
          [selfDisposable removeDisposable:otherDisposable];
      }]];
  }

  @end