import os
import numpy as np
from numba import vectorize, float64, boolean
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import requests
import io
import holoviews as hv
hv.extension('bokeh',logo = False)
from IPython.display import Markdown
shot_no = 39391
# try load data locally
if os.path.isfile('DAS_raw_data_dir/data_all_fullres.npz'):
full_res_data = np.load('DAS_raw_data_dir/data_all_fullres.npz')
else:
try:
# or download from web (slow)
response = requests.get(f'http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/HXRprobes/DAS_raw_data_dir/data_all_fullres.npz')
response.raise_for_status()
full_res_data = np.load(io.BytesIO(response.content))
except ConnectionError:
err = Markdown('### Can not download data')
# unpack npz file
time = full_res_data['time']
LYSO = full_res_data['ch6']
HXR = LYSO
MSamples = 1e-6/(time[1]-time[0])
if MSamples >= 120:
out = Markdown(f'### Data with {MSamples:.1f}MS/s resolution')
exit_notebook = False
else:
out = Markdown('### Low resolution for peak recononstruction \n adjust oscilloscope settings')
exit_notebook = True
out
if exit_notebook:
raise UserWarning('Exiting')
def get_noise_level(t,y_vals,noise_time):
y_noise = y_vals[t < noise_time]
y_noise = np.nan_to_num(y_noise, posinf = 0, neginf = 0)
mean_y = np.mean(y_noise)
# mean noise amplitude
abs_noise = np.abs(y_noise - mean_y)**2
mean_noise_ampl = np.sqrt(np.mean(abs_noise))
return mean_y, mean_noise_ampl
@vectorize(['float64(float64, boolean)'])
def half_exp(x, where):
'''compute exp function only for selected indices (to avoid infinite/large values)'''
if where:
return np.exp(x)
else:
return 0.0
def get_index(oneDArray, val):
diff = np.abs(oneDArray - val)
return np.argmin(diff)
def exp_decay(t, height, center, rise, fall):
'''smooth peak fucntion from https://arxiv.org/pdf/1706.01211.pdf eq:(2)'''
# bool mask where compute exp functions (without it results in +-inf values)
exp_window_split_index = get_index(t, center-100*rise)
exp_window = np.zeros_like(t, np.bool_)
exp_window[exp_window_split_index:].fill(True)
# scaling and shifting of peak fucntion
A = height / ( (fall-rise)/fall * (fall/rise - 1 )**(-rise/fall) )
peak_center_shift = rise*np.log((fall-rise)/rise)
center = center - peak_center_shift
peak = A * 1/(1+half_exp(-(t-center)/rise, exp_window)) * half_exp(-(t-center)/fall, exp_window)
return peak
#%matplotlib notebook
plt.plot(time,HXR);
plt.axhline(1.)
<matplotlib.lines.Line2D at 0x7f29f00eed60>
def gen_peaks(time, HXR_y, peaks_loc, rise_time, fall_time, test_diff = True):
corrected_peak_heights = np.zeros_like(peaks_loc, float)
HXR_generated = np.zeros_like(HXR_y)
diff = HXR_y.copy()
discarded_peaks = np.zeros_like(peaks_loc, np.bool_)
# operate only on small slice of waveform (it's faster)
dt = np.mean(np.diff(time))
i_slice = max(int(fall_time*100 / dt),1000)
for n, peak in enumerate(peaks_loc):
# use differece between generated and original waveform to obtain peak height
peak_height = diff[peak]
peak_center = time[peak]
gen_slice_time = time[peak-i_slice:peak+i_slice]
if peak_height < 0.:
corrected_peak_heights[n] = 0.
continue
# generate peak
generated_peak = exp_decay(gen_slice_time, peak_height, peak_center, rise_time, fall_time)
# if adding this peak increases differance btw original and generated signal
# discard this peak as false detection
diff_test = diff[peak-i_slice:peak+i_slice].copy()
diff_test -= generated_peak
if test_diff and np.sum(np.abs(diff_test)) > np.sum(np.abs(diff[peak-i_slice:peak+i_slice])):
corrected_peak_heights[n] = 0.
discarded_peaks[n] = True
else:
# subtract peak from signal and record peak height
diff[peak-i_slice:peak+i_slice] -= generated_peak
HXR_generated[peak-i_slice:peak+i_slice] += generated_peak
corrected_peak_heights[n] = peak_height
return corrected_peak_heights, HXR_generated, diff, discarded_peaks
## parameters for peak detection
mean_noise_threshold_mult = 7
peak_prominence_thr = 0.001
# peak rise / fall time
rise_time, fall_time = 6.159e-09, 4.591e-08
noise, mean_noise = get_noise_level(time, HXR, 0.003)
HXR_y = HXR - noise
thresh = mean_noise*mean_noise_threshold_mult
peaks_loc, _ = find_peaks(HXR_y, height=thresh, prominence = peak_prominence_thr)
corrected_peak_heights, HXR_gen, HXR_diff,_ = gen_peaks(time, HXR_y, peaks_loc, rise_time, fall_time)
fig, ax = plt.subplots()
ax.plot(time, HXR_gen, label = 'generated')
ax.plot(time, HXR_y, label = 'original',alpha=0.5)
ax.plot(time, HXR_diff, label = 'difference')
ax.grid()
fig.legend()
peak_heights = HXR_y[peaks_loc]
fig, ax = plt.subplots()
ax.hist(peak_heights, bins = 100, label = 'raw', histtype = 'step')
ax.hist(corrected_peak_heights, bins = 100, label = 'corrected', histtype = 'step')
ax.legend()
ax.grid()
ax.set_title('peak height spectrum')
ax.semilogy()
corrected_peak_heights_simple = corrected_peak_heights
peaks_loc_simple = peaks_loc
<ipython-input-1-8b9b47ea4a9e>:26: RuntimeWarning: overflow encountered in half_exp peak = A * 1/(1+half_exp(-(t-center)/rise, exp_window)) * half_exp(-(t-center)/fall, exp_window)
for pass_no in range(3):
peaks_locNew, _ = find_peaks(HXR_diff, height=thresh*2, prominence = peak_prominence_thr*3)
if peaks_locNew.size == 0:
continue
_, _, HXR_diff,_ = gen_peaks(time, HXR_diff, peaks_locNew, rise_time, fall_time, test_diff = False)
peaks_loc = np.concatenate((peaks_loc,peaks_locNew))
<ipython-input-1-8b9b47ea4a9e>:26: RuntimeWarning: overflow encountered in half_exp peak = A * 1/(1+half_exp(-(t-center)/rise, exp_window)) * half_exp(-(t-center)/fall, exp_window)
peaks_loc = np.sort(peaks_loc)
corrected_peak_heights, HXR_gen, HXR_diff, discarded = gen_peaks(time, HXR_y, peaks_loc,
rise_time, fall_time, test_diff = True)
fig, ax = plt.subplots()
ax.plot(time, HXR_gen, label = 'generated')
ax.plot(time, HXR_y, label = 'original',alpha=0.5)
ax.plot(time, HXR_diff, label = 'difference')
ax.grid()
fig.legend()
peak_heights = HXR_y[peaks_loc]
fig, ax = plt.subplots()
ax.hist(peak_heights, bins = 100, label = 'raw', histtype = 'step')
ax.hist(corrected_peak_heights, bins = 100, label = 'corrected', histtype = 'step')
ax.set_title('peak height spectrum')
ax.grid()
ax.legend()
ax.semilogy();
<ipython-input-1-8b9b47ea4a9e>:26: RuntimeWarning: overflow encountered in half_exp peak = A * 1/(1+half_exp(-(t-center)/rise, exp_window)) * half_exp(-(t-center)/fall, exp_window)
[]
mask_discarded = np.logical_and(discarded, corrected_peak_heights==0.)
peak_heights = peak_heights[~mask_discarded]
peaks_loc = peaks_loc[~mask_discarded]
corrected_peak_heights = corrected_peak_heights[~mask_discarded]
#calib = 1291 # keV/V @ 0.5V overvoltage
calib = 3365 # keV/V @ 0.3V overvoltage
original_spectrum = peak_heights * calib
corrected_spectrum = corrected_peak_heights * calib
fig, ax = plt.subplots()
ax.hist(original_spectrum, bins = 100, label = 'raw', histtype = 'step')
ax.hist(corrected_spectrum, bins = 100, label = 'corrected', histtype = 'step')
fig.legend(frameon =True, fancybox=True,framealpha=1.)
ax.semilogy()
ax.grid()
ax.set_xlabel('$E$ [keV]')
ax.set_ylabel('N [-]');
ax.set_title('HXR spectrum from LYSO')
plt.savefig('icon-fig.png')
# decimated plot of waveform
dec = int(time.size/MSamples)//25
HXR_plot = hv.Curve((time[::dec]*1000, LYSO[::dec]), 't [ms]', 'V').opts(width = 600, height = 150)
# parameters for sliders
windows = [1.,5.]
time_range = np.arange(0, time[-1]*1000,1)
hists_hmap = hv.HoloMap(kdims=['Time','Window'])
span_hmap = hv.HoloMap(kdims=['Time','Window'])
for window in windows:
for time_sel in time_range:
window /=1000 # to [s]
time_sel/=1000
peak_time = time[peaks_loc]
left_margin, right_margin = time_sel - window/2, time_sel + window/2
mask_peaks = (peak_time > left_margin) & (peak_time < right_margin)
# histograms for selected time slice
org = original_spectrum[mask_peaks]
corr = corrected_spectrum[mask_peaks]
frequencies, edges = np.histogram(org, 50)
h_org = hv.Curve((edges, frequencies), 'E [keV]', 'N [-]', label = 'raw')
frequencies, edges = np.histogram(corr,50)
h_corr = hv.Curve((edges, frequencies), 'E [keV]', 'N [-]', label = 'corrected')
hist_plot = h_org.opts(logy=True) * h_corr.opts(logy=True)
hist_plot = hist_plot.opts(ylim = (1,None), xlim = (0,None), width = 600)
window *=1000 # to [ms]
time_sel*=1000
hists_hmap[time_sel,window] = hist_plot
span_hmap[time_sel,window] = hv.VSpan(left_margin*1e3,right_margin*1e3)
hv.Layout((HXR_plot*span_hmap)+ hists_hmap).cols(1).opts(title = '')