# RDD 的属性：
# 只读：不能修改，只能通过转换操作生成新的 RDD。
# 分布式：可以分布在多台机器上进行并行处理。
# 弹性：计算过程中内存不够时，会与磁盘进行数据交换；分区数据丢失时，可根据血统（lineage）重新计算恢复。
# 基于内存：可以全部或部分缓存在内存中，在多次计算间重用。
# -*- coding: utf-8 -*-
# Last updated: Aug 26.
# v36: special sweep for level-2 prohibited ("weijin") words.
import json
import os
import sys
import time
import zlib

import MySQLdb
import requests

# Put the directory two levels above this file on the import path --
# presumably so the project-local ``weijin_data`` package imported below
# can be found.
sys.path.append(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))

import xlrd
from pyspark import SparkContext
from pyspark.sql import SparkSession

env = os.environ
# Per-build working directory named after BUILD_NUMBER. ``os.makedirs``
# avoids passing an environment value through a shell, and ``exist_ok``
# keeps a re-run from failing (the old ``mkdir`` merely printed an error).
os.makedirs(env['BUILD_NUMBER'], exist_ok=True)

from weijin_data.utils.validate_data import download
# Download the prohibited-word spreadsheet named by the ``weijin_link`` job
# parameter and build the list of level-2 keywords to scan for.
weijin_link = env.get('weijin_link')
print(weijin_link)
print('===========')
save_path = download(weijin_link, file_name='weijin.xlsx')
print(save_path)
book = xlrd.open_workbook(save_path, encoding_override='utf-8')
table = book.sheet_by_index(0)

# Ideographic spaces (U+3000) and no-break spaces (U+00A0) are removed from
# every generated keyword.
_weijin_blank_chars = str.maketrans('', '', '\u3000\xa0')

weijin_list = []
# Row 0 is a header. Columns read: 0 = main word, 4 = level,
# 6 = comma-separated positive words, 7 = negative words.
for row_idx in range(1, table.nrows):
    main_word = str(table.cell(row_idx, 0).value).strip()
    level_cell = table.cell(row_idx, 4).value
    positive_cell = table.cell(row_idx, 6).value
    negative_cell = table.cell(row_idx, 7).value

    try:
        level = int(float(level_cell))
    except (TypeError, ValueError, OverflowError):
        # Blank or non-numeric level cell: not a rule row. Previously this
        # raised and aborted the whole job.
        continue
    # Only level-2 rules without negative words are swept by this job.
    if level != 2 or str(negative_cell).strip():
        continue

    positives = [p.strip() for p in str(positive_cell).split(',') if p.strip()]
    # With positive words, each "main word + positive word" is a keyword.
    # Without any (including a whitespace-only cell), the main word alone is
    # the keyword.
    candidates = [main_word + p for p in positives] if positives else [main_word]
    for candidate in candidates:
        keyword = candidate.translate(_weijin_blank_chars)
        if keyword:  # an empty pattern would be meaningless to the matcher
            weijin_list.append(keyword)

print(weijin_list)
print(len(weijin_list))
if not weijin_list:
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    raise SystemExit('无违禁词')
# Connection settings for the ``test`` schema that receives the findings.
# NOTE(security): credentials are hard-coded; consider supplying them through
# the job environment instead.
mysql_config = {"host": "192.168.9.12",
                "port": 3306,
                'user': "root",
                "passwd": "gc7232275",
                "db": "test",
                "charset": "utf8"}
conn = MySQLdb.connect(**mysql_config)
try:
    cursor1 = conn.cursor()
    # Empty the result table so every run starts from scratch.
    cursor1.execute('truncate table test.weijinv36_0808_copy2')
    print('清空weijinv36_0808_copy2表')
finally:
    # The connection was previously left open for the rest of the job.
    conn.close()
print('开始处理专项排查')
spark_ctx = SparkContext(appName="weijin_check_data")
spark_session = SparkSession(spark_ctx)

# Source catalogue schema; used here only to find the highest product id,
# which becomes the JDBC partitioning upper bound below.
mysql_config1 = {"host": "192.168.9.12",
                 "port": 3306,
                 'user': "root",
                 "passwd": "gc7232275",
                 "db": "gcV3_6",
                 "charset": "utf8mb4"}
conn1 = MySQLdb.connect(**mysql_config1)
try:
    cursor1 = conn1.cursor()
    # Only the single largest id is needed (was ``limit 100`` + fetchall).
    cursor1.execute("select itemid from destoon_sell_5 order by itemid desc limit 1")
    top_row = cursor1.fetchone()
finally:
    conn1.close()
if top_row is None:
    raise SystemExit('destoon_sell_5 is empty; nothing to scan')
up_itemid = top_row[0]

# Partitioned JDBC read: lowerBound/upperBound/numPartitions only decide the
# per-partition itemid ranges. Per the Spark docs, rows outside the bounds
# are still read, not filtered out.
# NOTE(review): this reads from 10.101.40.166 while ``up_itemid`` came from
# 192.168.9.12 -- confirm both hosts serve the same ``gcV3_6`` data.
rdd = (
    spark_session.read.jdbc(
        "jdbc:mysql://10.101.40.166/gcV3_6?tinyInt1isBit=false&useSSL=true",
        "destoon_sell_5",
        lowerBound=24,
        column="itemid",
        upperBound=int(up_itemid),
        numPartitions=6000,
        properties={"user": "root", "password": "gc7232275",
                    "driver": 'com.mysql.jdbc.Driver'},
    )
    .select("itemid", "catid", "title", "status", "addtime")
    .rdd.map(lambda x: x.asDict())
)
print('===============')
def map_fun(data):
    """Scan one Spark partition of products for level-2 prohibited words.

    Intended for ``rdd.foreachPartition``. Each element of *data* is a dict
    with ``itemid``, ``catid``, ``title`` and ``status`` keys (one row of
    ``destoon_sell_5``). For rows with ``status == 3``:

    - The product body is fetched from its shard table ``destoon_5_<n>``.
    - The body is matched against the module-level ``weijin_list``.
    - When the body contains a match, the row is inserted into
      ``test.weijinv36_0808_copy2`` with the words matched in its title and
      body.

    Returns ``None``.

    NOTE(review): a match in the title alone (or in any row whose status is
    not 3) is never recorded -- confirm this is intended.
    """
    # Function-local imports, presumably so they resolve on the executors.
    import ahocorasick
    import MySQLdb

    # Keyword matcher over the level-2 word list, built once per partition.
    ac = ahocorasick.AhoCorasick(*weijin_list)

    def handle(old_str):
        """Return every keyword found in *old_str*, concatenated; '' if none."""
        if not old_str:
            return ''
        return ''.join(ac.search(old_str))

    def shard_suffix(item_id):
        """Return n for the body table ``destoon_5_<n>`` holding *item_id*.

        Each shard holds 2,000,000 consecutive ids: 1..2000000 -> 1,
        2000001..4000000 -> 2, and so on. This is integer ceiling division,
        with no float rounding.
        """
        return (item_id - 1) // 2000000 + 1

    # Source schema holding the product bodies.
    mysql_config = {"host": "192.168.9.12",
                    "port": 3306,
                    'user': "root",
                    "passwd": "gc7232275",
                    "db": "gcV3_6",
                    "charset": "utf8mb4"}
    # Result schema receiving the findings.
    mysql_config1 = {"host": "192.168.9.12",
                     "port": 3306,
                     'user': "root",
                     "passwd": "gc7232275",
                     "db": "test",
                     "charset": "utf8"}
    insert_sql = ("insert ignore into test.weijinv36_0808_copy2"
                  "(itemid,catid,title,url,title_weijin,content_weijin)"
                  "values(%s,%s,%s,%s,%s,%s)")

    conn = MySQLdb.connect(**mysql_config)
    conn1 = None
    try:
        conn1 = MySQLdb.connect(**mysql_config1)
        for row in data:
            try:
                item_id = int(row["itemid"])
                catid = int(row["catid"])
                status = int(row.get("status", ""))
            except (KeyError, TypeError, ValueError) as exc:
                # A NULL/blank id, category or status used to abort the whole
                # partition; skip just this row instead.
                print('skip row itemid=%r: %s' % (row.get("itemid"), exc))
                continue
            title = row.get("title", "")

            content = ''
            if status == 3:
                try:
                    conn.ping()
                except MySQLdb.Error:
                    # Connection went stale between rows: reconnect.
                    conn = MySQLdb.connect(**mysql_config)
                cursor = conn.cursor()
                try:
                    # The table suffix is an int computed locally; only the
                    # itemid is a bound parameter.
                    cursor.execute(
                        "select content from destoon_5_%d where itemid=%%s"
                        % shard_suffix(item_id),
                        (item_id,),
                    )
                    fetched = cursor.fetchone()
                except MySQLdb.Error as exc:
                    # e.g. a missing shard table: log and treat as no body.
                    print('content lookup failed for itemid %s: %s' % (item_id, exc))
                    fetched = None
                finally:
                    # Previously the cursor was only closed when the fetch failed.
                    cursor.close()
                if fetched is not None:
                    content = fetched[0]

            new_content = handle(content)
            if not new_content:
                continue
            # The title is only reported next to a body match, so match it lazily.
            new_title = handle(title)
            print('===============')
            print(new_title)
            try:
                conn1.ping()
            except MySQLdb.Error:
                conn1 = MySQLdb.connect(**mysql_config1)
            url = 'https://chanpin.gongchang.com/show/' + str(item_id) + '/'
            print(url)
            try:
                cursor1 = conn1.cursor()
                try:
                    cursor1.execute(insert_sql,
                                    (item_id, catid, title, url, new_title, new_content))
                finally:
                    cursor1.close()
                conn1.commit()
                print('ok')
            except Exception as exc:
                # Best effort: one failed insert must not stop the scan.
                print(exc)
    finally:
        # Close whichever connections are current, even if a row raised.
        if conn1 is not None:
            conn1.close()
        conn.close()
# Run the scan on every partition, then release the Spark context whether
# or not the scan succeeded.
try:
    rdd.foreachPartition(map_fun)
finally:
    spark_ctx.stop()
print('成功')
# 版权声明：本文为 zhplz123 原创文章，遵循 CC 4.0 BY-SA 版权协议，转载请附上原文出处链接和本声明。