RACReplaySubject.m 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. //
  2. // RACReplaySubject.m
  3. // ReactiveObjC
  4. //
  5. // Created by Josh Abernathy on 3/14/12.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACReplaySubject.h"
  9. #import "RACCompoundDisposable.h"
  10. #import "RACDisposable.h"
  11. #import "RACScheduler+Private.h"
  12. #import "RACSubscriber.h"
  13. #import "RACTuple.h"
  14. const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;
  15. @interface RACReplaySubject ()
  16. @property (nonatomic, assign, readonly) NSUInteger capacity;
  17. // These properties should only be modified while synchronized on self.
  18. @property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;
  19. @property (nonatomic, assign) BOOL hasCompleted;
  20. @property (nonatomic, assign) BOOL hasError;
  21. @property (nonatomic, strong) NSError *error;
  22. @end
  23. @implementation RACReplaySubject
  24. #pragma mark Lifecycle
  25. + (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
  26. return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
  27. }
  28. - (instancetype)init {
  29. return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
  30. }
  31. - (instancetype)initWithCapacity:(NSUInteger)capacity {
  32. self = [super init];
  33. _capacity = capacity;
  34. _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
  35. return self;
  36. }
  37. #pragma mark RACSignal
  38. - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
  39. RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
  40. RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
  41. @synchronized (self) {
  42. for (id value in self.valuesReceived) {
  43. if (compoundDisposable.disposed) return;
  44. [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
  45. }
  46. if (compoundDisposable.disposed) return;
  47. if (self.hasCompleted) {
  48. [subscriber sendCompleted];
  49. } else if (self.hasError) {
  50. [subscriber sendError:self.error];
  51. } else {
  52. RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
  53. [compoundDisposable addDisposable:subscriptionDisposable];
  54. }
  55. }
  56. }];
  57. [compoundDisposable addDisposable:schedulingDisposable];
  58. return compoundDisposable;
  59. }
  60. #pragma mark RACSubscriber
  61. - (void)sendNext:(id)value {
  62. @synchronized (self) {
  63. [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
  64. if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
  65. [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
  66. }
  67. [super sendNext:value];
  68. }
  69. }
  70. - (void)sendCompleted {
  71. @synchronized (self) {
  72. self.hasCompleted = YES;
  73. [super sendCompleted];
  74. }
  75. }
  76. - (void)sendError:(NSError *)e {
  77. @synchronized (self) {
  78. self.hasError = YES;
  79. self.error = e;
  80. [super sendError:e];
  81. }
  82. }
  83. @end