Adapted version of InnerStabilisation.ipynb by Daniela Kropáčková, Honza Stöckel and Vojtěch Svoboda.
This notebook loads Mirnov coil data and calculates the plasma column position from them. It also calculates the plasma column position from the fast camera data and compares the two diagnostics.
import os
#delete files from the previous discharge
if os.path.exists('InnerStabilization.html'):
os.remove('InnerStabilization.html')
if os.path.exists('video.mp4'):
os.remove('video.mp4')
if os.path.exists('icon-fig.png'):
os.remove('icon-fig.png')
import numpy as np
import matplotlib.pyplot as plt
from scipy import integrate, signal, interpolate
import pandas as pd
import holoviews as hv
hv.extension('bokeh')
import hvplot.pandas
import requests
from IPython.display import Markdown
#execute time
from timeit import default_timer as timer
start_execute = timer()
To calculate the plasma position, $B_t$ contribution must be subtracted from the Mirnov coil signal. This contribution can be acquired by two ways: from a dedicated vacuum shot with the same toroidal magnetic field, or it can be estimated from the standard diagnostics. If a suitable vacuum shot is available, its number is stored in the variable vacuum_shot
. If it isn't available, the vacuum_shot
variable shall be set to False
and the $B_t$ contribution shall be estimated.
# data_URL = "http://golem.fjfi.cvut.cz/shots/{shot_no}/DASs/LimiterMirnovCoils/{identifier}.csv" #Mirnov coils and quadrupole
BDdata_URL = "http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/BasicDiagnostics/{identifier}.csv" #BD = basic diagnostic
# data_URL = "http://golem.fjfi.cvut.cz/shots/{shot_no}/DASs/022Daniela0InnerStabilization/{identifier}.csv"
data_URL = "http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/LimiterMirnovCoils/DAS_raw_data_dir/Nidatap_6133.lvm" #NI Standart (DAS)
shot_no = 35362 # to be replaced by the actual discharge number
#shot_no = 35021
vacuum_shot = 35353 # to be replaced by the discharge command line paramater "vacuum_shot"
#vacuum_shot = 35022 #number of the vacuum shot or 'False'
ds = np.DataSource(destpath='')
def open_remote(shot_no, identifier, url_template=data_URL):
return ds.open(url_template.format(shot_no=shot_no, identifier=identifier))
def read_signal(shot_no, identifier, url = data_URL, data_type='csv'):
file = open_remote(shot_no, identifier, url)
if data_type == 'lvm':
channels = ['Time', 'mc1', 'mc5', 'mc9', 'mc13', '5', 'RogQuadr', '7', '8', '9', '10', '11', '12']
return pd.read_table(file, sep = '\t', names=channels)
else:
return pd.read_csv(file, names=['Time', identifier],
index_col = 'Time', squeeze=True)
def get_basic_diag_data(shot):
url = ( 'http://golem.fjfi.cvut.cz/shots/{}/Diagnostics/BasicDiagnostics/'
'basig_diagnostics_processed.csv'.format(shot) )
print(url)
return pd.read_csv(url, header=0, index_col='Time')
Load the data of a given Mirnov coil, integrate it and subtract the $B_t$ contribution from them.
def elimination (shot_no, identifier, vacuum_shot = False):
#load data
mc = (read_signal(shot_no, identifier, data_URL, 'lvm'))
mc = mc.set_index('Time')
mc = mc.replace([np.inf, -np.inf, np.nan], value = 0)
mc = mc[identifier]
konst = 1/(3.8e-03)
if vacuum_shot == False:
signal_start = mc.index[0]
length = len(mc)
basic_diag = get_basic_diag_data(shot_no)
Bt = basic_diag['Bt']
if len(Bt)>len(mc):
Bt = Bt.iloc[:length]
if len(Bt)<len(mc):
mc = mc.iloc[:len(Bt)]
if identifier == 'mc1':
k=300
elif identifier == 'mc5':
k= 14
elif identifier == 'mc9':
k = 31
elif identifier == 'mc13':
k = -100
mc_vacuum = Bt/k
else:
mc_vacuum = (read_signal(vacuum_shot, identifier, data_URL, 'lvm'))
mc_vacuum = mc_vacuum.set_index('Time')
mc_vacuum = mc_vacuum[identifier]
mc_vacuum -= mc_vacuum.loc[:0.9e-3].mean() #remove offset
mc_vacuum = mc_vacuum.replace([np.inf, -np.inf], value = 0)
mc_vacuum = pd.Series(integrate.cumtrapz(mc_vacuum, x=mc_vacuum.index, initial=0) * konst,
index=mc_vacuum.index*1000, name= identifier) #integration
mc -= mc.loc[:0.9e-3].mean() #remove offset
mc = pd.Series(integrate.cumtrapz(mc, x=mc.index, initial=0) * konst,
index=mc.index*1000, name= identifier) #integration
#Bt elimination
mc_vacuum = np.array(mc_vacuum)
mc_elim = mc - mc_vacuum
return mc_elim
Find plasma start and end from the basic diagnostics.
loop_voltage = read_signal(shot_no, 'U_Loop', BDdata_URL)
dIpch = read_signal(shot_no, 'U_RogCoil', BDdata_URL)
dIpch -= dIpch.loc[:0.9e-3].mean()
Ipch = pd.Series(integrate.cumtrapz(dIpch, x=dIpch.index, initial=0) * (-5.3*1e06),
index=dIpch.index, name='Ipch')
U_l_func = interpolate.interp1d(loop_voltage.index, loop_voltage)
def dIch_dt(t, Ich):
return (U_l_func(t) - 0.0097 * Ich) / (1.2e-6/2)
t_span = loop_voltage.index[[0, -1]]
solution = integrate.solve_ivp(dIch_dt, t_span, [0], t_eval=loop_voltage.index, )
Ich = pd.Series(solution.y[0], index=loop_voltage.index, name='Ich')
Ip = Ipch - Ich
Ip.name = 'Ip'
Ip_detect = Ip.loc[0.0025:]
dt = (Ip_detect.index[-1] - Ip_detect.index[0]) / (Ip_detect.index.size)
window_length = int(0.5e-3/dt)
if window_length % 2 == 0:
window_length += 1
dIp = pd.Series(signal.savgol_filter(Ip_detect, window_length, polyorder=3, deriv=1, delta=dt),
name='dIp', index=Ip_detect.index) / 1e6
threshold = 0.05
CD = requests.get("http://golem.fjfi.cvut.cz/shots/%i/Production/Parameters/CD_orientation" % shot_no)
CD_orientation = CD.text
if "ACW" in CD_orientation:
plasma_start = dIp[dIp < dIp.min()*threshold].index[0]*1e3
plasma_end = dIp.idxmax()*1e3
else:
plasma_start = dIp[dIp > dIp.max()*threshold].index[0]*1e3
plasma_end = dIp.idxmin()*1e3 + 0.5
print ('Plasma start =', round(plasma_start, 3), 'ms')
print ('Plasma end =', round(plasma_end, 3), 'ms')
# print (CD_orientation)
mc1 = elimination(shot_no, 'mc1', vacuum_shot)
def horpol(shot_no, vacuum_shot=False):
mc1 = elimination(shot_no, 'mc1', vacuum_shot)
mc9 = elimination (shot_no, 'mc9', vacuum_shot)
b = 93
r = ((mc1-mc9)/(mc1+mc9))*b
r = r.replace([np.nan], value = 0)
# return r.loc[plasma_start:]
return r.loc[plasma_start:plasma_end]
# return r
r = horpol(shot_no, vacuum_shot)
ax = r.plot()
ax.set(ylim=(-85,85), xlim=(plasma_start,plasma_end), xlabel= 'Time [ms]', ylabel = '$\Delta$r [mm]',
title = 'Horizontal plasma position #{}'.format(shot_no))
ax.axhline(y=0, color='k', ls='--', lw=1, alpha=0.4)
def vertpol(shot_no, vacuum_shot = False):
mc5 = elimination(shot_no, 'mc5', vacuum_shot)
mc13 = elimination (shot_no, 'mc13', vacuum_shot)
b = 93
z = ((mc5-mc13)/(mc5+mc13))*b
z = z.replace([np.nan], value = 0)
# return z.loc[plasma_start:]
return z.loc[plasma_start:plasma_end]
# return z
z = vertpol (shot_no, vacuum_shot)
ax = z.plot()
ax.set(ylim=(-85, 85), xlim=(plasma_start,plasma_end), xlabel= 'Time [ms]', ylabel = '$\Delta$z [mm]', title = 'Vertical plasma position #{}'.format(shot_no))
ax.axhline(y=0, color='k', ls='--', lw=1, alpha=0.4)
def plasma_radius(shot_no, vacuum_shot=False):
r = horpol(shot_no, vacuum_shot)
z = vertpol(shot_no, vacuum_shot)
a0 = 85
a = a0 - np.sqrt((r**2)+(z**2))
a = a.replace([np.nan], value = 0)
# return a.loc[plasma_start:]
return a.loc[plasma_start:plasma_end]
# return a
a = plasma_radius(shot_no,vacuum_shot)
ax = a.plot()
ax.set(ylim=(0,85), xlim=(plasma_start,plasma_end), xlabel= 'Time [ms]', ylabel = '$a$ [mm]', title = 'Plasma column radius #{}'.format(shot_no))
plasma_time = []
t = 0
for i in a:
if i>85 or i <0:
a = a.replace(i, value = 0)
else:
plasma_time.append(a.index[t])
t+=1
start = plasma_time[0]-1e-03
end = plasma_time[-1]-1e-03
print('start =', round(start, 3), 'ms')
print('end =', round(end, 3), 'ms')
r_cut = r.loc[start:end]
a_cut = a.loc[start:end]
z_cut = z.loc[start:end]
df_processed = pd.concat(
[r_cut.rename('r'), z_cut.rename('z'), a_cut.rename('a')], axis= 'columns')
df_processed
savedata = 'plasma_position.csv'
df_processed.to_csv(savedata)
Markdown("[Plasma position data - $r, z, a$ ](./{})".format(savedata))
hline = hv.HLine(0)
hline.opts(
color='k',
line_dash='dashed',
alpha = 0.4,
line_width=1.0)
layout = hv.Layout([df_processed[v].hvplot.line(
xlabel='', ylabel=l,ylim=(-85,85), xlim=(start,end),legend=False, title='', grid=True, group_label=v)
for (v, l) in [('r', ' r [mm]'), ('z', 'z [mm]'), ('a', 'a [mm]')] ])*hline
plot=layout.cols(1).opts(hv.opts.Curve(width=600, height=200),
hv.opts.Curve('a', xlabel='time [ms]'))
plot
fig, axs = plt.subplots(3, 1, sharex=True, dpi=200)
r.plot(grid=True, ax=axs[0])
z.plot(grid=True, ax=axs[1])
a.plot(grid=True, ax=axs[2])
axs[2].set(ylim=(0,85), xlim=(start,end), xlabel= 'Time [ms]', ylabel = '$a$ [mm]')
axs[1].set(ylim=(-85,85), xlim=(start,end), xlabel= 'Time [ms]', ylabel = '$\Delta$z [mm]')
axs[0].set(ylim=(-85,85), xlim=(start,end), xlabel= 'Time [ms]', ylabel = '$\Delta$r [mm]', title = 'Horizontal, vertical plasma position and radius #{}'.format(shot_no))
plt.savefig('icon-fig')
The plasma vertical position can also be calculated from the fast camera signal as the centre of the radiating zone.
from PIL import Image
from io import BytesIO
from IPython.display import Image as DispImage
FastCamera = True
try:
url = "http://golem.fjfi.cvut.cz/shots/%i/Diagnostics/FastExCameras/plasma_film.png"%(shot_no)
image = requests.get(url)
img = Image.open(BytesIO(image.content)).convert('P') #'L','1'
data = np.asanyarray(img)
r = []
for i in range(data.shape[1]):
a=0
b=0
for j in range(336):
a += data[j,i]*j
b += data[j,i]
r.append((a/b)*(170/336))
Tcd = 3 #delay
camera_time = np.linspace(start+abs(start-Tcd), end, len(r))
r_camera = pd.Series(r, index = camera_time)-85
r_mirnov = horpol(shot_no, vacuum_shot).loc[start:end]
# print(start, end, len(r))
fig = plt.figure()
ax = r_camera.plot(label = 'Fast Camera')
ax = r_mirnov.plot(label = 'Mirnov coils')
plt.legend()
ax.set(ylim=(-85, 85), title='#%i'%shot_no,ylabel='$\Delta$r visible radiation')
ax.axhline(y=0, color='k', ls='--', lw=1, alpha=0.4)
fig.savefig('FastCameras_deltaR')
except OSError:
FastCamera = False
if FastCamera == True:
plt.imshow(img)