ln-stream/lnd/lnd.go

119 lines
3.8 KiB
Go

package lnd
import (
"context"
"fmt"
"github.com/lightninglabs/lndclient"
"github.com/neo4j/neo4j-go-driver/v4/neo4j"
"log"
"os"
)
// convertChannelIDToString renders a compact uint64 channel ID as the
// colon-separated triple "height:txindex:output".
//
// The three components are unpacked from fixed bit ranges of channelID:
// everything above bit 40 is the first component, the 24 bits starting at
// bit 16 are the second, and the low 16 bits are the third.
func convertChannelIDToString(channelID uint64) string {
	const (
		txIndexMask  = 0xFFFFFF // 24 bits
		outIndexMask = 0xFFFF   // 16 bits
	)

	height := channelID >> 40
	txIndex := (channelID >> 16) & txIndexMask
	outIndex := channelID & outIndexMask

	return fmt.Sprintf("%d:%d:%d", height, txIndex, outIndex)
}
// ConnectToLND builds an lndclient service configuration from the process
// environment and hands it to lndclient.NewLndServices, returning that call's
// result unchanged.
//
// Environment variables read:
//   - LND_ADDRESS:        copied into LndAddress
//   - LND_NETWORK:        converted to lndclient.Network
//   - LND_MACAROON_PATH:  copied into CustomMacaroonPath
//   - LND_TLS_CERT_PATH:  copied into TLSPath
//
// Unset variables yield empty strings; no validation is performed here.
func ConnectToLND() (*lndclient.GrpcLndServices, error) {
	cfg := &lndclient.LndServicesConfig{}
	cfg.LndAddress = os.Getenv("LND_ADDRESS")
	cfg.Network = lndclient.Network(os.Getenv("LND_NETWORK"))
	cfg.CustomMacaroonPath = os.Getenv("LND_MACAROON_PATH")
	cfg.TLSPath = os.Getenv("LND_TLS_CERT_PATH")

	return lndclient.NewLndServices(cfg)
}
// writeNodesToMemgraph upserts one :node vertex per entry in nodes.
//
// The vertex is keyed on pubkey only and the alias is written with SET.
// Keying the MERGE on both pubkey and alias would create a second vertex for
// the same pubkey whenever a node changes its alias, and every later
// MATCH on pubkey would then hit all of the duplicates.
//
// Failures are logged per node and do not stop the loop, so one bad record
// does not abort the whole import.
func writeNodesToMemgraph(session neo4j.Session, nodes []lndclient.Node) {
	const query = "MERGE (n:node {pubkey: $pubKey}) SET n.alias = $alias"

	// Index instead of ranging by value to avoid copying each node struct.
	for i := range nodes {
		node := &nodes[i]
		params := map[string]interface{}{
			"pubKey": node.PubKey.String(),
			"alias":  node.Alias,
		}

		result, err := session.Run(query, params)
		if err == nil {
			// Drain the result so that errors raised while the statement
			// executes are reported here instead of being dropped.
			_, err = result.Consume()
		}
		if err != nil {
			log.Printf("Failed to execute node query: %v", err)
		}
	}
}
// createNodeIndex asks the database to index the pubkey property of vertices
// labelled :node. A failure is logged and otherwise ignored.
func createNodeIndex(session neo4j.Session) {
	if _, err := session.Run("CREATE INDEX ON :node(pubkey)", nil); err != nil {
		log.Printf("Failed to create index: %v", err)
	}
}
// createIndexForChannels issues an index-creation statement for the
// channel_id property under the CHANNEL name. A failure is logged and
// otherwise ignored.
//
// NOTE(review): the statement uses the same "CREATE INDEX ON :Label(prop)"
// form as the vertex index, while CHANNEL is used as a relationship type
// elsewhere in this file. Confirm against the target database's index
// documentation that this form actually indexes relationships.
func createIndexForChannels(session neo4j.Session) {
	if _, err := session.Run("CREATE INDEX ON :CHANNEL(channel_id)", nil); err != nil {
		log.Printf("Failed to create index for channels: %v", err)
	}
}
// writeChannelsToMemgraph writes every channel edge to the database as up to
// two directed relationships, one per routing policy that is present.
//
// Node1's policy is written as Node1 -> Node2 and Node2's policy as
// Node2 -> Node1. A side whose policy is nil is skipped.
func writeChannelsToMemgraph(session neo4j.Session, edges []lndclient.ChannelEdge) {
	for i := range edges {
		edge := &edges[i]

		// Render the identifiers once; both directions share them.
		id := convertChannelIDToString(edge.ChannelID)
		first := edge.Node1.String()
		second := edge.Node2.String()

		if p := edge.Node1Policy; p != nil {
			writeChannelPolicyToMemgraph(session, edge, p, first, second, id)
		}
		if p := edge.Node2Policy; p != nil {
			writeChannelPolicyToMemgraph(session, edge, p, second, first, id)
		}
	}
}
// writeChannelPolicyToMemgraph upserts the directed CHANNEL relationship from
// the vertex with pubkey node1PubKey to the vertex with pubkey node2PubKey and
// stores the given routing policy on it.
//
// Parameters:
//   - session:      database session used to run the statement.
//   - edge:         channel the policy belongs to; only its Capacity is read.
//   - policy:       routing policy to store; a nil policy is a no-op.
//   - node1PubKey:  pubkey of the relationship's source vertex.
//   - node2PubKey:  pubkey of the relationship's target vertex.
//   - chanID:       channel identifier stored as channel_id.
//
// The statement is parameterized, matching the node writer, instead of being
// assembled with fmt.Sprintf. The relationship is keyed on channel_id alone;
// capacity is written with SET alongside the policy fields.
//
// Failures are logged and not returned.
func writeChannelPolicyToMemgraph(session neo4j.Session, edge *lndclient.ChannelEdge, policy *lndclient.RoutingPolicy, node1PubKey, node2PubKey, chanID string) {
	if policy == nil {
		return
	}

	const query = `
MATCH (a:node {pubkey: $fromPubKey}), (b:node {pubkey: $toPubKey})
MERGE (a)-[r:CHANNEL {channel_id: $chanID}]->(b)
SET r.capacity = $capacity,
    r.fee_base_msat = $feeBaseMsat,
    r.fee_rate_milli_msat = $feeRateMilliMsat,
    r.time_lock_delta = $timeLockDelta,
    r.disabled = $disabled,
    r.min_htlc_msat = $minHtlcMsat,
    r.max_htlc_msat = $maxHtlcMsat
`

	// Numeric fields are converted to int64 so the driver receives plain Go
	// integers regardless of the named types used by lndclient.
	params := map[string]interface{}{
		"fromPubKey":       node1PubKey,
		"toPubKey":         node2PubKey,
		"chanID":           chanID,
		"capacity":         int64(edge.Capacity),
		"feeBaseMsat":      int64(policy.FeeBaseMsat),
		"feeRateMilliMsat": int64(policy.FeeRateMilliMsat),
		"timeLockDelta":    int64(policy.TimeLockDelta),
		"disabled":         policy.Disabled,
		"minHtlcMsat":      int64(policy.MinHtlcMsat),
		"maxHtlcMsat":      int64(policy.MaxHtlcMsat),
	}

	result, err := session.Run(query, params)
	if err == nil {
		// Drain the result so that errors raised while the statement
		// executes are reported here instead of being dropped.
		_, err = result.Consume()
	}
	if err != nil {
		log.Printf("Failed to execute channel policy query: %v", err)
	}
}
// PullGraph fetches the channel graph from LND through
// lndServices.Client.DescribeGraph, using a background context and passing
// false as the second argument.
//
// On failure the error is logged and nil is returned, so callers must check
// the result for nil before using it.
func PullGraph(lndServices *lndclient.GrpcLndServices) *lndclient.Graph {
	fmt.Println("Pulling graph...")

	graph, err := lndServices.Client.DescribeGraph(context.Background(), false)
	if err != nil {
		log.Printf("Failed to retrieve graph: %v", err)
		return nil
	}
	return graph
}
// WriteGraphToMemgraph writes the given graph to the database reachable
// through neo4jDriver.
//
// It opens one session, issues the two index-creation statements, then writes
// all nodes followed by all channel edges. Nodes are written first because the
// channel statements MATCH on existing node vertices. Progress is printed to
// standard output; individual write failures are logged by the helpers.
//
// A nil graph is logged and skipped instead of being dereferenced.
func WriteGraphToMemgraph(graph *lndclient.Graph, neo4jDriver neo4j.Driver) {
	if graph == nil {
		log.Printf("No graph to write to Memgraph")
		return
	}

	session := neo4jDriver.NewSession(neo4j.SessionConfig{})
	defer session.Close()

	fmt.Println("Writing to Memgraph...")
	createNodeIndex(session)
	createIndexForChannels(session)
	writeNodesToMemgraph(session, graph.Nodes)
	writeChannelsToMemgraph(session, graph.Edges)
	fmt.Println("Finished writing to Memgraph...")
}