Fix API for Windows

This commit is contained in:
Jan Käberich 2023-01-08 13:55:27 +01:00
parent 6aef3fe0ac
commit 8aec59f32b

249
Documentation/UserManual/SCPI_Examples/libreVNA.py Normal file → Executable file
View File

@ -1,124 +1,125 @@
import socket import socket
from asyncio import IncompleteReadError # only import the exception class from asyncio import IncompleteReadError # only import the exception class
import time import time
class SocketStreamReader: class SocketStreamReader:
def __init__(self, sock: socket.socket): def __init__(self, sock: socket.socket):
self._sock = sock self._sock = sock
self._recv_buffer = bytearray() self._sock.setblocking(0)
self.timeout = 1.0 self._recv_buffer = bytearray()
self.timeout = 1.0
def read(self, num_bytes: int = -1) -> bytes:
raise NotImplementedError def read(self, num_bytes: int = -1) -> bytes:
raise NotImplementedError
def readexactly(self, num_bytes: int) -> bytes:
buf = bytearray(num_bytes) def readexactly(self, num_bytes: int) -> bytes:
pos = 0 buf = bytearray(num_bytes)
while pos < num_bytes: pos = 0
n = self._recv_into(memoryview(buf)[pos:]) while pos < num_bytes:
if n == 0: n = self._recv_into(memoryview(buf)[pos:])
raise IncompleteReadError(bytes(buf[:pos]), num_bytes) if n == 0:
pos += n raise IncompleteReadError(bytes(buf[:pos]), num_bytes)
return bytes(buf) pos += n
return bytes(buf)
def readline(self) -> bytes:
return self.readuntil(b"\n") def readline(self) -> bytes:
return self.readuntil(b"\n")
def readuntil(self, separator: bytes = b"\n") -> bytes:
if len(separator) != 1: def readuntil(self, separator: bytes = b"\n") -> bytes:
raise ValueError("Only separators of length 1 are supported.") if len(separator) != 1:
raise ValueError("Only separators of length 1 are supported.")
chunk = bytearray(4096)
start = 0 chunk = bytearray(4096)
buf = bytearray(len(self._recv_buffer)) start = 0
bytes_read = self._recv_into(memoryview(buf)) buf = bytearray(len(self._recv_buffer))
assert bytes_read == len(buf) bytes_read = self._recv_into(memoryview(buf))
assert bytes_read == len(buf)
timeout = time.time() + self.timeout
while True: timeout = time.time() + self.timeout
idx = buf.find(separator, start) while True:
if idx != -1: idx = buf.find(separator, start)
break if idx != -1:
elif time.time() > timeout: break
raise Exception("Timed out waiting for response from GUI") elif time.time() > timeout:
raise Exception("Timed out waiting for response from GUI")
start = len(self._recv_buffer)
bytes_read = self._recv_into(memoryview(chunk)) start = len(self._recv_buffer)
buf += memoryview(chunk)[:bytes_read] bytes_read = self._recv_into(memoryview(chunk))
buf += memoryview(chunk)[:bytes_read]
result = bytes(buf[: idx + 1])
self._recv_buffer = b"".join( result = bytes(buf[: idx + 1])
(memoryview(buf)[idx + 1 :], self._recv_buffer) self._recv_buffer = b"".join(
) (memoryview(buf)[idx + 1 :], self._recv_buffer)
return result )
return result
def _recv_into(self, view: memoryview) -> int:
bytes_read = min(len(view), len(self._recv_buffer)) def _recv_into(self, view: memoryview) -> int:
view[:bytes_read] = self._recv_buffer[:bytes_read] bytes_read = min(len(view), len(self._recv_buffer))
self._recv_buffer = self._recv_buffer[bytes_read:] view[:bytes_read] = self._recv_buffer[:bytes_read]
if bytes_read == len(view): self._recv_buffer = self._recv_buffer[bytes_read:]
return bytes_read if bytes_read == len(view):
try: return bytes_read
bytes_read += self._sock.recv_into(view[bytes_read:], 0, socket.MSG_DONTWAIT) try:
except: bytes_read += self._sock.recv_into(view[bytes_read:], 0)
pass except:
return bytes_read pass
return bytes_read
class libreVNA:
def __init__(self, host='localhost', port=19542): class libreVNA:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def __init__(self, host='localhost', port=19542):
try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port)) try:
except: self.sock.connect((host, port))
raise Exception("Unable to connect to LibreVNA-GUI. Make sure it is running and the TCP server is enabled.") except:
self.reader = SocketStreamReader(self.sock) raise Exception("Unable to connect to LibreVNA-GUI. Make sure it is running and the TCP server is enabled.")
self.reader = SocketStreamReader(self.sock)
def __del__(self):
self.sock.close() def __del__(self):
self.sock.close()
def __read_response(self):
return self.reader.readline().decode().rstrip() def __read_response(self):
return self.reader.readline().decode().rstrip()
def cmd(self, cmd):
self.sock.sendall(cmd.encode()) def cmd(self, cmd):
self.sock.send(b"\n") self.sock.sendall(cmd.encode())
resp = self.__read_response() self.sock.send(b"\n")
if len(resp) > 0: resp = self.__read_response()
raise Exception("Expected empty response but got "+resp) if len(resp) > 0:
raise Exception("Expected empty response but got "+resp)
def query(self, query):
self.sock.sendall(query.encode()) def query(self, query):
self.sock.send(b"\n") self.sock.sendall(query.encode())
return self.__read_response() self.sock.send(b"\n")
return self.__read_response()
@staticmethod
def parse_VNA_trace_data(data): @staticmethod
ret = [] def parse_VNA_trace_data(data):
# Remove brackets (order of data implicitly known) ret = []
data = data.replace(']','').replace('[','') # Remove brackets (order of data implicitly known)
values = data.split(',') data = data.replace(']','').replace('[','')
if int(len(values) / 3) * 3 != len(values): values = data.split(',')
# number of values must be a multiple of three (frequency, real, imaginary) if int(len(values) / 3) * 3 != len(values):
raise Exception("Invalid input data: expected tuples of three values each") # number of values must be a multiple of three (frequency, real, imaginary)
for i in range(0, len(values), 3): raise Exception("Invalid input data: expected tuples of three values each")
freq = float(values[i]) for i in range(0, len(values), 3):
real = float(values[i+1]) freq = float(values[i])
imag = float(values[i+2]) real = float(values[i+1])
ret.append((freq, complex(real, imag))) imag = float(values[i+2])
return ret ret.append((freq, complex(real, imag)))
return ret
@staticmethod
def parse_SA_trace_data(data): @staticmethod
ret = [] def parse_SA_trace_data(data):
# Remove brackets (order of data implicitly known) ret = []
data = data.replace(']','').replace('[','') # Remove brackets (order of data implicitly known)
values = data.split(',') data = data.replace(']','').replace('[','')
if int(len(values) / 2) * 2 != len(values): values = data.split(',')
# number of values must be a multiple of two (frequency, dBm) if int(len(values) / 2) * 2 != len(values):
raise Exception("Invalid input data: expected tuples of two values each") # number of values must be a multiple of two (frequency, dBm)
for i in range(0, len(values), 2): raise Exception("Invalid input data: expected tuples of two values each")
freq = float(values[i]) for i in range(0, len(values), 2):
dBm = float(values[i+1]) freq = float(values[i])
ret.append((freq, dBm)) dBm = float(values[i+1])
return ret ret.append((freq, dBm))
return ret