APIs, concepts, guides, and more
SyncInterrupt_Internal.cpp
1
2
3#if defined(RSI_TEST)
4#include <condition_variable>
5#include <array>
6#include <cstdio>
7#include <deque>
8#include <mutex>
9#include <vector>
10
11#include "SyncInterrupt.h"
12
13
14
/// Running mean / variance accumulator based on Welford's online algorithm.
///
/// The accumulator can be fed in several batches; every call to
/// UpdateCalculateMeanVariance() continues from the state left by the previous
/// one. `variance` is the population variance (M2 / count), not the sample
/// variance (M2 / (count - 1)).
struct WelfordStats
{
    // Default member initializers make a freshly constructed accumulator usable
    // without an explicit Reset() (previously the members were indeterminate
    // for any instance without static storage duration).
    double mean = 0;      ///< running mean of all samples seen so far
    double M2 = 0;        ///< running sum of squared distances from the mean
    double variance = 0;  ///< population variance of all samples seen so far
    double count = 0;     ///< number of samples seen so far (kept as double for the divisions)

    /// Discards all accumulated state.
    void Reset()
    {
        mean = 0;
        M2 = 0;
        variance = 0;
        count = 0;
    }

    /// Folds every value of @p vec into the running statistics.
    ///
    /// Welford's Algorithm - Courtesy of StackOverflow + Wikipedia
    /// https://stackoverflow.com/a/17053010
    ///
    /// @param vec  samples to add; an empty vector leaves the state unchanged.
    /// @return a copy of the accumulator after the update.
    WelfordStats UpdateCalculateMeanVariance(const std::vector<double>& vec)
    {
        for (const double sample : vec)
        {
            count += 1;
            // `delta` uses the mean *before* this sample, the second factor
            // uses the mean *after* it - that asymmetry is the core of Welford.
            const double delta = sample - mean;
            mean += delta / count;
            M2 += delta * (sample - mean);
            variance = M2 / count;
            // <-- the running mean and variance are valid here after every sample
        }
        return *this;
    }

    // TODO once we get Welford's working we will probably want something like...
    // A Streaming Parallel Decision Tree Algorithm - Y.Ben-Haim, E. Tom-Tov
    // https://stackoverflow.com/a/73355085
    // points to https://github.com/apache/spark/blob/4c7888dd9159dc203628b0d84f0ee2f90ab4bf13/sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java
};
52
/// Fixed-capacity sample buffer bundled with the mutex that guards it.
///
/// The buffer itself never takes `mutex`; callers lock it around the accesses
/// they need to protect. Storage is allocated once in the constructor and
/// reused: Reset() only rewinds the write position, it does not clear `data`.
///
/// @tparam DataType  element type (must be default-constructible and copy-assignable)
/// @tparam Capacity  number of elements the buffer can hold
template<typename DataType, std::size_t Capacity>
struct locked_buffer_template
{
    std::mutex mutex;            ///< guards this buffer; locked/unlocked by the users of the buffer
    std::vector<DataType> data;  ///< always holds exactly Capacity elements
    bool full = false;           ///< set by the owner of the buffer; AddData() does not touch it
    std::size_t index = 0;       ///< position the next AddData() writes to

    /// Allocates Capacity value-initialized elements.
    locked_buffer_template() :
        data(Capacity)
    {
    }

    /// Rewinds the buffer so it can be filled again (contents are left as-is).
    void Reset()
    {
        full = false;
        index = 0;
    }

    /// Appends @p datum; silently ignored once the buffer holds Capacity elements.
    void AddData(const DataType& datum)
    {
        if (IsFull())
        {
            return;
        }
        data[index++] = datum;
    }

    /// @return true once Capacity elements have been written since the last Reset().
    bool IsFull() const
    {
        return index >= Capacity;
    }
};
87template<typename DataType, size_t rotating_buffer_count, size_t rotating_buffer_size>
88class StatisticsBufferTemplate_RSI : public StatisticsBufferTemplate<DataType> {
89public:
90
91
92 using locked_buffer = locked_buffer_template<DataType, rotating_buffer_size>;
93
94 std::array<locked_buffer, rotating_buffer_count> buffers;
95 int buffer_idx;
96
97 locked_buffer* buffer_in_use;
98
99 WelfordStats running_stat; // should this get locked???
100
101 // These get locked together!
102 std::mutex mtx_nextData;
103 std::condition_variable cv;
104 std::deque<locked_buffer*> nextData;
105
106 StatisticsBufferTemplate_RSI() :
107 buffer_idx(0),
108 buffer_in_use(&buffers[0])
109 {
110 }
111
112 // You must call this from your running thread.
113 // If you do as I did and let the static initializer call it, the main thread
114 // gets the mutex, and calling unlock fails badly
115 void Init()
116 {
117 running_stat.Reset();
118 for (auto& buf : this->buffers)
119 {
120 std::scoped_lock<std::mutex> lock(buf.mutex);
121 buf.Reset();
122 }
123 this->buffer_in_use->mutex.lock(); // start with first buffer locked!
124 }
125 void Reset()
126 {
127 buffer_in_use->mutex.unlock();
128 this->cv.notify_all();
129 }
130 void AddData(const DataType& datum)
131 {
132 this->StatisticsBufferTemplate<DataType>::AddData(datum);
133 if (buffer_in_use->IsFull())
134 {
135 this->RotateBuffer();
136 }
137
138 this->buffer_in_use->AddData(datum);
139 }
140
141private:
142 // The thread that rotates the buffer "owns" the current buffer to be filled
143 // When the current buffer is full, its unlocked, marked as full, and put into the nextData queue
144 // The next buffer is then locked and marked as not full
145 void RotateBuffer()
146 {
147 // lock our deque so we can add to it
148 std::scoped_lock<std::mutex> lock(this->mtx_nextData);
149 this->nextData.emplace_back(buffer_in_use);
150
151 this->buffer_idx = (this->buffer_idx + 1) % this->buffers.size();
152 auto* next_buffer = &this->buffers[this->buffer_idx];
153 // Acquire the lock for the next buffer
154 if (!next_buffer->mutex.try_lock())
155 {
156 // something is wrong. BUT WHAT?!
157
158 // resolve or log...
159
160 // ... or just wait.
161 next_buffer->mutex.lock();
162 }
163 next_buffer->Reset();
164 // next buffer is locked (and loaded) and ready for new data
165
166 this->buffer_in_use->full = true;
167 this->buffer_in_use->mutex.unlock();
168 // current buffer is done and unlocked and unloaded
169
170 this->buffer_in_use = next_buffer;
171 this->cv.notify_all();
172 }
173};
// Concrete buffer type used in this file: double samples staged in a ring of
// 10 buffers (rotating_buffer_count) holding 10 samples each (rotating_buffer_size).
174using StatisticsBufferImpl = StatisticsBufferTemplate_RSI<double, 10, 10>;
175
176
177
178
179
// The single buffer instance of this file; StatisticsThread() drains its
// nextData queue and PrintTimingInfo() reads its statistics.
180StatisticsBufferImpl buffered_stats_impl;
// The same instance exposed through the StatisticsBuffer type
// (StatisticsBuffer is declared outside this file - presumably in SyncInterrupt.h).
181StatisticsBuffer& buffered_stats = buffered_stats_impl;
182
183
184void StatisticsThread()
185{
186 StatisticsBufferImpl::locked_buffer* buffer;
187 while (!readyToCleanup) // the CV should sleep the thread!
188 {
189 {
190 std::unique_lock<std::mutex> lock(buffered_stats_impl.mtx_nextData);
191 buffered_stats_impl.cv.wait(lock, [] { return readyToCleanup || !buffered_stats_impl.nextData.empty(); });
192 if (readyToCleanup)
193 {
194 break;
195 }
196 buffer = buffered_stats_impl.nextData.front();
197 buffered_stats_impl.nextData.pop_front();
198
199
200 if (!buffer->mutex.try_lock())
201 {
202 // if we cant unlock writer thread is still writing...
203 // do something?
204
205 buffer->mutex.lock();// wait for lock...
206 }
207 } // unlock buffered_data.mtx_nextData
208
209
210 buffered_stats_impl.running_stat.UpdateCalculateMeanVariance(buffer->data);
211 buffer->mutex.unlock();
212 buffer->Reset();
213 }
214}
215
216void PrintTimingInfo()
217{
218 // && iterations to wait until we start looping.
219 // short circuit on bPrint
220 if (print && syncInterruptIterations)
221 {
222 printf("\t\t%ld\t|\t%8.1lfus\t|\t%8.1lfus\t|\t%8.1lfus\t|\t%8.1lfus\t|\t%8.1lf\r",
223 syncInterruptIterations, deltaMicroseconds, buffered_stats_impl.min, buffered_stats_impl.max,
224 buffered_stats_impl.running_stat.mean, buffered_stats_impl.running_stat.variance);
225 }
226}
/// Writes the column header matching the rows produced by PrintTimingInfo()
/// to stdout.
void printTimingHeaderString()
{
    static const char header[] =
        "Number Processed\t|\tDeltaT (uS)\t|\tMin (uS)\t|\tMax (uS)\t|\tMean (uS)\t|\tVariance\n";
    // The header contains no conversion specifiers, so it is written verbatim.
    std::fputs(header, stdout);
}
231
232#endif