import copy
import networkx as nx
from datasci_tools import numpy_dep as np
def features_list(
    G,
    limb_branch_features = True,
    features_to_ignore = ("u",),
    verbose = False):
    """
    Purpose: List the node feature names currently present on G.

    Parameters
    ----------
    G : networkx graph of a neuron
    limb_branch_features : if True, restrict to the limb/branch subgraph first
    features_to_ignore : feature names to exclude from the result (None keeps all)
    verbose : print the resulting feature list

    Ex:
    from neurd import neuron_nx_feature_processing as nxf
    nxf.features_list(G)
    """
    if limb_branch_features:
        G = nxu.limb_branch_subgraph(G)

    feature_names = list(xu.node_df(G).columns)

    if features_to_ignore is not None:
        # np.setdiff1d also sorts and de-duplicates the names
        feature_names = list(np.setdiff1d(feature_names, features_to_ignore))

    if verbose:
        print(f"current_features = {feature_names}")

    return feature_names
def add_node_feature(
    G,
    feature_func,
    feature_name = None,
    nodes=None,
    inplace = True,
    verbose=False,
    default_value = None,
    feature_value_dict = None,
    verbose_loop = False,
    #skip_if_exists = True,
    ):
    """
    Purpose: Apply one or more feature functions to nodes of G, storing
    each result as a node attribute.

    Parameters
    ----------
    G : networkx graph of a neuron
    feature_func : callable, name of a function in this module (nxf), or a
        list of either; each is evaluated on the node's attribute dict
    feature_name : attribute name to store under (default: the function's
        __name__)
    nodes : nodes to process (default: all limb/branch nodes)
    inplace : mutate G directly; otherwise operate on a deep copy
    default_value : fallback value when a feature function raises
    feature_value_dict : optional precomputed {node: {feature_name: value}}
        mapping used instead of calling the function
    verbose_loop : print the value computed for every node

    Returns
    -------
    The graph with the new node attributes set (G itself when inplace).
    """
    if nodes is None:
        nodes = nxu.limb_branch_nodes(G)
    if not inplace:
        G = copy.deepcopy(G)
    feature_func = nu.convert_to_array_like(feature_func)

    for att_func in feature_func:
        # string entries name a function defined in this module
        if isinstance(att_func, str):
            try:
                att_func = getattr(nxf, att_func)
            except AttributeError:
                # unknown feature name: skip it rather than fail the batch
                continue
        if feature_name is None:
            curr_name = str(att_func.__name__)
        else:
            curr_name = feature_name
        if verbose:
            print(f"\n\n---Setting {curr_name}, att_func ={att_func}")
        for n in nodes:
            if feature_value_dict is not None:
                # NOTE(review): the original called dict.get with 3 args,
                # which raises TypeError on a plain dict; a nested
                # {node: {feature: value}} lookup appears intended —
                # confirm against callers.
                attr_value = feature_value_dict.get(n, {}).get(
                    curr_name, default_value)
            else:
                try:
                    attr_value = att_func(G.nodes[n])
                except Exception as e:
                    if default_value is not None:
                        attr_value = default_value
                    else:
                        raise Exception(
                            f"Feature {curr_name} failed on node {n}: {e}")
            if verbose_loop:
                print(f"For node {n}, {curr_name} = {attr_value}")
            G.nodes[n][curr_name] = attr_value
    return G
def skeleton_vector_upstream_x(node_dict):
    """x component of the node's upstream skeleton vector."""
    # plain string key (original used a placeholder-free f-string)
    return node_dict["skeleton_vector_upstream"][0]
def skeleton_vector_upstream_y(node_dict):
    """y component of the node's upstream skeleton vector."""
    # plain string key (original used a placeholder-free f-string)
    return node_dict["skeleton_vector_upstream"][1]
def skeleton_vector_upstream_z(node_dict):
    """z component of the node's upstream skeleton vector."""
    # plain string key (original used a placeholder-free f-string)
    return node_dict["skeleton_vector_upstream"][2]
def skeleton_vector_downstream_x(node_dict):
    """x component of the node's downstream skeleton vector."""
    # plain string key (original used a placeholder-free f-string)
    return node_dict["skeleton_vector_downstream"][0]
def skeleton_vector_downstream_y(node_dict):
    """y component of the node's downstream skeleton vector."""
    # plain string key (original used a placeholder-free f-string)
    return node_dict["skeleton_vector_downstream"][1]
def skeleton_vector_downstream_z(node_dict):
    """z component of the node's downstream skeleton vector."""
    # plain string key (original used a placeholder-free f-string)
    return node_dict["skeleton_vector_downstream"][2]
def skeleton_vector_downstream_theta(node_dict):
    """Polar angle (theta) of the downstream skeleton vector.

    Index 1 of nu.polar_3D_from_cartesian's return — presumably
    (r, theta, phi); confirm against datasci_tools.
    """
    # plain string key (original used a placeholder-free f-string)
    return nu.polar_3D_from_cartesian(*node_dict["skeleton_vector_downstream"])[1]
def skeleton_vector_downstream_phi(node_dict):
    """Azimuthal angle (phi) of the downstream skeleton vector.

    Index 2 of nu.polar_3D_from_cartesian's return — presumably
    (r, theta, phi); confirm against datasci_tools.
    """
    # plain string key (original used a placeholder-free f-string)
    return nu.polar_3D_from_cartesian(*node_dict["skeleton_vector_downstream"])[2]
def skeleton_vector_upstream_theta(node_dict):
    """Polar angle (theta) of the upstream skeleton vector.

    Index 1 of nu.polar_3D_from_cartesian's return — presumably
    (r, theta, phi); confirm against datasci_tools.
    """
    # plain string key (original used a placeholder-free f-string)
    return nu.polar_3D_from_cartesian(*node_dict["skeleton_vector_upstream"])[1]
def skeleton_vector_upstream_phi(node_dict):
    """Azimuthal angle (phi) of the upstream skeleton vector.

    Index 2 of nu.polar_3D_from_cartesian's return — presumably
    (r, theta, phi); confirm against datasci_tools.
    """
    # plain string key (original used a placeholder-free f-string)
    return nu.polar_3D_from_cartesian(*node_dict["skeleton_vector_upstream"])[2]
def width_no_spine(node_dict):
    """No-spine median mesh-center width stored under the node's 'width_new'."""
    width_info = node_dict["width_new"]
    return width_info["no_spine_median_mesh_center"]
def axon_label(node_dict):
    """Return 1 when the node's axon_compartment equals "axon", else 0."""
    is_axon = node_dict["axon_compartment"] == "axon"
    return int(is_axon)
def dendrite_label(node_dict):
    """Return 1 when the node's axon_compartment equals "dendrite", else 0."""
    is_dendrite = node_dict["axon_compartment"] == "dendrite"
    return int(is_dendrite)
def compartment_proof(node_dict):
    """Integer swc index for the node's compartment, via nxu's lookup map."""
    compartment = node_dict["compartment"]
    return int(nxu.compartment_index_swc_map[compartment])
def compartment_one_hot(node_dict, compartment):
    """Return 1 when the node's compartment matches `compartment`, else 0.

    `compartment` may be a single name or a list of names.
    """
    accepted = nu.convert_to_array_like(compartment)
    return int(node_dict["compartment"] in accepted)
def apical_label(node_dict):
    """Return 1 when the node lies in any apical-related compartment, else 0."""
    apical_compartments = [
        "apical_tuft",
        "oblique",
        "apical_shaft",
        "apical",
    ]
    return compartment_one_hot(node_dict, apical_compartments)
def basal_label(node_dict):
    """Return 1 when the node lies in the basal compartment, else 0."""
    return compartment_one_hot(node_dict, ["basal"])
def feature_clip(
    row_dict,
    feature_name,
    a_min=0,
    a_max=100000,
    ):
    """Clip row_dict[feature_name] into [a_min, a_max].

    A value of None (feature absent/undefined) maps to a_max.
    """
    value = row_dict[feature_name]
    if value is None:
        return a_max
    return np.clip(value, a_min=a_min, a_max=a_max)
def min_dist_synapses_pre_downstream_clip(
    row_dict):
    """Clipped min_dist_synapses_pre_downstream (None maps to the upper bound)."""
    return feature_clip(row_dict, feature_name="min_dist_synapses_pre_downstream")
def min_dist_synapses_pre_upstream_clip(
    row_dict):
    """Clipped min_dist_synapses_pre_upstream (None maps to the upper bound).

    NOTE(review): the original passed "min_dist_synapses_pre_downstream"
    here — an apparent copy/paste bug that made this function identical to
    the downstream variant; corrected to the upstream feature name.
    """
    return feature_clip(
        row_dict,
        feature_name="min_dist_synapses_pre_upstream",
    )
# Mapping from auto-proofreading filter name to an integer class label
# (0 = "valid": no filter fired on the branch).
auto_proof_filter_label_map = {
    "valid": 0,
    "high_degree_branching_split_locations_before_filter": 1,
    "low_degree_branching_split_locations_before_filter": 2,
    "width_jump_up_axon_split_locations_before_filter": 3,
    "axon_on_dendrite_merges_split_locations_before_filter": 4,
    "high_degree_branching_dendrite_split_locations_before_filter": 5,
    "width_jump_up_dendrite_split_locations_before_filter": 6,
    "double_back_dendrite_split_locations_before_filter": 7,
}

def auto_proof_filter_label(row_dict):
    """Integer class label for the row's auto_proof_filter (None -> "valid")."""
    filt = row_dict["auto_proof_filter"]
    return auto_proof_filter_label_map["valid" if filt is None else filt]
def merge_label(row_dict, merge_name):
    """Return True when merge_name is a substring of the row's auto_proof_filter.

    A missing (None) filter is treated as the literal string "clean", so
    merge_label(row, "clean") is True exactly when no filter fired.
    """
    filt = row_dict["auto_proof_filter"]
    if filt is None:
        filt = "clean"
    return merge_name in filt
def merge_clean(row_dict):
    """True when no auto-proof filter fired on this row."""
    return merge_label(row_dict, merge_name="clean")
def merge_high_degree_branching_label(row_dict):
    """True when the row's filter contains 'high_degree_branching_split'."""
    return merge_label(row_dict, merge_name="high_degree_branching_split")
def merge_low_degree_branching_label(row_dict):
    """True when the row's filter contains 'low_degree_branching_split'."""
    return merge_label(row_dict, merge_name="low_degree_branching_split")
def merge_width_jump_up_axon_label(row_dict):
    """True when the row's filter contains 'width_jump_up_axon'."""
    return merge_label(row_dict, merge_name="width_jump_up_axon")
def merge_axon_on_dendrite_label(row_dict):
    """True when the row's filter contains 'axon_on_dendrite'."""
    return merge_label(row_dict, merge_name="axon_on_dendrite")
def merge_high_degree_branching_dendrite_label(row_dict):
    """True when the row's filter contains 'high_degree_branching_dendrite'."""
    return merge_label(row_dict, merge_name="high_degree_branching_dendrite")
def merge_width_jump_up_dendrite_label(row_dict):
    """True when the row's filter contains 'width_jump_up_dendrite'."""
    return merge_label(row_dict, merge_name="width_jump_up_dendrite")
def merge_double_back_dendrite_label(row_dict):
    """True when the row's filter contains 'double_back_dendrite'."""
    return merge_label(row_dict, merge_name="double_back_dendrite")
# NOTE(review): duplicate definition — an identical
# merge_width_jump_up_dendrite_label appears earlier in this file; this
# later definition shadows it (harmlessly, since the bodies match).
# Consider deleting one of the two.
def merge_width_jump_up_dendrite_label(row_dict):
    """True when the row's filter contains 'width_jump_up_dendrite'."""
    return merge_label(
        row_dict,
        merge_name="width_jump_up_dendrite")
def add_skeleton_vector_features(
    G,
    use_polar_coords = True,
    verbose = False,
    **kwargs
    ):
    """
    Attach skeleton-vector node features to G.

    With use_polar_coords=True (default) the theta/phi angles of the
    upstream/downstream skeleton vectors are added; otherwise the raw
    x/y/z components are added. Extra kwargs go to nxf.add_node_feature.
    """
    if use_polar_coords:
        if verbose:
            print("Using polar coordinates")
        funcs = [
            skeleton_vector_downstream_theta,
            skeleton_vector_downstream_phi,
            skeleton_vector_upstream_theta,
            skeleton_vector_upstream_phi,
        ]
    else:
        funcs = [
            skeleton_vector_upstream_x,
            skeleton_vector_upstream_y,
            skeleton_vector_upstream_z,
            skeleton_vector_downstream_x,
            skeleton_vector_downstream_y,
            skeleton_vector_downstream_z,
        ]

    return nxf.add_node_feature(G, feature_func=funcs, **kwargs)
def add_any_missing_node_features(
    G,
    features=None,
    verbose = False,
    inplace = False,
    #default_value = 0,
    ):
    """
    Ensure G carries every requested node feature, computing any missing ones.

    1) Compare the requested `features` (default: features_to_output_for_gnn)
       against those already on G.
    2) Compute and attach only the ones not yet present.
    """
    if features is None:
        features = features_to_output_for_gnn

    existing = nxf.features_list(G)
    features_not_computed = np.setdiff1d(features, existing)
    if verbose:
        print(f"features_not_computed = {features_not_computed}")

    return nxf.add_node_feature(
        G,
        feature_func=features_not_computed,
        verbose=verbose,
        inplace=inplace,
        #default_value=default_value
    )
# Legacy node-feature set once exported for GNN training; kept for
# reference — superseded by features_to_output_for_gnn.
features_to_output_for_gnn_old = [
    "n_spines",
    "total_spine_volume",
    "n_synapses_post",
    "n_synapses_pre",
    "n_synapses_head",
    "n_synapses_neck",
    #"parent_skeletal_angle",
    "skeletal_length",
    "skeleton_vector_upstream_theta",
    "skeleton_vector_upstream_phi",
    "skeleton_vector_downstream_theta",
    "skeleton_vector_downstream_phi",
    "width_upstream",
    "width_no_spine",
    "width_downstream",
]
# Current node-feature set exported to the GNN pipeline, grouped by category.
features_to_output_for_gnn = [
    # skeletal features
    'skeletal_length',
    'skeleton_vector_upstream_theta',
    'skeleton_vector_upstream_phi',
    'skeleton_vector_downstream_theta',
    'skeleton_vector_downstream_phi',
    #width features
    'width_upstream',
    'width_no_spine',
    'width_downstream',
    # synapse features
    'n_synapses_post',
    'n_synapses_pre',
    'n_synapses_head_postsyn',
    'n_synapses_neck_postsyn',
    'n_synapses_shaft_postsyn',
    'n_synapses_no_head_postsyn',
    # the volumes of each
    'synapse_volume_shaft_postsyn_sum',
    'synapse_volume_head_postsyn_sum',
    'synapse_volume_no_head_postsyn_sum',
    'synapse_volume_neck_postsyn_sum',
    'synapse_volume_postsyn_sum',
    #spine features
    'n_spines',
    'spine_volume_sum',
]
# Soma-level features added when exporting hierarchical (whole-neuron) graphs.
features_hierarchical = [
    "soma_start_angle_max",
    "max_soma_volume",
    "n_syn_soma"]

# GNN feature set extended with the soma-level features above.
features_to_output_for_gnn_hierarchical = features_to_output_for_gnn + features_hierarchical
def filter_G_features(
    G,
    features=None,
    inplace = False,
    verbose = False,
    ):
    """
    Purpose: To reduce a networkx graph to a
    certain number of features (and all other superfluous features are deleted)

    Ex:
    axon_vs_dendrite_features = [
        "mesh_volume",
        "n_spines",
        "total_spine_volume",
        "n_synapses_post",
        "n_synapses_pre",
        #"n_synapse_head",
        #"parent_skeletal_angle",
        "skeletal_length",
        "skeleton_vector_upstream_theta",
        "skeleton_vector_upstream_phi",
        "skeleton_vector_downstream_theta",
        "skeleton_vector_downstream_phi",
        "width_upstream",
        "width_no_spine",
        "width_downstream",
    ]
    G_feat_filt = nxf.filter_G_features(
        G,
        features=axon_vs_dendrite_features,
        inplace = False,
        verbose = True,
    )
    xu.node_df(G_feat_filt)
    """
    if not inplace:
        G = copy.deepcopy(G)

    # G was already copied above when inplace is False, so let the helper
    # work in place. (The original left the helper's inplace=False default,
    # which made a redundant second deepcopy and meant inplace=True never
    # actually modified the caller's graph.)
    G_ret = nxf.add_any_missing_node_features(
        G,
        features = features,
        verbose = verbose,
        inplace = True,
    )
    if verbose:
        print(f"Number of features after adding missing ones = {len(xu.node_df(G_ret).columns)}")

    G_ret = xu.delete_node_attributes(G_ret, attributes_not_to_delete=features)
    if verbose:
        # the original repeated the "adding missing ones" message here even
        # though this print runs after the attribute filtering
        print(f"Number of features after filtering = {len(xu.node_df(G_ret).columns)}")
    return G_ret
#--- from neuron_morphology_tools ---
from . import neuron_nx_utils as nxu
#--- from datasci_tools ---
from datasci_tools import networkx_utils as xu
from datasci_tools import numpy_utils as nu
from . import neuron_nx_feature_processing as nxf