In [1]:
import os, math, requests
from dataclasses import dataclass
import numpy as np
import pandas as pd
from numba import vectorize, float64, boolean, njit
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import holoviews as hv
from IPython.display import Markdown
from holoviews.operation import decimate

hv.extension('bokeh', logo = False)
#hv.extension('matplotlib', logo = False)
In [2]:
# turn off numpy warning for overflow in np.exp()
np.seterr(over='ignore');
In [3]:
shot_no = 0

(Internal Tektronix format) .wfm parser¶

In [4]:
# wfm reader proof-of-concept
# https://www.tek.com/sample-license
# reads volts vs. time records (including fastframes) from little-endian version 3 WFM files

# See Also
# Performance Oscilloscope Reference Waveform File Format
# Tektronix part # 077-0220-10
# https://www.tek.com/oscilloscope/dpo7000-digital-phosphor-oscilloscope-manual-4

import struct
import numpy as np # http://www.numpy.org/

class WfmReadError(Exception):
    """error for unexpected things"""
    pass

def read_wfm(target):
    """return sample data from target WFM file"""
    with open(target, 'rb') as f:
        hbytes = f.read(838)
        meta = decode_header(hbytes)
        # file signature checks
        if meta['byte_order'] != 0x0f0f:
            raise WfmReadError('big-endian not supported in this example')
        if meta['version'] != b':WFM#003':
            raise WfmReadError('only version 3 wfms supported in this example')
        if meta['imp_dim_count'] != 1:
            raise WfmReadError('imp dim count not 1')
        if meta['exp_dim_count'] != 1:
            raise WfmReadError('exp dim count not 1')
        if meta['record_type'] != 2:
            raise WfmReadError('not WFMDATA_VECTOR')
        if meta['exp_dim_1_type'] != 0:
            raise WfmReadError('not EXPLICIT_SAMPLE')
        if meta['time_base_1'] != 0:
            raise WfmReadError('not BASE_TIME')
        tfrac_array = np.zeros(meta['Frames'], dtype=np.double)
        tdatefrac_array = np.zeros(meta['Frames'], dtype=np.double)
        tdate_array = np.zeros(meta['Frames'], dtype=np.int32)
        tfrac_array[0] = meta['tfrac']
        tdatefrac_array[0] = meta['tdatefrac']
        tdate_array[0] = meta['tdate']
        # if fastframe, read fastframe table
        if meta['fastframe'] == 1:
            WUSp = np.fromfile(f, dtype='i4,f8,f8,i4', count=(meta['Frames'] - 1))
            # merge first frame trigger infos with frames > 1
            tfrac_array[1:] = WUSp['f1']
            tdatefrac_array[1:] = WUSp['f2']
            tdate_array[1:] = WUSp['f3']
        # read curve block
        bin_wave = np.memmap(filename = f,
                             dtype = meta['dformat'],
                             mode = 'r',
                             offset = meta['curve_offset'],
                             shape = (meta['avilable_values'], meta['Frames']),
                             order = 'F')
        # close file
    # slice out buffer values
    bin_wave = bin_wave[meta['pre_values']:meta['avilable_values'] - meta['post_values'],:]
    scaled_array = bin_wave * meta['vscale'] + meta['voffset']
    return scaled_array, meta['tstart'], meta['tscale'], tfrac_array, tdatefrac_array, tdate_array

def decode_header(header_bytes):
    """returns a dict of wfm metadata"""
    wfm_info = {}
    if len(header_bytes) != 838:
        raise WfmReadError('wfm header bytes not 838')
    wfm_info['byte_order'] = struct.unpack_from('H', header_bytes, offset=0)[0]
    wfm_info['version'] = struct.unpack_from('8s', header_bytes, offset=2)[0]
    wfm_info['imp_dim_count'] = struct.unpack_from('I', header_bytes, offset=114)[0]
    wfm_info['exp_dim_count'] = struct.unpack_from('I', header_bytes, offset=118)[0]
    wfm_info['record_type'] = struct.unpack_from('I', header_bytes, offset=122)[0]
    wfm_info['exp_dim_1_type'] = struct.unpack_from('I', header_bytes, offset=244)[0]
    wfm_info['time_base_1'] = struct.unpack_from('I', header_bytes, offset=768)[0]
    wfm_info['fastframe'] = struct.unpack_from('I', header_bytes, offset=78)[0]
    wfm_info['Frames'] = struct.unpack_from('I', header_bytes, offset=72)[0] + 1
    wfm_info['summary_frame'] = struct.unpack_from('h', header_bytes, offset=154)[0]
    wfm_info['curve_offset'] = struct.unpack_from('i', header_bytes, offset=16)[0] # 838 + ((frames - 1) * 54)
    # scaling factors
    wfm_info['vscale'] = struct.unpack_from('d', header_bytes, offset=168)[0]
    wfm_info['voffset'] = struct.unpack_from('d', header_bytes, offset=176)[0]
    wfm_info['tstart'] = struct.unpack_from('d', header_bytes, offset=496)[0]
    wfm_info['tscale'] = struct.unpack_from('d', header_bytes, offset=488)[0]
    # trigger detail
    wfm_info['tfrac'] = struct.unpack_from('d', header_bytes, offset=788)[0] # frame index 0
    wfm_info['tdatefrac'] = struct.unpack_from('d', header_bytes, offset=796)[0] # frame index 0
    wfm_info['tdate'] = struct.unpack_from('I', header_bytes, offset=804)[0] # frame index 0
    # data offsets
    # frames are same size, only first frame offsets are used
    dpre = struct.unpack_from('I', header_bytes, offset=822)[0]
    wfm_info['dpre'] = dpre
    dpost = struct.unpack_from('I', header_bytes, offset=826)[0]
    wfm_info['dpost'] = dpost
    readbytes = dpost - dpre
    wfm_info['readbytes'] = readbytes
    allbytes = struct.unpack_from('I', header_bytes, offset=830)[0]
    wfm_info['allbytes'] = allbytes
    # sample data type detection
    code = struct.unpack_from('i', header_bytes, offset=240)[0]
    wfm_info['code'] = code
    bps = struct.unpack_from('b', header_bytes, offset=15)[0]  # bytes-per-sample
    wfm_info['bps'] = bps
    if code == 7 and bps == 1:
        dformat = 'int8'
        samples = readbytes
    elif code == 0 and bps == 2:
        dformat = 'int16'
        samples = readbytes // 2
    elif code == 4 and bps == 4:
        dformat = 'single'
        samples = readbytes // 4
    else:
        raise WfmReadError('data type code or bytes-per-sample not understood')
    wfm_info['dformat'] = dformat
    wfm_info['samples'] = samples
    wfm_info['avilable_values'] = allbytes // bps
    wfm_info['pre_values'] = dpre // bps
    wfm_info['post_values'] = (allbytes - dpost) // bps
    return wfm_info

def readwfm(path):
    volts, tstart, tscale, tfrac, tdatefrac, tdate = read_wfm(path)
    toff = tfrac * tscale
    samples, frames = volts.shape
    tstop = samples * tscale + tstart
    volts = volts.reshape(len(volts))
    time = np.linspace(tstart+toff, tstop+toff, num=samples, endpoint=False)
    time = time.reshape(len(volts))
    
    return time,volts

Get Hi-Res data from MSO58 oscilloscope¶

In [5]:
err = None
# load data locally
fnames = ['DAS_raw_data_dir/ch1.wfm',
          'DAS_raw_data_dir/ch2.wfm',
          'DAS_raw_data_dir/ch3.wfm',
          'DAS_raw_data_dir/ch4.wfm',
          'DAS_raw_data_dir/ch5.wfm',
          'DAS_raw_data_dir/ch6.wfm',
          'DAS_raw_data_dir/ch7.wfm']
ds = np.DataSource()

if not os.path.isfile('DAS_raw_data_dir/ch6.wfm'):
        # or download from web (slow)
        for i, ch_no in enumerate(range(1,8)):
            try:
                file = ds.open(f'http://golem.fjfi.cvut.cz/shots/{shot_no}/Devices/Oscilloscopes/TektrMSO58-a/ch{ch_no}.wfm', mode = 'rb')
                fnames[i] = file.name
                file.close()
            except (ConnectionError, FileNotFoundError) as e:
                err = Markdown('### Can not download data')
if err is None:
    t_vec, y_ch1 = readwfm(fnames[0])        
    _, y_ch2 = readwfm(fnames[1])
    _, y_ch3 = readwfm(fnames[2])
    _, y_ch4 = readwfm(fnames[3])
    _, y_ch5 = readwfm(fnames[4])
    _, y_ch6 = readwfm(fnames[5])
    _, y_ch7 = readwfm(fnames[6])
else:
    err
    
channels = [y_ch1, y_ch2, y_ch3, y_ch4, y_ch5, y_ch6, y_ch7]

Check if data has sufficient sample rate

In [6]:
dt = t_vec[1]-t_vec[0]
MSamples =  1e-6/(dt) 

downsample = int(np.rint(float(1e-6) / dt))

if MSamples >= 120:
    out = Markdown(f'### Data with {MSamples:.1f}MS/s resolution') 
    low_res_data = False
else:
    out = Markdown('### Low resolution for peak recononstruction \n adjust oscilloscope settings')
    low_res_data = True
out
Out[6]:

Data with 250.0MS/s resolution¶

Plot OSC waveforms and peak thresholds¶

In [7]:
color_cycle = plt.rcParams['axes.prop_cycle'].by_key()['color']
fig, axes = plt.subplots(len(channels), sharex  =True)
for ax,ch,color in zip(axes,channels, color_cycle):
    ax.plot(t_vec[::downsample]*1e3, ch[::downsample], color=color)
    ax.grid()

fig.legend();
plt.savefig('icon-fig.png')
No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
No description has been provided for this image