#!/usr/bin/python # _*_ coding: utf-8 _*_ import traceback try: from Tkinter import * # python2 import ttk import tkMessageBox import tkFont except Exception as e: # Python 2环境下不会走到这里,保留仅做兼容占位 from tkinter import * from tkinter import ttk from tkinter import messagebox as tkMessageBox import tkinter.font as tkFont import sys import os import subprocess import time import threading import csv from datetime import datetime import subprocess sys.path.append("./") for i in sys.path: print(i) from requests_offline import requests import test_function # class test_function: # @staticmethod # def default_test(context, args=""): # """默认测试函数,实际使用时替换为真实逻辑""" # time.sleep(0.1) # return True, "Test passed (default function)" # # sys.modules['test_function'] = test_function class TestContext: def __init__(self): self.script_ver = 'C180-C-C7-v2.6' self.log_file = './Logs/temp.txt' self.log1_buffer = "" self.run_log_content = "" self.log_file_handle = None self.current_log_path = None self.tp_path = "./testplan.csv" self.device_info = {} #self.mes_url = "http://60.188.52.164:8079/sfc_response.aspx" #self.TSID = "2208C_10_Link_PCBA01" #self.station_id = "2208C_10_Link_PCBA01" self.mes_url = "http://10.60.64.87/BobcatH/sfc_response.aspx" self.TSID = "ZT001_Function_Test101" self.station_id = "ZT001_Function_Test101" self.params_dict = { 'path_loss': '25', 'wave_file': './11b_waveforms/wave11rc_1000.mod', 'per_limit': '92', 'SA_RSSI': '-80', 'BTOn_RSSI': '-30' } try: if sys.version_info >= (3, 0): # Python 3 直接用 open + encoding with open(self.tp_path, "r", encoding='utf-8') as f: reader = csv.DictReader(f) self.testplan_list = [] for row in reader: if row.get("Disable", "") != "Y": self.testplan_list.append(row) else: try: f = open(self.tp_path, 'r') reader = csv.DictReader(f) self.testplan_list = [] # 修复Python 2中不等于运算符(~=是错误的,应该是!=) for row in reader: if row.get("Disable", "") != "Y": self.testplan_list.append(row) except Exception as e: traceback.print_exc() finally: f.close() # Python 2需显式关闭文件 except Exception as e: print(e) self.step_status = ['ns'] * len(self.testplan_list) self.result_msg = [''] * len(self.testplan_list) self.current_loop = 0 # 当前循环次数 self.total_loops = 1 # 总循环次数 def mes_check_sn(self, sn): try: data = { "C": "QUERY_RECORD", "SN": sn, "P": "unit_process_check", "TSID": self.TSID } res = requests.post(self.mes_url, data=data, timeout=10) text = res.text.strip() # 判断是否通过 #self.print1("MES Result:" + text) 260409 by joe if "unit_process_check=OK" in text: return True, "SN校验通过" else: return False, text except Exception as e: return False,"网络异常:{}".format(str(e)) def mes_get_value(self, sn, mes_key): mes_key = mes_key or "PART_NO" try: data = { "C": "QUERY_RECORD", "SN": sn, "P": mes_key } res = requests.post(self.mes_url, data=data, timeout=10) text = res.text.strip() #self.print1("MES Result:" + text) 260409 by joe if "0 SFC_OK" in text: return True, text else: return False, text except Exception as e: return False,"网络异常:{}".format(str(e)) def mes_upload_result(self, sn, status, error_code="", values={}): try: data = { "C": "ADD_RECORD", "STATUS": status, # PASS / FAIL "STATION_ID": self.station_id, "SN": sn, "ERROR_CODE": error_code } if values: data.update(values) res = requests.post(self.mes_url, data=data, timeout=10) text = res.text.strip() self.print1("MES Result:" + text) if "0 SFC_OK" in text: return True, "过站成功" else: return False, text except Exception as e: return False, "网络异常:{}".format(str(e)) def create_sample_testplan(self): if not os.path.exists(os.path.dirname(self.tp_path)): os.makedirs(os.path.dirname(self.tp_path)) with open(self.tp_path, 'w', encoding='utf-8', newline='') as f: writer = csv.DictWriter(f, fieldnames=['TestName', 'Function', 'Arguments', 'Disable']) writer.writeheader() writer.writerow({ 'TestName': 'Sample Test', 'Function': 'default_test', 'Arguments': '', 'Disable': '' }) with open(self.tp_path, "r", encoding="utf-8") as f: reader = csv.DictReader(f) self.testplan_list = [] for row in reader: if row.get("Disable", "") != "Y": self.testplan_list.append(row) def print1(self, string, newline=0): curr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) log_str = "[{}] {}".format(curr_time,string) if newline == 0: self.log1_buffer += "\n" + log_str elif newline == 1: self.log1_buffer += "\n" + log_str + "\n" # 实时写入文件 if self.log_file_handle is not None: try: self.log_file_handle.write(log_str + "\n") self.log_file_handle.flush() # 强制刷盘 except Exception: pass # def write_log_to_file(self): # log_dir = "./Logs/" # if not os.path.exists(log_dir): # os.makedirs(log_dir) # # filename = time.strftime("log_%Y-%m-%d_%H-%M-%S.txt", time.localtime()) # log_path = os.path.join(log_dir, filename) # # with open(log_path, 'w', encoding='utf-8') as f: # f.write(self.run_log_content) # # self.run_log_content = "" def start_new_log(self): # 每次测试新建一个带时间戳的日志文件 log_dir = "./Logs/" if not os.path.exists(log_dir): os.makedirs(log_dir) filename = time.strftime("log_%Y-%m-%d_%H-%M-%S.txt", time.localtime()) self.current_log_path = os.path.join(log_dir, filename) try: self.log_file_handle = open(self.current_log_path, 'w', encoding='utf-8') except: self.log_file_handle = open(self.current_log_path, 'w') def close_log(self): # 关闭文件句柄 if self.log_file_handle is not None: self.log_file_handle.close() self.log_file_handle = None class IO_process: def __init__(self, log_file_path): self.log_file = log_file_path self.line_count = 0 def read_file(self): try: f = open(self.log_file, 'r') flag = 0 line = "" for i, curr_line in enumerate(f): if i == self.line_count: self.line_count += 1 flag = 1 line = curr_line break f.close() if flag == 1: return line.rstrip('\r') else: return "" except Exception as e: return "" def empty_file(self): f = open(self.log_file, 'w') f.close() def reset_counter(self): self.line_count = 0 class MasterThread(threading.Thread): def __init__(self, context, selected_steps): threading.Thread.__init__(self) self.context = context self.stop_flag = False self.selected_steps = selected_steps def run(self): for loop_num in range(1, self.context.total_loops + 1): if self.stop_flag: break self.context.start_new_log() self.context.current_loop = loop_num self.context.print1("=" * 80, 1) self.context.print1("Device SN: {}".format(self.context.device_info["ScanSN"])) self.context.print1("Loop {}/{} - X86 C180 Test Started".format(loop_num, self.context.total_loops), 1) self.context.print1("Script Version: {}".format(self.context.script_ver), 1) self.context.print1("Selected Test Steps: {}".format(self.selected_steps), 1) self.context.print1("=" * 80, 1) self.context.step_status = ['ns'] * len(self.context.testplan_list) self.context.result_msg = [''] * len(self.context.testplan_list) for step_num in self.selected_steps: if self.stop_flag: break test_item = self.context.testplan_list[step_num] self.context.print1("\n----------------------------------------------------") self.context.print1("Loop {} - Step {} Started test...".format(loop_num, step_num)) func_name = test_item.get("Function", "") arguments = test_item.get("Arguments", "") func = getattr(test_function, func_name, None) if 'test_function' in sys.modules else None if not func: self.context.step_status[step_num] = "f" self.context.result_msg[step_num] = "%s is not defined" % func_name continue self.context.step_status[step_num] = 'r' try: if arguments: result, result_msg = func(self.context, arguments) else: result, result_msg = func(self.context) except Exception as e: result = False result_msg = "Error: {}".format(str(e)) self.context.step_status[step_num] = "p" if result else "f" self.context.result_msg[step_num] = result_msg self.context.print1("Loop {} - Step {} Result: {}".format(loop_num, step_num, result_msg)) self.context.print1("\n----------------------------------------------------") if not self.stop_flag: self.context.print1("\nLoop {} Completed!".format(loop_num, 1)) if loop_num < self.context.total_loops: self.context.print1("Waiting 2 seconds before next loop...", 1) time.sleep(2) # self.context.print1("\n" + "=" * 80, 1) # if self.stop_flag: # self.context.print1("Test stopped manually!", 1) # else: # self.context.print1("All {} loops completed!".format(self.context.total_loops), 1) # self.context.print1("Test concluded!", 1) # self.context.print1("=" * 80, 1) self.context.close_log() # self.context.close_log() def stop(self): """停止测试线程""" self.stop_flag = True class GuiThread(object): def __init__(self): self.context = TestContext() self.thread_list = {} self.counter = 0 self.root = Tk() self.root.geometry("1800x1200") small_font = tkFont.Font(family="DejaVu Sans", size=8) self.start_time = None self.timer_label = None self.timer_label = None self.timer_id = None self.test_finished = False self.root.protocol("WM_DELETE_WINDOW", self.destroy) self.root.title("WiPER: FT Simular Test") self.sn_frame = Frame(self.root) self.need_sn_var = BooleanVar() # 存储复选框状态 self.need_sn_var.set(True) # 默认勾选(需要校验SN) self.need_sn_check = Checkbutton( self.sn_frame, text="Need SN", variable=self.need_sn_var, font=("Arial", 9), command=self.toggle_sn_input # 勾选状态变化时触发输入框启用/禁用 ) self.offline_test_var = BooleanVar() self.offline_test_var.set(False) # 默认不勾选(正常联网上传) self.sn_label = Label(self.sn_frame, text="Device SN:", fg='darkblue', font=("Arial", 10)) self.sn_entry = Entry(self.sn_frame, width=20, font=("Arial", 10)) # 宽度更大,适配SN长度 self.sn_entry.insert(0, "") # 默认空值 self.sn_entry.bind('', lambda event: self.get_device_sn()) self.get_sn_btn = Button(self.sn_frame, text="Confirm SN", font=("Arial", 9), command=self.get_device_sn, width=16) self.sn_hint = Label(self.sn_frame, text="(Device Serial Number)", fg='gray', font=("Arial", 8)) self.need_sn_check.pack(side=LEFT, padx=5) self.sn_label.pack(side=LEFT, padx=5) self.sn_entry.pack(side=LEFT, padx=5) self.get_sn_btn.pack(side=LEFT, padx=5) # 按钮放在输入框后面 self.sn_hint.pack(side=LEFT, padx=5) self.offline_check = Checkbutton( self.sn_frame, text="离线测试", variable=self.offline_test_var, font=("Arial", 9), fg='red' ) self.offline_check.pack(side=LEFT, padx=5) self.sn_frame.pack(pady=5) self.time_loop_frame = Frame(self.root) self.time_loop_frame.pack(pady=5) self.timer_label = Label(self.time_loop_frame, text="Time: 00:00:00", fg='darkred', font=tkFont.Font(family="Arial", size=10, weight="bold")) self.loop_status_label = Label(self.time_loop_frame, text="Current Loop: 0/0", fg='purple', font=tkFont.Font(family="Arial")) self.loop_label = Label(self.time_loop_frame, text="Loop Count:", fg='darkblue', font=("Arial", 10)) self.loop_entry = Entry(self.time_loop_frame, width=10, font=("Arial", 10)) self.loop_entry.insert(0, "1") # 默认循环1次 self.loop_hint = Label(self.time_loop_frame, text="(test loops)", fg='gray', font=("Arial", 8)) self.timer_label.pack(side=LEFT, padx=5) self.loop_status_label.pack(side=LEFT, padx=5) self.loop_label.pack(side=LEFT, padx=5) self.loop_entry.pack(side=LEFT, padx=5) self.loop_hint.pack(side=LEFT, padx=5) self.note1 = Label(self.root, text="Please press 'Start Test' to begin...", fg='blue') self.start_button = Button(self.root, text="Start Test", command=self.start) self.stop_button = Button(self.root, text="Stop Test", command=self.stop_test, state=DISABLED) self.test_status_label = Label(self.root, text="NOT STARTED", width=15, bg='gray') self.deviceInfo_label = Label(self.root, text="Device SN: ", width=20, bg="#CCCCCC") self.notebook_box = ttk.Notebook(self.root, name='notebook') self.status_frame = Frame(self.root) self.logFrame = Frame(self.root) self.parametersFrame = Frame(self.root) self.logText = Text(self.logFrame) self.logScrollbar = Scrollbar(self.logFrame) self.canvas = Canvas(self.status_frame, bg="white") self.scrollbar_y = Scrollbar(self.status_frame, command=self.canvas.yview) self.lblframe = Frame(self.canvas) self.canvas_window = self.canvas.create_window((0, 0), window=self.lblframe, anchor="nw") self.canvas.configure(yscrollcommand=self.scrollbar_y.set) def on_canvas_configure(event): self.canvas.itemconfig(self.canvas_window, width=event.width) self.canvas.configure(scrollregion=self.canvas.bbox("all")) def on_frame_configure(event): self.canvas.configure(scrollregion=self.canvas.bbox("all")) self.canvas.bind("", on_canvas_configure) self.lblframe.bind("", on_frame_configure) def on_mouse_wheel(event): if event.delta > 0: self.canvas.yview_scroll(-1, "pages") else: self.canvas.yview_scroll(1, "pages") if sys.platform == 'darwin': self.canvas.bind('', on_mouse_wheel) else: self.canvas.bind('', on_mouse_wheel) self.canvas.bind('', lambda e: self.canvas.yview_scroll(-1, "units")) self.canvas.bind('', lambda e: self.canvas.yview_scroll(1, "units")) self.canvas.bind("", lambda e: self.canvas.focus_set()) self.canvas.pack(side=LEFT, fill=BOTH, expand=True) self.scrollbar_y.pack(side=RIGHT, fill=Y) self.index_list = [] self.step_check_vars = [] # 复选框变量 self.step_checkbuttons = [] # 复选框控件 self.step_buttons = [] self.step_name_list = [] self.step_data_list = [] self.step_status_list = [] self.select_all_var = BooleanVar(value=True) # 默认全选 self.select_all_check = Checkbutton( self.lblframe, variable=self.select_all_var, command=self.on_select_all_toggle, # 切换时自动全选/取消全选 anchor='center' ) self.select_all_check.grid(row=0, column=0, padx=2, pady=1) title_1 = Label(self.lblframe, text="Index", anchor='center', width=5, font=small_font) title_1.grid(row=0, column=1, sticky=W, padx=2, pady=1) title_2 = Label(self.lblframe, text="Run Steps", anchor='center', width=12, font=small_font) title_2.grid(row=0, column=2, sticky=W, padx=2, pady=1) title_3 = Label(self.lblframe, text="TestName", anchor='center', width=50, font=small_font) title_3.grid(row=0, column=3, sticky=W, padx=2, pady=1) title_4 = Label(self.lblframe, text="TestData", anchor='center', width=50, font=small_font) title_4.grid(row=0, column=4, sticky=W, padx=2, pady=1) title_5 = Label(self.lblframe, text="TestStatus", anchor='center', width=20, font=small_font) title_5.grid(row=0, column=5, sticky=W, padx=2, pady=1) for index, desc in enumerate(self.context.testplan_list): # 新增:复选框(默认全选) check_var = BooleanVar(value=True) self.step_check_vars.append(check_var) check_btn = Checkbutton( self.lblframe, variable=check_var, anchor='center', command=self.update_select_all_state # 点击时自动更新顶部全选框 ) self.step_checkbuttons.append(check_btn) index_lable = Label(self.lblframe, text=str(index), anchor='center', width=5, font=small_font) self.index_list.append(index_lable) btn = Button(self.lblframe, text="Run Step %d" % index, command=lambda num=index: self.run_single_step(num), width=12, bg='lightblue', font=small_font) self.step_buttons.append(btn) item_label = Label(self.lblframe, text=desc.get("TestName", ""), anchor='w', bg='#CCCCCC', width=50, font=small_font) self.step_name_list.append(item_label) data_label = Label(self.lblframe, text="", anchor='w', bg='#CCCCCC', width=50, font=small_font) self.step_data_list.append(data_label) status_label = Label(self.lblframe, text="NOT STARTED", bg='#CCCCCC', width=20, font=small_font) self.step_status_list.append(status_label) self.my_io_process = IO_process(self.context.log_file) self.readLog1() self.set_param_frame() def select_all(self): for var in self.step_check_vars: var.set(True) def unselect_all(self): for var in self.step_check_vars: var.set(False) def get_selected_steps(self): selected = [] for idx, var in enumerate(self.step_check_vars): if var.get(): selected.append(idx) return selected def toggle_sn_input(self): if self.need_sn_var.get(): self.sn_entry.config(state=NORMAL) self.get_sn_btn.config(state=NORMAL) self.sn_label.config(fg='darkblue') else: self.sn_entry.config(state=DISABLED) self.get_sn_btn.config(state=DISABLED) self.sn_label.config(fg='gray') self.deviceInfo_label.config(text="Device SN: N/A") self.context.device_info['ScanSN'] = "N/A" def get_device_sn(self): if not self.need_sn_var.get(): return test_running = 'thread' in self.thread_list and self.thread_list['thread'].is_alive() if test_running: return sn_value = self.sn_entry.get().strip() if sn_value: self.context.device_info['ScanSN'] = sn_value else: self.context.device_info['ScanSN'] = "Unknown" tkMessageBox.showerror("Error", "请输入SN!") return sn_length = len(sn_value) if sn_length != 11: tkMessageBox.showerror("Error", "SN长度不合格!应该为11位。") return string = "Device: " + sn_value self.deviceInfo_label.config(text=string) self.start() def update_timer(self): if self.start_time and not self.test_finished: elapsed_seconds = int(time.time() - self.start_time) hours = elapsed_seconds // 3600 minutes = (elapsed_seconds % 3600) // 60 seconds = elapsed_seconds % 60 timer_text = "Time: %02d:%02d:%02d" % (hours, minutes, seconds) self.timer_label.config(text=timer_text) self.loop_status_label.config( text="Current Loop: {}/{}".format(self.context.current_loop, self.context.total_loops) ) self.timer_id = self.root.after(1000, self.update_timer) def stop_timer(self): if self.timer_id: self.root.after_cancel(self.timer_id) self.timer_id = None self.test_finished = True self.loop_status_label.config( text="Loop: {}/{}".format(self.context.current_loop, self.context.total_loops) ) if self.start_time: total_seconds = int(time.time() - self.start_time) hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 seconds = total_seconds % 60 final_text = "Total Time: %02d:%02d:%02d" % (hours, minutes, seconds) self.timer_label.config(text=final_text, fg='darkgreen') def stop_test(self): if tkMessageBox.askokcancel("Stop Test?", "Are you sure you want to stop the test?"): if 'thread' in self.thread_list and self.thread_list['thread'].is_alive(): self.thread_list['thread'].stop() self.stop_timer() self.test_status_label.config(text='STOPPED', bg='orange') self.start_button.config(state=NORMAL) self.stop_button.config(state=DISABLED) self.get_sn_btn.config(state=NORMAL if self.need_sn_var.get() else DISABLED) self.sn_entry.config(state=NORMAL if self.need_sn_var.get() else DISABLED) self.loop_entry.config(state=NORMAL) self.select_all_check.config(state=NORMAL) self.offline_check.config(state=NORMAL) for button in self.step_buttons: button.config(state=NORMAL) for check_obj in self.step_checkbuttons: check_obj.config(state=NORMAL) self.context.close_log() def destroy(self): if tkMessageBox.askokcancel("Quit?", "Are you sure you want to quit?"): if 'thread' in self.thread_list and self.thread_list['thread'].is_alive(): self.thread_list['thread'].stop() self.stop_timer() self.root.quit() self.context.close_log() def run(self): self.note1.pack() btn_frame = Frame(self.root) btn_frame.pack() self.start_button = Button(btn_frame, text="Start Test", command=self.start) self.stop_button = Button(btn_frame, text="Stop Test", command=self.stop_test, state=DISABLED) self.start_button.pack(side=LEFT, padx=5) self.stop_button.pack(side=LEFT, padx=5) self.test_status_label.pack() self.deviceInfo_label.pack(fill=X, expand=False, padx=50, pady=20) self.logScrollbar.config(command=self.logText.yview) self.logText.config(yscrollcommand=self.logScrollbar.set) self.logText.pack(side=LEFT, fill=Y) self.logScrollbar.pack(side=RIGHT, fill=Y) # 布局步骤列表 for index in range(len(self.context.testplan_list)): self.step_checkbuttons[index].grid(row=index + 1, column=0, sticky=W, padx=2, pady=0) self.index_list[index].grid(row=index + 1, column=1, sticky=W, padx=2, pady=0) self.step_buttons[index].grid(row=index + 1, column=2, sticky=W, padx=2, pady=0) self.step_name_list[index].grid(row=index + 1, column=3, sticky=W, padx=2, pady=0) self.step_data_list[index].grid(row=index + 1, column=4, sticky=W, padx=2, pady=0) self.step_status_list[index].grid(row=index + 1, column=5, sticky=W, padx=2, pady=0) self.notebook_box.add(self.status_frame, text='Status') self.notebook_box.add(self.logFrame, text='Log') self.notebook_box.add(self.parametersFrame, text='Parameters') self.notebook_box.enable_traversal() self.notebook_box.pack(fill=BOTH, expand="true", padx=5, pady=1) self.root.mainloop() def readLog1(self): if self.context.log1_buffer != "": self.logText.insert(END, self.context.log1_buffer) text = self.my_io_process.read_file() if text != "": self.logText.insert(END, text) if self.context.log1_buffer != "" or text != "": self.logText.yview_pickplace("end") self.context.log1_buffer = "" self.root.update() self.root.after(100, self.readLog1) def set_param_frame(self): self.L = [] self.E = [] count = 1 for key, value in self.context.params_dict.items(): lbl = Label(self.parametersFrame, text=key) enty = Entry(self.parametersFrame, bd=4) enty.delete(0, END) enty.insert(0, value) enty.config(state=DISABLED) lbl.grid(row=count, sticky=W) enty.grid(row=count, column=1) self.L.append(lbl) self.E.append(enty) count += 1 def run_single_step(self, step_num): self.context.print1("\n--------------------single_step---------------------") self.context.print1("Step %d Started test..." % step_num) self.context.step_status[step_num] = 'r' item_info = self.context.testplan_list[step_num] func_name = item_info.get("Function", "") arguments = item_info.get("Arguments", "") func = getattr(test_function, func_name, None) if 'test_function' in sys.modules else None if not func: self.context.step_status[step_num] = "f" self.context.result_msg[step_num] = "%s is not defined" % func_name self.root.after(0, self.update_step_status) else: self.context.step_status[step_num] = 'r' self.root.after(0, self.update_step_status) def run_step(): result, result_msg = False, "FAIL" if arguments: result, result_msg = func(self.context, arguments) else: result, result_msg = func(self.context) self.context.step_status[step_num] = "p" if result else "f" self.context.result_msg[step_num] = result_msg self.context.print1("\n----------------------------------------------------") self.context.print1(result_msg) self.root.after(0, self.update_step_status) thread = threading.Thread(target=run_step) thread.start() def update_step_status(self): total_no_of_steps = len(self.context.step_status) for index in range(total_no_of_steps): status = self.context.step_status[index] if status == 'r': self.step_status_list[index]["text"] = "RUNNING..." self.step_status_list[index]["bg"] = 'yellow' elif status == 'p': self.step_status_list[index]["text"] = "PASS" self.step_data_list[index]["text"] = self.context.result_msg[index] self.step_status_list[index]["bg"] = 'green' elif status == 'f': self.step_status_list[index]["text"] = "ERROR!" self.step_status_list[index]["bg"] = 'red' self.step_data_list[index]["text"] = self.context.result_msg[index] self.root.after(200, self.update_step_status) def start(self): # 是否离线模式 is_offline = self.offline_test_var.get() self.context.device_info['offline_test'] = is_offline # 获取勾选的测试步骤 selected_steps = self.get_selected_steps() if not selected_steps: tkMessageBox.showerror("Error", "请至少选择一个测试项!") return # 获取循环次数 try: loop_count = int(self.loop_entry.get()) if loop_count < 1: tkMessageBox.showerror("Error", "Loop count must be at least 1!") return self.context.total_loops = loop_count self.context.current_loop = 0 except ValueError: tkMessageBox.showerror("Error", "Please enter a valid number for loop count!") return # SN校验 if self.need_sn_var.get(): sn_value = self.sn_entry.get().strip() if not sn_value: tkMessageBox.showerror("Error", "请输入SN!") return if len(sn_value) != 11: tkMessageBox.showerror("Error", "SN长度不合格!应该为11位。") return self.context.device_info['ScanSN'] = sn_value self.deviceInfo_label.config(text="Device: " + sn_value) # writeSN 260328 by joe try: proc = subprocess.Popen( #amidelnx_64 ["sh","scansn.sh","/SS",sn_value], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout,stderr = proc.communicate(input="111111\n") result = proc.returncode #a = result.stdout except subprocess.CalledProcessError as e: tkMessageBox.showerror("Error", "Command failed: {e.stderr}") return else: sn_value = "N/A" self.context.device_info['ScanSN'] = "N/A" self.deviceInfo_label.config(text="Device SN: N/A") # MES 校验 if is_offline: self.context.print1("离线模式,跳过MES检查!") else: result, msg = self.context.mes_check_sn(sn_value) if not result: tkMessageBox.showerror("Error", "SN:%s MES check error!!\n%s" % (sn_value, msg)) return self.context.print1(msg) # 启动计时器 self.start_time = time.time() self.test_finished = False self.update_timer() # 界面状态 self.context.step_status = ['x'] + ['ns'] * len(self.context.testplan_list) self.start_button.config(state=DISABLED) self.stop_button.config(state=NORMAL) self.need_sn_check.config(state=DISABLED) self.get_sn_btn.config(state=DISABLED) self.sn_entry.config(state=DISABLED) self.loop_entry.config(state=DISABLED) self.select_all_check.config(state=DISABLED) self.offline_check.config(state=DISABLED) for check_obj in self.step_checkbuttons: check_obj.config(state=DISABLED) for button in self.step_buttons: button.config(state=DISABLED) for data_lable in self.step_data_list: data_lable["text"] = "" self.logText.delete('1.0', 'end') self.my_io_process.reset_counter() self.thread_list['thread'] = MasterThread(self.context, selected_steps) self.thread_list['thread'].start() self.update_test_status() def update_test_status(self): total_no_of_steps = len(self.context.testplan_list) try: ScanSN = self.context.device_info['ScanSN'] string = "Device: " + ScanSN self.deviceInfo_label.config(text=string) except KeyError: string = "Device: " self.deviceInfo_label.config(text=string) flag = 0 count1 = 0 count2 = 0 count3 = 0 for i in range(total_no_of_steps): status = self.context.step_status[i] if status == 'r': self.step_status_list[i]["text"] = "RUNNING..." self.step_status_list[i]["bg"] = 'yellow' self.test_status_label.config(bg='yellow') self.test_status_label.config(text='RUNNING...') self.step_data_list[i]["text"] = "" elif status == 'p': self.step_status_list[i]["text"] = "PASS" self.step_status_list[i]["bg"] = 'green' self.step_data_list[i]["text"] = self.context.result_msg[i] count2 += 1 elif status == 'f': self.step_status_list[i]["text"] = "ERROR!" self.step_status_list[i]["bg"] = 'red' self.test_status_label.config(text='ERROR!') self.test_status_label.config(bg='red') self.step_data_list[i]["text"] = self.context.result_msg[i] count3 += 1 elif status == 'ns': self.step_status_list[i]["text"] = "NOT STARTED" self.step_status_list[i]["bg"] = 'gray' count1 += 1 if count1 == total_no_of_steps: self.test_status_label.config(bg='gray') self.test_status_label.config(text='NOT STARTED') test_running = 'thread' in self.thread_list and self.thread_list['thread'].is_alive() if not test_running and not self.test_finished: if count2 == len(self.get_selected_steps()): self.test_status_label.config(bg='green') self.test_status_label.config(text='DONE!') test_result = "PASS" else: test_result = "FAIL" # MES过站上传 is_offline = self.offline_test_var.get() if not is_offline: test_result_values = {} error_code = "" for i in range(total_no_of_steps): status = self.context.step_status[i] msg = self.context.result_msg[i] item = self.context.testplan_list[i] if status == 'f': test_result_values[item["TestName"]] =msg elif status == "p": test_result_values[item["TestName"]] ="PASS" if status == 'f' and "串口" in item["TestName"]: error_code = "S102" uplaload_result, msg = self.context.mes_upload_result(ScanSN, test_result, error_code, test_result_values) if not uplaload_result: tkMessageBox.showerror("Error", "SN:%s\n%s" % (ScanSN, msg)) else: msg = "离线模式,跳过上传MES" self.context.print1(msg) self.stop_timer() self.start_button.config(state=NORMAL) self.stop_button.config(state=DISABLED) self.need_sn_check.config(state=NORMAL) self.get_sn_btn.config(state=NORMAL if self.need_sn_var.get() else DISABLED) self.sn_entry.config(state=NORMAL if self.need_sn_var.get() else DISABLED) self.loop_entry.config(state=NORMAL) self.select_all_check.config(state=NORMAL) self.offline_check.config(state=NORMAL) for check_obj in self.step_checkbuttons: check_obj.config(state=NORMAL) for button in self.step_buttons: button.config(state=NORMAL) flag = 1 if flag == 0: self.root.after(100, self.update_test_status) else: self.context.device_info['ScanSN'] = "" self.context.close_log() return 0 def on_select_all_toggle(self): is_select_all = self.select_all_var.get() for var in self.step_check_vars: var.set(is_select_all) def update_select_all_state(self): all_checked = all(var.get() for var in self.step_check_vars) self.select_all_var.set(all_checked) if __name__ == "__main__": if not os.path.exists("./Logs"): os.makedirs("./Logs") g = GuiThread() g.run()