文章相似度增量更新
创始人
2024-05-15 16:49:53
0

2.8 文章相似度增量更新

目标

  • 目标
    • 知道文章向量计算方式
    • 了解Word2Vec模型原理
    • 知道文章相似度计算方式
  • 应用
    • 应用Spark完成文章相似度计算

2.8.1 增量更新需求

每天、每小时都会有大量的新文章过来,当后端审核通过一篇文章之后,我们推荐给用户后,一旦发生点击行为了,就会有相应相似文章计算需求。没有选择去在线计算中处理,新文章来了之后立即推荐召回给用用户,这样增加新文章推荐曝光的比例,达到一定速度的曝光。在线需要用用到离线仓库中部分数据,所以速度会受影响。

  • 设计:
    • 按照之前增量更新文章画像那样的频率,按小时更新增量文章的相似文章
  • 如何更新:
    • 批量新文章,需要与历史数据进行相似度计算

2.8.2 增量更新文章向量与相似度

  • 目标:对每次新增的文章,计算完画像后,计算向量,在进行与历史文章相似度计算
  • 步骤:
    • 1、新文章数据,按照频道去计算文章所在频道的相似度
    • 2、求出新文章向量,保存
    • 3、BucketedRandomProjectionLSH计算相似度

完整代码:

    def compute_article_similar(self, articleProfile):"""计算增量文章与历史文章的相似度 word2vec:return:"""# 得到要更新的新文章通道类别(不采用)# all_channel = set(articleProfile.rdd.map(lambda x: x.channel_id).collect())def avg(row):x = 0for v in row.vectors:x += v#  将平均向量作为article的向量return row.article_id, row.channel_id, x / len(row.vectors)for channel_id, channel_name in CHANNEL_INFO.items():profile = articleProfile.filter('channel_id = {}'.format(channel_id))wv_model = Word2VecModel.load("hdfs://hadoop-master:9000/headlines/models/channel_%d_%s.word2vec" % (channel_id, channel_name))vectors = wv_model.getVectors()# 计算向量profile.registerTempTable("incremental")articleKeywordsWeights = ua.spark.sql("select article_id, channel_id, keyword, weight from incremental LATERAL VIEW explode(keywords) AS keyword, weight where channel_id=%d" % channel_id)articleKeywordsWeightsAndVectors = articleKeywordsWeights.join(vectors,vectors.word == articleKeywordsWeights.keyword, "inner")articleKeywordVectors = articleKeywordsWeightsAndVectors.rdd.map(lambda r: (r.article_id, r.channel_id, r.keyword, r.weight * r.vector)).toDF(["article_id", "channel_id", "keyword", "weightingVector"])articleKeywordVectors.registerTempTable("tempTable")articleVector = self.spark.sql("select article_id, min(channel_id) channel_id, collect_set(weightingVector) vectors from tempTable group by article_id").rdd.map(avg).toDF(["article_id", "channel_id", "articleVector"])# 写入数据库def toArray(row):return row.article_id, row.channel_id, [float(i) for i in row.articleVector.toArray()]articleVector = articleVector.rdd.map(toArray).toDF(['article_id', 'channel_id', 'articleVector'])articleVector.write.insertInto("article_vector")import gcdel wv_modeldel vectorsdel articleKeywordsWeightsdel articleKeywordsWeightsAndVectorsdel articleKeywordVectorsgc.collect()# 得到历史数据, 转换成固定格式使用LSH进行求相似train = self.spark.sql("select * from article_vector where channel_id=%d" % channel_id)def _array_to_vector(row):return row.article_id, Vectors.dense(row.articleVector)train = train.rdd.map(_array_to_vector).toDF(['article_id', 'articleVector'])test = articleVector.rdd.map(_array_to_vector).toDF(['article_id', 'articleVector'])brp = BucketedRandomProjectionLSH(inputCol='articleVector', outputCol='hashes', seed=12345,bucketLength=1.0)model = brp.fit(train)similar = model.approxSimilarityJoin(test, train, 2.0, distCol='EuclideanDistance')def save_hbase(partition):import happybasefor row in partition:pool = happybase.ConnectionPool(size=3, host='hadoop-master')# article_similar article_id similar:article_id simwith pool.connection() as conn:table = connection.table("article_similar")for row in partition:if row.datasetA.article_id == row.datasetB.article_id:passelse:table.put(str(row.datasetA.article_id).encode(),{b"similar:%d" % row.datasetB.article_id: b"%0.4f" % row.EuclideanDistance})conn.close()similar.foreachPartition(save_hbase)

添加函数到update_article.py文件中,修改update更新代码

ua = UpdateArticle()
sentence_df = ua.merge_article_data()
if sentence_df.rdd.collect():rank, idf = ua.generate_article_label(sentence_df)articleProfile = ua.get_article_profile(rank, idf)ua.compute_article_similar(articleProfile)

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...