diff --git a/supporting-blog-content/using-llamaIndex-workflows-with-elasticsearch/using-llamaIndex-workflows-with-elasticsearch.ipynb b/supporting-blog-content/using-llamaIndex-workflows-with-elasticsearch/using-llamaIndex-workflows-with-elasticsearch.ipynb new file mode 100644 index 00000000..e6bc4f3f --- /dev/null +++ b/supporting-blog-content/using-llamaIndex-workflows-with-elasticsearch/using-llamaIndex-workflows-with-elasticsearch.ipynb @@ -0,0 +1,560 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "LlrEjmtJNpuX" + }, + "source": [ + "# Using LlamaIndex Workflows with Elasticsearch\n", + "\n", + "This notebook demonstrates how to use AutoGen with Elasticsearch. This notebook is based on the article [Using AutoGen with Elasticsearch](https://www.elastic.co/search-labs/blog/using-autogen-with-elasticsearch)." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "GNaAN-GNO5qp" + }, + "source": [ + "## Installing dependencies and importing packages" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "collapsed": true, + "id": "D1SqWMbbASRS", + "outputId": "246b5a40-aaaf-4642-a6ae-a8477cd468cf" + }, + "outputs": [], + "source": [ + "%pip install elasticsearch==8.17 llama-index llama-index-llms-groq" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "rAesontNXpLu" + }, + "outputs": [], + "source": [ + "import os\n", + "import json\n", + "from getpass import getpass\n", + "\n", + "from elasticsearch import Elasticsearch\n", + "from elasticsearch.helpers import bulk\n", + "\n", + "from llama_index.llms.groq import Groq\n", + "from llama_index.core.workflow import (\n", + " Event,\n", + " StartEvent,\n", + " StopEvent,\n", + " Workflow,\n", + " step,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "NwOmnk99Pfh3" + }, + "source": [ + "## Declaring variables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "GVKJKfFpPWuj" + }, + "outputs": [], + "source": [ + "os.environ[\"GROQ_API_KEY\"] = getpass(\"Groq Api key: \")\n", + "os.environ[\"ELASTIC_ENDPOINT\"] = getpass(\"Elastic Endpoint: \")\n", + "os.environ[\"ELASTIC_API_KEY\"] = getpass(\"Elastic Api key: \")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "4trslDdyUl9L" + }, + "source": [ + "## Elasticsearch client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "YwCeepe8gIZ-" + }, + "outputs": [], + "source": [ + "# Elasticsearch client\n", + "_client = Elasticsearch(\n", + " os.environ[\"ELASTIC_ENDPOINT\"],\n", + " api_key=os.environ[\"ELASTIC_API_KEY\"],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "O6s2J6BUUvpe" + }, + "source": [ + "## Elasticsearch mappings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "YLIzkbxEU82r" + }, + "outputs": [], + "source": [ + "INDEX_NAME = \"hotel-rooms\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "tj37YaK2UyiU", + "outputId": "07452ae0-6285-4546-ee61-04faee21f5e3" + }, + "outputs": [], + "source": [ + "try:\n", + " _client.indices.create(\n", + " index=INDEX_NAME,\n", + " body={\n", + " \"mappings\": {\n", + " \"properties\": {\n", + " \"room_name\": {\"type\": \"text\"},\n", + " \"description\": {\"type\": \"text\"},\n", + " \"price_per_night\": {\"type\": \"integer\"},\n", + " \"beds\": {\"type\": \"byte\"},\n", + " \"features\": {\"type\": \"keyword\"},\n", + " }\n", + " }\n", + " },\n", + " )\n", + "\n", + " print(\"index created successfully\")\n", + "except Exception as e:\n", + " print(\n", + " f\"Error creating inference endpoint: {e.info['error']['root_cause'][0]['reason'] }\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "LVr6TR8qlw2M" + }, + "outputs": [], + "source": [ + "documents = [\n", + " {\n", + " \"room_name\": \"Standard Room\",\n", + " \"beds\": 1,\n", + " \"description\": \"A cozy room with a comfortable queen-size bed, ideal for solo travelers or couples.\",\n", + " \"price_per_night\": 80,\n", + " \"features\": [\"air conditioning\", \"wifi\", \"flat-screen TV\", \"mini fridge\"],\n", + " },\n", + " {\n", + " \"room_name\": \"Deluxe Room\",\n", + " \"beds\": 1,\n", + " \"description\": \"Spacious room with a king-size bed and modern amenities for a luxurious stay.\",\n", + " \"price_per_night\": 120,\n", + " \"features\": [\"air conditioning\", \"wifi\", \"smart TV\", \"mini bar\", \"city view\"],\n", + " },\n", + " {\n", + " \"room_name\": \"Family Room\",\n", + " \"beds\": 2,\n", + " \"description\": \"A large room with two queen-size beds, perfect for families or small groups.\",\n", + " \"price_per_night\": 150,\n", + " \"features\": [\"air conditioning\", \"wifi\", \"flat-screen TV\", \"sofa\", \"bath tub\"],\n", + " },\n", + " {\n", + " \"room_name\": \"Suite\",\n", + " \"beds\": 1,\n", + " \"description\": \"An elegant suite with a separate living area, offering maximum comfort and luxury.\",\n", + " \"price_per_night\": 200,\n", + " \"features\": [\"air conditioning\", \"wifi\", \"smart TV\", \"jacuzzi\", \"balcony\"],\n", + " },\n", + " {\n", + " \"room_name\": \"Penthouse Suite\",\n", + " \"beds\": 1,\n", + " \"description\": \"The ultimate luxury experience with a panoramic view and top-notch amenities.\",\n", + " \"price_per_night\": 350,\n", + " \"features\": [\n", + " \"air conditioning\",\n", + " \"wifi\",\n", + " \"private terrace\",\n", + " \"jacuzzi\",\n", + " \"exclusive lounge access\",\n", + " ],\n", + " },\n", + " {\n", + " \"room_name\": \"Single Room\",\n", + " \"beds\": 1,\n", + " \"description\": \"A compact and comfortable room designed for solo travelers on a budget.\",\n", + " \"price_per_night\": 60,\n", + " \"features\": [\"wifi\", \"air conditioning\", \"desk\", \"flat-screen TV\"],\n", + " },\n", + " {\n", + " \"room_name\": \"Double Room\",\n", + " \"beds\": 1,\n", + " \"description\": \"A well-furnished room with a queen-size bed, ideal for couples or business travelers.\",\n", + " \"price_per_night\": 100,\n", + " \"features\": [\"air conditioning\", \"wifi\", \"mini fridge\", \"work desk\"],\n", + " },\n", + " {\n", + " \"room_name\": \"Executive Suite\",\n", + " \"beds\": 1,\n", + " \"description\": \"A high-end suite with premium furnishings and exclusive business amenities.\",\n", + " \"price_per_night\": 250,\n", + " \"features\": [\n", + " \"air conditioning\",\n", + " \"wifi\",\n", + " \"smart TV\",\n", + " \"conference table\",\n", + " \"city view\",\n", + " ],\n", + " },\n", + " {\n", + " \"room_name\": \"Honeymoon Suite\",\n", + " \"beds\": 1,\n", + " \"description\": \"A romantic suite with a king-size bed, perfect for newlyweds and special occasions.\",\n", + " \"price_per_night\": 220,\n", + " \"features\": [\n", + " \"air conditioning\",\n", + " \"wifi\",\n", + " \"hot tub\",\n", + " \"romantic lighting\",\n", + " \"balcony\",\n", + " ],\n", + " },\n", + " {\n", + " \"room_name\": \"Presidential Suite\",\n", + " \"beds\": 2,\n", + " \"description\": \"A luxurious suite with separate bedrooms and a living area, offering first-class comfort.\",\n", + " \"price_per_night\": 500,\n", + " \"features\": [\n", + " \"air conditioning\",\n", + " \"wifi\",\n", + " \"private dining area\",\n", + " \"personal butler service\",\n", + " \"exclusive lounge access\",\n", + " ],\n", + " },\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def build_data():\n", + " for doc in documents:\n", + " yield {\"_index\": INDEX_NAME, \"_source\": doc}\n", + "\n", + "\n", + "try:\n", + " success, errors = bulk(_client, build_data())\n", + " print(f\"{success} documents indexed successfully\")\n", + " if errors:\n", + " print(\"Errors during indexing:\", errors)\n", + "\n", + "except Exception as e:\n", + " print(f\"Error: {str(e)}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Llama-index workflow" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class ElasticsearchRequest:\n", + " @staticmethod\n", + " def get_mappings(_es_client: Elasticsearch):\n", + " \"\"\"\n", + " Get the mappings of the Elasticsearch index.\n", + " \"\"\"\n", + "\n", + " return _es_client.indices.get_mapping(index=INDEX_NAME)\n", + "\n", + " @staticmethod\n", + " async def do_es_query(query: str, _es_client: Elasticsearch):\n", + " \"\"\"\n", + " Execute an Elasticsearch query and return the results as a JSON string.\n", + " \"\"\"\n", + "\n", + " try:\n", + " parsed_query = json.loads(query)\n", + "\n", + " if \"query\" not in parsed_query:\n", + " return Exception(\n", + " \"Error: Query JSON must contain a 'query' key\"\n", + " ) # if the query is not a valid JSON return an error\n", + "\n", + " response = _es_client.search(index=INDEX_NAME, body=parsed_query)\n", + " hits = response[\"hits\"][\"hits\"]\n", + "\n", + " if not hits or len(hits) == 0:\n", + " return Exception(\n", + " \"Query has not found any results\"\n", + " ) # if the query has no results return an error\n", + "\n", + " return json.dumps([hit[\"_source\"] for hit in hits], indent=2)\n", + "\n", + " except json.JSONDecodeError:\n", + " return Exception(\"Error: Query JSON no valid format\")\n", + " except Exception as e:\n", + " return Exception(str(e))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "EXTRACTION_PROMPT = \"\"\"\n", + "Context information is below:\n", + "---------------------\n", + "{passage}\n", + "---------------------\n", + "\n", + "Given the context information and not prior knowledge, create a Elastic search query from the information in the context.\n", + "The query must return the documents that match with query and the context information and the query used for retrieve the results.\n", + "{schema}\n", + "\n", + "\"\"\"\n", + "\n", + "REFLECTION_PROMPT = \"\"\"\n", + "You already created this output previously:\n", + "---------------------\n", + "{wrong_answer}\n", + "---------------------\n", + "\n", + "This caused the error: {error}\n", + "\n", + "Try again, the response must contain only valid Elasticsearch queries. Do not add any sentence before or after the JSON object.\n", + "Do not repeat the query.\n", + "\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class ExtractionDone(Event):\n", + " output: str\n", + " passage: str\n", + "\n", + "\n", + "class ValidationErrorEvent(Event):\n", + " error: str\n", + " wrong_output: str\n", + " passage: str" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class ReflectionWorkflow(Workflow):\n", + " model_retries: int = 0\n", + " max_retries: int = 3\n", + "\n", + " @step()\n", + " async def extract(\n", + " self, ev: StartEvent | ValidationErrorEvent\n", + " ) -> StopEvent | ExtractionDone:\n", + "\n", + " print(\"\\n=== EXTRACT STEP ===\\n\")\n", + "\n", + " if isinstance(ev, StartEvent):\n", + " model = ev.get(\"model\")\n", + " passage = ev.get(\"passage\")\n", + "\n", + " if not passage:\n", + " return StopEvent(result=\"Please provide some text in input\")\n", + "\n", + " reflection_prompt = \"\"\n", + " elif isinstance(ev, ValidationErrorEvent):\n", + " passage = ev.passage\n", + " model = ev.model\n", + "\n", + " reflection_prompt = REFLECTION_PROMPT.format(\n", + " wrong_answer=ev.wrong_output, error=ev.error\n", + " )\n", + "\n", + " llm = Groq(model=model, api_key=os.environ[\"GROQ_API_KEY\"])\n", + "\n", + " prompt = EXTRACTION_PROMPT.format(\n", + " passage=passage, schema=ElasticsearchRequest.get_mappings(_client)\n", + " )\n", + " if reflection_prompt:\n", + " prompt += reflection_prompt\n", + "\n", + " output = await llm.acomplete(prompt)\n", + "\n", + " print(f\"MODEL: {model}\")\n", + " print(f\"OUTPUT: {output}\")\n", + " print(\"=================\\n\")\n", + "\n", + " return ExtractionDone(output=str(output), passage=passage, model=model)\n", + "\n", + " @step()\n", + " async def validate(self, ev: ExtractionDone) -> StopEvent | ValidationErrorEvent:\n", + "\n", + " print(\"\\n=== VALIDATE STEP ===\\n\")\n", + "\n", + " try:\n", + " results = await ElasticsearchRequest.do_es_query(ev.output, _client)\n", + " self.model_retries += 1\n", + "\n", + " if self.model_retries > self.max_retries and ev.model != \"llama3-70b-8192\":\n", + " print(f\"Max retries for model {ev.model} reached, changing model\\n\")\n", + " model = \"llama3-70b-8192\" # if the some error occurs, the model will be changed to llama3-70b-8192\n", + " else:\n", + " model = ev.model\n", + "\n", + " print(f\"ELASTICSEARCH RESULTS: {results}\")\n", + "\n", + " if isinstance(results, Exception):\n", + " print(\"STATUS: Validation failed, retrying...\\n\")\n", + " print(\"===================\\n\")\n", + "\n", + " return ValidationErrorEvent(\n", + " error=str(results),\n", + " wrong_output=ev.output,\n", + " passage=ev.passage,\n", + " model=model,\n", + " )\n", + "\n", + " except Exception as e:\n", + " print(\"STATUS: Validation failed, retrying...\\n\")\n", + " print(\"===================\\n\")\n", + "\n", + " return ValidationErrorEvent(\n", + " error=str(e),\n", + " wrong_output=ev.output,\n", + " passage=ev.passage,\n", + " model=model,\n", + " )\n", + "\n", + " return StopEvent(result=ev.output)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Execute workflow" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "w = ReflectionWorkflow(timeout=60, verbose=True)\n", + "\n", + "user_prompt = \"Rooms with smart TV, wifi, jacuzzi and price per night less than 300\"\n", + "\n", + "result = await w.run(\n", + " passage=f\"I need the best possible query for documents that have: {user_prompt}\",\n", + " model=\"mistral-saba-24b\",\n", + ")\n", + "\n", + "print(result)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "S6WZMJayyzxh" + }, + "source": [ + "## Cleaning environment\n", + "\n", + "Delete the resources used to prevent them from consuming resources." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "Sd5o_jIwTthu", + "outputId": "5a294e15-84c6-4017-9319-717b51d49a97" + }, + "outputs": [], + "source": [ + "# Cleanup - Delete Index\n", + "_client.indices.delete(index=INDEX_NAME, ignore=[400, 404])" + ] + } + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": "3.12.2", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.2" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}