基于物品余弦相似度的协同过滤算法

  • Post author:
  • Post category:其他


import random
import math

class DataSet():

    def __init__(self, filePath):
        self.data = self.loadData_100K(filePath)
        # self.data = self.loadData_1M(filePath)

    def loadData_100K(self, filePath):
        print("加载数据...")
        data = []
        for line in open(filePath):
            # 获取前两个数据:userId及该user评价的电影的id
            data.append(tuple(map(int, line.strip().split('\t')[:2])))
        print("数据加载完成")
        return data

    def loadData_1M(self, filePath):
        print("加载数据...")
        data = []
        for line in open(filePath):
            # 获取前两个数据:userId及该user评价的电影的id
            data.append(tuple(map(int, line.strip().split('::')[:2])))
        print("数据加载完成")
        return data

    def splitData(self, M, k, seed=1):
        '''
        :params: data, 加载的所有(user, item)数据条目
        :params: M, 划分的份数,最后需要取M折的平均,用M次试验的平均值作为最后的评测指标
        :params: k, 本次是第几次划分,k~[0, M)
        :params: seed, random的种子数,对于不同的k应设置成一样的
        :return: train, test
        '''
        print("正在拆分数据...")
        train, test = [], []
        random.seed(seed)
        for userID, movieID in self.data:
            if random.randint(0, M) == k:
                test.append((userID, movieID))
            else:
                train.append((userID, movieID))
        print("数据已拆分成训练集和测试集")
        print("训练集长度:", train.__len__())
        print("测试集长度:", test.__len__())

        # 建立用户-物品倒排表:
        # 即对每个用户建立一个包含他所有评价过的电影列表
        # 处理成字典的形式,user->set(items),键为userId, 值为该用户全部所评价的电影列表
        def convert_dict(data):
            data_dict = {}
            for userId, movieId in data:
                if userId not in data_dict:
                    data_dict[userId] = set()
                data_dict[userId].add(movieId)
            data_dict = {k: list(data_dict[k]) for k in data_dict}
            return data_dict

        print("将数据转换成字典后输出前2个用户-电影示例数据:")
        for key, value in convert_dict(train).items():
            if key < 3:
                print(key, value)
        # p1 = {key: value for key, value in convert_dict(train).items() if key < 5}
        # print("p1",  p1)
        return convert_dict(train), convert_dict(test)

# 基于物品余弦相似度的推荐
def ItemCF(train, K, N):
    '''
    :params: train, 训练数据集
    :params: K, 取与目标物品最相似的前K个物品
    :params: N, 为用户推荐最符合用户个性的前N个物品
    :return: GetRecommendation, 推荐接口函数
    '''
    # 计算物品相似度
    print("计算物品相似度...")
    sim = {}  # 相似度矩阵
    num = {}  # 每种movieId出现的次数
    for user in train:
        movieIds = train[user]
        for i in range(len(movieIds)):
            u = movieIds[i]  # 获取movieId
            if u not in num:  # 对相同的电影计数
                num[u] = 0
            num[u] += 1
            if u not in sim:  #
                sim[u] = {}
            for j in range(len(movieIds)):
                if j == i: continue
                # 将出现的movieId加入相似度矩阵
                v = movieIds[j]
                if v not in sim[u]:
                    sim[u][v] = 0
                sim[u][v] += 1
    for u in sim:
        for v in sim[u]:
            sim[u][v] /= math.sqrt(num[u] * num[v])

    # 按照相似度排序
    sorted_item_sim = {k: list(sorted(v.items(), key=lambda x: x[1], reverse=True)) \
                       for k, v in sim.items()}

    # 获取接口函数:为每个用户计算推荐列表,并取逆序排序后的前N个
    def GetRecommendation(user):
        items = {}
        seen_items = set(train[user])  # 获取用户历史电影数据movieId
        for item in train[user]:
            for u, v in sorted_item_sim[item][:K]:
                #  为用户推荐的电影应该是不在用户的历史记录中
                if u not in seen_items:
                    if u not in items:
                        items[u] = 0
                    items[u] += sim[item][u]
        # 对推荐列表逆序排序并取前N个
        recs = list(sorted(items.items(), key=lambda x: x[1], reverse=True))[:N]
        return recs

    return GetRecommendation

#评价指标的计算
class Metric():

    def __init__(self, train, test, GetRecommendation):
        '''
        :params: train, 训练数据
        :params: test, 测试数据
        :params: GetRecommendation, 为某个用户获取推荐物品的接口函数
        '''
        self.train = train
        self.test = test
        self.GetRecommendation = GetRecommendation
        self.recs = self.getRec()

    # 为test中的每个用户进行推荐
    def getRec(self):
        print("为测试集test中的每个用户进行推荐...")
        recs = {}
        for user in self.test:
            rank = self.GetRecommendation(user)
            recs[user] = rank
            if user < 10:  # 为了查看推荐结果,根据用户id输出前十条数据到控制台
                print("recs[%d]=" % (user), recs[user])
        return recs

    # 精确率指标计算
    # 精确率:为用户最终推荐的列表中有多少比例是发生过的用户-物品评分记录中(测试集中)
    def precision(self):
        print("计算精确率...")
        all, hit = 0, 0
        for user in self.test:
            test_items = set(self.test[user])
            rank = self.recs[user]
            for item, score in rank:
                if item in test_items:
                    hit += 1
            all += len(rank)
        return round(hit / all * 100, 2)

    # 召回率指标计算
    # 召回率:有多少比例的用户-评分记录包含在最终的推荐列表中
    def recall(self):
        print("计算召回率...")
        all, hit = 0, 0
        for user in self.test:
            test_items = set(self.test[user])
            rank = self.recs[user]
            for item, score in rank:
                if item in test_items:
                    hit += 1
            all += len(test_items)
        return round(hit / all * 100, 2)

    # 覆盖率指标计算
    # 覆盖率:最终的推荐列表中包含多大比例的物品
    def coverage(self):
        print("计算覆盖率...")
        all_item, recom_item = set(), set()
        for user in self.test:
            for item in self.train[user]:
                all_item.add(item)
            rank = self.recs[user]
            for item, score in rank:
                recom_item.add(item)
        return round(len(recom_item) / len(all_item) * 100, 2)

    # 定义流行度指标计算方式
    def popularity(self):
        print("计算流行度...")
        # 计算物品的流行度
        item_pop = {}
        for user in self.train:
            for item in self.train[user]:
                if item not in item_pop:
                    item_pop[item] = 0
                item_pop[item] += 1
        num, pop = 0, 0
        for user in self.test:
            rank = self.recs[user]
            for item, score in rank:
                # 对每个物品的流行度取对数,
                # 这是因为物品的流行度分布满足长尾分布,在取对数后,流行度的平均值更加稳定
                pop += math.log(1 + item_pop[item])
                num += 1
        return round(pop / num, 6)

    def eval(self):
        metric = {'准确率': self.precision(),
                  '召回率': self.recall(),
                  '覆盖率': self.coverage(),
                  '流行度': self.popularity()}
        print('评价指标:', metric)
        print("-"*50)
        return metric

class Experiment():
    def __init__(self, M, K, N, filePath):
        '''
        :params: M, 进行多少次实验
        :params: K, TopK相似物品的个数
        :params: N, TopN推荐物品的个数
        :params: filePath, 数据文件路径
        '''
        self.M = M
        self.K = K
        self.N = N
        self.filePath = filePath

    # 定义单次实验
    def worker(self, train, test):
        '''
        :params: train, 训练数据集
        :params: test, 测试数据集
        :return: 各指标的值
        '''
        print("正在进行计算...")
        getRecommendation = ItemCF(train, self.K, self.N)
        metric = Metric(train, test, getRecommendation)
        return metric.eval()

    # 多次实验取平均
    def run(self):
        metrics = {'准确率': 0, '召回率': 0,
                   '覆盖率': 0, '流行度': 0}
        dataset = DataSet(self.filePath)
        for i in range(self.M):
            train, test = dataset.splitData(self.M, i)
            print('第{}次试验:'.format(i+1))
            metric = self.worker(train, test)
            # 对每次实验获得的各项指标分别进行累加
            metrics = {k: metrics[k] + metric[k] for k in metrics}
        # 计算各项指标的平均值
        metrics = {k: metrics[k] / self.M for k in metrics}
        print('平均结果 (M={}, K={}, N={}): {}'.format(self.M, self.K, self.N, metrics))

# numpy pandas
'''
:params: M, 进行多少次实验
:params: K, TopK相似物品的个数
:params: N, TopN推荐物品的个数
'''
if __name__ == '__main__' :
    filePath_1M = 'D:/python/testProjects/test_01/data/ml-1m/ratings.dat'
    filePath_100K = "D:/2020暑假学习/协同过滤算法/数据/ml-100k/u.data"
    M, K, N = 2, 10, 3
    itemCF = Experiment(M, K, N, filePath_100K)
    print("总共进行的试验次数:", M)
    print("TopK相似物品的个数:", K)
    print("TopN推荐物品的个数:", N)
    print("-"*50)
    itemCF.run()

'''
基于物品的协同过滤算法:
    给用户推荐那些他们之前喜欢的物品相似的物品
    通过分析用户的行为记录计算物品之间的相似度
    即:物品A和B具有很高的相似度,是因为喜欢物品A的用户都喜欢物品B
'''

数据下载地址:

http://files.grouplens.org/datasets/movielens/



版权声明:本文为fu_jian_ping原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。