1 module rx.util; 2 3 import core.atomic; 4 import core.sync.mutex; 5 import core.sync.condition; 6 7 // @@TODO@@ Remove this overload, when the phobos of LDC supports TailShared. 8 auto ref T assumeThreadLocal(T)(auto ref T obj) if (!is(T == shared)) 9 { 10 return obj; 11 } 12 13 auto ref T assumeThreadLocal(T)(auto ref shared(T) obj) 14 { 15 return cast() obj; 16 } 17 18 unittest 19 { 20 class Test 21 { 22 int hoge() 23 { 24 return 0; 25 } 26 } 27 28 auto raw = new shared(Test); 29 Test local1 = assumeThreadLocal(raw); 30 Test local2 = assumeThreadLocal(new shared(Test)); 31 } 32 33 auto exchange(T, U)(ref shared(T) store, U val) 34 { 35 shared(T) temp = void; 36 do 37 { 38 temp = store; 39 } 40 while (!cas(&store, temp, val)); 41 return atomicLoad(temp); 42 } 43 44 unittest 45 { 46 shared(int) n = 1; 47 auto temp = exchange(n, 10); 48 assert(n == 10); 49 assert(temp == 1); 50 } 51 52 class EventSignal 53 { 54 public: 55 this() 56 { 57 _mutex = new Mutex; 58 _condition = new Condition(_mutex); 59 } 60 61 public: 62 bool signal() @property 63 { 64 synchronized (_mutex) 65 { 66 return _signal; 67 } 68 } 69 70 public: 71 void setSignal() 72 { 73 synchronized (_mutex) 74 { 75 _signal = true; 76 _condition.notify(); 77 } 78 } 79 80 void wait() 81 { 82 synchronized (_mutex) 83 { 84 if (_signal) 85 return; 86 _condition.wait(); 87 } 88 } 89 90 private: 91 Mutex _mutex; 92 Condition _condition; 93 bool _signal; 94 } 95 96 unittest 97 { 98 auto event = new EventSignal; 99 assert(!event.signal); 100 event.setSignal(); 101 assert(event.signal); 102 } 103 104 package shared class AtomicCounter 105 { 106 public: 107 this(size_t n) 108 { 109 _count = n; 110 } 111 112 public: 113 bool isZero() @property 114 { 115 return atomicLoad(_count) == 0; 116 } 117 118 bool tryUpdateCount() @trusted 119 { 120 shared(size_t) oldValue = void; 121 size_t newValue = void; 122 do 123 { 124 oldValue = _count; 125 if (oldValue == 0) 126 return true; 127 128 newValue = oldValue - 1; 129 } 130 while (!cas(&_count, oldValue, newValue)); 131 132 return false; 133 } 134 135 auto tryDecrement() @trusted 136 { 137 static struct DecrementResult 138 { 139 bool success; 140 size_t count; 141 } 142 143 shared(size_t) oldValue = void; 144 size_t newValue = void; 145 do 146 { 147 oldValue = _count; 148 if (oldValue == 0) 149 return DecrementResult(false, oldValue); 150 151 newValue = oldValue - 1; 152 } 153 while (!cas(&_count, oldValue, newValue)); 154 155 return DecrementResult(true, newValue); 156 } 157 158 bool trySetZero() @trusted 159 { 160 shared(size_t) oldValue = void; 161 do 162 { 163 oldValue = _count; 164 if (oldValue == 0) 165 return false; 166 } 167 while (!cas(&_count, oldValue, cast(size_t)0)); 168 169 return true; 170 } 171 172 private: 173 size_t _count; 174 } 175 176 shared class TicketBase 177 { 178 public: 179 bool stamp() 180 { 181 return cas(&_flag, false, true); 182 } 183 184 public: 185 bool isStamped() @property 186 { 187 return atomicLoad(_flag); 188 } 189 190 private: 191 bool _flag = false; 192 } 193 194 alias Ticket = shared(TicketBase); 195 196 unittest 197 { 198 auto t = new Ticket; 199 assert(!t.isStamped); 200 assert(t.stamp()); 201 assert(t.isStamped); 202 assert(!t.stamp()); 203 }