Hi,
While writing a basic IPFS-based program in Go, I encountered several bugs that I intend to open bug reports about, but I thought I would post here before preparing the reports, since I have limited Go experience and might be doing something wrong.
So, for the first bug: if I initialize a libp2p instance and bootstrap it with the default Kubo peers, then the program hangs when I try to fetch a block from a specific peer. If I instead close all connections before connecting to the peer in question, the fetch works and I am able to receive data. Here is the program:
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"sync"
"time"
"github.com/ipfs/boxo/bitswap/client"
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/unixfs"
uio "github.com/ipfs/boxo/ipld/unixfs/io"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
// DefaultBootstrapAddresses holds the multiaddrs of the bootstrap peers that
// main dials before it runs any DHT query.
var DefaultBootstrapAddresses = []string{
	"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
	"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
	"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
	"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",

	// mars.i.ipfs.io, reachable over TCP and over QUIC.
	"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
	"/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
}
// ProgressTracker accumulates the number of bytes downloaded so far and
// prints a throttled, single-line progress report to standard output.
type ProgressTracker struct {
	totalSize    uint64        // expected size in bytes; 0 is treated as unknown
	downloaded   int64         // sum of the sizes passed to Update
	startTime    time.Time     // creation time, used to compute the average speed
	mutex        sync.Mutex    // guards downloaded and lastUpdate
	lastUpdate   time.Time     // last time a progress line was printed
	updatePeriod time.Duration // minimum delay between two printed lines
}

// NewProgressTracker returns a tracker for a download expected to be
// totalSize bytes long. The progress line is refreshed at most every 100ms.
func NewProgressTracker(totalSize uint64) *ProgressTracker {
	// Read the clock once so startTime and lastUpdate are the same instant.
	now := time.Now()
	return &ProgressTracker{
		totalSize:    totalSize,
		startTime:    now,
		lastUpdate:   now,
		updatePeriod: 100 * time.Millisecond,
	}
}

// Update records that size more bytes were downloaded and, if at least
// updatePeriod has passed since the previous report, rewrites the progress
// line on standard output.
//
// When the total size is zero the percentage cannot be computed, so the
// number of megabytes received is printed instead of dividing by zero.
func (pt *ProgressTracker) Update(size int64) {
	pt.mutex.Lock()
	defer pt.mutex.Unlock()

	pt.downloaded += size

	now := time.Now()
	if now.Sub(pt.lastUpdate) < pt.updatePeriod {
		// Too soon since the last report; only the counter is updated.
		return
	}
	pt.lastUpdate = now

	const bytesPerMB = 1024 * 1024

	// Average speed since the tracker was created. It stays at zero when no
	// time has elapsed, which avoids printing Inf or NaN.
	var speed float64
	if elapsed := now.Sub(pt.startTime).Seconds(); elapsed > 0 {
		speed = float64(pt.downloaded) / elapsed / bytesPerMB
	}

	if pt.totalSize == 0 {
		fmt.Printf("\rProgress: %.2f MB (%.2f MB/s)", float64(pt.downloaded)/bytesPerMB, speed)
		return
	}

	progress := float64(pt.downloaded) / float64(pt.totalSize) * 100
	fmt.Printf("\rProgress: %.2f%% (%.2f MB/s)", progress, speed)
}
// createLibp2pHost constructs the libp2p host used by this program. The only
// option passed is a listen address: TCP on every IPv4 interface, port 0.
func createLibp2pHost() (host.Host, error) {
	listenOpt := libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
	return libp2p.New(listenOpt)
}
// downloadFile fetches the UnixFS file rooted at targetCid through
// blockService and writes its content to outPath, printing progress to
// standard output while data arrives.
//
// The DAG reader is opened before the output file is created, so a root node
// that cannot be read as a file does not truncate or create outPath. The DAG
// reader is closed on return. The output file is closed explicitly on the
// success path and its Close error is returned, because a failed close can
// mean that written data never reached the disk.
//
// It returns a wrapped error naming the step that failed, or nil once the
// whole file has been written and closed.
func downloadFile(ctx context.Context, blockService blockservice.BlockService, targetCid cid.Cid, outPath string) error {
	fmt.Printf("Setting up dag\n")
	dagService := dag.NewDAGService(blockService)

	fmt.Printf("Fetch the root node\n")
	rootNode, err := dagService.Get(ctx, targetCid)
	if err != nil {
		return fmt.Errorf("failed to get root node: %w", err)
	}

	fmt.Printf("Parse as UnixFS\n")
	fsNode, err := unixfs.ExtractFSNode(rootNode)
	if err != nil {
		return fmt.Errorf("failed to parse UnixFS node: %w", err)
	}

	// Open the reader first: if this fails, outPath is left untouched.
	dagReader, err := uio.NewDagReader(ctx, rootNode, dagService)
	if err != nil {
		return fmt.Errorf("failed to create DAG reader: %w", err)
	}
	defer dagReader.Close()

	file, err := os.Create(outPath)
	if err != nil {
		return fmt.Errorf("failed to create output file: %w", err)
	}
	// Covers the early returns below. After the explicit Close on the success
	// path this second Close only returns an "already closed" error, which is
	// deliberately ignored.
	defer file.Close()

	tracker := NewProgressTracker(fsNode.FileSize())

	fmt.Printf("Reading data\n")
	buf := make([]byte, 1024*1024) // 1MB buffer
	for {
		n, readErr := dagReader.Read(buf)
		// A reader may return data together with an error, so the bytes are
		// written before the error is inspected.
		if n > 0 {
			if _, writeErr := file.Write(buf[:n]); writeErr != nil {
				return fmt.Errorf("failed to write to file: %w", writeErr)
			}
			tracker.Update(int64(n))
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			return fmt.Errorf("failed to read data: %w", readErr)
		}
	}

	// Report success only after the file has been closed without error.
	if err := file.Close(); err != nil {
		return fmt.Errorf("failed to close output file: %w", err)
	}

	fmt.Println("\nDownload completed successfully!")
	return nil
}
func main() {
ctx := context.Background()
// Target peer ID and CID
targetPeerID := "12D3KooWF8fwVzzEsCeznHz2bBUfsQ5FhN5WFMzdkWXghmxNkT4Y"
targetCIDStr := "QmPfJrYhP6X7Eb1uToZkYdTjsMn4AAtfG966Eufw7mSrL8"
// Parse CID
targetCid, err := cid.Decode(targetCIDStr)
if err != nil {
fmt.Printf("Failed to decode CID: %s\n", err)
return
}
// Create libp2p host
host, err := createLibp2pHost()
if err != nil {
fmt.Printf("Failed to create host: %s\n", err)
return
}
defer host.Close()
fmt.Printf("Local peer ID: %s\n", host.ID().String())
dht, err := kaddht.New(ctx, host)
if err != nil {
log.Fatalf("Failed to create DHT: %v", err)
}
defer dht.Close()
// Bootstrap the DHT
fmt.Println("Bootstrapping DHT...")
for _, addr := range DefaultBootstrapAddresses {
multiAddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
log.Printf("Invalid bootstrap address: %s, skipping...\n", addr)
continue
}
addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddr)
if err != nil {
log.Printf("Failed to parse bootstrap address: %s, skipping...\n", addr)
continue
}
err = host.Connect(ctx, *addrInfo)
if err != nil {
log.Printf("Failed to connect to bootstrap node %s: %v\n", addrInfo.ID, err)
} else {
fmt.Printf("Connected to bootstrap node: %s\n", addrInfo.ID)
}
}
fmt.Println("DHT bootstrapped successfully.")
// Parse target peer ID
peerID, err := peer.Decode(targetPeerID)
if err != nil {
fmt.Printf("Failed to decode peer ID: %s\n", err)
return
}
// Search for the peer's address
fmt.Printf("Searching for peer ID: %s\n", peerID)
peerAddrInfo, err := dht.FindPeer(ctx, peerID)
if err != nil {
log.Fatalf("Failed to find peer ID %s: %v", peerID, err)
}
fmt.Printf("Found peer %v\n", peerAddrInfo)
// somehow this is needed for the dag to work and fetch data
// for _, peer := range host.Network().Peers() {
// host.Network().ClosePeer(peer)
// }
fmt.Printf("Connections: %v\n", host.Network().Peers())
fmt.Printf("Setting up bitswap\n")
// Setup Bitswap
ds := dssync.MutexWrap(datastore.NewMapDatastore())
fmt.Printf("Setting up blockstore\n")
bs := blockstore.NewBlockstore(ds)
network := bsnet.NewFromIpfsHost(host, routinghelpers.Null{})
exchange := client.New(ctx, network, bs)
network.Start(exchange)
// Connect to the peer
fmt.Printf("Attempting to connect to peer %s...\n", peerID)
err = host.Connect(ctx, peerAddrInfo)
if err != nil {
log.Fatalf("Failed to connect to peer: %v", err)
}
fmt.Printf("Connected successfully\n")
fmt.Printf("Setting up blockService\n")
// Create BlockService
bserv := blockservice.New(bs, exchange)
fmt.Printf("Connected peers: %v\n", host.Network().Peers())
fmt.Printf("connections: %v\n", host.Network().Conns())
fmt.Printf("Pinging peer")
pingService := ping.NewPingService(host)
ch := pingService.Ping(ctx, peerID)
for i := 0; i < 3; i++ { // Send 3 pings
select {
case res := <-ch:
if res.Error != nil {
fmt.Printf("Ping failed: %v\n", res.Error)
}
fmt.Printf("Ping RTT: %v\n", res.RTT)
case <-time.After(5 * time.Second):
fmt.Printf("Ping timeout\n")
}
}
fmt.Printf("Peer %s is reachable\n", peerID)
fmt.Printf("Downloading\n")
// Download the file
err = downloadFile(ctx, bserv, targetCid, "downloaded_file")
if err != nil {
fmt.Printf("Download failed: %s\n", err)
return
}
}
If I uncomment these lines:
// for _, peer := range host.Network().Peers() {
// host.Network().ClosePeer(peer)
// }
then the fetch works; if I leave them commented out, the program hangs indefinitely at this line:
rootNode, err := dagService.Get(ctx, targetCid)