将python算法转为scala_海量文本去重simhash算法(python&scala)

  • Post author:
  • Post category:python


1.python(Numpy实现)

具体公式见reference中的论文。

# -*- coding: utf-8 -*-

“””

Created on Mon May 19 09:32:00 2018

@author: wangyao

“””

import jieba

#simhash值直接用包计算,pip install simhash

from simhash import Simhash

import re

import numpy as np

import pandas as pd

#停用词

stopwords = [line.strip() for line in open(‘Stopwords.txt’, ‘r’, encoding=’utf-8′).readlines()]

#文本预处理+特征提取

def get_features(s):

width = 3

string = ”

s = re.sub(r'[^\w]+’, ”, s)

s = jieba.lcut(s)

X,Y = [‘\u4e00′,’\u9fa5’]

s = [ i for i in s if len(i) > 1 and X<=i<=Y and i not in stopwords]

for i in s:

string += i

if string:

return [string[i:i + width] for i in range(max(len(string) – width + 1, 1))]

else:

print (“请输入中文文档”)

#list1 = df.content.apply(lambda x: isinstance(x, str))

#文本预处理

def Pre_Processing(s):

string = ”

s = re.sub(r'[^\w]+’, ”, s)

s = jieba.lcut(s)

X,Y = [‘\u4e00′,’\u9fa5’]

s = [i for i in s if len(i) > 1 and X<=i<=Y and i not in stopwords]

string = string.join(s)

if string:

return string

else:

print(‘请勿输入空字符串或者完全由停用词组成的无意义的句子’)

#simhash包自带的汉明距离

def hanming_simhash(s1,s2):

hanmingdistance = Simhash(Pre_Processing(s1)).distance(Simhash(Pre_Processing(s2)))

#return hanming_distance

return 1-hanmingdistance/64

#将字符串转化为hashcode

def ToSimhashcode(s):

if type(s) == str:

return Simhash(get_features(s)).value

else:

print(‘输入的句子格式需要是字符串’)

#自己写的汉明距离

def hanming_distance(s1,s2):

if type(s1) == str and type(s2) == str:

hanmingdistance = bin(int(hex(Simhash(get_features(s1)).value),16)^int(hex(Simhash(get_features(s2)).value),16)).count(‘1’)

elif type(s1) == int and type(s2) == int:

hanmingdistance = bin(int(hex(s1),16)^int(hex(s2),16)).count(‘1’)

else:

print (‘s1和s2需要是相同的数据类型!’)

#return hanming_distance

return 1-hanmingdistance/64

#直接填入文本列表,生成相似度矩阵(文本较少的时候可以使用此函数)

def simhash_simmatrix_doc(doc_list):

simhash_corpus = []

for i in range(len(doc_list)):

simhash_corpus.append([])

for i in range(len(doc_list)):

print (i)

for j in range(i,len(doc_list)):

simhash_corpus[j].append(hanming_distance(doc_list[i],doc_list[j]))

x = len(simhash_corpus)-1

for i in range(len(simhash_corpus)-1):

simhash_corpus[i].extend(x*[0])

x = x – 1

return np.array(simhash_corpus) + np.array(simhash_corpus).T

#填入文本的hashcode,生成相似度矩阵

def simhash_simmatrix_hashcode(hashcode_list):

simhash_corpus = []

for i in range(len(hashcode_list)):

simhash_corpus.append([])

for i in range(len(hashcode_list)):

print (i)

for j in range(i,len(hashcode_list)):

simhash_corpus[j].append(hanming_distance(hashcode_list[i],hashcode_list[j]))

x = len(simhash_corpus)-1

for i in range(len(simhash_corpus)-1):

simhash_corpus[i].extend(x*[0])

x = x – 1

return np.array(simhash_corpus) + np.array(simhash_corpus).T

#过滤文本生成的相似度矩阵,

def DuplicateContent_filtering_doc(doc_list,sim = 0.8):

if sim>1 or sim<0:

print (‘错误的取值范围!!范围应该是[0,1]’)

sim_matrix = simhash_simmatrix_doc(doc_list)>sim

return sim_matrix

#过滤hashcode生成的相似度矩阵

def DuplicateContent_label_hashcode(hashcode_array,sim = 0.8):

if sim>1 or sim<0:

print (‘错误的取值范围!!范围应该是[0,1]’)

sim_matrix = hashcode_array>sim

return sim_matrix

#列表求交集

def list_intersection(list1,list2):

list3 = list(set(list1) & set(list2))

return list3

#清洗重复的文章列表

def clean_Duplicate_List(Duplicate_List):

Duplicate_List = [list(set(i)) for i in Duplicate_List]

Duplicate = []

for i in range(len(Duplicate_List)):

temp = []

for j in range(len(Duplicate_List)):

if list_intersection(Duplicate_List[i],Duplicate_List[j]) != []:

temp.extend(Duplicate_List[i])

temp.extend(Duplicate_List[j])

Duplicate.append(temp)

Duplicate = [list(set(i)) for i in Duplicate]

NoDuplicate = []

for i in Duplicate:

if i in NoDuplicate:

pass

else:

NoDuplicate.append(i)

NoDuplicate = [list(set(i)) for i in NoDuplicate]

return NoDuplicate

#读入相似度矩阵,返回非重复的文章ID和重复的文章ID

def DuplicateContent_filtering_hashcode(matrix):

NoDuplicateList = []

DuplicateList = []

NoDuplicate_List = []

Duplicate_List = []

for i in range(len(matrix)):

DuplicateList.append([])

for i in range(len(matrix)):

NoDuplicateList.append([])

if (matrix[i] == True).sum() <= 1:

NoDuplicateList[i].append(ID[i])

else:

for j in range(len(matrix[i])):

if matrix[i][j]==True and i < j:

DuplicateList[i].extend([i,j])

for i in DuplicateList:

if i != []:

Duplicate_List.append(i)

else:

pass

for i in NoDuplicateList:

if i != []:

NoDuplicate_List.append(i)

else:

pass

return NoDuplicate_List,clean_Duplicate_List(Duplicate_List)

短文本,如果文本很短,可以直接调用simhash_simmatrix_doc,直接读入字符串即可产生结果。

df = pd.read_excel(’00.xlsx’)

df.content = df.content.apply(Pre_Processing)

content_list = df.content.tolist()

simhash_simmatrix_doc(content_list)

如果需要存进hive表中,需要先把矩阵转换成pandas中的dataframe再转化为spark中的dataframe。

//转化

values=pandas_df.values.tolist()

cols_name=list(pandas_df.columns)

spark_df = spark.createDataFrame(values, cols_name)

turn_pandas_df = spark_df.toPandas()

如果是是直接使用spark,两两比较可以使用reducebykey来聚合,查找的时候才需要使用索引。如果是在python中按照上面的代码两两比较会非常耗时,所以比较的时候就需要引入simhash的索引。

网上的例子:

from simhash import Simhash, SimhashIndex

# 建立索引

data = {

u’1′: u’How are you I Am fine . blar blar blar blar blar Thanks .’.lower().split(),

u’2′: u’How are you i am fine .’.lower().split(),

u’3′: u’This is simhash test .’.lower().split(),

}

objs = [(id, Simhash(sent)) for id, sent in data.items()]

index = SimhashIndex(objs, k=10) # k是容忍度;k越大,检索出的相似文本就越多

# 检索

s1 = Simhash(u’How are you . blar blar blar blar blar Thanks’.lower().split())

print index.get_near_dups(s1)

# 增加新索引

index.add(u’4′, s1)

2.scala实现

scala中没有simhash包,需要自己实现FNVHash来给文本转码。

目前用的比较多的是FNV-1和FNV-1a,FNV-0已弃用。

FNV-1:

hash = offset_basis

for each octet_of_data to be hashed

hash = hash * FNV_prime

hash = hash xor octet_of_data

return hash

FNV-1a:

hash = offset_basis

for each octet_of_data to be hashed

hash = hash xor octet_of_data

hash = hash * FNV_prime

return hash

FNV-1a算法只是将相乘和异或运算进行了顺序调换。

import java.math.BigInteger

object FNVHash {

val HASH_BITS = 64

val FNV_64_INIT = new BigInteger(“14695981039346656037”)

val FNV_64_PRIME = new BigInteger(“1099511628211”)

val MASK_64: BigInteger = BigInteger.ONE.shiftLeft(HASH_BITS).subtract(BigInteger.ONE)

val str = “电风扇卡机是可敬的搞屎棍的”

val str1 = “电风扇卡机是可敬的搞屎棍的对对对”

/**

* fnv-1 hash算法,将字符串转换为64位hash值

*

* @param str str

* @return

*/

def fnv1Hash64(str: String): BigInteger = {

var hash = FNV_64_INIT

val len = str.length

for (i

hash = hash.multiply(FNV_64_PRIME)

hash = hash.xor(BigInteger.valueOf(str.charAt(i)))

}

hash = hash.and(MASK_64)

hash

}

/**

* fnv-1a hash算法,将字符串转换为64位hash值

*

* @param str str

* @return

*/

def fnv1aHash64(str: String): BigInteger = {

var hash = FNV_64_INIT

val len = str.length

var i = 0

for (i

hash = hash.xor(BigInteger.valueOf(str.charAt(i)))

hash = hash.multiply(FNV_64_PRIME)

}

hash = hash.and(MASK_64)

hash

}

/**

* 返回二进制串hash距离

*

* @param str1 str1

* @param str2 str2

* @return

*/

def getDistance(str1: String, str2: String): Int = {

var distance = 0

if (str1.length != str2.length) distance = -1

else {

distance = 0

val len = str1.length

for (i

if (str1.charAt(i) != str2.charAt(i)) distance += 1

}

}

distance

}

def hashContent_d(content: Array[String]): BigInteger = {

val featureVector: Array[Double] = new Array[Double](FNVHash.HASH_BITS)

val wordInfos = content.map((_, 1)).groupBy(_._1).

map(r => (r._1, r._2.length.toDouble)).

map(r => {

val wordHash = FNVHash.fnv1aHash64(r._1)

for (i

val bitmask: BigInteger = BigInteger.ONE.shiftLeft(FNVHash.HASH_BITS – i – 1)

if (wordHash.and(bitmask).signum != 0)

featureVector(i) += r._2.toDouble

else

featureVector(i) -= r._2.toDouble

}

})

var signature: BigInteger = BigInteger.ZERO

for (i

if (featureVector(i) >= 0) {

signature = signature.add(BigInteger.ONE.shiftLeft(FNVHash.HASH_BITS – i – 1))

}

}

signature

}

def hashContent(content: Array[String]): BigInteger = {

val featureVector: Array[Double] = new Array[Double](HASH_BITS)

val wordInfos = content.map((_, 1)).groupBy(_._1).

map(r => (r._1, r._2.length.toDouble)).

map(r => {

val wordHash = fnv1aHash64(r._1)

for (i

val bitmask: BigInteger = BigInteger.ONE.shiftLeft(HASH_BITS – i – 1)

if (wordHash.and(bitmask).signum != 0)

featureVector(i) += r._2.toDouble

else

featureVector(i) -= r._2.toDouble

}

})

var signature: BigInteger = BigInteger.ZERO

for (i

if (featureVector(i) >= 0) {

signature = signature.add(BigInteger.ONE.shiftLeft(HASH_BITS – i – 1))

}

}

signature

}

def hashContentStr(content: String): BigInteger = {

hashContent(content.split(” “))

}

// 判断汉明距离是否大于kBits

def distance(hash1: Long, hash2: Long, kBits: Int): Boolean = {

var n = hash1 ^ hash2

for (i 0) {

n = n & (n – 1)

}

n == 0

}

}

Reference:



版权声明:本文为weixin_39900582原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。