**
亚马逊的对象存储boto3访问应用实例
**
【关于boto3】
Boto3是亚马逊AWS提供的python SDK,最为常用的功能是S3对象存储的访问。作为标准的S3 SDK,除了访问AWS,也可以访问其他兼容S3 API的云存储厂商。
Boto3的项目地址为:https://github.com/boto/boto3.git
Boto3的AWS doc地址为:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource
需要指出的是,AWS的文档说明较为混乱,现在给大家提供一个简单的使用说明和范例。
【准备】
首先需要安装python,建议使用Python 3.6及以后的版本。
通过pip安装boto3的包即可:
pip install boto3
在你的python脚本直接引用boto3即可:
import boto3
正式使用boto3之前,你还需要具备云存储厂商的access key和secret_key。先给出一对土星云的测试key,有需要的可以去土星云存储官网注册获取。
param_endpoint=‘https://s3.local-north-1.saturncloud.com.cn:6666’
param_access_key=‘3ABC8E3ABC387442B3936F9426B11C1B’
param_secret_key=‘0428D37B0FDC14805AF0153969C272F3’
【创建client】
先初始化一个client对象,我们可以定义一个方法如下:
def setup_client(endpoint, access_key_id, secret_access_key):
return boto3.client(
‘s3’,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
use_ssl=True,
region_name=‘cn’,
endpoint_url=endpoint,
config=Config(s3={“addressing_style”: “virtual”, “signature_version”: ‘s3v4’}))
再定义一个判断http响应状态的方法:
def is_result_ok(return_obj):
ifcode = return_obj[‘ResponseMetadata’][‘HTTPStatusCode’]
if ifcode == 200 or ifcode == 204:
return True
else:
return False
然后在main函数中调用创建client的方法:
if
name
== ‘
main
’:
print(‘Start SCS testing at ‘+datetime.datetime.now().strftime(’%Y-%m-%d %H:%M:%S’))
s3=setup_client(param_endpoint, param_access_key, param_secret_key)
【创建存储桶】
以当前时间为名创建一个存储桶:
bucket_id=datetime.datetime.now().strftime(‘test-%Y%m%d-%H%M%S’)
result=cre_bucket(s3, bucket_id)
if is_result_ok(result):
print('bucket ['+bucket_id+'] created ok.')
else:
print('bucket ['+bucket_id+'] created fail.')
print(result)
sys.exit(1)
【上传文件】
定义一个上传文件的方法如下:
def put_obj(s3, bucket_id, filename, local_dir):
return s3.put_object(Bucket=bucket_id, Body=open(local_dir+’/’+filename, ‘rb’), Key=filename)
选择一个本地文件上传到刚刚创建的存储桶
param_test_local_dir=’/tmp/’
filename=‘test.txt’
result=put_obj(s3, bucket_id, filename, param_test_local_dir)
if is_result_ok(result):
print(‘put_obj [’+filename+’] ok.’)
else:
print(‘put_obj [’+filename+’] fail.’)
print(result)
return
【下载文件】
定义一个下载文件的方法:
def get_obj(s3, bucket_id, filename):
return s3.get_object(Bucket=bucket_id, Key=filename)
我们再把刚才上传的文件下载下来
result=get_obj(s3, bucket_id, filename)
if is_result_ok(result):
print('get_obj ['+filename+'] ok.')
else:
print('get_obj ['+filename+'] fail.')
print(result)
return
【删除文件】
首先定义一个删除S3存储桶文件的方法
def del_obj(s3, bucket_id, filename):
return s3.delete_object(Bucket=bucket_id, Key=filename)
然后调用此方法即可删除刚刚上传到云端的文件:
result=del_obj(s3, bucket_id, filename)
if is_result_ok(result):
print('del_obj ['+filename+'] ok.')
else:
print('del_obj ['+filename+'] fail.')
print(result)
return
【结语】
上面已经给出了一个简单的使用范例,更多的接口和使用方法,可以查看AWS的文档和云存储厂商的文档。
【附 完整代码】
############################################################################################################
‘’’
Random test script for saturn cloud systems.
Usage:
- Change test parameters through modifying param_XXXX in test configurations.
- Run ‘python scs_rand_test.py’
- For better performance, please set param_test_local_dir to a ramdisk path, such as ‘/run’ in Linux.
Requirement:
- Python 3.6 or higher version.
- boto3, use ‘pip install boto3 –user’ to install.
by sam 2021-10-06
‘’’
############################################################################################################
import time
import datetime
import os
import sys
import subprocess
import random
import numpy as np
import threading
from queue import Queue
import boto3
from botocore.config import Config
############################################################################################################
‘’’
Test configurations
‘’’
############################################################################################################
param_endpoint=‘https://s3.local-north-1.saturncloud.com.cn:6666’
param_access_key=‘3ABC8E3ABC387442B3936F9426B11C1B’
param_secret_key=‘0428D37B0FDC14805AF0153969C272F3’
param_test_local_dir=’/tmp/’
param_test_max_blocks=33
param_block_size=256*1024
############################################################################################################
‘’’
Common used API fuctions
‘’’
############################################################################################################
def setup_client(endpoint, access_key_id, secret_access_key):
return boto3.client(
‘s3’,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
use_ssl=True,
region_name=‘cn’,
endpoint_url=endpoint,
config=Config(s3={“addressing_style”: “virtual”, “signature_version”: ‘s3v4’}))
def cre_bucket(s3, bucket_id):
return s3.create_bucket(Bucket=bucket_id,)
def del_bucket(s3, bucket_id):
return s3.delete_bucket(Bucket=bucket_id,)
def put_obj(s3, bucket_id, filename, local_dir):
return s3.put_object(Bucket=bucket_id, Body=open(local_dir+’/’+filename, ‘rb’), Key=filename)
def get_obj(s3, bucket_id, filename):
return s3.get_object(Bucket=bucket_id, Key=filename)
def del_obj(s3, bucket_id, filename):
return s3.delete_object(Bucket=bucket_id, Key=filename)
def is_result_ok(return_obj):
ifcode = return_obj[‘ResponseMetadata’][‘HTTPStatusCode’]
if ifcode == 200 or ifcode == 204:
return True
else:
return False
############################################################################################################
‘’’
Test applications
‘’’
############################################################################################################
def create_dummy_file(file_path):
blocks=random.randrange(0, param_test_max_blocks, 1)
last_bytes=random.randrange(1, param_block_size+1, 1)
total_size=blocks*param_block_size+last_bytes
file=open(file_path, mode='wb')
file.write(np.random.bytes(total_size))
file.close()
def test_task(bucket_id):
s3=setup_client(param_endpoint, param_access_key, param_secret_key)
filename='put_obj'
create_dummy_file(param_test_local_dir+'/'+filename)
result=put_obj(s3, bucket_id, filename, param_test_local_dir)
if is_result_ok(result):
print('put_obj ['+filename+'] ok.')
else:
print('put_obj ['+filename+'] fail.')
print(result)
return
result=get_obj(s3, bucket_id, filename)
if is_result_ok(result):
print('get_obj ['+filename+'] ok.')
else:
print('get_obj ['+filename+'] fail.')
print(result)
return
filename_new='get_obj'
file=open(param_test_local_dir+'/'+filename_new, mode='wb')
file.write(result['Body'].read())
file.close()
cmd_string='cmp '+param_test_local_dir+'/'+filename+' '+param_test_local_dir+'/'+filename_new
cmd_result=subprocess.getoutput(cmd_string)
if len(cmd_result)>1:
print('content error at ['+filename+'] and ['+filename_new+']')
return
result=del_obj(s3, bucket_id, filename)
if is_result_ok(result):
print('del_obj ['+filename+'] ok.')
else:
print('del_obj ['+filename+'] fail.')
print(result)
return
cmd_string='rm '+param_test_local_dir+'/'+filename+' '+param_test_local_dir+'/'+filename_new
os.system(cmd_string)
############################################################################################################
‘’’
Script entrance
‘’’
############################################################################################################
if
name
== ‘
main
’:
print(‘Start SCS random testing at ‘+datetime.datetime.now().strftime(’%Y-%m-%d %H:%M:%S’))
s3=setup_client(param_endpoint, param_access_key, param_secret_key)
bucket_id=datetime.datetime.now().strftime('test-%Y%m%d-%H%M%S')
result=cre_bucket(s3, bucket_id)
if is_result_ok(result):
print('bucket ['+bucket_id+'] created ok.')
else:
print('bucket ['+bucket_id+'] created fail.')
print(result)
sys.exit(1)
pass
test_task(bucket_id)
result=del_bucket(s3, bucket_id)
if is_result_ok(result):
print('bucket ['+bucket_id+'] deleted ok.')
else:
print('bucket ['+bucket_id+'] deleted fail.')
print(result)
sys.exit(1)
pass
print('End SCS random testing at '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))