Source code for nufeb_tools.generate_atom_dev

"""
This is a script to seed NUFEB simulations
"""

import random
import argparse
import numpy as np
import pickle
from string import Template
import json  # For dealing with metadata
import os  # For file level operations
from itertools import combinations
import itertools
from collections import defaultdict
import sys
import logging
from importlib import resources
from pathlib import Path
from nufeb_tools import __version__

__author__ = "Jonathan Sakkos"
__copyright__ = "Jonathan Sakkos"
__license__ = "MIT"

_logger = logging.getLogger(__name__)

# TODO update lmp and atom templates
CellInfo = {
    "cyano": {
        "GrowthRate": round(0.06 / 3600, 7),
        "min_length": 1e-6,
        "max_length": 5e-6,
        "Diameter": 1e-6,
        "Density": 370,
        "Inertia": {"ixx": 0, "iyy": 0, "izz": 9.2e-23, "ixy": 0, "ixz": 0, "iyz": 0},
        "K_s": {"light": 3.5e-4, "o2": 2e-4, "suc": 1e-2, "co2": 1.38e-4},
        "Yield": 0.55,
        "Maintenance": 0,
        "Decay": 0,
    },
    "ecw": {
        "GrowthRate": 2.7e-04,
        "min_length": 1.94e-6,
        "max_length": 2.72e-6,
        "Diameter": 0.73e-6,
        "Density": 236,
        "Inertia": {"ixx": 0, "iyy": 0, "izz": 9.2e-23, "ixy": 0, "ixz": 0, "iyz": 0},
        "K_s": {"light": 0, "o2": 1e-3, "suc": 3.6, "co2": 5e-2},
        "Yield": 0.43,
        "Maintenance": 9.50e-7,
        "Decay": 2e-5,
    },
}


# arguments to modify the conditions of the simulation seeding


[docs]def parse_args(args): """Parse command line parameters Args: args (List[str]): command line parameters as list of strings (for example ``["--help"]``). Returns: :obj:`argparse.Namespace`: command line parameters namespace """ parser = argparse.ArgumentParser(description="Create atom definition files") parser.add_argument( "--n", dest="num", action="store", default=1, help="Create atom definition files for NUFEB with --n #files desired (default is 1)", ) parser.add_argument( "--r", dest="reps", action="store", default=1, help="Number of replicates" ) parser.add_argument( "--c", dest="culture_type", action="store", default="co", help="Set culture conditions with --c (co-culture), --ax-c (cyano), --ax-e (e.coli)", ) parser.add_argument( "--cells", dest="cells_init", action="store", default=50, help="Number of total cells to initialize simulation with", ) parser.add_argument( "--co2", dest="co2", action="store", default=1e3, help="Set initial CO2 concentration (mM)", ) parser.add_argument( "--d", dest="dims", action="store", type=str, default="1e-4,1e-4,1e-5", help="Set simulation box dimensions (m)", ) parser.add_argument( "--ts", dest="timestep", action="store", default=600, help="Timestep length" ) parser.add_argument( "--t", dest="ntimesteps", action="store", default=600, help="Number of timesteps to run", ) parser.add_argument( "--suc", dest="sucrose", action="store", default=1e-19, help="Set initial sucrose concentration (mM)", ) parser.add_argument( "--grid", dest="grid", action="store", default=2, help="Diffusion grid density (um/grid)", ) parser.add_argument( "--mono", dest="monolayer", action="store", default=True, help="Set seed generation to monolayer of cells", ) parser.add_argument("--u", dest="user", action="store", help="CADES/CNMS user ID") parser.add_argument( "--vtk", dest="vtk", action="store", default=True, help="Enable VTK dump" ) parser.add_argument( "--hdf5", dest="hdf", action="store", default=True, help="Enable HDF5 dump" ) parser.add_argument( "--img", dest="img", action="store", default=False, help="Enable image dump" ) parser.add_argument( "--mov", dest="movie", action="store", default=True, help="Enable movie dump" ) parser.add_argument( "-v", "--verbose", dest="loglevel", help="set loglevel to INFO", action="store_const", const=logging.INFO, ) parser.add_argument( "-vv", "--very-verbose", dest="loglevel", help="set loglevel to DEBUG", action="store_const", const=logging.DEBUG, ) return parser.parse_args(args)
[docs]def setup_logging(loglevel): """Setup basic logging Args: loglevel (int): minimum loglevel for emitting messages """ logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s" logging.basicConfig( level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S" )
[docs]class Nutrient: """ Nutrient class to define the chemicals present in the simulation volume, their concentrations, and their properties """ def __init__(self, c, d, mw, state, bc): self.concentration = c self.diffusion = d self.moleculuarWeight = mw self.state = state self.boundary = bc if self.moleculuarWeight is not None: self.concentrationNufeb = np.format_float_scientific( self.concentration * self.moleculuarWeight * 1e-3, precision=1 ) else: self.concentrationNufeb = np.format_float_scientific( self.concentration, precision=1 )
[docs]class Cell: """ Bacteria object class """ def __init__(self, Species, Group, idx, args, Info=CellInfo): self.group = Group self.species = Species self.growth = Info[self.species]["GrowthRate"] self.min_length = Info[self.species]["min_length"] self.max_length = Info[self.species]["max_length"] self.diameter = Info[self.species]["Diameter"] self.density = Info[self.species]["Density"] self.Ks = Info[self.species]["K_s"] self.yld = Info[self.species]["Yield"] self.maintenance = Info[self.species]["Maintenance"] self.decay = Info[self.species]["Decay"] self.inertia = Info[self.species]["Inertia"] self.boundaries = [float(x) for x in args.dims.split(",")] self.length = random.uniform(self.min_length, self.max_length) self.monolayer = args.monolayer self.x = random.uniform(0, self.boundaries[0]) self.y = random.uniform(0, self.boundaries[1]) if self.monolayer: self.z = 1e-6 else: self.z = random.uniform(0, self.boundaries[2]) self.angle = random.uniform(1, 180) self.x_displacement = np.format_float_scientific( self.length / 2 * np.cos(self.angle * np.pi / 360), precision=1 ) self.y_displacement = np.format_float_scientific( self.length / 2 * np.sin(self.angle * np.pi / 360), precision=1 ) self.z_displacement = 0 self.z_angle = 0 self.index = idx
[docs] def Atom(self): """ Function to return atom (cell) positions to render atom.in file """ return " ".join( map( str, [ self.index + 1, self.group, 1, self.density, np.format_float_scientific(self.x, precision=2), np.format_float_scientific(self.y, precision=2), np.format_float_scientific(self.z, precision=2), 1, " \n", ], ) )
[docs] def rotate(self, z_dim=False): """ Randomly generate cell orientation displacements based on input angle """ self.x_displacement = np.format_float_scientific( self.length / 2 * np.cos(self.angle * np.pi / 360), precision=1 ) self.y_displacement = np.format_float_scientific( self.length / 2 * np.sin(self.angle * np.pi / 360), precision=1 ) zd = z_dim if zd == True: self.z_angle = random.uniform(1, 180) self.z_displacement = np.format_float_scientific( self.length / 2 * np.sin(self.z_angle * np.pi / 360), precision=1 ) else: self.z_displacement = 0 self.z_angle = 0 return [self.x_displacement, self.y_displacement, self.z_displacement]
[docs] def Bacillus(self): """ Function to return rod shape (bacillus) parameters to render atom.in file """ return " ".join( map( str, [self.index + 1] + list(self.inertia.values()) + self.rotate() + [self.diameter] + [" \n"], ) )
[docs] def Report(self): """ Return cell position, orientation, and size """ return [self.x, self.y, self.z, self.angle, self.length, self.diameter]
[docs] def Check(self): """ Return cell position, orientation, and size """ return self.x, self.y, self.length, self.index
[docs]class Culture: """ Create a collection of cells with defined positions, lengths, and orientations """ def __init__(self, args): self.cultureType = args.culture_type self.n_cells = int(args.cells_init) self.SucRatio = round(random.random(), 3) self.SucPct = int(self.SucRatio * 100) self.boundaries = [float(x) for x in args.dims.split(",")] if self.cultureType == "co": self.cell_types = ["cyano", "ecw"] self.n_cyanos = int(random.uniform(1, self.n_cells - 1)) self.n_ecw = int(self.n_cells - self.n_cyanos) # self.n_cells = n_cyanos + n_ecw self.cellCount = {"cyano": self.n_cyanos, "ecw": self.n_ecw} self.cyGroup = "group CYANO type 1" self.ecwGroup = "group ECW type 2" self.cyDiv = f'fix div_cya CYANO nufeb/divide/bacillus {CellInfo["cyano"]["max_length"]} {random.randint(1,1e6)}' self.ecwDiv = f'fix div_ecw ECW nufeb/divide/bacillus {CellInfo["ecw"]["max_length"]} {random.randint(1,1e6)}' self.cyMonod = f'fix monod_cyano CYANO nufeb/monod/cyano light {CellInfo["cyano"]["K_s"]["light"] : .2e} o2 co2 {CellInfo["cyano"]["K_s"]["co2"] : .2e} suc gco2 growth {CellInfo["cyano"]["GrowthRate"] : .2e} yield {CellInfo["cyano"]["Yield"] : .2e} suc_exp {self.SucRatio}' self.ecwMonod = f'fix monod_ecw ECW nufeb/monod/ecoli/wild suc {CellInfo["ecw"]["K_s"]["suc"] : .2e} o2 {CellInfo["ecw"]["K_s"]["o2"] : .2e} co2 growth {CellInfo["ecw"]["GrowthRate"] : .2e} yield {CellInfo["ecw"]["Yield"] : .2e} maintain {CellInfo["ecw"]["Maintenance"] : .2e} decay {CellInfo["ecw"]["Decay"] : .2e}' self.cyanoCount = 'variable ncyano equal "count(CYANO)"' self.ecwCount = 'variable necw equal "count(ECW)"' self.vcyano = "v_ncyano" self.vecw = "v_necw" elif self.cultureType == "ax-c": self.cell_types = ["cyano"] self.n_cyanos = int(random.uniform(1, self.n_cells)) self.n_ecw = 0 # self.n_cells = n_cyanos self.cellCount = {"cyano": self.n_cyanos} self.cyGroup = "group CYANO type 1" self.ecwGroup = "" self.cyDiv = f'fix div_cya CYANO nufeb/divide/bacillus {CellInfo["cyano"]["max_length"]} {random.randint(1,1e6)}' self.ecwDiv = "" self.cyMonod = f'fix monod_cyano CYANO nufeb/monod/cyano light {CellInfo["cyano"]["K_s"]["light"] : .2e} o2 co2 {CellInfo["cyano"]["K_s"]["co2"] : .2e} suc gco2 growth {CellInfo["cyano"]["GrowthRate"] : .2e} yield {CellInfo["cyano"]["Yield"] : .2e} suc_exp {self.SucRatio}' self.ecwMonod = "" self.cyanoCount = 'variable ncyano equal "count(CYANO)"' self.ecwCount = "" self.vcyano = "v_ncyano" self.vecw = "" elif self.cultureType == "ax-e": self.cell_types = ["ecw"] self.n_ecw = int(random.uniform(1, self.n_cells)) self.n_cyanos = 0 # self.n_cells = n_ecw self.cellCount = {"ecw": self.n_ecw} self.cyGroup = "" self.ecwGroup = "group ECW type 1" self.cyDiv = "" self.ecwDiv = f'fix div_ecw ECW nufeb/divide/bacillus {CellInfo["ecw"]["max_length"]} {random.randint(1,1e6)}' self.cyMonod = "" self.ecwMonod = f'fix monod_ecw ECW nufeb/monod/ecoli/wild suc {CellInfo["ecw"]["K_s"]["suc"] : .2e} o2 {CellInfo["ecw"]["K_s"]["o2"] : .2e} co2 growth {CellInfo["ecw"]["GrowthRate"] : .2e} yield {CellInfo["ecw"]["Yield"] : .2e} maintain {CellInfo["ecw"]["Maintenance"] : .2e} decay {CellInfo["ecw"]["decay"] : .2e}' self.cyanoCount = "" self.ecwCount = 'variable necw equal "count(ECW)"' self.vcyano = "" self.vecw = "v_necw" counter = itertools.count(0) self.cells = [ Cell(CellType, c, next(counter), args) for c, CellType in enumerate(self.cell_types, start=1) for i in range(self.cellCount[CellType]) ] self.Check() def __iter__(self): return iter(self.cells)
[docs] def Check(self): """ Check initial positions of each cell and move one of them if there is a collision """ cleared = False while not cleared: for i in list(combinations([cell.Check() for cell in self.cells], 2)): # for i in list(combinations(zip(self.locations.x,self.locations.y,self.locations.length,self.locations.index),2)): x1 = i[0][0] y1 = i[0][1] r1 = i[0][2] / 2 idx1 = i[0][3] x2 = i[1][0] y2 = i[1][1] r2 = i[1][2] / 2 idx1 = i[0][3] idx2 = i[1][3] distance = (x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2) radii = (r1 + r2) * (r1 + r2) if distance == radii: cleared = True elif distance > radii: cleared = True else: if x1 > x2 and y1 > y2: if ( x1 + r1 > 0 and x1 + r1 < self.boundaries[0] and y1 + r1 > 0 and y1 + r1 < self.boundaries[1] ): self.cells[idx1].x = x1 + r1 / 2 self.cells[idx1].y = y1 + r1 / 2 elif x1 > x2 and y1 < y2: if ( x1 + r1 > 0 and x1 + r1 < self.boundaries[0] and y1 - r1 > 0 and y1 - r1 < self.boundaries[1] ): self.cells[idx1].x = x1 + r1 / 2 self.cells[idx1].y = y1 - r1 / 2 elif x1 < x2 and y1 > y2: if ( x1 - r1 > 0 and x1 - r1 < self.boundaries[0] and y1 + r1 > 0 and y1 + r1 < self.boundaries[1] ): self.cells[idx1].x = x1 - r1 / 2 self.cells[idx1].y = y1 + r1 / 2 else: if ( x1 - r1 > 0 and x1 - r1 < self.boundaries[0] and y1 - r1 > 0 and y1 - r1 < self.boundaries[1] ): self.cells[idx1].x = x1 - r1 / 2 self.cells[idx1].y = y1 - r1 / 2 _logger.debug( f"Bumped from {x1 :.2e}, {y1 :.2e} to {self.cells[idx1].x :.2e}, {self.cells[idx1].y :.2e}" ) cleared = False return
[docs]def clean(): if os.path.isdir("runs"): import shutil try: shutil.rmtree("runs") except OSError as e: print("Error: %s : %s" % ("runs", e.strerror))
[docs]def main(args): """Wrapper function to generate new NUFEB simulation conditions Args: args (List[str]): command line parameters as list of strings """ args = parse_args(args) setup_logging(args.loglevel) _logger.debug("Generating NUFEB simulation files") # create nutrients light = Nutrient(1e-1, None, None, "g", "nn") co2 = Nutrient(float(args.co2), 1.9e-09, 44.01, "l", "nn") o2 = Nutrient(0.28125, 2.30e-9, 32, "l", "nn") sucrose = Nutrient(float(args.sucrose), 5.2e-10, 342.3, "l", "nn") gco2 = Nutrient(0, None, 44.01, "g", "nn") TEMPLATES_DIR = (Path(__file__).parent) / "templates" captureRate = round(1000 / args.timestep) # define dump parameters dump_list = { "vtk_dump": f"dump atom_vtk all vtk {captureRate} dump*.vtu id type diameter vx vy vz fx fy fz \n dump grid_vtk all grid/vtk {captureRate} dump_%_*.vti con", "image_dump": f"dump du_image all image {captureRate} image.*.jpg type diameter zoom 2 bacillus type size 1280 720 view 45 60 \n dump_modify du_image acolor 1 green acolor 2 red", "movie_dump": f"dump du_mov all movie {captureRate} movie.avi type diameter zoom 1.5 bacillus type size 1280 720 view 0 0 \n dump_modify du_mov acolor 1 green acolor 2 red", "hdf_dump": f"dump du_h5 all nufeb/hdf5 {captureRate} dump.h5 id type x y z vx vy vz fx fy fz radius conc reac", } dumps = defaultdict(list) for i in range(4): tmp = ["vtk_dump", "image_dump", "movie_dump", "hdf_dump"] dumps[tmp[i]] for dump, dump_var in zip( [args.vtk, args.img, args.movie, args.hdf], ["vtk_dump", "image_dump", "movie_dump", "hdf_dump"], ): if dump is True or dump == "True": dumps[dump_var] = dump_list[dump_var] else: dumps[dump_var] = "" ## Species-specific parameters # check for runs folder if not os.path.isdir("runs"): os.mkdir("runs") x = float(args.dims.split(",")[0]) y = float(args.dims.split(",")[1]) z = float(args.dims.split(",")[2]) for n in range(1, int(args.num) + 1): culture = Culture(args) atoms_list = [] bacilli_list = [] # Create list of atoms and bacilli for atom definition file for cell in culture.cells: atoms_list.append(cell.Atom()) bacilli_list.append(cell.Bacillus()) # make atom definition file for r in range(1, int(args.reps) + 1): L = [ " NUFEB Simulation\r\n\n", f" {args.cells_init} atoms \n", f" {len(culture.cell_types)} atom types \n", f" {args.cells_init} bacilli \n\n", f" 0.0e-4 {x :.2e} xlo xhi \n", f" 0.0e-4 {y :.2e} ylo yhi \n", f" 0.0e-4 {z :.2e} zlo zhi \n\n", " Atoms \n\n", ] atoms = L + atoms_list atoms.append("\n") atoms.append(" Bacilli \n\n") atoms = atoms + bacilli_list # write atom definition file f = open( f"runs/atom_{culture.n_cyanos}_{culture.n_ecw}_{culture.SucPct}_{r}.in", "w+", ) f.writelines(atoms) RUN_DIR = ( Path("runs") / f"Run_{culture.n_cyanos}_{culture.n_ecw}_{culture.SucPct}_{args.reps}" ) if not os.path.isdir(RUN_DIR): os.mkdir(RUN_DIR) # os.mkdir(f'runs/Run_{culture.n_cyanos}_{culture.n_ecw}_{culture.SucPct}_{args.reps}') # write initial conditions json file dumpfile = open(RUN_DIR / "metadata.json", "w") # dumpfile = open(f"/runs/Run_{culture.n_cyanos}_{culture.n_ecw}_{culture.SucPct}_{args.reps}/metadata.json",'w') json.dump(CellInfo, dumpfile, indent=6) dumpfile.close() ### # write Inputscript # open the file filein = open(TEMPLATES_DIR / "bacillus.txt") # filein = resources.read_text("nufeb_tools.templates", "Bacillus.txt") # read it src = Template(filein.read()) # do the substitution result = src.safe_substitute( { "n": args.cells_init, "SucRatio": culture.SucRatio, "SucPct": culture.SucPct, "n_cyanos": culture.n_cyanos, "n_ecw": culture.n_ecw, "Replicates": args.reps, "Timesteps": args.ntimesteps, "ts": args.timestep, "CYANOGroup": culture.cyGroup, "ECWGroup": culture.ecwGroup, "Zheight": float(args.dims.split(",")[2]), "CYANODiv": culture.cyDiv, "ECWDiv": culture.ecwDiv, "light": light.concentration, "co2": co2.concentration, "o2": o2.concentration, "sucrose": sucrose.concentration, "gco2": gco2.concentration, "CYANOMonod": culture.cyMonod, "ECWMonod": culture.ecwMonod, "CYANOcount": culture.cyanoCount, "ECWcount": culture.ecwCount, "v_ncyano": culture.vcyano, "v_necw": culture.vecw, "vtk_dump": dumps["vtk_dump"], "image_dump": dumps["image_dump"], "movie_dump": dumps["movie_dump"], "hdf_dump": dumps["hdf_dump"], } ) f = open( f"./runs/Inputscript_{culture.n_cyanos}_{culture.n_ecw}_{culture.SucPct}.lmp", "w+", ) f.writelines(result) # write local run script # open the file filein = open(TEMPLATES_DIR / "local.txt") # filein = resources.read_text("nufeb_tools.templates", "local.txt") # read it src = Template(filein.read()) # do the substitution result = src.safe_substitute( { "n": n, "SucRatio": culture.SucRatio, "SucPct": culture.SucPct, "n_cyanos": culture.n_cyanos, "n_ecw": culture.n_ecw, "Reps": args.reps, } ) f = open( f"./runs/local_{culture.n_cyanos}_{culture.n_ecw}_{culture.SucPct}.sh", "w+" ) f.writelines(result) # write slurm script # open the file filein = open(TEMPLATES_DIR / "slurm_dev.txt") # filein = resources.read_text("nufeb_tools.templates", "Slurm.txt") # read it src = Template(filein.read()) # do the substitution result = src.safe_substitute( { "n": args.cells_init, "job": f"NUFEB_cyano{n}", "USER": args.user, "Replicates": args.reps, "SucPct": culture.SucPct, "n_cyanos": culture.n_cyanos, "n_ecw": culture.n_ecw, } ) _logger.info("Script ends here")
[docs]def run(): """Calls :func:`main` passing the CLI arguments extracted from :obj:`sys.argv` This function can be used as entry point to create console scripts with setuptools. """ main(sys.argv[1:])
if __name__ == "__main__": run()