Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions azure-pipelines-full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
pool:
vmImage: windows-2022
displayName: 'C# (Windows)'
timeoutInMinutes: 150
timeoutInMinutes: 150

strategy:
maxParallel: 2
Expand Down Expand Up @@ -62,7 +62,7 @@ jobs:
command: test
projects: '**/*.test.csproj'
arguments: '--configuration $(buildConfiguration) -l "console;verbosity=detailed"'

- task: PublishTestResults@2
displayName: 'Publish Test Results'
inputs:
Expand All @@ -74,6 +74,7 @@ jobs:
pool:
vmImage: windows-2022
displayName: 'C++ (Windows)'
timeoutInMinutes: 90

strategy:
maxParallel: 2
Expand All @@ -98,14 +99,15 @@ jobs:
solution: 'cc/build/FASTER.sln'
msbuildArguments: '/m /p:Configuration=$(buildConfiguration) /p:Platform=$(buildPlatform)'

- script: 'ctest -j 1 --interactive-debug-mode 0 --output-on-failure -C $(buildConfiguration) -R "in_memory"'
- script: 'ctest -j 1 --interactive-debug-mode 0 --output-on-failure -C $(buildConfiguration)'
workingDirectory: 'cc/build'
displayName: 'Run Ctest'

- job: 'cppLinux'
pool:
vmImage: ubuntu-22.04
displayName: 'C++ (Linux)'
timeoutInMinutes: 75

steps:
- script: |
Expand All @@ -117,10 +119,10 @@ jobs:
mkdir -p build/Debug build/Release
cd build/Debug
cmake -DCMAKE_BUILD_TYPE=Debug ../..
make -j
make -j2
cd ../../build/Release
cmake -DCMAKE_BUILD_TYPE=Release ../..
make -j
make -j2
displayName: 'Compile'
- script: |
ulimit -s 65536
Expand All @@ -130,7 +132,7 @@ jobs:

- job: 'csharpLinux'
pool:
vmImage: ubuntu-20.04
vmImage: ubuntu-22.04
displayName: 'C# (Linux)'

strategy:
Expand Down Expand Up @@ -175,7 +177,7 @@ jobs:
command: test
projects: '**/*.test.csproj'
arguments: '--configuration $(buildConfiguration) -l "console;verbosity=detailed" --filter "TestCategory=Smoke"'

- task: PublishTestResults@2
displayName: 'Publish Test Results'
inputs:
Expand Down
14 changes: 8 additions & 6 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
pool:
vmImage: windows-2022
displayName: 'C# (Windows)'
timeoutInMinutes: 75
timeoutInMinutes: 75

strategy:
maxParallel: 2
Expand Down Expand Up @@ -62,7 +62,7 @@ jobs:
command: test
projects: '**/*.test.csproj'
arguments: '--configuration $(buildConfiguration) -l "console;verbosity=detailed" --filter:TestCategory=Smoke'

- task: PublishTestResults@2
displayName: 'Publish Test Results'
inputs:
Expand All @@ -74,6 +74,7 @@ jobs:
pool:
vmImage: windows-2022
displayName: 'C++ (Windows)'
timeoutInMinutes: 90

strategy:
maxParallel: 2
Expand All @@ -98,14 +99,15 @@ jobs:
solution: 'cc/build/FASTER.sln'
msbuildArguments: '/m /p:Configuration=$(buildConfiguration) /p:Platform=$(buildPlatform)'

- script: 'ctest -j 1 --interactive-debug-mode 0 --output-on-failure -C $(buildConfiguration) -R "in_memory"'
- script: 'ctest -j 1 --interactive-debug-mode 0 --output-on-failure -C $(buildConfiguration)'
workingDirectory: 'cc/build'
displayName: 'Run Ctest'

- job: 'cppLinux'
pool:
vmImage: ubuntu-22.04
displayName: 'C++ (Linux)'
timeoutInMinutes: 75

steps:
- script: |
Expand All @@ -117,10 +119,10 @@ jobs:
mkdir -p build/Debug build/Release
cd build/Debug
cmake -DCMAKE_BUILD_TYPE=Debug ../..
make -j
make -j2
cd ../../build/Release
cmake -DCMAKE_BUILD_TYPE=Release ../..
make -j
make -j2
displayName: 'Compile'
- script: |
ulimit -s 65536
Expand All @@ -130,7 +132,7 @@ jobs:

- job: 'csharpLinux'
pool:
vmImage: ubuntu-20.04
vmImage: ubuntu-22.04
displayName: 'C# (Linux)'

strategy:
Expand Down
56 changes: 32 additions & 24 deletions cc/src/core/checkpoint_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,32 +203,40 @@ class CheckpointState {
}

public:
void IssueIndexPersistenceCallback() {
if (!index_persistence_callback) {
return; // no callback was provided
}
// TODO: status was always Status::Ok on original FASTER code -- normally should check if failed is true
if (callback_context) {
const auto& callback = reinterpret_cast<InternalIndexPersistenceCallback>(index_persistence_callback);
callback(callback_context, Status::Ok);
} else {
const auto& callback = reinterpret_cast<IndexPersistenceCallback>(index_persistence_callback);
callback(Status::Ok);
}
auto GetIndexPersistenceCallback() {
return [cb = this->index_persistence_callback, ctxt = this->callback_context]() {
if (!cb) {
return; // no callback was provided
}

// TODO: Status was always set to Status::Ok on the original FASTER code.
// Ideally, we should check if `failed` member is true
if (ctxt) {
const auto& callback = reinterpret_cast<InternalIndexPersistenceCallback>(cb);
callback(ctxt, Status::Ok);
} else {
const auto& callback = reinterpret_cast<IndexPersistenceCallback>(cb);
callback(Status::Ok);
}
};
}

void IssueHybridLogPersistenceCallback(uint64_t serial_num) {
if (!hybrid_log_persistence_callback) {
return; // no callback was provided
}
// TODO: status was always Status::Ok on original FASTER code -- normally should check if failed is true
if (callback_context) {
const auto& callback = reinterpret_cast<InternalHybridLogPersistenceCallback>(hybrid_log_persistence_callback);
callback(callback_context, Status::Ok, serial_num);
} else {
const auto& callback = reinterpret_cast<HybridLogPersistenceCallback>(hybrid_log_persistence_callback);
callback(Status::Ok, serial_num);
}
auto GetHybridLogPersistenceCallback() {
return [cb = this->hybrid_log_persistence_callback, ctxt = this->callback_context](uint64_t serial_num) {
if (!cb) {
return; // no callback was provided
}

// TODO: Status was always set to Status::Ok on the original FASTER code.
// Ideally, we should check if `failed` member is true
if (ctxt) {
const auto& callback = reinterpret_cast<InternalHybridLogPersistenceCallback>(cb);
callback(ctxt, Status::Ok, serial_num);
} else {
const auto& callback = reinterpret_cast<HybridLogPersistenceCallback>(cb);
callback(Status::Ok, serial_num);
}
};
}

void CheckpointDone() {
Expand Down
12 changes: 7 additions & 5 deletions cc/src/core/faster.h
Original file line number Diff line number Diff line change
Expand Up @@ -2932,7 +2932,7 @@ bool FasterKv<K, V, D, H, OH>::GlobalMoveToNextState(SystemState current_state)
checkpoint_.failed = true;
}
// Notify the host that the index checkpoint has completed.
checkpoint_.IssueIndexPersistenceCallback();
checkpoint_.GetIndexPersistenceCallback()();
break;
case Phase::IN_PROGRESS: {
assert(next_state.action != Action::CheckpointIndex);
Expand Down Expand Up @@ -2991,13 +2991,14 @@ bool FasterKv<K, V, D, H, OH>::GlobalMoveToNextState(SystemState current_state)
if(hash_index_.WriteCheckpointMetadata(checkpoint_) != Status::Ok) {
checkpoint_.failed = true;
}
//auto index_persistence_callback = checkpoint_.index_persistence_callback;
checkpoint_.IssueIndexPersistenceCallback();
auto callback = checkpoint_.GetIndexPersistenceCallback();
// The checkpoint is done; we can reset the contexts now. (Have to reset contexts before
// another checkpoint can be started.)
checkpoint_.CheckpointDone();
// Checkpoint is done--no more work for threads to do.
// Checkpoint is done -- no more work for threads to do.
system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
// Notify the host that the index checkpoint has completed.
callback();
}
break;
default:
Expand Down Expand Up @@ -3206,7 +3207,8 @@ void FasterKv<K, V, D, H, OH>::HandleSpecialPhases() {
// Handle WAIT_FLUSH -> PERSISTENCE_CALLBACK and PERSISTENCE_CALLBACK -> PERSISTENCE_CALLBACK
if(previous_state.phase == Phase::WAIT_FLUSH) {
// Persistence callback
checkpoint_.IssueHybridLogPersistenceCallback(prev_thread_ctx().serial_num);
auto callback = checkpoint_.GetHybridLogPersistenceCallback();
callback(prev_thread_ctx().serial_num);
// Thread has finished checkpointing.
thread_ctx().phase = Phase::REST;
// Thread ack that it has finished checkpointing.
Expand Down
6 changes: 4 additions & 2 deletions cc/src/core/persistent_memory_malloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,10 @@ class PersistentMemoryMalloc {

// Shift head address to tail address
Address desired_head_address{ tail_address.page(), 0 };
if(flushed_until_address.load() < desired_head_address) {
throw std::runtime_error{ "flushed_until_address < tail_address" };
while (flushed_until_address.load() < desired_head_address) {
epoch_->ProtectAndDrain();
disk->TryComplete();
std::this_thread::yield();
}

Address old_head_address;
Expand Down
24 changes: 14 additions & 10 deletions cc/test/cold_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,22 @@ INSTANTIATE_TEST_CASE_P(
ColdIndexTests,
ColdIndexTestParams,
::testing::Values(
// ==================================================================
// NOTE: Some tests are disabled for CI -- enable locally if needed
// ==================================================================

// w/o compaction
std::make_tuple(2048, 192_MiB, 0.4, false),
//std::make_tuple(2048, 192_MiB, 0.4, false),
std::make_tuple(2048, 192_MiB, 0.0, false),
std::make_tuple((1 << 20), 192_MiB, 0.4, false),
//std::make_tuple((1 << 20), 192_MiB, 0.4, false),
std::make_tuple((1 << 20), 192_MiB, 0.0, false),
// \w atomic hash chunk updates
std::make_tuple(2048, 1_GiB, 0.9, false),
std::make_tuple((1 << 20), 1_GiB, 0.9, false),
//std::make_tuple(2048, 1_GiB, 0.9, false),
//std::make_tuple((1 << 20), 1_GiB, 0.9, false),
// \w compaction
std::make_tuple(2048, 192_MiB, 0.4, true),
//std::make_tuple(2048, 192_MiB, 0.4, true),
std::make_tuple(2048, 192_MiB, 0.0, true),
std::make_tuple((1 << 20), 192_MiB, 0.4, true),
//std::make_tuple((1 << 20), 192_MiB, 0.4, true),
std::make_tuple((1 << 20), 192_MiB, 0.0, true),
// \w compaction & atomic hash chunk updates
std::make_tuple(2048, 1_GiB, 0.9, true),
Expand All @@ -62,9 +66,9 @@ INSTANTIATE_TEST_CASE_P(
ColdIndexRecoveryTests,
ColdIndexRecoveryTestParams,
::testing::Values(
std::make_tuple(2048, 192_MiB, 0.4),
//std::make_tuple(2048, 192_MiB, 0.4),
std::make_tuple(2048, 192_MiB, 0.0),
std::make_tuple((1 << 20), 192_MiB, 0.4),
//std::make_tuple((1 << 20), 192_MiB, 0.4),
std::make_tuple((1 << 20), 192_MiB, 0.0)
)
);
Expand Down Expand Up @@ -299,7 +303,7 @@ TEST_P(ColdIndexTestParams, UpsertRead_Serial) {
rc_config.enabled = false;

HlogCompactionConfig compaction_config{
250ms, 0.8, 0.1, 128_MiB, 768_MiB, 4, auto_compaction };
250ms, 0.8, 0.1, 128_MiB, 768_MiB, 2, auto_compaction };

faster_t::IndexConfig index_config{ table_size, 256_MiB, 0.6 };
faster_t store{ index_config, log_mem_size, ROOT_PATH, log_mutable_frac,
Expand Down Expand Up @@ -428,7 +432,7 @@ TEST_P(ColdIndexTestParams, ConcurrentUpsertAndRead) {
faster_t store{ index_config, log_mem_size, ROOT_PATH, log_mutable_frac };

static constexpr size_t kNumRecords = 250'000;
static constexpr size_t kNumThreads = 8;
static constexpr size_t kNumThreads = 2;

static std::atomic<uint64_t> records_read{ 0 };
static std::atomic<uint64_t> records_updated{ 0 };
Expand Down
24 changes: 14 additions & 10 deletions cc/test/compact_lookup_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ static std::string ROOT_PATH{ "test_compact_lookup_store/" };
#endif

// Parameterized test definition for in-memory tests
// ==================================================================
// NOTE: Some tests are disabled for CI -- enable locally if needed
// ==================================================================

// <shift_begin_address, n_threads>
class CompactLookupParameterizedInMemTestFixture : public ::testing::TestWithParam<std::pair<bool, int>> {
};
Expand All @@ -50,9 +54,9 @@ INSTANTIATE_TEST_CASE_P(
CompactLookupParameterizedInMemTestFixture,
::testing::Values(
// Truncate after compaction -- single thread
std::pair<bool, int>(true, 1),
// Truncate after compaction -- 8 threads
std::pair<bool, int>(true, 8))
//std::pair<bool, int>(true, 1),
// Truncate after compaction -- 2 threads
std::pair<bool, int>(true, 2))
);

// Parameterized test definition for on-disk tests
Expand All @@ -64,11 +68,11 @@ INSTANTIATE_TEST_CASE_P(
CompactLookupParameterizedOnDiskTestFixture,
::testing::Values(
// Truncate after compaction, single thread
std::make_tuple(true, false, 1),
// Truncate after compaction, 8 threads
std::make_tuple(true, false, 8),
// Truncate after compaction, 8 threads w/ checkpoint
std::make_tuple(true, true, 8))
//std::make_tuple(true, false, 1),
// Truncate after compaction, 2 threads
std::make_tuple(true, false, 2),
// Truncate after compaction, 2 threads w/ checkpoint
std::make_tuple(true, true, 2))
);


Expand Down Expand Up @@ -1355,7 +1359,7 @@ TEST_P(CompactLookupParameterizedOnDiskTestFixture, OnDiskRmw) {
CreateNewLogDir(ROOT_PATH, log_fp);

// NOTE: deliberately keeping the hash index small to test hash-chain chasing correctness
faster_t store{ 2048, (1 << 20) * 192, log_fp, 0.4 };
faster_t store{ 2048, (1 << 20) * 256, log_fp, 0.4 };
uint32_t num_records = 20000; // ~160 MB of data

bool shift_begin_address = std::get<0>(GetParam());
Expand Down Expand Up @@ -1984,7 +1988,7 @@ TEST_P(CompactLookupParameterizedOnDiskTestFixture, OnDiskVariableLengthKey) {
std::string log_fp;
CreateNewLogDir(ROOT_PATH, log_fp);

faster_t store{ (1 << 20), (1 << 20) * 192, log_fp, 0.4 };
faster_t store{ 2048, (1 << 20) * 192, log_fp, 0.4 };
uint32_t numRecords = 12500; // will occupy ~512 MB space in store

bool shift_begin_address = std::get<0>(GetParam());
Expand Down
Loading