/+++++++++++++++++++++++++++++
 + This module defines algorithm 'merge'
 +/
module rx.algorithm.merge;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;
import std.range : put;

//####################
// Merge
//####################

/**
 * Observable that forwards the notifications of two source observables to a
 * single observer.
 *
 * The element type is the common type of both sources' element types.
 */
struct MergeObservable(TObservable1, TObservable2)
{
    import std.traits : CommonType;

    alias ElementType = CommonType!(TObservable1.ElementType, TObservable2.ElementType);

public:
    /// Stores the two sources; nothing is subscribed until `subscribe` is called.
    this(TObservable1 o1, TObservable2 o2)
    {
        _observable1 = o1;
        _observable2 = o2;
    }

public:
    /**
     * Subscribes `observer` to both sources.
     *
     * Params:
     *   observer = receiver of the merged notifications
     *
     * Returns: a disposable that releases both source subscriptions.
     */
    auto subscribe(T)(T observer)
    {
        // One copy of this observer is handed to each source. The copies share
        // the counter (created with the value 2, one per source) and the
        // subscription handle.
        static struct MergeObserver
        {
            T _observer;
            shared(AtomicCounter) _counter;
            Disposable _subscription;

            void put(ElementType obj)
            {
                // A zero counter means the merged stream has already terminated.
                if (_counter.isZero)
                    return;

                .put(_observer, obj);
            }

            void completed()
            {
                // Completion is forwarded only by the decrement that reaches zero.
                auto result = _counter.tryDecrement();
                if (result.success && result.count == 0)
                {
                    static if (hasCompleted!T)
                    {
                        _observer.completed();
                    }
                    _subscription.dispose();
                }
            }

            void failure(Exception e)
            {
                // Only the call that succeeds in zeroing the counter forwards the error.
                if (_counter.trySetZero())
                {
                    static if (hasFailure!T)
                    {
                        _observer.failure(e);
                    }
                    _subscription.dispose();
                }
            }
        }

        auto subscription = new SingleAssignmentDisposable;
        auto counter = new shared(AtomicCounter)(2);
        auto mergeObserver = MergeObserver(observer, counter, subscription);
        auto d1 = _observable1.doSubscribe(mergeObserver);
        auto d2 = _observable2.doSubscribe(mergeObserver);
        subscription.setDisposable(new CompositeDisposable(disposableObject(d1),
                disposableObject(d2)));
        return subscription;
    }

private:
    TObservable1 _observable1;
    TObservable2 _observable2;
}

/**
 * Merges two observables into one.
 *
 * Params:
 *   observable1 = first source
 *   observable2 = second source
 *
 * Returns: a `MergeObservable` wrapping both sources.
 */
MergeObservable!(T1, T2) merge(T1, T2)(auto ref T1 observable1, auto ref T2 observable2)
{
    return typeof(return)(observable1, observable2);
}
///
unittest
{
    import rx.subject : SubjectObject;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!short;

    auto merged = s1.merge(s2);

    int count = 0;
    auto d = merged.doSubscribe((int n) { count++; });

    assert(count == 0);
    s1.put(1);
    assert(count == 1);
    s2.put(2);
    assert(count == 2);

    d.dispose();

    s1.put(10);
    assert(count == 2);
    s2.put(100);
    assert(count == 2);
}

unittest
{
    import rx : SubjectObject, CounterObserver;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    auto merged = merge(s1, s2);
    auto observer = new CounterObserver!int;

    auto disposable = merged.doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    s1.put(0);
    assert(observer.putCount == 1);
    s2.put(1);
    assert(observer.putCount == 2);
    s1.completed();
    assert(observer.completedCount == 0);
    s2.completed();
    assert(observer.completedCount == 1);
}

unittest
{
    import rx : SubjectObject, CounterObserver;

    auto source1 = new SubjectObject!int;
    auto source2 = new SubjectObject!int;
    auto subject = merge(source1, source2);

    auto counter = new CounterObserver!int;
    subject.subscribe(counter);

    source1.put(0);
    assert(counter.putCount == 1);
    assert(counter.lastValue == 0);
    source1.completed();
    assert(counter.completedCount == 0);

    source2.put(1);
    assert(counter.putCount == 2);
    assert(counter.lastValue == 1);

    assert(counter.completedCount == 0);
    source2.completed();
    assert(counter.completedCount == 1);
}

unittest
{
    import rx : SubjectObject, CounterObserver;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    auto merged = merge(s1, s2);
    auto observer = new CounterObserver!int;

    auto disposable = merged.doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    s1.put(0);
    assert(observer.putCount == 1);
    s2.put(1);
    assert(observer.putCount == 2);

    auto ex = new Exception("TEST");
    s1.failure(ex);
    assert(observer.failureCount == 1);
    assert(observer.lastException == ex);

    s2.put(2);
    assert(observer.putCount == 2);

    s2.completed();
    assert(observer.completedCount == 0);
}

unittest
{
    import rx : SubjectObject, CounterObserver;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    auto merged = merge(s1, s2);
    auto observer = new CounterObserver!int;

    auto disposable = merged.doSubscribe(observer);

    s1.put(0);
    s2.put(1);
    assert(observer.putCount == 2);

    disposable.dispose();

    s1.put(2);
    s1.completed();

    s2.put(3);
    s2.completed();
    // no effect
    assert(observer.putCount == 2);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
}

unittest
{
    import rx : SubjectObject;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    int result = -1;
    auto disposable = merge(s1, s2).doSubscribe((int n) { result = n; });

    s1.put(0);
    assert(result == 0);
    s2.put(1);
    assert(result == 1);

    s1.failure(null);
    s2.put(2);
    assert(result == 1);
}

/**
 * Flattens an observable of observables into a single observable.
 *
 * Every inner observable published by `observable` is subscribed through a
 * `MergeSink`, which relays the inner values to the downstream observer.
 *
 * Params:
 *   observable = source whose elements are themselves observables
 *
 * Returns: an observable whose element type is the inner observables'
 *          element type.
 */
auto merge(TObservable)(auto ref TObservable observable)
        if (isObservable!TObservable && isObservable!(TObservable.ElementType))
{
    import rx.subject : SubjectObject;

    static struct MergeObservable_Flat
    {
        alias ElementType = TObservable.ElementType.ElementType;

        this(TObservable observable)
        {
            _observable = observable;
        }

        auto subscribe(TObserver)(auto ref TObserver observer)
        {
            auto sink = new MergeSink!(TObservable.ElementType, TObserver, ElementType)(observer);
            // The sink keeps the outer subscription so that it can release it itself.
            sink._upstream = _observable.doSubscribe(sink).disposableObject();
            return sink;
        }

        TObservable _observable;
    }

    return MergeObservable_Flat(observable);
}

///
unittest
{
    import rx;

    auto outer = new SubjectObject!(Observable!int);

    Observable!int flatten = outer.merge().observableObject!int();

    int[] xs;
    auto disposable = flatten.doSubscribe((int n) { xs ~= n; });
    scope (exit)
        disposable.dispose();

    auto inner1 = new SubjectObject!int;
    auto inner2 = new SubjectObject!int;

    .put(outer, inner1);
    .put(inner1, 0);
    assert(xs == [0]);
    .put(inner1, 1);
    assert(xs == [0, 1]);

    .put(outer, inner2);
    .put(inner1, 2);
    assert(xs == [0, 1, 2]);
    .put(inner2, 3);
    assert(xs == [0, 1, 2, 3]);
    .put(inner2, 4);
    assert(xs == [0, 1, 2, 3, 4]);
}

///
unittest
{
    import rx;

    auto outer = new SubjectObject!(Observable!int);

    Observable!int flatten = outer.merge().observableObject!int();

    auto observer = new CounterObserver!int;
    auto disposable = flatten.doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    auto inner = new SubjectObject!int;

    .put(outer, inner);
    .put(inner, 0);

    inner.completed();
    assert(observer.completedCount == 0);
    outer.completed();
    assert(observer.completedCount == 1);
}

///
unittest
{
    import rx;

    auto outer = new SubjectObject!(Observable!int);

    Observable!int flatten = outer.merge().observableObject!int();

    auto observer = new CounterObserver!int;
    auto disposable = flatten.doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    auto inner = new SubjectObject!int;

    .put(outer, inner);
    .put(inner, 0);

    outer.failure(new Exception("TEST"));
    assert(observer.failureCount == 1);
    .put(inner, 1);
    import std : format;

    assert(observer.putCount == 1, format!"putCount: %d"(observer.putCount));
}

///
unittest
{
    import rx.algorithm.groupby : groupBy;
    import rx.algorithm.map : map;
    import rx.algorithm.fold : fold;
    import rx.subject : SubjectObject, CounterObserver;

    auto subject = new SubjectObject!int;
    auto counted = subject.groupBy!(n => n % 10)
        .map!(o => o.fold!((a, b) => a + 1)(0))
        .merge();

    auto counter = new CounterObserver!int;

    auto disposable = counted.subscribe(counter);

    subject.put(0);
    subject.put(0);
    assert(counter.putCount == 0);
    subject.completed();
    assert(counter.putCount == 1);
    assert(counter.lastValue == 2);
}

///
unittest
{
    import std.format : format;
    import rx;

    auto outer = new SubjectObject!(Observable!int);
    auto inner_pair1 = new SubjectObject!int;
    auto inner_pair2 = new SubjectObject!int;
    auto inner_flat1 = new SubjectObject!int;
    auto inner_flat2 = new SubjectObject!int;

    auto mergePair = merge(inner_pair1, inner_pair2);
    auto mergeFlat = outer.merge();

    auto counter1 = new CounterObserver!int;
    auto counter2 = new CounterObserver!int;

    auto disposable1 = mergePair.doSubscribe(counter1);
    auto disposable2 = mergeFlat.doSubscribe(counter2);
    .put(outer, inner_flat1);
    .put(outer, inner_flat2);

    .put(inner_pair1, 0);
    .put(inner_flat1, 0);

    .put(inner_pair2, 1);
    .put(inner_flat2, 1);

    assert(counter1.putCount == counter2.putCount);
    assert(counter1.lastValue == counter2.lastValue);
    assert(counter1.completedCount == counter2.completedCount);
    assert(counter1.failureCount == counter2.failureCount);

    inner_pair1.completed();
    inner_flat1.completed();

    assert(counter1.putCount == counter2.putCount);
    assert(counter1.lastValue == counter2.lastValue);
    assert(counter1.completedCount == counter2.completedCount,
            format!"%d == %d"(counter1.completedCount, counter2.completedCount));
    assert(counter1.failureCount == counter2.failureCount);

    .put(inner_pair2, 10);
    .put(inner_flat2, 10);

    assert(counter1.putCount == counter2.putCount);
    assert(counter1.lastValue == counter2.lastValue);
    assert(counter1.completedCount == counter2.completedCount);
    assert(counter1.failureCount == counter2.failureCount);

    disposable1.dispose();
    disposable2.dispose();

    assert(counter1.putCount == counter2.putCount);
    assert(counter1.lastValue == counter2.lastValue);
    assert(counter1.completedCount == counter2.completedCount);
    assert(counter1.failureCount == counter2.failureCount);

    .put(inner_pair2, 100);
    .put(inner_flat2, 100);

    assert(counter1.putCount == counter2.putCount);
    assert(counter1.lastValue == counter2.lastValue);
    assert(counter1.completedCount == counter2.completedCount);
    assert(counter1.failureCount == counter2.failureCount);
}

/**
 * Observer of the outer stream used by the flattening `merge`.
 *
 * Each inner observable received through `put` is subscribed with its own
 * `InnerObserver`, which is tracked in `_group`. Values from the inner streams
 * are relayed to the downstream observer inside `synchronized (_gate)`.
 *
 * Completion is forwarded once the outer stream has completed and no inner
 * observer remains in `_group`. A failure of the outer stream or of any inner
 * stream is forwarded and then disposes the whole sink.
 *
 * Params:
 *   TObservable = type of the inner observables
 *   TObserver   = type of the downstream observer
 *   E           = element type of the inner observables
 */
class MergeSink(TObservable, TObserver, E) : Observer!TObservable, Disposable
{
    private TObserver _observer;
    private Disposable _upstream; // subscription to the outer stream
    private Object _gate; // monitor guarding calls into _observer
    private shared(bool) _disposed;
    private shared(bool) _isStopped; // set once the outer stream has completed
    private CompositeDisposable _group; // currently tracked inner observers

    this(TObserver observer)
    {
        _observer = observer;
        _gate = new Object;
        _group = new CompositeDisposable;
    }

    /// Marks the sink as disposed and releases the outer and all inner subscriptions.
    void dispose()
    {
        import core.atomic : atomicStore;

        atomicStore(_disposed, true);
        tryDispose(_upstream);
        _group.dispose();
    }

    /// Subscribes to a newly published inner observable.
    void put(TObservable obj)
    {
        auto inner = new InnerObserver(this);
        // Register before subscribing so the group already contains the
        // observer if the inner observable notifies during subscription.
        _group.insert(inner);
        inner._upstream = obj.doSubscribe(inner).disposableObject();
    }

    /// Handles completion of the outer stream.
    void completed()
    {
        import core.atomic : atomicStore;

        atomicStore(_isStopped, true);
        if (_group.count == 0)
        {
            forwardCompleted();
        }
        else
        {
            // Inner streams are still running. Release only the outer
            // subscription and let InnerObserver.completed forward the
            // completion when the last inner stream finishes. Disposing the
            // whole sink here would drop the remaining inner values and
            // completion would never be signalled.
            tryDispose(_upstream);
        }
    }

    /// Handles failure of the outer stream: forwards it and tears everything down.
    void failure(Exception e)
    {
        forwardFailure(e);
        dispose();
    }

    private void forwardPut(E obj)
    {
        import core.atomic : atomicLoad;

        if (atomicLoad(_disposed))
            return;
        synchronized (_gate)
        {
            .put(_observer, obj);
        }
    }

    private void forwardCompleted()
    {
        import core.atomic : atomicLoad;

        if (atomicLoad(_disposed))
            return;
        synchronized (_gate)
        {
            static if (hasCompleted!TObserver)
            {
                _observer.completed();
            }
            tryDispose(_upstream);
        }
    }

    private void forwardFailure(Exception e)
    {
        import core.atomic : atomicLoad;

        if (atomicLoad(_disposed))
            return;
        synchronized (_gate)
        {
            static if (hasFailure!TObserver)
            {
                _observer.failure(e);
            }
            tryDispose(_upstream);
        }
    }

    // Observer attached to one inner stream; relays its notifications to the
    // parent sink.
    private static final class InnerObserver : Observer!E, Disposable
    {
        private MergeSink _parent;
        private Disposable _upstream; // subscription to the inner stream

        this(MergeSink parent)
        {
            assert(parent !is null);

            _parent = parent;
        }

        void dispose()
        {
            tryDispose(_upstream);
        }

        void put(E obj)
        {
            // If the downstream observer throws, stop listening to this inner stream.
            scope (failure)
                dispose();
            _parent.forwardPut(obj);
        }

        void completed()
        {
            import core.atomic : atomicLoad;

            scope (exit)
                dispose();
            _parent._group.remove(this);
            // The last inner stream to finish after the outer stream has
            // completed is the one that signals completion downstream.
            if (atomicLoad(_parent._isStopped) && _parent._group.count == 0)
            {
                _parent.forwardCompleted();
            }
        }

        void failure(Exception e)
        {
            scope (exit)
                dispose();
            _parent.forwardFailure(e);
            // A failure terminates the merged stream. Dispose the whole sink,
            // as MergeSink.failure does, so sibling inner streams cannot
            // deliver values after the error.
            _parent.dispose();
        }
    }
}

unittest
{
    import rx;

    auto sub = new SubjectObject!(Observable!int);

    auto counter = new CounterObserver!int;
    auto sink = new MergeSink!(Observable!int, Observer!int, int)(counter);

    auto d = sub.subscribe(sink.observerObject!(Observable!int)());
    sink._upstream = d.disposableObject();

    auto inner1 = new SubjectObject!int;
    sub.put(inner1);

    assert(counter.putCount == 0);
    inner1.put(1);
    assert(counter.putCount == 1);
    inner1.put(2);
    assert(counter.putCount == 2);
    inner1.put(3);
    assert(counter.putCount == 3);

    auto inner2 = new SubjectObject!int;
    sub.put(inner2);

    inner2.put(10);
    assert(counter.putCount == 4);
    inner2.put(11);
    assert(counter.putCount == 5);

    inner1.put(4);
    assert(counter.putCount == 6);

    inner1.completed();
    assert(counter.completedCount == 0);
    inner2.completed();
    assert(counter.completedCount == 0);

    sub.completed();
    assert(counter.completedCount == 1);
}

// The outer stream completing first must not cut off inner streams that are
// still active; completion is signalled when the last inner stream finishes.
unittest
{
    import rx;

    auto outer = new SubjectObject!(Observable!int);

    Observable!int flatten = outer.merge().observableObject!int();

    auto observer = new CounterObserver!int;
    auto disposable = flatten.doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    auto inner = new SubjectObject!int;

    .put(outer, inner);
    .put(inner, 0);
    assert(observer.putCount == 1);

    outer.completed();
    assert(observer.completedCount == 0);

    .put(inner, 1);
    assert(observer.putCount == 2);

    inner.completed();
    assert(observer.completedCount == 1);
}

// A failing inner stream terminates the merged stream: sibling inner streams
// must not deliver anything afterwards.
unittest
{
    import rx;

    auto outer = new SubjectObject!(Observable!int);

    Observable!int flatten = outer.merge().observableObject!int();

    auto observer = new CounterObserver!int;
    auto disposable = flatten.doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    auto inner1 = new SubjectObject!int;
    auto inner2 = new SubjectObject!int;

    .put(outer, inner1);
    .put(outer, inner2);

    .put(inner1, 0);
    assert(observer.putCount == 1);

    auto ex = new Exception("TEST");
    inner1.failure(ex);
    assert(observer.failureCount == 1);
    assert(observer.lastException == ex);

    .put(inner2, 1);
    assert(observer.putCount == 1);

    inner2.completed();
    outer.completed();
    assert(observer.completedCount == 0);
}