Load libraries
%matplotlib inline
import os
import numpy as np
import matplotlib.pyplot as plt
from scipy import constants, integrate, signal, interpolate
import sqlalchemy # high-level library for SQL in Python
import pandas as pd
For interactive web figures
import holoviews as hv
hv.extension('bokeh')
import hvplot.pandas
For conditional rich-text boxes
from IPython.display import Markdown
Define global constants.
base_URL = "http://golem.fjfi.cvut.cz/utils/data/{shot_no}/{identifier}" #remote access
data_URL = "http://golem.fjfi.cvut.cz/shots/{shot_no}/DASs/StandardDAS/{identifier}.csv" # TODO workaround
destination=''
#os.makedirs(destination, exist_ok=True );
shot_no = 0
# try to get thot number form SHOT_NO envirnoment variable, otherwise use the specified one
shot_no = os.environ.get('SHOT_NO', 0)
The DataSource
downloads and caches data (by full URL) in a temporary directory and makes them accessible as files.
ds = np.DataSource(destpath='/tmp')
def print_and_save(PhysQuant, Value, Format ):
print(PhysQuant+" = %.5f" % Value)
with open(destination+PhysQuant, 'w') as f:
f.write("%.3f" % Value)
update_db_current_shot(PhysQuant,Value)
def update_db_current_shot(field_name, value):
try:
engine = sqlalchemy.create_engine('postgresql://golem:rabijosille@192.168.2.116/golem_database')
except:
return
engine.execute(f"""UPDATE shots SET "{field_name}"={value} WHERE shot_no IN(SELECT max(shot_no) FROM shots)""")
def open_remote(shot_no, identifier, url_template=base_URL):
return ds.open(url_template.format(shot_no=shot_no, identifier=identifier))
def read_value(shot_no, identifier):
"""Return the value for given shot as a number if possible"""
value = open_remote(shot_no, identifier, base_URL).read()
return pd.to_numeric(value, errors='ignore')
def read_signal(shot_no, identifier):
file = open_remote(shot_no, identifier, data_URL)
return pd.read_csv(file, names=['Time',identifier],
index_col='Time', squeeze=True) # squeeze makes simple 1-column signals a Series
loop_voltage = read_signal(shot_no, 'LoopVoltageCoil_raw')
ax = loop_voltage.plot(grid=True)
ax.set(xlabel="Time [s]", ylabel="$U_l$ [V]", title="Loop voltage $U_l$ #{}".format(shot_no));
loop_voltage_mean = loop_voltage.abs().mean()
print_and_save("U_loop_mean", loop_voltage_mean, "%.3f")
The 99% quantile is used to neglect large peaks. This method also ignores NaNs.
loop_voltage_max = loop_voltage.quantile(0.99)
print_and_save("U_loop_max", loop_voltage_max, "%.3f")
It is as magnetic measurement, so the raw data only give $\frac{dB_t}{dt}$
dBt = read_signal(shot_no,'BtCoil_raw')
ax = dBt.plot(grid=True)
ax.set(xlabel="Time [s]", ylabel="$dU_{B_t}/dt$ [V]", title="BtCoil_raw signal #{}".format(shot_no));
K_BtCoil = read_value(shot_no, 'K_BtCoil') # Get BtCoil calibration factor
print('BtCoil calibration factor K_BtCoil={} T/(Vs)'.format(K_BtCoil))
Bt = pd.Series(integrate.cumtrapz(dBt, x=dBt.index, initial=0) * K_BtCoil,
index=dBt.index, name='Bt')
ax = Bt.plot(grid=True)
ax.set(xlabel="Time [s]", ylabel="$B_t$ [T]", title="Toroidal magnetic field $B_t$ #{}".format(shot_no));
Bt_mean = Bt.abs().mean()
print_and_save("Bt_mean", Bt_mean, "%.3f")
Bt_max = Bt.quantile(0.99)
print_and_save("Bt_max", Bt_max, "%.3f")
Because it is a magnetic measurement, the raw data only gives $\frac{dI_{p+ch}}{dt}$
dIpch = read_signal(shot_no, 'RogowskiCoil_raw')
dIpch -= dIpch.loc[:0.9e-3].mean() # subtract offset
ax = dIpch.plot(grid=True)
ax.set(xlabel="Time [s]", ylabel="$dU_{I_{p+ch}}/dt$ [V]", title="RogowskiCoil_raw signal #{}".format(shot_no));
K_RogowskiCoil = read_value(shot_no, 'K_RogowskiCoil') # Get RogowskiCoil calibration factor
print('RogowskiCoil calibration factor K_RogowskiCoil={} A/(Vs)'.format(K_RogowskiCoil))
Ipch = pd.Series(integrate.cumtrapz(dIpch, x=dIpch.index, initial=0) * K_RogowskiCoil * (-1),
index=dIpch.index, name='Ipch')
ax = Ipch.plot(grid=True)
ax.set(xlabel="Time [s]", ylabel="$I_{p+ch}$ [A]", title="Total (plasma+chamber) current $I_{{p+ch}}$ #{}".format(shot_no));
R_chamber = read_value(shot_no, 'R_chamber') # Get Chamber resistivity
print('Chamber resistivity R_chamber={} Ohm'.format(R_chamber))
L_chamber = 1.2e-6 # H
The chamber current $I_{ch}$ satisfies the equation (neglecting the mutual inductance with the plasma) $$U_l = R_{ch} I_{ch} + L_{ch} \frac{d I_{ch}}{dt}$$ Therefore, the following initial value problem must be solved to take into account the chamber inductance properly $$\frac{d I_{ch}}{dt} = \frac{1}{L_{ch}}\left( U_l - R_{ch} I_{ch}\right), \quad I_{ch}(t=0)=0$$
U_l_func = interpolate.interp1d(loop_voltage.index, loop_voltage) # 1D interpolator
def dIch_dt(t, Ich):
return (U_l_func(t) - R_chamber * Ich) / L_chamber
t_span = loop_voltage.index[[0, -1]]
solution = integrate.solve_ivp(dIch_dt, t_span, [0], t_eval=loop_voltage.index, )
Ich = pd.Series(solution.y[0], index=loop_voltage.index, name='Ich')
Ip_naive = Ipch - loop_voltage/R_chamber # creates a new Series
Ip = Ipch - Ich
Ip.name = 'Ip'
Ip_naive.plot(grid=True, label='naive $I_{ch}=U_l/R_{ch}$')
ax = Ip.plot(grid=True, label=r'using $U_l = R_{ch} I_{ch} - L_{ch} \frac{d I_{ch}}{dt}$')
ax.legend()
ax.set(xlabel="Time [s]", ylabel="$I_{p}$ [A]", title="Plasma current $I_{{p}}$ #{}".format(shot_no));
Ip_mean = Ip.abs().mean()
print_and_save("Ip_mean", Ip_mean/1000, "%.3f")
Ip_max = Ip.quantile(0.99)
print_and_save("Ip_max", Ip_max/1000, "%.3f")
The plasma detection is based on finding the time points of the greatest peaks in the first (smooth) derivative of $I_p$ which corresponds to the breaks in slope of $I_p$, i.e. the rise or fall to 0, respectively. The smooth derivative is found using the Savitzky-Golay filter.
Ip_detect = Ip.loc[0.0025:] # select time > 2.5 ms to avoid thyristors
dt = (Ip_detect.index[-1] - Ip_detect.index[0]) / (Ip_detect.index.size) # estimated sampling step
print('Sampling step {dt:.3g} s, sampling frequency {fs:.3g} Hz'.format(dt=dt, fs=1/dt))
window_length = int(0.5e-3/dt) # window fit over 1 ms
if window_length % 2 == 0: # must not be odd
window_length += 1
dIp = pd.Series(signal.savgol_filter(Ip_detect, window_length, polyorder=3, deriv=1, delta=dt),
name='dIp', index=Ip_detect.index) / 1e6 # [MA/s]
threshold = 0.05
plasma_start = dIp[dIp > dIp.max()*threshold].index[0] # plasma actually starts before the sharpest rise
plasma_end = dIp.idxmin() # due to inductance smoothing the greatest fall is the best predictor
is_plasma = int(plasma_start < plasma_end and dIp.abs().max() > 0.5)
if not is_plasma:
#plasma_start = plasma_end = np.nan
plasma_start = plasma_end = -1
plasma_lifetime = 0
else:
plasma_lifetime = plasma_end - plasma_start
print_and_save("b_discharge", is_plasma, "%.3f")
print_and_save("t_plasma_start", plasma_start*1000, "%.3f")
print_and_save("t_plasma_end", plasma_end*1000, "%.3f")
print_and_save("t_discharge_duration", plasma_lifetime*1000, "%.3f")
fig, axs = plt.subplots(3, 1, sharex=True, dpi=200)
dIp.plot(grid=True, ax=axs[0])
axs[0].set(xlabel='', ylabel='$dI_p/dt$ [MA/s]', title='Plasma detection from $d I_p/dt$ in #{}'.format(shot_no))
Ip.plot(grid=True, ax=axs[1])
axs[1].set(ylabel="$I_{p}$ [A]")
loop_voltage.plot(grid=True, ax=axs[2])
axs[2].set(xlabel="Time [s]", ylabel="$U_{l}$ [V]")
for ax in axs:
for t in (plasma_start, plasma_end):
ax.axvline(t, color='k', linestyle='--', linewidth=2)
plt.savefig('plasma_detection')
df_processed = pd.concat(
[loop_voltage.rename('Ul'), Bt, Ip*1e-3], axis='columns')
df_processed.index = df_processed.index * 1e3 # to ms
df_processed.head()
if is_plasma:
plasma_lines = hv.VLine(plasma_start*1e3) * hv.VLine(plasma_end*1e3)
else:
plasma_lines = hv.Curve([])
layout = hv.Layout([df_processed[v].hvplot.line(
xlabel='', ylabel=l, legend=False, title='', grid=True, group_label=v) * plasma_lines
for (v, l) in [('Ul', 'Uₗ [V]'), ('Bt', 'Bₜ [T]'), ('Ip', 'Iₚ [kA]')] ])
plot=layout.cols(1).opts(hv.opts.Curve(width=600, height=200),
hv.opts.VLine(color='black', alpha=0.7, line_dash='dashed'),
hv.opts.Curve('Ip', xlabel='time [ms]'))
hvplot.save(plot, 'homepage_figure.html')
plot
fname = 'basig_diagnostics_processed.csv'
df_processed.to_csv(fname)
Markdown("[Time series in graph in CSV format](./{})".format(fname))
units = ['V', 'T', 'kA']
plasma_start_ms = plasma_start *1e3
plasma_end_ms = plasma_end *1e3
plasma_lifetime_ms = plasma_lifetime * 1e3
if is_plasma:
heading = Markdown("### Basic diagnostics values during plasma\n\n"
"plasma lifetime of {:.3g} ms, from {:.3g} ms to {:.3g} ms".format(
plasma_lifetime_ms, plasma_start_ms, plasma_end_ms))
else:
heading = Markdown("### No plasma detected (vacuum discharge)")
heading
df_during_plasma = df_processed.loc[plasma_start_ms:plasma_end_ms]
df_overview = pd.DataFrame()
for n in ('mean', 'min', 'max'):
df_overview[n] = df_during_plasma.aggregate(n)
if is_plasma:
df_overview['start'] = df_during_plasma.iloc[0]
df_overview['end'] = df_during_plasma.iloc[-1]
else:
df_overview['start'] = df_overview['end'] = df_overview['max']
df_overview.insert(0,'units', units)
df_overview.T
End of overview row
abs()
?)