如果你的课程涉及到多智能体Agent协调合作、调用api,这是一个极好的样例模板。你可以在这个的框架下改成任意你想要的课设题目,比如电商分析之类的。这样可以节省你很多的部署docker file代码、搭建框架、调用api等的时间。
当然如果你想直接抄,只要别盒我,就行😇
随着物联网(IoT)技术的飞速发展与智慧城市建设的深入推进,城市环境监测网络日益完善。从地面监测站到卫星遥感,海量的环境数据(如 PM2.5、PM10、温湿度、风速等)正在以 PB 级的规模爆发式增长。这些多源异构的大数据蕴含着极高的挖掘价值,不仅能够反映城市空气质量的实时状况,还能揭示污染物扩散的时空规律,为政府环保部门的决策制定、交通流量的科学管控以及居民的健康出行提供强有力的数据支撑。
然而,面对如此庞大且复杂的数据集,传统的大数据分析模式显现出了明显的局限性。一方面,数据分析的门槛居高不下。无论是使用 Python、SQL 还是专业的 BI 工具,都要求用户具备扎实的编程基础和统计学知识,这使得大量的业务人员和决策者难以直接从数据中获取洞察。另一方面,数据处理流程割裂且繁琐。从数据采集、清洗、特征工程到建模分析、可视化展示,每一个环节往往需要不同的工具和脚本,缺乏统一的调度机制,导致分析效率低下,无法满足实时响应的需求。
与此同时,以大语言模型(Large Language Model, LLM)为核心的 AI Agent(人工智能智能体)技术正在引发软件工程领域的范式革命。与传统的自动化脚本不同,AI Agent 具备了“感知(Perception)”、“规划(Planning)”、“工具使用(Tool Use)”和“反思(Reflection)”等类人能力。它能够理解人类的自然语言指令,自主拆解复杂任务,并根据任务需求灵活调用各种外部工具(如 API、数据库、计算引擎)。将 AI Agent 引入大数据处理流程,构建“Text-to-Insight”(从文本到洞察)的智能分析系统,已成为当前学术界和工业界共同探索的热点方向。
本项目旨在设计并实现一个基于 Multi-Agent(多智能体)架构的空气质量智能分析演示系统。该系统深度融合了大数据处理技术栈与前沿的 AI Agent 框架,通过集成 Open-Meteo 实时气象数据接口和 DeepSeek 大语言模型,实现了从“自然语言需求”到“可视化分析报告”的全流程自动化。
本项目的核心意义主要体现在以下三个方面:
第一,大幅降低了大数据分析的技术门槛。通过引入 LLM 作为系统的自然语言接口,用户无需编写任何代码,只需输入诸如“分析成都近15天空气质量趋势”或“看看北京的湿度和PM2.5有没有关系”等通俗语言,系统即可自动完成后续的所有技术操作。这使得非技术人员也能轻松驾驭大数据工具,真正实现了数据的普惠化。
第二,探索了 AI Agent 在垂直领域的协同工作模式。本项目没有采用单一的通用 Agent,而是设计了基于 Orchestrator-Worker(指挥官-执行者)模式的多智能体架构。通过指挥官(Orchestrator)进行顶层规划,调度采集员(Collector)、分析师(Analyzer)和绘图员(Visualizer)分工协作,验证了多智能体系统在处理复杂、多阶段任务时的高效性与稳定性,为未来构建更复杂的智能系统积累了实践经验。
第三,实现了全流程技术栈的深度整合。项目将前端交互(Streamlit)、数据处理(Pandas/NumPy)、数据可视化(Plotly)与大模型推理(DeepSeek API)进行了有机结合,打通了数据从采集、清洗、存储到分析、展示的完整链路。这不仅是一次对大数据技术的综合应用,更是一次对“AI + Data”新型应用架构的工程实践,具有较强的示范意义和推广价值。
本系统遵循高内聚、低耦合的设计原则,采用分层架构体系。自下而上依次划分为:数据源层(Data Source Layer)、数据处理层(Data Processing Layer)、AI Agent 核心层(Agent Core Layer) 和 可视化交互层(Interaction Layer)。各层之间通过标准化的接口进行通信,确保了系统的可扩展性与可维护性。
为了保证数据的丰富性和系统的健壮性,本层设计了双模式数据接入机制:
- Open-Meteo API: 作为主数据源,提供全球任意经纬度的历史天气数据(Temperature, Humidity, Wind Speed)和空气质量数据(PM10, PM2.5, SO2, NO2, O3)。该 API 具有高可用、免密钥、响应快的特点,非常适合实时演示。
- Mock Data Generator: 作为备用数据源。考虑到网络环境的不确定性或 API 服务可能出现的限流、宕机等情况,系统内置了基于物理规律的仿真数据生成器。它利用正弦波动模型模拟气温的日变化,叠加高斯白噪声模拟环境随机扰动,确保在极端情况下系统演示流程不中断,数据展示依然符合逻辑。
该层基于 Python 成熟的大数据生态构建,负责数据的计算与存储:
- Pandas: 作为核心数据处理引擎,负责 DataFrame 的构建、时间序列索引的对齐(Resampling)、缺失值的插值填充(Interpolation)以及异常值的过滤。
- NumPy: 提供底层的高效数值计算支持,特别是在计算皮尔逊相关系数矩阵(Correlation Matrix)时,利用其向量化运算能力大幅提升性能。
这是本系统的“大脑”与“中枢”,包含四个职责明确的智能体:
- Orchestrator Agent (指挥官): 负责接收用户的自然语言指令,结合当前系统时间,利用 LLM 的语义理解能力将模糊的需求转化为结构化的任务参数(JSON 格式),并根据任务类型调度后续的 Worker Agent。
- Collector Agent (采集员): 负责执行数据获取任务。它首先调用地理编码服务将城市名转换为经纬度,然后根据时间范围并发请求 API,并负责处理网络异常与数据降级。
- Processor Agent (处理员): 负责数据清洗。它接收原始数据,进行格式标准化,确保传递给分析层的数据是整洁且合规的。
- Analyzer Agent (分析师): 负责数据挖掘。根据任务类型(趋势分析、关联分析、预测等),调用相应的统计算法,生成分析结论。
- Visualizer Agent (绘图员): 负责结果呈现。将抽象的分析结果映射为具体的图表对象(如折线图、热力图、散点图),并配置图表的颜色、标题、轴标签等视觉元素。
该层直接面向终端用户,提供友好的操作界面:
- Streamlit Web UI: 采用响应式设计,支持 PC 端与移动端访问。
- 交互组件: 包括侧边栏的全局配置(API Key、数据源模式)、主区域的自然语言输入框、实时的状态进度条(Status Indicator)以及可折叠的数据预览面板(Expander)。
本系统的核心工作流程体现了典型的“感知-规划-行动”闭环,具体步骤如下:
-
用户意图输入 (User Input): 用户在界面输入自然语言指令
$Q$ ,例如:“帮我分析一下杭州最近一周 PM2.5 和气温的变化趋势”。 -
语义解析与规划 (Intent Parsing & Planning):
OrchestratorAgent接收指令$Q$ 。为了解决 LLM 的“时间幻觉”问题,它首先获取系统当前日期$T_{now}$ ,构建包含时间上下文的 Prompt。然后调用 DeepSeek API,将自然语言解析为结构化参数$P = {city: "杭州", start_date: "202x-xx-xx", end_date: "202x-xx-xx", task_type: "trend", target_var: "pm25"}$ 。 -
数据采集与路由 (Data Acquisition & Routing):
OrchestratorAgent将参数$P$ 传递给CollectorAgent。-
CollectorAgent首先查询本地或远程的地理编码库,获取杭州的坐标$(lat, lon)$ 。 - 随后,它向 Open-Meteo 发起 HTTP GET 请求。如果请求成功,返回原始 JSON 数据;如果请求超时或失败,自动触发降级逻辑,调用
_generate_mock_data生成仿真数据。
-
-
数据清洗与标准化 (Data Cleaning):
ProcessorAgent接收原始数据,将其转换为 Pandas DataFrame。它会执行以下操作:- 检查时间戳列,统一转换为
datetime对象。 - 检测
NaN值,对于连续缺失不超过 3 小时的数据进行线性插值,超过则标记为无效。 - 输出清洗后的数据集
$D_{clean}$ 。
- 检查时间戳列,统一转换为
-
核心分析计算 (Core Analysis):
AnalyzerAgent根据$P.task_type$ 执行特定逻辑:- 若为
trend(趋势),计算移动平均线(Moving Average)和平滑曲线。 - 若为
associate(关联),调用safe_corr函数计算 PM2.5 与 Temp 的相关系数。 - 输出分析结果对象
$R$ ,包含统计指标和结论文本。
- 若为
-
可视化生成 (Visualization Generation):
VisualizerAgent根据$R$ 的数据特征选择最佳图表类型。例如,对于双变量趋势对比,它会生成双 Y 轴图表(Dual-Axis Chart);对于相关性矩阵,它会生成热力图(Heatmap)。最终生成 Plotly Figure 对象$V$ 。 -
结果渲染与反馈 (Rendering & Feedback): Streamlit 前端接收
$V$ 和$R$ ,动态渲染图表,并以 Markdown 格式展示分析结论。同时,系统的状态栏从“分析中”更新为“✅ 分析完成”。
大语言模型是基于静态数据集训练的,本身不具备实时的时间感知能力。如果用户输入“近 7 天”或“上个月”,模型无法知道“今天”具体是哪一天,从而导致日期推理错误。这是 AI Agent 开发中常见的“时间幻觉”(Time Illusion)问题。
为了解决这一问题,我们在 OrchestratorAgent 中实现了 RAG(检索增强生成) 的简化思想——将“系统时间”作为外部知识注入到 Prompt 的上下文中。
关键代码实现 (orchestrator.py):
class OrchestratorAgent:
def parse(self, query: str) -> AgentInput:
# 1. 动态获取系统当前的 UTC 时间
today_str = datetime.utcnow().strftime("%Y-%m-%d")
# 2. 构建包含时间上下文的 System Prompt
# 明确告知模型“今天是...”,强制其基于此基准进行日期推算
prompt = (
f"你是一个智能解析器。今天是 {today_str}。\n"
"你的任务是从用户的自然语言输入中提取关键参数,并以严格的 JSON 格式输出。\n"
"需提取字段:\n"
"- city: 城市名 (如'北京')\n"
"- start_date: 格式 YYYY-MM-DD\n"
"- end_date: 格式 YYYY-MM-DD\n"
"- analysis_type: 任务类型 ['trend','associate','forecast']\n"
"- target_var: 目标变量 ['pm25','temp','humidity']\n"
"默认规则: 如果用户未指定时间,默认为过去15天。\n"
f"用户输入: {query}"
)
# 3. 调用 LLM 并解析返回的 JSON 字符串
try:
json_str = self.client.complete(prompt)
return self._parse_json(json_str)
except Exception as e:
# 异常处理:返回默认参数,保证流程不崩溃
return self._get_default_input()通过这种方式,无论用户何时运行系统,Agent 都能准确理解“昨天”、“上周”对应的具体日期范围。
在实际的大数据系统中,外部 API 的稳定性往往是不可控的(可能出现网络波动、服务宕机或 SSL 握手失败)。为了确保演示系统的 高可用性(High Availability),我们设计了“API 优先,Mock 兜底”的自动降级策略。
关键代码实现 (collector.py):
class CollectorAgent:
def get_data(self, city, start_date, end_date):
# 策略 1: 优先尝试真实 API 调用
try:
# 第一步:地理编码 (City -> Lat/Lon)
lat, lon = self._geocode(city)
# 第二步:获取气象数据
data = self._fetch_open_meteo(lat, lon, start_date, end_date)
if not data.empty:
return data
except Exception as e:
# 记录警告日志,但不中断流程
print(f"[Warning] API 调用失败: {e}, 即将切换至 Mock 模式")
# 策略 2: 降级至 Mock 数据生成
return self._generate_mock_data(city, start_date, end_date)
def _generate_mock_data(self, city, start_str, end_str):
"""
基于物理规律生成仿真数据:
- 温度:利用正弦函数模拟昼夜温差
- PM2.5:利用余弦函数叠加随机噪声模拟污染波动
"""
# 生成时间序列索引
days = (end_date - start_date).days + 1
hours = days * 24
t = np.linspace(0, days * 2 * np.pi, hours)
# 模拟符合物理常识的数据分布
# 温度在 15-25 度之间波动
temp = 20 + 5 * np.sin(t) + np.random.normal(0, 1, hours)
# PM2.5 在 20-80 之间波动,且具有一定的随机性
pm25 = 50 + 30 * np.cos(t) + np.random.normal(0, 5, hours)
return pd.DataFrame({"time": time_index, "temp": temp, "pm25": pm25})这一设计确保了系统具有极强的鲁棒性。即使在断网环境下,系统依然可以生成逻辑自洽的数据和图表,完美满足课堂演示或离线测试的需求。
在处理真实环境数据时,数据缺失(Missing Data)是常态。例如,传感器故障可能导致某段时间的数据为空。如果直接调用 pandas.corr() 计算相关系数,可能会因为数据完全缺失而得到 NaN,进而导致后续的可视化模块报错。
为此,我们在 AnalyzerAgent 中实现了防御性编程逻辑,封装了 safe_corr 函数。
关键代码实现 (analyzer.py):
def safe_corr(s1, s2):
"""
安全地计算皮尔逊相关系数,处理数据缺失和样本量不足的情况。
"""
# 1. 创建掩码,仅选择两个序列均为非空(not NaN)的索引
valid_mask = (~np.isnan(s1)) & (~np.isnan(s2))
# 2. 检查有效样本量
# 统计学上,样本量过少会导致相关系数毫无意义
if np.sum(valid_mask) < 3:
return None # 返回 None 标识无法计算
# 3. 仅在有效样本上进行计算
return np.corrcoef(s1[valid_mask], s2[valid_mask])[0, 1]通过这种严谨的逻辑判断,系统能够优雅地处理数据质量问题,而不是直接抛出异常,体现了系统设计的成熟度。
为了验证系统的功能完整性与稳定性,我们设计了包含正常流程、边界条件和异常处理的全面测试用例。测试环境为 Windows 11 操作系统,Python 3.9 环境。
| 测试编号 | 测试场景 | 输入指令示例 | 预期行为 | 实际运行结果 | 结论 |
|---|---|---|---|---|---|
| TC-01 | 基础趋势分析 | “分析上海近一周的PM2.5变化” | 1. 解析出城市“上海”,时间为 T-7 至 T。 2. 成功获取数据。 3. 绘制单变量折线图。 |
系统准确识别了意图,图表展示了上海过去7天 PM2.5 的波动曲线,X轴时间连续无断点。 | ✅ 通过 |
| TC-02 | 跨变量关联分析 | “看看北京的湿度和PM2.5有没有关系” | 1. 解析任务类型为 associate。2. 计算皮尔逊相关系数。 3. 绘制双轴对比图或散点图。 |
界面显示相关系数为 -0.42(负相关),图表直观展示了湿度升高时 PM2.5 下降的趋势。 | ✅ 通过 |
| TC-03 | 未来预测 | “预测未来3天的空气质量” | 1. 解析时间为 T 至 T+3。 2. 调用 API 的 Forecast 端点。 3. 展示预测数据。 |
系统成功切换到 Forecast API,获取了未来的预测数据(非历史观测值)。 | ✅ 通过 |
| 测试编号 | 测试场景 | 操作步骤 | 预期行为 | 实际运行结果 | 结论 |
|---|---|---|---|---|---|
| TC-04 | 模糊输入处理 | 输入仅两个字:“空气” | 触发默认逻辑,自动补全参数(默认城市成都,默认近15天)。 | 系统未崩溃,提示“使用默认参数分析成都数据”,并正常输出了报表。 | ✅ 通过 |
| TC-05 | 断网模拟测试 | 手动断开电脑 WiFi,输入“分析成都数据” | 1. API 请求超时。 2. 捕获异常日志。 3. 自动降级为 Mock 数据。 |
界面状态栏显示“ |
✅ 通过 |
| TC-06 | 无效城市名 | 输入“火星的空气质量” | Geocoding 失败,提示用户或回退默认城市。 | 系统提示“无法找到城市:火星”,并自动回退到默认城市(成都)继续执行。 | ✅ 通过 |
在本地环境下(Intel i7-12700H, 32GB RAM),我们对系统的响应时间进行了统计:
- 语义解析耗时: 平均 1.2 秒(主要取决于 DeepSeek API 的网络延迟)。
- 数据采集耗时: 平均 0.8 秒(Open-Meteo 接口响应极快)。
- 数据处理与分析: < 0.1 秒(Pandas 在内存中处理 15 天的小规模数据几乎是瞬时的)。
- 图表渲染耗时: < 0.5 秒。
- 端到端总耗时: 约 2.5 秒 ~ 3.0 秒。
系统整体运行流畅,交互响应迅速,进度条和状态指示器的引入有效缓解了用户等待时的焦虑感,用户体验良好。
在项目的开发与调试过程中,我们遇到了一些典型的技术挑战。通过查阅官方文档、社区求助以及反复调试,我们逐一攻克了这些难题。
问题描述:
在开发初期,我们偶尔会遇到 requests.exceptions.SSLError: Max retries exceeded with url... 错误,导致程序直接崩溃退出。这通常是由于本地开发环境的 SSL 证书链不完整,或者 Open-Meteo 服务器瞬时的连接重置引起的。
解决方法:
- 短期修复: 在
requests.get调用中添加verify=False参数,暂时跳过 SSL 证书验证(仅限调试阶段)。 - 长期方案: 在
CollectorAgent中构建了完善的异常捕获机制(try-except)。一旦捕获到RequestException(包括超时、SSL 错误、DNS 解析失败等),立即调用_generate_mock_data方法。这不仅彻底解决了程序崩溃的问题,还顺带解决了 API 调用频率限制(Rate Limiting)可能导致的失败,极大提升了系统的稳定性。
问题描述: 当测试输入“分析昨天的数据”时,LLM 有时会将其解析为 2023 年的某一天,或者直接胡乱猜测一个日期。
原因分析: 大语言模型是基于静态文本训练的,它本身是一个“离线”的系统,不知道推理时的“现在”具体是什么时间。对于它来说,训练数据中的“今天”可能是 2023 年的某一天。
解决方法:
采用了 Prompt Engineering 中的上下文注入技巧。在构建 Prompt 时,我们使用 Python 代码动态获取当前日期:
today_str = datetime.utcnow().strftime("%Y-%m-%d")
并将这句话放在 Prompt 的最开头:f"你是一个智能解析器。今天是 {today_str}。"。这一行简单的代码修改,相当于给了模型一个参考坐标系,彻底解决了相对日期(如“近三天”、“上周”)解析错误的问题。
问题描述: Streamlit 的运行机制是“即时模式”(Immediate Mode),即每当用户点击按钮或修改输入框,整个 Python 脚本就会从头到尾重新执行一遍。这导致每次交互都会重新触发 LLM 解析和 API 数据请求,既浪费 Token 和流量,又导致页面响应变慢。
解决方法:
引入了 Streamlit 的 st.session_state 会话状态管理机制。我们将 agent_output(Agent 的执行结果)存储在 Session State 中。
if "agent_output" not in st.session_state:
st.session_state.agent_output = None
if st.button("开始分析"):
# 执行分析并存入 session
st.session_state.agent_output = orchestrator.run(query)
# 如果 session 中有数据,直接渲染,不再重新计算
if st.session_state.agent_output:
render(st.session_state.agent_output)通过这种方式,我们实现了数据的持久化缓存,避免了不必要的重复计算,显著提升了交互流畅度。
本项目紧扣大数据课程大作业的核心要求,成功构建了一个集 自然语言交互、自动化数据流处理 和 专业可视化 于一体的智能分析系统。
- 在架构层面,我们实现了清晰的 AI Agent 分层架构。Orchestrator 负责规划,Collector、Analyzer、Visualizer 负责执行,各模块耦合度低,职责单一,易于扩展和维护。
- 在功能层面,系统实现了从模糊自然语言指令到精确可视化图表的端到端转化。无论是数据采集的 Mock 兜底策略,还是分析算法的鲁棒性设计,都体现了较高的工程质量。
- 在技术层面,我们打通了 LLM 与传统大数据工具链(Pandas, Plotly)的壁垒,验证了“LLM + Tool”模式在垂直领域应用的可行性。
受限于课程时间和计算资源,本系统仍存在一定的优化空间,未来可以在以下几个方向进行深入探索:
- 支持更多元的数据源: 目前系统仅接入了 Open-Meteo。未来可以设计通用的 API 适配器,接入高德地图 API、和风天气 API,甚至连接 MySQL/Hive 数据库,实现企业级的数据仓库分析。
- 增强分析深度: 目前的分析主要局限于统计指标(均值、方差、相关系数)。未来可以引入 Scikit-learn 或 PyTorch,实现更复杂的机器学习任务,例如使用 LSTM 神经网络进行 PM2.5 的长时序预测,或者使用 K-Means 算法对城市空气质量进行聚类分级。
- 多轮对话与记忆能力: 目前的 Agent 是无状态的(Stateless),无法联系上下文。未来可以引入 LangChain 的 Memory 组件,让 Agent 具备记忆能力,用户可以基于上一次的分析结果进行追问(例如:“那上个月的情况呢?”),实现真正的对话式数据分析。