大地主的知识库
专注于收集、整理和分享各种各样的知识信息

夸克网盘文件上传、转存、批量分享接口代码

反复修改 测试了一个星期终于搞定,具体使用场景,有需要的自然明白。

 

"""
夸克网盘API封装模块

本模块提供了夸克网盘API的完整封装,包含以下主要功能:
1. 文件管理(QuarkFileManager):
   - 获取文件和文件夹列表
   - 搜索文件
   - 重命名文件和文件夹
   - 文件夹导航
   - 文件上传功能(支持本地文件和远程URL,含秒传)

2. 分享管理(QuarkShareManager):
   - 创建分享链接(支持设置有效期和密码)
   - 获取分享链接信息
   - 获取分享历史列表

3. 转存功能(QuarkTransferManager):
   - 支持转存他人分享的文件到自己网盘
   - 支持带密码的分享链接
   - 支持选择保存位置
   - 支持任务进度查询

使用方法:
1. 初始化:
   file_manager = QuarkFileManager(cookie)
   share_manager = QuarkShareManager(cookie)
   transfer_manager = QuarkTransferManager(cookie)

2. 调用示例:
   # 获取文件列表
   files = file_manager.get_file_list()
   
   # 创建分享
   share_info = share_manager.create_share_link(file_id, title, expire_time=7, password="123456")
   
   # 重命名文件
   success = file_manager.rename_file(file_id, "new_name.txt")
   
   # 转存他人分享
   result = transfer_manager.save_shared_files(share_url, to_folder_id="root", passcode="123456")
   
   # 上传文件(本地或远程)
   result = file_manager.upload_file("path/to/file.jpg", folder_id="root")
   result = file_manager.upload_file("https://example.com/file.jpg", folder_id="root", from_url=True)

注意事项:
- 需要提供有效的夸克网盘cookie才能正常使用
- cookie获取方法:登录夸克网盘网页版后从浏览器开发者工具中获取
- 所有API操作都依赖于夸克网盘的官方接口,如遇接口变更可能需要更新

作者:大地主
版本:1.1.0 (更新上传功能)
创建日期:2025年
"""
print("tempfile imported")
import requests
import json
import time
import re
import urllib.parse
from typing import Optional, Dict, Any, List
import os
import hashlib
import mimetypes
import base64
from datetime import datetime
import traceback
import tempfile  # 确保这一行存在


class QuarkBase:
    """夸克网盘API基础类"""
    
    def __init__(self, cookie):
        """
        初始化基础类
        :param cookie: 登录夸克网盘后的cookie
        """
        self.cookie = cookie
        self.headers = {
            'Cookie': self.cookie,
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Encoding': 'gzip, deflate, br, zstd',
            'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'Cache-Control': 'no-cache',
            'Content-Type': 'application/json', # Default Content-Type for Quark API calls
            'DNT': '1',
            'Origin': 'https://pan.quark.cn',
            'Pragma': 'no-cache',
            'Priority': 'u=1, i',
            'Referer': 'https://pan.quark.cn/',
            'Sec-Ch-Ua': '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
            'Sec-Ch-Ua-Mobile': '?0',
            'Sec-Ch-Ua-Platform': '"Windows"',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-site', # Changed from same-origin to same-site based on newer script
            # x-request-id might be needed sometimes, but let's omit for now
        }
        self.base_params = {
            'pr': 'ucpro',
            'fr': 'pc',
            'uc_param_str': '' # Ensure this is present, newer script used it
        }
    
    def _poll_task_status(self, task_id, max_retries=10, retry_interval=1):
        """
        轮询任务状态 (保持不变)
        :param task_id: 任务ID
        :param max_retries: 最大重试次数
        :param retry_interval: 重试间隔(秒)
        :return: 任务数据
        """
        url = 'https://drive-pc.quark.cn/1/clouddrive/task'
        
        for i in range(max_retries):
            try:
                params = self.base_params.copy()
                params.update({
                    # 'uc_param_str': '', # Already in base_params
                    'task_id': task_id,
                    'retry_index': i
                })
                
                response = requests.get(url, headers=self.headers, params=params)
                result = response.json()
                
                if result.get('code') == 0:
                    task_data = result.get('data', {})
                    status = task_data.get('status')
                    
                    # 状态码:1=进行中,2=成功,3=失败
                    if status == 2:
                        print(f"任务 {task_id} 成功.")
                        return task_data
                    elif status == 3:
                        print(f"任务 {task_id} 失败: {task_data.get('message', '未知错误')}")
                        return None
                    else:
                        # 任务进行中,显示进度
                        progress = task_data.get('progress', 0)
                        print(f"任务 {task_id} 进行中... {progress}%")
                
                time.sleep(retry_interval)
                
                # 根据返回的元数据调整重试间隔
                metadata = result.get('metadata', {})
                if metadata.get('tq_gap'):
                    retry_interval = max(1, metadata.get('tq_gap') / 1000)
                    
            except Exception as e:
                print(f"查询任务 {task_id} 状态出错: {e}")
                time.sleep(retry_interval)
        
        print(f"任务 {task_id} 查询超时.")
        return None


class QuarkFileManager(QuarkBase):
    """夸克网盘文件管理类 (包含更新后的上传功能)"""

    def __init__(self, cookie):
        super().__init__(cookie)
        self.callback = {} # Initialize callback dict for uploads

    # --- Existing File Management Methods (Unchanged) ---
    
    def get_file_list(self, folder_id='root'):
        """
        获取文件列表,支持自动翻页获取所有文件 (保持不变)
        :param folder_id: 文件夹ID,默认为根目录
        :return: 文件列表
        """
        url = 'https://drive-pc.quark.cn/1/clouddrive/file/sort'
        all_files = []
        page = 1
        page_size = 50  # 每页数量
        total = None  # 用于存储总文件数
        
        while True:
            params = self.base_params.copy()
            params.update({
                'pdir_fid': folder_id if folder_id != 'root' else '0',
                '_page': str(page),
                '_size': str(page_size),
                '_fetch_total': '1',
                '_fetch_sub_dirs': '0',
                '_sort': 'file_type:asc,updated_at:desc'
            })
            
            try:
                response = requests.get(url, headers=self.headers, params=params)
                response.raise_for_status()
                result = response.json()
                
                if result.get('code') != 0:
                    print(f"获取文件列表失败: {result.get('message', '未知错误')}")
                    return all_files
                
                current_files = result.get('data', {}).get('list', [])
                if not current_files:
                    break 
                    
                all_files.extend(current_files)
                
                metadata = result.get('data', {}).get('metadata', {})
                if total is None:
                    total = metadata.get('_total')
                    if total is not None:
                         print(f"文件夹共有 {total} 个文件/文件夹")
                    else:
                         print("无法获取总数信息,继续获取...")


                if total is not None and len(all_files) >= total:
                    break
                
                print(f"已获取 {len(all_files)}{f'/{total}' if total is not None else ''} 个项目...")
                page += 1 
                
            except Exception as e:
                print(f"获取文件列表出错: {e}")
                return all_files
        
        print(f"文件列表获取完成,共 {len(all_files)} 个项目")
        return all_files
    
    def get_folder_list(self, parent_id='root'):
        """
        获取文件夹列表 (保持不变)
        :param parent_id: 父文件夹ID,默认为根目录
        :return: 文件夹列表
        """
        file_list = self.get_file_list(parent_id)
        # 过滤出文件夹类型
        return [item for item in file_list if item.get('file_type') == 'folder']
    
    def search_files(self, keyword, folder_id='root'):
        """
        搜索文件 (保持不变)
        :param keyword: 搜索关键词
        :param folder_id: 搜索范围的文件夹ID,默认为根目录
        :return: 搜索结果列表
        """
        url = 'https://drive-pc.quark.cn/1/clouddrive/file/search'
        params = self.base_params.copy()
        params['pdir_fid'] = folder_id if folder_id != 'root' else '0' # Ensure correct pdir_fid for root
        params['query'] = keyword
        
        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            result = response.json()
            
            if result.get('code') != 0:
                print(f"搜索文件失败: {result.get('message', '未知错误')}")
                return []
                
            return result.get('data', {}).get('list', [])
        except Exception as e:
            print(f"搜索文件出错: {e}")
            return []

    def rename_file(self, file_id, new_name):
        """
        重命名文件或文件夹 (保持不变)
        :param file_id: 文件或文件夹ID
        :param new_name: 新名称
        :return: 是否成功
        """
        url = 'https://drive-pc.quark.cn/1/clouddrive/file/rename'
        params = self.base_params.copy()
        # params['uc_param_str'] = '' # Already in base_params
        
        data = {
            "fid": file_id,
            "file_name": new_name
        }
        
        try:
            # Ensure Content-Type is application/json for this POST request
            headers = self.headers.copy()
            headers['Content-Type'] = 'application/json'
            response = requests.post(url, headers=headers, params=params, json=data)
            response.raise_for_status()
            result = response.json()
            
            if result.get('code') != 0:
                print(f"重命名失败: {result.get('message', '未知错误')}")
                return False
            
            print(f"重命名成功: {new_name}")
            return True
        except Exception as e:
            print(f"重命名出错: {e}")
            return False
    
    def get_file_info(self, file_id: str) -> Optional[dict]:
        """获取文件或文件夹的详细信息 (保持不变)"""
        url = 'https://drive-pc.quark.cn/1/clouddrive/file/info'
        params = self.base_params.copy()
        params['fid'] = file_id
        
        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            result = response.json()
            
            if result.get('code') == 0 and result.get('data'):
                return result['data']
            else:
                 print(f"获取文件信息失败: Code {result.get('code')}, Message: {result.get('message')}")
                 return None
        except Exception as e:
            print(f"获取文件信息失败: {e}")
            return None
            
    def _format_size(self, size_bytes):
        """格式化文件大小显示 (保持不变)"""
        if size_bytes is None or size_bytes < 0: return "N/A"
        if size_bytes < 1024:
            return f"{size_bytes} B"
        elif size_bytes < 1024 * 1024:
            return f"{size_bytes/1024:.2f} KB"
        elif size_bytes < 1024 * 1024 * 1024:
            return f"{size_bytes/(1024*1024):.2f} MB"
        else:
            return f"{size_bytes/(1024*1024*1024):.2f} GB"

    # --- New/Updated Upload Methods ---

    def _compute_file_hash(self, file_path):
        """计算文件的MD5和SHA1哈希值"""
        md5_hash = hashlib.md5()
        sha1_hash = hashlib.sha1()
        try:
            with open(file_path, 'rb') as f:
                while chunk := f.read(8192 * 1024): # Read in larger chunks (8MB)
                    md5_hash.update(chunk)
                    sha1_hash.update(chunk)
            return md5_hash.hexdigest(), sha1_hash.hexdigest()
        except Exception as e:
            print(f"计算文件哈希时出错 {file_path}: {e}")
            return None, None

    def _check_fast_upload(self, task_id, md5, sha1):
        """检查是否可以秒传"""
        if not task_id or not md5 or not sha1:
            return None
        url = 'https://drive-pc.quark.cn/1/clouddrive/file/update/hash'
        data = {"task_id": task_id, "md5": md5, "sha1": sha1}
        try:
            # Ensure Content-Type is application/json
            headers = self.headers.copy()
            headers['Content-Type'] = 'application/json'
            response = requests.post(url, headers=headers, params=self.base_params, json=data)
            response.raise_for_status()
            result = response.json()
            # print(f"秒传检查结果: {result}") # Debug
            # Check code == 0 and data.finish == True explicitly
            if result.get('code') == 0 and result.get('data', {}).get('finish') is True:
                print("检测到文件已存在,执行秒传...")
                return result.get('data')
        except Exception as e:
            print(f"检查秒传时出错: {e}")
        return None

    def _get_upload_auth(self, task_id, method, content_type, upload_url, upload_id, part_number=None, content_md5=None, callback=None):
        """获取OSS上传授权签名"""
        auth_url = 'https://drive-pc.quark.cn/1/clouddrive/file/upload/auth'
        current_time_gmt = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
        
        try:
            parsed_url = urllib.parse.urlparse(upload_url)
            resource = parsed_url.path # Resource should only be the path part
            
            query_params = {}
            if upload_id: query_params['uploadId'] = upload_id
            if part_number is not None: query_params['partNumber'] = str(part_number)

            # Sort query parameters alphabetically for canonical resource string
            sorted_query = "&".join(f"{k}={v}" for k, v in sorted(query_params.items()))
            if sorted_query:
                resource += f"?{sorted_query}"

            # Construct the string to sign for OSS V1 signature
            oss_headers_list = [
                f"x-oss-date:{current_time_gmt}",
                # User agent string might vary, use the one observed in browser or provided script
                "x-oss-user-agent:aliyun-sdk-js/6.18.0 Chrome/134.0.0.0 on Windows 10 64-bit" 
            ]
            
            callback_value = ""
            if callback and callback.get('callbackUrl') and callback.get('callbackBody'):
                 # Callback needs to be JSON string -> base64 encoded
                 try:
                     callback_str = json.dumps(callback, separators=(',', ':')) # Compact JSON
                     callback_value = base64.b64encode(callback_str.encode('utf-8')).decode('utf-8')
                     oss_headers_list.append(f"x-oss-callback:{callback_value}")
                 except Exception as cb_e:
                     print(f"警告:无法编码 callback: {cb_e}")
            
            oss_headers_list.sort() # Sort headers alphabetically by key
            canonicalized_oss_headers = "\n".join(oss_headers_list) + "\n" if oss_headers_list else ""

            # StringToSign format: METHOD\nContent-MD5\nContent-Type\nDate\nCanonicalizedOSSHeaders\nCanonicalizedResource
            auth_meta = f"{method.upper()}\n" \
                        f"{content_md5 or ''}\n" \
                        f"{content_type or ''}\n" \
                        f"{current_time_gmt}\n" \
                        f"{canonicalized_oss_headers}" \
                        f"{resource}"

            # print(f"\n--- Auth Meta String --- \n{auth_meta}\n----------------------\n") # Debug

            auth_data = {"task_id": task_id, "auth_meta": auth_meta}
            headers = self.headers.copy()
            headers['Content-Type'] = 'application/json'
            response = requests.post(auth_url, headers=headers, params=self.base_params, json=auth_data)
            response.raise_for_status()
            result = response.json()

            # print(f"获取授权结果: {result}") # Debug

            if result.get('code') == 0 and result.get('data', {}).get('auth_key'):
                return result['data']['auth_key'], current_time_gmt
            else:
                print(f"获取上传授权失败: Code {result.get('code')}, Message: {result.get('message')}")
                return None, None
        except Exception as e:
             print(f"获取上传授权时发生异常: {e}")
             traceback.print_exc()
             return None, None

    def _complete_multipart_upload(self, task_id, complete_oss_url, upload_id, parts):
        """完成OSS多部分上传"""
        # The complete_oss_url already includes the object key
        # Example: https://bucket.oss-region.aliyuncs.com/object_key
        complete_url = f"{complete_oss_url}?uploadId={upload_id}"
        
        # Construct XML body for completion
        xml_body = '<?xml version="1.0" encoding="UTF-8"?>\n<CompleteMultipartUpload>\n'
        for part in parts:
            # Ensure ETag is not enclosed in double quotes within the XML tags
            etag = part["etag"].strip('"') 
            xml_body += f'  <Part>\n    <PartNumber>{part["part_number"]}</PartNumber>\n    <ETag>"{etag}"</ETag>\n  </Part>\n'
        xml_body += '</CompleteMultipartUpload>'
        
        # print(f"\n--- Complete XML Body ---\n{xml_body}\n-------------------------\n") # Debug

        try:
            xml_bytes = xml_body.encode('utf-8')
            content_md5 = base64.b64encode(hashlib.md5(xml_bytes).digest()).decode('utf-8')
            content_type = "application/xml"
            
            # Get authorization specifically for the POST complete request
            # Note: Pass the correct complete_oss_url (base OSS url + object key)
            auth_key, oss_date = self._get_upload_auth(task_id, "POST", content_type, complete_oss_url, upload_id, content_md5=content_md5, callback=self.callback)
            
            if not auth_key:
                print("获取 CompleteMultipartUpload 授权失败")
                return False

            headers = {
                'Content-Type': content_type,
                'Content-MD5': content_md5,
                'Authorization': auth_key, # Use the obtained auth key
                'x-oss-date': oss_date, # Use the date associated with the auth key
                'x-oss-user-agent': 'aliyun-sdk-js/6.18.0 Chrome/134.0.0.0 on Windows 10 64-bit',
                'Host': urllib.parse.urlparse(complete_oss_url).hostname,
            }
            
            # Add callback header if callback info exists
            if self.callback and self.callback.get('callbackUrl') and self.callback.get('callbackBody'):
                 try:
                     callback_str = json.dumps(self.callback, separators=(',', ':'))
                     callback_value = base64.b64encode(callback_str.encode('utf-8')).decode('utf-8')
                     headers['x-oss-callback'] = callback_value
                 except Exception as cb_e:
                     print(f"警告:无法编码并添加 callback header 到 CompleteUpload: {cb_e}")


            # print(f"\n--- Complete Headers ---\n{headers}\n----------------------\n") # Debug

            # Send the POST request to the OSS endpoint
            response = requests.post(complete_url, headers=headers, data=xml_bytes, timeout=120) # Increased timeout
            
            # print(f"CompleteMultipartUpload 响应状态: {response.status_code}") # Debug
            # print(f"CompleteMultipartUpload 响应头: {response.headers}") # Debug
            # print(f"CompleteMultipartUpload 响应体: {response.text[:500]}") # Debug

            # OSS returns 200 OK on success. It might return JSON if callback succeeded.
            # If callback is used, success depends on the callback response forwarded by OSS.
            if response.status_code == 200:
                 # Check response body if it's JSON (likely from callback)
                 try:
                      callback_response = response.json()
                      # Assuming callback returns a structure like Quark's API
                      if callback_response.get('code') == 0:
                           print("OSS CompleteMultipartUpload (via callback) 成功.")
                           return True
                      else:
                           print(f"OSS CompleteMultipartUpload 成功, 但Callback失败: {callback_response.get('message', '未知回调错误')}")
                           # Consider this success from OSS perspective, Quark finish call will likely fail later
                           return True 
                 except json.JSONDecodeError:
                      # Not JSON, assume standard OSS success (no callback or callback didn't return JSON)
                      print("OSS CompleteMultipartUpload 成功.")
                      return True
            else:
                 print(f"OSS CompleteMultipartUpload 失败. Status: {response.status_code}, Response: {response.text[:500]}")
                 return False

        except Exception as e:
            print(f"完成 OSS 分片上传时发生异常: {e}")
            traceback.print_exc()
            return False

    def _finish_upload(self, task_id, obj_key):
        """通知夸克服务器上传已完成"""
        url = 'https://drive-pc.quark.cn/1/clouddrive/file/upload/finish'
        data = {"task_id": task_id, "obj_key": obj_key}
        max_retries = 5
        retry_delay = 2

        for attempt in range(max_retries):
            try:
                print(f"通知服务器完成上传 (尝试 {attempt + 1}/{max_retries})... Task ID: {task_id}")
                headers = self.headers.copy()
                headers['Content-Type'] = 'application/json'
                response = requests.post(url, headers=headers, params=self.base_params, json=data)
                response.raise_for_status()
                result = response.json()
                
                # print(f"完成上传响应: {result}") # Debug

                # Check code == 0 and data.finish == True explicitly
                if result.get('code') == 0 and result.get('data', {}).get('finish') is True:
                    print("服务器确认上传完成.")
                    return result.get('data')
                elif result.get('code') == 0 and result.get('data', {}).get('finish') is False:
                     print(f"服务器报告上传未完成 (Code 0, Finish False). Message: {result.get('message')}. 等待重试...")
                else: # Handle other non-zero codes
                     print(f"通知服务器完成上传失败: Code {result.get('code')}, Message: {result.get('message')}. 等待重试...")

            except Exception as e:
                print(f"通知服务器完成上传时出错: {e}. 等待重试...")
            
            if attempt < max_retries - 1:
                 time.sleep(retry_delay)
                 retry_delay = min(retry_delay * 2, 10) # Exponential backoff up to 10s

        print(f"通知服务器完成上传失败 (超过最大重试次数). Task ID: {task_id}")
        return None

    def upload_file(self, file_path_or_url, folder_id='root', new_filename=None, from_url=False):
        """
        上传文件到夸克网盘(主入口)
        :param file_path_or_url: 本地文件路径或远程文件URL
        :param folder_id: 目标文件夹ID,默认为根目录 ('root' or '0')
        :param new_filename: 上传后的文件名,默认使用原文件名
        :param from_url: 是否从URL上传,默认为False
        :return: 上传结果字典,包含 success, file_id, file_name, size, message
        """
        pdir_fid = folder_id if folder_id != 'root' else "0" # Use "0" for root

        if from_url or file_path_or_url.startswith(('http://', 'https://')):
            print(f"开始从 URL 上传: {file_path_or_url}")
            return self._upload_from_url_internal(file_path_or_url, pdir_fid, new_filename)
        else:
            print(f"开始从本地文件上传: {file_path_or_url}")
            return self._upload_local_file_internal(file_path_or_url, pdir_fid, new_filename)

    def _upload_local_file_internal(self, file_path, pdir_fid, new_filename=None):
        """内部方法:处理本地文件上传"""
        if not os.path.exists(file_path):
            return {"success": False, "message": f"本地文件不存在: {file_path}"}

        try:
            file_size = os.path.getsize(file_path)
            file_name = new_filename or os.path.basename(file_path)
            content_type, _ = mimetypes.guess_type(file_path)
            content_type = content_type or 'application/octet-stream' # Default if guess fails
            file_mtime = int(os.path.getmtime(file_path) * 1000)
            file_ctime = int(os.path.getctime(file_path) * 1000)

            print(f"准备上传: {file_name} ({self._format_size(file_size)}), 类型: {content_type}")

            # 1. 计算哈希 (MD5, SHA1)
            print("正在计算文件哈希...")
            md5, sha1 = self._compute_file_hash(file_path)
            if not md5 or not sha1:
                 return {"success": False, "message": "计算文件哈希失败"}
            print(f"文件哈希计算完成. MD5: {md5[:8]}..., SHA1: {sha1[:8]}...")
            
            # 2. 预上传 (Pre-upload)
            print("正在请求预上传...")
            pre_upload_url = 'https://drive-pc.quark.cn/1/clouddrive/file/upload/pre'
            pre_upload_payload = {
                "ccp_hash_update": True,
                "parallel_upload": True,
                "pdir_fid": pdir_fid,
                "dir_name": "", # Keep empty for file upload
                "file_name": file_name,
                "format_type": content_type,
                "l_created_at": file_ctime,
                "l_updated_at": file_mtime,
                "size": file_size
            }
            headers = self.headers.copy()
            headers['Content-Type'] = 'application/json'
            response = requests.post(pre_upload_url, headers=headers, params=self.base_params, json=pre_upload_payload)
            response.raise_for_status()
            pre_upload_result = response.json()

            if pre_upload_result.get('code') != 0:
                return {"success": False, "message": f"预上传失败: {pre_upload_result.get('message', '未知错误')}"}

            pre_data = pre_upload_result.get('data', {})
            metadata = pre_upload_result.get('metadata', {})
            task_id = pre_data.get('task_id')
            upload_id = pre_data.get('upload_id')
            obj_key = pre_data.get('obj_key')
            # upload_url = pre_data.get('upload_url') # Seems deprecated or unused in new flow
            fid = pre_data.get('fid') # File ID might be assigned early
            bucket = pre_data.get('bucket')
            region = pre_data.get('region', 'oss-cn-zhangjiakou') # Default or from response
            self.callback = pre_data.get('callback', {}) # Store callback info
            part_size = metadata.get('part_size', 4 * 1024 * 1024) # Default 4MB
            
            if not task_id or not upload_id or not obj_key or not bucket:
                 return {"success": False, "message": "预上传响应缺少必要信息 (task_id, upload_id, obj_key, bucket)"}

            # Construct the base OSS URL for this upload
            complete_oss_url = f"https://{bucket}.{region}.aliyuncs.com/{obj_key}"
            print(f"预上传成功. TaskID: {task_id}, UploadID: {upload_id}, FID: {fid}")
            print(f"目标OSS URL: {complete_oss_url}")
            print(f"分片大小: {self._format_size(part_size)}")

            # 3. 检查秒传 (Fast Upload Check)
            fast_upload_data = self._check_fast_upload(task_id, md5, sha1)
            if fast_upload_data:
                #秒传成功后,需要调用finish接口确认
                finish_result = self._finish_upload(task_id, obj_key)
                if finish_result:
                     final_fid = finish_result.get('fid', fid) # Use fid from finish if available
                     print(f"秒传成功! 文件ID: {final_fid}")
                     return {"success": True, "file_id": final_fid, "file_name": file_name, "size": file_size, "message": "秒传成功"}
                else:
                     # If finish fails after fast upload check returned success, it's ambiguous
                     print("秒传检查成功,但完成上传确认失败。")
                     return {"success": False, "message": "秒传确认失败", "file_id": fid, "task_id": task_id}


            # 4. 分片上传 (Multipart Upload)
            print("开始分片上传...")
            total_parts = (file_size + part_size - 1) // part_size if file_size > 0 else 1
            parts = []
            upload_start_time = time.time()
            uploaded_size = 0

            with open(file_path, 'rb') as f:
                for part_number in range(1, total_parts + 1):
                    part_data = f.read(part_size)
                    if not part_data: break # Should not happen if total_parts is correct

                    part_md5_b64 = base64.b64encode(hashlib.md5(part_data).digest()).decode('utf-8')
                    part_url = f"{complete_oss_url}?partNumber={part_number}&uploadId={upload_id}"
                    
                    # Retry logic for part upload
                    max_part_retries = 3
                    for attempt in range(max_part_retries):
                         try:
                              # Get auth for this specific part
                              auth_key, oss_date = self._get_upload_auth(task_id, "PUT", content_type, complete_oss_url, upload_id, part_number, part_md5_b64)
                              if not auth_key:
                                   # No retries if auth fails
                                   print(f"获取分片 {part_number} 授权失败,终止上传。")
                                   # TODO: Abort multipart upload?
                                   return {"success": False, "message": f"获取分片 {part_number} 授权失败"}
                              
                              part_headers = {
                                   # Content-Type might need to be application/octet-stream for OSS PUT Part
                                   'Content-Type': content_type, # Or 'application/octet-stream' ? Test this. Let's use original.
                                   'Content-Length': str(len(part_data)),
                                   'Content-MD5': part_md5_b64,
                                   'Authorization': auth_key,
                                   'x-oss-date': oss_date,
                                   'x-oss-user-agent': 'aliyun-sdk-js/6.18.0 Chrome/134.0.0.0 on Windows 10 64-bit',
                                   'Host': f"{bucket}.{region}.aliyuncs.com"
                              }

                              # print(f"\n--- Part {part_number} Headers ---\n{part_headers}\n----------------------\n") # Debug

                              part_response = requests.put(part_url, headers=part_headers, data=part_data, timeout=180) # Increased timeout
                              part_response.raise_for_status() # Check for HTTP errors (4xx, 5xx)

                              etag = part_response.headers.get('ETag')
                              if not etag:
                                   raise ValueError("响应头中未找到 ETag")
                              
                              parts.append({"part_number": part_number, "etag": etag.strip('"')})
                              uploaded_size += len(part_data)
                              elapsed_time = time.time() - upload_start_time
                              speed = uploaded_size / elapsed_time if elapsed_time > 0 else 0
                              progress = (uploaded_size / file_size) * 100 if file_size > 0 else 100
                              
                              print(f"\r上传中: {progress:.2f}% ({self._format_size(uploaded_size)}/{self._format_size(file_size)}) | "
                                    f"速度: {self._format_size(speed)}/s | 分片 {part_number}/{total_parts} OK", end="")
                              break # Success, exit retry loop

                         except Exception as part_e:
                              print(f"\n分片 {part_number} 上传失败 (尝试 {attempt + 1}/{max_part_retries}): {part_e}")
                              if attempt == max_part_retries - 1:
                                   print(f"\n分片 {part_number} 上传失败次数过多,终止上传。")
                                   # TODO: Abort multipart upload?
                                   return {"success": False, "message": f"分片 {part_number} 上传失败"}
                              time.sleep(2 ** attempt) # Exponential backoff before retrying part
            
            print("\n所有分片上传至OSS完成.")

            # 5. 完成 OSS 多部分上传 (Complete Multipart Upload)
            print("正在请求合并OSS分片...")
            if not self._complete_multipart_upload(task_id, complete_oss_url, upload_id, parts):
                # TODO: Abort multipart upload?
                return {"success": False, "message": "OSS CompleteMultipartUpload 失败"}

            # 6. 通知夸克服务器上传完成 (Finish Upload)
            print("正在通知服务器上传完成...")
            finish_result = self._finish_upload(task_id, obj_key)
            if not finish_result:
                 # Upload likely succeeded on OSS, but Quark doesn't know. File might appear later or be orphaned.
                return {"success": False, "message": "服务器完成确认失败", "file_id": fid, "task_id": task_id}

            final_fid = finish_result.get('fid', fid) # Use fid from finish response if available
            print(f"上传成功! 文件名: {file_name}, 文件ID: {final_fid}")
            return {"success": True, "file_id": final_fid, "file_name": file_name, "size": file_size, "message": "上传成功"}

        except Exception as e:
            print(f"\n上传本地文件时发生未处理异常: {e}")
            traceback.print_exc()
            return {"success": False, "message": f"上传失败: {e}"}


    def _upload_from_url_internal(self, url: str, pdir_fid: str, new_filename: Optional[str] = None) -> dict:
            """
            内部方法:处理 URL 上传,先下载到本地临时文件,再调用本地上传逻辑
            :param url: 远程文件 URL
            :param pdir_fid: 目标文件夹 ID
            :param new_filename: 上传后的文件名,默认从 URL 或响应头推断
            :return: 上传结果字典
            """
            try:
                print(f"开始处理远程文件上传: {url}")

                # 1. 获取文件元数据
                print(f"正在获取 URL 文件信息: {url}")
                head_response = requests.head(url, allow_redirects=True, timeout=30)
                head_response.raise_for_status()
                
                final_url = head_response.url
                file_size_str = head_response.headers.get('Content-Length')
                file_size = int(file_size_str) if file_size_str and file_size_str.isdigit() else None
                content_type = head_response.headers.get('Content-Type', 'application/octet-stream')

                # 确定文件名
                if not new_filename:
                    content_disposition = head_response.headers.get('Content-Disposition')
                    if content_disposition:
                        filename_match = re.search(r'filename\*?=(?:UTF-8\'\')?["\']?([^"\';]+)["\']?', content_disposition, re.IGNORECASE)
                        if filename_match:
                            new_filename = urllib.parse.unquote(filename_match.group(1))
                    if not new_filename:
                        parsed_url_path = urllib.parse.urlparse(final_url).path
                        new_filename = os.path.basename(parsed_url_path) or f"download_{int(time.time())}"
                    ext = mimetypes.guess_extension(content_type.split(';')[0])
                    if ext and not new_filename.endswith(ext):
                        new_filename += ext

                new_filename = re.sub(r'[\\/*?:"<>|]', '_', new_filename)
                print(f"最终文件名: {new_filename}")
                print(f"文件大小: {self._format_size(file_size) if file_size is not None else '未知'}")
                print(f"文件类型: {content_type}")

                head_response.close()

                # 2. 创建临时文件并下载远程文件
                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    temp_file_path = temp_file.name
                    print(f"临时文件路径: {temp_file_path}")

                    print(f"开始下载远程文件到本地: {url}")
                    download_start_time = time.time()
                    downloaded_size = 0

                    with requests.get(final_url, stream=True, timeout=180) as download_response:
                        download_response.raise_for_status()
                        
                        # 更新文件大小(如果 HEAD 未提供)
                        if file_size is None:
                            size_header = download_response.headers.get('Content-Length')
                            if size_header and size_header.isdigit():
                                file_size = int(size_header)
                                print(f"下载流中获取到文件大小: {self._format_size(file_size)}")

                        # 写入临时文件并显示进度
                        for chunk in download_response.iter_content(chunk_size=1024 * 1024):  # 1MB 块
                            if chunk:
                                temp_file.write(chunk)
                                downloaded_size += len(chunk)
                                elapsed_time = time.time() - download_start_time
                                speed = downloaded_size / elapsed_time if elapsed_time > 0 else 0
                                progress = (downloaded_size / file_size * 100) if file_size else 0
                                progress_str = f"{progress:.2f}% " if file_size else ""
                                print(f"\r下载中: {progress_str}({self._format_size(downloaded_size)}/{self._format_size(file_size) if file_size else '未知'}) | "
                                    f"速度: {self._format_size(speed)}/s", end="")

                    print(f"\n远程文件下载完成,大小: {self._format_size(downloaded_size)}")

                # 3. 调用本地文件上传方法
                print(f"开始上传临时文件到夸克网盘: {temp_file_path}")
                upload_result = self._upload_local_file_internal(temp_file_path, pdir_fid, new_filename)

                # 4. 清理临时文件
                try:
                    os.unlink(temp_file_path)
                    print(f"已删除临时文件: {temp_file_path}")
                except Exception as cleanup_e:
                    print(f"警告:删除临时文件失败: {cleanup_e}")

                # 5. 返回结果
                if upload_result.get('success'):
                    print(f"URL 文件上传成功! 文件名: {upload_result.get('file_name')}, 文件ID: {upload_result.get('file_id')}")
                else:
                    print(f"URL 文件上传失败: {upload_result.get('message')}")
                return upload_result

            except requests.exceptions.RequestException as req_e:
                print(f"\n从 URL 下载文件时网络请求失败: {req_e}")
                traceback.print_exc()
                return {"success": False, "message": f"URL 下载失败: {req_e}"}
            except Exception as e:
                print(f"\n从 URL 上传文件时发生未处理异常: {e}")
                traceback.print_exc()
                # 即使发生异常,也尝试清理临时文件
                if 'temp_file_path' in locals():
                    try:
                        os.unlink(temp_file_path)
                        print(f"已删除临时文件: {temp_file_path}")
                    except Exception as cleanup_e:
                        print(f"警告:删除临时文件失败: {cleanup_e}")
                return {"success": False, "message": f"URL 上传失败: {e}"}


class QuarkShareManager(QuarkBase):
    """夸克网盘分享管理类 (保持不变)"""
    
    def create_share_link(self, file_id, title, expire_time=None, password=None):
        """
        创建分享链接
        :param file_id: 文件ID
        :param title: 分享标题
        :param expire_time: 有效期(天),None 或 0 表示永久,其他正整数表示天数
        :param password: 提取密码,可选
        :return: 分享链接信息
        """
        create_url = 'https://drive-pc.quark.cn/1/clouddrive/share'
        params = self.base_params.copy()
        
        create_data = {
            "fid_list": [file_id],
            "title": title,
            "url_type": 1, # Seems fixed
            "expired_type": 1, # Default: 1 for permanent
            "expire_time": 0 # Default for permanent
        }

        if expire_time and isinstance(expire_time, int) and expire_time > 0:
            create_data["expired_type"] = 2  # 2 for custom expiration
            create_data["expire_time"] = expire_time * 86400 # Convert days to seconds
            print(f"设置分享有效期: {expire_time} 天")
        else:
             print("设置分享有效期: 永久")


        if password:
            create_data["password"] = password
            print("设置分享密码: 是")
        else:
            print("设置分享密码: 否")


        try:
            headers = self.headers.copy()
            headers['Content-Type'] = 'application/json'
            create_response = requests.post(create_url, headers=headers, params=params, json=create_data)
            create_response.raise_for_status()
            create_result = create_response.json()
            
            # print(f"创建分享响应: {create_result}") # Debug

            if create_result.get('code') != 0:
                print(f"创建分享任务失败: {create_result.get('message', '未知错误')}")
                return None
            
            share_id = None
            task_sync = create_result.get('data', {}).get('task_sync')
            
            # Handle both synchronous and asynchronous task creation
            if task_sync:
                 # Synchronous: result directly in task_resp
                 task_resp = create_result.get('data', {}).get('task_resp', {})
                 # print(f"同步任务响应: {task_resp}") # Debug
                 if task_resp.get('code') == 0:
                     share_id = task_resp.get('data', {}).get('share_id')
                 else:
                      print(f"同步创建分享失败: {task_resp.get('message', '未知错误')}")
                      return None
            else:
                 # Asynchronous: need to poll task_id
                 task_id = create_result.get('data', {}).get('task_id')
                 if task_id:
                     print(f"创建分享任务已提交,Task ID: {task_id}. 正在查询结果...")
                     task_data = self._poll_task_status(task_id)
                     if task_data and task_data.get('status') == 2: # Check if polling succeeded AND task status is success
                         share_id = task_data.get('share_id')
                     else:
                          print("创建分享任务查询失败或未成功完成。")
                          return None
                 else:
                      print("创建分享响应中未找到 task_id 且非同步模式。")
                      return None

            if share_id:
                print(f"获取到 Share ID: {share_id}")
                # Get share details (URL, pwd) using the share_id
                share_info = self.get_share_url(share_id)
                if share_info and share_info.get('share_url'):
                    share_url = share_info.get('share_url')
                    share_pwd = share_info.get('share_pwd', '') # Password might be empty string if not set
                    print(f"\n分享创建成功!")
                    print(f"分享链接: {share_url}")
                    if password: # Only print password if one was intended
                        print(f"提取密码: {share_pwd if share_pwd else '(无密码)'}") # Display actual pwd or note if none
                    return share_info
                else:
                     print(f"成功获取 Share ID ({share_id}),但获取分享链接详情失败。")
                     # Return basic info if URL fetch fails
                     return {"share_id": share_id, "message": "获取分享链接详情失败"}
            
            print("未能获取到 Share ID。")
            return None
            
        except Exception as e:
            print(f"创建分享链接时发生异常: {e}")
            traceback.print_exc()
            return None
    
    def get_share_url(self, share_id):
        """
        根据分享ID获取分享链接和密码 (保持不变)
        :param share_id: 分享ID
        :return: 分享链接信息字典
        """
        url = 'https://drive-pc.quark.cn/1/clouddrive/share/password'
        params = self.base_params.copy()
        
        data = {
            "share_id": share_id
        }
        try:
            headers = self.headers.copy()
            headers['Content-Type'] = 'application/json'
            response = requests.post(url, headers=headers, params=params, json=data)
            response.raise_for_status()
            result = response.json()
            if result.get('code') == 0:
                return result.get('data', {}) # Contains share_url, share_pwd etc.
            else:
                print(f"获取分享链接详情失败 (ShareID: {share_id}): Code {result.get('code')}, Msg: {result.get('message', '未知错误')}")
                return {}
        except Exception as e:
            print(f"获取分享链接详情时出错 (ShareID: {share_id}): {e}")
            return {}
    
    def get_share_list(self):
        """
        获取我的分享列表 (保持不变)
        :return: 分享列表
        """
        url = 'https://drive-pc.quark.cn/1/clouddrive/share/list'
        params = self.base_params.copy()
        
        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            result = response.json()
            
            if result.get('code') != 0:
                print(f"获取分享列表失败: {result.get('message', '未知错误')}")
                return []
                
            return result.get('data', {}).get('list', [])
        except Exception as e:
            print(f"获取分享列表出错: {e}")
            return []


class QuarkTransferManager(QuarkBase):
    """夸克网盘转存管理类 (主要逻辑保持不变,微调日志和stoken处理)"""
    
    def extract_pwd_id(self, share_url):
        """
        从分享链接中提取pwd_id (保持不变)
        :param share_url: 分享链接
        :return: pwd_id or None if not found
        """
        # Updated regex to handle variations and potential surrounding text
        pattern = r'(?:https?://)?pan\.quark\.cn/s/([a-zA-Z0-9]+)'
        match = re.search(pattern, share_url)
        if match:
            return match.group(1)
        else:
            # Don't raise error, return None and let caller handle
            print(f"无法从 '{share_url}' 中提取有效的夸克 pwd_id (格式应为 pan.quark.cn/s/...)")
            return None
    
    def get_share_token(self, pwd_id, passcode=""):
        """
        获取分享的stoken (保持不变)
        :param pwd_id: 分享ID
        :param passcode: 提取密码,默认为空
        :return: stoken or None
        """
        # Endpoint might vary (drive-h vs drive-pc), let's stick to drive-h as in original
        url = 'https://drive-h.quark.cn/1/clouddrive/share/sharepage/token'
        # Use specific params for this endpoint if needed, or rely on base_params
        params = self.base_params.copy()
        params.update({
            '__dt': str(int(time.time() * 1000) % 1000), # Seems like timing related param
            '__t': str(int(time.time() * 1000))
        })
        
        data = {
            "pwd_id": pwd_id,
            "passcode": passcode
        }
        
        try:
            headers = self.headers.copy()
            headers['Content-Type'] = 'application/json'
            # Referer might be important for sharepage APIs
            headers['Referer'] = f'https://pan.quark.cn/s/{pwd_id}'

            response = requests.post(url, headers=headers, params=params, json=data)
            response.raise_for_status()
            result = response.json()
            
            # print(f"获取分享Token响应: {result}") # Debug

            if result.get('code') != 0:
                print(f"获取分享token失败 (PwdID: {pwd_id}): Code {result.get('code')}, Msg: {result.get('message', '密码错误或分享无效?')}")
                return None
            
            stoken = result.get('data', {}).get('stoken')
            if stoken:
                 print(f"获取到stoken (PwdID: {pwd_id}): {stoken[:10]}...") # Log truncated token
                 return stoken
            else:
                 print(f"获取分享token成功但响应中未找到stoken (PwdID: {pwd_id})")
                 return None
        except Exception as e:
            print(f"获取分享token时出错 (PwdID: {pwd_id}): {e}")
            return None
    
    def get_share_detail(self, pwd_id, stoken):
        """
        获取分享详情 (保持不变, 注意stoken处理)
        :param pwd_id: 分享ID
        :param stoken: 分享token (原始,未编码)
        :return: 分享详情字典 or None
        """
        if not stoken: return None
        url = 'https://drive-h.quark.cn/1/clouddrive/share/sharepage/detail'
        
        # Stoken *should not* be URL-encoded when passed as a query parameter value via requests' `params` dict.
        # Requests library handles the encoding. Let's pass the raw stoken.
        
        params = self.base_params.copy()
        params.update({
            'pwd_id': pwd_id,
            'pdir_fid': '0', # Usually start at root of share
            'force': '0',
            '_page': '1',
            '_size': '50', # Get first 50 items, adjust if needed
            '_fetch_banner': '1',
            '_fetch_share': '1',
            '_fetch_total': '1',
            '_sort': 'file_type:asc,file_name:asc',
            'stoken': stoken, # Pass raw stoken here
            '__dt': str(int(time.time() * 1000) % 1000),
            '__t': str(int(time.time() * 1000))
        })
        
        # print(f"获取分享详情请求参数: {params}") # Debug

        try:
            headers = self.headers.copy()
            # Referer might be important
            headers['Referer'] = f'https://pan.quark.cn/s/{pwd_id}'
            
            response = requests.get(url, headers=headers, params=params) # Let requests handle param encoding
            response.raise_for_status()
            result = response.json()
            
            # print(f"获取分享详情响应: {result}") # Debug

            if result.get('code') != 0:
                print(f"获取分享详情失败 (PwdID: {pwd_id}): Code {result.get('code')}, Msg: {result.get('message', '未知错误')}")
                return None
            
            # Check if data exists and contains expected keys like 'list' and 'share'
            data = result.get('data', {})
            if 'list' not in data or 'share' not in data:
                 print(f"获取分享详情成功但响应数据不完整 (PwdID: {pwd_id})")
                 return None # Or return partial data? Let's return None for safety.

            return data
        except requests.exceptions.RequestException as e:
            print(f"获取分享详情网络请求失败 (PwdID: {pwd_id}): {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"响应状态码: {e.response.status_code}")
                print(f"响应内容: {e.response.text[:200]}...")
            return None
        except Exception as e:
            print(f"获取分享详情时发生异常 (PwdID: {pwd_id}): {e}")
            traceback.print_exc()
            return None
    
    def save_shared_files(self, share_url, to_folder_id="root", passcode=""):
        """
        保存分享文件到自己的网盘 (保持不变, 注意stoken处理和fid/token列表)
        :param share_url: 分享链接
        :param to_folder_id: 保存到的文件夹ID ('root' or specific fid)
        :param passcode: 提取密码
        :return: 保存结果字典
        """
        try:
            # 1. Extract pwd_id
            pwd_id = self.extract_pwd_id(share_url)
            if not pwd_id:
                return {"success": False, "message": "无效的分享链接"}
            print(f"解析到 PwdID: {pwd_id}")
            
            # 2. Get stoken
            stoken = self.get_share_token(pwd_id, passcode)
            if not stoken:
                return {"success": False, "message": "获取分享token失败 (密码错误或分享无效?)"}
            
            # 3. Get share details to find file IDs and share_fid_tokens
            share_detail = self.get_share_detail(pwd_id, stoken)
            if not share_detail:
                return {"success": False, "message": "获取分享详情失败"}
            
            file_list = share_detail.get('list', [])
            if not file_list:
                return {"success": False, "message": "分享中没有文件或无法获取文件列表"}
                
            share_info = share_detail.get('share', {})
            share_title = share_info.get('title', '未知标题')
            print(f"分享标题: {share_title}, 文件数: {len(file_list)}")
            
            # Prepare lists of fids and their corresponding share_fid_tokens
            fid_list = [file.get('fid') for file in file_list if file.get('fid')]
            fid_token_list = [file.get('share_fid_token') for file in file_list if file.get('share_fid_token')]
            
            # Basic validation: ensure lists are non-empty and have same length
            if not fid_list or not fid_token_list or len(fid_list) != len(fid_token_list):
                 print(f"错误:未能从分享详情中正确提取 fid 或 fid_token 列表。")
                 print(f"FIDs: {fid_list}") # Debug
                 print(f"Tokens: {fid_token_list}") # Debug
                 return {"success": False, "message": "提取文件ID或Token失败"}
            
            print(f"准备转存 {len(fid_list)} 个项目...")

            # 4. Call save endpoint
            save_url = 'https://drive-pc.quark.cn/1/clouddrive/share/sharepage/save'
            params = self.base_params.copy()
            params.update({
                 '__dt': str(int(time.time() * 1000) % 1000),
                 '__t': str(int(time.time() * 1000))
            })
            
            # Use "0" for root folder ID in the API call
            target_pdir_fid = to_folder_id if to_folder_id != "root" else "0"
            
            save_data = {
                "fid_list": fid_list,
                "fid_token_list": fid_token_list,
                "pdir_fid": "0", # Usually the root of the *share* itself
                "pwd_id": pwd_id,
                "scene": "link", # Fixed value
                "stoken": stoken, # Pass raw stoken
                "to_pdir_fid": target_pdir_fid # Target folder in *your* drive
            }
            
            # print(f"转存请求数据: {json.dumps(save_data, ensure_ascii=False)}") # Debug (careful with tokens)

            headers = self.headers.copy()
            headers['Content-Type'] = 'application/json'
            headers['Referer'] = f'https://pan.quark.cn/s/{pwd_id}' # Referer might be needed

            print(f"发送转存请求至目录ID: {target_pdir_fid}...")
            save_response = requests.post(save_url, headers=headers, params=params, json=save_data)
            save_response.raise_for_status()
            save_result = save_response.json()
            
            # print(f"转存响应: {json.dumps(save_result, ensure_ascii=False)}") # Debug
            
            # Handle response based on sync/async task
            if save_result.get('code') != 0:
                return {
                    "success": False, 
                    "message": f"转存请求失败: {save_result.get('message', '未知错误')}"
                }

            task_sync = save_result.get('data', {}).get('task_sync')
            if task_sync:
                 # Synchronous completion
                 task_resp = save_result.get('data', {}).get('task_resp', {})
                 if task_resp.get('code') == 0:
                      print("转存任务同步完成。")
                      return {
                           "success": True,
                           "message": "文件保存成功 (同步)",
                           "data": task_resp.get('data', {}),
                           "share_title": share_title
                      }
                 else:
                      print(f"转存任务同步失败: {task_resp.get('message', '未知错误')}")
                      return {
                           "success": False,
                           "message": f"转存同步任务失败: {task_resp.get('message', '未知错误')}",
                           "share_title": share_title
                      }
            else:
                 # Asynchronous task, poll status
                 task_id = save_result.get('data', {}).get('task_id')
                 if task_id:
                     print(f"转存任务已提交,Task ID: {task_id}. 正在查询结果...")
                     task_data = self._poll_task_status(task_id)
                     if task_data and task_data.get('status') == 2:
                         print("转存任务异步完成。")
                         return {
                             "success": True,
                             "message": "文件保存成功 (异步)",
                             "data": task_data, # Return final task data
                             "share_title": share_title
                         }
                     else:
                         print("转存任务查询失败或未成功完成。")
                         return {
                             "success": False,
                             "message": "转存任务查询失败或未成功完成",
                             "task_id": task_id,
                             "share_title": share_title
                         }
                 else:
                      # Should not happen if code was 0 and not sync
                      print("转存响应成功但既非同步也无Task ID。")
                      return {
                          "success": False, 
                          "message": "转存状态未知 (无Task ID)",
                          "share_title": share_title
                     }

        except Exception as e:
            print(f"保存分享文件时发生异常: {e}")
            traceback.print_exc()
            return {"success": False, "message": f"转存异常: {str(e)}"}


# --- Main Execution Block (Example Usage) ---
if __name__ == '__main__':
    # !!! 重要: 请将下面的cookie替换为你自己的有效cookie !!!
    # 获取方法: 登录 https://pan.quark.cn/ ,打开浏览器开发者工具(F12),
    # 切换到 网络(Network) 标签页, 刷新页面或点击任意文件夹,
    # 找到任意一个发送到 drive-pc.quark.cn 或 drive-h.quark.cn 的请求 (如 file/sort, sharepage/detail 等),
    # 在 请求头(Request Headers) 中找到 `Cookie:` 开头的那一行,复制其完整的 值。
    # 示例Cookie (已失效):
    QUARK_COOKIE = '' # 在这里粘贴你的Cookie

    if not QUARK_COOKIE or len(QUARK_COOKIE) < 50:
         print("错误:请在脚本中设置有效的 QUARK_COOKIE 值。")
         exit()

    # 初始化管理器
    try:
        file_manager = QuarkFileManager(QUARK_COOKIE)
        share_manager = QuarkShareManager(QUARK_COOKIE)
        transfer_manager = QuarkTransferManager(QUARK_COOKIE)
        print("API管理器初始化成功。")
    except Exception as init_e:
        print(f"初始化API管理器失败: {init_e}")
        exit()
    
    while True:
        print("\n" + "=" * 15 + " 夸克网盘API测试 " + "=" * 15)
        print("1. 获取文件/文件夹列表")
        print("2. 创建分享链接")
        print("3. 获取我的分享列表")
        print("4. 转存他人分享的文件")
        print("5. 搜索文件")
        print("6. 重命名文件或文件夹")
        print("7. 上传文件 (本地或远程)")
        print("0. 退出")
        print("=" * 48)
        
        choice = input("\n请选择功能 (0-7): ")
        
        if choice == '0':
            print("退出程序")
            break
            
        elif choice == '1':
            print("\n===== 1. 获取文件/文件夹列表 =====")
            folder_id = input("请输入文件夹ID (默认为根目录 'root', 直接回车): ")
            if not folder_id:
                folder_id = 'root'
                
            files = file_manager.get_file_list(folder_id)
            if files is None: # Check for explicit None indicating error
                print("获取文件列表时发生错误。")
            elif not files:
                print("未获取到文件列表,可能是cookie无效或文件夹为空。")
            else:
                print(f"\n'{folder_id}' 中的项目 ({len(files)}):")
                for i, file in enumerate(files, 1):
                    is_folder = file.get('file_type') == 'folder'
                    icon = "📁" if is_folder else "📄"
                    size_str = file_manager._format_size(file.get('size')) if not is_folder else "(-)"
                    print(f"{i:>3}. {icon} {file.get('file_name', 'N/A')} ({size_str}) | ID: {file.get('fid', 'N/A')}")
        
        elif choice == '2':
            print("\n===== 2. 创建分享链接 =====")
            # Let user input file ID directly or choose from list
            list_files_choice = input("是否先列出根目录文件以供选择? (y/n, default n): ").lower()
            file_id_to_share = None
            file_title_to_share = "默认分享标题"

            if list_files_choice == 'y':
                 files = file_manager.get_file_list('root')
                 if files:
                      print("\n根目录文件列表:")
                      for i, file in enumerate(files, 1):
                           icon = "📁" if file.get('file_type') == 'folder' else "📄"
                           print(f"{i:>3}. {icon} {file.get('file_name', 'N/A')}")
                      try:
                           file_index = int(input(f"\n请选择要分享的文件序号 (1-{len(files)}): ")) - 1
                           if 0 <= file_index < len(files):
                                file_id_to_share = files[file_index]['fid']
                                file_title_to_share = files[file_index]['file_name']
                           else:
                                print("选择无效。")
                      except ValueError:
                           print("输入无效。")
                 else:
                      print("无法获取根目录文件列表。")

            if not file_id_to_share:
                 file_id_to_share = input("请输入要分享的文件或文件夹的 FID: ")
                 # Attempt to get title if user provided FID directly
                 file_info = file_manager.get_file_info(file_id_to_share)
                 if file_info:
                     file_title_to_share = file_info.get('file_name', file_title_to_share)

            if not file_id_to_share:
                 print("未指定要分享的文件ID。")
                 continue

            expire_days_str = input("请输入分享有效期天数 (整数, 0 或不填表示永久): ")
            expire_days = None
            try:
                 if expire_days_str:
                      expire_days = int(expire_days_str)
                      if expire_days < 0: expire_days = 0 # Treat negative as permanent
            except ValueError:
                 print("无效的天数,将设置为永久有效。")
                 expire_days = 0

            password = input("请输入提取密码 (可选,直接回车为无密码): ")
                
            print(f"\n准备为 FID: {file_id_to_share} 创建分享...")
            share_info = share_manager.create_share_link(
                 file_id_to_share, 
                 file_title_to_share, 
                 expire_days, # Pass integer days or 0/None
                 password if password else None
            )
            
            if share_info and share_info.get('share_url'):
                print("\n✅ 分享创建成功!")
                print(f"   分享链接: {share_info.get('share_url')}")
                pwd = share_info.get('share_pwd')
                if password: # Only show if a password was intended
                    print(f"   提取密码: {pwd if pwd else '(无密码)'}")
                print(f"   分享ID: {share_info.get('share_id')}")
            elif share_info and share_info.get('share_id'):
                 print(f"\n⚠️ 分享任务已创建 (ShareID: {share_info.get('share_id')}),但获取最终链接详情失败。")
            else:
                print("\n❌ 分享创建失败。")
        
        elif choice == '3':
            print("\n===== 3. 获取我的分享列表 =====")
            share_list = share_manager.get_share_list()
            
            if share_list is None:
                 print("获取分享列表时发生错误。")
            elif not share_list:
                print("未获取到分享列表,可能是cookie无效或没有创建过分享。")
            else:
                print(f"\n我的分享列表 ({len(share_list)}):")
                for i, share in enumerate(share_list, 1):
                    create_ts = share.get('create_time', 0)
                    expire_ts = share.get('expire_time', 0)
                    create_dt = time.strftime('%Y-%m-%d %H:%M', time.localtime(create_ts)) if create_ts else "N/A"
                    expire_dt_str = "永久"
                    if expire_ts > create_ts: # Check if expire time is meaningful
                         expire_dt_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(expire_ts))

                    has_pwd = bool(share.get('share_pwd')) # API might return empty string even if no password

                    print(f"{i:>3}. 标题: {share.get('title', 'N/A')}")
                    print(f"    链接: {share.get('share_url', 'N/A')} {'[有密码]' if has_pwd else '[无密码]'}")
                    print(f"    ShareID: {share.get('share_id', 'N/A')}")
                    print(f"    创建时间: {create_dt} | 过期时间: {expire_dt_str}")
                    print(f"    访问: {share.get('visit_count', 0)} | 保存: {share.get('save_count', 0)} | 下载: {share.get('download_count', 0)}")
                    print("-" * 40)
        
        elif choice == '4':
            print("\n===== 4. 转存他人分享的文件 =====")
            share_url = input("请输入分享链接 (例如 https://pan.quark.cn/s/xxxxxxxx): ")
            if not share_url:
                print("未提供分享链接。")
                continue
                
            passcode = input("请输入提取密码 (如果有,直接回车跳过): ")
            
            # Ask for target folder
            list_folders_choice = input("是否列出文件夹以选择保存位置? (y/n, default y): ").lower()
            target_folder_id = 'root' # Default to root
            target_folder_name = "根目录"

            if list_folders_choice != 'n':
                 folders = file_manager.get_folder_list('root') # List root folders
                 if folders is not None:
                     folders.insert(0, {"file_name": "根目录", "fid": "root"}) # Add root option
                     print("\n请选择保存位置:")
                     for i, folder in enumerate(folders):
                         print(f"{i:>3}. 📁 {folder['file_name']}")
                     try:
                         folder_index = int(input(f"请选择序号 (0-{len(folders)-1}): "))
                         if 0 <= folder_index < len(folders):
                             target_folder_id = folders[folder_index]['fid']
                             target_folder_name = folders[folder_index]['file_name']
                         else:
                             print("选择无效,将保存到根目录。")
                     except ValueError:
                         print("输入无效,将保存到根目录。")
                 else:
                      print("无法获取文件夹列表,将尝试保存到根目录。")
            else:
                 target_folder_id_input = input("请输入目标文件夹的 FID (直接回车保存到根目录): ")
                 if target_folder_id_input:
                      target_folder_id = target_folder_id_input
                      # Try to get name for display
                      folder_info = file_manager.get_file_info(target_folder_id)
                      if folder_info and folder_info.get('file_type') == 'folder':
                          target_folder_name = folder_info.get('file_name', target_folder_id)
                      else:
                          print(f"警告: 无法确认 FID '{target_folder_id}' 是一个有效文件夹,仍将尝试转存。")
                          target_folder_name = target_folder_id

            print(f"\n准备将分享内容转存到: '{target_folder_name}' (ID: {target_folder_id})")
            transfer_result = transfer_manager.save_shared_files(share_url, target_folder_id, passcode)
                
            if transfer_result.get('success'):
                print(f"\n✅ 转存成功: {transfer_result.get('message')}")
                print(f"   分享标题: {transfer_result.get('share_title', '未知')}")
                # Optionally print more details from transfer_result['data'] if needed
            else:
                print(f"\n❌ 转存失败: {transfer_result.get('message')}")
        
        elif choice == '5':
            print("\n===== 5. 搜索文件 =====")
            keyword = input("请输入搜索关键词: ")
            if not keyword:
                print("未提供搜索关键词。")
                continue
            
            # Allow searching specific folder or whole drive
            search_scope = input("在指定文件夹内搜索? (输入FID,或直接回车搜索整个网盘): ")
            search_fid = search_scope if search_scope else 'root' # Default to root/whole drive

            print(f"正在 {'整个网盘' if search_fid == 'root' else f'文件夹 {search_fid}'} 中搜索 '{keyword}'...")
            search_results = file_manager.search_files(keyword, search_fid)

            if search_results is None:
                 print("搜索时发生错误。")
            elif not search_results:
                print(f"未找到包含 '{keyword}' 的文件或文件夹。")
            else:
                print(f"\n搜索结果 ({len(search_results)}):")
                for i, file in enumerate(search_results, 1):
                    is_folder = file.get('file_type') == 'folder'
                    icon = "📁" if is_folder else "📄"
                    size_str = file_manager._format_size(file.get('size')) if not is_folder else "(-)"
                    # Highlight keyword? Maybe too complex for console.
                    print(f"{i:>3}. {icon} {file.get('file_name', 'N/A')} ({size_str}) | ID: {file.get('fid', 'N/A')}")
                    # Could add path info if available in search result, often it's not directly included.
        
        elif choice == '6':
            print("\n===== 6. 重命名文件或文件夹 =====")
            list_files_choice = input("是否先列出根目录文件以供选择? (y/n, default n): ").lower()
            fid_to_rename = None
            old_name = None

            if list_files_choice == 'y':
                 files = file_manager.get_file_list('root')
                 if files:
                      print("\n根目录文件列表:")
                      for i, file in enumerate(files, 1):
                           icon = "📁" if file.get('file_type') == 'folder' else "📄"
                           print(f"{i:>3}. {icon} {file.get('file_name', 'N/A')} | ID: {file.get('fid')}")
                      try:
                           file_index = int(input(f"\n请选择要重命名的项目序号 (1-{len(files)}): ")) - 1
                           if 0 <= file_index < len(files):
                                fid_to_rename = files[file_index]['fid']
                                old_name = files[file_index]['file_name']
                           else:
                                print("选择无效。")
                      except ValueError:
                           print("输入无效。")
                 else:
                      print("无法获取根目录文件列表。")
            
            if not fid_to_rename:
                 fid_to_rename = input("请输入要重命名的文件或文件夹的 FID: ")
                 # Try to get old name for display
                 file_info = file_manager.get_file_info(fid_to_rename)
                 if file_info:
                     old_name = file_info.get('file_name')

            if not fid_to_rename:
                 print("未指定要重命名的项目ID。")
                 continue

            print(f"当前名称: {old_name if old_name else '(未知)'}")
            new_name = input("请输入新名称: ")
            
            if not new_name:
                print("新名称不能为空。")
                continue
            
            print(f"\n准备将 FID: {fid_to_rename} 重命名为 '{new_name}'...")
            success = file_manager.rename_file(fid_to_rename, new_name)
            if success:
                print(f"\n✅ 重命名成功!")
            else:
                print(f"\n❌ 重命名失败。")
        
        elif choice == '7':
            print("\n===== 7. 上传文件 =====")
            print("1. 上传本地文件")
            print("2. 上传远程URL文件")
            upload_choice = input("请选择上传方式 (1-2): ")
            
            source_path_or_url = ""
            is_from_url = False
            
            if upload_choice == '1':
                source_path_or_url = input("请输入要上传的本地文件完整路径: ")
                if not source_path_or_url:
                    print("未提供文件路径。")
                    continue
                is_from_url = False
            elif upload_choice == '2':
                source_path_or_url = input("请输入要上传的文件URL: ")
                if not source_path_or_url:
                    print("未提供文件URL。")
                    continue
                is_from_url = True
            else:
                print("选择无效。")
                continue
                
            # Ask for target folder (similar to transfer)
            list_folders_choice = input("是否列出文件夹以选择上传位置? (y/n, default y): ").lower()
            target_folder_id = 'root' # Default to root
            target_folder_name = "根目录"

            if list_folders_choice != 'n':
                 folders = file_manager.get_folder_list('root') # List root folders
                 if folders is not None:
                     folders.insert(0, {"file_name": "根目录", "fid": "root"}) # Add root option
                     print("\n请选择上传位置:")
                     for i, folder in enumerate(folders):
                         print(f"{i:>3}. 📁 {folder['file_name']}")
                     try:
                         folder_index = int(input(f"请选择序号 (0-{len(folders)-1}): "))
                         if 0 <= folder_index < len(folders):
                             target_folder_id = folders[folder_index]['fid']
                             target_folder_name = folders[folder_index]['file_name']
                         else:
                             print("选择无效,将上传到根目录。")
                     except ValueError:
                         print("输入无效,将上传到根目录。")
                 else:
                      print("无法获取文件夹列表,将尝试上传到根目录。")
            else:
                 target_folder_id_input = input("请输入目标文件夹的 FID (直接回车上传到根目录): ")
                 if target_folder_id_input:
                      target_folder_id = target_folder_id_input
                      folder_info = file_manager.get_file_info(target_folder_id)
                      if folder_info and folder_info.get('file_type') == 'folder':
                          target_folder_name = folder_info.get('file_name', target_folder_id)
                      else:
                          print(f"警告: 无法确认 FID '{target_folder_id}' 是一个有效文件夹,仍将尝试上传。")
                          target_folder_name = target_folder_id
                
            new_filename_input = input("请输入上传后的文件名 (可选,直接回车使用原文件名或URL推断): ")
            
            print(f"\n准备上传到: '{target_folder_name}' (ID: {target_folder_id})")
            # Call the unified upload_file method
            upload_result = file_manager.upload_file(
                source_path_or_url, 
                target_folder_id, 
                new_filename_input if new_filename_input else None,
                is_from_url
            )
            
            # Display result
            if upload_result and upload_result.get('success'):
                print(f"\n✅ 上传成功!")
                print(f"   文件名: {upload_result.get('file_name')}")
                print(f"   文件ID: {upload_result.get('file_id')}")
                print(f"   大小: {file_manager._format_size(upload_result.get('size'))}")
                print(f"   消息: {upload_result.get('message')}") # e.g., "秒传成功" or "上传成功"
            else:
                # Handle None result or success=False
                message = "未知错误"
                if upload_result and 'message' in upload_result:
                    message = upload_result['message']
                elif upload_result is None:
                     message = "上传函数返回了 None"
                print(f"\n❌ 上传失败: {message}")
        
        else:
            print("无效的选择,请重新输入。")
        
        # Pause before next iteration
        try:
             input("\n按 Enter 键继续...")
        except EOFError: # Handle case where input stream is closed (e.g., piping)
             break

可以直接调用 或者直接运行进行测试

赞(0) 打赏
未经允许不得转载:大地主的知识库 » 夸克网盘文件上传、转存、批量分享接口代码

评论 抢沙发

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

微信扫一扫打赏

登录

找回密码

注册