batching supa fast

This commit is contained in:
davisv7 2025-08-23 16:06:26 -04:00
parent bd45671e1d
commit bc74ef9cbd
2 changed files with 107 additions and 31 deletions

View file

@ -7,6 +7,8 @@ import (
"github.com/neo4j/neo4j-go-driver/v4/neo4j" "github.com/neo4j/neo4j-go-driver/v4/neo4j"
"log" "log"
"os" "os"
"strings"
"time"
) )
func convertChannelIDToString(channelID uint64) string { func convertChannelIDToString(channelID uint64) string {
@ -27,15 +29,35 @@ func ConnectToLND() (*lndclient.GrpcLndServices, error) {
} }
func writeNodesToMemgraph(session neo4j.Session, nodes []lndclient.Node) { func writeNodesToMemgraph(session neo4j.Session, nodes []lndclient.Node) {
for _, node := range nodes { const batchSize = 100
query := "MERGE (n:node {pubkey: $pubKey, alias: $alias})"
params := map[string]interface{}{ for i := 0; i < len(nodes); i += batchSize {
end := i + batchSize
if end > len(nodes) {
end = len(nodes)
}
batch := nodes[i:end]
// Prepare the list of maps for UNWIND
records := make([]map[string]interface{}, 0, len(batch))
for _, node := range batch {
records = append(records, map[string]interface{}{
"pubKey": node.PubKey.String(), "pubKey": node.PubKey.String(),
"alias": node.Alias, "alias": node.Alias,
})
} }
query := `
UNWIND $rows AS row
MERGE (n:node {pubkey: row.pubKey})
SET n.alias = row.alias
`
params := map[string]interface{}{"rows": records}
_, err := session.Run(query, params) _, err := session.Run(query, params)
if err != nil { if err != nil {
log.Printf("Failed to execute node query: %v", err) log.Printf("Failed to execute batch node query: %v", err)
} }
} }
} }
@ -53,7 +75,7 @@ func createNodeIndex(session neo4j.Session) {
func createIndexForChannels(session neo4j.Session) { func createIndexForChannels(session neo4j.Session) {
// Query to create an index on the channel_id property of CHANNEL relationships // Query to create an index on the channel_id property of CHANNEL relationships
indexQuery := "CREATE INDEX ON :CHANNEL(channel_id)" indexQuery := "CREATE INDEX ON :edge(channel_id)"
// Execute the index creation query // Execute the index creation query
_, err := session.Run(indexQuery, nil) _, err := session.Run(indexQuery, nil)
@ -63,29 +85,67 @@ func createIndexForChannels(session neo4j.Session) {
} }
func writeChannelsToMemgraph(session neo4j.Session, edges []lndclient.ChannelEdge) { func writeChannelsToMemgraph(session neo4j.Session, edges []lndclient.ChannelEdge) {
const batchSize = 100
relations := []map[string]interface{}{}
for _, edge := range edges { for _, edge := range edges {
chanID := convertChannelIDToString(edge.ChannelID) // Convert uint64 to string format chanID := strings.Replace(convertChannelIDToString(edge.ChannelID), ":", "x", -1)
if edge.Node1Policy != nil { if edge.Node1Policy != nil {
writeChannelPolicyToMemgraph(session, &edge, edge.Node1Policy, edge.Node1.String(), edge.Node2.String(), chanID) relations = append(relations, map[string]interface{}{
"from": edge.Node1.String(),
"to": edge.Node2.String(),
"chan_id": chanID,
"capacity": edge.Capacity,
"fee_base": edge.Node1Policy.FeeBaseMsat,
"fee_rate": edge.Node1Policy.FeeRateMilliMsat,
"time_lock": edge.Node1Policy.TimeLockDelta,
"disabled": edge.Node1Policy.Disabled,
"min_htlc": edge.Node1Policy.MinHtlcMsat,
"max_htlc": edge.Node1Policy.MaxHtlcMsat,
})
} }
if edge.Node2Policy != nil { if edge.Node2Policy != nil {
writeChannelPolicyToMemgraph(session, &edge, edge.Node2Policy, edge.Node2.String(), edge.Node1.String(), chanID) relations = append(relations, map[string]interface{}{
} "from": edge.Node2.String(),
"to": edge.Node1.String(),
"chan_id": chanID,
"capacity": edge.Capacity,
"fee_base": edge.Node2Policy.FeeBaseMsat,
"fee_rate": edge.Node2Policy.FeeRateMilliMsat,
"time_lock": edge.Node2Policy.TimeLockDelta,
"disabled": edge.Node2Policy.Disabled,
"min_htlc": edge.Node2Policy.MinHtlcMsat,
"max_htlc": edge.Node2Policy.MaxHtlcMsat,
})
} }
} }
func writeChannelPolicyToMemgraph(session neo4j.Session, edge *lndclient.ChannelEdge, policy *lndclient.RoutingPolicy, node1PubKey, node2PubKey, chanID string) { for i := 0; i < len(relations); i += batchSize {
if policy != nil { end := i + batchSize
query := fmt.Sprintf(` if end > len(relations) {
MATCH (a:node {pubkey: '%s'}), (b:node {pubkey: '%s'}) end = len(relations)
MERGE (a)-[r:CHANNEL {channel_id: '%s', capacity: %d}]->(b) }
SET r.fee_base_msat = %d, r.fee_rate_milli_msat = %d, r.time_lock_delta = %d,
r.disabled = %v, r.min_htlc_msat = %d, r.max_htlc_msat = %d batch := relations[i:end]
`, node1PubKey, node2PubKey, chanID, edge.Capacity, query := `
policy.FeeBaseMsat, policy.FeeRateMilliMsat, policy.TimeLockDelta, policy.Disabled, policy.MinHtlcMsat, policy.MaxHtlcMsat) UNWIND $rows AS row
_, err := session.Run(query, nil) MATCH (a:node {pubkey: row.from}), (b:node {pubkey: row.to})
MERGE (a)-[r:edge {channel_id: row.chan_id, capacity: row.capacity}]->(b)
SET r.fee_base_msat = row.fee_base,
r.fee_rate_milli_msat = row.fee_rate,
r.time_lock_delta = row.time_lock,
r.disabled = row.disabled,
r.min_htlc_msat = row.min_htlc,
r.max_htlc_msat = row.max_htlc
`
params := map[string]interface{}{"rows": batch}
_, err := session.Run(query, params)
if err != nil { if err != nil {
log.Printf("Failed to execute channel policy query: %v", err) log.Printf("Failed to execute batch channel policy query: %v", err)
} }
} }
} }
@ -93,8 +153,11 @@ func writeChannelPolicyToMemgraph(session neo4j.Session, edge *lndclient.Channel
func PullGraph(lndServices *lndclient.GrpcLndServices) *lndclient.Graph { func PullGraph(lndServices *lndclient.GrpcLndServices) *lndclient.Graph {
fmt.Println("Pulling graph...") fmt.Println("Pulling graph...")
duration := 10 * 60 * time.Second
graph, err := lndServices.Client.DescribeGraph(context.Background(), false) _ctx := context.WithoutCancel(context.Background())
ctx, cancel := context.WithTimeout(_ctx, duration)
defer cancel()
graph, err := lndServices.Client.DescribeGraph(ctx, false)
if err != nil { if err != nil {
log.Printf("Failed to execute channel policy query: %v", err) log.Printf("Failed to execute channel policy query: %v", err)
} }
@ -102,6 +165,19 @@ func PullGraph(lndServices *lndclient.GrpcLndServices) *lndclient.Graph {
} }
func GetInfo(lndServices *lndclient.GrpcLndServices) {
fmt.Println("getting info...")
duration := time.Now().Add(10 * time.Minute)
ctx, cancel := context.WithDeadline(context.Background(), duration)
defer cancel()
info, err := lndServices.Client.GetInfo(ctx)
if err != nil {
log.Printf("Failed to execute get info query: %v", err)
}
log.Print(info)
//return info
}
func WriteGraphToMemgraph(graph *lndclient.Graph, neo4jDriver neo4j.Driver) { func WriteGraphToMemgraph(graph *lndclient.Graph, neo4jDriver neo4j.Driver) {
var err error var err error
session := neo4jDriver.NewSession(neo4j.SessionConfig{}) session := neo4jDriver.NewSession(neo4j.SessionConfig{})

View file

@ -40,11 +40,11 @@ func DropDatabase(neo4jDriver neo4j.Driver) {
// Drop index on pub_key property // Drop index on pub_key property
_, err = session.Run("DROP INDEX ON :node(pubkey)", nil) _, err = session.Run("DROP INDEX ON :node(pubkey)", nil)
if err != nil { if err != nil {
log.Printf("Failed to drop index on pub_key property: %v", err) log.Printf("Failed to drop index on pubkey property: %v", err)
} }
// Drop index on channel_id property // Drop index on channel_id property
_, err = session.Run("DROP INDEX ON :CHANNEL(channel_id)", nil) _, err = session.Run("DROP INDEX ON :edge(channel_id)", nil)
if err != nil { if err != nil {
log.Printf("Failed to drop index on channel_id property: %v", err) log.Printf("Failed to drop index on channel_id property: %v", err)
} }
@ -63,7 +63,7 @@ func CommitQuery(driver neo4j.Driver, query string, params map[string]interface{
// ProcessNodeUpdate converts node updates to Memgraph queries // ProcessNodeUpdate converts node updates to Memgraph queries
func ProcessNodeUpdate(nodeUpdate lndclient.NodeUpdate) (string, map[string]interface{}) { func ProcessNodeUpdate(nodeUpdate lndclient.NodeUpdate) (string, map[string]interface{}) {
nodeQuery := "MERGE (n:Node {pub_key: $pubKey})\nSET n.alias = $alias" nodeQuery := "MERGE (n:node {pubkey: $pubKey})\nSET n.alias = $alias"
params := map[string]interface{}{ params := map[string]interface{}{
"pubKey": nodeUpdate.IdentityKey.String(), "pubKey": nodeUpdate.IdentityKey.String(),
"alias": nodeUpdate.Alias, "alias": nodeUpdate.Alias,
@ -78,13 +78,13 @@ func ProcessEdgeUpdate(edgeUpdate lndclient.ChannelEdgeUpdate) (string, map[stri
params map[string]interface{} params map[string]interface{}
) )
if edgeUpdate.RoutingPolicy.Disabled { if edgeUpdate.RoutingPolicy.Disabled {
edgeQuery = "MATCH ()-[r:CHANNEL {channel_id: $channelID}]->()\nWHERE r.disabled = true\nDELETE r" edgeQuery = "MATCH ()-[r:edge {channel_id: $channelID}]->()\nset r.disabled = true"
params = map[string]interface{}{ params = map[string]interface{}{
"channelID": edgeUpdate.ChannelID.String(), "channelID": edgeUpdate.ChannelID.String(),
} }
} else { } else {
edgeQuery = "MERGE (n1:Node {pub_key: $advertisingNode})\nMERGE (n2:Node {pub_key: $connectingNode})\n" + edgeQuery = "MERGE (n1:node {pubkey: $advertisingNode})\nMERGE (n2:node {pubkey: $connectingNode})\n" +
"MERGE (n1)-[r:CHANNEL {channel_id: $channelID}]->(n2)\n" + "MERGE (n1)-[r:edge {channel_id: $channelID}]->(n2)\n" +
"SET r.fee_base_msat = $fee_base_msat, r.fee_rate_milli_msat = $fee_rate_milli_msat, r.time_lock_delta = $time_lock_delta, r.disabled = $disabled" "SET r.fee_base_msat = $fee_base_msat, r.fee_rate_milli_msat = $fee_rate_milli_msat, r.time_lock_delta = $time_lock_delta, r.disabled = $disabled"
params = map[string]interface{}{ params = map[string]interface{}{
"advertisingNode": edgeUpdate.AdvertisingNode.String(), "advertisingNode": edgeUpdate.AdvertisingNode.String(),
@ -102,7 +102,7 @@ func ProcessEdgeUpdate(edgeUpdate lndclient.ChannelEdgeUpdate) (string, map[stri
// ProcessCloseUpdate converts channel close updates to Memgraph queries // ProcessCloseUpdate converts channel close updates to Memgraph queries
func ProcessCloseUpdate(closeUpdate lndclient.ChannelCloseUpdate) (string, map[string]interface{}) { func ProcessCloseUpdate(closeUpdate lndclient.ChannelCloseUpdate) (string, map[string]interface{}) {
closeQuery := "MATCH ()-[r:CHANNEL {channel_id: $channelID}]->()\nWHERE r.disabled = true\nDELETE r" closeQuery := "MATCH ()-[r:edge {channel_id: $channelID}]->()\nDELETE r"
params := map[string]interface{}{ params := map[string]interface{}{
"channelID": closeUpdate.ChannelID.String(), "channelID": closeUpdate.ChannelID.String(),
} }