# Source code for neurd.cave_client_utils

try:
    import cloudvolume
    from caveclient import CAVEclient
except ImportError:
    print("Must install cloudvolume and caveclient")
    print("pip3 install cloud-volume --upgrade")
    print("pip3 install caveclient --upgrade")
    
    
try:
    from code_structure_tools.decorators import default_args, add_func
except ImportError:
    print("Must install code_structure_tools")
    print("https://github.com/bacelii/code_structure_tools")
    
import numpy as np
import pandas as pd
from dotenv import load_dotenv
import os
from pathlib import Path
from datasci_tools import json_utils as jsu
import trimesh
    
""" tables in public release
'synapses_pni_2',
 'baylor_gnn_cell_type_fine_model_v2',
 'nucleus_alternative_points',
 'connectivity_groups_v507',
 'proofreading_status_public_release',
 'allen_column_mtypes_v1',
 'allen_v1_column_types_slanted_ref',
 'aibs_column_nonneuronal_ref',
 'nucleus_ref_neuron_svm',
 'aibs_soma_nuc_exc_mtype_preds_v117',
 'baylor_log_reg_cell_type_coarse_v1',
 'apl_functional_coreg_forward_v5',
 'nucleus_detection_v0',
 'aibs_soma_nuc_metamodel_preds_v117',
 'coregistration_manual_v3'
"""

voxel_to_nm_scaling = np.array([4, 4, 40])  # nm per voxel in x, y, z
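
# Example: converting a voxel coordinate to nm with the scaling above:
#
#   np.array([100, 200, 50]) * voxel_to_nm_scaling  # -> [400, 800, 2000]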

release_tables = {
    "minnie65_public_v117": dict(
        neuron_non_neuron = 'nucleus_neuron_svm',
        synapse = 'synapses_pni_2',
        nucleus = "nucleus_detection_v0",
        release_info = "proofreading_status_public_release",
    ),
    "minnie65_public": dict(
        synapse = 'synapses_pni_2',
        nucleus = "nucleus_detection_v0",
        gnn_cell_type_fine = "baylor_gnn_cell_type_fine_model_v2",
        neuron_non_neuron = "nucleus_ref_neuron_svm",
        cell_type = "allen_column_mtypes_v1",
        release_info = "proofreading_status_public_release",
    ),
}

env_filepath = "./.env"
release_name = "minnie65_public"
cloudvolume_secrets_path = Path("/root/.cloudvolume/secrets")

def table_dict_from_release_name(
    release_name,
    return_default = True,
    ):
    table_dict = release_tables.get(
        release_name,
        None,
    )
    if return_default and table_dict is None:
        # fall back to the first release defined in release_tables
        first_key = list(release_tables.keys())[0]
        table_dict = release_tables[first_key]
    return table_dict
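
# Example (values taken from the release_tables dict above):
#
#   table_dict_from_release_name("minnie65_public")["synapse"]
#   # -> 'synapses_pni_2'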
def table_name_from_table_str(table_str):
    return f"{table_str}_table_name"
@default_args()
def set_global_table_names(
    release_name = None,
    verbose = False
    ):
    table_dict = table_dict_from_release_name(release_name)
    for k,v in table_dict.items():
        table_name = table_name_from_table_str(k)
        if verbose:
            print(f"Creating {table_name} = {v}")
        # register the table name as a module-level global
        # (e.g. synapse_table_name)
        globals()[table_name] = v
set_global_table_names()
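
# A subset of the module-level names created by the call above, assuming
# the default_args decorator resolves release_name to the module-level
# "minnie65_public":
#
#   synapse_table_name      == 'synapses_pni_2'
#   nucleus_table_name      == 'nucleus_detection_v0'
#   cell_type_table_name    == 'allen_column_mtypes_v1'
#   release_info_table_name == 'proofreading_status_public_release'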
def get_table(table_name,client):
    return client.materialize.query_table(table_name)
@default_args()
def load_cave_token(env_filepath=None):
    try:
        load_dotenv(env_filepath)
    except Exception as e:
        print(f"{e}")
        return None
    cave_token = os.getenv('CAVE_TOKEN')
    return cave_token
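
# Example .env file read by load_cave_token (the token value shown is a
# placeholder, not a real token):
#
#   CAVE_TOKEN=your-cave-api-token-here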
@default_args()
def save_cave_token_to_cloudvolume_secrets(
    cave_token,
    cloudvolume_secrets_path = None,
    **kwargs
    ):
    cloudvolume_secrets_path.mkdir(exist_ok = True)
    data = {"token": cave_token}
    json_file = jsu.dict_to_json_file(
        data,
        cloudvolume_secrets_path / Path("cave-secret.json")
    )
    return json_file
@default_args()
def set_cave_auth(
    client = None,
    cave_token = None,
    env_filepath = None,
    set_global_token = True,
    **kwargs
    ):
    if cave_token is None:
        cave_token = load_cave_token(env_filepath=env_filepath)
    if cave_token is None:
        return
    client.auth.token = cave_token
    if set_global_token:
        # persist the token for future sessions (some caveclient versions
        # require the overwrite keyword)
        try:
            client.auth.save_token(token=cave_token)
        except:
            client.auth.save_token(token=cave_token, overwrite=True)
        save_cave_token_to_cloudvolume_secrets(
            cave_token = cave_token,
            **kwargs
        )
def release_name_from_release(
    release,
    prefix = "minnie65_public"
    ):
    return f'{prefix}_{release}'
def release_from_release_name(release_name):
    return int(release_name.split("_")[-1])
@default_args()
def init_cave_client(
    release_name = None,
    env_filepath = None,
    cave_token = None,
    release = None,
    ):
    if release_name is None:
        release_name = release_name_from_release(release)
    client = CAVEclient(release_name)
    return client
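
# Example (assumes caveclient is installed and a token has been saved;
# "minnie65_public" is one of the releases defined above):
#
#   client = init_cave_client(release_name="minnie65_public")
#   client = init_cave_client(release="v117")  # -> "minnie65_public_v117"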
def release_name_from_client(client):
    return client.datastack_name
# --------- functions for helping access client
def neuron_nucleus_df(
    client,
    neuron_non_neuron_table_name = None,
    verbose = False):
    nuc_df = client.materialize.query_table(
        neuron_non_neuron_table_name,
        filter_equal_dict = {'cell_type':'neuron'},
        filter_out_dict = {'pt_root_id':[0]}
    )
    if verbose:
        print(f"# of nuclei = {len(nuc_df)}")
    return pd.DataFrame(nuc_df)
def segment_ids_with_nucleus(
    client,
    neuron_non_neuron_table_name = None,
    verbose = False):
    nuc_df = neuron_nucleus_df(
        client,
        neuron_non_neuron_table_name = neuron_non_neuron_table_name,
    )
    return_list = np.unique(nuc_df["pt_root_id"].to_numpy())
    if verbose:
        print(f"# of segments with at least one nucleus: {len(return_list)}")
    return return_list
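
# Example (assumes an initialized client; the table name comes from the
# globals set by set_global_table_names above):
#
#   seg_ids = segment_ids_with_nucleus(
#       client,
#       neuron_non_neuron_table_name=neuron_non_neuron_table_name,
#       verbose=True,
#   )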
def table_size(client,table_name):
    return client.materialize.get_annotation_count(
        table_name
    )
# pick a segment you selected from neuroglancer to enter here
def presyn_df_from_seg_id(
    seg_id,
    client,
    verbose = False
    ):
    df = client.materialize.synapse_query(pre_ids=seg_id)
    if verbose:
        print(f"# of pre = {len(df)}")
    return pd.DataFrame(df)
def postsyn_df_from_seg_id(
    seg_id,
    client,
    verbose = False
    ):
    df = client.materialize.synapse_query(post_ids=seg_id)
    if verbose:
        print(f"# of post = {len(df)}")
    return pd.DataFrame(df)
@default_args()
def prepost_syn_df_from_cave_syn_df(
    syn_df,
    seg_id,
    columns = (
        "segment_id",
        "segment_id_secondary",
        "synapse_id",
        "prepost",
        "synapse_x",
        "synapse_y",
        "synapse_z",
        "synapse_size",
    ),
    voxel_to_nm_scaling = None,
    ):
    """
    Purpose: Reformat the synapse dataframe from the CAVE table
    into the standard synapse format

    --- old columns ---
    "pre_pt_root_id"
    "post_pt_root_id"
    "size"
    "id"
    "prepost"

    --- new columns ---
    segment_id,
    segment_id_secondary,
    synapse_id,
    prepost,
    synapse_x,
    synapse_y,
    synapse_z,
    synapse_size,

    Pseudocode:
    For presyn/postsyn:
    1) Restrict the dataframe to the current segment_id

    Example:
    from neurd import cave_client_utils as ccu
    ccu.prepost_syn_df_from_cave_syn_df(
        syn_df,
        seg_id=seg_id,
    )
    """
    new_dfs = []
    syn_types = ["pre","post"]
    columns = list(columns)

    # synapse size scales with the voxel volume
    size_scaling = np.prod(voxel_to_nm_scaling)

    for i in range(len(syn_types)):
        prepost = syn_types[i]
        other = syn_types[1-i]

        prepost_col = f"{prepost}_pt_root_id"
        other_col = f"{other}_pt_root_id"

        # restrict to synapses where the current segment is on this side
        prepost_df = syn_df.query(f"({prepost_col} == {seg_id})").reset_index(drop=True)

        # convert the center point from voxel to nm coordinates
        center_positions = np.vstack(prepost_df["ctr_pt_position"].to_numpy())
        prepost_df["synapse_x"] = center_positions[:,0]*voxel_to_nm_scaling[0].astype('int')
        prepost_df["synapse_y"] = center_positions[:,1]*voxel_to_nm_scaling[1].astype('int')
        prepost_df["synapse_z"] = center_positions[:,2]*voxel_to_nm_scaling[2].astype('int')
        prepost_df["synapse_size"] = prepost_df["size"]*size_scaling.astype('int')

        prepost_df_renamed = pu.rename_columns(
            prepost_df,
            {
                prepost_col:"segment_id",
                other_col:'segment_id_secondary',
                "id":"synapse_id",
            },
        )
        prepost_df_renamed["prepost"] = prepost
        prepost_df_restr = prepost_df_renamed[columns]
        new_dfs.append(prepost_df_restr)

    new_syn_df = pd.concat(new_dfs,axis = 0).reset_index(drop=True)
    return new_syn_df
def pre_post_df_from_seg_id(
    seg_id,
    client,
    concat = True,
    verbose = False,
    ):
    """
    Example:
    seg_id = 864691137197197121
    """
    # inputs/outputs
    pre_df = presyn_df_from_seg_id(
        seg_id = seg_id,
        client = client)
    post_df = postsyn_df_from_seg_id(
        seg_id = seg_id,
        client = client)

    if verbose:
        print(f"# of pre = {len(pre_df)}")
        print(f"# of post = {len(post_df)}")

    if concat:
        return pd.concat([pre_df,post_df],axis=0).reset_index(drop=True)
    return pre_df,post_df
@default_args()
def synapse_df_from_seg_id(
    seg_id,
    client,
    verbose = False,
    voxel_to_nm_scaling = None,
    ):
    new_syn_df = pre_post_df_from_seg_id(
        seg_id,
        client,
        verbose = verbose,
    )
    new_syn_df = prepost_syn_df_from_cave_syn_df(
        new_syn_df,
        seg_id = seg_id,
        voxel_to_nm_scaling = voxel_to_nm_scaling,
    )
    return new_syn_df
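
# Example (assumes an initialized client; 864691137197197121 is the example
# segment id used in the docstrings above):
#
#   syn_df = synapse_df_from_seg_id(
#       864691137197197121,
#       client,
#       voxel_to_nm_scaling=voxel_to_nm_scaling,
#   )
#   syn_df.query("prepost == 'pre'")  # outgoing synapses only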
def get_tables(client):
    return client.materialize.get_tables()
def mesh_from_seg_id(
    seg_id,
    client,
    use_https = True,
    progress = False,
    return_trimesh = True,
    verbose = False):
    """
    Purpose: To fetch a mesh from the cave table
    using cloudvolume

    example_cell_id = 864691137197197121
    """
    # to access the minnie65 public release dataset
    # you initialize the client like this
    cv = cloudvolume.CloudVolume(
        client.info.segmentation_source(),
        progress = progress,
        use_https = use_https,
    )

    # which, given a root_id, can be used to get a mesh
    # cloud volume returns a dictionary with the neuron segment id as the key
    # and the mesh as the value
    mesh = cv.mesh.get(seg_id)[seg_id]

    if return_trimesh:
        mesh = trimesh.Trimesh(
            vertices = mesh.vertices,
            faces = mesh.faces,
        )

    return mesh
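
# Example (network access and cloudvolume credentials are assumed;
# 864691137197197121 is the example cell id from the docstring above):
#
#   mesh = mesh_from_seg_id(864691137197197121, client)
#   print(mesh.vertices.shape, mesh.faces.shape)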
# ------------------- wrapper for interface -------
@add_func(
    func_to_add_unpacked = [
        neuron_nucleus_df,
        segment_ids_with_nucleus,
        set_cave_auth,
        load_cave_token,
        table_size,
        presyn_df_from_seg_id,
        postsyn_df_from_seg_id,
        pre_post_df_from_seg_id,
        get_tables,
        mesh_from_seg_id,
        synapse_df_from_seg_id,
        get_table,
    ],
)
class CaveInterface():
    @default_args()
    def __init__(
        self,
        release_name = None,
        env_filepath = None,
        cave_token = None,
        client = None,
        release = None,
        ):
        if client is None:
            client = init_cave_client(
                release = release,
                env_filepath = env_filepath,
                release_name = release_name,
                cave_token = cave_token,
            )
        self.client = client
        self.release_name = release_name_from_client(client)
        #self.release = release_from_release_name(self.release_name)

        set_cave_auth(
            self.client,
            cave_token = cave_token,
            env_filepath = env_filepath
        )

        self.set_table_names()

    def set_table_names(self,table_dict = None):
        if table_dict is None:
            table_dict = table_dict_from_release_name(
                self.release_name
            )
        for k,v in table_dict.items():
            setattr(
                self,
                table_name_from_table_str(k),
                v
            )

    @property
    def release_info(self):
        return self.client.materialize.get_table_metadata(
            self.release_info_table_name
        )

    @property
    def voxel_to_nm_scaling(self):
        return np.array(
            self.release_info['voxel_resolution']
        )

    def __getattr__(self,k):
        if k[:2] == "__":
            raise AttributeError(k)
        # fall back to the underlying CAVEclient for any unknown attribute
        try:
            return getattr(self.client,k)
        except:
            return self.__getattribute__(k)
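
# Example (a minimal sketch; assumes a valid CAVE_TOKEN in ./.env and the
# default "minnie65_public" release; exact method signatures depend on how
# code_structure_tools.add_func binds the unpacked functions):
#
#   cave = CaveInterface()
#   print(cave.voxel_to_nm_scaling)   # from the release metadata
#   tables = get_tables(cave.client)  # module-level calls also work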
from datasci_tools import pandas_utils as pu