1 /++
 + This module provides utilities used by the library's internal implementation.
3  +/
4 module rx.util;
5 
6 import core.atomic;
7 import core.sync.mutex;
8 import core.sync.condition;
9 
/++
 + Returns `obj` with its `shared` qualifier stripped, so it can be used
 + through the unqualified type `T`.
 +
 + The body is only a qualifier-removing cast (`cast()`); it performs no
 + locking and no atomic operation, so the caller is responsible for making
 + sure that unsynchronized access to the object is actually safe.
 +
 + Params:
 +   obj = a value or reference typed as `shared(T)`
 +
 + Returns: `obj` viewed as `T`.
 +/
auto ref T assumeThreadLocal(T)(auto ref shared(T) obj)
{
    // `cast()` with an empty type list removes the top-level qualifiers.
    return cast() obj;
}
15 
/// Strips `shared` from both a named variable and a temporary.
unittest
{
    // A minimal class used only to have something to instantiate as shared.
    class Probe
    {
        int hoge()
        {
            return 0;
        }
    }

    // Argument is an lvalue.
    auto sharedProbe = new shared(Probe);
    Probe fromVariable = assumeThreadLocal(sharedProbe);

    // Argument is an rvalue.
    Probe fromTemporary = assumeThreadLocal(new shared(Probe));
}
31 
/++
 + Atomically replaces the value stored in `store` with `val` and returns the
 + value that was stored immediately before the replacement.
 +
 + The swap is implemented as a compare-and-swap retry loop: the current value
 + is read with `atomicLoad`, and `cas` installs `val` only if `store` still
 + holds that snapshot. If another thread changed `store` in between, the
 + snapshot is refreshed and the attempt is repeated.
 +
 + Params:
 +   store = the shared variable to update
 +   val   = the new value to write into `store`
 +
 + Returns: the value `store` held right before `val` was written.
 +/
auto exchange(T, U)(ref shared(T) store, U val)
{
    // Read `store` through atomicLoad rather than a plain assignment, so the
    // snapshot is never a non-atomic read of shared data.
    auto previous = atomicLoad(store);
    while (!cas(&store, previous, val))
    {
        // Lost the race with another writer: take a fresh snapshot and retry.
        previous = atomicLoad(store);
    }
    return previous;
}
43 
/// `exchange` writes the new value and hands back the one it replaced.
unittest
{
    shared(int) slot = 1;
    const previous = exchange(slot, 10);

    // The old value comes back to the caller...
    assert(previous == 1);
    // ...and the new value is now stored.
    assert(slot == 10);
}
52 
/++
 + A one-way event flag that threads can block on.
 +
 + The flag starts out unset and can only be set (there is no reset). All
 + access to the flag is guarded by an internal `Mutex`, and waiting threads
 + are parked on a `Condition` bound to that mutex.
 +/
class EventSignal
{
public:
    /// Creates the event in the unset state together with its mutex and condition.
    this()
    {
        _mutex = new Mutex;
        _condition = new Condition(_mutex);
    }

public:
    /// Returns: `true` once `setSignal` has been called, `false` before that.
    bool signal() @property
    {
        synchronized (_mutex)
        {
            return _signal;
        }
    }

public:
    /++
     + Sets the flag and wakes every thread currently blocked in `wait`.
     +
     + Because the flag is never reset, all waiters are allowed to proceed, so
     + they are all notified rather than just one of them.
     +/
    void setSignal()
    {
        synchronized (_mutex)
        {
            _signal = true;
            // notifyAll, not notify: with several waiters a single notify
            // would leave the others blocked forever.
            _condition.notifyAll();
        }
    }

    /++
     + Blocks the calling thread until the flag has been set.
     +
     + Returns immediately when the flag is already set.
     +/
    void wait()
    {
        synchronized (_mutex)
        {
            // Re-check the flag after every wakeup: a condition variable may
            // wake spuriously, so returning after a single wait() is not safe.
            while (!_signal)
                _condition.wait();
        }
    }

private:
    Mutex _mutex;         // guards _signal and backs _condition
    Condition _condition; // waiters block here until the flag is set
    bool _signal;         // the event flag; false until setSignal is called
}
101 
/// A fresh event is unset; `setSignal` flips it to set.
unittest
{
    auto flag = new EventSignal;

    // Nothing has signalled yet.
    assert(flag.signal == false);

    flag.setSignal();

    // The flag is now observable as set.
    assert(flag.signal == true);
}
110 
/++
 + A counter over a `size_t` that is only ever moved towards zero, using
 + atomic loads and compare-and-swap.
 +
 + Every update follows the same pattern: take an atomic snapshot of the
 + count, compute the new value, and install it with `cas`; if another thread
 + changed the count in the meantime, take a new snapshot and retry.
 +/
package shared class AtomicCounter
{
public:
    /++
     + Params:
     +   n = the initial count
     +/
    this(size_t n)
    {
        _count = n;
    }

public:
    /// Returns: `true` when the count is currently zero.
    bool isZero() @property
    {
        return atomicLoad(_count) == 0;
    }

    /++
     + Decrements the count by one unless it is already zero.
     +
     + Returns: `true` when the count was already zero (nothing was changed),
     +          `false` when the count was decremented.
     +/
    bool tryUpdateCount() @trusted
    {
        // Same loop as tryDecrement; only the meaning of the result is inverted.
        return !tryDecrement().success;
    }

    /++
     + Decrements the count by one unless it is already zero.
     +
     + Returns: a struct with two fields: `success` is `true` when the
     +          decrement happened, and `count` is the value after the
     +          decrement (or `0` when the count was already zero).
     +/
    auto tryDecrement() @trusted
    {
        static struct DecrementResult
        {
            bool success;
            size_t count;
        }

        // Snapshot through atomicLoad instead of a plain read of shared data.
        size_t current = atomicLoad(_count);
        while (current != 0)
        {
            size_t next = current - 1;
            if (cas(&_count, current, next))
                return DecrementResult(true, next);

            // Another thread changed the count first; refresh and retry.
            current = atomicLoad(_count);
        }

        return DecrementResult(false, 0);
    }

    /++
     + Sets the count to zero unless it is already zero.
     +
     + Returns: `true` when this call moved the count from a non-zero value to
     +          zero, `false` when the count was already zero.
     +/
    bool trySetZero() @trusted
    {
        size_t current = atomicLoad(_count);
        while (current != 0)
        {
            if (cas(&_count, current, cast(size_t) 0))
                return true;

            // Another thread changed the count first; refresh and retry.
            current = atomicLoad(_count);
        }

        return false;
    }

private:
    size_t _count; // shared via the class qualifier; accessed with atomicLoad/cas
}
188 
/++
 + A one-shot flag that can be stamped once.
 +
 + The flag starts out `false`; it is flipped with a compare-and-swap and read
 + with an atomic load.
 +/
shared class TicketBase
{
public:
    /++
     + Tries to flip the flag from `false` to `true`.
     +
     + Returns: `true` when this call performed the flip, `false` when the
     +          flag was already `true`.
     +/
    bool stamp()
    {
        const flipped = cas(&_flag, false, true);
        return flipped;
    }

    /// Returns: the current value of the flag.
    bool isStamped() @property
    {
        const current = atomicLoad(_flag);
        return current;
    }

private:
    bool _flag = false; // false until a stamp() call succeeds
}
209 
/// Alias for `shared(TicketBase)`, so that `new Ticket` creates a shared instance.
alias Ticket = shared(TicketBase);
212 
/// Only the first `stamp` on a ticket succeeds.
unittest
{
    auto ticket = new Ticket;

    // A new ticket has not been stamped.
    assert(ticket.isStamped == false);

    // The first stamp succeeds and is observable afterwards.
    const first = ticket.stamp();
    assert(first);
    assert(ticket.isStamped == true);

    // A second stamp is rejected.
    const second = ticket.stamp();
    assert(!second);
}