Example 2: Writing a Threading-Based Script
If a measurement script needs a considerable amount of time (> 3 s) to execute, you should use the template tmpusrscriptmthread.py.
The main difference with the simple template is that it utilizes multithreading to keep the graphical user interface “alive”.
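The idea behind the multithreading template can be illustrated with the standard library alone. The following is only a minimal sketch, not the Narda API: `slow_measurement` and `run_with_responsive_main_thread` are hypothetical names, and the sleep stands in for a slow device query. The slow work runs in a worker thread while the main thread keeps looping, which is where a GUI toolkit would process its events.

```python
import threading
import time


def slow_measurement(done: threading.Event, results: list) -> None:
    """Hypothetical stand-in for a measurement that takes a while."""
    time.sleep(0.3)         # simulate a slow device query
    results.append(-42.5)   # pretend this is a measured level in dBm
    done.set()              # tell the main thread that the result is ready


def run_with_responsive_main_thread() -> tuple:
    """Run the measurement in a worker thread and keep the main thread free."""
    done = threading.Event()
    results = []
    worker = threading.Thread(target=slow_measurement, args=(done, results))
    worker.start()

    # The main thread is not blocked by the measurement.
    # A GUI toolkit would handle redraws and button clicks in this loop.
    ticks = 0
    while not done.wait(0.05):
        ticks += 1

    worker.join()
    return results[0], ticks


if __name__ == "__main__":
    level, ticks = run_with_responsive_main_thread()
    print(f"result={level} dBm, main thread ticked {ticks} times while waiting")
```

If the measurement ran directly in the main thread instead, the loop could not run until the measurement had finished, and a GUI would appear frozen for that time.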
Step 1) Define the ‘Measurement Sequence’
The goal of this application is to monitor the spectrum and output a message if a signal exceeds a certain level.
Therefore, the script must perform the following steps:
1. Ask the user for the trigger level
2. Connect to the device
3. Add a RT Spectrum Task if it does not already exist
4. Add a Peak Table to monitor the spectrum if it does not already exist and configure it
5. Configure the Peak Table for the new trigger level
6. Wait until a signal has exceeded the trigger level or the user wants to cancel the process
7. Output a status message
8. Disconnect from the device
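The control flow of this sequence can be sketched without any hardware. Everything in the block below is hypothetical and only for illustration: `FakeDevice`, `read_peak_level` and `run_sequence` are not part of the Narda API, and the device merely replays canned levels. The sketch shows the order of the steps and that the device is disconnected on every exit path.

```python
class FakeDevice:
    """Hypothetical stand-in for the real device; it only replays canned peak levels."""

    def __init__(self, peak_levels_dbm):
        self._levels = iter(peak_levels_dbm)
        self.connected = False

    def connect(self):
        self.connected = True
        return True

    def disconnect(self):
        self.connected = False

    def read_peak_level(self):
        # Returns the next canned level, or None when the data runs out.
        return next(self._levels, None)


def run_sequence(device, trigger_level_dbm, cancelled=lambda: False):
    """Connect, wait for a trigger or a cancel request, report, and always disconnect."""
    if not device.connect():
        return "Cannot connect to the device!"
    try:
        while not cancelled():
            level = device.read_peak_level()
            if level is None:
                return "No more data"
            if level > trigger_level_dbm:
                return f"Triggered at {level} dBm"
        return "Cancelled by user"
    finally:
        # Runs on every return path above, so the connection is never left open.
        device.disconnect()


if __name__ == "__main__":
    dev = FakeDevice([-80, -60, -15])
    print(run_sequence(dev, trigger_level_dbm=-20))
```

The `try`/`finally` arrangement mirrors the thread function shown later in this example, which also closes the connection in a `finally` block.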
Step 2) Start with a Template
Predefined template files are available for various kinds of applications.
The ‘measurement sequence’ described in Step 1 takes an indefinite amount of time. Therefore, we have to use the template tmpusrscriptmthread.py.
For more information see: Script Templates
Step 3) Adapt the Template File
In this step we have to adapt the template file according to our defined goal.
Note
It is recommended to use an IDE such as PyCharm to create and edit a script. Such IDEs offer code autocompletion and make it possible to debug the script execution.
The Narda Script Launcher API uses four spaces per indentation level. The type of indentation must not be changed within a script!
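A quick way to check a script for mixed indentation is to look for tab characters in the leading whitespace. The helper below is a minimal sketch using only the standard library; `lines_with_tab_indent` is a hypothetical name and not part of the Narda Script Launcher API.

```python
def lines_with_tab_indent(source: str) -> list:
    """Return the 1-based numbers of lines whose leading whitespace contains a tab."""
    flagged = []
    for number, line in enumerate(source.splitlines(), start=1):
        # Everything before the first non-whitespace character.
        leading = line[:len(line) - len(line.lstrip())]
        if "\t" in leading:
            flagged.append(number)
    return flagged


if __name__ == "__main__":
    sample = "def f():\n    a = 1\n\tb = 2\n"
    print(lines_with_tab_indent(sample))
```

Most IDEs, including PyCharm, can also be configured to insert spaces when the Tab key is pressed.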
Name the Script and Add a Description
class PeakTrigger(UsrScriptBase):
    """User script class"""

    def __init__(self, main_gui, dev=SignalSharkDev()):
        """Initialization. Please leave this line unchanged"""
        super().__init__(main_gui, dev, __file__)

        # Script settings.
        # -------------------------------------------------------------------------------
        # Please adapt the following lines of code according to your script.
        self._tab_name = 'Examples'
        self._scr_title = 'Example02 - Peak Trigger'
        self._scr_description = 'Monitors the spectrum and shows a message if a signal exceeds a certain level.'
        # self._icon_path = self.script_path.joinpath('NardaScriptLauncher_example01.png')
        self._list_prio = 2
        self._nsl_executed_behavior = NSL_Executed_Behaviors.MINIMIZE_NSL

        # Add class variables if needed
        # -------------------------------------------------------------------------------
        # self.my_variable = None
Code the Defined Measurement Sequence in “_run_script()”
    def _run_script(self, args):
        """ Script main function
        This method is called when a user clicks on the corresponding script button.
        """
        # Read the default trigger value
        sig_tresh = self.config.get('sig_tresh', -20)

        # Setup and show trigger level dialog
        value_list = []
        for i in range(-100, 0, 10):
            value_list.append(str(i))
        dlg = tb.ListMsgBox(self.frm_dlghelper, 'Trigger level: ',
                            value_list, ' dBm', str(sig_tresh), 'Title')
        if self.ShowDlgModalTop(dlg) == wx.ID_OK:
            sig_tresh = float(dlg.value)
            self.config['sig_tresh'] = sig_tresh
        else:
            dlg.Destroy()  # Please do not forget to destroy the created dialog if you do not need it anymore
            # User has canceled selection
            return
        dlg.Destroy()  # Please do not forget to destroy the created dialog if you do not need it anymore

        # Connect to the device
        if not self.signalshark.connect():
            self.MessageBoxModal('Cannot connect to the device!', 'Connection error',
                                 wx.OK | wx.ICON_ERROR)
            return

        # Clear SCPI error queue
        self.signalshark.scpi.check_error()

        # Add a RT Spectrum Task if it does not already exist.
        if not self.signalshark.scpi.check_add_task(TaskTypes.RT_SPECTRUM):
            self.MessageBoxModal('Please configure the newly added task (e.g. frequency range)', 'New Task',
                                 wx.OK | wx.ICON_INFORMATION)
            self.signalshark.disconnect()
            return

        # Configure the task regarding the trigger function
        self.signalshark.scpi.display.set_unit(LevelUnits.dBm)
        self.signalshark.scpi.spectrum.set_trace_enable(SpecTraceTypes.MxP, True)

        # Add a Peak Table to monitor the spectrum if it does not already exist and configure it
        exists, view_index = self.signalshark.scpi.check_add_view(ViewTypes.PEAK_TABLE)
        if view_index < 1:
            self.MessageBoxModal('Adding peak table not possible.',
                                 'Peak Table', wx.OK | wx.ICON_ERROR)
            # Close the connection before leaving, as in the other error paths
            self.signalshark.disconnect()
            return

        # Configure the Peak Table if it did not already exist
        if not exists:
            self.signalshark.scpi.display.set_peaktable_sort(PeakSort.FREQUENCY)
            self.signalshark.scpi.marker.set_spectrum_search_peak_excursion(3.0)
            self.signalshark.scpi.marker.set_spectrum_search_peak_excursion_enable(True)

        # Configure the Peak Table for the new trigger level
        self.signalshark.scpi.marker.set_spectrum_search_threshold(sig_tresh)
        self.signalshark.scpi.marker.set_spectrum_search_limits_enable(True)

        # Disconnect from the device
        self.signalshark.disconnect()

        # Create an additional measurement thread to keep the GUI alive.
        # Please change 'MyUserScript._run_measurement' according to your class name.
        # With 'args=[param1, param2, paramn]' you can pass some optional parameters to the measurement thread.
        mthread = MeasDlgThread(main_gui=self.main_gui, callback=PeakTrigger._run_measurement,
                                args=[self.signalshark.addr])

        # Start the measurement thread and show a visualization (progress) dialog.
        mthread.start_measurement()

        # The lines of code below will be executed after the measurement thread has done its job.
        # ...
        # Output a status message
        if mthread.result is not None:
            self.MessageBoxModal('Signal frequency: ' + str(mthread.result),
                                 'Peak Trigger Result', wx.OK | wx.ICON_INFORMATION)
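The handling of the trigger level in `_run_script` can be tried out in isolation. The sketch below assumes only that `self.config` behaves like a dictionary, which is how it is used above; a plain `dict` takes its place here, and the three helper names are hypothetical. It also illustrates one detail worth checking in your own script: once the level has been stored as a float, `str(-30.0)` yields `'-30.0'`, which differs from the list entry `'-30'`. Whether the dialog tolerates such a default is not stated here, so the sketch formats the stored value so that it matches the list.

```python
def trigger_choices(start=-100, stop=0, step=10):
    """Build the list of selectable trigger levels as strings: '-100' ... '-10'."""
    return [str(level) for level in range(start, stop, step)]


def default_choice(config, fallback=-20):
    """Format the stored level so that it matches an entry of trigger_choices()."""
    # 'g' formatting drops a trailing '.0', so -30.0 becomes '-30'.
    return f"{config.get('sig_tresh', fallback):g}"


def remember_choice(config, selected):
    """Convert the selected string to a float and store it for the next run."""
    level = float(selected)
    config['sig_tresh'] = level
    return level


if __name__ == "__main__":
    config = {}                        # stands in for self.config (assumption)
    print(default_choice(config))      # default before anything was stored
    remember_choice(config, '-30')
    print(default_choice(config))      # default on the next run
```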
Code the Thread Function “_run_measurement()”
A detailed description of the parameters “stopevent”, “update_progress” and “wait_for_stopevent” can be found in the chapter Additional Multithreading Functions:
    @staticmethod
    def _run_measurement(stopevent, update_progress, wait_for_stopevent,
                         signalshark_addr: str):
        """ Main method of the measurement thread that handles a time consuming measurement.
        Please do NOT use any wxPython elements like a MessageBox or a Dialog here!
        They are only permitted in the GUI thread ('_run_script' method).
        Please use the "update_progress" method to display information!
        :param stopevent: Thread flag that indicates whether the procedure should be finished/cancelled
        :param update_progress: Delegate method to update a progress bar, message string and icon
        :param wait_for_stopevent: Delegate method to wait until stopevent is raised with possibility to show a message.
        :param signalshark_addr: Custom parameter to handover the ip address of the SignalShark
        :return:
        """
        result = None

        # Setup connection to the SignalShark and display error message if connection fails.
        signalshark = SignalSharkDev(signalshark_addr)
        if not signalshark.connect():
            return wait_for_stopevent('Cannot connect to the device!')  # Returns None

        try:
            # Update the dialog message text and show a progress bar
            update_progress(msg='Waiting for trigger', show_progress=True)

            # Wait until a signal has exceeded the trigger level or the user wants to cancel the process.
            while True:
                # Periodically check, whether the user has clicked the cancel button,
                # and exit thread if so.
                if stopevent.isSet():
                    return None

                # Example measurement:
                # Wait until Scan Number and Peak Table data count have been increased.
                signalshark.scpi.run_cont()
                scan_number = signalshark.scpi.peaktable.get_data_update()
                if scan_number < 0:
                    return wait_for_stopevent('Cannot acquire measurement data!', icon_style=wx.ICON_ERROR)
                if scan_number > 0:
                    signalshark.scpi.hold()
                    data_cnt = signalshark.scpi.peaktable.get_data_count()
                    if data_cnt > 0:
                        # Get signal frequency
                        freq_str_arr = signalshark.scpi.peaktable.get_data_frequency()
                        frequ_str = freq_str_arr[0]
                        result = round(float(frequ_str) / 1e6, 6)
                        break

                # Allow the thread to synchronize the stop event.
                stopevent.wait(0.05)

            # Returns the signal frequency or None
            return result
        except Exception as e:
            # Do some error handling
            return wait_for_stopevent(str(e))  # Returns None
        finally:
            # Close connection.
            signalshark.disconnect()
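The polling pattern used in the thread function can be exercised without a device. The sketch below assumes that the stop flag behaves like a `threading.Event`, which matches how it is used above; `poll_until_triggered` and `read_peak_hz` are hypothetical names, and the readings are canned values. It shows that `wait()` serves both as the pause between polls and as the point where a cancel request is picked up, and that the frequency string in Hz is converted to MHz in the same way as in the example.

```python
import threading


def poll_until_triggered(stopevent, read_peak_hz, poll_interval_s=0.05):
    """Return the first reported peak frequency in MHz, or None if cancelled."""
    while True:
        # Leave immediately if the user has requested a cancel.
        if stopevent.is_set():
            return None

        freq_hz = read_peak_hz()   # hypothetical reader: a string in Hz or None
        if freq_hz is not None:
            return round(float(freq_hz) / 1e6, 6)

        # Pause between polls; returns early if the event is set meanwhile.
        stopevent.wait(poll_interval_s)


if __name__ == "__main__":
    readings = iter([None, None, '433920000'])
    print(poll_until_triggered(threading.Event(), lambda: next(readings)))
```

Using `wait()` on the stop flag instead of `time.sleep()` means a cancel request does not have to wait for the current pause to expire.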
Add Packages to the Import Section of the Module
In this example we use the ‘ListMsgBox’ to let the user select the trigger level.
However, the ‘ListMsgBox’ is not part of the standard Python library, but is included in the ‘toolbox’ module of the nardascripting API and must therefore be imported first.
To do this, add the following line of code to the import section.
import nardascripting.base.toolbox as tb
from pathlib import Path
import wx
from nardascripting.base.usrscriptbase import *
from nardascripting.base.signalsharkdev import *
from nardascripting.base.measthread import MeasDlgThread
import nardascripting.base.toolbox as tb
Step 4) Debug the Script
It is recommended to use an IDE such as PyCharm to create and edit a script. Such IDEs offer code autocompletion and make it possible to debug the script execution.
The application note “Developing Scripts for SignalShark with PyCharm.pdf”, which can be downloaded from the Narda website, describes how to use the Narda Script Launcher together with PyCharm.
You can view and download the complete script here: