Python SDK for S3 API
This guide explains how to use NAVER Cloud Platform Object Storage with the Python SDK that AWS provides for S3.
This document was written against version 1.6.19 of the AWS Python SDK.
Installation
pip install boto3==1.6.19
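To check which version actually got installed, you can print it from Python:
python -c "import boto3; print(boto3.__version__)"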
AWS Python SDK
- Source: https://github.com/boto/boto3
- Documentation: https://boto3.readthedocs.io/en/latest/reference/services/s3.html
Examples
Replace the access_key and secret_key values used in the examples with your registered API credentials.
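Hardcoding keys in source files makes them easy to leak. One alternative is to read them from environment variables; a minimal sketch, using NCLOUD_ACCESS_KEY and NCLOUD_SECRET_KEY as hypothetical variable names:

import os

# hypothetical environment variable names; match them to your own setup
access_key = os.environ['NCLOUD_ACCESS_KEY']
secret_key = os.environ['NCLOUD_SECRET_KEY']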
Creating a bucket
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'

    s3.create_bucket(Bucket=bucket_name)
Listing buckets
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    response = s3.list_buckets()

    for bucket in response.get('Buckets', []):
        print(bucket.get('Name'))
Deleting a bucket
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'

    s3.delete_bucket(Bucket=bucket_name)
Uploading objects
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'

    # create a folder (a zero-byte object whose key ends with '/')
    object_name = 'sample-folder/'
    s3.put_object(Bucket=bucket_name, Key=object_name)

    # upload a file
    object_name = 'sample-object'
    local_file_path = '/tmp/test.txt'
    s3.upload_file(local_file_path, bucket_name, object_name)
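upload_file also accepts an ExtraArgs dictionary for per-object settings applied at upload time; a minimal sketch that grants anonymous read access (the public-read ACL here is an illustrative choice, not something the example above requires):

# set an ACL on the object as part of the upload (optional)
s3.upload_file(local_file_path, bucket_name, object_name,
               ExtraArgs={'ACL': 'public-read'})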
Listing objects
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'

    # list every object in the bucket
    max_keys = 300
    response = s3.list_objects(Bucket=bucket_name, MaxKeys=max_keys)

    print('list all in the bucket')

    while True:
        print('IsTruncated=%r' % response.get('IsTruncated'))
        print('Marker=%s' % response.get('Marker'))
        print('NextMarker=%s' % response.get('NextMarker'))

        print('Object List')
        for content in response.get('Contents', []):
            print(' Name=%s, Size=%d, Owner=%s' %
                  (content.get('Key'), content.get('Size'), content.get('Owner').get('ID')))

        if response.get('IsTruncated'):
            response = s3.list_objects(Bucket=bucket_name, MaxKeys=max_keys,
                                       Marker=response.get('NextMarker'))
        else:
            break

    # top-level folders and files in the bucket
    delimiter = '/'
    max_keys = 300

    response = s3.list_objects(Bucket=bucket_name, Delimiter=delimiter, MaxKeys=max_keys)

    print('top level folders and files in the bucket')

    while True:
        print('IsTruncated=%r' % response.get('IsTruncated'))
        print('Marker=%s' % response.get('Marker'))
        print('NextMarker=%s' % response.get('NextMarker'))

        print('Folder List')
        for folder in response.get('CommonPrefixes', []):
            print(' Name=%s' % folder.get('Prefix'))

        print('File List')
        for content in response.get('Contents', []):
            print(' Name=%s, Size=%d, Owner=%s' %
                  (content.get('Key'), content.get('Size'), content.get('Owner').get('ID')))

        if response.get('IsTruncated'):
            response = s3.list_objects(Bucket=bucket_name, Delimiter=delimiter, MaxKeys=max_keys,
                                       Marker=response.get('NextMarker'))
        else:
            break
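The manual Marker loop above can also be written with boto3's built-in paginator, which issues the follow-up requests itself; a minimal sketch reusing the client and bucket from the example:

paginator = s3.get_paginator('list_objects')

# the paginator follows NextMarker automatically, page by page
for page in paginator.paginate(Bucket=bucket_name, MaxKeys=max_keys):
    for content in page.get('Contents', []):
        print(content.get('Key'))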
Downloading objects
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'
    object_name = 'sample-object'
    local_file_path = '/tmp/test.txt'

    s3.download_file(bucket_name, object_name, local_file_path)
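download_file raises botocore's ClientError when the request fails, for example when the key does not exist; a minimal sketch of handling it (the exact error code returned can vary by service):

from botocore.exceptions import ClientError

try:
    s3.download_file(bucket_name, object_name, local_file_path)
except ClientError as e:
    # e.response['Error'] carries the code and message from the service
    print('download failed: %s' % e.response['Error'].get('Code'))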
Deleting objects
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'
    object_name = 'sample-object'

    s3.delete_object(Bucket=bucket_name, Key=object_name)
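Several objects can be removed in one request with delete_objects, which the S3 API caps at 1,000 keys per call; a minimal sketch, with the two keys below as hypothetical examples:

response = s3.delete_objects(Bucket=bucket_name, Delete={
    'Objects': [
        {'Key': 'sample-object-1'},  # hypothetical keys
        {'Key': 'sample-object-2'}
    ]
})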
Setting ACLs
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'

    # set the bucket ACL: grant read permission to anonymous users
    s3.put_bucket_acl(Bucket=bucket_name, ACL='public-read')

    response = s3.get_bucket_acl(Bucket=bucket_name)

    # set the object ACL: grant read permission to a specific user by ID
    object_name = 'sample-object'
    owner_id = 'test-owner-id'
    target_id = 'test-user-id'

    s3.put_object_acl(Bucket=bucket_name, Key=object_name,
                      AccessControlPolicy={
                          'Grants': [
                              {
                                  'Grantee': {
                                      'ID': owner_id,
                                      'Type': 'CanonicalUser'
                                  },
                                  'Permission': 'FULL_CONTROL'
                              },
                              {
                                  'Grantee': {
                                      'ID': target_id,
                                      'Type': 'CanonicalUser'
                                  },
                                  'Permission': 'READ'
                              }
                          ],
                          'Owner': {
                              'ID': owner_id
                          }
                      })

    response = s3.get_object_acl(Bucket=bucket_name, Key=object_name)
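get_object_acl returns the owner and the list of grants, which is a quick way to verify the policy that was just applied; a minimal sketch:

# print each grantee and its permission
for grant in response.get('Grants', []):
    print('%s: %s' % (grant['Grantee'].get('ID'), grant['Permission']))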
Multipart Upload
import boto3
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'
if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, region_name=region_name,
                      aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'
    object_name = 'sample-large-object'
    local_file = '/tmp/sample.file'

    # initialize the upload and get an upload ID
    create_multipart_upload_response = s3.create_multipart_upload(Bucket=bucket_name, Key=object_name)
    upload_id = create_multipart_upload_response['UploadId']

    part_size = 10 * 1024 * 1024
    parts = []

    # upload the file in part_size chunks, recording each part's ETag
    with open(local_file, 'rb') as f:
        part_number = 1
        while True:
            data = f.read(part_size)
            if not len(data):
                break

            upload_part_response = s3.upload_part(Bucket=bucket_name, Key=object_name,
                                                  PartNumber=part_number, UploadId=upload_id,
                                                  Body=data)
            parts.append({
                'PartNumber': part_number,
                'ETag': upload_part_response['ETag']
            })
            part_number += 1

    multipart_upload = {'Parts': parts}

    # to abort the upload instead of completing it:
    # s3.abort_multipart_upload(Bucket=bucket_name, Key=object_name, UploadId=upload_id)

    # complete the multipart upload
    s3.complete_multipart_upload(Bucket=bucket_name, Key=object_name, UploadId=upload_id,
                                 MultipartUpload=multipart_upload)
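boto3 can also manage multipart uploads for you: upload_file switches to a multipart upload once the file crosses a configurable threshold. A minimal sketch using boto3.s3.transfer.TransferConfig (the 10 MB threshold and chunk size are arbitrary choices):

from boto3.s3.transfer import TransferConfig

# upload_file performs a multipart upload above multipart_threshold
config = TransferConfig(multipart_threshold=10 * 1024 * 1024,
                        multipart_chunksize=10 * 1024 * 1024)
s3.upload_file(local_file, bucket_name, object_name, Config=config)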