|
| 1 | +from functools import cmp_to_key |
| 2 | +from typing import Callable, Optional |
| 3 | + |
| 4 | +from rcrs_core.entities.ambulanceTeam import AmbulanceTeamEntity |
| 5 | +from rcrs_core.entities.entity import Entity |
| 6 | +from rcrs_core.entities.human import Human |
| 7 | +from rcrs_core.worldmodel.entityID import EntityID |
| 8 | + |
| 9 | +from adf_core_python.core.agent.communication.message_manager import MessageManager |
| 10 | +from adf_core_python.core.agent.develop.develop_data import DevelopData |
| 11 | +from adf_core_python.core.agent.info.agent_info import AgentInfo |
| 12 | +from adf_core_python.core.agent.info.scenario_info import ScenarioInfo |
| 13 | +from adf_core_python.core.agent.info.world_info import WorldInfo |
| 14 | +from adf_core_python.core.agent.module.module_manager import ModuleManager |
| 15 | +from adf_core_python.core.agent.precompute.precompute_data import PrecomputeData |
| 16 | +from adf_core_python.core.component.module.complex.ambulance_target_allocator import ( |
| 17 | + AmbulanceTargetAllocator, |
| 18 | +) |
| 19 | + |
| 20 | + |
| 21 | +class DefaultAmbulanceTargetAllocator(AmbulanceTargetAllocator): |
| 22 | + def __init__( |
| 23 | + self, |
| 24 | + agent_info: AgentInfo, |
| 25 | + world_info: WorldInfo, |
| 26 | + scenario_info: ScenarioInfo, |
| 27 | + module_manager: ModuleManager, |
| 28 | + develop_data: DevelopData, |
| 29 | + ) -> None: |
| 30 | + super().__init__( |
| 31 | + agent_info, world_info, scenario_info, module_manager, develop_data |
| 32 | + ) |
| 33 | + self._priority_humans: set[EntityID] = set() |
| 34 | + self._target_humans: set[EntityID] = set() |
| 35 | + self._ambulance_team_info_map: dict[ |
| 36 | + EntityID, DefaultAmbulanceTargetAllocator.AmbulanceTeamInfo |
| 37 | + ] = {} |
| 38 | + |
| 39 | + def resume(self, precompute_data: PrecomputeData) -> AmbulanceTargetAllocator: |
| 40 | + super().resume(precompute_data) |
| 41 | + if self.get_count_resume() >= 2: |
| 42 | + return self |
| 43 | + for entity_id in self._world_info.get_entity_ids_of_types( |
| 44 | + [AmbulanceTeamEntity] |
| 45 | + ): |
| 46 | + self._ambulance_team_info_map[entity_id] = self.AmbulanceTeamInfo(entity_id) |
| 47 | + return self |
| 48 | + |
| 49 | + def prepare(self) -> AmbulanceTargetAllocator: |
| 50 | + super().prepare() |
| 51 | + if self.get_count_prepare() >= 2: |
| 52 | + return self |
| 53 | + for entity_id in self._world_info.get_entity_ids_of_types( |
| 54 | + [AmbulanceTeamEntity] |
| 55 | + ): |
| 56 | + self._ambulance_team_info_map[entity_id] = self.AmbulanceTeamInfo(entity_id) |
| 57 | + return self |
| 58 | + |
| 59 | + def update_info(self, message_manager: MessageManager) -> AmbulanceTargetAllocator: |
| 60 | + super().update_info(message_manager) |
| 61 | + # TODO: implement after message_manager is implemented |
| 62 | + return self |
| 63 | + |
| 64 | + def calculate(self) -> AmbulanceTargetAllocator: |
| 65 | + agents = self._get_action_agents(self._ambulance_team_info_map) |
| 66 | + removes = [] |
| 67 | + current_time = self._agent_info.get_time() |
| 68 | + |
| 69 | + for target in self._priority_humans: |
| 70 | + if len(agents) > 0: |
| 71 | + target_entity = self._world_info.get_entity(target) |
| 72 | + if target_entity is not None and isinstance(target_entity, Human): |
| 73 | + agents = sorted( |
| 74 | + agents, key=cmp_to_key(self._compare_by_distance(target_entity)) |
| 75 | + ) |
| 76 | + result = agents.pop(0) |
| 77 | + info = self._ambulance_team_info_map[result.get_id()] |
| 78 | + if info is not None: |
| 79 | + info._can_new_action = False |
| 80 | + info._target = target |
| 81 | + info.command_time = current_time |
| 82 | + self._ambulance_team_info_map[result.get_id()] = info |
| 83 | + removes.append(target) |
| 84 | + |
| 85 | + for r in removes: |
| 86 | + self._priority_humans.remove(r) |
| 87 | + removes.clear() |
| 88 | + |
| 89 | + for target in self._target_humans: |
| 90 | + if len(agents) > 0: |
| 91 | + target_entity = self._world_info.get_entity(target) |
| 92 | + if target_entity is not None and isinstance(target_entity, Human): |
| 93 | + agents = sorted( |
| 94 | + agents, key=cmp_to_key(self._compare_by_distance(target_entity)) |
| 95 | + ) |
| 96 | + result = agents.pop(0) |
| 97 | + info = self._ambulance_team_info_map[result.get_id()] |
| 98 | + if info is not None: |
| 99 | + info._can_new_action = False |
| 100 | + info._target = target |
| 101 | + info.command_time = current_time |
| 102 | + self._ambulance_team_info_map[result.get_id()] = info |
| 103 | + removes.append(target) |
| 104 | + |
| 105 | + for r in removes: |
| 106 | + self._target_humans.remove(r) |
| 107 | + |
| 108 | + return self |
| 109 | + |
| 110 | + def get_result(self) -> dict[EntityID, EntityID]: |
| 111 | + return self._convert(self._ambulance_team_info_map) |
| 112 | + |
| 113 | + def _get_action_agents( |
| 114 | + self, |
| 115 | + info_map: dict[EntityID, "DefaultAmbulanceTargetAllocator.AmbulanceTeamInfo"], |
| 116 | + ) -> list[AmbulanceTeamEntity]: |
| 117 | + result = [] |
| 118 | + for entity in self._world_info.get_entities_of_types([AmbulanceTeamEntity]): |
| 119 | + info = info_map[entity.get_id()] |
| 120 | + if info is not None and info._can_new_action: |
| 121 | + result.append(entity) |
| 122 | + return result |
| 123 | + |
| 124 | + def _compare_by_distance( |
| 125 | + self, target_entity: Entity |
| 126 | + ) -> Callable[[Entity, Entity], int]: |
| 127 | + def _cmp_func(entity_a: Entity, entity_b: Entity) -> int: |
| 128 | + distance_a = self._world_info.get_distance( |
| 129 | + target_entity.get_id(), entity_a.get_id() |
| 130 | + ) |
| 131 | + distance_b = self._world_info.get_distance( |
| 132 | + target_entity.get_id(), entity_b.get_id() |
| 133 | + ) |
| 134 | + if distance_a < distance_b: |
| 135 | + return -1 |
| 136 | + elif distance_a > distance_b: |
| 137 | + return 1 |
| 138 | + else: |
| 139 | + return 0 |
| 140 | + |
| 141 | + return _cmp_func |
| 142 | + |
| 143 | + def _convert( |
| 144 | + self, |
| 145 | + info_map: dict[EntityID, "DefaultAmbulanceTargetAllocator.AmbulanceTeamInfo"], |
| 146 | + ) -> dict[EntityID, EntityID]: |
| 147 | + result = {} |
| 148 | + for entity_id in info_map.keys(): |
| 149 | + info = info_map[entity_id] |
| 150 | + if info is not None and info._target is not None: |
| 151 | + result[entity_id] = info._target |
| 152 | + return result |
| 153 | + |
| 154 | + class AmbulanceTeamInfo: |
| 155 | + def __init__(self, entity_id: EntityID) -> None: |
| 156 | + self._agent_id: EntityID = entity_id |
| 157 | + self._target: Optional[EntityID] = None |
| 158 | + self._can_new_action: bool = True |
| 159 | + self.command_time: int = -1 |
0 commit comments