import os
import h5py
from pathlib import Path
import pandas as pd
import numpy as np
import json
import subprocess
import sys
import argparse
import pickle
import json
from urllib.parse import urlparse
from urllib.request import urlretrieve
import tarfile
from scipy.spatial.distance import pdist, squareform
from scipy.spatial import KDTree
from tqdm import tqdm
import cv2
from nufeb_tools import __version__
# Default location of the bundled example-run archive fetched by download_test_data().
urls = ["https://github.com/Jsakkos/nufeb-tools/raw/main/data/runs.tar"]
class get_data:
    """Collect results for analysis.

    NUFEB simulation data class to collect results for analysis

    Attributes:
        test (bool): Set `test = True` to get example data from the Github repository
        directory (str): Path to the directory containing NUFEB simulation data.
        timestep (int): Length of simulation timestep in seconds
        SucRatio (int): Relative cyanobacterial sucrose secretion level, 0-100
        timepoints (List(str)): List of timepoints in the simulation
        dims (List(str)): Size of the simulation boundaries in micrometers
        numsteps (int): Number of timepoints
        biomass (pandas.DataFrame): Pandas Dataframe containing the biomass vs time data from biomass.csv
        ntypes (pandas.DataFrame): Pandas Dataframe containing the cell number vs time data from ntypes.csv
        avg_con (pandas.DataFrame): Pandas Dataframe containing the average nutrient concentrations vs time data from avg_concentration.csv
        positions (pandas.DataFrame): Pandas Dataframe containing the single cell biomass over time of all cell ids present at the timepoint
    """

    def __init__(self, directory=None, id=None, test=None, timestep=10):
        """
        Args:
            directory (str): Path to a local NUFEB run directory.
            id: Unused; kept for backward compatibility with existing callers.
            test (bool): Use (downloading if needed) the bundled example dataset.
            timestep (int): Length of a simulation timestep in seconds.

        Raises:
            ValueError: If neither `directory` nor `test` is provided.
        """
        self.timestep = timestep
        if test:
            self.directory = str(
                Path.home()
                / ".nufeb_tools"
                / "data"
                / "Run_45_56_45_1_2021-12-03_671906"
            )
            if not os.path.isdir(self.directory):
                download_test_data()
            self.get_local_data()
        elif directory:
            self.directory = directory
            self.get_local_data()
        else:
            # Previously this printed "Missing local directory" and then
            # crashed below with an opaque AttributeError; fail fast instead.
            raise ValueError(
                "Missing local directory: pass directory=... or test=True"
            )
        self.dims += [self.dims.pop(0)]  # move Z dimension to last: z,x,y to x,y,z
        self.numsteps = len(self.timepoints)
        self.Timesteps = self.positions.Timestep.unique()
        self.calc_biomass()
        # self.get_mothers()
[docs] def get_local_data(self):
"""
Collect NUFEB simulation data from a local directory.
"""
try:
h5 = h5py.File(os.path.join(self.directory, "trajectory.h5"), mode="r")
self.timepoints = [key for key in h5["concentration"]["co2"].keys()]
self.timepoints.sort(key=int)
self.dims = list(h5["concentration"]["co2"]["0"].shape)
self.nutrients = list(h5["concentration"].keys())
self.collect_positions(h5)
self.get_nutrient_grid(h5)
h5.close()
except:
print("Missing HDF5 file")
self.biomass = pd.read_csv(
os.path.join(self.directory, "Results", "biomass.csv"),
usecols=[0, 1, 2],
delimiter="\t",
)
self.ntypes = pd.read_csv(
os.path.join(self.directory, "Results", "ntypes.csv"),
usecols=[0, 1, 2],
delimiter="\t",
)
self.avg_con = pd.read_csv(
os.path.join(self.directory, "Results", "avg_concentration.csv"),
usecols=[0, 2, 3, 4],
delimiter="\t",
names=["Time", "O2", "Sucrose", "CO2"],
skiprows=1,
)
f = open(os.path.join(self.directory, "metadata.json"), "r")
self.metadata = json.load(f)
f.close()
if "IPTG" in self.metadata:
self.IPTG = self.metadata["IPTG"]
self.sucRatio = self.metadata["IPTG"]
else:
self.IPTG = self.metadata["SucRatio"]
# TODO replace sucRatio with IPTG
self.sucRatio = self.metadata["SucRatio"]
self.convert_units_avg_con()
self.convert_units_biomass()
[docs] def convert_units_avg_con(self):
"""Convert the object attribute avg_con, which contains the average nutrient concentration, units to hours and mM.
"""
self.avg_con.index = self.avg_con.Time / 60 / 60 * self.timestep
self.avg_con.index.name = "Hours"
self.avg_con.drop("Time", inplace=True, axis=1)
SucroseMW = 342.3
O2MW = 32
CO2MW = 44.01
self.avg_con.O2 = self.avg_con.O2 / O2MW * 1e3
self.avg_con.Sucrose = self.avg_con.Sucrose / SucroseMW * 1e3
self.avg_con.loc[:, "CO2"] = self.avg_con.loc[:, "CO2"] / CO2MW * 1e3
[docs] def convert_units_biomass(self):
"""Convert the object attribute biomass units to hours and femtograms.
"""
self.biomass.index = self.biomass.step / 60 / 60 * self.timestep
self.biomass.index.name = "Hours"
self.biomass.iloc[:, 1:] = self.biomass.iloc[:, 1:] * 1e18
[docs] def calc_biomass(self):
df = self.positions
df["biomass"] = 0
df.loc[df.type == 1, "biomass"] = (
4 / 3 * np.pi * df["radius"] ** 3 * self.metadata["cyano"]["Density"] * 1e18
)
df.loc[df.type == 2, "biomass"] = (
4 / 3 * np.pi * df["radius"] ** 3 * self.metadata["ecw"]["Density"] * 1e18
)
df["time"] = df["Timestep"].transform(lambda j: j / 360)
[docs] def collect_positions(self, h5):
"""
Extract the x, y, z position of each cell during the simulation.
Args:
timepoint (int):
The simulation timestep to get the position data from.
Returns:
pandas.DataFrame:
Dataframe containing Timestep, ID, type, radius, x, y, z columns
"""
dfs = list()
for t in self.timepoints:
dfs.append(
pd.concat(
[
pd.Series(
np.ones(h5["x"][str(t)].len()) * int(t),
dtype=int,
name="Timestep",
),
pd.Series(h5["id"][str(t)], name="ID"),
pd.Series(h5["type"][str(t)], name="type"),
pd.Series(h5["radius"][str(t)], name="radius"),
pd.Series(h5["x"][str(t)], name="x"),
pd.Series(h5["y"][str(t)], name="y"),
pd.Series(h5["z"][str(t)], name="z"),
],
axis=1,
)
)
temp = pd.concat(dfs, ignore_index=True)
idx = temp[temp.type == 0].index
self.positions = temp.drop(idx).reset_index(drop=True)
[docs] def get_neighbor_distance(self, id, timepoint):
"""
Get the nearest neighbor cell distances
Args:
id (int):
The ID of the reference cell
timepoint (int):
The timepoint to check the neighbor distances from
Returns:
pandas.DataFrame:
Dataframe containing ID, type, Distance
"""
# TODO Speed up or parallelize this computation
df = self.positions[self.positions.Timestep == timepoint]
temp = (
df.loc[df.ID == id, ["x", "y", "z"]].squeeze()
- df.loc[df.ID != id, ["x", "y", "z"]]
) ** 2
dist = pd.Series(np.sqrt(temp.x + temp.y + temp.z), name="Distance")
return pd.concat(
[df.loc[df.ID != id, ["ID", "type"]], dist], axis=1
).reset_index(drop=True)
[docs] def get_neighbors(self, timestep):
"""
Get the nearest neighbor cell distances
Args:
timestep (int):
The timepoint to check the neighbor distances from
Returns:
pd.DataFrame:
Pandas dataframe containing pairwise neighbor distances
"""
df = self.positions
df2 = df[df.Timestep == timestep].set_index(["ID"])
df2.sort_index(inplace=True)
# distances =pdist(df2[['x','y','z']])
pairwise = pd.DataFrame(
squareform(pdist(df2[["x", "y", "z"]])), columns=df2.index, index=df2.index
)
pairwise[pairwise == 0] = np.nan
return pairwise
    def get_mothers__old(self):
        """
        Assign mother cells based on initial cells in the simulation.

        Older variant superseded by get_mothers(); kept for reference.
        Mutates self.positions in place (adds a `mother_cell` column) and
        stores the same frame in self.colonies.

        Returns:
            pandas.DataFrame:
                Dataframe containing ID, type, position, radius, and mother_cell
        """
        df = self.positions
        df["mother_cell"] = -1
        # Seed ancestry: every ID present at t=0 becomes its own mother, for
        # all rows sharing that ID across the whole trajectory.
        for ID in df.loc[df.Timestep == 0, "ID"].unique():
            idx = df[df["ID"] == ID].index
            df.loc[idx, "mother_cell"] = ID
        for time in tqdm(
            sorted(df[df.Timestep != 0].Timestep.unique()), desc="Assigning ancestry"
        ):
            for type_ in df.type.unique():
                # Cells at this timestep/type whose mother is already known.
                ancestors = df[
                    (df.type == type_) & (df.Timestep == time) & (df.mother_cell != -1)
                ]
                arr1 = ancestors[["x", "y", "z"]].to_numpy()
                # KDTree over assigned cells; unassigned cells inherit the
                # mother of their nearest assigned neighbor of the same type.
                tree1 = KDTree(arr1)
                motherless = df[
                    (df.type == type_) & (df.Timestep == time) & (df.mother_cell == -1)
                ]
                if not motherless.empty:
                    d, i = tree1.query(motherless[["x", "y", "z"]].to_numpy(), k=1)
                    idx1 = motherless.index
                    a = ancestors.iloc[i, :].mother_cell.values
                    df.loc[idx1, "mother_cell"] = a
        self.colonies = df
[docs] def get_mothers(self):
"""
Assign mother cells based on initial cells in the simulation.
Returns:
pandas.DataFrame:
Dataframe containing Timestep, ID, type, position, radius, biomass, total biomass, and mother_cell
"""
df = self.positions.copy()
df["mother_cell"] = -1
df.loc[df.Timestep == 0, "mother_cell"] = df.loc[df.Timestep == 0, "ID"]
ancestry_df = df.loc[df.Timestep == 0, ["ID", "mother_cell"]]
type_ = 1
for time in tqdm(
sorted(df[df.Timestep != 0].Timestep.unique()), desc="Assigning ancestry"
):
for type_ in df.type.unique():
temp = df.loc[
(df.type == type_) & (df.Timestep == time), ["ID", "x", "y", "z"]
]
ancestors = temp.join(
ancestry_df.set_index(["ID"]),
on="ID",
how="inner",
lsuffix="_left",
rsuffix="_right",
)
arr = ancestors[["x", "y", "z"]].to_numpy()
tree = KDTree(arr)
motherless = (
pd.merge(temp, ancestors, on="ID", how="left", indicator=True)
.query('_merge == "left_only"')
.drop("_merge", 1)
.drop("x_y", 1)
.iloc[:, :4]
)
if not motherless.empty:
d, i = tree.query(motherless[["x_x", "y_x", "z_x"]].to_numpy(), k=1)
motherless.loc[:, "mother_cell"] = ancestors.iloc[i, 4].to_numpy()
ancestry_df = pd.concat(
[ancestry_df, motherless.loc[:, ["ID", "mother_cell"]]],
ignore_index=True,
)
df = df.join(
ancestry_df.set_index(["ID"]),
on="ID",
how="right",
lsuffix="_left",
rsuffix="",
).drop("mother_cell_left", 1)
df["total_biomass"] = df.groupby(["mother_cell", "Timestep"]).cumsum()[
"biomass"
]
df["total_biomass2"] = (
df.loc[df.Timestep == df.Timestep.iloc[-1]]
.groupby("mother_cell")
.sum()
.reset_index()["biomass"]
)
self.colonies = df
[docs] def count_colony_area(self, timestep):
"""
Count the 2d area in pixel dimensions of each colony at a given timestep.
Args:
timestep (int):
Timestep to count
"""
if not hasattr(self, "colonies"):
self.get_mothers()
df = self.colonies
else:
df = self.colonies
tp = df[df.Timestep == timestep]
img_size = 2000
bk = 255 * np.ones(shape=[img_size, img_size, 3], dtype=np.uint8)
circles = [
cv2.circle(
bk,
center=(
round(x / self.metadata["Dimensions"][0] * img_size),
round(y / self.metadata["Dimensions"][1] * img_size),
),
radius=round(radius / self.metadata["Dimensions"][1] * img_size),
color=(cell, 0, 0),
thickness=-1,
)
for x, y, radius, cell in zip(tp.x, tp.y, tp.radius, tp.mother_cell)
]
cols, counts = np.unique(bk[:, :, 0], return_counts=1)
for colony, area in zip(cols[:-1], counts[:-1]):
idx = df[(df.mother_cell == int(colony)) & (df.Timestep == timestep)].index
self.colonies.loc[idx, "Colony Area"] = area
[docs] def get_colony_areas(self):
"""Count colony areas for all timesteps
"""
if not hasattr(self, "colonies"):
self.get_mothers()
df = self.colonies
else:
df = self.colonies
for time in tqdm(df.Timestep.unique(), desc="Counting colony areas"):
self.count_colony_area(time)
[docs] def get_nutrient_grid(self, h5):
# TODO make nutrient grid function independent of h5 file
keys = list(h5["concentration"].keys())
timepoints = [k for k in h5["concentration"][keys[0]].keys()]
timepoints.sort(key=int)
stacks = list()
for key in keys:
dfs = list()
for time in timepoints:
dfs.append(h5["concentration"][key][time])
stacks.append(np.stack(dfs))
grid = np.stack(stacks, axis=1)
self.grid = grid
return
[docs] def get_local_con(self, timestep, cellID):
"""
Get the local nutrient concentration of a cell
Args:
timestep (int):
The timestep at which to check the concentration
cellID (int):
The cell identification number
Returns:
Nutrient Concentration (float):
The concentration of the specified nutrient within the cell's grid
"""
cell_locs = self.positions
grid = [
np.linspace(0, self.metadata["Dimensions"][x], self.dims[x])
for x in range(3)
]
grid_loc = [
get_grid_idx(grid[i], cell_locs[cell_locs.ID == cellID][d].values[0])
for i, d in enumerate(["x", "y", "z"])
]
return self.grid[timestep, :, grid_loc[2], grid_loc[0], grid_loc[1]]
[docs] def get_fitness(self, timestep, cellID):
"""
Get the fitness of an individual cell based on the relative Monod growth rate at a given timestep
Args:
timestep (int):
The timestep at which to check the concentration
cellID (int):
The cell identification number
Returns:
float:
The Monod growth rate (1/s)
"""
# TODO Speed up or parallelize this computation
df = self.positions
cell_type = df[(df.Timestep == timestep) & (df.ID == cellID)].type.values[0]
if df[(df.Timestep == timestep) & (df.ID == cellID)].empty:
print("Timestep or cell ID not found")
return
concentrations = self.get_local_con(
list(df.Timestep.unique()).index(timestep), cellID
)
if cell_type == 1:
metadata = self.metadata["cyano"]
light = concentrations[self.nutrients.index("sub")]
co2 = concentrations[self.nutrients.index("co2")]
fitness = (
metadata["GrowthRate"]
* (light / (metadata["K_s"]["sub"] + light))
* (co2 / (metadata["K_s"]["co2"] + co2))
)
return fitness
elif cell_type == 2:
metadata = self.metadata["ecw"]
suc = concentrations[self.nutrients.index("suc")]
o2 = concentrations[self.nutrients.index("o2")]
maintenance = metadata["GrowthParams"]["Maintenance"] * (
o2 / (metadata["K_s"]["o2"] + o2)
)
decay = metadata["GrowthParams"]["Decay"]
fitness = (
metadata["GrowthRate"]
* (suc / (metadata["K_s"]["suc"] + suc))
* (o2 / (metadata["K_s"]["o2"] + o2))
)
return fitness - maintenance - decay
[docs] def collect_fitness(self):
df = self.positions
fitness = pd.DataFrame(columns=["Time", "ID", "Fitness"])
for time in tqdm(self.Timesteps):
for cell in df[(df.Timestep == time)].ID:
fitness = fitness.append(
pd.DataFrame(
[[time, cell, self.get_fitness(time, cell)]],
columns=["Time", "ID", "Fitness"],
),
ignore_index=True,
)
self.fitness = fitness
def get_grid_idx(array, value):
    """
    Find the nutrient grid index value. Taken from https://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array.

    Args:
        array (numpy.array):
            1D Array containing the grid positions
        value (float):
            Cell location to map to the grid
    Returns:
        index (int):
            Grid index
    """
    n = len(array)
    # Exact hits on either boundary short-circuit the search.
    if value == array[0]:
        return 0
    if value == array[n - 1]:
        return n - 1
    # Bisection: shrink [lo, hi] until they are adjacent, keeping
    # array[lo] <= value whenever value lies inside the grid.
    lo, hi = 0, n - 1
    while hi - lo > 1:
        mid = (lo + hi) >> 1
        if value >= array[mid]:
            lo = mid
        else:
            hi = mid
    return lo
def download_test_data(urls=None):
    """
    Get an example dataset from the Github repo. Downloads to "home/.nufeb_tools/data".

    Args:
        urls (List(str)): Archive URLs to fetch; defaults to the bundled
            nufeb-tools example run.
    """
    if urls is None:
        # Inline default so the function does not depend on the module-level
        # `urls` global at definition time.
        urls = ["https://github.com/Jsakkos/nufeb-tools/raw/main/data/runs.tar"]
    # nufeb_tools directory
    cp_dir = Path.home().joinpath(".nufeb_tools")
    cp_dir.mkdir(exist_ok=True)
    data_dir = cp_dir.joinpath("data")
    data_dir.mkdir(exist_ok=True)
    # TODO Add progress bar
    for url in urls:
        parts = urlparse(url)
        filename = os.path.basename(parts.path)
        cached_file = os.path.join(data_dir, filename)
        if not os.path.exists(cached_file):
            local_filename, headers = urlretrieve(url, cached_file)
            # Context manager guarantees the archive is closed even if
            # extraction fails.
            with tarfile.open(local_filename, "r") as tar:
                # NOTE(review): extractall on an untrusted archive can write
                # outside data_dir; consider tarfile's filter="data" (3.12+).
                tar.extractall(path=data_dir)
            # Remove the tarball once its contents are extracted.
            Path(local_filename).unlink()