nlp项目:搭建一个简单的问答系统
文章目录
- 引言
- 一、问答系统任务介绍
- 1. 模块介绍
- 2. 数据介绍
- 3. 项目工具介绍
- 二、搭建问答系统
- 1. 文本读取
- 2. 可视化分析
- 3. 文本预处理
- 3.1 无用符号过滤
- 3.2 停用词过滤
- 3.3 去掉低频率的词
- 3.4 处理数字
- 3.5 其他辅助函数
- 3.6 文本预处理流程
- 4.文本表示
- 4.1 使用tf-idf表示向量
- 4.2 使用wordvec + average pooling
- 4.3 使用BERT + average pooling
- 5.相似度匹配与搜索
- 5.1 tf-idf + 余弦相似度
- 5.2 倒排索引表的创建
- 5.3 语义相似度
- 5.4 利用倒排索引表进行搜索
- 6.拼写纠错
- 6.1 训练一个语言模型
- 6.2 计算channel probability
- 6.3 根据错别字生成所有候选集合
- 6.4 给定一个输入,如果有错误给予纠正
- 6.5 基于拼写纠错算法,实现用户输入自动矫正
- 7.总结
这是贪心上面的某个项目,我拿来学习分享的!!!
数据集代码链接见:https://gitee.com/lj857335332/question-answering-system
引言
下面展示对话系统框架:
从框架方面,对话系统可以分为问答系统与多轮对话系统。本文着重讲解基于检索形式的问答系统。问答系统又包括结构化的问答系统与非结构化的问答系统。其中涉及的技术包括信息检索与语义匹配技术。涉及到的算法有TF-IDF算法、Jieba分词(中文)、停用词的去除(英文)等文本处理到文本匹配这方面的内容。这种基于检索形式的问答系统在现实中非常常见,比如:百度搜索、谷歌搜索等。文本的组织形式如下:
下面展示基于检索式的问答系统的流程:
一、问答系统任务介绍
问答系统所需要的数据已经提供,对于每一个问题都可以找得到相应的答案,所以可以理解为每一个样本数据是 <问题、答案>
。 那系统的核心是当用户输入一个问题的时候,首先要找到跟这个问题最相近的已经存储在库里的问题,然后直接返回相应的答案即可(但实际上也可以抽取其中的实体或者关键词)。 举一个简单的例子:
假设我们的库里面已有存在以下几个<问题,答案>:
- <"贪心学院主要做什么方面的业务?”, “他们主要做人工智能方面的教育”>
- <“国内有哪些做人工智能教育的公司?”, “贪心学院”>
- <“人工智能和机器学习的关系什么?”, “其实机器学习是人工智能的一个范畴,很多人工智能的应用要基于机器学习的技术”>
- <“人工智能最核心的语言是什么?”, ”Python“>
假设一个用户往系统中输入了问题 “贪心学院是做什么的?”, 那这时候系统先去匹配最相近的“已经存在库里的”问题。 那在这里很显然是 “贪心学院是做什么的”和“贪心学院主要做什么方面的业务?”是最相近的。 所以当我们定位到这个问题之后,直接返回它的答案 “他们主要做人工智能方面的教育”就可以了。 所以这里的核心问题可以归结为计算两个问句(query)之间的相似度。
1. 模块介绍
问答系统项目涉及的模块包括:
- 文本的读取: 需要从相应的文件里读取
(问题,答案)
- 文本预处理: 清洗文本很重要,需要涉及到
停用词过滤
等工作 - 文本的表示: 如果表示一个句子是非常核心的问题,这里会涉及到
tf-idf
,Glove
以及BERT Embedding
- 文本相似度匹配: 在基于检索式系统中一个核心的部分是计算文本之间的
相似度
,从而选择相似度最高的问题然后返回这些问题的答案 - 倒排表: 为了加速搜索速度,我们需要设计
倒排表
来存储每一个词与出现的文本 - 词义匹配:直接使用倒排表会忽略到一些意思上相近但不完全一样的单词,我们需要做这部分的处理。我们需要提前构建好
相似的单词
然后搜索阶段使用 - 拼写纠错:我们不能保证用户输入的准确,所以第一步需要做用户输入检查,如果发现用户拼错了,我们需要及时在后台改正,然后按照修改后的在库里面搜索
- 文档的排序: 最后返回结果的排序根据文档之间
余弦相似度
有关,同时也跟倒排表中匹配的单词有关
2. 数据介绍
项目中需要的数据:
train-v2.0.json
: 这个数据包含了问题和答案的pair, 但是以JSON格式存在,需要编写parser来提取出里面的问题和答案。glove.6B
: 这个文件需要从网上下载,下载地址为:https://nlp.stanford.edu/projects/glove/, 请使用d=200的词向量(将)spell-errors.txt
这个文件主要用来编写拼写纠错模块。 文件中第一列为正确的单词,之后列出来的单词都是常见的错误写法。 但这里需要注意的一点是我们没有给出他们之间的概率,也就是p(错误|正确),所以我们可以认为每一种类型的错误都是同等概率
vocab.txt
这里列了几万个英文常见的单词,可以用这个词库来验证是否有些单词被拼错testdata.txt
这里搜集了一些测试数据,可以用来测试自己的spell corrector。这个文件只是用来测试自己的程序。
3. 项目工具介绍
在本次项目中,你将会用到以下几个工具:
sklearn
。具体安装请见:http://scikit-learn.org/stable/install.html sklearn包含了各类机器学习算法和数据处理工具,包括本项目需要使用的词袋模型,均可以在sklearn工具包中找得到。jieba
,用来做分词。具体使用方法请见 https://github.com/fxsjy/jiebabert embedding
: https://github.com/imgarylai/bert-embeddingnltk
:https://www.nltk.org/index.html
二、搭建问答系统
首先,对训练数据进行处理:读取文件与预处理
文本的读取
: 需要从文本中读取数据,此处需要读取的文件是train-v2.0.json
,并把读取的文件存入一个列表里(list)文本预处理
: 对于问题本身需要做一些停用词过滤等文本方面的处理可视化分析
: 对于给定的样本数据,做一些可视化分析来更好地理解数据
1. 文本读取
把给定的文本数据读入到qlist
和alist
当中,这两个分别是列表,其中qlist
是问题的列表,alist
是对应的答案列表
qlist=["问题1",“问题2”,“问题3”....]alist=["答案1","答案2","答案3"....]qlist = ["问题1", “问题2”, “问题3” ....]\\ alist = ["答案1", "答案2", "答案3" ....]qlist=["问题1",“问题2”,“问题3”....]alist=["答案1","答案2","答案3"....]
# 把给定的文本数据读入到```qlist```和```alist```当中,这两个分别是列表,其中```qlist```是问题的列表,```alist```是对应的答案列表
def read_corpus():"""读取给定的语料库,并把问题列表和答案列表分别写入到 qlist, alist 里面。 qlist = ["问题1", “问题2”, “问题3” ....]alist = ["答案1", "答案2", "答案3" ....]务必要让每一个问题和答案对应起来(下标位置一致)"""# 问题列表qlist = []# 答案列表alist = []# 文件名称filename = 'data/train-v2.0.json'# 加载json文件datas = json.load(open(filename, 'r'))# 下面将通过字典索引提取问题与答案data = datas['data']for d in data:paragraph = d['paragraphs']for p in paragraph:qas = p['qas']for qa in qas:# print(qa)# 处理is_impossible为True时answers空if (not qa['is_impossible']):qlist.append(qa['question'])alist.append(qa['answers'][0]['text'])# print(qlist[0])# print(alist[0])assert len(qlist) == len(alist) # 确保长度一样return qlist, alist# 读取给定的语料库,并把问题列表和答案列表分别写入到 qlist, alist
qlist, alist = read_corpus()
2. 可视化分析
统计一下在qlist中总共出现了多少个单词? 总共出现了多少个不同的单词(unique word)?这里需要做简单的分词,对于英文我们根据空格来分词即可,其他过滤暂不考虑(只需分词)。
words_qlist = dict()
for q in qlist:# 以空格为分词,都转为小写words = q.strip().split(' ')for w in words:if w.lower() in words_qlist:words_qlist[w.lower()] += 1else:words_qlist[w.lower()] = 1
word_total = len(words_qlist)
print(word_total)
统计一下qlist中出现1次,2次,3次… 出现的单词个数, 然后画一个plot。 这里的x轴是单词出现的次数(1,2,3,…), y轴是单词个数。从左到右分别是 出现1次的单词数,出现2次的单词数,出现3次的单词数。
import matplotlib.pyplot as plt
import numpy as np# counts:key出现N次,value:出现N次词有多少
counts = dict()
for w, c in words_qlist.items():if c in counts:counts[c] += 1else:counts[c] = 1
# 以histogram画图
fig, ax = plt.subplots()
ax.hist(counts.values(), bins=np.arange(0, 250, 25), histtype='step', alpha=0.6, label="counts")
ax.legend()
ax.set_xlim(0, 250)
ax.set_yticks(np.arange(0, 500, 50))
plt.show()
3. 文本预处理
文本预处理一般用到如下的方法:
- 停用词过滤 (去网上搜一下 “english stop words list”,会出现很多包含停用词库的网页,或者直接使用NLTK自带的)
- 转换成lower_case: 这是一个基本的操作
- 去掉一些无用的符号: 比如连续的感叹号!!!, 或者一些奇怪的单词。
- 去掉出现频率很低的词:比如出现次数少于10,20… (想一下如何选择阈值)
- 对于数字的处理: 分词完只有有些单词可能就是数字比如44,415,把所有这些数字都看成是一个单词,这个新的单词我们可以定义为 “#number”
- lemmazation(词性还原): 在这里不要使用stemming(词干提取), 因为stemming的结果有可能不是valid word。
3.1 无用符号过滤
import nltk
from nltk.corpus import stopwords
import codecs
import re# 去掉一些无用的符号
def tokenizer(ori_list):# 利用正则表达式去掉无用的符号# compile 函数用于编译正则表达式,[]用来表示一组字符# \s匹配任意空白字符,等价于 [\t\n\r\f]。SYMBOLS = re.compile('[\s;\"\",.!?\\/\[\]\{\}\(\)-]+')new_list = []for q in ori_list:# split 方法按照能够匹配的子串将字符串分割后返回列表words = SYMBOLS.split(q.lower().strip())new_list.append(' '.join(words))return new_list
3.2 停用词过滤
# 去掉question的停用词
def removeStopWord(ori_list):new_list = []# nltk中stopwords包含what等,但是在QA问题中,这算关键词,所以不看作关键词restored = ['what', 'when', 'which', 'how', 'who', 'where']# nltk中自带的停用词库,加载英语停用词english_stop_words = list(set(stopwords.words('english'))) # ['what','when','which','how','who','where','a','an','the'] ## 将在QA问答系统中不算停用词的词去掉for w in restored:english_stop_words.remove(w)for q in ori_list:# 将每个问句的停用词去掉sentence = ' '.join([w for w in q.strip().split(' ') if w not in english_stop_words])# 将去掉停用词的问句添加至列表中new_list.append(sentence)return new_list
3.3 去掉低频率的词
def removeLowFrequence(ori_list, vocabulary, thres=10):"""去掉低频率的词:param ori_list: 预处理后的问题列表:param vocabulary: 词频率字典:param thres: 频率阈值,可以基于数据实际情况进行调整:return: 新的问题列表"""# 根据thres筛选词表,小于thres的词去掉new_list = []for q in ori_list:sentence = ' '.join([w for w in q.strip().split(' ') if vocabulary[w] >= thres])new_list.append(sentence)return new_list
3.4 处理数字
在文本匹配时,如果数字较多,就会造成噪声。
def replaceDigits(ori_list, replace='#number'):"""将数字统一替换为replace,默认#number:param ori_list: 预处理后的问题列表:param replace::return:"""# 编译正则表达式:匹配1个或多个数字DIGITS = re.compile('\d+')new_list = []for q in ori_list:# re.sub用于替换字符串中的匹配项,相当于在q中查找连续的数字替换为#numberq = DIGITS.sub(replace, q)# 将处理后的问题字符串添加到新列表中new_list.append(q)return new_list
3.5 其他辅助函数
def createVocab(ori_list):"""创建词表,统计所有单词总数与每个单词总数:param ori_list:预处理后的列表:return:所有单词总数与每个单词总数"""count = 0vocab_count = dict()for q in ori_list:words = q.strip().split(' ')count += len(words)for w in words:if w in vocab_count:vocab_count[w] += 1else:vocab_count[w] = 1return vocab_count, count
def writeFile(oriList, filename):"""将处理后的问题列表写入到文件中:param oriList: 预处理后的问题列表:param filename: 文件名"""with codecs.open(filename, 'w', 'utf8') as Fout:for q in oriList:Fout.write(q + u'\n')
def writeVocab(vocabulary, filename):"""将词表写入到文件中:param vocabulary: 词表:param filename: 文件名"""sortedList = sorted(vocabulary.items(), key=lambda d: d[1])with codecs.open(filename, 'w', 'utf8') as Fout:for (w, c) in sortedList:Fout.write(w + u':' + str(c) + u'\n')
3.6 文本预处理流程
# 去掉一些无用的符号
new_list = tokenizer(qlist)
# 停用词过滤
new_list = removeStopWord(new_list)
# 数字处理-将数字替换为#number
new_list = replaceDigits(new_list)
# 创建词表并统计所有单词数目
vocabulary, count = createVocab(new_list)
# 去掉低频率的词
new_list = removeLowFrequence(new_list, vocabulary, 5)
# 重新统计词频
vocab_count, count = createVocab(new_list)
# 将词表写入到文件“train.vocab”中
writeVocab(vocab_count, "train.vocab")
qlist = new_list
4.文本表示
4.1 使用tf-idf表示向量
把qlist
中的每一个问题的字符串转换成tf-idf
向量, 转换之后的结果存储在X
矩阵里。X
的大小是: N* D
的矩阵。 这里N
是问题的个数(样本个数),D
是词典库的大小。
词袋模型常用TF-IDF来计算权重,公式为:
TF−IDF(t,d)=TF(t,d)×IDF(t)TF-IDF(t,d)=TF(t,d)×IDF(t)TF−IDF(t,d)=TF(t,d)×IDF(t)
其中TF(t,d)为单词t在文档d中出现的频率,IDF(t)是逆文档频率,用来衡量单词t对表达语义所起的重要性,表示为
IDF(t)=log[(文章总数)/(句含单词t的文章总数+1)]IDF(t)=log{[(文章总数)/(句含单词t的文章总数+1)]}IDF(t)=log[(文章总数)/(句含单词t的文章总数+1)]
直观的解释是,如果一个单词在非常多的文章里面都出现,那么它可能是一个比较通用的词汇,对于区分某篇文章特殊语义的贡献较小,因此对权重做一定惩罚。
下面代码展示TF-IDF的计算过程
import numpy as npdef computeTF(vocab, c):"""计算每次词的词频:TF:param vocab: 词频字典:键是单词,值是所有问句中单词出现的次数:param c: 单词总数:return: TF"""# 初始化TFTF = np.ones(len(vocab))# 词频字典word2id = dict()# 单词字典id2word = dict()for word, fre in vocab.items():# 计算TF值:每个单词出现的个数/总的单词个数TF[len(word2id)] = 1.0 * fre / cid2word[len(word2id)] = wordword2id[word] = len(word2id)return TF, word2id, id2word
def computeIDF(word2id, qlist):"""计算IDF:log[问句总数/(包含单词t的问句总数+1)]:param word2id:单词字典:param qlist:问句列表:return:"""IDF = np.ones(len(word2id))for q in qlist:# 去重words = set(q.strip().split())for w in words:# 统计单词出现在问句中的总数IDF[word2id[w]] += 1# 计算IDFIDF /= len(qlist)IDF = -1.0 * np.log2(IDF)return IDF
def computeSentenceEach(sentence, tfidf, word2id):"""给定句子,计算句子TF-IDF,tfidf是一个1*M的矩阵,M为词表大小:param sentence:句子:param tfidf:TF-IDF向量:param word2id:词表:return:"""sentence_tfidf = np.zeros(len(word2id))# 将问句以空格进行分割for w in sentence.strip().split(' '):if w not in word2id:continue# 碰到在词表word2id中的单词sentence_tfidf[word2id[w]] = tfidf[word2id[w]]return sentence_tfidfdef computeSentence(qlist, word2id, tfidf):"""把```qlist```中的每一个问题的字符串转换成```tf-idf```向量, 转换之后的结果存储在```X```矩阵里:param qlist: 问题列表:param word2id: 词表(字典形式):param tfidf: TF-IDF(与词表中的键一一对应):return: X矩阵"""# 对所有句子分别求tfidfX_tfidf = np.zeros((len(qlist), len(word2id)))for i, q in enumerate(qlist):X_tfidf[i] = computeSentenceEach(q, tfidf, word2id)# print(X_tfidf[i])return X_tfidf
# 计算每个词的TF,词表字典
TF, word2id, id2word = computeTF(vocab_count, count)
# 计算IDF:log[问句总数/(包含单词t的问句总数+1)]
IDF = computeIDF(word2id, qlist)
# 用TF,IDF计算最终的tf-idf,定义一个tf-idf的vectorizer
vectorizer = np.multiply(TF, IDF)
# 把```qlist```中的每一个问题的字符串转换成```tf-idf```向量, 转换之后的结果存储在```X```矩阵里
X_tfidf = computeSentence(qlist, word2id, vectorizer)
4.2 使用wordvec + average pooling
wordvec本质上是通过前馈神经网络的训练参数。从表现上来看,wordvec表征能力比TF-IDF表征能力强很多。词向量方面需要下载: https://nlp.stanford.edu/projects/glove/ (请下载glove.6B.zip
),并使用d=200
的词向量(200维),我资源里面也传了一份。这样获得的词向量是已经训练好的词向量。 每个词向量获取完之后,即可以得到一个句子的向量。 我们通过average pooling
来实现句子的向量。
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vecdef loadEmbedding(filename):"""加载glove模型,转化为word2vec,再加载到word2vec模型这两种模型形式上是一样的,在数据的保存形式上有略微的差异:param filename: glove文件:return: word2vec模型"""word2vec_temp_file = 'word2vec_temp.txt'# 加载glove模型,转化为word2vecglove2word2vec(filename, word2vec_temp_file)# 再加载到word2vec模型model = KeyedVectors.load_word2vec_format(word2vec_temp_file)return model
def computeGloveSentenceEach(sentence, embedding):""":param sentence:问题:param embedding:wordvec模型:return:"""# 查找句子中每个词的embedding,将所有embedding进行加和求均值emb = np.zeros(200)words = sentence.strip().split(' ')for w in words:# 如果单词是新词,则重命名为'unknown',当成特殊符号来使用if w not in embedding:# 没有lookup的即为unknownw = 'unknown'# 将所有embedding进行加和求均值emb += embedding.get_vector(w)# emb += embedding[w]return emb / len(words)def computeGloveSentence(qlist, embedding):"""对每一个句子来构建句子向量:param qlist:问题列表:param embedding:word2vec模型:return:"""# 对每一个句子进行求均值的embeddingX_w2v = np.zeros((len(qlist), 200))for i, q in enumerate(qlist):# 编码每一个问题X_w2v[i] = computeGloveSentenceEach(q, embedding)# print(X_w2v)return X_w2v
# 加载到word2vec模型
# 这是 D*H的矩阵,这里的D是词典库的大小, H是词向量的大小。 这里面我们给定的每个单词的词向量,
emb = loadEmbedding('data/glove.6B.200d.txt')
# 初始化完emb之后就可以对每一个句子来构建句子向量了,这个过程使用average pooling来实现
X_w2v = computeGloveSentence(qlist, emb)
4.3 使用BERT + average pooling
BERT与wordvec非常相似,只不过BERT是基于Transformer的深层神经网络来训练得到向量的表示,学到的效果比浅层的神经网络效果好。我们不做任何的训练,而是直接使用已经训练好的BERT embedding. 为了获取BERT-embedding,可以直接下载已经训练好的模型,从而获得每一个单词的向量。可以从这里获取: https://github.com/imgarylai/bert-embedding ,请使用bert_12_768_12
当然,你也可以从其他source获取也没问题,只要是合理的词向量。
5.相似度匹配与搜索
在这部分里,我们需要把用户每一个输入跟知识库里的每一个问题做一个相似度计算,从而得出最相似的问题。但对于这个问题,时间复杂度其实很高,所以我们需要结合倒排表来获取相似度最高的问题,从而获得答案。下面介绍几种最常见的相似度匹配算法的设计。
5.1 tf-idf + 余弦相似度
我们可以直接基于计算出来的tf-idf
向量,计算用户最新问题与库中存储的问题之间的相似度,从而选择相似度最高的问题的答案。这个方法的复杂度为O(N)
, N
是库中问题的个数。
import queue as Q# 优先级队列实现大顶堆Heap,每次输出都是相似度最大值
que = Q.PriorityQueue()def cosineSimilarity(vec1, vec2):# 定义余弦相似度,余弦相似度越大,两个向量越相似return np.dot(vec1, vec2.T) / (np.sqrt(np.sum(vec1 ** 2)) * np.sqrt(np.sum(vec2 ** 2)))def get_top_results_tfidf_noindex(query):"""给定用户输入的问题 query, 返回最有可能的TOP 5问题。:param query:用户新问题:return:""""""给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:1. 对于用户的输入 query 首先做一系列的预处理(上面提到的方法),然后再转换成tf-idf向量(利用上面的vectorizer)2. 计算跟每个库里的问题之间的相似度3. 找出相似度最高的top5问题的答案"""top = 5# 将用户输入的新问题用tf-idf来表示query_tfidf = computeSentenceEach(query.lower(), vectorizer, word2id)for i, vec in enumerate(X_tfidf):# 计算原问题与用户输入的新问题的相似度result = cosineSimilarity(vec, query_tfidf)# print(result)# 存放到大顶堆里面que.put((-1 * result, i))i = 0top_idxs = [] while (i < top and not que.empty()):# top_idxs存放相似度最高的(存在qlist里的)问题的下标top_idxs.append(que.get()[1])i += 1print(top_idxs)# 返回相似度最高的问题对应的答案,作为TOP5答案return np.array(alist)[top_idxs] # 给定用户输入的问题 query, 返回最有可能的TOP 5问题的答案。
results = get_top_results_tfidf_noindex('In what city and state did Beyonce grow up')
print(results)
你会发现上述的程序很慢,没错! 是因为循环了所有库里的问题。为了优化这个过程,我们需要使用一种数据结构叫做倒排表
。 使用倒排表我们可以把单词和出现这个单词的文档做关键。 之后假如要搜索包含某一个单词的文档,即可以非常快速的找出这些文档。 在这个QA系统上,我们首先使用倒排表来快速查找包含至少一个单词的文档,然后再进行余弦相似度的计算,即可以大大减少时间复杂度
。
5.2 倒排索引表的创建
倒排表可以用于加快匹配的过程。倒排表的创建其实很简单,最简单的方法就是循环所有的单词一遍,然后记录每一个单词所出现的文档,然后把这些文档的ID保存成list即可。我们可以定义一个类似于hash_map
, 比如 inverted_index = {}
, 然后存放包含每一个关键词的文档出现在了什么位置,也就是,通过关键词的搜索首先来判断包含这些关键词的文档(比如出现至少一个),然后对于candidates问题做相似度比较。
倒排索引表基于一个强假设:两个句子包含的相同的词越多,两个句子越相似。实际上会漏掉一些句子,相同的词不多,但是句子相似。
word_doc = dict()
# key:word,value:包含该词的句子序号的列表
for i, q in enumerate(qlist):words = q.strip().split(' ')for w in set(words):if w not in word_doc:# 没在word_doc中的,建立一个空listiword_doc[w] = set([])word_doc[w] = word_doc[w] | set([i])# 定一个一个简单的倒排表,是一个map结构。 循环所有qlist一遍就可以
inverted_idx = word_doc
5.3 语义相似度
语义的相似度,可以这么理解: 两个单词比如car, auto这两个单词长得不一样,但从语义上还是类似的。如果只是使用倒排表,我们不能考虑到这些单词之间的相似度,这就导致如果我们搜索句子里包含了car
, 则我们没法获取到包含auto的所有的文档。所以我们希望把这些信息也存下来。那这个问题如何解决呢? 其实也不难,可以提前构建好相似度的关系,比如对于car
这个单词,一开始就找好跟它意思上比较类似的单词比如top 10,这些都标记为related words
。所以最后我们就可以创建一个保存related words
的一个map
. 比如调用related_words['car']
即可以调取出跟car
意思上相近的TOP 10的单词。那这个related_words
又如何构建呢? 在这里我们仍然使用Glove
向量,然后计算一下俩俩的相似度(余弦相似度)。之后对于每一个词,存储跟它最相近的top 10单词,最终结果保存在related_words
里面。 这个计算需要发生在离线,因为计算量很大,复杂度为O(V*V)
, V是单词的总数。这个结果保存在related_words.txt
里。 我们在使用的时候直接从文件里读取就可以了,不用再重复计算。
# 读取语义相关的单词
import codecsdef get_related_words(filename):"""从预处理的相似词的文件加载相似词信息文件格式w1 w2 w3..w11,其中w1为原词,w2-w11为w1的相似词:param filename: 文件名:return:"""related_words = {}with codecs.open(filename, 'r', 'utf8') as Fin:lines = Fin.readlines()for line in lines:words = line.strip().split(' ')# 键为原词,值为相似词related_words[words[0]] = words[1:]return related_words# 从预处理的相似词的文件加载相似词信息
related_words = get_related_words('data/related_words.txt')
5.4 利用倒排索引表进行搜索
在这里,我们使用倒排表先获得一批候选问题,然后再通过余弦相似度做精准匹配,这样一来可以节省大量的时间。搜索过程分成两步:
- 使用倒排表把候选问题全部提取出来。首先,对输入的新问题做分词等必要的预处理工作,然后对于句子里的每一个单词,从
related_words
里提取出跟它意思相近的top 10单词, 然后根据这些top词从倒排表里提取相关的文档,把所有的文档返回。 这部分可以放在下面的函数当中,也可以放在外部。 - 然后针对于这些文档做余弦相似度的计算,最后排序并选出最好的答案。
import queue as Qdef cosineSimilarity(vec1, vec2):# 定义余弦相似度return np.dot(vec1, vec2.T) / (np.sqrt(np.sum(vec1 ** 2)) * np.sqrt(np.sum(vec2 ** 2)))def getCandidate(query):"""根据查询句子中每个词及其10个相似词所在的序号列表,求交集:param query: 问题:return:"""searched = set()for w in query.strip().split(' '):# 如果单词不在word2id中或者不在倒排表中if w not in word2id or w not in inverted_idx:continue# 搜索原词所在的序号列表if len(searched) == 0:searched = set(inverted_idx[w])else:searched = searched & set(inverted_idx[w])# 搜索相似词所在的列表if w in related_words:for similar in related_words[w]:searched = searched & set(inverted_idx[similar])return searched
下面对三种编码进行余弦相似度的计算,最后排序并选出最好的答案。
def get_top_results_tfidf(query):"""基于TF-IDF,给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:1. 利用倒排表来筛选 candidate (需要使用related_words).2. 对于候选文档,计算跟输入问题之间的相似度3. 找出相似度最高的top5问题的答案"""top = 5# 计算给定句子的TF-IDFquery_tfidf = computeSentenceEach(query, vectorizer, word2id)# 优先级队列实现大顶堆Heap,每次输出都是相似度最大值results = Q.PriorityQueue()# 利用倒排索引表获取相似问题searched = getCandidate(query)# print(len(searched))for candidate in searched:# 计算candidate与query的余弦相似度result = cosineSimilarity(query_tfidf, X_tfidf[candidate])# 优先级队列中保存相似度和对应的candidate序号# -1保证降序results.put((-1 * result, candidate))i = 0# top_idxs存放相似度最高的(存在qlist里的)问题的索引top_idxs = []# hint: 利用priority queue来找出top results.while i < top and not results.empty():top_idxs.append(results.get()[1])i += 1# 返回相似度最高的问题对应的答案,作为TOP5答案return np.array(alist)[top_idxs]def get_top_results_w2v(query):"""基于word2vec,给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:1. 利用倒排表来筛选 candidate (需要使用related_words).2. 对于候选文档,计算跟输入问题之间的相似度3. 找出相似度最高的top5问题的答案"""# embedding用glovetop = 5# 利用glove将问题进行编码query_emb = computeGloveSentenceEach(query, emb)# 优先级队列实现大顶堆Heap,每次输出都是相似度最大值results = Q.PriorityQueue()# 利用倒排索引表获取相似问题searched = getCandidate(query)for candidate in searched:# 计算candidate与query的余弦相似度result = cosineSimilarity(query_emb, X_w2v[candidate])# 优先级队列中保存相似度和对应的candidate序号# -1保证降序results.put((-1 * result, candidate))# top_idxs存放相似度最高的(存在qlist里的)问题的索引top_idxs = []i = 0# hint: 利用priority queue来找出top results.while i < top and not results.empty():top_idxs.append(results.get()[1])i += 1# 返回相似度最高的问题对应的答案,作为TOP5答案return np.array(alist)[top_idxs]def get_top_results_bert(query):"""给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:1. 利用倒排表来筛选 candidate (需要使用related_words).2. 对于候选文档,计算跟输入问题之间的相似度3. 找出相似度最高的top5问题的答案"""top = 5# embedding用Bert embeddingquery_emb = np.sum(bert_embedding([query], 'sum')[0][1], axis=0) / len(query.strip().split())# 优先级队列实现大顶堆Heap,每次输出都是相似度最大值results = Q.PriorityQueue()# 利用倒排索引表获取相似问题searched = getCandidate(query)for candidate in searched:# 计算candidate与query的余弦相似度result = cosineSimilarity(query_emb, X_bert[candidate])# 优先级队列中保存相似度和对应的candidate序号# -1保证降序results.put((-1 * result, candidate))# top_idxs存放相似度最高的(存在qlist里的)问题的索引top_idxs = []i = 0# hint: 利用priority queue来找出top results.while i < top and not results.empty():top_idxs.append(results.get()[1])i += 1# 返回相似度最高的问题对应的答案,作为TOP5答案return np.array(alist)[top_idxs]
6.拼写纠错
这一部分并不是文本匹配的后续部分,但在问答系统中是经常用到的,这是因为考虑到用户输入是不一定正确的。这个额外的技术手段用来保证query的正确性。其实用户在输入问题的时候,不能期待他一定会输入正确,有可能输入的单词的拼写错误的。这个时候我们需要后台及时捕获拼写错误,并进行纠正,然后再通过修正之后的结果再跟库里的问题做匹配。这里我们需要实现一个简单的拼写纠错的代码,然后自动去修复错误的单词。
这里使用的拼写纠错方法是使用noisy channel model
。 我们回想一下它的表示:
c∗=argmaxc∈candidatesp(c∣s)=argmaxc∈candidatesp(s∣c)p(c)c^* = \text{argmax}_{c\in candidates} ~~p(c|s) = \text{argmax}_{c\in candidates} ~~p(s|c)p(c)c∗=argmaxc∈candidatesp(c∣s)=argmaxc∈candidatesp(s∣c)p(c)
这里的candidates
指的是针对于错误的单词的候选集,这部分我们可以假定是通过edit_distance
来获取的(比如生成跟当前的词距离为1/2的所有的valid 单词。 valid单词可以定义为存在词典里的单词。 c
代表的是正确的单词, s
代表的是用户错误拼写的单词。 所以我们的目的是要寻找出在candidates
里让上述概率最大的正确写法c
。 p(s∣c)p(s|c)p(s∣c),这个概率我们可以通过历史数据来获得,也就是对于一个正确的单词ccc, 有百分之多少人把它写成了错误的形式1,形式2… 这部分的数据可以从spell_errors.txt
里面找得到。但在这个文件里,我们并没有标记这个概率,所以可以使用uniform probability
来表示。这个也叫做channel probability
。 p(c)p(c)p(c),这一项代表的是语言模型,也就是假如我们把错误的sss,改造成了ccc, 把它加入到当前的语句之后有多通顺?在本次项目里我们使用bigram来评估这个概率。
举个例子: 假如有两个候选 c1,c2c_1, c_2c1,c2, 然后我们希望分别计算出这个语言模型的概率。 由于我们使用的是
bigram
, 我们需要计算出两个概率,分别是当前词前面和后面词的bigram
概率。 用一个例子来表示:
给定:We are go to school tomorrow
, 对于这句话我们希望把中间的go
替换成正确的形式,假如候选集里有个,分别是going
,went
, 这时候我们分别对这俩计算如下的概率:p(going∣are)p(to∣going)p(going|are)p(to|going)p(going∣are)p(to∣going)和 p(went∣are)p(to∣went)p(went|are)p(to|went)p(went∣are)p(to∣went), 然后把这个概率当做是p(c)p(c)p(c)的概率。 然后再跟channel probability
结合给出最终的概率大小。那这里的p(are∣going)p(are|going)p(are∣going)这些bigram概率又如何计算呢?答案是训练一个语言模型! 但训练一个语言模型需要一些文本数据,这个数据怎么找? 在这次项目里我们会用到nltk
自带的reuters
的文本类数据来训练一个语言模型。当然,如果你有资源你也可以尝试其他更大的数据。最终目的就是计算出bigram
概率。
6.1 训练一个语言模型
在这里,我们使用nltk
自带的reuters
数据来训练一个语言模型。 使用add-one smoothing
。
from nltk.corpus import reuters
import numpy as np
import codecs# 读取语料库的数据
categories = reuters.categories()
corpus = reuters.sents(categories=categories)
# 循环所有的语料库并构建bigram probability.
# bigram[word1][word2]: 在word1出现的情况下下一个是word2的概率。
new_corpus = []
for sent in corpus:# 句子前后加入<s>,</s>表示开始和结束new_corpus.append(['<s> '] + sent + [' </s>'])
print(new_corpus[0])
word2id = dict()
id2word = dict()
# 构建word2id与id2word
for sent in new_corpus:for w in sent:w = w.lower()if w in word2id:continueid2word[len(word2id)] = wword2id[w] = len(word2id)
# 词表尺寸
vocab_size = len(word2id)
# 初始化
# 统计单个词出现的概率
count_uni = np.zeros(vocab_size)
# 统计两个词相邻出现的概率
count_bi = np.zeros((vocab_size, vocab_size))
for sent in new_corpus:for i, w in enumerate(sent):w = w.lower()count_uni[word2id[w]] += 1if i < len(sent) - 1:count_bi[word2id[w], word2id[sent[i + 1].lower()]] += 1
print("unigram done")
# 建立bigram模型
bigram = np.zeros((vocab_size, vocab_size))
# 计算bigram LM
# 有bigram统计值的加一除以|vocab|+uni统计值,没有统计值,1 除以 |vocab|+uni统计值
for i in range(vocab_size):for j in range(vocab_size):if count_bi[i, j] == 0:bigram[i, j] = 1.0 / (vocab_size + count_uni[i])else:bigram[i, j] = (1.0 + count_bi[i, j]) / (vocab_size + count_uni[i])# 得到bigram概率
def checkLM(word1, word2):if word1.lower() in word2id and word2.lower() in word2id:return bigram[word2id[word1.lower()], word2id[word2.lower()]]else:return 0.0
6.2 计算channel probability
这个概率我们可以通过历史数据来获得,也就是对于一个正确的单词ccc, 有百分之多少人把它写成了错误的形式1,形式2… 这部分的数据可以从spell_errors.txt
里面找得到。但在这个文件里,我们并没有标记这个概率,所以可以使用uniform probability
来表示。这个也叫做channel probability
。下面基于spell_errors.txt
文件构建channel probability
, 其中channel[c][s]channel[c][s]channel[c][s]表示正确的单词ccc被写错成sss的概率。
channel = {}
# 读取文件,格式为w1:w2,w3..
# w1为正确词,w2,w3...为错误词
# 没有给出不同w2-wn的概率,暂时按等概率处理
for line in open('spell-errors.txt'):# 按':'为分隔符进行分割(correct, error) = line.strip().split(':')# 错误单词按照‘,’进行分割errors = error.split(',')errorProb = dict()for e in errors:# 键:错误单词;值:错误概率errorProb[e.strip()] = 1.0 / len(errors)# 键:正确单词,值:错误字典channel[correct.strip()] = errorProb
6.3 根据错别字生成所有候选集合
给定一个错误的单词,首先生成跟这个单词距离为1或者2的所有的候选集合。
def filter(words):# 将不在词表中的词过滤new_words = []for w in words:if w in word2id:new_words.append(w)return set(new_words)def generate_candidates1(word):# 生成DTW距离为1的词,# 对于英语来说,插入,替换,删除26个字母chars = 'abcdefghijklmnopqrstuvwxyz'words = set([])# insert 1words = set(word[0:i] + chars[j] + word[i:] for i in range(len(word)) for j in range(len(chars)))# sub 1words = words | set(word[0:i] + chars[j] + word[i + 1:] for i in range(len(word)) for j in range(len(chars)))# delete 1words = words | set(word[0:i] + word[i + 1:] for i in range(len(chars)))# 交换相邻# print(set(word[0:i - 1] + word[i] + word[i - 1] + word[i + 1:] for i in range(1,len(word))))words = words | set(word[0:i - 1] + word[i] + word[i - 1] + word[i + 1:] for i in range(1, len(word)))# 将不在词表中的词去掉words = filter(words)# 去掉word本身if word in words:words.remove(word)return wordsdef generate_candidates(word):# 基于拼写错误的单词,生成跟它的编辑距离为1或者2的单词,并通过词典库的过滤,只留写法上正确的单词。words = generate_candidates1(word)words2 = set([])for word in words:# 将距离为1词,再分别计算距离为1的词,# 作为距离为2的词候选words2 = generate_candidates1(word)# 过滤掉不在词表中的词words2 = filter(words)# 距离为1,2的词合并列表words = words | words2return words# 生成候选集合
words = generate_candidates('strat')
print(words)
6.4 给定一个输入,如果有错误给予纠正
给定一个输入query
, 如果这里有些单词是拼错的,就需要把它纠正过来。这部分的实现可以简单一点: 对于query
分词,然后把分词后的每一个单词在词库里面搜一下,假设搜不到的话可以认为是拼写错误的! 如果拼写错误了再通过channel
和bigram
来计算最适合的候选。
import numpy as np
import queue as Qdef word_corrector(word, context):"""在候选词中基于上下文找到概率最大的词:param word::param context::return:"""word = word.lower()# 生成跟这个单词距离为1或者2的所有的候选集合candidate = generate_candidates(word)if len(candidate) == 0:return word# 优先级队列实现大顶堆Heap,每次输出都是相似度最大值correctors = Q.PriorityQueue()# 通过``channel``和``bigram``来计算最适合的候选for w in candidate:# 如果候选单词在channel中并且正确单词在它的错误单词字典中,候选单词,文本的第一个第二个单词均在词表中if w in channel and word in channel[w] and w in word2id and context[0].lower() in word2id and context[1].lower() in word2id:# 相乘转化为log是相加# $c^* = \text{argmax}_{c\in candidates} ~~p(c|s) = \text{argmax}_{c\in candidates} ~~p(s|c)p(c)$# 其中channel部分是p(c),bigram部分是p(s|c)probility = np.log(channel[w][word] + 0.0001) + np.log(bigram[word2id[context[0].lower()], word2id[w]]) + np.log(bigram[word2id[context[1].lower()], word2id[w]])# 加入大顶堆correctors.put((-1 * probility, w))if correctors.empty():return word# 返回最大概率的候选词return correctors.get()[1]word = word_corrector('strat', ('to', 'in'))
print(word)def spell_corrector(line):"""对拼写错误进行修正1. 首先做分词,然后把``line``表示成``tokens``2. 循环每一token, 然后判断是否存在词库里。如果不存在就意味着是拼写错误的,需要修正。修正的过程就使用上述提到的``noisy channel model``, 然后从而找出最好的修正之后的结果。:param line::return:"""new_words = []words = ['<s>'] + line.strip().lower().split(' ') + ['</s>']for i, word in enumerate(words):# 如果索引是尾部索引为break,因为尾部是'</s>'if i == len(words) - 1:breakword = word.lower()if word not in word2id:# 认为错误,需要修正,句子前后加了<s>,</s># 不在词表中词,肯定位于[1,len - 2]之间# 如果单词不在词表中,则说明需要修正,将修正后的词添加到new_words中new_words.append(word_corrector(word, (words[i - 1].lower(), words[i + 1].lower())))else:new_words.append(word)# 修正后的结果newline = ' '.join(new_words[1:])# 返回修正之后的结果,假如用户输入没有问题,那这时候``newline = line``return newlinesentence = spell_corrector('When did Beyonce strat becoming popular')
print(sentence)
6.5 基于拼写纠错算法,实现用户输入自动矫正
首先有了用户的输入query
, 然后做必要的处理把句子转换成tokens的形状,然后对于每一个token比较是否是valid, 如果不是的话就进行下面的修正过程。
test_query1 = "When did Beyonce strat becoming popular" # 拼写错误的-'start'
# result:in the late 1990s
test_query2 = "What counted for more of the poplation change" # 拼写错误的-'population'
# result:births and deaths
test_query1 = spell_corrector(test_query1)
test_query2 = spell_corrector(test_query2)
print(test_query1)
print(test_query2)
7.总结
在本次项目中我们实现了一个简易的问答系统。基于这个项目,我们其实可以有很多方面的延伸。
- 在这里,我们使用文本向量之间的余弦相似度作为了一个标准。但实际上,我们也可以基于基于包含关键词的情况来给一定的权重。比如一个单词跟related word有多相似,越相似就意味着相似度更高,权重也会更大。
- 另外 ,除了根据词向量去寻找
related words
也可以提前定义好同义词库,但这个需要大量的人力成本。 - 在这里,我们直接返回了问题的答案。 但在理想情况下,我们还是希望通过问题的种类来返回最合适的答案。 比如一个用户问:“明天北京的天气是多少?”, 那这个问题的答案其实是一个具体的温度(其实也叫做实体),所以需要在答案的基础上做进一步的抽取。这项技术其实是跟信息抽取相关的。
- 对于词向量,我们只是使用了
average pooling
, 除了average pooling,我们也还有其他的经典的方法直接去学出一个句子的向量。 - 短文的相似度分析一直是业界和学术界一个具有挑战性的问题。在这里我们使用尽可能多的同义词来提升系统的性能。但除了这种简单的方法,可以尝试其他的方法比如WMD,或者适当结合parsing相关的知识点。
如果对您有帮助,麻烦点赞关注,这真的对我很重要!!!如果需要互关,请评论或者私信!
nlp项目:搭建一个简单的问答系统相关推荐
- 搭建一个简单的问答系统(v2.0)
之前刚接触机器学习的时候,写过一篇<基于sklearn库,搭建一个简单的问答系统>.此篇文章是在上篇的逻辑上,对一些函数进行了优化,并对检索方式进行了一些优化,再各个环节上时间复杂度都提高 ...
- 1-3.Win10系统利用Pycharm社区版安装Django搭建一个简单Python Web项目的步骤之三
在1-1.Win10系统利用Pycharm社区版安装Django搭建一个简单Python Web项目的步骤之一 基础上进行如下操作: 所有路由不能全部都在myDjango下的urls.py路由文件中, ...
- eclipse maven项目 class类部署不到tomcat下_Springboot介绍以及用Eclipse搭建一个简单的Springboot项目教程
简述 本文主要介绍Springboot以及用Eclipse搭建一个简单的Springboot项目. Springboot简介 Springboot是由Pivotal团队提供的全新框架,其设计目的是用来 ...
- 测试开发——搭建一个简单 web服务(flask框架基础)项目实战
搭建一个简单 web服务-flask框架 一.什么是wsgi? 二.搭建一个简单 web服务 三.扩展 四.请求加参数的情况 五.安装flask 一.什么是wsgi? wsgi是webserver和a ...
- 使用SpringBoot搭建一个简单的webSocket服务
前言 个人地址:使用SpringBoot搭建一个简单的webSocket服务 什么是WebSocket? WebSocket是一个HTML5新增的协议,它的目的在浏览器和服务器之间建立一个不受限的双向 ...
- SpringBoot + Dubbo + Zookeeper搭建一个简单的分布式服务
本文使用SpringBoot + Dubbo + Zookeeper 来搭建一个简单的分布式服务 文章目录 dubbo-spring-boot-starter 如何发布 Dubbo 服务 如何消费 D ...
- java分布式dubbo_Dubbo剖析-搭建一个简单的分布式系统(1)
一.前言 随着阿里巴巴开源的分布式RPC框架Dubbo成为Apache开源卵化器项目,Dubbo有火了一把.在接下来的一段时间本公众号将会时不时的发布一些dubbo使用与原理剖析的文章. image. ...
- 搭建一个简单的FAQ系统
现在的智能问答系统的应用是非常普遍的,比如说客服,前台机器人,讲解机器人等很多场景都可能会用到FAQ问答系统,所谓的FAQ就是 frequently asked questions,也就是说在某个场景 ...
- Python后端---使用Django+Mysql搭建一个简单的网站
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一.项目搭建 前期准备 命令行创建项目和app Django项目配置文件说明 使用数据库 编写业务逻辑 配置url主路 ...
- SpringBoot搭建一个简单的天气预报系统(一)
章节目录 1. 前言 2. 数据来源 3. 实战 3.1 开发环境 3.2 功能需求 3.3 手动编码 3.3.1 vo层 3.3.2 service层 3.3.3 controller层 3.3.4 ...
最新文章
- Apache Flink 漫谈系列 - JOIN 算子
- 【CodeForces - 689B】Mike and Shortcuts(Dijkstra最短路,或者bfs跑状态类似spfa)
- php 查找无限级,Ztree + PHP 无限级节点 递归查找节点法
- python字符串查找某个字符_python的字符串
- Daily scrum 11.22
- dubbo : Invalid multicast address 127.0.0.1, scope: 224.0.0.0 - 239.255.255.255
- Hibernate学习笔记!
- python中copy和deepcopy的区别_python里shadowcopy和deepcopy的区别
- opencl初探-sobel检测
- poj 1823 Hotel 线段树,注意懒惰标记,不标记就会超时滴
- 网页版bpc电波对时_BPC电波对时app
- H5自适应简约浪漫婚礼邀请函HTML源码
- thinkpad T480安装WIN7系统NVM固态硬盘+INTEL HD620显卡
- 《隐私保护周三见》精彩50问 | 交流群互动合集
- 母亲节为什么要定在5月的第二个星期日? [节假日]
- 贡献一个fisco-bcos-browser-front基于官方的代码改造的兼容手机浏览器和pc浏览器
- Gdevops广州站:主流数据库的选型、架构设计与迁移实战,一网打尽!
- OneNET麒麟座应用开发之四:数据上传测试
- 如何修改PDF并调整页面尺寸大小
- WWDC20 苹果发布会