APIs, concepts, guides, and more
SyncInterrupt_Internal.cpp
1
2
3#if defined(RSI_TEST)
4#include <condition_variable>
5#include <array>
6#include <deque>
7#include <mutex>
8#include <vector>
9
10#include "SyncInterrupt.h"
11
12
13
// Running mean / variance accumulator using Welford's online algorithm.
// Courtesy of StackOverflow + Wikipedia: https://stackoverflow.com/a/17053010
//
// Samples may be supplied in several batches; the statistics always describe
// every sample folded in since construction or the most recent Reset().
struct WelfordStats
{
    // Zero-initialized so a freshly constructed accumulator is usable even if
    // Reset() has not been called yet.
    double mean = 0;      // running arithmetic mean of all samples
    double M2 = 0;        // running sum of squared deviations from the mean
    double variance = 0;  // M2 / count, i.e. the population variance
    double count = 0;     // number of samples accumulated so far

    // Discards everything accumulated so far.
    void Reset()
    {
        mean = 0;
        M2 = 0;
        variance = 0;
        count = 0;
    }

    // Folds every element of `vec` into the running statistics.
    // An empty `vec` leaves the statistics untouched.
    // Returns a copy of the accumulator after the update.
    WelfordStats UpdateCalculateMeanVariance(const std::vector<double>& vec)
    {
        for (const double sample : vec)
        {
            count += 1;
            const double delta = sample - mean;
            mean += delta / count;
            // Welford's update: multiply the deviation from the OLD mean by
            // the deviation from the NEW mean.
            M2 += delta * (sample - mean);
            variance = M2 / count;
        }
        return *this;
    }

    // TODO once we get Welford's working we will probably want something like...
    // A Streaming Parallel Decision Tree Algorithm - Y.Ben-Haim, E. Tom-Tov
    // https://stackoverflow.com/a/73355085
    // points to https://github.com/apache/spark/blob/4c7888dd9159dc203628b0d84f0ee2f90ab4bf13/sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java
};
51
// Fixed-capacity sample buffer bundled with a mutex.
//
// None of the member functions below lock `mutex`; whoever uses the buffer is
// expected to take it. `data` always holds exactly `Capacity` slots and
// `index` is the next slot AddData() will write.
template<typename DataType, size_t Capacity>
struct locked_buffer_template
{
    std::mutex mutex;
    std::vector<DataType> data;
    bool full;      // plain flag; IsFull() is derived from `index`, not from this
    size_t index;

    // Starts empty, with every slot pre-filled with 0.
    locked_buffer_template()
        : data(Capacity, static_cast<DataType>(0)),
          full(false),
          index(0)
    {
    }

    // Rewinds the write position and clears the flag.
    // The stored values are left in place.
    void Reset()
    {
        index = 0;
        full = false;
    }

    // Appends `datum` at the write position; ignored when there is no room.
    void AddData(const DataType& datum)
    {
        if (IsFull())
        {
            return;
        }
        data[index] = datum;
        ++index;
    }

    // True once every slot has been written since the last Reset().
    bool IsFull() const
    {
        return index >= Capacity;
    }
};
86template<typename DataType, size_t rotating_buffer_count, size_t rotating_buffer_size>
87class StatisticsBufferTemplate_RSI : public StatisticsBufferTemplate<DataType> {
88public:
89
90
91 using locked_buffer = locked_buffer_template<DataType, rotating_buffer_size>;
92
93 std::array<locked_buffer, rotating_buffer_count> buffers;
94 int buffer_idx;
95
96 locked_buffer* buffer_in_use;
97
98 WelfordStats running_stat; // should this get locked???
99
100 // These get locked together!
101 std::mutex mtx_nextData;
102 std::condition_variable cv;
103 std::deque<locked_buffer*> nextData;
104
105 StatisticsBufferTemplate_RSI() :
106 buffer_idx(0),
107 buffer_in_use(&buffers[0])
108 {
109 }
110
111 // You must call this from your running thread.
112 // If you do as I did and let the static initializer call it, the main thread
113 // gets the mutex, and calling unlock fails badly
114 void Init()
115 {
116 running_stat.Reset();
117 for (auto& buf : this->buffers)
118 {
119 std::scoped_lock<std::mutex> lock(buf.mutex);
120 buf.Reset();
121 }
122 this->buffer_in_use->mutex.lock(); // start with first buffer locked!
123 }
124 void Reset()
125 {
126 buffer_in_use->mutex.unlock();
127 this->cv.notify_all();
128 }
129 void AddData(const DataType& datum)
130 {
131 this->StatisticsBufferTemplate<DataType>::AddData(datum);
132 if (buffer_in_use->IsFull())
133 {
134 this->RotateBuffer();
135 }
136
137 this->buffer_in_use->AddData(datum);
138 }
139
140private:
141 // The thread that rotates the buffer "owns" the current buffer to be filled
142 // When the current buffer is full, its unlocked, marked as full, and put into the nextData queue
143 // The next buffer is then locked and marked as not full
144 void RotateBuffer()
145 {
146 // lock our deque so we can add to it
147 std::scoped_lock<std::mutex> lock(this->mtx_nextData);
148 this->nextData.emplace_back(buffer_in_use);
149
150 this->buffer_idx = (this->buffer_idx + 1) % this->buffers.size();
151 auto* next_buffer = &this->buffers[this->buffer_idx];
152 // Acquire the lock for the next buffer
153 if (!next_buffer->mutex.try_lock())
154 {
155 // something is wrong. BUT WHAT?!
156
157 // resolve or log...
158
159 // ... or just wait.
160 next_buffer->mutex.lock();
161 }
162 next_buffer->Reset();
163 // next buffer is locked (and loaded) and ready for new data
164
165 this->buffer_in_use->full = true;
166 this->buffer_in_use->mutex.unlock();
167 // current buffer is done and unlocked and unloaded
168
169 this->buffer_in_use = next_buffer;
170 this->cv.notify_all();
171 }
172};
// Concrete statistics buffer: samples are doubles, kept in a ring of
// 10 buffers (rotating_buffer_count) of 10 samples each (rotating_buffer_size).
using StatisticsBufferImpl = StatisticsBufferTemplate_RSI<double, 10, 10>;





// The statistics buffer instance used by StatisticsThread() and PrintTimingInfo().
StatisticsBufferImpl buffered_stats_impl;
// The same object, exposed through the StatisticsBuffer type
// (StatisticsBuffer is not declared in this file; presumably it comes from SyncInterrupt.h).
StatisticsBuffer& buffered_stats = buffered_stats_impl;
181
182
183void StatisticsThread()
184{
185 StatisticsBufferImpl::locked_buffer* buffer;
186 while (!readyToCleanup) // the CV should sleep the thread!
187 {
188 {
189 std::unique_lock<std::mutex> lock(buffered_stats_impl.mtx_nextData);
190 buffered_stats_impl.cv.wait(lock, [] { return readyToCleanup || !buffered_stats_impl.nextData.empty(); });
191 if (readyToCleanup)
192 {
193 break;
194 }
195 buffer = buffered_stats_impl.nextData.front();
196 buffered_stats_impl.nextData.pop_front();
197
198
199 if (!buffer->mutex.try_lock())
200 {
201 // if we cant unlock writer thread is still writing...
202 // do something?
203
204 buffer->mutex.lock();// wait for lock...
205 }
206 } // unlock buffered_data.mtx_nextData
207
208
209 buffered_stats_impl.running_stat.UpdateCalculateMeanVariance(buffer->data);
210 buffer->mutex.unlock();
211 buffer->Reset();
212 }
213}
214
215void PrintTimingInfo()
216{
217 // && iterations to wait until we start looping.
218 // short circuit on bPrint
219 if (print && syncInterruptIterations)
220 {
221 printf("\t\t%ld\t|\t%8.1lfus\t|\t%8.1lfus\t|\t%8.1lfus\t|\t%8.1lfus\t|\t%8.1lf\r",
222 syncInterruptIterations, deltaMicroseconds, buffered_stats_impl.min, buffered_stats_impl.max,
223 buffered_stats_impl.running_stat.mean, buffered_stats_impl.running_stat.variance);
224 }
225}
// Prints the column header line for the rows written by PrintTimingInfo().
void printTimingHeaderString()
{
    // One literal per column; adjacent literals are joined by the compiler
    // into a single string.
    printf(
        "Number Processed\t|"
        "\tDeltaT (uS)\t|"
        "\tMin (uS)\t|"
        "\tMax (uS)\t|"
        "\tMean (uS)\t|"
        "\tVariance\n");
}
230
231#endif