#!/usr/bin/env python
"""Read the integrated dose from a Fluke 451P over a TCP socket.

The instrument is reached through a TCP endpoint (``gnome232_address``;
presumably a serial-to-ethernet adapter -- confirm against the setup).

Usage::

    script.py arming        # connects, currently does nothing else
    script.py acquisition   # reads the integrated dose, accumulates it in
                            # /tmp/cumulative_dose_Sv and renders icon.png
"""
import socket
import sys
from contextlib import contextmanager
from subprocess import call
from time import sleep, time

# PARAMETERS
gnome232_address = ("192.168.2.249", 10001)

# Multipliers for the unit prefix token reported by the instrument.
units = {
    'm': 1e-3,
    'MICRO': 1e-6,
}


# COMMUNICATION CLASS
class Fluke451PSocket(object):
    """Thin command/response wrapper around a TCP connection to the meter."""

    def __init__(self, address, timeout=1):
        """Connect to ``address`` (a ``(host, port)`` tuple).

        ``timeout`` is the default socket timeout in seconds; it is what
        ends a :meth:`get` call when no terminator is given.
        """
        # Connect to the address we were given, not to the module default.
        self.socket = socket.create_connection(address)
        self.socket.settimeout(timeout)
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    def close(self):
        """Shut down and close the connection; safe to call repeatedly."""
        # getattr: __init__ may have failed before self.socket was assigned.
        sock = getattr(self, 'socket', None)
        if sock is None:
            return
        self.socket = None
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # The peer already dropped the connection; nothing to shut down.
            pass
        sock.close()

    def __del__(self):
        self.close()

    @contextmanager
    def different_timeout(self, timeout):
        """Temporarily use ``timeout`` (seconds) as the socket timeout.

        The previous timeout is restored even if the body raises.
        """
        old_timeout = self.socket.gettimeout()
        self.socket.settimeout(timeout)
        try:
            yield
        finally:
            self.socket.settimeout(old_timeout)

    def get(self, termination_str=None, buflen=1024):
        """Collect incoming data and return it as text.

        Reading stops when the socket times out, when the peer closes the
        connection, or -- if ``termination_str`` is given -- as soon as
        that string appears in the accumulated message.
        """
        message = ''
        while True:
            try:
                packet = self.socket.recv(buflen)
            except socket.timeout:
                break
            if not packet:
                # An empty read means the peer closed the connection; without
                # this check the loop would spin forever.
                break
            if not isinstance(packet, str):
                # Python 3 hands back bytes; keep the text API of the
                # original Python 2 code.
                packet = packet.decode('ascii', 'replace')
            message += packet
            if termination_str and termination_str in message:
                break
        return message

    def put(self, command):
        """Send ``command`` (text or bytes) to the instrument."""
        if not isinstance(command, bytes):
            command = command.encode('ascii')
        self.socket.sendall(command)

    def activate_measurement(self):
        """Send the 'T' command."""
        self.put('T')

    def terminate_measurement(self, plus_chars=5, repeat=1):
        """Send spaces until the instrument answers with a '?'.

        ``plus_chars`` and ``repeat`` are unused; they are kept so existing
        callers keep working.  NOTE(review): the loop has no upper bound and
        only ends once a '?' is received -- confirm this is intended.
        """
        with self.different_timeout(0.05):
            while True:
                self.put(' ')
                packet = self.get(buflen=1024)
                if '?' in packet:
                    break

    def get_integrated(self):
        """Send 'F' and return the integrated reading scaled by its prefix.

        The first reply line containing ``'integrated'`` is expected to hold
        four whitespace-separated fields: value, unit prefix, unit, status.

        Raises:
            ValueError: no such line was received, or it is malformed.
            KeyError: the unit prefix is not listed in ``units``.
        """
        self.put('F')
        # not interested if ends with Sv/h
        with self.different_timeout(5):
            message = self.get()
        for line in message.splitlines():
            if 'integrated' in line:
                break
        else:
            # Previously the last line of the reply was parsed instead, which
            # could return an unrelated reading.
            raise ValueError(
                'no integrated reading in instrument reply: %r' % message)
        value, unit_mod, unit, status = line.split()
        return float(value) * units[unit_mod]

    def save_integrated(self, fname):
        """Stop measuring, read the integrated value, restart, save it.

        The value is written to ``fname`` and also returned.
        """
        self.terminate_measurement()
        value = self.get_integrated()
        self.activate_measurement()
        save_const(fname, value)
        return value


# UTILITY FUNCTIONS
def make_icon(text):
    """Render ``text`` into ``icon.png`` with the external ``convert`` tool.

    Returns the exit status of the ``convert`` process.
    """
    return call(['convert', '-size', '150x120', '-gravity', 'center',
                 'label:' + text, 'icon.png'])


def save_const(fname, value):
    """Write ``str(value)`` to the file ``fname``, replacing its content."""
    with open(fname, 'w') as fobj:
        fobj.write(str(value))


def load_const(fname):
    """Return the content of the file ``fname`` parsed as a float."""
    with open(fname, 'r') as fobj:
        return float(fobj.read())


def main(argv=None):
    """Run the action named by the first command-line argument."""
    argv = sys.argv if argv is None else argv
    action = argv[1] if len(argv) > 1 else ''

    fluke = Fluke451PSocket(gnome232_address)
    try:
        if action == 'arming':
            # value = fluke.save_integrated('pre_Sv')
            pass
        elif action == 'acquisition':
            value = fluke.save_integrated('dose_since_last_measurement_Sv')
            try:
                old_value = load_const('/tmp/cumulative_dose_Sv')
            except IOError:
                # No cumulative file yet: start counting from zero.
                old_value = 0
            value += old_value
            save_const('/tmp/cumulative_dose_Sv', value)
            make_icon('%.2g Sv' % value)
            # Disabled alternative (difference between pre and post reading):
            # value = fluke.save_integrated('post_Sv')
            # value_pre = load_const('pre_Sv')
            # diff = value - value_pre
            # save_const('diff_Sv', diff)
            # make_icon('%.2g Sv' % diff)
    finally:
        # Release the connection explicitly instead of relying on __del__.
        fluke.close()


if __name__ == '__main__':
    main()