python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python大文件切片上传及断点续传

使用Python实现大文件切片上传及断点续传的方法

作者:ZZZ_Tong

本文介绍了使用 Python 实现大文件切片上传及断点续传的方法,包括功能模块划分(获取上传文件接口状态、临时文件夹状态信息、切片上传、切片合并)、整体架构流程、技术细节(相关接口和功能的代码实现),最后进行了小结,需要的朋友可以参考下

概要

本文使用python实现大文件切片上传,并支持断点续传,将功能划分为:获取上传文件接口状态、获取临时文件夹状态信息、切片上传、切片合并四个功能模块。提供了详细的思路和功能源码。

整体架构流程

步骤示例: 1.前端收到用户的上传的文件后,会执行以下操作:

后端收到前端传来的数据后,按如下逻辑进行处理: 检查文件是否已经存在:判断该文件是否存在,如果已存在,则返回前端让用户选择是否覆盖文件。 检查Redis中上传记录:如果不存在则进一步判断redis中是否有该文件正在上传的记录。

{
'hash_code': 文件hash值,
'person_id': 用户_id
}

2.当文件状态通过后,前端会调用get_file_dir_info接口,并传入主机_id、用户_id、文件路径(包括文件名)。 get_file_dir_info接口主要有三个功能:

class FileObject():
    def __init__(self,name,size,chunk,path=None,num=0):
        self.name = name #文件名
        self.size = size #文件夹已上传大小
        self.chunk = chunk #块数
        self.path = path # 路径
        self.num = num #文件夹下存在分块数,设计冗余了

最后遍历整个临时文件夹,获得一个文件已上传块数的列表(为了支持断点续传),返回给前端临时记录文件信息、临时文件夹的hash值、已上传的文件块列表。

3.前端将文件按照10M进行切片,并按照上述接口得到的信息(文件名、已上传的文件块列表),进行续传或直接传输file_chunk_upload,通过列表中的信息可跳过已上传的文件块,并可补上中间因为其他原因丢失的块。 4.上传完成后前端会再次调用get_file_dir_info接口,获得当前文件夹下已存在的文件块数量与列表,与自己分块进行对比。如果成功则调用合并文件接口。 5.前端发起合并请求时,会调用 bigfile_merge 接口,同时传入用户 id、文件块数、文件路径以及临时文件夹路径这些关键参数。 后端在接收到前端传来的数据后,展开一系列校验与操作:

到此完成一次文件上传操作

技术细节

获取上传文件状态接口

get_file_upload_status 查看需要上传的文件是否存在或断点续

class FileUploadStatus(APIView):
    
    def post(self, request, *args, **kwargs):
        
        #先判断目标地址下是否存在该文件
        #如果存在则提示是否需要覆盖
        try:
            forms = json.loads(request.body)
            host_id = forms['host_id']
            
            full_path = forms['full_path']
            hash_code = forms['hash_code']
            person_id = forms['person_id']
        except Exception as e:
            logger.error("前端参数解析出错",repr(e))
        try:
            err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id) #使用paramiko模块获取目标机器连接,这里读者自行用其他方式都可以
            if err:
                return Response({'code': -1, 'data': host_id, 'msg': "连接目标主机出错"}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            logger.error("连接目标机器失败",repr(e))
        
        try:
            #判断机器上文件是否存在
            tmp_sftp._sftp.stat(full_path)
            return Response({'code': -1, 'data': full_path, 'msg': "文件已存在,通知是否覆盖"}, status=status.HTTP_400_BAD_REQUEST)
        except IOError:
            #其次判断是否存在在redis中,防止短时间内有相同的人传同一份文件,但前一个人并没有合并完成
            #如果都没有则创建新key hashcode放入redis,并通知前端可以上传
            try:
                r = get_redis() #获得redis连接实例,读者自行实现
            except Exception as e:
                logger.error("redis获取失败",repr(e))
            #例如:18115/app/home/202.jpg
            #获取redis中记录
            data = r.get(str(host_id)+full_path)
            if not data:
                #将key和hashcode放入redis
                #加上person_id 
                file_dict = {}
                file_dict['hash_code'] = hash_code
                file_dict['person_id'] = person_id
                r.set(str(host_id)+full_path, str(file_dict))
                return Response({'code': 1, 'data': full_path, 'msg': "文件不存在,可以上传"}, status=status.HTTP_200_OK)
            else:
                #如果redis中有记录,host_id+路径能够拦截在目标机器传输同一份文件,但也会把同一个人想断点传输拦截
                #所以在此key的value中保存带有person_id的临时文件夹路径信息
                #使用person_id将同一个人放行
                retrieved_data = r.get(str(host_id)+full_path)

                # 将获取到的数据转换为字典对象
                try:
                    retrieved_dict = eval(retrieved_data.decode())
                except Exception as e:
                    logger.error("redis获取目标文件hashcode解码失败",repr(e))
                if person_id ==retrieved_dict['person_id']:
                    return Response({'code': 1, 'data': full_path, 'msg': "断点续传判断通过"}, status=status.HTTP_200_OK)
                else:
                    return Response({'code': 2, 'data': full_path, 'msg': "该文件正在上传,请勿多次操作"}, status=status.HTTP_200_OK)

获取临时文件夹状态信息接口

get_file_dir_info 获取临时文件夹状态信息接口

class GetFileDirInfo(APIView):
    
    def post(self, request, *args, **kwargs):
        forms = json.loads(request.body)
        host_id = forms['host_id']
        person_id = forms['person_id']
        full_path = forms['full_path']
        err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)#使用paramiko模块获取目标机器连接,这里读者自行用其他方式都可以
        if err:
            return Response({'code': -1, 'data': host_id, 'msg': "连接目标主机出错"}, status=status.HTTP_400_BAD_REQUEST)
        #分离路径名与文件名
        #示例:/home/app  2-2.jpg
        file_path,file_name = os.path.split(full_path)
        if file_path[-1] != "/":
                file_path += "/"
        #分离文件名与文件扩展名
        #示例:2-2 .jpg
        short_name,ext = os.path.splitext(file_name)
        #构造存放临时文件的文件夹名
        #示例:/home/app/2-2.jpg:12/   
        tmp_dir = full_path +":"+ str(person_id)
        #并将临时文件夹做hashcode 传入redis key为hashcode value为路径,后续删除文件夹操作多一层验证
        tmp = hashlib.md5()    
        tmp.update(tmp_dir.encode('utf-8'))
        hash_value = tmp.hexdigest()
        
        if tmp_dir[-1] != "/":
            tmp_dir += "/"
        try:
            #判断临时文件夹是否存在
            tmp_sftp._sftp.stat(tmp_dir)
        except Exception:
            try:
                print('创建临时文件夹')
                tmp_sftp._sftp.mkdir(tmp_dir)
                r = get_redis() #获取redis连接
                r.set(hash_value,tmp_dir)
                
            except Exception as e:
                logger.error("创建临时文件夹失败",e)
                tmp_sftp._sftp.close()
                return Response({'code': -1, 'data': "", 'msg': "创建临时文件夹失败"}, status=status.HTTP_400_BAD_REQUEST)
        #如果临时文件夹存在则在此文件夹下获取文件传输txt
        #初始化记录文件对象
        fobject = FileObject(file_name,0,0,tmp_dir,0)
        #示例:/home/app/2-2.jpg:12/2-2.txt  
        record_file = tmp_dir+short_name+".txt"
        try:
            #判断记录文件是否存在
            tmp_sftp._sftp.stat(record_file)
            tmp_load_file = tmp_sftp._sftp.open(record_file,'r')
            #如果记录文件夹存在则读取其已保存上传文件大小和块数
            for line in tmp_load_file.readlines():
                title,value = line.replace('\n','').split(':')
                print(title + ":"+ value)
                if title=='name':
                    fobject.name = value
                elif title=='size':
                    fobject.size = int(value)
                elif title =='chunk':
                    fobject.chunk = int(value)
            tmp_load_file.close()
        except IOError:
            print("记录文件不存在,创建新的记录文件,并初始化")
            store_f=tmp_sftp._sftp.open(record_file,'w+')
            store_f.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n',
                                'chunk:'+str(fobject.chunk)+'\n'])
            print("创建记录文件成功")
            store_f.close()
            
            serializer = RecordFileSerializer(fobject)
            data = serializer.data
            data['dir_hashcode'] = hash_value
            
            
            #上传一个支持异步合并的merge.py文件
            command = 'command -v python'
            stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command)
            output = stdout.read().decode().strip()
            try:
                print('判断merge在不在')
                tmp_sftp._sftp.stat(tmp_dir + 'merge.py')
            except Exception as e:
                if output:
                    print('目标机器已安装 Python')
                    current_path = os.path.dirname(os.path.abspath(__file__))
                    if current_path[-1] != "/":
                        current_path += "/"
                    try:
                        print('需要传输的地址',current_path+'merge.py')
                        print('目标地址',tmp_dir + 'merge.py')
                        tmp_sftp._sftp.put(current_path+'merge.py',tmp_dir + 'merge.py')
                    except Exception as e:
                        logger.error("发送本地合并文件失败",e)
                        tmp_sftp._sftp.close()
                        return Response({'code': -1, 'data': "", 'msg': "发送本地合并文件失败"}, status=status.HTTP_400_BAD_REQUEST)
                    #将merge文件发送
                else:
                    tmp_sftp._sftp.close()
                    return Response({'code': -1, 'data': "", 'msg': "目标机器未安装python"}, status=status.HTTP_400_BAD_REQUEST)
            tmp_sftp._sftp.close()
            return Response({'code': 0, 'data': data, 'msg': "创建新记录文件"}, status=status.HTTP_200_OK)
        
        file_list = tmp_sftp._sftp.listdir(tmp_dir)
        #文件夹下有一个txt记录文件,总文件块数需要减一
        fobject.num = len(file_list)-2
        chunk_list =[]
        for file in file_list:
            parts = file.rsplit(":", 1)  # 按冒号从右到左分割为最多两个部分

            if len(parts) == 2:
                last_part = parts[1]  # 获取最后一个部分
                try:
                    chunk_list.append(int(last_part))
                except Exception as e:
                    continue
                print("最后一个值:", last_part)
            else:
                print("未找到冒号,跳过")
        serializer = RecordFileSerializer(fobject)
        data = serializer.data
        data['dir_hashcode'] = hash_value
        data['chunk_list'] = chunk_list
        return Response({'code': 0, 'data': data, 'msg': "记录文件已存在"}, status=status.HTTP_200_OK) 

切片上传功能

file_chunk_upload 切片上传功能

class FileChunkUploadHandler():
    
    def __init__(self, redis_host,request=None,file_info=None,chunk_num=0,people_id =0):
        self.request = request
        self.file_info = request.FILES.get('file_info')
        # forms = json.loads(request.body)
        #数据块信息
        
        #第几个数据块
        self.chunk_num = request.data.get('chunk_num')
        self.full_path = request.data.get('full_path')
        self.person_id = request.data.get('person_id')
        self.redis_host = redis_host
    
    
    def upload_file(self):
        print("%s号文件块开始传输",self.chunk_num)
        full_path = urllib.parse.unquote(self.full_path)
        file_path,file_name = os.path.split(full_path)
        host_id = self.request.query_params.get('host_id', None)
        err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)
        #分离扩展名和文件名
        #2-2  .jpg
        short_name,ext = os.path.splitext(file_name)
        if file_path[-1] != "/":
            file_path += "/"
        #存放临时文件的文件夹,在上一个获取临时文件夹状态接口就已创建,按照相同格式做一次校验。
        #/home/app/2-2.jpg:12
        tmp_dir = file_path+file_name+":"+ str(self.person_id)
        if tmp_dir[-1] != "/":
            tmp_dir += "/"
        #临时文件名
        #/home/app/2-2.jpg:12/12-2-2:1
        tmp_file_name = tmp_dir + str(self.person_id) + "-"  + short_name + ":" + str(self.chunk_num)
        
        try:
            tmp_sftp._sftp.stat(tmp_dir)
        except IOError as e:
            logger.error("存储临时文件夹不存在",e)
            return 0,repr(e)
        
        try:
            tmp_sftp._sftp.stat(tmp_file_name)
        except IOError:
            try:
                #文件块上传存放
                print('创建临时文件块',tmp_file_name)
                remote_file = tmp_sftp._sftp.open(tmp_file_name, mode="wb")
                my_bytes = self.file_info.read()     
                remote_file.write(my_bytes)
                remote_file.close()
            except Exception as e:
                logger.error("上传文件块失败",e)
                return 0,repr(e)
            print("写入文件完成:",tmp_file_name)
        record_file = tmp_dir+short_name+".txt"
            
        #更新上传记录文件信息
        fobject = FileObject(file_name,0,0)
        tmp_load_file = tmp_sftp._sftp.open(record_file,'r')
        for line in tmp_load_file.readlines():
            title,value = line.replace('\n','').split(':')
            print(title + ":"+ value)
            if title=='name':
                fobject.name = value
            elif title=='size':
                fobject.size = int(value)
            elif title =='chunk':
                fobject.chunk = int(value)
        tmp_load_file.close()
        
        try:
            tmp_sftp._sftp.stat(record_file)
            load_file = tmp_sftp._sftp.open(record_file,'w+')
            load_file.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n',
                            'chunk:'+str(0)+'\n'])
        except Exception as e:
            logger.error(e)
        tmp_sftp.close()
        return 1,None

文件合并功能

bigfile_merge 文件合并与校验功能

class RemoteBigFileUploadHandler():
    
    def __init__(self, redis_host,request=None, chat=False,tmp_path=None,chunk_num=0,person_id=0):
        self.request = request
        forms = json.loads(request.body)
        self.chat = chat
        #数据块大小
        # self.chunk_size = forms['chunk_size']
        self.tmp_path = forms['tmp_path']
        #数据块数量
        self.chunk_num = forms['chunk_num']
        self.full_path = forms['full_path']
        self.person_id = forms['person_id']
        self.redis_host = redis_host
        
        if self.chat:
            self.current = 0
            self.datetime = datetime.datetime.now()
            #数据进度聊天室
            self.channel_layer = get_channel_layer()
            self.room = self.request.query_params.get('chat_room', None)
            self.total = self.request.query_params.get('total', None)
            if not self.room or not self.total:
                raise KeyError('params: chat_room and total is needed')
            
    
    def file_isavailable(self):
        
        tmp_path = self.tmp_path
        tmp_sftp = self.get_session()
        file_list = tmp_sftp._sftp.listdir(tmp_path)
        #文件夹下有一个txt记录文件,总文件块数需要减一
        true_num = len(file_list)-2
        if true_num!=self.chunk_num:
            tmp_sftp._sftp.close()
            return False
        else:
            tmp_sftp._sftp.close()
            return True
    
    def merge_file(self):
        #----------------------获得基础信息-----------------------------------
        host_id = self.request.query_params.get('host_id', None)
        full_path = urllib.parse.unquote(self.full_path)
        file_path,file_name = os.path.split(full_path)
        if file_path[-1] != "/":
            file_path += "/"
        #分离扩展名和文件名
        short_name,ext = os.path.splitext(file_name)
        
        tmp_path = self.tmp_path
        tmp_sftp = self.get_session()
        
        
        file_list = tmp_sftp._sftp.listdir(tmp_path)
        try:
            tmp_sftp._sftp.stat(full_path)
            print("文件已存在")
        except IOError:
            print("文件不存在")
            print("创建新的文件")
            new_f = tmp_sftp._sftp.open(full_path,mode='a')
            new_f.write("")
            new_f.close()
        #判断merge.py 文件是否存在
        if 'merge.py' in file_list:
            com_path = tmp_path + 'merge.py'
            command = f"nohup python {com_path} {self.person_id} {self.chunk_num} &"
            stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command)
            merging = True
            
            while merging:
                if self.chat:
                    # websocket回显进度,获得当前文件字节数
                    self.current = tmp_sftp._sftp.stat(full_path).st_size
                    print('current',self.current)
                    print('total',self.total)
                    self.call_back()
                    if self.current >= int(self.total):
                        merging = False
                    time.sleep(1)
                else :
                    merging = False
        else:
        #创建或打开用户的存储文件
            
                
            # ----------------开始合并操作-------------------------------
            # rb+ 追加覆盖的方式打开用户需要存储文件,指针初始位置为文件开头
            remote_file = tmp_sftp._sftp.open(full_path,mode='rb+')
            
            #默认当前块数从1开始
            i=1
            #创建文件记录对象
            fobject = FileObject(file_name,0,0)
            print('创建记录对象成功')
            # 断点续传记录文件
            record_file = tmp_path+short_name+".txt"
            try :
                #如果记录文件存在,则存储文件大概率也存在
                #如果两个文件打开出错 则走excpt重新创建
                tmp_sftp._sftp.stat(record_file)
                upload_file = tmp_sftp._sftp.stat(full_path)
                
                #这里也是获取本地文件的大小
                temp_size = upload_file.st_size
                print("记录文件已存在")
                print("当前文件真实数据大小:",temp_size)
                # 读取文件,获得记录的已上传块数,以及大小总块数
                tmp_load_file = tmp_sftp._sftp.open(record_file,'r')
                for line in tmp_load_file.readlines():
                    title,value = line.replace('\n','').split(':')
                    print(title + ":"+ value)
                    if title=='name':
                        fobject.name = value
                    elif title=='size':
                        fobject.size = int(value)
                    elif title =='chunk':
                        fobject.chunk = int(value)
                tmp_load_file.close()
            except IOError:
                print("记录文件不存在,创建新的记录文件,并初始化")
                temp_size = 0
                store_f=tmp_sftp._sftp.open(record_file,'w+')
                store_f.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n',
                                    'chunk:'+str(fobject.chunk)+'\n'])
                print("创建记录文件成功")
                store_f.close()
                    
            #    ----------------------------------开始保存数据--------------------------
            print("真实文件数据大小",temp_size)
            print("记录文件数据大小",fobject.size)
            if temp_size>=fobject.size:
                #如果实际文件大于记录文件,则表示在合并时中断过
                print("调整指针")
                remote_file.seek(fobject.size)
                i = fobject.chunk+1
            
            tmp_file_name = str(self.person_id) + "-"  + short_name + ":"
            try:
                while i<=self.chunk_num:
                    if  tmp_file_name + str(i) in file_list:
                        print('true')
                        #临时文件夹/临时文件名
                        file_path = tmp_path+tmp_file_name + str(i)
                        with tmp_sftp._sftp.file(file_path,'rb') as input_file:
                            print(file_path)
                            file_content = input_file.read()
                            remote_file.write(file_content)
                            fobject.size += len(file_content)
                    i = i+1
                    fobject.chunk += 1
                    if self.chat:
                        # websocket回显进度,获得当前文件字节数
                        self.current = tmp_sftp._sftp.stat(full_path).st_size
                        print('current',self.current)
                        print('total',self.total)
                        self.call_back()
                    #将已经写入的块和大小记录
                    load_file = tmp_sftp._sftp.open(record_file,'w+')
                    load_file.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n',
                                            'chunk:'+str(fobject.chunk)+'\n'])
                    load_file.close()
                    
            except Exception as e:
                logger.error("合并文件异常",e)
                #----------------清除临时文件与关闭资源操作----------
                #删除临时文件夹下的所有文件
            remote_file.close()
        try:
            for name in file_list:
                del_path = tmp_path  + name
                tmp_sftp._sftp.remove(del_path)
                # 删除空临时文件夹
            tmp_sftp._sftp.rmdir(tmp_path)
        except Exception as e:
            logger.error("删除文件文件异常",e)
        
        #关闭操作文件资源    
            
        tmp_sftp._sftp.close()
        #删除redis中该文件临时记录,一个是文件夹hashcode,用于验证与删除目标文件夹
        #一个是host_id+路径,用于标记该路径下这个文件正在传输
        r = self.redis_host
        data = r.get(str(host_id)+full_path)
        if data:
            r.delete(str(host_id)+full_path)
        tmp = hashlib.md5()    
        tmp.update(tmp_path.encode('utf-8'))
        hash_value = tmp.hexdigest()
        r.delete(hash_value)
        print("文件合并结束")
        return 1,None

小结

本文提供了一个python大文件上传功能实现的思路,希望能够你带来一些思路和启发。

以上就是使用Python实现大文件切片上传及断点续传的方法的详细内容,更多关于Python大文件切片上传及断点续传的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文