Krita Source Code Documentation
Leapfrog< Map >::TableMigration Class Reference

#include <leapfrog.h>

Inheritance diagram for Leapfrog< Map >::TableMigration: inherits SimpleJobCoordinator::Job.

Classes

struct  Source
 

Public Member Functions

void destroy ()
 
Source * getSources () const
 
bool migrateRange (Table *srcTable, quint64 startIdx)
 
virtual void run () override
 
 TableMigration (Map &map)
 
virtual ~TableMigration () override
 
Public Member Functions inherited from SimpleJobCoordinator::Job
virtual ~Job ()
 

Static Public Member Functions

static TableMigration * create (Map &map, quint64 numSources)
 

Public Attributes

Table * m_destination {nullptr}
 
Map & m_map
 
quint64 m_numSources {0}
 
Atomic< bool > m_overflowed
 
Atomic< qint64 > m_unitsRemaining
 
Atomic< quint64 > m_workerStatus
 

Detailed Description

template<class Map>
class Leapfrog< Map >::TableMigration

Definition at line 100 of file leapfrog.h.
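
TableMigration is the SimpleJobCoordinator::Job that cooperatively moves every live entry from one or more source tables into a single, larger destination table. Worker threads that observe the job call run(), claim ranges of TableMigrationUnitSize cells through migrateRange(), and the last worker to finish either publishes the completed migration or, on destination overflow, restarts it with a doubled destination table.

The sketch below is not part of the source; it simply mirrors the setup performed in the overflow branch of run() to show how a migration is assembled and published. It assumes the caller holds origTable->mutex.

// Sketch only: assemble a migration for one source table and publish it.
template <class Map>
void startMigration(Map& map, typename Leapfrog<Map>::Table* origTable)
{
    typedef typename Leapfrog<Map>::TableMigration TableMigration;
    TableMigration* migration = TableMigration::create(map, 1);
    // Size the destination; run() doubles it again if the migration overflows.
    migration->m_destination = Leapfrog<Map>::Table::create((origTable->sizeMask + 1) * 2);
    migration->getSources()[0].table = origTable;
    migration->getSources()[0].sourceIndex.storeNonatomic(0);
    migration->m_unitsRemaining.storeNonatomic(origTable->getNumMigrationUnits());
    // Publish the job; worker threads that see it will enter run().
    origTable->jobCoordinator.storeRelease(migration);
}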

Constructor & Destructor Documentation

◆ TableMigration()

template<class Map >
Leapfrog< Map >::TableMigration::TableMigration ( Map & map)
inline

Definition at line 115 of file leapfrog.h.

115 : m_map(map)
116 {
117 }

◆ ~TableMigration()

template<class Map >
virtual Leapfrog< Map >::TableMigration::~TableMigration ( )
inline override virtual

Definition at line 133 of file leapfrog.h.

134 {
135 }

Member Function Documentation

◆ create()

template<class Map >
static TableMigration * Leapfrog< Map >::TableMigration::create ( Map & map,
quint64 numSources )
inline static

Definition at line 119 of file leapfrog.h.

120 {
121 TableMigration* migration =
122 (TableMigration*) std::malloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
123 new (migration) TableMigration(map);
124
125 migration->m_workerStatus.storeNonatomic(0);
126 migration->m_overflowed.storeNonatomic(false);
127 migration->m_unitsRemaining.storeNonatomic(0);
128 migration->m_numSources = numSources;
129 // Caller is responsible for filling in sources & destination
130 return migration;
131 }

References Leapfrog< Map >::TableMigration::m_numSources, Leapfrog< Map >::TableMigration::m_overflowed, Leapfrog< Map >::TableMigration::m_unitsRemaining, Leapfrog< Map >::TableMigration::m_workerStatus, Atomic< T >::storeNonatomic(), and Leapfrog< Map >::TableMigration::TableMigration().
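
Note that create() reserves the Source array in the same allocation, immediately after the TableMigration object itself, which is what makes the pointer arithmetic in getSources() valid. A rough picture of the block returned by std::malloc:

//   [ TableMigration ][ Source 0 ][ Source 1 ] ... [ Source numSources-1 ]
//   ^ constructed by placement new  ^ getSources() == (Source*)(this + 1)
//
// Because the object comes from std::malloc plus placement new, it must be
// released through destroy() (which calls std::free), never through delete.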

◆ destroy()

template<class Map >
void Leapfrog< Map >::TableMigration::destroy ( )
inline

Definition at line 137 of file leapfrog.h.

138 {
139 // Destroy all source tables.
140 for (quint64 i = 0; i < m_numSources; i++)
141 if (getSources()[i].table)
142 getSources()[i].table->destroy();
143 // Delete the migration object itself.
144 this->TableMigration::~TableMigration();
145 std::free(this);
146 }

References Leapfrog< Map >::Table::destroy(), Leapfrog< Map >::TableMigration::getSources(), Leapfrog< Map >::TableMigration::m_numSources, Leapfrog< Map >::TableMigration::Source::table, and Leapfrog< Map >::TableMigration::~TableMigration().

◆ getSources()

template<class Map >
Source * Leapfrog< Map >::TableMigration::getSources ( ) const
inline

Definition at line 148 of file leapfrog.h.

149 {
150 return (Source*)(this + 1);
151 }
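
The cast (Source*)(this + 1) relies on the trailing-array layout set up by create(). Below is a self-contained sketch of the same idiom with hypothetical type names, not taken from the source:

#include <cstdlib>
#include <new>

struct Header {
    int count;
    // The payload lives directly after the object, in the same allocation.
    int* items() { return reinterpret_cast<int*>(this + 1); }
};

Header* makeHeader(int count)
{
    Header* h = static_cast<Header*>(std::malloc(sizeof(Header) + sizeof(int) * count));
    new (h) Header{count};
    return h; // release with std::free(h), mirroring TableMigration::destroy()
}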

◆ migrateRange()

template<class Map >
bool Leapfrog< Map >::TableMigration::migrateRange ( Table * srcTable,
quint64 startIdx )

Definition at line 358 of file leapfrog.h.

359{
360 quint64 srcSizeMask = srcTable->sizeMask;
361 quint64 endIdx = qMin(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
362 // Iterate over source range.
363 for (quint64 srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
364 CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
365 Cell* srcCell = srcGroup->cells + (srcIdx & 3);
366 Hash srcHash;
367 Value srcValue;
368 // Fetch the srcHash and srcValue.
369 for (;;) {
370 srcHash = srcCell->hash.load(Relaxed);
371 if (srcHash == KeyTraits::NullHash) {
372 // An unused cell. Try to put a Redirect marker in its value.
373 srcValue =
374 srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), Relaxed);
375 if (srcValue == Value(ValueTraits::Redirect)) {
376 // srcValue is already marked Redirect due to previous incomplete migration.
377 break;
378 }
379 if (srcValue == Value(ValueTraits::NullValue)) {
380 break; // Redirect has been placed. Break inner loop, continue outer loop.
381 }
382 // Otherwise, somebody just claimed the cell. Read srcHash again...
383 } else {
384 // Check for deleted/uninitialized value.
385 srcValue = srcCell->value.load(Relaxed);
386 if (srcValue == Value(ValueTraits::NullValue)) {
387 // Try to put a Redirect marker.
388 if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), Relaxed)) {
389 break; // Redirect has been placed. Break inner loop, continue outer loop.
390 }
391
392 if (srcValue == Value(ValueTraits::Redirect)) {
393 // FIXME: I don't think this will happen. Investigate & change to assert
394 break;
395 }
396 } else if (srcValue == Value(ValueTraits::Redirect)) {
397 // srcValue is already marked Redirect due to previous incomplete migration.
398 break;
399 }
400
401 // We've got a key/value pair to migrate.
402 // Reserve a destination cell in the destination.
403#ifdef SANITY_CHECK
404 KIS_ASSERT_RECOVER_NOOP(srcHash != KeyTraits::NullHash);
405 KIS_ASSERT_RECOVER_NOOP(srcValue != Value(ValueTraits::NullValue));
406 KIS_ASSERT_RECOVER_NOOP(srcValue != Value(ValueTraits::Redirect));
407#endif // SANITY_CHECK
408 Cell* dstCell;
409 quint64 overflowIdx;
410 InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx);
411 // During migration, a hash can only exist in one place among all the source tables,
412 // and it is only migrated by one thread. Therefore, the hash will never already exist
413 // in the destination table:
414#ifdef SANITY_CHECK
 415 KIS_ASSERT_RECOVER_NOOP(result != InsertResult_AlreadyFound);
 416#endif // SANITY_CHECK
417 if (result == InsertResult_Overflow) {
418 // Destination overflow.
419 // This can happen for several reasons. For example, the source table could have
420 // existed of all deleted cells when it overflowed, resulting in a small destination
421 // table size, but then another thread could re-insert all the same hashes
422 // before the migration completed.
423 // Caller will cancel the current migration and begin a new one.
424 return false;
425 }
426 // Migrate the old value to the new cell.
427 for (;;) {
428 // Copy srcValue to the destination.
429 dstCell->value.store(srcValue, Relaxed);
430 // Try to place a Redirect marker in srcValue.
431 Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), Relaxed);
432#ifdef SANITY_CHECK
433 KIS_ASSERT_RECOVER_NOOP(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
434#endif // SANITY_CHECK
435 if (doubleCheckedSrcValue == srcValue) {
436 // No racing writes to the src. We've successfully placed the Redirect marker.
437 // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
438 // by a late-arriving erase.
439 if (srcValue == Value(ValueTraits::NullValue)) {
 440 // The racing update was an erase; the NULL value we copied is harmless.
441 }
442
443 break;
444 }
445 // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
446 srcValue = doubleCheckedSrcValue;
447 }
448 // Cell successfully migrated. Proceed to next source cell.
449 break;
450 }
451 }
452 }
453 // Range has been migrated successfully.
454 return true;
455}

References Leapfrog< Map >::CellGroup::cells, Atomic< T >::compareExchange(), Atomic< T >::compareExchangeStrong(), Leapfrog< Map >::Table::getCellGroups(), Leapfrog< Map >::Cell::hash, Leapfrog< Map >::insertOrFind(), Leapfrog< Map >::InsertResult_AlreadyFound, Leapfrog< Map >::InsertResult_Overflow, KIS_ASSERT_RECOVER_NOOP, Atomic< T >::load(), Leapfrog< Map >::TableMigration::m_destination, Relaxed, Leapfrog< Map >::Table::sizeMask, Atomic< T >::store(), Leapfrog< Map >::TableMigrationUnitSize, and Leapfrog< Map >::Cell::value.
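
The heart of migrateRange() is the copy-then-redirect loop: the current source value is copied to the destination cell, then a compare-exchange attempts to replace the source value with the Redirect marker; if a racing write lands in between, the newly written value is copied and the loop retries. The simplified single-cell sketch below uses std::atomic and a hypothetical Redirect sentinel in place of the map's Atomic<> wrapper and ValueTraits:

#include <atomic>
#include <cstdint>

constexpr uint64_t Redirect = ~uint64_t(0); // hypothetical stand-in for ValueTraits::Redirect

// Copy the source cell's value to the destination, then mark the source
// with Redirect; retry if a racing write changes the source in between.
void migrateCell(std::atomic<uint64_t>& src, std::atomic<uint64_t>& dst)
{
    uint64_t value = src.load(std::memory_order_relaxed);
    for (;;) {
        dst.store(value, std::memory_order_relaxed); // copy the current value
        uint64_t expected = value;
        if (src.compare_exchange_strong(expected, Redirect, std::memory_order_relaxed)) {
            break; // no racing write: Redirect is in place, the cell is migrated
        }
        value = expected; // a late write landed; copy the new value and retry
    }
}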

◆ run()

template<class Map >
void Leapfrog< Map >::TableMigration::run ( )
override virtual

Implements SimpleJobCoordinator::Job.

Definition at line 458 of file leapfrog.h.

459{
460#ifdef SANITY_CHECK
461 KIS_ASSERT_RECOVER_NOOP(m_map.getGC().sanityRawPointerAccessLocked());
462#endif // SANITY_CHECK
463
464
465 // Conditionally increment the shared # of workers.
466 quint64 probeStatus = m_workerStatus.load(Relaxed);
467 do {
468 if (probeStatus & 1) {
469 // End flag is already set, so do nothing.
470 return;
471 }
472 } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, Relaxed, Relaxed));
473 // # of workers has been incremented, and the end flag is clear.
474#ifdef SANITY_CHECK
475 KIS_ASSERT_RECOVER_NOOP((probeStatus & 1) == 0);
476#endif // SANITY_CHECK
477
478 // Iterate over all source tables.
479 for (quint64 s = 0; s < m_numSources; s++) {
480 Source& source = getSources()[s];
481 // Loop over all migration units in this source table.
482 for (;;) {
483 if (m_workerStatus.load(Relaxed) & 1) {
484 goto endMigration;
485 }
486 quint64 startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, Relaxed);
487 if (startIdx >= source.table->sizeMask + 1)
488 break; // No more migration units in this table. Try next source table.
489 bool overflowed = !migrateRange(source.table, startIdx);
490 if (overflowed) {
491 // *** FAILED MIGRATION ***
492 // TableMigration failed due to destination table overflow.
493 // No other thread can declare the migration successful at this point, because *this* unit will never complete,
494 // hence m_unitsRemaining won't reach zero.
495 // However, multiple threads can independently detect a failed migration at the same time.
 496 // The reason we store overflowed in a shared variable is because we must flush all the worker threads before
497 // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
498 // the thread
499 // that deals with it.
500 bool oldOverflowed = m_overflowed.exchange(overflowed, Relaxed);
501 if (oldOverflowed) {
502 // race to set m_overflowed
503 }
504
 505 m_workerStatus.fetchOr(1, Relaxed);
 506 goto endMigration;
507 }
508
509 qint64 prevRemaining = m_unitsRemaining.fetchSub(1, Relaxed);
510#ifdef SANITY_CHECK
511 KIS_ASSERT_RECOVER_NOOP(prevRemaining > 0);
512#endif // SANITY_CHECK
513 if (prevRemaining == 1) {
514 // *** SUCCESSFUL MIGRATION ***
515 // That was the last chunk to migrate.
 516 m_workerStatus.fetchOr(1, Relaxed);
 517 goto endMigration;
518 }
519 }
520 }
521
522endMigration:
523 // Decrement the shared # of workers.
524 probeStatus = m_workerStatus.fetchSub(2, AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
525 if (probeStatus >= 4) {
526 // There are other workers remaining. Return here so that only the very last worker will proceed.
527 return;
528 }
529
530 // We're the very last worker thread.
531 // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
532#ifdef SANITY_CHECK
533 KIS_ASSERT_RECOVER_NOOP(probeStatus == 3);
534#endif // SANITY_CHECK
535 bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
536 if (!overflowed) {
537 // The migration succeeded. This is the most likely outcome. Publish the new subtree.
538 m_map.publishTableMigration(this);
 539 // End the jobCoordinator.
 540 getSources()[0].table->jobCoordinator.end();
 541 } else {
542 // The migration failed due to the overflow of the destination table.
543 Table* origTable = getSources()[0].table;
544 QMutexLocker guard(&origTable->mutex);
545 SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
546
547 if (checkedJob != this) {
548 // a new TableMigration was already started
549 } else {
 550 TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
 551 // Double the destination table size.
552 migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
553 // Transfer source tables to the new migration.
554 for (quint64 i = 0; i < m_numSources; i++) {
555 migration->getSources()[i].table = getSources()[i].table;
556 getSources()[i].table = NULL;
557 migration->getSources()[i].sourceIndex.storeNonatomic(0);
558 }
559
560 migration->getSources()[m_numSources].table = m_destination;
561 migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
562 // Calculate total number of migration units to move.
563 quint64 unitsRemaining = 0;
564 for (quint64 s = 0; s < migration->m_numSources; s++) {
565 unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
566 }
567
568 migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
569 // Publish the new migration.
570 origTable->jobCoordinator.storeRelease(migration);
571 }
572 }
573
574 // We're done with this TableMigration. Queue it for GC.
575 m_map.getGC().enqueue(&TableMigration::destroy, this, true);
576}

References AcquireRelease, Leapfrog< Map >::TableMigration::create(), Leapfrog< Map >::Table::create(), Leapfrog< Map >::TableMigration::destroy(), Leapfrog< Map >::Table::getNumMigrationUnits(), Leapfrog< Map >::TableMigration::getSources(), Leapfrog< Map >::Table::jobCoordinator, KIS_ASSERT_RECOVER_NOOP, SimpleJobCoordinator::loadConsume(), Leapfrog< Map >::TableMigration::m_destination, Leapfrog< Map >::TableMigration::m_numSources, Leapfrog< Map >::TableMigration::m_unitsRemaining, Leapfrog< Map >::Table::mutex, Relaxed, Leapfrog< Map >::TableMigration::Source::sourceIndex, Atomic< T >::storeNonatomic(), SimpleJobCoordinator::storeRelease(), Leapfrog< Map >::TableMigration::Source::table, and Leapfrog< Map >::TableMigrationUnitSize.
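
run() packs two pieces of state into m_workerStatus: bit 0 is the end flag, set with fetchOr(1) on success or overflow, and the remaining bits count the active workers in steps of 2 (incremented on entry, decremented on exit). The helpers below are hypothetical, added only to make that encoding explicit:

#include <QtGlobal>

inline bool endFlagSet(quint64 status)       { return (status & 1) != 0; }
inline quint64 activeWorkers(quint64 status) { return status >> 1; }

// Examples:
//   status 0 -> no workers, end flag clear (migration still open)
//   status 4 -> two workers active, end flag clear
//   status 5 -> two workers active, end flag set; run() returns immediately for newcomers
//   fetchSub(2) returning 3 -> the caller was the last worker and the end flag is set,
//   matching KIS_ASSERT_RECOVER_NOOP(probeStatus == 3) above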

Member Data Documentation

◆ m_destination

template<class Map >
Table* Leapfrog< Map >::TableMigration::m_destination {nullptr}

Definition at line 109 of file leapfrog.h.

109{nullptr};

◆ m_map

template<class Map >
Map& Leapfrog< Map >::TableMigration::m_map

Definition at line 108 of file leapfrog.h.

◆ m_numSources

template<class Map >
quint64 Leapfrog< Map >::TableMigration::m_numSources {0}

Definition at line 113 of file leapfrog.h.

113{0};

◆ m_overflowed

template<class Map >
Atomic<bool> Leapfrog< Map >::TableMigration::m_overflowed

Definition at line 111 of file leapfrog.h.

◆ m_unitsRemaining

template<class Map >
Atomic<qint64> Leapfrog< Map >::TableMigration::m_unitsRemaining

Definition at line 112 of file leapfrog.h.

◆ m_workerStatus

template<class Map >
Atomic<quint64> Leapfrog< Map >::TableMigration::m_workerStatus

Definition at line 110 of file leapfrog.h.


The documentation for this class was generated from the following file:
leapfrog.h