使用Python实现大文件切片上传及断点续传的方法
作者:ZZZ_Tong
概要
本文使用python实现大文件切片上传,并支持断点续传,将功能划分为:获取上传文件接口状态、获取临时文件夹状态信息、切片上传、切片合并四个功能模块。提供了详细的思路和功能源码。
整体架构流程
步骤示例: 1.前端收到用户的上传的文件后,会执行以下操作:
- 计算文件哈希值:通过特定算法,算出该文件的hash值。
- 向后端传参:将文件路径(包括文件名)、目标机器id、文件hash值、用户id,一并传递给后端判断文件状态接口get_file_upload_status。
后端收到前端传来的数据后,按如下逻辑进行处理: 检查文件是否已经存在:判断该文件是否存在,如果已存在,则返回前端让用户选择是否覆盖文件。 检查Redis中上传记录:如果不存在则进一步判断redis中是否有该文件正在上传的记录。
- 无上传记录:如果没有则使用redis String数据类型进行存储文件上传状态,key为"机器_id + 文件路径(例如118/tmp/123.jpg)",value为自制一个
{
'hash_code': 文件hash值,
'person_id': 用户_id
}
- 有上传记录:如果redis中存在正在上传的记录,则比对用户id是否为同一个人,是则可以断点续传,否则阻止其他人上传。
2.当文件状态通过后,前端会调用get_file_dir_info接口,并传入主机_id、用户_id、文件路径(包括文件名)。 get_file_dir_info接口主要有三个功能:
- 检查临时文件夹:判断用于存放切片的 “临时文件夹” 是否存在。临时文件夹的完整路径由 “文件路径 + : + 用户 id” 拼接而成,例如 “/tmp/123.jgp:12”。若该文件夹不存在,系统会自动创建它。创建完成后,对临时文件夹路径进行哈希计算,再借助 Redis 存储相关信息,以哈希值作为 key,临时文件夹的全路径作为 value,方便后续执行删除文件夹操作时快速定位。
- 检查切片上传记录临时文件:查看 “临时文件夹” 下的 “切片上传记录临时文件” 是否已经存在。如果文件存在,就读取其中的内容;要是不存在,就对其内容进行初始化。“临时记录文件”内容如下:
class FileObject():
def __init__(self,name,size,chunk,path=None,num=0):
self.name = name #文件名
self.size = size #文件夹已上传大小
self.chunk = chunk #块数
self.path = path # 路径
self.num = num #文件夹下存在分块数,设计冗余了
- 放置合并脚本:在临时文件夹下放置 merge.py 脚本文件,为后续的文件处理流程提供支持。
最后遍历整个临时文件夹,获得一个文件已上传块数的列表(为了支持断点续传),返回给前端临时记录文件信息、临时文件夹的hash值、已上传的文件块列表。
3.前端将文件按照10M进行切片,并按照上述接口得到的信息(文件名、已上传的文件块列表),进行续传或直接传输file_chunk_upload,通过列表中的信息可跳过已上传的文件块,并可补上中间因为其他原因丢失的块。 4.上传完成后前端会再次调用get_file_dir_info接口,获得当前文件夹下已存在的文件块数量与列表,与自己分块进行对比。如果成功则调用合并文件接口。 5.前端发起合并请求时,会调用 bigfile_merge 接口,同时传入用户 id、文件块数、文件路径以及临时文件夹路径这些关键参数。 后端在接收到前端传来的数据后,展开一系列校验与操作:
- 参数校验:对传入的路径与文件块数进行准确性核查,如果正确则开始合并。
- 合并方式选择:合并功能主要分两种执行路径。一是检测目标机器是否支持运行 merge.py 脚本,若支持,便直接执行已传入的脚本完成合并;若不支持,则执行目标机器文件的操作方法来执行合并。
- 断点续合判断:开始合并前,要先确认需要合并的文件是否已经存在。若文件已存在,意味着这是一个断点文件,此时需读取临时记录文件中的 size 和 chunk 值,从而判断文件的实际大小以及已合并的块位置。一旦发现实际文件大小超过记录文件中的数值,就表明合并过程曾出现中断,此时要把文件指针调整到记录值对应的位置,接着从该记录值对应的块开启后续合并。
- 记录更新 :每成功完成一块文件的合并,就把新的 size 与 chunk 值写入临时记录文件。
- 收尾清理:合并完成后清除临时文件夹,并删除redis中该文件上传记录状态信息。
到此完成一次文件上传操作
技术细节
- API
获取上传文件状态接口
get_file_upload_status 查看需要上传的文件是否存在或断点续
class FileUploadStatus(APIView):
def post(self, request, *args, **kwargs):
#先判断目标地址下是否存在该文件
#如果存在则提示是否需要覆盖
try:
forms = json.loads(request.body)
host_id = forms['host_id']
full_path = forms['full_path']
hash_code = forms['hash_code']
person_id = forms['person_id']
except Exception as e:
logger.error("前端参数解析出错",repr(e))
try:
err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id) #使用paramiko模块获取目标机器连接,这里读者自行用其他方式都可以
if err:
return Response({'code': -1, 'data': host_id, 'msg': "连接目标主机出错"}, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
logger.error("连接目标机器失败",repr(e))
try:
#判断机器上文件是否存在
tmp_sftp._sftp.stat(full_path)
return Response({'code': -1, 'data': full_path, 'msg': "文件已存在,通知是否覆盖"}, status=status.HTTP_400_BAD_REQUEST)
except IOError:
#其次判断是否存在在redis中,防止短时间内有相同的人传同一份文件,但前一个人并没有合并完成
#如果都没有则创建新key hashcode放入redis,并通知前端可以上传
try:
r = get_redis() #获得redis连接实例,读者自行实现
except Exception as e:
logger.error("redis获取失败",repr(e))
#例如:18115/app/home/202.jpg
#获取redis中记录
data = r.get(str(host_id)+full_path)
if not data:
#将key和hashcode放入redis
#加上person_id
file_dict = {}
file_dict['hash_code'] = hash_code
file_dict['person_id'] = person_id
r.set(str(host_id)+full_path, str(file_dict))
return Response({'code': 1, 'data': full_path, 'msg': "文件不存在,可以上传"}, status=status.HTTP_200_OK)
else:
#如果redis中有记录,host_id+路径能够拦截在目标机器传输同一份文件,但也会把同一个人想断点传输拦截
#所以在此key的value中保存带有person_id的临时文件夹路径信息
#使用person_id将同一个人放行
retrieved_data = r.get(str(host_id)+full_path)
# 将获取到的数据转换为字典对象
try:
retrieved_dict = eval(retrieved_data.decode())
except Exception as e:
logger.error("redis获取目标文件hashcode解码失败",repr(e))
if person_id ==retrieved_dict['person_id']:
return Response({'code': 1, 'data': full_path, 'msg': "断点续传判断通过"}, status=status.HTTP_200_OK)
else:
return Response({'code': 2, 'data': full_path, 'msg': "该文件正在上传,请勿多次操作"}, status=status.HTTP_200_OK)
获取临时文件夹状态信息接口
get_file_dir_info 获取临时文件夹状态信息接口
class GetFileDirInfo(APIView):
def post(self, request, *args, **kwargs):
forms = json.loads(request.body)
host_id = forms['host_id']
person_id = forms['person_id']
full_path = forms['full_path']
err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)#使用paramiko模块获取目标机器连接,这里读者自行用其他方式都可以
if err:
return Response({'code': -1, 'data': host_id, 'msg': "连接目标主机出错"}, status=status.HTTP_400_BAD_REQUEST)
#分离路径名与文件名
#示例:/home/app 2-2.jpg
file_path,file_name = os.path.split(full_path)
if file_path[-1] != "/":
file_path += "/"
#分离文件名与文件扩展名
#示例:2-2 .jpg
short_name,ext = os.path.splitext(file_name)
#构造存放临时文件的文件夹名
#示例:/home/app/2-2.jpg:12/
tmp_dir = full_path +":"+ str(person_id)
#并将临时文件夹做hashcode 传入redis key为hashcode value为路径,后续删除文件夹操作多一层验证
tmp = hashlib.md5()
tmp.update(tmp_dir.encode('utf-8'))
hash_value = tmp.hexdigest()
if tmp_dir[-1] != "/":
tmp_dir += "/"
try:
#判断临时文件夹是否存在
tmp_sftp._sftp.stat(tmp_dir)
except Exception:
try:
print('创建临时文件夹')
tmp_sftp._sftp.mkdir(tmp_dir)
r = get_redis() #获取redis连接
r.set(hash_value,tmp_dir)
except Exception as e:
logger.error("创建临时文件夹失败",e)
tmp_sftp._sftp.close()
return Response({'code': -1, 'data': "", 'msg': "创建临时文件夹失败"}, status=status.HTTP_400_BAD_REQUEST)
#如果临时文件夹存在则在此文件夹下获取文件传输txt
#初始化记录文件对象
fobject = FileObject(file_name,0,0,tmp_dir,0)
#示例:/home/app/2-2.jpg:12/2-2.txt
record_file = tmp_dir+short_name+".txt"
try:
#判断记录文件是否存在
tmp_sftp._sftp.stat(record_file)
tmp_load_file = tmp_sftp._sftp.open(record_file,'r')
#如果记录文件夹存在则读取其已保存上传文件大小和块数
for line in tmp_load_file.readlines():
title,value = line.replace('\n','').split(':')
print(title + ":"+ value)
if title=='name':
fobject.name = value
elif title=='size':
fobject.size = int(value)
elif title =='chunk':
fobject.chunk = int(value)
tmp_load_file.close()
except IOError:
print("记录文件不存在,创建新的记录文件,并初始化")
store_f=tmp_sftp._sftp.open(record_file,'w+')
store_f.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n',
'chunk:'+str(fobject.chunk)+'\n'])
print("创建记录文件成功")
store_f.close()
serializer = RecordFileSerializer(fobject)
data = serializer.data
data['dir_hashcode'] = hash_value
#上传一个支持异步合并的merge.py文件
command = 'command -v python'
stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command)
output = stdout.read().decode().strip()
try:
print('判断merge在不在')
tmp_sftp._sftp.stat(tmp_dir + 'merge.py')
except Exception as e:
if output:
print('目标机器已安装 Python')
current_path = os.path.dirname(os.path.abspath(__file__))
if current_path[-1] != "/":
current_path += "/"
try:
print('需要传输的地址',current_path+'merge.py')
print('目标地址',tmp_dir + 'merge.py')
tmp_sftp._sftp.put(current_path+'merge.py',tmp_dir + 'merge.py')
except Exception as e:
logger.error("发送本地合并文件失败",e)
tmp_sftp._sftp.close()
return Response({'code': -1, 'data': "", 'msg': "发送本地合并文件失败"}, status=status.HTTP_400_BAD_REQUEST)
#将merge文件发送
else:
tmp_sftp._sftp.close()
return Response({'code': -1, 'data': "", 'msg': "目标机器未安装python"}, status=status.HTTP_400_BAD_REQUEST)
tmp_sftp._sftp.close()
return Response({'code': 0, 'data': data, 'msg': "创建新记录文件"}, status=status.HTTP_200_OK)
file_list = tmp_sftp._sftp.listdir(tmp_dir)
#文件夹下有一个txt记录文件,总文件块数需要减一
fobject.num = len(file_list)-2
chunk_list =[]
for file in file_list:
parts = file.rsplit(":", 1) # 按冒号从右到左分割为最多两个部分
if len(parts) == 2:
last_part = parts[1] # 获取最后一个部分
try:
chunk_list.append(int(last_part))
except Exception as e:
continue
print("最后一个值:", last_part)
else:
print("未找到冒号,跳过")
serializer = RecordFileSerializer(fobject)
data = serializer.data
data['dir_hashcode'] = hash_value
data['chunk_list'] = chunk_list
return Response({'code': 0, 'data': data, 'msg': "记录文件已存在"}, status=status.HTTP_200_OK)
切片上传功能
file_chunk_upload 切片上传功能
class FileChunkUploadHandler():
def __init__(self, redis_host,request=None,file_info=None,chunk_num=0,people_id =0):
self.request = request
self.file_info = request.FILES.get('file_info')
# forms = json.loads(request.body)
#数据块信息
#第几个数据块
self.chunk_num = request.data.get('chunk_num')
self.full_path = request.data.get('full_path')
self.person_id = request.data.get('person_id')
self.redis_host = redis_host
def upload_file(self):
print("%s号文件块开始传输",self.chunk_num)
full_path = urllib.parse.unquote(self.full_path)
file_path,file_name = os.path.split(full_path)
host_id = self.request.query_params.get('host_id', None)
err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)
#分离扩展名和文件名
#2-2 .jpg
short_name,ext = os.path.splitext(file_name)
if file_path[-1] != "/":
file_path += "/"
#存放临时文件的文件夹,在上一个获取临时文件夹状态接口就已创建,按照相同格式做一次校验。
#/home/app/2-2.jpg:12
tmp_dir = file_path+file_name+":"+ str(self.person_id)
if tmp_dir[-1] != "/":
tmp_dir += "/"
#临时文件名
#/home/app/2-2.jpg:12/12-2-2:1
tmp_file_name = tmp_dir + str(self.person_id) + "-" + short_name + ":" + str(self.chunk_num)
try:
tmp_sftp._sftp.stat(tmp_dir)
except IOError as e:
logger.error("存储临时文件夹不存在",e)
return 0,repr(e)
try:
tmp_sftp._sftp.stat(tmp_file_name)
except IOError:
try:
#文件块上传存放
print('创建临时文件块',tmp_file_name)
remote_file = tmp_sftp._sftp.open(tmp_file_name, mode="wb")
my_bytes = self.file_info.read()
remote_file.write(my_bytes)
remote_file.close()
except Exception as e:
logger.error("上传文件块失败",e)
return 0,repr(e)
print("写入文件完成:",tmp_file_name)
record_file = tmp_dir+short_name+".txt"
#更新上传记录文件信息
fobject = FileObject(file_name,0,0)
tmp_load_file = tmp_sftp._sftp.open(record_file,'r')
for line in tmp_load_file.readlines():
title,value = line.replace('\n','').split(':')
print(title + ":"+ value)
if title=='name':
fobject.name = value
elif title=='size':
fobject.size = int(value)
elif title =='chunk':
fobject.chunk = int(value)
tmp_load_file.close()
try:
tmp_sftp._sftp.stat(record_file)
load_file = tmp_sftp._sftp.open(record_file,'w+')
load_file.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n',
'chunk:'+str(0)+'\n'])
except Exception as e:
logger.error(e)
tmp_sftp.close()
return 1,None
文件合并功能
bigfile_merge 文件合并与校验功能
class RemoteBigFileUploadHandler():
def __init__(self, redis_host,request=None, chat=False,tmp_path=None,chunk_num=0,person_id=0):
self.request = request
forms = json.loads(request.body)
self.chat = chat
#数据块大小
# self.chunk_size = forms['chunk_size']
self.tmp_path = forms['tmp_path']
#数据块数量
self.chunk_num = forms['chunk_num']
self.full_path = forms['full_path']
self.person_id = forms['person_id']
self.redis_host = redis_host
if self.chat:
self.current = 0
self.datetime = datetime.datetime.now()
#数据进度聊天室
self.channel_layer = get_channel_layer()
self.room = self.request.query_params.get('chat_room', None)
self.total = self.request.query_params.get('total', None)
if not self.room or not self.total:
raise KeyError('params: chat_room and total is needed')
def file_isavailable(self):
tmp_path = self.tmp_path
tmp_sftp = self.get_session()
file_list = tmp_sftp._sftp.listdir(tmp_path)
#文件夹下有一个txt记录文件,总文件块数需要减一
true_num = len(file_list)-2
if true_num!=self.chunk_num:
tmp_sftp._sftp.close()
return False
else:
tmp_sftp._sftp.close()
return True
def merge_file(self):
#----------------------获得基础信息-----------------------------------
host_id = self.request.query_params.get('host_id', None)
full_path = urllib.parse.unquote(self.full_path)
file_path,file_name = os.path.split(full_path)
if file_path[-1] != "/":
file_path += "/"
#分离扩展名和文件名
short_name,ext = os.path.splitext(file_name)
tmp_path = self.tmp_path
tmp_sftp = self.get_session()
file_list = tmp_sftp._sftp.listdir(tmp_path)
try:
tmp_sftp._sftp.stat(full_path)
print("文件已存在")
except IOError:
print("文件不存在")
print("创建新的文件")
new_f = tmp_sftp._sftp.open(full_path,mode='a')
new_f.write("")
new_f.close()
#判断merge.py 文件是否存在
if 'merge.py' in file_list:
com_path = tmp_path + 'merge.py'
command = f"nohup python {com_path} {self.person_id} {self.chunk_num} &"
stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command)
merging = True
while merging:
if self.chat:
# websocket回显进度,获得当前文件字节数
self.current = tmp_sftp._sftp.stat(full_path).st_size
print('current',self.current)
print('total',self.total)
self.call_back()
if self.current >= int(self.total):
merging = False
time.sleep(1)
else :
merging = False
else:
#创建或打开用户的存储文件
# ----------------开始合并操作-------------------------------
# rb+ 追加覆盖的方式打开用户需要存储文件,指针初始位置为文件开头
remote_file = tmp_sftp._sftp.open(full_path,mode='rb+')
#默认当前块数从1开始
i=1
#创建文件记录对象
fobject = FileObject(file_name,0,0)
print('创建记录对象成功')
# 断点续传记录文件
record_file = tmp_path+short_name+".txt"
try :
#如果记录文件存在,则存储文件大概率也存在
#如果两个文件打开出错 则走excpt重新创建
tmp_sftp._sftp.stat(record_file)
upload_file = tmp_sftp._sftp.stat(full_path)
#这里也是获取本地文件的大小
temp_size = upload_file.st_size
print("记录文件已存在")
print("当前文件真实数据大小:",temp_size)
# 读取文件,获得记录的已上传块数,以及大小总块数
tmp_load_file = tmp_sftp._sftp.open(record_file,'r')
for line in tmp_load_file.readlines():
title,value = line.replace('\n','').split(':')
print(title + ":"+ value)
if title=='name':
fobject.name = value
elif title=='size':
fobject.size = int(value)
elif title =='chunk':
fobject.chunk = int(value)
tmp_load_file.close()
except IOError:
print("记录文件不存在,创建新的记录文件,并初始化")
temp_size = 0
store_f=tmp_sftp._sftp.open(record_file,'w+')
store_f.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n',
'chunk:'+str(fobject.chunk)+'\n'])
print("创建记录文件成功")
store_f.close()
# ----------------------------------开始保存数据--------------------------
print("真实文件数据大小",temp_size)
print("记录文件数据大小",fobject.size)
if temp_size>=fobject.size:
#如果实际文件大于记录文件,则表示在合并时中断过
print("调整指针")
remote_file.seek(fobject.size)
i = fobject.chunk+1
tmp_file_name = str(self.person_id) + "-" + short_name + ":"
try:
while i<=self.chunk_num:
if tmp_file_name + str(i) in file_list:
print('true')
#临时文件夹/临时文件名
file_path = tmp_path+tmp_file_name + str(i)
with tmp_sftp._sftp.file(file_path,'rb') as input_file:
print(file_path)
file_content = input_file.read()
remote_file.write(file_content)
fobject.size += len(file_content)
i = i+1
fobject.chunk += 1
if self.chat:
# websocket回显进度,获得当前文件字节数
self.current = tmp_sftp._sftp.stat(full_path).st_size
print('current',self.current)
print('total',self.total)
self.call_back()
#将已经写入的块和大小记录
load_file = tmp_sftp._sftp.open(record_file,'w+')
load_file.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n',
'chunk:'+str(fobject.chunk)+'\n'])
load_file.close()
except Exception as e:
logger.error("合并文件异常",e)
#----------------清除临时文件与关闭资源操作----------
#删除临时文件夹下的所有文件
remote_file.close()
try:
for name in file_list:
del_path = tmp_path + name
tmp_sftp._sftp.remove(del_path)
# 删除空临时文件夹
tmp_sftp._sftp.rmdir(tmp_path)
except Exception as e:
logger.error("删除文件文件异常",e)
#关闭操作文件资源
tmp_sftp._sftp.close()
#删除redis中该文件临时记录,一个是文件夹hashcode,用于验证与删除目标文件夹
#一个是host_id+路径,用于标记该路径下这个文件正在传输
r = self.redis_host
data = r.get(str(host_id)+full_path)
if data:
r.delete(str(host_id)+full_path)
tmp = hashlib.md5()
tmp.update(tmp_path.encode('utf-8'))
hash_value = tmp.hexdigest()
r.delete(hash_value)
print("文件合并结束")
return 1,None
小结
本文提供了一个python大文件上传功能实现的思路,希望能够你带来一些思路和启发。
以上就是使用Python实现大文件切片上传及断点续传的方法的详细内容,更多关于Python大文件切片上传及断点续传的资料请关注脚本之家其它相关文章!
