Source code for cytocalc.csmparser
#! /usr/bin/env python3
# Contains classes CSMParser (for parsing trajectory file: report####.txt)
# and CYMParser (for parsing simulation parameters: configurations.cym)
import io
import os
import re
import sys
import warnings
import pandas as pd
from cytocalc.csmframe import CSMFrame
from cytocalc.csmsimulation import CSMSimulation
[docs]
class CSMParser:
    """
    Parses the cytosim trajectory file for a given frame
    Returns an extended DataFrame object (CSMFrame)
    """

    # Matches the "% time <value>" comment line. Accepts plain integers,
    # decimals with any number of fractional digits, and exponents with an
    # optional sign (e.g. "10", "0.5", "1.5e-05").
    _TIME_PATTERN = re.compile(
        r"% time\s+(?P<time>[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)"
    )
    # Reports whose column-header line is preceded by one extra comment line.
    _REPORT_TYPE_II = frozenset({"fiber", "fiber:confine_force", "fiber:position"})
    # Reports that have no column-header line at all.
    _REPORT_TYPE_III = frozenset({"fiber:distribution"})
    # Reports that parse_frame refuses to handle.
    _REPORT_UNSUPPORTED = frozenset({"fiber:speckle", "bead:singles"})

    def __init__(self):
        pass

    def parse_frameFile(self, filename):
        """
        Parses a file that contains a single frame
        Returns a CSMFrame
        """
        with open(filename, "r") as frame:
            return self.parse_frame(frame)

    def parse_simFile(self, filename):
        """
        Parses cytosim trajectory file generated using `report`
        Returns a CSMSimulation

        Every block of lines between a "% frame" line and the next "% end"
        line is handed to parse_frame and added to the simulation. A final
        frame without its "% end" line is still parsed from the lines that
        are present, and a UserWarning is emitted.
        """
        sim = CSMSimulation()
        with open(filename, "r") as file_object:
            for line in file_object:
                if not line.startswith("% frame"):
                    continue
                frame_lines = []
                # The file object is its own iterator, so this inner loop
                # consumes the lines of the current frame and simply stops
                # at end-of-file instead of spinning forever.
                for frame_line in file_object:
                    if frame_line.startswith("% end"):
                        break
                    frame_lines.append(frame_line)
                else:
                    warnings.warn(
                        "Reached end of file before '% end'. "
                        "The last frame may be truncated.",
                        UserWarning,
                    )
                sim.add_frame(self.parse_frame(io.StringIO("".join(frame_lines))))
        return sim

    @staticmethod
    def _split_header(line):
        """Return the column names of a header line, or None if it is not one."""
        tokens = line.split()
        if "%" in tokens:
            # usual layout: "% class posX posY ..."
            tokens.remove("%")
            return tokens
        if tokens and tokens[0].startswith("%"):
            # comment marker glued to the first name: "%class posX ..."
            tokens[0] = tokens[0][1:]
            return tokens
        return None

    @staticmethod
    def _convert_token(token):
        """Convert a token to int, then float, else return it unchanged."""
        try:
            return int(token)
        except ValueError:
            pass
        try:
            # also turns "nan" / "inf" into floats so numeric columns
            # are not polluted with strings
            return float(token)
        except ValueError:
            return token

    def parse_frame(self, frame_data):
        """
        Parses the cytosim trajectory file

        frame_data is any iterable of text lines (an open file, an
        io.StringIO, ...). Lines that do not start with "%" and are not
        blank are split on whitespace and become the rows of the result;
        numeric tokens are converted to int or float.

        Returns a CSMFrame whose `time` attribute holds the value of the
        "% time" line (0 if there is none). Column names are applied only
        when a header line was found and its length matches the data.

        Raises NotImplementedError for unsupported report types.
        Emits a UserWarning if no "% report" keyword or no data was found.
        """
        data_list = []  # one list of converted values per data line
        time = 0
        report_keyword = None
        has_headers = True
        headers_found = False
        column_headers = []
        # Countdown to the header line: 1 means "the current line should be
        # the header", 2 means "skip the current line, the next one is".
        lines_until_header = 0

        for line in frame_data:
            if match := self._TIME_PATTERN.search(line):  # obtain frame time
                time = float(match.group("time"))

            if has_headers and not headers_found:
                if lines_until_header == 1:
                    lines_until_header = 0
                    headers = self._split_header(line)
                    if headers is not None:
                        column_headers = headers
                        headers_found = True
                    # otherwise this is not a header line: give up on
                    # headers and let the line be handled normally below
                elif lines_until_header == 2:
                    # skip this line entirely
                    lines_until_header = 1
                    continue

            if line.startswith("% report"):
                fields = line.split()
                if len(fields) < 3:
                    # "% report" without a keyword: nothing to learn here;
                    # a missing keyword is reported after the loop
                    continue
                report_keyword = fields[2]
                # the line following this line typically contains
                # the data arguments (posX,posY etc.)
                # file structure:
                # 09 % report bead:position
                # 10 % class posX posY posZ ...
                lines_until_header = 1
                if report_keyword in self._REPORT_TYPE_II:
                    # Some files have an additional comment below the
                    # % report line
                    # file structure:
                    # 09 % report fiber:position
                    # 10 % some comment
                    # 11 % class posX posY posZ ...
                    lines_until_header = 2
                if report_keyword in self._REPORT_TYPE_III:
                    # Some report files simply do not have headers
                    # file structure
                    # 09 % report fiber:distribution
                    # 10 bin 1 2 3 4 5 ...
                    # 11 count 4 5 3 3 2 ...
                    has_headers = False
                if report_keyword in self._REPORT_UNSUPPORTED:
                    raise NotImplementedError(
                        f"Report type {report_keyword} is not supported yet."
                    )
            # extract data if the line is not a comment
            elif not line.startswith("%") and not line.isspace():
                data_list.append(
                    [self._convert_token(token) for token in line.split()]
                )

        if report_keyword is None:
            warnings.warn(
                "Could not find report keyword in frame data. The file may be malformed.",
                UserWarning,
            )
        if not data_list:
            warnings.warn(
                "No data found in frame. The file may be empty or malformed.",
                UserWarning,
            )

        # convert the collected rows into a CSMFrame object
        table = pd.DataFrame(data_list)
        if column_headers and len(table.columns) == len(column_headers):
            table.columns = column_headers
        frame = CSMFrame(table)
        # append csmframe attributes
        frame.time = time
        return frame
[docs]
class CYMParser:
    """
    Deprecated
    A class for parsing the configuration file (configuration.cym)
    Returns a dictionary that can be used in a CSMSimulation
    """

    # Parameters matched by these patterns are added directly to the result
    # under the pattern's key, converted to float.
    _PARAM_PATTERNS = {
        "motor_count": re.compile(r"new (?P<motor_count>\d+) couple motor"),
        "cell_radius": re.compile(r" +geometry = circle (?P<cell_radius>\d+)"),
        "crosslinker_count": re.compile(
            r"new (?P<crosslinker_count>\d+) couple crosslinker"
        ),
        "filament_count": re.compile(r"new (?P<filament_count>\d+) fiber filament"),
    }
    # A line matching one of these patterns opens an object specification;
    # the attribute lines that follow are collected under the pattern's key.
    _OBJECT_PATTERNS = {
        "plus_motor": re.compile(r"set hand plus_motor"),
        "binder": re.compile(r"set hand binder"),
        "filament": re.compile(r"(set fiber filament|new \d+ fiber filament)"),
    }
    # A line starting with "}" closes the object specification.
    _END_PATTERN = re.compile(r"}")
    # An indented "name = value" line inside an object specification.
    _ATTRIBUTE_PATTERN = re.compile(r" +(?P<attr_name>\S+) = (?P<attr_val>\S+)")

    @staticmethod
    def parse_config(config_file):
        """
        Parses configuration file using regex to get params

        config_file is the path of the file to read. Returns a dictionary
        with one float entry per matched top-level parameter and one nested
        dictionary per matched object; attribute values are floats when
        they can be converted and strings otherwise. Attributes of an
        object that is specified in several places are merged.

        Declared as a static method so that it can be called on the class
        as well as on an instance.
        """
        param_dictionary = {}
        with open(config_file, "r") as file:
            for line in file:
                # check if line matches main key
                for key, rx in CYMParser._PARAM_PATTERNS.items():
                    if match := rx.match(line):
                        param_dictionary[key] = float(match.group(key))
                        break
                # get properties of objects, if objects are found
                for obj, rx in CYMParser._OBJECT_PATTERNS.items():
                    if not rx.match(line):
                        continue
                    obj_dict = {}
                    # Read all the attributes within the object
                    # specification. Iterating the file stops at EOF, so a
                    # missing closing brace cannot cause an endless loop.
                    for attr_line in file:
                        if CYMParser._END_PATTERN.match(attr_line):
                            break
                        match = CYMParser._ATTRIBUTE_PATTERN.match(attr_line)
                        if not match:
                            continue
                        attr_name = match.group("attr_name")
                        attr_val = match.group("attr_val")
                        try:  # convert to float if possible
                            obj_dict[attr_name] = float(attr_val)
                        except ValueError:
                            obj_dict[attr_name] = attr_val
                    # merge in cases where multiple places provide attributes
                    param_dictionary.setdefault(obj, {}).update(obj_dict)
                    break
        return param_dictionary