'''
To help analyze the motifs found using the dotmotif package
from a connectome dataset
'''
import networkx as nx
import pandas as pd
import re
import time
from datasci_tools import numpy_dep as np
from datasci_tools import module_utils as modu
from . import microns_volume_utils as mvu
from . import h01_volume_utils as hvu
# Key of a motif dict under which the dotmotif string is stored
# (read by edges_from_motif_dict)
motif_key = "motif"
# Default node-attribute name read by nodes_mapping_from_G to get
# each node's motif identifier
identifier_name_global = "identifier"
# Regex capturing the two single upper-case letters of an "X -> Y" edge
# (optional spaces around the arrow)
edge_pattern = r"([A-Z])[ ]*->[ ]*([A-Z])"
# Default minimum gnn_cell_type_fine_prob used by filter_motif_df
min_gnn_probability_global = 0.6
# Default gnn_cell_type_fine values excluded by filter_motif_df
cell_type_fine_exclude_global = ['SST','NGC',]
def edges_from_str(
    string,
    verbose = False,
    return_edge_str = False
    ):
    """
    Purpose: To find every edge of the form "X->Y" (single upper-case
    letters, optional spaces around the arrow) inside a string

    Parameters
    ----------
    string : str
        text to search (a motif definition or a motif_str)
    verbose : bool
        if True prints the edges found
    return_edge_str : bool
        if True each edge is returned as the substring produced by
        reu.substr_from_match_obj for the match; otherwise each edge
        is returned as a (source_letter, target_letter) tuple

    Returns
    -------
    list of edges in the order they occur in the string
    """
    pattern = re.compile(edge_pattern)
    s_find = list(pattern.finditer(string))

    if return_edge_str:
        # imported lazily, only needed for this output form
        from datasci_tools import regex_utils as reu
        edges = [reu.substr_from_match_obj(k) for k in s_find]
    else:
        edges = [(g.group(1), g.group(2)) for g in s_find]

    if verbose:
        # was `pritn`, which raised NameError whenever verbose was set
        print(f"edges = {edges}")

    return edges
def nodes_from_str(string):
    """
    Purpose: To get the sorted unique node letters that take part in
    any "X->Y" edge found in the string

    Returns
    -------
    list of the unique node letters (empty list if the string
    contains no edges)
    """
    edges = mfu.edges_from_str(string,return_edge_str = False)
    if len(edges) == 0:
        # np.hstack raises ValueError on an empty sequence
        return []
    return list(np.unique(np.hstack(edges)))
def motif_nodes_from_motif(
    motif,
    only_upper = True,
    verbose = False,
    return_n_nodes = False,
    ):
    """
    Purpose: To determine which single-letter node names occur in a
    motif string (and how many there are)

    Pseudocode:
    1) Collect every letter matched by the letter pattern
       (upper case only, or any case if only_upper is False)
    2) Reduce to the sorted unique letters
    3) Return either the number of letters or a dictionary
       mapping position -> letter

    Parameters
    ----------
    motif : str
    only_upper : bool
        restrict the node names to upper-case letters
    verbose : bool
        print the letters found
    return_n_nodes : bool
        return the count instead of the index -> letter dictionary
    """
    letter_regex = (r"\W*([A-Z])\W*" if only_upper
                    else r"\W*([A-Za-z])\W*")

    # one capture group, so findall returns the captured letters
    letters = re.compile(letter_regex).findall(motif)
    found_letters = np.sort(np.unique(letters))

    if verbose:
        print(f"found_letters = {found_letters}")

    if return_n_nodes:
        return len(found_letters)
    return dict(enumerate(found_letters))
def n_nodes_from_motif(
    motif,
    only_upper = True,
    verbose = False,
    ):
    """
    Purpose: To count the distinct node letters in a motif string
    (thin wrapper around motif_nodes_from_motif with
    return_n_nodes = True)
    """
    n_nodes = mfu.motif_nodes_from_motif(
        motif,
        return_n_nodes = True,
        only_upper = only_upper,
        verbose = verbose,
    )
    return n_nodes
def nodes_mapping_from_G(
    G,
    ):
    """
    Purpose: To build the mapping identifier -> node name
    for a graph, ordered by identifier

    The identifier of each node is read from the node attribute
    named by identifier_name_global
    """
    names = list(G.nodes())
    identifiers = [G.nodes[n][identifier_name_global] for n in names]

    # insertion order of the result follows the sorted identifiers
    order = np.argsort(identifiers)
    return {identifiers[i]: names[i] for i in order}
def nodes_from_motif_dict(
    motif_dict,
    return_dict = True,
    verbose = False):
    """
    Purpose: To extract the node names from the motif dict

    Pseudocode:
    1) Get all of the keys of the form [letter]?segment_id
       (any single character may separate letter and suffix)
    2) Sort them by their (upper-cased) letter so the result does
       not depend on the key order of motif_dict
    3) For each, build the node name "{segment_id}_{split_index}"
    4) If no segment_id keys exist, fall back to the
       [letter]?name keys and use their values directly

    Parameters
    ----------
    motif_dict : dict
    return_dict : bool
        if True returns {LETTER: node_name}, otherwise the list
        of node names in letter order
    verbose : bool
        print the resulting mapping
    """
    def _keys_with_suffix(suffix):
        # non-string keys can not be sliced/compared, so skip them
        keys = [k for k in motif_dict
                if isinstance(k, str) and k[2:] == suffix]
        return sorted(keys, key=lambda k: k[0].upper())

    r_dict = dict()
    seg_names = _keys_with_suffix("segment_id")

    if len(seg_names) > 0:
        for s_id in seg_names:
            letter = s_id[0]
            node_name = (f"{motif_dict[f'{letter}_segment_id']}_"
                         f"{motif_dict[f'{letter}_split_index']}"
                        )
            r_dict[letter.upper()] = node_name
    else:
        for s_id in _keys_with_suffix("name"):
            r_dict[s_id[0].upper()] = motif_dict[s_id]

    if verbose:
        print(f"Node names = {r_dict}")

    if return_dict:
        return r_dict
    return list(r_dict.values())
def edges_from_motif_dict(
    motif_dict,
    return_dict = False,
    return_node_mapping = False,
    verbose = True,
    ):
    """
    Purpose: To get a list of the edges represented by the motif

    Pseudocode:
    1) Get a mapping of the nodes (letter -> node name)
    2) Parse the motif string for the edge definitions
    3) For each edge substitute in the node names

    Parameters
    ----------
    motif_dict : dict
        must hold the motif string under motif_key plus the keys
        nodes_from_motif_dict needs
    return_dict : bool
        if True returns {edge_str: (source_name, target_name)},
        otherwise the list of (source_name, target_name)
    return_node_mapping : bool
        if True also returns the letter -> node name mapping
    verbose : bool
        print intermediate results (on by default)

    Ex:
    from datasci_tools import networkx_utils as xu
    import networkx as nx

    G = vdi.G_auto_DiGraph
    motif_info = motif_dicts[20000]

    edges = mfu.edges_from_motif_dict(
        motif_info,
        return_dict=False,
        verbose = True,)

    sub_G = xu.subgraph_from_edges(G,edges)
    nx.draw(sub_G,with_labels = True)
    """
    node_mapping = mfu.nodes_from_motif_dict(motif_dict)
    s = motif_dict[motif_key]

    if verbose:
        print(f"node_mapping = {node_mapping}")

    edges_identifiers = mfu.edges_from_str(s)

    if verbose:
        print(f"# of edges found = {len(edges_identifiers)}")

    edges = [(node_mapping[pre], node_mapping[post])
             for pre, post in edges_identifiers]

    if verbose:
        print(f"Edges = {edges}")

    if return_dict:
        # keyed by the edge text so callers can build column names from it
        edge_strs = mfu.edges_from_str(s,return_edge_str=True)
        return_value = dict(zip(edge_strs, edges))
    else:
        return_value = edges

    if return_node_mapping:
        return return_value,node_mapping
    return return_value
def subgraph_from_motif_dict(
    G,
    motif_dict,
    verbose = False,
    identifier_name = None,
    plot = False,
    ):
    """
    Purpose: To extract the subgraph of G made of the edges of one
    motif instance and label every node with its motif identifier

    Parameters
    ----------
    G : graph
    motif_dict : dict
        motif instance (see edges_from_motif_dict)
    verbose : bool
    identifier_name : str
        node attribute under which the motif letter is stored
        (defaults to identifier_name_global)
    plot : bool
        draw the subgraph with networkx

    Returns
    -------
    the subgraph produced by xu.subgraph_from_edges
    """
    if identifier_name is None:
        identifier_name = identifier_name_global

    if verbose:
        print(f"motif_dict = {motif_dict}")

    edges,node_mapping = mfu.edges_from_motif_dict(
        motif_dict,
        return_dict=False,
        verbose = verbose,
        return_node_mapping=True)

    sub_G = xu.subgraph_from_edges(G,edges)

    for ident,node_name in node_mapping.items():
        # honour the requested attribute name (previously hard-coded
        # to "identifier", which silently ignored identifier_name)
        sub_G.nodes[node_name][identifier_name] = ident

    if plot:
        nx.draw(sub_G,with_labels = True)

    return sub_G
def motif_segment_df_from_motifs(
    motifs,
    return_df = True,
    motif = None,
    graph_type = "DiGraph"
    ):
    """
    Purpose: To turn motif results, each a dictionary of the form
    {A: "segment_split", B: "segment_split", ...}, into records
    with keys like a_segment_id, a_split_index, b_segment_id, ...

    Each node name is split with vdi.segment_id_and_split_index.
    The "motif" and "graph_type" entries are only added when the
    corresponding argument is not None.

    Returns
    -------
    a dataframe of the records if return_df, otherwise the list
    of record dictionaries
    """
    records = []
    for match in motifs:
        record = {}
        for letter, node_name in match.items():
            segment_id,split_index = vdi.segment_id_and_split_index(node_name)
            prefix = letter.lower()
            record[f"{prefix}_segment_id"] = segment_id
            record[f"{prefix}_split_index"] = split_index

        if motif is not None:
            record["motif"] = motif
        if graph_type is not None:
            record["graph_type"] = graph_type

        records.append(record)

    if not return_df:
        return records
    return pd.DataFrame.from_records(records)
def motif_data(
    G,
    motif_dict,

    # for node attributes
    cell_type_kind = "gnn_cell_type_fine",
    include_layer = True,
    include_visual_area = True,
    include_node_identifier = True,

    # for edge attributes
    include_edges_in_name = True,
    include_compartment = True,
    edge_attributes = ("presyn_soma_postsyn_soma_euclid_dist",
                       "presyn_soma_postsyn_soma_skeletal_dist",
                       "presyn_skeletal_distance_to_soma",
                       "presyn_soma_euclid_dist",
                       "postsyn_skeletal_distance_to_soma",
                       "postsyn_soma_euclid_dist",
                       "synapse_id",
                      ),

    node_attributes = ("skeletal_length",
                       "external_manual_proofread",
                       "gnn_cell_type_fine_prob",
                       "gnn_cell_type"),
    node_attributes_additional = None,
    return_str = False,
    verbose = True,
    ):
    """
    Purpose: To collect the node and edge attributes of one motif
    instance into a flat dictionary, together with a string
    representation ("motif_str") that can be used as an identifier

    Form of the string:
        cell type[/layer][/area][(id)], ... : id1->id2[(comp)], ...

    Pseudocode:
    1) Get the node mapping and the edges of the motif
    2) For every node store the attributes under "{ident}_{attr}"
       and build the node part of the name
    3) For every edge store the attributes under "{edge}_{attr}"
       and build the edge part of the name
    4) Copy "motif" (and "graph_type" if present) from motif_dict

    Parameters
    ----------
    G : graph supporting G[u][v] edge lookup
    motif_dict : dict
        motif instance (see edges_from_motif_dict)
    cell_type_kind : str
        node attribute used as the cell type in the name
    include_layer, include_visual_area, include_node_identifier : bool
        pieces appended to every node token of the name
    include_edges_in_name, include_compartment : bool
        whether the edges (and their compartment) go into the name
    edge_attributes : iterable of str or None
        edge attributes copied into the output dictionary
    node_attributes : iterable of str
        node attributes copied into the output dictionary
    node_attributes_additional : iterable of str or None
        merged (np.union1d) with node_attributes
    return_str : bool
        return only the string instead of the dictionary
    verbose : bool
        print intermediate results (on by default)
    """
    if node_attributes_additional is not None:
        node_attributes = list(np.union1d(node_attributes,node_attributes_additional))

    st = time.time()

    motif_data_dict = dict()
    edges,node_mapping = mfu.edges_from_motif_dict(
        motif_dict,
        verbose = verbose,
        return_dict = True,
        return_node_mapping = True)

    # ---------- node part ----------
    node_tokens = []
    for ident,node in node_mapping.items():
        node_dict = xu.get_node_attribute_dict(G,node)

        token = f"{node_dict[cell_type_kind]}"

        motif_data_dict[f"{ident}_name"] = node
        motif_data_dict[f"{ident}_gnn_cell_type_fine"] = node_dict["gnn_cell_type_fine"]
        motif_data_dict[f"{ident}_cell_type"] = node_dict["cell_type"]
        motif_data_dict[f"{ident}_layer"] = node_dict['external_layer']
        motif_data_dict[f"{ident}_area"] = node_dict['external_visual_area']

        for k in node_attributes:
            motif_data_dict[f"{ident}_{k}"] = node_dict[k]

        if include_layer:
            token += f"/{node_dict['external_layer']}"
        if include_visual_area:
            token += f"/{node_dict['external_visual_area']}"
        if include_node_identifier:
            token += f"({ident})"

        node_tokens.append(token)

    cell_type_str = ", ".join(node_tokens)

    if verbose:
        print(f"cell_type_str = {cell_type_str}")

    # ---------- edge part ----------
    edge_tokens = []
    for edge_str,node_pair in edges.items():
        edge_dict = G[node_pair[0]][node_pair[1]]

        # fall back to the coarse label when the fine one is not a
        # string; isinstance also accepts str subclasses
        compartment = edge_dict["postsyn_compartment_fine"]
        if not isinstance(compartment, str):
            compartment = edge_dict["postsyn_compartment_coarse"]

        motif_data_dict[f"{edge_str}_postsyn_compartment"] = compartment

        if edge_attributes is not None:
            for ea in edge_attributes:
                motif_data_dict[f"{edge_str}_{ea}"] = edge_dict[ea]

        token = f"{edge_str}"
        if include_compartment:
            token += f"({compartment})"
        edge_tokens.append(token)

    if include_edges_in_name:
        # separators are only emitted together with the edges, so no
        # stray ", " ends up in the name when edges are excluded
        cell_type_str += " : " + ", ".join(edge_tokens)

    if verbose:
        print(f"cell_type_str (AFTER EDGES) = {cell_type_str}")

    if verbose:
        print(f"Total time = {time.time() - st}")

    motif_data_dict["motif_str"] = cell_type_str

    # copying old attributes over; "graph_type" is optional in the
    # dictionaries produced by motif_segment_df_from_motifs
    for k in ["motif","graph_type"]:
        if k in motif_dict:
            motif_data_dict[k] = motif_dict[k]

    if return_str:
        return cell_type_str
    return motif_data_dict
def filter_G_attributes(
    G,
    node_attributes = (
        "gnn_cell_type_fine",
        "cell_type",
        "external_layer",
        "external_visual_area",
        "manual_cell_type_fine",
        "identifier",
    ),
    edge_attributes = (
        "postsyn_compartment_coarse",
        "postsyn_compartment_fine",
        "presyn_skeletal_distance_to_soma",
        "postsyn_skeletal_distance_to_soma",
    ),
    ):
    """
    Purpose: To restrict a graph to the requested node attributes
    and then to the requested edge attributes (via
    xu.filter_down_node_attributes / xu.filter_down_edge_attributes)
    """
    nodes_filtered_G = xu.filter_down_node_attributes(
        G,
        attributes = node_attributes,
    )
    return xu.filter_down_edge_attributes(
        nodes_filtered_G,
        attributes = edge_attributes,
    )
def motif_G(
    G,
    motif_dict,
    plot = False,
    verbose = False,
    **kwargs
    ):
    """
    Purpose: To form a graph data structure
    representing the motif

    Pseudocode:
    1) Restrict the graph to a subgraph based on the motif
    2) Filter the node attributes and edge attributes to only
       those specified (kwargs are passed to filter_G_attributes)
    3) Store the motif string as the graph attribute "motif"
    4) Best effort: add the flattened "postsyn_compartment"
       edge attribute

    Ex:
    curr_G = motif_G(
        G,
        motif_info,
        plot = True)
    """
    sub_G = mfu.subgraph_from_motif_dict(
        G,motif_dict,
        verbose=verbose,
        plot=plot)

    sub_G = mfu.filter_G_attributes(
        sub_G,
        **kwargs
    )

    if verbose:
        print(f"Setting graph attributes")

    xu.set_graph_attr(
        sub_G,
        "motif",
        motif_dict["motif"],
    )

    try:
        mfu.set_compartment_flat(sub_G)
    except Exception as e:
        # still best effort (e.g. compartment attributes filtered out),
        # but no longer swallows KeyboardInterrupt/SystemExit
        if verbose:
            print(f"Could not set flat compartment: {e}")

    return sub_G
def node_attributes_from_G(
    G,
    features = None,
    features_to_ignore = None,
    features_order = (
        "gnn_cell_type_fine",
        "external_layer",
        "external_visual_area",
    ),
    ):
    """
    Purpose: To get an ordered list of the node attribute names
    of a graph (the columns of xu.node_df(G))

    Pseudocode:
    1) Start from the node dataframe columns
    2) Remove features_to_ignore (defaults to xu.upstream_name
       and identifier_name_global)
    3) Put the attributes listed in features_order first (in that
       order), followed by the remaining ones
    4) If features is given, keep only the attributes in it
    """
    if features_to_ignore is None:
        features_to_ignore = (
            xu.upstream_name,
            identifier_name_global
        )
    if features_order is None:
        features_order = []

    candidates = list(xu.node_df(G).columns)
    if len(features_to_ignore) > 0:
        candidates = np.setdiff1d(candidates,
                                  features_to_ignore)

    prioritized = [f for f in features_order if f in candidates]
    remaining = list(np.setdiff1d(candidates, prioritized))
    ordered = prioritized + remaining

    if features is None:
        return ordered
    return [k for k in ordered if k in features]
def set_compartment_flat(G):
    """
    Purpose: To add the derived edge attribute "postsyn_compartment":
    the fine compartment label when it is a string, otherwise the
    coarse compartment label

    Returns the graph passed in (the attribute is set through
    xu.derived_edge_attribute_from_func)
    """
    def comp_flat(key):
        fine = key["postsyn_compartment_fine"]
        # isinstance also accepts str subclasses, which the previous
        # exact type comparison treated as "not a string"
        if isinstance(fine, str):
            return fine
        return key["postsyn_compartment_coarse"]

    xu.derived_edge_attribute_from_func(
        G,
        "postsyn_compartment",
        comp_flat
    )
    return G
# ----------- conversions to str --------------
def str_from_G_motif(
    G,
    node_attributes = None,
    edge_attributes = ("postsyn_compartment_flat",),
    verbose = False,
    joining_str = "/",
    include_edges_in_name = True
    ):
    """
    Purpose: To convert a motif graph to a string representation
    to be used as an identifier

    Form of the string:
        attr1/attr2 (id), ... : id1->id2(edge_attr, ...), ...

    Pseudocode:
    1) Gather the node attributes for each of the nodes
       (nodes ordered by identifier)
    2) Gather the edge attributes for each edge of the motif
       stored in the graph attribute "motif"

    NOTE(review): the default edge attribute is
    "postsyn_compartment_flat" while set_compartment_flat writes
    "postsyn_compartment" -- confirm which name the graphs carry.

    Ex:
    mfu.set_compartment_flat(curr_G)
    mfu.str_from_G_motif(
        curr_G,
        node_attributes = ("gnn_cell_type_fine",),
        edge_attributes=["postsyn_compartment_flat",],
        )
    """
    node_mapping = mfu.nodes_mapping_from_G(G)

    if node_attributes is None:
        node_attributes = mfu.node_attributes_from_G(G)

    if verbose:
        print(f"node_attributes = {node_attributes}")

    # --- node part: one token per node, joined by ", " ---
    node_tokens = []
    for ident,name in node_mapping.items():
        attr_str = joining_str.join(
            [str(G.nodes[name][f]) for f in node_attributes])
        node_tokens.append(f"{attr_str} ({ident})")
    G_str = ", ".join(node_tokens)

    # --- edge part ---
    if include_edges_in_name:
        if edge_attributes is None:
            edge_attributes = []

        curr_motif = xu.get_graph_attr(G,"motif")

        edge_tokens = []
        for id1,id2 in mfu.edges_from_str(curr_motif):
            token = f"{id1}->{id2}"
            if len(edge_attributes) > 0:
                edge_data = G[node_mapping[id1]][node_mapping[id2]]
                values = ', '.join([str(edge_data[f]) for f in edge_attributes])
                token += f"({values})"
            edge_tokens.append(token)

        edges_str = " : " + ", ".join(edge_tokens)

        if verbose:
            print(f"edges_str = {edges_str}")

        G_str += edges_str

    if verbose:
        print(f"G_str = {G_str}")

    return G_str
def dotmotif_str_from_G_motif(
    G,
    node_attributes = None,
    edge_attributes = ("postsyn_compartment",),
    verbose = False,
    ):
    """
    Purpose: To convert a motif graph into a multi-line string of
    edge and node constraints (passed to dmu.graph_matches by
    unique_motif_reduction)

    Form of the string:
        id1->id2[attr = value, ...];
        id3->id4[attr = value, ...]
        id1.attr = value
        ...

    Pseudocode:
    1) For each edge of the motif stored in the graph attribute
       "motif", write the edge with its attribute constraints
    2) For each node (ordered by identifier) write one line per
       node attribute; attributes a node does not have are skipped

    Ex:
    dotmotif_str_from_G_motif(
        curr_G,
        node_attributes = ("gnn_cell_type_fine",))
    """
    node_mapping = mfu.nodes_mapping_from_G(G)

    if edge_attributes is None:
        edge_attributes = []

    curr_motif = xu.get_graph_attr(G,"motif")
    curr_edges = mfu.edges_from_str(curr_motif)

    # --- edge lines, separated (not terminated) by ";\n" ---
    edge_lines = []
    for id1,id2 in curr_edges:
        line = f"{id1}->{id2}"
        if len(edge_attributes) > 0:
            edge_data = G[node_mapping[id1]][node_mapping[id2]]
            constraints = ', '.join([f"{f} = {edge_data[f]}" for f in edge_attributes])
            line += f"[{constraints}]"
        edge_lines.append(line)
    edges_str = ";\n".join(edge_lines)

    if verbose:
        print(f"edges_str = {edges_str}")

    if node_attributes is None:
        node_attributes = mfu.node_attributes_from_G(G)
        if verbose:
            print(f"node_attributes for default = {node_attributes}")

    # --- node lines ---
    node_lines = []
    for ident,name in node_mapping.items():
        for f in node_attributes:
            try:
                att_value = str(G.nodes[name][f])
            except KeyError:
                # node does not carry this attribute: skip it
                # (was a bare except that hid every other error)
                continue
            node_lines.append(f"{ident}.{f} = {att_value}\n")

    G_str = edges_str + "\n" + "".join(node_lines)

    if verbose:
        print(f"\n---Final Dotmotif str:--- \n{G_str}")

    return G_str
def node_attributes_strs(
    G,
    joining_str = "/",
    node_attributes= None,
    verbose = False,
    ):
    """
    Purpose: To get a list of strings
    representing the node attributes
    (that could then be used as a set for comparisons)

    Pseudocode:
    1) Get the node attributes you want to output
       (defaults to node_attributes_from_G)
    2) For every node join the string form of the attributes it
       has with joining_str; attributes a node lacks are skipped

    Returns
    -------
    list with one string per node, in the order of G.nodes()
    """
    st = time.time()

    if node_attributes is None:
        node_attributes = mfu.node_attributes_from_G(G)

    if verbose:
        print(f"node_attributes = {node_attributes}")

    total_nodes = []
    for j,name in enumerate(G.nodes()):
        attributes = []
        for f in node_attributes:
            try:
                attributes.append(str(G.nodes[name][f]))
            except KeyError:
                # attribute missing on this node (was a bare except
                # that silently hid every other kind of error)
                continue

        curr_str = joining_str.join(attributes)
        if verbose:
            print(f"Node {j}: {curr_str}")
        total_nodes.append(curr_str)

    if verbose:
        print(f"Total time for node attributes strs: {time.time() - st}")

    return total_nodes
#edges_str = xu.get_graph_attr(curr_G,"motif")
def motif_column_mapping(
    df,
    mapping):
    """
    Purpose: To build a column-rename dictionary that substitutes
    the node letters in motif column names according to mapping,
    so that all rows use the same lettering

    Only two very constrained column forms are renamed:
        [name]_....          (node column, ex: "A_name")
        [name]->[name]....   (edge column, ex: "A->B_synapse_id")
    where [name] is a single character that is a key of mapping.
    Every other column (including non-string and very short
    column names) is left out of the result.

    Parameters
    ----------
    df : object with a .columns iterable (ex: pandas DataFrame)
    mapping : dict
        old letter -> new letter

    Returns
    -------
    dict old column name -> new column name
    """
    column_mapping = dict()
    for k in df.columns:
        if not isinstance(k, str):
            continue

        # slices instead of indexing so short names can not raise
        # IndexError
        if k[1:2] == "_" and k[0] in mapping:
            column_mapping[k] = f"{mapping[k[0]]}{k[1:]}"
        elif (k[1:3] == "->"
              and len(k) > 3
              and k[0] in mapping
              and k[3] in mapping):
            column_mapping[k] = f"{mapping[k[0]]}->{mapping[k[3]]}{k[4:]}"

    return column_mapping
def unique_motif_reduction(
    G,
    df,
    column = "motif_str",
    node_attributes = None,
    edge_attributes=None,
    verbose = False,
    debug_time = False,
    relabel_columns = True,
    ):
    """
    Purpose: To collapse motif strings that describe the same
    annotated motif up to a relabeling of the node letters onto
    one representative string

    Pseudocode:
    1) Take one row for every unique value of `column`
    2) For each of those rows:
        a. convert it to a motif graph (motif_G)
        b. compare against every representative found so far:
           skip if the sets of node attribute strings differ,
           otherwise search the representative's dotmotif string
           in the graph (dmu.graph_matches)
        c. if a match is found record the representative and the
           letter mapping, otherwise the row becomes a new
           representative
    3) Either add the representative as a column (relabel_columns
       False) or additionally rename the per-node / per-edge
       columns of every row with the letter mapping
    4) Replace `column` with the representative string

    Parameters
    ----------
    G : graph the motifs were found in
    df : dataframe of motif instances (rows as produced by motif_data)
    column : str
        column holding the motif string
    node_attributes, edge_attributes :
        attributes used for the comparison
    verbose, debug_time : bool
    relabel_columns : bool

    Returns
    -------
    a new dataframe; the dataframe passed in is not modified
    """
    new_column = f"{column}_unique"
    motif_cell_type_df = df

    first_inst_df = pu.filter_to_first_instance_of_unique_column(
        motif_cell_type_df,
        column
    ).reset_index()

    if verbose:
        print(f"# of unique motif str (including redundancy) = {len(first_inst_df)}")

    global_time = time.time()

    unique_map = dict()
    unique_dotmotif_map = dict()

    for m in tqdm(pu.df_to_dicts(first_inst_df)):
        curr_motif_str = m[column]

        if debug_time:
            st = time.time()

        #a. convert it to a graph object
        curr_motif_G = mfu.motif_G(
            G,
            m,
            plot = False,
        )

        nodes_mapping = mfu.nodes_mapping_from_G(curr_motif_G)
        reverse_mapping = {v:k for k,v in nodes_mapping.items()}

        curr_node_strs = set(mfu.node_attributes_strs(
            curr_motif_G,
            verbose = False,
            node_attributes = node_attributes,
        ))

        if debug_time:
            print(f"motif_G generation: {time.time() - st}")
            st = time.time()

        #b. compare against the representatives found so far
        found = False
        for rep_str,data_dict in unique_dotmotif_map.items():
            # cheap pre-check before the graph search
            if data_dict["node_strs"] != curr_node_strs:
                continue

            matches = dmu.graph_matches(
                curr_motif_G,
                data_dict["dotmotif"],
                convert_characters = True)

            if debug_time:
                print(f"n_graph_matches: {time.time() - st}")
                st = time.time()

            if len(matches) >= 1:
                if verbose:
                    print(f"{curr_motif_str}\n    matched to \n{rep_str}")
                found = True
                # letter of this row -> letter of the representative
                unique_map[curr_motif_str] = dict(
                    match = rep_str,
                    mapping = {reverse_mapping[node]:letter
                               for letter,node in matches[0].items()})
                break

        #c. no match: this row becomes a new representative
        if not found:
            dotmotif_str = mfu.dotmotif_str_from_G_motif(
                curr_motif_G,
                edge_attributes=edge_attributes,
                node_attributes=node_attributes,
            )

            if debug_time:
                print(f"dotmotif_str_from_G_motif: {time.time() - st}")
                st = time.time()

            unique_dotmotif_map[curr_motif_str] = dict(
                dotmotif=dotmotif_str,
                node_strs = curr_node_strs)

            unique_map[curr_motif_str] = dict(
                match = curr_motif_str,
                mapping = {k:k for k in nodes_mapping})

    if not relabel_columns:
        # work on a copy so the caller's dataframe is not modified
        motif_cell_type_df = motif_cell_type_df.copy()
        motif_cell_type_df[new_column] = pu.new_column_from_dict_mapping(
            motif_cell_type_df,
            {k:v["match"] for k,v in unique_map.items()},
            column_name=column
        )
    else:
        motif_cell_type_df_list = []
        for k,v_data in unique_map.items():
            # boolean mask instead of a string-built query so quotes
            # in the motif string or unusual column names can not
            # break the selection
            curr_df = motif_cell_type_df[motif_cell_type_df[column] == k]
            motif_map = motif_column_mapping(curr_df,mapping = v_data["mapping"])
            curr_df = pu.rename_columns(curr_df,motif_map)
            curr_df[new_column] = v_data["match"]
            motif_cell_type_df_list.append(curr_df)

        motif_cell_type_df = pu.concat(motif_cell_type_df_list)

    if verbose:
        print(f"Total time for reduction = {time.time() - global_time}")

    motif_cell_type_df = pu.delete_columns(motif_cell_type_df,column)
    motif_cell_type_df = pu.rename_columns(motif_cell_type_df,{new_column:column})
    return motif_cell_type_df
def motif_dicts_from_motif_from_database(
    motif,
    ):
    """
    Purpose: To fetch the stored results for a motif through vdi
    (vdi.motif_table_from_motif -> vdi.df_from_table) and return
    them as a list of dictionaries (pu.df_to_dicts)
    """
    motif_table_df = vdi.df_from_table(
        vdi.motif_table_from_motif(motif)
    )
    return pu.df_to_dicts(motif_table_df)
def annotated_motif_df(
    G,
    motif,
    node_attributes = (
        "external_layer",
        "external_visual_area",
        "gnn_cell_type_fine",
        "gnn_cell_type_fine_prob",
        "gnn_cell_type",
        "skeletal_length"
    ),
    edge_attributes = (
        "postsyn_compartment",
    ),
    n_samples = None,
    verbose = False,
    filter_df = True,
    motif_reduction = True,
    add_counts = True,
    motif_dicts= None,
    matches = None,
    additional_node_attributes = None,
    ):
    """
    Purpose: To add all of the features to the motifs

    Pseudocode:
    1) Get the motif instances: from matches if given, else from
       motif_dicts, else from the database
    2) Shuffle the instances with a fixed seed (1000) and keep
       the first n_samples (all if n_samples is None)
    3) Annotate every kept instance with motif_data
    4) Optionally reduce redundant motif strings
       (unique_motif_reduction)
    5) Optionally add the count of every motif string as "n_motifs"
    6) Sort descending by the count (if added) and motif string
    7) Optionally filter with filter_motif_df

    Ex:
    from neurd import motif_utils as mfu

    G = vdi.G_auto_DiGraph
    mfu.annotated_motif_df(
        motif = "A->B;B->A",
        G = vdi.G_auto_DiGraph,
        n_samples = None,
        verbose = False
    )
    """
    if additional_node_attributes is not None:
        node_attributes = list(node_attributes) + nu.array_like(additional_node_attributes)

    global_time = time.time()

    # --- 1) gather the motif instances ---
    if matches is not None:
        motif_dicts = motif_segment_df_from_motifs(
            matches,
            motif=motif,
            return_df=False
        )

    if motif_dicts is None:
        motif_dicts = motif_dicts_from_motif_from_database(motif)

    # --- 2) reproducible random subset ---
    idx = np.arange(len(motif_dicts))
    np.random.seed(1000)
    np.random.shuffle(idx)

    column = "motif_str"

    # a None upper bound makes the slice below keep everything
    n = None if n_samples is None else np.min([n_samples,len(motif_dicts)])

    # --- 3) creating the data dicts ---
    def _annotate(i):
        return mfu.motif_data(
            G,
            motif_dict=motif_dicts[i],

            # for node attributes
            include_layer = "external_layer" in node_attributes,
            include_visual_area = "external_visual_area" in node_attributes,
            include_node_identifier = True,

            # for edge attributes
            include_edges_in_name = True,
            include_compartment = "postsyn_compartment" in edge_attributes,
            return_str = False,
            verbose = verbose,
            node_attributes_additional = node_attributes,
        )

    cell_type_list = [_annotate(i) for i in tqdm(idx[:n])]
    motif_cell_type_df = pd.DataFrame.from_records(cell_type_list)

    # --- 4) reduce redundant motif strings ---
    if motif_reduction:
        unique_df = mfu.unique_motif_reduction(
            G,
            motif_cell_type_df,
            node_attributes=node_attributes,
            edge_attributes=edge_attributes,
            column = column,
            verbose = verbose,
            debug_time = False,
            relabel_columns = True
        )
    else:
        unique_df = motif_cell_type_df

    # --- 5) counts and 6) sorting ---
    sorting_columns = [column]
    if add_counts:
        count_column_name = "n_motifs"
        unique_df = pu.unique_row_counts(
            df = unique_df,
            columns = column,
            count_column_name = count_column_name,
            add_to_df = True,
            verbose = False,
        )
        sorting_columns = [count_column_name] + sorting_columns

    unique_df = pu.sort_df_by_column(
        unique_df,
        sorting_columns,
        ascending = False,
    )

    if verbose:
        print(f"Total time for annotated df = {time.time() - global_time}")

    # --- 7) filtering ---
    if filter_df:
        unique_df = mfu.filter_motif_df(
            unique_df,
            verbose = verbose
        )

    return unique_df
def query_with_edge_col(
    df,
    query,
    edge_delimiter = "->"
    ):
    """
    Purpose: To run a dataframe query whose column names contain
    the edge delimiter (ex: "A->B_synapse_id"), which is not valid
    inside a pandas query string

    Pseudocode:
    1) Replace the delimiter with "arrow" in the query and in
       every column name
    2) Run the query on the renamed dataframe
    3) Rename the columns of the result back

    Parameters
    ----------
    df : dataframe
    query : str
    edge_delimiter : str
        text to substitute (previously overwritten with "->"
        inside the function, so the argument had no effect)
    """
    edge_delimiter_new = "arrow"
    replace_dict = {edge_delimiter:edge_delimiter_new}

    query_new = reu.multiple_replace(query,replace_dict)

    new_column_dict = {k:reu.multiple_replace(k,replace_dict) for k in df.columns}
    new_column_dict_reverse = {v:k for k,v in new_column_dict.items()}

    df_new = pu.rename_columns(df,new_column_dict)
    df_filt = df_new.query(query_new)
    return pu.rename_columns(df_filt,new_column_dict_reverse)
def filter_motif_df(
    df,
    node_filters = None,
    min_gnn_probability = None, #gives about a 90% on inhibitory cells
    edges_filters = None,
    single_edge_motif = False,
    cell_type_fine_exclude = None,
    verbose = False,
    ):
    """
    Purpose: To restrict a motif dataframe with node
    and edge requirements

    Every filter is a query template: the text "node" is replaced
    by each node name and the text "edge" by each edge string, and
    the resulting conditions are combined with "and". Filters that
    do not contain the placeholder are used as written.

    The node names / edges are parsed from the "motif_str" of the
    first row, unless single_edge_motif is True, in which case the
    node names are "presyn" and "postsyn".

    Parameters
    ----------
    df : dataframe of motifs
    node_filters : list of str or None
        defaults to the gnn cell type requirements built from
        min_gnn_probability and cell_type_fine_exclude
    min_gnn_probability : float or None
        defaults to min_gnn_probability_global
    edges_filters : list of str or None
        defaults to no edge filters
    single_edge_motif : bool
    cell_type_fine_exclude : list or None
        defaults to cell_type_fine_exclude_global
    verbose : bool

    Returns
    -------
    the filtered dataframe (an unfiltered copy if the dataframe is
    empty or there are no filters)

    Raises
    ------
    ValueError if an edge filter with the "edge" placeholder is
    used together with single_edge_motif = True

    Ex:
    from neurd import motif_utils as mfu

    G = vdi.G_auto_DiGraph
    unique_df = mfu.annotated_motif_df(
        motif = "A->B;B->A",
        G = vdi.G_auto_DiGraph,
        n_samples = None,
        verbose = False
    )

    mfu.filter_motif_df(
        unique_df,
        min_gnn_probability = 0.5,
        edges_filters = [
            "edge_postsyn_compartment == 'soma'",
        ]
    )
    """
    if min_gnn_probability is None:
        min_gnn_probability = min_gnn_probability_global

    if cell_type_fine_exclude is None:
        cell_type_fine_exclude = cell_type_fine_exclude_global

    if verbose:
        print(f"***Filtering motif df***")

    if node_filters is None:
        node_filters = [
            # x == x is False for NaN, so this drops missing cell types
            "node_gnn_cell_type_fine == node_gnn_cell_type_fine",
            "node_gnn_cell_type_fine != 'None'",
            f"node_gnn_cell_type_fine_prob > {min_gnn_probability}",
            f"node_gnn_cell_type_fine not in {cell_type_fine_exclude}",
            f"node_cell_type == node_gnn_cell_type",
        ]

    if edges_filters is None:
        edges_filters = []

    # nothing to filter, and no first row to parse the motif from
    if len(df) == 0:
        return df.copy()

    if not single_edge_motif:
        curr_str = df.iloc[0,:]["motif_str"]
        edges_str = mfu.edges_from_str(curr_str,return_edge_str = True)
        nodes_str = mfu.nodes_from_str(curr_str)
    else:
        nodes_str = ["presyn","postsyn"]
        edges_str = None

    query_str = []
    for nf in node_filters:
        if "node" in nf:
            curr_query_str = " and ".join([f"( {reu.multiple_replace(nf,dict(node=k))} )"
                                           for k in nodes_str])
        else:
            curr_query_str = nf

        query_str.append(f"({curr_query_str})")

    for ef in edges_filters:
        if "edge" in ef:
            if edges_str is None:
                # previously an unhelpful NameError
                raise ValueError(
                    "edge filters with the 'edge' placeholder are not "
                    "supported when single_edge_motif = True"
                )
            curr_query_str = " and ".join([f"( {reu.multiple_replace(ef,dict(edge=k))} )"
                                           for k in edges_str])
        else:
            curr_query_str = ef

        query_str.append(f"({curr_query_str})")

    if verbose:
        print(f"query_str =")
        for k in query_str:
            print(f"   {k}")

    # an empty query string is rejected by pandas
    if len(query_str) == 0:
        return df.copy()

    total_query = " and ".join(query_str)
    return mfu.query_with_edge_col(df,total_query)
def counts_df_from_motif_df(
    motif_df,
    motif_column = "motif_str"):
    """
    Purpose: To reduce a motif dataframe to one row per unique
    motif (the first instance of each value of motif_column),
    sorted by the "n_motifs" column and then by motif_column

    Parameters
    ----------
    motif_df : dataframe with an "n_motifs" column
    motif_column : str
        column identifying the motif; now also used for the sort
        (the sort was previously hard-coded to "motif_str")
    """
    motif_counts = pu.filter_to_first_instance_of_unique_column(
        motif_df,
        motif_column
    )

    return pu.sort_df_by_column(
        motif_counts,
        ["n_motifs",motif_column])
def visualize_graph_connections(
    G,
    key,
    verbose = True,
    verbose_visualize = False,
    restrict_to_synapse_ids = True,
    method="neuroglancer",
    **kwargs
    ):
    """
    Purpose: To visualize the motif connection
    from an entry in a motif dataframe

    Pseudocode:
    1) Turn entry into dict if not
    2) Get the node names for the motif
    3) Get the synapse ids
    4) Plot the connections

    Parameters
    ----------
    G : graph
    key : dict (or any dict subclass) or object with .to_dict()
        (ex: a dataframe row); must hold "motif_str" and the
        "{letter}_name" entries
    verbose : bool
    verbose_visualize : bool
        verbose flag passed to the visualization function
    restrict_to_synapse_ids : bool
        look up the "{edge}_synapse_id" entries of the key
    method : str
        passed to conu.visualize_graph_connections_by_method

    NOTE(review): synapse_ids and **kwargs are collected but not
    forwarded to conu.visualize_graph_connections_by_method --
    confirm against that function's signature whether they should be.
    """
    #1) Turn entry into dict if not
    # isinstance so dict subclasses are not sent to .to_dict()
    if not isinstance(key, dict):
        key = key.to_dict()

    motif_str = key["motif_str"]

    if verbose:
        print(f"motif_str = {motif_str}")

    #2) Get the node names for the motif
    node_names = [key[f'{k}_name']
                  for k in mfu.nodes_from_str(motif_str)]
    if verbose:
        print(f"node_names= {node_names}")

    #3) Get the synapse ids
    if restrict_to_synapse_ids:
        edges = mfu.edges_from_str(motif_str,return_edge_str=True)
        synapse_ids = [key[f"{k}_synapse_id"] for k in edges]
    else:
        synapse_ids = None

    if verbose:
        print(f"synapse_ids= {synapse_ids}")

    #4) Plot the connections
    from neurd import connectome_utils as conu
    return conu.visualize_graph_connections_by_method(
        G,
        segment_ids=node_names,
        method=method,
        verbose = verbose_visualize,
    )
# ----------------- Helper functions for 3D analysis ------------- #
# -- default
# Default module attributes: `vdi` is the data interface exposed by
# microns_volume_utils. Presumably these dicts are consumed by
# modu.set_global_parameters_and_attributes_by_data_type -- the call
# in this file is commented out, TODO confirm where they are applied.
attributes_dict_default = dict(
#voxel_to_nm_scaling = microns_volume_utils.voxel_to_nm_scaling,
vdi = mvu.data_interface
)
# Default global parameters (currently none)
global_parameters_dict_default = dict(
#max_ais_distance_from_soma = 50_000
)
# -- microns
# No overrides on top of the defaults for the microns data type
global_parameters_dict_microns = {}
attributes_dict_microns = {}
#-- h01--
# For the h01 data type `vdi` is the data interface exposed by
# h01_volume_utils
attributes_dict_h01 = dict(
#voxel_to_nm_scaling = h01_volume_utils.voxel_to_nm_scaling,
vdi = hvu.data_interface
)
# No global parameter overrides for h01
global_parameters_dict_h01 = dict()
# data_type = "default"
# algorithms = None
# modules_to_set = [mfu]
# def set_global_parameters_and_attributes_by_data_type(data_type,
# algorithms_list = None,
# modules = None,
# set_default_first = True,
# verbose=False):
# if modules is None:
# modules = modules_to_set
# modu.set_global_parameters_and_attributes_by_data_type(modules,data_type,
# algorithms=algorithms_list,
# set_default_first = set_default_first,
# verbose = verbose)
# set_global_parameters_and_attributes_by_data_type(data_type,
# algorithms)
# def output_global_parameters_and_attributes_from_current_data_type(
# modules = None,
# algorithms = None,
# verbose = True,
# lowercase = True,
# output_types = ("global_parameters"),
# include_default = True,
# algorithms_only = False,
# **kwargs):
# if modules is None:
# modules = modules_to_set
# return modu.output_global_parameters_and_attributes_from_current_data_type(
# modules,
# algorithms = algorithms,
# verbose = verbose,
# lowercase = lowercase,
# output_types = output_types,
# include_default = include_default,
# algorithms_only = algorithms_only,
# **kwargs,
# )
#--- from neurd_packages ---
from . import h01_volume_utils as hvu
from . import microns_volume_utils as mvu
#--- from datasci_tools ---
from datasci_tools import module_utils as modu
from datasci_tools import networkx_utils as xu
from datasci_tools import numpy_dep as np
from datasci_tools import numpy_utils as nu
from datasci_tools import pandas_utils as pu
from datasci_tools import regex_utils as reu
from datasci_tools.tqdm_utils import tqdm
motif_Gs_for_n_nodes = xu.motif_Gs_for_n_nodes
from . import motif_utils as mfu
from datasci_tools import dotmotif_utils as dmu