Simple python script for demonstrating SCPI programming

This commit is contained in:
Jan Käberich 2021-04-15 20:06:23 +02:00
parent aebe92111b
commit 9db83a608d
3 changed files with 132 additions and 0 deletions

View File

@ -0,0 +1,4 @@
venv/
__pycache__
.idea

View File

@ -0,0 +1,82 @@
import socket
from asyncio import IncompleteReadError # only import the exception class
class SocketStreamReader:
def __init__(self, sock: socket.socket):
self._sock = sock
self._recv_buffer = bytearray()
def read(self, num_bytes: int = -1) -> bytes:
raise NotImplementedError
def readexactly(self, num_bytes: int) -> bytes:
buf = bytearray(num_bytes)
pos = 0
while pos < num_bytes:
n = self._recv_into(memoryview(buf)[pos:])
if n == 0:
raise IncompleteReadError(bytes(buf[:pos]), num_bytes)
pos += n
return bytes(buf)
def readline(self) -> bytes:
return self.readuntil(b"\n")
def readuntil(self, separator: bytes = b"\n") -> bytes:
if len(separator) != 1:
raise ValueError("Only separators of length 1 are supported.")
chunk = bytearray(4096)
start = 0
buf = bytearray(len(self._recv_buffer))
bytes_read = self._recv_into(memoryview(buf))
assert bytes_read == len(buf)
while True:
idx = buf.find(separator, start)
if idx != -1:
break
start = len(self._recv_buffer)
bytes_read = self._recv_into(memoryview(chunk))
buf += memoryview(chunk)[:bytes_read]
result = bytes(buf[: idx + 1])
self._recv_buffer = b"".join(
(memoryview(buf)[idx + 1 :], self._recv_buffer)
)
return result
def _recv_into(self, view: memoryview) -> int:
bytes_read = min(len(view), len(self._recv_buffer))
view[:bytes_read] = self._recv_buffer[:bytes_read]
self._recv_buffer = self._recv_buffer[bytes_read:]
if bytes_read == len(view):
return bytes_read
bytes_read += self._sock.recv_into(view[bytes_read:])
return bytes_read
class libreVNA:
def __init__(self, host='localhost', port=19542):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sock.connect((host, port))
except:
raise Exception("Unable to connect to LibreVNA-GUI. Make sure it is running and the TCP server is enabled.")
self.reader = SocketStreamReader(self.sock)
def __del__(self):
self.sock.close()
def __read_response(self):
return self.reader.readline().decode().rstrip()
def cmd(self, cmd):
self.sock.sendall(cmd.encode())
self.sock.send(b"\n")
self.__read_response()
def query(self, query):
self.sock.sendall(query.encode())
self.sock.send(b"\n")
return self.__read_response()

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
import time
from libreVNA import libreVNA
# Create the control instance
vna = libreVNA('localhost', 19542)
# Quick connection check (should print "LibreVNA-GUI")
print(vna.query("*IDN?"))
# Make sure we are connecting to a device (just to be sure, with default settings the LibreVNA-GUI auto-connects)
vna.cmd(":DEV:CONN")
dev = vna.query(":DEV:CONN?")
if dev == "Not connected":
print("Not connected to any device, aborting")
exit(-1)
else:
print("Connected to "+dev)
# Simple generator demo
# switch to generator
vna.cmd(":DEV:MODE GEN")
# set the output level
vna.cmd(":GEN:LVL -20")
# set initial frequency and enable port 1
print("Generating signal with 1GHz at port 1")
vna.cmd(":GEN:FREQ 1000000000")
vna.cmd(":GEN:PORT 1")
try:
while True:
time.sleep(2)
print("Setting frequency to 1.5GHz")
vna.cmd(":GEN:FREQ 1500000000")
time.sleep(2)
print("Setting frequency to 1.0GHz")
vna.cmd(":GEN:FREQ 1000000000")
except KeyboardInterrupt:
# turn off generator
vna.cmd(":GEN:PORT 0")
exit(0)