diff --git a/pdf_agents/agents.py b/pdf_agents/agents.py index fe34dc2..29e859c 100644 --- a/pdf_agents/agents.py +++ b/pdf_agents/agents.py @@ -236,7 +236,7 @@ def get_beamline_objects() -> dict: ) kafka_producer = Publisher( - topic=f"{beamline_tla}.bluesky.adjudicators", + topic=f"{beamline_tla}.mmm.bluesky.adjudicators", bootstrap_servers=",".join(kafka_config["bootstrap_servers"]), key="{beamline_tla}.key", producer_config=kafka_config["runengine_producer_config"], @@ -290,3 +290,42 @@ def tell(self, x, y) -> Dict[str, ArrayLike]: doc = super().tell(x, y) doc["background"] = self.background return doc + + +class PDFReporterMixin: + """Mixin for sending reports to Kafka as well as Tiled. + + Parameters + ---------- + report_producer : Publisher + Bluesky Kafka publisher to produce document stream of agent reports. + + Examples + -------- + >>> class PassiveKmeansAgentReporter(PDFReporterMixin, PassiveKmeansAgent) + >>> agent = PassiveKmeansAgentReporter(report_producer=Publisher(...), k_clusters=3) + """ + + def __init__(self, *args, report_producer: Publisher, **kwargs): + self._report_producer = report_producer + super().__init__(*args, **kwargs) + + def generate_report(self, **kwargs): + doc = self.report(**kwargs) + uid = self._write_event("report", doc) + self._report_producer("report", doc) + logger.info(f"Generated report. Tiled: {uid}\n Kafka: {doc.get('uid', 'No UID')}") + + @classmethod + def get_beamline_objects(cls) -> dict: + ret = super().get_beamline_objects() + beamline_tla = "pdf" + kafka_config = nslsii.kafka_utils._read_bluesky_kafka_config_file( + config_file_path="/etc/bluesky/kafka.yml" + ) + ret["report_producer"] = Publisher( + topic=f"{beamline_tla}.mmm.bluesky.agents", + bootstrap_servers=",".join(kafka_config["bootstrap_servers"]), + key="{beamline_tla}.key", + producer_config=kafka_config["runengine_producer_config"], + )