#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Usage: prepend this script's path to the original command line, e.g.
#
#     ptask.py task /ql/config/ptask/demo.js
#
# Doing so keeps Qinglong's runSingle function from prefixing the command with
# `task` again, which would otherwise make `task` (rather than this script) the
# thing that actually launches the target.
#
# The concurrency level is read from the PTASK_THREADS variable (default 3).
# A per-script value given with -t takes precedence over it; for example the
# following runs demo.js with a concurrency of 2:
#
#     ptask.py -t2 task /ql/config/ptask/demo.js
#

import argparse
import datetime
import logging
import os
import shlex
import subprocess
import sys
import threading
from typing import Dict, Iterable, List, Optional, Tuple

# Variables loaded from the Qinglong shell config files by parse_envs().
envs = {}  # type: Dict[str, str]
logger = None  # type: Optional[logging.Logger]

# Shell files scanned by parse_envs(), in order; later files override earlier ones.
DEFAULT_SHELL_FILES = (
    '/ql/config/config.sh',
    '/ql/config/env.sh',
    '../config.sh',
    '../env.sh',
)


def split_batches(threads: int, jd_cookie_count: int) -> List[Tuple[int, int]]:
    """Split accounts 1..jd_cookie_count into at most ``threads`` contiguous batches.

    Each batch is an inclusive ``(left, right)`` pair of 1-based account
    indices. Every batch except possibly the last holds
    ``ceil(jd_cookie_count / threads)`` accounts. Empty batches are never
    produced, so fewer than ``threads`` batches may be returned.

    Raises:
        ValueError: if ``threads`` is less than 1.
    """
    if threads < 1:
        raise ValueError(f"threads must be >= 1, got {threads}")
    if jd_cookie_count <= 0:
        return []
    per_batch = (jd_cookie_count + threads - 1) // threads
    return [
        (left, min(left + per_batch - 1, jd_cookie_count))
        for left in range(1, jd_cookie_count + 1, per_batch)
    ]


def forward_output(prefix: str, stream) -> None:
    """Copy every line of ``stream`` into the debug log, then close the stream.

    Reading continuously keeps a child process from blocking on a full pipe.
    """
    with stream:
        for line in stream:
            logger.debug(f"{prefix}{line.rstrip(chr(13) + chr(10))}")


def parallel_task(target_script: str, threads: int, jd_cookie_count: int):
    """Run ``target_script`` through ``task`` once per account batch, in parallel.

    Accounts are split by split_batches(). For each batch, one shell command of
    the form ``task <script> desi JD_COOKIE <indices...>`` is started, prefixed
    with PTASK_* variable assignments that describe the batch. The function
    blocks until every command has exited. Child output goes to the debug log
    only.

    Args:
        target_script: path of the script handed to ``task``.
        threads: requested concurrency; must be >= 1.
        jd_cookie_count: number of JD cookies (accounts) to distribute.
    """
    batches = split_batches(threads, jd_cookie_count)
    # Only the last batch can be short, so the first one gives the batch size.
    account_per_thread = batches[0][1] - batches[0][0] + 1 if batches else 0
    # When accounts run out, fewer batches than requested threads are created.
    final_threads = len(batches)

    logger.debug(f"并发执行脚本, Powered By 风之凌殇")
    logger.debug(f"使用 {threads} 并发")
    logger.debug(f"共有 {jd_cookie_count} 个京东cookie")
    logger.debug(f"每个并发任务处理 {account_per_thread} 个账号")

    if not batches:
        logger.info("没有需要处理的京东账号, 跳过执行")
        return
    if final_threads != threads:
        logger.debug(f"账号数量不足以填满全部并发, 实际并发数为 {final_threads}")

    # Build one command per batch. For example, three processes sharing 13 accounts:
    #   PTASK_LEFT=1 PTASK_RIGHT=5 task /ql/config/ptask/demo.py desi JD_COOKIE 1 2 3 4 5
    #   PTASK_LEFT=6 PTASK_RIGHT=10 task /ql/config/ptask/demo.py desi JD_COOKIE 6 7 8 9 10
    #   PTASK_LEFT=11 PTASK_RIGHT=13 task /ql/config/ptask/demo.py desi JD_COOKIE 11 12 13
    # The script path is quoted so spaces or shell metacharacters cannot split
    # or inject into the shell command line.
    quoted_script = shlex.quote(target_script)
    cmd_list = []
    for left, right in batches:
        batch_accounts = ' '.join(map(str, range(left, right + 1)))
        cmd_list.append(
            f"PTASK=true PTASK_FINAL_THREADS={final_threads} "
            f"PTASK_BATCH={account_per_thread} PTASK_TOTAL={jd_cookie_count} "
            f"PTASK_LEFT={left} PTASK_RIGHT={right} "
            f"task {quoted_script} desi JD_COOKIE {batch_accounts}"
        )

    logger.debug("将要执行以下任务")
    for cmd in cmd_list:
        logger.debug(f"\t{cmd}")
    # Make sure everything above is written before the tasks start.
    logger.debug("")

    running = []
    for idx, cmd in enumerate(cmd_list, start=1):
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                bufsize=1, encoding="utf-8", errors="replace")
        # Drain each pipe in its own thread. An unread PIPE fills up and then
        # blocks the child forever, which would deadlock the wait() below.
        reader = threading.Thread(target=forward_output, args=(f"[{idx}] ", proc.stdout), daemon=True)
        reader.start()
        running.append((idx, proc, reader))

    for idx, proc, reader in running:
        returncode = proc.wait()
        reader.join()
        if returncode != 0:
            logger.warning(f"第 {idx} 个任务异常退出, 返回码 {returncode}")

    logger.debug("全部任务执行完毕")


def count_jd_cookies(raw: str) -> int:
    """Return the number of non-empty ``&``-separated entries in ``raw``."""
    return sum(1 for part in raw.split('&') if part.strip())


def env_int(key: str, default: int) -> int:
    """Return get_env(key) parsed as an int, or ``default`` if unset or not numeric."""
    raw = get_env(key, '').strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        if logger is not None:
            logger.warning(f"{key}={raw!r} 不是有效的整数, 使用默认值 {default}")
        return default


def parse_args():
    """Load the config-file variables, then parse and validate the command line."""
    logger.debug(f"sys.argv={sys.argv}")
    parse_envs()

    parser = argparse.ArgumentParser()
    parser.add_argument("task", type=str, help="只能填写task", choices=["task"])
    parser.add_argument("target_script", type=str, help="实际要运行的脚本路径")
    parser.add_argument("now", nargs="?", type=str, help="是否立即执行")
    parser.add_argument("-t", "--threads", type=int, help="并发数", default=env_int('PTASK_THREADS', 3))
    parser.add_argument("--jd_cookie_count", type=int, help="京东cookie数量",
                        default=count_jd_cookies(get_env('JD_COOKIE', '')))
    args = parser.parse_args()

    # A non-positive concurrency would divide by zero when splitting batches.
    if args.threads < 1:
        parser.error("并发数必须为正整数")

    logger.debug(f"args={args}")
    return args


def parse_shell_value(raw: str) -> str:
    """Extract the value from the right-hand side of a shell ``KEY=VALUE`` assignment.

    A value starting with a double or single quote is returned without its
    outer quotes, ending at the last matching quote on the line. An unquoted
    value ends at the first whitespace, which also drops a trailing
    ``# comment``.
    """
    value = raw.strip()
    if value[:1] in ('"', "'"):
        end = value.rfind(value[0])
        # end == 0 means the quote was never closed; keep the rest of the line.
        return value[1:end] if end > 0 else value[1:]
    parts = value.split(None, 1)
    return parts[0] if parts else ""


def parse_envs(shell_files: Iterable[str] = DEFAULT_SHELL_FILES):
    """Load ``export KEY=VALUE`` lines from Qinglong shell files into ``envs``.

    Only lines that begin with ``export `` are considered. Missing files and
    lines without ``=`` are skipped. Files are read in order, so later files
    override earlier ones.
    """
    env_prefix = "export "
    for shell_file in shell_files:
        if not os.path.exists(shell_file):
            continue
        with open(shell_file, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.startswith(env_prefix):
                    continue
                # Slice rather than replace() so "export " inside a value survives.
                key, sep, value = line[len(env_prefix):].partition('=')
                key = key.strip()
                if not sep or not key:
                    continue
                envs[key] = parse_shell_value(value)


def get_env(key: str, default="") -> str:
    """Look up ``key`` in the parsed config files, then in the process environment."""
    if key in envs:
        return envs[key]
    return os.environ.get(key, default)


def init_logger():
    """Configure the "ptask" logger: INFO and above to the console, everything to a file.

    The log file goes to /ql/log/ptask when /ql/log exists, otherwise to ./logs.
    It is named after the current local time.
    """
    global logger
    # Use a dedicated named logger instead of renaming the root logger.
    logger = logging.getLogger("ptask")
    logger.setLevel(logging.DEBUG)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    logger.addHandler(console_handler)

    log_directory = "/ql/log/ptask" if os.path.exists("/ql/log") else "./logs"
    os.makedirs(log_directory, exist_ok=True)

    time_str = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    log_filename = f"{log_directory}/{time_str}.log"
    file_handler = logging.FileHandler(log_filename, encoding="utf-8", delay=True)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
    logger.addHandler(file_handler)


def main():
    """Entry point: set up logging, parse arguments, run the batches."""
    init_logger()
    args = parse_args()
    parallel_task(args.target_script, args.threads, args.jd_cookie_count)


if __name__ == '__main__':
    main()