| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- import os
- import subprocess
- import re
- def run_command(command):
- """运行命令并返回输出"""
- try:
- result = subprocess.run(command, shell=True, text=True, capture_output=True, check=True)
- return result.stdout
- except subprocess.CalledProcessError as e:
- print(f"命令执行失败: {e}")
- return None
- def detect_disks():
- """检测系统中的磁盘设备"""
- print("检测磁盘设备...")
- output = run_command("lsblk -dn -o NAME,SIZE")
- if output:
- disks = re.findall(r"(\w+)\s+([\d.]+[A-Z])", output)
- return disks
- return []
- def generate_fio_config(filename, mode, bs, size, numjobs, runtime):
- """生成 fio 配置文件"""
- config = f"""
- [global]
- ioengine=sync
- direct=1
- thread=1
- time_based=1
- runtime={runtime}
- group_reporting=1
- [fio_test]
- filename={filename}
- rw={mode}
- bs={bs}
- size={size}
- numjobs={numjobs}
- """
- return config
- def run_fio_test(filename, mode, bs="1M", size="512M", numjobs=1, runtime=60):
- """运行 fio 测试"""
- print(f"开始对 {filename} 进行 {mode} 测试...")
- config = generate_fio_config(filename, mode, bs, size, numjobs, runtime)
- config_file = "/tmp/fio_test.fio"
- with open(config_file, "w") as f:
- f.write(config)
- result = run_command(f"fio {config_file}")
- if result:
- print("测试完成。以下是结果:")
- print(result)
- else:
- print("测试失败。")
- os.remove(config_file)
- def main():
- print("磁盘性能自动化测试")
- disks = detect_disks()
- if not disks:
- print("未检测到磁盘设备。请检查系统配置。")
- return
-
- print("检测到以下磁盘设备:")
- for idx, (name, size) in enumerate(disks):
- print(f"{idx + 1}. /dev/{name} ({size})")
- #choice = input("请选择要测试的磁盘编号(或输入 0 退出):")
- choice = "1"
- print (type(choice))
- if not choice.isdigit() or int(choice) < 1 or int(choice) > len(disks):
- print("退出程序。")
- return
-
- selected_disk = f"/dev/{disks[int(choice) - 1][0]}"
- print(f"选择的磁盘设备:{selected_disk}")
- mode = "read"
- #mode = input("请输入测试模式(read/write/randread/randwrite):")
- print (type(mode))
- if mode not in ["read", "write", "randread", "randwrite"]:
- print("无效的模式。退出程序。")
- return
- # 运行测试
- run_fio_test(selected_disk, mode)
- if __name__ == "__main__":
- main()
|