diff --git a/compiler/rustc_hir_typeck/src/closure.rs b/compiler/rustc_hir_typeck/src/closure.rs
index a4776338f6c39..bc1fc3cef0932 100644
--- a/compiler/rustc_hir_typeck/src/closure.rs
+++ b/compiler/rustc_hir_typeck/src/closure.rs
@@ -163,7 +163,15 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
                 // Resume type defaults to `()` if the coroutine has no argument.
                 let resume_ty = liberated_sig.inputs().get(0).copied().unwrap_or(tcx.types.unit);
 
-                let interior = self.next_ty_var(expr_span);
+                // In the new solver, we can just instantiate this eagerly
+                // with the witness. This will ensure that goals that don't need
+                // to stall on interior types will get processed eagerly.
+                let interior = if self.next_trait_solver() {
+                    Ty::new_coroutine_witness(tcx, expr_def_id.to_def_id(), parent_args)
+                } else {
+                    self.next_ty_var(expr_span)
+                };
+
                 self.deferred_coroutine_interiors.borrow_mut().push((expr_def_id, interior));
 
                 // Coroutines that come from coroutine closures have not yet determined
diff --git a/compiler/rustc_hir_typeck/src/fn_ctxt/_impl.rs b/compiler/rustc_hir_typeck/src/fn_ctxt/_impl.rs
index d75c2853ba080..edb6cad66d052 100644
--- a/compiler/rustc_hir_typeck/src/fn_ctxt/_impl.rs
+++ b/compiler/rustc_hir_typeck/src/fn_ctxt/_impl.rs
@@ -635,34 +635,39 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
 
         let mut obligations = vec![];
 
-        for &(coroutine_def_id, interior) in coroutines.iter() {
-            debug!(?coroutine_def_id);
+        if !self.next_trait_solver() {
+            for &(coroutine_def_id, interior) in coroutines.iter() {
+                debug!(?coroutine_def_id);
+
+                // Create the `CoroutineWitness` type that we will unify with `interior`.
+                let args = ty::GenericArgs::identity_for_item(
+                    self.tcx,
+                    self.tcx.typeck_root_def_id(coroutine_def_id.to_def_id()),
+                );
+                let witness =
+                    Ty::new_coroutine_witness(self.tcx, coroutine_def_id.to_def_id(), args);
 
-            // Create the `CoroutineWitness` type that we will unify with `interior`.
-            let args = ty::GenericArgs::identity_for_item(
-                self.tcx,
-                self.tcx.typeck_root_def_id(coroutine_def_id.to_def_id()),
-            );
-            let witness = Ty::new_coroutine_witness(self.tcx, coroutine_def_id.to_def_id(), args);
-
-            // Unify `interior` with `witness` and collect all the resulting obligations.
-            let span = self.tcx.hir_body_owned_by(coroutine_def_id).value.span;
-            let ty::Infer(ty::InferTy::TyVar(_)) = interior.kind() else {
-                span_bug!(span, "coroutine interior witness not infer: {:?}", interior.kind())
-            };
-            let ok = self
-                .at(&self.misc(span), self.param_env)
-                // Will never define opaque types, as all we do is instantiate a type variable.
-                .eq(DefineOpaqueTypes::Yes, interior, witness)
-                .expect("Failed to unify coroutine interior type");
-
-            obligations.extend(ok.obligations);
+                // Unify `interior` with `witness` and collect all the resulting obligations.
+                let span = self.tcx.hir_body_owned_by(coroutine_def_id).value.span;
+                let ty::Infer(ty::InferTy::TyVar(_)) = interior.kind() else {
+                    span_bug!(span, "coroutine interior witness not infer: {:?}", interior.kind())
+                };
+                let ok = self
+                    .at(&self.misc(span), self.param_env)
+                    // Will never define opaque types, as all we do is instantiate a type variable.
+                    .eq(DefineOpaqueTypes::Yes, interior, witness)
+                    .expect("Failed to unify coroutine interior type");
+
+                obligations.extend(ok.obligations);
+            }
         }
 
-        // FIXME: Use a real visitor for unstalled obligations in the new solver.
         if !coroutines.is_empty() {
-            obligations
-                .extend(self.fulfillment_cx.borrow_mut().drain_unstalled_obligations(&self.infcx));
+            obligations.extend(
+                self.fulfillment_cx
+                    .borrow_mut()
+                    .drain_stalled_obligations_for_coroutines(&self.infcx),
+            );
         }
 
         self.typeck_results
diff --git a/compiler/rustc_hir_typeck/src/typeck_root_ctxt.rs b/compiler/rustc_hir_typeck/src/typeck_root_ctxt.rs
index 5b4fc51cec885..ac69493d9567e 100644
--- a/compiler/rustc_hir_typeck/src/typeck_root_ctxt.rs
+++ b/compiler/rustc_hir_typeck/src/typeck_root_ctxt.rs
@@ -84,7 +84,7 @@ impl<'tcx> TypeckRootCtxt<'tcx> {
         let hir_owner = tcx.local_def_id_to_hir_id(def_id).owner;
 
         let infcx =
-            tcx.infer_ctxt().ignoring_regions().build(TypingMode::analysis_in_body(tcx, def_id));
+            tcx.infer_ctxt().ignoring_regions().build(TypingMode::typeck_for_body(tcx, def_id));
         let typeck_results = RefCell::new(ty::TypeckResults::new(hir_owner));
 
         TypeckRootCtxt {
diff --git a/compiler/rustc_hir_typeck/src/writeback.rs b/compiler/rustc_hir_typeck/src/writeback.rs
index b63c0b6ab7e09..4c2e879c49979 100644
--- a/compiler/rustc_hir_typeck/src/writeback.rs
+++ b/compiler/rustc_hir_typeck/src/writeback.rs
@@ -8,6 +8,7 @@ use rustc_data_structures::unord::ExtendUnord;
 use rustc_errors::ErrorGuaranteed;
 use rustc_hir::intravisit::{self, InferKind, Visitor};
 use rustc_hir::{self as hir, AmbigArg, HirId};
+use rustc_infer::traits::solve::Goal;
 use rustc_middle::span_bug;
 use rustc_middle::traits::ObligationCause;
 use rustc_middle::ty::adjustment::{Adjust, Adjustment, PointerCoercion};
@@ -731,7 +732,32 @@ impl<'cx, 'tcx> WritebackCx<'cx, 'tcx> {
         T: TypeFoldable<TyCtxt<'tcx>>,
     {
         let value = self.fcx.resolve_vars_if_possible(value);
-        let value = value.fold_with(&mut Resolver::new(self.fcx, span, self.body, true));
+
+        let mut goals = vec![];
+        let value =
+            value.fold_with(&mut Resolver::new(self.fcx, span, self.body, true, &mut goals));
+
+        // Ensure that we resolve goals we get from normalizing coroutine interiors,
+        // but we shouldn't expect those goals to need normalizing (or else we'd get
+        // into a somewhat awkward fixpoint situation, and we don't need it anyways).
+        let mut unexpected_goals = vec![];
+        self.typeck_results.coroutine_stalled_predicates.extend(
+            goals
+                .into_iter()
+                .map(|pred| {
+                    self.fcx.resolve_vars_if_possible(pred).fold_with(&mut Resolver::new(
+                        self.fcx,
+                        span,
+                        self.body,
+                        false,
+                        &mut unexpected_goals,
+                    ))
+                })
+                // FIXME: throwing away the param-env :(
+                .map(|goal| (goal.predicate, self.fcx.misc(span.to_span(self.fcx.tcx)))),
+        );
+        assert_eq!(unexpected_goals, vec![]);
+
         assert!(!value.has_infer());
 
         // We may have introduced e.g. `ty::Error`, if inference failed, make sure
@@ -749,7 +775,12 @@ impl<'cx, 'tcx> WritebackCx<'cx, 'tcx> {
         T: TypeFoldable<TyCtxt<'tcx>>,
     {
         let value = self.fcx.resolve_vars_if_possible(value);
-        let value = value.fold_with(&mut Resolver::new(self.fcx, span, self.body, false));
+
+        let mut goals = vec![];
+        let value =
+            value.fold_with(&mut Resolver::new(self.fcx, span, self.body, false, &mut goals));
+        assert_eq!(goals, vec![]);
+
         assert!(!value.has_infer());
 
         // We may have introduced e.g. `ty::Error`, if inference failed, make sure
@@ -786,6 +817,7 @@ struct Resolver<'cx, 'tcx> {
     /// Whether we should normalize using the new solver, disabled
     /// both when using the old solver and when resolving predicates.
     should_normalize: bool,
+    nested_goals: &'cx mut Vec<Goal<'tcx, ty::Predicate<'tcx>>>,
 }
 
 impl<'cx, 'tcx> Resolver<'cx, 'tcx> {
@@ -794,8 +826,9 @@ impl<'cx, 'tcx> Resolver<'cx, 'tcx> {
         span: &'cx dyn Locatable,
         body: &'tcx hir::Body<'tcx>,
         should_normalize: bool,
+        nested_goals: &'cx mut Vec<Goal<'tcx, ty::Predicate<'tcx>>>,
     ) -> Resolver<'cx, 'tcx> {
-        Resolver { fcx, span, body, should_normalize }
+        Resolver { fcx, span, body, nested_goals, should_normalize }
     }
 
     fn report_error(&self, p: impl Into<ty::GenericArg<'tcx>>) -> ErrorGuaranteed {
@@ -832,12 +865,18 @@ impl<'cx, 'tcx> Resolver<'cx, 'tcx> {
             let cause = ObligationCause::misc(self.span.to_span(tcx), body_id);
             let at = self.fcx.at(&cause, self.fcx.param_env);
             let universes = vec![None; outer_exclusive_binder(value).as_usize()];
-            solve::deeply_normalize_with_skipped_universes(at, value, universes).unwrap_or_else(
-                |errors| {
+            match solve::deeply_normalize_with_skipped_universes_and_ambiguous_goals(
+                at, value, universes,
+            ) {
+                Ok((value, goals)) => {
+                    self.nested_goals.extend(goals);
+                    value
+                }
+                Err(errors) => {
                     let guar = self.fcx.err_ctxt().report_fulfillment_errors(errors);
                     new_err(tcx, guar)
-                },
-            )
+                }
+            }
         } else {
             value
         };
diff --git a/compiler/rustc_infer/src/infer/mod.rs b/compiler/rustc_infer/src/infer/mod.rs
index fa8dea064daaa..2538a34e83b44 100644
--- a/compiler/rustc_infer/src/infer/mod.rs
+++ b/compiler/rustc_infer/src/infer/mod.rs
@@ -966,9 +966,10 @@ impl<'tcx> InferCtxt<'tcx> {
     pub fn can_define_opaque_ty(&self, id: impl Into<DefId>) -> bool {
         debug_assert!(!self.next_trait_solver());
         match self.typing_mode() {
-            TypingMode::Analysis { defining_opaque_types } => {
-                id.into().as_local().is_some_and(|def_id| defining_opaque_types.contains(&def_id))
-            }
+            TypingMode::Analysis { defining_opaque_types_and_generators } => id
+                .into()
+                .as_local()
+                .is_some_and(|def_id| defining_opaque_types_and_generators.contains(&def_id)),
             // FIXME(#132279): This function is quite weird in post-analysis
             // and post-borrowck analysis mode. We may need to modify its uses
             // to support PostBorrowckAnalysis in the old solver as well.
@@ -1260,7 +1261,7 @@ impl<'tcx> InferCtxt<'tcx> {
             // to handle them without proper canonicalization. This means we may cause cycle
             // errors and fail to reveal opaques while inside of bodies. We should rename this
             // function and require explicit comments on all use-sites in the future.
-            ty::TypingMode::Analysis { defining_opaque_types: _ } => {
+            ty::TypingMode::Analysis { defining_opaque_types_and_generators: _ } => {
                 TypingMode::non_body_analysis()
             }
             mode @ (ty::TypingMode::Coherence
diff --git a/compiler/rustc_infer/src/traits/engine.rs b/compiler/rustc_infer/src/traits/engine.rs
index 1eae10673b62b..9e51a53ae95fa 100644
--- a/compiler/rustc_infer/src/traits/engine.rs
+++ b/compiler/rustc_infer/src/traits/engine.rs
@@ -94,7 +94,7 @@ pub trait TraitEngine<'tcx, E: 'tcx>: 'tcx {
     /// Among all pending obligations, collect those are stalled on a inference variable which has
     /// changed since the last call to `select_where_possible`. Those obligations are marked as
     /// successful and returned.
-    fn drain_unstalled_obligations(
+    fn drain_stalled_obligations_for_coroutines(
         &mut self,
         infcx: &InferCtxt<'tcx>,
     ) -> PredicateObligations<'tcx>;
diff --git a/compiler/rustc_middle/src/query/mod.rs b/compiler/rustc_middle/src/query/mod.rs
index d7ed703f4ae30..6b57eac0cab46 100644
--- a/compiler/rustc_middle/src/query/mod.rs
+++ b/compiler/rustc_middle/src/query/mod.rs
@@ -379,6 +379,15 @@ rustc_queries! {
         }
     }
 
+    query stalled_generators_within(
+        key: LocalDefId
+    ) -> &'tcx ty::List<LocalDefId> {
+        desc {
+            |tcx| "computing the opaque types defined by `{}`",
+            tcx.def_path_str(key.to_def_id())
+        }
+    }
+
     /// Returns the explicitly user-written *bounds* on the associated or opaque type given by `DefId`
     /// that must be proven true at definition site (and which can be assumed at usage sites).
     ///
diff --git a/compiler/rustc_middle/src/query/plumbing.rs b/compiler/rustc_middle/src/query/plumbing.rs
index a099f77041709..69b6f88d72bfd 100644
--- a/compiler/rustc_middle/src/query/plumbing.rs
+++ b/compiler/rustc_middle/src/query/plumbing.rs
@@ -366,7 +366,7 @@ macro_rules! define_callbacks {
 
                 pub type Storage<'tcx> = <$($K)* as keys::Key>::Cache<Erase<$V>>;
 
-                // Ensure that keys grow no larger than 80 bytes by accident.
+                // Ensure that keys grow no larger than 88 bytes by accident.
                 // Increase this limit if necessary, but do try to keep the size low if possible
                 #[cfg(target_pointer_width = "64")]
                 const _: () = {
diff --git a/compiler/rustc_middle/src/ty/context.rs b/compiler/rustc_middle/src/ty/context.rs
index 618a65a018644..2ba591d09271f 100644
--- a/compiler/rustc_middle/src/ty/context.rs
+++ b/compiler/rustc_middle/src/ty/context.rs
@@ -106,7 +106,7 @@ impl<'tcx> Interner for TyCtxt<'tcx> {
     ) -> Self::PredefinedOpaques {
         self.mk_predefined_opaques_in_body(data)
     }
-    type DefiningOpaqueTypes = &'tcx ty::List<LocalDefId>;
+    type LocalDefIds = &'tcx ty::List<LocalDefId>;
     type CanonicalVars = CanonicalVarInfos<'tcx>;
     fn mk_canonical_var_infos(self, infos: &[ty::CanonicalVarInfo<Self>]) -> Self::CanonicalVars {
         self.mk_canonical_var_infos(infos)
@@ -663,9 +663,24 @@ impl<'tcx> Interner for TyCtxt<'tcx> {
         self.anonymize_bound_vars(binder)
     }
 
-    fn opaque_types_defined_by(self, defining_anchor: LocalDefId) -> Self::DefiningOpaqueTypes {
+    fn opaque_types_defined_by(self, defining_anchor: LocalDefId) -> Self::LocalDefIds {
         self.opaque_types_defined_by(defining_anchor)
     }
+
+    fn opaque_types_and_generators_defined_by(
+        self,
+        defining_anchor: Self::LocalDefId,
+    ) -> Self::LocalDefIds {
+        if self.next_trait_solver_globally() {
+            self.mk_local_def_ids_from_iter(
+                self.opaque_types_defined_by(defining_anchor)
+                    .iter()
+                    .chain(self.stalled_generators_within(defining_anchor)),
+            )
+        } else {
+            self.opaque_types_defined_by(defining_anchor)
+        }
+    }
 }
 
 macro_rules! bidirectional_lang_item_map {
@@ -2871,11 +2886,11 @@ impl<'tcx> TyCtxt<'tcx> {
         self.interners.intern_clauses(clauses)
     }
 
-    pub fn mk_local_def_ids(self, clauses: &[LocalDefId]) -> &'tcx List<LocalDefId> {
+    pub fn mk_local_def_ids(self, def_ids: &[LocalDefId]) -> &'tcx List<LocalDefId> {
         // FIXME consider asking the input slice to be sorted to avoid
         // re-interning permutations, in which case that would be asserted
         // here.
-        self.intern_local_def_ids(clauses)
+        self.intern_local_def_ids(def_ids)
     }
 
     pub fn mk_local_def_ids_from_iter<I, T>(self, iter: I) -> T::Output
diff --git a/compiler/rustc_next_trait_solver/src/solve/mod.rs b/compiler/rustc_next_trait_solver/src/solve/mod.rs
index 199f0c7512e1b..b38f41ef1d487 100644
--- a/compiler/rustc_next_trait_solver/src/solve/mod.rs
+++ b/compiler/rustc_next_trait_solver/src/solve/mod.rs
@@ -329,7 +329,7 @@ where
             TypingMode::Coherence | TypingMode::PostAnalysis => false,
             // During analysis, opaques are rigid unless they may be defined by
             // the current body.
-            TypingMode::Analysis { defining_opaque_types: non_rigid_opaques }
+            TypingMode::Analysis { defining_opaque_types_and_generators: non_rigid_opaques }
             | TypingMode::PostBorrowckAnalysis { defined_opaque_types: non_rigid_opaques } => {
                 !def_id.as_local().is_some_and(|def_id| non_rigid_opaques.contains(&def_id))
             }
diff --git a/compiler/rustc_next_trait_solver/src/solve/normalizes_to/opaque_types.rs b/compiler/rustc_next_trait_solver/src/solve/normalizes_to/opaque_types.rs
index 817dffa127bc1..2f3922fd89828 100644
--- a/compiler/rustc_next_trait_solver/src/solve/normalizes_to/opaque_types.rs
+++ b/compiler/rustc_next_trait_solver/src/solve/normalizes_to/opaque_types.rs
@@ -33,11 +33,11 @@ where
                 );
                 self.evaluate_added_goals_and_make_canonical_response(Certainty::AMBIGUOUS)
             }
-            TypingMode::Analysis { defining_opaque_types } => {
+            TypingMode::Analysis { defining_opaque_types_and_generators } => {
                 let Some(def_id) = opaque_ty
                     .def_id
                     .as_local()
-                    .filter(|&def_id| defining_opaque_types.contains(&def_id))
+                    .filter(|&def_id| defining_opaque_types_and_generators.contains(&def_id))
                 else {
                     self.structurally_instantiate_normalizes_to_term(goal, goal.predicate.alias);
                     return self.evaluate_added_goals_and_make_canonical_response(Certainty::Yes);
diff --git a/compiler/rustc_next_trait_solver/src/solve/trait_goals.rs b/compiler/rustc_next_trait_solver/src/solve/trait_goals.rs
index b72f776e5cb48..c8171458f6934 100644
--- a/compiler/rustc_next_trait_solver/src/solve/trait_goals.rs
+++ b/compiler/rustc_next_trait_solver/src/solve/trait_goals.rs
@@ -189,6 +189,21 @@ where
             debug_assert!(ecx.opaque_type_is_rigid(opaque_ty.def_id));
         }
 
+        if let ty::CoroutineWitness(def_id, _) = goal.predicate.self_ty().kind() {
+            match ecx.typing_mode() {
+                TypingMode::Analysis { defining_opaque_types_and_generators } => {
+                    if def_id.as_local().is_some_and(|def_id| {
+                        defining_opaque_types_and_generators.contains(&def_id)
+                    }) {
+                        return ecx.forced_ambiguity(MaybeCause::Ambiguity);
+                    }
+                }
+                TypingMode::Coherence
+                | TypingMode::PostAnalysis
+                | TypingMode::PostBorrowckAnalysis { defined_opaque_types: _ } => {}
+            }
+        }
+
         ecx.probe_and_evaluate_goal_for_constituent_tys(
             CandidateSource::BuiltinImpl(BuiltinImplSource::Misc),
             goal,
diff --git a/compiler/rustc_trait_selection/src/solve.rs b/compiler/rustc_trait_selection/src/solve.rs
index d425ab50ae0fd..0c2451a80a705 100644
--- a/compiler/rustc_trait_selection/src/solve.rs
+++ b/compiler/rustc_trait_selection/src/solve.rs
@@ -9,5 +9,8 @@ mod select;
 pub(crate) use delegate::SolverDelegate;
 pub use fulfill::{FulfillmentCtxt, NextSolverError};
 pub(crate) use normalize::deeply_normalize_for_diagnostics;
-pub use normalize::{deeply_normalize, deeply_normalize_with_skipped_universes};
+pub use normalize::{
+    deeply_normalize, deeply_normalize_with_skipped_universes,
+    deeply_normalize_with_skipped_universes_and_ambiguous_goals,
+};
 pub use select::InferCtxtSelectExt;
diff --git a/compiler/rustc_trait_selection/src/solve/fulfill.rs b/compiler/rustc_trait_selection/src/solve/fulfill.rs
index 192e632a2d5b9..1fafd879ab5e2 100644
--- a/compiler/rustc_trait_selection/src/solve/fulfill.rs
+++ b/compiler/rustc_trait_selection/src/solve/fulfill.rs
@@ -1,18 +1,25 @@
 use std::marker::PhantomData;
 use std::mem;
+use std::ops::ControlFlow;
 
 use rustc_data_structures::thinvec::ExtractIf;
+use rustc_hir::def_id::LocalDefId;
 use rustc_infer::infer::InferCtxt;
 use rustc_infer::traits::query::NoSolution;
 use rustc_infer::traits::{
     FromSolverError, PredicateObligation, PredicateObligations, TraitEngine,
 };
+use rustc_middle::ty::{
+    self, Ty, TyCtxt, TypeSuperVisitable, TypeVisitable, TypeVisitor, TypingMode,
+};
 use rustc_next_trait_solver::solve::{GenerateProofTree, HasChanged, SolverDelegateEvalExt as _};
+use rustc_span::Span;
 use tracing::instrument;
 
 use self::derive_errors::*;
 use super::Certainty;
 use super::delegate::SolverDelegate;
+use super::inspect::{self, ProofTreeInferCtxtExt};
 use crate::traits::{FulfillmentError, ScrubbedTraitError};
 
 mod derive_errors;
@@ -39,7 +46,7 @@ pub struct FulfillmentCtxt<'tcx, E: 'tcx> {
     _errors: PhantomData<E>,
 }
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 struct ObligationStorage<'tcx> {
     /// Obligations which resulted in an overflow in fulfillment itself.
     ///
@@ -55,20 +62,23 @@ impl<'tcx> ObligationStorage<'tcx> {
         self.pending.push(obligation);
     }
 
+    fn has_pending_obligations(&self) -> bool {
+        !self.pending.is_empty() || !self.overflowed.is_empty()
+    }
+
     fn clone_pending(&self) -> PredicateObligations<'tcx> {
         let mut obligations = self.pending.clone();
         obligations.extend(self.overflowed.iter().cloned());
         obligations
     }
 
-    fn take_pending(&mut self) -> PredicateObligations<'tcx> {
-        let mut obligations = mem::take(&mut self.pending);
-        obligations.append(&mut self.overflowed);
-        obligations
-    }
-
-    fn unstalled_for_select(&mut self) -> impl Iterator<Item = PredicateObligation<'tcx>> + 'tcx {
-        mem::take(&mut self.pending).into_iter()
+    fn drain_pending(
+        &mut self,
+        cond: impl Fn(&PredicateObligation<'tcx>) -> bool,
+    ) -> PredicateObligations<'tcx> {
+        let (unstalled, pending) = mem::take(&mut self.pending).into_iter().partition(cond);
+        self.pending = pending;
+        unstalled
     }
 
     fn on_fulfillment_overflow(&mut self, infcx: &InferCtxt<'tcx>) {
@@ -160,7 +170,7 @@ where
             }
 
             let mut has_changed = false;
-            for obligation in self.obligations.unstalled_for_select() {
+            for obligation in self.obligations.drain_pending(|_| true) {
                 let goal = obligation.as_goal();
                 let result = <&SolverDelegate<'tcx>>::from(infcx)
                     .evaluate_root_goal(goal, GenerateProofTree::No, obligation.cause.span)
@@ -196,15 +206,78 @@ where
     }
 
     fn has_pending_obligations(&self) -> bool {
-        !self.obligations.pending.is_empty() || !self.obligations.overflowed.is_empty()
+        self.obligations.has_pending_obligations()
     }
 
     fn pending_obligations(&self) -> PredicateObligations<'tcx> {
         self.obligations.clone_pending()
     }
 
-    fn drain_unstalled_obligations(&mut self, _: &InferCtxt<'tcx>) -> PredicateObligations<'tcx> {
-        self.obligations.take_pending()
+    fn drain_stalled_obligations_for_coroutines(
+        &mut self,
+        infcx: &InferCtxt<'tcx>,
+    ) -> PredicateObligations<'tcx> {
+        self.obligations.drain_pending(|obl| {
+            let stalled_generators = match infcx.typing_mode() {
+                TypingMode::Analysis { defining_opaque_types_and_generators } => {
+                    defining_opaque_types_and_generators
+                }
+                TypingMode::Coherence
+                | TypingMode::PostBorrowckAnalysis { defined_opaque_types: _ }
+                | TypingMode::PostAnalysis => return false,
+            };
+
+            if stalled_generators.is_empty() {
+                return false;
+            }
+
+            infcx.probe(|_| {
+                infcx
+                    .visit_proof_tree(
+                        obl.as_goal(),
+                        &mut StalledOnCoroutines { stalled_generators, span: obl.cause.span },
+                    )
+                    .is_break()
+            })
+        })
+    }
+}
+
+struct StalledOnCoroutines<'tcx> {
+    stalled_generators: &'tcx ty::List<LocalDefId>,
+    span: Span,
+    // TODO: Cache
+}
+
+impl<'tcx> inspect::ProofTreeVisitor<'tcx> for StalledOnCoroutines<'tcx> {
+    type Result = ControlFlow<()>;
+
+    fn span(&self) -> rustc_span::Span {
+        self.span
+    }
+
+    fn visit_goal(&mut self, inspect_goal: &super::inspect::InspectGoal<'_, 'tcx>) -> Self::Result {
+        inspect_goal.goal().predicate.visit_with(self)?;
+
+        if let Some(candidate) = inspect_goal.unique_applicable_candidate() {
+            candidate.visit_nested_no_probe(self)
+        } else {
+            ControlFlow::Continue(())
+        }
+    }
+}
+
+impl<'tcx> TypeVisitor<TyCtxt<'tcx>> for StalledOnCoroutines<'tcx> {
+    type Result = ControlFlow<()>;
+
+    fn visit_ty(&mut self, ty: Ty<'tcx>) -> Self::Result {
+        if let ty::CoroutineWitness(def_id, _) = *ty.kind()
+            && def_id.as_local().is_some_and(|def_id| self.stalled_generators.contains(&def_id))
+        {
+            return ControlFlow::Break(());
+        }
+
+        ty.super_visit_with(self)
     }
 }
 
diff --git a/compiler/rustc_trait_selection/src/solve/fulfill/derive_errors.rs b/compiler/rustc_trait_selection/src/solve/fulfill/derive_errors.rs
index 3a939df25e07b..e48aa43c36465 100644
--- a/compiler/rustc_trait_selection/src/solve/fulfill/derive_errors.rs
+++ b/compiler/rustc_trait_selection/src/solve/fulfill/derive_errors.rs
@@ -109,10 +109,16 @@ pub(super) fn fulfillment_error_for_stalled<'tcx>(
                 false,
             ),
             Ok((_, Certainty::Yes)) => {
-                bug!("did not expect successful goal when collecting ambiguity errors")
+                bug!(
+                    "did not expect successful goal when collecting ambiguity errors for `{:?}`",
+                    infcx.resolve_vars_if_possible(root_obligation.predicate),
+                )
             }
             Err(_) => {
-                bug!("did not expect selection error when collecting ambiguity errors")
+                bug!(
+                    "did not expect selection error when collecting ambiguity errors for `{:?}`",
+                    infcx.resolve_vars_if_possible(root_obligation.predicate),
+                )
             }
         }
     });
diff --git a/compiler/rustc_trait_selection/src/solve/normalize.rs b/compiler/rustc_trait_selection/src/solve/normalize.rs
index 232357dc71a0d..ce9629e507c8c 100644
--- a/compiler/rustc_trait_selection/src/solve/normalize.rs
+++ b/compiler/rustc_trait_selection/src/solve/normalize.rs
@@ -5,6 +5,7 @@ use std::marker::PhantomData;
 use rustc_data_structures::stack::ensure_sufficient_stack;
 use rustc_infer::infer::InferCtxt;
 use rustc_infer::infer::at::At;
+use rustc_infer::traits::solve::Goal;
 use rustc_infer::traits::{FromSolverError, Obligation, TraitEngine};
 use rustc_middle::traits::ObligationCause;
 use rustc_middle::ty::{
@@ -41,6 +42,30 @@ pub fn deeply_normalize_with_skipped_universes<'tcx, T, E>(
     value: T,
     universes: Vec<Option<UniverseIndex>>,
 ) -> Result<T, Vec<E>>
+where
+    T: TypeFoldable<TyCtxt<'tcx>>,
+    E: FromSolverError<'tcx, NextSolverError<'tcx>>,
+{
+    let (value, goals) =
+        deeply_normalize_with_skipped_universes_and_ambiguous_goals(at, value, universes)?;
+    assert_eq!(goals, vec![]);
+
+    Ok(value)
+}
+
+/// Deeply normalize all aliases in `value`. This does not handle inference and expects
+/// its input to be already fully resolved.
+///
+/// Additionally takes a list of universes which represents the binders which have been
+/// entered before passing `value` to the function. This is currently needed for
+/// `normalize_erasing_regions`, which skips binders as it walks through a type.
+///
+/// TODO: doc
+pub fn deeply_normalize_with_skipped_universes_and_ambiguous_goals<'tcx, T, E>(
+    at: At<'_, 'tcx>,
+    value: T,
+    universes: Vec<Option<UniverseIndex>>,
+) -> Result<(T, Vec<Goal<'tcx, ty::Predicate<'tcx>>>), Vec<E>>
 where
     T: TypeFoldable<TyCtxt<'tcx>>,
     E: FromSolverError<'tcx, NextSolverError<'tcx>>,
@@ -48,8 +73,15 @@ where
     let fulfill_cx = FulfillmentCtxt::new(at.infcx);
     let mut folder =
         NormalizationFolder { at, fulfill_cx, depth: 0, universes, _errors: PhantomData };
-
-    value.try_fold_with(&mut folder)
+    let value = value.try_fold_with(&mut folder)?;
+    let goals = folder
+        .fulfill_cx
+        .drain_stalled_obligations_for_coroutines(at.infcx)
+        .into_iter()
+        .map(|obl| obl.as_goal())
+        .collect();
+    let errors = folder.fulfill_cx.select_all_or_error(at.infcx);
+    if errors.is_empty() { Ok((value, goals)) } else { Err(errors) }
 }
 
 struct NormalizationFolder<'me, 'tcx, E> {
@@ -98,7 +130,7 @@ where
         );
 
         self.fulfill_cx.register_predicate_obligation(infcx, obligation);
-        let errors = self.fulfill_cx.select_all_or_error(infcx);
+        let errors = self.fulfill_cx.select_where_possible(infcx);
         if !errors.is_empty() {
             return Err(errors);
         }
@@ -139,7 +171,7 @@ where
 
         let result = if infcx.predicate_may_hold(&obligation) {
             self.fulfill_cx.register_predicate_obligation(infcx, obligation);
-            let errors = self.fulfill_cx.select_all_or_error(infcx);
+            let errors = self.fulfill_cx.select_where_possible(infcx);
             if !errors.is_empty() {
                 return Err(errors);
             }
@@ -253,20 +285,24 @@ impl<'tcx> TypeFolder<TyCtxt<'tcx>> for DeeplyNormalizeForDiagnosticsFolder<'_,
     }
 
     fn fold_ty(&mut self, ty: Ty<'tcx>) -> Ty<'tcx> {
-        deeply_normalize_with_skipped_universes(
-            self.at,
-            ty,
-            vec![None; ty.outer_exclusive_binder().as_usize()],
-        )
-        .unwrap_or_else(|_: Vec<ScrubbedTraitError<'tcx>>| ty.super_fold_with(self))
+        match deeply_normalize_with_skipped_universes_and_ambiguous_goals::<
+            _,
+            ScrubbedTraitError<'tcx>,
+        >(self.at, ty, vec![None; ty.outer_exclusive_binder().as_usize()])
+        {
+            Ok((value, _)) => value,
+            Err(_) => ty.super_fold_with(self),
+        }
     }
 
     fn fold_const(&mut self, ct: ty::Const<'tcx>) -> ty::Const<'tcx> {
-        deeply_normalize_with_skipped_universes(
-            self.at,
-            ct,
-            vec![None; ct.outer_exclusive_binder().as_usize()],
-        )
-        .unwrap_or_else(|_: Vec<ScrubbedTraitError<'tcx>>| ct.super_fold_with(self))
+        match deeply_normalize_with_skipped_universes_and_ambiguous_goals::<
+            _,
+            ScrubbedTraitError<'tcx>,
+        >(self.at, ct, vec![None; ct.outer_exclusive_binder().as_usize()])
+        {
+            Ok((value, _)) => value,
+            Err(_) => ct.super_fold_with(self),
+        }
     }
 }
diff --git a/compiler/rustc_trait_selection/src/traits/fulfill.rs b/compiler/rustc_trait_selection/src/traits/fulfill.rs
index e39f8e673dbac..fa407a7eb7593 100644
--- a/compiler/rustc_trait_selection/src/traits/fulfill.rs
+++ b/compiler/rustc_trait_selection/src/traits/fulfill.rs
@@ -162,7 +162,7 @@ where
         self.select(selcx)
     }
 
-    fn drain_unstalled_obligations(
+    fn drain_stalled_obligations_for_coroutines(
         &mut self,
         infcx: &InferCtxt<'tcx>,
     ) -> PredicateObligations<'tcx> {
diff --git a/compiler/rustc_trait_selection/src/traits/select/mod.rs b/compiler/rustc_trait_selection/src/traits/select/mod.rs
index 0679dbf1296af..07b2e687fd61e 100644
--- a/compiler/rustc_trait_selection/src/traits/select/mod.rs
+++ b/compiler/rustc_trait_selection/src/traits/select/mod.rs
@@ -1491,8 +1491,8 @@ impl<'cx, 'tcx> SelectionContext<'cx, 'tcx> {
             // However, if we disqualify *all* goals from being cached, perf suffers.
             // This is likely fixed by better caching in general in the new solver.
             // See: <https://github.com/rust-lang/rust/issues/132064>.
-            TypingMode::Analysis { defining_opaque_types } => {
-                defining_opaque_types.is_empty() || !pred.has_opaque_types()
+            TypingMode::Analysis { defining_opaque_types_and_generators } => {
+                defining_opaque_types_and_generators.is_empty() || !pred.has_opaque_types()
             }
             // The hidden types of `defined_opaque_types` is not local to the current
             // inference context, so we can freely move this to the global cache.
diff --git a/compiler/rustc_ty_utils/src/opaque_types.rs b/compiler/rustc_ty_utils/src/opaque_types.rs
index 3aad97d86cca1..a8f8df4652409 100644
--- a/compiler/rustc_ty_utils/src/opaque_types.rs
+++ b/compiler/rustc_ty_utils/src/opaque_types.rs
@@ -1,6 +1,7 @@
 use rustc_data_structures::fx::FxHashSet;
+use rustc_hir as hir;
 use rustc_hir::def::DefKind;
-use rustc_hir::def_id::LocalDefId;
+use rustc_hir::def_id::{DefId, LocalDefId};
 use rustc_hir::intravisit;
 use rustc_hir::intravisit::Visitor;
 use rustc_middle::bug;
@@ -351,6 +352,51 @@ fn opaque_types_defined_by<'tcx>(
     tcx.mk_local_def_ids(&collector.opaques)
 }
 
+// TODO: Move this out of `opaque_types`
+fn stalled_generators_within<'tcx>(
+    tcx: TyCtxt<'tcx>,
+    item: LocalDefId,
+) -> &'tcx ty::List<LocalDefId> {
+    if !tcx.next_trait_solver_globally() {
+        return ty::List::empty();
+    }
+
+    let body = tcx.hir_body_owned_by(item);
+    let mut collector =
+        StalledGeneratorVisitor { tcx, root_def_id: item.to_def_id(), stalled_coroutines: vec![] };
+    collector.visit_body(body);
+    tcx.mk_local_def_ids(&collector.stalled_coroutines)
+}
+
+struct StalledGeneratorVisitor<'tcx> {
+    tcx: TyCtxt<'tcx>,
+    root_def_id: DefId,
+    stalled_coroutines: Vec<LocalDefId>,
+}
+
+impl<'tcx> Visitor<'tcx> for StalledGeneratorVisitor<'tcx> {
+    fn visit_nested_body(&mut self, id: hir::BodyId) {
+        if self.tcx.typeck_root_def_id(self.tcx.hir_body_owner_def_id(id).to_def_id())
+            == self.root_def_id
+        {
+            let body = self.tcx.hir_body(id);
+            self.visit_body(body);
+        }
+    }
+
+    fn visit_expr(&mut self, ex: &'tcx hir::Expr<'tcx>) {
+        if let hir::ExprKind::Closure(&hir::Closure {
+            def_id,
+            kind: hir::ClosureKind::Coroutine(_),
+            ..
+        }) = ex.kind
+        {
+            self.stalled_coroutines.push(def_id);
+        }
+        intravisit::walk_expr(self, ex);
+    }
+}
+
 pub(super) fn provide(providers: &mut Providers) {
-    *providers = Providers { opaque_types_defined_by, ..*providers };
+    *providers = Providers { opaque_types_defined_by, stalled_generators_within, ..*providers };
 }
diff --git a/compiler/rustc_type_ir/src/infer_ctxt.rs b/compiler/rustc_type_ir/src/infer_ctxt.rs
index e512e8fc838f1..d4bb6bebf9c01 100644
--- a/compiler/rustc_type_ir/src/infer_ctxt.rs
+++ b/compiler/rustc_type_ir/src/infer_ctxt.rs
@@ -65,13 +65,13 @@ pub enum TypingMode<I: Interner> {
     ///     let x: <() as Assoc>::Output = true;
     /// }
     /// ```
-    Analysis { defining_opaque_types: I::DefiningOpaqueTypes },
+    Analysis { defining_opaque_types_and_generators: I::LocalDefIds },
     /// Any analysis after borrowck for a given body should be able to use all the
     /// hidden types defined by borrowck, without being able to define any new ones.
     ///
     /// This is currently only used by the new solver, but should be implemented in
     /// the old solver as well.
-    PostBorrowckAnalysis { defined_opaque_types: I::DefiningOpaqueTypes },
+    PostBorrowckAnalysis { defined_opaque_types: I::LocalDefIds },
     /// After analysis, mostly during codegen and MIR optimizations, we're able to
     /// reveal all opaque types. As the concrete type should *never* be observable
     /// directly by the user, this should not be used by checks which may expose
@@ -86,13 +86,22 @@ pub enum TypingMode<I: Interner> {
 impl<I: Interner> TypingMode<I> {
     /// Analysis outside of a body does not define any opaque types.
     pub fn non_body_analysis() -> TypingMode<I> {
-        TypingMode::Analysis { defining_opaque_types: Default::default() }
+        TypingMode::Analysis { defining_opaque_types_and_generators: Default::default() }
+    }
+
+    pub fn typeck_for_body(cx: I, body_def_id: I::LocalDefId) -> TypingMode<I> {
+        TypingMode::Analysis {
+            defining_opaque_types_and_generators: cx
+                .opaque_types_and_generators_defined_by(body_def_id),
+        }
     }
 
     /// While typechecking a body, we need to be able to define the opaque
     /// types defined by that body.
     pub fn analysis_in_body(cx: I, body_def_id: I::LocalDefId) -> TypingMode<I> {
-        TypingMode::Analysis { defining_opaque_types: cx.opaque_types_defined_by(body_def_id) }
+        TypingMode::Analysis {
+            defining_opaque_types_and_generators: cx.opaque_types_defined_by(body_def_id),
+        }
     }
 
     pub fn post_borrowck_analysis(cx: I, body_def_id: I::LocalDefId) -> TypingMode<I> {
diff --git a/compiler/rustc_type_ir/src/interner.rs b/compiler/rustc_type_ir/src/interner.rs
index 8f86270d7dce7..e32e9c4a80803 100644
--- a/compiler/rustc_type_ir/src/interner.rs
+++ b/compiler/rustc_type_ir/src/interner.rs
@@ -55,7 +55,7 @@ pub trait Interner:
         data: PredefinedOpaquesData<Self>,
     ) -> Self::PredefinedOpaques;
 
-    type DefiningOpaqueTypes: Copy
+    type LocalDefIds: Copy
         + Debug
         + Hash
         + Default
@@ -316,10 +316,12 @@ pub trait Interner:
         binder: ty::Binder<Self, T>,
     ) -> ty::Binder<Self, T>;
 
-    fn opaque_types_defined_by(
+    fn opaque_types_defined_by(self, defining_anchor: Self::LocalDefId) -> Self::LocalDefIds;
+
+    fn opaque_types_and_generators_defined_by(
         self,
         defining_anchor: Self::LocalDefId,
-    ) -> Self::DefiningOpaqueTypes;
+    ) -> Self::LocalDefIds;
 }
 
 /// Imagine you have a function `F: FnOnce(&[T]) -> R`, plus an iterator `iter`