from __future__ import print_function import re import sys from time import sleep import visa # https://pyvisa.readthedocs.io/ import numpy as np # global settings visa_address = "TCPIP::192.168.2.37::INSTR" buffer_len = 1024**2 hires_channels = (2, 3) class Connection: def __init__(self, address, visa_backend='@py'): # default backend: pyvisa-py self.rm = visa.ResourceManager(visa_backend) self.scope = self.rm.open_resource(address, access_mode=visa.constants.AccessModes.exclusive_lock) self.scope.timeout = 5000 # 5 s def __del__(self): self.scope.close() self.rm.close() def identify(self): print(self.scope.query('*IDN?')) def save_screenshot(self, target_fname='screenshot.png', tmp_fname='C:/Temp.png'): self.scope.write('SAVE:IMAGe "{}"'.format(tmp_fname)) # Save image to instrument's local disk self.scope.query('*OPC?') # Wait for instrument to finish writing image to disk self.scope.write('FILESystem:READFile "{}"'.format(tmp_fname)) # Read image file from instrument sleep(0.5) img_data = self.scope.read_raw(buffer_len) with open(target_fname, 'w') as fout: fout.write(img_data) self.scope.write('FILESystem:DELEte "{}"'.format(tmp_fname)) # Delete temporary file def available_analog_channels(self): ret = self.scope.query('data:source:available?') if not ret: return [] channels = [int(c) for c in re.findall('CH(\\d', ret)] return channels def save_channels(self, out_fname, target_dt, *channels): data_all = [] data_dict = {} for channel in channels: channel = int(channel) print('getting channel', channel) d = self.get_channel(channel) sleep(0.5) data_all.append(d) if channel in hires_channels: data_dict['ch{}'.format(channel)] = d time, dt = self.get_time_axis() data_dict['time'] = time data_all.insert(0, time) data = np.column_stack(data_all) downsample = int(np.rint(float(target_dt) / dt)) if downsample > 1: print('downsampling by factor', downsample) data = data[::downsample,:] np.savetxt(out_fname, data, fmt=(['%.7e'] # time has high resolution + ['%.3e'] * (len(data_all)-1)# 16 bit float has only 3.3 decimal digits precision ), header=' '.join(['time'] + ['ch{}'.format(c) for c in channels])) np.savez_compressed('data_all_fullres.npz', **data_dict) def get_channel(self, channel_i, byte_width=2): """Based on script at https://www.tek.com/support/faqs/programing-how-get-and-plot-waveform-dpo-mso-mdo4000-series-scope-python """ self.scope.write('DATA:SOUrce CH{}'.format(channel_i)) self.scope.write('DATA:WIDTH {}'.format(byte_width)) self.scope.write('DATA:ENCdg RIBINARY') ymult = float(self.scope.query('WFMOUTPRE:YMULT?')) yzero = float(self.scope.query('WFMOUTPRE:YZERO?')) yoff = float(self.scope.query('WFMOUTPRE:YOFF?')) self.scope.write('CURVE?') data = self.scope.read_raw(buffer_len) headerlen = 2 + int(data[1]) header = data[:headerlen] # first byte is '#', second is length of string showing number of points data_wfm_str = data[headerlen:-1] # last byte is newline data_wfm = np.fromstring(data_wfm_str, dtype='>i{}'.format(byte_width)) data_volts = (data_wfm - yoff)*ymult + yzero return data_volts def get_time_axis(self): self.scope.write('DATA:SOUrce CH1') # TODO? to be sure we are taking analog xincr = float(self.scope.query('WFMOUTPRE:XINCR?')) xzero = float(self.scope.query('WFMOUTPRE:XZERO?')) N = int(self.scope.query('WFMOutpre:NR_Pt?')) time = np.arange(N)*xincr + xzero return time, xincr known_commands = set(filter(lambda n: not n.startswith('_'), dir(Connection))) if __name__ == '__main__': con = Connection(visa_address) try: command = sys.argv[1] except IndexError: print('supply command (Connection method name) as script argument') sys.exit(1) try: method = getattr(con, command) except AttributeError: print('Unkwnown command "{}", known commands: {}'.format(command, known_commands)) sys.exit(1) method(*sys.argv[2:]) sys.exit(0)