LLM应用实战: AI资讯的自动聚合及报告生成

1.背景

花了整整两天时间,本qiang~开发了一个关于AI新闻资讯的自动聚合及报告生成工具。

本篇记录一下整体的框架和实现原理,并且本着它山之石可以攻玉,本qiang~开放了所有的源码,源码可见如下第5章节,感谢各位看官的大力支持。如有问题,可私信或留言沟通。

成品可以参考链接:《AI资讯每日速递(2024.11.05)

2.为什么要做这件事?

深处AI时代,想要追赶前沿的一手技术与资讯,有一个工具能够实时获取每天的重点内容,包括咨询和技术相关内容,并且能够按照公司及内容的优先级进行筛选,然后午后捧着一杯奶茶,点开自动生成的报告,岂不美哉美哉?

3.相关技术

  1. Crawl4ai: 一块集成LLM的开源爬虫工具
  2. Swarm: OpenAI发布的Multi-Agent编排框架,可以参考本人先前的辛苦整理:《LLM应用实战: OpenAI多代理框架-Swarm
  3. Python-docx: word的操作工具
  4. Textdistance: 用于报告模块中资讯排序结果与原始资讯结果的对齐
  5. Gpt-4o-mini: 采用的大模型是gpt-4o-mini,每日免费调用200次,不够用...

4.整体框架

LLM应用实战: AI资讯的自动聚合及报告生成

 整体框架分为三个模块:

4.1下载模块

下载模块的数据源包括各大AI新闻网站及知名博客,然后通过开源爬虫工具crawl4ai进行爬取,爬取的维度包括标题、内容、图片等。

4.2解析模块

解析模块是针对爬取的结果进行解析,采用OpenAi Swarm框架,包含4Agent,其中Analysis Agent是主体Agent,遍历下载的每一个资讯,将每条资讯分别同步给其他Agent完成具体的解析任务。其中Translator Agent主要功能是翻译,将英文翻译为中文;Classifier Agent主要功能是针对资讯进行分类,如涉及技术还是产品之类的;Modifier Agent主要功能是将资讯的标题和内容进行改写,标题可以改写更醒目一些,内容主要是提取摘要信息。

Analysis Agent负责串联其他3Agent,每个Agent结束后均会返回到Analysis Agent,以便让Analysis Agent决定下一步的操作。

4.3报告模块

报告模块包含Sorter Agent,主要功能是将解析后的资讯按照公司、内容等维度进行排序,然后筛选出其中相对排名较高的资讯。

经过排序Agent后,最终将结果保存为word

5.全部源码

5.1下载模块

采用crawl4ai工具进行网站爬取,示例的网站是https://www.aibase.com,网站存在中文及英文,但增加翻译Agent是为了兼容其他网站。

 1. 文件处理file_util.py

import json import hashlib   def get_datas(file_path, json_flag=True, all_flag=False, mode='r'):     """读取文本文件"""     results = []          with open(file_path, mode, encoding='utf-8') as f:         for line in f.readlines():             if json_flag:                 results.append(json.loads(line))             else:                 results.append(line.strip())         if all_flag:             if json_flag:                 return json.loads(''.join(results))             else:                 return 'n'.join(results)         return results       def save_datas(file_path, datas, json_flag=True, all_flag=False, with_indent=False, mode='w'):     """保存文本文件"""     with open(file_path, mode, encoding='utf-8') as f:         if all_flag:             if json_flag:                 f.write(json.dumps(datas, ensure_ascii=False, indent= 4 if with_indent else None))             else:                 f.write(''.join(datas))         else:             for data in datas:                 if json_flag:                     f.write(json.dumps(data, ensure_ascii=False) + 'n')                  else:                     f.write(data + 'n')

  2. 网站爬取web_crawler.py

LLM应用实战: AI资讯的自动聚合及报告生成

from crawl4ai import AsyncWebCrawler from crawl4ai.extraction_strategy import JsonCssExtractionStrategy import json from typing import Dict, Any, Union, List from bs4 import BeautifulSoup from file_util import * import os import datetime import re import requests   class AbstractAICrawler():          def __init__(self) -> None:         pass     def crawl():         raise NotImplementedError()   class AINewsCrawler(AbstractAICrawler):     def __init__(self, domain) -> None:         super().__init__()         self.domain = domain         self.file_path = f'data/{self.domain}.json'         self.history = self.init()          def init(self):         if not os.path.exists(self.file_path):             return {}         return {ele['id']: ele for ele in get_datas(self.file_path)}          def save(self, datas: Union[List, Dict]):         if isinstance(datas, dict):             datas = [datas]         self.history.update({ele['id']: ele for ele in datas})         save_datas(self.file_path, datas=list(self.history.values()))          async def crawl(self, url:str, schema: Dict[str, Any]=None):         extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True) if schema else None         async with AsyncWebCrawler(verbose=True) as crawler:             result = await crawler.arun(                 url=url,                 extraction_strategy=extraction_strategy,                 bypass_cache=True,             )              assert result.success, "Failed to crawl the page"             if schema:                 return json.loads(result.extracted_content)             return result.cleaned_html   class AIBasesCrawler(AINewsCrawler):     def __init__(self) -> None:         self.domain = 'aibase'         super().__init__(self.domain)         self.url = 'https://www.aibase.com'              async def crawl_home(self, url='https://www.aibase.com/news'):         schema = {             'name': 'ai base home page crawler',             'baseSelector': '.flex',             'fields': [                 {                     'name': 'link',                     'selector': 'a[rel="noopener noreferrer"]',                     'type': 'nested_list',                     'fields': [                         {'name': 'href', 'type': 'attribute', 'attribute':'href'}                     ]                 }             ]         }         links = await super().crawl(url, schema)         links = [link['href'] for ele in links for link in ele['link']]         links = list(set([f'{self.url}{ele}' for ele in links if ele.startswith('/news')]))         links = sorted(links, key=lambda x: x, reverse=True)         return links          async def crawl_newsletter_cn(self, url):         html = await super().crawl(url)         body = BeautifulSoup(html, 'html.parser')         title = body.select_one('h1').get_text().replace('u200b', '').strip()         date = [ele.get_text().strip() for ele in body.find_all('span') if re.match(r'(d{4}年d{1,2}月d{1,2}号)', ele.get_text().strip())][0]         date = datetime.datetime.strptime(date, '%Y年%m月%d号 %H:%M').strftime("%Y-%m-%d")         content = 'n'.join([ele.get_text().strip().replace('n', '').replace(' ', '') for ele in body.find_all('p')])         content = content[:content.index('划重点:')].strip() if '划重点:' in content else content         return {             'title': title,             'link': url,             'content': content,             'date': date         }          async def crawl_home_cn(self, url='https://www.aibase.com/zh/news'):         schema = {             'name': 'ai base home page crawler',             'baseSelector': '.flex',             'fields': [                 {                     'name': 'link',                     'selector': 'a[rel="noopener noreferrer"]',                     'type': 'nested_list',                     'fields': [                         {'name': 'href', 'type': 'attribute', 'attribute':'href'}                     ]                 }             ]         }         links = await super().crawl(url, schema)         links = [link['href'] for ele in links for link in ele['link']]         links = list(set([f'{self.url}{ele}' for ele in links if ele.startswith('/zh/news')]))         links = sorted(links, key=lambda x: x, reverse=True)         return links          async def crawl_newsletter(self, url):         html = await super().crawl(url)         body = BeautifulSoup(html, 'html.parser')         title = body.select_one('h1').get_text().replace('u200b', '').strip()         date = ';'.join([ele.get_text().strip() for ele in body.find_all('span')])         date = re.findall(r'(bw{3}s+d{1,2},s+d{4}b)', date)[0]         date = datetime.datetime.strptime(date, '%b %d, %Y').strftime("%Y-%m-%d")         content = 'n'.join([ele.get_text().strip().replace('n', '') for ele in body.find_all('p')])         content = content[:content.index('Key Points:')].strip() if 'Key Points:' in content else content         pic_urls = [ele.get('src').strip() for ele in body.select('img') if ele.get('title')]         pic_url = pic_urls[0] if pic_urls else ''         pic_url = pic_url.replace('\"', '')         pic_path = ''         if pic_url:             pic_path = f'data/images/{md5(url)}.jpg'             response = requests.get(pic_url)             if response.status_code == 200:                 with open(pic_path, 'wb') as f:                     f.write(response.content)         return {             'title': title,             'link': url,             'content': content,             'date': date,             'pic': pic_path,             'id': md5(url)         }          async def crawl(self):         links = await self.crawl_home()         results = []         for link in links:             _id = md5(link)             if _id in self.history:                 continue             results.append({                 'id': _id,                 'link': link,                 'contents': await self.crawl_newsletter(link),                 'time': datetime.datetime.now().strftime('%Y-%m-%d')             })         self.save(results)         return await self.get_last_day_data()          async def get_last_day_data(self):         last_day = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')         datas = self.init()         for v in datas.values():             v['contents']['id'] = v['id']         return [v['contents'] for v in datas.values() if v['contents']['date'] == last_day]

View Code

 5.2解析模块

1. 解析提示语prompt.py

LLM应用实战: AI资讯的自动聚合及报告生成

ANALYST = """你是一个AI领域的分析师,主要工作步骤如下: 1. 首先执行transform_to_translate_agent方法,切换到translate agent,执行翻译任务; 2. 然后再执行transform_to_classifier_agent,调用classifier agent,针对内容进行分类; 3. 接着再执行transform_to_modifier_agent,调用modifier agent,针对内容进行改写; 4. 前三步执行完毕后,意味着整个分析工作已经完成,最后调用finish方法,退出该整个工作流程。 需要注意的是:每个步骤必须执行完成后,才能执行后续的步骤,且同时只能有1个步骤在执行;如果modifier agent已经执行完毕,一定要调用finish退出整体工作流程。 """   TRANSLATE = """你现在是一个AI领域的翻译专家,请将如下英文的标题和内容分别翻译为中文。步骤及要求如下: 1. 首先调用translate方法进行翻译,要求如下: a. 需要注意的标题和内容中如果包含公司名称、产品名称、技术名称等专业词汇,针对这些专业词汇需要保留英文形式,其他非专业词汇需要翻译为中文,注意标题也必须翻译; b. 输出格式为 "标题: xxxxxn内容: xxxxx",且需要保留换行符; c. 注意该translate方法没有输入参数,返回的结果只是需要翻译的原始文本,需要你执行翻译操作,然后返回翻译结果; d. 该translate方法执行完成后,需要你执行具体的翻译,等待翻译完成后,才能开展下一个步骤,不能直接将原文作为参数传给下一个步骤;  2. 抽取完成后,执行extract_translate_result方法,要求如下: a. 该extract_translate_result方法存在1个输入参数,即执行1后得到的翻译结果  3. 待步骤2执行完成后,执行transform_to_analysis_agent方法,切换至analysis agent,执行其他工作。  4. 步骤1,2,3必须按照顺序执行,且同时只能有1个步骤在执行  5. 如果历史记录中已经执行了任何步骤,注意严格禁止再次重复执行,而要直接执行未执行的步骤, """  CLASSIFIER = """你是一个AI领域的分类器,请判断输入是否与AI的技术相关。步骤及要求如下: 1. 首先调用classify方法进行分类,要求如下: a. 输入的内容包括标题和内容两部分,重点基于内容进行判断这条信息是否与AI技术相关; b. 如果是相关技术细节、技术原理、代码说明、架构说明,则输出"是",如果是与公司的最新资讯相关,如发行新产品、成立新部门、公司合作等非技术相关的,则输出"否" c. 输出的结果只能是"是"、"否"两个选项中的一个,不要输出其他内容,包括解释信息等。 d. 注意该classify方法没有输入参数,返回的结果只是需要分类的原始文本,需要你执行分类任务,然后返回分类结果;   2. 获取到分类结果后,执行extract_classify_result方法,要求如下: a. 该extract_classify_result方法存在1个输入参数,即执行1后得到的分类结果  3. 待步骤2执行完成后,执行transform_to_analysis_agent方法,切换至analysis agent,执行其他工作  4. 步骤1,2,3必须按照顺序执行,且同时只能有1个步骤在执行  5. 如果历史记录中已经执行了任何步骤,注意严格禁止再次重复执行,而要直接执行未执行的步骤, """   MODIFIER = """你是一个AI新闻的改写器,请基于输入中的标题和内容进行改写。步骤及要求如下: 1. 首先调用modify方法进行改写,要求如下: a. 输入的内容包括"标题"和"内容"两部分,需要分别针对"标题"和"内容"进行改写; b. "标题"的改写目标是需要醒目且具有吸引力,能够吸引读者进一步阅读,要求字数不能超过30字; c. "内容"需要摘要总结,需要准确提取主要内容,要求字数不超过200字; d. 输出格式为 "标题: xxxxn内容: xxxxx",且需要保留换行符,"标题"和"内容"需要以输入的中文为准; e. 注意该modify方法没有输入参数,返回的结果是需要改写的原始文本,需要你执行改写任务,然后返回改写结果;   2. 获取到改写结果后,执行extract_modify_result方法,要求如下: a. 该extract_modify_result方法存在1个输入参数,即执行1后得到的改写结果  3. 待步骤2执行完成后,执行transform_to_analysis_agent方法,切换至analysis agent,执行其他工作  4. 步骤1,2,3必须按照顺序执行,且同时只能有1个步骤在执行  5. 如果历史记录中已经执行了任何步骤,注意严格禁止再次重复执行,而要直接执行未执行的步骤 """

View Code

 2. 解析Agent整体流程agent.py

LLM应用实战: AI资讯的自动聚合及报告生成

agent copy 2from swarm import Swarm, Agent from web_crawler import AIBasesCrawler import asyncio from prompt import * from file_util import * from tqdm import tqdm import datetime   client = Swarm()  def download():     return asyncio.run(AIBasesCrawler().crawl())  def transform_to_analysis_agent():     return analysis_agent  def transform_to_translate_agent():     return translate_agent  def transform_to_classifier_agent():     return classifier_agent  def transform_to_modifier_agent():     return modifier_agent  def translate(context_variables):     return f'现在请按要求翻译如下内容:n标题: {context_variables["title"]}n内容: {context_variables["content"]}'  def extract_translate_result(result: str, context_variables: dict):     """翻译的结果进行抽取      Args:         result (str): 翻译结果     Returns:         str: 翻译结果提取结束标志     """     context_variables['title_zh'] = result[result.index('标题:')+len('标题:'):result.index('内容:')]     context_variables['content_zh'] = result[result.index('内容:')+len('内容:'):]     return '翻译结果提取任务已经完成,请继续下一步操作。'  def classify(context_variables):     return f'现在请按要求针对以下内容进行分类,n输入:n标题: {context_variables["title_zh"]}n内容: {context_variables["content_zh"]},n输出:'  def extract_classify_result(result: str, context_variables: dict):     """分类的结果进行抽取      Args:         result (str): 翻译结果     Returns:         str: 分类结果提取结束标志     """     context_variables['classify'] = result     return '分类结果提取任务已经完成,请继续下一步操作。'   def modify(context_variables):     return f'现在请按要求针对以下内容进行改写,n输入:n标题: {context_variables["title_zh"]}n内容: {context_variables["content_zh"]},n输出:'  def extract_modify_result(result: str, context_variables: dict):     """改写的结果进行抽取      Args:         result (str): 改写结果     Returns:         str: 改写结果提取结束标志     """     context_variables['title_modify'] = result[result.index('标题:')+len('标题:'):result.index('内容:')]     context_variables['content_modify'] = result[result.index('内容:')+len('内容:'):]     return '改写结果提取任务已经完成,请继续下一步操作。'   def finish():     return '分析任务已经完成,请直接退出整个工作流程,直接输出"退出"。'   analysis_agent = Agent(name='analysis_agent', instructions=ANALYST, functions=[transform_to_translate_agent, transform_to_classifier_agent, transform_to_modifier_agent, finish]) translate_agent = Agent(name='translate_agent', instructions=TRANSLATE, functions=[translate, extract_translate_result, transform_to_analysis_agent]) classifier_agent = Agent(name='classifier_agent', instructions=CLASSIFIER, functions=[classify, extract_classify_result, transform_to_analysis_agent]) modifier_agent = Agent(name='modifier_agent', instructions=MODIFIER, functions=[modify, extract_modify_result, transform_to_analysis_agent])  output_file_pre = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y.%m.%d') output_path = f'data/{output_file_pre}_final_results.json' results = get_datas(output_path) process_ids = [data['id'] for data in results] for data in tqdm(download()):     if data['id'] in process_ids: continue          context_variables = {'title': data['title'], 'content': data['content']}     try:         result = client.run(analysis_agent, messages=[{"role": "user", "content": "现在,请开始分析!"}], context_variables=context_variables, debug=True)         context_variables = result.context_variables         data['title_zh'] = context_variables['title_zh']         data['content_zh'] = context_variables['content_zh']         data['classify'] = context_variables['classify']         data['title_modify'] = context_variables['title_modify']         data['content_modify'] = context_variables['content_modify']         save_datas(output_path, [data], mode='a')     except Exception as e:         print(e)         continue

View Code

  5.3报告模块

1. 排序提示语prompt.py

LLM应用实战: AI资讯的自动聚合及报告生成

SORTER = """你是一个AI新闻的排序助手,请给予输入的新闻标题进行排序。要求如下: 1. 排序的规则是基于标题中所提及公司、组织机构的名气和重要性进行排序,名气和重要性是基于你所学的知识进行排序,名气和重要性越高,排名越靠前; 2. 排序的结果只返回名气最高的top10即可,输出的格式为"1xxxxxn2xxxxxn3xxxxx...n10xxxxx",注意一定要以"n"进行换行; 3. 输出的每个标题,需要和输入中对应的标题保持完全一致,禁止更改; """

View Code

 2. 排序流程agent.py

LLM应用实战: AI资讯的自动聚合及报告生成

from swarm import Swarm, Agent from prompt import * from file_util import * from collections import defaultdict import re import textdistance from word_util import save_2_word import datetime import random   client = Swarm() output_file_pre = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y.%m.%d') output_path = f'data/{output_file_pre}_final_results.json' sort_agent = Agent(name='sort_agent', instructions=SORTER)      datas = get_datas(output_path) for ele in datas:     ele['title_modify'] = ele['title_modify'].strip()     ele['content_modify'] = ele['content_modify'].strip()  def get_most_similar(t1, texts):     most_similarity = 0.0     most_similar_text = ''     for ele in texts:         similarity = textdistance.levenshtein.similarity(t1, ele)         if similarity > most_similarity:             most_similarity = similarity             most_similar_text = ele     return most_similar_text      type_2_title = defaultdict(list) {type_2_title[ele['classify']].append(ele['title_modify']) for ele in datas} title_2_data = {ele['title_modify']: ele for ele in datas} final_results = defaultdict(list) for k, v in type_2_title.items():     content = "n".join([ele for ele in v])     message = f'现在请根据你所学习的知识,按照要求对以下输入进行排序,并且按照输出格式进行输出,n输入:n{content},n输出:'     result = client.run(sort_agent, messages=[{"role": "user", "content": message}], debug=True)     sort_results = [ele['content'] for ele in result.messages[::-1] if 'content' in ele and ele['content'] and ele['content']]     sort_results = sort_results[0].split('n') if sort_results else random.sample(v, 10)     sort_results = [re.sub(r'^d+[.,、s]*', '', ele).strip() for ele in sort_results]     final_results[k].extend([title_2_data[get_most_similar(ele, list(title_2_data.keys()))] for ele in sort_results])  sort_output = f'data/{output_file_pre}_sort_results.json' save_datas(sort_output, [final_results])  # 生成word save_2_word(final_results, output_file_pre)

View Code

 3. 报告生成word_util.py

LLM应用实战: AI资讯的自动聚合及报告生成

from docx import Document from docx.shared import Inches, Pt, RGBColor from docx.enum.text import WD_PARAGRAPH_ALIGNMENT import os   def save_2_word(info_dict, file_pre):     doc = Document()          categories = ['', '']     category_color = 'FF5733'          for category in categories:         news = info_dict[category]         category_paragraph = doc.add_paragraph()         category = '技术' if category == '' else '资讯'         category_run = category_paragraph.add_run(category)         category_run.bold = True         category_run.font.size = Pt(25)         category_run.font.color.rgb = RGBColor.from_string(category_color)         category_paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER         for i, item in enumerate(news):             title = item['title_modify']             doc.add_heading(f'{i+1}. {title}', level=1)                          pic = item['pic'] if 'pic' in item else ''             if pic and os.path.exists(pic):                 pic_paragraph = doc.add_paragraph()                 pic_paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER                 doc.add_picture(pic, width=Inches(5))                          content = item['content_modify']             doc.add_paragraph(content)                  doc.save(f'data/AI资讯每日速递({file_pre}).docx')                 

View Code

 

6.优化思考

1. 爬取模块目前是串行下载,且未增加反爬机制,后续可以增加多线程,且增加代理池机制。

2. 免费的gpt-4o-mini每日调用次数仅有200次,执行本任务远远不够,因此后期尝试切换为私有部署的Qwen2.5

其实已经尝试了Qwen2.5,以vllm部署,但与Swarm框架中的OpenAi接口存在少许不兼容,例如不支持特定的参数,只能运行一轮。不过可以进一步优化Swarm框架来进行适配。

本次实验本qiang~花费了30大洋,买了一个gpt-4o-mini,生成最终结果,直接耗费了其中的8个大洋,烧钱....

3. 信息推送机制不支持,如一键同步到公众号、CSDN、知乎,这块如果有精力可以基于网站的开发接口,实现一键自动发布文章。

7.总结

一句话足矣~

开发了一块AI资讯的自动聚合及报告生成工具,包括具体的框架、实现原理以及完整源码,满满诚意,提供给各位看官。欢迎转发、订阅~

有问题可以私信或留言沟通!

8.参考

(1) Swarm: https://github.com/openai/swarm

(2) Crawl4ai: https://github.com/unclecode/crawl4ai

(3) 资讯网站: https://www.aibase.com/news

 LLM应用实战: AI资讯的自动聚合及报告生成

 

发表评论

相关文章