diff --git a/lnd/lnd.go b/lnd/lnd.go index 3d9c54f..ca895d4 100644 --- a/lnd/lnd.go +++ b/lnd/lnd.go @@ -7,6 +7,8 @@ import ( "github.com/neo4j/neo4j-go-driver/v4/neo4j" "log" "os" + "strings" + "time" ) func convertChannelIDToString(channelID uint64) string { @@ -27,15 +29,35 @@ func ConnectToLND() (*lndclient.GrpcLndServices, error) { } func writeNodesToMemgraph(session neo4j.Session, nodes []lndclient.Node) { - for _, node := range nodes { - query := "MERGE (n:node {pubkey: $pubKey, alias: $alias})" - params := map[string]interface{}{ - "pubKey": node.PubKey.String(), - "alias": node.Alias, + const batchSize = 100 + + for i := 0; i < len(nodes); i += batchSize { + end := i + batchSize + if end > len(nodes) { + end = len(nodes) } + batch := nodes[i:end] + + // Prepare the list of maps for UNWIND + records := make([]map[string]interface{}, 0, len(batch)) + for _, node := range batch { + records = append(records, map[string]interface{}{ + "pubKey": node.PubKey.String(), + "alias": node.Alias, + }) + } + + query := ` + UNWIND $rows AS row + MERGE (n:node {pubkey: row.pubKey}) + SET n.alias = row.alias + ` + + params := map[string]interface{}{"rows": records} + _, err := session.Run(query, params) if err != nil { - log.Printf("Failed to execute node query: %v", err) + log.Printf("Failed to execute batch node query: %v", err) } } } @@ -53,7 +75,7 @@ func createNodeIndex(session neo4j.Session) { func createIndexForChannels(session neo4j.Session) { // Query to create an index on the channel_id property of CHANNEL relationships - indexQuery := "CREATE INDEX ON :CHANNEL(channel_id)" + indexQuery := "CREATE INDEX ON :edge(channel_id)" // Execute the index creation query _, err := session.Run(indexQuery, nil) @@ -63,29 +85,67 @@ func createIndexForChannels(session neo4j.Session) { } func writeChannelsToMemgraph(session neo4j.Session, edges []lndclient.ChannelEdge) { + const batchSize = 100 + + relations := []map[string]interface{}{} + for _, edge := range edges { - chanID := convertChannelIDToString(edge.ChannelID) // Convert uint64 to string format + chanID := strings.Replace(convertChannelIDToString(edge.ChannelID), ":", "x", -1) + if edge.Node1Policy != nil { - writeChannelPolicyToMemgraph(session, &edge, edge.Node1Policy, edge.Node1.String(), edge.Node2.String(), chanID) + relations = append(relations, map[string]interface{}{ + "from": edge.Node1.String(), + "to": edge.Node2.String(), + "chan_id": chanID, + "capacity": edge.Capacity, + "fee_base": edge.Node1Policy.FeeBaseMsat, + "fee_rate": edge.Node1Policy.FeeRateMilliMsat, + "time_lock": edge.Node1Policy.TimeLockDelta, + "disabled": edge.Node1Policy.Disabled, + "min_htlc": edge.Node1Policy.MinHtlcMsat, + "max_htlc": edge.Node1Policy.MaxHtlcMsat, + }) } + if edge.Node2Policy != nil { - writeChannelPolicyToMemgraph(session, &edge, edge.Node2Policy, edge.Node2.String(), edge.Node1.String(), chanID) + relations = append(relations, map[string]interface{}{ + "from": edge.Node2.String(), + "to": edge.Node1.String(), + "chan_id": chanID, + "capacity": edge.Capacity, + "fee_base": edge.Node2Policy.FeeBaseMsat, + "fee_rate": edge.Node2Policy.FeeRateMilliMsat, + "time_lock": edge.Node2Policy.TimeLockDelta, + "disabled": edge.Node2Policy.Disabled, + "min_htlc": edge.Node2Policy.MinHtlcMsat, + "max_htlc": edge.Node2Policy.MaxHtlcMsat, + }) } } -} -func writeChannelPolicyToMemgraph(session neo4j.Session, edge *lndclient.ChannelEdge, policy *lndclient.RoutingPolicy, node1PubKey, node2PubKey, chanID string) { - if policy != nil { - query := fmt.Sprintf(` - MATCH (a:node {pubkey: '%s'}), (b:node {pubkey: '%s'}) - MERGE (a)-[r:CHANNEL {channel_id: '%s', capacity: %d}]->(b) - SET r.fee_base_msat = %d, r.fee_rate_milli_msat = %d, r.time_lock_delta = %d, - r.disabled = %v, r.min_htlc_msat = %d, r.max_htlc_msat = %d - `, node1PubKey, node2PubKey, chanID, edge.Capacity, - policy.FeeBaseMsat, policy.FeeRateMilliMsat, policy.TimeLockDelta, policy.Disabled, policy.MinHtlcMsat, policy.MaxHtlcMsat) - _, err := session.Run(query, nil) + for i := 0; i < len(relations); i += batchSize { + end := i + batchSize + if end > len(relations) { + end = len(relations) + } + + batch := relations[i:end] + query := ` + UNWIND $rows AS row + MATCH (a:node {pubkey: row.from}), (b:node {pubkey: row.to}) + MERGE (a)-[r:edge {channel_id: row.chan_id, capacity: row.capacity}]->(b) + SET r.fee_base_msat = row.fee_base, + r.fee_rate_milli_msat = row.fee_rate, + r.time_lock_delta = row.time_lock, + r.disabled = row.disabled, + r.min_htlc_msat = row.min_htlc, + r.max_htlc_msat = row.max_htlc + ` + + params := map[string]interface{}{"rows": batch} + _, err := session.Run(query, params) if err != nil { - log.Printf("Failed to execute channel policy query: %v", err) + log.Printf("Failed to execute batch channel policy query: %v", err) } } } @@ -93,8 +153,11 @@ func writeChannelPolicyToMemgraph(session neo4j.Session, edge *lndclient.Channel func PullGraph(lndServices *lndclient.GrpcLndServices) *lndclient.Graph { fmt.Println("Pulling graph...") - - graph, err := lndServices.Client.DescribeGraph(context.Background(), false) + duration := 10 * 60 * time.Second + _ctx := context.WithoutCancel(context.Background()) + ctx, cancel := context.WithTimeout(_ctx, duration) + defer cancel() + graph, err := lndServices.Client.DescribeGraph(ctx, false) if err != nil { log.Printf("Failed to execute channel policy query: %v", err) } @@ -102,6 +165,19 @@ func PullGraph(lndServices *lndclient.GrpcLndServices) *lndclient.Graph { } +func GetInfo(lndServices *lndclient.GrpcLndServices) { + fmt.Println("getting info...") + duration := time.Now().Add(10 * time.Minute) + ctx, cancel := context.WithDeadline(context.Background(), duration) + defer cancel() + info, err := lndServices.Client.GetInfo(ctx) + if err != nil { + log.Printf("Failed to execute get info query: %v", err) + } + log.Print(info) + //return info +} + func WriteGraphToMemgraph(graph *lndclient.Graph, neo4jDriver neo4j.Driver) { var err error session := neo4jDriver.NewSession(neo4j.SessionConfig{}) diff --git a/memgraph/memgraph.go b/memgraph/memgraph.go index 9a74ea6..b49b59d 100644 --- a/memgraph/memgraph.go +++ b/memgraph/memgraph.go @@ -40,11 +40,11 @@ func DropDatabase(neo4jDriver neo4j.Driver) { // Drop index on pub_key property _, err = session.Run("DROP INDEX ON :node(pubkey)", nil) if err != nil { - log.Printf("Failed to drop index on pub_key property: %v", err) + log.Printf("Failed to drop index on pubkey property: %v", err) } // Drop index on channel_id property - _, err = session.Run("DROP INDEX ON :CHANNEL(channel_id)", nil) + _, err = session.Run("DROP INDEX ON :edge(channel_id)", nil) if err != nil { log.Printf("Failed to drop index on channel_id property: %v", err) } @@ -63,7 +63,7 @@ func CommitQuery(driver neo4j.Driver, query string, params map[string]interface{ // ProcessNodeUpdate converts node updates to Memgraph queries func ProcessNodeUpdate(nodeUpdate lndclient.NodeUpdate) (string, map[string]interface{}) { - nodeQuery := "MERGE (n:Node {pub_key: $pubKey})\nSET n.alias = $alias" + nodeQuery := "MERGE (n:node {pubkey: $pubKey})\nSET n.alias = $alias" params := map[string]interface{}{ "pubKey": nodeUpdate.IdentityKey.String(), "alias": nodeUpdate.Alias, @@ -78,13 +78,13 @@ func ProcessEdgeUpdate(edgeUpdate lndclient.ChannelEdgeUpdate) (string, map[stri params map[string]interface{} ) if edgeUpdate.RoutingPolicy.Disabled { - edgeQuery = "MATCH ()-[r:CHANNEL {channel_id: $channelID}]->()\nWHERE r.disabled = true\nDELETE r" + edgeQuery = "MATCH ()-[r:edge {channel_id: $channelID}]->()\nset r.disabled = true" params = map[string]interface{}{ "channelID": edgeUpdate.ChannelID.String(), } } else { - edgeQuery = "MERGE (n1:Node {pub_key: $advertisingNode})\nMERGE (n2:Node {pub_key: $connectingNode})\n" + - "MERGE (n1)-[r:CHANNEL {channel_id: $channelID}]->(n2)\n" + + edgeQuery = "MERGE (n1:node {pubkey: $advertisingNode})\nMERGE (n2:node {pubkey: $connectingNode})\n" + + "MERGE (n1)-[r:edge {channel_id: $channelID}]->(n2)\n" + "SET r.fee_base_msat = $fee_base_msat, r.fee_rate_milli_msat = $fee_rate_milli_msat, r.time_lock_delta = $time_lock_delta, r.disabled = $disabled" params = map[string]interface{}{ "advertisingNode": edgeUpdate.AdvertisingNode.String(), @@ -102,7 +102,7 @@ func ProcessEdgeUpdate(edgeUpdate lndclient.ChannelEdgeUpdate) (string, map[stri // ProcessCloseUpdate converts channel close updates to Memgraph queries func ProcessCloseUpdate(closeUpdate lndclient.ChannelCloseUpdate) (string, map[string]interface{}) { - closeQuery := "MATCH ()-[r:CHANNEL {channel_id: $channelID}]->()\nWHERE r.disabled = true\nDELETE r" + closeQuery := "MATCH ()-[r:edge {channel_id: $channelID}]->()\nDELETE r" params := map[string]interface{}{ "channelID": closeUpdate.ChannelID.String(), }