Source code for cytocalc.csmparser
#! /usr/bin/env python3
# Contains classes CSMParser (for parsing trajectory file: report####.txt)
# and CYMParser (for parsing simulation parameters: configurations.cym)
import os
import re
import io
import warnings
import sys
from cytocalc.csmframe import CSMFrame
from cytocalc.csmsimulation import CSMSimulation
import pandas as pd
[docs]
class CSMParser:
    """
    Parses cytosim trajectory files generated using `report`.

    A single frame is returned as an extended DataFrame object (CSMFrame);
    a file holding several frames is returned as a CSMSimulation.
    """

    # "% time <number>": accepts integers, decimals and signed exponents
    # (e.g. "12", "0.5", "10.000", "1.5e-3").
    _TIME_PATTERN = re.compile(
        r'% time\s+(?P<time>[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)'
    )
    # Reports with one extra comment line between "% report" and the header:
    #   % report fiber:position
    #   % some comment
    #   % class posX posY posZ ...
    _REPORT_TYPE_II = frozenset({'fiber', 'fiber:confine_force', 'fiber:position'})
    # Reports that carry no column header line at all:
    #   % report fiber:distribution
    #   bin   1 2 3 4 5 ...
    #   count 4 5 3 3 2 ...
    _REPORT_TYPE_III = frozenset({'fiber:distribution'})
    # Reports this parser refuses to read.
    _REPORT_UNSUPPORTED = frozenset({'fiber:speckle', 'bead:singles'})

    def __init__(self):
        pass

    def parse_frameFile(self, filename):
        """
        Parses a file that holds a single frame.

        Parameters
        ----------
        filename : path of the report file to read

        Returns a CSMFrame
        """
        with open(filename, 'r') as frame:
            return self.parse_frame(frame)

    def parse_simFile(self, filename):
        """
        Parses cytosim trajectory file generated using `report`

        Every block enclosed by a `% frame` line and a `% end` line is
        parsed as one frame. A final block that is cut off before its
        `% end` line is still parsed, and a UserWarning is emitted.

        Parameters
        ----------
        filename : path of the report file to read

        Returns a CSMSimulation
        """
        sim = CSMSimulation()
        with open(filename, 'r') as file_object:
            for frame_text, complete in self._iter_frame_blocks(file_object):
                if not complete:
                    warnings.warn(
                        "Reached end of file before '% end'. "
                        "The last frame may be truncated.",
                        UserWarning,
                    )
                sim.add_frame(self.parse_frame(io.StringIO(frame_text)))
        return sim

    def parse_frame(self, frame_data):
        """
        Parses the lines of one frame of a cytosim trajectory file.

        Parameters
        ----------
        frame_data : iterable of text lines (an open file, io.StringIO, ...)

        Returns a CSMFrame whose `time` attribute holds the frame time
        (0 when no `% time` line is found). Column names are taken from the
        header line when its length matches the number of data columns.

        Raises NotImplementedError for unsupported report types.
        Warns (UserWarning) when no report keyword or no data is found.
        """
        time, report_keyword, column_headers, data_list = self._scan_frame(frame_data)
        if report_keyword is None:
            warnings.warn("Could not find report keyword in frame data. The file may be malformed.", UserWarning)
        if len(data_list) == 0:
            warnings.warn("No data found in frame. The file may be empty or malformed.", UserWarning)
        table = pd.DataFrame(data_list)
        # only label the columns when the header really describes the data
        if column_headers and len(table.columns) == len(column_headers):
            table.columns = column_headers
        frame = CSMFrame(table)
        # append csmframe attributes
        frame.time = time
        return frame

    @staticmethod
    def _iter_frame_blocks(lines):
        """
        Yields (text, complete) for every `% frame` ... `% end` block.

        `text` is the concatenation of the lines between the two markers and
        `complete` is False when the input ended before `% end` was seen.
        """
        stream = iter(lines)
        for line in stream:
            if not line.startswith('% frame'):
                continue
            body = []
            complete = False
            # consume from the same iterator so the outer loop resumes
            # right after the end marker
            for body_line in stream:
                if body_line.startswith('% end'):
                    complete = True
                    break
                body.append(body_line)
            yield ''.join(body), complete

    @classmethod
    def _scan_frame(cls, frame_data):
        """
        Extracts (time, report_keyword, column_headers, data_list) from the
        lines of one frame without building any DataFrame.
        """
        time = 0
        report_keyword = None
        column_headers = []
        data_list = []
        has_headers = True
        headers_found = False
        # The position of the header line is not marked in the file, so it
        # is located relative to the "% report" line:
        # 0 = not waiting, 1 = this line is the header, 2 = skip this line.
        lines_until_header = 0
        for line in frame_data:
            parsed_time = cls._parse_time(line)
            if parsed_time is not None:
                time = parsed_time
            if has_headers and not headers_found:
                if lines_until_header == 1:
                    lines_until_header = 0
                    # A non-comment line here means the report has no
                    # header; it is then handled as ordinary data below.
                    if line.startswith('%'):
                        column_headers = line.lstrip('%').split()
                        headers_found = True
                elif lines_until_header == 2:
                    # skip this line entirely
                    lines_until_header = 1
                    continue
            if line.startswith('% report'):
                tokens = line.split()
                if len(tokens) < 3:
                    # "% report" without a keyword: nothing to learn from it
                    continue
                report_keyword = tokens[2]
                # file structure:
                # 09 % report bead:position
                # 10 % class posX posY posZ ...
                lines_until_header = 1
                if report_keyword in cls._REPORT_TYPE_II:
                    lines_until_header = 2
                if report_keyword in cls._REPORT_TYPE_III:
                    has_headers = False
                if report_keyword in cls._REPORT_UNSUPPORTED:
                    raise NotImplementedError(f"Report type {report_keyword} is not supported yet.")
            elif not line.startswith('%'):
                # extract data if the line is not a comment
                tokens = line.split()
                if tokens:
                    data_list.append([cls._convert_value(tok) for tok in tokens])
        return time, report_keyword, column_headers, data_list

    @classmethod
    def _parse_time(cls, line):
        """Returns the frame time found in `line` as a float, or None."""
        match = cls._TIME_PATTERN.search(line)
        if match is None:
            return None
        return float(match.group('time'))

    @staticmethod
    def _convert_value(val):
        """
        Converts a data token to int or float when possible.

        Tokens without '.', 'e' or 'E' are tried as int, all others as
        float; tokens that fail the conversion are kept as strings.
        """
        try:
            if ('.' not in val) and ('e' not in val) and ('E' not in val):
                return int(val)
            return float(val)
        except ValueError:
            # Keep as string if conversion fails
            return val
[docs]
class CYMParser:
    """
    Deprecated
    A class for parsing the configuration file (configuration.cym)
    Returns a dictionary that can be used in a CSMSimulation
    """

    @staticmethod
    def parse_config(config_file):
        """
        Parses configuration file using regex to get params

        Parameters
        ----------
        config_file : path of the configuration file to read

        Returns a dictionary holding
        - one float per top-level key found ('motor_count', 'cell_radius',
          'crosslinker_count', 'filament_count'), and
        - one nested dictionary per object found ('plus_motor', 'binder',
          'filament') mapping attribute names to floats, or to strings when
          the value is not a number.

        Warns (UserWarning) when an object block is not closed before the
        end of the file; the attributes read so far are kept.
        """
        param_dictionary = {}
        # construct regex dictionaries
        # parameters in rx_dict are automatically added to the simulation parameters
        rx_dict = {
            'motor_count': re.compile(r'new (?P<motor_count>\d+) couple motor'),
            # the radius may be fractional, e.g. "circle 2.5"
            'cell_radius': re.compile(r' +geometry = circle (?P<cell_radius>\d+(?:\.\d+)?)'),
            'crosslinker_count': re.compile(r'new (?P<crosslinker_count>\d+) couple crosslinker'),
            'filament_count': re.compile(r'new (?P<filament_count>\d+) fiber filament'),
        }
        # object_dict is used to obtain parameters that belong to an object, like motors
        object_dict = {
            'plus_motor': re.compile(r'set hand plus_motor'),
            'binder': re.compile(r'set hand binder'),
            'filament': re.compile(r'(set fiber filament|new \d+ fiber filament)'),
        }
        # the object attributes end here (leading whitespace is tolerated)
        end_pattern = re.compile(r'\s*\}')
        attribute_pattern = re.compile(r' +(?P<attr_name>\S+) = (?P<attr_val>\S+)')
        with open(config_file, 'r') as file:
            for line in file:
                # check if line matches main key
                for key, rx in rx_dict.items():
                    match = rx.match(line)
                    if match:
                        param_dictionary[key] = float(match.group(key))
                        break
                # get properties of objects, if objects are found
                for obj, rx in object_dict.items():
                    if not rx.match(line):
                        continue
                    obj_dict = {}
                    closed = False
                    # Read all the attributes within the object specification.
                    # Iterating the file (instead of readline in a while loop)
                    # guarantees termination at end of file.
                    for body_line in file:
                        if end_pattern.match(body_line):
                            closed = True
                            break
                        attribute = attribute_pattern.match(body_line)
                        if attribute is None:
                            continue
                        name = attribute.group('attr_name')
                        value = attribute.group('attr_val')
                        try:  # convert to float if possible
                            obj_dict[name] = float(value)
                        except ValueError:
                            obj_dict[name] = value
                    if not closed:
                        warnings.warn(
                            f"Block of object '{obj}' is not closed before the end of the file.",
                            UserWarning,
                        )
                    # update in cases where multiple places provide attributes
                    param_dictionary.setdefault(obj, {}).update(obj_dict)
                    break
        return param_dictionary