【阿里云数据总线】Datahub使用Python SDK记录

  • Post author:
  • Post category:python




1. 阿里云 python sdk的安装和使用

参考:

阿里云python sdk的使用: https://help.aliyun.com/document_detail/67117.html?spm=a2c4g.11186623.6.544.4b3042c5OiDjys

阿里云region id 地域和可用区:https://help.aliyun.com/document_detail/40654.html?spm=5176.10695662.1996646101.1.46b36dd0Sy8kEd



1.1 PIP安装:

pip install aliyun-python-sdk-core # 安装阿里云SDK核心库
pip install aliyun-python-sdk-ecs # 安装管理ECS的库



1.2 使用AccessKey简单调用ECS

  1. 导入相关产品的SDK (此处为ECS )

    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ClientException
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkecs.request.v20140526 import DescribeInstancesRequest
    from aliyunsdkecs.request.v20140526 import StopInstanceRequest
    
  2. 新建一个AcsClient

    client = AcsClient(
       "<your-access-key-id>", 
       "<your-access-key-secret>",
       "<your-region-id>"
    );
    

    注意:Region ID 为你购买相应产品时选择的区域ID, 详见

    Region ID文档

    国内Region ID 表:

    地域名称 所在城市 Region ID 可用区数量
    华北 1 青岛 cn-qingdao 2
    华北 2 北京 cn-beijing 8
    华北 3 张家口 cn-zhangjiakou 3
    华北 5 呼和浩特 cn-huhehaote 2
    华北 6 乌兰察布 cn-wulanchabu 2
    华东 1 杭州 cn-hangzhou 8
    华东 2 上海 cn-shanghai 7
    华南 1 深圳 cn-shenzhen 5
    华南 2 河源 cn-heyuan 2
    西南 1 成都 cn-chengdu 2
    中国香港 香港 cn-hongkong 2
  3. 创建Request对象。

    request = DescribeInstancesRequest.DescribeInstancesRequest()
    request.set_PageSize(10)
    
  4. 发起调用并处理返回。

    try:
        response = client.do_action_with_exception(request)
        print(response)
    except ServerException as e:
        print(repr(e))
    except ClientException as e:
        print(repr(e))
    



2. Datahub Python SDK的安装和使用

参考:

Datahub名词解释及官方文档: https://help.aliyun.com/document_detail/158776.html?spm=a2c4g.11186623.6.545.3af9329a3eUfXg

Python库pydatahub安装指南:https://pydatahub.readthedocs.io/zh_CN/latest/installation.html

pydatahub在github源码及Demo: https://github.com/aliyun/aliyun-datahub-sdk-python/tree/master/examples



2.1 安装pydatahub

pip install pydatahub



2.2 使用pydatahub调用Datahub SDK




2.2.1 导入pydatahub库包

import sys
import traceback
from datahub import DataHub
from datahub.exceptions import ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType




2.2.2 连接Datahub

access_key_id = "阿里云accesskey id" # 阿里云accesskey id
access_key_secret = "阿里云accesskey secret" # 阿里云accesskey secret
end_point = "endpoint地址" # endpoint地址

dh = DataHub(access_key_id, access_key_secret, end_point)

注意: 此处的endpoint对应的是相应阿里云产品的endpoint互联网地址。详见

endpoint管理和介绍


对应此处,endpoint则是Datahub控制台中的互联网ID ,查看路径:

Datahub数据总线控制台 > 概览 > 常用信息 > 外网(互联网)




2.2.3 创建新的project

project_name = 'test_py_sdk'
comment = 'test creating project with python sdk!'
try:
    dh.create_project(project_name, comment)
    print("create project '{}' success!".format(project_name))
except Exception as e:
    print(repr(e))




查看已有的project

dh.list_project()




2.2.4 创建新的Topic




2.2.4.1 Tuple类型的Topice创建

topic_name = "test_py_sdk_topic_tuple"
shard = 1
life_cycle = 3
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
topic_comment = "test tuple topic"

try:
    # 调用创建命令
    dh.create_tuple_topic(project_name, topic_name, shard, life_cycle, record_schema, topic_comment)
    print("create tuple topic '{}' success!".format(topic_name))
except Exception as e:
    print(repr(e))

注意:Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前阿里云的python SDK仅仅支持以下几种类型(Java SDK支持更多的, 可能是觉得python不配?):

类型 含义 值域
Bigint 8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。 -9223372036854775807 ~ 9223372036854775807
String 字符串,只支持UTF-8编码。 单个String列最长允许1MB。
Boolean 布尔型。 可以表示为True/False,true/false, 0/1
Double 8字节双精度浮点数。 -1.0 10308 ~ 1.0 10308
TimeStamp 时间戳类型 表示到微秒的时间戳类型


测试的话,此处shard设置为1就好了,毕竟是收钱的,按个收费。生产使用也要尽量评估shard的使用个数。


以下是关于datahub 的限制描述

(可根据此表评估shard使用数量)

限制项 描述 值域范围
活跃shard数 每个topic中活跃shard数量限制 (0,256]
总shard数 每个topic中总shard数量限制 (0,512]
Http BodySize http请求中body大小限制 4MB
单个String长度 数据中单个String字段长度限制 2MB
Merge/Split频率限制 每个新产生的shard在一定时间内不允许进行Merge/Split操作 5s
QPS限制 每个Shard写入QPS限制(非Record/s,Batch写入同一Shard仅计算为1次) 2000
Throughput限制 每个Shard写入每秒吞吐限制 5MB/s
Project限制 每个云账号能够创建的Project上限 50
Topic限制 每个Project内能创建的Topic数量限制,如有特殊请求请联系管理员 500
Topic Lifecycle限制 每个Topic中数据保存的最大时长,单位是天 [1,7]




2.2.4.2 BLOB类型的Topice创建

topic_name = "test_py_sdk_topic_bolb"
shard_count = 3
life_cycle = 7
topic_comment = "creating blob topic by using python sdk"

try:
    # 调用创建命令
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, topic_comment)
    print("create blob topic '{}' success!".format(topic_name))
except Exception as e:
    print(repr(e))




2.2.5 获取shard列表

shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))
print(shards)

返回结果是一个ListShardResult对象,包含一个Shard对象的list,list中的每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息




2.2.6 发布数据

发布数据前需要等候shard不在传输状态。




2.2.6.1 发布TUPLE数据

topic_name = "test_py_sdk_topic_tuple"
project_name = "test_py_sdk"
# 等候shards准备好
dh.wait_shards_ready(project_name, topic_name) 
# 获取topic
topic_result = dh.get_topic(project_name, topic_name)
# 查看record的类型 
print(topic_result.record_type)
# 查看此topic的schema信息,发布到topic的数据需要类型一致、字段类型(schema)一致
print(topic_result.record_schema)
# 获取topic的shema信息
record_schema =  topic_result.record_schema

写一个生成测试tuple 数据的函数:

def gen_tuple_data(num):
    """
    生成10的num次方条tuple数据
    """
    res = []
    for i in range(10**num):
        data = [i, "yc"+str(i),i/10.01, True,1455869335000000]
        res.append(data)
    return res

发布TUPLE数据的函数:

def publish_tuple_data(dh, project_name, topic_name, records):
    topic_result = dh.get_topic(project_name, topic_name)
    record_schema = topic_result.record_schema
    tuple_recs = []
    for rec in records:
        tuple_rec = TupleRecord(schema=record_schema, values=rec)
        tuple_recs.append(tuple_rec)
    return dh.put_records(project_name, topic_name, tuple_recs)

发布十条数据到tuple类型的topic

tuple_data = gen_tuple_data(1)
publish_result = publish_tuple_data(dh, project_name, "test_py_sdk_topic", tuple_data)

发布一万条数据到tuple类型的topic

tuple_data = gen_tuple_data(4)
publish_result = publish_tuple_data(dh, project_name, "test_py_sdk_topic", tuple_data)



若是学习使用的话,此步慎重!!不然明天收到缴费单。



另,同一个shard一次不要传入大于4MB的数据,不然会返回413错误,当数据量过大时,控制好数据数量,做一下切片。

发布TUPLE数据的其他操作:

records0 = []
# 方法一
record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
# 设置shard
record0.shard_id = '0'
record0.put_attribute('AK', '47')
records0.append(record0)
# 方法二
record1 = TupleRecord(schema=record_schema)
record1.set_value('bigint_field', 2)
record1.set_value('string_field', 'yc2')
record1.set_value('double_field', None)
record1.set_value('bool_field', False)
record1.set_value('time_field', 1455869335000011)
record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
records0.append(record1)
# 方法三
record2 = TupleRecord(schema=record_schema)
record2.set_value(0, 3)
record2.set_value(1, 'yc3')
record2.set_value(2,  1.1)
record2.set_value(3, False)
record2.set_value(4, 1455869335000011)
record2.attributes = {'key': 'value'}
record2.partition_key = 'TestPartitionKey'
records0.append(record2)
# 发布数据
put_result = dh.put_records(project_name, topic_name, records0)
# 发布结果
print(put_result)




2.2.6.2 发布BLOB数据

topic_name = 'test_py_sdk_topic_bolb'
project_name = "test_py_sdk"
records1 = []
record3 = BlobRecord(blob_data='data')
record3.shard_id = '0'
record3.put_attribute('a', 'b')
records1.append(record3)
put_result = dh.put_records(project_name, topic_name, records1)
put_result

看到这里点个赞吧,谢谢:)



版权声明:本文为q1042960848原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。