This is a quick plasma detection code from the plasma current signal. This procedure is run as soon as possible in order to provide the information on plasma existence to other routines.
(author: L. Lobko)
%matplotlib inline
import numpy as np
import os
from scipy import signal as sigproc
from scipy import integrate
import matplotlib.pyplot as plt
shot_no = 47564
def update_db_current_shot(field_name, value):
os.system('export PGPASSWORD=`cat /golem/production/psql_password`;psql -c "UPDATE operation.discharges SET '+field_name+'='+str(value)+'WHERE shot_no IN(SELECT max(shot_no) FROM operation.discharges)" -q -U golem golem_database')
os.system('export PGPASSWORD=`cat /golem/production/psql_password`;psql -c "UPDATE diagnostics.basicdiagnostics SET '+field_name+'='+str(value)+'WHERE shot_no IN(SELECT max(shot_no) FROM diagnostics.basicdiagnostics)" -q -U golem golem_database')
os.makedirs('Results', exist_ok=True)
def save_scalar(phys_quant, value, format_str='%.3f'):
with open("Results/"+phys_quant, 'w') as f:
f.write(format_str % value)
update_db_current_shot(phys_quant,value)
ds = np.DataSource('/tmp') # temporary storage for downloaded files
data_rog_URL = 'http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/PlasmaDetection/U_RogCoil.csv'
data_ULoop_URL = 'http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/PlasmaDetection/U_Loop.csv'
t_cd_URL = 'http://golem.fjfi.cvut.cz/shots/{shot_no}/Operation/Discharge/t_cd_discharge_request'
K_RogowskiCoil_URL = 'http://golem.fjfi.cvut.cz/shots/{shot_no}/Production/Parameters/SystemParameters/K_RogowskiCoil'
L_chamber_URL = 'http://golem.fjfi.cvut.cz/shots/{shot_no}/Production/Parameters/SystemParameters/L_chamber'
R_chamber_URL = 'http://golem.fjfi.cvut.cz/shots/{shot_no}/Production/Parameters/SystemParameters/R_chamber'
Load data
def load_data(shot_no, signal_URL):
fname = ds.open(signal_URL.format(shot_no=shot_no)).name
data = np.loadtxt(fname, delimiter=',')
# data[:, 0] = data[:, 0] * 1e3 # from micros to ms
return data
def load_param(shot_no, param):
data = float(ds.open(param.format(shot_no=shot_no)).read())
# data = data * 1e-3
return data
U_rogcoil = load_data(shot_no, data_rog_URL)
U_Loop = load_data(shot_no, data_ULoop_URL)
t_cd = load_param(shot_no, t_cd_URL)
K_RogowskiCoil = load_param(shot_no, K_RogowskiCoil_URL)
L_chamber = load_param(shot_no, L_chamber_URL)
R_chamber = load_param(shot_no, R_chamber_URL)
U_rogcoil[:, 1] *= -1
# fig, ax = plt.subplots()
# ax.plot(U_Loop[:, 0], U_Loop[:, 1], 'b-')
# plt.show()
def offset_remove(data):
x_size, y_size = data.shape
data_for_offset = data[0:int(x_size/100)]
offset = np.mean(data_for_offset[:, 1])
data[:, 1] -= offset
return data
U_rogcoil = offset_remove(U_rogcoil)
U_Loop = offset_remove(U_Loop)
# fig, ax = plt.subplots()
# ax.plot(U_Loop[:, 0], U_Loop[:, 1], 'b-')
# plt.show()
Integrate signal
U_integrated = np.copy(U_rogcoil)
U_integrated[:, 1] = (integrate.cumtrapz(U_rogcoil[:, 1], U_rogcoil[:, 0], initial=0))
U_integrated[:, 1] *= K_RogowskiCoil
# fig, ax = plt.subplots()
# ax.plot(U_integrated[:, 0], U_integrated[:, 1], 'b-')
# plt.show()
Calculate chamber current
# def dIch_dt(t, Ich):
# return (U_l_func(t) - R_chamber * Ich) / L_chamber
#
# dIch_dt = (U_Loop - R_chamber *)/ L_chamber
Ich = np.copy(U_Loop)
Ich[:, 1] = U_Loop[:, 1] / R_chamber
# fig, ax = plt.subplots()
# ax.plot(Ich[:, 0], Ich[:, 1], 'b-')
# plt.show()
Ip = np.copy(Ich)
Ip[:, 1] = U_integrated[:, 1] - Ich[:, 1]
# fig, ax = plt.subplots()
# ax.plot(Ip[:, 0], Ip[:, 1], 'b-')
# plt.show()
Smooth signal
def smooth(y, box_pts):
box = np.ones(box_pts) / box_pts
y_smooth = np.convolve(y, box, mode='same')
return y_smooth
Ip[:, 1] = smooth(Ip[:, 1], 200)
Ip[:, 0] = Ip[:, 0] * 1e3
# fig, ax = plt.subplots()
# ax.plot(Ip[:, 0], Ip[:, 1], 'b-')
# plt.show()
def find_peaks(data):
peaks_indexes, _ = sigproc.find_peaks(data[:, 1], prominence=1e-1)
return np.vstack((data[peaks_indexes, 0], data[peaks_indexes, 1])).T
Calculate plasma boundaries from signal derivation
def calc_plasma_boundaries(data, position):
deriv = data.copy()
deriv[:, 1] = np.gradient(data[:, 1])
deriv[:, 1] = smooth(deriv[:, 1], 1000)
if position == 'start':
index = np.where(deriv[:, 1] >= np.max(deriv[:, 1])/5)
deriv = deriv[index]
return deriv[0, 0]
else:
deriv = np.abs(deriv)
# max_time = np.max(deriv[:, 0])
# deriv = deriv[deriv[:, 0] <= (max_time-0.5)]
peaks = find_peaks(deriv)
return (peaks[-1, 0]+0.5)
Cut data before t_cd (before is no plasma)
Ip_plasma_check = Ip[Ip[:, 0] <= (t_cd/1000+5)]
Ip = Ip[Ip[:, 0] > (t_cd/1000)]
# fig, ax = plt.subplots()
# ax.plot(Ip_plasma_check[:, 0], Ip_plasma_check[:, 1], 'b-')
# plt.show()
if np.max(Ip_plasma_check[:, 1]) < 100:
print('No plasma in vacuum chamber.')
t_plasma_start = -1.0
t_plasma_end = -1.0
else:
t_plasma_start = calc_plasma_boundaries(Ip, 'start')
print('Plasma starts at {:.2f} ms.'.format(t_plasma_start))
t_plasma_end = calc_plasma_boundaries(Ip, 'end')
print('Plasma ends at {:.2f} ms.'.format(t_plasma_end))
No plasma in vacuum chamber.
b_plasma = int(t_plasma_start > 0 and t_plasma_end > 0)
if b_plasma:
t_plasma_duration = t_plasma_end - t_plasma_start
print('Plasma duration is {:.2f} ms.'.format(t_plasma_duration))
else:
t_plasma_duration = -1.0 # convention instead of nan
plasma_endpoints = [t_plasma_start, t_plasma_end]
fig, axes = plt.subplots()
axes.plot(Ip[:, 0], Ip[:, 1]/1000, label ='Ip calculated')
for x in plasma_endpoints:
plt.axvline(x=x, color='black', linestyle='--')
axes.set(xlabel='$time$ [ms]', ylabel='$I_p$ [kA]')
plt.legend()
plt.grid()
plt.show()
Save data
save_scalar("b_plasma", b_plasma)
save_scalar("t_plasma_start", t_plasma_start)
save_scalar("t_plasma_end", t_plasma_end)
save_scalar("t_plasma_duration", t_plasma_duration)