Source code for neurd.graph_filters

'''








'''
import matplotlib.pyplot as plt
import networkx as nx
from datasci_tools import numpy_dep as np
from datasci_tools import module_utils as modu
from datasci_tools import general_utils as gu

[docs]def upstream_pair_singular(limb_obj, G=None, upstream_branch=None, downstream_branches=None, plot_starting_branches = False, offset=1000,#1500, comparison_distance = 2000, plot_extracted_skeletons = False, worst_case_sk_angle_match_threshold = 65, width_diff_max = 75,#np.inf,100, width_diff_perc = 0.60, perform_synapse_filter = True, synapse_density_diff_threshold = 0.00015, #was 0.00021 n_synapses_diff_threshold = 6, plot_G_local_edge = False, # ----- Phase B.3: parameters for global attributes --- #args for definite pairs perform_global_edge_filter = True, sk_angle_match_threshold = 45, sk_angle_buffer = 27, width_diff_perc_threshold = 0.15, width_diff_perc_buffer = 0.30, # ----- Phase B.4 paraeters for global query --- plot_G_global_edge = False, # ------- For Node Query ----# perform_node_filter = False, use_exclusive_partner = True, plot_G_node_edge = False, kiss_check = False, kiss_check_bbox_longest_side_threshold = 450, # ---- Phase D: Picking the final winner ----- plot_final_branch_matches = False, match_method = "all_error_if_not_one_match",# "best_match", #other option is "best_match" verbose = False, ): """ Purpose: To pair the upstream branch with a possible match with a downstream branch Pseudocode: 1) Use local edge filters 2) Use global edge filters 3) Perform node filters if requested 4) If the upstream and downstream node are alone in same component then has pairing --> if not return no pairing Ex: from datasci_tools import networkx_utils as xu import matplotlib.pyplot as plt import networkx as nx ed.upstream_pair_singular(G = G_saved, limb_obj = filt_neuron.axon_limb, upstream_branch = 65, downstream_branches =[30,47], ) """ plot_G_local_edge = True plot_G_global_edge = True plot_G_node_edge= True verbose = True # find the upstream and downstream branches if None if upstream_branch is None: upstream_branch = ed.upstream_node_from_G(G) if downstream_branches is None: downstream_branches = ed.downstream_nodes_from_G(G) downstream_branches = np.array(downstream_branches) if G is None: G_e_2 = nst.compute_edge_attributes_locally_upstream_downstream( limb_obj, upstream_branch = upstream_branch, downstream_branches = downstream_branches, offset=offset, comparison_distance=comparison_distance, plot_extracted_skeletons=plot_extracted_skeletons ) else: G_e_2 = G if plot_starting_branches: nviz.plot_branch_groupings(limb_obj = limb_obj, groupings = [[k] for k in G_e_2.nodes], verbose = False, plot_meshes = True, plot_skeletons = True,) #2) Filter the edges by local properties synapse_query = (f"((synapse_density_diff<{synapse_density_diff_threshold}) or" f" (n_synapses_diff < {n_synapses_diff_threshold}))") branch_match_query = (f"(((width_diff < {width_diff_max}) or (width_diff_percentage < {width_diff_perc}))" f" and (sk_angle < {worst_case_sk_angle_match_threshold}))") if perform_synapse_filter: branch_match_query += f"and {synapse_query}" if verbose: print(f"branch_match_query = :\n{branch_match_query}") G_edge_filt = xu.query_to_subgraph(G_e_2, edge_query=branch_match_query, verbose=verbose) if plot_G_local_edge: print(f"\n--- Before Local Query ---") print(xu.edge_df(G_e_2)) print("Afer Local query: ") print(xu.edge_df(G_edge_filt)) nx.draw(G_edge_filt,with_labels=True) plt.show() G = G_edge_filt if len(G_edge_filt.edges()) > 0 and perform_global_edge_filter: # ------------- Phase B.2: Looking at global features for query ------- # if verbose: print(f"Performing global features query") # 3) computes the global fetures edge_functions_global = dict(definite_partner_sk_delete=dict(function=nst.edges_to_delete_from_threshold_and_buffer, arguments=dict(threshold=sk_angle_match_threshold, buffer= sk_angle_buffer, verbose = False, edge_attribute = "sk_angle")), definite_partner_width_delete=dict(function=nst.edges_to_delete_from_threshold_and_buffer, arguments=dict(threshold=width_diff_perc_threshold, buffer= width_diff_perc_buffer, verbose = False, edge_attribute = "width_diff_percentage")) ) # 4) Filtering Graph by global properties (applying the definite filter pair) G_edge_filt_with_att = nst.compute_edge_attributes_globally(G_edge_filt, edge_functions_global) G_global_1 = xu.query_to_subgraph(G_edge_filt_with_att, edge_query="(definite_partner_sk_delete == False) or ((definite_partner_sk_delete != True) and (definite_partner_width_delete != True))", verbose=verbose) if plot_G_global_edge: print(f"\n--- Before Global Query ---") print(xu.edge_df(G_edge_filt_with_att)) print("Afer Global query: ") print(xu.edge_df(G_global_1)) nx.draw(G_global_1,with_labels=True) plt.show() G = G_global_1 if len(G_global_1.edges())>0 and perform_node_filter: if verbose: print(f"Performing node features query") # 5) Computing NOde features (for sfiltering on the upstream node edges) edge_functions_node_global = dict(above_threshold_delete=dict( function=nst.edges_to_delete_on_node_above_threshold_if_one_below, arguments=dict(threshold=sk_angle_match_threshold, verbose = False) ) ) if use_exclusive_partner: nodes_to_compute = list(G_global_1.nodes()) else: nodes_to_compute = branch_idx G_edge_filt_with_node_att = nst.compute_edge_attributes_around_node(G_global_1, edge_functions_node_global, nodes_to_compute=nodes_to_compute, ) # 6) Filtering graph based on node features G_global_2 = xu.query_to_subgraph(G_edge_filt_with_node_att, edge_query="above_threshold_delete != True", verbose=verbose) if plot_G_node_edge: print(f"\n--- Before Node Query ---") print(xu.edge_df(G_edge_filt_with_node_att)) print("Afer Node query: ") print(xu.edge_df(G_global_2)) nx.draw(G_global_2,with_labels=True) plt.show() G = G_global_2 # ------- Phase C: Optional Kiss Filter ------ """ ---------- 4/29 Addition: Kiss Filter ----------- Pseudocode: 0) Get the offset skeleton coordinates for all nodes in graph 1) find all the possible partitions of the remaining nodes """ if kiss_check: if verbose: print(f"Attempting to perform Kiss check") coordinate = nru.downstream_endpoint(limb_obj,upstream_branch) upstream_matches = xu.get_neighbors(G,upstream_branch) if len(upstream_matches)>1: print(f"Working on Kissing check because possible upstream matches greater than 1: {upstream_matches}") G = ed.cut_kissing_graph_edges(G,limb_obj, kiss_check_bbox_longest_side_threshold = kiss_check_bbox_longest_side_threshold, coordinate = coordinate, offset=offset, comparison_distance = comparison_distance, plot_offset_skeletons = False, plot_source_sink_vertices = False, plot_cut_vertices = False, plot_cut_bbox = False, verbose = False ) if verbose: print(f"Step 5b: Removing kissing edges") print(f"Remaining Edges = {G.edges()}") else: if verbose: print(f"Not doing kiss check because upstream_matches = {upstream_matches}") # Now check if any partners upstream_subgraph = np.array([list(k) for k in nx.connected_components(G) if upstream_branch in k][0]) if len(upstream_subgraph) == 1: winning_node = None error_branches = downstream_branches else: if match_method == "best_match": if verbose: print(f"Using best match method") winning_node = xu.get_neighbor_min_weighted_edge(G,upstream_branch) elif match_method == "lowest_angle_sum": if verbose: print(f"Using lowest_angle_sum method") raise Exception("hasn't been fixed to make sure the upstream node is guaranteed to be in the output graph") G_final = xu.graph_to_lowest_weighted_sum_singular_matches(G, verbose = verbose, return_graph = True) winning_node = xu.get_neighbors(G_final,upstream_branch) if len(winning_node) != 1: raise Exception(f"Not just one winning node: {winning_node}") else: winning_node = winning_node[0] elif match_method == "all_error_if_not_one_match": error_branches = downstream_branches if len(upstream_subgraph) == 2: winning_node = upstream_subgraph[upstream_subgraph!=upstream_branch][0] else: winning_node = None else: raise Exception(f"Unimplemented match_method : {match_method} ") error_branches = downstream_branches[downstream_branches!= winning_node] if verbose: print(f"for upstream node {upstream_branch}, winning_node = {winning_node}, error_branches = {error_branches}") if plot_final_branch_matches: nviz.plot_branch_groupings(limb_obj = limb_obj, groupings = G, verbose = False, plot_meshes = True, plot_skeletons = True,) return winning_node,error_branches
[docs]def graph_filter_adapter(G, limb_obj, motif, graph_filter_func, attempt_upstream_pair_singular=False, verbose = False, **kwargs): """ Purpose: To apply a graph filter to a specific local graph by 1) Determining if the graph filter should be run on this local graph 2) If it should be applied, run the graph filter and determine if there are any branches that should be errored out Pseudocode: 1) Takes motif 2) Runs motif on the graph 3) Sends the graph and the limb to the function to get a true false 4) If it is yes, then maybe run the upstream pair singular """ motif_matches = dmu.graph_matches(G,motif) if len(motif_matches) == 0: if verbose: print(f"Motif not found so returning no errors") return [] if verbose: print(f"motif_matches = {motif_matches}") d_nodes = ed.downstream_nodes_from_G(G) upstream_branch = ed.upstream_node_from_G(G) error_check = graph_filter_func(limb_obj=limb_obj, upstream_branch=upstream_branch, downstream_branches=d_nodes, G = G, verbose = verbose, **kwargs) if verbose: print(f"error_check = {error_check}") if error_check: if attempt_upstream_pair_singular: winning_node,error_branches = gf.upstream_pair_singular(limb_obj,G,**kwargs) if verbose: print(f"In attempt_upstream_pair_singular") print(f"winning_node = {winning_node}") else: error_branches = d_nodes else: error_branches = [] if verbose: print(f"error_branches = {error_branches}") return error_branches
[docs]def wide_angle_t_motif(child_width_maximum = 100000, child_width_minimum = 0, parent_width_maximum = 75, child_skeletal_threshold = 10000, child_skeletal_threshold_total = 0, child_angle_max = 40,): motif = f""" u -> d1 u -> d2 d1 -> d2 [sk_angle <= {child_angle_max}] u.node_type = "upstream" u.width_upstream < {parent_width_maximum} d1.width_downstream < {child_width_maximum} d1.width_downstream > {child_width_minimum} d1.skeletal_length_downstream > {child_skeletal_threshold} d2.skeletal_length_downstream > {skeletal_length_downstream} d1.skeletal_length_downstream_total > {child_skeletal_threshold_total} """ return motif
[docs]def axon_webbing_filter(G, limb_obj, #arguments for the motif finding child_width_maximum = None,#75, parent_width_maximum = None,#75, child_skeletal_threshold = None,#3000,#10000 child_skeletal_threshold_total = None,#10000, child_angle_max = None,#40, #arguments for checking there is a valid webbing web_size_threshold=None,#120, web_size_type="ray_trace_median", web_above_threshold = None,#True, verbose = False, attempt_upstream_pair_singular = None,#False, error_on_web_none = False, **kwargs): """ Purpose: To find the error branches from the axon webbing filter (no valid webbing if children branches are wide angle t and the parent width is low Pseudocode: 1) Motif checking 2) Check that downstream branches are connected 3) Checking the webbing 4) If Invalid webbing return the error branches Ex: from neurd import graph_filters as gf gf.axon_webbing_filter(G, limb_obj, verbose = True, child_angle_max=40, child_width_maximum=90, web_size_threshold = 300) """ if child_width_maximum is None: child_width_maximum = child_width_maximum_ax_web_ax_web_global if parent_width_maximum is None: parent_width_maximum = parent_width_maximum_ax_web_global if child_skeletal_threshold is None: child_skeletal_threshold = child_skeletal_threshold_ax_web_global if child_skeletal_threshold_total is None: child_skeletal_threshold_total = child_skeletal_threshold_total_ax_web_global if child_angle_max is None: child_angle_max = child_angle_max_ax_web_global if web_size_threshold is None: web_size_threshold = web_size_threshold_ax_web_global if web_size_type is None: web_size_type = web_size_type_ax_web_global if web_above_threshold is None: web_above_threshold = web_above_threshold_ax_web_global # ----- 1) Motif checking -------- # motif = gf.wide_angle_t_motif(child_width_maximum = child_width_maximum, # parent_width_maximum = parent_width_maximum, # child_skeletal_threshold = child_skeletal_threshold, # child_angle_max = child_angle_max,) motif = f""" u -> d1 u -> d2 d1 -> d2 [sk_angle <= {child_angle_max}] u.node_type = "upstream" u.width_upstream < {parent_width_maximum} d1.width_downstream < {child_width_maximum} d2.width_downstream < {child_width_maximum} d1.skeletal_length_downstream > {child_skeletal_threshold} d2.skeletal_length_downstream > {child_skeletal_threshold} d1.skeletal_length_downstream_total > {child_skeletal_threshold_total} d2.skeletal_length_downstream_total > {child_skeletal_threshold_total} """ def axon_web_func(limb_obj, upstream_branch, downstream_branches, verbose = False, **kwargs): mesh_connection = cnu.downstream_nodes_mesh_connected(limb_obj, upstream_branch, downstream_branches=downstream_branches, n_points_of_contact = 2) if verbose: print(f"mesh_connection = {mesh_connection}") if not mesh_connection: if verbose: print(f"Downstream nodes were not mesh connected so returning no errors") return False # ----- 3) Checking the webbing --------- curr_web = limb_obj[upstream_branch].web if curr_web is not None: valid_web_result = au.valid_web_for_t(curr_web, size_threshold = web_size_threshold, size_type = web_size_type, above_threshold = web_above_threshold, verbose=verbose) else: if error_on_web_none: raise Exception("No webbing computed") else: valid_web_result = True return not valid_web_result return gf.graph_filter_adapter(G=G, limb_obj=limb_obj, motif=motif, graph_filter_func=axon_web_func, attempt_upstream_pair_singular=attempt_upstream_pair_singular, verbose = verbose, **kwargs)
[docs]def axon_spine_at_intersection_filter(G, limb_obj, attempt_upstream_pair_singular = True, upstream_width_threshold = None,#110, downstream_width_threshold = None,#150, child_skeletal_threshold_total = None,#10000, verbose = False, **kwargs): """ Purpose: Find error branches if there is an axon spine that is downstream of the upstrea branch Pseudocode: 1) Get all downstsream nodes of the upstream branch 2) Find the intersection with the limb axon spines 3) if axon spine is detected If attempt_upstream_pair_singular: Run upstream_pair_singular and return error branches else: Return all downstream nodes as errors Ex: gf.axon_spine_at_intersection_filter(G, limb_obj = filt_neuron.axon_limb, attempt_upstream_pair_singular = True, verbose = True, **dict()) """ if upstream_width_threshold is None: upstream_width_threshold = upstream_width_threshold_ax_spine_at_inters_global if downstream_width_threshold is None: downstream_width_threshold = downstream_width_threshold_ax_spine_at_inters_global if child_skeletal_threshold_total is None: child_skeletal_threshold_total = child_skeletal_threshold_total_ax_spine_at_inters_global # ----- 1) Motif checking -------- motif = f""" u -> d1 u -> d2 u.node_type = "upstream" u.width_upstream < {upstream_width_threshold} d1.skeletal_length_downstream_total > {child_skeletal_threshold_total} d2.skeletal_length_downstream_total > {child_skeletal_threshold_total} d1.width_downstream < {downstream_width_threshold} d2.width_downstream < {downstream_width_threshold} """ def axon_spine_func(limb_obj, upstream_branch, downstream_branches, verbose = False, **kwargs): axon_spines_on_intersection = np.intersect1d(limb_obj.axon_spines, nru.downstream_nodes(limb_obj,upstream_branch)) if verbose: print(f"Current # of axon spines = {len(limb_obj.axon_spines)}") print(f"axon_spines_on_intersection = {axon_spines_on_intersection}") return len(axon_spines_on_intersection) > 0 return gf.graph_filter_adapter(G=G, limb_obj=limb_obj, motif=motif, graph_filter_func=axon_spine_func, attempt_upstream_pair_singular=attempt_upstream_pair_singular, verbose = verbose, **kwargs)
[docs]def min_synapse_dist_to_branch_point_filter(G, limb_obj, attempt_upstream_pair_singular = True, upstream_width_threshold = None,#110, downstream_width_threshold = None,#150, min_synape_dist = None,#1300, verbose = False, **kwargs): """ Purpose: Find error branches if there is a synapse at the intersection Pseudocode: 1) Find the min distance of a synapse to the branching point 3) if min distance of synapse is less than thresshold If attempt_upstream_pair_singular: Run upstream_pair_singular and return error branches else: Return all downstream nodes as errors Ex: gf.min_synapse_dist_to_branch_point_filter(G, limb_obj, verbose=True) """ if upstream_width_threshold is None: upstream_width_threshold = upstream_width_threshold_min_syn_dist_global if downstream_width_threshold is None: downstream_width_threshold = downstream_width_threshold_min_syn_dist_global if min_synape_dist is None: min_synape_dist = min_synape_dist_min_syn_dist_global # ----- 1) Motif checking -------- motif = f""" u -> d1 u -> d2 u.node_type = "upstream" u.width_upstream < {upstream_width_threshold} d1.width_downstream < {downstream_width_threshold} d2.width_downstream < {downstream_width_threshold} """ def min_synapse_dist_func(limb_obj, upstream_branch, downstream_branches, verbose = False, **kwargs): min_distance = nst.min_synapse_dist_to_branch_point(limb_obj, downstream_branches=downstream_branches, branch_idx = upstream_branch, plot_closest_synapse=False, synapse_type = "synapses_pre", verbose = verbose) if verbose: print(f"min_distance = {min_distance}") return min_distance < min_synape_dist return gf.graph_filter_adapter(G=G, limb_obj=limb_obj, motif=motif, graph_filter_func=min_synapse_dist_func, attempt_upstream_pair_singular=attempt_upstream_pair_singular, verbose = verbose, **kwargs)
[docs]def fork_divergence_filter(G, limb_obj, #arguments for the motif search downstream_width_max = None,#90, upstream_width_max = None,#90,# 145,#90, total_downstream_skeleton_length_threshold = None,#4000,#8000, individual_branch_length_threshold = None,#3000, #for the fork divergence divergence_threshold_mean = None,#160, attempt_upstream_pair_singular = False, comparison_distance = None, verbose = False, **kwargs): """ Purpose: Find error branches if there is a forking that is too close to each other Pseudocode: 1) Get all downstsream nodes of the upstream branch 3) if axon spine is detected If attempt_upstream_pair_singular: Run upstream_pair_singular and return error branches else: Return all downstream nodes as errors Ex: gf.axon_spine_at_intersection_filter(G, limb_obj = filt_neuron.axon_limb, attempt_upstream_pair_singular = True, verbose = True, **dict()) """ if downstream_width_max is None: downstream_width_max = downstream_width_max_fork_div_global if upstream_width_max is None: upstream_width_max = upstream_width_max_fork_div_global if total_downstream_skeleton_length_threshold is None: total_downstream_skeleton_length_threshold = total_downstream_skeleton_length_threshold_fork_div_global if individual_branch_length_threshold is None: individual_branch_length_threshold = individual_branch_length_threshold_fork_div_global if divergence_threshold_mean is None: divergence_threshold_mean = divergence_threshold_mean_fork_div_global if comparison_distance is None: comparison_distance = comparison_distance_fork_div_global # ----- 1) Motif checking -------- motif = f""" u -> d1 u -> d2 u.node_type = "upstream" u.width_upstream < {upstream_width_max} d1.width_downstream < {downstream_width_max} d2.width_downstream < {downstream_width_max} d1.skeletal_length_downstream > {individual_branch_length_threshold} d2.skeletal_length_downstream > {individual_branch_length_threshold} d1.skeletal_length_downstream_total > {total_downstream_skeleton_length_threshold} d2.skeletal_length_downstream_total > {total_downstream_skeleton_length_threshold} """ def fork_div_func(limb_obj, upstream_branch, downstream_branches, verbose = False, **kwargs): div = nst.fork_divergence(limb_obj,upstream_branch, downstream_idxs = downstream_branches, total_downstream_skeleton_length_threshold=0, individual_branch_length_threshold = 0, comparison_distance=comparison_distance, plot_restrictions=False, verbose = verbose) if verbose: print(f"div = {div}") return div < divergence_threshold_mean return gf.graph_filter_adapter(G=G, limb_obj=limb_obj, motif=motif, graph_filter_func=fork_div_func, attempt_upstream_pair_singular=attempt_upstream_pair_singular, verbose = verbose, **kwargs)
[docs]def fork_min_skeletal_distance_filter(G, limb_obj, #arguments for the motif search downstream_width_max = None,#90, upstream_width_max = None,#145,#90, total_downstream_skeleton_length_threshold = None,#4000, individual_branch_length_threshold = None,#4000, #for the fork divergence min_distance_threshold = None,#550,#600,#1150, attempt_upstream_pair_singular = False, verbose = False, **kwargs): """ Purpose: Find error branches if there is a forking that is too close to each other Pseudocode: 1) Get all downstsream nodes of the upstream branch 3) if axon spine is detected If attempt_upstream_pair_singular: Run upstream_pair_singular and return error branches else: Return all downstream nodes as errors Ex: gf.axon_spine_at_intersection_filter(G, limb_obj = filt_neuron.axon_limb, attempt_upstream_pair_singular = True, verbose = True, **dict()) """ if downstream_width_max is None: downstream_width_max = downstream_width_max_fork_min_dist_global if upstream_width_max is None: upstream_width_max = upstream_width_max_fork_min_dist_global if total_downstream_skeleton_length_threshold is None: total_downstream_skeleton_length_threshold = total_downstream_skeleton_length_threshold_fork_min_dist_global if individual_branch_length_threshold is None: individual_branch_length_threshold = individual_branch_length_threshold_fork_min_dist_global if min_distance_threshold is None: min_distance_threshold = min_distance_threshold_fork_min_dist_global # ----- 1) Motif checking -------- motif = f""" u -> d1 u -> d2 u.node_type = "upstream" u.width_upstream < {upstream_width_max} d1.width_downstream < {downstream_width_max} d2.width_downstream < {downstream_width_max} d1.skeletal_length_downstream > {individual_branch_length_threshold} d2.skeletal_length_downstream > {individual_branch_length_threshold} d1.skeletal_length_downstream_total > {total_downstream_skeleton_length_threshold} d2.skeletal_length_downstream_total > {total_downstream_skeleton_length_threshold} """ def fork_min_skeletal_distance_func(limb_obj, upstream_branch, downstream_branches, verbose = False, **kwargs): div = nst.fork_min_skeletal_distance(limb_obj,upstream_branch, downstream_idxs = downstream_branches, total_downstream_skeleton_length_threshold=0, individual_branch_length_threshold = 0, plot_skeleton_restriction = False, plot_min_pair=False, verbose = verbose) if verbose: print(f"div = {div} with threshold set to {min_distance_threshold}") return div < min_distance_threshold return gf.graph_filter_adapter(G=G, limb_obj=limb_obj, motif=motif, graph_filter_func=fork_min_skeletal_distance_func, attempt_upstream_pair_singular=attempt_upstream_pair_singular, verbose = verbose, **kwargs)
[docs]def axon_double_back_filter(G, limb_obj, #arguments for the motif search branch_skeletal_length_min = None,#2000,#4000, total_downstream_skeleton_length_threshold = None,#2000,#10000, upstream_skeletal_length_min = None,#5000, axon_width_threshold_thin = None,#180,#au.axon_ais_threshold axon_width_threshold_thick = None,#180,#au.axon_ais_threshold attempt_upstream_pair_singular = True, verbose = False, **kwargs): """ Purpose: Find errors if branches double back by too much Pseudocode: 1) """ if branch_skeletal_length_min is None: branch_skeletal_length_min = branch_skeletal_length_min_ax_double_b_global if total_downstream_skeleton_length_threshold is None: total_downstream_skeleton_length_threshold = total_downstream_skeleton_length_threshold_ax_double_b_global if upstream_skeletal_length_min is None: upstream_skeletal_length_min = upstream_skeletal_length_min_ax_double_b_global if axon_width_threshold_thin is None: axon_width_threshold_thin = axon_width_threshold_thin_ax_double_b_global if axon_width_threshold_thick is None: axon_width_threshold_thick = axon_width_threshold_thick_ax_double_b_global # ----- 1) Motif checking -------- motif = f""" u -> d1 [sk_angle >= {min_double_back_threshold}] u -> d2 u.node_type = "upstream" u.width_upstream < {axon_width_threshold_thick} u.skeletal_length_upstream_total > {upstream_skeletal_length_min} d1.skeletal_length_downstream > {branch_skeletal_length_min} d1.skeletal_length_downstream_total > {total_downstream_skeleton_length_threshold} """ def double_back_func(limb_obj, upstream_branch, downstream_branches, G, verbose = False, **kwargs): #determine the double back threshold downstream_branches = np.array(downstream_branches) double_back_threshold = ed.double_back_threshold_axon_by_width( width=G.nodes[upstream_branch]["width_upstream"], double_back_threshold_thin = ed.double_back_threshold_axon_thin, double_back_threshold_thick = ed.double_back_threshold_axon_thick, ) sk_angle_edges = np.array([G[upstream_branch][d]["sk_angle"] for d in downstream_branches]) double_back_branches = downstream_branches[sk_angle_edges > double_back_threshold] if verbose: print(f"double_back_threshold = {double_back_threshold}") print(f"sk_angle_edges = {sk_angle_edges}") print(f"Double back branches: {double_back_branches}") return len(double_back_branches) > 0 return gf.graph_filter_adapter(G=G, limb_obj=limb_obj, motif=motif, graph_filter_func=double_back_func, attempt_upstream_pair_singular=attempt_upstream_pair_singular, verbose = verbose, **kwargs)
[docs]def thick_t_filter(G, limb_obj, parent_width_maximum = None,#70, min_child_width_max = None,#78,#85, child_skeletal_threshold = None,#3000,#7000, child_skeletal_threshold_total = None,#10000, child_angle_max = None,#40, attempt_upstream_pair_singular = False, verbose = False, **kwargs): """ Purpose: To find the error branches from the axon thick t wide angle children Example: gf.thick_t_filter(G,limb_obj,verbose = True, parent_width_maximum = 110, child_angle_max=150, ) """ if parent_width_maximum is None: parent_width_maximum = parent_width_maximum_thick_t_global if min_child_width_max is None: min_child_width_max = min_child_width_max_thick_t_global if child_skeletal_threshold is None: child_skeletal_threshold = child_skeletal_threshold_thick_t_global if child_skeletal_threshold_total is None: child_skeletal_threshold_total = child_skeletal_threshold_total_thick_t_global if child_angle_max is None: child_angle_max = child_angle_max_thick_t_global # ----- 1) Motif checking -------- # motif = gf.wide_angle_t_motif(child_width_minimum = min_child_width_max, # parent_width_maximum = parent_width_maximum, # child_skeletal_threshold = child_skeletal_threshold, # child_angle_max = child_angle_max,) motif = f""" u -> d1 u -> d2 d1 -> d2 [sk_angle <= {child_angle_max}] u.node_type = "upstream" u.width_upstream < {parent_width_maximum} d1.width_downstream > {min_child_width_max} d1.skeletal_length_downstream > {child_skeletal_threshold} d2.skeletal_length_downstream > {child_skeletal_threshold} d1.skeletal_length_downstream_total > {child_skeletal_threshold_total} d2.skeletal_length_downstream_total > {child_skeletal_threshold_total} """ def thick_t_func(limb_obj, upstream_branch, downstream_branches, G, verbose = False, **kwargs): if verbose: print(f"Downstream angle = {G[downstream_branches[0]][downstream_branches[1]]['sk_angle']}") print(f"Downstream 1 widths = {G.nodes[downstream_branches[0]]['width_downstream']}") print(f"Downstream 2 widths = {G.nodes[downstream_branches[1]]['width_downstream']}") print(f"parent width = {G.nodes[upstream_branch]['width_upstream']}") return True return gf.graph_filter_adapter(G=G, limb_obj=limb_obj, motif=motif, graph_filter_func=thick_t_func, attempt_upstream_pair_singular=attempt_upstream_pair_singular, verbose = verbose, **kwargs)
# ------------------ 7/26 Inhibitory Filters ----------------
[docs]def axon_double_back_inh_filter(G, limb_obj, #arguments for the motif search branch_skeletal_length_min = None,#2000,#4000, total_downstream_skeleton_length_threshold = None,#2000,#10000, upstream_skeletal_length_min = None,#5000, axon_width_threshold_thin = None,#au.axon_ais_threshold, axon_width_threshold_thick = None,#au.axon_ais_threshold, attempt_upstream_pair_singular =None,# True, verbose = False, **kwargs): """ Purpose: Find errors if branches double back by too much Pseudocode: 1) """ if branch_skeletal_length_min is None: branch_skeletal_length_min = branch_skeletal_length_min_double_b_axon_inh_global if total_downstream_skeleton_length_threshold is None: total_downstream_skeleton_length_threshold = total_downstream_skeleton_length_threshold_double_b_axon_inh_global if upstream_skeletal_length_min is None: upstream_skeletal_length_min = upstream_skeletal_length_min_double_b_axon_inh_global if axon_width_threshold_thin is None: axon_width_threshold_thin = axon_width_threshold_thin_double_b_axon_inh_global if axon_width_threshold_thick is None: axon_width_threshold_thick = axon_width_threshold_thick_double_b_axon_inh_global if attempt_upstream_pair_singular is None: attempt_upstream_pair_singular = attempt_upstream_pair_singular_double_b_axon_inh_global # ----- 1) Motif checking -------- motif = f""" u -> d1 [sk_angle >= {min_double_back_threshold_inh}] u -> d2 u.node_type = "upstream" u.width_upstream < {axon_width_threshold_thick} u.skeletal_length_upstream_total > {upstream_skeletal_length_min} d1.skeletal_length_downstream > {branch_skeletal_length_min} d1.skeletal_length_downstream_total > {total_downstream_skeleton_length_threshold} """ def double_back_func(limb_obj, upstream_branch, downstream_branches, G, verbose = False, **kwargs): #determine the double back threshold downstream_branches = np.array(downstream_branches) double_back_threshold = ed.double_back_threshold_axon_by_width( width=G.nodes[upstream_branch]["width_upstream"], double_back_threshold_thin = ed.double_back_threshold_axon_thin_inh, double_back_threshold_thick = ed.double_back_threshold_axon_thick_inh, ) sk_angle_edges = np.array([G[upstream_branch][d]["sk_angle"] for d in downstream_branches]) double_back_branches = downstream_branches[sk_angle_edges > double_back_threshold] if verbose: print(f"double_back_threshold = {double_back_threshold}") print(f"sk_angle_edges = {sk_angle_edges}") print(f"Double back branches: {double_back_branches}") # if len(double_back_branches) > 0: # print(f"\n{upstream_branch}: sk_angle_edges = {sk_angle_edges[sk_angle_edges > double_back_threshold]}") return len(double_back_branches) > 0 return gf.graph_filter_adapter(G=G, limb_obj=limb_obj, motif=motif, graph_filter_func=double_back_func, attempt_upstream_pair_singular=attempt_upstream_pair_singular, verbose = verbose, **kwargs)
# ------------- parameters for stats --------------- global_parameters_dict_default = dict( #--- axon web filters --- child_width_maximum_ax_web_ax_web = 75, parent_width_maximum_ax_web = 75, child_skeletal_threshold_ax_web = 3000, child_skeletal_threshold_total_ax_web = 10000, child_angle_max_ax_web = 40, #arguments for checking there is a valid webbing web_size_threshold_ax_web = 120, web_size_type_ax_web = "ray_trace_median", web_above_threshold_ax_web = True, #-- thick_t_filter -- parent_width_maximum_thick_t = 70, min_child_width_max_thick_t = 78,#85, child_skeletal_threshold_thick_t = 3000,#7000, child_skeletal_threshold_total_thick_t = 10000, child_angle_max_thick_t = 40, # -- axon_double_back_filter-- branch_skeletal_length_min_ax_double_b = 2000,#4000, total_downstream_skeleton_length_threshold_ax_double_b = 2000,#10000, upstream_skeletal_length_min_ax_double_b = 5000, axon_width_threshold_thin_ax_double_b = 180,#au.axon_ais_threshold axon_width_threshold_thick_ax_double_b = 180,#au.axon_ais_threshold # -- fork_divergence_filter -- downstream_width_max_fork_div = 90, upstream_width_max_fork_div = 90,# 145,#90, total_downstream_skeleton_length_threshold_fork_div = 4000,#8000, individual_branch_length_threshold_fork_div = 3000, #for the fork divergence divergence_threshold_mean_fork_div = 160, comparison_distance_fork_div = 400, # --- fork_min_skeletal_distance_filter -- downstream_width_max_fork_min_dist = 90, upstream_width_max_fork_min_dist = 145,#90, total_downstream_skeleton_length_threshold_fork_min_dist = 4000, individual_branch_length_threshold_fork_min_dist = 4000, #for the fork divergence min_distance_threshold_fork_min_dist = 550,#600,#1150, #--axon_spine_at_intersection_filter-- upstream_width_threshold_ax_spine_at_inters = 110, downstream_width_threshold_ax_spine_at_inters = 150, child_skeletal_threshold_total_ax_spine_at_inters = 10000, # -- min_synapse_dist_to_branch_point_filter -- upstream_width_threshold_min_syn_dist = 110, downstream_width_threshold_min_syn_dist = 150, min_synape_dist_min_syn_dist = 1300, # -- axon_double_back_inh_filter --- branch_skeletal_length_min_double_b_axon_inh = 2000,#4000, total_downstream_skeleton_length_threshold_double_b_axon_inh = 2000,#10000, upstream_skeletal_length_min_double_b_axon_inh = 5000, axon_width_threshold_thin_double_b_axon_inh = 180, axon_width_threshold_thick_double_b_axon_inh = 180, attempt_upstream_pair_singular_double_b_axon_inh = True, ) attributes_dict_default = dict( default_low_degree_graph_filters = [ axon_webbing_filter, thick_t_filter, axon_double_back_filter, fork_divergence_filter, fork_min_skeletal_distance_filter, axon_spine_at_intersection_filter, min_synapse_dist_to_branch_point_filter, ] ) # ------- microns ----------- global_parameters_dict_microns = {} attributes_dict_microns = {} # --------- h01 ------------- global_parameters_dict_h01 = dict([(k,1.15*v) if "width" in k else (k,v) for k,v in global_parameters_dict_default.items()]) global_parameters_dict_h01_update = dict( branch_skeletal_length_min_ax_double_b = 4000, branch_skeletal_length_min_double_b_axon_inh = 4000, comparison_distance_fork_div = 800, ) global_parameters_dict_h01.update(global_parameters_dict_h01_update) attributes_dict_h01 = dict( default_low_degree_graph_filters = [ axon_webbing_filter, thick_t_filter, axon_double_back_filter, fork_divergence_filter, fork_min_skeletal_distance_filter, #axon_spine_at_intersection_filter, #min_synapse_dist_to_branch_point_filter, ] ) # data_type = "default" # algorithms = None # modules_to_set = [gf] # def set_global_parameters_and_attributes_by_data_type(dt, # algorithms_list = None, # modules = None, # set_default_first = True, # verbose=False): # if modules is None: # modules = modules_to_set # modu.set_global_parameters_and_attributes_by_data_type(modules,dt, # algorithms=algorithms_list, # set_default_first = set_default_first, # verbose = verbose) # set_global_parameters_and_attributes_by_data_type(data_type, # algorithms) # def output_global_parameters_and_attributes_from_current_data_type( # modules = None, # algorithms = None, # verbose = True, # lowercase = True, # output_types = ("global_parameters"), # include_default = True, # algorithms_only = False, # **kwargs): # if modules is None: # modules = modules_to_set # return modu.output_global_parameters_and_attributes_from_current_data_type( # modules, # algorithms = algorithms, # verbose = verbose, # lowercase = lowercase, # output_types = output_types, # include_default = include_default, # algorithms_only = algorithms_only, # **kwargs, # ) #--- from neurd_packages --- from . import axon_utils as au from . import concept_network_utils as cnu from . import error_detection as ed from . import neuron_statistics as nst from . import neuron_utils as nru from . import neuron_visualizations as nviz min_double_back_threshold = ed.double_back_threshold_axon_thin min_double_back_threshold_inh = ed.double_back_threshold_axon_thick_inh #--- from datasci_tools --- from datasci_tools import dotmotif_utils as dmu from datasci_tools import general_utils as gu from datasci_tools import module_utils as modu from datasci_tools import networkx_utils as xu from datasci_tools import numpy_dep as np from . import graph_filters as gf