this notebook loads data acquired with a segmented silicon detector
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mp
import copy
import re
import holoviews as hv
from holoviews import opts
hv.extension('bokeh')
#shot_no = 37021
shot_no = 47292
/opt/anaconda3/lib/python3.8/site-packages/pandas/core/computation/expressions.py:20: UserWarning: Pandas requires version '2.7.3' or newer of 'numexpr' (version '2.7.1' currently installed). from pandas.core.computation.check import NUMEXPR_INSTALLED WARNING:param.Parameterized: Use method 'warning' via param namespace WARNING:param.main: pandas could not register all extension types imports failed with the following error: cannot import name 'ABCIndexClass' from 'pandas.core.dtypes.generic' (/opt/anaconda3/lib/python3.8/site-packages/pandas/core/dtypes/generic.py)
Download logfile and time of plasma start and plasma end
log_URL = "http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/StripDetector/PH32Det{det}_{shot_no}.log"
plasma_URL = "http://golem.fjfi.cvut.cz/shots/{}/Diagnostics/PlasmaDetection/Results/{}"
ds = np.DataSource(None)
# get logs from web
logfile0 = ds.open(log_URL.format(shot_no=shot_no, det=0))
logfile1 = ds.open(log_URL.format(shot_no=shot_no, det=1))
# get logs from local storage
#logfile0 = ds.open(f"PH32Det0_{shot_no}.log")
#logfile1 = ds.open(f"PH32Det1_{shot_no}.log")
try:
t_plasma_start = ds.open(plasma_URL.format(shot_no, "t_plasma_start"))
t_plasma_start = np.loadtxt(t_plasma_start)
t_plasma_end = ds.open(plasma_URL.format(shot_no, "t_plasma_end"))
t_plasma_end = np.loadtxt(t_plasma_end)
except OSError:
t_plasma_start = -1.0
t_plasma_end = -1.0
Communication between rPi (or any other computer) and PH32 readout chip is achieved with SURE (Simple USB Readout Equipment).
SURE uses NMEA 0183 protocol for communication. Each message for configuring PH32 is acknowledged. These 'echos' are saved in the log file.
For example $ACKNL,SEATM -- acquisition time set to 100us*1b
is response to $SEATM,100
command which sets acquisition time.
$STARB,[wait-for-trigger],[delay]
sets detector for measurement and is fololowed by 200 frames of measured data eg.:
$FRAME,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0*5d
From log file we extract
acquisition mode
frames
def parseLog(logfile):
frames = []
# defaults
acc_time = 0.15 #ms
delay = 0.10 #ms
start_delay = 0.50 #ms
acc_mode = ['hit count'] * 32
# read logfile
for line in logfile:
regex_seatm = re.match("\$ACKNL,SEATM -- acquisition time set to (\d+)us", line)
if regex_seatm:
acc_time = regex_seatm.group(1)
acc_time = int(acc_time)/1000.0 ## ms
regex_delay = re.match("\$ACKNL,DELAY -- delay set to (\d+)us", line)
if regex_delay:
delay = regex_delay.group(1)
delay = int(delay)/1000.0 ## ms
regex_selo1= re.match(
"\$ACKNL,SELO1 -- local configuration for channel (\d+) was set to value (\d+)", line)
if regex_selo1:
channel = int(regex_selo1.group(1))
if (int(regex_selo1.group(2)) & 0x60) == 0x60 :
acc_mode[channel] = "ToA"
elif (int(regex_selo1.group(2)) & 0x20) == 0x20:
acc_mode[channel] = "ToT-First"
elif (int(regex_selo1.group(2)) & 0x40) == 0x40:
acc_mode[channel] = "ToT"
else:
acc_mode[channel] = "hit count"
regex_startb = re.match("\$STARB,(\d+),(\d+)", line)
if regex_startb:
start_delay = regex_startb.group(2)
start_delay = int(start_delay)/1000.0 ## ms
regex_frame = re.match("\$FRAME,((?:\d+,)+)(\d+)\*[0-9a-f]{2}", line)
if regex_frame:
line = np.fromstring(''.join(regex_frame.groups()), sep=',')
### saturation or error in measeurement or chanel is disabled
line[line == 65535] = np.nan
frames.append(line)
frames = np.array(frames)
time_btw_frames = acc_time + delay
return {'start_delay':start_delay,
'acc_time':acc_time,
'delay':delay,
'acc_mode':acc_mode,
'time_btw_frames': time_btw_frames,
'frames':frames}
detector0 = parseLog(logfile0)
detector1 = parseLog(logfile1)
framesCombined = np.concatenate((detector0["frames"].T,detector1["frames"].T))
plt.pcolormesh(framesCombined)
plt.axhline(32,color = 'k')
plt.axis('off')
plt.savefig('rawdata.png')
The order of channels on the PH32 chip does not correspond to the order of strips on the sensor; we need to rearrange them.
Strip n. 0 is on channel 15, strip n. 1 on channel 16, ...
channelOrder = [15,16,14,17,13,18,12,19,11,20,10,21,9,22,8,23,24,7,25,6,26,5,27,4,28,3,29,2,30,1,31,0]
def rearrangeFrames(frames):
permutation_matrix = np.zeros((len(channelOrder), len(channelOrder)))
for idx, i in enumerate(channelOrder):
permutation_matrix[i, idx] = 1
frames = np.dot(frames, permutation_matrix)
return frames
## rearange acc_mode
def rearrangeModes(acc_mode):
new_acc_mode = [''] *32
for new, old in enumerate(channelOrder):
new_acc_mode[new] = acc_mode[old]
acc_mode = new_acc_mode
return acc_mode
for detector in [detector0, detector1]:
detector['frames'] = rearrangeFrames(detector['frames'])
detector['acc_mode'] = rearrangeModes(detector['acc_mode'])
def addMasks(detector):
acc_mode = detector['acc_mode']
detector["mask_hit"] = np.array([ch == 'hit count' for ch in acc_mode])
detector["mask_ToT"] = np.array([ch == 'ToT' for ch in acc_mode])
detector["mask_fToT"] = np.array([ch == 'ToT-First' for ch in acc_mode])
detector["mask_ToA"] = np.array([ch == 'ToA' for ch in acc_mode])
# test if list (mask) consists of same values
def all_eq(iterable, val):
return all([i == val for i in iterable])
[addMasks(detector) for detector in [detector0, detector1]];
[None, None]
For energy measeuremets convert number of clock cycles to deposited energy
energy_calib0 = 115.26 #keV/clock
energy_calib1 = 79.82 #keV/clock
def applyECalib(detector, calib):
mask_energy = detector["mask_ToT"] & detector["mask_fToT"]
detector["frames"][:,mask_energy] *= calib
applyECalib(detector0, energy_calib0)
applyECalib(detector1, energy_calib1)
Convert absolute ToA values to relative
from each frame subtract minimal ToA -> compare ToA between channels
def shiftToA(detector):
frames = detector['frames']
mask_ToA = detector["mask_ToA"]
if not all_eq(mask_ToA, False):
detector["frames"][:,mask_ToA] -= np.nanmin(frames[:,mask_ToA],axis = 1)[:, np.newaxis]
About half of the frames are usually empty because the detector records after plasma end.
Plot only when plasma was present
def mplPlot(ax, frames, acc_mode, start_delay, time_btw_frames, acc_time, **kwargs):
y_tics = np.arange(0,32,1)
t_tics = np.arange(0, frames.shape[0]*time_btw_frames, time_btw_frames)
t_tics += start_delay
cmap = copy.copy(mp.cm.get_cmap("viridis"))
# bad values in grey
cmap.set_under("grey")
im = ax.pcolormesh(t_tics, y_tics, frames.T, vmin = 0, vmax = np.amax(frames[5:80]),
cmap=cmap, shading = 'auto')
ax.set_ylabel("strip n.")
ax.set_xlabel("time [ms]")
if t_plasma_start != -1.0 and t_plasma_start != -1.0:
ax.set_xlim(start_delay, t_plasma_end + 2)
ax.axvline(t_plasma_start, color = "white", dashes=[1, 2], label='plasma start')
ax.axvline(t_plasma_end, color = "white", dashes=[1, 2], label='plasma end')
else:
ax.set_xlim(start_delay, 30.0)
cbar = fig.colorbar(im, ax=ax)
if all_eq(acc_mode, "hit count"):
cbar.set_label(acc_mode)
elif all_eq(acc_mode, "ToT"):
cbar.set_label("Deposited energy [keV/{}ms]".format(acc_time))
elif all_eq(acc_mode,"first hit"):
cbar.set_label("First hit energy [kev]")
else:
cbar.set_label("mix")
fig, axes = plt.subplots(nrows=2, figsize = (8,10))
[mplPlot(ax, **detector) for ax, detector in zip(axes, [detector0, detector1])]
fig.savefig('icon-fig.png')
def hvCurvePlot(frames, acc_mode, start_delay, time_btw_frames, mask_hit,
mask_ToT, mask_fToT, mask_ToA, **kwargs):
t_tics = np.arange(0, frames.shape[0]*time_btw_frames, time_btw_frames)
t_tics += start_delay
hit_curves = [hv.Curve((t_tics,ch), 't [ms]', 'hit count') for ch in frames[:,mask_hit].T]
tot_curves = [hv.Curve((t_tics,ch), 't [ms]', 'E_dep [keV]') for ch in frames[:,mask_ToT].T]
first_hit_curves = [hv.Curve((t_tics,ch), 't [ms]', 'E_first [keV]') for ch in frames[:,mask_fToT].T]
toa_curves = [hv.Curve((t_tics,ch), 't [ms]', '\Delta t') for ch in frames[:,mask_ToA].T]
hit_overlay = hv.Overlay(hit_curves) if hit_curves else hv.Empty()
tot_overlay = hv.Overlay(tot_curves) if tot_curves else hv.Empty()
first_hit_overlay = hv.Overlay(first_hit_curves) if first_hit_curves else hv.Empty()
toa_overlay = hv.Overlay(toa_curves) if toa_curves else hv.Empty()
curve_plot = hv.Layout(hit_overlay + tot_overlay + first_hit_overlay + toa_overlay).cols(1)
x_max = t_plasma_end + 2 if t_plasma_end != -1.0 else 30.0
curve_plot.opts(opts.Overlay(height=200, width=450, xlim=(start_delay, x_max)))
return curve_plot
hv.Layout(hvCurvePlot(**detector0) + hvCurvePlot(**detector1)).opts(transpose=True)
def hvMeshPlot(frames, acc_mode, start_delay, time_btw_frames, mask_hit,
mask_ToT, mask_fToT, mask_ToA, **kwargs):
t_tics = np.arange(0, frames.shape[0]*time_btw_frames, time_btw_frames)
t_tics += start_delay
masks = [mask_hit, mask_ToT, mask_fToT, mask_ToA]
titles = ['hit count', 'E_dep', 'E_first-dep', '\Delta t' ]
meshes = [hv.Image(frames[:,mask].T, bounds=(t_tics[0],0,t_tics[-1],np.sum(mask))).opts(
title = title, ylabel='channel', xlabel='t [ms]')
if frames[:,mask].size else hv.Empty() for mask, title in zip(masks,titles)]
mesh_plot = hv.Layout(meshes).cols(1)
x_max = t_plasma_end + 2 if t_plasma_end != -1.0 else 30.0
mesh_plot.opts(opts.Image(height=200, width=450, xlim=(start_delay, x_max)))
return mesh_plot
hv.Layout(hvMeshPlot(**detector0) + hvMeshPlot(**detector1)).opts(transpose=True)