Source code for neurd.concept_network_utils


import networkx as nx
from datasci_tools import numpy_dep as np

non_branching_upstream = False

[docs]def distance_between_nodes_di(limb_obj, start_idx, destination_idx, reverse_di_graph,): """ Purpose: To determine the distance between two nodes along a path """ branch_path = nru.branch_path_to_node(limb_obj, start_idx=start_idx, destination_idx=destination_idx, include_branch_idx=False, include_last_branch_idx=False, reverse_di_graph=reverse_di_graph) if branch_path is None: return np.inf return nst.skeletal_length_along_path(limb_obj,branch_path)
[docs]def distance_between_nodes_di_upstream(limb_obj, start_idx, destination_idx): """ Purpose: To determine the upstream distance from branch_idx to start_idx """ return cnu.distance_between_nodes_di(limb_obj, start_idx, destination_idx, reverse_di_graph=True)
[docs]def distance_between_nodes_di_downstream(limb_obj, start_idx, destination_idx): """ Purpose: To determine the upstream distance from branch_idx to start_idx """ return cnu.distance_between_nodes_di(limb_obj, start_idx, destination_idx, reverse_di_graph=False)
[docs]def branches_within_distance(limb_obj, branch_idx, dist_func, distance_threshold, include_branch_idx = False): """ Purpose: To find all branches with a certain downstream distance """ branch_names = np.array(limb_obj.get_branch_names()) branch_dist = np.array([dist_func(limb_obj,branch_idx,k) for k in branch_names]) #print(f"branch_names and dist = \n {np.vstack([branch_names,branch_dist]).T.astype("int")}") branches_within_dist = branch_names[(branch_dist<=distance_threshold) & (branch_dist != np.inf)] if include_branch_idx: return branches_within_dist else: return branches_within_dist[branches_within_dist != branch_idx]
[docs]def branches_within_distance_upstream(limb_obj, branch_idx, distance_threshold, include_branch_idx = False): return branches_within_distance(limb_obj, branch_idx, dist_func=cnu.distance_between_nodes_di_upstream, distance_threshold=distance_threshold, include_branch_idx = include_branch_idx)
[docs]def branches_within_distance_downstream(limb_obj, branch_idx, distance_threshold, include_branch_idx = False): """ Ex: nst.branches_within_distance_downstream(limb_obj,223, 2000) """ return branches_within_distance(limb_obj, branch_idx, dist_func=cnu.distance_between_nodes_di_downstream, distance_threshold=distance_threshold, include_branch_idx = include_branch_idx)
#find all end nodes within a downstream threshold
[docs]def endnode_branches_of_branches_within_distance_downtream(limb_obj, branch_idx, skip_distance=2000, return_skipped_branches=False, **kwargs): """ Purpose: To get the branches that are a certain distance away from a branch but to only return the furthermost branches Ex: limb_obj = neuron_obj[0] branch_idx = 223 cnu.endnode_branches_of_branches_within_distance_downtream(limb_obj, branch_idx, 0) """ downstream_branches = cnu.branches_within_distance_downstream(limb_obj,branch_idx, skip_distance) G = limb_obj.concept_network_directional G_sub = G.subgraph(downstream_branches) end_nodes = xu.end_nodes_of_digraph(G_sub) if return_skipped_branches: skipped_branches = np.setdiff1d(downstream_branches,end_nodes) return end_nodes,skipped_branches else: return end_nodes
[docs]def branches_with_parent_branching(limb_obj): return xu.nodes_with_parent_branching(limb_obj.concept_network_directional)
[docs]def branches_with_parent_non_branching(limb_obj): """ Purpose: To see if a branch had a parent node that branched off into multiple branches """ return xu.nodes_with_parent_non_branching(limb_obj.concept_network_directional)
[docs]def subgraph_around_branch(limb_obj, branch_idx, upstream_distance=0, downstream_distance=0, distance = None, distance_attribute = "skeletal_length", include_branch_in_upstream_dist=True, include_branch_in_downstream_dist=True, only_non_branching_downstream=True, only_non_branching_upstream = non_branching_upstream, include_branch_idx = True, return_branch_idxs=True, plot_subgraph = False, nodes_to_exclude=None, nodes_to_include = None, verbose = False ): """ Purpose: To return a subgraph around a certain branch to find all the nodes upstream and/or downstream Pseudocode: 1) Find all the branches upstream of branch (subtract the skeletal length of branch if included in upstream dist ) 2) Find all the branches downstrem of branch (subtract the skeletal length of branch if included in upstream dist ) 3) Find the upstream and downstream nodes a certain distance away Ex: cnu.subgraph_around_branch(limb_obj, branch_idx=97, upstream_distance=1000000, downstream_distance=1000000, distance = None, distance_attribute = "skeletal_length", include_branch_in_upstream_dist=True, include_branch_in_downstream_dist=True, only_non_branching_downstream=True, include_branch_idx = False, return_branch_idxs=True, plot_subgraph = True, verbose = False ) """ if distance is not None: upstream_distance = downstream_distance = distance branch_idx_dist = getattr(limb_obj[branch_idx],distance_attribute) if include_branch_in_upstream_dist: upstream_distance = upstream_distance - branch_idx_dist if include_branch_in_downstream_dist: downstream_distance = downstream_distance - branch_idx_dist if verbose: print(f"branch_idx_dist = {branch_idx_dist}") print(f"upstream_distance = {upstream_distance}") print(f"downstream_distance = {downstream_distance}") upstream_branches = cnu.branches_within_distance_upstream(limb_obj, branch_idx, upstream_distance, include_branch_idx=include_branch_idx) downstream_branches = cnu.branches_within_distance_downstream(limb_obj, branch_idx, downstream_distance, include_branch_idx=include_branch_idx) if verbose: print(f"upstream_branches= {upstream_branches}") print(f"downstream_branches = {downstream_branches}") if nodes_to_exclude is None: nodes_to_exclude = limb_obj.nodes_to_exclude #print(f"nodes_to_exclude = {nodes_to_exclude}") if nodes_to_exclude is not None: downstream_branches = np.setdiff1d(downstream_branches,nodes_to_exclude) upstream_branches = np.setdiff1d(upstream_branches,nodes_to_exclude) if verbose: print(f"Excluding Nodes: {nodes_to_exclude}") print(f"After exclusion:\n downstream_branches = {downstream_branches}\nupstream_branches = {upstream_branches}") if nodes_to_include is not None: downstream_branches = np.intersect1d(downstream_branches,nodes_to_include) upstream_branches = np.intersect1d(upstream_branches,nodes_to_include) if verbose: print(f"Limiting to only Include Nodes: {nodes_to_include}") print(f"After inclusion:\n downstream_branches = {downstream_branches}\nupstream_branches = {upstream_branches}") if only_non_branching_upstream: upstream_branches = np.intersect1d(upstream_branches,cnu.upstream_nodes_without_branching(limb_obj,branch_idx, nodes_to_exclude=nodes_to_exclude)) if verbose: print(f"\nAfter only_non_branching_upstream:\nupstream_branches = {upstream_branches} ") if only_non_branching_downstream: # downstream_branches = np.setdiff1d(downstream_branches,cnu.branches_with_parent_branching(limb_obj)) downstream_branches = np.intersect1d(downstream_branches,cnu.downstream_nodes_without_branching(limb_obj,branch_idx, nodes_to_exclude=nodes_to_exclude)) if verbose: print(f"\nAfter only_non_branching_downstream:\ndownstream_branches = {downstream_branches} ") total_branches = np.hstack([upstream_branches,downstream_branches]) if include_branch_idx: if branch_idx not in total_branches: total_branches = np.hstack([total_branches,[branch_idx]]) if verbose: print(f"total_branches = {total_branches}") if return_branch_idxs: return total_branches sub_G = limb_obj.concept_network_directional.subgraph(total_branches) if plot_subgraph: nx.draw(sub_G,with_labels=True) return sub_G
[docs]def downstream_nodes_without_branching(limb_obj, branch_idx, nodes_to_exclude = None): """ Purpose: To return all nodes that are downstream of a branch but not after a branching point """ if nodes_to_exclude is None: nodes_to_exclude = limb_obj.nodes_to_exclude downstream_nodes = [] curr_node = branch_idx for i,b in enumerate(limb_obj.get_branch_names()): d_nodes = nru.downstream_nodes(limb_obj,curr_node) if nodes_to_exclude is not None: d_nodes = np.setdiff1d(d_nodes,nodes_to_exclude) if len(d_nodes) == 0 or len(d_nodes) > 1: return downstream_nodes elif len(d_nodes) == 1: curr_node = d_nodes[0] downstream_nodes.append(curr_node) return downstream_nodes
[docs]def upstream_nodes_without_branching(limb_obj, branch_idx, nodes_to_exclude = None): """ Purpose: To return all nodes that are downstream of a branch but not after a branching point """ if nodes_to_exclude is None: nodes_to_exclude = limb_obj.nodes_to_exclude upstream_nodes = [] curr_node = branch_idx for i,b in enumerate(limb_obj.get_branch_names()): u_node = nru.upstream_node(limb_obj,curr_node) if u_node is None: return upstream_nodes d_nodes = nru.downstream_nodes(limb_obj,u_node) if nodes_to_exclude is not None: d_nodes = np.setdiff1d(d_nodes,nodes_to_exclude) if len(d_nodes) > 1: return upstream_nodes elif len(d_nodes) == 1: curr_node = u_node upstream_nodes.append(curr_node) return upstream_nodes
# ------ 6/25: Helps find attributes that are downstream or upstream ------- '''def downstream_attribute(limb_obj, branch_idx, attribute_name, concat_func = np.concatenate, downstream_distance = np.inf, include_branch_in_downstream_dist = True, only_non_branching_downstream = True, include_branch_idx = True, verbose = False, nodes_to_exclude = None, return_nodes = False, ): """ Purpose: To retrieve and concatenate the attributes of a branch and all of the branches downsream of the branch until there is a branching point or within a certain distance Pseudocode: 1) Get all of the branches that are downstream (either up to branch point or within certain distance) 2) Get the attributes of the branch and all those downstream 3) concatenate the attributes using the prescribed function """ # 1) Get all of the branches that are downstream # (either up to branch point or within certain distance) all_downstream_nodes = cnu.subgraph_around_branch(limb_obj, branch_idx = branch_idx, include_branch_idx=include_branch_idx, include_branch_in_downstream_dist = include_branch_in_downstream_dist, downstream_distance = downstream_distance, upstream_distance = -1, only_non_branching_downstream=only_non_branching_downstream, verbose = verbose ) if verbose: print(f"With downstream_distance= {downstream_distance}, only_non_branching_downstream = {only_non_branching_downstream}") print(f"all_downstream_nodes = {all_downstream_nodes}") if nodes_to_exclude is not None: all_downstream_nodes = np.setdiff1d(all_downstream_nodes,nodes_to_exclude) if verbose: print(f"Excluding Nodes: {nodes_to_exclude}") print(f"After exclusion: all_downstream_nodes = {all_downstream_nodes}") #2) Get the attributes of the branch and all those downstream down_attr = [getattr(limb_obj[k],attribute_name) for k in all_downstream_nodes] #3) concatenate the attributes using the prescribed function if len(down_attr) > 0: down_attr_concat = concat_func(down_attr) else: down_attr_concat = down_attr if return_nodes: return down_attr_concat,all_downstream_nodes else: return down_attr_concat '''
[docs]def other_direction(direction): if direction=="upstream": return "downstream" elif direction == "downstream": return "upstream" else: raise Exception(f"unknown direction: {direction}")
[docs]def nodes_upstream_downstream(limb_obj, branch_idx, direction, distance = np.inf,include_branch_in_dist = True, only_non_branching = True, include_branch_idx = True, verbose = False, nodes_to_exclude = None, nodes_to_include=None): """ Will return nodes that are upstream or downstream by a certain dist """ arg_dict = {f"include_branch_in_{direction}_dist":include_branch_in_dist, f"only_non_branching_{direction}":only_non_branching, f"{direction}_distance":distance, f"{other_direction(direction)}_distance":-1} # 1) Get all of the branches that are downstream # (either up to branch point or within certain distance) all_downstream_nodes = cnu.subgraph_around_branch(limb_obj, branch_idx = branch_idx, include_branch_idx=include_branch_idx, verbose = verbose, nodes_to_exclude=nodes_to_exclude, nodes_to_include=nodes_to_include, **arg_dict ) if verbose: print(f"With direction = {direction}, distance= {distance}, only_non_branching = {only_non_branching}, include_branch_in_dist = {include_branch_in_dist}") print(f"all_{direction}_nodes = {all_downstream_nodes}") return all_downstream_nodes
[docs]def nodes_downstream(limb_obj, branch_idx, distance = np.inf, include_branch_in_dist = False, only_non_branching = False, include_branch_idx = False, verbose = False, nodes_to_exclude = None, nodes_to_include = None): return nodes_upstream_downstream(limb_obj, branch_idx, direction="downstream", distance = distance, include_branch_in_dist = include_branch_in_dist, only_non_branching = only_non_branching, include_branch_idx = include_branch_idx, verbose = verbose, nodes_to_exclude = nodes_to_exclude, nodes_to_include=nodes_to_include,)
[docs]def nodes_upstream(limb_obj, branch_idx, distance = np.inf, include_branch_in_dist = False, only_non_branching = False, include_branch_idx = False, verbose = False, nodes_to_exclude = None, nodes_to_include = None): return nodes_upstream_downstream(limb_obj, branch_idx, direction="upstream", distance = distance, include_branch_in_dist = include_branch_in_dist, only_non_branching = only_non_branching, include_branch_idx = include_branch_idx, verbose = verbose, nodes_to_exclude = nodes_to_exclude, nodes_to_include=nodes_to_include,)
[docs]def attribute_upstream_downstream(limb_obj, branch_idx, direction, attribute_name=None, attribute_func = None, concat_func = np.concatenate, distance = np.inf, include_branch_in_dist = True, only_non_branching = True, include_branch_idx = True, verbose = False, nodes_to_exclude = None, return_nodes = False, ): """ Purpose: To retrieve and concatenate the attributes of a branch and all of the branches downsream of the branch until there is a branching point or within a certain distance Pseudocode: 1) Get all of the branches that are downstream (either up to branch point or within certain distance) 2) Get the attributes of the branch and all those downstream 3) concatenate the attributes using the prescribed function """ ''' arg_dict = {f"include_branch_in_{direction}_dist":include_branch_in_dist, f"only_non_branching_{direction}":only_non_branching, f"{direction}_distance":distance, f"{other_direction(direction)}_distance":-1} # 1) Get all of the branches that are downstream # (either up to branch point or within certain distance) all_downstream_nodes = cnu.subgraph_around_branch(limb_obj, branch_idx = branch_idx, include_branch_idx=include_branch_idx, verbose = verbose, nodes_to_exclude=nodes_to_exclude, **arg_dict ) if verbose: print(f"With direction = {direction}, distance= {distance}, only_non_branching = {only_non_branching}, include_branch_in_dist = {include_branch_in_dist}") print(f"all_{direction}_nodes = {all_downstream_nodes}")''' all_downstream_nodes = cnu.nodes_upstream_downstream(limb_obj, branch_idx, direction, distance = distance, include_branch_in_dist = include_branch_in_dist, only_non_branching = only_non_branching, include_branch_idx = include_branch_idx, verbose = verbose, nodes_to_exclude = nodes_to_exclude,) #2) Get the attributes of the branch and all those downstream if attribute_func is None: down_attr = [getattr(limb_obj[k],attribute_name) for k in all_downstream_nodes] else: down_attr = [attribute_func(limb_obj[k]) for k in all_downstream_nodes] #3) concatenate the attributes using the prescribed function if len(down_attr) > 0 and concat_func is not None: down_attr_concat = concat_func(down_attr) else: down_attr_concat = down_attr if return_nodes: return down_attr_concat,all_downstream_nodes else: return down_attr_concat
''' def upstream_attribute(limb_obj, branch_idx, attribute_name, concat_func = np.concatenate, upstream_distance = np.inf, include_branch_in_upstream_dist = True, only_non_branching_upstream = non_branching_upstream, include_branch_idx = True, verbose = False, return_nodes=False, nodes_to_exclude = None, **kwargs): """ Purpose: To retrieve and concatenate the attributes of a branch and all of the branches downsream of the branch until there is a branching point or within a certain distance Pseudocode: 1) Get all of the branches that are downstream (either up to branch point or within certain distance) 2) Get the attributes of the branch and all those downstream 3) concatenate the attributes using the prescribed function """ # 1) Get all of the branches that are downstream # (either up to branch point or within certain distance) all_upstream_nodes = cnu.subgraph_around_branch(limb_obj, branch_idx = branch_idx, include_branch_idx=include_branch_idx, include_branch_in_upstream_dist = include_branch_in_upstream_dist, only_non_branching_upstream = only_non_branching_upstream upstream_distance = upstream_distance, downstream_distance = -1, verbose = verbose ) if verbose: print(f"With upstream_distance= {upstream_distance}, only_non_branching_upstream = {only_non_branching_upstream}, include_branch_idx = {include_branch_idx}") print(f"all_upstream_nodes = {all_upstream_nodes}") if nodes_to_exclude is not None: all_upstream_nodes = np.setdiff1d(all_upstream_nodes,nodes_to_exclude) if verbose: print(f"Excluding Nodes: {nodes_to_exclude}") print(f"After exclusion: all_upstream_nodes = {all_upstream_nodes}") #2) Get the attributes of the branch and all those downstream up_attr = [getattr(limb_obj[k],attribute_name) for k in all_upstream_nodes] #3) concatenate the attributes using the prescribed function up_attr_concat = concat_func(up_attr) if return_nodes: return up_attr_concat,all_upstream_nodes else: return up_attr_concat '''
[docs]def downstream_nodes_mesh_connected(limb_obj,branch_idx, n_points_of_contact = None, downstream_branches=None, verbose = False): """ Purpose: will determine if at least N number of points of contact between the upstream and downstream meshes Ex: nst.downstream_nodes_mesh_connected(limb_obj,147, verbose=True) """ upstream_node = branch_idx if downstream_branches is None: downstream_nodes = cnu.downstream_nodes(limb_obj,upstream_node) else: downstream_nodes = downstream_branches if n_points_of_contact is None: n_points_of_contact = len(downstream_nodes) conn_array = tu.mesh_list_connectivity(meshes=[limb_obj[k].mesh for k in [upstream_node] + list(downstream_nodes) ], main_mesh = limb_obj.mesh) #intersect_array = nu.intersect2d(conn_array,np.array([[0,1],[0,2]])) if verbose: print(f"conn_array = {conn_array}") print(f"n_points_of_contact = {n_points_of_contact}") if len(conn_array) >= n_points_of_contact: return True else: return False
[docs]def skeleton_upstream_downstream(limb_obj, branch_idx, direction, distance = np.inf, only_non_branching=True, include_branch_idx = True, include_branch_in_dist=True, plot_skeleton = False, verbose = False, **kwargs ): """ Purpose: To get the downstream skeleton of a branch Ex: skel = downstream_skeleton(limb_obj, 96, only_non_branching_downstream = False, downstream_distance = 30000 ) """ skel = cnu.attribute_upstream_downstream(limb_obj = limb_obj, branch_idx = branch_idx, attribute_name = "skeleton", direction = direction, concat_func = sk.stack_skeletons, include_branch_idx=include_branch_idx, include_branch_in_dist = include_branch_in_dist, distance = distance, only_non_branching = only_non_branching, verbose = verbose, **kwargs) if plot_skeleton: nviz.plot_objects(skeletons=[skel]) return skel
[docs]def skeleton_downstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_idx = True, include_branch_in_dist = True, plot_skeleton = False, verbose = False, **kwargs ): return skeleton_upstream_downstream(limb_obj, branch_idx, direction="downstream", distance = distance, only_non_branching=only_non_branching, include_branch_idx = include_branch_idx, include_branch_in_dist = include_branch_in_dist, plot_skeleton = plot_skeleton, verbose = verbose, **kwargs )
[docs]def skeleton_upstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_idx = True, plot_skeleton = False, verbose = False, **kwargs ): return skeleton_upstream_downstream(limb_obj, branch_idx, direction="upstream", distance = distance, only_non_branching=only_non_branching, include_branch_idx = include_branch_idx, plot_skeleton = plot_skeleton, verbose = verbose, **kwargs )
[docs]def synapses_upstream_downstream(limb_obj, branch_idx, direction, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, plot_synapses = False, verbose = False, synapse_type="synapses", return_nodes = False, nodes_to_exclude = None, **kwargs ): """ Purpose: To get the downstream synapses at a branch Ex: syns = downstream_synapses(limb_obj,16,downstream_distance = 0, include_branch_in_downstream_dist = False, only_non_branching_downstream=False, plot_synapses=True) E """ syns,nodes = cnu.attribute_upstream_downstream(limb_obj = limb_obj, branch_idx = branch_idx, direction=direction, attribute_name = synapse_type, concat_func = np.concatenate, include_branch_idx=include_branch_idx, distance = distance, only_non_branching = only_non_branching, include_branch_in_dist = include_branch_in_dist, verbose = verbose, nodes_to_exclude=nodes_to_exclude, return_nodes = True, **kwargs) if verbose: print(f"# of syns = {len(syns)}") if plot_synapses: downstream_nodes = [k for k in nodes if k != branch_idx] branch_idx_color = "red" d_color = "blue" scatter_color = "yellow" scatter_size = 1 print(f"branch_idx ({branch_idx}): {branch_idx_color}\n {direction} nodes ({downstream_nodes}): {d_color}") nviz.plot_objects(limb_obj[branch_idx].mesh, main_mesh_color=branch_idx_color, meshes = [limb_obj[k].mesh for k in downstream_nodes], meshes_colors=d_color, scatters=[[s.coordinate for s in syns]], scatter_size=scatter_size) if return_nodes: return syns,nodes else: return syns
[docs]def synapses_downstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, plot_synapses = False, verbose = False, synapse_type="synapses", return_nodes = False, nodes_to_exclude = None, **kwargs ): return synapses_upstream_downstream(limb_obj, branch_idx, direction="downstream", distance = distance, only_non_branching=only_non_branching, include_branch_in_dist = include_branch_in_dist, include_branch_idx = include_branch_idx, plot_synapses = plot_synapses, verbose = verbose, synapse_type=synapse_type, return_nodes = return_nodes, nodes_to_exclude = nodes_to_exclude, **kwargs )
[docs]def synapses_upstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, plot_synapses = False, verbose = False, synapse_type="synapses", return_nodes = False, nodes_to_exclude = None, **kwargs ): return synapses_upstream_downstream(limb_obj, branch_idx, direction="upstream", distance = distance, only_non_branching=only_non_branching, include_branch_in_dist = include_branch_in_dist, include_branch_idx = include_branch_idx, plot_synapses = plot_synapses, verbose = verbose, synapse_type=synapse_type, return_nodes = return_nodes, nodes_to_exclude = nodes_to_exclude, **kwargs )
[docs]def weighted_attribute_upstream_downstream(limb_obj, branch_idx, direction, attribute_name, attribute_func = None, verbose = False, filter_away_zero_sk_lengths=True, **kwargs): sk_lengths,nodes = cnu.attribute_upstream_downstream(limb_obj = limb_obj, branch_idx = branch_idx, direction=direction, attribute_name = "skeletal_length", concat_func = None, return_nodes=True, verbose = verbose, **kwargs) attr_values,nodes = cnu.attribute_upstream_downstream(limb_obj = limb_obj, branch_idx = branch_idx, direction=direction, attribute_name = attribute_name, attribute_func=attribute_func, concat_func = None, return_nodes=True, **kwargs) sk_lengths = np.array(sk_lengths) attr_values = np.array(attr_values) if verbose: print(f"sk_lengths = {sk_lengths}") print(f"{attribute_name} (aka attribute value) = {attr_values}") # if filter_away_zero_widths: if filter_away_zero_sk_lengths: keep_mask = sk_lengths > 0 attr_values = attr_values[keep_mask] sk_lengths = sk_lengths[keep_mask] if verbose: print(f"filter_away_zero_sk_lengths Set:") print(f"sk_lengths = {sk_lengths}") print(f"{attribute_name} (aka attribute value) = {attr_values}") if len(attr_values) != len(sk_lengths): raise Exception("") if len(attr_values) > 0: return nu.weighted_average(attr_values,sk_lengths) else: return 0
[docs]def width_upstream_downstream(limb_obj, branch_idx, direction, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, width_func = None, width_attribute = None, nodes_to_exclude = None, **kwargs): if width_func is None: width_func = au.axon_width return cnu.weighted_attribute_upstream_downstream(limb_obj, branch_idx, direction=direction, attribute_name=width_attribute, attribute_func = width_func, verbose = verbose, include_branch_idx=include_branch_idx, distance = distance, only_non_branching = only_non_branching, include_branch_in_dist = include_branch_in_dist, nodes_to_exclude=nodes_to_exclude, **kwargs)
''' def width_upstream_downstream(limb_obj, branch_idx, direction, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, width_func = au.axon_width, width_attribute = None, return_nodes = False, nodes_to_exclude = None, filter_away_zero_sk_lengths = True, **kwargs): """ Purpose: To find the up and downstream width Pseudocode: 1) Get all up/down sk lengths 2) Get all up/down widths 3) Filter away non-zeros widths if argument set 4) If arrays are non-empty, computed the weighted average """ sk_lengths,nodes = cnu.attribute_upstream_downstream(limb_obj = limb_obj, branch_idx = branch_idx, direction=direction, attribute_name = "skeletal_length", concat_func = None, include_branch_idx=include_branch_idx, distance = distance, only_non_branching = only_non_branching, include_branch_in_dist = include_branch_in_dist, verbose = verbose, nodes_to_exclude=nodes_to_exclude, return_nodes = True, **kwargs) widths,nodes = cnu.attribute_upstream_downstream(limb_obj = limb_obj, branch_idx = branch_idx, direction=direction, attribute_name = width_attribute, attribute_func = width_func, concat_func = None, include_branch_idx=include_branch_idx, distance = distance, only_non_branching = only_non_branching, include_branch_in_dist = include_branch_in_dist, verbose = verbose, nodes_to_exclude=nodes_to_exclude, return_nodes = True, **kwargs) sk_lengths = np.array(sk_lengths) widths = np.array(widths) if verbose: print(f"sk_lengths = {sk_lengths}") print(f"widths = {widths}") # if filter_away_zero_widths: if filter_away_zero_sk_lengths: keep_mask = sk_lengths > 0 widths = widths[keep_mask] sk_lengths = sk_lengths[keep_mask] if verbose: print(f"filter_away_zero_sk_lengths Set:") print(f"sk_lengths = {sk_lengths}") print(f"widths = {widths}") if len(widths) != len(sk_lengths): raise Exception("") if len(widths) > 0: return nu.weighted_average(widths,sk_lengths) else: return 0 '''
[docs]def width_upstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, width_func = None, width_attribute = None, nodes_to_exclude = None, **kwargs): """ cnu.width_downstream(limb_obj, branch_idx = 65, distance = np.inf, only_non_branching=False, include_branch_in_dist = True, include_branch_idx = True, verbose = False, width_func = au.axon_width, width_attribute = None, return_nodes = False, nodes_to_exclude = None,) """ if width_func is None: width_func = au.axon_width return width_upstream_downstream(limb_obj, branch_idx, direction="upstream", distance = distance, only_non_branching=only_non_branching, include_branch_in_dist = include_branch_in_dist, include_branch_idx = include_branch_idx, verbose = verbose, width_func = width_func, width_attribute = width_attribute, nodes_to_exclude = nodes_to_exclude, **kwargs)
[docs]def width_downstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, width_func = None, width_attribute = None, nodes_to_exclude = None, **kwargs): if width_func is None: width_func = au.axon_width return width_upstream_downstream(limb_obj, branch_idx, direction="downstream", distance = distance, only_non_branching=only_non_branching, include_branch_in_dist = include_branch_in_dist, include_branch_idx = include_branch_idx, verbose = verbose, width_func = width_func, width_attribute = width_attribute, nodes_to_exclude = nodes_to_exclude, **kwargs)
[docs]def skeletal_length_upstream_downstream(limb_obj, branch_idx, direction, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, return_nodes = False, nodes_to_exclude = None, **kwargs): """ Purpose: To find the up and downstream width Pseudocode: 1) Get all up/down sk lengths 2) Get all up/down widths 3) Filter away non-zeros widths if argument set 4) If arrays are non-empty, computed the weighted average """ sk_len,nodes = cnu.attribute_upstream_downstream(limb_obj = limb_obj, branch_idx = branch_idx, direction=direction, attribute_name = "skeletal_length", concat_func = np.sum, include_branch_idx=include_branch_idx, distance = distance, only_non_branching = only_non_branching, include_branch_in_dist = include_branch_in_dist, verbose = verbose, nodes_to_exclude=nodes_to_exclude, return_nodes = True, **kwargs) if verbose: print(f"sk_len = {sk_len} for {len(nodes)} branches ({nodes})") if return_nodes: return sk_len,nodes else: return sk_len
[docs]def skeletal_length_upstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, return_nodes = False, nodes_to_exclude = None, **kwargs): return skeletal_length_upstream_downstream(limb_obj, branch_idx, direction = "upstream", distance = distance, only_non_branching=only_non_branching, include_branch_in_dist = include_branch_in_dist, include_branch_idx = include_branch_idx, verbose = verbose, return_nodes = return_nodes, nodes_to_exclude = nodes_to_exclude, **kwargs)
[docs]def skeletal_length_downstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, return_nodes = False, nodes_to_exclude = None, **kwargs): return skeletal_length_upstream_downstream(limb_obj, branch_idx, direction = "downstream", distance = distance, only_non_branching=only_non_branching, include_branch_in_dist = include_branch_in_dist, include_branch_idx = include_branch_idx, verbose = verbose, return_nodes = return_nodes, nodes_to_exclude = nodes_to_exclude, **kwargs)
# ------------ synapse density ---------- #
[docs]def synapse_density_upstream_downstream(limb_obj, branch_idx, direction, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, synapse_density_type = "synapse_density", nodes_to_exclude = None, **kwargs): """ Purpose: To find the up and downstream width Pseudocode: 1) Get all up/down sk lengths 2) Get all up/down widths 3) Filter away non-zeros widths if argument set 4) If arrays are non-empty, computed the weighted average """ return cnu.weighted_attribute_upstream_downstream(limb_obj = limb_obj, branch_idx = branch_idx, direction=direction, attribute_name = synapse_density_type, include_branch_idx=include_branch_idx, distance = distance, only_non_branching = only_non_branching, include_branch_in_dist = include_branch_in_dist, verbose = verbose, nodes_to_exclude=nodes_to_exclude, **kwargs)
[docs]def synapse_density_upstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, synapse_density_type = "synapse_density", nodes_to_exclude = None, filter_away_zero_widths = True, **kwargs): """ cnu.width_downstream(limb_obj, branch_idx = 65, distance = np.inf, only_non_branching=False, include_branch_in_dist = True, include_branch_idx = True, verbose = False, width_func = au.axon_width, width_attribute = None, return_nodes = False, nodes_to_exclude = None, filter_away_zero_widths = True,) """ return synapse_density_upstream_downstream(limb_obj, branch_idx, direction="upstream", distance = distance, only_non_branching=only_non_branching, include_branch_in_dist = include_branch_in_dist, include_branch_idx = include_branch_idx, verbose = verbose, synapse_density_type=synapse_density_type, nodes_to_exclude = nodes_to_exclude, **kwargs)
[docs]def synapse_density_downstream(limb_obj, branch_idx, distance = np.inf, only_non_branching=True, include_branch_in_dist = True, include_branch_idx = True, verbose = False, synapse_density_type= "synapse_density", nodes_to_exclude = None, **kwargs): return synapse_density_upstream_downstream(limb_obj, branch_idx, direction="downstream", distance = distance, only_non_branching=only_non_branching, include_branch_in_dist = include_branch_in_dist, include_branch_idx = include_branch_idx, verbose = verbose, synapse_density_type=synapse_density_type, nodes_to_exclude = nodes_to_exclude, **kwargs)
[docs]def downstream_nodes(limb_obj, branch_idx): """ Will give the downstream nodes excluding the nodes to be excluded """ down_nodes = nru.downstream_nodes(limb_obj,branch_idx) return np.setdiff1d(down_nodes,limb_obj.nodes_to_exclude)
[docs]def all_downtream_branches(limb_obj, branch_idx): return xu.all_downstream_nodes(limb_obj.concept_network_directional,branch_idx)
[docs]def all_downtream_branches_including_branch(limb_obj, branch_idx): return xu.all_downstream_nodes_including_node(limb_obj.concept_network_directional,branch_idx)
[docs]def all_upstream_branches(limb_obj, branch_idx): return xu.all_upstream_nodes(limb_obj.concept_network_directional,branch_idx)
[docs]def all_upstream_branches_including_branch(limb_obj, branch_idx): return xu.all_upstream_nodes_including_node(limb_obj.concept_network_directional,branch_idx)
[docs]def skeleton_downstream_restricted(limb_obj, branch_idx, downstream_skeletal_length, downstream_nodes=None, nodes_to_exclude=None, plot_downstream_skeleton = False, plot_restricted_skeleton = False, verbose = False, ): """ Purpose: To get restricted downstream skeleton starting from the upstream node and going a certain distance Application: will help select a part of skeleton that we want to find the width around (for axon identification purposes) Psuedocode: 1) Get the downstream skeleton 2) Get the upstream coordinate and restrict the skeleton to a certain distance away from the upstream coordinate 3) Calculate the new width based on the skeleton and the meshes """ #1) Get the downstream skeleton if downstream_nodes is not None: nodes_to_exclude = np.setdiff1d(list(limb_obj.get_branch_names()),downstream_nodes) if verbose: print(f"nodes_to_exclude = {nodes_to_exclude}") downstream_sk = cnu.skeleton_downstream(limb_obj, branch_idx=branch_idx, distance=downstream_skeletal_length, only_non_branching=False, nodes_to_exclude=nodes_to_exclude, plot_skeleton=False) upstream_coordinate = nru.upstream_endpoint(limb_obj,branch_idx) if verbose: print(f"upstream_coordinate = {upstream_coordinate}") if plot_downstream_skeleton: print(f"Plotting Downstream skeleton:") nviz.plot_objects(limb_obj.mesh, skeletons=[downstream_sk], scatters=[upstream_coordinate], scatter_size=1) restr_sk = sk.restrict_skeleton_to_distance_from_coordinate(downstream_sk, coordinate = upstream_coordinate, distance_threshold=downstream_skeletal_length) if plot_restricted_skeleton: print(f"Plotting Restricted skeleton:") nviz.plot_objects(limb_obj.mesh, skeletons=[restr_sk], scatters=[upstream_coordinate], scatter_size=1) return restr_sk
[docs]def width_downstream_restricted(limb_obj, branch_idx, downstream_skeletal_length, downstream_nodes = None, plot_restricted_skeleton = False, remove_spines_from_mesh = True, verbose = False, **kwargs): """ Purpose: To find the width around a skeleton starting from a certain branch and uptream coordinate Ex: from neurd import concept_network_utils as cnu cnu.width_downstream_restricted( limb_obj = neuron_obj_exc_syn_sp[0], branch_idx = 21, downstream_skeletal_length = 30_000, downstream_nodes = [21,26,30,35], nodes_to_exclude=None, plot_restricted_skeleton = True, remove_spines_from_mesh = True, verbose = True) """ #1) Get the downstream skeleton if downstream_nodes is not None: nodes_to_exclude = np.setdiff1d(list(limb_obj.get_branch_names()),downstream_nodes) if verbose: print(f"nodes_to_exclude = {nodes_to_exclude}") old_width = cnu.width_downstream(limb_obj, branch_idx, distance=downstream_skeletal_length, nodes_to_exclude=nodes_to_exclude, #width_func=nst.width_new, width_func=nst.width_basic, ) if verbose: print(f"old_width = {old_width}") restr_sk = cnu.skeleton_downstream_restricted( limb_obj = limb_obj, branch_idx = branch_idx, downstream_skeletal_length = downstream_skeletal_length, downstream_nodes = downstream_nodes, plot_restricted_skeleton=False, **kwargs ) ref_mesh = limb_obj.mesh if remove_spines_from_mesh: spine_meshes = limb_obj.spines if verbose: print(f"ref_mesh before spine remove = {ref_mesh}") if len(spine_meshes) > 0: ref_mesh = tu.subtract_mesh(ref_mesh,tu.combine_meshes(spine_meshes), error_for_exact_match=False) if verbose: print(f"ref_mesh after spine_remove = {ref_mesh}") if plot_restricted_skeleton: print(f"Plotting Restricted skeleton:") nviz.plot_objects(ref_mesh, skeletons=[restr_sk],) new_width = wu.new_width_from_mesh_skeleton(restr_sk, ref_mesh, backup_width=old_width, verbose = False, ) if verbose: print(f"new_width = {new_width}") return new_width
[docs]def G_weighted_from_limb(limb_obj, weight_name = "weight", upstream_attribute_for_weight = "skeletal_length", node_properties = None): """ Purpose: Convert the concept_network_directional to a graph with weighted edges being the length of the upstream edge Pseudocode: 1) Copy the concept network directional 2) Add the edge weight property 3) Add any node properties requested Ex: G = cnu.G_weighted_from_limb(limb_obj, weight_name = "weight", upstream_attribute_for_weight = "skeletal_length", node_properties = [nst.width_new]) from datasci_tools import numpy_utils as nu nu.turn_off_scientific_notation() xu.get_node_attributes(G,"width_new",24) xu.get_edges_with_weights(G) """ G = xu.copy_G_without_data(limb_obj.concept_network_directional) for (n1,n2) in G.edges(): G[n1][n2][weight_name] = nst.get_stat(limb_obj[n1],upstream_attribute_for_weight) if node_properties is not None: for n_prop in node_properties: for n in G.nodes(): if type(n_prop) == str: curr_name = n_prop else: curr_name = str(n_prop.__name__) G.nodes[n][curr_name] = nst.get_stat(limb_obj[n],n_prop) return G
[docs]def all_downstream_branches_from_branches(limb_obj, branches, include_original_branches=False, verbose = False): if not nu.is_array_like(branches): branches = [branches] all_downs = np.concatenate([cnu.all_downtream_branches(limb_obj, k) for k in branches]) if include_original_branches: all_downs = np.concatenate([all_downs,branches]) else: all_downs = np.setdiff1d(all_downs,branches) downstream_nodes = np.unique(all_downs) if verbose: print(f"downstream_nodes = {downstream_nodes}") return downstream_nodes
[docs]def all_upstream_branches_from_branches(limb_obj, branches, include_original_branches=False, verbose = False): if not nu.is_array_like(branches): branches = [branches] all_downs = np.concatenate([cnu.all_upstream_branches(limb_obj, k) for k in branches]) if include_original_branches: all_downs = np.concatenate([all_downs,branches]) else: all_downs = np.setdiff1d(all_downs,branches) upstream_nodes = np.unique(all_downs) if verbose: print(f"upstream_nodes = {upstream_nodes}") return upstream_nodes
# ---- helps with developing statistics over current/above/below branches
[docs]def feature_over_branches( limb_obj, branches, direction = None,#downstream or upstream include_original_branches_in_direction = False, # argument for computing the feature feature_name=None, feature_function=None, combining_function=None, return_skeletal_length = False, verbose = False, **kwargs): """ Purpose: To find the average value over a list of branches Pseudocode: 1) convert the branches list into the branches that will be used to compute the statistic 2) Compute the skeletal length for all the branches 3) Compute the statistic for all the nodes Ex: feature_over_branches(limb_obj = n_obj_2[6], branches = [24,2], direction="upstream", verbose = True, feature_function=ns.width ) """ #1) convert the branches list into the branches #that will be used to compute the statistic if direction is not None: branches = getattr(cnu,f"all_{direction}_branches_from_branches")(limb_obj, branches, include_original_branches=include_original_branches_in_direction) if verbose: print(f"New branches computed with direction ({direction}): {branches}") #2) Compute the skeletal length for all the branches sk_len = [limb_obj[k].skeletal_length for k in branches] if verbose: print(f"sk_len = {sk_len}") #3) compute statistics over branches branches_val = nru.feature_over_branches(limb_obj, branch_list = branches, feature_name=feature_name, feature_function=feature_function, combining_function=combining_function, **kwargs) if verbose: print(f"branches_val = {branches_val}") if return_skeletal_length: return branches_val,sk_len else: return branches_val
[docs]def weighted_feature_over_branches( limb_obj, branches, direction = None,#downstream or upstream include_original_branches_in_direction = False, # argument for computing the feature feature_name=None, feature_function=None, combining_function=None, default_value = 0, verbose = False, **kwargs): """ Purpose: To find the average value over a list of branches Pseudocode: 1) Find features over branches with skeletal length 4) Do a weighted average based on skeletal length Ex: weighted_feature_over_branches(limb_obj = n_obj_2[6], branches = [24,2], direction="upstream", verbose = True, feature_function=ns.width ) """ branches_val,sk_len = cnu.feature_over_branches( limb_obj, branches, direction = direction,#downstream or upstream include_original_branches_in_direction = include_original_branches_in_direction, # argument for computing the feature feature_name=feature_name, feature_function=feature_function, combining_function=combining_function, return_skeletal_length = True, verbose = verbose, **kwargs) if len(sk_len) == 0: return_value = default_value else: return_value = nu.weighted_average(branches_val,sk_len) if verbose: print(f"Weighted value = {return_value}") return return_value
[docs]def sum_feature_over_branches( limb_obj, branches, direction = None,#downstream or upstream include_original_branches_in_direction = False, # argument for computing the feature feature_name=None, feature_function=None, combining_function=None, default_value = 0, verbose = False, **kwargs): """ Purpose: To find the average value over a list of branches Pseudocode: 1) Find features over branches with skeletal length 4) Do a weighted average based on skeletal length Ex: cnu.sum_feature_over_branches(limb_obj = n_obj_2[6], branches = [24,2], direction="upstream", verbose = True, feature_function=ns.width ) """ branches_val,sk_len = cnu.feature_over_branches( limb_obj, branches, direction = direction,#downstream or upstream include_original_branches_in_direction = include_original_branches_in_direction, # argument for computing the feature feature_name=feature_name, feature_function=feature_function, combining_function=combining_function, return_skeletal_length = True, verbose = verbose, **kwargs) if len(sk_len) == 0: return_value = default_value else: return_value = np.sum(branches_val) if verbose: print(f"Sum value = {return_value}") return return_value
[docs]def all_downstream_nodes(limb_obj,branch_idx): return xu.all_downstream_nodes(limb_obj.concept_network_directional, branch_idx)
[docs]def upstream_branches_in_branches_list(limb_obj, branches): """ Purpose: To return branch idxs where other branch idxs are in the downstream nodes of a current branch idx Pseudocode: For each branch idx 1) Get all the downstream nodes 2) Add it to the upstream list if intersect exists """ upstream_nodes = [] for b in branches: try: all_down = cnu.all_downstream_nodes(limb_obj,b) except: continue if len(np.intersect1d(all_down,branches)) > 0: upstream_nodes.append(b) return upstream_nodes
#--- from neurd_packages --- from . import axon_utils as au from . import neuron_statistics as nst from . import neuron_utils as nru from . import neuron_visualizations as nviz from . import width_utils as wu #--- from mesh_tools --- from mesh_tools import skeleton_utils as sk from mesh_tools import trimesh_utils as tu #--- from datasci_tools --- from datasci_tools import networkx_utils as xu from datasci_tools import numpy_dep as np from datasci_tools import numpy_utils as nu from . import concept_network_utils as cnu