小实验一:数据清洗+ai研判

张开发
2026/6/27 5:58:59 15 分钟阅读
小实验一:数据清洗+ai研判
在日常事件研判工作中海量数据的预处理与清洗占据了大量时间。为了提升效率我尝试借助AI“偷个懒”——通过编写自动化脚本配合DeepSeek与ChatGPT构建半自动化的处理流程。具体思路利用DeepSeek进行语言润色与Prompt优化利用ChatGPT生成核心代码包括正则匹配、数据清洗、批量处理逻辑等。最初只想实现一个基础的清洗脚本没想到逐步扩展到日志解析、特征提取与结果校验等环节越做越上瘾。本文将分享这套“AI辅助编程”的实践思路与踩坑经验希望对同样面临重复性研判工作的同学有所启发。这是一开始的第一步。这是一开始初版给deepseek的一段话deepseek生成了一段然后我就将生成的语句都交给了chatgpt。chatgpt就生成了清洗的代码。import os import re import numpy as np import pandas as pd # # 1. 用户可修改的输入/输出路径 # input_file data.xlsx output_file cleaned_data.xlsx # # 2. 用户可扩展的自定义清洗规则 # 说明 # - 事件名称默认精确匹配 # - 关键特征 / 主机信息 / 资产分组子串匹配 # - 空字符串表示该条件忽略 # - 若需要事件名称使用正则可将 use_regex 设置为 True # custom_filters [ { 事件名称: APT事件文件情报, 关键特征: 攻击组织: [沙虫], 主机信息: , 资产分组: , use_regex: False }, { 事件名称: 检测到木马, 关键特征: , 主机信息: **********, 资产分组: *********************, use_regex: False } ] # # 3. 固定规则配置 # DROP_COLUMNS [首次时间, 最近时间, 事件ID, 来源设备, 检测模块] DROP_EVENT_NAME nmap端口扫描 DROP_ASSET_GROUPS [ **************************, *********************** ] def print_shape(df, step_name): 打印当前数据形状 print(f[{step_name}] 当前数据形状: {df.shape[0]} 行 x {df.shape[1]} 列) def validate_file_exists(file_path): 检查输入文件是否存在 if not os.path.exists(file_path): raise FileNotFoundError(f输入文件不存在: {file_path}) def read_excel_file(file_path): 读取 Excel 文件 try: df pd.read_excel(file_path) return df except Exception as e: raise RuntimeError(f读取 Excel 文件失败: {e}) def check_required_columns(df, required_columns, stage_desc): 检查所需列是否存在 missing [col for col in required_columns if col not in df.columns] if missing: raise ValueError(f{stage_desc}缺少必要列: {missing}) def safe_str_series(series): 将列安全转换为字符串避免 NaN 报错 用于 contains / match 之类字符串处理 return series.fillna().astype(str) def drop_fixed_columns(df): 删除固定列 existing_cols [col for col in DROP_COLUMNS if col in df.columns] missing_cols [col for col in DROP_COLUMNS if col not in df.columns] if missing_cols: print(f[提示] 以下待删除列不存在将自动跳过: {missing_cols}) df df.drop(columnsexisting_cols, errorsignore) print_shape(df, 删除固定列后) return df def drop_nmap_rows(df): 删除事件名称为 nmap端口扫描 的行 check_required_columns(df, [事件名称], 删除 nmap 规则时) before len(df) df df[df[事件名称] ! DROP_EVENT_NAME].copy() removed before - len(df) print(f[删除事件名称规则] 删除 事件名称 {DROP_EVENT_NAME} 的行数: {removed}) print_shape(df, 删除 nmap端口扫描 后) return df def drop_asset_group_rows(df): 删除特定资产分组的行 check_required_columns(df, [资产分组], 删除资产分组规则时) before len(df) df df[~df[资产分组].isin(DROP_ASSET_GROUPS)].copy() removed before - len(df) print(f[删除资产分组规则] 删除指定资产分组行数: {removed}) print_shape(df, 删除指定资产分组 后) return df def build_custom_filter_mask(df, rule): 根据单条自定义规则构建掩码 - 事件名称精确匹配或正则匹配 - 关键特征 / 主机信息 / 资产分组子串匹配 - 空条件忽略 - 同时满足所有非空条件时判定为需删除 needed_cols [事件名称, 关键特征, 主机信息, 资产分组] check_required_columns(df, needed_cols, 执行自定义清洗规则时) event_name str(rule.get(事件名称, )).strip() key_feature str(rule.get(关键特征, )).strip() host_info str(rule.get(主机信息, )).strip() asset_group str(rule.get(资产分组, )).strip() use_regex bool(rule.get(use_regex, False)) # 初始 mask 全 True后续逐步与运算 mask pd.Series(True, indexdf.index) # 事件名称匹配 if event_name: if use_regex: mask mask safe_str_series(df[事件名称]).str.contains( event_name, regexTrue, naFalse ) else: mask mask (safe_str_series(df[事件名称]) event_name) # 关键特征子串匹配 if key_feature: mask mask safe_str_series(df[关键特征]).str.contains( re.escape(key_feature), regexTrue, naFalse ) # 主机信息子串匹配 if host_info: mask mask safe_str_series(df[主机信息]).str.contains( re.escape(host_info), regexTrue, naFalse ) # 资产分组子串匹配 if asset_group: mask mask safe_str_series(df[资产分组]).str.contains( re.escape(asset_group), regexTrue, naFalse ) return mask def apply_custom_filters(df, custom_filters): 应用所有自定义清洗规则 if not custom_filters: print([自定义清洗] 未配置自定义规则跳过。) return df total_removed 0 for i, rule in enumerate(custom_filters, start1): try: mask build_custom_filter_mask(df, rule) matched int(mask.sum()) print(f\n[自定义规则 {i}] 规则内容: {rule}) print(f[自定义规则 {i}] 命中行数: {matched}) if matched 0: df df[~mask].copy() total_removed matched print_shape(df, f应用自定义规则 {i} 后) else: print(f[自定义规则 {i}] 无匹配跳过删除。) except Exception as e: print(f[警告] 自定义规则 {i} 执行失败已跳过。原因: {e}) print(f\n[自定义清洗完成] 总删除行数: {total_removed}) print_shape(df, 全部自定义规则执行后) return df def save_to_excel(df, file_path): 保存到 Excel try: df.to_excel(file_path, indexFalse) print(f\n[完成] 清洗后的数据已保存到: {file_path}) except Exception as e: raise RuntimeError(f保存 Excel 文件失败: {e}) def main(): try: # 1. 检查文件 validate_file_exists(input_file) # 2. 读取文件 df read_excel_file(input_file) print_shape(df, 原始数据) # 3. 固定清洗规则 df drop_fixed_columns(df) df drop_nmap_rows(df) df drop_asset_group_rows(df) # 4. 自定义清洗规则 df apply_custom_filters(df, custom_filters) # 5. 保存结果 save_to_excel(df, output_file) except FileNotFoundError as e: print(f[错误] {e}) except ValueError as e: print(f[错误] {e}) except RuntimeError as e: print(f[错误] {e}) except Exception as e: print(f[未知错误] 程序执行失败: {e}) if __name__ __main__: main()以上就是清洗的代码不得不说chatgpt生成代码的能力真的很强。然后这时候就想为何不利用大模型给我进行分析原理就是利用云端的网站来配合进行分析。因此我主要的任务就是获得ai的api值以及所用到的情报中心平台的api值。因此下面就是我利用deepseekchatgpt来回修改的步骤第一回合第二回合最后生成的最终脚本演示的效果还可以方便的很多了。尤其是最后生成的critical_events.xlsx副本。这个更加有助于我去分析了。以上是展示的最终的效果ai事件研判效率快多了。

更多文章