Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ def convert_to_beam_type(typ):

# Unwrap Python 3.12 `type` aliases (TypeAliasType) to their underlying value.
# This ensures Beam sees the actual aliased type (e.g., tuple[int, ...]).
import sys
if sys.version_info >= (3, 12) and TypeAliasType is not None:
if isinstance(typ, TypeAliasType): # pylint: disable=isinstance-second-argument-not-valid-type
underlying = getattr(typ, '__value__', None)
Expand Down
94 changes: 68 additions & 26 deletions sdks/python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ def pytest_addoption(parser):
'--test-pipeline-options',
help='Options to use in test pipelines. NOTE: Tests may '
'ignore some or all of these options.')
parser.addoption(
'--enable-test-cleanup',
action='store_true',
default=None,
help='Enable expensive cleanup operations. Auto-enabled in CI.')
parser.addoption(
'--disable-test-cleanup',
action='store_true',
default=False,
help='Disable expensive cleanup operations even in CI.')


# See pytest.ini for main collection rules.
Expand Down Expand Up @@ -101,56 +111,88 @@ def configure_beam_rpc_timeouts():
print("Successfully configured Beam RPC timeouts")


def _running_in_ci():
"""Returns True if running in a CI environment."""
return (
os.getenv('GITHUB_ACTIONS') == 'true' or
os.getenv('CI') == 'true' or
os.getenv('CONTINUOUS_INTEGRATION') == 'true'
)


def _should_enable_test_cleanup(config):
"""Returns True if expensive cleanup operations should run."""
if config.getoption('--disable-test-cleanup'):
result = False
reason = "disabled via --disable-test-cleanup"
elif config.getoption('--enable-test-cleanup'):
result = True
reason = "enabled via --enable-test-cleanup"
else:
if _running_in_ci():
result = True
reason = "CI detected"
else:
result = False
reason = "local development"

# Log once per session
if not hasattr(config, '_cleanup_decision_logged'):
print(f"\n[Test Cleanup] Enabled: {result} ({reason})")
config._cleanup_decision_logged = True

return result


@pytest.fixture(autouse=True)
def ensure_clean_state():
def ensure_clean_state(request):
"""
Ensure clean state before each test
to prevent cross-test contamination.
Ensures clean state between tests to prevent contamination.

Expensive operations (sleeps, extra GC) only run in CI or when
explicitly enabled to keep local tests fast.
"""
import gc
import threading
import time

# Force garbage collection to clean up any lingering resources
gc.collect()
enable_cleanup = _should_enable_test_cleanup(request.config)

if enable_cleanup:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused - how does this speed up our CI tests? Won't the behavior in CI be the exact same since enable_cleanup will evaluate to True?

Copy link
Contributor Author

@aIbrahiim aIbrahiim Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is as discussed in the issue here, the ensure_clean_state adds significant overhead (~100ms/test). While we need this strict isolation in CI to prevent flakiness (hence enable_cleanup is True), it makes running tests locally very painful and this pr implements the suggested interim solution by keeping the strict cleanup in CI (so CI remains stable) while disabling it by default locally so it can iterate more quickly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 0.1 s overhead comes from here:

https://github.com/apache/beam/pull/35889/files#diff-76dd9e3509f907a47fc83cfd5094ffdfeed3101f73ac31bfffe113605aef0b46R135

I feel skeptical about using busy wait to solve issues in general. I would suggest revert ensure_clean_state and enhance_mock_stability fixtures or move then to per-test-class based (if possible)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree, I'd like to at least try that and try to understand where the core problem is coming from.

If it does reduce stability, I think we can take this as a temporary measure, but with the goal of getting rid of it

gc.collect()

# Log active thread count for debugging
thread_count = threading.active_count()
if thread_count > 50: # Increased threshold since we see 104 threads
if thread_count > 50:
print(f"Warning: {thread_count} active threads detected before test")

# Force a brief pause to let threads settle
time.sleep(0.5)
gc.collect()
if enable_cleanup:
time.sleep(0.5)
gc.collect()

yield

# Enhanced cleanup after test
try:
# Force more aggressive cleanup
gc.collect()

# Brief pause to let any async operations complete
time.sleep(0.1)

# Additional garbage collection
gc.collect()
if enable_cleanup:
gc.collect()
time.sleep(0.1)
gc.collect()
except Exception as e:
print(f"Warning: Cleanup error: {e}")


@pytest.fixture(autouse=True)
def enhance_mock_stability():
"""Enhance mock stability in DinD environment."""
def enhance_mock_stability(request):
"""Improves mock stability in DinD environment."""
import time

# Brief pause before test to ensure clean mock state
time.sleep(0.05)
enable_cleanup = _should_enable_test_cleanup(request.config)

if enable_cleanup:
time.sleep(0.05)

yield

# Brief pause after test to let mocks clean up
time.sleep(0.05)
if enable_cleanup:
time.sleep(0.05)


def pytest_configure(config):
Expand Down
Loading