Skip to content

Commit

Permalink
ENH: Add variable non-redundant adjudicator
Browse files Browse the repository at this point in the history
  • Loading branch information
maffettone committed Mar 17, 2023
1 parent 9265ccb commit b995898
Showing 1 changed file with 61 additions and 3 deletions.
64 changes: 61 additions & 3 deletions bluesky_adaptive/adjudicators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import deque
from copy import deepcopy
from threading import Lock, Thread
from typing import Sequence, Tuple
from typing import Callable, Sequence, Tuple

from bluesky_kafka import BlueskyConsumer
from bluesky_queueserver_api import BPlan
Expand Down Expand Up @@ -39,7 +39,7 @@ class AdjudicatorBase(BlueskyConsumer, ABC):
"""
An agent adjudicator that listens to published suggestions by agents.
This Base approach (as per `process_document`) only retains the most recent suggestions by any named agents.
Other mechanisms for tracking can be provided
Other mechanisms for tracking can be provided as in example sub-classes.
Parameters
----------
Expand Down Expand Up @@ -140,7 +140,7 @@ class AgentByNameAdjudicator(AdjudicatorBase):
"""Adjudicator that only allows messages from a set primary agent, and uses a single qserver.
Parameters
----------
qserver : dict[str, API_Threads_Mixin]
qservers : dict[str, API_Threads_Mixin]
Dictionary of objects to manage communication with Queue Server. These should be keyed by the beamline TLA
expected in AdjudicatorMsg.suggestions dictionary.
"""
Expand Down Expand Up @@ -176,3 +176,61 @@ def make_judgments(self) -> Sequence[Tuple[API_Threads_Mixin, str, Suggestion]]:
Judgment(re_manager=manager, agent_name=self.primary_agent, suggestion=suggestion)
)
return judgments


class NonredundantAdjudicator(AdjudicatorBase):
"""Use a hashing function to convert any suggestion into a unique hash.
Parameters
----------
topics : list of str
List of existing_topics as strings such as ["topic-1", "topic-2"]
bootstrap_servers : str
Comma-delimited list of Kafka server addresses as a string
such as ``'broker1:9092,broker2:9092,127.0.0.1:9092'``
group_id : str
Required string identifier for the consumer's Kafka Consumer group.
qservers : dict[str, API_Threads_Mixin]
Dictionary of objects to manage communication with Queue Server. These should be keyed by the beamline TLA
expected in AdjudicatorMsg.suggestions dictionary.
hash_suggestion : Callable
Function that takes the tla and Suggestion object, and returns a hashable object.
This hashable object will be used to check redundancy in a set.
Examples
--------
>>> def hash_suggestion(tla: str, suggestion: Suggestion):
>>> # Uses only the tla, plan name, and args to define redundancy, avoiding any details in kwargs
>>> return f"{tla} {suggestion.plan_name} {str(suggestion.plan_args)}"
"""

def __init__(
self,
topics: list[str],
bootstrap_servers: str,
group_id: str,
*args,
qservers: dict[str, API_Threads_Mixin],
hash_suggestion: Callable,
**kwargs,
):

super().__init__(topics, bootstrap_servers, group_id, *args, **kwargs)
self.hash_suggestion = hash_suggestion
self.suggestion_set = set()
self._re_managers = qservers

def make_judgments(self) -> Sequence[Tuple[API_Threads_Mixin, str, Suggestion]]:
"""Loop over all recieved adjudicator mesages, and their suggested plans by beamline,
seeking redundancy."""
passing_judgements = []
for agent_name, adjudicator_msg in self.current_suggestions.items():
for tla, suggestions in adjudicator_msg.suggestions.items():
for suggestion in suggestions:
hashable = self.hash_suggestion(tla, suggestion)
if hashable in self.suggestion_set:
continue
else:
passing_judgements.append(Judgment(self._re_managers[tla], agent_name, suggestion))
self.suggestion_set.add(hashable)
return passing_judgements

0 comments on commit b995898

Please sign in to comment.