Unable to get blocks when the node is connected to multiple peers

Hi,
While writing a basic IPFS-based program in Go I encountered several bugs that I intend to file reports about, but I thought I would post here first, since I have limited Go experience and might be doing something wrong.

For the first bug: if I initialize a libp2p host and bootstrap it with the default Kubo peers, the program hangs when I try to fetch a block from a specific peer. If I instead close all connections before connecting to the peer in question, the fetch works and I receive the data.

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"

	"github.com/ipfs/boxo/bitswap/client"
	bsnet "github.com/ipfs/boxo/bitswap/network"
	"github.com/ipfs/boxo/blockservice"
	"github.com/ipfs/boxo/blockstore"
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"

	dag "github.com/ipfs/boxo/ipld/merkledag"
	"github.com/ipfs/boxo/ipld/unixfs"
	uio "github.com/ipfs/boxo/ipld/unixfs/io"
	routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"

	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/libp2p/go-libp2p"
	kaddht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

var DefaultBootstrapAddresses = []string{
	"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
	"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
	"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
	"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
	"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",      // mars.i.ipfs.io
	"/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
}

type ProgressTracker struct {
	totalSize    uint64
	downloaded   int64
	startTime    time.Time
	mutex        sync.Mutex
	lastUpdate   time.Time
	updatePeriod time.Duration
}

func NewProgressTracker(totalSize uint64) *ProgressTracker {
	return &ProgressTracker{
		totalSize:    totalSize,
		startTime:    time.Now(),
		lastUpdate:   time.Now(),
		updatePeriod: time.Millisecond * 100, // Update progress every 100ms
	}
}

func (pt *ProgressTracker) Update(size int64) {
	pt.mutex.Lock()
	defer pt.mutex.Unlock()

	pt.downloaded += size
	now := time.Now()

	// Only update display if enough time has passed
	if now.Sub(pt.lastUpdate) >= pt.updatePeriod {
		elapsed := now.Sub(pt.startTime).Seconds()
		speed := float64(pt.downloaded) / elapsed / 1024 / 1024 // MB/s
		progress := float64(pt.downloaded) / float64(pt.totalSize) * 100

		fmt.Printf("\rProgress: %.2f%% (%.2f MB/s)", progress, speed)
		pt.lastUpdate = now
	}
}

func createLibp2pHost() (host.Host, error) {
	return libp2p.New(
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
	)
}

func downloadFile(ctx context.Context, blockService blockservice.BlockService, targetCid cid.Cid, outPath string) error {
	fmt.Printf("Setting up dag\n")
	// Create DAGService
	dagService := dag.NewDAGService(blockService)
	fmt.Printf("Fetch the root node\n")
	rootNode, err := dagService.Get(ctx, targetCid)
	if err != nil {
		return fmt.Errorf("failed to get root node: %w", err)
	}

	fmt.Printf("Parse as UnixFS\n")

	fsNode, err := unixfs.ExtractFSNode(rootNode)
	if err != nil {
		return fmt.Errorf("failed to parse UnixFS node: %w", err)
	}

	// Create progress tracker
	tracker := NewProgressTracker(fsNode.FileSize())

	// Create output file
	file, err := os.Create(outPath)
	if err != nil {
		return fmt.Errorf("failed to create output file: %w", err)
	}
	defer file.Close()

	// Create a dag reader
	dagReader, err := uio.NewDagReader(ctx, rootNode, dagService)
	if err != nil {
		return fmt.Errorf("failed to create DAG reader: %w", err)
	}
	fmt.Printf("Reading data\n")
	// Copy data while tracking progress
	buf := make([]byte, 1024*1024) // 1MB buffer
	for {
		n, err := dagReader.Read(buf)
		if n > 0 {
			_, writeErr := file.Write(buf[:n])
			if writeErr != nil {
				return fmt.Errorf("failed to write to file: %w", writeErr)
			}
			tracker.Update(int64(n))
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("failed to read data: %w", err)
		}
	}

	fmt.Println("\nDownload completed successfully!")
	return nil
}

func main() {
	ctx := context.Background()

	// Target peer ID and CID
	targetPeerID := "12D3KooWF8fwVzzEsCeznHz2bBUfsQ5FhN5WFMzdkWXghmxNkT4Y"
	targetCIDStr := "QmPfJrYhP6X7Eb1uToZkYdTjsMn4AAtfG966Eufw7mSrL8"

	// Parse CID
	targetCid, err := cid.Decode(targetCIDStr)
	if err != nil {
		fmt.Printf("Failed to decode CID: %s\n", err)
		return
	}

	// Create libp2p host
	host, err := createLibp2pHost()
	if err != nil {
		fmt.Printf("Failed to create host: %s\n", err)
		return
	}
	defer host.Close()

	fmt.Printf("Local peer ID: %s\n", host.ID().String())

	dht, err := kaddht.New(ctx, host)
	if err != nil {
		log.Fatalf("Failed to create DHT: %v", err)
	}
	defer dht.Close()

	// Bootstrap the DHT
	fmt.Println("Bootstrapping DHT...")
	for _, addr := range DefaultBootstrapAddresses {
		multiAddr, err := multiaddr.NewMultiaddr(addr)
		if err != nil {
			log.Printf("Invalid bootstrap address: %s, skipping...\n", addr)
			continue
		}
		addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddr)
		if err != nil {
			log.Printf("Failed to parse bootstrap address: %s, skipping...\n", addr)
			continue
		}
		err = host.Connect(ctx, *addrInfo)
		if err != nil {
			log.Printf("Failed to connect to bootstrap node %s: %v\n", addrInfo.ID, err)
		} else {
			fmt.Printf("Connected to bootstrap node: %s\n", addrInfo.ID)
		}
	}
	fmt.Println("DHT bootstrapped successfully.")

	// Parse target peer ID
	peerID, err := peer.Decode(targetPeerID)
	if err != nil {
		fmt.Printf("Failed to decode peer ID: %s\n", err)
		return
	}
	// Search for the peer's address
	fmt.Printf("Searching for peer ID: %s\n", peerID)
	peerAddrInfo, err := dht.FindPeer(ctx, peerID)
	if err != nil {
		log.Fatalf("Failed to find peer ID %s: %v", peerID, err)
	}
	fmt.Printf("Found peer %v\n", peerAddrInfo)

	// somehow this is needed for the dag to work and fetch data
	// for _, peer := range host.Network().Peers() {
	// 	host.Network().ClosePeer(peer)
	// }

	fmt.Printf("Connections: %v\n", host.Network().Peers())

	fmt.Printf("Setting up bitswap\n")
	// Setup Bitswap
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	fmt.Printf("Setting up blockstore\n")
	bs := blockstore.NewBlockstore(ds)

	network := bsnet.NewFromIpfsHost(host, routinghelpers.Null{})
	exchange := client.New(ctx, network, bs)
	network.Start(exchange)

	// Connect to the peer
	fmt.Printf("Attempting to connect to peer %s...\n", peerID)
	err = host.Connect(ctx, peerAddrInfo)
	if err != nil {
		log.Fatalf("Failed to connect to peer: %v", err)
	}

	fmt.Printf("Connected successfully\n")

	fmt.Printf("Setting up blockService\n")
	// Create BlockService
	bserv := blockservice.New(bs, exchange)
	fmt.Printf("Connected peers: %v\n", host.Network().Peers())

	fmt.Printf("connections:  %v\n", host.Network().Conns())

	fmt.Printf("Pinging peer")

	pingService := ping.NewPingService(host)
	ch := pingService.Ping(ctx, peerID)

	for i := 0; i < 3; i++ { // Send 3 pings
		select {
		case res := <-ch:
			if res.Error != nil {
				fmt.Printf("Ping failed: %v\n", res.Error)
			}
			fmt.Printf("Ping RTT: %v\n", res.RTT)
		case <-time.After(5 * time.Second):
			fmt.Printf("Ping timeout\n")
		}
	}

	fmt.Printf("Peer %s is reachable\n", peerID)
	fmt.Printf("Downloading\n")
	// Download the file
	err = downloadFile(ctx, bserv, targetCid, "downloaded_file")
	if err != nil {
		fmt.Printf("Download failed: %s\n", err)
		return
	}
}

If I uncomment these lines:

// for _, peer := range host.Network().Peers() {
// 	host.Network().ClosePeer(peer)
// }

then the fetching works; if I leave them commented, the program hangs indefinitely at this line:

rootNode, err := dagService.Get(ctx, targetCid)

If you move the bootstrapping logic to after you’ve started the bitswap client, things should also work. IIRC the boxo/bitswap client currently has some difficulty recognizing connections that were opened before the service is started. This is why closing the connections helps: they get reopened after the bitswap service is running.
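
For example, an untested sketch of that reordering, reusing the variables and imports from main() above (not a complete program on its own):

ds := dssync.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewBlockstore(ds)
network := bsnet.NewFromIpfsHost(host, routinghelpers.Null{})
exchange := client.New(ctx, network, bs)
network.Start(exchange) // start the bitswap client first

// Only now run the bootstrap loop, so the connections to the bootstrap
// peers are opened while bitswap is already running and can see them.
for _, addr := range DefaultBootstrapAddresses {
	multiAddr, err := multiaddr.NewMultiaddr(addr)
	if err != nil {
		continue
	}
	addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddr)
	if err != nil {
		continue
	}
	if err := host.Connect(ctx, *addrInfo); err != nil {
		log.Printf("Failed to connect to bootstrap node %s: %v\n", addrInfo.ID, err)
	}
}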


This was the other bug I found, but I hit it when connecting to the target peer before starting bitswap; for some reason it didn’t cross my mind that it’s the same issue with the bootstrap nodes.

Sorry, I wasn’t previously reading quite as closely. It’s not really about the bootstrapping, but about the fact that you’re connecting to the target peer before starting up the bitswap client. Really this should be fixed in the client, but if you start your services before using them, this problem should go away (e.g. instantiate the DHT, Bitswap, etc., and open connections afterwards). While it’s unlikely that connecting to the bootstrappers early would cause you issues, there is theoretically a case where the peer hosting your data is also a DHT server and you happen to connect to it while the DHT is starting up, before Bitswap does (although that’s unlikely unless you’re in a test environment where latencies are really small).

Note: dht.FindPeer will connect to the peer being discovered if possible. It’s not strictly required by the interface, but it will do it. Which is why, as long as the bitswap client has this issue, you’d want to run it after the client has started.
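
Putting this together, here’s an untested sketch of the “services first” ordering, assuming the same imports as your program and reusing its createLibp2pHost and downloadFile helpers (runDownload is just a hypothetical name):

func runDownload(ctx context.Context, targetPeerID, targetCIDStr string) error {
	targetCid, err := cid.Decode(targetCIDStr)
	if err != nil {
		return err
	}
	peerID, err := peer.Decode(targetPeerID)
	if err != nil {
		return err
	}

	host, err := createLibp2pHost()
	if err != nil {
		return err
	}
	defer host.Close()

	// Instantiate the DHT and the bitswap client before dialing anyone.
	dht, err := kaddht.New(ctx, host)
	if err != nil {
		return err
	}
	defer dht.Close()

	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	bs := blockstore.NewBlockstore(ds)
	network := bsnet.NewFromIpfsHost(host, routinghelpers.Null{})
	exchange := client.New(ctx, network, bs)
	network.Start(exchange)
	defer exchange.Close()

	// Only now open connections: bootstrap first...
	for _, addr := range DefaultBootstrapAddresses {
		multiAddr, err := multiaddr.NewMultiaddr(addr)
		if err != nil {
			continue
		}
		addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddr)
		if err != nil {
			continue
		}
		_ = host.Connect(ctx, *addrInfo)
	}

	// ...then find the target peer. FindPeer may already dial it, which is
	// fine because the bitswap client is running by now.
	peerAddrInfo, err := dht.FindPeer(ctx, peerID)
	if err != nil {
		return err
	}
	if err := host.Connect(ctx, peerAddrInfo); err != nil {
		return err
	}

	bserv := blockservice.New(bs, exchange)
	return downloadFile(ctx, bserv, targetCid, "downloaded_file")
}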
