python-七牛云获取空间域名
环境需求
- cdn 在腾讯云
- 存储在七牛云
- 每次配置cdn,都需要找七牛云配置源站域名绑定到相应的存储上面。
- 但七牛云的控制台上并没有这个信息。
- 只能更具API 来获取
七牛API
代码
- Python2.7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141#!/usr/bin/env python
# -*- coding: utf-8 -*-
from qiniu import Auth
# import bucket
import platform
import requests
# from requests.auth import AuthBase
from qiniu.compat import is_py2, is_py3
from qiniu import config
import qiniu.auth
### 版本号可能会变,https://github.com/qiniu/python-sdk/blob/master/qiniu/__init__.py 这边去确认
__version__ = '7.2.6'
_sys_info = '{0}; {1}'.format(platform.system(), platform.machine())
_python_ver = platform.python_version()
USER_AGENT = 'QiniuPython/{0} ({1}; ) Python/{2}'.format(
__version__, _sys_info, _python_ver)
_session = None
_headers = {'User-Agent': USER_AGENT}
access_key = '填入ak'
secret_key = '填入sk'
bucket_name = "存储空间名"
auth = Auth(access_key, secret_key)
def _get(url, params, auth):
try:
r = requests.get(
url,
params=params,
auth=qiniu.auth.RequestsAuth(auth) if auth is not None else None,
timeout=config.get_default('connection_timeout'),
headers=_headers)
except Exception as e:
return None, ResponseInfo(None, e)
return __return_wrapper(r)
def __return_wrapper(resp):
if resp.status_code != 200 or resp.headers.get('X-Reqid') is None:
return None, ResponseInfo(resp)
resp.encoding = 'utf-8'
ret = resp.json(encoding='utf-8') if resp.text != '' else {}
if ret is None: # json null
ret = {}
return ret, ResponseInfo(resp)
class ResponseInfo(object):
"""七牛HTTP请求返回信息类
该类主要是用于获取和解析对七牛发起各种请求后的响应包的header和body。
Attributes:
status_code: 整数变量,响应状态码
text_body: 字符串变量,响应的body
req_id: 字符串变量,七牛HTTP扩展字段,参考 http://developer.qiniu.com/docs/v6/api/reference/extended-headers.html
x_log: 字符串变量,七牛HTTP扩展字段,参考 http://developer.qiniu.com/docs/v6/api/reference/extended-headers.html
error: 字符串变量,响应的错误内容
"""
def __init__(self, response, exception=None):
"""用响应包和异常信息初始化ResponseInfo类"""
self.__response = response
self.exception = exception
if response is None:
self.status_code = -1
self.text_body = None
self.req_id = None
self.x_log = None
self.error = str(exception)
else:
self.status_code = response.status_code
self.text_body = response.text
self.req_id = response.headers.get('X-Reqid')
self.x_log = response.headers.get('X-Log')
if self.status_code >= 400:
ret = response.json() if response.text != '' else None
if ret is None or ret['error'] is None:
self.error = 'unknown'
else:
self.error = ret['error']
if self.req_id is None and self.status_code == 200:
self.error = 'server is not qiniu'
def ok(self):
return self.status_code == 200 and self.req_id is not None
def need_retry(self):
if self.__response is None or self.req_id is None:
return True
code = self.status_code
if (code // 100 == 5 and code != 579) or code == 996:
return True
return False
def connect_failed(self):
return self.__response is None or self.req_id is None
def __str__(self):
if is_py2:
return ', '.join(
['%s:%s' % item for item in self.__dict__.items()]).encode('utf-8')
elif is_py3:
return ', '.join(['%s:%s' %
item for item in self.__dict__.items()])
def __repr__(self):
return self.__str__()
if __name__ == "__main__":
options = {
'tbl': bucket_name,
}
url = 'http://api.qiniu.com/v6/domain/list'
print(bucket_name)
ret, info = _get(url, options, auth)
print(info)
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 云运维!