1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#!/usr/bin/env python
##############################################################################
##
## Copyright (C) 2022 Narda Safety Test Solutions GmbH.
## Contact: www.narda-sts.com
##
## Redistribution and use in source and binary forms,
## with or without modification, are permitted provided that the following
## conditions are met:
##
## 1.) Redistributions of source code must retain the above copyright notice,
## this list of conditions and the following disclaimer.
##
## 2.) Redistributions in binary form must reproduce the above copyright
## notice, this list of conditions and the following disclaimer in the
## documentation and/or other materials provided with the distribution.
##
## 3.) Neither the name of the copyright holder nor the names of its
## contributors may be used to endorse or promote products derived from this
## software without specific prior written permission.
##
## 4.) EXPORT CONTROL
## The Software, including technical data /cryptographic software,
## may be subject to, German, European Union and U.S. export controls and
## may be subject to import or export controls in other countries.
## The Licensee agrees to strictly comply with all applicable import and
## export regulations. He specifically agrees, that he must not disclose or
## otherwise export or re-export the Licensed Software or any part thereof
## delivered under this end user license agreement (EULA) to any country
## (including a national or resident of such country) without a valid export
## or import license. Please be aware that the Software may contain
## US-Content, therefor the Licensee represent and warrant that he is not a
## citizen of, or otherwise located within, an embargoed nation (including
## without limitation Iran, Syria, Sudan, Cuba, North Korea) and that he is
## not otherwise prohibited under the Export Laws from receiving the Software.
## All rights to Use the Software are granted on condition that such rights
## are forfeited if the Licensee fails to comply with
## the terms of this Agreement.
##
## 5.) SEVERABILITY
## Should any provision of this Agreement be or become invalid, ineffective
## or unenforceable, the remaining provisions of this Agreement shall be
## valid. The parties agree to replace the invalid, ineffective or
## unenforceable provision by a valid, effective and enforceable provision
## which best meets the commercial intention of the parties.
## The same shall apply in case of omissions.
##
## 6.) APPLICABLE LAW AND PLACE OF JURISDICTION
## This Agreement shall be constituted under the law of Germany.
## The United Nations Convention on the International Sale of Goods
## shall not apply to this Agreement. The Place of Jurisdiction for any
## dispute between the Parties shall be Tübingen (Germany).
##
## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
## “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
## TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
## PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Narda Safety Test Solutions GmbH
## BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
## CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
## SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
## INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
## CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
## ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
## POSSIBILITY OF SUCH DAMAGE.
##
##############################################################################
from pathlib import Path
import wx
from nardascripting.base.usrscriptbase import *
from nardascripting.base.signalsharkdev import *
from nardascripting.base.measthread import MeasDlgThread
import nardascripting.base.toolbox as tb
class PeakTrigger(UsrScriptBase):
    """Example user script: wait until a spectrum peak exceeds a trigger level.

    The script asks the user for a trigger level, configures an RT Spectrum
    task and a Peak Table view on the device accordingly, and then polls the
    Peak Table from a background measurement thread until at least one peak
    is reported or the user cancels.
    """

    # NOTE: the default ``dev`` object is created once, when this class body is
    # executed, and is therefore shared by every call that omits ``dev``.
    def __init__(self, main_gui, dev=SignalSharkDev()):
        """Initialize the script and set its launcher metadata.

        :param main_gui: Passed through unchanged to the base class.
        :param dev: Device object passed through unchanged to the base class.
        """
        # Please leave this line unchanged
        super().__init__(main_gui, dev, __file__)

        # Script settings.
        # -------------------------------------------------------------------------------
        # Please adapt the following lines of code according to your script.
        self._tab_name = 'Examples'
        self._scr_title = 'Example02 - Peak Trigger'
        self._scr_description = 'Monitors the spectrum and shows a message if a signal exceeds a certain level.'
        # self._icon_path = self.script_path.joinpath('NardaScriptLauncher_example01.png')
        self._list_prio = 2
        self._nsl_executed_behavior = NSL_Executed_Behaviors.MINIMIZE_NSL

        # Add class variables if needed
        # -------------------------------------------------------------------------------
        # self.my_variable = None

    def _run_script(self, args):
        """Script main function.

        This method is called when a user clicks on the corresponding script button.
        It runs in the GUI thread: it queries the trigger level, configures the
        device, starts the measurement thread and finally reports the result.

        :param args: Not used by this script.
        :return: None
        """
        # Read the default trigger value
        sig_tresh = self.config.get('sig_tresh', -20)

        # The level is stored as a float (see below), so a plain str() would
        # yield e.g. '-20.0', which matches none of the list entries ('-20').
        # '{:g}' drops the trailing '.0' so the stored level can be preselected.
        try:
            default_level = '{:g}'.format(float(sig_tresh))
        except (TypeError, ValueError):
            # Unexpected stored value: fall back to its plain string form.
            default_level = str(sig_tresh)

        # Setup and show trigger level dialog (-100 ... -10 dBm in 10 dB steps)
        value_list = [str(level) for level in range(-100, 0, 10)]
        dlg = tb.ListMsgBox(self.frm_dlghelper, 'Trigger level: ',
                            value_list, ' dBm', default_level, 'Title')
        try:
            if self.ShowDlgModalTop(dlg) != wx.ID_OK:
                # User has canceled selection
                return
            sig_tresh = float(dlg.value)
            self.config['sig_tresh'] = sig_tresh
        finally:
            # Always destroy the created dialog, even if the entered value
            # cannot be converted and float() raises.
            dlg.Destroy()

        # Connect to the device
        if not self.signalshark.connect():
            self.MessageBoxModal('Cannot connect to the device!', 'Connection error',
                                 wx.OK | wx.ICON_ERROR)
            return

        try:
            # Clear SCPI error queue
            self.signalshark.scpi.check_error()

            # Add a RT Spectrum Task if it does not already exist.
            if not self.signalshark.scpi.check_add_task(TaskTypes.RT_SPECTRUM):
                self.MessageBoxModal('Please configure the newly added task (e.g. frequency range)', 'New Task',
                                     wx.OK | wx.ICON_INFORMATION)
                return

            # Configure the task regarding the trigger function
            self.signalshark.scpi.display.set_unit(LevelUnits.dBm)
            self.signalshark.scpi.spectrum.set_trace_enable(SpecTraceTypes.MxP, True)

            # Add a Peak Table to monitor the spectrum if it does not already exist and configure it
            exists, view_index = self.signalshark.scpi.check_add_view(ViewTypes.PEAK_TABLE)
            if view_index < 1:
                self.MessageBoxModal('Adding peak table not possible.',
                                     'Peak Table', wx.OK | wx.ICON_ERROR)
                return

            # Configure the peaktable if it did not already exist
            if not exists:
                self.signalshark.scpi.display.set_peaktable_sort(PeakSort.FREQUENCY)
                self.signalshark.scpi.marker.set_spectrum_search_peak_excursion(3.0)
                self.signalshark.scpi.marker.set_spectrum_search_peak_excursion_enable(True)

            # Configure the Peak Table for the new trigger level
            self.signalshark.scpi.marker.set_spectrum_search_threshold(sig_tresh)
            self.signalshark.scpi.marker.set_spectrum_search_limits_enable(True)
        finally:
            # Disconnect from the device on every path out of the configuration
            # phase, including the early returns above and unexpected exceptions.
            self.signalshark.disconnect()

        # Create an additional measurement thread to keep the GUI alive.
        # Please change 'MyUserScript._run_measurement' according to your class name.
        # With 'args=[param1, parm2, paramn]' you can pass some optional parameters to the measurement thread.
        mthread = MeasDlgThread(main_gui=self.main_gui, callback=PeakTrigger._run_measurement,
                                args=[self.signalshark.addr])

        # Start the measurement thread and show a visualization (progress) dialog.
        mthread.start_measurement()

        # The lines of code below will be executed after the measurement thread has done its job.
        # ...

        # Output a status message
        if mthread.result is not None:
            self.MessageBoxModal('Signal frequency: ' + str(mthread.result),
                                 'Peak Trigger Result', wx.OK | wx.ICON_INFORMATION)

    @staticmethod
    def _run_measurement(stopevent, update_progress, wait_for_stopevent,
                         signalshark_addr: str):
        """Main method of the measurement thread that handles a time consuming measurement.

        Please do NOT use any wxPython elements like a MessageBox or a Dialog here!
        They are only permitted in the GUI thread ('_run_script' method).
        Please use the "update_progress" method to display information!

        :param stopevent: Thread flag that indicates whether the procedure should be finished/cancelled
        :param update_progress: Delegate method to update a progress bar, message string and icon
        :param wait_for_stopevent: Delegate method to wait until stopevent is raised with possibility to show a message.
        :param signalshark_addr: Custom parameter to handover the ip address of the SignalShark
        :return: The first Peak Table frequency divided by 1e6 and rounded to six
                 decimals (presumably MHz - confirm the unit returned by the device),
                 or whatever 'wait_for_stopevent' returns on error, or None on cancel.
        """
        # Setup connection to the SignalShark and display error message if connection fails.
        signalshark = SignalSharkDev(signalshark_addr)
        if not signalshark.connect():
            return wait_for_stopevent('Cannot connect to the device!')  # Returns None

        try:
            # Update the dialog message text and show a progress bar
            update_progress(msg='Waiting for trigger', show_progress=True)

            # Wait until a signal has exceeded the trigger level or the user wants to cancel the process.
            while True:
                # Periodically check, whether the user has clicked the cancel button,
                # and exit thread if so.
                if stopevent.is_set():
                    return None

                # Example measurement:
                # Wait until Scan Number and Peak Table data count have been increased.
                signalshark.scpi.run_cont()
                scan_number = signalshark.scpi.peaktable.get_data_update()
                if scan_number < 0:
                    return wait_for_stopevent('Cannot acquire measurement data!', icon_style=wx.ICON_ERROR)

                if scan_number > 0:
                    # Hold the measurement before the Peak Table is read.
                    signalshark.scpi.hold()
                    if signalshark.scpi.peaktable.get_data_count() > 0:
                        # Get signal frequency of the first Peak Table entry
                        frequencies = signalshark.scpi.peaktable.get_data_frequency()
                        return round(float(frequencies[0]) / 1e6, 6)

                # Allow the thread to synchronize the stop event.
                stopevent.wait(0.05)
        except Exception as e:
            # Thread boundary: report any failure through the progress dialog.
            return wait_for_stopevent(str(e), icon_style=wx.ICON_ERROR)  # Returns None
        finally:
            # Close connection.
            signalshark.disconnect()
|