| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- #!/usr/bin/python
- # _*_ coding: utf-8 _*_
- try:
- from Tkinter import * # python2
- import ttk
- import tkMessageBox
- except Exception as e:
- from tkinter import * # python3
- from tkinter import ttk
- from tkinter import messagebox as tkMessageBox
- import sys
- import os
- import subprocess
- import time
- import threading
- import csv
- from datetime import datetime
- import test_function
- class TestContext:
- def __init__(self):
- self.script_ver = 'C180-C-C7-v1.2'
- self.log_file = './Logs/temp.txt'
- self.log1_buffer = "" # 日志缓冲区
- self.tp_path = "/Users/zhang/Documents/Projects/C180/pythonTest-V2.2-C180/python_test/testplan.csv"
- self.device_info = {}
- self.params_dict = {
- 'path_loss': '25',
- 'wave_file': './11b_waveforms/wave11rc_1000.mod',
- 'per_limit': '92',
- 'SA_RSSI': '-80',
- 'BTOn_RSSI': '-30'
- }
- # 加载Testplan
- f = open(self.tp_path, 'r')
- reader = csv.DictReader(f)
- self.testplan_list = []
- # for row in reader:
- # if row["Disable"] ~= "Y":
- # self.testplan_list.append(row)
- self.testplan_list = [row for row in reader if row["Disable"] != "Y"]
- # disable = row["Disable"]
- # testName = row["TestName"]
- # function = row["function"]
- # Parameters = row["Parameters"]
- self.step_status = ['ns'] * len(self.testplan_list)
- self.result_msg = [''] * len(self.testplan_list)
- # 可选:为 log1_buffer 添加锁(如果担心线程安全)
- # self.buffer_lock = threading.Lock()
- def print1(self, string, newline=0):
- if newline == 0:
- self.log1_buffer += "\n" + str(string)
- elif newline == 1:
- self.log1_buffer += "\n" + str(string) + "\n"
- def write_log_to_file(self, content):
- curr_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
- log_dir = "./Logs/"
- log_file = "log_" + curr_time + ".txt"
- log_file_location = log_dir + log_file
- file = open(log_file_location, 'a') # open file to append
- file.write(str(content) + '\n')
- file.close()
- class IO_process:
- def __init__(self, log_file_path):
- # log_file_path is the absolute path of the log file
- self.log_file = log_file_path
- self.line_count = 0
- def read_file(self):
- try:
- f = open(self.log_file, 'r') # open file to read
- flag = 0
- for i, line in enumerate(f):
- if i == self.line_count: # if reached the correct line
- self.line_count += 1
- flag = 1
- # print self.line_count
- break
- f.close()
- if flag == 1:
- return line.rstrip('\r') # get rid of the newline character
- else:
- return ""
- except:
- return ""
- def empty_file(self):
- f = open(self.log_file, 'w')
- f.close()
- def reset_counter(self):
- self.line_count = 0
- class MasterThread(threading.Thread):
- def __init__(self, context):
- super(MasterThread, self).__init__()
- self.context = context
- def run(self):
- self.context.print1("X86 C180####################################################", 1)
- self.context.print1(self.context.script_ver)
- for step_num, test_item in enumerate(self.context.testplan_list):
- self.context.print1("\n----------------------------------------------------")
- self.context.print1("Step " + str(step_num) + " Started test...")
- func_name = test_item["Function"]
- arguments = test_item["Arguments"]
- func = getattr(test_function, func_name, None)
- if not func:
- self.context.step_status[step_num] = "f"
- self.context.result_msg[step_num] = "{} is not defined".format(func_name)
- continue
- self.context.step_status[step_num] = 'r' # running
- if arguments:
- result, result_msg = func(self.context, arguments)
- else:
- result, result_msg = func(self.context)
- self.context.step_status[step_num] = result and "p" or "f"
- self.context.result_msg[step_num] = result_msg
- self.context.print1("\n----------------------------------------------------")
- self.context.print1(result_msg)
- self.context.print1("\n----------------------------------------------------")
- self.context.print1("Test concluded!")
- class GuiThread(object):
- def __init__(self):
- self.context = TestContext()
- self.thread_list = {}
- self.counter = 0
- self.root = Tk()
- self.root.geometry("1800x1200")
- self.root.protocol("WM_DELETE_WINDOW", self.destroy) # when user closes the window
- self.root.title("WiPER: FT Simular Test")
- self.note1 = Label(self.root, text="Please press 'Start Test' to begin...", fg='blue')
- self.start_button = Button(self.root, text="Start Test", command=self.start)
- self.test_status_label = Label(self.root, text="NOT STARTED", width=15, bg='gray')
- self.deviceInfo_label = Label(self.root, text="Device: ", width=20, bg="#C6EBFF")
- self.notebook_box = ttk.Notebook(self.root, name='notebook')
- self.status_frame = Frame(self.root)
- self.logFrame = Frame(self.root)
- self.parametersFrame = Frame(self.root)
- self.logText = Text(self.logFrame)
- self.logScrollbar = Scrollbar(self.logFrame)
- # self.logScrollbar.config(command=self.logText.yview)
- # self.logText.config(yscrollcommand=self.logScrollbar.set)
- # -------------------------- 核心修改:添加滚动条 --------------------------
- # 2. 创建Canvas和Scrollbar
- self.canvas = Canvas(self.status_frame, bg="white")
- self.scrollbar_y = Scrollbar(self.status_frame, command=self.canvas.yview)
- # 3. 创建原来的lblframe作为Canvas的内部窗口
- self.lblframe = Frame(self.canvas)
- # 4. 将内部Frame绑定到Canvas
- self.canvas_window = self.canvas.create_window((0, 0), window=self.lblframe, anchor="nw")
- # 5. 配置Canvas的滚动命令
- self.canvas.configure(yscrollcommand=self.scrollbar_y.set)
- # 6. 绑定事件:更新Canvas滚动区域和内部Frame宽度
- def on_canvas_configure(event):
- # 让内部Frame宽度和Canvas一致
- self.canvas.itemconfig(self.canvas_window, width=event.width)
- # 更新滚动区域
- self.canvas.configure(scrollregion=self.canvas.bbox("all"))
- def on_frame_configure(event):
- # 当内部Frame尺寸变化时更新滚动区域
- # print("event>>", event, event.height)
- self.canvas.configure(scrollregion=self.canvas.bbox("all"))
- # self.canvas.bind("<Configure>", on_canvas_configure)
- self.lblframe.bind("<Configure>", on_frame_configure)
- # -------------------------- 新增:鼠标滚轮支持 --------------------------
- # 定义滚轮事件处理函数(兼容Windows/Linux和Mac)
- def on_mouse_wheel(event):
- # Windows/Linux: event.delta 是120的倍数;Mac: event.delta 是-1/1的倍数
- if event.delta > 0:
- self.canvas.yview_scroll(-1, "pages") # 向上滚动
- else:
- self.canvas.yview_scroll(1, "pages") # 向下滚动
- # 绑定滚轮事件(兼容不同系统)
- if sys.platform == 'darwin': # Mac系统
- self.canvas.bind('<MouseWheel>', on_mouse_wheel)
- else: # Windows/Linux系统
- self.canvas.bind('<MouseWheel>', on_mouse_wheel)
- self.canvas.bind('<Button-4>', lambda e: self.canvas.yview_scroll(-1, "units"))
- self.canvas.bind('<Button-5>', lambda e: self.canvas.yview_scroll(1, "units"))
- # 确保Canvas能接收鼠标事件(解决焦点问题)
- self.canvas.bind("<Enter>", lambda e: self.canvas.focus_set())
- # ------------------------------------------------------------------------
- # 7. 布局Canvas和Scrollbar
- self.canvas.pack(side=LEFT, fill=BOTH, expand=True)
- self.scrollbar_y.pack(side=RIGHT, fill=Y)
- # ------------------------------------------------------------------------
- # 创建按钮和标签列表
- self.index_list = []
- self.step_buttons = []
- self.step_name_list = []
- self.step_data_list = []
- self.step_status_list = []
- title_1 = Label(self.lblframe, text="Index", anchor='center', width=5)
- title_1.grid(row=0, column=0, sticky=W, padx=5, pady=2)
- title_2 = Label(self.lblframe, text="Run Steps", anchor='center', width=12)
- title_2.grid(row=0, column=1, sticky=W, padx=5, pady=2)
- # title_2 = Button(self.lblframe, text="Run Steps", width=12, bg='lightblue')
- # title_2.config(state=DISABLED)
- # title_2.grid(row=0, column=1, padx=5, pady=2)
- title_3 = Label(self.lblframe, text="TestName", anchor='center', width=30)
- title_3.grid(row=0, column=2, sticky=W, padx=5, pady=2)
- title_4 = Label(self.lblframe, text="TestData", anchor='center', width=20)
- title_4.grid(row=0, column=3, sticky=W, padx=5, pady=2)
- title_5 = Label(self.lblframe, text="TestStatus", anchor='center', width=20)
- title_5.grid(row=0, column=4, sticky=W, padx=5, pady=2)
- # 创建按钮和标签
- for index, desc in enumerate(self.context.testplan_list):
- # 创建序列
- index_lable = Label(self.lblframe, text=str(index), anchor='center', width=5)
- self.index_list.append(index_lable)
- # 创建按钮
- btn = Button(self.lblframe, text=f"Run Step {index}",
- command=lambda num=index: self.run_single_step(num),
- width=12, bg='lightblue')
- self.step_buttons.append(btn)
- # 创建步骤描述标签
- item_label = Label(self.lblframe, text=desc["TestName"], anchor='w', bg='#CCCCCC', width=30)
- self.step_name_list.append(item_label)
- # 创数据标签
- data_label = Label(self.lblframe, text="", anchor='w', bg='#CCCCCC', width=20)
- self.step_data_list.append(data_label)
- # 创建状态标签
- status_label = Label(self.lblframe, text="NOT STARTED", bg='#CCCCCC', width=20)
- self.step_status_list.append(status_label)
- self.my_io_process = IO_process(self.context.log_file)
- self.readLog1()
- self.set_param_frame()
- def destroy(self):
- if tkMessageBox.askokcancel("Quit?", "Are you sure you want to quit?"):
- self.root.quit()
- def run(self):
- self.note1.pack()
- self.start_button.pack(side=TOP, padx=10, pady=3)
- self.test_status_label.pack()
- # 优化:单行文本标签仅水平填充,不扩展额外空间
- self.deviceInfo_label.pack(fill=X, expand=False, padx=50, pady=20)
- self.logScrollbar.config(command=self.logText.yview)
- self.logText.config(yscrollcommand=self.logScrollbar.set)
- self.logText.pack(side=LEFT, fill=Y)
- self.logScrollbar.pack(side=RIGHT, fill=Y)
- # 布局步骤按钮和标签(放在带滚动的内部Frame中)
- for index in range(len(self.context.testplan_list)):
- self.index_list[index].grid(row=index + 1, column=0, sticky=W)
- self.step_buttons[index].grid(row=index + 1, column=1, sticky=W, padx=5, pady=2)
- self.step_name_list[index].grid(row=index + 1, column=2, sticky=W, padx=5, pady=2)
- self.step_data_list[index].grid(row=index + 1, column=3, sticky=W, padx=5, pady=2)
- self.step_status_list[index].grid(row=index + 1, column=4, sticky=W, padx=5, pady=2)
- self.notebook_box.add(self.status_frame, text='Status')
- self.notebook_box.add(self.logFrame, text='Log')
- self.notebook_box.add(self.parametersFrame, text='Parameters')
- self.notebook_box.enable_traversal()
- self.notebook_box.pack(fill=BOTH, expand="true", padx=5, pady=1)
- self.root.mainloop()
- def readLog1(self):
- # This function periodically reads and empties the log buffer
- if self.context.log1_buffer != "":
- self.logText.insert(END, self.context.log1_buffer)
- text = self.my_io_process.read_file()
- if text != "":
- self.logText.insert(END, text)
- if self.context.log1_buffer != "" or text != "": # if log buffer was populated
- self.logText.yview_pickplace("end") # move the scrollbar to the bottom
- self.context.log1_buffer = "" # empty the log buffer
- self.root.update() # update the screen
- self.root.after(100, self.readLog1)
- def set_param_frame(self):
- self.L = []
- self.E = []
- count = 1
- for key, value in self.context.params_dict.items():
- lbl = Label(self.parametersFrame, text=key)
- enty = Entry(self.parametersFrame, bd=4)
- enty.delete(0, END)
- enty.insert(0, value)
- enty.config(state=DISABLED)
- lbl.grid(row=count, sticky=W)
- enty.grid(row=count, column=1)
- self.L.append(lbl)
- self.E.append(enty)
- count = count + 1
- def run_single_step(self, step_num):
- """运行单个测试步骤"""
- self.context.print1("\n--------------------single_step---------------------")
- self.context.print1("Step " + str(step_num) + " Started test...")
- # 重置该步骤的状态
- self.context.step_status[step_num] = 'r'
- item_info = self.context.testplan_list[step_num]
- func_name = item_info["Function"]
- arguments = item_info["Arguments"]
- func = getattr(test_function, func_name, None)
- if not func:
- self.context.step_status[step_num] = "f"
- self.context.result_msg[step_num] = "{} is not defined".format(func_name)
- self.root.after(0, self.update_step_status)
- else:
- self.context.step_status[step_num] = 'r' # running
- self.root.after(0, self.update_step_status)
- result, result_msg = False, "FAIL"
- def run_step():
- if arguments:
- result, result_msg = func(self.context, arguments)
- else:
- result, result_msg = func(self.context)
- self.context.step_status[step_num] = result and "p" or "f"
- self.context.result_msg[step_num] = result_msg
- self.context.print1("\n----------------------------------------------------")
- self.context.print1(result_msg)
- # 更新UI状态
- thread = threading.Thread(target=run_step)
- thread.start()
- # self.root.after(0, self.update_step_status)
- def update_step_status(self):
- """更新步骤状态显示"""
- total_no_of_steps = len(self.context.step_status)
- for index in range(total_no_of_steps):
- if self.context.step_status[index] == 'r':
- self.step_status_list[index]["text"] = "RUNNING..."
- self.step_status_list[index]["bg"] = 'yellow'
- elif self.context.step_status[index] == 'p':
- self.step_status_list[index]["text"] = "PASS"
- self.step_data_list[index]["text"] = self.context.result_msg[index]
- self.step_status_list[index]["bg"] = 'green'
- elif self.context.step_status[index] == 'f':
- self.step_status_list[index]["text"] = "ERROR!"
- self.step_status_list[index]["bg"] = 'red'
- self.step_data_list[index]["text"] = self.context.result_msg[index]
- # 继续监控状态变化
- self.root.after(200, self.update_step_status)
- def start(self):
- self.context.step_status = ['x'] + ['ns'] * len(self.context.testplan_list) # 索引从1开始
- self.start_button.config(state=DISABLED) # disable the start button
- for button in self.step_buttons:
- button.config(state=DISABLED)
- for data_lable in self.step_data_list:
- data_lable["text"] = ""
- self.logText.delete('1.0', 'end') # delete the log
- self.my_io_process.reset_counter()
- self.thread_list['thread'] = MasterThread(self.context)
- self.thread_list['thread'].start()
- self.update_test_status()
- def update_test_status(self):
- # manage the frame that displays the step-by-step status
- # manage the top-level status Label
- total_no_of_steps = len(self.context.testplan_list)
- try:
- SrNo = self.context.device_info['SrNo']
- string = "Device: " + SrNo
- self.deviceInfo_label.config(text=string)
- except:
- string = "Device: "
- self.deviceInfo_label.config(text=string)
- flag = 0
- count1 = 0
- count2 = 0
- count3 = 0
- for i in range(total_no_of_steps):
- if self.context.step_status[i] == 'r': # if running
- self.step_status_list[i]["text"] = "RUNNING..."
- self.step_status_list[i]["bg"] = 'yellow'
- self.test_status_label.config(bg='yellow')
- self.test_status_label.config(text='RUNNING...')
- self.step_data_list[i]["text"] = ""
- elif self.context.step_status[i] == 'p': # if passed
- self.step_status_list[i]["text"] = "PASS"
- self.step_status_list[i]["bg"] = 'green'
- self.step_data_list[i]["text"] = self.context.result_msg[i]
- count2 = count2 + 1
- elif self.context.step_status[i] == 'f': # if failed
- self.step_status_list[i]["text"] = "ERROR!"
- self.step_status_list[i]["bg"] = 'red'
- self.test_status_label.config(text='ERROR!')
- self.test_status_label.config(bg='red')
- self.step_data_list[i]["text"] = self.context.result_msg[i]
- # flag = 1
- count3 = count3 + 1
- elif self.context.step_status[i] == 'ns': # if not started
- self.step_status_list[i]["text"] = "NOT STARTED"
- self.step_status_list[i]["bg"] = 'gray'
- count1 = count1 + 1
- if count1 == total_no_of_steps: # if none of the steps have started
- self.test_status_label.config(bg='gray')
- self.test_status_label.config(text='NOT STARTED')
- if count2 == total_no_of_steps: # if all the steps are pass
- self.test_status_label.config(bg='green')
- self.test_status_label.config(text='DONE!')
- flag = 1
- if (count3 + count2 + count1) == total_no_of_steps: # if all the steps are done
- self.start_button.config(state=NORMAL) # enable the start button
- for button in self.step_buttons:
- button.config(state=NORMAL)
- if flag == 0: # if there is no error and all the steps aren't done yet
- self.root.after(100, self.update_test_status)
- else: # if there is an error or if all the steps are done already
- self.context.device_info['SrNo'] = ""
- return 0
- if __name__ == "__main__":
- g = GuiThread()
- g.run()
|