36
36
"""
37
37
38
38
import asyncio
39
- import json
39
+ from dataclasses import dataclass
40
40
import os
41
41
import sys
42
42
from pathlib import Path
47
47
from loguru import logger
48
48
49
49
from pipecat .audio .vad .silero import SileroVADAnalyzer
50
+ from pipecat .frames .frames import ControlFrame , EndFrame , Frame
50
51
from pipecat .pipeline .pipeline import Pipeline
51
52
from pipecat .pipeline .runner import PipelineRunner
52
53
from pipecat .pipeline .task import PipelineParams , PipelineTask
53
54
from pipecat .processors .aggregators .openai_llm_context import OpenAILLMContext
55
+ from pipecat .processors .frame_processor import FrameDirection , FrameProcessor
54
56
from pipecat .services .cartesia import CartesiaTTSService
55
57
from pipecat .services .deepgram import DeepgramSTTService
56
58
from pipecat .services .google import GoogleLLMService
@@ -146,10 +148,14 @@ async def pre_transferring_to_human_agent(action: dict, flow_manager: FlowManage
146
148
}
147
149
)
148
150
149
- # TODO: this should only run after LLM "transferring you to agent" speech is spoken
150
- async def post_transferring_to_human_agent (action : dict , flow_manager : FlowManager ):
151
+ async def queue_post_transferring_to_human_agent (action : dict , flow_manager : FlowManager ):
152
+ task = flow_manager .task
153
+ await task .queue_frame (PostTransferringToHumanAgentFrame ())
154
+
155
+ # NOTE: this isn't a "real" post-action because it needs to run after the "transferring you to a
156
+ # human agent" speech is actually spoken, not just after sending the LLM the instruction to do so.
157
+ async def post_transferring_to_human_agent (transport : DailyTransport ):
151
158
"""Post-action after starting transferring to the human agent."""
152
- transport : DailyTransport = flow_manager .transport
153
159
customer_participant_id = get_customer_participant_id (transport = transport )
154
160
155
161
# Update the customer:
@@ -172,10 +178,15 @@ async def post_transferring_to_human_agent(action: dict, flow_manager: FlowManag
172
178
}
173
179
)
174
180
175
- # TODO: this should only run after LLM "I'm patching you through" speech is spoken
176
- async def post_end_human_agent_conversation (action : dict , flow_manager : FlowManager ):
177
- """Configure the participants' settings (send and receive permissions) for when the customer is taken off of hold."""
178
- transport : DailyTransport = flow_manager .transport
181
+ async def queue_post_end_human_agent_conversation (action : dict , flow_manager : FlowManager ):
182
+ task = flow_manager .task
183
+ await task .queue_frame (PostEndHumanAgentConversationFrame ())
184
+
185
+ # NOTE: this isn't a "real" post-action because it needs to run after the "I'm patching you through
186
+ # to the customer" speech is actually spoken, not just after sending the LLM the instruction to do
187
+ # so.
188
+ async def post_end_human_agent_conversation (transport : DailyTransport ):
189
+ """Post-action after starting to end the conversation with the human agent, when the agent is being patched through to the customer."""
179
190
customer_participant_id = get_customer_participant_id (transport = transport )
180
191
agent_participant_id = get_human_agent_participant_id (transport = transport )
181
192
@@ -361,11 +372,11 @@ def create_transferring_to_human_agent_node() -> NodeConfig:
361
372
handler = pre_transferring_to_human_agent
362
373
)
363
374
],
364
- # TODO: this should only run after LLM "transferring you to agent" speech is spoken
375
+ # NOTE: "real" post action (post_transferring_to_human_agent) is triggered by CustomControlProcessor
365
376
post_actions = [
366
377
ActionConfig (
367
- type = "post_transferring_to_human_agent" ,
368
- handler = post_transferring_to_human_agent
378
+ type = "queue_post_transferring_to_human_agent" ,
379
+ handler = queue_post_transferring_to_human_agent
369
380
)
370
381
]
371
382
)
@@ -431,13 +442,12 @@ def create_end_human_agent_conversation_node() -> NodeConfig:
431
442
},
432
443
],
433
444
functions = [],
434
- # TODO: this should only run after LLM "I'm patching you through" speech is spoken
445
+ # NOTE: "real" post action (post_end_human_agent_conversation) is triggered by CustomControlProcessor
435
446
post_actions = [
436
447
ActionConfig (
437
- type = "post_end_human_agent_conversation" ,
438
- handler = post_end_human_agent_conversation
439
- ),
440
- ActionConfig (type = "end_conversation" )
448
+ type = "queue_post_end_human_agent_conversation" ,
449
+ handler = queue_post_end_human_agent_conversation
450
+ )
441
451
]
442
452
)
443
453
@@ -532,6 +542,36 @@ async def get_token(user_id: str, permissions: dict, daily_rest_helper: DailyRES
532
542
))
533
543
)
534
544
545
+ @dataclass
546
+ class PostTransferringToHumanAgentFrame (ControlFrame ):
547
+ """
548
+ Indicates that the bot has finished speaking the "transferring you to human agent" speech.
549
+ """
550
+ pass
551
+
552
+ @dataclass
553
+ class PostEndHumanAgentConversationFrame (ControlFrame ):
554
+ """
555
+ Indicates that the bot has finished speaking the "I'm patching you through to the customer" speech.
556
+ """
557
+ pass
558
+
559
+ class CustomControlProcessor (FrameProcessor ):
560
+ def __init__ (self , transport : DailyTransport ):
561
+ super ().__init__ ()
562
+ self .__transport = transport
563
+
564
+ async def process_frame (self , frame : Frame , direction : FrameDirection ):
565
+ await super ().process_frame (frame , direction )
566
+
567
+ if isinstance (frame , PostTransferringToHumanAgentFrame ):
568
+ await post_transferring_to_human_agent (transport = self .__transport )
569
+ elif isinstance (frame , PostEndHumanAgentConversationFrame ):
570
+ await post_end_human_agent_conversation (transport = self .__transport )
571
+ # TODO: how to trigger EndFrame() from here? we don't have reference to PipelineTask
572
+ # await self.queue_frame(EndFrame())
573
+
574
+ await self .push_frame (frame , direction )
535
575
536
576
async def main ():
537
577
"""Main function to set up and run the bot."""
@@ -557,6 +597,7 @@ async def main():
557
597
voice_id = "820a3788-2b37-4d21-847a-b65d8a68c99a" , # Salesman
558
598
)
559
599
llm = GoogleLLMService (api_key = os .getenv ("GOOGLE_API_KEY" ))
600
+ custom_control_processor = CustomControlProcessor (transport = transport )
560
601
561
602
# Initialize context
562
603
context = OpenAILLMContext ()
@@ -572,6 +613,7 @@ async def main():
572
613
tts ,
573
614
transport .output (),
574
615
context_aggregator .assistant (),
616
+ custom_control_processor
575
617
]
576
618
)
577
619
task = PipelineTask (pipeline = pipeline , params = PipelineParams (allow_interruptions = True ))
0 commit comments