module rx.util;

import core.atomic;
import core.sync.mutex;
import core.sync.condition;

/**
 * Atomically replaces the value held in `store` with `val`.
 *
 * Implemented as a compare-and-swap retry loop: the current value is
 * loaded atomically, then `cas` is attempted until no other thread has
 * modified `store` between the load and the swap.
 *
 * Params:
 *   store = shared location to overwrite
 *   val   = value to write into `store`
 *
 * Returns: the value `store` held immediately before the successful swap.
 */
T exchange(T, U)(ref shared(T) store, U val)
{
    shared(T) temp = void;
    do
    {
        // Read the shared location atomically rather than with a plain
        // assignment, so the snapshot handed to cas is well-defined.
        temp = atomicLoad(store);
    }
    while (!cas(&store, temp, val));
    return atomicLoad(temp);
}

unittest
{
    shared(int) n = 1;
    auto temp = exchange(n, 10);
    assert(atomicLoad(n) == 10);
    assert(temp == 1);
}

/**
 * A one-way latch guarded by a mutex and condition variable.
 *
 * The flag starts out `false`; `setSignal` sets it to `true` and nothing
 * in this class ever resets it. `wait` blocks the caller until the flag
 * has been set.
 */
class EventSignal
{
public:
    /// Creates the latch in the un-signaled state.
    this()
    {
        _mutex = new Mutex;
        _condition = new Condition(_mutex);
    }

public:
    /// Returns: `true` once `setSignal` has been called, read under the mutex.
    bool signal() @property
    {
        synchronized (_mutex)
        {
            return _signal;
        }
    }

public:
    /// Sets the flag and wakes every thread currently blocked in `wait`.
    void setSignal()
    {
        synchronized (_mutex)
        {
            _signal = true;
            // The flag is latched, so all waiters (not just one) must be
            // released; notify() would leave the others blocked forever.
            _condition.notifyAll();
        }
    }

    /// Blocks until the flag is set; returns immediately if it already is.
    void wait()
    {
        synchronized (_mutex)
        {
            // Re-check the predicate in a loop: condition variables may
            // wake spuriously, and only `_signal` says whether to proceed.
            while (!_signal)
                _condition.wait();
        }
    }

private:
    Mutex _mutex;
    Condition _condition;
    bool _signal;
}

unittest
{
    auto event = new EventSignal;
    assert(!event.signal);
    event.setSignal();
    assert(event.signal);
    // Already signaled: wait() must return without blocking.
    event.wait();
    assert(event.signal);
}

/**
 * A shared count-down counter updated with compare-and-swap.
 */
package shared class AtomicCounter
{
public:
    /// Initializes the counter with `n`.
    this(size_t n)
    {
        _count = n;
    }

public:
    /**
     * Attempts to decrement the counter by one.
     *
     * Returns: `true` if the counter was already zero (nothing is
     * modified); `false` after the counter has been decremented by one.
     */
    bool tryUpdateCount() @trusted
    {
        size_t oldValue = void;
        size_t newValue = void;
        do
        {
            // Atomic snapshot of the current count for this CAS attempt.
            oldValue = atomicLoad(_count);
            if (oldValue == 0)
                return true;

            newValue = oldValue - 1;
        }
        while (!cas(&_count, oldValue, newValue));

        return false;
    }

private:
    size_t _count;
}

/**
 * A shared flag that can be set at most once.
 */
shared class TicketBase
{
public:
    /**
     * Atomically flips the flag from `false` to `true`.
     *
     * Returns: `true` for the call that performed the flip; `false` if
     * the flag was already set.
     */
    bool stamp()
    {
        return cas(&_flag, false, true);
    }

public:
    /// Returns: the current value of the flag, loaded atomically.
    bool isStamped() @property
    {
        return atomicLoad(_flag);
    }

private:
    bool _flag = false;
}

alias Ticket = shared(TicketBase);

unittest
{
    auto t = new Ticket;
    assert(!t.isStamped);
    assert(t.stamp());
    assert(t.isStamped);
    assert(!t.stamp());
}