Skip to content

数据导入与导出

CSV 数据导入

基本语法

cypher
// 导入节点
LOAD CSV FROM 'file:///nodes.csv' AS line
CREATE (n:Label {property1: line[0], property2: line[1]})

// 导入关系
LOAD CSV FROM 'file:///relationships.csv' AS line
MATCH (a:Label1 {id: line[0]}), (b:Label2 {id: line[1]})
CREATE (a)-[:RELATIONSHIP_TYPE]->(b)

示例

  1. 导入节点

    cypher
    // 导入 Person 节点
    LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
    CREATE (p:Person {
      id: toInteger(row.id),
      name: row.name,
      age: toInteger(row.age),
      email: row.email
    })
  2. 导入关系

    cypher
    // 导入 FRIENDS_WITH 关系
    LOAD CSV WITH HEADERS FROM 'file:///friendships.csv' AS row
    MATCH (a:Person {id: toInteger(row.from_id)}), (b:Person {id: toInteger(row.to_id)})
    CREATE (a)-[:FRIENDS_WITH {since: toInteger(row.since)}]->(b)
  3. 使用批量导入

    cypher
    // 批量导入节点
    USING PERIODIC COMMIT 1000
    LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
    CREATE (p:Person {
      id: toInteger(row.id),
      name: row.name,
      age: toInteger(row.age)
    })
  4. 使用 MERGE 避免重复

    cypher
    // 使用 MERGE 导入节点
    LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
    MERGE (p:Person {id: toInteger(row.id)})
    ON CREATE SET p.name = row.name, p.age = toInteger(row.age)
    ON MATCH SET p.name = row.name, p.age = toInteger(row.age)

CSV 文件格式

节点 CSV 文件示例 (persons.csv):

csv
id,name,age,email
1,John,30,john@example.com
2,Alice,28,alice@example.com
3,Bob,32,bob@example.com

关系 CSV 文件示例 (friendships.csv):

csv
from_id,to_id,since
1,2,2010
1,3,2012
2,3,2011

JSON 数据导入

基本语法

cypher
// 使用 APOC 插件导入 JSON
CALL apoc.load.json('file:///data.json') YIELD value
CREATE (n:Label) SET n = value

示例

  1. 导入 JSON 数据

    cypher
    // 导入 JSON 格式的节点数据
    CALL apoc.load.json('file:///persons.json') YIELD value
    CREATE (p:Person) SET p = value
  2. 导入嵌套 JSON 数据

    cypher
    // 导入嵌套 JSON 数据
    CALL apoc.load.json('file:///data.json') YIELD value
    UNWIND value.persons AS person
    CREATE (p:Person) SET p = person
  3. 使用参数导入

    cypher
    // 使用参数导入 JSON
    WITH $jsonData AS data
    UNWIND data.persons AS person
    CREATE (p:Person) SET p = person

JSON 文件格式

示例 (persons.json):

json
{
  "persons": [
    {
      "id": 1,
      "name": "John",
      "age": 30,
      "email": "john@example.com"
    },
    {
      "id": 2,
      "name": "Alice",
      "age": 28,
      "email": "alice@example.com"
    }
  ]
}

数据导出

导出为 CSV

cypher
// 导出节点为 CSV
MATCH (p:Person)
WITH collect({id: p.id, name: p.name, age: p.age}) AS persons
CALL apoc.export.csv.data(persons, 'persons.csv', {})
YIELD file, nodes, relationships, properties
RETURN file, nodes, relationships, properties

// 导出关系为 CSV
MATCH (a:Person)-[r:FRIENDS_WITH]->(b:Person)
WITH collect({from_id: a.id, to_id: b.id, since: r.since}) AS relationships
CALL apoc.export.csv.data(relationships, 'friendships.csv', {})
YIELD file, nodes, relationships, properties
RETURN file, nodes, relationships, properties

导出为 JSON

cypher
// 导出节点为 JSON
MATCH (p:Person)
WITH collect(p) AS persons
CALL apoc.export.json.data(persons, 'persons.json', {})
YIELD file, nodes, relationships, properties
RETURN file, nodes, relationships, properties

// 导出关系为 JSON
MATCH (a:Person)-[r:FRIENDS_WITH]->(b:Person)
WITH collect(r) AS relationships
CALL apoc.export.json.data(relationships, 'friendships.json', {})
YIELD file, nodes, relationships, properties
RETURN file, nodes, relationships, properties

导出整个数据库

cypher
// 导出整个数据库为 CSV
CALL apoc.export.csv.all('database.csv', {})
YIELD file, nodes, relationships, properties
RETURN file, nodes, relationships, properties

// 导出整个数据库为 JSON
CALL apoc.export.json.all('database.json', {})
YIELD file, nodes, relationships, properties
RETURN file, nodes, relationships, properties

批量操作优化

1. 使用 PERIODIC COMMIT

cypher
// 每 1000 行提交一次
USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM 'file:///large_file.csv' AS row
CREATE (n:Label) SET n = row

2. 使用参数批量导入

cypher
// 使用参数批量导入
UNWIND $batch AS item
CREATE (n:Label) SET n = item

3. 并行导入

  • 使用多个导入语句并行执行
  • 合理分配导入任务
  • 监控导入进度

4. 导入性能优化

  • 禁用索引和约束,导入后再启用
  • 使用批量加载工具(如 neo4j-admin import)
  • 调整内存设置
  • 优化 CSV 文件格式

示例:使用 neo4j-admin import 工具

命令行导入

bash
neo4j-admin import --database=neo4j --nodes=import/persons.csv --relationships=import/friendships.csv

配置文件格式

persons.csv 头部:

csv
:ID, name, age:int, email

friendships.csv 头部:

csv
:START_ID, :END_ID, :TYPE, since:int

小结

数据导入与导出是 Neo4j 数据管理的重要功能,通过 CSV 和 JSON 等格式,可以方便地与其他系统进行数据交换。在实际应用中,需要根据数据量和性能要求,选择合适的导入导出方法,并进行必要的优化。

在接下来的章节中,我们将介绍图算法,这是 Neo4j 的强大功能之一,可以用于解决各种复杂的图分析问题。