Skip to content

Add a stop-the-world, serial Compressor #1340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/scripts/ci-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ find ./src ./tests -type f -name "mock_test_*" | while read -r file; do

# Run the test with each plan it needs.
for MMTK_PLAN in $PLANS; do
env MMTK_PLAN=$MMTK_PLAN cargo test --features mock_test,"$FEATURES" -- $t;
env MMTK_PLAN=$MMTK_PLAN cargo test --features mock_test,mock_test_side_metadata,"$FEATURES" -- $t;
# We should also run the tests with the mock_test_header_metadata feature -- be careful that some plans, like Compressor, require side mark bits, which will fail if we use header metadata (including mark bits).
# env MMTK_PLAN=$MMTK_PLAN cargo test --features mock_test,mock_test_header_metadata,"$FEATURES" -- $t;
done
done

Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ perf_counter = ["dep:pfm"]
# This feature is only used for tests with MockVM.
# CI scripts run those tests with this feature.
mock_test = ["test_private"]
mock_test_header_metadata = []
mock_test_side_metadata = []

# This feature will expose some private functions for testings or benchmarking.
test_private = []
Expand Down Expand Up @@ -197,6 +199,9 @@ extreme_assertions = []
# Enable multiple spaces for NoGC, each allocator maps to an individual ImmortalSpace.
nogc_multi_space = []

# Disable multiple spaces for Compressor.
compressor_single_space = []

# To collect statistics for each GC work packet. Enabling this may introduce a small overhead (a slowdown of several percent in benchmark time).
work_packet_stats = []

Expand Down
114 changes: 114 additions & 0 deletions src/plan/compressor/gc_work.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use super::global::Compressor;
use crate::policy::compressor::CompressorSpace;
use crate::policy::compressor::{TRACE_KIND_FORWARD, TRACE_KIND_MARK};
use crate::scheduler::gc_work::PlanProcessEdges;
use crate::scheduler::gc_work::*;
use crate::scheduler::GCWork;
use crate::scheduler::GCWorker;
use crate::scheduler::WorkBucketStage;
use crate::vm::ActivePlan;
use crate::vm::Scanning;
use crate::vm::VMBinding;
use crate::MMTK;
use std::marker::PhantomData;

/// Work packet that walks the heap and computes the post-compaction
/// address of every live object in the Compressor space.
pub struct CalculateForwardingAddress<VM: VMBinding> {
    compressor_space: &'static CompressorSpace<VM>,
}

impl<VM: VMBinding> CalculateForwardingAddress<VM> {
    /// Create a packet that operates on the given Compressor space.
    pub fn new(compressor_space: &'static CompressorSpace<VM>) -> Self {
        Self { compressor_space }
    }
}

impl<VM: VMBinding> GCWork<VM> for CalculateForwardingAddress<VM> {
    fn do_work(&mut self, _worker: &mut GCWorker<VM>, _mmtk: &'static MMTK<VM>) {
        // Build the offset vector from which forwarding addresses are derived.
        self.compressor_space.calculate_offset_vector();
    }
}

/// Create another round of root scanning work packets
/// to update object references.
///
/// This packet runs after forwarding addresses have been calculated and
/// before compaction: it schedules a second transitive closure (rooted at
/// the re-scanned roots, using `CompressorForwardingWorkContext`) that
/// rewrites references to point at the objects' new locations.
pub struct UpdateReferences<VM: VMBinding> {
    // Raw pointer to the plan so the packet can mutate the plan's common
    // spaces from a worker thread (see `do_work`).
    plan: *const Compressor<VM>,
    p: PhantomData<VM>,
}

// SAFETY(review): the raw plan pointer refers to the plan, which is expected
// to live for the entire program; the packet is presumably executed by a
// single GC worker while the world is stopped, so there is no concurrent
// access through this pointer — TODO confirm against scheduler guarantees.
unsafe impl<VM: VMBinding> Send for UpdateReferences<VM> {}

impl<VM: VMBinding> GCWork<VM> for UpdateReferences<VM> {
    fn do_work(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>) {
        // The following needs to be done right before the second round of root scanning
        VM::VMScanning::prepare_for_roots_re_scanning();
        mmtk.state.prepare_for_stack_scanning();
        // Prepare common and base spaces for the 2nd round of transitive closure.
        // SAFETY(review): casting away const to mutate the plan relies on this
        // packet having exclusive access during stop-the-world — TODO confirm.
        let plan_mut = unsafe { &mut *(self.plan as *mut Compressor<VM>) };
        plan_mut.common.release(worker.tls, true);
        plan_mut.common.prepare(worker.tls, true);
        #[cfg(feature = "extreme_assertions")]
        mmtk.slot_logger.reset();

        // We do two passes of transitive closures. We clear the live bytes from the first pass.
        mmtk.scheduler
            .worker_group
            .get_and_clear_worker_live_bytes();

        // Re-scan every mutator's roots in the SecondRoots bucket; the
        // forwarding work context makes the resulting trace update references.
        for mutator in VM::VMActivePlan::mutators() {
            mmtk.scheduler.work_buckets[WorkBucketStage::SecondRoots].add(ScanMutatorRoots::<
                CompressorForwardingWorkContext<VM>,
            >(mutator));
        }

        // VM-specific roots are re-scanned with the same forwarding context.
        mmtk.scheduler.work_buckets[WorkBucketStage::SecondRoots]
            .add(ScanVMSpecificRoots::<CompressorForwardingWorkContext<VM>>::new());
    }
}

impl<VM: VMBinding> UpdateReferences<VM> {
    /// Create a packet referring to the given plan.
    pub fn new(plan: &Compressor<VM>) -> Self {
        Self {
            plan,
            p: PhantomData,
        }
    }
}

/// Work packet that slides live objects to their new locations, using the
/// forwarding information computed earlier in the collection.
pub struct Compact<VM: VMBinding> {
    compressor_space: &'static CompressorSpace<VM>,
}

impl<VM: VMBinding> Compact<VM> {
    /// Create a packet that compacts the given Compressor space.
    pub fn new(compressor_space: &'static CompressorSpace<VM>) -> Self {
        Self { compressor_space }
    }
}

impl<VM: VMBinding> GCWork<VM> for Compact<VM> {
    fn do_work(&mut self, worker: &mut GCWorker<VM>, _mmtk: &'static MMTK<VM>) {
        // Perform the actual object-moving pass.
        self.compressor_space.compact(worker);
    }
}

/// Marking trace
pub type MarkingProcessEdges<VM> = PlanProcessEdges<VM, Compressor<VM>, TRACE_KIND_MARK>;
/// Forwarding trace
pub type ForwardingProcessEdges<VM> = PlanProcessEdges<VM, Compressor<VM>, TRACE_KIND_FORWARD>;

/// Work context for the first (marking) transitive closure.
pub struct CompressorWorkContext<VM: VMBinding>(std::marker::PhantomData<VM>);
impl<VM: VMBinding> crate::scheduler::GCWorkContext for CompressorWorkContext<VM> {
    type VM = VM;
    type PlanType = Compressor<VM>;
    type DefaultProcessEdges = MarkingProcessEdges<VM>;
    type PinningProcessEdges = UnsupportedProcessEdges<VM>;
}

/// Work context for the second (forwarding) transitive closure, scheduled by
/// [`UpdateReferences`].
pub struct CompressorForwardingWorkContext<VM: VMBinding>(std::marker::PhantomData<VM>);
impl<VM: VMBinding> crate::scheduler::GCWorkContext for CompressorForwardingWorkContext<VM> {
    type VM = VM;
    type PlanType = Compressor<VM>;
    type DefaultProcessEdges = ForwardingProcessEdges<VM>;
    type PinningProcessEdges = UnsupportedProcessEdges<VM>;
}
198 changes: 198 additions & 0 deletions src/plan/compressor/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
use super::gc_work::CompressorWorkContext;
use super::gc_work::{
CalculateForwardingAddress, Compact, ForwardingProcessEdges, MarkingProcessEdges,
UpdateReferences,
};
use crate::plan::compressor::mutator::ALLOCATOR_MAPPING;
use crate::plan::global::CreateGeneralPlanArgs;
use crate::plan::global::CreateSpecificPlanArgs;
use crate::plan::global::{BasePlan, CommonPlan};
use crate::plan::AllocationSemantics;
use crate::plan::Plan;
use crate::plan::PlanConstraints;
use crate::policy::compressor::CompressorSpace;
use crate::policy::space::Space;
use crate::scheduler::gc_work::*;
use crate::scheduler::GCWorkScheduler;
use crate::scheduler::WorkBucketStage;
use crate::util::alloc::allocators::AllocatorSelector;
use crate::util::heap::gc_trigger::SpaceStats;
#[allow(unused_imports)]
use crate::util::heap::VMRequest;
use crate::util::metadata::side_metadata::SideMetadataContext;
use crate::util::opaque_pointer::*;
use crate::vm::VMBinding;
use enum_map::EnumMap;
use mmtk_macros::{HasSpaces, PlanTraceObject};

/// Compressor implements a stop-the-world and serial implementation of
/// the Compressor, as described in Kermany and Petrank,
/// "The Compressor: concurrent, incremental, and parallel compaction",
/// <https://dl.acm.org/doi/10.1145/1133255.1134023>.
#[derive(HasSpaces, PlanTraceObject)]
pub struct Compressor<VM: VMBinding> {
    /// The common plan, which includes the shared spaces and base-plan state.
    #[parent]
    pub common: CommonPlan<VM>,
    /// The space whose objects are compacted by this plan.
    #[space]
    pub compressor_space: CompressorSpace<VM>,
}

/// The plan constraints for the Compressor plan.
pub const COMPRESSOR_CONSTRAINTS: PlanConstraints = PlanConstraints {
    // Compaction moves objects.
    moves_objects: true,
    // References can only be forwarded after the liveness (marking) trace,
    // since forwarding addresses are computed afterwards by
    // `CalculateForwardingAddress`.
    needs_forward_after_liveness: true,
    ..PlanConstraints::default()
};

impl<VM: VMBinding> Plan for Compressor<VM> {
    fn constraints(&self) -> &'static PlanConstraints {
        &COMPRESSOR_CONSTRAINTS
    }

    fn collection_required(&self, space_full: bool, _space: Option<SpaceStats<Self::VM>>) -> bool {
        // Delegate the GC-trigger decision to the base plan.
        self.base().collection_required(self, space_full)
    }

    fn common(&self) -> &CommonPlan<VM> {
        &self.common
    }

    fn base(&self) -> &BasePlan<VM> {
        &self.common.base
    }

    fn base_mut(&mut self) -> &mut BasePlan<Self::VM> {
        &mut self.common.base
    }

    /// Prepare the common spaces and the Compressor space for a collection.
    fn prepare(&mut self, tls: VMWorkerThread) {
        self.common.prepare(tls, true);
        self.compressor_space.prepare();
    }

    /// Release per-collection resources after compaction has finished.
    fn release(&mut self, tls: VMWorkerThread) {
        self.common.release(tls, true);
        self.compressor_space.release();
    }

    fn end_of_gc(&mut self, tls: VMWorkerThread) {
        self.common.end_of_gc(tls);
    }

    fn get_allocator_mapping(&self) -> &'static EnumMap<AllocationSemantics, AllocatorSelector> {
        &ALLOCATOR_MAPPING
    }

    /// Schedule the work packets of one Compressor collection:
    /// stop mutators, mark, calculate forwarding addresses, re-scan roots to
    /// update references, compact, then release. The relative order of these
    /// buckets is essential to correctness.
    fn schedule_collection(&'static self, scheduler: &GCWorkScheduler<VM>) {
        // TODO use schedule_common once it can work with the Compressor
        // The main issue there is that we need to ForwardingProcessEdges
        // in FinalizableForwarding.

        // Stop & scan mutators (mutator scanning can happen before STW)
        scheduler.work_buckets[WorkBucketStage::Unconstrained]
            .add(StopMutators::<CompressorWorkContext<VM>>::new());

        // Prepare global/collectors/mutators
        scheduler.work_buckets[WorkBucketStage::Prepare]
            .add(Prepare::<CompressorWorkContext<VM>>::new(self));

        // After marking, compute where each live object will move to.
        scheduler.work_buckets[WorkBucketStage::CalculateForwarding].add(
            CalculateForwardingAddress::<VM>::new(&self.compressor_space),
        );
        // do another trace to update references
        scheduler.work_buckets[WorkBucketStage::SecondRoots].add(UpdateReferences::<VM>::new(self));
        // Move objects once all references have been updated.
        scheduler.work_buckets[WorkBucketStage::Compact]
            .add(Compact::<VM>::new(&self.compressor_space));

        // Release global/collectors/mutators
        scheduler.work_buckets[WorkBucketStage::Release]
            .add(Release::<CompressorWorkContext<VM>>::new(self));

        // Reference processing
        if !*self.base().options.no_reference_types {
            use crate::util::reference_processor::{
                PhantomRefProcessing, SoftRefProcessing, WeakRefProcessing,
            };
            // Soft/weak/phantom processing uses the marking trace, since it
            // runs before forwarding addresses exist.
            scheduler.work_buckets[WorkBucketStage::SoftRefClosure]
                .add(SoftRefProcessing::<MarkingProcessEdges<VM>>::new());
            scheduler.work_buckets[WorkBucketStage::WeakRefClosure]
                .add(WeakRefProcessing::<VM>::new());
            scheduler.work_buckets[WorkBucketStage::PhantomRefClosure]
                .add(PhantomRefProcessing::<VM>::new());

            use crate::util::reference_processor::RefForwarding;
            // Reference slots themselves are forwarded with the forwarding trace.
            scheduler.work_buckets[WorkBucketStage::RefForwarding]
                .add(RefForwarding::<ForwardingProcessEdges<VM>>::new());

            use crate::util::reference_processor::RefEnqueue;
            scheduler.work_buckets[WorkBucketStage::Release].add(RefEnqueue::<VM>::new());
        }

        // Finalization
        if !*self.base().options.no_finalizer {
            use crate::util::finalizable_processor::{Finalization, ForwardFinalization};
            // finalization
            // treat finalizable objects as roots and perform a closure (marking)
            // must be done before calculating forwarding pointers
            scheduler.work_buckets[WorkBucketStage::FinalRefClosure]
                .add(Finalization::<MarkingProcessEdges<VM>>::new());
            // update finalizable object references
            // must be done before compacting
            scheduler.work_buckets[WorkBucketStage::FinalizableForwarding]
                .add(ForwardFinalization::<ForwardingProcessEdges<VM>>::new());
        }

        // VM-specific weak ref processing
        scheduler.work_buckets[WorkBucketStage::VMRefClosure]
            .set_sentinel(Box::new(VMProcessWeakRefs::<MarkingProcessEdges<VM>>::new()));

        // VM-specific weak ref forwarding
        scheduler.work_buckets[WorkBucketStage::VMRefForwarding]
            .add(VMForwardWeakRefs::<ForwardingProcessEdges<VM>>::new());

        // VM-specific work after forwarding, possible to implement ref enququing.
        scheduler.work_buckets[WorkBucketStage::Release].add(VMPostForwarding::<VM>::default());

        // Analysis GC work
        #[cfg(feature = "analysis")]
        {
            use crate::util::analysis::GcHookWork;
            scheduler.work_buckets[WorkBucketStage::Unconstrained].add(GcHookWork);
        }
        #[cfg(feature = "sanity")]
        scheduler.work_buckets[WorkBucketStage::Final]
            .add(crate::util::sanity::sanity_checker::ScheduleSanityGC::<Self>::new(self));
    }

    fn current_gc_may_move_object(&self) -> bool {
        // Every Compressor collection compacts, so every GC may move objects.
        true
    }

    fn get_used_pages(&self) -> usize {
        self.compressor_space.reserved_pages() + self.common.get_used_pages()
    }
}

impl<VM: VMBinding> Compressor<VM> {
    /// Construct a new Compressor plan from the general plan-creation
    /// arguments, creating its space and the common plan.
    pub fn new(args: CreateGeneralPlanArgs<VM>) -> Self {
        let mut plan_args = CreateSpecificPlanArgs {
            global_args: args,
            constraints: &COMPRESSOR_CONSTRAINTS,
            global_side_metadata_specs: SideMetadataContext::new_global_specs(&[]),
        };

        // The space args must be taken before `plan_args` is consumed by
        // `CommonPlan::new` below.
        let compressor_space = CompressorSpace::new(plan_args.get_space_args(
            "compressor_space",
            true,
            false,
            VMRequest::discontiguous(),
        ));
        let common = CommonPlan::new(plan_args);

        let res = Compressor {
            common,
            compressor_space,
        };

        res.verify_side_metadata_sanity();

        res
    }
}
5 changes: 5 additions & 0 deletions src/plan/compressor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! The Compressor plan: a stop-the-world, serial compacting collector.

/// GC work packets and work contexts for the Compressor.
pub(super) mod gc_work;
/// The plan type and its collection schedule.
pub(super) mod global;
/// Mutator (allocator) configuration for the Compressor.
pub(super) mod mutator;

pub use self::global::Compressor;
Loading
Loading