Krita Source Code Documentation
Loading...
Searching...
No Matches
concurrent_map.h
Go to the documentation of this file.
1/*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
4 Distributed under the Simplified BSD License.
5 Original location: https://github.com/preshing/junction
6 This software is distributed WITHOUT ANY WARRANTY; without even the
7 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
8 See the LICENSE file for more information.
9------------------------------------------------------------------------*/
10
11#ifndef CONCURRENTMAP_H
12#define CONCURRENTMAP_H
13
14#include "leapfrog.h"
15#include "qsbr.h"
16
17template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
19{
20public:
21 typedef K Key;
22 typedef V Value;
23 typedef KT KeyTraits;
24 typedef VT ValueTraits;
25 typedef quint32 Hash;
27
28private:
31
32public:
33 ConcurrentMap(quint64 capacity = Details::InitialSize) : m_root(Details::Table::create(capacity))
34 {
35 }
36
38 {
39 typename Details::Table* table = m_root.loadNonatomic();
40 table->destroy();
41 m_gc.flush();
42 }
43
45 {
46 return m_gc;
47 }
48
50 {
51 return quint64(m_root.loadNonatomic()->jobCoordinator.loadConsume()) > 1;
52 }
53
54 // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
55 // after all the threads participating in the migration have completed their work.
57 {
58 m_root.store(migration->m_destination, Release);
59 // Caller will GC the TableMigration and the source table.
60 }
61
62 // A Mutator represents a known cell in the hash table.
63 // It's meant for manipulations within a temporary function scope.
64 // Obviously you must not call QSBR::Update while holding a Mutator.
65 // Any operation that modifies the table (exchangeValue, eraseValue)
66 // may be forced to follow a redirected cell, which changes the Mutator itself.
67 // Note that even if the Mutator was constructed from an existing cell,
68 // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
69 // or if another thread deletes the key between the two steps.
70 class Mutator
71 {
72 private:
73 friend class ConcurrentMap;
74
79
80 // Constructor: Find existing cell
81 Mutator(ConcurrentMap& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue))
82 {
83 Hash hash = KeyTraits::hash(key);
84 for (;;) {
87 if (!m_cell) {
88 return;
89 }
90
92 if (value != Value(ValueTraits::Redirect)) {
93 // Found an existing value
94 m_value = value;
95 return;
96 }
97 // We've encountered a Redirect value. Help finish the migration.
99 // Try again using the latest root.
100 }
101 }
102
103 // Constructor: Insert or find cell
104 Mutator(ConcurrentMap& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue))
105 {
106 Hash hash = KeyTraits::hash(key);
107 for (;;) {
108 m_table = m_map.m_root.load(Consume);
109 quint64 overflowIdx;
110 switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
112 // We've inserted a new cell. Don't load m_cell->value.
113 return;
114 }
116 // The hash was already found in the table.
118 if (value == Value(ValueTraits::Redirect)) {
119 // We've encountered a Redirect value.
120 break; // Help finish the migration.
121 }
122 // Found an existing value
123 m_value = value;
124 return;
125 }
127 // Unlike ConcurrentMap_Linear, we don't need to keep track of & pass a "mustDouble" flag.
128 // Passing overflowIdx is sufficient to prevent an infinite loop here.
129 // It defines the start of the range of cells to check while estimating total cells in use.
130 // After the first migration, deleted keys are purged, so if we hit this line during the
131 // second loop iteration, every cell in the range will be in use, thus the estimate will be 100%.
132 // (Concurrent deletes could result in further iterations, but it will eventually settle.)
134 break;
135 }
136 }
137 // A migration has been started (either by us, or another thread). Participate until it's complete.
139 // Try again using the latest root.
140 }
141 }
142
143 public:
145 {
146 // Return previously loaded value. Don't load it again.
147 return Value(m_value);
148 }
149
151 {
152 for (;;) {
153 Value oldValue = m_value;
155 // Exchange was successful. Return previous value.
156 Value result = m_value;
157 m_value = desired; // Leave the mutator in a valid state
158 return result;
159 }
160 // The CAS failed and m_value has been updated with the latest value.
161 if (m_value != Value(ValueTraits::Redirect)) {
162 if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
163 // racing write inserted new value
164 }
165 // There was a racing write (or erase) to this cell.
166 // Pretend we exchanged with ourselves, and just let the racing write win.
167 return desired;
168 }
169
170 // We've encountered a Redirect value. Help finish the migration.
171 Hash hash = m_cell->hash.load(Relaxed);
172 for (;;) {
173 // Help complete the migration.
175 // Try again in the new table.
176 m_table = m_map.m_root.load(Consume);
177 m_value = Value(ValueTraits::NullValue);
178 quint64 overflowIdx;
179
180 switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
183 if (m_value == Value(ValueTraits::Redirect)) {
184 break;
185 }
186 goto breakOuter;
188 goto breakOuter;
191 break;
192 }
193 // We were redirected... again
194 }
195 breakOuter:;
196 // Try again in the new table.
197 }
198 }
199
200 void assignValue(Value desired)
201 {
202 exchangeValue(desired);
203 }
204
206 {
207 for (;;) {
208 if (m_value == Value(ValueTraits::NullValue)) {
209 return Value(m_value);
210 }
211
212 if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), Consume)) {
213 // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
214 Value result = m_value;
215 m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
216 return result;
217 }
218
219 // The CAS failed and m_value has been updated with the latest value.
220 if (m_value != Value(ValueTraits::Redirect)) {
221 // There was a racing write (or erase) to this cell.
222 // Pretend we erased nothing, and just let the racing write win.
223 return Value(ValueTraits::NullValue);
224 }
225
226 // We've been redirected to a new table.
227 Hash hash = m_cell->hash.load(Relaxed); // Re-fetch hash
228 for (;;) {
229 // Help complete the migration.
231 // Try again in the new table.
232 m_table = m_map.m_root.load(Consume);
234 if (!m_cell) {
235 m_value = Value(ValueTraits::NullValue);
236 return m_value;
237 }
238
240 if (m_value != Value(ValueTraits::Redirect)) {
241 break;
242 }
243 }
244 }
245 }
246 };
247
249 {
250 return Mutator(*this, key);
251 }
252
254 {
255 return Mutator(*this, key, false);
256 }
257
258 // Lookup without creating a temporary Mutator.
260 {
261 Hash hash = KeyTraits::hash(key);
262 for (;;) {
263 typename Details::Table* table = m_root.load(Consume);
264 typename Details::Cell* cell = Details::find(hash, table);
265 if (!cell) {
266 return Value(ValueTraits::NullValue);
267 }
268
269 Value value = cell->value.load(Consume);
270 if (value != Value(ValueTraits::Redirect)) {
271 return value; // Found an existing value
272 }
273 // We've been redirected to a new table. Help with the migration.
275 // Try again in the new table.
276 }
277 }
278
279 Value assign(Key key, Value desired)
280 {
281 Mutator iter(*this, key);
282 return iter.exchangeValue(desired);
283 }
284
285 Value exchange(Key key, Value desired)
286 {
287 Mutator iter(*this, key);
288 return iter.exchangeValue(desired);
289 }
290
292 {
293 Mutator iter(*this, key, false);
294 return iter.eraseValue();
295 }
296
297 // The easiest way to implement an Iterator is to prevent all Redirects.
298 // The currrent Iterator does that by forbidding concurrent inserts.
299 // To make it work with concurrent inserts, we'd need a way to block TableMigrations.
301 {
302 private:
304 quint64 m_idx;
307
308 public:
309 Iterator() = default;
311 {
312 // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
313 m_table = map.m_root.load(Consume);
314 m_idx = -1;
315 next();
316 }
317
319 {
320 m_table = map.m_root.load(Consume);
321 m_idx = -1;
322 next();
323 }
324
325 void next()
326 {
327 while (++m_idx <= m_table->sizeMask) {
328 // Index still inside range of table.
329 typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
330 typename Details::Cell* cell = group->cells + (m_idx & 3);
331 m_hash = cell->hash.load(Relaxed);
332
333 if (m_hash != KeyTraits::NullHash) {
334 // Cell has been reserved.
335 m_value = cell->value.load(Relaxed);
336 if (m_value != Value(ValueTraits::NullValue))
337 return; // Yield this cell.
338 }
339 }
340 // That's the end of the map.
341 m_hash = KeyTraits::NullHash;
342 m_value = Value(ValueTraits::NullValue);
343 }
344
345 bool isValid() const
346 {
347#ifdef SANITY_CHECK
348 KIS_SAFE_ASSERT_RECOVER_RETURN_VALUE(m_value != Value(ValueTraits::Redirect), false);
349#endif
350 return m_value != Value(ValueTraits::NullValue);
351 }
352
353 Key getKey() const
354 {
355 // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
356 return KeyTraits::dehash(m_hash);
357 }
358
360 {
361 return m_value;
362 }
363 };
364};
365
366#endif // CONCURRENTMAP_LEAPFROG_H
float value(const T *src, size_t ch)
@ Release
Definition atomic.h:60
@ ConsumeRelease
Definition atomic.h:61
@ Relaxed
Definition atomic.h:57
@ Consume
Definition atomic.h:58
bool compareExchangeStrong(T &expected, T desired, MemoryOrder memoryOrder)
Definition atomic.h:110
T load(MemoryOrder memoryOrder) const
Definition atomic.h:89
Details::Table * m_table
void setMap(ConcurrentMap &map)
Iterator(ConcurrentMap &map)
ConcurrentMap & m_map
Details::Table * m_table
Details::Cell * m_cell
Value exchangeValue(Value desired)
Mutator(ConcurrentMap &map, Key key, bool)
void assignValue(Value desired)
Mutator(ConcurrentMap &map, Key key)
Atomic< typename Details::Table * > m_root
Value erase(Key key)
bool migrationInProcess()
Mutator find(Key key)
Value exchange(Key key, Value desired)
Value assign(Key key, Value desired)
void publishTableMigration(typename Details::TableMigration *migration)
Mutator insertOrFind(Key key)
ConcurrentMap(quint64 capacity=Details::InitialSize)
Value get(Key key)
Leapfrog< ConcurrentMap > Details
Definition qsbr.h:19
void flush()
Definition qsbr.h:99
#define KIS_SAFE_ASSERT_RECOVER_RETURN_VALUE(cond, val)
Definition kis_assert.h:129
Atomic< Hash > hash
Definition leapfrog.h:36
Atomic< Value > value
Definition leapfrog.h:37
CellGroup * getCellGroups() const
Definition leapfrog.h:89
void destroy()
Definition leapfrog.h:83
SimpleJobCoordinator jobCoordinator
Definition leapfrog.h:54
static Cell * find(Hash hash, Table *table)
Definition leapfrog.h:157
@ InsertResult_InsertedNew
Definition leapfrog.h:194
@ InsertResult_AlreadyFound
Definition leapfrog.h:194
@ InsertResult_Overflow
Definition leapfrog.h:194
static const quint64 InitialSize
Definition leapfrog.h:27
static void beginTableMigration(Map &map, Table *table, quint64 overflowIdx)
Definition leapfrog.h:332
static InsertResult insertOrFind(Hash hash, Table *table, Cell *&cell, quint64 &overflowIdx)
Definition leapfrog.h:195