1 module rx.util;
2 
3 import core.atomic;
4 import core.sync.mutex;
5 import core.sync.condition;
6 
7 T exchange(T, U)(ref shared(T) store, U val)
8 {
9     shared(T) temp = void;
10     do
11     {
12         temp = store;
13     }
14     while (!cas(&store, temp, val));
15     return atomicLoad(temp);
16 }
17 
18 unittest
19 {
20     shared(int) n = 1;
21     auto temp = exchange(n, 10);
22     assert(n == 10);
23     assert(temp == 1);
24 }
25 
26 class EventSignal
27 {
28 public:
29     this()
30     {
31         _mutex = new Mutex;
32         _condition = new Condition(_mutex);
33     }
34 
35 public:
36     bool signal() @property
37     {
38         synchronized (_mutex)
39         {
40             return _signal;
41         }
42     }
43 
44 public:
45     void setSignal()
46     {
47         synchronized (_mutex)
48         {
49             _signal = true;
50             _condition.notify();
51         }
52     }
53 
54     void wait()
55     {
56         synchronized (_mutex)
57         {
58             if (_signal)
59                 return;
60             _condition.wait();
61         }
62     }
63 
64 private:
65     Mutex _mutex;
66     Condition _condition;
67     bool _signal;
68 }
69 
70 unittest
71 {
72     auto event = new EventSignal;
73     assert(!event.signal);
74     event.setSignal();
75     assert(event.signal);
76 }
77 
78 package shared class AtomicCounter
79 {
80 public:
81     this(size_t n)
82     {
83         _count = n;
84     }
85 
86 public:
87     bool tryUpdateCount() @trusted
88     {
89         shared(size_t) oldValue = void;
90         size_t newValue = void;
91         do
92         {
93             oldValue = _count;
94             if (oldValue == 0)
95                 return true;
96 
97             newValue = oldValue - 1;
98         }
99         while (!cas(&_count, oldValue, newValue));
100 
101         return false;
102     }
103 
104 private:
105     size_t _count;
106 }
107 
108 shared class TicketBase
109 {
110 public:
111     bool stamp()
112     {
113         return cas(&_flag, false, true);
114     }
115 
116 public:
117     bool isStamped() @property
118     {
119         return atomicLoad(_flag);
120     }
121 
122 private:
123     bool _flag = false;
124 }
125 
126 alias Ticket = shared(TicketBase);
127 
128 unittest
129 {
130     auto t = new Ticket;
131     assert(!t.isStamped);
132     assert(t.stamp());
133     assert(t.isStamped);
134     assert(!t.stamp());
135 }