1 /++ 2 + This module is a utility for some implements at internals. 3 +/ 4 module rx.util; 5 6 import core.atomic; 7 import core.sync.mutex; 8 import core.sync.condition; 9 10 /// 11 auto ref T assumeThreadLocal(T)(auto ref shared(T) obj) 12 { 13 return cast() obj; 14 } 15 16 /// 17 unittest 18 { 19 class Test 20 { 21 int hoge() 22 { 23 return 0; 24 } 25 } 26 27 auto raw = new shared(Test); 28 Test local1 = assumeThreadLocal(raw); 29 Test local2 = assumeThreadLocal(new shared(Test)); 30 } 31 32 /// 33 auto exchange(T, U)(ref shared(T) store, U val) 34 { 35 shared(T) temp = void; 36 do 37 { 38 temp = store; 39 } 40 while (!cas(&store, temp, val)); 41 return atomicLoad(temp); 42 } 43 44 /// 45 unittest 46 { 47 shared(int) n = 1; 48 auto temp = exchange(n, 10); 49 assert(n == 10); 50 assert(temp == 1); 51 } 52 53 /// 54 class EventSignal 55 { 56 public: 57 /// 58 this() 59 { 60 _mutex = new Mutex; 61 _condition = new Condition(_mutex); 62 } 63 64 public: 65 /// 66 bool signal() @property 67 { 68 synchronized (_mutex) 69 { 70 return _signal; 71 } 72 } 73 74 public: 75 /// 76 void setSignal() 77 { 78 synchronized (_mutex) 79 { 80 _signal = true; 81 _condition.notify(); 82 } 83 } 84 85 /// 86 void wait() 87 { 88 synchronized (_mutex) 89 { 90 if (_signal) 91 return; 92 _condition.wait(); 93 } 94 } 95 96 private: 97 Mutex _mutex; 98 Condition _condition; 99 bool _signal; 100 } 101 102 /// 103 unittest 104 { 105 auto event = new EventSignal; 106 assert(!event.signal); 107 event.setSignal(); 108 assert(event.signal); 109 } 110 111 /// 112 package shared class AtomicCounter 113 { 114 public: 115 /// 116 this(size_t n) 117 { 118 _count = n; 119 } 120 121 public: 122 /// 123 bool isZero() @property 124 { 125 return atomicLoad(_count) == 0; 126 } 127 128 /// 129 bool tryUpdateCount() @trusted 130 { 131 shared(size_t) oldValue = void; 132 size_t newValue = void; 133 do 134 { 135 oldValue = _count; 136 if (oldValue == 0) 137 return true; 138 139 newValue = oldValue - 1; 140 } 141 while (!cas(&_count, oldValue, newValue)); 142 143 return false; 144 } 145 146 /// 147 auto tryDecrement() @trusted 148 { 149 static struct DecrementResult 150 { 151 bool success; 152 size_t count; 153 } 154 155 shared(size_t) oldValue = void; 156 size_t newValue = void; 157 do 158 { 159 oldValue = _count; 160 if (oldValue == 0) 161 return DecrementResult(false, oldValue); 162 163 newValue = oldValue - 1; 164 } 165 while (!cas(&_count, oldValue, newValue)); 166 167 return DecrementResult(true, newValue); 168 } 169 170 /// 171 bool trySetZero() @trusted 172 { 173 shared(size_t) oldValue = void; 174 do 175 { 176 oldValue = _count; 177 if (oldValue == 0) 178 return false; 179 } 180 while (!cas(&_count, oldValue, cast(size_t) 0)); 181 182 return true; 183 } 184 185 private: 186 size_t _count; 187 } 188 189 /// 190 shared class TicketBase 191 { 192 public: 193 /// 194 bool stamp() 195 { 196 return cas(&_flag, false, true); 197 } 198 199 public: 200 /// 201 bool isStamped() @property 202 { 203 return atomicLoad(_flag); 204 } 205 206 private: 207 bool _flag = false; 208 } 209 210 /// 211 alias Ticket = shared(TicketBase); 212 213 /// 214 unittest 215 { 216 auto t = new Ticket; 217 assert(!t.isStamped); 218 assert(t.stamp()); 219 assert(t.isStamped); 220 assert(!t.stamp()); 221 }