run_test.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786
  1. #!/usr/bin/python
  2. # _*_ coding: utf-8 _*_
  3. import traceback
  4. try:
  5. from Tkinter import * # python2
  6. import ttk
  7. import tkMessageBox
  8. import tkFont
  9. except Exception as e:
  10. # Python 2环境下不会走到这里,保留仅做兼容占位
  11. from tkinter import *
  12. from tkinter import ttk
  13. from tkinter import messagebox as tkMessageBox
  14. import tkinter.font as tkFont
  15. import sys
  16. import os
  17. import subprocess
  18. import time
  19. import threading
  20. import csv
  21. from datetime import datetime
  22. import subprocess
  23. import test_function
  24. # class test_function:
  25. # @staticmethod
  26. # def default_test(context, args=""):
  27. # """默认测试函数,实际使用时替换为真实逻辑"""
  28. # time.sleep(0.1)
  29. # return True, "Test passed (default function)"
  30. #
  31. # sys.modules['test_function'] = test_function
  32. class TestContext:
  33. def __init__(self):
  34. self.script_ver = 'C181-C-C7-v2.4'
  35. self.log_file = './Logs/temp.txt'
  36. self.log1_buffer = ""
  37. self.tp_path = "./testplan.csv"
  38. self.device_info = {}
  39. self.params_dict = {
  40. 'path_loss': '25',
  41. 'wave_file': './11b_waveforms/wave11rc_1000.mod',
  42. 'per_limit': '92',
  43. 'SA_RSSI': '-80',
  44. 'BTOn_RSSI': '-30'
  45. }
  46. try:
  47. if sys.version_info >= (3, 0):
  48. # Python 3 直接用 open + encoding
  49. with open(self.tp_path, "r", encoding='utf-8') as f:
  50. reader = csv.DictReader(f)
  51. self.testplan_list = []
  52. for row in reader:
  53. if row.get("Disable", "") != "Y":
  54. self.testplan_list.append(row)
  55. else:
  56. try:
  57. f = open(self.tp_path, 'r')
  58. reader = csv.DictReader(f)
  59. self.testplan_list = []
  60. # 修复Python 2中不等于运算符(~=是错误的,应该是!=)
  61. for row in reader:
  62. if row.get("Disable", "") != "Y":
  63. self.testplan_list.append(row)
  64. except Exception as e:
  65. traceback.print_exc()
  66. finally:
  67. f.close() # Python 2需显式关闭文件
  68. except Exception as e:
  69. print(e)
  70. self.step_status = ['ns'] * len(self.testplan_list)
  71. self.result_msg = [''] * len(self.testplan_list)
  72. self.current_loop = 0 # 当前循环次数
  73. self.total_loops = 1 # 总循环次数
  74. def create_sample_testplan(self):
  75. if not os.path.exists(os.path.dirname(self.tp_path)):
  76. os.makedirs(os.path.dirname(self.tp_path))
  77. with open(self.tp_path, 'w', encoding='utf-8', newline='') as f:
  78. writer = csv.DictWriter(f, fieldnames=['TestName', 'Function', 'Arguments', 'Disable'])
  79. writer.writeheader()
  80. writer.writerow({
  81. 'TestName': 'Sample Test',
  82. 'Function': 'default_test',
  83. 'Arguments': '',
  84. 'Disable': ''
  85. })
  86. with open(self.tp_path, "r", encoding="utf-8") as f:
  87. reader = csv.DictReader(f)
  88. self.testplan_list = []
  89. for row in reader:
  90. if row.get("Disable", "") != "Y":
  91. self.testplan_list.append(row)
  92. def print1(self, string, newline=0):
  93. if newline == 0:
  94. self.log1_buffer += "\n" + str(string)
  95. elif newline == 1:
  96. self.log1_buffer += "\n" + str(string) + "\n"
  97. def write_log_to_file(self, content):
  98. curr_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
  99. log_dir = "./Logs/"
  100. if not os.path.exists(log_dir):
  101. os.makedirs(log_dir)
  102. log_file = "log_" + curr_time + ".txt"
  103. log_file_location = log_dir + log_file
  104. # Python 2文件写入(指定编码避免中文乱码)
  105. file = open(log_file_location, 'a')
  106. file.write(str(content) + '\n')
  107. file.close()
  108. class IO_process:
  109. def __init__(self, log_file_path):
  110. self.log_file = log_file_path
  111. self.line_count = 0
  112. def read_file(self):
  113. try:
  114. f = open(self.log_file, 'r')
  115. flag = 0
  116. line = ""
  117. for i, curr_line in enumerate(f):
  118. if i == self.line_count:
  119. self.line_count += 1
  120. flag = 1
  121. line = curr_line
  122. break
  123. f.close()
  124. if flag == 1:
  125. return line.rstrip('\r')
  126. else:
  127. return ""
  128. except Exception as e:
  129. return ""
  130. def empty_file(self):
  131. f = open(self.log_file, 'w')
  132. f.close()
  133. def reset_counter(self):
  134. self.line_count = 0
  135. class MasterThread(threading.Thread):
  136. def __init__(self, context, selected_steps):
  137. threading.Thread.__init__(self)
  138. self.context = context
  139. self.stop_flag = False
  140. self.selected_steps = selected_steps
  141. def run(self):
  142. for loop_num in range(1, self.context.total_loops + 1):
  143. if self.stop_flag:
  144. break
  145. self.context.current_loop = loop_num
  146. self.context.print1("=" * 80, 1)
  147. self.context.print1("Loop {}/{} - X86 C180 Test Started".format(loop_num, self.context.total_loops), 1)
  148. self.context.print1("Script Version: {}".format(self.context.script_ver), 1)
  149. self.context.print1("Selected Test Steps: {}".format(self.selected_steps), 1)
  150. self.context.print1("=" * 80, 1)
  151. self.context.step_status = ['ns'] * len(self.context.testplan_list)
  152. self.context.result_msg = [''] * len(self.context.testplan_list)
  153. for step_num in self.selected_steps:
  154. if self.stop_flag:
  155. break
  156. test_item = self.context.testplan_list[step_num]
  157. self.context.print1("\n----------------------------------------------------")
  158. self.context.print1("Loop {} - Step {} Started test...".format(loop_num, step_num))
  159. func_name = test_item.get("Function", "")
  160. arguments = test_item.get("Arguments", "")
  161. func = getattr(test_function, func_name, None) if 'test_function' in sys.modules else None
  162. if not func:
  163. self.context.step_status[step_num] = "f"
  164. self.context.result_msg[step_num] = "%s is not defined" % func_name
  165. continue
  166. self.context.step_status[step_num] = 'r'
  167. try:
  168. if arguments:
  169. result, result_msg = func(self.context, arguments)
  170. else:
  171. result, result_msg = func(self.context)
  172. except Exception as e:
  173. result = False
  174. result_msg = "Error: {}".format(str(e))
  175. self.context.step_status[step_num] = "p" if result else "f"
  176. self.context.result_msg[step_num] = result_msg
  177. self.context.print1("Loop {} - Step {} Result: {}".format(loop_num, step_num, result_msg))
  178. self.context.print1("\n----------------------------------------------------")
  179. if not self.stop_flag:
  180. self.context.print1("\nLoop {} Completed!".format(loop_num, 1))
  181. if loop_num < self.context.total_loops:
  182. self.context.print1("Waiting 2 seconds before next loop...", 1)
  183. time.sleep(2)
  184. self.context.print1("\n" + "=" * 80, 1)
  185. if self.stop_flag:
  186. self.context.print1("Test stopped manually!", 1)
  187. else:
  188. self.context.print1("All {} loops completed!".format(self.context.total_loops), 1)
  189. self.context.print1("Test concluded!", 1)
  190. self.context.print1("=" * 80, 1)
  191. def stop(self):
  192. """停止测试线程"""
  193. self.stop_flag = True
  194. class GuiThread(object):
  195. def __init__(self):
  196. self.context = TestContext()
  197. self.thread_list = {}
  198. self.counter = 0
  199. self.root = Tk()
  200. self.root.geometry("1800x1200")
  201. small_font = tkFont.Font(family="DejaVu Sans", size=8)
  202. self.start_time = None
  203. self.timer_label = None
  204. self.timer_label = None
  205. self.timer_id = None
  206. self.test_finished = False
  207. self.root.protocol("WM_DELETE_WINDOW", self.destroy)
  208. self.root.title("WiPER: FT Simular Test")
  209. self.sn_frame = Frame(self.root)
  210. self.need_sn_var = BooleanVar() # 存储复选框状态
  211. self.need_sn_var.set(True) # 默认勾选(需要校验SN)
  212. self.need_sn_check = Checkbutton(
  213. self.sn_frame,
  214. text="Need SN",
  215. variable=self.need_sn_var,
  216. font=("Arial", 9),
  217. command=self.toggle_sn_input # 勾选状态变化时触发输入框启用/禁用
  218. )
  219. self.sn_label = Label(self.sn_frame, text="Device SN:", fg='darkblue', font=("Arial", 10))
  220. self.sn_entry = Entry(self.sn_frame, width=20, font=("Arial", 10)) # 宽度更大,适配SN长度
  221. self.sn_entry.insert(0, "") # 默认空值
  222. self.sn_entry.bind('<Return>', lambda event: self.get_device_sn())
  223. self.get_sn_btn = Button(self.sn_frame, text="Confirm SN", font=("Arial", 9),
  224. command=self.get_device_sn, width=16)
  225. self.sn_hint = Label(self.sn_frame, text="(Device Serial Number)", fg='gray', font=("Arial", 8))
  226. self.need_sn_check.pack(side=LEFT, padx=5)
  227. self.sn_label.pack(side=LEFT, padx=5)
  228. self.sn_entry.pack(side=LEFT, padx=5)
  229. self.get_sn_btn.pack(side=LEFT, padx=5) # 按钮放在输入框后面
  230. self.sn_hint.pack(side=LEFT, padx=5)
  231. self.sn_frame.pack(pady=5)
  232. self.time_loop_frame = Frame(self.root)
  233. self.time_loop_frame.pack(pady=5)
  234. self.timer_label = Label(self.time_loop_frame, text="Time: 00:00:00", fg='darkred',
  235. font=tkFont.Font(family="Arial", size=10, weight="bold"))
  236. self.loop_status_label = Label(self.time_loop_frame, text="Current Loop: 0/0", fg='purple',
  237. font=tkFont.Font(family="Arial"))
  238. self.loop_label = Label(self.time_loop_frame, text="Loop Count:", fg='darkblue', font=("Arial", 10))
  239. self.loop_entry = Entry(self.time_loop_frame, width=10, font=("Arial", 10))
  240. self.loop_entry.insert(0, "1") # 默认循环1次
  241. self.loop_hint = Label(self.time_loop_frame, text="(test loops)", fg='gray', font=("Arial", 8))
  242. self.timer_label.pack(side=LEFT, padx=5)
  243. self.loop_status_label.pack(side=LEFT, padx=5)
  244. self.loop_label.pack(side=LEFT, padx=5)
  245. self.loop_entry.pack(side=LEFT, padx=5)
  246. self.loop_hint.pack(side=LEFT, padx=5)
  247. self.note1 = Label(self.root, text="Please press 'Start Test' to begin...", fg='blue')
  248. self.start_button = Button(self.root, text="Start Test", command=self.start)
  249. self.stop_button = Button(self.root, text="Stop Test", command=self.stop_test, state=DISABLED)
  250. self.test_status_label = Label(self.root, text="NOT STARTED", width=15, bg='gray')
  251. self.deviceInfo_label = Label(self.root, text="Device SN: ", width=20, bg="#CCCCCC")
  252. self.notebook_box = ttk.Notebook(self.root, name='notebook')
  253. self.status_frame = Frame(self.root)
  254. self.logFrame = Frame(self.root)
  255. self.parametersFrame = Frame(self.root)
  256. self.logText = Text(self.logFrame)
  257. self.logScrollbar = Scrollbar(self.logFrame)
  258. self.canvas = Canvas(self.status_frame, bg="white")
  259. self.scrollbar_y = Scrollbar(self.status_frame, command=self.canvas.yview)
  260. self.lblframe = Frame(self.canvas)
  261. self.canvas_window = self.canvas.create_window((0, 0), window=self.lblframe, anchor="nw")
  262. self.canvas.configure(yscrollcommand=self.scrollbar_y.set)
  263. def on_canvas_configure(event):
  264. self.canvas.itemconfig(self.canvas_window, width=event.width)
  265. self.canvas.configure(scrollregion=self.canvas.bbox("all"))
  266. def on_frame_configure(event):
  267. self.canvas.configure(scrollregion=self.canvas.bbox("all"))
  268. self.canvas.bind("<Configure>", on_canvas_configure)
  269. self.lblframe.bind("<Configure>", on_frame_configure)
  270. def on_mouse_wheel(event):
  271. if event.delta > 0:
  272. self.canvas.yview_scroll(-1, "pages")
  273. else:
  274. self.canvas.yview_scroll(1, "pages")
  275. if sys.platform == 'darwin':
  276. self.canvas.bind('<MouseWheel>', on_mouse_wheel)
  277. else:
  278. self.canvas.bind('<MouseWheel>', on_mouse_wheel)
  279. self.canvas.bind('<Button-4>', lambda e: self.canvas.yview_scroll(-1, "units"))
  280. self.canvas.bind('<Button-5>', lambda e: self.canvas.yview_scroll(1, "units"))
  281. self.canvas.bind("<Enter>", lambda e: self.canvas.focus_set())
  282. self.canvas.pack(side=LEFT, fill=BOTH, expand=True)
  283. self.scrollbar_y.pack(side=RIGHT, fill=Y)
  284. self.index_list = []
  285. self.step_check_vars = [] # 复选框变量
  286. self.step_checkbuttons = [] # 复选框控件
  287. self.step_buttons = []
  288. self.step_name_list = []
  289. self.step_data_list = []
  290. self.step_status_list = []
  291. self.select_all_var = BooleanVar(value=True) # 默认全选
  292. self.select_all_check = Checkbutton(
  293. self.lblframe,
  294. variable=self.select_all_var,
  295. command=self.on_select_all_toggle, # 切换时自动全选/取消全选
  296. anchor='center'
  297. )
  298. self.select_all_check.grid(row=0, column=0, padx=2, pady=1)
  299. title_1 = Label(self.lblframe, text="Index", anchor='center', width=5, font=small_font)
  300. title_1.grid(row=0, column=1, sticky=W, padx=2, pady=1)
  301. title_2 = Label(self.lblframe, text="Run Steps", anchor='center', width=12, font=small_font)
  302. title_2.grid(row=0, column=2, sticky=W, padx=2, pady=1)
  303. title_3 = Label(self.lblframe, text="TestName", anchor='center', width=50, font=small_font)
  304. title_3.grid(row=0, column=3, sticky=W, padx=2, pady=1)
  305. title_4 = Label(self.lblframe, text="TestData", anchor='center', width=50, font=small_font)
  306. title_4.grid(row=0, column=4, sticky=W, padx=2, pady=1)
  307. title_5 = Label(self.lblframe, text="TestStatus", anchor='center', width=20, font=small_font)
  308. title_5.grid(row=0, column=5, sticky=W, padx=2, pady=1)
  309. for index, desc in enumerate(self.context.testplan_list):
  310. # 新增:复选框(默认全选)
  311. check_var = BooleanVar(value=True)
  312. self.step_check_vars.append(check_var)
  313. check_btn = Checkbutton(
  314. self.lblframe,
  315. variable=check_var,
  316. anchor='center',
  317. command=self.update_select_all_state # 点击时自动更新顶部全选框
  318. )
  319. self.step_checkbuttons.append(check_btn)
  320. index_lable = Label(self.lblframe, text=str(index), anchor='center', width=5, font=small_font)
  321. self.index_list.append(index_lable)
  322. btn = Button(self.lblframe, text="Run Step %d" % index,
  323. command=lambda num=index: self.run_single_step(num),
  324. width=12, bg='lightblue', font=small_font)
  325. self.step_buttons.append(btn)
  326. item_label = Label(self.lblframe, text=desc.get("TestName", ""), anchor='w', bg='#CCCCCC', width=50,
  327. font=small_font)
  328. self.step_name_list.append(item_label)
  329. data_label = Label(self.lblframe, text="", anchor='w', bg='#CCCCCC', width=50, font=small_font)
  330. self.step_data_list.append(data_label)
  331. status_label = Label(self.lblframe, text="NOT STARTED", bg='#CCCCCC', width=20, font=small_font)
  332. self.step_status_list.append(status_label)
  333. self.my_io_process = IO_process(self.context.log_file)
  334. self.readLog1()
  335. self.set_param_frame()
  336. def select_all(self):
  337. for var in self.step_check_vars:
  338. var.set(True)
  339. def unselect_all(self):
  340. for var in self.step_check_vars:
  341. var.set(False)
  342. def get_selected_steps(self):
  343. selected = []
  344. for idx, var in enumerate(self.step_check_vars):
  345. if var.get():
  346. selected.append(idx)
  347. return selected
  348. def toggle_sn_input(self):
  349. if self.need_sn_var.get():
  350. self.sn_entry.config(state=NORMAL)
  351. self.get_sn_btn.config(state=NORMAL)
  352. self.sn_label.config(fg='darkblue')
  353. else:
  354. self.sn_entry.config(state=DISABLED)
  355. self.get_sn_btn.config(state=DISABLED)
  356. self.sn_label.config(fg='gray')
  357. self.deviceInfo_label.config(text="Device SN: N/A")
  358. self.context.device_info['ScanSN'] = "N/A"
  359. def get_device_sn(self):
  360. if not self.need_sn_var.get():
  361. return
  362. test_running = 'thread' in self.thread_list and self.thread_list['thread'].is_alive()
  363. if test_running:
  364. return
  365. sn_value = self.sn_entry.get().strip()
  366. if sn_value:
  367. self.context.device_info['ScanSN'] = sn_value
  368. else:
  369. self.context.device_info['ScanSN'] = "Unknown"
  370. tkMessageBox.showerror("Error", "请输入SN!")
  371. return
  372. sn_length = len(sn_value)
  373. if sn_length != 11:
  374. tkMessageBox.showerror("Error", "SN长度不合格!应该为11位。")
  375. return
  376. string = "Device: " + sn_value
  377. self.deviceInfo_label.config(text=string)
  378. self.start()
  379. def update_timer(self):
  380. if self.start_time and not self.test_finished:
  381. elapsed_seconds = int(time.time() - self.start_time)
  382. hours = elapsed_seconds // 3600
  383. minutes = (elapsed_seconds % 3600) // 60
  384. seconds = elapsed_seconds % 60
  385. timer_text = "Time: %02d:%02d:%02d" % (hours, minutes, seconds)
  386. self.timer_label.config(text=timer_text)
  387. self.loop_status_label.config(
  388. text="Current Loop: {}/{}".format(self.context.current_loop, self.context.total_loops)
  389. )
  390. self.timer_id = self.root.after(1000, self.update_timer)
  391. def stop_timer(self):
  392. if self.timer_id:
  393. self.root.after_cancel(self.timer_id)
  394. self.timer_id = None
  395. self.test_finished = True
  396. self.loop_status_label.config(
  397. text="Loop: {}/{}".format(self.context.current_loop, self.context.total_loops)
  398. )
  399. if self.start_time:
  400. total_seconds = int(time.time() - self.start_time)
  401. hours = total_seconds // 3600
  402. minutes = (total_seconds % 3600) // 60
  403. seconds = total_seconds % 60
  404. final_text = "Total Time: %02d:%02d:%02d" % (hours, minutes, seconds)
  405. self.timer_label.config(text=final_text, fg='darkgreen')
  406. def stop_test(self):
  407. if tkMessageBox.askokcancel("Stop Test?", "Are you sure you want to stop the test?"):
  408. if 'thread' in self.thread_list and self.thread_list['thread'].is_alive():
  409. self.thread_list['thread'].stop()
  410. self.stop_timer()
  411. self.test_status_label.config(text='STOPPED', bg='orange')
  412. self.start_button.config(state=NORMAL)
  413. self.stop_button.config(state=DISABLED)
  414. self.get_sn_btn.config(state=NORMAL if self.need_sn_var.get() else DISABLED)
  415. self.sn_entry.config(state=NORMAL if self.need_sn_var.get() else DISABLED)
  416. self.loop_entry.config(state=NORMAL)
  417. self.select_all_check.config(state=NORMAL)
  418. for button in self.step_buttons:
  419. button.config(state=NORMAL)
  420. for check_obj in self.step_checkbuttons:
  421. check_obj.config(state=NORMAL)
  422. def destroy(self):
  423. if tkMessageBox.askokcancel("Quit?", "Are you sure you want to quit?"):
  424. if 'thread' in self.thread_list and self.thread_list['thread'].is_alive():
  425. self.thread_list['thread'].stop()
  426. self.stop_timer()
  427. self.root.quit()
  428. def run(self):
  429. self.note1.pack()
  430. btn_frame = Frame(self.root)
  431. btn_frame.pack()
  432. self.start_button = Button(btn_frame, text="Start Test", command=self.start)
  433. self.stop_button = Button(btn_frame, text="Stop Test", command=self.stop_test, state=DISABLED)
  434. self.start_button.pack(side=LEFT, padx=5)
  435. self.stop_button.pack(side=LEFT, padx=5)
  436. self.test_status_label.pack()
  437. self.deviceInfo_label.pack(fill=X, expand=False, padx=50, pady=20)
  438. self.logScrollbar.config(command=self.logText.yview)
  439. self.logText.config(yscrollcommand=self.logScrollbar.set)
  440. self.logText.pack(side=LEFT, fill=Y)
  441. self.logScrollbar.pack(side=RIGHT, fill=Y)
  442. # 布局步骤列表
  443. for index in range(len(self.context.testplan_list)):
  444. self.step_checkbuttons[index].grid(row=index + 1, column=0, sticky=W, padx=2, pady=0)
  445. self.index_list[index].grid(row=index + 1, column=1, sticky=W, padx=2, pady=0)
  446. self.step_buttons[index].grid(row=index + 1, column=2, sticky=W, padx=2, pady=0)
  447. self.step_name_list[index].grid(row=index + 1, column=3, sticky=W, padx=2, pady=0)
  448. self.step_data_list[index].grid(row=index + 1, column=4, sticky=W, padx=2, pady=0)
  449. self.step_status_list[index].grid(row=index + 1, column=5, sticky=W, padx=2, pady=0)
  450. self.notebook_box.add(self.status_frame, text='Status')
  451. self.notebook_box.add(self.logFrame, text='Log')
  452. self.notebook_box.add(self.parametersFrame, text='Parameters')
  453. self.notebook_box.enable_traversal()
  454. self.notebook_box.pack(fill=BOTH, expand="true", padx=5, pady=1)
  455. self.root.mainloop()
  456. def readLog1(self):
  457. if self.context.log1_buffer != "":
  458. self.logText.insert(END, self.context.log1_buffer)
  459. text = self.my_io_process.read_file()
  460. if text != "":
  461. self.logText.insert(END, text)
  462. if self.context.log1_buffer != "" or text != "":
  463. self.logText.yview_pickplace("end")
  464. self.context.log1_buffer = ""
  465. self.root.update()
  466. self.root.after(100, self.readLog1)
  467. def set_param_frame(self):
  468. self.L = []
  469. self.E = []
  470. count = 1
  471. for key, value in self.context.params_dict.items():
  472. lbl = Label(self.parametersFrame, text=key)
  473. enty = Entry(self.parametersFrame, bd=4)
  474. enty.delete(0, END)
  475. enty.insert(0, value)
  476. enty.config(state=DISABLED)
  477. lbl.grid(row=count, sticky=W)
  478. enty.grid(row=count, column=1)
  479. self.L.append(lbl)
  480. self.E.append(enty)
  481. count += 1
  482. def run_single_step(self, step_num):
  483. self.context.print1("\n--------------------single_step---------------------")
  484. self.context.print1("Step %d Started test..." % step_num)
  485. self.context.step_status[step_num] = 'r'
  486. item_info = self.context.testplan_list[step_num]
  487. func_name = item_info.get("Function", "")
  488. arguments = item_info.get("Arguments", "")
  489. func = getattr(test_function, func_name, None) if 'test_function' in sys.modules else None
  490. if not func:
  491. self.context.step_status[step_num] = "f"
  492. self.context.result_msg[step_num] = "%s is not defined" % func_name
  493. self.root.after(0, self.update_step_status)
  494. else:
  495. self.context.step_status[step_num] = 'r'
  496. self.root.after(0, self.update_step_status)
  497. def run_step():
  498. result, result_msg = False, "FAIL"
  499. if arguments:
  500. result, result_msg = func(self.context, arguments)
  501. else:
  502. result, result_msg = func(self.context)
  503. self.context.step_status[step_num] = "p" if result else "f"
  504. self.context.result_msg[step_num] = result_msg
  505. self.context.print1("\n----------------------------------------------------")
  506. self.context.print1(result_msg)
  507. self.root.after(0, self.update_step_status)
  508. thread = threading.Thread(target=run_step)
  509. thread.start()
  510. def update_step_status(self):
  511. total_no_of_steps = len(self.context.step_status)
  512. for index in range(total_no_of_steps):
  513. status = self.context.step_status[index]
  514. if status == 'r':
  515. self.step_status_list[index]["text"] = "RUNNING..."
  516. self.step_status_list[index]["bg"] = 'yellow'
  517. elif status == 'p':
  518. self.step_status_list[index]["text"] = "PASS"
  519. self.step_data_list[index]["text"] = self.context.result_msg[index]
  520. self.step_status_list[index]["bg"] = 'green'
  521. elif status == 'f':
  522. self.step_status_list[index]["text"] = "ERROR!"
  523. self.step_status_list[index]["bg"] = 'red'
  524. self.step_data_list[index]["text"] = self.context.result_msg[index]
  525. self.root.after(200, self.update_step_status)
  526. def start(self):
  527. # 获取勾选的测试步骤
  528. selected_steps = self.get_selected_steps()
  529. if not selected_steps:
  530. tkMessageBox.showerror("Error", "请至少选择一个测试项!")
  531. return
  532. # 获取循环次数
  533. try:
  534. loop_count = int(self.loop_entry.get())
  535. if loop_count < 1:
  536. tkMessageBox.showerror("Error", "Loop count must be at least 1!")
  537. return
  538. self.context.total_loops = loop_count
  539. self.context.current_loop = 0
  540. except ValueError:
  541. tkMessageBox.showerror("Error", "Please enter a valid number for loop count!")
  542. return
  543. # SN校验
  544. if self.need_sn_var.get():
  545. sn_value = self.sn_entry.get().strip()
  546. if not sn_value:
  547. tkMessageBox.showerror("Error", "请输入SN!")
  548. return
  549. if len(sn_value) != 11:
  550. tkMessageBox.showerror("Error", "SN长度不合格!应该为11位。")
  551. return
  552. self.context.device_info['ScanSN'] = sn_value
  553. self.deviceInfo_label.config(text="Device: " + sn_value)
  554. # writeSN 260328 by joe
  555. try:
  556. proc = subprocess.Popen( #amidelnx_64
  557. ["sh","scansn.sh","/SS",sn_value],
  558. stdin=subprocess.PIPE,
  559. stdout=subprocess.PIPE,
  560. stderr=subprocess.PIPE
  561. )
  562. stdout,stderr = proc.communicate(input="111111\n")
  563. result = proc.returncode
  564. #a = result.stdout
  565. except subprocess.CalledProcessError as e:
  566. tkMessageBox.showerror("Error", "Command failed: {e.stderr}")
  567. return
  568. else:
  569. self.context.device_info['ScanSN'] = "N/A"
  570. self.deviceInfo_label.config(text="Device SN: N/A")
  571. # 启动计时器
  572. self.start_time = time.time()
  573. self.test_finished = False
  574. self.update_timer()
  575. # 界面状态
  576. self.context.step_status = ['x'] + ['ns'] * len(self.context.testplan_list)
  577. self.start_button.config(state=DISABLED)
  578. self.stop_button.config(state=NORMAL)
  579. self.need_sn_check.config(state=DISABLED)
  580. self.get_sn_btn.config(state=DISABLED)
  581. self.sn_entry.config(state=DISABLED)
  582. self.loop_entry.config(state=DISABLED)
  583. self.select_all_check.config(state=DISABLED)
  584. for check_obj in self.step_checkbuttons:
  585. check_obj.config(state=DISABLED)
  586. for button in self.step_buttons:
  587. button.config(state=DISABLED)
  588. for data_lable in self.step_data_list:
  589. data_lable["text"] = ""
  590. self.logText.delete('1.0', 'end')
  591. self.my_io_process.reset_counter()
  592. self.thread_list['thread'] = MasterThread(self.context, selected_steps)
  593. self.thread_list['thread'].start()
  594. self.update_test_status()
  595. def update_test_status(self):
  596. total_no_of_steps = len(self.context.testplan_list)
  597. try:
  598. ScanSN = self.context.device_info['ScanSN']
  599. string = "Device: " + ScanSN
  600. self.deviceInfo_label.config(text=string)
  601. except KeyError:
  602. string = "Device: "
  603. self.deviceInfo_label.config(text=string)
  604. flag = 0
  605. count1 = 0
  606. count2 = 0
  607. count3 = 0
  608. for i in range(total_no_of_steps):
  609. status = self.context.step_status[i]
  610. if status == 'r':
  611. self.step_status_list[i]["text"] = "RUNNING..."
  612. self.step_status_list[i]["bg"] = 'yellow'
  613. self.test_status_label.config(bg='yellow')
  614. self.test_status_label.config(text='RUNNING...')
  615. self.step_data_list[i]["text"] = ""
  616. elif status == 'p':
  617. self.step_status_list[i]["text"] = "PASS"
  618. self.step_status_list[i]["bg"] = 'green'
  619. self.step_data_list[i]["text"] = self.context.result_msg[i]
  620. count2 += 1
  621. elif status == 'f':
  622. self.step_status_list[i]["text"] = "ERROR!"
  623. self.step_status_list[i]["bg"] = 'red'
  624. self.test_status_label.config(text='ERROR!')
  625. self.test_status_label.config(bg='red')
  626. self.step_data_list[i]["text"] = self.context.result_msg[i]
  627. count3 += 1
  628. elif status == 'ns':
  629. self.step_status_list[i]["text"] = "NOT STARTED"
  630. self.step_status_list[i]["bg"] = 'gray'
  631. count1 += 1
  632. if count1 == total_no_of_steps:
  633. self.test_status_label.config(bg='gray')
  634. self.test_status_label.config(text='NOT STARTED')
  635. test_running = 'thread' in self.thread_list and self.thread_list['thread'].is_alive()
  636. if not test_running and not self.test_finished:
  637. if count2 == len(self.get_selected_steps()):
  638. self.test_status_label.config(bg='green')
  639. self.test_status_label.config(text='DONE!')
  640. self.stop_timer()
  641. self.start_button.config(state=NORMAL)
  642. self.stop_button.config(state=DISABLED)
  643. self.need_sn_check.config(state=NORMAL)
  644. self.get_sn_btn.config(state=NORMAL if self.need_sn_var.get() else DISABLED)
  645. self.sn_entry.config(state=NORMAL if self.need_sn_var.get() else DISABLED)
  646. self.loop_entry.config(state=NORMAL)
  647. self.select_all_check.config(state=NORMAL)
  648. for check_obj in self.step_checkbuttons:
  649. check_obj.config(state=NORMAL)
  650. for button in self.step_buttons:
  651. button.config(state=NORMAL)
  652. flag = 1
  653. if flag == 0:
  654. self.root.after(100, self.update_test_status)
  655. else:
  656. self.context.device_info['ScanSN'] = ""
  657. return 0
  658. def on_select_all_toggle(self):
  659. is_select_all = self.select_all_var.get()
  660. for var in self.step_check_vars:
  661. var.set(is_select_all)
  662. def update_select_all_state(self):
  663. all_checked = all(var.get() for var in self.step_check_vars)
  664. self.select_all_var.set(all_checked)
  665. if __name__ == "__main__":
  666. if not os.path.exists("./Logs"):
  667. os.makedirs("./Logs")
  668. g = GuiThread()
  669. g.run()