diff --git a/lnd/lnd.go b/lnd/lnd.go index e808c64..3d9c54f 100644 --- a/lnd/lnd.go +++ b/lnd/lnd.go @@ -28,7 +28,7 @@ func ConnectToLND() (*lndclient.GrpcLndServices, error) { func writeNodesToMemgraph(session neo4j.Session, nodes []lndclient.Node) { for _, node := range nodes { - query := "MERGE (n:Node {pub_key: $pubKey, alias: $alias})" + query := "MERGE (n:node {pubkey: $pubKey, alias: $alias})" params := map[string]interface{}{ "pubKey": node.PubKey.String(), "alias": node.Alias, @@ -40,13 +40,35 @@ func writeNodesToMemgraph(session neo4j.Session, nodes []lndclient.Node) { } } +func createNodeIndex(session neo4j.Session) { + // Query to create an index on the pubkey property of Node + indexQuery := "CREATE INDEX ON :node(pubkey)" + + // Execute the index creation query + _, err := session.Run(indexQuery, nil) + if err != nil { + log.Printf("Failed to create index: %v", err) + } +} + +func createIndexForChannels(session neo4j.Session) { + // Query to create an index on the channel_id property of CHANNEL relationships + indexQuery := "CREATE INDEX ON :CHANNEL(channel_id)" + + // Execute the index creation query + _, err := session.Run(indexQuery, nil) + if err != nil { + log.Printf("Failed to create index for channels: %v", err) + } +} + func writeChannelsToMemgraph(session neo4j.Session, edges []lndclient.ChannelEdge) { for _, edge := range edges { chanID := convertChannelIDToString(edge.ChannelID) // Convert uint64 to string format - if edge.Node1Policy != nil && !edge.Node1Policy.Disabled { + if edge.Node1Policy != nil { writeChannelPolicyToMemgraph(session, &edge, edge.Node1Policy, edge.Node1.String(), edge.Node2.String(), chanID) } - if edge.Node2Policy != nil && !edge.Node2Policy.Disabled { + if edge.Node2Policy != nil { writeChannelPolicyToMemgraph(session, &edge, edge.Node2Policy, edge.Node2.String(), edge.Node1.String(), chanID) } } @@ -55,11 +77,12 @@ func writeChannelsToMemgraph(session neo4j.Session, edges []lndclient.ChannelEdg func writeChannelPolicyToMemgraph(session neo4j.Session, edge *lndclient.ChannelEdge, policy *lndclient.RoutingPolicy, node1PubKey, node2PubKey, chanID string) { if policy != nil { query := fmt.Sprintf(` - MATCH (a:Node {pub_key: '%s'}), (b:Node {pub_key: '%s'}) + MATCH (a:node {pubkey: '%s'}), (b:node {pubkey: '%s'}) MERGE (a)-[r:CHANNEL {channel_id: '%s', capacity: %d}]->(b) - SET r.fee_base_msat = %d, r.fee_rate_milli_msat = %d, r.time_lock_delta = %d, r.disabled = %v + SET r.fee_base_msat = %d, r.fee_rate_milli_msat = %d, r.time_lock_delta = %d, + r.disabled = %v, r.min_htlc_msat = %d, r.max_htlc_msat = %d `, node1PubKey, node2PubKey, chanID, edge.Capacity, - policy.FeeBaseMsat, policy.FeeRateMilliMsat, policy.TimeLockDelta, policy.Disabled) + policy.FeeBaseMsat, policy.FeeRateMilliMsat, policy.TimeLockDelta, policy.Disabled, policy.MinHtlcMsat, policy.MaxHtlcMsat) _, err := session.Run(query, nil) if err != nil { log.Printf("Failed to execute channel policy query: %v", err) @@ -67,15 +90,30 @@ func writeChannelPolicyToMemgraph(session neo4j.Session, edge *lndclient.Channel } } -func WriteGraphToMemgraph(lndServices *lndclient.GrpcLndServices, neo4jDriver neo4j.Driver) { - session := neo4jDriver.NewSession(neo4j.SessionConfig{}) - defer session.Close() +func PullGraph(lndServices *lndclient.GrpcLndServices) *lndclient.Graph { + + fmt.Println("Pulling graph...") graph, err := lndServices.Client.DescribeGraph(context.Background(), false) + if err != nil { + log.Printf("Failed to execute channel policy query: %v", err) + } + return graph + +} + +func WriteGraphToMemgraph(graph *lndclient.Graph, neo4jDriver neo4j.Driver) { + var err error + session := neo4jDriver.NewSession(neo4j.SessionConfig{}) + defer session.Close() if err != nil { log.Fatalf("Failed to retrieve graph: %v", err) } + fmt.Println("Writing to Memgraph...") + createNodeIndex(session) + createIndexForChannels(session) writeNodesToMemgraph(session, graph.Nodes) writeChannelsToMemgraph(session, graph.Edges) + fmt.Println("Finished writing to Memgraph...") } diff --git a/memgraph/memgraph.go b/memgraph/memgraph.go index 28725ed..9a74ea6 100644 --- a/memgraph/memgraph.go +++ b/memgraph/memgraph.go @@ -29,11 +29,24 @@ func CloseDriver(driver neo4j.Driver) { // DropDatabase drops all nodes/relationships from the database. func DropDatabase(neo4jDriver neo4j.Driver) { + fmt.Println("Dropping database...") session := neo4jDriver.NewSession(neo4j.SessionConfig{}) defer session.Close() _, err := session.Run("MATCH (n) DETACH DELETE n", nil) if err != nil { - log.Printf("Failed to create index on pub_key property: %v", err) + log.Printf("Failed to drop database: %v", err) + } + + // Drop index on pub_key property + _, err = session.Run("DROP INDEX ON :node(pubkey)", nil) + if err != nil { + log.Printf("Failed to drop index on pub_key property: %v", err) + } + + // Drop index on channel_id property + _, err = session.Run("DROP INDEX ON :CHANNEL(channel_id)", nil) + if err != nil { + log.Printf("Failed to drop index on channel_id property: %v", err) } } diff --git a/routes/routes.go b/routes/routes.go index 60b079d..c56e457 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -54,9 +54,10 @@ func ResetGraphHandler(c *gin.Context) { } memgraph.DropDatabase(Driver) - lnd.WriteGraphToMemgraph(LndServices, Driver) + graph := lnd.PullGraph(LndServices) + lnd.WriteGraphToMemgraph(graph, Driver) - IsRoutineRunning = true + //IsRoutineRunning = true StopChannel = make(chan bool) //go SubscribeToGraphUpdates()