tmpusrscriptmthread.py¶
If a measurement script will need a considerable amount of time ( > 3s) for execution, it is recommended to use the template “tmpusrscriptmthread.py”.
The main difference with the simple template is that it utilizes multithreading to keep the graphical user interface “alive”.
Detailed description see: Additional Multithreading Functions:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | #!/usr/bin/env python
##############################################################################
##
## Copyright (C) 2021 Narda Safety Test Solutions GmbH.
## Contact: http://www.narda-sts.de
##
## Redistribution and use in source and binary forms,
## with or without modification, are permitted provided that the following
## conditions are met:
##
## 1.) Redistributions of source code must retain the above copyright notice,
## this list of conditions and the following disclaimer.
##
## 2.) Redistributions in binary form must reproduce the above copyright
## notice, this list of conditions and the following disclaimer in the
## documentation and/or other materials provided with the distribution.
##
## 3.) Neither the name of the copyright holder nor the names of its
## contributors may be used to endorse or promote products derived from this
## software without specific prior written permission.
##
## 4.) EXPORT CONTROL
## The Software, including technical data /cryptographic software,
## may be subject to, German, European Union and U.S. export controls and
## may be subject to import or export controls in other countries.
## The Licensee agrees to strictly comply with all applicable import and
## export regulations. He specifically agrees, that he must not disclose or
## otherwise export or re-export the Licensed Software or any part thereof
## delivered under this end user license agreement (EULA) to any country
## (including a national or resident of such country) without a valid export
## or import license. Please be aware that the Software may contain
## US-Content, therefor the Licensee represent and warrant that he is not a
## citizen of, or otherwise located within, an embargoed nation (including
## without limitation Iran, Syria, Sudan, Cuba, North Korea) and that he is
## not otherwise prohibited under the Export Laws from receiving the Software.
## All rights to Use the Software are granted on condition that such rights
## are forfeited if the Licensee fails to comply with
## the terms of this Agreement.
##
## 5.) SEVERABILITY
## Should any provision of this Agreement be or become invalid, ineffective
## or unenforceable, the remaining provisions of this Agreement shall be
## valid. The parties agree to replace the invalid, ineffective or
## unenforceable provision by a valid, effective and enforceable provision
## which best meets the commercial intention of the parties.
## The same shall apply in case of omissions.
##
## 6.) APPLICABLE LAW AND PLACE OF JURISTICATION
## This Agreement shall be constituted under the law of Germany.
## The United Nations Convention on the International Sale of Goods
## shall not apply to this Agreement. The Place of Jurisdiction for any
## dispute between the Parties shall be Tübingen (Germany).
##
## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
## “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
## TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
## PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Narda Safety Test Solutions GmbH
## BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
## CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
## SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
## INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
## CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
## ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
## POSSIBILITY OF SUCH DAMAGE.
##
##############################################################################
from pathlib import Path
import wx
from nardascripting.base.usrscriptbase import *
from nardascripting.base.signalsharkdev import *
from nardascripting.base.measthread import MeasDlgThread
import nardascripting.base.toolbox as TB
class MyMThreadUserScript(UsrScriptBase):
"""User script class"""
def __init__(self, main_gui, dev=SignalSharkDev()):
"""Initialization. Please leave this line unchanged"""
super().__init__(main_gui, dev, __file__)
# Script settings.
# -------------------------------------------------------------------------------
# Please adapt the following lines of code according to your script.
self._tab_name = 'Templates'
self._scr_title = 'Measurement Thread User Script'
self._scr_description = 'Script that contains a time consuming measurement.'
# self._icon_path = self.script_path.joinpath('NardaScriptLauncher_example01.png')
self._list_prio = 2
self._nsl_executed_behavior = NSL_Executed_Behaviors.SHOW_NSL
# Add class variables if needed
# -------------------------------------------------------------------------------
# self.my_variable = None
def _run_script(self, args):
""" Script main function
This method is called when a user clicks on the corresponding script button.
"""
# Optional: Prepare some parameters for the measurement.
measurement_steps = 10
# Create an additional measurement thread to keep the GUI alive.
# Please change 'MyUserScript._run_measurement' according to your class name.
# With "args=[param1, parm2, paramn]" you can pass some optional parameters to the measurement thread.
mthread = MeasDlgThread(main_gui=self.main_gui, callback=MyMThreadUserScript._run_measurement,
args=[self.signalshark.addr, measurement_steps])
# Start the measurement thread and show a visualization (progress) dialog.
mthread.start_measurement()
# The lines of code below will be executed after the measurement thread has done it's job.
# ...
# Evaluate the result of the measurement thread (if any).
if mthread.result is not None:
self.MessageBoxModal('Max value: ' + str(mthread.result),
'Measurement Result', wx.OK | wx.ICON_INFORMATION)
@staticmethod
def _run_measurement(stopevent, update_progress, wait_for_stopevent,
signalshark_addr: str, measurement_steps: int):
""" Main method of the measurement thread that handles a time consuming measurement.
Please do NOT use any wxPython elements like a MessageBox or a Dialog here!
They are only permitted in the GUI thread ('_run_script' method).
Please use the "update_progress" method to display information!
:param stopevent: Thread flag that indicates whether the procedure should be finished/cancelled
:param update_progress: Delegate method to update a progress bar, message string and icon
:param wait_for_stopevent: Delegate method to wait until stopevent is raised with possibility to show a message.
:param signalshark_addr: Custom parameter to handover the ip address of the SignalShark
:param measurement_steps: Custom parameter
:return:
"""
# Initialize return value (if any)
max_value = -999.0
# Setup connection to the SignalShark and display error message if connection fails.
# -------------------------------------------------------------------------------
signalshark = SignalSharkDev(signalshark_addr)
if not signalshark.connect():
return wait_for_stopevent('Cannot connect to the device!', icon_style=wx.ICON_ERROR)
try:
# Prepare progress value calculation. Progress is a value from 0.0 to 1.0
# -------------------------------------------------------------------------------
progress_step = 1.0 / measurement_steps
act_progress = 0.0
# Setup measurement:
# -------------------------------------------------------------------------------
# Check if desired Task exists and add one if not.
task_exists = signalshark.scpi.check_add_task(TaskTypes.RT_SPECTRUM, 'MyMeas')
# Check if desired View exists and add one if not.
view_exists, view_index = signalshark.scpi.check_add_view(ViewTypes.SPECTRUM)
if view_index < 0:
return wait_for_stopevent('Adding Level Meter View failed!', icon_style=wx.ICON_ERROR)
# Setup Task/View if it did not exist yet:
if not task_exists or not view_exists:
signalshark.scpi.spectrum.set_frequency_stop(110e6)
signalshark.scpi.spectrum.set_frequency_start(85e6)
signalshark.scpi.spectrum.set_rbw_auto(True)
signalshark.scpi.spectrum.set_measurement_time(1.0)
signalshark.scpi.spectrum.set_trace_enable(SpecTraceTypes.PPk, True)
# Do measurement:
# -------------------------------------------------------------------------------
# Update the dialog message text and show a progress bar
update_progress(msg='Starting measurement', show_progress=True)
# Loop, that simulates a time consuming measurement
for x in range(measurement_steps):
# Periodically check, whether the user has clicked the cancel button
if stopevent.isSet():
return
# Wait until Scan Number has increased.
scan_number = -1
last_scan_number = -1
while True:
scan_number = signalshark.scpi.spectrum.get_data_update(last_scan_number)
if scan_number < 0:
return wait_for_stopevent('Cannot acquire measurement data!', icon_style=wx.ICON_ERROR)
elif scan_number > last_scan_number or stopevent.isSet():
break
# Allow the thread to synchronize the stop event.
stopevent.wait(0.1)
# Get new +Pk trace and find max value:
ppk_trace = signalshark.scpi.spectrum.get_data_level(SpecTraceTypes.PPk)
if len(ppk_trace) > 0:
act_max_value = max(ppk_trace)
if act_max_value > max_value:
max_value = act_max_value
# Update the progress bar.
act_progress += progress_step
update_progress(progress=act_progress)
# Allow the thread to synchronize the stop event.
stopevent.wait(0.05)
# Return some optional measurement result (number, string, list, dict, object ...)
return max_value
except Exception as e:
# Do some error handling
max_value = None
return wait_for_stopevent(str(e), icon_style=wx.ICON_ERROR)
finally:
# Close connection.
signalshark.disconnect()
|