#!/usr/bin/env python # -*- coding: utf-8 -*- """ ================================================================ Standalone Python TDMS reader, (not using the NI libraries for they were Windows-specific) ================================================================ based on the format description on http://zone.ni.com/devzone/cda/tut/p/id/5696 Floris van Vugt IMMM Hannover http://florisvanvugt.free.fr/ I am greatly indebted for insightful bug-corrections by: ADAM REEVE JUERGEN NEUBAUER, PH.D. Thanks guys! """ import struct import os # Tells us whether we should really output messages about what # we have read (clutters the output buffer though) verbose = False # store previous values for the raw data properties for when # they are repeated object_rawdata = {} def byteToHex( byteStr ): """ Convert a byte string to it's hex string representation e.g. for output. """ # Uses list comprehension which is a fractionally faster implementation than # the alternative, more readable, implementation below # # hex = [] # for aChar in byteStr: # hex.append( "%02X " % ord( aChar ) ) # # return ''.join( hex ).strip() return ''.join( [ "%02X " % ord( x ) for x in byteStr ] ).strip() tocProperties = { 'kTocMetaData' : (1L<<1), 'kTocRawData' : (1L<<3), 'kTocDAQmxRawData' : (1L<<7), 'kTocInterleavedData' : (1L<<5), 'kTocBigEndian' : (1L<<6), 'kTocNewObjList' : (1L<<2), } tdsDataTypes = [ 'tdsTypeVoid', 'tdsTypeI8', 'tdsTypeI16', 'tdsTypeI32', 'tdsTypeI64', 'tdsTypeU8', 'tdsTypeU16', 'tdsTypeU32', 'tdsTypeU64', 'tdsTypeSingleFloat', 'tdsTypeDoubleFloat', 'tdsTypeExtendedFloat', 'tdsTypeSingleFloatWithUnit', 'tdsTypeDoubleFloatWithUnit', 'tdsTypeExtendedFloatWithUnit', 'tdsTypeString', 'tdsTypeBoolean', 'tdsTypeTimeStamp', 'tdsTypeDAQmxRawData', ] tdsDataTypesDefined = { 0x19: 'tdsTypeSingleFloatWithUnit', 0x20: 'tdsTypeString', 0x21: 'tdsTypeBoolean', 0x44: 'tdsTypeTimeStamp', 0xFFFFFFFF:'tdsTypeDAQmxRawData', } tdsDataTypesTranscriptions = { 'tdsTypeVoid' : '', 'tdsTypeI8' : 'b', 'tdsTypeI16' : 'h', # short: standard size: 2 bytes 'tdsTypeI32' : 'l', 'tdsTypeI64' : 'q', 'tdsTypeU8' : 'B', 'tdsTypeU16' : 'H', # unsigned short: 2 bytes 'tdsTypeU32' : 'L', 'tdsTypeU64' : 'Q', 'tdsTypeSingleFloat' : 'f', 'tdsTypeDoubleFloat' : 'd', 'tdsTypeExtendedFloat' : ' ', #NOT YET IMPLEMENTED 'tdsTypeSingleFloatWithUnit' : ' ', #NOT YET IMPLEMENTED 'tdsTypeDoubleFloatWithUnit' : ' ', #NOT YET IMPLEMENTED 'tdsTypeExtendedFloatWithUnit' : ' ', #NOT YET IMPLEMENTED 'tdsTypeString' :' ', # SHOULD BE HANDLED SEPARATELY 'tdsTypeBoolean' :'b', 'tdsTypeTimeStamp' :' ', # SHOULD BE HANDLED SEPARATELY 'tdsTypeDAQmxRawData' :' ', # SHOULD BE HANDLED SEPARATELY } def dataTypeFrom( s ): """ Find back the data type from raw input data. """ repr = struct.unpack("Raw data reading",byteToHex(s),",that is,",rawdataindex,"bytes" #s = f.read(rawdataindex) # New raw data index! inf_length = rawdataindex # DataType s = f.read(4) rawdata_datatype = dataTypeFrom(s) # Dimension of the raw data array s = f.read(4) #print "Dimension: ",byteToHex(s) rawdata_dim = struct.unpack("Done reading raw data:",rawdata # Read the number of properties s = f.read(4) nProp = struct.unpack(" Interleaved" # Initialise data to be empty data = {} for c in channel: data[c]=[] j=0 while j Not Interleaved" data = {} # Start with no data in our channel for c in channels: data[c] = [] for chunk in range(n_chunks): for c in channels: size= channel_sizes[c] (name, rawdataindex, (datatype, rawdata_dim, rawdata_values), values) = segmentobjects[c] # Calculate how many bytes a single value is datapointsize= dataTypeLength(datatype) for j in range(0,rawdata_values): # Read one value from the file s = f.read(datapointsize) value = getValue(s,endianness,datatype) data[c].append( value ) return data def mergeRawData( rawdata, newrawdata ): """ Return the raw data, appended the new raw data. """ for channel in newrawdata.keys(): # If we already had data on this channel if (channel in rawdata.keys()): rawdata[channel].extend(newrawdata[channel]) # Else we just chart it annew else: rawdata[channel] = newrawdata[channel] return rawdata def readSegment( f, filesize, data ): """ Read a segment from file f, whose filesize is given, and data is what we have read already """ # This is the data we have so far. # The stuff in this segment is going to append to this. (objects,rawdata)=data leadin = readLeadIn(f) (metadata,version,next_segment_offset,raw_data_offset) = leadin newobjects = {} # If the segment has metadata... if (metadata["kTocMetaData"]): # Read the meta data (newobjects,newobjectorder) = readMetaData(f) # Merge the new information with what we knew already about the objects. objects = mergeObjects( objects, newobjects ) if (metadata["kTocRawData"]): # Read the raw data newdata = readRawData(f,leadin,newobjects,newobjectorder,filesize) # And merge the data we just read with what we knew already rawdata = mergeRawData( rawdata, newdata ) return (objects,rawdata) def dumpProperties(props): ret = '' for pr in props: (tp,val)=props[pr] ret = ret + (pr+'=') + str(val) + ", " return ret def csvDump((objects,data)): """ Dump the (objects,rawdata) that we read from a TDMS file straight into a CSV file. """ ret = '' for obj in objects.keys(): # Objects (objectpath, rawdataindex, rawdata, properties) = objects[obj] print ("OBJECT "+objectpath+" ("+dumpProperties(properties)+")\n") # ret = ret + '' i = 0 maxi = max([ len(data[obj]) for obj in objects.keys() if obj in data.keys() ]) channels = [ obj for obj in objects.keys() if isChannel(objects[obj]) ] ret += '\t'.join(channels)+'\n' for i in range(maxi): for obj in channels: val = '' if ((obj in data.keys()) and i