import random
from scipy.stats import loguniform
import argparse
import numpy as np
from string import Template
import json
import os
import sys
from datetime import date
import logging
from nufeb_tools import __version__
from pathlib import Path
from glob import glob
__author__ = "Jonathan Sakkos"
__copyright__ = "Jonathan Sakkos"
__license__ = "MIT"
_logger = logging.getLogger(__name__)
[docs]def parse_args(args):
"""Parse command line parameters
Args:
args (List[str]): command line parameters as list of strings
(for example ``["--help"]``).
Returns:
:obj:`argparse.Namespace`: command line parameters namespace
"""
# arguments to modify the conditions of the simulation seeding
parser = argparse.ArgumentParser(description="Create atom definition files")
parser.add_argument(
"--n",
dest="num",
action="store",
default=1,
help="Create atom definition files for NUFEB with --n #files desired (default is 1)",
)
parser.add_argument(
"--r", dest="reps", action="store", default=1, help="Number of replicates"
)
parser.add_argument(
"--c",
dest="culture_type",
action="store",
default="co",
help="Set culture conditions with --c (co-culture), --ax-c (cyano), --ax-e (e.coli)",
)
parser.add_argument(
"--co2",
dest="co2",
action="store",
default=6.8e-1,
help="Set initial CO2 concentration (mM)",
)
parser.add_argument(
"--d",
dest="dims",
action="store",
type=str,
default="1e-4,1e-4,1e-5",
help="Set simulation box dimensions (m)",
)
parser.add_argument(
"--t",
dest="timesteps",
action="store",
default=35000,
help="Number of timesteps to run",
)
parser.add_argument(
"--suc",
dest="sucrose",
action="store",
default=1e-19,
help="Set initial sucrose concentration (mM)",
)
parser.add_argument(
"--grid",
dest="grid",
action="store",
default=2,
help="Diffusion grid density (um/grid)",
)
parser.add_argument(
"--mono",
dest="monolayer",
action="store",
default=True,
type=bool,
help="Set seed generation to monolayer of cells",
)
parser.add_argument("--u", dest="user", action="store", help="CADES/CNMS user ID")
parser.add_argument(
"--cells",
dest="cells",
action="store",
default=None,
help="Number of cyanobacteria and e.coli to initialize simulation with, `e.g., 100,100. ` Default is random number between 1 and 100.",
)
parser.add_argument(
"--od",
dest="od",
action="store",
default=None,
help="Optical density of cyanobacteria and e.coli to initialize simulation with, `e.g., 0.3,1`",
)
parser.add_argument(
"--sucR",
dest="SucRatio",
action="store",
default=None,
help="Set sucrose secretion ratio (0 to 1). Default is random.",
)
parser.add_argument(
"--iptg",
dest="iptg",
action="store",
default=None,
type=float,
help="Set IPTG induction for sucrose secretion (0 to 1). Default is random.",
)
parser.add_argument(
"--muecw",
dest="mu_ecw",
action="store",
default=3.55e-4,
type=float,
help="E. coli W maximum growth rate",
)
parser.add_argument(
"--mucya",
dest="mu_cya",
action="store",
default=2.296e-05,
type=float,
help="S. elongatus maximum growth rate",
)
parser.add_argument(
"--rhoecw",
dest="rho_ecw",
action="store",
default=230,
type=float,
help="E. coli W cell density",
)
parser.add_argument(
"--rhocya",
dest="rho_cya",
action="store",
default=370,
type=float,
help="S. elongatus cell density",
)
parser.add_argument(
"--kco2",
dest="kco2",
action="store",
default=0.00812672,
type=float,
help="S. elongatus Kco2",
)
parser.add_argument(
"--ksuc",
dest="ksuc",
action="store",
default=3.6,
type=float,
help="E. coli W Ksuc",
)
parser.add_argument(
"--maintecw",
dest="ecw_maint",
action="store",
default=0,
type=float,
help="E. coli W maintenance cost",
)
parser.add_argument(
"--yieldecw",
dest="ecw_yield",
action="store",
default=0.49,
type=float,
help="E. coli W biomass yield (g-dw/g-sucrose)",
)
parser.add_argument(
"--decayecw",
dest="ecw_decay",
action="store",
default=0,
type=float,
help="E. coli W decay rate",
)
parser.add_argument(
"--biodt",
dest="biodt",
action="store",
default=100,
type=int,
help="Biological timesteps.",
)
parser.add_argument(
"--mass",
dest="mass_max",
action="store",
default=None,
type=float,
help="Maximum biomass",
)
parser.add_argument(
"--vtk",
dest="vtk",
action="store",
default=False,
type=bool,
help="Output VTK files",
)
parser.add_argument(
"--h5", dest="hdf5", action="store", default=True, help="Output HDF5 files"
)
parser.add_argument(
"--lammps",
dest="lammps",
action="store",
default=False,
help="Output lammps files",
)
parser.add_argument(
"-v",
"--verbose",
dest="loglevel",
help="set loglevel to INFO",
action="store_const",
const=logging.INFO,
)
parser.add_argument(
"-vv",
"--very-verbose",
dest="loglevel",
help="set loglevel to DEBUG",
action="store_const",
const=logging.DEBUG,
)
parser.add_argument(
"--niter",
dest="niter",
action="store",
help="Number of iterations for diffusion calculation",
type=int,
default=1000000,
)
parser.add_argument(
"--suc_halt",
dest="sucrose_halt",
default=None,
type=int,
help="Add halt condition for sucrose levels.",
)
parser.add_argument(
"--division", dest="division", default="on", type=str, help="Cellular division"
)
return parser.parse_args(args)
# TODO Change sucRatio to IPTG
[docs]def setup_logging(loglevel):
"""Setup basic logging
Args:
loglevel (int): minimum loglevel for emitting messages
"""
logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s"
logging.basicConfig(
level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S"
)
[docs]def clean():
"""Remove old NUFEB runs"""
if os.path.isdir("runs"):
import shutil
# TODO add folder removal to logger
try:
shutil.rmtree("runs")
except OSError as e:
print("Error: %s : %s" % ("runs", e.strerror))
slurm_path = glob("*.slurm")
if slurm_path:
for file in slurm_path:
os.remove(file)
[docs]def main(args):
"""Wrapper function to generate new NUFEB simulation conditions
Args:
args (List[str]): command line parameters as list of strings
"""
args = parse_args(args)
setup_logging(args.loglevel)
_logger.info("Generating NUFEB simulation files")
# maximum growth rates, mu
# $mu_cyanos = round(0.06/3600,7)
# mu_ecw = 2.7e-04 for 37C only
# mu_ecw = 6.71e-5
# molecular weights of co2 and sucrose for unit conversions
CO2MW = 44.01
SucMW = 342.3
Biomass2OD = np.array([0.28, 0.44]) * np.prod(
[float(x) for x in args.dims.split(",")]
) # kg-cells/m^3/OD
TEMPLATES_DIR = (Path(__file__).parent) / "templates"
InitialConditions = {
"cyano": {
"GrowthRate": args.mu_cya,
"min_size": 1.37e-6,
"max_size": 1.94e-6,
"Density": args.rho_cya,
"K_s": {"sub": 3.5e-4, "o2": 2e-4, "suc": 1e-2, "co2": args.kco2},
"GrowthParams": {"Yield": 0.55, "Maintenance": 0, "Decay": 0},
},
"ecw": {
"GrowthRate": args.mu_ecw,
"min_size": 8.8e-7,
"max_size": 1.39e-6,
"Density": args.rho_ecw,
"K_s": {"sub": 0, "o2": 1e-3, "suc": args.ksuc, "co2": 5e-2},
"GrowthParams": {
"Yield": args.ecw_yield,
"Maintenance": args.ecw_maint,
"Decay": args.ecw_decay,
},
},
"Nutrients": {
"Concentration": {
"sub": 1e-1,
"o2": 9e-3,
"suc": float(args.sucrose) * SucMW * 1e-3,
"co2": float(args.co2) * CO2MW * 1e-3,
},
"State": {"sub": "g", "o2": "l", "suc": "l", "co2": "l"},
"xbc": {"sub": "nn", "o2": "nn", "suc": "nn", "co2": "nn"},
"ybc": {"sub": "nn", "o2": "nn", "suc": "nn", "co2": "nn"},
"zbc": {"sub": "nn", "o2": "nd", "suc": "nn", "co2": "nd"},
},
"Diff_c": {"sub": 0, "o2": 2.30e-9, "suc": 5.2e-10, "co2": 1.9e-09},
"Dimensions": [float(x) for x in args.dims.split(",")],
"Replicates": int(args.reps),
}
# check for runs folder
if not os.path.isdir("runs"):
os.mkdir("runs")
today = str(date.today())
if args.mass_max is not None:
max_mass = float(args.mass_max)
else:
max_mass = (
1.5e-11
* np.prod([float(x) for x in args.dims.split(",")])
/ (1e-4 * 1e-4 * 1e-5)
)
# TODO add file generation details to logger
for n in range(1, int(args.num) + 1):
if args.iptg is not None:
IPTG = float(args.iptg)
SucRatio = IPTG
else:
IPTG = np.round(loguniform.rvs(1e-3, 1e0, size=1)[0], 5)
SucRatio = IPTG
InitialConditions["IPTG"] = IPTG
InitialConditions["SucRatio"] = SucRatio
_logger.info(f"IPTG = {IPTG}")
SucPct = int(SucRatio * 100)
mean_cyano_mass = (
np.mean(
[
(InitialConditions["cyano"]["min_size"]) ** 3,
(InitialConditions["cyano"]["max_size"]) ** 3,
]
)
/ 6
* np.pi
* 370
)
mean_ecw_mass = (
np.mean(
[
(InitialConditions["ecw"]["min_size"]) ** 3,
(InitialConditions["ecw"]["max_size"]) ** 3,
]
)
/ 6
* np.pi
* 370
)
if args.culture_type == "co":
cell_types = ["cyano", "ecw"]
if args.cells is not None:
n_cyanos = int(args.cells.split(",")[0])
n_ecw = int(args.cells.split(",")[1])
total_cyano_biomass = n_cyanos * mean_cyano_mass
total_ecw_biomass = n_ecw * mean_ecw_mass
InitialConditions["cyano"]["OD"] = total_cyano_biomass / Biomass2OD[0]
InitialConditions["ecw"]["OD"] = total_ecw_biomass / Biomass2OD[1]
elif args.od is not None:
total_cyano_biomass = float(args.od.split(",")[0]) * Biomass2OD[0]
n_cyanos = round(total_cyano_biomass / mean_cyano_mass)
total_ecw_biomass = float(args.od.split(",")[1]) * Biomass2OD[1]
n_ecw = round(total_ecw_biomass / mean_ecw_mass)
InitialConditions["cyano"]["OD"] = float(args.od.split(",")[0])
InitialConditions["ecw"]["OD"] = float(args.od.split(",")[1])
else:
n_cyanos = int(random.uniform(1, 100))
n_ecw = int(random.uniform(1, 100))
total_cyano_biomass = n_cyanos * mean_cyano_mass
total_ecw_biomass = n_ecw * mean_ecw_mass
InitialConditions["cyano"]["OD"] = total_cyano_biomass / Biomass2OD[0]
InitialConditions["ecw"]["OD"] = total_ecw_biomass / Biomass2OD[1]
n_cells = n_cyanos + n_ecw
cyGroup = "group CYANO type 1"
ecwGroup = "group ECW type 2"
if args.division.lower() == "on":
cyDiv = f"fix d1 CYANO divide {args.biodt} v_EPSdens v_divDia1 {random.randint(1,1e6)}"
ecwDiv = f"fix d2 ECW divide {args.biodt} v_EPSdens v_divDia2 {random.randint(1,1e6)}"
else:
cyDiv = ""
ecwDiv = ""
masses = "c_myMass[1]+c_myMass[2]"
elif args.culture_type == "ax-c":
cell_types = ["cyano"]
if args.cells is not None:
n_cyanos = int(args.cells.split(",")[0])
elif args.od is not None:
total_cyano_biomass = float(args.od.split(",")[0]) * Biomass2OD[0]
n_cyanos = round(total_cyano_biomass / mean_cyano_mass)
total_ecw_biomass = float(args.od.split(",")[1]) * Biomass2OD[1]
n_ecw = round(total_ecw_biomass / mean_ecw_mass)
InitialConditions["cyano"]["OD"] = float(args.od.split(",")[0])
InitialConditions["ecw"]["OD"] = float(args.od.split(",")[1])
else:
n_cyanos = int(random.uniform(1, 100))
n_ecw = 0
n_cells = n_cyanos
total_cyano_biomass = n_cyanos * mean_cyano_mass
total_ecw_biomass = n_ecw * mean_ecw_mass
InitialConditions["cyano"]["OD"] = total_cyano_biomass / Biomass2OD[0]
InitialConditions["ecw"]["OD"] = total_ecw_biomass / Biomass2OD[1]
cyGroup = "group CYANO type 1"
ecwGroup = ""
if args.division.lower() == "on":
cyDiv = f"fix d1 CYANO divide {args.biodt} v_EPSdens v_divDia1 {random.randint(1,1e6)}"
else:
cyDiv = ""
ecwDiv = ""
masses = "c_myMass[1]"
elif args.culture_type == "ax-e":
cell_types = ["ecw"]
if args.cells is not None:
n_ecw = int(args.cells.split(",")[1])
elif args.od is not None:
total_cyano_biomass = float(args.od.split(",")[0]) * Biomass2OD[0]
n_cyanos = round(total_cyano_biomass / mean_cyano_mass)
total_ecw_biomass = float(args.od.split(",")[1]) * Biomass2OD[1]
n_ecw = round(total_ecw_biomass / mean_ecw_mass)
InitialConditions["cyano"]["OD"] = float(args.od.split(",")[0])
InitialConditions["ecw"]["OD"] = float(args.od.split(",")[1])
else:
n_ecw = int(random.uniform(1, 100))
n_cyanos = 0
n_cells = n_ecw
total_cyano_biomass = n_cyanos * mean_cyano_mass
total_ecw_biomass = n_ecw * mean_ecw_mass
InitialConditions["cyano"]["OD"] = total_cyano_biomass / Biomass2OD[0]
InitialConditions["ecw"]["OD"] = total_ecw_biomass / Biomass2OD[1]
cyGroup = ""
ecwGroup = "group ECW type 1"
cyDiv = ""
if args.division.lower() == "on":
ecwDiv = f"fix d2 ECW divide {args.biodt} v_EPSdens v_divDia2 {random.randint(1,1e6)}"
else:
ecwDiv = ""
masses = "c_myMass[1]"
InitialConditions["cyano"]["StartingCells"] = n_cyanos
InitialConditions["ecw"]["StartingCells"] = n_ecw
InitialConditions["cyano"]["initial_biomass"] = total_cyano_biomass
InitialConditions["ecw"]["initial_biomass"] = total_ecw_biomass
_logger.info(f"Cyanos = {n_cyanos}")
_logger.info(f"Ecw = {n_ecw}")
_logger.info(f"Cyano biomass = {total_cyano_biomass} kg")
_logger.info(f"Ecw biomass = {total_ecw_biomass} kg")
_logger.info(f'Starting Cyano OD {InitialConditions["cyano"]["OD"]}')
_logger.info(f'Starting Ecw OD {InitialConditions["ecw"]["OD"]}')
RUN_DIR = Path(
f"runs/Run_{n_cyanos}_{n_ecw}_{IPTG:.2e}_{args.reps}_{today}_{random.randint(1,1e6)}"
)
if not os.path.isdir(RUN_DIR):
os.mkdir(RUN_DIR)
# TODO embed cell type into metadata file and generate cell type programmatically
grids = int(args.grid)
while True:
if (
InitialConditions["Dimensions"][0] * 1e6 % grids == 0
and InitialConditions["Dimensions"][1] * 1e6 % grids == 0
and InitialConditions["Dimensions"][2] * 1e6 % grids == 0
):
Mesh = f'{int(InitialConditions["Dimensions"][0]*1e6/grids)} {int(InitialConditions["Dimensions"][1]*1e6/grids)} {int(InitialConditions["Dimensions"][2]*1e6/grids)}'
break
else:
grids += 1
NutesNum = len(InitialConditions["Nutrients"]["Concentration"])
for r in range(1, int(args.reps) + 1):
L = [
" NUFEB Simulation\r\n\n",
f" {n_cells} atoms \n",
f" {len(cell_types)} atom types \n",
f" {NutesNum} nutrients \n\n",
f' 0.0e-4 {InitialConditions["Dimensions"][0] :.2e} xlo xhi \n',
f' 0.0e-4 {InitialConditions["Dimensions"][1] :.2e} ylo yhi \n',
f' 0.0e-4 {InitialConditions["Dimensions"][2] :.2e} zlo zhi \n\n',
" Atoms \n\n",
]
j = 1
for c, CellType in enumerate(cell_types, start=1):
sizes = np.random.uniform(
InitialConditions[CellType]["min_size"],
InitialConditions[CellType]["max_size"],
size=(InitialConditions[CellType]["StartingCells"],),
)
initial_biomass = InitialConditions[CellType]["initial_biomass"]
if initial_biomass > 0:
while not (
0.999 * initial_biomass
< sum(sizes ** 3 / 6 * np.pi * 370)
< 1.001 * initial_biomass
):
sizes = np.random.uniform(
InitialConditions[CellType]["min_size"],
InitialConditions[CellType]["max_size"],
size=(InitialConditions[CellType]["StartingCells"],),
)
n = 0
for i in range(j, InitialConditions[CellType]["StartingCells"] + j):
size = sizes[n]
x = random.uniform(
0 + size, InitialConditions["Dimensions"][0] - size
)
y = random.uniform(
0 + size, InitialConditions["Dimensions"][1] - size
)
if args.monolayer is True:
z = size
else:
z = random.uniform(
0 + size, InitialConditions["Dimensions"][2] - size
)
L.append(
f' %d {c} {size :.2e} {InitialConditions[CellType]["Density"]} {x :.2e} {y :.2e} {z :.2e} {size :.2e} \n'
% (i)
)
j += 1
n += 1
L.append("\n")
L.append(" Nutrients \n\n")
for i, nute in enumerate(
InitialConditions["Nutrients"]["Concentration"].keys()
):
L.append(
f' %d {nute} {InitialConditions["Nutrients"]["State"][nute]} {InitialConditions["Nutrients"]["xbc"][nute]} {InitialConditions["Nutrients"]["ybc"][nute]} {InitialConditions["Nutrients"]["zbc"][nute]} {InitialConditions["Nutrients"]["Concentration"][nute] :.2e} {InitialConditions["Nutrients"]["Concentration"][nute] :.2e} \n'
% (i + 1)
)
L.append("\n")
L.append(" Type Name \n\n")
for c, CellType in enumerate(cell_types, start=1):
L.append(f" {c} {CellType} \n")
L.append("\n")
L.append(" Diffusion Coeffs \n\n")
for key in InitialConditions["Diff_c"].keys():
L.append(f' {key} {InitialConditions["Diff_c"][key]} \n')
L.append("\n")
L.append(" Growth Rate \n\n")
for CellType in cell_types:
L.append(
f' {CellType} {InitialConditions[CellType]["GrowthRate"]} \n'
)
L.append("\n")
L.append(" Ks \n\n")
for CellType in cell_types:
k = f" {CellType}"
for key in InitialConditions[CellType]["K_s"].keys():
k = k + " " + str(InitialConditions[CellType]["K_s"][key])
k = k + f" \n"
L.append(k)
L.append("\n")
for key in InitialConditions["cyano"]["GrowthParams"].keys():
L.append(" " + key + f" \n\n")
for CellType in cell_types:
L.append(
f' {CellType} {InitialConditions[CellType]["GrowthParams"][key]} \n'
)
L.append("\n")
L.append("\n\n")
# write atom definition file
f = open(RUN_DIR / "atom.in", "w+")
f.writelines(L)
# write initial conditions json file
dumpfile = open(RUN_DIR / "metadata.json", "w")
json.dump(InitialConditions, dumpfile, indent=6)
dumpfile.close()
# write Inputscript
# open the file
atom_file_path = RUN_DIR.resolve() / "atom.in"
if args.lammps == True:
lammps = "dump id all custom 100 output.lammmps id type diameter x y z"
else:
lammps = ""
if args.hdf5 == True:
hdf5 = (
"dump traj all bio/hdf5 100 trajectory.h5 id type radius x y z con"
)
else:
hdf5 = ""
if args.vtk == True:
vtk = "dump du1 all vtk 100 atom_*.vtu id type diameter x y z"
grid = "dump du2 all grid 100 grid_%_*.vti con"
vtk_tarball = "true"
else:
vtk = ""
grid = ""
vtk_tarball = "false"
if args.sucrose_halt is not None:
suc_halt = f"fix h2 all halt {args.sucrose_halt} v_suc <= 1e-19"
else:
suc_halt = ""
filein = open(TEMPLATES_DIR / "inputscript.txt")
# read it
src = Template(filein.read())
# do the substitution
result = src.safe_substitute(
{
"n": n,
"SucRatio": SucRatio,
"SucPct": SucPct,
"n_cyanos": n_cyanos,
"n_ecw": n_ecw,
"Replicates": args.reps,
"IPTG": f"{IPTG:.0e}",
"Timesteps": args.timesteps,
"date": today,
"CYANOGroup": cyGroup,
"ECWGroup": ecwGroup,
"Zheight": InitialConditions["Dimensions"][2],
"CYANODiv": cyDiv,
"ECWDiv": ecwDiv,
"GridMesh": f'{int(InitialConditions["Dimensions"][0]*1e6/int(args.grid))} {int(InitialConditions["Dimensions"][1]*1e6/int(args.grid))} {int(InitialConditions["Dimensions"][2]*1e6/int(args.grid))}',
"lammps": lammps,
"hdf5": hdf5,
"vtk": vtk,
"grid": grid,
"masses": masses,
"mass_max": f"{max_mass:.2e}",
"DiffusionSteps": args.niter,
"atom_file_path": atom_file_path,
"biodt": args.biodt,
"sucrose_halt": suc_halt,
}
)
f = open(RUN_DIR / f"Inputscript.lammps", "w+")
f.writelines(result)
x = int(InitialConditions["Dimensions"][0] * 1e6)
y = int(InitialConditions["Dimensions"][1] * 1e6)
z = int(InitialConditions["Dimensions"][2] * 1e6)
# write slurm script
# open the file
filein = open(TEMPLATES_DIR / "slurm.txt")
# read it
src = Template(filein.read())
# do the substitution
result = src.safe_substitute(
{"job": f"NUFEB_{n}", "USER": args.user, "VTK": vtk_tarball}
)
f = open(f"NUFEB_{today}.slurm", "w+")
f.writelines(result)
# write local run script
# open the file
filein = open(TEMPLATES_DIR / "local.txt")
# read it
src = Template(filein.read())
# do the substitution
result = src.safe_substitute(
{
"n": n,
"SucRatio": SucRatio,
"SucPct": SucPct,
"n_cyanos": n_cyanos,
"n_ecw": n_ecw,
"Reps": args.reps,
}
)
f = open(RUN_DIR / f"local_{n_cyanos}_{n_ecw}_{SucPct}.sh", "w+")
f.writelines(result)
for arg in vars(args):
_logger.info(f"{arg}={getattr(args, arg)}")
_logger.info("Script ends here")
[docs]def run():
"""Calls :func:`main` passing the CLI arguments extracted from :obj:`sys.argv`
This function can be used as entry point to create console scripts with setuptools.
"""
main(sys.argv[1:])
if __name__ == "__main__":
run()