Post content
#酷 import os import subprocess import concurrent.futures import time def convert_single_file(ts_path, mp4_path): filename = os.path.basename(ts_path) if os.path.exists(mp4_path): return False, f"⚠️ 已存在,跳过:{os.path.basename(mp4_path)}" try: result = subprocess.run( ['ffmpeg', '-i', ts_path, '-c', 'copy', mp4_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) if result.returncode == 0: return True, f"✅ 成功:{os.path.basename(mp4_path)}" else: return False, f"❌ 失败:{filename}" except Exception as e: return False, f"❌ 错误:{filename} - {str(e)}" def convert_ts_to_mp4(): start_time = time.time() current_dir = os.path.dirname(os.path.abspath(__file__)) ts_files = [f for f in os.listdir(current_dir) if f.lower().endswith('.ts')] print(f"检测到 [{len(ts_files)}] 个 .ts 文件:") for f in ts_files: print(f"- {f}") print("------") if not ts_files: print("没有找到.ts文件") input("\n按任意键退出...") return tasks = [] for filename in ts_files: ts_path = os.path.join(current_dir, filename) mp4_path = os.path.join(current_dir, os.path.splitext(filename)[0] + '.mp4') tasks.append((ts_path, mp4_path)) converted_count = 0 if len(tasks) == 1: success, log = convert_single_file(*tasks[0]) if success: converted_count += 1 print(log) else: with concurrent.futures.ThreadPoolExecutor(max_workers=min(4, os.cpu_count() or 1)) as executor: future_to_task = {executor.submit(convert_single_file, *task): task for task in tasks} for future in concurrent.futures.as_completed(future_to_task): success, log = future.result() if success: converted_count += 1 print(log) elapsed_time = time.time() - start_time print(f"\n📌 所有转换完成,共转换 [{converted_count}] 个 .ts 文件") print(f"⏱️ - 耗时: {elapsed_time:.2f} 秒") input("\n按任意键退出...") if __name__ == '__main__': convert_ts_to_mp4()