| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- //
- // RACReplaySubject.m
- // ReactiveObjC
- //
- // Created by Josh Abernathy on 3/14/12.
- // Copyright (c) 2012 GitHub, Inc. All rights reserved.
- //
- #import "RACReplaySubject.h"
- #import "RACCompoundDisposable.h"
- #import "RACDisposable.h"
- #import "RACScheduler+Private.h"
- #import "RACSubscriber.h"
- #import "RACTuple.h"
- const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;
- @interface RACReplaySubject ()
- @property (nonatomic, assign, readonly) NSUInteger capacity;
- // These properties should only be modified while synchronized on self.
- @property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;
- @property (nonatomic, assign) BOOL hasCompleted;
- @property (nonatomic, assign) BOOL hasError;
- @property (nonatomic, strong) NSError *error;
- @end
- @implementation RACReplaySubject
- #pragma mark Lifecycle
- + (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
- return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
- }
- - (instancetype)init {
- return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
- }
- - (instancetype)initWithCapacity:(NSUInteger)capacity {
- self = [super init];
-
- _capacity = capacity;
- _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
-
- return self;
- }
- #pragma mark RACSignal
- - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
- RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
- RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
- @synchronized (self) {
- for (id value in self.valuesReceived) {
- if (compoundDisposable.disposed) return;
- [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
- }
- if (compoundDisposable.disposed) return;
- if (self.hasCompleted) {
- [subscriber sendCompleted];
- } else if (self.hasError) {
- [subscriber sendError:self.error];
- } else {
- RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
- [compoundDisposable addDisposable:subscriptionDisposable];
- }
- }
- }];
- [compoundDisposable addDisposable:schedulingDisposable];
- return compoundDisposable;
- }
- #pragma mark RACSubscriber
- - (void)sendNext:(id)value {
- @synchronized (self) {
- [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
-
- if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
- [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
- }
-
- [super sendNext:value];
- }
- }
- - (void)sendCompleted {
- @synchronized (self) {
- self.hasCompleted = YES;
- [super sendCompleted];
- }
- }
- - (void)sendError:(NSError *)e {
- @synchronized (self) {
- self.hasError = YES;
- self.error = e;
- [super sendError:e];
- }
- }
- @end
|