1. 阿里云 python sdk的安装和使用
参考:
阿里云python sdk的使用: https://help.aliyun.com/document_detail/67117.html?spm=a2c4g.11186623.6.544.4b3042c5OiDjys
阿里云region id 地域和可用区:https://help.aliyun.com/document_detail/40654.html?spm=5176.10695662.1996646101.1.46b36dd0Sy8kEd
1.1 PIP安装:
pip install aliyun-python-sdk-core # 安装阿里云SDK核心库
pip install aliyun-python-sdk-ecs # 安装管理ECS的库
1.2 使用AccessKey简单调用ECS
-
导入相关产品的SDK (此处为ECS )
from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkecs.request.v20140526 import DescribeInstancesRequest from aliyunsdkecs.request.v20140526 import StopInstanceRequest
-
新建一个AcsClient
client = AcsClient( "<your-access-key-id>", "<your-access-key-secret>", "<your-region-id>" );
注意:Region ID 为你购买相应产品时选择的区域ID, 详见
Region ID文档
。国内Region ID 表:
地域名称 所在城市 Region ID 可用区数量 华北 1 青岛 cn-qingdao 2 华北 2 北京 cn-beijing 8 华北 3 张家口 cn-zhangjiakou 3 华北 5 呼和浩特 cn-huhehaote 2 华北 6 乌兰察布 cn-wulanchabu 2 华东 1 杭州 cn-hangzhou 8 华东 2 上海 cn-shanghai 7 华南 1 深圳 cn-shenzhen 5 华南 2 河源 cn-heyuan 2 西南 1 成都 cn-chengdu 2 中国香港 香港 cn-hongkong 2 -
创建Request对象。
request = DescribeInstancesRequest.DescribeInstancesRequest() request.set_PageSize(10)
-
发起调用并处理返回。
try: response = client.do_action_with_exception(request) print(response) except ServerException as e: print(repr(e)) except ClientException as e: print(repr(e))
2. Datahub Python SDK的安装和使用
参考:
Datahub名词解释及官方文档: https://help.aliyun.com/document_detail/158776.html?spm=a2c4g.11186623.6.545.3af9329a3eUfXg
Python库pydatahub安装指南:https://pydatahub.readthedocs.io/zh_CN/latest/installation.html
pydatahub在github源码及Demo: https://github.com/aliyun/aliyun-datahub-sdk-python/tree/master/examples
2.1 安装pydatahub
pip install pydatahub
2.2 使用pydatahub调用Datahub SDK
2.2.1 导入pydatahub库包
import sys
import traceback
from datahub import DataHub
from datahub.exceptions import ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
2.2.2 连接Datahub
access_key_id = "阿里云accesskey id" # 阿里云accesskey id
access_key_secret = "阿里云accesskey secret" # 阿里云accesskey secret
end_point = "endpoint地址" # endpoint地址
dh = DataHub(access_key_id, access_key_secret, end_point)
注意: 此处的endpoint对应的是相应阿里云产品的endpoint互联网地址。详见
endpoint管理和介绍
对应此处,endpoint则是Datahub控制台中的互联网ID ,查看路径:
Datahub数据总线控制台 > 概览 > 常用信息 > 外网(互联网)
2.2.3 创建新的project
project_name = 'test_py_sdk'
comment = 'test creating project with python sdk!'
try:
dh.create_project(project_name, comment)
print("create project '{}' success!".format(project_name))
except Exception as e:
print(repr(e))
查看已有的project
dh.list_project()
2.2.4 创建新的Topic
2.2.4.1 Tuple类型的Topice创建
topic_name = "test_py_sdk_topic_tuple"
shard = 1
life_cycle = 3
record_schema = RecordSchema.from_lists(
['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
[FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
topic_comment = "test tuple topic"
try:
# 调用创建命令
dh.create_tuple_topic(project_name, topic_name, shard, life_cycle, record_schema, topic_comment)
print("create tuple topic '{}' success!".format(topic_name))
except Exception as e:
print(repr(e))
注意:Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前阿里云的python SDK仅仅支持以下几种类型(Java SDK支持更多的, 可能是觉得python不配?):
类型 | 含义 | 值域 |
---|---|---|
Bigint | 8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。 | -9223372036854775807 ~ 9223372036854775807 |
String | 字符串,只支持UTF-8编码。 | 单个String列最长允许1MB。 |
Boolean | 布尔型。 | 可以表示为True/False,true/false, 0/1 |
Double | 8字节双精度浮点数。 | -1.0 10308 ~ 1.0 10308 |
TimeStamp | 时间戳类型 | 表示到微秒的时间戳类型 |
测试的话,此处shard设置为1就好了,毕竟是收钱的,按个收费。生产使用也要尽量评估shard的使用个数。
以下是关于datahub 的限制描述
(可根据此表评估shard使用数量)
限制项 | 描述 | 值域范围 |
---|---|---|
活跃shard数 | 每个topic中活跃shard数量限制 | (0,256] |
总shard数 | 每个topic中总shard数量限制 | (0,512] |
Http BodySize | http请求中body大小限制 | 4MB |
单个String长度 | 数据中单个String字段长度限制 | 2MB |
Merge/Split频率限制 | 每个新产生的shard在一定时间内不允许进行Merge/Split操作 | 5s |
QPS限制 | 每个Shard写入QPS限制(非Record/s,Batch写入同一Shard仅计算为1次) | 2000 |
Throughput限制 | 每个Shard写入每秒吞吐限制 | 5MB/s |
Project限制 | 每个云账号能够创建的Project上限 | 50 |
Topic限制 | 每个Project内能创建的Topic数量限制,如有特殊请求请联系管理员 | 500 |
Topic Lifecycle限制 | 每个Topic中数据保存的最大时长,单位是天 | [1,7] |
2.2.4.2 BLOB类型的Topice创建
topic_name = "test_py_sdk_topic_bolb"
shard_count = 3
life_cycle = 7
topic_comment = "creating blob topic by using python sdk"
try:
# 调用创建命令
dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, topic_comment)
print("create blob topic '{}' success!".format(topic_name))
except Exception as e:
print(repr(e))
2.2.5 获取shard列表
shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))
print(shards)
返回结果是一个ListShardResult对象,包含一个Shard对象的list,list中的每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息
2.2.6 发布数据
发布数据前需要等候shard不在传输状态。
2.2.6.1 发布TUPLE数据
topic_name = "test_py_sdk_topic_tuple"
project_name = "test_py_sdk"
# 等候shards准备好
dh.wait_shards_ready(project_name, topic_name)
# 获取topic
topic_result = dh.get_topic(project_name, topic_name)
# 查看record的类型
print(topic_result.record_type)
# 查看此topic的schema信息,发布到topic的数据需要类型一致、字段类型(schema)一致
print(topic_result.record_schema)
# 获取topic的shema信息
record_schema = topic_result.record_schema
写一个生成测试tuple 数据的函数:
def gen_tuple_data(num):
"""
生成10的num次方条tuple数据
"""
res = []
for i in range(10**num):
data = [i, "yc"+str(i),i/10.01, True,1455869335000000]
res.append(data)
return res
发布TUPLE数据的函数:
def publish_tuple_data(dh, project_name, topic_name, records):
topic_result = dh.get_topic(project_name, topic_name)
record_schema = topic_result.record_schema
tuple_recs = []
for rec in records:
tuple_rec = TupleRecord(schema=record_schema, values=rec)
tuple_recs.append(tuple_rec)
return dh.put_records(project_name, topic_name, tuple_recs)
发布十条数据到tuple类型的topic
tuple_data = gen_tuple_data(1)
publish_result = publish_tuple_data(dh, project_name, "test_py_sdk_topic", tuple_data)
发布一万条数据到tuple类型的topic
tuple_data = gen_tuple_data(4)
publish_result = publish_tuple_data(dh, project_name, "test_py_sdk_topic", tuple_data)
若是学习使用的话,此步慎重!!不然明天收到缴费单。
另,同一个shard一次不要传入大于4MB的数据,不然会返回413错误,当数据量过大时,控制好数据数量,做一下切片。
发布TUPLE数据的其他操作:
records0 = []
# 方法一
record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
# 设置shard
record0.shard_id = '0'
record0.put_attribute('AK', '47')
records0.append(record0)
# 方法二
record1 = TupleRecord(schema=record_schema)
record1.set_value('bigint_field', 2)
record1.set_value('string_field', 'yc2')
record1.set_value('double_field', None)
record1.set_value('bool_field', False)
record1.set_value('time_field', 1455869335000011)
record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
records0.append(record1)
# 方法三
record2 = TupleRecord(schema=record_schema)
record2.set_value(0, 3)
record2.set_value(1, 'yc3')
record2.set_value(2, 1.1)
record2.set_value(3, False)
record2.set_value(4, 1455869335000011)
record2.attributes = {'key': 'value'}
record2.partition_key = 'TestPartitionKey'
records0.append(record2)
# 发布数据
put_result = dh.put_records(project_name, topic_name, records0)
# 发布结果
print(put_result)
2.2.6.2 发布BLOB数据
topic_name = 'test_py_sdk_topic_bolb'
project_name = "test_py_sdk"
records1 = []
record3 = BlobRecord(blob_data='data')
record3.shard_id = '0'
record3.put_attribute('a', 'b')
records1.append(record3)
put_result = dh.put_records(project_name, topic_name, records1)
put_result
看到这里点个赞吧,谢谢:)