#!/usr/bin/python
# _*_ coding: utf-8 _*_
"""Tkinter (Python 2) front end that runs the steps listed in testplan.csv.

Each enabled CSV row names a callable in the ``test_function`` module.  A
worker thread runs the steps while the GUI polls the shared ``TestContext``
for per-step status, result messages and log text.
"""
from Tkinter import *  # python2
import ttk
import tkMessageBox
import tkFont
import sys
import os
import subprocess
import time
import threading
import csv
from datetime import datetime
import test_function

# Separator lines written to the log pane around each step.
LOG_SEPARATOR = "\n----------------------------------------------------"
SINGLE_STEP_SEPARATOR = "\n--------------------single_step---------------------"


def _lookup_test_function(func_name):
    """Return the callable named *func_name* from test_function, or None."""
    if 'test_function' not in sys.modules:
        return None
    return getattr(test_function, func_name, None)


def _execute_step(context, step_num):
    """Run test-plan step *step_num* and record its outcome on *context*.

    Sets ``context.step_status[step_num]`` to 'r' while running and then to
    'p' or 'f', and stores the message in ``context.result_msg[step_num]``.
    An exception raised by the test function is recorded as a failure so the
    step never stays stuck in the running state.

    Returns True when a test function was found and invoked, False when the
    plan names a function that does not exist.
    """
    test_item = context.testplan_list[step_num]
    # DictReader yields None for missing trailing columns; normalise to "".
    func_name = test_item.get("Function", "") or ""
    arguments = test_item.get("Arguments", "") or ""

    func = _lookup_test_function(func_name)
    if not func:
        context.result_msg[step_num] = "%s is not defined" % func_name
        context.step_status[step_num] = "f"
        return False

    context.step_status[step_num] = 'r'  # running
    try:
        if arguments:
            result, result_msg = func(context, arguments)
        else:
            result, result_msg = func(context)
    except Exception as e:
        result = False
        result_msg = "%s raised %s: %s" % (func_name, type(e).__name__, e)

    # Publish the message before the status: the GUI reads the message as
    # soon as it sees 'p'/'f', so the status must be written last.
    context.result_msg[step_num] = result_msg
    context.step_status[step_num] = "p" if result else "f"
    context.print1(LOG_SEPARATOR)
    context.print1(result_msg)
    return True


class TestContext:
    """Shared state between the GUI and the worker threads."""

    def __init__(self):
        self.script_ver = 'C181-C-C7-v2.4'
        self.log_file = './Logs/temp.txt'
        self.log1_buffer = ""
        # Guards log1_buffer, which workers append to and the GUI drains.
        self._log_lock = threading.Lock()
        self.tp_path = "./testplan.csv"
        self.device_info = {}
        self.params_dict = {
            'path_loss': '25',
            'wave_file': './11b_waveforms/wave11rc_1000.mod',
            'per_limit': '92',
            'SA_RSSI': '-80',
            'BTOn_RSSI': '-30'
        }

        # Initialise first so a missing/unreadable plan leaves an empty list
        # instead of an undefined attribute.
        self.testplan_list = []
        try:
            with open(self.tp_path, 'r') as f:
                for row in csv.DictReader(f):
                    if row.get("Disable", "") != "Y":
                        self.testplan_list.append(row)
        except IOError as e:
            print(e)

        self.step_status = ['ns'] * len(self.testplan_list)
        self.result_msg = [''] * len(self.testplan_list)

    def print1(self, string, newline=0):
        """Queue *string* for the log pane.

        newline=0 prefixes a newline; newline=1 also appends one.  Any other
        value queues nothing.
        """
        if newline == 0:
            text = "\n" + str(string)
        elif newline == 1:
            text = "\n" + str(string) + "\n"
        else:
            return
        with self._log_lock:
            self.log1_buffer += text

    def drain_log1(self):
        """Atomically return the queued log text and empty the buffer."""
        with self._log_lock:
            pending = self.log1_buffer
            self.log1_buffer = ""
        return pending

    def write_log_to_file(self, content):
        """Append *content* plus a newline to ./Logs/log_<timestamp>.txt."""
        curr_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
        log_dir = "./Logs/"
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        log_file_location = log_dir + "log_" + curr_time + ".txt"
        # Plain Python 2 file write; no explicit encoding is applied here.
        with open(log_file_location, 'a') as log_handle:
            log_handle.write(str(content) + '\n')


class IO_process:
    """Line-at-a-time reader over a log file that may not exist yet."""

    def __init__(self, log_file_path):
        self.log_file = log_file_path
        self.line_count = 0

    def read_file(self):
        """Return the next unread line, or "" if there is none.

        The file is rescanned from the start on every call; ``line_count``
        is the index of the next line to hand out.
        """
        try:
            with open(self.log_file, 'r') as f:
                for i, curr_line in enumerate(f):
                    if i == self.line_count:
                        self.line_count += 1
                        return curr_line.rstrip('\r')
        except (IOError, OSError):
            # Best effort: a missing or unreadable log yields no text.
            pass
        return ""

    def empty_file(self):
        """Truncate the log file."""
        with open(self.log_file, 'w'):
            pass

    def reset_counter(self):
        """Restart reading from the first line."""
        self.line_count = 0


class MasterThread(threading.Thread):
    """Worker thread that runs every step of the test plan in order."""

    def __init__(self, context):
        threading.Thread.__init__(self)
        self.context = context

    def run(self):
        self.context.print1("X86 C180####################################################", 1)
        self.context.print1(self.context.script_ver)
        for step_num in range(len(self.context.testplan_list)):
            self.context.print1(LOG_SEPARATOR)
            self.context.print1("Step " + str(step_num) + " Started test...")
            if _execute_step(self.context, step_num):
                self.context.print1(LOG_SEPARATOR)
        self.context.print1("Test concluded!")


class GuiThread(object):
    """Main window: status table, log pane, parameter view and run timer."""

    def __init__(self):
        self.context = TestContext()
        self.thread_list = {}
        self.counter = 0
        self.root = Tk()
        self.root.geometry("1800x1200")
        small_font = tkFont.Font(family="DejaVu Sans", size=8)
        self.start_time = None
        self.timer_label = None
        self.timer_id = None
        # Pending after() id of the single-step status poll, if any.
        self.step_status_timer_id = None
        self.test_finished = False
        self.root.protocol("WM_DELETE_WINDOW", self.destroy)
        self.root.title("WiPER: FT Simular Test")
        self.note1 = Label(self.root, text="Please press 'Start Test' to begin...", fg='blue')
        self.start_button = Button(self.root, text="Start Test", command=self.start)
        self.test_status_label = Label(self.root, text="NOT STARTED", width=15, bg='gray')
        self.deviceInfo_label = Label(self.root, text="Device: ", width=20, bg="#C6EBFF")

        # Elapsed-time label; packed here so it sits above the other widgets.
        self.timer_label = Label(self.root, text="Elapsed Time: 00:00:00", fg='darkred',
                                 font=tkFont.Font(family="Arial", size=10, weight="bold"))
        self.timer_label.pack()

        self.notebook_box = ttk.Notebook(self.root, name='notebook')
        self.status_frame = Frame(self.root)
        self.logFrame = Frame(self.root)
        self.parametersFrame = Frame(self.root)
        self.logText = Text(self.logFrame)
        self.logScrollbar = Scrollbar(self.logFrame)

        self._build_status_canvas()
        self._build_step_rows(small_font)

        self.my_io_process = IO_process(self.context.log_file)
        self.readLog1()
        self.set_param_frame()

    def _build_status_canvas(self):
        """Create the scrollable canvas that hosts the step table."""
        self.canvas = Canvas(self.status_frame, bg="white")
        self.scrollbar_y = Scrollbar(self.status_frame, command=self.canvas.yview)
        self.lblframe = Frame(self.canvas)
        self.canvas_window = self.canvas.create_window((0, 0), window=self.lblframe, anchor="nw")
        self.canvas.configure(yscrollcommand=self.scrollbar_y.set)

        def on_canvas_configure(event):
            # Keep the inner frame as wide as the canvas.
            self.canvas.itemconfig(self.canvas_window, width=event.width)
            self.canvas.configure(scrollregion=self.canvas.bbox("all"))

        def on_frame_configure(event):
            self.canvas.configure(scrollregion=self.canvas.bbox("all"))

        def on_mouse_wheel(event):
            if event.delta > 0:
                self.canvas.yview_scroll(-1, "pages")
            else:
                self.canvas.yview_scroll(1, "pages")

        # NOTE(review): the event sequences were empty strings in the source
        # as received (Tk rejects an empty sequence); they are restored to
        # the standard Tk event names the handlers imply -- confirm.
        self.canvas.bind("<Configure>", on_canvas_configure)
        self.lblframe.bind("<Configure>", on_frame_configure)
        self.canvas.bind('<MouseWheel>', on_mouse_wheel)
        if sys.platform != 'darwin':
            # X11 reports wheel motion as button 4/5 presses.
            self.canvas.bind('<Button-4>', lambda e: self.canvas.yview_scroll(-1, "units"))
            self.canvas.bind('<Button-5>', lambda e: self.canvas.yview_scroll(1, "units"))
        self.canvas.bind("<Enter>", lambda e: self.canvas.focus_set())

        self.canvas.pack(side=LEFT, fill=BOTH, expand=True)
        self.scrollbar_y.pack(side=RIGHT, fill=Y)

    def _build_step_rows(self, small_font):
        """Create the header row and one widget row per test-plan step."""
        self.index_list = []
        self.step_buttons = []
        self.step_name_list = []
        self.step_data_list = []
        self.step_status_list = []

        headers = [("Index", 5), ("Run Steps", 12), ("TestName", 50),
                   ("TestData", 50), ("TestStatus", 20)]
        for column, (caption, width) in enumerate(headers):
            title = Label(self.lblframe, text=caption, anchor='center', width=width, font=small_font)
            title.grid(row=0, column=column, sticky=W, padx=2, pady=1)

        for index, desc in enumerate(self.context.testplan_list):
            self.index_list.append(
                Label(self.lblframe, text=str(index), anchor='center', width=5, font=small_font))
            # num=index binds the current index; a bare closure would see
            # only the last value of the loop variable.
            self.step_buttons.append(
                Button(self.lblframe, text="Run Step %d" % index,
                       command=lambda num=index: self.run_single_step(num),
                       width=12, bg='lightblue', font=small_font))
            self.step_name_list.append(
                Label(self.lblframe, text=desc.get("TestName", ""), anchor='w',
                      bg='#CCCCCC', width=50, font=small_font))
            self.step_data_list.append(
                Label(self.lblframe, text="", anchor='w', bg='#CCCCCC', width=50, font=small_font))
            self.step_status_list.append(
                Label(self.lblframe, text="NOT STARTED", bg='#CCCCCC', width=20, font=small_font))

    def update_timer(self):
        """Refresh the elapsed-time label once per second while a run is active."""
        if self.start_time and not self.test_finished:
            elapsed_seconds = int(time.time() - self.start_time)
            hours = elapsed_seconds // 3600
            minutes = (elapsed_seconds % 3600) // 60
            seconds = elapsed_seconds % 60
            timer_text = "Elapsed Time: %02d:%02d:%02d" % (hours, minutes, seconds)
            self.timer_label.config(text=timer_text)
            self.timer_id = self.root.after(1000, self.update_timer)

    def stop_timer(self):
        """Stop the timer and, if a run was started, show the total time."""
        if self.timer_id:
            self.root.after_cancel(self.timer_id)
            self.timer_id = None
        self.test_finished = True
        if self.start_time:
            total_seconds = int(time.time() - self.start_time)
            hours = total_seconds // 3600
            minutes = (total_seconds % 3600) // 60
            seconds = total_seconds % 60
            final_text = "Total Time: %02d:%02d:%02d (Test Completed)" % (hours, minutes, seconds)
            self.timer_label.config(text=final_text, fg='darkgreen')

    def destroy(self):
        """Window-close handler: confirm, then stop the timer and quit."""
        if tkMessageBox.askokcancel("Quit?", "Are you sure you want to quit?"):
            self.stop_timer()
            self.root.quit()

    def run(self):
        """Lay out the widgets and enter the Tk main loop."""
        self.note1.pack()
        self.start_button.pack(side=TOP, padx=10, pady=3)
        self.test_status_label.pack()
        self.deviceInfo_label.pack(fill=X, expand=False, padx=50, pady=20)
        self.logScrollbar.config(command=self.logText.yview)
        self.logText.config(yscrollcommand=self.logScrollbar.set)
        self.logText.pack(side=LEFT, fill=Y)
        self.logScrollbar.pack(side=RIGHT, fill=Y)

        # Row 0 holds the column titles, so step rows start at 1.
        for index in range(len(self.context.testplan_list)):
            row = index + 1
            self.index_list[index].grid(row=row, column=0, sticky=W, padx=2, pady=0)
            self.step_buttons[index].grid(row=row, column=1, sticky=W, padx=2, pady=0)
            self.step_name_list[index].grid(row=row, column=2, sticky=W, padx=2, pady=0)
            self.step_data_list[index].grid(row=row, column=3, sticky=W, padx=2, pady=0)
            self.step_status_list[index].grid(row=row, column=4, sticky=W, padx=2, pady=0)

        self.notebook_box.add(self.status_frame, text='Status')
        self.notebook_box.add(self.logFrame, text='Log')
        self.notebook_box.add(self.parametersFrame, text='Parameters')
        self.notebook_box.enable_traversal()
        self.notebook_box.pack(fill=BOTH, expand="true", padx=5, pady=1)
        self.root.mainloop()

    def readLog1(self):
        """Every 100 ms, move queued log text and one log-file line into the pane."""
        buffered = self.context.drain_log1()
        if buffered != "":
            self.logText.insert(END, buffered)
        text = self.my_io_process.read_file()
        if text != "":
            self.logText.insert(END, text)
        if buffered != "" or text != "":
            self.logText.see(END)
        # update_idletasks() redraws without re-entering event handlers,
        # which a full update() inside an after() callback would do.
        self.root.update_idletasks()
        self.root.after(100, self.readLog1)

    def set_param_frame(self):
        """Show params_dict as read-only label/entry pairs."""
        self.L = []
        self.E = []
        count = 1
        for key, value in self.context.params_dict.items():
            lbl = Label(self.parametersFrame, text=key)
            enty = Entry(self.parametersFrame, bd=4)
            enty.delete(0, END)
            enty.insert(0, value)
            enty.config(state=DISABLED)
            lbl.grid(row=count, sticky=W)
            enty.grid(row=count, column=1)
            self.L.append(lbl)
            self.E.append(enty)
            count += 1

    def run_single_step(self, step_num):
        """Run one test-plan step on a background thread."""
        self.context.print1(SINGLE_STEP_SEPARATOR)
        self.context.print1("Step %d Started test..." % step_num)
        self.context.step_status[step_num] = 'r'
        # Start (or restart) the poll from the GUI thread; the worker only
        # writes to the context and never touches Tk.
        self.update_step_status()
        thread = threading.Thread(target=_execute_step, args=(self.context, step_num))
        thread.start()

    def update_step_status(self):
        """Refresh the per-step labels; keep polling while any step is running."""
        # Drop any pending poll so repeated calls never stack timer chains.
        if self.step_status_timer_id is not None:
            self.root.after_cancel(self.step_status_timer_id)
            self.step_status_timer_id = None

        any_running = False
        # step_status can be one entry longer than the widget lists after
        # start(); only the entries that have a row are displayed.
        visible = self.context.step_status[:len(self.step_status_list)]
        for index, status in enumerate(visible):
            if status == 'r':
                any_running = True
                self.step_status_list[index]["text"] = "RUNNING..."
                self.step_status_list[index]["bg"] = 'yellow'
            elif status == 'p':
                self.step_status_list[index]["text"] = "PASS"
                self.step_data_list[index]["text"] = self.context.result_msg[index]
                self.step_status_list[index]["bg"] = 'green'
            elif status == 'f':
                self.step_status_list[index]["text"] = "ERROR!"
                self.step_status_list[index]["bg"] = 'red'
                self.step_data_list[index]["text"] = self.context.result_msg[index]

        if any_running:
            self.step_status_timer_id = self.root.after(200, self.update_step_status)

    def start(self):
        """Start a full run of the test plan."""
        # Restart the elapsed-time display.
        if self.timer_id:
            self.root.after_cancel(self.timer_id)
            self.timer_id = None
        self.start_time = time.time()
        self.test_finished = False
        self.timer_label.config(fg='darkred')
        self.update_timer()

        # The leading 'x' keeps the "all steps not started" check from
        # matching before the worker reaches step 0.
        self.context.step_status = ['x'] + ['ns'] * len(self.context.testplan_list)
        self.start_button.config(state=DISABLED)
        for button in self.step_buttons:
            button.config(state=DISABLED)
        for data_lable in self.step_data_list:
            data_lable["text"] = ""
        self.logText.delete('1.0', 'end')
        self.my_io_process.reset_counter()
        self.thread_list['thread'] = MasterThread(self.context)
        self.thread_list['thread'].start()
        self.update_test_status()

    def update_test_status(self):
        """Refresh all status labels; poll every 100 ms until the worker exits.

        Returns 0.
        """
        total_no_of_steps = len(self.context.testplan_list)
        self.deviceInfo_label.config(
            text="Device: " + self.context.device_info.get('SrNo', ""))

        # Sample liveness before reading the statuses: once the worker is
        # seen dead, everything read below is final.
        worker = self.thread_list.get('thread')
        run_active = worker is not None and worker.is_alive()

        not_started = 0
        passed = 0
        for i in range(total_no_of_steps):
            status = self.context.step_status[i]
            if status == 'r':
                self.step_status_list[i]["text"] = "RUNNING..."
                self.step_status_list[i]["bg"] = 'yellow'
                self.test_status_label.config(bg='yellow')
                self.test_status_label.config(text='RUNNING...')
                self.step_data_list[i]["text"] = ""
            elif status == 'p':
                self.step_status_list[i]["text"] = "PASS"
                self.step_status_list[i]["bg"] = 'green'
                self.step_data_list[i]["text"] = self.context.result_msg[i]
                passed += 1
            elif status == 'f':
                self.step_status_list[i]["text"] = "ERROR!"
                self.step_status_list[i]["bg"] = 'red'
                self.test_status_label.config(text='ERROR!')
                self.test_status_label.config(bg='red')
                self.step_data_list[i]["text"] = self.context.result_msg[i]
            elif status == 'ns':
                self.step_status_list[i]["text"] = "NOT STARTED"
                self.step_status_list[i]["bg"] = 'gray'
                not_started += 1

        if not_started == total_no_of_steps:
            self.test_status_label.config(bg='gray')
            self.test_status_label.config(text='NOT STARTED')
        all_passed = passed == total_no_of_steps
        if all_passed:
            self.test_status_label.config(bg='green')
            self.test_status_label.config(text='DONE!')

        if run_active:
            self.root.after(100, self.update_test_status)
            return 0

        # Worker finished: unlock the controls, freeze the timer and stop
        # polling so the next Start begins exactly one new poll chain.
        self.start_button.config(state=NORMAL)
        for button in self.step_buttons:
            button.config(state=NORMAL)
        if not self.test_finished:
            self.stop_timer()
        if all_passed:
            self.context.device_info['SrNo'] = ""
        return 0


if __name__ == "__main__":
    if not os.path.exists("./Logs"):
        os.makedirs("./Logs")
    g = GuiThread()
    g.run()