/++
 + Operators that drop consecutive duplicate elements.
 +
 + The module provides the observer wrapper (`UniqObserver`), the observable
 + wrapper (`UniqObservable`) and the `uniq` helper that builds the latter.
 +/
module rx.algorithm.uniq;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;

import std.functional : binaryFun;
import std.range : put;

/**
 * Observer wrapper that forwards an element to the wrapped observer only when
 * `pred` does not consider it equal to the previously forwarded element.
 *
 * The first element received is always forwarded. The remembered element is
 * replaced only when a new element is forwarded, so every comparison is made
 * against the last element that actually reached the wrapped observer.
 *
 * Params:
 *   TObserver = type of the wrapped observer
 *   E         = element type
 *   pred      = binary predicate accepted by `std.functional.binaryFun`,
 *               invoked as `pred(previous, incoming)`; a true result means
 *               "duplicate, drop it"
 */
struct UniqObserver(TObserver, E, alias pred = "a == b")
{
    // NOTE(review): `_observer` and `_disposable` are not declared in this
    // struct, so they presumably come from this mixin, which presumably also
    // routes incoming values to `putImpl` - confirm against rx.observer.
    mixin SimpleObserverImpl!(TObserver, E);

public:
    /// Wraps `observer`; no element has been seen yet.
    this(ref TObserver observer)
    {
        _observer = observer;
        _hasValue = false;
    }

    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        /// Wraps `observer` and stores `disposable`; no element has been seen yet.
        this(ref TObserver observer, Disposable disposable)
        {
            _observer = observer;
            _disposable = disposable;
            // Stated explicitly to mirror the single-argument constructor
            // instead of silently relying on `bool.init`.
            _hasValue = false;
        }
    }

private:
    /// Forwards `obj` unless `pred` matches it against the last forwarded element.
    void putImpl(E obj)
    {
        alias fun = binaryFun!pred;

        if (_hasValue)
        {
            if (!fun(_current, obj))
            {
                _current = obj;
                .put(_observer, obj);
            }
        }
        else
        {
            // First element: nothing to compare against, always forward.
            _current = obj;
            _hasValue = true;
            .put(_observer, obj);
        }
    }

private:
    bool _hasValue; // true once at least one element has been forwarded
    E _current; // last element forwarded to `_observer`
}

// Default predicate: consecutive equal ints collapse into one.
@safe unittest
{
    import std.array : appender;

    auto buf = appender!(int[]);

    auto observer = UniqObserver!(typeof(buf), int)(buf);

    .put(observer, [1, 1, 2, 3]);

    import std.algorithm : equal;

    assert(equal(buf.data, [1, 2, 3]));
}

// Custom predicate: only the first element of each run of equal names is kept.
@safe unittest
{
    struct Person
    {
        string name;
        int age;
    }

    import std.array : appender;

    auto buf = appender!(Person[]);

    auto observer = UniqObserver!(typeof(buf), Person, "a.name == b.name")(buf);

    .put(observer, Person("Smith", 20));
    .put(observer, Person("Smith", 30));
    .put(observer, Person("Johnson", 40));
    .put(observer, Person("Johnson", 50));

    auto data = buf.data;
    assert(data.length == 2);
    assert(data[0].name == "Smith");
    assert(data[0].age == 20);
    assert(data[1].name == "Johnson");
    assert(data[1].age == 40);
}

/**
 * Observable wrapper whose subscribers are wrapped in a `UniqObserver`
 * before being subscribed to the underlying observable.
 *
 * Params:
 *   TObservable = type of the wrapped observable; must expose `ElementType`
 *   pred        = binary predicate handed on to `UniqObserver`
 */
struct UniqObservable(TObservable, alias pred = "a == b")
{
    /// Element type, taken from the wrapped observable.
    alias ElementType = TObservable.ElementType;

public:
    /// Stores a copy of `observable` as the source.
    this(ref TObservable observable)
    {
        _observable = observable;
    }

public:
    /**
     * Subscribes `observer` to the source through a `UniqObserver`.
     *
     * When `TObserver` has `completed` or `failure`, a
     * `SingleAssignmentDisposable` is created, handed to the wrapping
     * observer, assigned the inner subscription, and returned. Otherwise the
     * result of `doSubscribe` is returned directly.
     */
    auto subscribe(TObserver)(auto ref TObserver observer)
    {
        alias ObserverType = UniqObserver!(TObserver, ElementType, pred);

        static if (hasCompleted!TObserver || hasFailure!TObserver)
        {
            // The disposable must exist before subscribing so the wrapping
            // observer can be constructed with it.
            auto disposable = new SingleAssignmentDisposable;
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    ObserverType(observer, disposable))));
            return disposable;
        }
        else
        {
            return doSubscribe(_observable, ObserverType(observer));
        }
    }

private:
    TObservable _observable; // source observable
}

// Default predicate through a subject.
@system unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    auto observable = UniqObservable!(typeof(sub))(sub);

    import std.array : appender;

    auto buf = appender!(int[]);

    auto disposable = observable.subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, 10);
    .put(sub, 10);
    .put(sub, 20);
    .put(sub, 30);

    auto data = buf.data;
    assert(data.length == 3);
    assert(data[0] == 10);
    assert(data[1] == 20);
    assert(data[2] == 30);
}

// Custom predicate through a subject: points compare by `x` only.
unittest
{
    struct Point
    {
        int x;
        int y;
    }

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!Point;

    auto observable = UniqObservable!(typeof(sub), "a.x == b.x")(sub);

    import std.array : appender;

    auto buf = appender!(Point[]);

    auto disposable = observable.subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, Point(0, 0));
    .put(sub, Point(0, 10));
    .put(sub, Point(10, 10));
    .put(sub, Point(10, 20));

    auto data = buf.data;
    assert(data.length == 2);
    assert(data[0] == Point(0, 0));
    assert(data[1] == Point(10, 10));
}

/**
 * Wraps an observable in a `UniqObservable` using `pred` as the duplicate
 * test.
 *
 * Params:
 *   pred       = binary predicate; a true result for
 *                `pred(previous, incoming)` drops the incoming element
 *   observable = source observable
 *
 * Returns: `UniqObservable!(TObservable, pred)` constructed from `observable`.
 */
template uniq(alias pred = "a == b")
{
    UniqObservable!(TObservable, pred) uniq(TObservable)(auto ref TObservable observable)
    {
        return typeof(return)(observable);
    }
}

///
unittest
{
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto disposable = sub.uniq.subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, [11, 11, 22, 22, 33]);

    auto data = buf.data;
    assert(data.length == 3);
    assert(data[0] == 11);
    assert(data[1] == 22);
    assert(data[2] == 33);
}

///
@system unittest
{
    import std.datetime : Date;
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto sub = new SubjectObject!Date;
    auto buf = appender!(Date[]);

    auto disposable = sub.uniq!"a.year == b.year".subscribe(buf);
    scope (exit)
        disposable.dispose();

    .put(sub, Date(2000, 1, 1));
    .put(sub, Date(2000, 1, 2));
    .put(sub, Date(2017, 3, 24));
    .put(sub, Date(2017, 4, 24));
    .put(sub, Date(2017, 4, 24));

    auto data = buf.data;
    assert(data.length == 2);
    assert(data[0] == Date(2000, 1, 1));
    assert(data[1] == Date(2017, 3, 24));
}