首先说明,此方案是一个不可行的方案。
与导入Mysql数据库不同,Hive数据库不支持记录级数据插入;即使一些版本支持,插入速度也是奇慢。Hive主要优势在于处理批量数据,数据量越大越能体现出性能优势;数据量小,如记录级数据插入,则没有可用性。所以,对于使用python将json数据解析出来再一条条插入的方法肯定是行不通的。方案记录在此,为通过python连接操作Hive数据库等提供一些参考。
一、环境准备
1、安装thritf依赖库
#yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel
2、拷贝库文件夹
(
如果在脚本中引用,此步不拷也可以
)
cp -r $HIVE_PATH/lib/py /usr/local/lib/python2.7/site-packages
3、默认Hadoop环境、Python环境、Hive的thritf服务及Json文件已经准备好
二、实现数据插入的脚本
简版是为了测试用,正式版本是如果方案可行的话的可正式使用的脚本。
1、json2hive_python简版
[spark@Master Py_logproc]$ pwd
/home/spark/opt/Log_Data/Py_logproc
[spark@Master Py_logproc]$ cat json2hive_python_recordasarray_basic.py
# -*- encoding:utf-8 -*-
#!/usr/bin/env python
import sys
sys.path.append('/home/spark/opt/hive-1.2.1/lib/py')
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
import json
import warnings
warnings.filterwarnings("ignore")
def hiveExe(sql):
try:
transport = TSocket.TSocket('127.0.0.1', 10000)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = ThriftHive.Client(protocol)
transport.open()
client.execute(sql)
transport.close()
except Thrift.TException, tx:
print '%s' % (tx.message)
if __name__=="__main__":
import sys
reload(sys)
sys.setdefaultencoding( "utf-8" )
if len(sys.argv)==1:
print "need argv"
else:
print sys.argv
for json_array in open('/home/spark/opt/Log_Data/Py_logproc/log_tmpdir/yemaopythonlog'):
yemao_array = json.loads(json_array)
for yemao in yemao_array:
print yemao['time']
if not yemao.has_key('_reason'):
id = yemao['id']
time = yemao['time']
url_from = yemao['url_from']
url_current = yemao['url_current']
url_to = yem
版权声明:本文为nisjlvhudy原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。