neo4j_uploader
from neo4j_uploader._logger import ModuleLogger
from neo4j_uploader._queries import specification_queries
from neo4j_uploader._n4j import reset, upload_query, validate_credentials
from neo4j_uploader._upload_utils import upload_nodes, upload_relationships
from neo4j_uploader.models import UploadResult, Neo4jConfig, GraphData
from neo4j_uploader.errors import InvalidCredentialsError, InvalidPayloadError
from neo4j_uploader._conversions import convert_legacy_node_records, convert_legacy_relationship_records
from timeit import default_timer as timer
from warnings import warn
import json

# Specify Google docstring type for pdoc auto doc generation
__docformat__ = "google"

def start_logging():
    """
    Enables logging from this module.

    Sets `is_enabled` on the module logger and emits an info message.
    """
    logger = ModuleLogger()
    logger.is_enabled = True
    logger.info("Neo4j Uploader logging enabled")

def stop_logging():
    """
    Suppresses logging from this module.

    Emits a final info message, then clears `is_enabled` on the module logger.
    """
    ModuleLogger().info('Discontinuing logging')
    ModuleLogger().is_enabled = False


def batch_upload(
    config: dict | Neo4jConfig,
    data : dict | GraphData,
    ) -> UploadResult:
    """Uploads a dictionary containing nodes, relationships, and target Neo4j database information. The schema for nodes and relationships is more flexible and comprehensive than the schema for the earlier upload function.

    Args:
        config (dict or Neo4jConfig): A Neo4jConfig object or dict that can be converted to a Neo4jConfig object for defining target Neo4j database and credentials for upload.
        data (dict or GraphData): A GraphData object or a dict that can be converted to a GraphData object with specifications for nodes and relationships to upload.

    Returns:
        UploadResult: Result object with the elapsed seconds and the summed counts of nodes created, relationships created, and properties set. Failures are reported by raising, so a returned result always has `was_successful = True`.

    Raises:
        neo4j.exceptions: A Neo4j exception if credentials are invalid or database can not be accessed.
        InvalidCredentialsError: If credentials are missing or malformed.
        InvalidPayloadError: If payload schema is missing or unsupported.
    """

    try:
        cdata = Neo4jConfig.model_validate(config)
    except Exception as e:
        # Chain the original validation error so its details stay in the traceback
        raise InvalidCredentialsError(e) from e

    # Will raise a neo4j.exception if credentials failed or database can not be accessed
    validate_credentials((cdata.neo4j_uri, cdata.neo4j_user, cdata.neo4j_password))

    try:
        gdata = GraphData.model_validate(data)
    except Exception as e:
        raise InvalidPayloadError(e) from e

    # Start clock for tracking processing time
    start = timer()
    total_nodes_created = 0
    total_relationships_created = 0
    total_properties_set = 0

    # Get list of tuples containing queries and accompanying params for driver execution
    query_params = specification_queries(gdata.nodes, cdata)
    query_params.extend(specification_queries(gdata.relationships, cdata))

    for qp in query_params:
        # Run queries and retrieve summary of upload
        summary = upload_query(
            creds = (cdata.neo4j_uri, cdata.neo4j_user, cdata.neo4j_password),
            query = qp[0],
            params = qp[1],
            database = cdata.neo4j_database
        )

        # Sample summary result
        # {'metadata': {'query': '<query>', 'parameters': {}, 'query_type': 'w', 'plan': None, 'profile': None, 'notifications': None, 'counters': {'_contains_updates': True, 'labels_added': 17, 'nodes_created': 17, 'properties_set': 78}, 'result_available_after': 73, 'result_consumed_after': 0}

        # {'metadata': {'query': "<rel_upload_query>", 'parameters': {}, 'query_type': 'w', 'plan': None, 'profile': None, 'notifications': None, 'counters': {'_contains_updates': True, 'relationships_created': 1, 'properties_set': 2}, 'result_available_after': 209, 'result_consumed_after': 0}

        # Sum up total nodes, relationships and props set.
        # Counting is best-effort: a summary without a usable counter is
        # logged at debug level and skipped rather than failing the upload.
        try:
            total_properties_set += summary.counters.properties_set
        except (AttributeError, TypeError):
            ModuleLogger().debug(f'No properties set in summary: {summary}')

        try:
            total_nodes_created += summary.counters.nodes_created
        except (AttributeError, TypeError):
            ModuleLogger().debug(f'No nodes created in summary: {summary}')

        try:
            total_relationships_created += summary.counters.relationships_created
        except (AttributeError, TypeError):
            ModuleLogger().debug(f'No relationships created in summary: {summary}')

    stop = timer()
    time_to_complete = round((stop - start), 4)

    return UploadResult(
        was_successful = True,
        error_message = None,
        seconds_to_complete = time_to_complete,
        nodes_created = total_nodes_created,
        relationships_created = total_relationships_created,
        properties_set = total_properties_set
    )

def upload(
    neo4j_creds:(str, str, str),
    data: str | dict,
    node_key : str = "_uid",
    dedupe_nodes : bool = True,
    dedupe_relationships : bool = True,
    should_overwrite: bool = False,
    database_name: str = 'neo4j',
    max_batch_size: int = 500,
    )-> UploadResult:
    """
    Uploads a dictionary of simple node and relationship records to a target Neo4j instance specified in the arguments.

    The legacy records are converted to node and relationship specifications and handed to `batch_upload`.

    Args:
        neo4j_creds: Tuple containing exactly three items: the hostname, username, and password of the target Neo4j instance. The database name is supplied separately via `database_name`. The host name should contain only the database name and not the protocol. For example, if the host name is 'neo4j+s://<unique_db_id>.databases.neo4j.io', the host string to use is '<unique_db_id>.databases.neo4j.io'.

        data: A .json string or dictionary of records to upload. The dictionary must contain 'nodes' and 'relationships' keys, each holding a list of dictionaries. Each of those dictionaries contains the property keys and values for the nodes and relationships to be uploaded, respectively.

        node_key: The key in the dictionary that contains the unique identifier for the node. Relationship generation will also use this to find the from and to Nodes it connects to. Default is '_uid'.

        dedupe_nodes: Should nodes only be created once. False means a new node will always be created. True means if an existing node exists, only the properties will be updated. Default True.

        dedupe_relationships: Should relationships only create 1 of a given relationship between the same from and to node. False means a new relationship will always be created. True means if an existing relationship exists between the target nodes, only the properties will be updated. If no prior relationship, a new one will be created. Default True.

        should_overwrite: A boolean indicating whether the upload should overwrite existing data. If set to True, the upload will delete all existing nodes and relationships before uploading. Forwarded as the `overwrite` setting of the Neo4jConfig. Default is False.

        database_name: String name of target Neo4j database. Default is 'neo4j'.

        max_batch_size: Integer maximum number of nodes or relationships to upload in a single Cypher batch. Default 500.

    Returns:
        UploadResult: The result object returned by `batch_upload`, holding the time to complete, nodes created, relationships created, and total node and relationship properties set.

    Raises:
        Exception: If data is a string that is not valid JSON, or if the payload is empty or not a dictionary. Errors raised by `batch_upload` propagate unchanged.
    """

    # Convert to dictionary if data is string
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except Exception as e:
            raise Exception(f'Input data string not a valid JSON format: {e}') from e

    # Reject None, empty payloads, and non-dict payloads (e.g. a JSON list)
    # up front, before the .get() calls below can fail with an unrelated error.
    if not isinstance(data, dict) or len(data) == 0:
        raise Exception('data payload is empty or an invalid format')

    simple_nodes = data.get('nodes', None)
    simple_rels = data.get('relationships', None)

    nodes = convert_legacy_node_records(simple_nodes, dedupe_nodes, node_key)

    rels = convert_legacy_relationship_records(simple_rels, dedupe_relationships, node_key)

    uri, user, password = neo4j_creds

    config = Neo4jConfig(
        neo4j_uri = uri,
        neo4j_user = user,
        neo4j_password = password,
        neo4j_database = database_name,
        max_batch_size = max_batch_size,
        overwrite = should_overwrite
    )

    return batch_upload(
        config = config,
        data = {
            "nodes": nodes,
            "relationships": rels
        }
    )

def clear_db(creds: (str, str, str), database: str):
    """Deletes all existing nodes and relationships in a target Neo4j database.

    Args:
        creds (str, str, str): Neo4j URI, username, and password.
        database (str): Target Neo4j database.

    Returns:
        summary (neo4j.ResultSummary): Result summary of the operation. See https://neo4j.com/docs/api/python-driver/current/api.html#resultsummary for more info.
    """
    return reset(creds, database)
def start_logging():
    """
    Turns on log output for this module.

    Flips the module logger's `is_enabled` flag to True and then writes an
    info message. The original documentation states the log level follows
    the calling module's existing level; that is handled inside
    ModuleLogger, not here.
    """
    module_logger = ModuleLogger()
    module_logger.is_enabled = True
    module_logger.info("Neo4j Uploader logging enabled")
Enables logging from this module. Log level matches the existing log level of the calling module.
def stop_logging():
    """
    Suppresses logging from this module.

    Writes one last info message, then sets the module logger's
    `is_enabled` flag to False.
    """
    farewell = 'Discontinuing logging'
    ModuleLogger().info(farewell)
    ModuleLogger().is_enabled = False
Suppresses logging from this module.
def batch_upload(
    config: dict | Neo4jConfig,
    data : dict | GraphData,
    ) -> UploadResult:
    """Uploads a dictionary containing nodes, relationships, and target Neo4j database information. The schema for nodes and relationships is more flexible and comprehensive than the schema for the earlier upload function.

    Args:
        config (dict or Neo4jConfig): A Neo4jConfig object or dict that can be converted to a Neo4jConfig object for defining target Neo4j database and credentials for upload.
        data (dict or GraphData): A GraphData object or a dict that can be converted to a GraphData object with specifications for nodes and relationships to upload.

    Returns:
        UploadResult: Result object with the elapsed seconds and the summed counts of nodes created, relationships created, and properties set. Failures are reported by raising, so a returned result always has `was_successful = True`.

    Raises:
        neo4j.exceptions: A Neo4j exception if credentials are invalid or database can not be accessed.
        InvalidCredentialsError: If credentials are missing or malformed.
        InvalidPayloadError: If payload schema is missing or unsupported.
    """

    try:
        cdata = Neo4jConfig.model_validate(config)
    except Exception as e:
        # Chain the original validation error so its details stay in the traceback
        raise InvalidCredentialsError(e) from e

    # Will raise a neo4j.exception if credentials failed or database can not be accessed
    validate_credentials((cdata.neo4j_uri, cdata.neo4j_user, cdata.neo4j_password))

    try:
        gdata = GraphData.model_validate(data)
    except Exception as e:
        raise InvalidPayloadError(e) from e

    # Start clock for tracking processing time
    start = timer()
    total_nodes_created = 0
    total_relationships_created = 0
    total_properties_set = 0

    # Get list of tuples containing queries and accompanying params for driver execution
    query_params = specification_queries(gdata.nodes, cdata)
    query_params.extend(specification_queries(gdata.relationships, cdata))

    for qp in query_params:
        # Run queries and retrieve summary of upload
        summary = upload_query(
            creds = (cdata.neo4j_uri, cdata.neo4j_user, cdata.neo4j_password),
            query = qp[0],
            params = qp[1],
            database = cdata.neo4j_database
        )

        # Sample summary result
        # {'metadata': {'query': '<query>', 'parameters': {}, 'query_type': 'w', 'plan': None, 'profile': None, 'notifications': None, 'counters': {'_contains_updates': True, 'labels_added': 17, 'nodes_created': 17, 'properties_set': 78}, 'result_available_after': 73, 'result_consumed_after': 0}

        # {'metadata': {'query': "<rel_upload_query>", 'parameters': {}, 'query_type': 'w', 'plan': None, 'profile': None, 'notifications': None, 'counters': {'_contains_updates': True, 'relationships_created': 1, 'properties_set': 2}, 'result_available_after': 209, 'result_consumed_after': 0}

        # Sum up total nodes, relationships and props set.
        # Counting is best-effort: a summary without a usable counter is
        # logged at debug level and skipped rather than failing the upload.
        try:
            total_properties_set += summary.counters.properties_set
        except (AttributeError, TypeError):
            ModuleLogger().debug(f'No properties set in summary: {summary}')

        try:
            total_nodes_created += summary.counters.nodes_created
        except (AttributeError, TypeError):
            ModuleLogger().debug(f'No nodes created in summary: {summary}')

        try:
            total_relationships_created += summary.counters.relationships_created
        except (AttributeError, TypeError):
            ModuleLogger().debug(f'No relationships created in summary: {summary}')

    stop = timer()
    time_to_complete = round((stop - start), 4)

    return UploadResult(
        was_successful = True,
        error_message = None,
        seconds_to_complete = time_to_complete,
        nodes_created = total_nodes_created,
        relationships_created = total_relationships_created,
        properties_set = total_properties_set
    )
Uploads a dictionary containing nodes, relationships, and target Neo4j database information. The schema for nodes and relationships is more flexible and comprehensive than the schema for the earlier upload function.
Arguments:
- config (dict or Neo4jConfig): A Neo4jConfig object or dict that can be converted to a Neo4jConfig object for defining target Neo4j database and credentials for upload.
- data (dict or GraphData): A GraphData object or a dict that can be converted to a GraphData object with specifications for nodes and relationships to upload
Returns:
UploadResult: Result object containing information regarding a successful or unsuccessful upload.
Raises:
- neo4j.exceptions: A Neo4j exception if credentials are invalid or database can not be accessed.
- InvalidCredentialsError: If credentials are missing or malformed.
- InvalidPayloadError: If payload schema is missing or unsupported.
121def upload( 122 neo4j_creds:(str, str, str), 123 data: str | dict, 124 node_key : str = "_uid", 125 dedupe_nodes : bool = True, 126 dedupe_relationships : bool = True, 127 should_overwrite: bool = False, 128 database_name: str = 'neo4j', 129 max_batch_size: int = 500, 130 )-> (float, int, int, int): 131 """ 132 Uploads a dictionary of simple node and relationship records to a target Neo4j instance specified in the arguments. 133 134 Args: 135 neo4j_creds: Tuple containing the hostname, username, password, and optionally a database name of the target Neo4j instance. The host name should contain only the database name and not the protocol. For example, if the host name is 'neo4j+s://<unique_db_id>.databases.neo4j.io', the host string to use is '<unique_db_id>.databases.neo4j.io'. The default database name is 'neo4j'. 136 137 data: A .json string or dictionary of records to upload. The dictionary keys must contain a 'nodes' and 'relationships' key. The value of which should be a list of dictionaries, each of these dictionaries contain the property keys and values for the nodes and relationships to be uploaded, respectively. 138 139 node_key: The key in the dictionary that contains the unique identifier for the node. Relationship generation will also use this to find the from and to Nodes it connects to. Default is '_uid'. 140 141 dedupe_nodes: Should nodes only be created once. False means a new node will always be created. True means if an existing node exists, only the properties will be updated. Default True. 142 143 dedupe_relationships: Should relationships only create 1 of a given relationship between the same from and to node. False means a new relationship will always be created. True means if an existing relationship exists between the target nodes, only the properties will be updated. If no prior relationship, a new one will be created. Default True. 144 145 should_overwrite: A boolean indicating whether the upload should overwrite existing data. 
If set to True, the upload will delete all existing nodes and relationships before uploading. Default is False. 146 147 database_name: String name of target Neo4j database. 148 149 max_batch_size: Integer maximum number of nodes or relationships to upload in a single Cypher batch. Default 500. 150 151 Returns: 152 Tuple of result data: float of time to complete, int of nodes created, int of relationships created, int of total node and relationship properties set. 153 154 Raises: 155 Exceptions if data is not in the correct format or if the upload ungracefully fails. 156 """ 157 158 # Convert to dictionary if data is string 159 if isinstance(data, str) is True: 160 try: 161 data = json.loads(data) 162 except Exception as e: 163 raise Exception(f'Input data string not a valid JSON format: {e}') 164 165 if data is None or len(data) == 0: 166 raise Exception(f'data payload is empty or an invalid format') 167 168 simple_nodes = data.get('nodes', None) 169 simple_rels = data.get('relationships', None) 170 171 nodes = convert_legacy_node_records(simple_nodes, dedupe_nodes, node_key) 172 173 rels = convert_legacy_relationship_records(simple_rels, dedupe_relationships, node_key) 174 175 uri, user, password = neo4j_creds 176 177 config = Neo4jConfig( 178 neo4j_uri = uri, 179 neo4j_user = user, 180 neo4j_password = password, 181 neo4j_database = database_name, 182 max_batch_size = max_batch_size, 183 overwrite = should_overwrite 184 ) 185 186 return batch_upload( 187 config = config, 188 data = { 189 "nodes": nodes, 190 "relationships": rels 191 } 192 )
Uploads a dictionary of simple node and relationship records to a target Neo4j instance specified in the arguments.
Arguments:
- neo4j_creds: Tuple containing the hostname, username, and password of the target Neo4j instance (the database name is supplied separately via database_name). The host name should contain only the database name and not the protocol. For example, if the host name is 'neo4j+s://<unique_db_id>.databases.neo4j.io', the host string to use is '<unique_db_id>.databases.neo4j.io'. The default database name is 'neo4j'.
- data: A .json string or dictionary of records to upload. The dictionary must contain 'nodes' and 'relationships' keys, each holding a list of dictionaries; each of those dictionaries contains the property keys and values for the nodes and relationships to be uploaded, respectively.
- node_key: The key in the dictionary that contains the unique identifier for the node. Relationship generation will also use this to find the from and to Nodes it connects to. Default is '_uid'.
- dedupe_nodes: Should nodes only be created once. False means a new node will always be created. True means if an existing node exists, only the properties will be updated. Default True.
- dedupe_relationships: Should relationships only create 1 of a given relationship between the same from and to node. False means a new relationship will always be created. True means if an existing relationship exists between the target nodes, only the properties will be updated. If no prior relationship, a new one will be created. Default True.
- should_overwrite: A boolean indicating whether the upload should overwrite existing data. If set to True, the upload will delete all existing nodes and relationships before uploading. Default is False.
- database_name: String name of target Neo4j database.
- max_batch_size: Integer maximum number of nodes or relationships to upload in a single Cypher batch. Default 500.
Returns:
UploadResult: Result object (as returned by batch_upload) holding the time to complete, number of nodes created, number of relationships created, and total node and relationship properties set.
Raises:
- Exceptions if data is not in the correct format or if the upload ungracefully fails.
def clear_db(creds: (str, str, str), database: str):
    """Wipes a target Neo4j database by delegating to `reset`.

    Per the original documentation, this deletes all existing nodes and
    relationships in the target database.

    Args:
        creds (str, str, str): Neo4j URI, username, and password.
        database (str): Target Neo4j database.

    Returns:
        summary (neo4j.ResultSummary): Whatever `reset` returns, documented as the result summary of the operation. See https://neo4j.com/docs/api/python-driver/current/api.html#resultsummary for more info.
    """
    summary = reset(creds, database)
    return summary
Deletes all existing nodes and relationships in a target Neo4j database.
Arguments:
- creds (str, str, str): Neo4j URI, username, and password.
- database (str): Target Neo4j database.
Returns:
summary (neo4j.ResultSummary): Result summary of the operation. See https://neo4j.com/docs/api/python-driver/current/api.html#resultsummary for more info.