# run_test (back-0322).py
  1. #!/usr/bin/python
  2. # _*_ coding: utf-8 _*_
  3. from Tkinter import * # python2
  4. import ttk
  5. import tkMessageBox
  6. import tkFont
  7. import sys
  8. import os
  9. import subprocess
  10. import time
  11. import threading
  12. import csv
  13. from datetime import datetime
  14. import test_function
  15. class TestContext:
  16. def __init__(self):
  17. self.script_ver = 'C181-C-C7-v2.4'
  18. self.log_file = './Logs/temp.txt'
  19. self.log1_buffer = ""
  20. self.tp_path = "./testplan.csv"
  21. self.device_info = {}
  22. self.params_dict = {
  23. 'path_loss': '25',
  24. 'wave_file': './11b_waveforms/wave11rc_1000.mod',
  25. 'per_limit': '92',
  26. 'SA_RSSI': '-80',
  27. 'BTOn_RSSI': '-30'
  28. }
  29. try:
  30. f = open(self.tp_path, 'r')
  31. reader = csv.DictReader(f)
  32. self.testplan_list = []
  33. for row in reader:
  34. if row.get("Disable", "") != "Y":
  35. self.testplan_list.append(row)
  36. except IOError as e:
  37. print(e)
  38. finally:
  39. f.close()
  40. self.step_status = ['ns'] * len(self.testplan_list)
  41. self.result_msg = [''] * len(self.testplan_list)
  42. def print1(self, string, newline=0):
  43. if newline == 0:
  44. self.log1_buffer += "\n" + str(string)
  45. elif newline == 1:
  46. self.log1_buffer += "\n" + str(string) + "\n"
  47. def write_log_to_file(self, content):
  48. curr_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
  49. log_dir = "./Logs/"
  50. if not os.path.exists(log_dir):
  51. os.makedirs(log_dir)
  52. log_file = "log_" + curr_time + ".txt"
  53. log_file_location = log_dir + log_file
  54. # Python 2文件写入(指定编码避免中文乱码)
  55. file = open(log_file_location, 'a')
  56. file.write(str(content) + '\n')
  57. file.close()
  58. class IO_process:
  59. def __init__(self, log_file_path):
  60. self.log_file = log_file_path
  61. self.line_count = 0
  62. def read_file(self):
  63. try:
  64. f = open(self.log_file, 'r')
  65. flag = 0
  66. line = ""
  67. for i, curr_line in enumerate(f):
  68. if i == self.line_count:
  69. self.line_count += 1
  70. flag = 1
  71. line = curr_line
  72. break
  73. f.close()
  74. if flag == 1:
  75. return line.rstrip('\r')
  76. else:
  77. return ""
  78. except Exception as e:
  79. return ""
  80. def empty_file(self):
  81. f = open(self.log_file, 'w')
  82. f.close()
  83. def reset_counter(self):
  84. self.line_count = 0
  85. class MasterThread(threading.Thread):
  86. def __init__(self, context):
  87. threading.Thread.__init__(self)
  88. self.context = context
  89. def run(self):
  90. self.context.print1("X86 C180####################################################", 1)
  91. self.context.print1(self.context.script_ver)
  92. for step_num, test_item in enumerate(self.context.testplan_list):
  93. self.context.print1("\n----------------------------------------------------")
  94. self.context.print1("Step " + str(step_num) + " Started test...")
  95. func_name = test_item.get("Function", "")
  96. arguments = test_item.get("Arguments", "")
  97. func = getattr(test_function, func_name, None) if 'test_function' in sys.modules else None
  98. if not func:
  99. self.context.step_status[step_num] = "f"
  100. self.context.result_msg[step_num] = "%s is not defined" % func_name # Python 2格式化
  101. continue
  102. self.context.step_status[step_num] = 'r' # running
  103. if arguments:
  104. result, result_msg = func(self.context, arguments)
  105. else:
  106. result, result_msg = func(self.context)
  107. self.context.step_status[step_num] = "p" if result else "f"
  108. self.context.result_msg[step_num] = result_msg
  109. self.context.print1("\n----------------------------------------------------")
  110. self.context.print1(result_msg)
  111. self.context.print1("\n----------------------------------------------------")
  112. self.context.print1("Test concluded!")
  113. class GuiThread(object):
  114. def __init__(self):
  115. self.context = TestContext()
  116. self.thread_list = {}
  117. self.counter = 0
  118. self.root = Tk()
  119. self.root.geometry("1800x1200")
  120. small_font = tkFont.Font(family="DejaVu Sans", size=8)
  121. self.start_time = None
  122. self.timer_label = None
  123. self.timer_id = None
  124. self.test_finished = False
  125. self.root.protocol("WM_DELETE_WINDOW", self.destroy)
  126. self.root.title("WiPER: FT Simular Test")
  127. self.note1 = Label(self.root, text="Please press 'Start Test' to begin...", fg='blue')
  128. self.start_button = Button(self.root, text="Start Test", command=self.start)
  129. self.test_status_label = Label(self.root, text="NOT STARTED", width=15, bg='gray')
  130. self.deviceInfo_label = Label(self.root, text="Device: ", width=20, bg="#C6EBFF")
  131. # -------------------------- 计时器标签(Python 2兼容) --------------------------
  132. self.timer_label = Label(self.root, text="Elapsed Time: 00:00:00", fg='darkred',
  133. font=tkFont.Font(family="Arial", size=10, weight="bold"))
  134. self.timer_label.pack()
  135. self.notebook_box = ttk.Notebook(self.root, name='notebook')
  136. self.status_frame = Frame(self.root)
  137. self.logFrame = Frame(self.root)
  138. self.parametersFrame = Frame(self.root)
  139. self.logText = Text(self.logFrame)
  140. self.logScrollbar = Scrollbar(self.logFrame)
  141. # 创建带滚动的Status面板(Python 2兼容)
  142. self.canvas = Canvas(self.status_frame, bg="white")
  143. self.scrollbar_y = Scrollbar(self.status_frame, command=self.canvas.yview)
  144. self.lblframe = Frame(self.canvas)
  145. self.canvas_window = self.canvas.create_window((0, 0), window=self.lblframe, anchor="nw")
  146. self.canvas.configure(yscrollcommand=self.scrollbar_y.set)
  147. # 绑定事件(Python 2兼容)
  148. def on_canvas_configure(event):
  149. self.canvas.itemconfig(self.canvas_window, width=event.width)
  150. self.canvas.configure(scrollregion=self.canvas.bbox("all"))
  151. def on_frame_configure(event):
  152. self.canvas.configure(scrollregion=self.canvas.bbox("all"))
  153. self.canvas.bind("<Configure>", on_canvas_configure)
  154. self.lblframe.bind("<Configure>", on_frame_configure)
  155. # 鼠标滚轮支持(Python 2兼容)
  156. def on_mouse_wheel(event):
  157. if event.delta > 0:
  158. self.canvas.yview_scroll(-1, "pages")
  159. else:
  160. self.canvas.yview_scroll(1, "pages")
  161. if sys.platform == 'darwin':
  162. self.canvas.bind('<MouseWheel>', on_mouse_wheel)
  163. else:
  164. self.canvas.bind('<MouseWheel>', on_mouse_wheel)
  165. self.canvas.bind('<Button-4>', lambda e: self.canvas.yview_scroll(-1, "units"))
  166. self.canvas.bind('<Button-5>', lambda e: self.canvas.yview_scroll(1, "units"))
  167. self.canvas.bind("<Enter>", lambda e: self.canvas.focus_set())
  168. # 布局Canvas和Scrollbar
  169. self.canvas.pack(side=LEFT, fill=BOTH, expand=True)
  170. self.scrollbar_y.pack(side=RIGHT, fill=Y)
  171. # 创建步骤列表(Python 2兼容)
  172. self.index_list = []
  173. self.step_buttons = []
  174. self.step_name_list = []
  175. self.step_data_list = []
  176. self.step_status_list = []
  177. # Status面板标题行
  178. title_1 = Label(self.lblframe, text="Index", anchor='center', width=5, font=small_font)
  179. title_1.grid(row=0, column=0, sticky=W, padx=2, pady=1)
  180. title_2 = Label(self.lblframe, text="Run Steps", anchor='center', width=12, font=small_font)
  181. title_2.grid(row=0, column=1, sticky=W, padx=2, pady=1)
  182. title_3 = Label(self.lblframe, text="TestName", anchor='center', width=50, font=small_font)
  183. title_3.grid(row=0, column=2, sticky=W, padx=2, pady=1)
  184. title_4 = Label(self.lblframe, text="TestData", anchor='center', width=50, font=small_font)
  185. title_4.grid(row=0, column=3, sticky=W, padx=2, pady=1)
  186. title_5 = Label(self.lblframe, text="TestStatus", anchor='center', width=20, font=small_font)
  187. title_5.grid(row=0, column=4, sticky=W, padx=2, pady=1)
  188. for index, desc in enumerate(self.context.testplan_list):
  189. # Python 2格式化字符串
  190. index_lable = Label(self.lblframe, text=str(index), anchor='center', width=5, font=small_font)
  191. self.index_list.append(index_lable)
  192. btn = Button(self.lblframe, text="Run Step %d" % index,
  193. command=lambda num=index: self.run_single_step(num),
  194. width=12, bg='lightblue', font=small_font)
  195. self.step_buttons.append(btn)
  196. item_label = Label(self.lblframe, text=desc.get("TestName", ""), anchor='w', bg='#CCCCCC', width=50,
  197. font=small_font)
  198. self.step_name_list.append(item_label)
  199. data_label = Label(self.lblframe, text="", anchor='w', bg='#CCCCCC', width=50, font=small_font)
  200. self.step_data_list.append(data_label)
  201. status_label = Label(self.lblframe, text="NOT STARTED", bg='#CCCCCC', width=20, font=small_font)
  202. self.step_status_list.append(status_label)
  203. self.my_io_process = IO_process(self.context.log_file)
  204. self.readLog1()
  205. self.set_param_frame()
  206. # -------------------------- 计时器更新(纯Python 2语法) --------------------------
  207. def update_timer(self):
  208. if self.start_time and not self.test_finished:
  209. # Python 2整数运算
  210. elapsed_seconds = int(time.time() - self.start_time)
  211. hours = elapsed_seconds // 3600
  212. minutes = (elapsed_seconds % 3600) // 60
  213. seconds = elapsed_seconds % 60
  214. # Python 2格式化(%操作符)
  215. timer_text = "Elapsed Time: %02d:%02d:%02d" % (hours, minutes, seconds)
  216. self.timer_label.config(text=timer_text)
  217. # 1秒后再次更新
  218. self.timer_id = self.root.after(1000, self.update_timer)
  219. # -------------------------- 停止计时器(Python 2兼容) --------------------------
  220. def stop_timer(self):
  221. """停止计时器"""
  222. if self.timer_id:
  223. self.root.after_cancel(self.timer_id)
  224. self.timer_id = None
  225. self.test_finished = True
  226. if self.start_time:
  227. total_seconds = int(time.time() - self.start_time)
  228. hours = total_seconds // 3600
  229. minutes = (total_seconds % 3600) // 60
  230. seconds = total_seconds % 60
  231. final_text = "Total Time: %02d:%02d:%02d (Test Completed)" % (hours, minutes, seconds)
  232. self.timer_label.config(text=final_text, fg='darkgreen')
  233. def destroy(self):
  234. if tkMessageBox.askokcancel("Quit?", "Are you sure you want to quit?"):
  235. self.stop_timer()
  236. self.root.quit()
  237. def run(self):
  238. self.note1.pack()
  239. self.start_button.pack(side=TOP, padx=10, pady=3)
  240. self.test_status_label.pack()
  241. self.deviceInfo_label.pack(fill=X, expand=False, padx=50, pady=20)
  242. self.logScrollbar.config(command=self.logText.yview)
  243. self.logText.config(yscrollcommand=self.logScrollbar.set)
  244. self.logText.pack(side=LEFT, fill=Y)
  245. self.logScrollbar.pack(side=RIGHT, fill=Y)
  246. # 布局步骤列表
  247. for index in range(len(self.context.testplan_list)):
  248. self.index_list[index].grid(row=index + 1, column=0, sticky=W, padx=2, pady=0)
  249. self.step_buttons[index].grid(row=index + 1, column=1, sticky=W, padx=2, pady=0)
  250. self.step_name_list[index].grid(row=index + 1, column=2, sticky=W, padx=2, pady=0)
  251. self.step_data_list[index].grid(row=index + 1, column=3, sticky=W, padx=2, pady=0)
  252. self.step_status_list[index].grid(row=index + 1, column=4, sticky=W, padx=2, pady=0)
  253. self.notebook_box.add(self.status_frame, text='Status')
  254. self.notebook_box.add(self.logFrame, text='Log')
  255. self.notebook_box.add(self.parametersFrame, text='Parameters')
  256. self.notebook_box.enable_traversal()
  257. self.notebook_box.pack(fill=BOTH, expand="true", padx=5, pady=1)
  258. self.root.mainloop()
  259. def readLog1(self):
  260. if self.context.log1_buffer != "":
  261. self.logText.insert(END, self.context.log1_buffer)
  262. text = self.my_io_process.read_file()
  263. if text != "":
  264. self.logText.insert(END, text)
  265. if self.context.log1_buffer != "" or text != "":
  266. self.logText.yview_pickplace("end")
  267. self.context.log1_buffer = ""
  268. self.root.update()
  269. self.root.after(100, self.readLog1)
  270. def set_param_frame(self):
  271. self.L = []
  272. self.E = []
  273. count = 1
  274. for key, value in self.context.params_dict.items():
  275. lbl = Label(self.parametersFrame, text=key)
  276. enty = Entry(self.parametersFrame, bd=4)
  277. enty.delete(0, END)
  278. enty.insert(0, value)
  279. enty.config(state=DISABLED)
  280. lbl.grid(row=count, sticky=W)
  281. enty.grid(row=count, column=1)
  282. self.L.append(lbl)
  283. self.E.append(enty)
  284. count += 1
  285. def run_single_step(self, step_num):
  286. self.context.print1("\n--------------------single_step---------------------")
  287. self.context.print1("Step %d Started test..." % step_num)
  288. self.context.step_status[step_num] = 'r'
  289. item_info = self.context.testplan_list[step_num]
  290. func_name = item_info.get("Function", "")
  291. arguments = item_info.get("Arguments", "")
  292. func = getattr(test_function, func_name, None) if 'test_function' in sys.modules else None
  293. if not func:
  294. self.context.step_status[step_num] = "f"
  295. self.context.result_msg[step_num] = "%s is not defined" % func_name
  296. self.root.after(0, self.update_step_status)
  297. else:
  298. self.context.step_status[step_num] = 'r'
  299. self.root.after(0, self.update_step_status)
  300. def run_step():
  301. result, result_msg = False, "FAIL"
  302. if arguments:
  303. result, result_msg = func(self.context, arguments)
  304. else:
  305. result, result_msg = func(self.context)
  306. self.context.step_status[step_num] = "p" if result else "f"
  307. self.context.result_msg[step_num] = result_msg
  308. self.context.print1("\n----------------------------------------------------")
  309. self.context.print1(result_msg)
  310. self.root.after(0, self.update_step_status)
  311. thread = threading.Thread(target=run_step)
  312. thread.start()
  313. def update_step_status(self):
  314. total_no_of_steps = len(self.context.step_status)
  315. for index in range(total_no_of_steps):
  316. status = self.context.step_status[index]
  317. if status == 'r':
  318. self.step_status_list[index]["text"] = "RUNNING..."
  319. self.step_status_list[index]["bg"] = 'yellow'
  320. elif status == 'p':
  321. self.step_status_list[index]["text"] = "PASS"
  322. self.step_data_list[index]["text"] = self.context.result_msg[index]
  323. self.step_status_list[index]["bg"] = 'green'
  324. elif status == 'f':
  325. self.step_status_list[index]["text"] = "ERROR!"
  326. self.step_status_list[index]["bg"] = 'red'
  327. self.step_data_list[index]["text"] = self.context.result_msg[index]
  328. self.root.after(200, self.update_step_status)
  329. def start(self):
  330. # 启动计时器
  331. self.start_time = time.time()
  332. self.test_finished = False
  333. self.update_timer()
  334. self.context.step_status = ['x'] + ['ns'] * len(self.context.testplan_list)
  335. self.start_button.config(state=DISABLED)
  336. for button in self.step_buttons:
  337. button.config(state=DISABLED)
  338. for data_lable in self.step_data_list:
  339. data_lable["text"] = ""
  340. self.logText.delete('1.0', 'end')
  341. self.my_io_process.reset_counter()
  342. self.thread_list['thread'] = MasterThread(self.context)
  343. self.thread_list['thread'].start()
  344. self.update_test_status()
  345. def update_test_status(self):
  346. total_no_of_steps = len(self.context.testplan_list)
  347. try:
  348. SrNo = self.context.device_info['SrNo']
  349. string = "Device: " + SrNo
  350. self.deviceInfo_label.config(text=string)
  351. except KeyError:
  352. string = "Device: "
  353. self.deviceInfo_label.config(text=string)
  354. flag = 0
  355. count1 = 0
  356. count2 = 0
  357. count3 = 0
  358. for i in range(total_no_of_steps):
  359. status = self.context.step_status[i]
  360. if status == 'r':
  361. self.step_status_list[i]["text"] = "RUNNING..."
  362. self.step_status_list[i]["bg"] = 'yellow'
  363. self.test_status_label.config(bg='yellow')
  364. self.test_status_label.config(text='RUNNING...')
  365. self.step_data_list[i]["text"] = ""
  366. elif status == 'p':
  367. self.step_status_list[i]["text"] = "PASS"
  368. self.step_status_list[i]["bg"] = 'green'
  369. self.step_data_list[i]["text"] = self.context.result_msg[i]
  370. count2 += 1
  371. elif status == 'f':
  372. self.step_status_list[i]["text"] = "ERROR!"
  373. self.step_status_list[i]["bg"] = 'red'
  374. self.test_status_label.config(text='ERROR!')
  375. self.test_status_label.config(bg='red')
  376. self.step_data_list[i]["text"] = self.context.result_msg[i]
  377. count3 += 1
  378. elif status == 'ns':
  379. self.step_status_list[i]["text"] = "NOT STARTED"
  380. self.step_status_list[i]["bg"] = 'gray'
  381. count1 += 1
  382. if count1 == total_no_of_steps:
  383. self.test_status_label.config(bg='gray')
  384. self.test_status_label.config(text='NOT STARTED')
  385. if count2 == total_no_of_steps:
  386. self.test_status_label.config(bg='green')
  387. self.test_status_label.config(text='DONE!')
  388. flag = 1
  389. self.stop_timer() # 测试完成停止计时器
  390. if (count3 + count2 + count1) == total_no_of_steps:
  391. self.start_button.config(state=NORMAL)
  392. for button in self.step_buttons:
  393. button.config(state=NORMAL)
  394. if not self.test_finished:
  395. self.stop_timer()
  396. if flag == 0:
  397. self.root.after(100, self.update_test_status)
  398. else:
  399. self.context.device_info['SrNo'] = ""
  400. return 0
  401. if __name__ == "__main__":
  402. if not os.path.exists("./Logs"):
  403. os.makedirs("./Logs")
  404. g = GuiThread()
  405. g.run()