rxspectrum.py
This script for the Narda Script Launcher application lets a user open a dialog box that displays spectrum plots measured in real time.
#!/usr/bin/env python
from pathlib import Path
import wx
from nardascripting.base.usrscriptbase import *
from nardascripting.base.signalsharkdev import *
from nardascripting.base.measthread import MeasDlgThread
from .gui.plotdialogs import SpectrumPlotDlg, SpectrumSettings
class SpectrumMeasurement(UsrScriptBase):
    """User script that plots live spectrum traces in a dialog.

    ``_run_script`` runs in the GUI thread: it makes sure a Spectrum View
    is available on the device, reads the spectrum settings and opens the
    plot dialog.  ``_run_measurement`` runs in a separate measurement
    thread and feeds trace data to that dialog.
    """

    def __init__(self, main_gui, dev=SignalSharkDev()):
        """Initialization. Please leave this line unchanged"""
        super().__init__(main_gui, dev, __file__)

        # Script settings.
        # -------------------------------------------------------------------------------
        # Please adapt the following lines of code according to your script.
        self._tab_name = 'Examples'
        self._scr_title = 'Examples04 - Spectrum Plots'
        self._scr_description = 'Acquires spectrum data and shows them in a plot dialog.'
        # self._icon_path = self.script_path.joinpath('NardaScriptLauncher_example01.png')
        self._list_prio = 4
        self._nsl_executed_behavior = NSL_Executed_Behaviors.SHOW_NSL

        # Add class variables if needed
        # -------------------------------------------------------------------------------
        # Spectrum settings read from the device in '_run_script'; declared
        # here so the attribute exists before the script is run.
        self.spectrum_settings = None

    def _run_script(self, args):
        """Script main function.

        This method is called when a user clicks on the corresponding script button.
        Use the dialog classes only in this part of the code.

        :param args: Arguments handed over by the caller (not used by this script)
        """
        # Connect to the device
        # -------------------------------------------------------------------------------
        if not self.signalshark.connect():
            self.MessageBoxModal('Cannot connect to the device!', 'Connection error',
                                 wx.OK | wx.ICON_ERROR)
            return

        # From here on the connection is open. The 'finally' clause closes it
        # on every exit path, including exceptions raised by the device calls.
        try:
            # Clear SCPI error queue
            self.signalshark.scpi.check_error()

            # Setup Spectrum View:
            # ---------------------------------------------------------------------------
            # Add a Spectrum View if it does not already exist
            exists, view_index = self.signalshark.scpi.check_add_view(ViewTypes.SPECTRUM)
            if view_index < 1:
                self.MessageBoxModal('Adding Spectrum View not possible.',
                                     'Spectrum View', wx.OK | wx.ICON_ERROR)
                return

            if not exists:
                # A freshly added view still needs to be configured by the
                # user, so stop here and let the user start the script again.
                self.MessageBoxModal(
                    'Please configure the newly added Spectrum View (e.g. Fstart/Fstop, RBW, ...)',
                    'Spectrum View', wx.OK | wx.ICON_INFORMATION)
                return

            # Read spectrum settings from device:
            self.spectrum_settings = SpectrumSettings.from_device(self.signalshark)
            if not self.spectrum_settings:
                self.MessageBoxModal('Error reading spectrum settings', 'Settings Error',
                                     wx.OK | wx.ICON_ERROR)
                return
        finally:
            # The measurement thread opens its own connection, so this one is
            # always released before the dialog is shown.
            self.signalshark.disconnect()

        # Instantiate dialog event class.
        # -------------------------------------------------------------------------------
        # Dialog for spectrum plots
        meas_dlg = SpectrumPlotDlg(self.main_gui, 'Spectrum Plot', self.spectrum_settings)

        # Create an additional measurement thread to keep the GUI alive.
        # With "args=[param1, param2, paramn]" you can pass some optional parameters
        # to the measurement thread; here the device address is handed over.
        mthread = MeasDlgThread(main_gui=self.main_gui, dlg=meas_dlg,
                                callback=SpectrumMeasurement._run_measurement,
                                args=[self.signalshark.addr])

        # Start the measurement thread and show a visualization (progress) dialog.
        mthread.start_measurement()

        # The lines of code below will be executed after the measurement thread has done its job.
        # ...

    @staticmethod
    def _run_measurement(stopevent, update_progress, wait_for_stopevent,
                         signalshark_addr: str):
        """Main method of the measurement thread that handles a time consuming measurement.

        Please do NOT use any wxPython elements like a MessageBox or a Dialog here!
        They are only permitted in the GUI thread ('_run_script' method).
        Please use the "update_progress" method to display information!

        :param stopevent: Thread flag that indicates whether the procedure should be finished/cancelled
        :param update_progress: Delegate method to update a progress bar, message string and icon
        :param wait_for_stopevent: Delegate method to wait until stopevent is raised with possibility to show a message.
        :param signalshark_addr: Custom parameter to handover the ip address of the SignalShark
        :return: Optional custom parameter (the result of ``wait_for_stopevent`` on
                 an error, otherwise ``None``)
        """
        # Setup connection to the SignalShark and display error message if connection fails.
        signalshark = SignalSharkDev(signalshark_addr)
        if not signalshark.connect():
            return wait_for_stopevent('Cannot connect to the device!', icon_style=wx.ICON_ERROR)

        try:
            # Update the dialog message text and show a progress bar
            update_progress(msg='Measurement running', show_progress=True)

            spectrum = signalshark.scpi.spectrum
            trace_list = spectrum.get_trace_list()

            # Acquisition loop: runs until the stop event is set.
            last_scan_number = -1
            while not stopevent.is_set():
                # Ask the device for the current scan number; new trace data is
                # only fetched once it has increased.
                scan_number = spectrum.get_data_update(last_scan_number)
                if scan_number < 0:
                    return wait_for_stopevent('Cannot acquire measurement data!',
                                              icon_style=wx.ICON_ERROR)

                if scan_number <= last_scan_number:
                    # No new scan yet: report a pending SCPI error, otherwise
                    # wait a bit (interruptible by the stop event) and poll again.
                    if signalshark.scpi.check_error():
                        return wait_for_stopevent(str(signalshark.scpi.error_str),
                                                  icon_style=wx.ICON_ERROR)
                    stopevent.wait(0.5)
                    continue

                last_scan_number = scan_number

                # One [trace name, level data] pair per trace.
                trace_data_list = [
                    [str(trace), spectrum.get_data_level(trace)]
                    for trace in trace_list
                ]

                # Send new values to GUI.
                update_progress(data=trace_data_list)

                # Allow the thread to synchronize the stop event.
                stopevent.wait(0.1)
        except Exception as e:
            # Thread boundary: no GUI elements are allowed here, so any failure
            # is reported through the progress dialog instead of being raised.
            return wait_for_stopevent(str(e), icon_style=wx.ICON_ERROR)
        finally:
            # Close connection.
            signalshark.disconnect()
|