/+++++++++++++++++++++++++++++
 + This module defines algorithm 'groupBy'
 +/
module rx.algorithm.groupby;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.subject;
import rx.util;

import std.functional : unaryFun;
import std.range : put;

//####################
// GroupBy
//####################

// True when `null` can be assigned to a variable of type T. Such key types
// get a dedicated slot for the null key instead of going through the
// associative array.
private enum isNullableKey(T) = __traits(compiles, { T unused = null; });

/// An observable of `E` that additionally exposes the key of the group it represents.
interface GroupedObservable(TKey, E) : Observable!E
{
    /// The key associated with this group.
    TKey key() const pure nothrow @safe @nogc @property;
}

// Concrete GroupedObservable: pairs a key with the observable that carries
// the group's elements, plus an optional RefCountDisposable from which a
// handle is taken for every subscription.
private class GroupedObservableObject(TKey, E) : GroupedObservable!(TKey, E)
{
    alias ElementType = E;

public:
    // Params:
    //   key     = key of the group
    //   subject = observable that delivers the group's elements
    //   cancel  = optional ref-counted disposable; when non-null, every
    //             subscription additionally holds a handle obtained from it
    this(TKey key, Observable!E subject, RefCountDisposable cancel = null)
    {
        _key = key;
        _subject = subject;
        _cancel = cancel;
    }

public:
    // Returns the key given at construction.
    TKey key() const pure nothrow @safe @nogc @property
    {
        return _key;
    }

public:
    // Subscribes `observer` to the group's elements.
    //
    // Without a `_cancel`, the subscription of the underlying observable is
    // returned as is. Otherwise the result is a CompositeDisposable holding
    // both a handle from `_cancel` and the subscription.
    Disposable subscribe(Observer!E observer)
    {
        if (_cancel is null)
        {
            return _subject.subscribe(observer);
        }

        auto canceler = _cancel.getDisposable();
        // If subscribing throws, nobody would ever receive `canceler`;
        // dispose it here so the handle taken from `_cancel` is not leaked.
        scope (failure)
            canceler.dispose();

        auto subscription = _subject.subscribe(observer);
        return new CompositeDisposable(canceler, subscription);
    }

private:
    TKey _key;
    Observable!E _subject;
    RefCountDisposable _cancel;
}

// Observer that routes every incoming element to a per-key subject and
// announces each newly created group to the downstream observer.
//
// Template params:
//   selector  = unary function computing the key of an element
//   TObserver = type of the downstream observer receiving the groups
//   E         = element type of the source
private class GroupByObserver(alias selector, TObserver, E)
{
public:
    // Type produced by applying `selector` to an `E`.
    alias TKey = typeof({ return unaryFun!(selector)(E.init); }());

public:
    // Params:
    //   observer             = downstream observer that receives new groups
    //   disposable           = disposed when this observer completes or fails
    //   refCountedDisposable = handed to every group created by this observer
    this(TObserver observer, Disposable disposable, RefCountDisposable refCountedDisposable)
    {
        _observer = observer;
        _disposable = disposable;
        _refCountDisposable = refCountedDisposable;
    }

public:
    // Computes the key of `obj`, creates the group for that key on first
    // sight (and emits it downstream), then forwards `obj` into the group.
    //
    // An Exception thrown while selecting the key or resolving the group is
    // routed to `failure` and the element is dropped.
    void put(E obj)
    {
        alias keySelector = unaryFun!selector;

        Subject!E writer;

        TKey key;
        bool fireNewEntry = false;

        try
        {
            key = keySelector(obj);

            static if (isNullableKey!TKey)
            {
                if (key is null)
                {
                    // The null key is kept in its own slot, not in `_map`.
                    if (_null is null)
                    {
                        _null = new SubjectObject!E;
                        fireNewEntry = true;
                    }
                    writer = _null;
                }
                else
                {
                    writer = lookupOrCreate(key, fireNewEntry);
                }
            }
            else
            {
                writer = lookupOrCreate(key, fireNewEntry);
            }
        }
        catch (Exception e)
        {
            failure(e);
            return;
        }

        if (fireNewEntry)
        {
            // Emit the group before its first element so that a subscriber
            // attached during this call also receives `obj`.
            auto group = new GroupedObservableObject!(TKey, E)(key, writer, _refCountDisposable);
            .put(_observer, group);
        }

        .put(writer, obj);
    }

    // Completes every group, then the downstream observer (when it supports
    // `completed`), and finally disposes `_disposable`.
    void completed()
    {
        static if (isNullableKey!TKey)
        {
            if (_null !is null)
            {
                _null.completed();
            }
        }
        foreach (sink; _map.values)
        {
            sink.completed();
        }
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
        _disposable.dispose();
    }

    // Propagates `e` to every group, then to the downstream observer (when
    // it supports `failure`), and finally disposes `_disposable`.
    void failure(Exception e)
    {
        static if (isNullableKey!TKey)
        {
            if (_null !is null)
            {
                _null.failure(e);
            }
        }
        foreach (sink; _map.values)
        {
            sink.failure(e);
        }
        static if (hasFailure!TObserver)
        {
            _observer.failure(e);
        }
        _disposable.dispose();
    }

private:
    // Returns the subject registered for `key`, creating and registering a
    // new one when the key is seen for the first time. `created` is set to
    // true only when a new subject was made. Uses a single `in` lookup on
    // the hit path instead of `in` followed by indexing.
    Subject!E lookupOrCreate(TKey key, ref bool created)
    {
        if (auto found = key in _map)
        {
            return *found;
        }

        Subject!E fresh = new SubjectObject!E;
        _map[key] = fresh;
        created = true;
        return fresh;
    }

private:
    TObserver _observer;
    Disposable _disposable;
    Subject!E[TKey] _map;
    static if (isNullableKey!TKey)
    {
        // Group for the null key; created lazily by `put`.
        Subject!E _null;
    }
    RefCountDisposable _refCountDisposable;
}

// A group is emitted only the first time its key is seen.
unittest
{
    alias TObserver = GroupByObserver!(n => n % 10, Observer!(GroupedObservable!(int, int)), int);

    auto observer = new CounterObserver!(GroupedObservable!(int, int));
    auto refCount = new RefCountDisposable(NopDisposable.instance);
    auto group = new TObserver(observer, NopDisposable.instance, refCount);

    assert(observer.putCount == 0);
    group.put(0);
    assert(observer.putCount == 1);
    assert(observer.lastValue.key == 0);
    group.put(0);
    assert(observer.putCount == 1);
    assert(observer.lastValue.key == 0);

    group.put(1);
    assert(observer.putCount == 2);
    assert(observer.lastValue.key == 1);
    group.put(11);
    assert(observer.putCount == 2);
    assert(observer.lastValue.key == 1);

    group.put(3);
    assert(observer.putCount == 3);
    assert(observer.lastValue.key == 3);
}

// Elements are delivered through the group that matches their key.
unittest
{
    alias TObserver = GroupByObserver!(n => n % 2 == 0,
            Observer!(GroupedObservable!(bool, int)), int);

    import std.typecons : Tuple, tuple;
    import rx.algorithm.map : map;

    auto tester = new CounterObserver!(Tuple!(bool, int));
    auto observer = observerObject!(GroupedObservable!(bool, int))(
            (GroupedObservable!(bool, int) observable) {
        observable.map!(n => tuple(observable.key, n)).doSubscribe(tester);
    });

    auto refCount = new RefCountDisposable(NopDisposable.instance);

    auto group = new TObserver(observer, NopDisposable.instance, refCount);

    group.put(0);
    assert(tester.putCount == 1);
    assert(tester.lastValue == tuple(true, 0));
    group.put(1);
    assert(tester.putCount == 2);
    assert(tester.lastValue == tuple(false, 1));
    group.put(3);
    assert(tester.putCount == 3);
    assert(tester.lastValue == tuple(false, 3));
}

// Completion of the source is propagated to the subscribers of each group.
unittest
{
    alias TObserver = GroupByObserver!(n => n % 2 == 0,
            Observer!(GroupedObservable!(bool, int)), int);

    auto tester = new CounterObserver!int;
    auto observer = observerObject!(GroupedObservable!(bool, int))(
            (GroupedObservable!(bool, int) observable) {
        observable.doSubscribe(tester);
    });

    auto refCount = new RefCountDisposable(NopDisposable.instance);

    auto group = new TObserver(observer, NopDisposable.instance, refCount);

    assert(tester.putCount == 0);
    assert(tester.completedCount == 0);
    assert(tester.failureCount == 0);

    group.put(0);

    assert(tester.putCount == 1);
    assert(tester.completedCount == 0);
    assert(tester.failureCount == 0);

    group.completed();

    assert(tester.putCount == 1);
    assert(tester.completedCount == 1);
    assert(tester.failureCount == 0);
}

// Observable wrapper returned by `groupBy`: on subscription it attaches a
// GroupByObserver to the wrapped observable.
private struct GroupByObservable(alias selector, TObservable)
{
    static assert(isObservable!TObservable);

    // Type produced by applying `selector` to an element of the source.
    alias TKey = typeof({
        return unaryFun!(selector)(TObservable.ElementType.init);
    }());
    alias ElementType = GroupedObservable!(TKey, TObservable.ElementType);

public:
    this(TObservable observable)
    {
        _observable = observable;
    }

public:
    // Subscribes `observer` to the stream of groups.
    //
    // The returned disposable wraps the subscription to the source; the same
    // disposable is given to the GroupByObserver, which disposes it when the
    // source completes or fails.
    Disposable subscribe(TObserver)(TObserver observer)
    {
        auto result = new SingleAssignmentDisposable;
        auto refCountDisposable = new RefCountDisposable(result);

        alias ObserverType = GroupByObserver!(selector, TObserver, TObservable.ElementType);

        auto subscription = _observable.doSubscribe(new ObserverType(observer,
                result, refCountDisposable));
        result.setDisposable(disposableObject(subscription));
        // NOTE(review): `result` is returned rather than `refCountDisposable`,
        // so the reference count held by the groups does not appear to gate
        // disposal of the source subscription - confirm this is intentional.
        return result;
    }

private:
    TObservable _observable;
}

// Keys are computed by the selector; one group is emitted per distinct key.
unittest
{
    alias TObservable = GroupByObservable!(n => n % 10, Observable!int);
    static assert(is(TObservable.TKey == int));
    static assert(is(TObservable.ElementType == GroupedObservable!(int, int)));

    auto subject = new SubjectObject!int;
    auto group = TObservable(subject);

    auto observer = new CounterObserver!(GroupedObservable!(int, int));
    auto disposable = group.subscribe(observer);

    subject.put(0);
    assert(observer.putCount == 1);
    subject.put(0);
    assert(observer.putCount == 1);
    subject.put(10);
    assert(observer.putCount == 1);

    subject.put(11);
    assert(observer.putCount == 2);

    subject.put(12);
    assert(observer.putCount == 3);

    subject.put(102);
    assert(observer.putCount == 3);
}

///
/**
 * Wraps an observable so that its elements are partitioned into groups by
 * the key that `selector` computes for each element.
 *
 * Params:
 *   selector   = unary function (or string lambda accepted by `unaryFun`)
 *                producing the key of an element
 *   observable = the source; must satisfy `isObservable`
 *
 * Returns: a `GroupByObservable` whose elements are `GroupedObservable`s.
 */
template groupBy(alias selector)
{
    GroupByObservable!(selector, TObservable) groupBy(TObservable)(auto ref TObservable observable)
    {
        static assert(isObservable!TObservable);

        alias Grouped = GroupByObservable!(selector, TObservable);
        return Grouped(observable);
    }
}

///
unittest
{
    auto source = new SubjectObject!int;

    auto grouped = source.groupBy!(n => n % 10);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto subscription = grouped.subscribe(counter);

    // First element with key 0 creates the group.
    source.put(0);
    assert(counter.putCount == 1);
    assert(counter.lastValue.key == 0);

    // Same key again: no new group is emitted.
    source.put(10);
    assert(counter.putCount == 1);
}

///
unittest
{
    auto source = new SubjectObject!string;

    auto grouped = source.groupBy!(text => text);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto subscription = grouped.subscribe(counter);

    source.put("A");
    assert(counter.putCount == 1);
    assert(counter.lastValue.key == "A");

    source.put("B");
    assert(counter.putCount == 2);
    assert(counter.lastValue.key == "B");

    source.put("XXX");
    assert(counter.putCount == 3);
    assert(counter.lastValue.key == "XXX");
}

// A selector that throws turns into a failure notification downstream.
unittest
{
    auto source = new SubjectObject!string;

    string delegate(string _) throwingSelector = (test) { throw new Exception(""); };

    auto grouped = source.groupBy!(throwingSelector);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto subscription = grouped.subscribe(counter);

    source.put("A");
    assert(counter.putCount == 0);
    assert(counter.completedCount == 0);
    assert(counter.failureCount == 1);
}

// A null key still produces a group.
unittest
{
    auto source = new SubjectObject!string;

    auto grouped = source.groupBy!(test => null);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto subscription = grouped.subscribe(counter);

    source.put("A");
    assert(counter.putCount == 1);
    assert(counter.lastValue.key is null);
}

// Elements with a null key reach subscribers of the null-key group.
unittest
{
    auto source = new SubjectObject!string;

    auto grouped = source.groupBy!(test => null);

    auto counter = new CounterObserver!string;
    auto subscription = grouped.doSubscribe!((o) {
        o.doSubscribe(counter);
    });

    source.put("A");
    assert(counter.putCount == 1);
    assert(counter.lastValue == "A");
}

// Each group can be processed independently (here: summed with fold).
unittest
{
    import rx;

    auto source = new SubjectObject!int;

    auto grouped = source.groupBy!(i => i % 2 == 0);

    auto evenSink = new CounterObserver!int;
    auto oddSink = new CounterObserver!int;

    auto bag = new CompositeDisposable();
    auto subscription = grouped.doSubscribe!((o) {
        auto target = o.key ? evenSink : oddSink;
        bag.insert(o.fold!"a + b"(0).doSubscribe(target));
    });
    bag.insert(subscription);

    scope (exit)
        bag.dispose();

    // fold emits nothing until the source completes.
    source.put(1);
    assert(oddSink.putCount == 0);
    source.put(2);
    assert(evenSink.putCount == 0);
    source.put(3);
    source.put(4);
    source.completed();
    assert(oddSink.putCount == 1);
    assert(oddSink.lastValue == 4);
    assert(evenSink.putCount == 1);
    assert(evenSink.lastValue == 6);
}