Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Oct 25, 2025

📄 27% (0.27x) speedup for UsersGroupsDataSource.users_update_scoped_role_member_of in backend/python/app/sources/external/microsoft/users_groups/users_groups.py

⏱️ Runtime : 1.69 milliseconds 1.33 milliseconds (best of 21 runs)

📝 Explanation and details

The optimized code achieves a 27% runtime improvement by eliminating unnecessary object instantiation and reducing redundant operations:

Key Optimizations:

  1. Conditional Object Creation: The original code always creates two RequestConfiguration objects regardless of whether any parameters are provided. The optimized version only creates these objects when query parameters or headers are actually present, saving substantial initialization overhead.

  2. Batched Parameter Setting: Instead of checking each parameter individually and setting attributes one-by-one, the optimization collects all parameter values in a dictionary first, then uses a single loop with setattr() to assign them. This reduces the number of conditional checks and attribute assignments.

  3. Early Exit for Empty Parameters: When no query parameters or headers are provided (which appears to be the common case based on test results), the optimized code sets config = None and skips all the configuration object creation entirely.

Performance Impact:
The line profiler shows the most significant gains come from:

  • Reducing RequestConfiguration() instantiation from always executing (909 hits) to only when needed (3 hits)
  • Eliminating redundant attribute assignments when no parameters are present
  • Streamlining the parameter validation and setting process

Test Case Performance:
The optimization is particularly effective for:

  • Basic test cases with minimal parameters (most common scenario)
  • Large-scale concurrent operations where the overhead reduction multiplies across many calls
  • High-throughput scenarios where even small per-operation savings compound significantly

The optimization maintains identical functionality and error handling while reducing unnecessary computational overhead, especially beneficial in high-frequency API call scenarios.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 1818 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 92.9%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
# --- Function to test (EXACT COPY) ---
import logging
# Mocks and helpers for testing
from typing import Any, Dict, List, Mapping, Optional

import pytest  # used for our unit tests
from app.sources.external.microsoft.users_groups.users_groups import \
    UsersGroupsDataSource


# Stub for the PATCH endpoint chain
class PatchEndpointStub:
    def __init__(self, response=None, should_raise=False):
        self.response = response
        self.should_raise = should_raise

    async def patch(self, body=None, request_configuration=None):
        if self.should_raise:
            raise Exception("Simulated PATCH error")
        return self.response

# Stub for by_scopedRoleMemberOf_id
class ScopedRoleMemberOfStub:
    def __init__(self, patch_endpoint):
        self.patch_endpoint = patch_endpoint

    def by_scopedRoleMemberOf_id(self, scopedRoleMembership_id):
        return self.patch_endpoint

# Stub for users.by_user_id
class UsersStub:
    def __init__(self, patch_endpoint):
        self.patch_endpoint = patch_endpoint

    def by_user_id(self, user_id):
        return ScopedRoleMemberOfStub(self.patch_endpoint)

# Stub for MSGraphServiceClient
class MSGraphServiceClientStub:
    def __init__(self, patch_endpoint):
        self.users = UsersStub(patch_endpoint)

# Stub for MSGraphClient
class MSGraphClientStub:
    def __init__(self, patch_endpoint):
        self.patch_endpoint = patch_endpoint

    def get_client(self):
        return self

    def get_ms_graph_service_client(self):
        return MSGraphServiceClientStub(self.patch_endpoint)

# --- Unit Tests ---

# 1. Basic Test Cases

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_basic_success():
    """Test basic successful update with required parameters."""
    # Simulate a successful response object
    response_obj = {"id": "123", "role": "admin"}
    patch_endpoint = PatchEndpointStub(response=response_obj)
    client = MSGraphClientStub(patch_endpoint)
    datasource = UsersGroupsDataSource(client)
    result = await datasource.users_update_scoped_role_member_of(
        user_id="user1",
        scopedRoleMembership_id="role1"
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_basic_with_optional_params():
    """Test with all optional parameters provided."""
    response_obj = {"id": "456", "role": "editor"}
    patch_endpoint = PatchEndpointStub(response=response_obj)
    client = MSGraphClientStub(patch_endpoint)
    datasource = UsersGroupsDataSource(client)
    result = await datasource.users_update_scoped_role_member_of(
        user_id="user2",
        scopedRoleMembership_id="role2",
        select=["id", "role"],
        expand=["manager"],
        filter="role eq 'editor'",
        orderby="id desc",
        search="editor",
        top=10,
        skip=2,
        request_body={"role": "editor"},
        headers={"Custom-Header": "value"}
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_basic_none_response():
    """Test when the PATCH endpoint returns None (should be handled as error)."""
    patch_endpoint = PatchEndpointStub(response=None)
    client = MSGraphClientStub(patch_endpoint)
    datasource = UsersGroupsDataSource(client)
    result = await datasource.users_update_scoped_role_member_of(
        user_id="user3",
        scopedRoleMembership_id="role3"
    )

# 2. Edge Test Cases

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_error_in_response_object():
    """Test when the response object has an 'error' attribute."""
    class ErrorResponse:
        error = "Something went wrong"
    patch_endpoint = PatchEndpointStub(response=ErrorResponse())
    client = MSGraphClientStub(patch_endpoint)
    datasource = UsersGroupsDataSource(client)
    result = await datasource.users_update_scoped_role_member_of(
        user_id="user4",
        scopedRoleMembership_id="role4"
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_error_in_response_dict():
    """Test when the response dict has an 'error' key with details."""
    response_obj = {"error": {"code": "BadRequest", "message": "Invalid input"}}
    patch_endpoint = PatchEndpointStub(response=response_obj)
    client = MSGraphClientStub(patch_endpoint)
    datasource = UsersGroupsDataSource(client)
    result = await datasource.users_update_scoped_role_member_of(
        user_id="user5",
        scopedRoleMembership_id="role5"
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_error_in_response_dict_string():
    """Test when the response dict has an 'error' key as a string."""
    response_obj = {"error": "Simple error string"}
    patch_endpoint = PatchEndpointStub(response=response_obj)
    client = MSGraphClientStub(patch_endpoint)
    datasource = UsersGroupsDataSource(client)
    result = await datasource.users_update_scoped_role_member_of(
        user_id="user6",
        scopedRoleMembership_id="role6"
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_error_code_message_attrs():
    """Test when the response object has 'code' and 'message' attributes."""
    class CodeMessageResponse:
        code = "Forbidden"
        message = "Access denied"
    patch_endpoint = PatchEndpointStub(response=CodeMessageResponse())
    client = MSGraphClientStub(patch_endpoint)
    datasource = UsersGroupsDataSource(client)
    result = await datasource.users_update_scoped_role_member_of(
        user_id="user7",
        scopedRoleMembership_id="role7"
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_patch_raises_exception():
    """Test when the PATCH endpoint raises an exception."""
    patch_endpoint = PatchEndpointStub(should_raise=True)
    client = MSGraphClientStub(patch_endpoint)
    datasource = UsersGroupsDataSource(client)
    result = await datasource.users_update_scoped_role_member_of(
        user_id="user8",
        scopedRoleMembership_id="role8"
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_concurrent_execution():
    """Test concurrent execution of multiple calls."""
    patch_endpoint1 = PatchEndpointStub(response={"id": "a", "role": "r1"})
    patch_endpoint2 = PatchEndpointStub(response={"id": "b", "role": "r2"})
    client1 = MSGraphClientStub(patch_endpoint1)
    client2 = MSGraphClientStub(patch_endpoint2)
    datasource1 = UsersGroupsDataSource(client1)
    datasource2 = UsersGroupsDataSource(client2)
    # Run two updates concurrently
    results = await asyncio.gather(
        datasource1.users_update_scoped_role_member_of("userA", "roleA"),
        datasource2.users_update_scoped_role_member_of("userB", "roleB"),
    )

# 3. Large Scale Test Cases

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_large_scale_concurrent():
    """Test large scale concurrent execution with 50 calls."""
    # Each call returns a unique response
    patch_endpoints = [PatchEndpointStub(response={"id": str(i), "role": f"role{i}"}) for i in range(50)]
    datasources = [UsersGroupsDataSource(MSGraphClientStub(pe)) for pe in patch_endpoints]
    coros = [
        ds.users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}"
        )
        for i, ds in enumerate(datasources)
    ]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_large_scale_error_handling():
    """Test large scale concurrent execution with some errors."""
    patch_endpoints = []
    # First 10 return error, next 40 succeed
    for i in range(50):
        if i < 10:
            patch_endpoints.append(PatchEndpointStub(should_raise=True))
        else:
            patch_endpoints.append(PatchEndpointStub(response={"id": str(i), "role": f"role{i}"}))
    datasources = [UsersGroupsDataSource(MSGraphClientStub(pe)) for pe in patch_endpoints]
    coros = [
        ds.users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}"
        )
        for i, ds in enumerate(datasources)
    ]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        if i < 10:
            pass
        else:
            pass

# 4. Throughput Test Cases

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_throughput_small_load():
    """Throughput test: small load (10 concurrent calls)."""
    patch_endpoints = [PatchEndpointStub(response={"id": str(i), "role": f"role{i}"}) for i in range(10)]
    datasources = [UsersGroupsDataSource(MSGraphClientStub(pe)) for pe in patch_endpoints]
    coros = [
        ds.users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}"
        )
        for i, ds in enumerate(datasources)
    ]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_throughput_medium_load():
    """Throughput test: medium load (100 concurrent calls)."""
    patch_endpoints = [PatchEndpointStub(response={"id": str(i), "role": f"role{i}"}) for i in range(100)]
    datasources = [UsersGroupsDataSource(MSGraphClientStub(pe)) for pe in patch_endpoints]
    coros = [
        ds.users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}"
        )
        for i, ds in enumerate(datasources)
    ]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_throughput_high_volume():
    """Throughput test: high volume (500 concurrent calls)."""
    patch_endpoints = [PatchEndpointStub(response={"id": str(i), "role": f"role{i}"}) for i in range(500)]
    datasources = [UsersGroupsDataSource(MSGraphClientStub(pe)) for pe in patch_endpoints]
    coros = [
        ds.users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}"
        )
        for i, ds in enumerate(datasources)
    ]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        pass
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import asyncio  # used to run async functions
import logging
from typing import Any, Dict, List, Mapping, Optional

import pytest  # used for our unit tests
from app.sources.external.microsoft.users_groups.users_groups import \
    UsersGroupsDataSource


# Mock for the MSGraphClient's users property chain
class MockScopedRoleMemberOf:
    def __init__(self, user_id, scopedRoleMembership_id, patch_result=None, patch_exception=None):
        self.user_id = user_id
        self.scopedRoleMembership_id = scopedRoleMembership_id
        self.patch_result = patch_result
        self.patch_exception = patch_exception

    async def patch(self, body=None, request_configuration=None):
        if self.patch_exception:
            raise self.patch_exception
        return self.patch_result

class MockScopedRoleMemberOfById:
    def __init__(self, user_id, patch_result=None, patch_exception=None):
        self.user_id = user_id
        self.patch_result = patch_result
        self.patch_exception = patch_exception

    def by_scopedRoleMemberOf_id(self, scopedRoleMembership_id):
        return MockScopedRoleMemberOf(self.user_id, scopedRoleMembership_id, self.patch_result, self.patch_exception)

class MockUsers:
    def __init__(self, patch_result=None, patch_exception=None):
        self.patch_result = patch_result
        self.patch_exception = patch_exception

    def by_user_id(self, user_id):
        return MockScopedRoleMemberOfById(user_id, self.patch_result, self.patch_exception)

class MockClient:
    def __init__(self, patch_result=None, patch_exception=None):
        self.users = MockUsers(patch_result, patch_exception)

    def get_ms_graph_service_client(self):
        return self

# ---- Pytest unit tests ----

class DummyGraphClient:
    def __init__(self, patch_result=None, patch_exception=None):
        self._client = MockClient(patch_result, patch_exception)
    def get_client(self):
        return self._client

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_basic_success():
    # Basic: Test a successful update with minimal required parameters
    expected_response_data = {"id": "scopedRoleMembership_id", "role": "Admin"}
    patch_result = expected_response_data
    client = DummyGraphClient(patch_result=patch_result)
    ds = UsersGroupsDataSource(client)
    resp = await ds.users_update_scoped_role_member_of(
        user_id="user1",
        scopedRoleMembership_id="role1",
        request_body={"role": "Admin"}
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_basic_with_all_parameters():
    # Basic: Test with all optional parameters provided
    expected_response_data = {"id": "role2", "role": "User", "extra": True}
    patch_result = expected_response_data
    client = DummyGraphClient(patch_result=patch_result)
    ds = UsersGroupsDataSource(client)
    resp = await ds.users_update_scoped_role_member_of(
        user_id="user2",
        scopedRoleMembership_id="role2",
        select=["id", "role"],
        expand=["manager"],
        filter="role eq 'User'",
        orderby="id",
        search="User",
        top=10,
        skip=0,
        request_body={"role": "User"},
        headers={"Authorization": "Bearer token"}
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_basic_none_response():
    # Basic: Simulate None response from patch (should result in error)
    client = DummyGraphClient(patch_result=None)
    ds = UsersGroupsDataSource(client)
    resp = await ds.users_update_scoped_role_member_of(
        user_id="user3",
        scopedRoleMembership_id="role3"
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_basic_dict_error_response():
    # Basic: Simulate error in dict response
    error_dict = {"error": {"code": "Invalid", "message": "Bad request"}}
    client = DummyGraphClient(patch_result=error_dict)
    ds = UsersGroupsDataSource(client)
    resp = await ds.users_update_scoped_role_member_of(
        user_id="user4",
        scopedRoleMembership_id="role4"
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_basic_object_error_response():
    # Basic: Simulate error in object response
    class ErrorObj:
        error = "Some error"
    client = DummyGraphClient(patch_result=ErrorObj())
    ds = UsersGroupsDataSource(client)
    resp = await ds.users_update_scoped_role_member_of(
        user_id="user5",
        scopedRoleMembership_id="role5"
    )

# ---- Edge cases ----

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_exception_handling():
    # Edge: Simulate an exception thrown during patch
    class CustomException(Exception):
        pass
    client = DummyGraphClient(patch_exception=CustomException("Patch failed"))
    ds = UsersGroupsDataSource(client)
    resp = await ds.users_update_scoped_role_member_of(
        user_id="user6",
        scopedRoleMembership_id="role6"
    )

@pytest.mark.asyncio

async def test_users_update_scoped_role_member_of_concurrent_execution():
    # Edge: Test concurrent execution with different inputs
    patch_result1 = {"id": "roleA", "role": "A"}
    patch_result2 = {"id": "roleB", "role": "B"}
    client1 = DummyGraphClient(patch_result=patch_result1)
    client2 = DummyGraphClient(patch_result=patch_result2)
    ds1 = UsersGroupsDataSource(client1)
    ds2 = UsersGroupsDataSource(client2)
    results = await asyncio.gather(
        ds1.users_update_scoped_role_member_of("userA", "roleA"),
        ds2.users_update_scoped_role_member_of("userB", "roleB")
    )

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_search_consistency_header():
    # Edge: Test that ConsistencyLevel header is set when search is used
    expected_response_data = {"id": "roleC", "role": "C"}
    patch_result = expected_response_data
    client = DummyGraphClient(patch_result=patch_result)
    ds = UsersGroupsDataSource(client)
    resp = await ds.users_update_scoped_role_member_of(
        user_id="userC",
        scopedRoleMembership_id="roleC",
        search="C"
    )
    # No direct way to check header in response, but function should not error

# ---- Large scale ----

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_large_scale_concurrent():
    # Large scale: Run 50 concurrent updates
    N = 50
    patch_results = [{"id": f"role{i}", "role": f"Role{i}"} for i in range(N)]
    clients = [DummyGraphClient(patch_result=patch_results[i]) for i in range(N)]
    ds_list = [UsersGroupsDataSource(clients[i]) for i in range(N)]
    coros = [
        ds_list[i].users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}",
            request_body={"role": f"Role{i}"}
        ) for i in range(N)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        pass

# ---- Throughput tests ----

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_throughput_small_load():
    # Throughput: Small load, 5 concurrent updates
    N = 5
    patch_results = [{"id": f"role{i}", "role": f"Role{i}"} for i in range(N)]
    clients = [DummyGraphClient(patch_result=patch_results[i]) for i in range(N)]
    ds_list = [UsersGroupsDataSource(clients[i]) for i in range(N)]
    coros = [
        ds_list[i].users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}",
            request_body={"role": f"Role{i}"}
        ) for i in range(N)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_throughput_medium_load():
    # Throughput: Medium load, 25 concurrent updates
    N = 25
    patch_results = [{"id": f"role{i}", "role": f"Role{i}"} for i in range(N)]
    clients = [DummyGraphClient(patch_result=patch_results[i]) for i in range(N)]
    ds_list = [UsersGroupsDataSource(clients[i]) for i in range(N)]
    coros = [
        ds_list[i].users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}",
            request_body={"role": f"Role{i}"}
        ) for i in range(N)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_users_update_scoped_role_member_of_throughput_high_load():
    # Throughput: High load, 100 concurrent updates
    N = 100
    patch_results = [{"id": f"role{i}", "role": f"Role{i}"} for i in range(N)]
    clients = [DummyGraphClient(patch_result=patch_results[i]) for i in range(N)]
    ds_list = [UsersGroupsDataSource(clients[i]) for i in range(N)]
    coros = [
        ds_list[i].users_update_scoped_role_member_of(
            user_id=f"user{i}",
            scopedRoleMembership_id=f"role{i}",
            request_body={"role": f"Role{i}"}
        ) for i in range(N)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        pass
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-UsersGroupsDataSource.users_update_scoped_role_member_of-mh63taad and push.

Codeflash

The optimized code achieves a **27% runtime improvement** by eliminating unnecessary object instantiation and reducing redundant operations:

**Key Optimizations:**

1. **Conditional Object Creation**: The original code always creates two `RequestConfiguration` objects regardless of whether any parameters are provided. The optimized version only creates these objects when query parameters or headers are actually present, saving substantial initialization overhead.

2. **Batched Parameter Setting**: Instead of checking each parameter individually and setting attributes one-by-one, the optimization collects all parameter values in a dictionary first, then uses a single loop with `setattr()` to assign them. This reduces the number of conditional checks and attribute assignments.

3. **Early Exit for Empty Parameters**: When no query parameters or headers are provided (which appears to be the common case based on test results), the optimized code sets `config = None` and skips all the configuration object creation entirely.

**Performance Impact:**
The line profiler shows the most significant gains come from:
- Reducing `RequestConfiguration()` instantiation from always executing (909 hits) to only when needed (3 hits)
- Eliminating redundant attribute assignments when no parameters are present
- Streamlining the parameter validation and setting process

**Test Case Performance:**
The optimization is particularly effective for:
- Basic test cases with minimal parameters (most common scenario)
- Large-scale concurrent operations where the overhead reduction multiplies across many calls
- High-throughput scenarios where even small per-operation savings compound significantly

The optimization maintains identical functionality and error handling while reducing unnecessary computational overhead, especially beneficial in high-frequency API call scenarios.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 October 25, 2025 09:55
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label Oct 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI

Projects

None yet

Development

Successfully merging this pull request may close these issues.

0 participants