Commit d5566a1

Add max_concurrency config option for agent llm calls (#79)
1 parent aa2f476 commit d5566a1

File tree

7 files changed: +42, -20 lines changed

README.md (13 additions, 5 deletions)

````diff
@@ -70,8 +70,9 @@ limitations under the License.
   - [Resetting the entire cache](#resetting-the-entire-cache)
   - [Resetting just the LLM cache or the services cache](#resetting-just-the-llm-cache-or-the-services-cache)
 - [Vector databases](#vector-databases)
-- [Service outages](#service-outages)
+- [Service errors](#service-errors)
   - [National Vulnerability Database (NVD)](#national-vulnerability-database-nvd)
+  - [NVIDIA API Catalog / NVIDIA-hosted NIMs](#nvidia-api-catalog--nvidia-hosted-nims)
   - [Running out of credits](#running-out-of-credits)
 - [Testing and validation](#testing-and-validation)
 - [License](#license)
@@ -506,7 +507,7 @@ The configuration defines how the workflow operates, including model settings, i
 2. **LLM engine configuration (`engine`)**: The `engine` section configures various models for the LLM nodes.
    - LLM processing nodes: `agent`, `checklist_model`, `justification_model`, `summary_model`
      - `model_name`: The name of the LLM model used by the node.
-     - `prompt`: Manually set the prompt for the specific model in the configuration. The prompt can either be passed in as a string of text or as a path to a text file containing the desired prompting.
+     - `prompt`: Manually set the prompt for the specific model in the configuration. The prompt can either be passed in as a string of text or as a path to a text file containing the desired prompting.
      - `service`: Specifies the service for running the LLM inference. (Set to `nvfoundation` if using NIM.)
      - `max_tokens`: Defines the maximum number of tokens that can be generated in one output step.
      - `temperature`: Controls randomness in the output. A lower temperature produces more deterministic results.
@@ -516,6 +517,7 @@ The configuration defines how the workflow operates, including model settings, i
      - `return_intermediate_steps`: Controls whether to return intermediate steps taken by the agent, and include them in the output file. Helpful for troubleshooting agent responses.
      - `return_source_documents`: Controls whether to return source documents from the VDB tools, and include them in the intermediate steps output. Helpful for identifying the source files used in agent responses.
        - Note: enabling this will also include source documents in the agent's memory and increase the agent's prompt length.
+     - `max_concurrency`: Controls the maximum number of concurrent requests to the LLM. Default is `None`, which doesn't limit concurrency.
    - Embedding model for generating VDB for RAG: `rag_embedding`
      - `_type`: Defines the source of the model used for generating embeddings (e.g., `nim`, `huggingface`, `openai`).
      - Other model-dependent parameters, such as `model`/`model_name`, `api_key`, `truncate`, or `encode_kwargs`: see the [embedding model customization](#customizing-the-embedding-model) section below for more details.
@@ -725,8 +727,7 @@ To customize the output, modify the configuration file accordingly. In any confi
 }
 ```
 
-To post the output to an HTTP endpoint, update the JSON object in the config file as follows, replacing the domain, port, and endpoint with the desired
-destination (note the trailing slash in the "url" field). The output will be sent as JSON data.
+To post the output to an HTTP endpoint, update the JSON object in the config file as follows, replacing the domain, port, and endpoint with the desired destination (note the trailing slash in the "url" field). The output will be sent as JSON data.
 
 ```
 "output": {
@@ -853,7 +854,7 @@ We've integrated VDB and embedding creation directly into the pipeline with cach
 
 NVIDIA offers optimized models and tools like NIMs ([build.nvidia.com/explore/retrieval](https://build.nvidia.com/explore/retrieval)) and cuVS ([github.com/rapidsai/cuvs](https://github.com/rapidsai/cuvs)).
 
-### Service outages
+### Service errors
 
 #### National Vulnerability Database (NVD)
 These typically resolve on their own. Please wait and try running the pipeline again later. Example errors:
@@ -868,6 +869,13 @@ Error requesting [1/10]: (Retry 0.1 sec) https://services.nvd.nist.gov/rest/json
 Error requesting [1/10]: (Retry 0.1 sec) https://services.nvd.nist.gov/rest/json/cves/2.0: 503, message='Service Unavailable', url=URL('https://services.nvd.nist.gov/rest/json/cves/2.0?cveId=CVE-2023-50447')
 ```
 
+#### NVIDIA API Catalog / NVIDIA-hosted NIMs
+
+429 errors can occur when your requests exceed the rate limit for the model. Try setting the `engine.agent.max_concurrency` to a low value such as 5 to reduce the rate of requests.
+```
+Exception: [429] Too Many Requests
+```
+
 ### Running out of credits
 
 If you run out of credits for the NVIDIA API Catalog, you will need to obtain more credits to continue using the API. Please contact your NVIDIA representative to get more credits added.
````
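Following the troubleshooting advice added above, capping the agent at five in-flight requests is a one-line config change. An illustrative fragment of the `agent` block (field layout mirrors the `configs/*.json` files after this commit; surrounding keys omitted):

```json
"agent": {
    "verbose": false,
    "return_intermediate_steps": false,
    "return_source_documents": false,
    "max_concurrency": 5
}
```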

configs/from_file.json (4 additions, 1 deletion)

```diff
@@ -12,7 +12,10 @@
     "top_p": 0.01,
     "seed": 42
   },
-  "verbose": false
+  "verbose": false,
+  "return_intermediate_steps": false,
+  "return_source_documents": false,
+  "max_concurrency": null
 },
 "checklist_model": {
   "service": {
```

configs/from_http.json (4 additions, 1 deletion)

```diff
@@ -12,7 +12,10 @@
     "top_p": 0.01,
     "seed": 42
   },
-  "verbose": false
+  "verbose": false,
+  "return_intermediate_steps": false,
+  "return_source_documents": false,
+  "max_concurrency": null
 },
 "checklist_model": {
   "service": {
```

configs/from_manual.json (2 additions, 1 deletion)

```diff
@@ -14,7 +14,8 @@
   },
   "verbose": false,
   "return_intermediate_steps": false,
-  "return_source_documents": false
+  "return_source_documents": false,
+  "max_concurrency": null
 },
 "checklist_model": {
   "service": {
```
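All three config diffs add `"max_concurrency": null`, which deserializes to Python `None`, the "no limit" default. A quick standalone check (not repo code):

```python
import json

# JSON null maps to Python None, which the agent node treats as "no limit".
agent_cfg = json.loads('{"verbose": false, "max_concurrency": null}')
unlimited = agent_cfg["max_concurrency"] is None
```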

src/cve/data_models/config.py (1 addition, 0 deletions)

```diff
@@ -242,6 +242,7 @@ class EngineAgentConfig(BaseModel):
     verbose: bool = False
     return_intermediate_steps: bool = False
     return_source_documents: bool = False
+    max_concurrency: int | None = None
 
 
 class EngineConfig(BaseModel):
```

src/cve/nodes/cve_langchain_agent_node.py (16 additions, 10 deletions)

```diff
@@ -13,13 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import asyncio
 import logging
 import typing
 
 from langchain_core.exceptions import OutputParserException
-
 from morpheus_llm.llm import LLMContext
 from morpheus_llm.llm import LLMNodeBase
 
@@ -40,21 +38,27 @@ class CVELangChainAgentNode(LLMNodeBase):
     ----------
     agent_executor : AgentExecutor
         The agent executor to use to execute.
-
-    vdb_names : tuple[str, str]
-        Name of the VDBs to load from the input.
+    replace_exceptions : bool, optional
+        Whether to replace exceptions with a default value, by default False
+    replace_exceptions_value : Optional[str], optional
+        The value to replace exceptions with, by default None
+    max_concurrency : Optional[int], optional
+        Maximum number of concurrent agent invocations. None means no limit. By default None.
     """
 
     def __init__(self,
                  *,
                  create_agent_executor_fn: "typing.Callable[[LLMContext], AgentExecutor]",
                  replace_exceptions: bool = False,
-                 replace_exceptions_value: typing.Optional[str] = None):
+                 replace_exceptions_value: typing.Optional[str] = None,
+                 max_concurrency: typing.Optional[int] = None):
         super().__init__()
 
         self._create_agent_executor_fn = create_agent_executor_fn
         self._replace_exceptions = replace_exceptions
         self._replace_exceptions_value = replace_exceptions_value
+        self._max_concurrency = max_concurrency
+        self._semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None
 
         self._input_names = ["input"]
 
@@ -154,16 +158,18 @@ async def _run_single(self,
 
             results = await asyncio.gather(*results_async, return_exceptions=True)
 
-            # # Transform from list[dict[str, Any]] to dict[str, list[Any]]
-            # results = {k: [x[k] for x in results] for k in results[0]}
-
             return results
 
         # We are not dealing with a list, so run single
         try:
             input_single = {"input": kwargs.pop("input")}
             config = {"callbacks": agent.callbacks, "tags": agent.tags, "metadata": metadata}
-            return await agent.ainvoke(input=input_single, config=config, **kwargs)
+
+            if self._semaphore is not None:
+                async with self._semaphore:
+                    return await agent.ainvoke(input=input_single, config=config, **kwargs)
+            else:
+                return await agent.ainvoke(input=input_single, config=config, **kwargs)
         except Exception as e:
             logger.exception("Error running agent: %s", e)
             return e
```

src/cve/pipeline/engine.py (2 additions, 2 deletions)

```diff
@@ -22,7 +22,6 @@
 from langchain.chains.retrieval_qa.base import RetrievalQA
 from langchain.vectorstores.faiss import FAISS
 from langchain_core.embeddings import Embeddings
-
 from morpheus_llm.llm import LLMContext
 from morpheus_llm.llm import LLMEngine
 from morpheus_llm.llm.nodes.extracter_node import ManualExtracterNode
@@ -213,7 +212,8 @@ def build_engine(*, run_config: RunConfig, embeddings: Embeddings):
                     node=CVELangChainAgentNode(
                         create_agent_executor_fn=_build_dynamic_agent_fn(run_config, embeddings),
                         replace_exceptions=True,
-                        replace_exceptions_value="I do not have a definitive answer for this checklist item."))
+                        replace_exceptions_value="I do not have a definitive answer for this checklist item.",
+                        max_concurrency=run_config.engine.agent.max_concurrency))
 
     engine.add_node('summary',
                     inputs=[("/checklist", "checklist_inputs"), ("/agent/outputs", "checklist_outputs"),
```
