{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Plasma detection from photodiode signal\n", "\n", "This is a quick plasma detection code from the photodiode signal.\n", "This procedure is run as soon as possible in order to provide the information on plasma existence to other routines.\n", "\n", "(author: L. Lobko)" ] }, { "cell_type": "code", "metadata": {}, "source": [ "import numpy as np\n", "import os\n", "from scipy import signal as sigproc\n", "import matplotlib.pyplot as plt" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": "shot_no = 45752", "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "def update_db_current_shot(field_name, value):\n", " os.system('export PGPASSWORD=`cat /golem/production/psql_password`;psql -c \"UPDATE operation.discharges SET '+field_name+'='+str(value)+'WHERE shot_no IN(SELECT max(shot_no) FROM operation.discharges)\" -q -U golem golem_database')\n", " os.system('export PGPASSWORD=`cat /golem/production/psql_password`;psql -c \"UPDATE diagnostics.basicdiagnostics SET '+field_name+'='+str(value)+'WHERE shot_no IN(SELECT max(shot_no) FROM diagnostics.basicdiagnostics)\" -q -U golem golem_database')" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "os.makedirs('Results', exist_ok=True)" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "def save_scalar(phys_quant, value, format_str='%.3f'):\n", " with open(\"Results/\"+phys_quant, 'w') as f:\n", " f.write(format_str % value)\n", " update_db_current_shot(phys_quant,value)" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "ds = np.DataSource('/tmp') # temporary storage for downloaded files\n", "data_URL = 'http://golem.fjfi.cvut.cz/shots/{shot_no}/Diagnostics/PlasmaDetection/U_LeybPhot.csv'" ], "outputs": [], "execution_count": null }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load data" ] }, { "cell_type": "code", "metadata": {}, "source": [ "def load_data(shot_no):\n", " fname = ds.open(data_URL.format(shot_no=shot_no)).name\n", " data = np.loadtxt(fname, delimiter=',')\n", " data[:, 0] = data[:, 0] * 1e3 # from micros to ms\n", " return data" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "U_photodiode = load_data(shot_no)" ], "outputs": [], "execution_count": null }, { "cell_type": "markdown", "metadata": {}, "source": [ "Remove offset" ] }, { "cell_type": "code", "metadata": {}, "source": [ "def offset_remove(data):\n", " x_size, y_size = data.shape\n", " data_for_offset = data[0:int(x_size/100)]\n", " offset = np.mean(data_for_offset[:, 1])\n", " data[:, 1] -= offset\n", " return data" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "U_photodiode = offset_remove(U_photodiode)" ], "outputs": [], "execution_count": null }, { "cell_type": "markdown", "metadata": {}, "source": [ "Smooth signal" ] }, { "cell_type": "code", "metadata": {}, "source": [ "def smooth(y, box_pts):\n", " box = np.ones(box_pts) / box_pts\n", " y_smooth = np.convolve(y, box, mode='same')\n", " return y_smooth" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "U_photodiode[:, 1] = smooth(U_photodiode[:, 1], 100)" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "source": [ "def find_peaks(data):\n", " peaks_indexes, _ = sigproc.find_peaks(data[:, 1], prominence=1e-4)\n", " return np.vstack((data[peaks_indexes, 0], data[peaks_indexes, 1])).T" ], "metadata": { "collapsed": false }, "outputs": [], "execution_count": null }, { "cell_type": "markdown", "metadata": {}, "source": [ "Calculate plasma boundaries from signal derivation" ] }, { "cell_type": "code", "metadata": {}, "source": [ "def calc_plasma_boundaries(data, position):\n", " deriv = data.copy()\n", " deriv[:, 1] = np.gradient(data[:, 1])\n", " deriv[:, 1] = smooth(deriv[:, 1], 200)\n", " if position == 'start':\n", " index = np.where(deriv[:, 1] >= np.max(deriv[:, 1])/5)\n", " deriv = deriv[index]\n", " return deriv[0, 0]\n", " else:\n", " deriv = np.abs(deriv)\n", " peaks = find_peaks(deriv)\n", " return peaks[-1, 0]" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "if np.max(U_photodiode[:, 1]) < 0.01:\n", " print('No plasma in vacuum chamber.')\n", " t_plasma_start = -1.0\n", " t_plasma_end = -1.0\n", "else:\n", " t_plasma_start = calc_plasma_boundaries(U_photodiode, 'start')\n", " print('Plasma starts at {:.2f} ms.'.format(t_plasma_start))\n", " t_plasma_end = calc_plasma_boundaries(U_photodiode, 'end')\n", " print('Plasma ends at {:.2f} ms.'.format(t_plasma_end))" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "b_plasma = int(t_plasma_start > 0 and t_plasma_end > 0)" ], "outputs": [], "execution_count": null }, { "cell_type": "code", "metadata": {}, "source": [ "if b_plasma:\n", " t_plasma_duration = t_plasma_end - t_plasma_start\n", " print('Plasma duration is {:.2f} ms.'.format(t_plasma_duration))\n", "else:\n", " t_plasma_duration = -1.0 # convention instead of nan" ], "outputs": [], "execution_count": null }, { "metadata": {}, "cell_type": "code", "source": "plasma_endpoints = [t_plasma_start, t_plasma_end]", "outputs": [], "execution_count": null }, { "metadata": {}, "cell_type": "code", "source": [ "fig, axes = plt.subplots()\n", "axes.plot(U_photodiode[:, 0], U_photodiode[:, 1], label = 'photodiode signal')\n", "for x in plasma_endpoints:\n", " plt.axvline(x=x, color='black', linestyle='--')\n", "axes.set(xlabel='$time$ [ms]', ylabel='$U$ [a.u.]')\n", "plt.legend()\n", "plt.grid()\n", "plt.show()" ], "outputs": [], "execution_count": null }, { "cell_type": "markdown", "metadata": {}, "source": [ "Save data" ] }, { "cell_type": "code", "metadata": {}, "source": [ "save_scalar(\"b_plasma\", b_plasma)\n", "save_scalar(\"t_plasma_start\", t_plasma_start)\n", "save_scalar(\"t_plasma_end\", t_plasma_end)\n", "save_scalar(\"t_plasma_duration\", t_plasma_duration)" ], "outputs": [], "execution_count": null } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" } }, "nbformat": 4, "nbformat_minor": 4 }