目录
说在前面:参考刘焕勇老师在 Github 上开源的项目
GitHub地址:基于知识图谱的医疗问答系统
一、搭建 Neo4j 图数据库
1、方式选择
- windows 使用 Neo4j Desktop (2024-12-09开始 Neo4j desktop 无法打开表现为三个/四个僵尸进程,查看本地日志会发现[403]无法获取到https://dist.neo4j.org/neo4j-desktop/win/latest.yml这个路径的资源。解决方案:断网打开 Neo4j Desktop / Neo4j Desktop 1.5.8 Launches Zombie Processes Only - Neo4j Graph Platform / Desktop - Neo4j Online Community)
- 云环境 dockerfile + docker-compose (部署构建简单易懂无需专注 jdk 版本,优先考虑)
- 最终理想化:kubernetes 部署 (符合主流技术导向,虽说部署较复杂且多坑但是企业级以及行业主导地位等因素使用 k8s 部署还是最佳实践)
首次部署优先采用 dockerfile + docker-compose
2、Dockerfile+docker-compose部署neo4j容器
2.1、更新 yum 镜像源
rm -rf /etc/yum.repos.d/* wget -O /etc/yum.repos.d/centos7.repo http://mirrors.aliyun.com/repo/Centos-7.repo wget -O /etc/yum.repos.d/epel-7.repo http://mirrors.aliyun.com/repo/epel-7.repo wget -O /etc/yum.repos.d/docker-ce.repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
2.2、安装 docker-ce 社区版
yum install -y docker-ce
2.3、配置镜像加速
cat > /etc/docker/daemon.json << EOF { "exec-opts": ["native.cgroupdriver=systemd"], "registry-mirrors": [ "https://dockerhub.icu", "https://hub.rat.dev", "https://docker.wanpeng.top", "https://doublezonline.cloud", "https://docker.mrxn.net", "https://docker.anyhub.us.kg", "https://dislabaiot.xyz", "https://docker.fxxk.dedyn.io" ] } EOF systemctl daemon-reload && systemctl restart docker && systemctl enable docker
2.4、安装 Docker Compose
2.4.1、下载 Docker Compose 二进制包
curl -L "https://github.com/docker/compose/releases/download/v2.5.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
-L
: 是curl
的一个选项,表示跟随重定向。如果下载链接是重定向的,这个选项会让curl
自动跟踪到最后的目标地址。"https://github.com/docker/compose/releases/download/v2.5.1/docker-compose-$(uname -s)-$(uname -m)"
: 这是Docker Compose的下载URL,其中v2.5.1
指定了要下载的Docker Compose版本号。$(uname -s)
和$(uname -m)
是shell命令,分别返回当前系统的类型(如Linux
)和机器的硬件架构(如x86_64
),这样可以确保下载与当前系统架构相匹配的Docker Compose二进制文件。-o /usr/local/bin/docker-compose
:-o
或--output
指定了下载文件的保存位置及名称。这里,文件会被保存为/usr/local/bin/docker-compose
,这是Docker Compose常见的安装路径,将其放在此处可以使其在PATH环境变量中,从而可以直接在命令行中通过docker-compose
命令调用。
2.4.2、设置可执行权限
chmod +x /usr/local/bin/docker-compose
2.4.3、查看版本
docker-compose -v
2.5、创建目录结构
mkdir -p neo4j-docker/{conf,data,import,logs} && touch neo4j-docker/conf/neo4j.conf chown -R neo4j:neo4j ./{conf,data,import,logs} chmod 755 ./{conf,data,logs,import} tree -L 2 neo4j-docker neo4j-docker ├── conf │ └── neo4j.conf ├── data ├── import └── logs
2.6、编写neo4j.conf配置文件
cat > /root/neo4j-docker/conf/neo4j.conf << EOF server.directories.import=/var/lib/neo4j/import server.memory.pagecache.size=512M server.default_listen_address=0.0.0.0 dbms.security.allow_csv_import_from_file_urls=true server.directories.logs=/logs EOF
2.7、编写 dockerfile 文件
cat > /root/neo4j-docker/Dockerfile << EOF # 使用官方 Neo4j 最新版本镜像作为基础镜像 FROM neo4j:latest # 设置环境变量,仅用于配置 Neo4j 认证 ENV NEO4J_AUTH=neo4j/neo4jpassword # 拷贝本地的配置文件到容器中 COPY ./conf/neo4j.conf /var/lib/neo4j/conf/ # 定义容器启动时执行的命令 CMD ["neo4j"] EOF
2.8、构建ne4j容器镜像
# 命令位置需要与Dockerfile位置同级 docker build -t my_neo4j:v1 .
2.9、编写docker-compose.yaml文件
有坑:neo4j 5.x 版本所需密码位数需要在 8 位以上
version: '3' services: neo4j: build: . image: my_neo4j:v1 container_name: neo4j_container restart: always ports: - "7474:7474" - "7687:7687" environment: - NEO4J_AUTH=neo4j/neo4jpassword volumes: - ./data:/data - ./logs:/logs - ./import:/var/lib/neo4j/import - ./conf:/var/lib/neo4j/conf command: ["neo4j"]
2.10、运行docker-compose
docker-compose -f docker-compose.yaml up -d
2.11、浏览器登录 neo4j
http://192.168.112.30:7474 # 输入用户名:neo4j # 输入密码:neo4jpassword
二、Neo4j 初始配置
1、清空 Neo4j 数据库
MATCH (n) DETACH DELETE n
三、PyCharm 项目安装必备库
1、py2neo 库
pip install py2neo
-
简化 Neo4j 连接和查询
- 连接到 Neo4j:
py2neo
提供了简单易用的接口来连接到 Neo4j 数据库,支持 HTTP 和 Bolt 协议。 - 执行 Cypher 查询:
py2neo
允许你直接执行 Cypher 查询(Neo4j 的图查询语言),并以 Python 对象的形式返回结果。
- 连接到 Neo4j:
-
创建和管理图数据
- 创建节点和关系:
py2neo
提供了高级抽象,允许你像操作 Python 对象一样创建和管理 Neo4j 中的节点和关系。你可以使用Node
和Relationship
类来表示图中的实体,并将它们保存到数据库中。 - 批量操作:
py2neo
支持批量创建节点和关系,提高性能,减少网络往返次数。
- 创建节点和关系:
2、pymongo 库
pip install pymongo
- 用于连接和操作 MongoDB 数据库,读取、处理并重新插入医疗数据。
- 提供了高效的 CRUD 操作,支持批量数据处理。
3、lxml 库
pip install lxml
- 用于解析存储在 MongoDB 中的 HTML 文档,提取有用的医疗检查信息(如疾病名称、描述等)。
- 通过 XPath 提取数据,并进行必要的清理和格式化。
四、python 连接 Neo4j
1、浏览器 browser 查看Neo4j 连接状态
:server status
记住 URL (不是传统意义上的 http://,以及默认的端口号7474)
2、修改源文件中 Graph 连接格式
import os import json from py2neo import Graph,Node class MedicalGraph: def __init__(self): cur_dir = '/'.join(os.path.abspath(__file__).split('/')[:-1]) self.data_path = os.path.join(cur_dir, 'data/medical.json') self.g = Graph("neo4j://192.168.112.30:7687", auth=("neo4j", "neo4jpassword"))
build_medicalgraph.py
和answer_search.py
两个原文件中的self.g = Graph()
的连接格式都更改为上述代码中的格式。
五、PyCharm 导入医疗知识图谱
1、读取文件
# 读取文件 def read_nodes(self): # 共7类节点 drugs = [] # 药品 foods = [] # 食物 checks = [] # 检查 departments = [] #科室 producers = [] #药品大类 diseases = [] #疾病 symptoms = []#症状 disease_infos = []#疾病信息 # 构建节点实体关系 rels_department = [] # 科室-科室关系 rels_noteat = [] # 疾病-忌吃食物关系 rels_doeat = [] # 疾病-宜吃食物关系 rels_recommandeat = [] # 疾病-推荐吃食物关系 rels_commonddrug = [] # 疾病-通用药品关系 rels_recommanddrug = [] # 疾病-热门药品关系 rels_check = [] # 疾病-检查关系 rels_drug_producer = [] # 厂商-药物关系 rels_symptom = [] #疾病症状关系 rels_acompany = [] # 疾病并发关系 rels_category = [] # 疾病与科室之间的关系 count = 0 for data in open(self.data_path, encoding='utf8', mode='r'): disease_dict = {} count += 1 print(count) data_json = json.loads(data) disease = data_json['name'] disease_dict['name'] = disease diseases.append(disease) disease_dict['desc'] = '' disease_dict['prevent'] = '' disease_dict['cause'] = '' disease_dict['easy_get'] = '' disease_dict['cure_department'] = '' disease_dict['cure_way'] = '' disease_dict['cure_lasttime'] = '' disease_dict['symptom'] = '' disease_dict['cured_prob'] = '' if 'symptom' in data_json: symptoms += data_json['symptom'] for symptom in data_json['symptom']: rels_symptom.append([disease, symptom]) if 'acompany' in data_json: for acompany in data_json['acompany']: rels_acompany.append([disease, acompany]) if 'desc' in data_json: disease_dict['desc'] = data_json['desc'] if 'prevent' in data_json: disease_dict['prevent'] = data_json['prevent'] if 'cause' in data_json: disease_dict['cause'] = data_json['cause'] if 'get_prob' in data_json: disease_dict['get_prob'] = data_json['get_prob'] if 'easy_get' in data_json: disease_dict['easy_get'] = data_json['easy_get'] if 'cure_department' in data_json: cure_department = data_json['cure_department'] if len(cure_department) == 1: rels_category.append([disease, cure_department[0]]) if len(cure_department) == 2: big = cure_department[0] small = cure_department[1] rels_department.append([small, big]) rels_category.append([disease, small]) disease_dict['cure_department'] = cure_department departments += cure_department if 'cure_way' in data_json: disease_dict['cure_way'] = data_json['cure_way'] if 'cure_lasttime' in data_json: disease_dict['cure_lasttime'] = data_json['cure_lasttime'] if 'cured_prob' in data_json: disease_dict['cured_prob'] = data_json['cured_prob'] if 'common_drug' in data_json: common_drug = data_json['common_drug'] for drug in common_drug: rels_commonddrug.append([disease, drug]) drugs += common_drug if 'recommand_drug' in data_json: recommand_drug = data_json['recommand_drug'] drugs += recommand_drug for drug in recommand_drug: rels_recommanddrug.append([disease, drug]) if 'not_eat' in data_json: not_eat = data_json['not_eat'] for _not in not_eat: rels_noteat.append([disease, _not]) foods += not_eat do_eat = data_json['do_eat'] for _do in do_eat: rels_doeat.append([disease, _do]) foods += do_eat recommand_eat = data_json['recommand_eat'] for _recommand in recommand_eat: rels_recommandeat.append([disease, _recommand]) foods += recommand_eat if 'check' in data_json: check = data_json['check'] for _check in check: rels_check.append([disease, _check]) checks += check if 'drug_detail' in data_json: drug_detail = data_json['drug_detail'] producer = [i.split('(')[0] for i in drug_detail] rels_drug_producer += [[i.split('(')[0], i.split('(')[-1].replace(')', '')] for i in drug_detail] producers += producer disease_infos.append(disease_dict) return set(drugs), set(foods), set(checks), set(departments), set(producers), set(symptoms), set(diseases), disease_infos, rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug, rels_symptom, rels_acompany, rels_category
2、建立节点
# 建立节点 def create_node(self, label, nodes): count = 0 for node_name in nodes: node = Node(label, name=node_name) self.g.create(node) count += 1 print(count, len(nodes)) return
3、创建知识图谱中心疾病的节点
# 创建知识图谱中心疾病的节点 def create_diseases_nodes(self, disease_infos): count = 0 for disease_dict in disease_infos: node = Node("Disease", name=disease_dict['name'], desc=disease_dict['desc'], prevent=disease_dict['prevent'] ,cause=disease_dict['cause'], easy_get=disease_dict['easy_get'],cure_lasttime=disease_dict['cure_lasttime'], cure_department=disease_dict['cure_department'] ,cure_way=disease_dict['cure_way'] , cured_prob=disease_dict['cured_prob']) self.g.create(node) count += 1 print(count) return
4、创建知识图谱实体节点类型schema
# 创建知识图谱实体节点类型schema def create_graphnodes(self): Drugs, Foods, Checks, Departments, Producers, Symptoms, Diseases, disease_infos,rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug,rels_symptom, rels_acompany, rels_category = self.read_nodes() self.create_diseases_nodes(disease_infos) self.create_node('Drug', Drugs) print(len(Drugs)) self.create_node('Food', Foods) print(len(Foods)) self.create_node('Check', Checks) print(len(Checks)) self.create_node('Department', Departments) print(len(Departments)) self.create_node('Producer', Producers) print(len(Producers)) self.create_node('Symptom', Symptoms) return
5、创建实体关系边
# 创建实体关系边 def create_graphrels(self): Drugs, Foods, Checks, Departments, Producers, Symptoms, Diseases, disease_infos, rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug,rels_symptom, rels_acompany, rels_category = self.read_nodes() self.create_relationship('Disease', 'Food', rels_recommandeat, 'recommand_eat', '推荐食谱') self.create_relationship('Disease', 'Food', rels_noteat, 'no_eat', '忌吃') self.create_relationship('Disease', 'Food', rels_doeat, 'do_eat', '宜吃') self.create_relationship('Department', 'Department', rels_department, 'belongs_to', '属于') self.create_relationship('Disease', 'Drug', rels_commonddrug, 'common_drug', '常用药品') self.create_relationship('Producer', 'Drug', rels_drug_producer, 'drugs_of', '生产药品') self.create_relationship('Disease', 'Drug', rels_recommanddrug, 'recommand_drug', '好评药品') self.create_relationship('Disease', 'Check', rels_check, 'need_check', '诊断检查') self.create_relationship('Disease', 'Symptom', rels_symptom, 'has_symptom', '症状') self.create_relationship('Disease', 'Disease', rels_acompany, 'acompany_with', '并发症') self.create_relationship('Disease', 'Department', rels_category, 'belongs_to', '所属科室')
6、创建实体关联边
# 创建实体关联边 def create_relationship(self, start_node, end_node, edges, rel_type, rel_name): count = 0 # 去重处理 set_edges = [] for edge in edges: set_edges.append('###'.join(edge)) all = len(set(set_edges)) for edge in set(set_edges): edge = edge.split('###') p = edge[0] q = edge[1] query = "match(p:%s),(q:%s) where p.name='%s'and q.name='%s' create (p)-[rel:%s{name:'%s'}]->(q)" % ( start_node, end_node, p, q, rel_type, rel_name) try: self.g.run(query) count += 1 print(rel_type, count, all) except Exception as e: print(e) return
7、导出数据
# 导出数据 def export_data(self): Drugs, Foods, Checks, Departments, Producers, Symptoms, Diseases, disease_infos, rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug, rels_symptom, rels_acompany, rels_category = self.read_nodes() f_drug = open('drug.txt', 'w+') f_food = open('food.txt', 'w+') f_check = open('check.txt', 'w+') f_department = open('department.txt', 'w+') f_producer = open('producer.txt', 'w+') f_symptom = open('symptoms.txt', 'w+') f_disease = open('disease.txt', 'w+') f_drug.write('n'.join(list(Drugs))) f_food.write('n'.join(list(Foods))) f_check.write('n'.join(list(Checks))) f_department.write('n'.join(list(Departments))) f_producer.write('n'.join(list(Producers))) f_symptom.write('n'.join(list(Symptoms))) f_disease.write('n'.join(list(Diseases))) f_drug.close() f_food.close() f_check.close() f_department.close() f_producer.close() f_symptom.close() f_disease.close() return
8、程序主入口
if __name__ == '__main__': handler = MedicalGraph() print("step1:导入图谱节点中") handler.create_graphnodes() print("step2:导入图谱边中") handler.create_graphrels()
# 创建知识节点和边(nodes + rels) # handler.create_graphnodes() # handler.create_graphrels() 快捷键:Ctrl + Shift + F10
8.1、UnicodeDecodeError: 'gbk' codec can't decode byte 0xaf in position 81: illegal multibyte sequence
直接运行会报错:UnicodeDecodeError: 'gbk' codec can't decode byte 0xaf in position 81: illegal multibyte sequence
8.2、修改代码:for data in open(self.data_path):
for data in open(self.data_path, encoding='utf8', mode='r'):
- 需要确保文件的编码格式为 utf8
- 打开文件模式为只读模式
9、运行结果
10、优化导入数据时间
import concurrent import concurrent.futures import json import multiprocessing import os from py2neo import Graph, Node, Subgraph from tqdm import tqdm class MedicalGraph: def __init__(self): pass def clear(self): self.g.run("MATCH (n) DETACH DELETE n") '''读取文件''' def read_nodes(self): # 共7类节点 drugs = [] # 药品 foods = [] # 食物 checks = [] # 检查 departments = [] # 科室 producers = [] # 药品大类 diseases = [] # 疾病 symptoms = [] # 症状 disease_infos = [] # 疾病信息 # 构建节点实体关系 rels_department = [] # 科室-科室关系 rels_noteat = [] # 疾病-忌吃食物关系 rels_doeat = [] # 疾病-宜吃食物关系 rels_recommandeat = [] # 疾病-推荐吃食物关系 rels_commonddrug = [] # 疾病-通用药品关系 rels_recommanddrug = [] # 疾病-热门药品关系 rels_check = [] # 疾病-检查关系 rels_drug_producer = [] # 厂商-药物关系 rels_symptom = [] # 疾病症状关系 rels_acompany = [] # 疾病并发关系 rels_category = [] # 疾病与科室之间的关系 for data in open(self.data_path): disease_dict = {} data_json = json.loads(data) disease = data_json['name'] disease_dict['name'] = disease diseases.append(disease) disease_dict['desc'] = '' disease_dict['prevent'] = '' disease_dict['cause'] = '' disease_dict['easy_get'] = '' disease_dict['cure_department'] = '' disease_dict['cure_way'] = '' disease_dict['cure_lasttime'] = '' disease_dict['symptom'] = '' disease_dict['cured_prob'] = '' if 'symptom' in data_json: symptoms += data_json['symptom'] for symptom in data_json['symptom']: rels_symptom.append([disease, symptom]) if 'acompany' in data_json: for acompany in data_json['acompany']: rels_acompany.append([disease, acompany]) if 'desc' in data_json: disease_dict['desc'] = data_json['desc'] if 'prevent' in data_json: disease_dict['prevent'] = data_json['prevent'] if 'cause' in data_json: disease_dict['cause'] = data_json['cause'] if 'get_prob' in data_json: disease_dict['get_prob'] = data_json['get_prob'] if 'easy_get' in data_json: disease_dict['easy_get'] = data_json['easy_get'] if 'cure_department' in data_json: cure_department = data_json['cure_department'] if len(cure_department) == 1: rels_category.append([disease, cure_department[0]]) if len(cure_department) == 2: big = cure_department[0] small = cure_department[1] rels_department.append([small, big]) rels_category.append([disease, small]) disease_dict['cure_department'] = cure_department departments += cure_department if 'cure_way' in data_json: disease_dict['cure_way'] = data_json['cure_way'] if 'cure_lasttime' in data_json: disease_dict['cure_lasttime'] = data_json['cure_lasttime'] if 'cured_prob' in data_json: disease_dict['cured_prob'] = data_json['cured_prob'] if 'common_drug' in data_json: common_drug = data_json['common_drug'] for drug in common_drug: rels_commonddrug.append([disease, drug]) drugs += common_drug if 'recommand_drug' in data_json: recommand_drug = data_json['recommand_drug'] drugs += recommand_drug for drug in recommand_drug: rels_recommanddrug.append([disease, drug]) if 'not_eat' in data_json: not_eat = data_json['not_eat'] for _not in not_eat: rels_noteat.append([disease, _not]) foods += not_eat do_eat = data_json['do_eat'] for _do in do_eat: rels_doeat.append([disease, _do]) foods += do_eat recommand_eat = data_json['recommand_eat'] for _recommand in recommand_eat: rels_recommandeat.append([disease, _recommand]) foods += recommand_eat if 'check' in data_json: check = data_json['check'] for _check in check: rels_check.append([disease, _check]) checks += check if 'drug_detail' in data_json: drug_detail = data_json['drug_detail'] producer = [i.split('(')[0] for i in drug_detail] rels_drug_producer += [[i.split('(')[0], i.split('(')[-1].replace(')', '')] for i in drug_detail] producers += producer disease_infos.append(disease_dict) return set(drugs), set(foods), set(checks), set(departments), set(producers), set(symptoms), set(diseases), disease_infos, rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug, rels_symptom, rels_acompany, rels_category '''建立节点''' def create_node(self, label, nodes): batch_size = 1000 batches = [list(nodes)[i:i + batch_size] for i in range(0, len(nodes), batch_size)] for batch in tqdm(batches, desc=f"Creating {label} Nodes", unit="batch"): batch_nodes = [Node(label, name=node_name) for node_name in batch] self.g.create(Subgraph(batch_nodes)) '''创建知识图谱中心疾病的节点''' def create_diseases_nodes(self, disease_infos): batch_size = 1000 batches = [disease_infos[i:i + batch_size] for i in range(0, len(disease_infos), batch_size)] for batch in tqdm(batches, desc="Importing Disease Nodes", unit="batch"): batch_nodes = [ Node("Disease", name=disease_dict['name'], desc=disease_dict['desc'], prevent=disease_dict['prevent'], cause=disease_dict['cause'], easy_get=disease_dict['easy_get'], cure_lasttime=disease_dict['cure_lasttime'], cure_department=disease_dict['cure_department'], cure_way=disease_dict['cure_way'], cured_prob=disease_dict['cured_prob']) for disease_dict in batch ] self.g.create(Subgraph(batch_nodes)) '''创建知识图谱实体节点类型schema''' def create_graphnodes(self): Drugs, Foods, Checks, Departments, Producers, Symptoms, Diseases, disease_infos, rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug, rels_symptom, rels_acompany, rels_category = self.read_nodes() self.create_diseases_nodes(disease_infos) self.create_node('Drug', Drugs) self.create_node('Food', Foods) self.create_node('Check', Checks) self.create_node('Department', Departments) self.create_node('Producer', Producers) self.create_node('Symptom', Symptoms) '''创建实体关系边''' def create_graphrels(self): Drugs, Foods, Checks, Departments, Producers, Symptoms, Diseases, disease_infos, rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug, rels_symptom, rels_acompany, rels_category = self.read_nodes() self.create_relationship('Disease', 'Food', rels_recommandeat, 'recommand_eat', '推荐食谱') self.create_relationship('Disease', 'Food', rels_noteat, 'no_eat', '忌吃') self.create_relationship('Disease', 'Food', rels_doeat, 'do_eat', '宜吃') self.create_relationship('Department', 'Department', rels_department, 'belongs_to', '属于') self.create_relationship('Disease', 'Drug', rels_commonddrug, 'common_drug', '常用药品') self.create_relationship('Producer', 'Drug', rels_drug_producer, 'drugs_of', '生产药品') self.create_relationship('Disease', 'Drug', rels_recommanddrug, 'recommand_drug', '好评药品') self.create_relationship('Disease', 'Check', rels_check, 'need_check', '诊断检查') self.create_relationship('Disease', 'Symptom', rels_symptom, 'has_symptom', '症状') self.create_relationship('Disease', 'Disease', rels_acompany, 'acompany_with', '并发症') self.create_relationship('Disease', 'Department', rels_category, 'belongs_to', '所属科室') '''创建实体关联边''' def create_relationship(self, start_node, end_node, edges, rel_type, rel_name): batch_size = 10000 set_edges = set(['###'.join(edge) for edge in edges]) batches = [list(set_edges)[i:i + batch_size] for i in range(0, len(set_edges), batch_size)] executor = concurrent.futures.ThreadPoolExecutor(max_workers=min(multiprocessing.cpu_count(), 4)) tasks = [ lambda: ( tx := self.g.begin(), [ tx.run( f"MATCH (p:{start_node}), (q:{end_node}) " f"WHERE p.name='{p}' AND q.name='{q}' " f"CREATE (p)-[rel:{rel_type} {{name:'{rel_name}'}}]->(q)" ) for edge in batch for p, q in [edge.split('###')] ], self.g.commit(tx) ) for batch in tqdm(batches, desc=f"Creating {rel_type} Relationships", unit="batch") ] executor.map(lambda task: task(), tasks) executor.shutdown() '''导出数据''' def export_data(self): Drugs, Foods, Checks, Departments, Producers, Symptoms, Diseases, disease_infos, rels_check, rels_recommandeat, rels_noteat, rels_doeat, rels_department, rels_commonddrug, rels_drug_producer, rels_recommanddrug, rels_symptom, rels_acompany, rels_category = self.read_nodes() f_drug = open('drug.txt', 'w+') f_food = open('food.txt', 'w+') f_check = open('check.txt', 'w+') f_department = open('department.txt', 'w+') f_producer = open('producer.txt', 'w+') f_symptom = open('symptoms.txt', 'w+') f_disease = open('disease.txt', 'w+') f_drug.write('n'.join(list(Drugs))) f_food.write('n'.join(list(Foods))) f_check.write('n'.join(list(Checks))) f_department.write('n'.join(list(Departments))) f_producer.write('n'.join(list(Producers))) f_symptom.write('n'.join(list(Symptoms))) f_disease.write('n'.join(list(Diseases))) f_drug.close() f_food.close() f_check.close() f_department.close() f_producer.close() f_symptom.close() f_disease.close() if __name__ == '__main__': handler = MedicalGraph() handler.clear() print("step1:导入图谱节点中") handler.create_graphnodes() print("step2:导入图谱边中") handler.create_graphrels()
六、PyCharm 实现问答系统
1、问句类型分类脚本
这里 加载多个特征词列表 处需要保证文件编码格式为 utf8
即添加内容:encoding='utf8'
import os import ahocorasick class QuestionClassifier: def __init__(self): cur_dir = '/'.join(os.path.abspath(__file__).split('/')[:-1]) # 特征词路径 self.disease_path = os.path.join(cur_dir, 'dict/disease.txt') self.department_path = os.path.join(cur_dir, 'dict/department.txt') self.check_path = os.path.join(cur_dir, 'dict/check.txt') self.drug_path = os.path.join(cur_dir, 'dict/drug.txt') self.food_path = os.path.join(cur_dir, 'dict/food.txt') self.producer_path = os.path.join(cur_dir, 'dict/producer.txt') self.symptom_path = os.path.join(cur_dir, 'dict/symptom.txt') self.deny_path = os.path.join(cur_dir, 'dict/deny.txt') # 加载特征词 self.disease_wds= [i.strip() for i in open(self.disease_path,encoding='utf8') if i.strip()] self.department_wds= [i.strip() for i in open(self.department_path,encoding='utf8') if i.strip()] self.check_wds= [i.strip() for i in open(self.check_path,encoding='utf8') if i.strip()] self.drug_wds= [i.strip() for i in open(self.drug_path,encoding='utf8') if i.strip()] self.food_wds= [i.strip() for i in open(self.food_path,encoding='utf8') if i.strip()] self.producer_wds= [i.strip() for i in open(self.producer_path,encoding='utf8') if i.strip()] self.symptom_wds= [i.strip() for i in open(self.symptom_path,encoding='utf8') if i.strip()] self.region_words = set(self.department_wds + self.disease_wds + self.check_wds + self.drug_wds + self.food_wds + self.producer_wds + self.symptom_wds) self.deny_words = [i.strip() for i in open(self.deny_path,encoding='utf8') if i.strip()] # 构造领域actree self.region_tree = self.build_actree(list(self.region_words)) # 构建词典 self.wdtype_dict = self.build_wdtype_dict() # 问句疑问词 self.symptom_qwds = ['症状', '表征', '现象', '症候', '表现'] self.cause_qwds = ['原因','成因', '为什么', '怎么会', '怎样才', '咋样才', '怎样会', '如何会', '为啥', '为何', '如何才会', '怎么才会', '会导致', '会造成'] self.acompany_qwds = ['并发症', '并发', '一起发生', '一并发生', '一起出现', '一并出现', '一同发生', '一同出现', '伴随发生', '伴随', '共现'] self.food_qwds = ['饮食', '饮用', '吃', '食', '伙食', '膳食', '喝', '菜' ,'忌口', '补品', '保健品', '食谱', '菜谱', '食用', '食物','补品'] self.drug_qwds = ['药', '药品', '用药', '胶囊', '口服液', '炎片'] self.prevent_qwds = ['预防', '防范', '抵制', '抵御', '防止','躲避','逃避','避开','免得','逃开','避开','避掉','躲开','躲掉','绕开', '怎样才能不', '怎么才能不', '咋样才能不','咋才能不', '如何才能不', '怎样才不', '怎么才不', '咋样才不','咋才不', '如何才不', '怎样才可以不', '怎么才可以不', '咋样才可以不', '咋才可以不', '如何可以不', '怎样才可不', '怎么才可不', '咋样才可不', '咋才可不', '如何可不'] self.lasttime_qwds = ['周期', '多久', '多长时间', '多少时间', '几天', '几年', '多少天', '多少小时', '几个小时', '多少年'] self.cureway_qwds = ['怎么治疗', '如何医治', '怎么医治', '怎么治', '怎么医', '如何治', '医治方式', '疗法', '咋治', '怎么办', '咋办', '咋治'] self.cureprob_qwds = ['多大概率能治好', '多大几率能治好', '治好希望大么', '几率', '几成', '比例', '可能性', '能治', '可治', '可以治', '可以医'] self.easyget_qwds = ['易感人群', '容易感染', '易发人群', '什么人', '哪些人', '感染', '染上', '得上'] self.check_qwds = ['检查', '检查项目', '查出', '检查', '测出', '试出'] self.belong_qwds = ['属于什么科', '属于', '什么科', '科室'] self.cure_qwds = ['治疗什么', '治啥', '治疗啥', '医治啥', '治愈啥', '主治啥', '主治什么', '有什么用', '有何用', '用处', '用途', '有什么好处', '有什么益处', '有何益处', '用来', '用来做啥', '用来作甚', '需要', '要'] print('model init finished ......') return '''分类主函数''' def classify(self, question): data = {} medical_dict = self.check_medical(question) if not medical_dict: return {} data['args'] = medical_dict #收集问句当中所涉及到的实体类型 types = [] for type_ in medical_dict.values(): types += type_ question_type = 'others' question_types = [] # 症状 if self.check_words(self.symptom_qwds, question) and ('disease' in types): question_type = 'disease_symptom' question_types.append(question_type) if self.check_words(self.symptom_qwds, question) and ('symptom' in types): question_type = 'symptom_disease' question_types.append(question_type) # 原因 if self.check_words(self.cause_qwds, question) and ('disease' in types): question_type = 'disease_cause' question_types.append(question_type) # 并发症 if self.check_words(self.acompany_qwds, question) and ('disease' in types): question_type = 'disease_acompany' question_types.append(question_type) # 推荐食品 if self.check_words(self.food_qwds, question) and 'disease' in types: deny_status = self.check_words(self.deny_words, question) if deny_status: question_type = 'disease_not_food' else: question_type = 'disease_do_food' question_types.append(question_type) #已知食物找疾病 if self.check_words(self.food_qwds+self.cure_qwds, question) and 'food' in types: deny_status = self.check_words(self.deny_words, question) if deny_status: question_type = 'food_not_disease' else: question_type = 'food_do_disease' question_types.append(question_type) # 推荐药品 if self.check_words(self.drug_qwds, question) and 'disease' in types: question_type = 'disease_drug' question_types.append(question_type) # 药品治啥病 if self.check_words(self.cure_qwds, question) and 'drug' in types: question_type = 'drug_disease' question_types.append(question_type) # 疾病接受检查项目 if self.check_words(self.check_qwds, question) and 'disease' in types: question_type = 'disease_check' question_types.append(question_type) # 已知检查项目查相应疾病 if self.check_words(self.check_qwds+self.cure_qwds, question) and 'check' in types: question_type = 'check_disease' question_types.append(question_type) # 症状防御 if self.check_words(self.prevent_qwds, question) and 'disease' in types: question_type = 'disease_prevent' question_types.append(question_type) # 疾病医疗周期 if self.check_words(self.lasttime_qwds, question) and 'disease' in types: question_type = 'disease_lasttime' question_types.append(question_type) # 疾病治疗方式 if self.check_words(self.cureway_qwds, question) and 'disease' in types: question_type = 'disease_cureway' question_types.append(question_type) # 疾病治愈可能性 if self.check_words(self.cureprob_qwds, question) and 'disease' in types: question_type = 'disease_cureprob' question_types.append(question_type) # 疾病易感染人群 if self.check_words(self.easyget_qwds, question) and 'disease' in types : question_type = 'disease_easyget' question_types.append(question_type) # 若没有查到相关的外部查询信息,那么则将该疾病的描述信息返回 if question_types == [] and 'disease' in types: question_types = ['disease_desc'] # 若没有查到相关的外部查询信息,那么则将该疾病的描述信息返回 if question_types == [] and 'symptom' in types: question_types = ['symptom_disease'] # 将多个分类结果进行合并处理,组装成一个字典 data['question_types'] = question_types return data '''构造词对应的类型''' def build_wdtype_dict(self): wd_dict = dict() for wd in self.region_words: wd_dict[wd] = [] if wd in self.disease_wds: wd_dict[wd].append('disease') if wd in self.department_wds: wd_dict[wd].append('department') if wd in self.check_wds: wd_dict[wd].append('check') if wd in self.drug_wds: wd_dict[wd].append('drug') if wd in self.food_wds: wd_dict[wd].append('food') if wd in self.symptom_wds: wd_dict[wd].append('symptom') if wd in self.producer_wds: wd_dict[wd].append('producer') return wd_dict '''构造actree,加速过滤''' def build_actree(self, wordlist): actree = ahocorasick.Automaton() for index, word in enumerate(wordlist): actree.add_word(word, (index, word)) actree.make_automaton() return actree '''问句过滤''' def check_medical(self, question): region_wds = [] for i in self.region_tree.iter(question): wd = i[1][1] region_wds.append(wd) stop_wds = [] for wd1 in region_wds: for wd2 in region_wds: if wd1 in wd2 and wd1 != wd2: stop_wds.append(wd1) final_wds = [i for i in region_wds if i not in stop_wds] final_dict = {i:self.wdtype_dict.get(i) for i in final_wds} return final_dict '''基于特征词进行分类''' def check_words(self, wds, sent): for wd in wds: if wd in sent: return True return False if __name__ == '__main__': handler = QuestionClassifier() while 1: question = input('input an question:') data = handler.classify(question) print(data)
2、问句解析脚本
class QuestionPaser: '''构建实体节点''' def build_entitydict(self, args): entity_dict = {} for arg, types in args.items(): for type in types: if type not in entity_dict: entity_dict[type] = [arg] else: entity_dict[type].append(arg) return entity_dict '''解析主函数''' def parser_main(self, res_classify): args = res_classify['args'] entity_dict = self.build_entitydict(args) question_types = res_classify['question_types'] sqls = [] for question_type in question_types: sql_ = {} sql_['question_type'] = question_type sql = [] if question_type == 'disease_symptom': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'symptom_disease': sql = self.sql_transfer(question_type, entity_dict.get('symptom')) elif question_type == 'disease_cause': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'disease_acompany': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'disease_not_food': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'disease_do_food': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'food_not_disease': sql = self.sql_transfer(question_type, entity_dict.get('food')) elif question_type == 'food_do_disease': sql = self.sql_transfer(question_type, entity_dict.get('food')) elif question_type == 'disease_drug': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'drug_disease': sql = self.sql_transfer(question_type, entity_dict.get('drug')) elif question_type == 'disease_check': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'check_disease': sql = self.sql_transfer(question_type, entity_dict.get('check')) elif question_type == 'disease_prevent': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'disease_lasttime': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'disease_cureway': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'disease_cureprob': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'disease_easyget': sql = self.sql_transfer(question_type, entity_dict.get('disease')) elif question_type == 'disease_desc': sql = self.sql_transfer(question_type, entity_dict.get('disease')) if sql: sql_['sql'] = sql sqls.append(sql_) return sqls '''针对不同的问题,分开进行处理''' def sql_transfer(self, question_type, entities): if not entities: return [] # 查询语句 sql = [] # 查询疾病的原因 if question_type == 'disease_cause': sql = ["MATCH (m:Disease) where m.name = '{0}' return m.name, m.cause".format(i) for i in entities] # 查询疾病的防御措施 elif question_type == 'disease_prevent': sql = ["MATCH (m:Disease) where m.name = '{0}' return m.name, m.prevent".format(i) for i in entities] # 查询疾病的持续时间 elif question_type == 'disease_lasttime': sql = ["MATCH (m:Disease) where m.name = '{0}' return m.name, m.cure_lasttime".format(i) for i in entities] # 查询疾病的治愈概率 elif question_type == 'disease_cureprob': sql = ["MATCH (m:Disease) where m.name = '{0}' return m.name, m.cured_prob".format(i) for i in entities] # 查询疾病的治疗方式 elif question_type == 'disease_cureway': sql = ["MATCH (m:Disease) where m.name = '{0}' return m.name, m.cure_way".format(i) for i in entities] # 查询疾病的易发人群 elif question_type == 'disease_easyget': sql = ["MATCH (m:Disease) where m.name = '{0}' return m.name, m.easy_get".format(i) for i in entities] # 查询疾病的相关介绍 elif question_type == 'disease_desc': sql = ["MATCH (m:Disease) where m.name = '{0}' return m.name, m.desc".format(i) for i in entities] # 查询疾病有哪些症状 elif question_type == 'disease_symptom': sql = ["MATCH (m:Disease)-[r:has_symptom]->(n:Symptom) where m.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] # 查询症状会导致哪些疾病 elif question_type == 'symptom_disease': sql = ["MATCH (m:Disease)-[r:has_symptom]->(n:Symptom) where n.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] # 查询疾病的并发症 elif question_type == 'disease_acompany': sql1 = ["MATCH (m:Disease)-[r:acompany_with]->(n:Disease) where m.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql2 = ["MATCH (m:Disease)-[r:acompany_with]->(n:Disease) where n.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql = sql1 + sql2 # 查询疾病的忌口 elif question_type == 'disease_not_food': sql = ["MATCH (m:Disease)-[r:no_eat]->(n:Food) where m.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] # 查询疾病建议吃的东西 elif question_type == 'disease_do_food': sql1 = ["MATCH (m:Disease)-[r:do_eat]->(n:Food) where m.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql2 = ["MATCH (m:Disease)-[r:recommand_eat]->(n:Food) where m.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql = sql1 + sql2 # 已知忌口查疾病 elif question_type == 'food_not_disease': sql = ["MATCH (m:Disease)-[r:no_eat]->(n:Food) where n.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] # 已知推荐查疾病 elif question_type == 'food_do_disease': sql1 = ["MATCH (m:Disease)-[r:do_eat]->(n:Food) where n.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql2 = ["MATCH (m:Disease)-[r:recommand_eat]->(n:Food) where n.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql = sql1 + sql2 # 查询疾病常用药品-药品别名记得扩充 elif question_type == 'disease_drug': sql1 = ["MATCH (m:Disease)-[r:common_drug]->(n:Drug) where m.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql2 = ["MATCH (m:Disease)-[r:recommand_drug]->(n:Drug) where m.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql = sql1 + sql2 # 已知药品查询能够治疗的疾病 elif question_type == 'drug_disease': sql1 = ["MATCH (m:Disease)-[r:common_drug]->(n:Drug) where n.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql2 = ["MATCH (m:Disease)-[r:recommand_drug]->(n:Drug) where n.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] sql = sql1 + sql2 # 查询疾病应该进行的检查 elif question_type == 'disease_check': sql = ["MATCH (m:Disease)-[r:need_check]->(n:Check) where m.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] # 已知检查查询疾病 elif question_type == 'check_disease': sql = ["MATCH (m:Disease)-[r:need_check]->(n:Check) where n.name = '{0}' return m.name, r.name, n.name".format(i) for i in entities] return sql if __name__ == '__main__': handler = QuestionPaser()
3、问答程序脚本
from py2neo import Graph class AnswerSearcher: def __init__(self): self.g = Graph("neo4j://192.168.112.30:7687", auth=("neo4j", "neo4jpassword")) self.num_limit = 20 '''执行cypher查询,并返回相应结果''' def search_main(self, sqls): final_answers = [] for sql_ in sqls: question_type = sql_['question_type'] queries = sql_['sql'] answers = [] for query in queries: ress = self.g.run(query).data() answers += ress final_answer = self.answer_prettify(question_type, answers) if final_answer: final_answers.append(final_answer) return final_answers '''根据对应的qustion_type,调用相应的回复模板''' def answer_prettify(self, question_type, answers): final_answer = [] if not answers: return '' if question_type == 'disease_symptom': desc = [i['n.name'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}的症状包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'symptom_disease': desc = [i['m.name'] for i in answers] subject = answers[0]['n.name'] final_answer = '症状{0}可能染上的疾病有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_cause': desc = [i['m.cause'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}可能的成因有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_prevent': desc = [i['m.prevent'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}的预防措施包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_lasttime': desc = [i['m.cure_lasttime'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}治疗可能持续的周期为:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_cureway': desc = [';'.join(i['m.cure_way']) for i in answers] subject = answers[0]['m.name'] final_answer = '{0}可以尝试如下治疗:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_cureprob': desc = [i['m.cured_prob'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}治愈的概率为(仅供参考):{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_easyget': desc = [i['m.easy_get'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}的易感人群包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_desc': desc = [i['m.desc'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0},熟悉一下:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_acompany': desc1 = [i['n.name'] for i in answers] desc2 = [i['m.name'] for i in answers] subject = answers[0]['m.name'] desc = [i for i in desc1 + desc2 if i != subject] final_answer = '{0}的症状包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_not_food': desc = [i['n.name'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}忌食的食物包括有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_do_food': do_desc = [i['n.name'] for i in answers if i['r.name'] == '宜吃'] recommand_desc = [i['n.name'] for i in answers if i['r.name'] == '推荐食谱'] subject = answers[0]['m.name'] final_answer = '{0}宜食的食物包括有:{1}n推荐食谱包括有:{2}'.format(subject, ';'.join(list(set(do_desc))[:self.num_limit]), ';'.join(list(set(recommand_desc))[:self.num_limit])) elif question_type == 'food_not_disease': desc = [i['m.name'] for i in answers] subject = answers[0]['n.name'] final_answer = '患有{0}的人最好不要吃{1}'.format(';'.join(list(set(desc))[:self.num_limit]), subject) elif question_type == 'food_do_disease': desc = [i['m.name'] for i in answers] subject = answers[0]['n.name'] final_answer = '患有{0}的人建议多试试{1}'.format(';'.join(list(set(desc))[:self.num_limit]), subject) elif question_type == 'disease_drug': desc = [i['n.name'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}通常的使用的药品包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'drug_disease': desc = [i['m.name'] for i in answers] subject = answers[0]['n.name'] final_answer = '{0}主治的疾病有{1},可以试试'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'disease_check': desc = [i['n.name'] for i in answers] subject = answers[0]['m.name'] final_answer = '{0}通常可以通过以下方式检查出来:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) elif question_type == 'check_disease': desc = [i['m.name'] for i in answers] subject = answers[0]['n.name'] final_answer = '通常可以通过{0}检查出来的疾病有{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit])) return final_answer if __name__ == '__main__': searcher = AnswerSearcher()
4、问答系统实现
4.1、模型初始化
from answer_search import * from question_classifier import * from question_parser import * class ChatBotGraph: def __init__(self): self.classifier = QuestionClassifier() self.parser = QuestionPaser() self.searcher = AnswerSearcher()
4.2、问答主函数
def chat_main(self, sent): answer = '您好,我是医药智能助理,希望可以帮到您。如果没答上来,可联系https://liuhuanyong.github.io/。祝您身体棒棒!' res_classify = self.classifier.classify(sent) if not res_classify: return answer res_sql = self.parser.parser_main(res_classify) final_answers = self.searcher.search_main(res_sql) if not final_answers: return answer else: return 'n'.join(final_answers)
4.3、运行主入口
运行 chatbot_graph.py 文件
if __name__ == '__main__': handler = ChatBotGraph() while 1: question = input('用户:') answer = handler.chat_main(question) print('医药智能助理:', answer)