import os
from dataclasses import dataclass
import numpy as np
import pandas as pd
from numba import vectorize, float64, boolean
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import holoviews as hv
from IPython.display import Markdown
from holoviews.operation import decimate
#from holoviews.operation.datashader import rasterize
hv.extension('bokeh', logo = False)
# turn off numpy warning for overflow in np.exp()
np.seterr(over='ignore');
{'divide': 'warn', 'over': 'warn', 'under': 'ignore', 'invalid': 'warn'}
shot_no = 42181
# load data locally
if os.path.isfile('DAS_raw_data_dir/data_all_fullres.npz'):
full_res_data = np.load('DAS_raw_data_dir/data_all_fullres.npz')
else:
try:
# or download from web (slow)
ds = np.DataSource()
file = ds.open(f'http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/ScintillationProbes/TunklSFP23/DAS_raw_data_dir/data_all_fullres.npz',mode = 'rb')
full_res_data = np.load(file)
except ConnectionError:
err = Markdown('### Can not download data')
Check if data has sufficient sample rate
time = full_res_data['time']
dt = time[1]-time[0]
MSamples = 1e-6/(dt)
downsample = int(np.rint(float(1e-6) / dt))
if MSamples >= 120:
out = Markdown(f'### Data with {MSamples:.1f}MS/s resolution')
exit_notebook = False
else:
out = Markdown('### Low resolution for peak recononstruction \n adjust oscilloscope settings')
exit_notebook = True
out
if exit_notebook:
raise UserWarning('Exiting')
# class for storing channels and their parameters and results
@dataclass
class channel:
name : str
time : np.ndarray
y : np.ndarray
noPlot : bool = False
## parameters for peak detection
mean_noise_threshold_mult : float = 3.5
peak_prominence_thr : float = 0.001
# peak rise / fall time
rise_time : float = None
fall_time : float = None
# calibration
calibration : float = None # keV/V
# Flip waveform of PMTs but no SiPMs
flip_waveform : bool = True
## parameters recieved from further analysis
mean_channel_noise : float = 0
# threshold
threshold : float = 0
# arrays for results
no_corr_peak_height : np.ndarray = None
simple_peak_loc : np.ndarray = None
simple_corr_peak_height : np.ndarray = None
multipass_peak_loc : np.ndarray = None
multipass_corr_peak_height : np.ndarray = None
y_diff : np.ndarray = None
# set channel and their parameters
ch2 = channel('YAP', time, full_res_data['ch2'], rise_time=2.740e-09, fall_time=3.460e-08, calibration=46747.)
ch3 = channel('CeBr-a', time, full_res_data['ch3'], rise_time=1.457e-09, fall_time=2.754e-08, calibration=13817.)
ch4 = channel('CeBr-b', time, full_res_data['ch4'], rise_time=3.409e-09, fall_time=2.785e-08, calibration=6714.)
ch5 = channel('NaI', time, full_res_data['ch5'])
ch6 = channel('LYSO', time, full_res_data['ch6'], rise_time=6.309e-09, fall_time=5.311e-08, calibration=8065., flip_waveform=False)
channels = [ch2,ch3,ch4,ch5,ch6]
def get_noise_level(t,y_vals,noise_time):
y_noise = y_vals[t < noise_time]
y_noise = np.nan_to_num(y_noise, posinf = 0, neginf = 0)
mean_y = np.mean(y_noise)
# mean noise amplitude
abs_noise = np.abs(y_noise - mean_y)**2
mean_noise_ampl = np.sqrt(np.mean(abs_noise))
return mean_y, mean_noise_ampl
def moving_average(x, w):
return np.convolve(x, np.ones(w), 'save') / w
@vectorize(['float64(float64, boolean)'])
def half_exp(x, where):
'''compute exp function only for selected indices (to avoid infinite/large values)'''
if where:
return np.exp(x)
else:
return 0.0
def get_index(oneDArray, val):
diff = np.abs(oneDArray - val)
return np.argmin(diff)
def exp_decay(t, height, center, rise, fall):
'''smooth peak fucntion from https://arxiv.org/pdf/1706.01211.pdf eq:(2)'''
# bool mask where compute exp functions (without it results in +-inf values)
exp_window_split_index = get_index(t, center-100*rise)
exp_window = np.zeros_like(t, np.bool_)
exp_window[exp_window_split_index:].fill(True)
# scaling and shifting of peak fucntion
A = height / ( (fall-rise)/fall * (fall/rise - 1 )**(-rise/fall) )
peak_center_shift = rise*np.log((fall-rise)/rise)
center = center - peak_center_shift
peak = A * 1/(1+half_exp(-(t-center)/rise, exp_window)) * half_exp(-(t-center)/fall, exp_window)
return peak
color_cycle = plt.rcParams['axes.prop_cycle'].by_key()['color']
fig, axes = plt.subplots(len(channels), sharex =True)
[ax.plot(ch.time,ch.y, label = ch.name, color = c) for ax, ch, c in zip(axes, channels, color_cycle)]
[ax.axhline(1., color = 'red', ls = '--') for ax, ch in zip(axes, channels) if not ch.flip_waveform and np.any(ch.y > 1.)]
fig.legend()
is_clipping = [np.any(ch.y > 1.) for ch in channels if not ch.flip_waveform]
if np.any(is_clipping):
warning = Markdown("""### SiPM's integrated amplifier saturated
HXR spectrum reconstruction from whole discharge may be unreliable
""")
display(warning)
def gen_peaks(time, HXR_y, peaks_loc, rise_time, fall_time, test_diff = True):
corrected_peak_heights = np.zeros_like(peaks_loc, float)
HXR_generated = np.zeros_like(HXR_y)
diff = HXR_y.copy()
discarded_peaks = np.zeros_like(peaks_loc, np.bool_)
# operate only on small slice of waveform (it's faster)
dt = np.mean(np.diff(time))
i_slice = max(int(fall_time*100 / dt),1000)
for n, peak in enumerate(peaks_loc):
# use differece between generated and original waveform to obtain peak height
peak_height = diff[peak]
peak_center = time[peak]
slice_idx_left = max(peak-i_slice,0)
slice_idx_right = min(peak+i_slice,time.shape[0])
gen_slice_time = time[slice_idx_left:slice_idx_right]
if peak_height < 0.:
corrected_peak_heights[n] = 0.
continue
# generate peak
generated_peak = exp_decay(gen_slice_time, peak_height, peak_center, rise_time, fall_time)
# if adding this peak increases differance btw original and generated signal
# discard this peak as false detection
diff_test = diff[slice_idx_left:slice_idx_right].copy()
diff_test -= generated_peak
if test_diff and np.sum(np.abs(diff_test)) > np.sum(np.abs(diff[slice_idx_left:slice_idx_right])):
corrected_peak_heights[n] = 0.
discarded_peaks[n] = True
else:
# subtract peak from signal and record peak height
diff[slice_idx_left:slice_idx_right] -= generated_peak
HXR_generated[slice_idx_left:slice_idx_right] += generated_peak
corrected_peak_heights[n] = peak_height
return corrected_peak_heights, HXR_generated, diff, discarded_peaks
for ch in channels:
noise, ch.mean_channel_noise = get_noise_level(ch.time, ch.y, 0.000)
ch.y = ch.y - noise
ch.threshold = ch.mean_channel_noise*ch.mean_noise_threshold_mult
if ch.flip_waveform:
ch.y *=-1
## smooth-out noise
ch.y = moving_average(ch.y, 5)
#%matplotlib widget
fig, axes = plt.subplots(len(channels), sharex =True)
for ax,ch,color in zip(axes,channels, color_cycle):
ax.plot(ch.time, ch.y, label = ch.name, color=color)
ax.axhline(ch.mean_channel_noise,ls= '-.', c = 'k')
ax.axhline(ch.threshold,ls= '--', c = 'k')
ax.set_ylim(-2*ch.threshold, 2*ch.threshold)
ax.grid()
fig.legend();
<matplotlib.legend.Legend at 0x7f7da5f86bb0>
max_HXR_energy = 10.
for ch in channels:
peaks_loc, _ = find_peaks(ch.y, height=ch.threshold, prominence = ch.mean_channel_noise*2)
ch.simple_peak_loc = peaks_loc
ch.no_corr_peak_height = ch.y[peaks_loc]
if ch.calibration is not None:
max_HXR_energy = np.max((max_HXR_energy, ch.no_corr_peak_height.max()*ch.calibration))
plots = list()
for ch in channels:
if ch.rise_time is None or ch.rise_time is None:
continue
ch.simple_corr_peak_height, ch_gen, ch.y_diff ,_ = gen_peaks(ch.time, ch.y, ch.simple_peak_loc, ch.rise_time, ch.fall_time)
if ch.noPlot:
continue
gen_pl = hv.Curve((time[::downsample], ch_gen[::downsample]), 't','U_%s' % ch.name, label = '%s fit' % ch.name)
org_pl = hv.Curve((time[::downsample], ch.y[::downsample]), 't','U_%s' % ch.name, label = ch.name)
diff_pl = hv.Curve((time[::downsample], ch.y_diff[::downsample]), 't','U_%s' % ch.name, label = '%s difference' % ch.name)
peaks_pl = hv.ErrorBars((ch.time[ch.simple_peak_loc],ch.y[ch.simple_peak_loc],ch.simple_corr_peak_height,0),
vdims=['U_%s' % ch.name, 'yerrneg', 'yerrpos']).opts(upper_head=None, lower_head=None)
plot = org_pl * gen_pl * diff_pl * peaks_pl
plot.opts(width=950, height = 200).relabel(ch.name)
plots.append(plot)
hv.Layout(plots).cols(1)
fig,(ax,ax2) = plt.subplots(2, sharex = True, sharey=True)
for ch, color in zip(channels, color_cycle):
ax.hist(ch.no_corr_peak_height, bins = 100, label = '%s' % ch.name, histtype = 'step', color = color)
if ch.rise_time is None or ch.rise_time is None:
continue
ax2.hist(ch.simple_corr_peak_height, bins = 100, histtype = 'step', color = color)
ax.grid()
ax2.grid()
ax.semilogy()
ax2.semilogy()
fig.legend();
<matplotlib.legend.Legend at 0x7f7da5e05be0>
for ch in channels:
if ch.rise_time is None or ch.rise_time is None:
continue
peaks_loc = ch.simple_peak_loc
for pass_no in range(3):
peaks_locNew, _ = find_peaks(ch.y_diff, height=ch.threshold, prominence = ch.mean_channel_noise*2)
if peaks_locNew.size == 0:
continue
_, _, ch.y_diff,_ = gen_peaks(time, ch.y_diff, peaks_locNew, ch.rise_time, ch.fall_time, test_diff = False)
peaks_loc = np.concatenate((peaks_loc,peaks_locNew))
peaks_loc = np.sort(peaks_loc)
ch.multipass_peak_loc = peaks_loc
plots = list()
for ch in channels:
if ch.rise_time is None or ch.rise_time is None:
continue
peaks_loc = ch.multipass_peak_loc
corrected_peak_heights, ch_gen, ch.y_diff, discarded = gen_peaks(ch.time, ch.y, peaks_loc,
ch.rise_time, ch.fall_time, test_diff = True)
### Discard false positives, peaks below threshold and saturated values
saturated = ch.y[peaks_loc]>0.9
mask_discarded = np.logical_or(discarded, corrected_peak_heights<ch.threshold, saturated)
corrected_peak_heights = corrected_peak_heights[~mask_discarded]
peaks_loc = peaks_loc[~mask_discarded]
ch.multipass_peak_loc = peaks_loc
ch.multipass_corr_peak_height = corrected_peak_heights
if ch.noPlot:
continue
gen_pl = hv.Curve((time[::downsample], ch_gen[::downsample]), 't','U_%s' % ch.name, label = '%s fit' % ch.name)
org_pl = hv.Curve((time[::downsample], ch.y[::downsample]), 't','U_%s' % ch.name, label = ch.name)
diff_pl = hv.Curve((time[::downsample], ch.y_diff[::downsample]), 't','U_%s' % ch.name, label = '%s difference' % ch.name)
peaks_pl = hv.ErrorBars((ch.time[peaks_loc],ch.y[peaks_loc],corrected_peak_heights,0),
vdims=['U_%s' % ch.name, 'yerrneg', 'yerrpos']).opts(upper_head=None, lower_head=None)
plot = org_pl * gen_pl * diff_pl * peaks_pl
plot.opts(width=950, height = 200).relabel(ch.name)
plots.append(plot)
hv.Layout(plots).cols(1)
fig, ax = plt.subplots()
for ch,color in zip(channels,color_cycle):
if ch.noPlot or ch.calibration is None:
continue
_, bins, _ = ax.hist(ch.no_corr_peak_height*ch.calibration, bins = 100, label = ch.name,
range = (0,max_HXR_energy), histtype = 'step', ls = '--', color = color)
if ch.rise_time is None or ch.rise_time is None:
continue
h1, b1 = np.histogram(ch.simple_corr_peak_height*ch.calibration, bins = len(bins)-1, range = (bins[0], bins[-1]))
h2, b2 = np.histogram(ch.multipass_corr_peak_height*ch.calibration, bins = len(bins)-1, range = (bins[0], bins[-1]))
h = (h1+h2)/2
h_p = np.maximum(h1,h2)-h
h_m = h-np.minimum(h1,h2)
ax.errorbar(bins[:-1], h, yerr = (h_m, h_p), color = color, ls = '')
fig.legend(frameon =True, fancybox=True,framealpha=1.)
ax.semilogy()
ax.grid()
ax.set_xlabel('$E$ [keV]')
ax.set_ylabel('N [-]')
plt.savefig('icon-fig.png')
waveform_plots = list()
for ch,color in zip(channels,color_cycle):
if ch.noPlot or ch.calibration is None:
continue
w_plot = hv.Curve((ch.time[::downsample]*1000, ch.y[::downsample]), 't [ms]', 'U [V]', label = ch.name)
w_plot.opts(width = 600, height = 150)
waveform_plots.append(w_plot)
# parameters for sliders
window = .002 # 1ms
time_range = np.arange(0, time[-1]*1000,1)
hists_hmap = hv.HoloMap(kdims=['Time'])
span_hmap = hv.HoloMap(kdims=['Time'])
for time_sel in time_range:
time_sel/=1000
hist_plots = list()
left_margin, right_margin = time_sel - window/2, time_sel + window/2
for ch in channels:
if ch.rise_time is None or ch.rise_time is None or ch.calibration is None or ch.noPlot:
continue
peak_time = ch.time[ch.multipass_peak_loc]
mask_peaks = (peak_time > left_margin) & (peak_time < right_margin)
# histograms for selected time slice
peak_heights = ch.multipass_corr_peak_height[mask_peaks]
frequencies, edges = np.histogram(peak_heights*ch.calibration, 100, range = (0,max_HXR_energy))
h_plot = hv.Curve((edges, frequencies), 'E [keV]', 'N [-]', label = ch.name)
h_plot.opts(logy=True, ylim = (1,None), xlim = (0,None), width = 600)
hist_plots.append(h_plot)
time_sel*=1000
hists_hmap[time_sel] = hv.Overlay(hist_plots)
span_hmap[time_sel] = hv.VSpan(left_margin*1e3,right_margin*1e3)
waveform_plots = hv.Overlay(waveform_plots)
hv.Layout((waveform_plots*span_hmap)+ hists_hmap).cols(1).opts(title = '')