import os import subprocess import re def run_command(command): """运行命令并返回输出""" try: result = subprocess.run(command, shell=True, text=True, capture_output=True, check=True) return result.stdout except subprocess.CalledProcessError as e: print(f"命令执行失败: {e}") return None def detect_disks(): """检测系统中的磁盘设备""" print("检测磁盘设备...") output = run_command("lsblk -dn -o NAME,SIZE") if output: disks = re.findall(r"(\w+)\s+([\d.]+[A-Z])", output) return disks return [] def generate_fio_config(filename, mode, bs, size, numjobs, runtime): """生成 fio 配置文件""" config = f""" [global] ioengine=sync direct=1 thread=1 time_based=1 runtime={runtime} group_reporting=1 [fio_test] filename={filename} rw={mode} bs={bs} size={size} numjobs={numjobs} """ return config def run_fio_test(filename, mode, bs="1M", size="512M", numjobs=1, runtime=60): """运行 fio 测试""" print(f"开始对 {filename} 进行 {mode} 测试...") config = generate_fio_config(filename, mode, bs, size, numjobs, runtime) config_file = "/tmp/fio_test.fio" with open(config_file, "w") as f: f.write(config) result = run_command(f"fio {config_file}") if result: print("测试完成。以下是结果:") print(result) else: print("测试失败。") os.remove(config_file) def main(): print("磁盘性能自动化测试") disks = detect_disks() if not disks: print("未检测到磁盘设备。请检查系统配置。") return print("检测到以下磁盘设备:") for idx, (name, size) in enumerate(disks): print(f"{idx + 1}. /dev/{name} ({size})") #choice = input("请选择要测试的磁盘编号(或输入 0 退出):") choice = "1" print (type(choice)) if not choice.isdigit() or int(choice) < 1 or int(choice) > len(disks): print("退出程序。") return selected_disk = f"/dev/{disks[int(choice) - 1][0]}" print(f"选择的磁盘设备:{selected_disk}") mode = "read" #mode = input("请输入测试模式(read/write/randread/randwrite):") print (type(mode)) if mode not in ["read", "write", "randread", "randwrite"]: print("无效的模式。退出程序。") return # 运行测试 run_fio_test(selected_disk, mode) if __name__ == "__main__": main()