In [1]:
import os
import numpy as np
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
In [2]:
shot_no = 0
(Internal Tektronix format) .wfm parser¶
In [3]:
# wfm reader proof-of-concept
# https://www.tek.com/sample-license
# reads volts vs. time records (including fastframes) from little-endian version 3 WFM files
# See Also
# Performance Oscilloscope Reference Waveform File Format
# Tektronix part # 077-0220-10
# https://www.tek.com/oscilloscope/dpo7000-digital-phosphor-oscilloscope-manual-4
import struct
import numpy as np # http://www.numpy.org/
class WfmReadError(Exception):
"""error for unexpected things"""
pass
def read_wfm(target):
"""return sample data from target WFM file"""
with open(target, 'rb') as f:
hbytes = f.read(838)
meta = decode_header(hbytes)
# file signature checks
if meta['byte_order'] != 0x0f0f:
raise WfmReadError('big-endian not supported in this example')
if meta['version'] != b':WFM#003':
raise WfmReadError('only version 3 wfms supported in this example')
if meta['imp_dim_count'] != 1:
raise WfmReadError('imp dim count not 1')
if meta['exp_dim_count'] != 1:
raise WfmReadError('exp dim count not 1')
if meta['record_type'] != 2:
raise WfmReadError('not WFMDATA_VECTOR')
if meta['exp_dim_1_type'] != 0:
raise WfmReadError('not EXPLICIT_SAMPLE')
if meta['time_base_1'] != 0:
raise WfmReadError('not BASE_TIME')
tfrac_array = np.zeros(meta['Frames'], dtype=np.double)
tdatefrac_array = np.zeros(meta['Frames'], dtype=np.double)
tdate_array = np.zeros(meta['Frames'], dtype=np.int32)
tfrac_array[0] = meta['tfrac']
tdatefrac_array[0] = meta['tdatefrac']
tdate_array[0] = meta['tdate']
# if fastframe, read fastframe table
if meta['fastframe'] == 1:
WUSp = np.fromfile(f, dtype='i4,f8,f8,i4', count=(meta['Frames'] - 1))
# merge first frame trigger infos with frames > 1
tfrac_array[1:] = WUSp['f1']
tdatefrac_array[1:] = WUSp['f2']
tdate_array[1:] = WUSp['f3']
# read curve block
bin_wave = np.memmap(filename = f,
dtype = meta['dformat'],
mode = 'r',
offset = meta['curve_offset'],
shape = (meta['avilable_values'], meta['Frames']),
order = 'F')
# close file
# slice out buffer values
bin_wave = bin_wave[meta['pre_values']:meta['avilable_values'] - meta['post_values'],:]
scaled_array = bin_wave * meta['vscale'] + meta['voffset']
return scaled_array, meta['tstart'], meta['tscale'], tfrac_array, tdatefrac_array, tdate_array
def decode_header(header_bytes):
"""returns a dict of wfm metadata"""
wfm_info = {}
if len(header_bytes) != 838:
raise WfmReadError('wfm header bytes not 838')
wfm_info['byte_order'] = struct.unpack_from('H', header_bytes, offset=0)[0]
wfm_info['version'] = struct.unpack_from('8s', header_bytes, offset=2)[0]
wfm_info['imp_dim_count'] = struct.unpack_from('I', header_bytes, offset=114)[0]
wfm_info['exp_dim_count'] = struct.unpack_from('I', header_bytes, offset=118)[0]
wfm_info['record_type'] = struct.unpack_from('I', header_bytes, offset=122)[0]
wfm_info['exp_dim_1_type'] = struct.unpack_from('I', header_bytes, offset=244)[0]
wfm_info['time_base_1'] = struct.unpack_from('I', header_bytes, offset=768)[0]
wfm_info['fastframe'] = struct.unpack_from('I', header_bytes, offset=78)[0]
wfm_info['Frames'] = struct.unpack_from('I', header_bytes, offset=72)[0] + 1
wfm_info['summary_frame'] = struct.unpack_from('h', header_bytes, offset=154)[0]
wfm_info['curve_offset'] = struct.unpack_from('i', header_bytes, offset=16)[0] # 838 + ((frames - 1) * 54)
# scaling factors
wfm_info['vscale'] = struct.unpack_from('d', header_bytes, offset=168)[0]
wfm_info['voffset'] = struct.unpack_from('d', header_bytes, offset=176)[0]
wfm_info['tstart'] = struct.unpack_from('d', header_bytes, offset=496)[0]
wfm_info['tscale'] = struct.unpack_from('d', header_bytes, offset=488)[0]
# trigger detail
wfm_info['tfrac'] = struct.unpack_from('d', header_bytes, offset=788)[0] # frame index 0
wfm_info['tdatefrac'] = struct.unpack_from('d', header_bytes, offset=796)[0] # frame index 0
wfm_info['tdate'] = struct.unpack_from('I', header_bytes, offset=804)[0] # frame index 0
# data offsets
# frames are same size, only first frame offsets are used
dpre = struct.unpack_from('I', header_bytes, offset=822)[0]
wfm_info['dpre'] = dpre
dpost = struct.unpack_from('I', header_bytes, offset=826)[0]
wfm_info['dpost'] = dpost
readbytes = dpost - dpre
wfm_info['readbytes'] = readbytes
allbytes = struct.unpack_from('I', header_bytes, offset=830)[0]
wfm_info['allbytes'] = allbytes
# sample data type detection
code = struct.unpack_from('i', header_bytes, offset=240)[0]
wfm_info['code'] = code
bps = struct.unpack_from('b', header_bytes, offset=15)[0] # bytes-per-sample
wfm_info['bps'] = bps
if code == 7 and bps == 1:
dformat = 'int8'
samples = readbytes
elif code == 0 and bps == 2:
dformat = 'int16'
samples = readbytes // 2
elif code == 4 and bps == 4:
dformat = 'single'
samples = readbytes // 4
else:
raise WfmReadError('data type code or bytes-per-sample not understood')
wfm_info['dformat'] = dformat
wfm_info['samples'] = samples
wfm_info['avilable_values'] = allbytes // bps
wfm_info['pre_values'] = dpre // bps
wfm_info['post_values'] = (allbytes - dpost) // bps
return wfm_info
def readwfm(path):
volts, tstart, tscale, tfrac, tdatefrac, tdate = read_wfm(path)
toff = tfrac * tscale
samples, frames = volts.shape
tstop = samples * tscale + tstart
volts = volts.reshape(len(volts))
time = np.linspace(tstart+toff, tstop+toff, num=samples, endpoint=False)
time = time.reshape(len(volts))
return time,volts
Get Hi-Res data from MSO58 oscilloscope¶
In [4]:
# load data locally
fnames = ['DAS_raw_data_dir/ch2.wfm',
'DAS_raw_data_dir/ch3.wfm',
'DAS_raw_data_dir/ch4.wfm',
'DAS_raw_data_dir/ch5.wfm',
'DAS_raw_data_dir/ch6.wfm']
# or download from web (may be slow)
ds = np.DataSource('/tmp/')
if not os.path.isfile('DAS_raw_data_dir/ch6.wfm'):
for i, ch_no in enumerate(range(2,7)):
try:
file = ds.open(f'http://golem.fjfi.cvut.cz/shots/{shot_no}/Devices/Oscilloscopes/TektrMSO58-a/ch{ch_no}.wfm', mode = 'rb')
fnames[i] = file.name
file.close()
except (ConnectionError, FileNotFoundError):
print('Can not download data')
channels = [readwfm(fname) for fname in fnames]
channel_names = ["YaP", "CeBr-A", "CeBr-B", "NaI", "LYSO"]
Check sample rate
In [5]:
time = channels[0][0]
dt = time[1]-time[0]
MSamples = 1e-6/(dt)
downsample = int(np.rint(float(1e-6) / dt))
print(f"Data with {MSamples:.0f}MS/s")
Data with 250MS/s
Plot data¶
In [6]:
# use fancy colours
color_cycle = plt.rcParams['axes.prop_cycle'].by_key()['color']
# prepare figure with axes
fig, axes = plt.subplots(len(channels), sharex=True)
# plot each channel
for ax, channel, channel_name, color in zip(axes, channels, channel_names, color_cycle):
time, y = channel
# flip waveforms
if channel_name != 'LYSO':
y*= -1
ax.plot(time[::downsample],y[::downsample], label = channel_name, color = color)
# show legend
fig.legend()
plt.savefig('icon-fig.png')
In [7]:
mean_noise_level = []
fig, axes = plt.subplots(len(channels), sharex=True)
for ax, channel, channel_name, color in zip(axes, channels, channel_names, color_cycle):
time, y = channel
mask = time < -0.001
noise = y[mask]
zero_shift = np.mean(noise)
y-=zero_shift
noise = y[mask]
mean_noise = np.sqrt(np.mean(noise**2))
ax.plot(time,y, label = channel_name, color = color)
ax.axhline(mean_noise)
mean_noise_level.append(mean_noise)
In [8]:
plasma_URL = "http://golem.fjfi.cvut.cz/shots/{}/Diagnostics/PlasmaDetection/Results/{}"
try:
t_plasma_start = ds.open(plasma_URL.format(shot_no, "t_plasma_start"))
t_plasma_start = np.loadtxt(t_plasma_start)/1000
t_plasma_end = ds.open(plasma_URL.format(shot_no, "t_plasma_end"))
t_plasma_end = np.loadtxt(t_plasma_end)/1000
except OSError:
t_plasma_start = None
t_plasma_end = None
In [9]:
peak_count = []
fig, axes = plt.subplots(len(channels), sharex=True)
for ax, channel, channel_name, noise_level, color in zip(axes, channels, channel_names, mean_noise_level, color_cycle):
time, y = channel
peak_thresh = noise_level*4
peak_loc,_ = find_peaks(y,height=peak_thresh, prominence = noise_level*2)
if t_plasma_start is not None and t_plasma_end is not None:
mask_in_plasma = np.bitwise_and(time[peak_loc] > t_plasma_start,time[peak_loc] < t_plasma_end)
peak_loc = peak_loc[mask_in_plasma].copy()
ax.plot(time,y, label = channel_name, color = color)
ax.axhline(peak_thresh)
ax.scatter(time[peak_loc],y[peak_loc])
peak_count.append(len(peak_loc))
In [10]:
for pc, name in zip(peak_count, channel_names):
print(f"{name}\t{pc}")
YaP 78540 CeBr-A 181987 CeBr-B 73636 NaI 109252 LYSO 26156
In [11]:
RE_energy_estimate = 500 #keV
photon_yd_per_keV = [18,60,60,38,30]
for pc, name, y in zip(peak_count, channel_names, photon_yd_per_keV):
p_estimate = pc * y * RE_energy_estimate
print(f"{name}\t{p_estimate:,d}")
YaP 706,860,000 CeBr-A 5,459,610,000 CeBr-B 2,209,080,000 NaI 2,075,788,000 LYSO 392,340,000