/+++++++++++++++++++++++++++++
 + This module defines algorithm 'uniq'
 +
 + This is also called 'distinctUntilChanged'.
 +/
module rx.algorithm.uniq;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;

import std.functional : binaryFun;
import std.range : put;

/++
 + Observer wrapper that drops a value when `pred(previous, value)` holds,
 + where `previous` is the last value that was forwarded.
 +
 + The first value received is always forwarded. `pred` is turned into a
 + callable with `std.functional.binaryFun`, so it may be a string such as
 + `"a == b"` or any binary callable.
 +
 + Params:
 +   TObserver = type of the wrapped (downstream) observer
 +   E = element type accepted by `putImpl`
 +   pred = binary predicate deciding whether two values count as equal
 +/
struct UniqObserver(TObserver, E, alias pred = "a == b")
{
    // `_observer` and `_disposable` are not declared in this struct; they come
    // from this mixin. NOTE(review): it presumably also routes incoming values
    // to `putImpl` - confirm against rx.observer.
    mixin SimpleObserverImpl!(TObserver, E);

public:
    /// Wraps `observer`; no value has been seen yet.
    this(ref TObserver observer)
    {
        _observer = observer;
        _hasValue = false;
    }

    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        /// Wraps `observer` and stores `disposable`; only available when the
        /// downstream observer has a completion or failure handler.
        this(ref TObserver observer, Disposable disposable)
        {
            _observer = observer;
            _disposable = disposable;
            // Same as bool.init, spelled out to match the other constructor.
            _hasValue = false;
        }
    }

private:
    /// Forwards `obj` downstream unless it matches the last forwarded value.
    void putImpl(E obj)
    {
        alias isSame = binaryFun!pred;

        // Suppress only when there is a previous value to compare against.
        if (_hasValue && isSame(_current, obj))
            return;

        // State is updated before forwarding, as in both original branches.
        _current = obj;
        _hasValue = true;
        .put(_observer, obj);
    }

private:
    bool _hasValue; // true once the first value has been received
    E _current; // last value that was forwarded downstream
}

@safe unittest
{
    import std.array : appender;

    auto buf = appender!(int[]);

    auto observer = UniqObserver!(typeof(buf), int)(buf);

    .put(observer, [1, 1, 2, 3]);

    import std.algorithm : equal;

    assert(equal(buf.data, [1, 2, 3]));
}

@safe unittest
{
    struct Person
    {
        string name;
        int age;
    }

    import std.array : appender;

    auto buf = appender!(Person[]);

    auto observer = UniqObserver!(typeof(buf), Person, "a.name == b.name")(buf);

    .put(observer, Person("Smith", 20));
    .put(observer, Person("Smith", 30));
    .put(observer, Person("Johnson", 40));
    .put(observer, Person("Johnson", 50));

    auto data = buf.data;
    assert(data.length == 2);
    assert(data[0].name == "Smith");
    assert(data[0].age == 20);
    assert(data[1].name == "Johnson");
    assert(data[1].age == 40);
}

/++
 + Observable that subscribes observers to the wrapped observable through a
 + `UniqObserver`, so each subscriber only sees values that differ (per
 + `pred`) from the one it last received.
 +
 + Params:
 +   TObservable = type of the wrapped (upstream) observable
 +   pred = binary predicate forwarded to `UniqObserver`
 +/
struct UniqObservable(TObservable, alias pred = "a == b")
{
    alias ElementType = TObservable.ElementType;

public:
    /// Stores a copy of `observable` as the upstream source.
    this(ref TObservable observable)
    {
        _observable = observable;
    }

public:
    /++
     + Subscribes `observer` to the upstream observable via a new
     + `UniqObserver`.
     +
     + Returns: a `SingleAssignmentDisposable` holding the upstream
     + subscription when `TObserver` has a completion or failure handler;
     + otherwise whatever `doSubscribe` returns.
     +/
    auto subscribe(TObserver)(auto ref TObserver observer)
    {
        alias ObserverType = UniqObserver!(TObserver, ElementType, pred);

        static if (hasCompleted!TObserver || hasFailure!TObserver)
        {
            // The disposable is created first so it can be handed to the
            // observer before the upstream subscription exists.
            auto disposable = new SingleAssignmentDisposable;
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    ObserverType(observer, disposable))));
            return disposable;
        }
        else
        {
            return doSubscribe(_observable, ObserverType(observer));
        }
    }

private:
    TObservable _observable;
}

@system unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    auto observable = UniqObservable!(typeof(sub))(sub);

    import std.array : appender;

    auto buf = appender!(int[]);

    auto disposable = observable.subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, 10);
    .put(sub, 10);
    .put(sub, 20);
    .put(sub, 30);

    auto data = buf.data;
    assert(data.length == 3);
    assert(data[0] == 10);
    assert(data[1] == 20);
    assert(data[2] == 30);
}

@system unittest
{
    struct Point
    {
        int x;
        int y;
    }

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!Point;

    auto observable = UniqObservable!(typeof(sub), "a.x == b.x")(sub);

    import std.array : appender;

    auto buf = appender!(Point[]);

    auto disposable = observable.subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, Point(0, 0));
    .put(sub, Point(0, 10));
    .put(sub, Point(10, 10));
    .put(sub, Point(10, 20));

    auto data = buf.data;
    assert(data.length == 2);
    assert(data[0] == Point(0, 0));
    assert(data[1] == Point(10, 10));
}

/++
 + Wraps an observable in a `UniqObservable`, which suppresses a value when
 + `pred(previous, value)` holds for the previously forwarded value.
 +
 + Params:
 +   pred = binary predicate deciding whether two values count as equal;
 +          defaults to `"a == b"`
 +   observable = the source observable to wrap
 +
 + Returns: `UniqObservable!(TObservable, pred)` constructed from `observable`.
 +/
template uniq(alias pred = "a == b")
{
    UniqObservable!(TObservable, pred) uniq(TObservable)(auto ref TObservable observable)
    {
        return typeof(return)(observable);
    }
}

///ditto
alias distinctUntilChanged = uniq;

///ditto
@system unittest
{
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto disposable = sub.uniq.subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, [11, 11, 22, 22, 33]);

    auto data = buf.data;
    assert(data.length == 3);
    assert(data[0] == 11);
    assert(data[1] == 22);
    assert(data[2] == 33);
}

///ditto
@system unittest
{
    import std.datetime : Date;
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto sub = new SubjectObject!Date;
    auto buf = appender!(Date[]);

    auto disposable = sub.uniq!"a.year == b.year".subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, Date(2000, 1, 1));
    .put(sub, Date(2000, 1, 2));
    .put(sub, Date(2017, 3, 24));
    .put(sub, Date(2017, 4, 24));
    .put(sub, Date(2017, 4, 24));

    auto data = buf.data;
    assert(data.length == 2);
    assert(data[0] == Date(2000, 1, 1));
    assert(data[1] == Date(2017, 3, 24));
}

///ditto
@system unittest
{
    import std.datetime : Date;
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto sub = new SubjectObject!Date;
    auto buf = appender!(Date[]);

    auto disposable = sub.distinctUntilChanged!"a.year == b.year".subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, Date(2000, 1, 1));
    .put(sub, Date(2000, 1, 2));
    .put(sub, Date(2017, 3, 24));
    .put(sub, Date(2017, 4, 24));
    .put(sub, Date(2017, 4, 24));

    auto data = buf.data;
    assert(data.length == 2);
    assert(data[0] == Date(2000, 1, 1));
    assert(data[1] == Date(2017, 3, 24));
}