1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'buffer'
3  +/
4 module rx.algorithm.buffer;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 
10 import std.range : put;
11 
12 class BufferedObserver(E, TObserver)
13 {
14     this(TObserver observer, Disposable disposable, size_t size)
15     {
16         assert(size > 0);
17 
18         _observer = observer;
19         _disposable = disposable;
20 
21         _gate = new Object;
22         _bufferSize = size;
23         _buffer = new E[](size);
24         _buffer.length = 0;
25     }
26 
27     void put(E obj)
28     {
29         synchronized (_gate)
30         {
31             _buffer ~= obj;
32             if (_buffer.length == _bufferSize)
33             {
34                 .put(_observer, _buffer);
35                 _buffer.length = 0;
36             }
37         }
38     }
39 
40     void completed()
41     {
42         synchronized (_gate)
43         {
44             if (_buffer.length > 0)
45             {
46                 .put(_observer, _buffer);
47                 _buffer.length = 0;
48             }
49         }
50 
51         static if (hasCompleted!TObserver)
52         {
53             _observer.completed();
54         }
55         _disposable.dispose();
56     }
57 
58     static if (hasFailure!TObserver)
59     {
60         void failure(Exception e)
61         {
62             _observer.failure(e);
63             _disposable.dispose();
64         }
65     }
66 
67 private:
68     TObserver _observer;
69     Object _gate;
70     size_t _bufferSize;
71     E[] _buffer;
72     Disposable _disposable;
73 }
74 
75 unittest
76 {
77     alias Bufd = BufferedObserver!(int, Observer!int);
78 
79     size_t putCount, completedCount;
80     auto observer = observerObject!int(makeObserver((int n) { putCount++; }, {
81             completedCount++;
82         }));
83     auto bufd = new Bufd(observer, NopDisposable.instance, 2);
84 
85     bufd.put(1);
86     assert(putCount == 0);
87     bufd.put(1);
88     assert(putCount == 2);
89     bufd.put(1);
90     assert(putCount == 2);
91     assert(completedCount == 0);
92     bufd.completed();
93     assert(putCount == 3);
94     assert(completedCount == 1);
95 }
96 
97 struct BufferedObservable(TObservable)
98 {
99     alias ElementType = TObservable.ElementType[];
100 
101     this(TObservable observable, size_t bufferSize)
102     {
103         _observable = observable;
104         _bufferSize = bufferSize;
105     }
106 
107     auto subscribe(TObserver)(auto ref TObserver observer)
108     {
109         alias ObserverType = BufferedObserver!(TObservable.ElementType, TObserver);
110 
111         auto subscription = new SingleAssignmentDisposable;
112         subscription.setDisposable(disposableObject(_observable.doSubscribe(new ObserverType(observer,
113                 subscription, _bufferSize))));
114         return subscription;
115     }
116 
117 private:
118     TObservable _observable;
119     size_t _bufferSize;
120 }
121 
122 unittest
123 {
124     alias TObservable = BufferedObservable!(Observable!int);
125     static assert(isObservable!(TObservable, int[]));
126 
127     import rx.subject : SubjectObject;
128     import std.array : appender;
129 
130     auto sub = new SubjectObject!int;
131     auto buf = appender!(int[]);
132 
133     auto observable = TObservable(sub, 2);
134     auto d = observable.subscribe(buf);
135 
136     sub.put(0);
137     sub.put(1);
138     assert(buf.data.length == 2);
139     assert(buf.data[0] == 0);
140     assert(buf.data[1] == 1);
141     sub.put(2);
142     assert(buf.data.length == 2);
143     sub.completed();
144     assert(buf.data.length == 3);
145     assert(buf.data[2] == 2);
146 }
147 
148 ///
149 BufferedObservable!(TObservable) buffered(TObservable)(
150         auto ref TObservable observable, size_t bufferSize)
151 {
152     return typeof(return)(observable, bufferSize);
153 }
154 
155 ///
156 unittest
157 {
158     import rx.subject : SubjectObject;
159     import std.array : appender;
160 
161     auto sub = new SubjectObject!int;
162     auto buf = appender!(int[]);
163 
164     auto d = sub.buffered(2).doSubscribe(buf);
165 
166     sub.put(0);
167     sub.put(1);
168     assert(buf.data.length == 2);
169     assert(buf.data[0] == 0);
170     assert(buf.data[1] == 1);
171     sub.put(2);
172     assert(buf.data.length == 2);
173     sub.completed();
174     assert(buf.data.length == 3);
175     assert(buf.data[2] == 2);
176 }
177 
178 ///
179 unittest
180 {
181     import rx.subject : SubjectObject;
182     import std.array : appender;
183     import std.parallelism : taskPool, task;
184 
185     auto sub = new SubjectObject!int;
186     auto buf = appender!(int[]);
187     auto d = sub.buffered(100).doSubscribe(buf);
188 
189     import std.range : iota;
190 
191     auto t1 = task({ .put(sub, iota(100)); });
192     auto t2 = task({ .put(sub, iota(100)); });
193     auto t3 = task({ .put(sub, iota(100)); });
194     taskPool.put(t1);
195     taskPool.put(t2);
196     taskPool.put(t3);
197 
198     t1.workForce;
199     t2.workForce;
200     t3.workForce;
201 
202     sub.completed();
203 
204     assert(buf.data.length == 300);
205 }