Spark 在 Python 中的运用

  • Post author:
  • Post category:python


RDD属性。

只读:不能修改,只能通过转换操作生成新的 RDD。

分布式:可以分布在多台机器上进行并行处理。

弹性:计算过程中内存不够时它会和磁盘进行数据交换。

基于内存:可以全部或部分缓存在内存中,在多次计算间重用。

# -*- coding: utf-8 -*-
# uptime  8 月 26
import requests
#  v36 二级违禁专项排查
import time
import json
import MySQLdb,zlib
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
import xlrd
env = os.environ
from pyspark import SparkContext
from pyspark.sql import  SparkSession
# Create the per-build working directory named after BUILD_NUMBER.
# os.makedirs avoids passing an environment value through a shell command,
# tolerates an already-existing directory, and raises on real failures
# instead of silently returning a non-zero exit status.
os.makedirs(env['BUILD_NUMBER'], exist_ok=True)

from weijin_data.utils.validate_data import download
# Based on the submitted form data, download the resource and validate it.
weijin_link = env.get('weijin_link')
print(weijin_link)
print('===========')

# Download the workbook to local disk.
save_path = download(weijin_link, file_name='weijin.xlsx')
print(save_path)

book = xlrd.open_workbook(save_path, encoding_override='utf-8')
table = book.sheet_by_index(0)

# Build the list of prohibited words from the first sheet.
# Row 0 is skipped (presumably a header row -- confirm against the sheet).
weijin_list = []
for row_idx in range(1, table.nrows):
    main_word = table.cell(row_idx, 0).value      # main word
    level = table.cell(row_idx, 4).value          # level
    include_words = table.cell(row_idx, 6).value  # positive-selection words
    exclude_words = table.cell(row_idx, 7).value  # negative-selection words

    # Only level-2 rows without negative-selection words are used.
    if int(level) != 2 or exclude_words != '':
        continue

    # Each non-empty comma-separated entry is appended to the main word.
    suffixes = [part for part in str(include_words).strip().split(',') if part]
    for suffix in suffixes:
        combined = main_word + suffix
        weijin_list.append(str(combined).replace('\u3000', '').replace('\xa0', ''))

    # A row with no positive-selection words contributes the main word alone.
    if include_words == '':
        weijin_list.append(str(main_word).replace('\u3000', '').replace('\xa0', ''))

print (weijin_list)
print(len(weijin_list))
assert len(weijin_list) >= 1, '无违禁词'


# Connection settings for the database that holds the result table.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store.
mysql_config = {"host": "192.168.9.12",
                "port": 3306,
                'user': "root",
                "passwd": "gc7232275",
                "db": "test",
                "charset": "utf8"}
conn = MySQLdb.connect(**mysql_config)
try:
    cursor1 = conn.cursor()
    try:
        # Empty the result table before this run writes new hits.
        cursor1.execute('truncate table test.weijinv36_0808_copy2')
    finally:
        cursor1.close()
finally:
    # This connection is not used again at module level, so release it now
    # instead of keeping it open for the whole Spark job.
    conn.close()
print('清空weijinv36_0808_copy2表')
print('开始处理专项排查')



# Spark entry points for the scan job.
spark_ctx = SparkContext(appName="weijin_check_data")
spark_session = SparkSession(spark_ctx)

# Connection settings for the product database.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store.
mysql_config1 = {"host": "192.168.9.12",
                 "port": 3306,
                 'user': "root",
                 "passwd": "gc7232275",
                 "db": "gcV3_6",
                 "charset": "utf8mb4"}

# Look up the current highest itemid; it becomes the upper bound used to
# split the JDBC read into partitions. The connection is only needed for
# this one query, so it is closed right after.
conn1 = MySQLdb.connect(**mysql_config1)
try:
    cursor1 = conn1.cursor()
    try:
        cursor1.execute("select itemid from destoon_sell_5 order by itemid desc limit 100")
        # Rows are ordered descending, so the first row holds the maximum.
        up_itemid = cursor1.fetchall()[0][0]
    finally:
        cursor1.close()
finally:
    conn1.close()

# Read the product table in parallel, partitioned on itemid, keep only the
# columns the scan uses, and convert every Row into a plain dict.
# NOTE(review): the JDBC host differs from the host in mysql_config1 --
# confirm both point at the intended server.
rdd = spark_session.read.jdbc(
    "jdbc:mysql://10.101.40.166/gcV3_6?tinyInt1isBit=false&useSSL=true",
    "destoon_sell_5",
    column="itemid",
    lowerBound=24,
    upperBound=int(up_itemid),
    numPartitions=6000,
    properties={"user": "root", "password": "gc7232275", "driver": 'com.mysql.jdbc.Driver'},
).select("itemid", "catid", "title", "status", "addtime").rdd.map(lambda x: x.asDict())

print ('===============')
def map_fun(data):
    """Scan one partition of product rows for prohibited words and store hits.

    For every row the title is run through the keyword matcher and the result
    is printed. For rows whose ``status`` is 3 the content is additionally
    loaded from the sharded table ``destoon_5_<n>`` and matched; when the
    content produces a match, one row is inserted into
    ``test.weijinv36_0808_copy2``.

    Args:
        data: Iterable of dicts that provide ``itemid``, ``catid``, ``title``
            and ``status``.

    Returns:
        None. Results are written to MySQL and progress is printed.
    """
    import ahocorasick
    import MySQLdb

    # Matcher built from the module-level prohibited-word list.
    ac = ahocorasick.AhoCorasick(*weijin_list)

    def handle(old_str):
        """Concatenate everything ``ac.search`` yields; '' for empty/None."""
        if not old_str:
            return ''
        return ''.join(ac.search(old_str))

    def alive(connection, config):
        """Return *connection* if it answers a ping, otherwise a new one."""
        try:
            connection.ping()
        except MySQLdb.Error:
            return MySQLdb.connect(**config)
        return connection

    def fetch_content(connection, item_id):
        """Return the stored content for *item_id*, or '' when no row exists."""
        # Shards hold 2,000,000 ids each: ids 1..2000000 live in table 1,
        # 2000001..4000000 in table 2, and so on (integer arithmetic only).
        table_id = (item_id - 1) // 2000000 + 1
        cursor = connection.cursor()
        try:
            cursor.execute("select content from destoon_5_%s where itemid=%%s" % table_id, (item_id,))
            rows = cursor.fetchall()
        finally:
            # Always release the cursor, not only when the lookup fails.
            cursor.close()
        return rows[0][0] if rows else ''

    def save_hit(connection, item_id, catid, title, new_title, new_content):
        """Insert one hit row into the result table and commit it."""
        cursor = connection.cursor()
        try:
            url = 'https://chanpin.gongchang.com/show/' + str(item_id) + '/'
            print (url)
            cursor.execute("insert ignore into test.weijinv36_0808_copy2(itemid,catid,title,url,title_weijin,content_weijin)values(%s,%s,%s,%s,%s,%s)", (item_id, catid, title, url, new_title, new_content))
            connection.commit()
            print ('ok')
        finally:
            cursor.close()

    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from the environment or a secrets store.
    source_db_config = {"host": "192.168.9.12",
                        "port": 3306,
                        'user': "root",
                        "passwd": "gc7232275",
                        "db": "gcV3_6",
                        "charset": "utf8mb4"}
    result_db_config = {"host": "192.168.9.12",
                        "port": 3306,
                        'user': "root",
                        "passwd": "gc7232275",
                        "db": "test",
                        "charset": "utf8"}

    conn = MySQLdb.connect(**source_db_config)
    conn1 = None
    try:
        conn1 = MySQLdb.connect(**result_db_config)
        for i in data:
            item_id = int(i["itemid"])
            catid = int(i["catid"])
            title = i.get("title", "")
            status = int(i.get("status", ""))
            new_title = handle(title)
            print (new_title)

            # Content is only inspected for rows with status 3.
            if status != 3:
                continue

            conn = alive(conn, source_db_config)
            new_content = handle(fetch_content(conn, item_id))
            if not new_content:
                continue

            print ('===============')
            print (new_title)
            conn1 = alive(conn1, result_db_config)
            try:
                save_hit(conn1, item_id, catid, title, new_title, new_content)
            except Exception as e:
                # Best effort: a failed insert is reported and the scan of
                # the remaining rows continues.
                print (e)
    finally:
        # Release both connections even when a row aborts the partition.
        try:
            if conn1 is not None:
                conn1.close()
        finally:
            conn.close()
# Run the scan on every partition; map_fun is invoked once per partition.
try:
    rdd.foreachPartition(map_fun)
finally:
    # Shut the SparkContext down whether or not the job succeeded.
    spark_ctx.stop()
print ('成功')

















版权声明:本文为zhplz123原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。