Separate backends into independent files

This commit is contained in:
Michal Krenek (Mikos) 2017-02-14 23:01:15 +01:00
parent d351a858da
commit e9264886e8
8 changed files with 727 additions and 607 deletions

View File

@ -4,8 +4,8 @@ import sys, signal, time
from PyQt4 import QtCore, QtGui from PyQt4 import QtCore, QtGui
from qspectrumanalyzer import backends
from qspectrumanalyzer.version import __version__ from qspectrumanalyzer.version import __version__
from qspectrumanalyzer.backend import RtlPowerThread, RtlPowerFftwThread, SoapyPowerThread, RxPowerThread, HackRFSweepThread
from qspectrumanalyzer.data import DataStorage from qspectrumanalyzer.data import DataStorage
from qspectrumanalyzer.plot import SpectrumPlotWidget, WaterfallPlotWidget from qspectrumanalyzer.plot import SpectrumPlotWidget, WaterfallPlotWidget
from qspectrumanalyzer.utils import color_to_str, str_to_color from qspectrumanalyzer.utils import color_to_str, str_to_color
@ -33,9 +33,21 @@ class QSpectrumAnalyzerSettings(QtGui.QDialog, Ui_QSpectrumAnalyzerSettings):
self.executableEdit.setText(settings.value("executable", "soapy_power")) self.executableEdit.setText(settings.value("executable", "soapy_power"))
self.waterfallHistorySizeSpinBox.setValue(settings.value("waterfall_history_size", 100, int)) self.waterfallHistorySizeSpinBox.setValue(settings.value("waterfall_history_size", 100, int))
self.deviceEdit.setText(settings.value("device", "")) self.deviceEdit.setText(settings.value("device", ""))
self.sampleRateSpinBox.setValue(settings.value("sample_rate", 2560000, int))
backend = settings.value("backend", "soapy_power") backend = settings.value("backend", "soapy_power")
try:
backend_module = getattr(backends, backend)
except AttributeError:
backend_module = backends.soapy_power
self.sampleRateSpinBox.setMinimum(backend_module.Info.sample_rate_min)
self.sampleRateSpinBox.setMaximum(backend_module.Info.sample_rate_max)
self.sampleRateSpinBox.setValue(settings.value("sample_rate", backend_module.Info.sample_rate, int))
self.backendComboBox.clear()
for b in sorted(backends.__all__):
self.backendComboBox.addItem(b)
self.backendComboBox.blockSignals(True) self.backendComboBox.blockSignals(True)
i = self.backendComboBox.findText(backend) i = self.backendComboBox.findText(backend)
if i == -1: if i == -1:
@ -57,14 +69,14 @@ class QSpectrumAnalyzerSettings(QtGui.QDialog, Ui_QSpectrumAnalyzerSettings):
self.executableEdit.setText(text) self.executableEdit.setText(text)
self.deviceEdit.setText("") self.deviceEdit.setText("")
if text == "hackrf_sweep": try:
self.sampleRateSpinBox.setMinimum(20000000) backend_module = getattr(backends, text)
self.sampleRateSpinBox.setMaximum(20000000) except AttributeError:
self.sampleRateSpinBox.setValue(20000000) backend_module = backends.soapy_power
else:
self.sampleRateSpinBox.setMinimum(0) self.sampleRateSpinBox.setMinimum(backend_module.Info.sample_rate_min)
self.sampleRateSpinBox.setMaximum(25000000) self.sampleRateSpinBox.setMaximum(backend_module.Info.sample_rate_max)
self.sampleRateSpinBox.setValue(2560000) self.sampleRateSpinBox.setValue(backend_module.Info.sample_rate)
def accept(self): def accept(self):
"""Save settings when dialog is accepted""" """Save settings when dialog is accepted"""
@ -196,38 +208,34 @@ class QSpectrumAnalyzerMainWindow(QtGui.QMainWindow, Ui_QSpectrumAnalyzerMainWin
self.data_storage.peak_hold_min_updated.connect(self.spectrumPlotWidget.update_peak_hold_min) self.data_storage.peak_hold_min_updated.connect(self.spectrumPlotWidget.update_peak_hold_min)
backend = settings.value("backend", "soapy_power") backend = settings.value("backend", "soapy_power")
if backend == "soapy_power": try:
self.power_thread = SoapyPowerThread(self.data_storage) backend_module = getattr(backends, backend)
elif backend == "rx_power": except AttributeError:
self.power_thread = RxPowerThread(self.data_storage) backend_module = backends.soapy_power
elif backend == "rtl_power_fftw":
self.power_thread = RtlPowerFftwThread(self.data_storage)
elif backend == "hackrf_sweep":
self.gainSpinBox.setMinimum(0)
self.gainSpinBox.setMaximum(102)
self.gainSpinBox.setValue(40)
self.startFreqSpinBox.setMinimum(0)
self.startFreqSpinBox.setMaximum(7230)
self.startFreqSpinBox.setValue(0)
self.stopFreqSpinBox.setMinimum(0)
self.stopFreqSpinBox.setMaximum(7250)
self.stopFreqSpinBox.setValue(6000)
self.binSizeSpinBox.setMinimum(40)
self.binSizeSpinBox.setMaximum(5000)
self.binSizeSpinBox.setValue(1000)
self.intervalSpinBox.setMinimum(0)
self.intervalSpinBox.setMaximum(0)
self.intervalSpinBox.setValue(0)
self.ppmSpinBox.setMinimum(0)
self.ppmSpinBox.setMaximum(0)
self.ppmSpinBox.setValue(0)
self.cropSpinBox.setMinimum(0)
self.cropSpinBox.setMaximum(0)
self.cropSpinBox.setValue(0)
self.power_thread = HackRFSweepThread(self.data_storage)
else:
self.power_thread = RtlPowerThread(self.data_storage)
self.gainSpinBox.setMinimum(backend_module.Info.gain_min)
self.gainSpinBox.setMaximum(backend_module.Info.gain_max)
self.gainSpinBox.setValue(backend_module.Info.gain)
self.startFreqSpinBox.setMinimum(backend_module.Info.start_freq_min)
self.startFreqSpinBox.setMaximum(backend_module.Info.start_freq_max)
self.startFreqSpinBox.setValue(backend_module.Info.start_freq)
self.stopFreqSpinBox.setMinimum(backend_module.Info.stop_freq_min)
self.stopFreqSpinBox.setMaximum(backend_module.Info.stop_freq_max)
self.stopFreqSpinBox.setValue(backend_module.Info.stop_freq)
self.binSizeSpinBox.setMinimum(backend_module.Info.bin_size_min)
self.binSizeSpinBox.setMaximum(backend_module.Info.bin_size_max)
self.binSizeSpinBox.setValue(backend_module.Info.bin_size)
self.intervalSpinBox.setMinimum(backend_module.Info.interval_min)
self.intervalSpinBox.setMaximum(backend_module.Info.interval_max)
self.intervalSpinBox.setValue(backend_module.Info.interval)
self.ppmSpinBox.setMinimum(backend_module.Info.ppm_min)
self.ppmSpinBox.setMaximum(backend_module.Info.ppm_max)
self.ppmSpinBox.setValue(backend_module.Info.ppm)
self.cropSpinBox.setMinimum(backend_module.Info.crop_min)
self.cropSpinBox.setMaximum(backend_module.Info.crop_max)
self.cropSpinBox.setValue(backend_module.Info.crop)
self.power_thread = backend_module.PowerThread(self.data_storage)
self.power_thread.powerThreadStarted.connect(self.update_buttons) self.power_thread.powerThreadStarted.connect(self.update_buttons)
self.power_thread.powerThreadStopped.connect(self.update_buttons) self.power_thread.powerThreadStopped.connect(self.update_buttons)

View File

@ -1,566 +0,0 @@
import subprocess, math, pprint, re
import numpy as np
from PyQt4 import QtCore
import struct
class BasePowerThread(QtCore.QThread):
"""Thread which runs Power Spectral Density acquisition and calculation process"""
powerThreadStarted = QtCore.pyqtSignal()
powerThreadStopped = QtCore.pyqtSignal()
def __init__(self, data_storage, parent=None):
super().__init__(parent)
self.data_storage = data_storage
self.alive = False
self.process = None
def stop(self):
"""Stop power process thread"""
self.process_stop()
self.alive = False
self.wait()
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device=0, sample_rate=2560000):
"""Setup power process params"""
raise NotImplementedError
def process_start(self):
"""Start power process"""
raise NotImplementedError
def process_stop(self):
"""Terminate power process"""
if self.process:
try:
self.process.terminate()
except ProcessLookupError:
pass
self.process.wait()
self.process = None
def parse_output(self, line):
"""Parse one line of output from power process"""
raise NotImplementedError
def run(self):
"""Power process thread main loop"""
self.process_start()
self.alive = True
self.powerThreadStarted.emit()
for line in self.process.stdout:
if not self.alive:
break
self.parse_output(line)
self.process_stop()
self.alive = False
self.powerThreadStopped.emit()
class RxPowerThread(BasePowerThread):
"""Thread which runs rx_power process"""
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device=0, sample_rate=2560000):
"""Setup rx_power params"""
self.params = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"bin_size": bin_size,
"interval": interval,
"device": device,
"hops": 0,
"gain": gain,
"ppm": ppm,
"crop": crop,
"single_shot": single_shot
}
self.databuffer = {}
self.last_timestamp = ""
print("rx_power params:")
pprint.pprint(self.params)
print()
def process_start(self):
"""Start rx_power process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "rx_power"),
"-f", "{}M:{}M:{}k".format(self.params["start_freq"],
self.params["stop_freq"],
self.params["bin_size"]),
"-i", "{}".format(self.params["interval"]),
"-d", "{}".format(self.params["device"]),
"-p", "{}".format(self.params["ppm"]),
"-c", "{}".format(self.params["crop"])
]
if self.params["gain"] >= 0:
cmdline.extend(["-g", "{}".format(self.params["gain"])])
if self.params["single_shot"]:
cmdline.append("-1")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
universal_newlines=True)
def parse_output(self, line):
"""Parse one line of output from rx_power"""
line = [col.strip() for col in line.split(",")]
timestamp = " ".join(line[:2])
start_freq = int(line[2])
stop_freq = int(line[3])
step = float(line[4])
samples = float(line[5])
x_axis = list(np.arange(start_freq, stop_freq, step))
y_axis = [float(y) for y in line[6:]]
if len(x_axis) != len(y_axis):
print("ERROR: len(x_axis) != len(y_axis), use newer version of rx_power!")
if len(x_axis) > len(y_axis):
print("Trimming x_axis...")
x_axis = x_axis[:len(y_axis)]
else:
print("Trimming y_axis...")
y_axis = y_axis[:len(x_axis)]
if timestamp != self.last_timestamp:
self.last_timestamp = timestamp
self.databuffer = {"timestamp": timestamp,
"x": x_axis,
"y": y_axis}
else:
self.databuffer["x"].extend(x_axis)
self.databuffer["y"].extend(y_axis)
# This have to be stupid like this to be compatible with old broken version of rx_power. Right way is:
# if stop_freq == self.params["stop_freq"] * 1e6:
if stop_freq > (self.params["stop_freq"] * 1e6) - step:
self.data_storage.update(self.databuffer)
class RtlPowerThread(BasePowerThread):
"""Thread which runs rtl_power process"""
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device=0, sample_rate=2560000):
"""Setup rtl_power params"""
if bin_size > 2800:
bin_size = 2800
self.params = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"bin_size": bin_size,
"interval": interval,
"device": device,
"hops": 0,
"gain": gain,
"ppm": ppm,
"crop": crop,
"single_shot": single_shot
}
self.databuffer = {}
self.last_timestamp = ""
print("rtl_power params:")
pprint.pprint(self.params)
print()
def process_start(self):
"""Start rtl_power process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "rtl_power"),
"-f", "{}M:{}M:{}k".format(self.params["start_freq"],
self.params["stop_freq"],
self.params["bin_size"]),
"-i", "{}".format(self.params["interval"]),
"-d", "{}".format(self.params["device"]),
"-p", "{}".format(self.params["ppm"]),
"-c", "{}".format(self.params["crop"])
]
if self.params["gain"] >= 0:
cmdline.extend(["-g", "{}".format(self.params["gain"])])
if self.params["single_shot"]:
cmdline.append("-1")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
universal_newlines=True)
def parse_output(self, line):
"""Parse one line of output from rtl_power"""
line = [col.strip() for col in line.split(",")]
timestamp = " ".join(line[:2])
start_freq = int(line[2])
stop_freq = int(line[3])
step = float(line[4])
samples = float(line[5])
x_axis = list(np.arange(start_freq, stop_freq, step))
y_axis = [float(y) for y in line[6:]]
if len(x_axis) != len(y_axis):
print("ERROR: len(x_axis) != len(y_axis), use newer version of rtl_power!")
if len(x_axis) > len(y_axis):
print("Trimming x_axis...")
x_axis = x_axis[:len(y_axis)]
else:
print("Trimming y_axis...")
y_axis = y_axis[:len(x_axis)]
if timestamp != self.last_timestamp:
self.last_timestamp = timestamp
self.databuffer = {"timestamp": timestamp,
"x": x_axis,
"y": y_axis}
else:
self.databuffer["x"].extend(x_axis)
self.databuffer["y"].extend(y_axis)
# This have to be stupid like this to be compatible with old broken version of rtl_power. Right way is:
# if stop_freq == self.params["stop_freq"] * 1e6:
if stop_freq > (self.params["stop_freq"] * 1e6) - step:
self.data_storage.update(self.databuffer)
class RtlPowerFftwThread(BasePowerThread):
"""Thread which runs rtl_power_fftw process"""
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device=0, sample_rate=2560000):
"""Setup rtl_power_fftw params"""
crop = crop * 100
overlap = crop * 2
freq_range = stop_freq * 1e6 - start_freq * 1e6
min_overhang = sample_rate * overlap * 0.01
hops = math.ceil((freq_range - min_overhang) / (sample_rate - min_overhang))
overhang = (hops * sample_rate - freq_range) / (hops - 1) if hops > 1 else 0
if bin_size > 2800:
bin_size = 2800
bins = math.ceil(sample_rate / (bin_size * 1e3))
crop_freq = sample_rate * crop * 0.01
self.params = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"freq_range": freq_range,
"device": device,
"sample_rate": sample_rate,
"bin_size": bin_size,
"bins": bins,
"interval": interval,
"hops": hops,
"time": interval / hops,
"gain": gain * 10,
"ppm": ppm,
"crop": crop,
"overlap": overlap,
"min_overhang": min_overhang,
"overhang": overhang,
"single_shot": single_shot
}
self.freqs = [self.get_hop_freq(hop) for hop in range(hops)]
self.freqs_crop = [(f[0] + crop_freq, f[1] - crop_freq) for f in self.freqs]
self.databuffer = {"timestamp": [], "x": [], "y": []}
self.databuffer_hop = {"timestamp": [], "x": [], "y": []}
self.hop = 0
self.prev_line = ""
print("rtl_power_fftw params:")
pprint.pprint(self.params)
print()
def get_hop_freq(self, hop):
"""Get start and stop frequency for particular hop"""
start_freq = self.params["start_freq"] * 1e6 + (self.params["sample_rate"] - self.params["overhang"]) * hop
stop_freq = start_freq + self.params["sample_rate"] - (self.params["sample_rate"] / self.params["bins"])
return (start_freq, stop_freq)
def process_start(self):
"""Start rtl_power_fftw process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "rtl_power_fftw"),
"-f", "{}M:{}M".format(self.params["start_freq"],
self.params["stop_freq"]),
"-b", "{}".format(self.params["bins"]),
"-t", "{}".format(self.params["time"]),
"-d", "{}".format(self.params["device"]),
"-r", "{}".format(self.params["sample_rate"]),
"-p", "{}".format(self.params["ppm"]),
]
if self.params["gain"] >= 0:
cmdline.extend(["-g", "{}".format(self.params["gain"])])
if self.params["overlap"] > 0:
cmdline.extend(["-o", "{}".format(self.params["overlap"])])
if not self.params["single_shot"]:
cmdline.append("-c")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
universal_newlines=True)
def parse_output(self, line):
"""Parse one line of output from rtl_power_fftw"""
line = line.strip()
# One empty line => new hop
if not line and self.prev_line:
self.hop += 1
self.databuffer["x"].extend(self.databuffer_hop["x"])
self.databuffer["y"].extend(self.databuffer_hop["y"])
self.databuffer_hop = {"timestamp": [], "x": [], "y": []}
# Two empty lines => new set
elif not line and not self.prev_line:
self.hop = 0
self.data_storage.update(self.databuffer)
self.databuffer = {"timestamp": [], "x": [], "y": []}
# Get timestamp for new hop and set
elif line.startswith("# Acquisition start:"):
timestamp = line.split(":", 1)[1].strip()
if not self.databuffer_hop["timestamp"]:
self.databuffer_hop["timestamp"] = timestamp
if not self.databuffer["timestamp"]:
self.databuffer["timestamp"] = timestamp
# Skip other comments
elif line.startswith("#"):
pass
# Parse frequency and power
elif line[0].isdigit():
freq, power = line.split()
freq, power = float(freq), float(power)
start_freq, stop_freq = self.freqs_crop[self.hop]
# Apply cropping
if freq >= start_freq and freq <= stop_freq:
# Skip overlapping frequencies
if not self.databuffer["x"] or freq > self.databuffer["x"][-1]:
#print(" {:.3f} MHz".format(freq / 1e6))
self.databuffer_hop["x"].append(freq)
self.databuffer_hop["y"].append(power)
else:
#print(" Overlapping {:.3f} MHz".format(freq / 1e6))
pass
else:
#print(" Cropping {:.3f} MHz".format(freq / 1e6))
pass
self.prev_line = line
class SoapyPowerThread(BasePowerThread):
"""Thread which runs soapy_power process"""
re_two_floats = re.compile(r'^[-+\d.eE]+\s+[-+\d.eE]+$')
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device="", sample_rate=2560000):
"""Setup soapy_power params"""
self.params = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"device": device,
"sample_rate": sample_rate,
"bin_size": bin_size,
"interval": interval,
"hops": 0,
"gain": gain * 10,
"ppm": ppm,
"crop": crop * 100,
"single_shot": single_shot
}
self.databuffer = {"timestamp": [], "x": [], "y": []}
self.databuffer_hop = {"timestamp": [], "x": [], "y": []}
self.hop = 0
self.run = 0
self.prev_line = ""
print("soapy_power params:")
pprint.pprint(self.params)
print()
def process_start(self):
"""Start soapy_power process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "soapy_power"),
"-f", "{}M:{}M".format(self.params["start_freq"],
self.params["stop_freq"]),
"-B", "{}k".format(self.params["bin_size"]),
"-T", "{}".format(self.params["interval"]),
"-d", "{}".format(self.params["device"]),
"-r", "{}".format(self.params["sample_rate"]),
"-p", "{}".format(self.params["ppm"]),
]
if self.params["gain"] >= 0:
cmdline.extend(["-g", "{}".format(self.params["gain"])])
if self.params["crop"] > 0:
cmdline.extend(["-k", "{}".format(self.params["crop"])])
if not self.params["single_shot"]:
cmdline.append("-c")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
universal_newlines=True)
def parse_output(self, line):
"""Parse one line of output from soapy_power"""
line = line.strip()
# One empty line => new hop
if not line and self.prev_line:
self.hop += 1
print(' => HOP:', self.hop)
self.databuffer["x"].extend(self.databuffer_hop["x"])
self.databuffer["y"].extend(self.databuffer_hop["y"])
self.databuffer_hop = {"timestamp": [], "x": [], "y": []}
# Two empty lines => new set
elif not line and not self.prev_line:
self.hop = 0
self.run += 1
print(' * RUN:', self.run)
self.data_storage.update(self.databuffer)
self.databuffer = {"timestamp": [], "x": [], "y": []}
# Get timestamp for new hop and set
elif line.startswith("# Acquisition start:"):
timestamp = line.split(":", 1)[1].strip()
if not self.databuffer_hop["timestamp"]:
self.databuffer_hop["timestamp"] = timestamp
if not self.databuffer["timestamp"]:
self.databuffer["timestamp"] = timestamp
# Skip other comments
elif line.startswith("#"):
pass
# Parse frequency and power
elif self.re_two_floats.match(line):
try:
freq, power = line.split()
except ValueError:
return
freq, power = float(freq), float(power)
self.databuffer_hop["x"].append(freq)
self.databuffer_hop["y"].append(power)
self.prev_line = line
class HackRFSweepThread(BasePowerThread):
"""Thread which runs hackrf_sweep process"""
def setup(self, start_freq=0, stop_freq=6000, bin_size=1000,
interval=0.0, gain=40, ppm=0, crop=0, single_shot=False,
device=0, sample_rate=20000000):
"""Setup hackrf_sweep params"""
# theoretically we can support bins smaller than 40 kHz, but it is
# unlikely to result in acceptable performance
if bin_size < 40:
bin_size = 40
if bin_size > 5000:
bin_size = 5000
# We only support whole numbers of steps with bandwidth equal to the
# sample rate.
step_bandwidth = sample_rate / 1000000
total_bandwidth = stop_freq - start_freq
step_count = 1 + (total_bandwidth - 1) // step_bandwidth
total_bandwidth = step_count * step_bandwidth
stop_freq = start_freq + total_bandwidth
# distribute gain between two analog gain stages
if gain < 0:
gain = 0
if gain > 102:
gain = 102
lna_gain = 8 * (gain // 18)
vga_gain = 2 * ((gain - lna_gain) // 2)
self.params = {
"start_freq": start_freq, # MHz
"stop_freq": stop_freq, # MHz
"hops": 0,
"device": 0,
"sample_rate": 20e6, # sps
"bin_size": bin_size, # kHz
"interval": 0, # seconds
"gain": gain,
"lna_gain": lna_gain,
"vga_gain": vga_gain,
"ppm": 0,
"crop": 0,
"single_shot": single_shot
}
self.databuffer = {"timestamp": [], "x": [], "y": []}
print("hackrf_sweep params:")
pprint.pprint(self.params)
print()
def process_start(self):
"""Start hackrf_sweep process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "hackrf_sweep"),
"-f", "{}:{}".format(int(self.params["start_freq"]),
int(self.params["stop_freq"])),
"-B",
"-w", "{}".format(int(self.params["bin_size"]*1000)),
"-l", "{}".format(int(self.params["lna_gain"])),
"-g", "{}".format(int(self.params["vga_gain"])),
]
if self.params["single_shot"]:
cmdline.append("-1")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
universal_newlines=False)
def parse_output(self, buf):
"""Parse one buf of output from hackrf_sweep"""
(low_edge, high_edge) = struct.unpack('QQ', buf[:16])
data = np.fromstring(buf[16:], dtype='<f4')
step = (high_edge - low_edge) / len(data)
if (low_edge//1000000) <= (self.params["start_freq"]):
# Reset databuffer at the start of each sweep even if we somehow
# did not complete the previous sweep.
self.databuffer = {"timestamp": [], "x": [], "y": []}
x_axis = list(np.arange(low_edge + step/2, high_edge, step))
self.databuffer["x"].extend(x_axis)
for i in range(len(data)):
self.databuffer["y"].append(data[i])
if (high_edge / 1e6) >= (self.params["stop_freq"]):
# We've reached the end of a pass, so sort and display it.
sorted_data = sorted(zip(self.databuffer["x"], self.databuffer["y"]))
self.databuffer["x"], self.databuffer["y"] = [list(x) for x in zip(*sorted_data)]
self.data_storage.update(self.databuffer)
def run(self):
"""hackrf_sweep thread main loop"""
self.process_start()
self.alive = True
self.powerThreadStarted.emit()
while self.alive:
buf = self.process.stdout.read(4)
if buf:
(record_length,) = struct.unpack('I', buf)
buf = self.process.stdout.read(record_length)
if buf:
self.parse_output(buf)
self.process_stop()
self.alive = False
self.powerThreadStopped.emit()

View File

@ -0,0 +1,96 @@
import os, glob
from PyQt4 import QtCore
class BaseInfo:
"""Default device metadata"""
sample_rate_min = 0
sample_rate_max = 61440000
sample_rate = 2560000
gain_min = -1
gain_max = 49
gain = -1
start_freq_min = 24
start_freq_max = 1766
start_freq = 87
stop_freq_min = 24
stop_freq_max = 1766
stop_freq = 108
bin_size_min = 0
bin_size_max = 2800
bin_size = 10
interval_min = 0
interval_max = 999
interval = 10
ppm_min = -999
ppm_max = 999
ppm = 0
crop_min = 0
crop_max = 99
crop = 0
class BasePowerThread(QtCore.QThread):
"""Thread which runs Power Spectral Density acquisition and calculation process"""
powerThreadStarted = QtCore.pyqtSignal()
powerThreadStopped = QtCore.pyqtSignal()
def __init__(self, data_storage, parent=None):
super().__init__(parent)
self.data_storage = data_storage
self.alive = False
self.process = None
def stop(self):
"""Stop power process thread"""
self.process_stop()
self.alive = False
self.wait()
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device=0, sample_rate=2560000):
"""Setup power process params"""
raise NotImplementedError
def process_start(self):
"""Start power process"""
raise NotImplementedError
def process_stop(self):
"""Terminate power process"""
if self.process:
try:
self.process.terminate()
except ProcessLookupError:
pass
self.process.wait()
self.process = None
def parse_output(self, line):
"""Parse one line of output from power process"""
raise NotImplementedError
def run(self):
"""Power process thread main loop"""
self.process_start()
self.alive = True
self.powerThreadStarted.emit()
for line in self.process.stdout:
if not self.alive:
break
self.parse_output(line)
self.process_stop()
self.alive = False
self.powerThreadStopped.emit()
# Build list of all backends
_backends_files = glob.glob(os.path.join(os.path.dirname(__file__), "*.py"))
__all__ = [os.path.splitext(os.path.basename(f))[0] for f in _backends_files
if os.path.isfile(f) and not os.path.basename(f).startswith("_")]
# Import all backends
from qspectrumanalyzer.backends import *

View File

@ -0,0 +1,144 @@
import subprocess, pprint
import numpy as np
from PyQt4 import QtCore
import struct
from qspectrumanalyzer.backends import BaseInfo, BasePowerThread
class Info(BaseInfo):
"""hackrf_sweep device metadata"""
sample_rate_min = 20000000
sample_rate_max = 20000000
sample_rate = 20000000
gain_min = 0
gain_max = 102
gain = 40
start_freq_min = 0
start_freq_max = 7230
start_freq = 0
stop_freq_min = 0
stop_freq_max = 7250
stop_freq = 6000
bin_size_min = 40
bin_size_max = 5000
bin_size = 1000
interval_min = 0
interval_max = 0
interval = 0
ppm_min = 0
ppm_max = 0
ppm = 0
crop_min = 0
crop_max = 0
crop = 0
class PowerThread(BasePowerThread):
"""Thread which runs hackrf_sweep process"""
def setup(self, start_freq=0, stop_freq=6000, bin_size=1000,
interval=0.0, gain=40, ppm=0, crop=0, single_shot=False,
device=0, sample_rate=20000000):
"""Setup hackrf_sweep params"""
# theoretically we can support bins smaller than 40 kHz, but it is
# unlikely to result in acceptable performance
if bin_size < 40:
bin_size = 40
if bin_size > 5000:
bin_size = 5000
# We only support whole numbers of steps with bandwidth equal to the
# sample rate.
step_bandwidth = sample_rate / 1000000
total_bandwidth = stop_freq - start_freq
step_count = 1 + (total_bandwidth - 1) // step_bandwidth
total_bandwidth = step_count * step_bandwidth
stop_freq = start_freq + total_bandwidth
# distribute gain between two analog gain stages
if gain < 0:
gain = 0
if gain > 102:
gain = 102
lna_gain = 8 * (gain // 18)
vga_gain = 2 * ((gain - lna_gain) // 2)
self.params = {
"start_freq": start_freq, # MHz
"stop_freq": stop_freq, # MHz
"hops": 0,
"device": 0,
"sample_rate": 20e6, # sps
"bin_size": bin_size, # kHz
"interval": 0, # seconds
"gain": gain,
"lna_gain": lna_gain,
"vga_gain": vga_gain,
"ppm": 0,
"crop": 0,
"single_shot": single_shot
}
self.databuffer = {"timestamp": [], "x": [], "y": []}
print("hackrf_sweep params:")
pprint.pprint(self.params)
print()
def process_start(self):
"""Start hackrf_sweep process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "hackrf_sweep"),
"-f", "{}:{}".format(int(self.params["start_freq"]),
int(self.params["stop_freq"])),
"-B",
"-w", "{}".format(int(self.params["bin_size"] * 1000)),
"-l", "{}".format(int(self.params["lna_gain"])),
"-g", "{}".format(int(self.params["vga_gain"])),
]
if self.params["single_shot"]:
cmdline.append("-1")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
universal_newlines=False)
def parse_output(self, buf):
"""Parse one buf of output from hackrf_sweep"""
(low_edge, high_edge) = struct.unpack('QQ', buf[:16])
data = np.fromstring(buf[16:], dtype='<f4')
step = (high_edge - low_edge) / len(data)
if (low_edge // 1000000) <= (self.params["start_freq"]):
# Reset databuffer at the start of each sweep even if we somehow
# did not complete the previous sweep.
self.databuffer = {"timestamp": [], "x": [], "y": []}
x_axis = list(np.arange(low_edge + step / 2, high_edge, step))
self.databuffer["x"].extend(x_axis)
for i in range(len(data)):
self.databuffer["y"].append(data[i])
if (high_edge / 1e6) >= (self.params["stop_freq"]):
# We've reached the end of a pass, so sort and display it.
sorted_data = sorted(zip(self.databuffer["x"], self.databuffer["y"]))
self.databuffer["x"], self.databuffer["y"] = [list(x) for x in zip(*sorted_data)]
self.data_storage.update(self.databuffer)
def run(self):
"""hackrf_sweep thread main loop"""
self.process_start()
self.alive = True
self.powerThreadStarted.emit()
while self.alive:
buf = self.process.stdout.read(4)
if buf:
(record_length,) = struct.unpack('I', buf)
buf = self.process.stdout.read(record_length)
if buf:
self.parse_output(buf)
self.process_stop()
self.alive = False
self.powerThreadStopped.emit()

View File

@ -0,0 +1,95 @@
import subprocess, pprint
import numpy as np
from PyQt4 import QtCore
from qspectrumanalyzer.backends import BaseInfo, BasePowerThread
class Info(BaseInfo):
"""rtl_power device metadata"""
pass
class PowerThread(BasePowerThread):
"""Thread which runs rtl_power process"""
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device=0, sample_rate=2560000):
"""Setup rtl_power params"""
if bin_size > 2800:
bin_size = 2800
self.params = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"bin_size": bin_size,
"interval": interval,
"device": device,
"hops": 0,
"gain": gain,
"ppm": ppm,
"crop": crop,
"single_shot": single_shot
}
self.databuffer = {}
self.last_timestamp = ""
print("rtl_power params:")
pprint.pprint(self.params)
print()
def process_start(self):
"""Start rtl_power process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "rtl_power"),
"-f", "{}M:{}M:{}k".format(self.params["start_freq"],
self.params["stop_freq"],
self.params["bin_size"]),
"-i", "{}".format(self.params["interval"]),
"-d", "{}".format(self.params["device"]),
"-p", "{}".format(self.params["ppm"]),
"-c", "{}".format(self.params["crop"])
]
if self.params["gain"] >= 0:
cmdline.extend(["-g", "{}".format(self.params["gain"])])
if self.params["single_shot"]:
cmdline.append("-1")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
universal_newlines=True)
def parse_output(self, line):
"""Parse one line of output from rtl_power"""
line = [col.strip() for col in line.split(",")]
timestamp = " ".join(line[:2])
start_freq = int(line[2])
stop_freq = int(line[3])
step = float(line[4])
samples = float(line[5])
x_axis = list(np.arange(start_freq, stop_freq, step))
y_axis = [float(y) for y in line[6:]]
if len(x_axis) != len(y_axis):
print("ERROR: len(x_axis) != len(y_axis), use newer version of rtl_power!")
if len(x_axis) > len(y_axis):
print("Trimming x_axis...")
x_axis = x_axis[:len(y_axis)]
else:
print("Trimming y_axis...")
y_axis = y_axis[:len(x_axis)]
if timestamp != self.last_timestamp:
self.last_timestamp = timestamp
self.databuffer = {"timestamp": timestamp,
"x": x_axis,
"y": y_axis}
else:
self.databuffer["x"].extend(x_axis)
self.databuffer["y"].extend(y_axis)
# This have to be stupid like this to be compatible with old broken version of rtl_power. Right way is:
# if stop_freq == self.params["stop_freq"] * 1e6:
if stop_freq > (self.params["stop_freq"] * 1e6) - step:
self.data_storage.update(self.databuffer)

View File

@ -0,0 +1,139 @@
import subprocess, math, pprint
from PyQt4 import QtCore
from qspectrumanalyzer.backends import BaseInfo, BasePowerThread
class Info(BaseInfo):
"""rtl_power_fftw device metadata"""
pass
class PowerThread(BasePowerThread):
"""Thread which runs rtl_power_fftw process"""
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device=0, sample_rate=2560000):
"""Setup rtl_power_fftw params"""
crop = crop * 100
overlap = crop * 2
freq_range = stop_freq * 1e6 - start_freq * 1e6
min_overhang = sample_rate * overlap * 0.01
hops = math.ceil((freq_range - min_overhang) / (sample_rate - min_overhang))
overhang = (hops * sample_rate - freq_range) / (hops - 1) if hops > 1 else 0
if bin_size > 2800:
bin_size = 2800
bins = math.ceil(sample_rate / (bin_size * 1e3))
crop_freq = sample_rate * crop * 0.01
self.params = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"freq_range": freq_range,
"device": device,
"sample_rate": sample_rate,
"bin_size": bin_size,
"bins": bins,
"interval": interval,
"hops": hops,
"time": interval / hops,
"gain": gain * 10,
"ppm": ppm,
"crop": crop,
"overlap": overlap,
"min_overhang": min_overhang,
"overhang": overhang,
"single_shot": single_shot
}
self.freqs = [self.get_hop_freq(hop) for hop in range(hops)]
self.freqs_crop = [(f[0] + crop_freq, f[1] - crop_freq) for f in self.freqs]
self.databuffer = {"timestamp": [], "x": [], "y": []}
self.databuffer_hop = {"timestamp": [], "x": [], "y": []}
self.hop = 0
self.prev_line = ""
print("rtl_power_fftw params:")
pprint.pprint(self.params)
print()
def get_hop_freq(self, hop):
"""Get start and stop frequency for particular hop"""
start_freq = self.params["start_freq"] * 1e6 + (self.params["sample_rate"] - self.params["overhang"]) * hop
stop_freq = start_freq + self.params["sample_rate"] - (self.params["sample_rate"] / self.params["bins"])
return (start_freq, stop_freq)
def process_start(self):
"""Start rtl_power_fftw process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "rtl_power_fftw"),
"-f", "{}M:{}M".format(self.params["start_freq"],
self.params["stop_freq"]),
"-b", "{}".format(self.params["bins"]),
"-t", "{}".format(self.params["time"]),
"-d", "{}".format(self.params["device"]),
"-r", "{}".format(self.params["sample_rate"]),
"-p", "{}".format(self.params["ppm"]),
]
if self.params["gain"] >= 0:
cmdline.extend(["-g", "{}".format(self.params["gain"])])
if self.params["overlap"] > 0:
cmdline.extend(["-o", "{}".format(self.params["overlap"])])
if not self.params["single_shot"]:
cmdline.append("-c")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
universal_newlines=True)
def parse_output(self, line):
"""Parse one line of output from rtl_power_fftw"""
line = line.strip()
# One empty line => new hop
if not line and self.prev_line:
self.hop += 1
self.databuffer["x"].extend(self.databuffer_hop["x"])
self.databuffer["y"].extend(self.databuffer_hop["y"])
self.databuffer_hop = {"timestamp": [], "x": [], "y": []}
# Two empty lines => new set
elif not line and not self.prev_line:
self.hop = 0
self.data_storage.update(self.databuffer)
self.databuffer = {"timestamp": [], "x": [], "y": []}
# Get timestamp for new hop and set
elif line.startswith("# Acquisition start:"):
timestamp = line.split(":", 1)[1].strip()
if not self.databuffer_hop["timestamp"]:
self.databuffer_hop["timestamp"] = timestamp
if not self.databuffer["timestamp"]:
self.databuffer["timestamp"] = timestamp
# Skip other comments
elif line.startswith("#"):
pass
# Parse frequency and power
elif line[0].isdigit():
freq, power = line.split()
freq, power = float(freq), float(power)
start_freq, stop_freq = self.freqs_crop[self.hop]
# Apply cropping
if freq >= start_freq and freq <= stop_freq:
# Skip overlapping frequencies
if not self.databuffer["x"] or freq > self.databuffer["x"][-1]:
#print(" {:.3f} MHz".format(freq / 1e6))
self.databuffer_hop["x"].append(freq)
self.databuffer_hop["y"].append(power)
else:
#print(" Overlapping {:.3f} MHz".format(freq / 1e6))
pass
else:
#print(" Cropping {:.3f} MHz".format(freq / 1e6))
pass
self.prev_line = line

View File

@ -0,0 +1,93 @@
import subprocess, pprint
import numpy as np
from PyQt4 import QtCore
from qspectrumanalyzer.backends import BaseInfo, BasePowerThread
class Info(BaseInfo):
"""rx_power device metadata"""
pass
class PowerThread(BasePowerThread):
"""Thread which runs rx_power process"""
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device=0, sample_rate=2560000):
"""Setup rx_power params"""
self.params = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"bin_size": bin_size,
"interval": interval,
"device": device,
"hops": 0,
"gain": gain,
"ppm": ppm,
"crop": crop,
"single_shot": single_shot
}
self.databuffer = {}
self.last_timestamp = ""
print("rx_power params:")
pprint.pprint(self.params)
print()
def process_start(self):
"""Start rx_power process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "rx_power"),
"-f", "{}M:{}M:{}k".format(self.params["start_freq"],
self.params["stop_freq"],
self.params["bin_size"]),
"-i", "{}".format(self.params["interval"]),
"-d", "{}".format(self.params["device"]),
"-p", "{}".format(self.params["ppm"]),
"-c", "{}".format(self.params["crop"])
]
if self.params["gain"] >= 0:
cmdline.extend(["-g", "{}".format(self.params["gain"])])
if self.params["single_shot"]:
cmdline.append("-1")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
universal_newlines=True)
def parse_output(self, line):
"""Parse one line of output from rx_power"""
line = [col.strip() for col in line.split(",")]
timestamp = " ".join(line[:2])
start_freq = int(line[2])
stop_freq = int(line[3])
step = float(line[4])
samples = float(line[5])
x_axis = list(np.arange(start_freq, stop_freq, step))
y_axis = [float(y) for y in line[6:]]
if len(x_axis) != len(y_axis):
print("ERROR: len(x_axis) != len(y_axis), use newer version of rx_power!")
if len(x_axis) > len(y_axis):
print("Trimming x_axis...")
x_axis = x_axis[:len(y_axis)]
else:
print("Trimming y_axis...")
y_axis = y_axis[:len(x_axis)]
if timestamp != self.last_timestamp:
self.last_timestamp = timestamp
self.databuffer = {"timestamp": timestamp,
"x": x_axis,
"y": y_axis}
else:
self.databuffer["x"].extend(x_axis)
self.databuffer["y"].extend(y_axis)
# This have to be stupid like this to be compatible with old broken version of rx_power. Right way is:
# if stop_freq == self.params["stop_freq"] * 1e6:
if stop_freq > (self.params["stop_freq"] * 1e6) - step:
self.data_storage.update(self.databuffer)

View File

@ -0,0 +1,111 @@
import subprocess, pprint, re
from PyQt4 import QtCore
from qspectrumanalyzer.backends import BaseInfo, BasePowerThread
class Info(BaseInfo):
"""soapy_power device metadata"""
pass
class PowerThread(BasePowerThread):
"""Thread which runs soapy_power process"""
re_two_floats = re.compile(r'^[-+\d.eE]+\s+[-+\d.eE]+$')
def setup(self, start_freq, stop_freq, bin_size, interval=10.0, gain=-1,
ppm=0, crop=0, single_shot=False, device="", sample_rate=2560000):
"""Setup soapy_power params"""
self.params = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"device": device,
"sample_rate": sample_rate,
"bin_size": bin_size,
"interval": interval,
"hops": 0,
"gain": gain * 10,
"ppm": ppm,
"crop": crop * 100,
"single_shot": single_shot
}
self.databuffer = {"timestamp": [], "x": [], "y": []}
self.databuffer_hop = {"timestamp": [], "x": [], "y": []}
self.hop = 0
self.run = 0
self.prev_line = ""
print("soapy_power params:")
pprint.pprint(self.params)
print()
def process_start(self):
"""Start soapy_power process"""
if not self.process and self.params:
settings = QtCore.QSettings()
cmdline = [
settings.value("executable", "soapy_power"),
"-f", "{}M:{}M".format(self.params["start_freq"],
self.params["stop_freq"]),
"-B", "{}k".format(self.params["bin_size"]),
"-T", "{}".format(self.params["interval"]),
"-d", "{}".format(self.params["device"]),
"-r", "{}".format(self.params["sample_rate"]),
"-p", "{}".format(self.params["ppm"]),
]
if self.params["gain"] >= 0:
cmdline.extend(["-g", "{}".format(self.params["gain"])])
if self.params["crop"] > 0:
cmdline.extend(["-k", "{}".format(self.params["crop"])])
if not self.params["single_shot"]:
cmdline.append("-c")
self.process = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
universal_newlines=True)
def parse_output(self, line):
"""Parse one line of output from soapy_power"""
line = line.strip()
# One empty line => new hop
if not line and self.prev_line:
self.hop += 1
print(' => HOP:', self.hop)
self.databuffer["x"].extend(self.databuffer_hop["x"])
self.databuffer["y"].extend(self.databuffer_hop["y"])
self.databuffer_hop = {"timestamp": [], "x": [], "y": []}
# Two empty lines => new set
elif not line and not self.prev_line:
self.hop = 0
self.run += 1
print(' * RUN:', self.run)
self.data_storage.update(self.databuffer)
self.databuffer = {"timestamp": [], "x": [], "y": []}
# Get timestamp for new hop and set
elif line.startswith("# Acquisition start:"):
timestamp = line.split(":", 1)[1].strip()
if not self.databuffer_hop["timestamp"]:
self.databuffer_hop["timestamp"] = timestamp
if not self.databuffer["timestamp"]:
self.databuffer["timestamp"] = timestamp
# Skip other comments
elif line.startswith("#"):
pass
# Parse frequency and power
elif self.re_two_floats.match(line):
try:
freq, power = line.split()
except ValueError:
return
freq, power = float(freq), float(power)
self.databuffer_hop["x"].append(freq)
self.databuffer_hop["y"].append(power)
self.prev_line = line