感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
接续上一篇博客:
PUT
/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def PUT
def PUT(self, req):
    """
    Handle an HTTP PUT request for an object (proxy side).

    Excerpt: '......' marks code elided by the original author.
    """
    ......
    # Container-wide metadata for the container that will hold the object.
    container_info = self.container_info(self.account_name, self.container_name, req)
    container_partition = container_info['partition']
    containers = container_info['nodes']
    req.acl = container_info['write_acl']
    req.environ['swift_sync_key'] = container_info['sync_key']
    object_versions = container_info['versions']
    ......
    # Look up the object's partition number and all replica nodes.
    partition, nodes = self.app.object_ring.get_nodes(self.account_name, self.container_name, self.object_name)
    ......
    pile = GreenPile(len(nodes))
    ......
    outgoing_headers = self._backend_requests(
        req, len(nodes), container_partition, containers,
        delete_at_container, delete_at_part, delete_at_nodes)
    # _connect_put_node establishes the PUT connection to each storage node.
    for nheaders in outgoing_headers:
        # RFC2616:8.2.3 disallows 100-continue without a body
        if (req.content_length > 0) or chunked:
            nheaders['Expect'] = '100-continue'
        pile.spawn(self._connect_put_node, node_iter, partition,
                   req.swift_entity_path, nheaders,
                   self.app.logger.thread_locals)
    # Keep only the connections that were successfully established.
    conns = [conn for conn in pile if conn]
    ......
    bytes_transferred = 0
    try:
        with ContextPool(len(nodes)) as pool:
            # One sender greenthread per node; chunks are fed via per-conn queues.
            for conn in conns:
                conn.failed = False
                conn.queue = Queue(self.app.put_queue_depth)
                pool.spawn(self._send_file, conn, req.path)
            while True:
                with ChunkReadTimeout(self.app.client_timeout):
                    try:
                        chunk = next(data_source)
                    except StopIteration:
                        # End of body; for chunked transfer send the terminator.
                        if chunked:
                            for conn in conns:
                                conn.queue.put('0\r\n\r\n')
                        break
                bytes_transferred += len(chunk)
                if bytes_transferred > MAX_FILE_SIZE:
                    return HTTPRequestEntityTooLarge(request=req)
                # Feed the chunk to every still-healthy connection;
                # drop connections that have failed.
                for conn in list(conns):
                    if not conn.failed:
                        conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk) if chunked else chunk)
                    else:
                        conns.remove(conn)
            ......
            # Wait for the sender threads to drain their queues.
            for conn in conns:
                if conn.queue.unfinished_tasks:
                    conn.queue.join()
        conns = [conn for conn in conns if not conn.failed]
    ......
    # Gather the responses from all connections.
    statuses, reasons, bodies, etags = self._get_put_responses(req, conns, nodes)
    ......
    # Quorum voting: pick the best response out of all node responses.
    resp = self.best_response(req, statuses, reasons, bodies, _('Object PUT'), etag=etag)
    ......
    return resp
注:上述方法为裁剪后剩余关键部分的代码,各部分具体实现已经在代码注释中标注出来;
来看方法_connect_put_node的实现:
def _connect_put_node(self, nodes, part, path, headers, logger_thread_locals):
    """
    Establish the backend connection for an object PUT.

    Walks the candidate storage nodes and returns the first connection
    that is willing to take the upload; returns None (implicitly) when
    every node fails.

    :param nodes: iterator of candidate storage nodes
    :param part: ring partition of the object
    :param path: backend request path
    :param headers: headers to send on the backend PUT
    :param logger_thread_locals: proxy logger thread-local state to adopt
    :returns: an established connection (with .resp and .node set), or None
    """
    self.app.logger.thread_locals = logger_thread_locals
    for node in nodes:
        try:
            start_time = time.time()
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'], node['device'], part, 'PUT', path, headers)
            self.app.set_node_timing(node, time.time() - start_time)
            with Timeout(self.app.node_timeout):
                resp = conn.getexpect()
            if resp.status == HTTP_CONTINUE:
                # Node is ready to receive the body.
                conn.resp = None
                conn.node = node
                return conn
            elif is_success(resp.status):
                # Node answered with a final success without a body transfer.
                conn.resp = resp
                conn.node = node
                return conn
            elif headers.get('If-None-Match') is not None and resp.status == HTTP_PRECONDITION_FAILED:
                # BUGFIX: use .get() — plain indexing raises KeyError when the
                # request carries no If-None-Match header and `headers` is an
                # ordinary dict (HeaderKeyDict happens to return None; .get()
                # is safe for both).
                conn.resp = resp
                conn.node = node
                return conn
            elif resp.status == HTTP_INSUFFICIENT_STORAGE:
                # Node's disk is full/unmounted; error-limit it and try the next.
                self.app.error_limit(node, _('ERROR Insufficient Storage'))
        except (Exception, Timeout):
            self.app.exception_occurred(node, _('Object'), _('Expect: 100-continue on %s') % path)
def PUT(self, request):
    """
    Create (upload) or update an object on the object server.

    Flow:
    1. determine the declared content length (fsize) from the request;
    2. obtain a DiskFile instance managing the on-disk object file;
    3. read any existing metadata for the object;
    4. preallocate disk space for the object file (fsize bytes);
    5. receive the body from the network in network_chunk_size
       (65536-byte) chunks, checking the uploaded size;
    6. build the new object metadata from the request headers;
    7. writer.put() commits the data and metadata to disk (renames the
       temp file to the final .data file and writes the metadata);
    8. superseded older object files are cleaned up by the commit;
    9. container_update() notifies the container server of the change.
    """
    # Parse device, partition, account, container and object from request.path.
    device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
    if 'x-timestamp' not in request.headers or not check_float(request.headers['x-timestamp']):
        return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain')
    # Validate that everything needed to create the object is in place.
    error_response = check_object_creation(request, obj)
    if error_response:
        return error_response
    new_delete_at = int(request.headers.get('X-Delete-At') or 0)
    if new_delete_at and new_delete_at < time.time():
        return HTTPBadRequest(body='X-Delete-At in past', request=request, content_type='text/plain')
    # Determine the content length from the request headers.
    try:
        fsize = request.message_length()
    except ValueError as e:
        return HTTPBadRequest(body=str(e), request=request, content_type='text/plain')
    # Get the DiskFile manager for this object; DiskFile manages object
    # files on a POSIX-compliant filesystem.
    try:
        disk_file = self.get_diskfile(device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    # Read the object's existing metadata, if any.
    try:
        orig_metadata = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        orig_metadata = {}
    # Checks for If-None-Match
    if request.if_none_match is not None and orig_metadata:
        if '*' in request.if_none_match:
            # File exists already so return 412
            return HTTPPreconditionFailed(request=request)
        if orig_metadata.get('ETag') in request.if_none_match:
            # The current ETag matches, so return 412
            return HTTPPreconditionFailed(request=request)
    orig_timestamp = orig_metadata.get('X-Timestamp')
    # NOTE(review): string comparison — assumes both timestamps are
    # normalized fixed-width strings so lexical order is chronological.
    if orig_timestamp and orig_timestamp >= request.headers['x-timestamp']:
        return HTTPConflict(request=request)
    orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
    upload_expiration = time.time() + self.max_upload_time
    etag = md5()
    elapsed_time = 0
    try:
        # create() preallocates disk space (size bytes) and yields a writer;
        # writer.put() later commits data + metadata (temp file renamed to
        # the final .data file, metadata written alongside).
        with disk_file.create(size=fsize) as writer:
            upload_size = 0

            def timeout_reader():
                # Read one chunk from the wsgi input, bounded by client_timeout.
                with ChunkReadTimeout(self.client_timeout):
                    return request.environ['wsgi.input'].read(self.network_chunk_size)

            # Receive the body from the network chunk by chunk.
            try:
                for chunk in iter(lambda: timeout_reader(), ''):
                    start_time = time.time()
                    if start_time > upload_expiration:
                        self.logger.increment('PUT.timeouts')
                        return HTTPRequestTimeout(request=request)
                    etag.update(chunk)
                    upload_size = writer.write(chunk)
                    elapsed_time += time.time() - start_time
            except ChunkReadTimeout:
                return HTTPRequestTimeout(request=request)
            if upload_size:
                self.logger.transfer_rate('PUT.' + device + '.timing', elapsed_time, upload_size)
            # The upload is accepted only if the received size matches the
            # declared Content-Length and the computed etag matches any
            # client-supplied ETag header.
            if fsize is not None and fsize != upload_size:
                return HTTPClientDisconnect(request=request)
            etag = etag.hexdigest()
            if 'etag' in request.headers and request.headers['etag'].lower() != etag:
                return HTTPUnprocessableEntity(request=request)
            # Build the new object metadata from the request headers.
            metadata = {
                'X-Timestamp': request.headers['x-timestamp'],
                'Content-Type': request.headers['content-type'],
                'ETag': etag,
                'Content-Length': str(upload_size),
            }
            # Carry over user metadata (X-Object-Meta-*) headers.
            metadata.update(val for val in request.headers.iteritems() if is_user_meta('object', val[0]))
            # Carry over whitelisted/replication headers as well.
            for header_key in (
                    request.headers.get('X-Backend-Replication-Headers') or
                    self.allowed_headers):
                if header_key in request.headers:
                    header_caps = header_key.title()
                    metadata[header_caps] = request.headers[header_key]
            # Commit: rename the temp file to the final .data file and
            # persist the metadata.
            writer.put(metadata)
    except DiskFileNoSpace:
        return HTTPInsufficientStorage(drive=device, request=request)
    # Keep the expiring-object bookkeeping in sync with the old/new
    # X-Delete-At values.
    if orig_delete_at != new_delete_at:
        if new_delete_at:
            self.delete_at_update('PUT', new_delete_at, account, container, obj, request, device)
        if orig_delete_at:
            self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device)
    # Notify the container server so the container listing is updated.
    self.container_update(
        'PUT', account, container, obj, request,
        HeaderKeyDict({
            'x-size': metadata['Content-Length'],
            'x-content-type': metadata['Content-Type'],
            'x-timestamp': metadata['X-Timestamp'],
            'x-etag': metadata['ETag']}),
        device)
    return HTTPCreated(request=request, etag=etag)
/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def DELETE
def DELETE(self, req):
    """
    Handle an HTTP DELETE request for an object (proxy side).

    If the container has object versioning enabled, the most recent
    archived version is first copied back over the current object and
    the DELETE is then redirected at that archived copy.
    """
    # Container-wide metadata for the object's container.
    container_info = self.container_info(self.account_name, self.container_name, req)
    container_partition = container_info['partition']
    containers = container_info['nodes']
    req.acl = container_info['write_acl']
    req.environ['swift_sync_key'] = container_info['sync_key']
    object_versions = container_info['versions']
    if object_versions:
        # this is a version manifest and needs to be handled differently
        object_versions = unquote(object_versions)
        lcontainer = object_versions.split('/')[0]
        # Archived versions live under <hex-len-of-name><name>/<timestamp>;
        # build that listing prefix.
        prefix_len = '%03x' % len(self.object_name)
        lprefix = prefix_len + self.object_name + '/'
        last_item = None
        try:
            # Walk the listing; after the loop last_item is the newest version.
            for last_item in self._listing_iter(lcontainer, lprefix, req.environ):
                pass
        except ListingIterNotFound:
            # no worries, last_item is None
            pass
        except ListingIterNotAuthorized as err:
            return err.aresp
        except ListingIterError:
            return HTTPServerError(request=req)
        if last_item:
            # there are older versions so copy the previous version to the
            # current object and delete the previous version
            orig_container = self.container_name
            orig_obj = self.object_name
            self.container_name = lcontainer
            self.object_name = last_item['name'].encode('utf-8')
            copy_path = '/v1/' + self.account_name + '/' + self.container_name + '/' + self.object_name
            # COPY destination: the original /container/object.
            copy_headers = {'X-Newest': 'True', 'Destination': orig_container + '/' + orig_obj}
            copy_environ = {'REQUEST_METHOD': 'COPY', 'swift_versioned_copy': True}
            # Build a fresh request object for the internal COPY.
            creq = Request.blank(copy_path, headers=copy_headers, environ=copy_environ)
            copy_resp = self.COPY(creq)
            if is_client_error(copy_resp.status_int):
                # some user error, maybe permissions
                return HTTPPreconditionFailed(request=req)
            elif not is_success(copy_resp.status_int):
                # could not copy the data, bail
                return HTTPServiceUnavailable(request=req)
            # reset these because the COPY changed them
            self.container_name = lcontainer
            self.object_name = last_item['name'].encode('utf-8')
            # Redirect the DELETE at the archived copy.
            new_del_req = Request.blank(copy_path, environ=req.environ)
            container_info = self.container_info(self.account_name, self.container_name, req)
            container_partition = container_info['partition']
            containers = container_info['nodes']
            new_del_req.acl = container_info['write_acl']
            new_del_req.path_info = copy_path
            req = new_del_req
            # remove 'X-If-Delete-At', since it is not for the older copy
            if 'X-If-Delete-At' in req.headers:
                del req.headers['X-If-Delete-At']
    if 'swift.authorize' in req.environ:
        aresp = req.environ['swift.authorize'](req)
        if aresp:
            return aresp
    if not containers:
        return HTTPNotFound(request=req)
    # Look up the partition number and all replica nodes for the object.
    partition, nodes = self.app.object_ring.get_nodes(self.account_name, self.container_name, self.object_name)
    # Used by container sync feature
    if 'x-timestamp' in req.headers:
        try:
            req.headers['X-Timestamp'] = normalize_timestamp(req.headers['x-timestamp'])
        except ValueError:
            return HTTPBadRequest(request=req, content_type='text/plain',
                                  body='X-Timestamp should be a UNIX timestamp float value; '
                                  'was %r' % req.headers['x-timestamp'])
    else:
        req.headers['X-Timestamp'] = normalize_timestamp(time.time())
    headers = self._backend_requests(req, len(nodes), container_partition, containers)
    # Fan the DELETE out to every replica node, gather the responses
    # and pick the winner by quorum voting.
    resp = self.make_requests(req, self.app.object_ring,
                              partition, 'DELETE', req.swift_entity_path,
                              headers)
    return resp
def DELETE(self, request):
    """
    Delete an object on the object server.

    Writes a tombstone that supersedes any older object file, and
    notifies the container server so its listing is updated.
    """
    device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
    if 'x-timestamp' not in request.headers or not check_float(request.headers['x-timestamp']):
        return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain')
    try:
        disk_file = self.get_diskfile(device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    # Read the existing metadata of the object to be deleted; the
    # exception raised decides the eventual response class.
    try:
        orig_metadata = disk_file.read_metadata()
    except DiskFileExpired as e:
        orig_timestamp = e.timestamp
        orig_metadata = e.metadata
        response_class = HTTPNotFound
    except DiskFileDeleted as e:
        orig_timestamp = e.timestamp
        orig_metadata = {}
        response_class = HTTPNotFound
    except (DiskFileNotExist, DiskFileQuarantined):
        orig_timestamp = 0
        orig_metadata = {}
        response_class = HTTPNotFound
    else:
        orig_timestamp = orig_metadata.get('X-Timestamp', 0)
        # NOTE(review): string comparison of normalized timestamps —
        # lexical order is chronological only for normalized values.
        if orig_timestamp < request.headers['x-timestamp']:
            response_class = HTTPNoContent
        else:
            response_class = HTTPConflict
    orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
    # Honour X-If-Delete-At: only proceed when it matches the stored
    # X-Delete-At value.
    try:
        req_if_delete_at_val = request.headers['x-if-delete-at']
        req_if_delete_at = int(req_if_delete_at_val)
    except KeyError:
        pass
    except ValueError:
        return HTTPBadRequest(request=request, body='Bad X-If-Delete-At header value')
    else:
        if orig_delete_at != req_if_delete_at:
            return HTTPPreconditionFailed(request=request, body='X-If-Delete-At and X-Delete-At do not match')
    # Keep the expiring-object bookkeeping in sync.
    if orig_delete_at:
        self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device)
    req_timestamp = request.headers['X-Timestamp']
    if orig_timestamp < req_timestamp:
        # Supersede any object file older than the request timestamp
        # (writes a tombstone — see DiskFile.delete).
        disk_file.delete(req_timestamp)
        # Notify the container server so its listing is updated.
        self.container_update(
            'DELETE', account, container, obj, request,
            HeaderKeyDict({'x-timestamp': req_timestamp}),
            device)
    return response_class(request=request)
语句disk_file.delete(req_timestamp)实现了删除指定对象,并实现了删除比给定时间戳旧的任何对象文件;
来看方法disk_file.delete()的实现:
class DiskFile(object)----def delete
def delete(self, timestamp):
    """
    Mark the object as deleted by writing a tombstone file.

    The tombstone (a '.ts' file stamped with the normalized
    *timestamp*) supersedes any object file older than that timestamp.
    """
    ts = normalize_timestamp(timestamp)
    with self.create() as tombstone_writer:
        tombstone_writer._extension = '.ts'
        tombstone_writer.put({'X-Timestamp': ts})
注:具体实现通过分析方法create()和方法put()就可得知和理解,这里不多说了。
再来看方法container_update的实现:
def container_update(self, op, account, container, obj, request, headers_out, objdevice):
    """
    Propagate an object change to the object's container server(s).

    The original request's X-Container-Host / X-Container-Device /
    X-Container-Partition headers name the targets; one async_update
    call is issued per (host, device) pair.

    :param op: HTTP method of the update (e.g. 'PUT', 'DELETE')
    :param account: account name
    :param container: container name
    :param obj: object name
    :param request: the original object request
    :param headers_out: headers to forward to the container server
    :param objdevice: device the object lives on
    """
    incoming = request.headers
    # Hosts and devices of the container replicas, from the proxy's headers.
    conthosts = [h.strip() for h in
                 incoming.get('X-Container-Host', '').split(',')]
    contdevices = [d.strip() for d in
                   incoming.get('X-Container-Device', '').split(',')]
    contpartition = incoming.get('X-Container-Partition', '')
    if len(conthosts) != len(contdevices):
        # This shouldn't happen unless there's a bug in the proxy,
        # but if there is, we want to know about it.
        self.logger.error(_('ERROR Container update failed: different '
                            'numbers of hosts and devices in request: '
                            '"%s" vs "%s"') %
                          (incoming.get('X-Container-Host', ''),
                           incoming.get('X-Container-Device', '')))
        return
    # Without a partition header there is nothing to update.
    updates = zip(conthosts, contdevices) if contpartition else []
    headers_out['x-trans-id'] = incoming.get('x-trans-id', '-')
    headers_out['referer'] = request.as_referer()
    # Fire one (possibly deferred) update per container replica.
    for conthost, contdevice in updates:
        self.async_update(op, account, container, obj, conthost,
                          contpartition, contdevice, headers_out,
                          objdevice)
def async_update(self, op, account, container, obj, host, partition, contdevice, headers_out, objdevice):
    """
    Send or save an asynchronous container update.

    Used when an object changes: an HTTP request with method *op* is
    sent to the object's container server so the container data is
    updated.  If the request fails (or host/partition/contdevice are
    missing), the update is pickled for later retry, at:

        ASYNCDIR = 'async_pending'
        async_dir = self.devices/objdevice/<ASYNCDIR>
        hash_path = hash(account, container, obj)
        dest = <async_dir>/<hash_path>[-3:]/<hash_path>-<timestamp>

    :param op: HTTP method of the update
    :param host: 'ip:port' of the container server
    :param partition: container ring partition
    :param contdevice: container server device name
    :param headers_out: headers to send with the update
    :param objdevice: device the object lives on (used for queuing)
    """
    headers_out['user-agent'] = 'obj-server %s' % os.getpid()
    # Full object path.
    full_path = '/%s/%s/%s' % (account, container, obj)
    if all([host, partition, contdevice]):
        try:
            # The connection must be established within conn_timeout.
            with ConnectionTimeout(self.conn_timeout):
                # Split host into ip and port.
                ip, port = host.rsplit(':', 1)
                # Open an HTTP connection to the container server.
                conn = http_connect(ip, port, contdevice, partition, op, full_path, headers_out)
            # The response must arrive within node_timeout.
            with Timeout(self.node_timeout):
                # Read the container server's response.
                response = conn.getresponse()
                response.read()
                # On success the container is updated; nothing to queue.
                if is_success(response.status):
                    return
                else:
                    self.logger.error(_(
                        'ERROR Container update failed '
                        '(saving for async update later): %(status)d '
                        'response from %(ip)s:%(port)s/%(dev)s'),
                        {'status': response.status, 'ip': ip, 'port': port,
                         'dev': contdevice})
        except (Exception, Timeout):
            self.logger.exception(_(
                'ERROR container update failed with '
                '%(ip)s:%(port)s/%(dev)s (saving for async update later)'),
                {'ip': ip, 'port': port, 'dev': contdevice})
    # Fall through on any failure: queue the update for the object-updater.
    data = {'op': op, 'account': account, 'container': container, 'obj': obj, 'headers': headers_out}
    timestamp = headers_out['x-timestamp']
    self._diskfile_mgr.pickle_async_update(objdevice, account, container, obj, data, timestamp)
def pickle_async_update(self, device, account, container, obj, data, timestamp):
    """
    Queue a failed container update for later delivery.

    The update is pickled (durably, via write_pickle) to:

        <device_path>/<ASYNCDIR>/<hash[-3:]>/<hash>-<normalized timestamp>

    where <hash> is the hash of (account, container, obj).
    """
    device_path = self.construct_dev_path(device)
    pending_dir = os.path.join(device_path, ASYNCDIR)
    # hash_path hashes account/container/object — here the object.
    ohash = hash_path(account, container, obj)
    dest = os.path.join(pending_dir, ohash[-3:],
                        ohash + '-' + normalize_timestamp(timestamp))
    scratch_dir = os.path.join(device_path, 'tmp')
    # Run in the device's thread pool: write_pickle writes to a temp
    # location, syncs it to disk, then renames into the final place.
    self.threadpools[device].run_in_thread(
        write_pickle, data, dest, scratch_dir)
    self.logger.increment('async_pendings')
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
    """
    Durably pickle *obj* to *dest*.

    The data is first written to a temp file (in *tmp*, or dest's own
    directory), flushed and fsync'd, then renamed into place so readers
    never observe a partially written file.

    :param obj: the object to pickle
    :param dest: final destination path
    :param tmp: directory for the temp file; defaults to dest's directory
    :param pickle_protocol: protocol to pass to pickle.dump
    """
    if tmp is None:
        tmp = os.path.dirname(dest)
    fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
    try:
        with os.fdopen(fd, 'wb') as fo:
            pickle.dump(obj, fo, pickle_protocol)
            fo.flush()
            # Make sure the bytes reach the disk before the rename.
            os.fsync(fd)
        renamer(tmppath, dest)
    except Exception:
        # BUGFIX: the original leaked the temp file if pickling, fsync
        # or the rename failed; remove it before re-raising.
        try:
            os.unlink(tmppath)
        except OSError:
            pass
        raise