/++
 + Internal utility helpers used by the implementation of this package:
 + qualifier helpers, an atomic exchange, a one-shot event signal, an atomic
 + countdown counter and a one-time "ticket" flag.
 +/
module rx.util;

import core.atomic;
import core.sync.mutex;
import core.sync.condition;

// @@TODO@@ Remove this overload, when the phobos of LDC supports TailShared.
/++
 + Pass-through overload for values whose type is not `shared`.
 +
 + Params:
 +   obj = any non-shared value.
 + Returns: `obj` itself, unchanged.
 +/
auto ref T assumeThreadLocal(T)(auto ref T obj) if (!is(T == shared))
{
    return obj;
}

/++
 + Returns `obj` with its type qualifiers removed by `cast()`, so that a
 + `shared(T)` can be used as a plain `T`.
 +
 + No synchronization is performed; the caller is responsible for making sure
 + that treating the value as thread-local is actually safe.
 +
 + Params:
 +   obj = a `shared` value.
 + Returns: `obj` viewed as an unqualified `T`.
 +/
auto ref T assumeThreadLocal(T)(auto ref shared(T) obj)
{
    return cast() obj;
}

///
unittest
{
    class Test
    {
        int hoge()
        {
            return 0;
        }
    }

    auto raw = new shared(Test);
    Test local1 = assumeThreadLocal(raw);
    Test local2 = assumeThreadLocal(new shared(Test));
}

/++
 + Replaces the content of `store` with `val` and returns the value that was
 + replaced, using a compare-and-swap retry loop.
 +
 + Params:
 +   store = the shared location to overwrite.
 +   val   = the new value to write into `store`.
 + Returns: the value that `store` held at the moment the swap succeeded.
 +/
auto exchange(T, U)(ref shared(T) store, U val)
{
    shared(T) temp = void;
    do
    {
        // Snapshot the current value; if another thread changes `store`
        // before the cas below runs, the cas fails and we simply retry.
        temp = store;
    }
    while (!cas(&store, temp, val));
    return atomicLoad(temp);
}

///
unittest
{
    shared(int) n = 1;
    auto temp = exchange(n, 10);
    assert(n == 10);
    assert(temp == 1);
}

/++
 + A one-shot event: it starts unset, can be set exactly one way (`setSignal`)
 + and is never reset. Threads calling `wait` block until the event is set.
 +
 + All state is guarded by an internal `Mutex`; blocking uses a `Condition`
 + bound to that mutex.
 +/
class EventSignal
{
public:
    /// Creates an unset event together with its mutex and condition variable.
    this()
    {
        _mutex = new Mutex;
        _condition = new Condition(_mutex);
    }

    /// Returns: `true` once `setSignal` has been called, `false` before that.
    bool signal() @property
    {
        synchronized (_mutex)
        {
            return _signal;
        }
    }

    /++
     + Sets the event and wakes every thread currently blocked in `wait`.
     +
     + `notifyAll` is used rather than `notify`: the flag stays set forever, so
     + every waiter must be released, not just one of them.
     +/
    void setSignal()
    {
        synchronized (_mutex)
        {
            _signal = true;
            _condition.notifyAll();
        }
    }

    /++
     + Blocks the calling thread until the event has been set. Returns
     + immediately when it is already set.
     +/
    void wait()
    {
        synchronized (_mutex)
        {
            // Condition variables may wake up spuriously, so the flag has to
            // be re-checked after every wake-up instead of waiting only once.
            while (!_signal)
                _condition.wait();
        }
    }

private:
    Mutex _mutex;
    Condition _condition;
    bool _signal;
}

///
unittest
{
    auto event = new EventSignal;
    assert(!event.signal);
    event.setSignal();
    assert(event.signal);
}

/++
 + A counter that is only ever moved towards zero, updated with atomic
 + compare-and-swap operations.
 +/
package shared class AtomicCounter
{
public:
    /++
     + Params:
     +   n = the initial count.
     +/
    this(size_t n)
    {
        _count = n;
    }

    /// Returns: `true` when the current count is zero.
    bool isZero() @property
    {
        return atomicLoad(_count) == 0;
    }

    /++
     + Decrements the count by one unless it is already zero.
     +
     + Returns: `true` when the count was already zero (nothing is changed),
     +          `false` when the count was non-zero and has been decremented.
     +/
    bool tryUpdateCount() @trusted
    {
        size_t oldValue = void;
        size_t newValue = void;
        do
        {
            oldValue = atomicLoad(_count);
            if (oldValue == 0)
                return true;

            newValue = oldValue - 1;
        }
        while (!cas(&_count, oldValue, newValue));

        return false;
    }

    /++
     + Decrements the count by one unless it is already zero.
     +
     + Returns: a value with two fields: `success` is `false` when the count
     +          was already zero and `true` when it was decremented; `count`
     +          is the count after the call (zero in the failure case).
     +/
    auto tryDecrement() @trusted
    {
        static struct DecrementResult
        {
            bool success;
            size_t count;
        }

        size_t oldValue = void;
        size_t newValue = void;
        do
        {
            oldValue = atomicLoad(_count);
            if (oldValue == 0)
                return DecrementResult(false, oldValue);

            newValue = oldValue - 1;
        }
        while (!cas(&_count, oldValue, newValue));

        return DecrementResult(true, newValue);
    }

    /++
     + Forces the count to zero unless it is already zero.
     +
     + Returns: `true` when this call changed a non-zero count to zero,
     +          `false` when the count was already zero.
     +/
    bool trySetZero() @trusted
    {
        size_t oldValue = void;
        do
        {
            oldValue = atomicLoad(_count);
            if (oldValue == 0)
                return false;
        }
        while (!cas(&_count, oldValue, cast(size_t) 0));

        return true;
    }

private:
    size_t _count;
}

/++
 + A flag that can be atomically switched from "not stamped" to "stamped"
 + exactly once.
 +/
shared class TicketBase
{
public:
    /++
     + Atomically marks the ticket as stamped.
     +
     + Returns: `true` for the one call that performed the transition,
     +          `false` when the ticket was already stamped.
     +/
    bool stamp()
    {
        return cas(&_flag, false, true);
    }

    /// Returns: `true` once `stamp` has succeeded.
    bool isStamped() @property
    {
        return atomicLoad(_flag);
    }

private:
    bool _flag = false;
}

/// Convenience alias for the shared ticket type.
alias Ticket = shared(TicketBase);

///
unittest
{
    auto t = new Ticket;
    assert(!t.isStamped);
    assert(t.stamp());
    assert(t.isStamped);
    assert(!t.stamp());
}