from __future__ import print_function
import re
import sys
from time import sleep
import visa # https://pyvisa.readthedocs.io/
import numpy as np
# global settings
visa_address = "TCPIP::192.168.2.37::INSTR"
buffer_len = 1024**2
hires_channels = (2, 3)
class Connection:
def __init__(self, address, visa_backend='@py'): # default backend: pyvisa-py
self.rm = visa.ResourceManager(visa_backend)
self.scope = self.rm.open_resource(address, access_mode=visa.constants.AccessModes.exclusive_lock)
self.scope.timeout = 5000 # 5 s
def __del__(self):
self.scope.close()
self.rm.close()
def identify(self):
print(self.scope.query('*IDN?'))
def save_screenshot(self, target_fname='screenshot.png', tmp_fname='C:/Temp.png'):
self.scope.write('SAVE:IMAGe "{}"'.format(tmp_fname)) # Save image to instrument's local disk
self.scope.query('*OPC?') # Wait for instrument to finish writing image to disk
self.scope.write('FILESystem:READFile "{}"'.format(tmp_fname)) # Read image file from instrument
sleep(0.5)
img_data = self.scope.read_raw(buffer_len)
with open(target_fname, 'w') as fout:
fout.write(img_data)
self.scope.write('FILESystem:DELEte "{}"'.format(tmp_fname)) # Delete temporary file
def available_analog_channels(self):
ret = self.scope.query('data:source:available?')
if not ret:
return []
channels = [int(c) for c in re.findall('CH(\\d', ret)]
return channels
def save_channels(self, out_fname, target_dt, *channels):
data_all = []
data_dict = {}
for channel in channels:
channel = int(channel)
print('getting channel', channel)
d = self.get_channel(channel)
sleep(0.5)
data_all.append(d)
if channel in hires_channels:
data_dict['ch{}'.format(channel)] = d
time, dt = self.get_time_axis()
data_dict['time'] = time
data_all.insert(0, time)
data = np.column_stack(data_all)
downsample = int(np.rint(float(target_dt) / dt))
if downsample > 1:
print('downsampling by factor', downsample)
data = data[::downsample,:]
np.savetxt(out_fname, data,
fmt=(['%.7e'] # time has high resolution
+ ['%.3e'] * (len(data_all)-1)# 16 bit float has only 3.3 decimal digits precision
),
header=' '.join(['time'] + ['ch{}'.format(c) for c in channels]))
np.savez_compressed('data_all_fullres.npz', **data_dict)
def get_channel(self, channel_i, byte_width=2):
"""Based on script at
https://www.tek.com/support/faqs/programing-how-get-and-plot-waveform-dpo-mso-mdo4000-series-scope-python
"""
self.scope.write('DATA:SOUrce CH{}'.format(channel_i))
self.scope.write('DATA:WIDTH {}'.format(byte_width))
self.scope.write('DATA:ENCdg RIBINARY')
ymult = float(self.scope.query('WFMOUTPRE:YMULT?'))
yzero = float(self.scope.query('WFMOUTPRE:YZERO?'))
yoff = float(self.scope.query('WFMOUTPRE:YOFF?'))
self.scope.write('CURVE?')
data = self.scope.read_raw(buffer_len)
headerlen = 2 + int(data[1])
header = data[:headerlen]
# first byte is '#', second is length of string showing number of points
data_wfm_str = data[headerlen:-1] # last byte is newline
data_wfm = np.fromstring(data_wfm_str, dtype='>i{}'.format(byte_width))
data_volts = (data_wfm - yoff)*ymult + yzero
return data_volts
def get_time_axis(self):
self.scope.write('DATA:SOUrce CH1') # TODO? to be sure we are taking analog
xincr = float(self.scope.query('WFMOUTPRE:XINCR?'))
xzero = float(self.scope.query('WFMOUTPRE:XZERO?'))
N = int(self.scope.query('WFMOutpre:NR_Pt?'))
time = np.arange(N)*xincr + xzero
return time, xincr
known_commands = set(filter(lambda n: not n.startswith('_'), dir(Connection)))
if __name__ == '__main__':
con = Connection(visa_address)
try:
command = sys.argv[1]
except IndexError:
print('supply command (Connection method name) as script argument')
sys.exit(1)
try:
method = getattr(con, command)
except AttributeError:
print('Unkwnown command "{}", known commands: {}'.format(command, known_commands))
sys.exit(1)
method(*sys.argv[2:])
sys.exit(0)