1 /++
2  + This module is a utility for some implements at internals.
3  +/
4 module rx.util;
5 
6 import core.atomic;
7 import core.sync.mutex;
8 import core.sync.condition;
9 
10 // @@TODO@@ Remove this overload, when the phobos of LDC supports TailShared.
11 ///
12 auto ref T assumeThreadLocal(T)(auto ref T obj) if (!is(T == shared))
13 {
14     return obj;
15 }
16 
17 ///
18 auto ref T assumeThreadLocal(T)(auto ref shared(T) obj)
19 {
20     return cast() obj;
21 }
22 
23 ///
24 unittest
25 {
26     class Test
27     {
28         int hoge()
29         {
30             return 0;
31         }
32     }
33 
34     auto raw = new shared(Test);
35     Test local1 = assumeThreadLocal(raw);
36     Test local2 = assumeThreadLocal(new shared(Test));
37 }
38 
39 ///
40 auto exchange(T, U)(ref shared(T) store, U val)
41 {
42     shared(T) temp = void;
43     do
44     {
45         temp = store;
46     }
47     while (!cas(&store, temp, val));
48     return atomicLoad(temp);
49 }
50 
51 ///
52 unittest
53 {
54     shared(int) n = 1;
55     auto temp = exchange(n, 10);
56     assert(n == 10);
57     assert(temp == 1);
58 }
59 
60 ///
61 class EventSignal
62 {
63 public:
64     ///
65     this()
66     {
67         _mutex = new Mutex;
68         _condition = new Condition(_mutex);
69     }
70 
71 public:
72     ///
73     bool signal() @property
74     {
75         synchronized (_mutex)
76         {
77             return _signal;
78         }
79     }
80 
81 public:
82     ///
83     void setSignal()
84     {
85         synchronized (_mutex)
86         {
87             _signal = true;
88             _condition.notify();
89         }
90     }
91 
92     ///
93     void wait()
94     {
95         synchronized (_mutex)
96         {
97             if (_signal)
98                 return;
99             _condition.wait();
100         }
101     }
102 
103 private:
104     Mutex _mutex;
105     Condition _condition;
106     bool _signal;
107 }
108 
109 ///
110 unittest
111 {
112     auto event = new EventSignal;
113     assert(!event.signal);
114     event.setSignal();
115     assert(event.signal);
116 }
117 
118 ///
119 package shared class AtomicCounter
120 {
121 public:
122     ///
123     this(size_t n)
124     {
125         _count = n;
126     }
127 
128 public:
129     ///
130     bool isZero() @property
131     {
132         return atomicLoad(_count) == 0;
133     }
134 
135     ///
136     bool tryUpdateCount() @trusted
137     {
138         shared(size_t) oldValue = void;
139         size_t newValue = void;
140         do
141         {
142             oldValue = _count;
143             if (oldValue == 0)
144                 return true;
145 
146             newValue = oldValue - 1;
147         }
148         while (!cas(&_count, oldValue, newValue));
149 
150         return false;
151     }
152 
153     ///
154     auto tryDecrement() @trusted
155     {
156         static struct DecrementResult
157         {
158             bool success;
159             size_t count;
160         }
161 
162         shared(size_t) oldValue = void;
163         size_t newValue = void;
164         do
165         {
166             oldValue = _count;
167             if (oldValue == 0)
168                 return DecrementResult(false, oldValue);
169 
170             newValue = oldValue - 1;
171         }
172         while (!cas(&_count, oldValue, newValue));
173 
174         return DecrementResult(true, newValue);
175     }
176 
177     ///
178     bool trySetZero() @trusted
179     {
180         shared(size_t) oldValue = void;
181         do
182         {
183             oldValue = _count;
184             if (oldValue == 0)
185                 return false;
186         }
187         while (!cas(&_count, oldValue, cast(size_t)0));
188 
189         return true;
190     }
191 
192 private:
193     size_t _count;
194 }
195 
196 ///
197 shared class TicketBase
198 {
199 public:
200     ///
201     bool stamp()
202     {
203         return cas(&_flag, false, true);
204     }
205 
206 public:
207     ///
208     bool isStamped() @property
209     {
210         return atomicLoad(_flag);
211     }
212 
213 private:
214     bool _flag = false;
215 }
216 
217 ///
218 alias Ticket = shared(TicketBase);
219 
220 ///
221 unittest
222 {
223     auto t = new Ticket;
224     assert(!t.isStamped);
225     assert(t.stamp());
226     assert(t.isStamped);
227     assert(!t.stamp());
228 }