diskW_test.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import os
  2. import subprocess
  3. import re
  4. def run_command(command):
  5. """运行命令并返回输出"""
  6. try:
  7. result = subprocess.run(command, shell=True, text=True, capture_output=True, check=True)
  8. return result.stdout
  9. except subprocess.CalledProcessError as e:
  10. print(f"命令执行失败: {e}")
  11. return None
  12. def detect_disks():
  13. """检测系统中的磁盘设备"""
  14. print("检测磁盘设备...")
  15. output = run_command("lsblk -dn -o NAME,SIZE")
  16. if output:
  17. disks = re.findall(r"(\w+)\s+([\d.]+[A-Z])", output)
  18. return disks
  19. return []
  20. def generate_fio_config(filename, mode, bs, size, numjobs, runtime):
  21. """生成 fio 配置文件"""
  22. config = f"""
  23. [global]
  24. ioengine=sync
  25. direct=1
  26. thread=1
  27. time_based=1
  28. runtime={runtime}
  29. group_reporting=1
  30. [fio_test]
  31. filename={filename}
  32. rw={mode}
  33. bs={bs}
  34. size={size}
  35. numjobs={numjobs}
  36. """
  37. return config
  38. def run_fio_test(filename, mode, bs="1M", size="512M", numjobs=1, runtime=60):
  39. """运行 fio 测试"""
  40. print(f"开始对 {filename} 进行 {mode} 测试...")
  41. config = generate_fio_config(filename, mode, bs, size, numjobs, runtime)
  42. config_file = "/tmp/fio_test.fio"
  43. with open(config_file, "w") as f:
  44. f.write(config)
  45. result = run_command(f"fio {config_file}")
  46. if result:
  47. print("测试完成。以下是结果:")
  48. print(result)
  49. else:
  50. print("测试失败。")
  51. os.remove(config_file)
  52. def main():
  53. print("磁盘性能自动化测试")
  54. disks = detect_disks()
  55. if not disks:
  56. print("未检测到磁盘设备。请检查系统配置。")
  57. return
  58. print("检测到以下磁盘设备:")
  59. for idx, (name, size) in enumerate(disks):
  60. print(f"{idx + 1}. /dev/{name} ({size})")
  61. #choice = input("请选择要测试的磁盘编号(或输入 0 退出):")
  62. choice = "1"
  63. print (type(choice))
  64. if not choice.isdigit() or int(choice) < 1 or int(choice) > len(disks):
  65. print("退出程序。")
  66. return
  67. selected_disk = f"/dev/{disks[int(choice) - 1][0]}"
  68. print(f"选择的磁盘设备:{selected_disk}")
  69. mode = "write"
  70. #mode = input("请输入测试模式(read/write/randread/randwrite):")
  71. print (type(mode))
  72. if mode not in ["read", "write", "randread", "randwrite"]:
  73. print("无效的模式。退出程序。")
  74. return
  75. # 运行测试
  76. run_fio_test(selected_disk, mode)
  77. if __name__ == "__main__":
  78. main()