/+++++++++++++++++++++++++++++
 + This module defines algorithm 'groupBy'
 +/
module rx.algorithm.groupby;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.subject;
import rx.util;

import std.functional : unaryFun;
import std.range : put;

//####################
// GroupBy
//####################
/// An observable of `E` that additionally exposes the key shared by its elements.
interface GroupedObservable(TKey, E) : Observable!E
{
    /// The key associated with this group.
    TKey key() const pure nothrow @safe @nogc @property;
}

/// Concrete `GroupedObservable` that forwards subscriptions to an underlying
/// observable and, when a `RefCountDisposable` is supplied, pairs every
/// subscription with a disposable obtained from it.
private class GroupedObservableObject(TKey, E) : GroupedObservable!(TKey, E)
{
    alias ElementType = E;

public:
    /// Params:
    ///   key     = key reported by `key()`
    ///   subject = observable that subscriptions are forwarded to
    ///   cancel  = optional ref-counted disposable; `null` disables the pairing
    this(TKey key, Observable!E subject, RefCountDisposable cancel = null)
    {
        _key = key;
        _subject = subject;
        _cancel = cancel;
    }

public:
    /// Returns the key given at construction.
    TKey key() const pure nothrow @safe @nogc @property
    {
        return _key;
    }

public:
    /// Subscribes `observer` to the underlying observable.
    ///
    /// Without a `RefCountDisposable` the underlying subscription is returned
    /// as is. Otherwise the result is a `CompositeDisposable` holding both a
    /// disposable taken from `_cancel.getDisposable()` and the subscription.
    Disposable subscribe(Observer!E observer)
    {
        if (_cancel is null)
        {
            return _subject.subscribe(observer);
        }
        else
        {
            // Take the ref-counted handle before subscribing (original order kept).
            auto canceler = _cancel.getDisposable();
            auto subscription = _subject.subscribe(observer);
            return new CompositeDisposable(canceler, subscription);
        }
    }

private:
    TKey _key;
    Observable!E _subject;
    RefCountDisposable _cancel;
}

/// Observer that routes every incoming element to a per-key subject and
/// announces each newly created group to the downstream observer.
///
/// Params:
///   selector  = unary function computing the key of an element
///   TObserver = type of the downstream observer receiving the groups
///   E         = element type of the source
private class GroupByObserver(alias selector, TObserver, E)
{
public:
    /// Type returned by `selector` for an `E`.
    alias TKey = typeof({ return unaryFun!(selector)(E.init); }());

    // True when `null` is assignable to TKey. Such keys get a dedicated
    // `_null` slot instead of being stored in the associative array.
    private enum isNullableKey = __traits(compiles, { TKey unused = null; });

public:
    /// Params:
    ///   observer             = downstream observer that receives the groups
    ///   disposable           = disposed when the source completes or fails
    ///   refCountedDisposable = handed to every group created by this observer
    this(TObserver observer, Disposable disposable, RefCountDisposable refCountedDisposable)
    {
        _observer = observer;
        _disposable = disposable;
        _refCountDisposable = refCountedDisposable;
    }

public:
    /// Routes `obj` to the subject of its key, creating and announcing the
    /// group first when the key has not been seen before.
    ///
    /// If computing the key or locating the subject throws an `Exception`,
    /// the error is reported through `failure` and `obj` is dropped.
    void put(E obj)
    {
        alias keySelector = unaryFun!selector;

        Subject!E writer;
        TKey key;
        bool fireNewEntry = false;

        try
        {
            key = keySelector(obj);
            writer = findOrCreateSubject(key, fireNewEntry);
        }
        catch (Exception e)
        {
            failure(e);
            // `writer` may still be null here; falling through would
            // dereference it in the `.put(writer, obj)` below.
            return;
        }

        if (fireNewEntry)
        {
            auto group = new GroupedObservableObject!(TKey, E)(key, writer, _refCountDisposable);
            .put(_observer, group);
        }

        .put(writer, obj);
    }

    /// Completes every group subject, then the downstream observer (when it
    /// supports completion), and finally disposes `_disposable`.
    void completed()
    {
        static if (isNullableKey)
        {
            if (_null !is null)
            {
                _null.completed();
            }
        }
        foreach (sink; _map.values)
        {
            sink.completed();
        }
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
        _disposable.dispose();
    }

    /// Forwards `e` to every group subject, then to the downstream observer
    /// (when it supports failure), and finally disposes `_disposable`.
    void failure(Exception e)
    {
        static if (isNullableKey)
        {
            if (_null !is null)
            {
                _null.failure(e);
            }
        }
        foreach (sink; _map.values)
        {
            sink.failure(e);
        }
        static if (hasFailure!TObserver)
        {
            _observer.failure(e);
        }
        _disposable.dispose();
    }

private:
    // Returns the subject registered for `key`, creating it on first use.
    // `created` is set to true only when a new subject was allocated.
    Subject!E findOrCreateSubject(TKey key, out bool created)
    {
        created = false;

        static if (isNullableKey)
        {
            if (key is null)
            {
                if (_null is null)
                {
                    _null = new SubjectObject!E;
                    created = true;
                }
                return _null;
            }
        }

        // `in` yields a pointer to the stored value, so one lookup suffices.
        if (auto existing = key in _map)
        {
            return *existing;
        }

        Subject!E fresh = new SubjectObject!E;
        _map[key] = fresh;
        created = true;
        return fresh;
    }

private:
    TObserver _observer;
    Disposable _disposable;
    Subject!E[TKey] _map;
    static if (isNullableKey)
    {
        Subject!E _null;
    }
    RefCountDisposable _refCountDisposable;
}

unittest
{
    alias TObserver = GroupByObserver!(n => n % 10, Observer!(GroupedObservable!(int, int)), int);

    auto observer = new CounterObserver!(GroupedObservable!(int, int));
    auto refCount = new RefCountDisposable(NopDisposable.instance);
    auto group = new TObserver(observer, NopDisposable.instance, refCount);

    assert(observer.putCount == 0);
    group.put(0);
    assert(observer.putCount == 1);
    assert(observer.lastValue.key == 0);
    group.put(0);
    assert(observer.putCount == 1);
    assert(observer.lastValue.key == 0);

    group.put(1);
    assert(observer.putCount == 2);
    assert(observer.lastValue.key == 1);
    group.put(11);
    assert(observer.putCount == 2);
    assert(observer.lastValue.key == 1);

    group.put(3);
    assert(observer.putCount == 3);
    assert(observer.lastValue.key == 3);
}

unittest
{
    alias TObserver = GroupByObserver!(n => n % 2 == 0,
            Observer!(GroupedObservable!(bool, int)), int);

    import std.typecons : Tuple, tuple;
    import rx.algorithm.map : map;

    auto tester = new CounterObserver!(Tuple!(bool, int));
    auto observer = observerObject!(GroupedObservable!(bool, int))(
            (GroupedObservable!(bool, int) observable) {
        observable.map!(n => tuple(observable.key, n)).doSubscribe(tester);
    });

    auto refCount = new RefCountDisposable(NopDisposable.instance);

    auto group = new TObserver(observer, NopDisposable.instance, refCount);

    group.put(0);
    assert(tester.putCount == 1);
    assert(tester.lastValue == tuple(true, 0));
    group.put(1);
    assert(tester.putCount == 2);
    assert(tester.lastValue == tuple(false, 1));
    group.put(3);
    assert(tester.putCount == 3);
    assert(tester.lastValue == tuple(false, 3));
}

unittest
{
    alias TObserver = GroupByObserver!(n => n % 2 == 0,
            Observer!(GroupedObservable!(bool, int)), int);

    auto tester = new CounterObserver!int;
    auto observer = observerObject!(GroupedObservable!(bool, int))(
            (GroupedObservable!(bool, int) observable) {
        observable.doSubscribe(tester);
    });

    auto refCount = new RefCountDisposable(NopDisposable.instance);

    auto group = new TObserver(observer, NopDisposable.instance, refCount);

    assert(tester.putCount == 0);
    assert(tester.completedCount == 0);
    assert(tester.failureCount == 0);

    group.put(0);

    assert(tester.putCount == 1);
    assert(tester.completedCount == 0);
    assert(tester.failureCount == 0);

    group.completed();

    assert(tester.putCount == 1);
    assert(tester.completedCount == 1);
    assert(tester.failureCount == 0);
}

unittest
{
    // A throwing key selector must be reported as a failure, and the element
    // must be dropped instead of being written to an unset subject.
    static int throwingSelector(int n)
    {
        if (n < 0)
        {
            throw new Exception("negative value");
        }
        return n % 10;
    }

    alias TObserver = GroupByObserver!(throwingSelector,
            Observer!(GroupedObservable!(int, int)), int);

    auto observer = new CounterObserver!(GroupedObservable!(int, int));
    auto refCount = new RefCountDisposable(NopDisposable.instance);
    auto group = new TObserver(observer, NopDisposable.instance, refCount);

    group.put(1);
    assert(observer.putCount == 1);
    assert(observer.failureCount == 0);

    group.put(-1);
    assert(observer.putCount == 1);
    assert(observer.failureCount == 1);
}

/// Observable adapter that, on subscription, attaches a `GroupByObserver`
/// to the wrapped observable.
private struct GroupByObservable(alias selector, TObservable)
{
    static assert(isObservable!TObservable);

    /// Type returned by `selector` for an element of `TObservable`.
    alias TKey = typeof({
        return unaryFun!(selector)(TObservable.ElementType.init);
    }());
    /// Subscribers receive one `GroupedObservable` per distinct key.
    alias ElementType = GroupedObservable!(TKey, TObservable.ElementType);

public:
    this(TObservable observable)
    {
        _observable = observable;
    }

public:
    /// Subscribes a `GroupByObserver` wrapping `observer` to the source and
    /// returns the disposable that holds the source subscription.
    Disposable subscribe(TObserver)(TObserver observer)
    {
        auto result = new SingleAssignmentDisposable;
        auto refCountDisposable = new RefCountDisposable(result);

        alias ObserverType = GroupByObserver!(selector, TObserver, TObservable.ElementType);

        auto subscription = _observable.doSubscribe(new ObserverType(observer,
                result, refCountDisposable));
        result.setDisposable(disposableObject(subscription));
        // NOTE(review): `result` is returned rather than `refCountDisposable`,
        // so disposing it presumably bypasses the reference count held by
        // group subscriptions - confirm this is intended.
        return result;
    }

private:
    TObservable _observable;
}

unittest
{
    alias TObservable = GroupByObservable!(n => n % 10, Observable!int);
    static assert(is(TObservable.TKey == int));
    static assert(is(TObservable.ElementType == GroupedObservable!(int, int)));

    auto subject = new SubjectObject!int;
    auto group = TObservable(subject);

    auto observer = new CounterObserver!(GroupedObservable!(int, int));
    auto disposable = group.subscribe(observer);

    subject.put(0);
    assert(observer.putCount == 1);
    subject.put(0);
    assert(observer.putCount == 1);
    subject.put(10);
    assert(observer.putCount == 1);

    subject.put(11);
    assert(observer.putCount == 2);

    subject.put(12);
    assert(observer.putCount == 3);

    subject.put(102);
    assert(observer.putCount == 3);
}

///
/// Wraps an observable in a `GroupByObservable` that uses `selector` to
/// compute the key of each element.
///
/// Params:
///   selector   = unary function (or string lambda accepted by `unaryFun`)
///                mapping an element to its key
///   observable = source observable; must satisfy `isObservable`
///
/// Returns: a `GroupByObservable!(selector, TObservable)` built from `observable`.
template groupBy(alias selector)
{
    GroupByObservable!(selector, TObservable) groupBy(TObservable)(auto ref TObservable observable)
    {
        static assert(isObservable!TObservable);

        alias Grouped = GroupByObservable!(selector, TObservable);
        return Grouped(observable);
    }
}

///
unittest
{
    auto source = new SubjectObject!int;
    auto grouped = source.groupBy!(n => n % 10);

    alias Group = typeof(grouped).ElementType;
    auto counter = new CounterObserver!Group;
    auto subscription = grouped.subscribe(counter);

    // First element with key 0 announces a new group.
    source.put(0);
    assert(counter.putCount == 1);
    assert(counter.lastValue.key == 0);

    // 10 % 10 == 0: same key, so no further group is announced.
    source.put(10);
    assert(counter.putCount == 1);
}

///
unittest
{
    auto source = new SubjectObject!string;
    auto grouped = source.groupBy!(text => text);

    alias Group = typeof(grouped).ElementType;
    auto counter = new CounterObserver!Group;
    auto subscription = grouped.subscribe(counter);

    // Every distinct string forms its own group.
    source.put("A");
    assert(counter.putCount == 1);
    assert(counter.lastValue.key == "A");

    source.put("B");
    assert(counter.putCount == 2);
    assert(counter.lastValue.key == "B");

    source.put("XXX");
    assert(counter.putCount == 3);
    assert(counter.lastValue.key == "XXX");
}