阿里云OSS对象存储接入


Python接入阿里云OSS对象存储基本内容,暂时只定了Windows和Linux下的接入处理,更多内容请参考阿里云OSS接入官方文档。

环境

支持环境

OSS Python SDK适用于Python 2.6、2.7、3.3、3.4、3.5、3.6、3.7、3.8及以上版本

ps: windows系统下安装Python SDK时,需确保Visual C++版本为15.0或以上

当前使用环境

名称 | 版本 | 备注
— | —
python | 3.11.1
pip | 24.0
python-devel | 3.11 | CentOS需要安装,Windows不需要

OSS Python SDK

SDK下载

通过GitHub下载
历史版本下载

SDK安装

pip

pip3 install oss2

源码

  1. 下载最新版本的OSS Python SDK,解压后进入目录,确认目录下有setup.py文件。
  2. 执行以下命令安装OSS Python SDK。
python3 setup.py install

验证SDK

  1. 执行以下命进入Python环境。
python3
  1. 执行以下命令查看OSS Python SDK版本。
import oss2
oss2.__version__
  1. 成功返回示例
'2.18.3'

上传文件

简单上传

简单上传默认会覆盖同名Object

# -*- coding: utf-8 -*-
import oss2
import os
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
# 填写Bucket名称。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')

# 必须以二进制的方式打开文件。
# 填写本地文件的完整路径。如果未指定本地路径,则默认从示例程序所属项目对应本地路径中上传文件。
with open('D:\\localpath\\examplefile.txt', 'rb') as fileobj:
    # Seek方法用于指定从第1000个字节位置开始读写。上传时会从您指定的第1000个字节位置开始上传,直到文件结束。
    fileobj.seek(1000, os.SEEK_SET)
    # Tell方法用于返回当前位置。
    current = fileobj.tell()
    # 填写Object完整路径。Object完整路径中不能包含Bucket名称。
    bucket.put_object('exampleobject.txt', fileobj)

表单上传

通过表单上传的方式上传的Object大小不能超过5 GB。

import os
from hashlib import sha1 as sha
import json
import base64
import hmac
import datetime
import time

# 配置环境变量OSS_ACCESS_KEY_ID。
access_key_id = os.environ.get('OSS_ACCESS_KEY_ID')
# 配置环境变量OSS_ACCESS_KEY_SECRET。
access_key_secret = os.environ.get('OSS_ACCESS_KEY_SECRET')
# 将<YOUR_BUCKET>替换为Bucket名称。
bucket = '<YOUR_BUCKET>'
# host的格式为bucketname.endpoint。将<YOUR_BUCKET>替换为Bucket名称。将<YOUR_ENDPOINT>替换为OSS Endpoint,例如oss-cn-hangzhou.aliyuncs.com。
host = 'https://<YOUR_BUCKET>.<YOUR_ENDPOINT>'
# 指定上传到OSS的文件前缀。
upload_dir = 'user-dir-prefix/'
# 指定过期时间,单位为秒。
expire_time = 3600


def generate_expiration(seconds):
    """
    通过指定有效的时长(秒)生成过期时间。
    :param seconds: 有效时长(秒)。
    :return: ISO8601 时间字符串,如:"2014-12-01T12:00:00.000Z"。
    """
    now = int(time.time())
    expiration_time = now + seconds
    gmt = datetime.datetime.utcfromtimestamp(expiration_time).isoformat()
    gmt += 'Z'
    return gmt


def generate_signature(access_key_secret, expiration, conditions, policy_extra_props=None):
    """
    生成签名字符串Signature。
    :param access_key_secret: 有权限访问目标Bucket的AccessKeySecret。
    :param expiration: 签名过期时间,按照ISO8601标准表示,并需要使用UTC时间,格式为yyyy-MM-ddTHH:mm:ssZ。示例值:"2014-12-01T12:00:00.000Z"。
    :param conditions: 策略条件,用于限制上传表单时允许设置的值。
    :param policy_extra_props: 额外的policy参数,后续如果policy新增参数支持,可以在通过dict传入额外的参数。
    :return: signature,签名字符串。
    """
    policy_dict = {
        'expiration': expiration,
        'conditions': conditions
    }
    if policy_extra_props is not None:
        policy_dict.update(policy_extra_props)
    policy = json.dumps(policy_dict).strip()
    policy_encode = base64.b64encode(policy.encode())
    h = hmac.new(access_key_secret.encode(), policy_encode, sha)
    sign_result = base64.b64encode(h.digest()).strip()
    return sign_result.decode()

def generate_upload_params():
    policy = {
        # 有效期。
        "expiration": generate_expiration(expire_time),
        # 约束条件。
        "conditions": [
            # 未指定success_action_redirect时,上传成功后的返回状态码,默认为 204。
            ["eq", "$success_action_status", "200"],
            # 表单域的值必须以指定前缀开始。例如指定key的值以user/user1开始,则可以写为["starts-with", "$key", "user/user1"]。
            ["starts-with", "$key", upload_dir],
            # 限制上传Object的最小和最大允许大小,单位为字节。
            ["content-length-range", 1, 1000000],
            # 限制上传的文件为指定的图片类型
            ["in", "$content-type", ["image/jpg", "image/png"]]
        ]
    }
    signature = generate_signature(access_key_secret, policy.get('expiration'), policy.get('conditions'))
    response = {
        'policy': base64.b64encode(json.dumps(policy).encode('utf-8')).decode(),
        'ossAccessKeyId': access_key_id,
        'signature': signature,
        'host': host,
        'dir': upload_dir
        # 可以在这里再自行追加其他参数
    }
    return json.dumps(response)

上传回调

  • 目前仅简单上传(PutObject)、表单上传(PostObject)、完成分片上传(CompleteMultipartUpload)操作支持使用上传回调。
  • 上传回调旨在实现文件上传至OSS后,自动触发通知至业务服务器,以执行数据更新或发送通知等后续步骤。回调成功或失败不影响文件上传并保存至OSS。
# -*- coding: utf-8 -*-
import json
import base64
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
# 填写Bucket名称。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')

# 定义回调参数Base64编码函数。
def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))

# 设置上传回调参数。
callback_params = {}
# 设置回调请求的服务器地址,例如http://oss-demo.aliyuncs.com:23450。
callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
#(可选)设置回调请求消息头中Host的值,即您的服务器配置Host的值。
#callback_params['callbackHost'] = 'yourCallbackHost'
# 设置发起回调时请求body的值。
callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}'
# 设置发起回调请求的Content-Type。
callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
encoded_callback = encode_callback(callback_params)
# 设置发起回调请求的自定义参数,由Key和Value组成,Key必须以x:开始。
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
encoded_callback_var = encode_callback(callback_var_params)

# 上传回调。
params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
# 填写Object完整路径和字符串。Object完整路径中不能包含Bucket名称。
result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)

上传进度条

# -*- coding: utf-8 -*-
from __future__ import print_function
import os, sys
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
# yourBucketName填写存储空间名称。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName')
# consumed_bytes表示已上传的数据量。
# total_bytes表示待上传的总数据量。当无法确定待上传的数据长度时,total_bytes的值为None。
def percentage(consumed_bytes, total_bytes):
    if total_bytes:
        rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
        print('\r{0}% '.format(rate), end='')
        sys.stdout.flush()
# progress_callback为可选参数,用于实现进度条功能。
bucket.put_object('yourObjectName', 'a'*1024*1024, progress_callback=percentage)

推流上传

  1. 获取推流地址
# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
# 填写存储空间名称,例如examplebucket。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
# 填写LiveChannel名称,例如test-channel。
channel_name = "test-channel"
channel_cfg = oss2.models.LiveChannelInfo(target = oss2.models.LiveChannelInfoTarget())
channel = bucket.create_live_channel(channel_name, channel_cfg)
publish_url = channel.publish_url
# 生成RTMP推流的签名URL,并设置过期时间为3600秒。
signed_publish_url = bucket.sign_rtmp_url(channel_name, "playlist.m3u8", 3600)
# 打印未签名推流地址。
print('publish_url='+publish_url)
# 打印签名推流地址。
print('signed_publish_url='+signed_publish_url)
  1. 使用推流地址向OSS推送音视频数据。(ffmpeg示例)
ffmpeg -i 1.flv -c copy -f flv "rtmp://examplebucket.oss-cn-hangzhou.aliyuncs.com/live/test-channel?playlistName=playlist.m3u8&OSSAccessKeyId=LTAI********&Expires=1688543369&Signature=eqK8z0ZTSwznP7fkELy0ckt0Iv***"

下载文件

简单下载

简单下载指的是使用OSS API的GetObject接口,下载已上传的文件(Object),适用于一次HTTP请求交互即可完成下载的场景。

# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录RAM控制台创建RAM账号。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# Endpoint以杭州为例,其它Region请按实际情况填写。
# 填写Bucket名称,例如examplebucket。
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')

# 填写Object完整路径,完整路径中不包含Bucket名称,例如testfolder/exampleobject.txt。
# 下载Object到本地文件,并保存到指定的本地路径D:\\localpath\\examplefile.txt。如果指定的本地文件存在会覆盖,不存在则新建。
bucket.get_object_to_file('testfolder/exampleobject.txt', 'D:\\localpath\\examplefile.txt')

管理文件

列举文件

存储空间(Bucket)内的文件(Object)默认按照字母序排列。您可以结合实际场景列举当前Bucket的所有Object、指定前缀的Object、指定个数的Object等。

# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
# 填写Bucket名称,例如examplebucket。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')

# 列举Bucket下的所有文件。
for obj in oss2.ObjectIteratorV2(bucket):
    print(obj.key)

重命名文件

在OSS数据迁移或重组过程中,您可以通过重命名文件满足新的组织标准,确保命名一致性和结构准确性。

# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
endpoint = 'https://oss-cn-hangzhou.aliyuncs.com'
# 填写Bucket名称,例如examplebucket。
bucket_name = 'examplebucket'
bucket = oss2.Bucket(auth, endpoint, bucket_name)

# 填写不包含Bucket名称在内源Object的完整路径,例如srcobject.txt。
src_object_name = 'srcobject.txt'
# 填写不包含Bucket名称在内目标Object的完整路径,例如destobject.txt。
dest_object_name = 'destobject.txt'

# 将examplebucket下的srcobject.txt拷贝至同一Bucket下的destobject.txt。
result = bucket.copy_object(bucket_name, src_object_name, dest_object_name)

# 查看返回结果的状态。如果返回值为200,表示执行成功。
print('result.status:', result.status)

# 删除srcobject.txt。
result_del = bucket.delete_object(src_object_name)

# 查看返回结果的状态。如果返回值为204,表示执行成功。
print('result.status:', result_del.status)

删除文件

以下仅列举常见SDK的删除单个文件的代码示例。关于其他SDK删除单个文件以及多个文件的代码示例,请参见官方SDK简介。

# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
# 填写Bucket名称,例如examplebucket。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')

# 删除文件。
# yourObjectName填写待删除文件的完整路径,完整路径中不包含Bucket名称,例如exampledir/exampleobject.txt。
# 如需删除文件夹,请将yourObjectName设置为对应的文件夹名称。如果文件夹非空,则需要将文件夹下的所有文件删除后才能删除该文件夹。
bucket.delete_object('exampledir/exampleobject.txt')

文章作者: Ron.
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ron. !
  目录