tarfile打包下载

  • Post author:
  • Post category:其他


# 1 生成树形目录结构并打包
import os, zipfile, zipapp

def parse_father(archive_list, base_dir):
    """Recursively create a folder tree on disk from nested descriptors.

    For every entry in ``archive_list`` a directory named after its
    ``folder_name`` is created under ``base_dir``. When the entry's
    ``data_table`` has a non-empty ``data_sheet``, a placeholder file
    ``<data_sheet>.txt`` containing the sheet name is written into that
    directory. Entries listed under ``children`` are processed the same way
    inside the newly created directory.

    Args:
        archive_list: Iterable of dicts with a ``folder_name`` key and
            optional ``data_table`` (dict) and ``children`` (list) keys.
        base_dir: Directory under which the tree is created; missing parent
            directories are created as needed.
    """
    for node in archive_list:
        # 1. Create the folder for this node (no error if it already exists).
        father_folder = os.path.join(base_dir, node.get("folder_name"))
        os.makedirs(father_folder, exist_ok=True)

        # 2. Write the placeholder table file, if the node declares one.
        #    A missing or empty ``data_table`` simply means "no file".
        table_name = (node.get("data_table") or {}).get("data_sheet")
        if table_name:
            file_path = os.path.join(father_folder, table_name + ".txt")
            # Explicit UTF-8 so non-ASCII sheet names are written the same
            # way on every platform.
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(table_name)

        # 3. Descend into child folders, if any.
        children = node.get("children")
        if children:
            parse_father(children, father_folder)


def zip_file(path, dest_path, root_name="项目1"):
    """Pack every file under ``path`` into a deflate-compressed zip archive.

    Files are stored under a top-level folder called ``root_name`` and keep
    their position relative to ``path``. Directories that contain no files
    are not added to the archive.

    Errors are reported with ``print`` and otherwise swallowed, so a failed
    packing run does not raise to the caller.

    Args:
        path: Directory whose contents are archived.
        dest_path: Path of the zip file to create (overwritten if present).
        root_name: Name of the top-level folder inside the archive.
    """
    try:
        # The context manager closes the archive even if a write fails.
        with zipfile.ZipFile(dest_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for dir_path, _dir_names, file_names in os.walk(path):
                # relpath (rather than str.replace) stays correct when
                # ``path`` has a trailing separator or its text reappears
                # deeper in the tree.
                rel_dir = os.path.relpath(dir_path, path)
                if rel_dir == os.curdir:
                    arc_dir = root_name
                else:
                    arc_dir = os.path.join(root_name, rel_dir)
                for file in file_names:
                    zf.write(os.path.join(dir_path, file),
                             os.path.join(arc_dir, file))
    except Exception as e:
        # Best-effort by design: report the problem and carry on.
        print(e)


def demo():
    """Build a sample folder tree next to this file and zip it.

    The tree is created under ``oss_file/archive/项目1`` relative to this
    module, then packed into ``oss_file/archive/项目1.zip``. Neither the
    generated tree nor the archive is removed afterwards.

    Returns:
        The fixed dict ``{"ok": "aa"}``.
    """
    def folder(node_id, name, table_type, sheet, files, children):
        # Assemble one node of the sample tree.
        return {
            'id': node_id,
            'folder_name': name,
            'data_table': {'type': table_type, 'data_sheet': sheet},
            'file_name': files,
            'children': children,
        }

    base_dir = os.path.join(os.path.dirname(__file__), "oss_file/archive")
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)

    # Two top-level folders, each holding a chain of two nested folders.
    archive_list = [
        folder(1, "第1个文件夹", "origin", "表1", ["1", "2"], [
            folder(2, "第2个文件夹", "relation", "表2", ["3", "4"], [
                folder(3, "第3个文件夹", "origin", "表3", ["1", "2"], []),
            ]),
        ]),
        folder(4, "第4个文件夹", "origin", "表4", ["1", "2"], [
            folder(5, "第5个文件夹", "relation", "表5", ["3", "4"], [
                folder(6, "第6个文件夹", "origin", "表6", ["1", "2"], []),
            ]),
        ]),
    ]

    # Write the tree to disk under the project folder.
    project_name = "项目1"
    new_dir = os.path.join(base_dir, project_name)
    parse_father(archive_list, new_dir)

    # Pack the generated tree into a zip that sits beside it.
    tar_name = "项目1.zip"
    tar_dir = os.path.join(base_dir, tar_name)
    zip_file(new_dir, tar_dir)

    return {"ok": "aa"}


# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demo()


# 2 在 FastAPI 中直接下载文件时需要返回文件流,因此改为在内存(BytesIO)中打包并返回该流
import os
import zipfile
from io import BytesIO

from exceptions.response_exceptions import ShowError
from settings.config import PROJECT_DIR_PATH, TARFILE_PATH
from settings.log import logger


async def parse_archive_tree(archive_list, base_dir):
    """Recursively create a folder tree on disk from nested descriptors.

    For every entry in ``archive_list`` a directory named after its
    ``folder_name`` is created under ``base_dir``. When the entry's
    ``data_table`` has a non-empty ``data_sheet``, a placeholder file
    ``<data_sheet>.txt`` containing the sheet name is written into that
    directory. Entries under ``children`` are processed the same way inside
    the newly created directory.

    The function is a coroutine so callers can ``await`` it, but all of the
    filesystem calls it makes are ordinary blocking calls.

    Args:
        archive_list: Iterable of dicts with a ``folder_name`` key and
            optional ``data_table`` (dict) and ``children`` (list) keys.
        base_dir: Directory under which the tree is created; missing parent
            directories are created as needed.
    """
    for node in archive_list:
        # Create the folder for this node (no error if it already exists).
        father_folder = os.path.join(base_dir, node.get("folder_name"))
        os.makedirs(father_folder, exist_ok=True)

        # A missing or empty ``data_table`` simply means "no file".
        table_name = (node.get("data_table") or {}).get("data_sheet")
        if table_name:
            # TODO: generate the real table and save it; for now a
            # placeholder file holding the sheet name is written.
            file_path = os.path.join(father_folder, table_name + ".txt")
            # Explicit UTF-8 so non-ASCII sheet names are written the same
            # way on every platform.
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(table_name)

        # Descend into child folders, if any.
        children = node.get("children")
        if children:
            await parse_archive_tree(children, father_folder)


async def _zip_file(path, project_name):
    """Pack every file under ``path`` into an in-memory zip archive.

    Files are stored under a top-level folder called ``project_name`` and
    keep their position relative to ``path``. Directories that contain no
    files are not added to the archive.

    Args:
        path: Directory whose contents are archived.
        project_name: Name of the top-level folder inside the archive.

    Returns:
        A ``BytesIO`` holding the finished archive, rewound to position 0.

    Raises:
        ShowError: If anything goes wrong while packing; the original
            exception is logged as a warning and attached as the cause.
    """
    try:
        memory_file = BytesIO()
        with zipfile.ZipFile(memory_file, "w", zipfile.ZIP_DEFLATED) as zf:
            for dir_path, _dir_names, file_names in os.walk(path):
                # relpath (rather than str.replace) stays correct when
                # ``path`` has a trailing separator or its text reappears
                # deeper in the tree.
                rel_dir = os.path.relpath(dir_path, path)
                if rel_dir == os.curdir:
                    arc_dir = project_name
                else:
                    arc_dir = os.path.join(project_name, rel_dir)
                for file in file_names:
                    zf.write(os.path.join(dir_path, file),
                             os.path.join(arc_dir, file))
        # Rewind so the caller can read the archive from the start.
        memory_file.seek(0)
        return memory_file
    except Exception as e:
        logger.warning(e)
        raise ShowError("打包文件错误") from e


async def zip_file(archive_list, project_name):
    """Build the folder tree for a project and return it as a zip stream.

    The tree described by ``archive_list`` is written to a working directory
    named ``project_name`` under ``PROJECT_DIR_PATH/TARFILE_PATH``, packed
    into an in-memory zip, and the working directory is then deleted.

    Args:
        archive_list: Nested folder descriptors, passed to
            ``parse_archive_tree``.
        project_name: Name of the working directory and of the top-level
            folder inside the archive.

    Returns:
        The stream returned by ``_zip_file`` for the generated tree.
    """
    # Local import keeps this block self-contained.
    import shutil

    base_dir = os.path.join(PROJECT_DIR_PATH, TARFILE_PATH)
    os.makedirs(base_dir, exist_ok=True)

    # NOTE(review): project_name becomes part of a filesystem path; if it can
    # come from user input it should be validated (e.g. reject "..") first.
    tree_father_dir = os.path.join(base_dir, project_name)
    try:
        await parse_archive_tree(archive_list, tree_father_dir)
        return await _zip_file(tree_father_dir, project_name)
    finally:
        # Always remove the working tree, even when packing fails. rmtree
        # avoids passing the path through a shell; ignore_errors mirrors the
        # "no complaint if missing" behaviour of ``rm -rf``.
        shutil.rmtree(tree_father_dir, ignore_errors=True)



版权声明:本文为myli_binbin原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。