Data Science and Engineering Algorithms Course Project: Text Summarization
Abstract
This project implements extractive text summarization: with the maximization of a submodular coverage function as the objective, it selects 100 sentences from a topic-specific collection of movie reviews as the summary.
The problem is solved mainly with a greedy hill-climbing algorithm. Through a survey of related material and papers, several optimizations of the greedy algorithm were identified that improve its efficiency, and a simulated-annealing algorithm was implemented as a baseline for comparison. The project analyzes, implements, and compares these methods.
Project Overview
Task Description
- Crawl text data on a chosen topic from the Web.
- Preprocess the crawled corpus to extract keywords, using techniques such as tokenization, stop-word removal, and stemming.
- Implement a text-summarization algorithm based on the maximum coverage problem and extract a summary of 100 sentences from the corpus (the set elements may be words or k-grams).
- The report should describe the performance of the summarization algorithm, e.g., running time and summary quality.
- The core of the task is the implementation and improvement of submodular-function-maximization algorithms.
Requirements
- Directly calling an existing library to solve the task is forbidden; doing so results in a score of 0.
- Any programming language may be used, e.g., Python, C, or Java.
- Everyone must submit their source code after completing the experiment.
- Plagiarism is strictly forbidden; once detected, both parties receive a score of 0.
Problem Description
The dataset consists of roughly 10,000 movie reviews crawled from IMDb. After preprocessing the crawled corpus, a 100-sentence summary is extracted from it by maximizing a submodular coverage function.
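Concretely, if $S_i$ denotes the set of terms contained in sentence $i$ (this notation is introduced here only to state the objective; the report itself describes it in words), the task is an instance of budgeted maximum coverage:

$$\max_{A \subseteq V,\; |A| \le k} f(A), \qquad f(A) = \Bigl|\,\bigcup_{i \in A} S_i\,\Bigr|, \qquad k = 100,$$

where $V$ is the set of all sentences in the corpus. The coverage function $f$ is monotone and submodular, which is the property the greedy algorithms below rely on.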
Methods
Data Source
The data are 9,196 English-language reviews of The Shawshank Redemption crawled from IMDb. After sentence splitting and tokenization, the corpus contains 44,638 sentences and 17,127 distinct terms.
A sample of the raw dataset is shown below:
Text Preprocessing
Text preprocessing is performed with the nltk module in Python and includes:
- tokenization
- lowercasing
- stop-word removal
- punctuation removal
- lemmatization
- stemming
import string
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer, LancasterStemmer

# Definitions assumed here (they are not shown in the original excerpt).
wordnet_lematizer = WordNetLemmatizer()
lancaster_stemmer = LancasterStemmer()
characters = set(string.punctuation)  # punctuation tokens to discard

def tokenizor(s):
    # Tokenization
    token_words = word_tokenize(s)
    # Lowercasing
    token_words = [word.lower() for word in token_words]
    # Stop-word removal
    token_words = [word for word in token_words if word not in stopwords.words('english')]
    # Punctuation removal
    token_words = [word for word in token_words if word not in characters]
    # Lemmatization
    token_words = [wordnet_lematizer.lemmatize(word) for word in token_words]
    # Stemming
    token_words = [lancaster_stemmer.stem(word) for word in token_words]
    return token_words
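The cleaned corpus is then written to a tab-separated file in the format that readData below expects (one line per sentence: the original sentence, a tab, then the comma-joined tokens with a trailing comma; this format is inferred from readData). The following is a minimal sketch of that step; the helper name write_cleaned_corpus and the in-memory reviews list are assumptions for illustration, not part of the original code.

from nltk.tokenize import sent_tokenize

def write_cleaned_corpus(reviews, path='cleand_review.txt'):
    # Hypothetical helper: split each review into sentences, tokenize them with
    # tokenizor() above, and write "<sentence>\t<token>,<token>,...," per line.
    with open(path, mode='w', encoding='utf-8') as out:
        for review in reviews:
            for sentence in sent_tokenize(review):
                tokens = tokenizor(sentence)
                if not tokens:
                    continue  # skip sentences with no surviving tokens
                clean = sentence.replace('\t', ' ').replace('\n', ' ')
                out.write(clean + '\t' + ','.join(tokens) + ',\n')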
- Build a term dictionary and convert every sentence into the set of its term ids
import numpy as np
from collections import defaultdict

def readData():
    # Each line of the cleaned corpus is "<sentence>\t<token>,<token>,...,"
    f = open('cleand_review.txt', mode='r', encoding='utf-8')
    lines = f.readlines()
    sentences = ['' for _ in range(len(lines))]
    words = [[] for _ in range(len(lines))]
    X = []
    wordDict = defaultdict()
    elements = set()
    for i in range(len(lines)):
        line = lines[i]
        line = line.strip('\n').split('\t')
        sentences[i] = line[0]
        words[i] = line[1].split(',')[:-1]
        for word in words[i]:
            elements.add(word)
    # Assign an integer id to every distinct term.
    elements = np.sort(list(elements))
    labels = np.arange(len(elements)).astype(int)
    for i in range(len(elements)):
        wordDict[str(elements[i])] = labels[i]
    # Represent each sentence as the set of its term ids.
    for i in range(len(words)):
        temp = set()
        for j in range(len(words[i])):
            temp.add(wordDict[words[i][j]])
        X.append(temp)
    return sentences, wordDict, X, len(elements)
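As a quick sanity check (the expected counts are the corpus statistics reported above), loading the cleaned corpus should yield 44,638 sentence sets over 17,127 distinct terms:

sentences, wordDict, X, wordNum = readData()
print(len(X))    # number of sentences, expected 44,638
print(wordNum)   # number of distinct terms, expected 17,127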
The preprocessed corpus looks as follows:
Simulated Annealing
A common algorithm for maximizing a submodular function is greedy hill climbing. To obtain a reference point for the quality and performance of hill climbing, the problem is first solved with simulated annealing, whose summary quality and running time are evaluated.
The procedure is as follows:
In each step the current selection is perturbed at random. If the new solution is better, it is accepted; if it is worse, the difference $\Delta$ between the current and the new objective value is computed and the new solution is accepted with probability $e^{-\frac{\Delta}{T}}$.
def SimulatedAnnealing(X, wordNum, initial_t, ratio, eps):
    # get_coverage(pick_list, X) returns the number of covered terms
    # (defined in the full listing of SimulatedAnnealing.py below).
    sentenceNum = len(X)
    pick_list = []
    # Start from a random summary of 100 distinct sentences.
    for i in range(100):
        j = np.random.randint(sentenceNum)
        while j in pick_list:
            j = np.random.randint(sentenceNum)
        pick_list.append(j)
    pre_val = get_coverage(pick_list, X)
    T = initial_t
    epoch = 0
    while T > eps:
        epoch += 1
        cur_pick_list = pick_list.copy()
        # Neighbour: swap one selected sentence for a random unselected one.
        i = cur_pick_list[np.random.randint(100)]
        j = np.random.randint(sentenceNum)
        while j in cur_pick_list:
            j = np.random.randint(sentenceNum)
        cur_pick_list.remove(i)
        cur_pick_list.append(j)
        cur_val = get_coverage(cur_pick_list, X)
        diff = cur_val - pre_val
        if diff > 0:
            pick_list = cur_pick_list
            pre_val = cur_val
        elif np.random.random() < np.exp(diff / T):
            # Accept a worse solution with probability e^(diff / T).
            pick_list = cur_pick_list
            pre_val = cur_val
        # print(pre_val, T)
        T *= ratio
    return pick_list, pre_val
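With the parameters used in the experiment (initial_t = 100, ratio = 0.9996, eps = 0.01, taken from the main program in SimulatedAnnealing.py below), the temperature decays geometrically, so the number of coverage evaluations can be estimated as

$$\frac{\ln(\text{eps}/\text{initial\_t})}{\ln(\text{ratio})} = \frac{\ln(0.01/100)}{\ln(0.9996)} \approx \frac{-9.21}{-0.0004} \approx 23{,}000,$$

a rough back-of-the-envelope figure (not from the original report) that is consistent with the comparatively long running time reported in the results.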
Naive Greedy Algorithm
The maximum coverage problem is first solved with a naive greedy algorithm. The goal is to find a set $A$ with at most $k$ elements such that $f(A)$ is as large as possible.
Let $V$ be the ground set (all sentences). In each iteration the naive greedy algorithm picks from $V-A$ the element with the largest marginal gain, i.e., the $e \in V-A$ that maximizes $f(A\cup\{e\})$; after $k$ iterations this yields an approximate solution $A$ of size $k$.
Time-complexity analysis: the algorithm runs $k$ iterations; each iteration scans the whole ground set and evaluates $f(A\cup\{e\})$ for every candidate, and a single evaluation costs at most the number of terms $w$. The overall complexity is therefore $O(knw)$, where $n$ is the number of sentences.
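For reference, the classical result of Nemhauser, Wolsey and Fisher (stated here without proof; it is not derived in this report) guarantees that for a monotone submodular $f$ under a cardinality constraint the greedy solution satisfies

$$f(A_{\text{greedy}}) \ge \Bigl(1 - \frac{1}{e}\Bigr) f(A^{*}),$$

where $A^{*}$ is an optimal set of size $k$; the $1-1/e-\epsilon$ bound quoted later for Stochastic-Greedy relaxes this slightly in exchange for far fewer function evaluations.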
def HillClimbing1(X, wordNum):
    sentenceNum = len(X)
    pick_list = []
    pick_word = set()
    for _ in range(100):
        # Scan all unselected sentences for the largest marginal coverage gain.
        mx = 0
        mxp = 0
        for i in range(sentenceNum):
            if i in pick_list:
                continue
            new_pick_word = get_union(pick_word, X[i])
            if (len(new_pick_word) - len(pick_word)) > mx:
                mx = len(new_pick_word) - len(pick_word)
                mxp = i
        pick_list.append(mxp)
        pick_word = get_union(pick_word, X[mxp])
    return pick_list, pick_word
Improved Greedy Algorithm
In the naive greedy algorithm, evaluating $f(A\cup\{e\})$ requires a set operation on two sets. To reduce this cost, we maintain an array n_effective_words that records, for each sentence, the number of its terms that are not yet covered; reading this array replaces the set operation with an $O(1)$ lookup. After a new sentence is added to the summary the array is updated through an inverted index, and since each term's posting list is traversed at most once over all iterations, the total update cost is $O(nq)$ amortized, where $q$ is the average number of terms per sentence. The overall time complexity improves to $O(nk+nq)$.
def HillClimbing2(X, wordNum):
    sentenceNum = len(X)
    pick_list = []
    pick_word = set()
    # n_effective_words[i]: number of not-yet-covered terms in sentence i.
    n_effective_words = np.zeros(sentenceNum)
    # doc_for_word[x]: inverted index, the sentences that contain term x.
    doc_for_word = defaultdict(list)
    for i in range(sentenceNum):
        n_effective_words[i] = len(X[i])
        for x in X[i]:
            doc_for_word[x].append(i)
    for _ in range(100):
        mx = 0
        mxp = 0
        for i in range(sentenceNum):
            if i in pick_list:
                continue
            if n_effective_words[i] > mx:
                mx = n_effective_words[i]
                mxp = i
        pick_list.append(mxp)
        # Mark newly covered terms and update the counters of all sentences
        # that contain them.
        for x in X[mxp]:
            if x not in pick_word:
                pick_word.add(x)
                for doc in doc_for_word[x]:
                    n_effective_words[doc] -= 1
    return pick_list, pick_word
Stochastic-Greedy Algorithm
Instead of scanning all of $V-A$ in every iteration, Stochastic-Greedy samples a random subset $R$ of $V-A$ and searches only within $R$. The algorithm comes from the AAAI 2015 paper Lazier Than Lazy Greedy, whose first author is Baharan Mirzasoleiman of ETH Zurich. The paper proves that if the size of $R$ is set to $\frac{n}{k}\log\frac{1}{\epsilon}$, Stochastic-Greedy achieves a $1-1/e-\epsilon$ approximation guarantee. Its time complexity is $O(n\log(1/\epsilon)+nq)$.
def HillClimbing3(X, wordNum):
    sentenceNum = len(X)
    k = 100
    pick_list = []
    pick_word = set()
    n_effective_words = np.zeros(sentenceNum)
    doc_for_word = defaultdict(list)
    for i in range(sentenceNum):
        n_effective_words[i] = len(X[i])
        for x in X[i]:
            doc_for_word[x].append(i)
    # Sample size R = (n / k) * log(1 / eps), as in Lazier Than Lazy Greedy.
    eps = 0.05
    R = int(sentenceNum / k * np.log(1 / eps)) + 1
    print(R)
    for _ in range(k):
        mx = 0
        mxp = 0
        # Evaluate only R randomly sampled candidates instead of all sentences.
        for _ in range(R):
            i = np.random.randint(sentenceNum)
            if i in pick_list:
                continue
            if n_effective_words[i] > mx:
                mx = n_effective_words[i]
                mxp = i
        pick_list.append(mxp)
        for x in X[mxp]:
            if x not in pick_word:
                pick_word.add(x)
                for doc in doc_for_word[x]:
                    n_effective_words[doc] -= 1
    return pick_list, pick_word
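Plugging in the numbers used in this project (n = 44,638 sentences, k = 100, and $\epsilon = 0.05$ as set in the code above, with the natural logarithm as in np.log), each iteration evaluates only about

$$R = \Bigl\lfloor \frac{n}{k}\log\frac{1}{\epsilon} \Bigr\rfloor + 1 = \bigl\lfloor 446.38 \times 2.996 \bigr\rfloor + 1 \approx 1{,}338$$

candidates instead of all 44,638, which is the source of the large speed-up observed in the results below.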
Lazy-Greedy Acceleration
This method was proposed in the paper Cost-effective Outbreak Detection in Networks. It exploits the diminishing-returns property of submodular functions (marginal gains never increase as the solution grows) to speed up the greedy algorithm.
In each iteration, Lazy-Greedy does not necessarily recompute the marginal gain of every sentence. Instead, the gains computed for $V-A$ in iteration $i-1$ are kept sorted in decreasing order. After the top sentence is selected, the sentence with the second-largest (now possibly stale) gain, call it $a$, becomes the leading candidate in iteration $i$. Iteration $i$ therefore recomputes the gain of $a$ first; because marginal gains can only shrink, if the refreshed gain of $a$ is still the largest value in the list, then $a$ must be the best element of $V-A$ and no other gains need to be recomputed.
Although Lazy-Greedy improves the practical efficiency of the greedy algorithm, it does not change its asymptotic complexity: in the worst case every iteration still has to recompute the gains of all sentences in $V-A$.
Moreover, a straightforward implementation re-sorts the gains of all sentences in $V-A$ before each iteration at a cost of about $O(n\log n)$, so a cleverer bookkeeping structure is needed to keep the overhead low; otherwise the method can end up slower than the un-accelerated version. Because of this implementation difficulty, the project only studies Lazy-Greedy conceptually and does not implement it (a sketch of a heap-based variant is given below for reference).
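For completeness, the following is a minimal sketch of how the lazy evaluation could be realized with a max-heap of stale gains (Python's heapq with negated values), which avoids the full re-sorting discussed above. It reuses the sentence-set representation X from the earlier functions, but it is not part of the project code and was not used for the measurements below.

import heapq

def LazyGreedy(X, k=100):
    # X: list of term-id sets, one per sentence (same representation as above).
    # Heap of (negated stale marginal gain, sentence index); initial gain = |X[i]|.
    heap = [(-len(X[i]), i) for i in range(len(X))]
    heapq.heapify(heap)
    pick_list = []
    pick_word = set()
    while heap and len(pick_list) < k:
        neg_stale_gain, i = heapq.heappop(heap)
        gain = len(X[i] - pick_word)  # recompute the possibly stale gain
        if not heap or gain >= -heap[0][0]:
            # By submodularity the stale gains are upper bounds, so if the
            # refreshed gain still beats the best bound, i is the true argmax.
            pick_list.append(i)
            pick_word |= X[i]
        else:
            heapq.heappush(heap, (-gain, i))  # re-insert with the updated gain
    return pick_list, pick_word

It would be called in the same way as the other variants, e.g., pick_list, pick_word = LazyGreedy(X).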
Experimental Results
The extracted summary is shown below:
The naive greedy, improved greedy, and Stochastic-Greedy algorithms were implemented and tested, and their running time and summary quality are compared below, together with the simulated-annealing baseline.
| | Simulated Annealing | Naive Greedy | Improved Greedy | Stochastic-Greedy |
|---|---|---|---|---|
| Running time (s) | 37.95 | 145.768 | 4.328 | 0.854 |
| Summary coverage (terms) | 2101 | 2831 | 2831 | 2719 |
| Coverage rate | 12.70% | 16.52% | 16.52% | 15.87% |
The hill-climbing (greedy) algorithms produce summaries of much higher quality than simulated annealing, indicating that greedy hill climbing is effective at finding good approximate solutions.
Compared with the naive greedy algorithm, the improved greedy algorithm reduces the running time dramatically without affecting the summary at all; Stochastic-Greedy further improves efficiency by a large margin while still guaranteeing reasonable summary quality.
The improved greedy algorithm and Stochastic-Greedy therefore suit different application scenarios: when summary quality matters most, the improved greedy algorithm is preferable, whereas Stochastic-Greedy is a good choice when summarization speed is the priority.
Conclusion
This project implements text summarization by maximizing a submodular function. The work covers data crawling, data cleaning and preprocessing, and approximate optimization with greedy hill-climbing algorithms, forming a complete data pipeline of collection, processing, and analysis.
For the summarization problem itself, three greedy variants were implemented to approximate the optimal solution. The improved greedy algorithm is an implementation-level optimization of the naive greedy algorithm that lowers the time complexity without changing the final solution. Stochastic-Greedy follows the construction and proof in Lazier Than Lazy Greedy, which shows that with a sample size of $\frac{n}{k}\log\frac{1}{\epsilon}$ the algorithm achieves a $1-1/e-\epsilon$ approximation. Lazy-Greedy was also analyzed: implemented correctly, it can give a constant-factor speed-up over the improved greedy algorithm, but because the implementation is more involved it was not realized in code.
Code
scrapy/imdb_reviews.py
import re
import requests
import time
import csv
from selenium import webdriver
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
def generate_movie_list_link():
    movie_list_url = "https://www.imdb.com/chart/top?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=MZ4XKE9SDGQAA8DTYPWX&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=moviemeter&ref_=chtmvm_ql_3"
    return movie_list_url

def generate_movie_review_list_link(url):
    URL = url
    try:
        response = requests.get(URL)
        if response.status_code == 200:
            html = response.text
            print(html)
            soup = BeautifulSoup(html, 'lxml')
            movies = soup.select('.titleColumn')
            movie_reviews_url_list = [[0] * 2 for j in range(250)]
            i = 0
            for movie in movies:
                # header = movie.select_one('.lister-item-header')
                movie_link = movie.select_one('a')['href']  # link to the movie page
                id_pattern = re.compile(r'(?<=tt)\d+(?=/?)')
                movie_id = int(id_pattern.search(movie_link).group())  # IMDb movie id
                movie_reviews_url = "https://www.imdb.com/title/tt" + "%07d" % movie_id + "/reviews?spoiler=hide&sort=helpfulnessScore&dir=desc&ratingFilter=0"
                movie_reviews_url_list[i] = [movie_id, movie_reviews_url]
                i += 1
                # print(i, movie_id, movie_reviews_url)
                # time.sleep(1)
            return movie_reviews_url_list
        else:
            print("Error when request URL")
    except RequestException:
        print("Request Failed")
        return None
# Fetch all non-spoiler user reviews from an IMDb movie review page.
def get_imdb_movie_review(url, movieId):
    NETWORK_STATUS = True  # flag for the request status
    URL = url
    print(URL)
    try:
        response = requests.get(URL)
        if response.status_code == 200:
            original_html = response.text
            original_soup = BeautifulSoup(original_html, 'lxml')
            # Requires a local chromedriver download
            driver = webdriver.Chrome("C:/Users/MorphLing/AppData/Local/Google/Chrome/User Data/chromedriver.exe")
            driver.get(URL)
            # Check whether the "load more" button has to be clicked
            tot = 25
            while tot < 10000:
                temp_response = driver.page_source
                temp_html = temp_response
                temp_soup = BeautifulSoup(temp_html, 'lxml')
                load_more = temp_soup.select('.ipl-load-more__button')
                # The button is present, so simulate a click
                if len(load_more) == 1:
                    button_load_more = driver.find_element_by_class_name('ipl-load-more__button')
                    button_is_or_not_visible = driver.find_element_by_class_name('ipl-load-more__button').is_displayed()
                    if button_is_or_not_visible is True:  # stop clicking once the button is no longer visible
                        button_load_more.click()
                        tot += 25
                        print(tot)
                        time.sleep(2)
                    else:
                        break
                else:
                    break
            final_response = driver.page_source
            html = final_response
            soup = BeautifulSoup(html, 'lxml')
            movie_id = movieId
            reviews = soup.select('.review-container')
            movie_reviews_list = [[0] * 4 for j in range(tot)]
            i = 0
            for review in reviews:
                header = review.select_one('.display-name-date')
                user_link = header.select_one('a')['href']
                user_id_pattern = re.compile(r'(?<=ur)\d+(?=/?)')
                user_id = int(user_id_pattern.search(user_link).group())  # user id
                review_date = header.select_one('.review-date').string  # review date
                content = review.select_one('.content')
                user_review = content.select_one('.text.show-more__control')  # review text
                movie_reviews_list[i][0] = user_id
                movie_reviews_list[i][1] = review_date
                movie_reviews_list[i][2] = user_review
                movie_reviews_list[i][3] = movie_id
                i += 1
                # print(i, user_id, review_date, user_review, movie_id)
                time.sleep(0.01)
            driver.close()  # close the browser window
            return movie_reviews_list
        else:
            print("Error when request URL")
    # Timeout is a subclass of RequestException, so it must be caught first.
    except requests.exceptions.Timeout:
        NETWORK_STATUS = False  # the request timed out
    except RequestException:
        print("Request Failed")
        return None
    if NETWORK_STATUS == False:
        # The request timed out: close the browser window and retry.
        driver.close()
        print('Request timed out, retrying')
        get_imdb_movie_review(URL, movieId)
if __name__ == '__main__':
    i = 0
    count = 0
    with open('movie_review3.csv', 'w', newline="", encoding='utf-8') as csvfile:
        csvwriter = csv.writer(csvfile, dialect=("excel"))
        csvwriter.writerow(["userId", "reviewDate", "userReview", "movieId"])
        movie_list_url = generate_movie_list_link()
        movie_review_url_list = generate_movie_review_list_link(movie_list_url)
        print(movie_review_url_list)
        l = [[0] * 4 for j in range(10500)]
        l = get_imdb_movie_review(movie_review_url_list[0][1], movie_review_url_list[0][0])
        k = 0
        while l[k][0] != 0:
            csvwriter.writerow(l[k])
            k += 1
            count += 1
    print("Scraping finished, " + str(count) + " user reviews collected in total")
HillClimbing-1.py
import numpy as np
from collections import defaultdict
import time
def readData():
    f = open('cleand_review2.txt', mode='r', encoding='utf-8')
    lines = f.readlines()
    sentences = ['' for _ in range(len(lines))]
    words = [[] for _ in range(len(lines))]
    X = []
    wordDict = defaultdict()
    elements = set()
    for i in range(len(lines)):
        line = lines[i]
        line = line.strip('\n').split('\t')
        sentences[i] = line[0]
        words[i] = line[1].split(',')[:-1]
        for word in words[i]:
            elements.add(word)
    elements = np.sort(list(elements))
    labels = np.arange(len(elements)).astype(int)
    for i in range(len(elements)):
        wordDict[str(elements[i])] = labels[i]
    for i in range(len(words)):
        temp = set()
        for j in range(len(words[i])):
            temp.add(wordDict[words[i][j]])
        X.append(temp)
    return sentences, wordDict, X, len(elements)
def get_coverage(pick_list, X):
    cov_set = set()
    for i in pick_list:
        for x in X[i]:
            cov_set.add(x)
    return len(cov_set)

def get_union(x, y):
    return x.union(y)
def HillClimbing(X, wordNum):
    sentenceNum = len(X)
    pick_list = []
    pick_word = set()
    for _ in range(100):
        mx = 0
        mxp = 0
        for i in range(sentenceNum):
            if i in pick_list:
                continue
            new_pick_word = get_union(pick_word, X[i])
            if (len(new_pick_word) - len(pick_word)) > mx:
                mx = len(new_pick_word) - len(pick_word)
                mxp = i
        pick_list.append(mxp)
        pick_word = get_union(pick_word, X[mxp])
    return pick_list, pick_word
if __name__ == '__main__':
    sentences, wordDict, X, wordNum = readData()
    st = time.time()
    pick_list, pick_word = HillClimbing(X, wordNum)
    print(len(pick_word) / len(wordDict) * 100)  # coverage rate (%)
    print(time.time() - st)                      # running time (s)
SimulatedAnnealing.py
import numpy as np
from collections import defaultdict
import time
def readData():
    f = open('cleand_review2.txt', mode='r', encoding='utf-8')
    lines = f.readlines()
    sentences = ['' for _ in range(len(lines))]
    words = [[] for _ in range(len(lines))]
    X = []
    wordDict = defaultdict()
    elements = set()
    for i in range(len(lines)):
        line = lines[i]
        line = line.strip('\n').split('\t')
        sentences[i] = line[0]
        words[i] = line[1].split(',')[:-1]
        for word in words[i]:
            elements.add(word)
    elements = np.sort(list(elements))
    labels = np.arange(len(elements)).astype(int)
    for i in range(len(elements)):
        wordDict[str(elements[i])] = labels[i]
    for i in range(len(words)):
        temp = set()
        for j in range(len(words[i])):
            temp.add(wordDict[words[i][j]])
        X.append(temp)
    return sentences, wordDict, X, len(elements)
def get_coverage(pick_list, X):
    cov_set = set()
    for i in pick_list:
        for x in X[i]:
            cov_set.add(x)
    return len(cov_set)
def SimulatedAnnealing(X, wordNum, initial_t, ratio, eps):
    sentenceNum = len(X)
    pick_list = []
    # Start from a random summary of 100 distinct sentences.
    for i in range(100):
        j = np.random.randint(sentenceNum)
        while j in pick_list:
            j = np.random.randint(sentenceNum)
        pick_list.append(j)
    pre_val = get_coverage(pick_list, X)
    T = initial_t
    epoch = 0
    while T > eps:
        epoch += 1
        cur_pick_list = pick_list.copy()
        # Neighbour: swap one selected sentence for a random unselected one.
        i = cur_pick_list[np.random.randint(100)]
        j = np.random.randint(sentenceNum)
        while j in cur_pick_list:
            j = np.random.randint(sentenceNum)
        cur_pick_list.remove(i)
        cur_pick_list.append(j)
        cur_val = get_coverage(cur_pick_list, X)
        diff = cur_val - pre_val
        if diff > 0:
            pick_list = cur_pick_list
            pre_val = cur_val
        elif np.random.random() < np.exp(diff / T):
            # Accept a worse solution with probability e^(diff / T).
            pick_list = cur_pick_list
            pre_val = cur_val
        # print(pre_val, T)
        T *= ratio
    # print(epoch)
    return pick_list, pre_val
if __name__ == '__main__':
    sentences, wordDict, X, wordNum = readData()
    # print(X)
    st = time.time()
    pick_list, coverage = SimulatedAnnealing(X, wordNum, 100, 0.9996, 0.01)
    print(coverage / wordNum * 100)  # coverage rate (%)
    print(time.time() - st)          # running time (s)