Skip to content

Instantly share code, notes, and snippets.

@MuXiu1997
Last active December 12, 2025 06:22
Show Gist options
  • Select an option

  • Save MuXiu1997/d982ea1c53c97ee06bf8957e229b8166 to your computer and use it in GitHub Desktop.

Select an option

Save MuXiu1997/d982ea1c53c97ee06bf8957e229b8166 to your computer and use it in GitHub Desktop.
ass-processor
import { ensureDir } from 'https://deno.land/std@0.208.0/fs/mod.ts'
import { basename, join } from 'https://deno.land/std@0.208.0/path/mod.ts'
import { $ } from 'npm:zx@8.8.1'
import fg from 'npm:fast-glob@3.3.3'
import { consola } from 'npm:consola@3.2.3'
import dayjs from 'npm:dayjs@1.11.19'
// 从 extractors.ts 导入压缩文件处理函数
import { extractArchive, isArchiveFile } from './extractors.ts'
// 从 assfonts-installer.ts 导入安装器功能
import { ensureAssfontsInstalled } from './assfonts-installer.ts'
// ============================================================================
// 工具函数
// ============================================================================
/** 从 unknown 类型的 error 中提取错误信息 */
function getErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error)
}
// ============================================================================
// ASS 字幕处理
// ============================================================================
/**
* 处理单个 ASS 字幕文件:进行字体子集化和内嵌
* @param inputFile 输入 ASS 文件路径
* @param outputPath 输出目录路径
* @param fontPaths 字体搜索路径列表
* @param logFile 日志文件路径,如果提供则将 CLI 输出追加到此文件
*/
async function processAssSubtitle(
inputFile: string,
outputPath: string,
fontPaths: string[],
logFile?: string,
): Promise<void> {
const binPath = await ensureAssfontsInstalled()
// 构建命令参数
const args: string[] = ['-i', inputFile]
// 输出路径
if (outputPath.endsWith('.ass')) {
throw new Error('目前不支持指定输出文件路径,请使用输出目录')
}
args.push('-o', outputPath)
// 字体路径
for (const fontPath of fontPaths) {
args.push('-f', fontPath)
}
// 固定 verbose 为 3
args.push('-v', '3')
consola.info(`🎬 处理 ASS 字幕: ${inputFile}`)
consola.info(`📁 输出位置: ${outputPath}`)
if (fontPaths.length) {
consola.info(`🔤 字体路径: ${fontPaths.join(', ')}`)
}
try {
const result = await $`${binPath} ${args}`.quiet()
// 如果指定了日志文件,将输出追加到日志
if (logFile) {
const timestamp = dayjs().format('YYYY-MM-DD HH:mm:ss')
const logEntry = [
`\n${'='.repeat(60)}`,
`[${timestamp}] 处理文件: ${inputFile}`,
`命令: ${binPath} ${args.join(' ')}`,
`${'='.repeat(60)}`,
`--- STDOUT ---`,
result.stdout || '(无输出)',
`--- STDERR ---`,
result.stderr || '(无输出)',
`${'='.repeat(60)}\n`,
].join('\n')
await Deno.writeTextFile(logFile, logEntry, { append: true })
}
consola.success('ASS 字幕处理完成')
} catch (error) {
// 如果指定了日志文件,将错误也写入日志
if (logFile) {
const timestamp = dayjs().format('YYYY-MM-DD HH:mm:ss')
const logEntry = [
`\n${'='.repeat(60)}`,
`[${timestamp}] 处理文件失败: ${inputFile}`,
`命令: ${binPath} ${args.join(' ')}`,
`错误: ${getErrorMessage(error)}`,
`${'='.repeat(60)}\n`,
].join('\n')
await Deno.writeTextFile(logFile, logEntry, { append: true })
}
throw new Error(`ASS 字幕处理失败: ${getErrorMessage(error)}`)
}
}
// ============================================================================
// 批处理
// ============================================================================
/**
* 单个批处理项配置
* 每个 glob 必须精确匹配一个文件
*/
interface BatchItem {
/** 字体目录或压缩文件(支持单个或多个) */
fontDir: string | string[]
/** 原始字幕目录或压缩文件 */
subtitleDir: string
/** 字幕文件 glob 模式,必须精确匹配一个文件 */
subtitleGlob: string
/** 输出目录(视频文件所在目录) */
outputDir: string
/** 视频文件 glob 模式,必须精确匹配一个文件 */
videoGlob: string
/** 输出后缀,例如 ".sc.ass" */
outputSuffix: string
}
/**
* 批处理结果
*/
interface BatchResult {
success: boolean
inputFile: string
outputFile: string
error?: string
}
/**
* 准备目录(如果是压缩文件则解压)
*/
async function prepareDirectory(
path: string,
description: string,
): Promise<{ dir: string; needCleanup: boolean }> {
try {
const stat = await Deno.stat(path)
if (stat.isDirectory) {
return { dir: path, needCleanup: false }
}
if (stat.isFile && await isArchiveFile(path)) {
const tempDir = await Deno.makeTempDir({
prefix: `assfonts_${description}_`,
})
await extractArchive(path, tempDir)
return { dir: tempDir, needCleanup: true }
}
throw new Error(`${description} "${path}" 不是有效的目录或压缩文件`)
} catch (error) {
if (error instanceof Deno.errors.NotFound) {
throw new Error(`${description} "${path}" 不存在`)
}
throw error
}
}
/**
* 使用 glob 或正则表达式获取文件列表
* - 如果 pattern 以 / 开头和结尾,则作为正则表达式处理
* - 否则作为 glob 模式处理
* 注意:使用 Deno.readDir + regex 而非 expandGlob,
* 以正确处理目录路径中包含特殊字符(如方括号)的情况
*/
async function getFilesByGlob(dir: string, pattern: string): Promise<string[]> {
// 使用 fast-glob 进行匹配
// 使用 cwd 选项来避免目录路径中的特殊字符被解释为 glob 模式
// fast-glob 支持转义特殊字符,例如 \[01\] 可以匹配字面的 [01]
const files = await fg(pattern, {
onlyFiles: true,
cwd: dir,
absolute: true,
})
return files
}
/**
* 使用 glob 获取唯一文件,如果匹配 0 个或多个则抛出异常
*/
async function getUniqueFileByGlob(
dir: string,
glob: string,
description: string,
): Promise<string> {
const files = await getFilesByGlob(dir, glob)
if (files.length === 0) {
throw new Error(
`${description}: glob "${glob}" 在目录 "${dir}" 中没有匹配到任何文件`,
)
}
if (files.length > 1) {
const fileList = files.map((f) => ` - ${basename(f)}`).join('\n')
throw new Error(
`${description}: glob "${glob}" 在目录 "${dir}" 中匹配到多个文件:\n${fileList}`,
)
}
return files[0]
}
/**
* 解压缓存管理器
*/
class ExtractCache {
private cache = new Map<string, string>()
private tempDirs: string[] = []
async getOrExtract(
archivePath: string,
description: string,
): Promise<string> {
const absolutePath = await Deno.realPath(archivePath)
if (this.cache.has(absolutePath)) {
consola.info(`📦 使用缓存的解压目录: ${basename(archivePath)}`)
return this.cache.get(absolutePath)!
}
const tempDir = await Deno.makeTempDir({
prefix: `assfonts_${description}_`,
})
await extractArchive(archivePath, tempDir)
this.cache.set(absolutePath, tempDir)
this.tempDirs.push(tempDir)
return tempDir
}
async cleanup(): Promise<void> {
for (const dir of this.tempDirs) {
try {
await Deno.remove(dir, { recursive: true })
} catch {
// 忽略清理错误
}
}
this.cache.clear()
this.tempDirs = []
}
getStats(): { cached: number; tempDirs: number } {
return {
cached: this.cache.size,
tempDirs: this.tempDirs.length,
}
}
}
/**
* 准备目录(支持缓存)
*/
async function prepareDirectoryWithCache(
path: string,
description: string,
cache: ExtractCache,
): Promise<string> {
try {
const stat = await Deno.stat(path)
if (stat.isDirectory) {
return path
}
if (stat.isFile && await isArchiveFile(path)) {
return await cache.getOrExtract(path, description)
}
throw new Error(`${description} "${path}" 不是有效的目录或压缩文件`)
} catch (error) {
if (error instanceof Deno.errors.NotFound) {
throw new Error(`${description} "${path}" 不存在`)
}
throw error
}
}
/**
* 处理单个批处理项(使用缓存)
*/
async function processBatchItemWithCache(
item: BatchItem,
cache: ExtractCache,
logFile?: string,
): Promise<BatchResult> {
const fontDirs = Array.isArray(item.fontDir) ? item.fontDir : [item.fontDir]
consola.log('\n' + '-'.repeat(60))
consola.info(`📁 字体源: ${fontDirs.join(', ')}`)
consola.info(`📂 字幕源: ${item.subtitleDir}`)
consola.info(`🔍 字幕 glob: ${item.subtitleGlob}`)
consola.info(`📤 输出目录: ${item.outputDir}`)
consola.info(`🎬 视频 glob: ${item.videoGlob}`)
consola.info(`📝 输出后缀: ${item.outputSuffix}`)
consola.log('-'.repeat(60))
let tempOutputDir: string | null = null
try {
const actualFontDirs = await Promise.all(
fontDirs.map((dir, index) =>
prepareDirectoryWithCache(
dir,
fontDirs.length > 1 ? `字体${index + 1}` : '字体',
cache,
)
),
)
const actualSubtitleDir = await prepareDirectoryWithCache(
item.subtitleDir,
'字幕',
cache,
)
const subtitleFile = await getUniqueFileByGlob(
actualSubtitleDir,
item.subtitleGlob,
'字幕文件',
)
consola.info(`📥 字幕文件: ${basename(subtitleFile)}`)
const videoFile = await getUniqueFileByGlob(
item.outputDir,
item.videoGlob,
'视频文件',
)
consola.info(`🎬 视频文件: ${basename(videoFile)}`)
const videoBasename = basename(videoFile)
const outputFilename = videoBasename.replace(
/\.(mkv|mp4|avi|mov|wmv|flv|webm|m4v)$/i,
item.outputSuffix,
)
const outputFile = join(item.outputDir, outputFilename)
consola.info(`📤 输出文件: ${outputFilename}`)
await ensureDir(item.outputDir)
tempOutputDir = await Deno.makeTempDir({ prefix: 'assfonts_output_' })
await processAssSubtitle(subtitleFile, tempOutputDir, actualFontDirs, logFile)
const processedFiles = await getFilesByGlob(tempOutputDir, '*.assfonts.ass')
if (processedFiles.length === 0) {
throw new Error('未找到处理后的字幕文件')
}
if (processedFiles.length > 1) {
throw new Error(`处理后产生了多个字幕文件: ${processedFiles.length}`)
}
const processedFile = processedFiles[0]
await Deno.copyFile(processedFile, outputFile)
consola.success(`处理完成: ${outputFilename}`)
return {
success: true,
inputFile: subtitleFile,
outputFile,
}
} catch (error) {
consola.error(`处理失败: ${getErrorMessage(error)}`)
return {
success: false,
inputFile: '',
outputFile: '',
error: getErrorMessage(error),
}
} finally {
if (tempOutputDir) {
try {
await Deno.remove(tempOutputDir, { recursive: true })
} catch {
// 忽略清理错误
}
}
}
}
/**
* 处理单个批处理项
*/
async function processBatchItem(
item: BatchItem,
logFile?: string,
): Promise<BatchResult> {
const cache = new ExtractCache()
try {
return await processBatchItemWithCache(item, cache, logFile)
} finally {
await cache.cleanup()
}
}
/**
* 生成批处理日志文件路径
*/
function generateBatchLogPath(): string {
const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
return join(Deno.cwd(), `assfonts-batch-${timestamp}.log`)
}
/**
* 执行批处理
* @param items 批处理项列表
* @param options 批处理选项
* @returns 批处理结果和日志文件路径
*/
async function processBatch(
items: BatchItem[],
options: {
/** 自定义日志文件路径,如果不提供则自动生成 */
logFile?: string
/** 是否禁用日志文件 */
disableLog?: boolean
} = {},
): Promise<{ results: BatchResult[]; logFile?: string }> {
consola.log('\n' + '='.repeat(60))
consola.start('📦 开始批处理')
consola.info(`📋 共 ${items.length} 个任务`)
consola.log('='.repeat(60))
await ensureAssfontsInstalled()
// 创建日志文件
let logFile: string | undefined
if (!options.disableLog) {
logFile = options.logFile || generateBatchLogPath()
const logHeader = [
`assfonts 批处理日志`,
`开始时间: ${dayjs().format('YYYY-MM-DD HH:mm:ss')}`,
`任务数量: ${items.length}`,
`${'='.repeat(60)}`,
'',
].join('\n')
await Deno.writeTextFile(logFile, logHeader)
consola.info(`📝 日志文件: ${logFile}`)
}
const uniqueArchives = new Set<string>()
for (const item of items) {
const fontDirs = Array.isArray(item.fontDir) ? item.fontDir : [item.fontDir]
for (const fontDir of fontDirs) {
if (await isArchiveFile(fontDir)) {
uniqueArchives.add(fontDir)
}
}
if (await isArchiveFile(item.subtitleDir)) {
uniqueArchives.add(item.subtitleDir)
}
}
if (uniqueArchives.size > 0) {
consola.info(
`📦 检测到 ${uniqueArchives.size} 个唯一压缩文件,将使用缓存优化`,
)
}
const cache = new ExtractCache()
const results: BatchResult[] = []
let successCount = 0
let failCount = 0
try {
for (let i = 0; i < items.length; i++) {
consola.info(`\n[${i + 1}/${items.length}] 处理中...`)
const result = await processBatchItemWithCache(items[i], cache, logFile)
results.push(result)
if (result.success) {
successCount++
} else {
failCount++
consola.error(`批处理在第 ${i + 1} 项失败,停止执行`)
break
}
}
} finally {
const stats = cache.getStats()
if (stats.tempDirs > 0) {
consola.info(`🧹 清理 ${stats.tempDirs} 个临时解压目录...`)
}
await cache.cleanup()
// 写入日志摘要
if (logFile) {
const logSummary = [
'',
`${'='.repeat(60)}`,
`批处理完成`,
`结束时间: ${dayjs().format('YYYY-MM-DD HH:mm:ss')}`,
`成功: ${successCount}`,
`失败: ${failCount}`,
`未执行: ${items.length - successCount - failCount}`,
`${'='.repeat(60)}`,
].join('\n')
await Deno.writeTextFile(logFile, logSummary, { append: true })
}
}
consola.log('\n' + '='.repeat(60))
consola.success('📊 批处理完成')
consola.log('='.repeat(60))
consola.info(` ✅ 成功: ${successCount}`)
consola.info(` ❌ 失败: ${failCount}`)
consola.info(` ⏸️ 未执行: ${items.length - successCount - failCount}`)
if (logFile) {
consola.info(` 📝 日志: ${logFile}`)
}
consola.log('='.repeat(60) + '\n')
if (failCount > 0) {
throw new Error(`批处理失败: ${failCount} 个任务失败`)
}
return { results, logFile }
}
// ============================================================================
// 导出
// ============================================================================
export {
// 批处理
ExtractCache,
// 工具函数
getFilesByGlob,
getUniqueFileByGlob,
prepareDirectory,
prepareDirectoryWithCache,
processAssSubtitle,
processBatch,
processBatchItem,
processBatchItemWithCache,
}
// 重新导出 extractors.ts 的函数
export { extractArchive, isArchiveFile } from './extractors.ts'
// 重新导出 assfonts-installer.ts 的函数
export {
downloadAssfonts,
ensureAssfontsInstalled,
fileExists,
getAssfontsPath,
withMountedDmg,
} from './assfonts-installer.ts'
export type {
// 批处理类型
BatchItem,
BatchResult,
}
/**
* assfonts 安装器模块
*
* 提供 assfonts 二进制文件的下载、安装和管理功能
* 支持 macOS (Intel/Apple Silicon) 和 Linux (x86_64/ARM64)
*/
import { ensureDir } from 'https://deno.land/std@0.208.0/fs/mod.ts'
import { join } from 'https://deno.land/std@0.208.0/path/mod.ts'
import { $ } from 'npm:zx@8.8.1'
import { consola } from 'npm:consola@3.2.3'
// 从 extractors.ts 导入压缩文件处理函数
import { extractArchive } from './extractors.ts'
// ============================================================================
// 常量定义
// ============================================================================
const OS = Deno.build.os
const ARCH = Deno.build.arch
const HOME = Deno.env.get('HOME') || (() => {
consola.error('HOME 环境变量未设置')
Deno.exit(1)
})()
const XDG_DATA_HOME = Deno.env.get('XDG_DATA_HOME') ||
join(HOME, '.local', 'share')
const ASSFONTS_VERSION = 'v0.7.3'
const ASSFONTS_INSTALL_DIR = join(XDG_DATA_HOME, `assfonts@${ASSFONTS_VERSION}`)
const ASSFONTS_BIN_DIR = join(ASSFONTS_INSTALL_DIR, 'bin')
const ASSFONTS_BIN_PATH = join(ASSFONTS_BIN_DIR, 'assfonts')
// ============================================================================
// 工具函数
// ============================================================================
/** 从 unknown 类型的 error 中提取错误信息 */
function getErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error)
}
/** 检查文件是否存在 */
async function fileExists(path: string): Promise<boolean> {
try {
await Deno.stat(path)
return true
} catch {
return false
}
}
// ============================================================================
// DMG 挂载(macOS)
// ============================================================================
/**
* 挂载 DMG 文件并执行回调函数,完成后自动卸载并清理临时目录
* @param dmgPath DMG 文件路径
* @param callback 接收挂载点路径的回调函数
* @returns 回调函数的返回值
*/
async function withMountedDmg<T>(
dmgPath: string,
callback: (mountPoint: string) => Promise<T>,
): Promise<T> {
// 使用 Deno API 创建临时目录作为挂载点
const mountPoint = await Deno.makeTempDir({ prefix: 'dmg_mount_' })
// 挂载 dmg
try {
await $`hdiutil attach ${dmgPath} -mountpoint ${mountPoint} -nobrowse`
} catch (error) {
// 挂载失败时清理临时目录
try {
await Deno.remove(mountPoint)
} catch {
// 忽略清理错误
}
throw new Error(`挂载 dmg 失败: ${getErrorMessage(error)}`)
}
try {
// 执行业务逻辑
return await callback(mountPoint)
} finally {
// 确保卸载 dmg
try {
await $`hdiutil detach ${mountPoint}`
} catch (e) {
consola.warn('卸载 dmg 时出错:', e)
}
// 清理临时目录
try {
await Deno.remove(mountPoint)
} catch {
// 忽略清理错误
}
}
}
// ============================================================================
// assfonts 安装和管理
// ============================================================================
/**
* 下载 assfonts 二进制文件到本地
* 支持 Mac (Intel/Apple Silicon) 和 Linux (x86_64/ARM64) 系统
*/
async function downloadAssfonts(): Promise<void> {
const baseUrl =
`https://github.com/wyzdwdz/assfonts/releases/download/${ASSFONTS_VERSION}`
const [assetName, isTarGz] = (() => {
if (OS === 'darwin') {
// macOS
if (ARCH === 'x86_64' || ARCH === 'aarch64') {
return [`assfonts-${ASSFONTS_VERSION}-${ARCH}-macOS.dmg`, false]
}
throw new Error(`不支持的 Mac 架构: ${ARCH}`)
}
if (OS === 'linux') {
// Linux
if (ARCH === 'x86_64' || ARCH === 'aarch64') {
return [`assfonts-${ASSFONTS_VERSION}-${ARCH}-Linux.tar.gz`, true]
}
throw new Error(`不支持的 Linux 架构: ${ARCH}`)
}
throw new Error(`不支持的操作系统: ${OS}`)
})()
const downloadUrl = `${baseUrl}/${assetName}`
consola.info(`检测到系统: ${OS} ${ARCH}`)
consola.info(`下载文件: ${assetName}`)
consola.info(`安装目录: ${ASSFONTS_INSTALL_DIR}`)
// 确保目录存在
await ensureDir(ASSFONTS_BIN_DIR)
// 下载文件
consola.start(`正在下载 ${downloadUrl}...`)
const response = await fetch(downloadUrl)
if (!response.ok) {
throw new Error(`下载失败: ${response.status} ${response.statusText}`)
}
const tempFile = join(ASSFONTS_INSTALL_DIR, `temp_${assetName}`)
const file = await Deno.create(tempFile)
await response.body?.pipeTo(file.writable)
consola.success('下载完成,开始处理文件...')
if (isTarGz) {
// 处理 tar.gz 文件
consola.start('解压 tar.gz 文件...')
// 使用 extractArchive 解压
try {
await extractArchive(tempFile, ASSFONTS_INSTALL_DIR)
} catch (error) {
throw new Error(`解压失败: ${getErrorMessage(error)}`)
}
// Linux 版本的 tar.gz 解压后结构为 bin/assfonts
// 检查 bin/assfonts 是否存在
if (await fileExists(ASSFONTS_BIN_PATH)) {
// 文件已经在正确的位置
await Deno.chmod(ASSFONTS_BIN_PATH, 0o755)
consola.info(`二进制文件位置: ${ASSFONTS_BIN_PATH}`)
} else {
// 尝试查找根目录下的 assfonts
const rootBin = join(ASSFONTS_INSTALL_DIR, 'assfonts')
if (await fileExists(rootBin)) {
await Deno.rename(rootBin, ASSFONTS_BIN_PATH)
await Deno.chmod(ASSFONTS_BIN_PATH, 0o755)
} else {
throw new Error(`未找到解压后的 assfonts 二进制文件`)
}
}
} else {
// 处理 .dmg 文件 (macOS)
consola.start('处理 .dmg 文件...')
await withMountedDmg(tempFile, async (mountPoint) => {
// 查找二进制文件
const binInDmg = join(mountPoint, 'bin', 'assfonts')
if (await fileExists(binInDmg)) {
await Deno.copyFile(binInDmg, ASSFONTS_BIN_PATH)
await Deno.chmod(ASSFONTS_BIN_PATH, 0o755)
consola.info('二进制文件已复制到:', ASSFONTS_BIN_PATH)
} else {
throw new Error(`在 dmg 中未找到 assfonts 二进制文件`)
}
})
}
// 清理临时文件
try {
await Deno.remove(tempFile)
} catch (e) {
consola.warn('清理临时文件时出错:', e)
}
// 验证安装
if (await fileExists(ASSFONTS_BIN_PATH)) {
consola.success(`assfonts ${ASSFONTS_VERSION} 安装成功!`)
consola.info(`📍 位置: ${ASSFONTS_BIN_PATH}`)
// 检查是否可执行
try {
const result = await $`${ASSFONTS_BIN_PATH} --help`
if (result.stdout.includes('assfonts')) {
consola.success(`assfonts 工作正常`)
}
} catch (e) {
consola.warn('无法验证 assfonts:', e)
}
} else {
throw new Error('安装失败:未找到二进制文件')
}
}
/**
* 确保 assfonts 已安装,如果未安装则自动安装
*/
async function ensureAssfontsInstalled(): Promise<string> {
// 检查二进制文件是否存在
if (!(await fileExists(ASSFONTS_BIN_PATH))) {
consola.info('assfonts 未安装,开始安装...')
await downloadAssfonts()
return ASSFONTS_BIN_PATH
}
// 检查二进制文件是否可执行
try {
const result = await $`${ASSFONTS_BIN_PATH} --help`
if (result.stdout.includes(`assfonts ${ASSFONTS_VERSION}`)) {
consola.success(`assfonts ${ASSFONTS_VERSION} 已安装`)
return ASSFONTS_BIN_PATH
}
} catch {
consola.warn('检测到 assfonts 文件存在但可能损坏,重新安装...')
}
// 如果检测失败,重新安装
consola.start('重新安装 assfonts...')
await downloadAssfonts()
return ASSFONTS_BIN_PATH
}
/**
* 获取已安装的 assfonts 二进制文件路径(如果已安装)
*/
async function getAssfontsPath(): Promise<string | null> {
if (!(await fileExists(ASSFONTS_BIN_PATH))) {
return null
}
// 快速检查是否可执行
try {
const result = await $`${ASSFONTS_BIN_PATH} --help`
if (result.stdout.includes('assfonts')) {
return ASSFONTS_BIN_PATH
}
} catch {
// 忽略错误
}
return null
}
// ============================================================================
// 导出
// ============================================================================
export {
// 常量
ASSFONTS_BIN_DIR,
ASSFONTS_BIN_PATH,
ASSFONTS_INSTALL_DIR,
ASSFONTS_VERSION,
// 主要函数
downloadAssfonts,
ensureAssfontsInstalled,
// 工具函数
fileExists,
getAssfontsPath,
getErrorMessage,
// DMG 挂载
withMountedDmg,
}
import { ensureDir } from 'https://deno.land/std@0.208.0/fs/mod.ts'
import {
basename,
dirname,
join,
} from 'https://deno.land/std@0.208.0/path/mod.ts'
import SevenZip from 'npm:7z-wasm@1.2.0'
import { consola } from 'npm:consola@3.2.3'
import { nanoid } from 'npm:nanoid@5.1.6'
import { fileTypeFromFile } from 'npm:file-type@21.1.1'
// ============================================================================
// 压缩文件检测
// ============================================================================
/**
* 使用 file-type 检测文件类型(基于文件内容)
*/
export async function detectFileType(
path: string,
): Promise<{ ext?: string; mime?: string } | undefined> {
try {
return await fileTypeFromFile(path)
} catch {
return undefined
}
}
/**
* 检查路径是否为 RAR 文件(基于文件内容)
*/
export async function isRarFile(path: string): Promise<boolean> {
const type = await detectFileType(path)
return type?.ext === 'rar'
}
/**
* 检查路径是否为 ZIP 文件(基于文件内容)
*/
export async function isZipFile(path: string): Promise<boolean> {
const type = await detectFileType(path)
return type?.ext === 'zip'
}
/**
* 检查路径是否为 7z 文件(基于文件内容)
*/
export async function is7zFile(path: string): Promise<boolean> {
const type = await detectFileType(path)
return type?.ext === '7z'
}
/**
* 检查路径是否为 tar 文件(基于文件内容)
*/
export async function isTarFile(path: string): Promise<boolean> {
const type = await detectFileType(path)
return type?.ext === 'tar' || type?.ext === 'tar.gz'
}
/**
* 检查路径是否为压缩文件(基于文件内容)
*/
export async function isArchiveFile(path: string): Promise<boolean> {
const type = await detectFileType(path)
if (!type?.ext) return false
return ['rar', 'zip', '7z', 'tar', 'tar.gz'].includes(type.ext)
}
// ============================================================================
// 7z-wasm 核心解压逻辑
// ============================================================================
/**
* 统计目录中的文件数量(递归)
*/
async function countFiles(dir: string): Promise<number> {
let count = 0
async function walk(path: string): Promise<void> {
for await (const entry of Deno.readDir(path)) {
if (entry.isFile) {
count++
} else if (entry.isDirectory) {
await walk(join(path, entry.name))
}
}
}
await walk(dir)
return count
}
/**
* 使用 7z-wasm 执行解压
* @param archivePath 压缩文件路径
* @param destDir 目标目录
*/
async function extractWith7z(
archivePath: string,
destDir: string,
): Promise<void> {
// @ts-ignore: npm package type definition issue
const sevenZip = await SevenZip()
const absoluteArchivePath = await Deno.realPath(archivePath)
const absoluteDestDir = await Deno.realPath(destDir)
const archiveDir = dirname(absoluteArchivePath)
const archiveName = basename(absoluteArchivePath)
// 生成唯一的挂载点名称,避免并发冲突
const mountId = nanoid(16)
const srcMount = `/src_${mountId}`
const destMount = `/dest_${mountId}`
try {
// 创建挂载点目录 (在 VFS 中)
sevenZip.FS.mkdir(srcMount)
sevenZip.FS.mkdir(destMount)
// 挂载源目录(包含压缩文件)和目标目录
sevenZip.FS.mount(sevenZip.NODEFS, { root: archiveDir }, srcMount)
sevenZip.FS.mount(sevenZip.NODEFS, { root: absoluteDestDir }, destMount)
// 切换到目标目录
sevenZip.FS.chdir(destMount)
// 执行解压命令
const archiveInVfs = `${srcMount}/${archiveName}`
const result = sevenZip.callMain(['x', '-y', archiveInVfs])
if (result !== 0) {
throw new Error(`7z 解压失败,返回码: ${result}`)
}
} finally {
// 清理工作 (必须执行)
try {
sevenZip.FS.chdir('/')
sevenZip.FS.unmount(srcMount)
sevenZip.FS.unmount(destMount)
sevenZip.FS.rmdir(srcMount)
sevenZip.FS.rmdir(destMount)
} catch {
// 忽略清理阶段的错误
}
}
}
/**
* 使用 7z-wasm 在 VFS 内完成 tar.gz 的两步解压
* gzip -> tar (在 VFS 内存中) -> files (写入物理文件系统)
* @param tarGzPath tar.gz 文件路径
* @param destDir 目标目录
*/
async function extractTarGzInVfs(
tarGzPath: string,
destDir: string,
): Promise<void> {
// @ts-ignore: npm package type definition issue
const sevenZip = await SevenZip()
const absoluteArchivePath = await Deno.realPath(tarGzPath)
const absoluteDestDir = await Deno.realPath(destDir)
const archiveDir = dirname(absoluteArchivePath)
const archiveName = basename(absoluteArchivePath)
// 生成唯一的挂载点和临时目录名称
const mountId = nanoid(16)
const srcMount = `/src_${mountId}`
const destMount = `/dest_${mountId}`
const tmpDir = `/tmp_${mountId}` // VFS 内存中的临时目录
try {
// 创建挂载点和临时目录
sevenZip.FS.mkdir(srcMount)
sevenZip.FS.mkdir(destMount)
sevenZip.FS.mkdir(tmpDir) // 这是 MEMFS,不会写入物理磁盘
// 挂载源目录和目标目录
sevenZip.FS.mount(sevenZip.NODEFS, { root: archiveDir }, srcMount)
sevenZip.FS.mount(sevenZip.NODEFS, { root: absoluteDestDir }, destMount)
// ========== 第一步:解压 gzip -> tar (到 VFS 内存临时目录) ==========
sevenZip.FS.chdir(tmpDir)
const archiveInVfs = `${srcMount}/${archiveName}`
const result1 = sevenZip.callMain(['x', '-y', archiveInVfs])
if (result1 !== 0) {
throw new Error(`tar.gz 第一步解压失败 (gzip),返回码: ${result1}`)
}
// 在 VFS 临时目录中查找生成的 tar 文件
const tmpDirContents = sevenZip.FS.readdir(tmpDir)
let tarFileName: string | null = null
for (const name of tmpDirContents) {
if (name !== '.' && name !== '..') {
tarFileName = name
break
}
}
if (!tarFileName) {
throw new Error('tar.gz 解压后未找到 tar 文件')
}
// ========== 第二步:解压 tar -> files (到物理目标目录) ==========
sevenZip.FS.chdir(destMount)
const tarInVfs = `${tmpDir}/${tarFileName}`
const result2 = sevenZip.callMain(['x', '-y', tarInVfs])
if (result2 !== 0) {
throw new Error(`tar.gz 第二步解压失败 (tar),返回码: ${result2}`)
}
} finally {
// 清理工作
try {
sevenZip.FS.chdir('/')
// 清理 VFS 临时目录中的文件
try {
const tmpContents = sevenZip.FS.readdir(tmpDir)
for (const name of tmpContents) {
if (name !== '.' && name !== '..') {
sevenZip.FS.unlink(`${tmpDir}/${name}`)
}
}
} catch {
// 忽略清理错误
}
// 卸载和删除目录
sevenZip.FS.unmount(srcMount)
sevenZip.FS.unmount(destMount)
sevenZip.FS.rmdir(srcMount)
sevenZip.FS.rmdir(destMount)
sevenZip.FS.rmdir(tmpDir)
} catch {
// 忽略清理阶段的错误
}
}
}
// ============================================================================
// 公开的解压函数
// ============================================================================
/**
* 解压 RAR 文件到指定目录
*/
export async function extractRar(
rarPath: string,
destDir: string,
): Promise<void> {
consola.start(`📦 解压 RAR 文件: ${basename(rarPath)}`)
await ensureDir(destDir)
await extractWith7z(rarPath, destDir)
const count = await countFiles(destDir)
consola.success(`解压完成,共 ${count} 个文件`)
}
/**
* 解压 ZIP 文件到指定目录
*/
export async function extractZip(
zipPath: string,
destDir: string,
): Promise<void> {
consola.start(`📦 解压 ZIP 文件: ${basename(zipPath)}`)
await ensureDir(destDir)
await extractWith7z(zipPath, destDir)
const count = await countFiles(destDir)
consola.success(`解压完成,共 ${count} 个文件`)
}
/**
* 解压 7z 文件到指定目录
*/
export async function extract7z(
sevenZipPath: string,
destDir: string,
): Promise<void> {
consola.start(`📦 解压 7z 文件: ${basename(sevenZipPath)}`)
await ensureDir(destDir)
await extractWith7z(sevenZipPath, destDir)
const count = await countFiles(destDir)
consola.success(`解压完成,共 ${count} 个文件`)
}
/**
* 解压 tar 文件到指定目录(支持 .tar, .tar.gz)
* 对于 tar.gz 文件,自动执行两步解压:gzip -> tar -> files
*/
export async function extractTar(
tarPath: string,
destDir: string,
): Promise<void> {
consola.start(`📦 解压 TAR 文件: ${basename(tarPath)}`)
await ensureDir(destDir)
// 检测文件类型
const fileType = await detectFileType(tarPath)
const isTarGz = fileType?.ext === 'tar.gz'
if (isTarGz) {
// 使用 VFS 内存完成两步解压,不影响物理文件系统
consola.info(` 🔄 检测到 tar.gz,在 VFS 内完成两步解压...`)
await extractTarGzInVfs(tarPath, destDir)
} else {
// 单步解压纯 tar 文件
await extractWith7z(tarPath, destDir)
}
const count = await countFiles(destDir)
consola.success(`解压完成,共 ${count} 个文件`)
}
/**
* 解压压缩文件到指定目录(统一入口)
* 支持:zip, rar, 7z, tar, tar.gz
*/
export async function extractArchive(
archivePath: string,
destDir: string,
): Promise<void> {
const fileType = await detectFileType(archivePath)
if (!fileType?.ext) {
throw new Error(`无法检测文件类型: ${archivePath}`)
}
switch (fileType.ext) {
case 'rar':
await extractRar(archivePath, destDir)
break
case 'zip':
await extractZip(archivePath, destDir)
break
case '7z':
await extract7z(archivePath, destDir)
break
case 'tar':
case 'tar.gz':
await extractTar(archivePath, destDir)
break
default:
throw new Error(
`不支持的压缩格式: ${fileType.ext} (${fileType.mime || 'unknown'})`,
)
}
}
#!/usr/bin/env -S deno run --quiet --allow-all --ext=ts
import { defineCommand, runMain } from 'npm:citty@0.1.6'
import { basename, dirname } from 'https://deno.land/std@0.208.0/path/mod.ts'
import SevenZip from 'npm:7z-wasm@1.2.0'
import { consola } from 'npm:consola@3.2.3'
import { nanoid } from 'npm:nanoid@5.1.6'
import { detectFileType } from './extractors.ts'
// ============================================================================
// 文件树构建
// ============================================================================
interface TreeNode {
[key: string]: TreeNode | null
}
/**
* 从文件路径列表构建树形结构
*/
function buildTree(paths: string[]): TreeNode {
const tree: TreeNode = {}
for (const filePath of paths) {
const parts = filePath.split('/').filter((p) => p.length > 0)
let current = tree
for (let i = 0; i < parts.length; i++) {
const part = parts[i]
const isLast = i === parts.length - 1
if (!current[part]) {
current[part] = isLast ? null : {}
}
if (current[part] !== null) {
current = current[part] as TreeNode
}
}
}
return tree
}
/**
* 打印树形结构 (类似 tree 命令)
*/
function printTree(
node: TreeNode,
prefix = '',
isRoot = true,
rootName = '',
): void {
if (isRoot && rootName) {
console.log(`📦 ${rootName}`)
}
const entries = Object.entries(node)
entries.sort(([a], [b]) => {
// 目录优先
const aIsDir = node[a] !== null
const bIsDir = node[b] !== null
if (aIsDir !== bIsDir) return aIsDir ? -1 : 1
return a.localeCompare(b)
})
entries.forEach(([name, child], index) => {
const isLast = index === entries.length - 1
const connector = isLast ? '└── ' : '├── '
const icon = child !== null ? '📁' : '📄'
console.log(`${prefix}${connector}${icon} ${name}`)
if (child !== null) {
const newPrefix = prefix + (isLast ? ' ' : '│ ')
printTree(child, newPrefix, false)
}
})
}
/**
* 统计文件和目录数量
*/
function countItems(node: TreeNode): { files: number; dirs: number } {
let files = 0
let dirs = 0
for (const [, child] of Object.entries(node)) {
if (child === null) {
files++
} else {
dirs++
const sub = countItems(child)
files += sub.files
dirs += sub.dirs
}
}
return { files, dirs }
}
// ============================================================================
// 7z-wasm 列出文件
// ============================================================================
/**
* 使用 7z-wasm 列出压缩包内的文件
*/
async function listArchiveFiles(archivePath: string): Promise<string[]> {
const absoluteArchivePath = await Deno.realPath(archivePath)
const archiveDir = dirname(absoluteArchivePath)
const archiveName = basename(absoluteArchivePath)
// 生成唯一的挂载点名称
const mountId = nanoid(16)
const srcMount = `/src_${mountId}`
// 捕获 stdout 输出
const files: string[] = []
let capturing = false
// 在初始化时传入 print 函数来捕获输出
// @ts-ignore: npm package type definition issue
const sevenZip = await SevenZip({
print: (text: string) => {
// 解析 7z l 命令的输出
// 格式类似:
// Date Time Attr Size Compressed Name
// ------------------- ----- ------------ ------------ ------------------------
// 2024-01-01 12:00:00 ....A 12345 12345 path/to/file.txt
// ------------------- ----- ------------ ------------ ------------------------
const line = text.trim()
// 检测分隔线来确定开始/结束捕获
if (line.startsWith('---')) {
capturing = !capturing
return
}
if (capturing && line.length > 0) {
// 尝试解析文件名 (在最后一列)
// 格式: Date Time Attr Size Compressed Name
// 例如: 2024-12-26 10:37:22 D...A 0 0 AssetBundle
// 使用正则匹配 - 注意 Size 和 Compressed 之间可能有多个空格
const match = line.match(
/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+[\w.]+\s+\d+\s+\d+\s+(.+)$/,
)
if (match) {
files.push(match[1].trim())
}
}
},
printErr: () => {}, // 静默错误输出
})
try {
// 创建挂载点
sevenZip.FS.mkdir(srcMount)
// 挂载源目录
sevenZip.FS.mount(sevenZip.NODEFS, { root: archiveDir }, srcMount)
// 执行列表命令
const archiveInVfs = `${srcMount}/${archiveName}`
const result = sevenZip.callMain(['l', archiveInVfs])
if (result !== 0) {
throw new Error(`7z 列表命令失败,返回码: ${result}`)
}
return files
} finally {
// 清理工作
try {
sevenZip.FS.unmount(srcMount)
sevenZip.FS.rmdir(srcMount)
} catch {
// 忽略清理错误
}
}
}
// ============================================================================
// CLI 命令定义
// ============================================================================
const main = defineCommand({
meta: {
name: 'list-archive',
version: '1.0.0',
description: '列出压缩包中的文件列表,以树形结构展示',
},
args: {
archive: {
type: 'positional',
description: '压缩包文件路径',
required: true,
},
flat: {
type: 'boolean',
description: '以平铺列表方式显示(不使用树形结构)',
default: false,
},
count: {
type: 'boolean',
description: '仅显示文件数量统计',
default: false,
},
},
async run({ args }: { args: { archive: string; flat: boolean; count: boolean } }) {
const archivePath = args.archive
// 检查文件是否存在
try {
await Deno.stat(archivePath)
} catch {
consola.error(`文件不存在: ${archivePath}`)
Deno.exit(1)
}
// 检测文件类型
const fileType = await detectFileType(archivePath)
if (!fileType?.ext) {
consola.error(`无法识别文件类型: ${archivePath}`)
Deno.exit(1)
}
const supportedFormats = ['rar', 'zip', '7z', 'tar', 'tar.gz']
if (!supportedFormats.includes(fileType.ext)) {
consola.error(`不支持的压缩格式: ${fileType.ext}`)
Deno.exit(1)
}
consola.start(`正在读取压缩包: ${basename(archivePath)} (${fileType.ext})`)
try {
const files = await listArchiveFiles(archivePath)
if (files.length === 0) {
consola.warn('压缩包为空或无法读取文件列表')
return
}
console.log() // 空行分隔
if (args.count) {
// 仅显示统计
const tree = buildTree(files)
const { files: fileCount, dirs: dirCount } = countItems(tree)
consola.success(`共 ${dirCount} 个目录, ${fileCount} 个文件`)
} else if (args.flat) {
// 平铺列表
files.forEach((f) => console.log(f))
console.log()
consola.info(`共 ${files.length} 个条目`)
} else {
// 树形结构
const tree = buildTree(files)
printTree(tree, '', true, basename(archivePath))
console.log()
const { files: fileCount, dirs: dirCount } = countItems(tree)
consola.info(`共 ${dirCount} 个目录, ${fileCount} 个文件`)
}
} catch (error) {
consola.error(`读取压缩包失败: ${(error as Error).message}`)
Deno.exit(1)
}
},
})
// 运行主命令
runMain(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment