# langflow_runner.py
"""A class to handle running the babblefish.ai Langflow GenAI workflow and extracting responses."""
import os
import logging
from typing import Optional, Dict, Any
import threading
from dotenv import load_dotenv
import requests
import coloredlogs
# Pull settings from a local .env file into the process environment.
load_dotenv()

# Base endpoint that a flow ID is appended to when a flow is run.
# Importing this module fails fast when the variable is missing or empty.
BASE_API_URL = os.environ.get('BASE_API_URL')
if not BASE_API_URL:
    raise EnvironmentError("BASE_API_URL environment variable not set")

# Module-level logger with one colour per log level.
logger = logging.getLogger(__name__)
coloredlogs.install(
    level='INFO',
    logger=logger,
    fmt='%(filename)s %(levelname)s %(message)s',
    level_styles={
        level_name: {'color': colour}
        for level_name, colour in (
            ('debug', 'green'),
            ('info', 'blue'),
            ('warning', 'yellow'),
            ('error', 'red'),
            ('critical', 'magenta'),
        )
    },
)
class LangflowRunner:
    """Run the babblefish flow over HTTP and extract its responses.

    A flow can be run synchronously with :meth:`run_flow`, or in a background
    thread with :meth:`run_flow_async` followed by :meth:`get_response`.
    """

    # Placeholder reported for any result field the response does not supply.
    _MISSING = 'N/A'

    # Maps a component's display name in the flow response to the key used
    # for it in the dictionary returned by extract_output_message().
    _COMPONENT_KEYS = {
        'Translation': 'translation',
        'Explanation': 'explanation',
        'Detected Language': 'detected_language',
        'Sentiment': 'sentiment',
    }

    def __init__(self,
                 flow_id: str,
                 api_key: Optional[str] = None,
                 tweaks: Optional[Dict[str, Any]] = None):
        """
        Initialize the runner with the necessary parameters.

        :param flow_id: The ID of the flow to run.
        :param api_key: Optional API key, sent as the ``x-api-key`` header.
        :param tweaks: Optional dictionary for custom tweaks to the flow.
        """
        self.flow_id = flow_id
        self.api_key = api_key
        self.tweaks = tweaks
        # Guards ``self.response`` and signals when an async run has finished.
        self.condition = threading.Condition()
        # Result of the latest async run; None means "nothing to collect yet".
        self.response: Optional[Dict[str, Any]] = None

    def run_flow(self,
                 message: str,
                 output_type: str = "chat",
                 input_type: str = "chat") -> Dict[str, Any]:
        """
        Run the flow with a given message and the configured tweaks.

        :param message: The message to send to the flow.
        :param output_type: The type of output expected (default is "chat").
        :param input_type: The type of input provided (default is "chat").
        :return: The decoded JSON response, or an empty dict when the request
            fails or the response body is not valid JSON.
        """
        api_url = f"{BASE_API_URL}/{self.flow_id}"
        logger.info("API URL: %s", api_url)

        payload: Dict[str, Any] = {
            "input_value": message,
            "output_type": output_type,
            "input_type": input_type,
        }
        if self.tweaks:
            payload["tweaks"] = self.tweaks
        headers = {"x-api-key": self.api_key} if self.api_key else None

        try:
            response = requests.post(api_url, json=payload, headers=headers, timeout=30)
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # whose decode error is not a RequestException subclass.
            logger.error("Request failed: %s", e)
            return {}

    def run_flow_async(self, message: str, output_type: str = "chat", input_type: str = "chat"):
        """
        Run the flow in a background thread; collect the result with get_response().

        :param message: The message to send to the flow.
        :param output_type: The type of output expected (default is "chat").
        :param input_type: The type of input provided (default is "chat").
        """
        def api_thread():
            result: Dict[str, Any] = {}
            try:
                result = self.run_flow(message, output_type, input_type)
            finally:
                # Always publish something, even if run_flow raised, so that a
                # caller blocked in get_response() is never left waiting.
                with self.condition:
                    self.response = result
                    self.condition.notify_all()

        thread = threading.Thread(target=api_thread)
        thread.start()

    def get_response(self) -> Dict[str, Any]:
        """
        Block until an asynchronous flow run has produced a response, then return it.

        The stored response is cleared once returned. There is no timeout, so
        calling this without a pending run_flow_async() blocks indefinitely.

        :return: The JSON response from the flow (empty dict if the run failed).
        """
        with self.condition:
            # Wait on the state rather than on the notification alone: the
            # worker may already have finished before this call started.
            self.condition.wait_for(lambda: self.response is not None)
            response = self.response
            self.response = None
        return response or {}

    @staticmethod
    def _message_text(output: Any, default: str) -> Any:
        """Return ``output['results']['message']['text']``, or ``default`` if any level is absent."""
        node = output
        for key in ('results', 'message'):
            node = node.get(key) if isinstance(node, dict) else None
        if not isinstance(node, dict):
            return default
        return node.get('text', default)

    def extract_output_message(self, response_json: Dict[str, Any]) -> Dict[str, str]:
        """
        Extract the output messages from the flow response JSON and combine them into a single results object.

        Only the first entry of the top-level ``outputs`` list is inspected.
        Components are matched by their ``component_display_name``; any field
        without a matching component is reported as ``'N/A'``.

        :param response_json: The JSON response from the flow.
        :return: A dictionary with the keys ``translation``, ``explanation``,
            ``detected_language`` and ``sentiment``.
        """
        results = {key: self._MISSING for key in self._COMPONENT_KEYS.values()}

        outputs = response_json.get('outputs') if isinstance(response_json, dict) else None
        if not isinstance(outputs, list) or not outputs:
            return results

        first = outputs[0]
        components = first.get('outputs') if isinstance(first, dict) else None
        for output in components or []:
            if not isinstance(output, dict):
                continue
            key = self._COMPONENT_KEYS.get(output.get('component_display_name', ''))
            if key is not None:
                results[key] = self._message_text(output, self._MISSING)

        # Log the extracted information
        logger.info("Translation: %s", results['translation'])
        logger.info("Detected Language: %s\n", results['detected_language'])
        return results