-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathfunction_app.py
160 lines (126 loc) · 6.26 KB
/
function_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import azure.functions as func
import logging
import os
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from telebot import TeleBot
import json
from openai import AzureOpenAI
# Function App object that the route decorator below registers against;
# HTTP routes default to the anonymous auth level.
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.route(route="http_trigger", auth_level=func.AuthLevel.ANONYMOUS)
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    """Handle an incoming Telegram webhook update.

    Non-POST requests get a short informational body. For POST requests the
    JSON body is read as a Telegram update; when it carries a text message,
    the per-user history blob client is built and the message is handed to
    ``message_next``. Updates without a ``message`` or without ``text`` are
    logged/ignored instead of being treated as errors.

    Parameters:
    req (func.HttpRequest): The incoming HTTP request.

    Returns:
    func.HttpResponse: Always status 200, including on failure.
    NOTE(review): presumably this keeps Telegram from redelivering the same
    update - confirm against the Telegram webhook documentation.
    """
    # Anything other than POST is not a webhook delivery.
    if req.method != 'POST':
        return func.HttpResponse("This is a bot server.", status_code=200)

    # Get the bot token from environment variables
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    # Stays None until a chat id has been extracted, so the error handler
    # knows whether there is anybody to notify.
    chat_id = None
    try:
        # Parsing happens inside the try so a malformed body is handled
        # like every other failure instead of escaping the function.
        update = req.get_json()
        message = update.get('message') if isinstance(update, dict) else None
        if message:
            # Read the chat id first: if a later field is missing we can
            # still tell the user that something went wrong.
            chat_id = message['chat']['id']
            username = message['from']['first_name']
            user_id = message['from']['id']
            text = message.get('text')
            if text is None:
                # Messages without a "text" field carry nothing to process.
                logging.warning(f'User {username} with id {user_id} sent a message without text; ignoring it')
            else:
                # Create a file prefix using the username and user id
                fileprefix = f'{username}_{user_id}'
                # Log the received message
                logging.warning(f'User {username} with id {user_id} sent a message: {text}')
                # Build the blob client for this user's history file
                connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
                blob_service_client = BlobServiceClient.from_connection_string(conn_str=connection_string)
                blob_client = blob_service_client.get_blob_client("history", f'{fileprefix}_history.txt')
                # Call the message_next function to process the message
                message_next(chat_id, bot_token, text, fileprefix, blob_client)
        # Return a 200 OK response
        return func.HttpResponse(status_code=200)
    except Exception as e:
        # Top-level boundary: log with traceback, then notify the user if
        # we know which chat to write to.
        logging.exception(f'Oops, we got an exception in http_trigger: {e}')
        if chat_id is not None:
            try:
                # Sent as plain text: the exception text is arbitrary and
                # could contain characters that break HTML parsing.
                TeleBot(bot_token).send_message(chat_id, f'Oops, something went wrong: {e}')
            except Exception as notify_error:
                # Failing to notify must not turn into an unhandled error.
                logging.error(f'Could not notify chat {chat_id} about the failure: {notify_error}')
        # Return a 200 OK response
        return func.HttpResponse(status_code=200)
def _split_correction(full_response):
    """Split a model reply into ``(corrected_text, explanation)``.

    The reply is expected to wrap the corrected text in a pair of ``***``
    markers. Without a complete pair the reply is treated as having no
    correction and is returned whole as the explanation.
    """
    # maxsplit=2 keeps any text after the second marker in the last part.
    parts = full_response.split('***', 2)
    if len(parts) < 3:
        return '', full_response
    return parts[1], parts[0] + parts[2]


def message_next(chat_id, bot_token, text, fileprefix, blob_client):
    """Process one user message: query the model, reply, persist history.

    Parameters:
    chat_id: Telegram chat the replies are sent to.
    bot_token (str): Token used to construct the TeleBot client.
    text (str): The user's message text.
    fileprefix (str): Prefix of the history file name (used for logging).
    blob_client: Blob client of this user's history file; it is read,
    overwritten, or deleted here.

    Returns:
    A func.HttpResponse on the early-exit paths ('/startover' and repeated
    message), otherwise None. The visible caller ignores the value.

    Raises:
    ValueError: If the "Language" setting is missing or the model returns
    an empty reply.
    """
    # Local import: only needed to escape the HTML-formatted reply.
    import html

    # Initialize the bot
    bot = TeleBot(bot_token)
    conversation = []
    history_exists = blob_client.exists()

    # '/startover' is a command, not text for the model, so it is handled
    # whether or not a history file exists yet.
    if text == '/startover':
        if history_exists:
            blob_client.delete_blob()
            logging.warning(f'{fileprefix}_history.txt deleted')
        bot.send_message(chat_id, "Okay, let's start over")
        return func.HttpResponse("This is a bot server.", status_code=200)

    if history_exists:
        # The history is stored as one JSON object per line; blank lines
        # (e.g. a trailing newline or an empty blob) are skipped.
        raw_history = blob_client.download_blob().readall().decode('utf-8')
        conversation = [json.loads(line) for line in raw_history.split('\n') if line.strip()]
        # The stored history ends with [..., user, assistant], so index -2
        # is the previous user input. Ignore an identical repeat
        # (prevents a timeout loop).
        if len(conversation) >= 2 and conversation[-2]["content"] == text:
            logging.warning('The text is the same as the previous user content in the history file')
            return func.HttpResponse(status_code=200)

    # Append the user's text and keep only the four most recent entries
    conversation.append({"role": "user", "content": text})
    conversation = conversation[-4:]

    # Read the prompt from the file and replace [Language] with the
    # configured language.
    language = os.getenv("Language")
    if language is None:
        raise ValueError('The "Language" application setting is not configured')
    with open('prompt_language.txt', 'r', encoding='utf-8') as f:
        prompt = f.read().strip().replace("[Language]", language)
    query = [{"role": "system", "content": prompt}]
    query.extend(conversation)

    # Get the response from the AI model
    full_response = get_response(query)
    logging.warning(f'Full response: {full_response}')
    if not full_response or not full_response.strip():
        # Nothing to send; surface it through the caller's error handling.
        raise ValueError('The AI model returned an empty response')

    # Split the response into the corrected text and the explanation
    code_response, non_code_response = _split_correction(full_response)
    if '***' in full_response and code_response.strip():
        # The corrected text goes out in bold; escape it so characters such
        # as "<" or "&" are not interpreted as HTML markup.
        bot.send_message(chat_id, f"<b>{html.escape(code_response, quote=False)}</b>", parse_mode="HTML")
        logging.warning(f'Corrected text: {code_response}')
        # Skip an explanation that is only whitespace.
        if non_code_response.strip():
            bot.send_message(chat_id, non_code_response)
            logging.warning(f'Explanation: {non_code_response}')
    else:
        # No usable marker pair: forward the reply unchanged.
        code_response = ''
        logging.warning(f'No corrections provided: {full_response}')
        bot.send_message(chat_id, full_response)

    # Only the corrected text (possibly empty) is stored as the
    # assistant's turn.
    conversation.append({"role": "assistant", "content": code_response})
    # Convert the conversation to JSON lines and upload it to the blob
    serialized = "\n".join(json.dumps(message) for message in conversation)
    blob_client.upload_blob(serialized, overwrite=True)
def get_response(conversation):
    """
    Ask Azure OpenAI for a chat completion and return its text.
    Parameters:
    conversation (list): Chat messages passed to the model as-is.
    Returns:
    text (str): Content of the first choice in the model's response.
    """
    # Connection settings: key and endpoint come from the environment,
    # the API version is fixed.
    client_settings = {
        "api_key": os.getenv("AZURE_OPENAI_KEY"),
        "api_version": "2023-09-15-preview",
        "azure_endpoint": os.getenv("AZURE_OPENAI_ENDPOINT"),
    }
    openai_client = AzureOpenAI(**client_settings)
    # Request the completion for the supplied messages
    completion = openai_client.chat.completions.create(model="gpt-4", messages=conversation)
    # Only the first choice is used
    first_choice = completion.choices[0]
    return first_choice.message.content