#!/usr/bin/env python
import socket
from subprocess import call
from time import sleep, time
from contextlib import contextmanager
import sys
# PARAMETERS
# (host, port) pair handed to Fluke451PSocket when the script runs as __main__.
gnome232_address = (
    "192.168.2.249",  # host
    10001,            # port
)

# Multipliers looked up by the unit-prefix token parsed from the device reply;
# the parsed number is multiplied by the matching entry.
units = dict(m=1e-3, MICRO=1e-6)
# COMMUNICATION CLASS
class Fluke451PSocket(object):
    """Text-command client for the instrument, speaking over a TCP socket.

    Commands are short strings written to the socket; replies are read back
    as text until a timeout, a terminator, or end-of-stream.

    NOTE(review): judging by the names, the peer is a Fluke 451P reached
    through a serial-to-network bridge -- confirm against the hardware setup.
    """

    def __init__(self, address, timeout=1):
        """Open the connection.

        address -- (host, port) tuple passed to ``socket.create_connection``.
        timeout -- timeout in seconds applied to all later socket I/O.

        Raises ``socket.error`` if the connection cannot be established.
        """
        # Define the attribute first so __del__ stays safe when connecting fails.
        self.socket = None
        # Connect to the address the caller asked for (not a module global).
        self.socket = socket.create_connection(address)
        self.socket.settimeout(timeout)
        # Send every small command immediately instead of letting TCP batch it.
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    def __del__(self):
        """Best-effort shutdown and close of the socket."""
        sock = getattr(self, 'socket', None)
        if sock is None:
            # __init__ never got as far as opening a connection.
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # Deliberately ignored: the peer may already have gone away and a
            # destructor has nobody to report the failure to.
            pass
        finally:
            sock.close()

    @contextmanager
    def different_timeout(self, timeout):
        """Context manager that temporarily sets the socket timeout.

        The previous timeout is restored on exit, including when the body
        of the ``with`` statement raises.
        """
        old_timeout = self.socket.gettimeout()
        self.socket.settimeout(timeout)
        try:
            yield
        finally:
            self.socket.settimeout(old_timeout)

    def get(self, termination_str=None, buflen=1024):
        """Read from the socket and return the accumulated text.

        Reading stops when a ``recv`` times out, when the peer closes the
        connection, or -- if ``termination_str`` is given -- as soon as it
        occurs in the text received so far.

        termination_str -- optional text marking the end of a reply.
        buflen          -- maximum number of bytes requested per ``recv``.
        """
        message = ''
        while True:
            try:
                packet = self.socket.recv(buflen)
            except socket.timeout:
                break
            if not packet:
                # Empty read means the peer closed the connection; without
                # this check the loop would spin forever on a dead socket.
                break
            if not isinstance(packet, str):
                # Python 3 yields bytes; the rest of the class works on text.
                packet = packet.decode('ascii', 'replace')
            message += packet
            if termination_str and termination_str in message:
                break
        return message

    def put(self, command):
        """Send ``command`` in full; text is encoded as ASCII first."""
        if not isinstance(command, bytes):
            command = command.encode('ascii')
        self.socket.sendall(command)

    def activate_measurement(self):
        """Send the ``'T'`` command.

        NOTE(review): by the method name this starts (or resumes) measuring;
        confirm against the instrument documentation.
        """
        self.put('T')

    def terminate_measurement(self, plus_chars=5, repeat=1):
        """Send a space repeatedly until a reply containing ``'?'`` arrives.

        Each attempt waits at most 50 ms for a reply. ``plus_chars`` and
        ``repeat`` are accepted for backward compatibility but are not used.

        NOTE(review): there is no upper bound on the number of attempts; a
        peer that stays connected but never answers with ``'?'`` keeps this
        loop running.
        """
        with self.different_timeout(0.05):
            while True:
                self.put(' ')
                packet = self.get(buflen=1024)
                if '?' in packet:
                    break

    def get_integrated(self):
        """Send ``'F'`` and return the value from the "integrated" reply line.

        The reply is collected for up to 5 s per read. The first line that
        contains ``'integrated'`` must split into four whitespace-separated
        fields ``value prefix unit status``; the result is
        ``float(value) * units[prefix]``.

        Raises ``ValueError`` if no such line is present, if it does not have
        exactly four fields, or if the value is not a number; raises
        ``KeyError`` if the prefix is not in ``units``.
        """
        self.put('F')
        # not interested if ends with Sv/h
        with self.different_timeout(5):
            message = self.get()
        for line in message.splitlines():
            if 'integrated' in line:
                break
        else:
            # Do not fall through and parse an unrelated (last) line.
            raise ValueError(
                'no "integrated" line in reply: %r' % (message,))
        value, unit_mod, _unit, _status = line.split()
        return float(value) * units[unit_mod]

    def save_integrated(self, fname):
        """Read the integrated value, write it to ``fname`` and return it.

        Sequence: terminate_measurement(), get_integrated(),
        activate_measurement(), then ``save_const(fname, value)``.
        """
        self.terminate_measurement()
        value = self.get_integrated()
        self.activate_measurement()
        save_const(fname, value)
        return value
# UTILITY FUNCTIONS
def make_icon(text):
    """Run the external ``convert`` command to draw ``text`` into ``icon.png``.

    The image is requested at 150x120 with centre gravity, and is written to
    ``icon.png`` relative to the current working directory.  Returns the exit
    status reported by ``subprocess.call``.
    """
    command = ['convert']
    command += ['-size', '150x120']
    command += ['-gravity', 'center']
    command.append('label:' + text)
    command.append('icon.png')
    return call(command)
def save_const(fname, value):
    """Overwrite the file ``fname`` with the string form of ``value``."""
    text = str(value)
    with open(fname, 'w') as handle:
        handle.write(text)
def load_const(fname):
    """Return the contents of the file ``fname`` converted to ``float``."""
    with open(fname, 'r') as handle:
        raw = handle.read()
    return float(raw)
if __name__ == '__main__':
    # The first command-line argument selects the action; with no argument
    # the action is the empty string and neither branch below runs.
    action = sys.argv[1] if len(sys.argv) > 1 else ''

    # The connection is opened on every invocation, whatever the action.
    fluke = Fluke451PSocket(gnome232_address)

    if action == 'acquisition':
        cumulative_path = '/tmp/cumulative_dose_Sv'
        # Read the integrated value and store it on its own.
        value = fluke.save_integrated('dose_since_last_measurement_Sv')
        # Add it to the running total; a missing total file counts as zero.
        try:
            previous_total = load_const(cumulative_path)
        except IOError:
            previous_total = 0
        value = value + previous_total
        save_const(cumulative_path, value)
        make_icon('%.2g Sv' % value)
    elif action == 'arming':
        # Intentionally a no-op.
        pass
#value = fluke.save_integrated('post_Sv')
#value_pre = load_const('pre_Sv')
#diff = value - value_pre
#save_const('diff_Sv', diff)
#make_icon('%.2g Sv' % diff)