# Sandbox/Dirigent/main.py

from __future__ import print_function
import re
import sys
from time import sleep
import visa 								# https://pyvisa.readthedocs.io/
import numpy as np

# Module-wide configuration
# VISA resource string handed to Connection() by the command-line entry point.
visa_address = 'TCPIP::192.168.2.37::INSTR'
# Size argument given to read_raw() when fetching binary data (1 MiB).
buffer_len = 1 << 20
# Channels whose data save_channels() also stores in the .npz archive.
hires_channels = (2, 3)


class Connection(object):
    """Session with an oscilloscope opened through PyVISA.

    Every public method (name without a leading underscore) can be invoked
    from the command line by passing its name as the first script argument.
    The instrument commands follow the Tektronix example script linked in
    :meth:`get_channel`.
    """

    def __init__(self, address, visa_backend='@py'):  # default backend: pyvisa-py
        """Open the instrument at *address* with an exclusive lock.

        Parameters
        ----------
        address : str
            VISA resource string, e.g. ``"TCPIP::192.168.2.37::INSTR"``.
        visa_backend : str
            Backend specifier passed to ``visa.ResourceManager``.
        """
        self.rm = visa.ResourceManager(visa_backend)
        self.scope = self.rm.open_resource(
            address, access_mode=visa.constants.AccessModes.exclusive_lock)
        self.scope.timeout = 5000  # 5 s

    def __del__(self):
        """Close the instrument session and the resource manager."""
        # __init__ may have failed part-way (e.g. the instrument was not
        # reachable), in which case one or both attributes do not exist.
        for attr_name in ('scope', 'rm'):
            handle = getattr(self, attr_name, None)
            if handle is not None:
                handle.close()

    def identify(self):
        """Print the instrument's reply to the ``*IDN?`` query."""
        print(self.scope.query('*IDN?'))

    def save_screenshot(self, target_fname='screenshot.png', tmp_fname='C:/Temp.png'):
        """Save a screenshot of the instrument display to a local file.

        Parameters
        ----------
        target_fname : str
            Local path the image is written to.
        tmp_fname : str
            Temporary path on the instrument's own file system; the file is
            deleted again once it has been transferred.
        """
        # Save image to instrument's local disk
        self.scope.write('SAVE:IMAGe "{}"'.format(tmp_fname))
        # Wait for instrument to finish writing image to disk
        self.scope.query('*OPC?')
        # Read image file from instrument
        self.scope.write('FILESystem:READFile "{}"'.format(tmp_fname))
        sleep(0.5)
        img_data = self.scope.read_raw(buffer_len)
        # read_raw() returns raw bytes, so the file must be opened in binary
        # mode (text mode fails on Python 3 and mangles newlines on Windows).
        with open(target_fname, 'wb') as fout:
            fout.write(img_data)
        # Delete temporary file
        self.scope.write('FILESystem:DELEte "{}"'.format(tmp_fname))

    def available_analog_channels(self):
        """Return the numbers of the channels reported as available.

        Returns
        -------
        list of int
            Every ``CH<n>`` number found in the reply to
            ``data:source:available?``; empty if the reply is empty.
        """
        ret = self.scope.query('data:source:available?')
        if not ret:
            return []
        return [int(number) for number in re.findall(r'CH(\d+)', ret)]

    def save_channels(self, out_fname, target_dt, *channels):
        """Fetch the given channels and store them as a text table.

        The table written to *out_fname* has the time axis in the first
        column followed by one column per channel, optionally downsampled.
        Additionally the time axis and the channels listed in the module
        level ``hires_channels`` are stored, without downsampling, in
        ``data_all_fullres.npz`` in the current directory.

        Parameters
        ----------
        out_fname : str
            Path of the text file to write.
        target_dt : float or str
            Desired sampling period of the text output; the data are
            decimated by ``round(target_dt / dt)`` when that exceeds 1.
        *channels : int or str
            Channel numbers to fetch (strings are converted with ``int``).
        """
        channel_data = []
        fullres = {}
        for channel in channels:
            channel = int(channel)
            print('getting channel', channel)
            volts = self.get_channel(channel)
            sleep(0.5)
            channel_data.append(volts)
            if channel in hires_channels:
                fullres['ch{}'.format(channel)] = volts
        time_axis, dt = self.get_time_axis()
        fullres['time'] = time_axis

        data = np.column_stack([time_axis] + channel_data)
        downsample = int(np.rint(float(target_dt) / dt))
        if downsample > 1:
            print('downsampling by factor', downsample)
            data = data[::downsample, :]
        np.savetxt(
            out_fname, data,
            fmt=(['%.7e']  # time has high resolution
                 # 16 bit float has only 3.3 decimal digits precision
                 + ['%.3e'] * len(channel_data)),
            header=' '.join(['time'] + ['ch{}'.format(c) for c in channels]))
        np.savez_compressed('data_all_fullres.npz', **fullres)

    def get_channel(self, channel_i, byte_width=2):
        """Fetch the waveform of one channel, converted with the scale factors.

        Based on script at
        https://www.tek.com/support/faqs/programing-how-get-and-plot-waveform-dpo-mso-mdo4000-series-scope-python

        Parameters
        ----------
        channel_i : int
            Channel number; ``CH<channel_i>`` is selected as data source.
        byte_width : int
            Number of bytes per sample requested from the instrument.

        Returns
        -------
        numpy.ndarray
            ``(raw - YOFF) * YMULT + YZERO`` for every sample.
        """
        self.scope.write('DATA:SOUrce CH{}'.format(channel_i))
        self.scope.write('DATA:WIDTH {}'.format(byte_width))
        self.scope.write('DATA:ENCdg RIBINARY')

        ymult = float(self.scope.query('WFMOUTPRE:YMULT?'))
        yzero = float(self.scope.query('WFMOUTPRE:YZERO?'))
        yoff = float(self.scope.query('WFMOUTPRE:YOFF?'))

        self.scope.write('CURVE?')
        raw_block = self.scope.read_raw(buffer_len)
        data_wfm = self._parse_curve_block(raw_block, byte_width)

        return (data_wfm - yoff) * ymult + yzero

    @staticmethod
    def _parse_curve_block(data, byte_width=2):
        """Decode a binary block reply into big-endian signed integers.

        The block layout is ``#<N><N digits: payload byte count><payload>``
        followed by a terminating newline.

        Parameters
        ----------
        data : bytes
            Raw reply as returned by ``read_raw``.
        byte_width : int
            Number of bytes per sample.

        Returns
        -------
        numpy.ndarray
            Read-only integer array of the raw sample values.
        """
        # Slice instead of index: on Python 3 ``data[1]`` is the byte's
        # ordinal (e.g. 53 for b'5'), whereas ``int(data[1:2])`` parses the
        # ASCII digit on both Python 2 and Python 3.
        n_digits = int(data[1:2])
        header_len = 2 + n_digits
        if n_digits:
            # Trust the declared byte count rather than assuming exactly one
            # trailing terminator byte.
            n_bytes = int(data[2:header_len])
            payload = data[header_len:header_len + n_bytes]
        else:
            # No length given in the header: everything up to the final
            # newline is payload.
            payload = data[header_len:-1]
        return np.frombuffer(payload, dtype='>i{}'.format(byte_width))

    def get_time_axis(self):
        """Return the time axis of the current waveform and its sample spacing.

        Returns
        -------
        (numpy.ndarray, float)
            ``arange(NR_Pt) * XINCR + XZERO`` and ``XINCR``.
        """
        self.scope.write('DATA:SOUrce CH1')  # TODO? to be sure we are taking analog
        xincr = float(self.scope.query('WFMOUTPRE:XINCR?'))
        xzero = float(self.scope.query('WFMOUTPRE:XZERO?'))
        n_points = int(self.scope.query('WFMOutpre:NR_Pt?'))

        time_axis = np.arange(n_points) * xincr + xzero
        return time_axis, xincr




# Names of all Connection attributes that do not start with an underscore;
# used by the command-line entry point below.
known_commands = {name for name in dir(Connection) if not name.startswith('_')}


if __name__ == '__main__':
    # Usage: <script> COMMAND [ARGS...]
    # COMMAND is the name of a public Connection method; ARGS are passed to it
    # as positional string arguments.
    try:
        command = sys.argv[1]
    except IndexError:
        print('supply command (Connection method name) as script argument')
        sys.exit(1)
    # Validate the command before connecting, so that a typo does not open
    # (and exclusively lock) the instrument, and so that private attributes
    # such as ``scope`` or ``__del__`` cannot be invoked from the command line.
    if command not in known_commands:
        print('Unknown command "{}", known commands: {}'.format(
            command, ', '.join(sorted(known_commands))))
        sys.exit(1)
    con = Connection(visa_address)
    method = getattr(con, command)
    method(*sys.argv[2:])
    sys.exit(0)