This is a quick plasma detection code from the photodiode signal. This procedure is run as soon as possible in order to provide the information on plasma existence to other routines.
(author: L. Lobko)
import numpy as np
import os
from scipy import signal as sigproc
shot_no = 44486
def update_db_current_shot(field_name, value):
os.system('export PGPASSWORD=`cat /golem/production/psql_password`;psql -c "UPDATE operation.discharges SET '+field_name+'='+str(value)+'WHERE shot_no IN(SELECT max(shot_no) FROM operation.discharges)" -q -U golem golem_database')
os.system('export PGPASSWORD=`cat /golem/production/psql_password`;psql -c "UPDATE diagnostics.basicdiagnostics SET '+field_name+'='+str(value)+'WHERE shot_no IN(SELECT max(shot_no) FROM diagnostics.basicdiagnostics)" -q -U golem golem_database')
os.makedirs('Results', exist_ok=True)
def save_scalar(phys_quant, value, format_str='%.3f'):
with open("Results/"+phys_quant, 'w') as f:
f.write(format_str % value)
update_db_current_shot(phys_quant,value)
ds = np.DataSource('/tmp') # temporary storage for downloaded files
data_URL = 'http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/PlasmaDetection/U_LeybPhot.csv'
Load data
def load_data(shot_no):
fname = ds.open(data_URL.format(shot_no=shot_no)).name
data = np.loadtxt(fname, delimiter=',')
data[:, 0] = data[:, 0] * 1e3 # from micros to ms
return data
U_photodiode = load_data(shot_no)
Remove offset
def offset_remove(data):
x_size, y_size = data.shape
data_for_offset = data[0:int(x_size/100)]
offset = np.mean(data_for_offset[:, 1])
data[:, 1] -= offset
return data
U_photodiode = offset_remove(U_photodiode)
Smooth signal
def smooth(y, box_pts):
box = np.ones(box_pts) / box_pts
y_smooth = np.convolve(y, box, mode='same')
return y_smooth
U_photodiode[:, 1] = smooth(U_photodiode[:, 1], 100)
def find_peaks(data):
peaks_indexes, _ = sigproc.find_peaks(data[:, 1], prominence=1e-5)
return np.vstack((data[peaks_indexes, 0], data[peaks_indexes, 1])).T
Calculate plasma boundaries from signal derivation
def calc_plasma_boundaries(data, position):
deriv = data.copy()
deriv[:, 1] = np.gradient(data[:, 1])
deriv[:, 1] = smooth(deriv[:, 1], 200)
if position == 'start':
index = np.where(deriv[:, 1] >= np.max(deriv[:, 1])/5)
deriv = deriv[index]
return deriv[0, 0]
else:
deriv = np.abs(deriv)
peaks = find_peaks(deriv)
return peaks[-1, 0]
if np.max(U_photodiode[:, 1]) < 0.01:
print('No plasma in vacuum chamber.')
t_plasma_start = -1.0
t_plasma_end = -1.0
else:
t_plasma_start = calc_plasma_boundaries(U_photodiode, 'start')
print('Plasma starts at {:.2f} ms.'.format(t_plasma_start))
t_plasma_end = calc_plasma_boundaries(U_photodiode, 'end')
print('Plasma ends at {:.2f} ms.'.format(t_plasma_end))
No plasma in vacuum chamber.
b_plasma = int(t_plasma_start > 0 and t_plasma_end > 0)
if b_plasma:
t_plasma_duration = t_plasma_end - t_plasma_start
print('Plasma duration is {:.2f} ms.'.format(t_plasma_duration))
else:
t_plasma_duration = -1.0 # convention instead of nan
Save data
save_scalar("b_plasma", b_plasma)
save_scalar("t_plasma_start", t_plasma_start)
save_scalar("t_plasma_end", t_plasma_end)
save_scalar("t_plasma_duration", t_plasma_duration)