1 module rx.util;
2 
3 import core.atomic;
4 import core.sync.mutex;
5 import core.sync.condition;
6 
// @@TODO@@ Remove this overload once the Phobos shipped with LDC supports TailShared.
/**
 * Identity overload of `assumeThreadLocal` for arguments whose type is not
 * `shared`-qualified: the argument is handed back unchanged.
 *
 * Params:
 *     obj = value or reference to pass through
 *
 * Returns: `obj` itself.
 */
auto ref T assumeThreadLocal(T)(auto ref T obj)
if (!is(T == shared))
{
    return obj;
}
12 
/**
 * Overload of `assumeThreadLocal` for `shared`-qualified arguments: the
 * qualifier is cast away and the result is returned as a plain `T`.
 *
 * The cast performs no synchronization of its own.
 *
 * Params:
 *     obj = shared value or reference
 *
 * Returns: `obj` with its top-level qualifiers removed.
 */
auto ref T assumeThreadLocal(T)(auto ref shared(T) obj)
{
    // `cast()` with an empty type list strips the top-level qualifiers.
    return cast() obj;
}
17 
// assumeThreadLocal accepts both lvalue and rvalue shared class references
// and yields an unqualified reference in each case.
unittest
{
    class Test
    {
        int hoge()
        {
            return 0;
        }
    }

    shared(Test) sharedInstance = new shared(Test);

    // lvalue argument
    Test fromLvalue = assumeThreadLocal(sharedInstance);

    // rvalue argument
    Test fromRvalue = assumeThreadLocal(new shared(Test));
}
32 
/**
 * Atomically replaces the value of `store` with `val` and returns the value
 * that was stored immediately before the replacement.
 *
 * Implemented as a compare-and-swap retry loop: the current value is read
 * with `atomicLoad`, and `cas` only succeeds if `store` still holds that
 * value, so the returned value is exactly the one that got overwritten.
 *
 * Params:
 *     store = shared variable to update
 *     val   = new value to write into `store`
 *
 * Returns: the previous value of `store`.
 */
auto exchange(T, U)(ref shared(T) store, U val)
{
    // Read through atomicLoad rather than a plain load of the shared
    // variable, so the snapshot handed to cas is itself an atomic read.
    auto previous = atomicLoad(store);
    while (!cas(&store, previous, val))
    {
        // Another thread changed `store` in between; refresh and retry.
        previous = atomicLoad(store);
    }
    return previous;
}
43 
// exchange stores the new value and hands back the one it replaced.
unittest
{
    shared(int) value = 1;
    auto previous = exchange(value, 10);
    assert(previous == 1);
    assert(value == 10);
}
51 
/**
 * A latch-style event: it starts unsignaled, `setSignal` marks it signaled,
 * and nothing in this class ever clears the flag again.
 *
 * All access to the flag is serialized through a `Mutex`; blocked callers of
 * `wait` sleep on a `Condition` bound to that mutex.
 */
class EventSignal
{
public:
    /// Creates an unsignaled event with its own mutex and condition variable.
    this()
    {
        _mutex = new Mutex;
        _condition = new Condition(_mutex);
    }

public:
    /// Returns: `true` once `setSignal` has been called, `false` before that.
    bool signal() @property
    {
        synchronized (_mutex)
        {
            return _signal;
        }
    }

public:
    /**
     * Marks the event as signaled and wakes every thread blocked in `wait`.
     *
     * `notifyAll` is used rather than `notify`: the flag stays set, so every
     * current waiter is entitled to proceed, and waking only one of them
     * would leave the others blocked indefinitely.
     */
    void setSignal()
    {
        synchronized (_mutex)
        {
            _signal = true;
            _condition.notifyAll();
        }
    }

    /**
     * Blocks the calling thread until the event is signaled. Returns
     * immediately if it already is.
     */
    void wait()
    {
        synchronized (_mutex)
        {
            // Re-check the flag after every wakeup: condition variables may
            // wake spuriously, so a single wait() is not proof of a signal.
            while (!_signal)
                _condition.wait();
        }
    }

private:
    Mutex _mutex;
    Condition _condition;
    bool _signal;
}
95 
// A fresh EventSignal is unsignaled; setSignal flips it.
unittest
{
    auto sig = new EventSignal;
    assert(!sig.signal);

    sig.setSignal();
    assert(sig.signal);
}
103 
/**
 * A `size_t` counter that is only ever moved towards zero, using atomic
 * loads and compare-and-swap retry loops.
 */
package shared class AtomicCounter
{
public:
    /**
     * Params:
     *     n = initial value of the counter
     */
    this(size_t n)
    {
        _count = n;
    }

public:
    /// Returns: `true` if the counter currently reads zero.
    bool isZero() @property
    {
        return atomicLoad(_count) == 0;
    }

    /**
     * Decrements the counter by one unless it is already zero.
     *
     * Returns: `true` if the counter was already zero (nothing is changed),
     *          `false` if a decrement was performed.
     */
    bool tryUpdateCount() @trusted
    {
        size_t oldValue = void;
        do
        {
            // Atomic read, consistent with isZero; a plain load of the
            // shared field is not guaranteed to be atomic.
            oldValue = atomicLoad(_count);
            if (oldValue == 0)
                return true;
        }
        while (!cas(&_count, oldValue, oldValue - 1));

        return false;
    }

    /**
     * Decrements the counter by one unless it is already zero.
     *
     * Returns: a struct with two fields. `success` is `true` when a
     *          decrement was performed; `count` is the value after the
     *          decrement, or `0` when the counter was already zero.
     */
    auto tryDecrement() @trusted
    {
        static struct DecrementResult
        {
            bool success;
            size_t count;
        }

        size_t oldValue = void;
        size_t newValue = void;
        do
        {
            oldValue = atomicLoad(_count);
            if (oldValue == 0)
                return DecrementResult(false, oldValue);

            newValue = oldValue - 1;
        }
        while (!cas(&_count, oldValue, newValue));

        return DecrementResult(true, newValue);
    }

    /**
     * Forces the counter to zero unless it is already zero.
     *
     * Returns: `true` if this call changed the counter from a non-zero value
     *          to zero, `false` if it was already zero.
     */
    bool trySetZero() @trusted
    {
        size_t oldValue = void;
        do
        {
            oldValue = atomicLoad(_count);
            if (oldValue == 0)
                return false;
        }
        while (!cas(&_count, oldValue, cast(size_t)0));

        return true;
    }

private:
    size_t _count;
}
175 
/**
 * A one-way boolean flag updated with atomic operations: it starts unset
 * and `stamp` flips it to set via compare-and-swap.
 */
shared class TicketBase
{
    /// Returns: `true` if the flag has been set by a call to `stamp`.
    bool isStamped() @property
    {
        return atomicLoad(_flag);
    }

    /**
     * Attempts to set the flag.
     *
     * Returns: `true` if this call changed the flag from unset to set,
     *          `false` if it was already set.
     */
    bool stamp()
    {
        immutable bool changed = cas(&_flag, false, true);
        return changed;
    }

private:
    bool _flag;
}

/// Convenience name for the shared-qualified ticket type.
alias Ticket = shared(TicketBase);
195 
// Only the first stamp() on a ticket succeeds; the flag stays set afterwards.
unittest
{
    auto ticket = new Ticket;
    assert(!ticket.isStamped);

    assert(ticket.stamp());
    assert(ticket.isStamped);

    assert(!ticket.stamp());
}