
go-libp2p-pubsub's People

Contributors

aarshkshah1992, aschmahmann, daviddias, dependabot-preview[bot], dirkmc, film42, fxfactorial, hsanjuan, iand, jamesray1, keks, kevina, kubuxu, lthibault, lukasz-zimnoch, marcopolo, michaelmure, nisdas, prestonvanloon, protolambda, rargulati, raulk, richard-ramos, stebalien, vyzo, web-flow, whyrusleeping, wondertan, yhassanzadeh13, yusefnapora

go-libp2p-pubsub's Issues

Signing on some topics

Is it possible to rework message signing so that it can be turned on or off per topic rather than for the entire pubsub instance?
The reason behind this question is that some topics do not need signing, and the process of signing and validating induces unwanted overhead. As a practical example, one might implement a topic for broadcasting blockchain transactions that are already known to be tamper-proof, where the originating peer simply doesn't matter.
Of course, there is a complication: the msgID function would need to compute the message ID in a different way (maybe a hash of the payload when the message is not signed?).
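
To make the question concrete, here is a minimal sketch of what a per-topic policy with a content-hash message ID could look like. Everything in it is an assumption for illustration: the Message struct, the signedTopics map, and this msgID variant are stand-ins written for this example, not the library's types or API.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// Message is a simplified, hypothetical stand-in for a pubsub message.
type Message struct {
	From  string
	Seqno uint64
	Topic string
	Data  []byte
}

// signedTopics is a hypothetical per-topic policy: true means messages on
// that topic must be signed, so the author is meaningful.
var signedTopics = map[string]bool{
	"blocks": true,
	"txs":    false,
}

// msgID uses (From, Seqno) on signed topics and a hash of the payload on
// unsigned topics, where the author carries no weight.
func msgID(m Message) string {
	if signedTopics[m.Topic] {
		return fmt.Sprintf("%s/%d", m.From, m.Seqno)
	}
	sum := sha256.Sum256(m.Data)
	return hex.EncodeToString(sum[:])
}

func main() {
	a := Message{From: "peerA", Seqno: 1, Topic: "txs", Data: []byte("tx-1")}
	b := Message{From: "peerB", Seqno: 9, Topic: "txs", Data: []byte("tx-1")}
	// Same payload on an unsigned topic yields the same ID, whoever sent it.
	fmt.Println(msgID(a) == msgID(b))
}
```

One consequence of this design is that identical payloads from different peers are deduplicated on unsigned topics, which seems to be what the transaction use case wants.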

Global Validators

TODO in #55.

Instead of registering per-topic validators as we do in #55, validators should decide whether or not they care about a topic when we first subscribe to it. E.g.,

type Validator interface {
  Validates(topic string) (bool, error)
  // ...
}
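
A rough sketch of how that selection at subscribe time might work, assuming a Validate method in place of the elided "// ..." part. The registry and prefixValidator types are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// Validator mirrors the interface proposed above. Validate is a hypothetical
// stand-in for the methods elided by "// ...".
type Validator interface {
	Validates(topic string) (bool, error)
	Validate(data []byte) bool
}

// prefixValidator (illustrative) only cares about topics with a given prefix
// and rejects payloads longer than maxLen.
type prefixValidator struct {
	prefix string
	maxLen int
}

func (v prefixValidator) Validates(topic string) (bool, error) {
	return strings.HasPrefix(topic, v.prefix), nil
}

func (v prefixValidator) Validate(data []byte) bool { return len(data) <= v.maxLen }

// registry holds the global validators and the per-topic selection that is
// computed once, when we first subscribe.
type registry struct {
	global  []Validator
	byTopic map[string][]Validator
}

// Subscribe asks every global validator whether it cares about the topic.
func (r *registry) Subscribe(topic string) error {
	var selected []Validator
	for _, v := range r.global {
		ok, err := v.Validates(topic)
		if err != nil {
			return err
		}
		if ok {
			selected = append(selected, v)
		}
	}
	r.byTopic[topic] = selected
	return nil
}

// Accept runs only the validators that claimed the topic.
func (r *registry) Accept(topic string, data []byte) bool {
	for _, v := range r.byTopic[topic] {
		if !v.Validate(data) {
			return false
		}
	}
	return true
}

func main() {
	r := &registry{
		global:  []Validator{prefixValidator{prefix: "chat/", maxLen: 8}},
		byTopic: make(map[string][]Validator),
	}
	for _, topic := range []string{"chat/general", "metrics"} {
		if err := r.Subscribe(topic); err != nil {
			fmt.Println("subscribe failed:", err)
			return
		}
	}
	fmt.Println(r.Accept("chat/general", []byte("this is too long"))) // validator claimed the topic and rejects
	fmt.Println(r.Accept("metrics", []byte("this is too long")))      // no validator claimed this topic
}
```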

strange issue with ED25519 keys

I have a test failing in my code (below) and I think it shouldn't. Perhaps I'm missing something though - was wondering if this is a bug, or I have incorrect assumptions somewhere:

func TestPubKey(t *testing.T) {
	priv, _, _ := crypto.GenerateEd25519Key(mrand.New(mrand.NewSource(0)))
	id1, _ := peer.IDFromEd25519PublicKey(priv.GetPublic())
	id2, _ := peer.IDFromPrivateKey(priv)
	assert.Equal(t, id1.Pretty(), id2.Pretty(), "should be equal?") // fails...
}

p.s. I found that IDFromPublicKey works instead of IDFromEd25519PublicKey but the difference still seems strange...

Optimize Validator Pipeline

Our validation pipeline is asynchronous: it spawns a goroutine for each incoming message, which may then get throttled based on topic and global limit throttles.
This creates a performance problem for message signature validation (#97) as we now have to enter the validation pipeline for every message when using signing (planned to become the default in ipfs and filecoin).

Proposed action:

  • pre-spawn NUMCPU goroutines to handle the validation front-end.
  • all signatures should be validated in the validation front-end goroutines
  • create a distinction between synchronous and asynchronous validators (which may block and be otherwise slow due to network effects), such that:
    • synchronous validators are applied in the validation front-end
    • asynchronous validators are handled in a freshly spawned goroutine, limited by the throttles.
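
The proposed action could be sketched roughly as below. This is only an illustration under stated assumptions: Pipeline, SyncValidator, AsyncValidator, and all method names are made up for the example, signature checking is represented by an ordinary synchronous validator, and the output channel is simply buffered.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// Message is a simplified, hypothetical message type.
type Message struct{ Data string }

// SyncValidator must be fast and must not block (e.g. a signature check).
type SyncValidator func(Message) bool

// AsyncValidator may block, for instance on network I/O.
type AsyncValidator func(Message) bool

type Pipeline struct {
	in        chan Message
	out       chan Message
	syncVals  []SyncValidator
	asyncVals []AsyncValidator
	throttle  chan struct{} // limits concurrently running async validations
	wg        sync.WaitGroup
}

// NewPipeline pre-spawns runtime.NumCPU() front-end goroutines.
func NewPipeline(sv []SyncValidator, av []AsyncValidator, maxAsync, outBuf int) *Pipeline {
	p := &Pipeline{
		in:        make(chan Message),
		out:       make(chan Message, outBuf),
		syncVals:  sv,
		asyncVals: av,
		throttle:  make(chan struct{}, maxAsync),
	}
	n := runtime.NumCPU()
	p.wg.Add(n)
	for i := 0; i < n; i++ {
		go p.frontEnd()
	}
	return p
}

// frontEnd applies synchronous validators inline and hands the rest to a
// freshly spawned goroutine, dropping the message if the throttle is full.
func (p *Pipeline) frontEnd() {
	defer p.wg.Done()
	for m := range p.in {
		if !p.passesSync(m) {
			continue
		}
		if len(p.asyncVals) == 0 {
			p.out <- m
			continue
		}
		select {
		case p.throttle <- struct{}{}:
			p.wg.Add(1)
			go p.runAsync(m)
		default:
			// Throttled: drop the message.
		}
	}
}

func (p *Pipeline) passesSync(m Message) bool {
	for _, v := range p.syncVals {
		if !v(m) {
			return false
		}
	}
	return true
}

func (p *Pipeline) runAsync(m Message) {
	defer p.wg.Done()
	defer func() { <-p.throttle }()
	for _, v := range p.asyncVals {
		if !v(m) {
			return
		}
	}
	p.out <- m
}

func (p *Pipeline) Submit(m Message)    { p.in <- m }
func (p *Pipeline) Out() <-chan Message { return p.out }

// Close stops the front-end, waits for in-flight validations, closes Out.
func (p *Pipeline) Close() {
	close(p.in)
	p.wg.Wait()
	close(p.out)
}

func main() {
	nonEmpty := func(m Message) bool { return m.Data != "" }
	slowCheck := func(m Message) bool { return len(m.Data) < 10 }
	p := NewPipeline([]SyncValidator{nonEmpty}, []AsyncValidator{slowCheck}, 16, 16)
	for _, d := range []string{"a", "", "b"} {
		p.Submit(Message{Data: d})
	}
	p.Close()
	n := 0
	for range p.Out() {
		n++
	}
	fmt.Println("delivered:", n)
}
```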

Handle disconnected peer

Hello

I have the following use case:
Peer A connects to peer B. Peer B does not wish to retain the connection to peer A and closes it (calling net.Conn.Close() on peer A's new connection). After a while, peer A tries to connect to peer B again and the following message appears on peer B:
09:54:07.886 ERROR pubsub: already have connection to peer: <peer.ID 16*E91Gqs> pubsub.go:205

Searching through the source code, I found that the type PubSubNotif only handles the Connected event. In the use case above I used floodsub as a means to disseminate data. FloodSub uses PubSub, which has a channel called peerDead that is never used.

I have also tried using GossipSub with the same error being displayed.

Am I missing something? Big thanks!

Disconnect slow peers

Instead of buffering without bound by using goroutines, we should disconnect slow peers (peers that develop a large outbound queue).

Panic on ARM

Hi,

a user of cluster reports a panic coming from pubsub on a BeagleBone:

ipfs-cluster/ipfs-cluster#433

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x4 pc=0x11540]

goroutine 46 [running]:
sync/atomic.addUint64(0x131c005c, 0x1, 0x0, 0x131c9150, 0x8)
	/usr/lib64/go/1.9/src/sync/atomic/64bit_arm.go:31 +0x4c
gx/ipfs/QmduQtUFqdq2RhM84yM2mYsdgJRAH8Aukb15viDxWkZtvP/go-libp2p-floodsub.(*PubSub).Publish(0x131c0000, 0x716b54, 0xf, 0x13124630, 0x5d, 0xa2, 0x0, 0x1e)
	gx/ipfs/QmduQtUFqdq2RhM84yM2mYsdgJRAH8Aukb15viDxWkZtvP/go-libp2p-floodsub/pubsub.go:556 +0x64
github.com/ipfs/ipfs-cluster/monitor/pubsubmon.(*Monitor).PublishMetric(0x12d5caa0, 0x70dc03, 0x4, 0x12d16c60, 0x22, 0x0, 0x0, 0xa5f1d075, 0x1531f22e, 0x1, ...)
	github.com/ipfs/ipfs-cluster/monitor/pubsubmon/pubsubmon.go:193 +0x39c
github.com/ipfs/ipfs-cluster.(*Cluster).pushPingMetrics(0x12d64320)
	github.com/ipfs/ipfs-cluster/cluster.go:232 +0xc0
created by github.com/ipfs/ipfs-cluster.(*Cluster).run
	github.com/ipfs/ipfs-cluster/cluster.go:340 +0x48

Might this come from the warning here: https://golang.org/pkg/sync/atomic/#pkg-note-BUG ? @vyzo

I'll be testing with my own RPis next week, maybe I can provide more info.
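
For what it's worth, the first argument to addUint64 in the trace (0x131c005c) is not a multiple of 8, which fits the alignment caveat in that sync/atomic note: on 32-bit ARM the caller has to keep 64-bit words 64-bit aligned, and only the first word of an allocated struct is guaranteed to be. A minimal sketch of the usual workaround, using a hypothetical struct (not the real PubSub layout):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// PubSub here is a hypothetical struct; only the field order matters.
// Keeping the 64-bit counter as the first field guarantees 64-bit alignment
// on 32-bit platforms (ARM, 386), per the sync/atomic documentation.
type PubSub struct {
	counter uint64 // accessed atomically; must stay the first field
	name    string
}

// nextSeqno atomically increments the counter and returns the new value.
func (p *PubSub) nextSeqno() uint64 {
	return atomic.AddUint64(&p.counter, 1)
}

func main() {
	p := &PubSub{name: "demo"}
	fmt.Println(p.nextSeqno(), p.nextSeqno()) // prints "1 2"
}
```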

Reliable and/or Persistent Pubsub

Reliable Pubsub

Sometimes, messages get lost. To allow the receiver to recover lost messages, we could start systematically referencing previous messages when constructing them. That way, when the receiver receives the next message, it will find an unknown message reference and can ask its peers about it (or possibly fetch it like a regular ipfs/ipld object?)

The system I propose is the one used at OrbitDB. Every peer maintains a list of messages that have not been referenced yet, the DAG leaves so to speak. Once a new message is received, all message IDs referenced by the new message are removed from the list, and the new message's ID is added.
When we author a message, we add a field "after": [...] containing the leaf list. Afterwards, our leaf list contains only the ID of the message we just authored.
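
The leaf-list bookkeeping described above can be sketched as follows. The Log and Message types are hypothetical and written only for this example; the extra "referenced" set is an assumption added so that a message arriving after something that already references it does not reappear as a leaf.

```go
package main

import (
	"fmt"
	"sort"
)

// Message is a simplified, hypothetical message carrying an "after" list.
type Message struct {
	ID    string
	After []string
	Data  string
}

// Log tracks which messages are still unreferenced (the DAG leaves).
type Log struct {
	leaves     map[string]bool
	seen       map[string]bool
	referenced map[string]bool
}

func NewLog() *Log {
	return &Log{
		leaves:     make(map[string]bool),
		seen:       make(map[string]bool),
		referenced: make(map[string]bool),
	}
}

// Receive updates the leaf list and returns the referenced IDs we have not
// seen yet, i.e. the messages we would have to ask our peers about.
func (l *Log) Receive(m Message) []string {
	var missing []string
	for _, ref := range m.After {
		if !l.seen[ref] {
			missing = append(missing, ref)
		}
		l.referenced[ref] = true
		delete(l.leaves, ref)
	}
	l.seen[m.ID] = true
	if !l.referenced[m.ID] {
		l.leaves[m.ID] = true
	}
	return missing
}

// Author builds a new message whose "after" field is the current leaf list;
// afterwards the leaf list contains only the new message's ID.
func (l *Log) Author(id, data string) Message {
	after := make([]string, 0, len(l.leaves))
	for leaf := range l.leaves {
		after = append(after, leaf)
	}
	sort.Strings(after) // deterministic order for the example
	m := Message{ID: id, After: after, Data: data}
	l.Receive(m)
	return m
}

func main() {
	l := NewLog()
	l.Receive(Message{ID: "a"})
	l.Receive(Message{ID: "b"})
	m := l.Author("c", "hello")
	fmt.Println(m.After) // prints "[a b]"

	// A message referencing something we never received reveals the gap.
	missing := l.Receive(Message{ID: "e", After: []string{"d"}})
	fmt.Println(missing) // prints "[d]"
}
```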

Persistent Pubsub

If we have something like the above, nodes already keep state about the "newest" messages. If we send these to peers that send us a subscription request for that topic, they can start fetching old messages. How many they fetch would depend on the application: in Orbit it would suffice to fetch one screen's worth of backlog, but in other applications you may want to fetch the entire history.
This solves the problem described in #19.

Note: I want to make this optional. Whether or not this is used depends on the topic configuration.

Validation requires parsing messages twice

I've been exploring the recently added Validator functionality, which is very nice - however there is one (probably unintended) aspect of it that I found interesting. If I need to validate the contents of a message - say it's protobuf encoded - then I need to parse it once when validating, and potentially another time, after actually receiving the ([]byte) data through subscription.Next(). I don't think it's a major issue, and there might not be a clean way to solve it.

Add Associated Data

I propose adding something like associated data. It is kept up-to-date using application-specific logic and sent to peers that subscribe. This could be a string that describes what the topic is about or anything else. Basically an interface{} value.

Motivation

In Orbit we face the problem that a peer kind of sits in the dark after subscribing to a topic. It doesn't have access to history until someone who has witnessed previous messages publishes a new message. That message will contain references to one or more of the most recently seen messages. The new peer can follow these references and reconstruct history. However, it needs someone to post something before it can do that.

Instead it would be nice to get that information at subscribe time. That is why I would like to see an AD field that can be set by the application that subscribes to this.

I talked to @haadcode about this and he likes the idea.

Open Questions

How will this play with signing/encryption?

Not sure. I don't think I really got the auth part yet. Is there a hard-ish spec somewhere for encrypted/auth'd topics?
Anyway, I suspect if we do things the right way, the worst a byzantine node could do is to withhold updates to the AD field, which I would rate equal to not forwarding messages, which can still happen. So it shouldn't be a big problem.

Question on floodsub and gossipsub compatibility

Sorry in advance for bringing this up as an issue, as it's probably a better candidate for a wiki. I have a question: is it intended that the floodsub and gossipsub protocols should be compatible? I.e., if you develop an application that uses one, and you substitute the other for half the nodes, will the network still operate? I understand that there are practical considerations about traffic volumes, etc. However, my point is to ask whether the messages are compatible, such that a message sent by a node using floodsub and received by a node using gossipsub will be propagated by the second node, and whether that message can be propagated further. Is this a design goal of the system?

Add 'answer' message type

I think it would be really valuable to add an 'answer' type message.

Imagine a pubsub swarm where someone publishes a message asking for a certain value. That message gets propagated to the entire swarm as usual. Now if a peer wants to respond to that question, they essentially have to broadcast their answer to the entire swarm. Instead, I propose they send an 'answer' message, referencing the question message, back to whatever peer they received the question message from. The peer they responded to should check whether it was the asker and, if not, continue sending the answer back until it reaches its target. Since we already keep a cache of recent messages to prevent cyclic propagation, this shouldn't require too much (if any) extra overhead.

Usecases that this would be great for:

  • Orbit, new peers join, ask for the latest head, other peers answer.
  • IPNS queries through pubsub
  • Latency updates for self adjusting pubsub swarms (once we move away from flooding)
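
A toy, in-memory sketch of the reverse-path idea, assuming the seen-message cache also remembers which neighbour first delivered each question. The Node type and its methods are hypothetical; real peers would exchange these messages over streams rather than direct method calls.

```go
package main

import "fmt"

// Node is a hypothetical in-memory peer used only to illustrate routing.
type Node struct {
	name  string
	peers map[string]*Node
	// seenFrom maps a question ID to the neighbour that first delivered it.
	// The empty string marks questions this node asked itself.
	seenFrom map[string]string
	answers  []string
}

func NewNode(name string) *Node {
	return &Node{
		name:     name,
		peers:    make(map[string]*Node),
		seenFrom: make(map[string]string),
	}
}

// connect links two nodes in both directions.
func connect(a, b *Node) {
	a.peers[b.name] = b
	b.peers[a.name] = a
}

// Ask floods a question to all neighbours, as pubsub does today.
func (n *Node) Ask(id string) {
	n.seenFrom[id] = ""
	for _, p := range n.peers {
		p.onQuestion(id, n.name)
	}
}

// onQuestion records where the question came from and keeps flooding.
func (n *Node) onQuestion(id, from string) {
	if _, seen := n.seenFrom[id]; seen {
		return // already seen: this is the existing cycle protection
	}
	n.seenFrom[id] = from
	for name, p := range n.peers {
		if name != from {
			p.onQuestion(id, n.name)
		}
	}
}

// Answer sends a reply back along the path the question arrived on.
func (n *Node) Answer(questionID, payload string) {
	n.onAnswer(questionID, payload)
}

func (n *Node) onAnswer(questionID, payload string) {
	from, seen := n.seenFrom[questionID]
	if !seen {
		return // question no longer in the cache: the answer is dropped
	}
	if from == "" {
		n.answers = append(n.answers, payload) // we are the asker
		return
	}
	n.peers[from].onAnswer(questionID, payload)
}

func main() {
	a, b, c := NewNode("A"), NewNode("B"), NewNode("C")
	connect(a, b)
	connect(b, c)
	a.Ask("q1")
	c.Answer("q1", "42")
	fmt.Println(a.answers, b.answers) // prints "[42] []"
}
```

The answer only touches peers on the reverse path, so B forwards it but does not deliver it locally.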

Usage request

So I'd like to use this library to create a p2p pubsub capability, embedded in an app I'm writing. ATM, the price of admission is that I have to figure out how to

  • Create a capable floodsub instance (via underlying *Host)
  • connect this instance to others who are interested in subscribing (via underlying *Host)
  • handling various people joining, leaving, becoming stale
  • propagating messages to those I know who are interested
  • making sure the actual messages get to where they should :)

ATM I love the modularity that libp2p provides compared to *ipfs.IpfsNode, but I'd like to learn how to use it. I'd like an example that shows how to create a *floodsub.PubSub instance that can publish some []bytes and interested nodes could join, leave, etc.

My issue here is that floodsub seems to require a ton of other imports to use, and I can't embed a *floodsub.PubSub without also embedding a host, a PeerStore, and maybe a []*floodsub.Subscription. Doing all this is doubtless no problem, if only I knew what to do, or had some examples...

`dial attempt failed: EOF` when `go test` in macOS

I run the command in both macOS High Sierra(10.13.6) and ubuntu 16.04 and get this issue only in my macOS. I removed the entire $GOPATH/src, and performed go get again. This issue is still there.

$ gx-go rw
$ go test -v
...
=== RUN   TestSparseGossipsub
--- PASS: TestSparseGossipsub (3.00s)
=== RUN   TestDenseGossipsub
--- FAIL: TestDenseGossipsub (0.91s)
	floodsub_test.go:54: dial attempt failed: <peer.ID eQ7kYF> --> <peer.ID SwqzNr> dial attempt failed: EOF
=== RUN   TestGossipsubFanout
--- FAIL: TestGossipsubFanout (1.01s)
	floodsub_test.go:54: dial attempt failed: <peer.ID dMgtPZ> --> <peer.ID SCdsq4> dial attempt failed: EOF
=== RUN   TestGossipsubFanoutMaintenance
--- FAIL: TestGossipsubFanoutMaintenance (0.95s)
	floodsub_test.go:54: dial attempt failed: <peer.ID f8X5mz> --> <peer.ID edj6pD> dial attempt failed: EOF
=== RUN   TestGossipsubFanoutExpiry
--- PASS: TestGossipsubFanoutExpiry (4.53s)
=== RUN   TestGossipsubGossip
--- FAIL: TestGossipsubGossip (1.02s)
	floodsub_test.go:54: dial attempt failed: <peer.ID Y2LATb> --> <peer.ID cmZNWD> dial attempt failed: EOF
=== RUN   TestGossipsubGossipPiggyback
--- FAIL: TestGossipsubGossipPiggyback (1.06s)
	floodsub_test.go:54: dial attempt failed: <peer.ID TixfJG> --> <peer.ID QkahvD> dial attempt failed: EOF
...

After debugging for a while, I still have no idea what causes this issue. I would really appreciate any pointers on how to solve it.

Handle multiple connections better.

We can end up with multiple connections to the same peer so we should handle them better. Currently, if two peers connect to each other at the same time, they can end up not communicating.

Slow subscriptions can block the event loop

The event loop dispatches messages to subscriptions in notifySubs, which puts the new message to the subscription's channel.
This channel is buffered for 32 messages, and then the put will block -- which blocks the entire event loop.

Still not sure if this is a bug or a feature, but I don't like slow subscribers blocking the router. Sadly, there is no good solution to the problem, other than dropping messages on overflow just like we do with our output channels.
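
As a sketch of the drop-on-overflow option: the Subscription type and notifySubs function below are simplified stand-ins written for this example (messages are plain strings, and the dropped counter is an assumption added to make the effect visible). Only the buffer size of 32 comes from the description above.

```go
package main

import "fmt"

// Subscription is a simplified, hypothetical stand-in.
type Subscription struct {
	ch      chan string
	dropped int
}

func newSubscription(buf int) *Subscription {
	return &Subscription{ch: make(chan string, buf)}
}

// notifySubs delivers msg to every subscription without ever blocking the
// caller: if a subscription's buffer is full, the message is dropped for
// that subscription only.
func notifySubs(subs []*Subscription, msg string) {
	for _, s := range subs {
		select {
		case s.ch <- msg:
		default:
			s.dropped++ // slow subscriber: drop instead of blocking
		}
	}
}

func main() {
	slow := newSubscription(32) // nobody reads from this one
	fast := newSubscription(64)
	subs := []*Subscription{slow, fast}
	for i := 0; i < 40; i++ {
		notifySubs(subs, fmt.Sprintf("msg-%d", i))
	}
	fmt.Println(len(slow.ch), slow.dropped, len(fast.ch)) // prints "32 8 40"
}
```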

@Stebalien @whyrusleeping thoughts on the issue?

Throttle validation of incoming messages

When #45 is in, we'll need to find a way to throttle parallel validator executions, so if they take a long time we don't run out of memory.

I plan on doing this by passing them all a buffered chan struct{} that they write to before they are started and read from when they finish. The question is: should we just block the event loop, or should we drop a message when the maximum number of validators is running? I think we already drop messages in a similar situation, so I'd propose going the same way here.

This leads us to the next problem, which is that validators that block very long need to be cancelled because otherwise they'll start piling up until either the event loop blocks or we drop all packets. So, how large should the timeout be? 200ms?

ping @vyzo @whyrusleeping

Impersonating while sending a message

Hello, I have a question. What happens if a (malicious) node starts sending a lot of messages and sets the From field to another node's ID (let's call it node_victim)? When node_victim starts to send its own messages, they will get discarded, as the func seenMessage will return true on all other nodes. The malicious node also needs to know the victim's current sequence number, but that is trivial to find (get its last message and increment that sequence number).
Since I have opened this issue, I also want to ask if there is a way to make Message an interface rather than a struct. That would give the code the ability to introduce new features like message signing (and possibly an easier way to circumvent the above-mentioned problem).

Dialed wrong peer in test

TestGossipsubFanoutMaintenance returned dial attempt failed: <peer.ID aAAbZQ> --> <peer.ID W49xHM> dial attempt failed: connected to wrong peer. That's really weird.

Floodsub - Denial of Service

Given that floodsub isn't widely deployed, I believe it's best to discuss this in the open (more input).

Currently, there are several protocol-level DoS attack vectors against floodsub that we'll need to address before relying on it more widely (note: none of these require botnets or even high-bandwidth internet connections).

Message Silencing

Currently, every message is identified by an (author, seqno) and peers drop all messages with IDs that they've already seen. This means that an attacker could silence messages by sending messages with colliding IDs. Two possible fixes are: (1) having authors sign all messages or (2) identifying messages by a hash of their content.

Traffic Amplification

Currently, an attacker that knows of some well-connected peers subscribed to a target topic could rapidly send large messages to individual peers. Each peer will then rebroadcast these messages to all of its peers, thus dramatically amplifying the traffic. Worse, peers can't simply blacklist the attacker because the attacker isn't sending a high volume of traffic to any individual peer. The core problem here is that peers are forwarding traffic from untrusted sources: topic authentication can't be optional.

Traffic Replay

Assuming we had some way to authenticate messages on topics, attackers could still perform the traffic amplification attack by replaying messages older than 30s (the current timeout on our message deduplication cache). That is, they could just store enough old messages such that they never have to repeat a message within a 30s window. Unfortunately, simply removing this timeout would lead to memory exhaustion, so that's not an option. As far as I can tell, there are a few solutions (not necessarily exclusive):

  1. Provide some way to deduplicate messages on a topic using a constant-sized data structure. For example, we could give all messages a total order and store the latest message.
  2. Limit the topic to a trusted set of peers. That is, on any given topic, only forward messages received directly from a peer (not forwarded) in the set of peers allowed to broadcast on that topic.
  3. Make messages expire after 30s (the deduplication timeout).

Design doc for floodsub?

There are threads discussing this repo on discuss.ipfs.io, and I feel a little confused so far. Would it be possible to have a design doc in the repo to clarify the current design and the future direction?

Thanks!

Error creating star network with Floodsub

Hi,

I am trying to create a star network with floodsub. The code is below.
I create 3 nodes: A, B, and C (actually I want 6 or 7, but it fails at three).
I started A, then B by giving it the seed address of A, and then C by also giving it the seed address of A.
The two nodes work fine (A<-->B), but as soon as I start C, every node stops receiving messages on the subscribed topic.

Is there a particular way to start the network?

Best Regards




package main

import (
	"context"

	"crypto/rand"
	"encoding/json"
	"flag"
	"fmt"
	"strconv"
	"io"
	"io/ioutil"
	"log"
	mrand "math/rand"
	"os"
	"strings"
	"time"
	floodsub "gx/ipfs/QmeRgVyVFj3bpgF9vrLepT3AMA8pU5EN8ykbspTo6kcjoz/go-libp2p-floodsub"
	libp2p "github.com/libp2p/go-libp2p"
	ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
	host "gx/ipfs/QmaSfSMvc1VPZ8JbMponFs4WHvF9FgEruF56opm5E1RgQA/go-libp2p-host"
	peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
	pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
	crypto "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"

)

var myipfsID string
var MainNode=false

type LogWriter struct{ io.Writer }
func (w *LogWriter) EnableLogs()  { w.Writer = os.Stdout }
func (w *LogWriter) DisableLogs() { w.Writer = ioutil.Discard }

//For  DAG network
type DagNet struct {
	ps    *floodsub.PubSub
	host host.Host
	ctx context.Context
	subs  []*floodsub.Subscription
}


var TheDagNet DagNet

func makeDAGBlockHost(floodPort int, secio bool, randseed int64) (host.Host, error) {
	file, err := os.Open("testconfig.json")
	var priv crypto.PrivKey

	loadNew := true
	if err != nil {
		loadNew = true
		fmt.Println("Creating New Keys")
	}else{
		loadNew = false
		type Configuration struct {
	    fh  string
			bh  string
			key crypto.PrivKey
		}
		var configuration Configuration
		decoder := json.NewDecoder(file)
		err = decoder.Decode(&configuration)
		if err != nil {
			loadNew = true
			fmt.Println("Creating New Keys")
			//os.Exit(1)
		}
		priv = (configuration.key)


	}

	if loadNew{
		var r io.Reader
		if randseed == 0 {
			r = rand.Reader
		} else {
			r = mrand.New(mrand.NewSource(randseed))
		}

		// Generate a key pair for this host. We will use it
		// to obtain a valid host ID.
		priv, _, err = crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
		if err != nil {
			return nil, err
		}
	}


	// If the seed is zero, use real cryptographic randomness. Otherwise, use a
	// deterministic randomness source to make generated keys stay the same
	// across multiple runs
	opts := []libp2p.Option{
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", floodPort)),
		libp2p.Identity(priv),
	}

	if !secio {
		opts = append(opts, libp2p.NoEncryption())
	}

	/******Graph Host*********/
	floodHost, err := libp2p.New(context.Background(), opts...)

	if err != nil {
		return  nil, err
	}

	// Build floodhost multiaddress
	ghostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", floodHost.ID().Pretty() ))
	myipfsID = floodHost.ID().Pretty()
	gaddr := floodHost.Addrs()[0]
	gfullAddr := gaddr.Encapsulate(ghostAddr)
	////////////////////////////////////////////////////////////////




	if loadNew{
		privb,_ :=  priv.Bytes()
		mapD := map[string]string{"d": gfullAddr.String(),"key": string(privb) }
		jtadd, _ := json.Marshal(mapD)
		//fmt.Println("::>" , string(jtadd) )
		err = ioutil.WriteFile("testconfig.json", jtadd, 0644)
	}



	log.Printf("I am %s \n", myipfsID)

	log.Println("Now run go run floodtest.go -g ",floodPort+1,"  -d  " ,  gfullAddr )

	///////////////////
	//os.Exit(1)
	return floodHost, nil
}

var GIndex = 1

//Beacon for current Gindex
func Beacon() {
	//instant := true
	for{
		go func(){
			nxts := time.Now().Unix()
			blocktime := nxts%5
			if blocktime >= 0 {
				GIndex = GIndex + 1
				signal := []string{strconv.Itoa(int(nxts)), strconv.Itoa(GIndex), myipfsID }
				beam, _ := json.Marshal(signal)
				TheDagNet.ps.Publish("0", beam)
				//fmt.Println("Beamed "+strconv.Itoa(GIndex))
			}
		}();
		time.Sleep(1*time.Second)
	}


}

func main(){
		

		// Parse options from the command line
		listenG := flag.Int("g", 0, "wait for incoming graph layer connections")
		target := flag.String("d", "", "target peer to dial, comes with both graph and block dialins")
		secio := flag.Bool("secio", false, "enable secio")
		seed := flag.Int64("seed", 0, "set random seed for id generation")
		flag.Parse()

		if  *listenG == 0 {
			//log.Fatal("Please provide a port to bind for Block layer and Graph layer on with -b <port> -g <port>")
			fmt.Println("Using Default ports of 6991 and 7991 for DAG and Blocklayer")

			*listenG = 7991
		}

		gh,  err := makeDAGBlockHost(*listenG, *secio, *seed) //graph host and
		if err != nil {
			log.Fatal(err.Error())
		}



		if *target == "" {
			MainNode = true

			fmt.Println("listening for connections")

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub
			if err != nil {
				return
			}


			tsubs, _ := ps.Subscribe("0")
			TheDagNet = DagNet{ps: ps, host: gh,ctx:ctx, subs: []*floodsub.Subscription{tsubs}}


			}else{
				ctx, cancel := context.WithCancel(context.Background())
				defer cancel()

				adrs := strings.Split(*target, ",")
				targg := adrs[0]

				gipfsaddr, err := ma.NewMultiaddr(targg)
				if err != nil {
					log.Fatalln(err.Error())
				}

				gpid, err := gipfsaddr.ValueForProtocol(ma.P_IPFS)
				if err != nil {
					log.Fatalln(err.Error())
				}

				gpeerid, err := peer.IDB58Decode(gpid)
				if err != nil {
					log.Fatalln(err.Error())
				}

				// Decapsulate the /ipfs/<peerID> part from the target
				// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
				gtargetPeerAddr, _ := ma.NewMultiaddr(
					fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(gpeerid)))
				gtargetAddr := gipfsaddr.Decapsulate(gtargetPeerAddr)

				// We have a peer ID and a targetAddr so we add it to the peerstore
				// so LibP2P knows how to contact it
				gh.Peerstore().AddAddr(gpeerid, gtargetAddr, pstore.PermanentAddrTTL)

				//nh,err := pstore.InfoFromP2pAddr(gtargetPeerAddr)
				pi := pstore.PeerInfo{gpeerid, []ma.Multiaddr{gtargetAddr} }
				//gh.Peerstore().AddAddr(pi.ID, gtargetPeerAddr, pstore.PermanentAddrTTL)
				//fmt.Println("::>",gpeerid,nh.ID, "|", gtargetAddr, "|", gtargetPeerAddr)
				ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub
				gerr := gh.Connect(ctx, pi )
				if gerr != nil {
					log.Fatalln(gerr)
					panic(gerr)
				}
				fmt.Println("opening flood stream" )




				if err != nil {
					return
				}
				//TheDagNet = DagNet{ps: ps, host: gh, ctx:ctx}
				tsubs, _ := ps.Subscribe("0")
				TheDagNet = DagNet{ps: ps, host: gh,ctx:ctx, subs: []*floodsub.Subscription{tsubs}}

			}

			go Beacon()
			go sync()
			select {} // hang forever
}




//Synchronize Blockchain for current SuperBlock
func sync() bool{
	fmt.Println("SYC0.0:")
	//var seed int
	//conf := 0
	for {
		blch,_ := TheDagNet.ps.Subscribe("0") //[0] //The Channel for Blockchain layer
		ctx5, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		go func () {
				select {
				case <-time.After(10 * time.Second):
						fmt.Println("overslept")
				case <-ctx5.Done():
						//os.Exit(2)
						fmt.Println("Trying again to find state") // prints "context deadline exceeded"
						//conf = 0
						cancel()
						//ctx5.Close()
						return
				}
		}()
		btxbin, _ := blch.Next(TheDagNet.ctx)
		if btxbin.GetFrom().Pretty() == myipfsID{
			fmt.Println("Self Echo ")
			continue
		}
		fmt.Println("OK ")
		beam := (btxbin.GetData())

		light := []string{}
		err := json.Unmarshal(beam, &light)
		if err != nil{
			continue
		}
		sender := light[2]
		sindex := light[1]
		ts := light[0]





			fmt.Println("Got First Quanta",sender,sindex,ts,"@me=",myipfsID)
			time.Sleep(500 *time.Millisecond)
			continue


	}
	fmt.Println("No Light Yet")
	return false
}



//End

Messages are not received if len(message) >1048438 bytes

Hello

I have found a new issue when receiving a message larger than 1048438 bytes (almost 1MB). The message gets trimmed and the signature no longer validates. The issue comes from line 29 in comm.go, where there is a limit of 2^20 bytes (1MB message size). Is this limit really necessary? The underlying host object should take care of message fragmentation, flood protection, muxing, and so on.

Thanks.

Possible deadlock

When creating a small cluster of PubSub's around a topic in some test code I wrote, I found I needed to add a little time.Sleep just after the calls to Subscribe().

Subscribe() returns right away with a subscription and a nil error, however, if I don't sleep a bit, calling subscription.Next() blocked forever. Could this be a deadlock issue?

subscribers get their own messages if they publish

I've noticed that if a pubsub is subscribed to a topic, and it also publishes on that topic, it will receive its own message. Is this intended behavior?

If so, I'd be interested to understand the thinking. In my usage so far, I have had to explicitly tag publications, so I can ignore messages from my own node. It's not a huge burden, but it did lead to some unexpected issues.

[Question] Multiaddr of publisher available to subscriber

Hey!

Is multiaddr of the publisher available to the subscriber? What I can understand from the source is that Subscription.Next can get us peer.ID and not multiaddr.

Use case: pubsub is used for peer discovery and the subscriber wants to connect directly to the publisher.

Thanks

Message Aggregation

A way to aggregate multiple messages into one/fewer may be a nice option for apps operating on CRDTs or other data types which are nice to 'merge'. It could enable massive savings in terms of network bandwidth and cpu processing for certain use cases.

One particular use case I have in mind is voice-over-pubsub app, where an Aggregator would mix multiple sound channels into one (sort of audio-crdt), reducing network/cpu load for participants of big group calls.

I'm not sure about implementation details, one problem is probably a trade-off between efficiency and latency (e.g for voice apps we ideally don't want to increase latency more than 1-whatever-length-audio-chunk), so the optimal interface would let the aggregator control that well.

type Aggregator interface {
    Handle(<-chan Message) <-chan Message
}

Aggregator isn't the best name, and the interface optimally wouldn't require it to run in a separate goroutine to use. It would probably plug next to Validator stuff (and could be used to implement it)
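
To illustrate the interface, here is a small count-based aggregator. It is only a sketch: the Message struct and batchAggregator are hypothetical, merging is plain concatenation, and a latency-sensitive app such as the voice example would presumably flush on a timer instead of a fixed count.

```go
package main

import "fmt"

// Message is a simplified, hypothetical message type.
type Message struct{ Data []byte }

// Aggregator is the interface proposed above.
type Aggregator interface {
	Handle(<-chan Message) <-chan Message
}

// batchAggregator (hypothetical) merges every `size` consecutive messages
// into one by concatenating their payloads.
type batchAggregator struct{ size int }

var _ Aggregator = batchAggregator{}

func (b batchAggregator) Handle(in <-chan Message) <-chan Message {
	out := make(chan Message)
	go func() {
		defer close(out)
		var buf []byte
		n := 0
		for m := range in {
			buf = append(buf, m.Data...)
			n++
			if n == b.size {
				out <- Message{Data: buf}
				buf, n = nil, 0
			}
		}
		if n > 0 {
			out <- Message{Data: buf} // flush the final partial batch
		}
	}()
	return out
}

func main() {
	in := make(chan Message, 5)
	for _, s := range []string{"a", "b", "c", "d", "e"} {
		in <- Message{Data: []byte(s)}
	}
	close(in)
	for m := range (batchAggregator{size: 2}).Handle(in) {
		fmt.Println(string(m.Data)) // prints "ab", then "cd", then "e"
	}
}
```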

Possible false positive ignore message

I've just found in js-libp2p-floodsub that messages might be ignored when they shouldn't be.

We're doing something similar to:

go-libp2p-pubsub/pubsub.go

Lines 460 to 462 in 107d351

func msgID(pmsg *pb.Message) string {
	return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}

...but in JS, converting the seqno to utf8 string can sometimes result in the same string for different buffers. The following are two different seqno's that js-ipfs was sent from go-ipfs that demonstrates the problem:

$ node
> const buf2 = Buffer.from('15603533e990dfe0', 'hex')
> const buf1 = Buffer.from('15603533e990dfde', 'hex')
> buf1.toString('utf8')
'\u0015`53���'
> buf2.toString('utf8')
'\u0015`53���'
> buf1.toString('utf8') === buf2.toString('utf8')
true
> 

So sometimes we think we've seen the message when we haven't! 🤦‍♂️

I'm flagging this here in case this could be a problem in Go as well... please feel free to close if you can determine it's not a problem :)

FYI for the PR to js-libp2p-floodsub I'll probably convert the seqno buffer to a hex encoded string instead.

refs libp2p/js-libp2p-floodsub#59
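
For comparison, a quick Go check with the same two seqno values. In Go, converting a []byte to a string copies the bytes as-is without any UTF-8 replacement, so the two IDs stay distinct. The msgID below takes raw byte slices instead of a *pb.Message, and mustHex is a helper added for this example.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// msgID follows the shape of the snippet above, but takes raw byte slices
// so that the example does not depend on the protobuf types.
func msgID(from, seqno []byte) string {
	return string(from) + string(seqno)
}

// mustHex decodes a hex string and panics on malformed input (example only).
func mustHex(s string) []byte {
	b, err := hex.DecodeString(s)
	if err != nil {
		panic(err)
	}
	return b
}

func main() {
	from := []byte("peer")
	s1 := mustHex("15603533e990dfde")
	s2 := mustHex("15603533e990dfe0")
	// string([]byte) preserves invalid UTF-8 bytes, so the IDs differ.
	fmt.Println(msgID(from, s1) == msgID(from, s2)) // prints "false"
	fmt.Println(len(msgID(from, s1)))               // prints "12": 4 + 8 raw bytes
}
```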

Isn't FloodSub a leaky abstraction?

As a developer using libp2p, I understand that libp2p provides a publish-subscribe message passing system. I shouldn't need to know or care how it is implemented (flooding). However, by using the word "flood" in the name of objects and imports, an implementation detail about the use of flooding is being leaked through to me.

It would be preferable to remove the word flood in interfaces exposed to higher level code.

Thoughts?

delay for heartbeat in tests is inconvenient

In tests that use gossipsub, we have to sleep for a bit to wait for the heartbeats to set everything up properly. This is pretty inconvenient, as adding two seconds of wait time to every single test gets pretty expensive.

Is there a way to force this to happen immediately?

cc @vyzo

Gossipsub membership implementation

The Gossipsub spec contains a membership management protocol which doesn't seem to be implemented here. Is this still on the roadmap? If not, what is the alternative? Also, is the spec itself final already or is it work in progress?

Peers not always joining a topic and/or connecting

We've been running into some hiccups in OrbitDB recently as we've been upgrading from go-ipfs 0.4.13 to 0.4.16 (and now 0.4.17).

We've observed a behaviour where pubsub peers don't always connect to each other. Detailed information on the behaviour and what we've discovered so far in orbitdb/orbitdb#417. To summarize: the tests are run with two local go-ipfs nodes, both subscribe to the same pubsub topic, but sometimes (about 50% of test runs) they don't seem to connect to each other on pubsub level even though they're connected on swarm level (see this comment orbitdb/orbitdb#417 (comment)).

Looking at the commit log in this repo, I noticed that the pubsub implementation was changed (yay, great to see the gossip pubsub coming together!) some time ago, so most likely this is a regression. I believe 0.4.13 was working fine, but we haven't verified which later version stopped working.

If anyone has an idea why this is happening and could work on a fix, we would highly appreciate it. We're (finally) adding back support for go-ipfs in OrbitDB, but the tests don't run reliably because of this bug, so we're hesitant to make a new release until it is fixed (alternatively we could instruct users to use <= 0.4.13, but that's no fun).

If there's any information I can provide to solve this, please let me know!

Add connection manager tagging

We need to start tagging connections that are useful to us through the connection manager

For example, in the DHT, we tag any peer that goes into our routing table with a value of 5.
We should likely tag peers that are in pubsub topics that we are subscribed to, or that we publish to.
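
A rough sketch of what such tagging could look like, assuming a connection manager that exposes tag and untag operations. The `tagger` interface below is a local stand-in that uses plain strings for peer IDs so the example runs without libp2p. The tag name format and the value 5 (borrowed from the DHT example above) are assumptions, not a decided policy.

```go
package main

import "fmt"

// tagger is a stand-in for the part of a connection manager used here.
type tagger interface {
	TagPeer(peerID, tag string, value int)
	UntagPeer(peerID, tag string)
}

// memTagger records tags in memory so the sketch can run on its own.
type memTagger map[string]map[string]int

func (m memTagger) TagPeer(p, tag string, v int) {
	if m[p] == nil {
		m[p] = map[string]int{}
	}
	m[p][tag] = v
}

func (m memTagger) UntagPeer(p, tag string) { delete(m[p], tag) }

// topicTagValue is a hypothetical weight, mirroring the DHT's value of 5.
const topicTagValue = 5

// topicTag builds a hypothetical per-topic tag name.
func topicTag(topic string) string { return "pubsub:" + topic }

// onPeerJoined tags a peer that is in a topic we subscribe or publish to.
func onPeerJoined(t tagger, peerID, topic string) {
	t.TagPeer(peerID, topicTag(topic), topicTagValue)
}

// onPeerLeft removes the tag once the peer leaves the topic.
func onPeerLeft(t tagger, peerID, topic string) {
	t.UntagPeer(peerID, topicTag(topic))
}

func main() {
	cm := memTagger{}
	onPeerJoined(cm, "QmPeerA", "blocks")
	fmt.Println(cm["QmPeerA"]["pubsub:blocks"]) // 5
	onPeerLeft(cm, "QmPeerA", "blocks")
	fmt.Println(len(cm["QmPeerA"])) // 0
}
```

Using one tag per topic means a peer shared across several topics accumulates weight, which may or may not be the desired behaviour.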

Can't remove a topic validator

Hi !
I might be missing something, but I cannot find a way to unregister a topic validator.
In my case I have multiple services that use the floodsub layer. A service subscribes to a topic and registers a validator for that topic. When stopping the service, I can call Cancel() on the Subscription, but there is no way to remove the topic validator from Pubsub.topicVals.

Maybe I am doing this wrong, but it feels necessary to have one of these:

  • unregister the topic validator when canceling the subscription (not good IMO as you might want to set the validator once only at launch and then register/unregister as much as you want)
  • expose a method to clean the validator for a given topic.
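
The second option could look roughly like the sketch below. It is a self-contained model, not the library's code: `registry`, `validatorFunc`, and the `Register`/`Unregister` method names are hypothetical, and validators here see only the raw payload.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// validatorFunc is a simplified validator that only sees the payload.
type validatorFunc func(data []byte) bool

// registry maps each topic to at most one validator.
type registry struct {
	mu   sync.Mutex
	vals map[string]validatorFunc
}

func newRegistry() *registry {
	return &registry{vals: map[string]validatorFunc{}}
}

// Register installs a validator and rejects duplicates for the same topic.
func (r *registry) Register(topic string, v validatorFunc) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.vals[topic]; ok {
		return errors.New("duplicate validator for topic " + topic)
	}
	r.vals[topic] = v
	return nil
}

// Unregister removes the validator so a service can clean up on shutdown.
func (r *registry) Unregister(topic string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.vals[topic]; !ok {
		return errors.New("no validator for topic " + topic)
	}
	delete(r.vals, topic)
	return nil
}

// Validate accepts messages on topics that have no validator.
func (r *registry) Validate(topic string, data []byte) bool {
	r.mu.Lock()
	v, ok := r.vals[topic]
	r.mu.Unlock()
	return !ok || v(data)
}

func main() {
	r := newRegistry()
	nonEmpty := func(d []byte) bool { return len(d) > 0 }
	_ = r.Register("chat", nonEmpty)
	fmt.Println(r.Validate("chat", nil)) // false
	_ = r.Unregister("chat")
	fmt.Println(r.Validate("chat", nil)) // true
}
```

Keeping unregistration separate from Cancel() preserves the use case where the validator is set once at launch and subscriptions come and go.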

PubSub sometimes failing

C# test and JS test are intermittently failing. Both are publishing multiple messages and then counting the subscribed messages.

It appears that some published messages are not being relayed to the subscribed channels. In the C# test both publisher and subscriber are on the same peer.

NOTE: these tests are running on Windows. This was first detected in ipfs-inactive/js-ipfs-http-client#648

Floodsub Receiving only its own messages

Hi,

I am refactoring a general p2p package to use floodsub, but I only see the node's own messages being received. Am I missing something in the floodsub setup? Here is the code:
Command: go run flood2test.go -g 4330 -secio false

TIA

package main

import (

	"context"
	"crypto/rand"
	"flag"
	"fmt"
	"io"
	//"io/ioutil"
	"log"
	mrand "math/rand"
	//"strings"
	"time"

	floodsub "gx/ipfs/QmeRgVyVFj3bpgF9vrLepT3AMA8pU5EN8ykbspTo6kcjoz/go-libp2p-floodsub"

	libp2p "github.com/libp2p/go-libp2p"
	ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
	//net "gx/ipfs/QmXoz9o2PT3tEzf7hicegwex5UgVP54n3k82K7jrWFyN86/go-libp2p-net"
	host "gx/ipfs/QmaSfSMvc1VPZ8JbMponFs4WHvF9FgEruF56opm5E1RgQA/go-libp2p-host"

	peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
	pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
	crypto "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
	uuid "gx/ipfs/QmcBWojPoNh4qm7zvv4qiepvCnnc7ALS9qcp7TNwwxT1gT/go.uuid"
)


var GID = uuid.Must(uuid.NewV4()).String() //Unique node id for alpha



type DagNet struct {
	ps    *floodsub.PubSub
	host host.Host
	ctx context.Context
}
var Floodnet DagNet

func makeDAGBlockHost(floodPort int, secio bool, randseed int64) (host.Host, error) {

	// If the seed is zero, use real cryptographic randomness. Otherwise, use a
	// deterministic randomness source to make generated keys stay the same
	// across multiple runs
	var r io.Reader
	if randseed == 0 {
		r = rand.Reader
	} else {
		r = mrand.New(mrand.NewSource(randseed))
	}

	// Generate a key pair for this host. We will use it
	// to obtain a valid host ID.
	priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
	if err != nil {
		return nil, err
	}

	opts := []libp2p.Option{
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", floodPort)),
		libp2p.Identity(priv),
	}

	if !secio {
		opts = append(opts, libp2p.NoEncryption())
	}

	/******Graph Host*********/
	floodHost, err := libp2p.New(context.Background(), opts...)

	if err != nil {
		return  nil, err
	}

	// Build floodhost multiaddress
	ghostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", floodHost.ID().Pretty()))

	// Now we can build a full multiaddress to reach this host
	// by encapsulating both addresses:
	gaddr := floodHost.Addrs()[0]
	gfullAddr := gaddr.Encapsulate(ghostAddr)
	////////////////////////////////////////////////////////////////

	/*************************/
	log.Printf("I am %s flood node\n", gfullAddr)
	if secio {
		log.Printf("Now run \"go run flood2test.go -g %d  -d %s  -secio false \" on a different terminal\n", floodPort+1,  gfullAddr)
	} else {
		log.Printf("Now run \"go run flood2test.go -g %d  -d %s  \" on a different terminal\n", floodPort+1, gfullAddr)
	}

	///////////////////

	return floodHost, nil
}




func main() {
	//log.DisableLogs()
	mrand.Seed(int64(time.Second))
	//t := time.Now()

	// Parse options from the command line
	listenG := flag.Int("g", 0, "wait for incoming flood layer connections")
	target := flag.String("d", "", "target peer to dial, comes with both flood and block dialins")
	secio := flag.Bool("secio", false, "enable secio")
	seed := flag.Int64("seed", 0, "set random seed for id generation")

	flag.Parse()

	if  *listenG == 0 {
		log.Fatal("Please provide a port to bind for -g <port>")
	}

	gh, err := makeDAGBlockHost(*listenG, *secio, *seed)
	if err != nil {
		log.Fatal(err)
	}

	if *target == "" {
		log.Println("listening for connections")

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub
		if err != nil {
			return
		}
		Floodnet = DagNet{ps: ps, host: gh,ctx:ctx}

		for{
				ch, err := Floodnet.ps.Subscribe("floodtest")
				if err != nil {
					log.Fatalln("FOps:",err)
				}

				time.Sleep(time.Millisecond * 10)

				msg := []byte("floodtest: "+GID)
				err = Floodnet.ps.Publish("floodtest", msg)
				if err != nil {
					log.Fatalln("T696",err)
				}





				msg2 := []byte("floodtest: "+GID)
				err = Floodnet.ps.Publish("floodtest", msg2)
				if err != nil {
					log.Fatalln("T706",err)
				}

				testbin, _ := ch.Next(Floodnet.ctx)
				fmt.Println("Test1: ",string(testbin.GetData()) )
				testbin, _ = ch.Next(Floodnet.ctx)
				fmt.Println("Test2: ",string(testbin.GetData()) )
				time.Sleep(5 * 1000*1000*1000)
			}

		}else{
				ctx, cancel := context.WithCancel(context.Background())
				defer cancel()


				gipfsaddr, err := ma.NewMultiaddr(*target)
				if err != nil {
					log.Fatalln(err)
				}

				gpid, err := gipfsaddr.ValueForProtocol(ma.P_IPFS)
				if err != nil {
					log.Fatalln(err)
				}

				gpeerid, err := peer.IDB58Decode(gpid)
				if err != nil {
					log.Fatalln(err)
				}

				// Decapsulate the /ipfs/<peerID> part from the target
				// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
				gtargetPeerAddr, _ := ma.NewMultiaddr(
					fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(gpeerid)))
				gtargetAddr := gipfsaddr.Decapsulate(gtargetPeerAddr)

				// We have a peer ID and a targetAddr so we add it to the peerstore
				// so LibP2P knows how to contact it
				gh.Peerstore().AddAddr(gpeerid, gtargetAddr, pstore.PermanentAddrTTL)


				nh,err := pstore.InfoFromP2pAddr(gtargetPeerAddr)
				fmt.Println("::>",*nh)
				gerr := gh.Connect(ctx, *nh )
				if gerr != nil {
					log.Fatalln(gerr)
				}
				log.Println("opening flood stream")


				/*
				/////test///
				ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub
				s, err := gh.NewStream(context.Background(), gpeerid, "/p2p/1.0.0")
				if err != nil {
					log.Fatalln("T238",err)
				}

				_, err = s.Write([]byte("Hello, world!\n"))
				if err != nil {
					log.Fatalln(err)
				}

				out, err := ioutil.ReadAll(s)
				if err != nil {
					log.Fatalln(err)
				}

				log.Printf("read reply: %q\n", out)
				*/
				//////////////

				// make a new stream from host B to host A
				// it should be handled on host A by the handler we set above because
				// we use the same /p2p/1.0.0 protocol




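				// Create the pubsub instance on the dialing side too; the only
				// other NewFloodSub call in this branch is commented out above.
				ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub
				if err != nil {
					return
				}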
				Floodnet = DagNet{ps: ps, host: gh,ctx:ctx}
				//Start the flood miner
				for{
						ch, err := Floodnet.ps.Subscribe("floodtest")
						if err != nil {
							log.Fatalln("FOps:",err)
						}

						time.Sleep(time.Millisecond * 10)

						msg := []byte("floodtest: "+GID)
						err = Floodnet.ps.Publish("floodtest", msg)
						if err != nil {
							log.Fatalln("T696",err)
						}





						msg2 := []byte("floodtest: "+GID)
						err = Floodnet.ps.Publish("floodtest", msg2)
						if err != nil {
							log.Fatalln("T706",err)
						}

						testbin, _ := ch.Next(Floodnet.ctx)
						fmt.Println("Test1: ",string(testbin.GetData()) )
						testbin, _ = ch.Next(Floodnet.ctx)
						fmt.Println("Test2: ",string(testbin.GetData()) )
						time.Sleep(5 * 1000*1000*1000)
					}


				select {} // hang forever

			}
		}









//End

if pubsub route loses remote peer connection information?

Version of go-ipfs

$ ipfs version --all
go-ipfs version: 0.4.18-
Repo version: 7
System version: amd64/linux
Golang version: go1.11.1

What I do

Peer A subscribes to the topic 'test_topic', and peer B then publishes data to 'test_topic'. At first, peer A receives the data published by B. However, when peer B publishes again after about 8 hours, peer A no longer receives the data.

(Note: during those 8 hours peer B did not publish any data. Running ipfs swarm peers on peer B still showed the connection to peer A, but running ipfs pubsub peers on peer B printed no output.)

I then ran ipfs swarm disconnect <peer A> followed by ipfs swarm connect <peer A> on peer B. After that, when peer B publishes data again, peer A receives it, and ipfs pubsub peers on peer B shows the ID of peer A.

Why?

I suspect that after the peers connected, some non-network error occurred while writing data to the peer, causing the peer to be removed from the pubsub router. But the swarm peers (DHT routing?) still hold a connection to the peer, so no new connection is made that would cause the pubsub router to register the peer again.

Reference source code:

Naming & modularization

To avoid future confusion such as #67 (comment), I want to propose that we adopt the following:

  • go-pubsub - The pubsub API and the thing that knows how to take multiple pubsub implementations
  • go-floodsub - The floodsub implementation
  • go-gossipsub - The gossipsub implementation

This will make it very clear that libp2p/IPFS support multiple pubsub implementations and that we encourage everyone to try new ones.

Per topic validation

In #55, validation is per-subscription. Unfortunately, this means:

  1. If we have multiple subscribers subscribing with the same validator, we'll run it multiple times (e.g., check a signature multiple times). This is really inefficient.
  2. If we have multiple subscribers subscribing with different validators, these subscriptions will affect each other. That's not what a user will expect.

Really, validation is a property of the topic, not the subscription. Eventually, I'd like to be able to tell pubsub to relay a topic without explicitly subscribing (#28).


In terms of implementation, we have a few choices:

  1. Require that all topics be namespaced and use the namespace to pick the validator.
  2. Allow validators to decide whether or not they care about a topic.

2 is probably the easiest but not the cleanest. Possible interface:

// Register a validator with the pubsub.
func (p *PubSub) RegisterValidator(v Validator)

type Validator interface {
    // Apply this validator to this topic.
    Validates(topic string) bool // or `ValidatorFor(topic string) TopicValidator`?
    // Apply this validator to this topic by descriptor. Do we even need topic descriptors anymore? We don't use them... Having both of these is annoying...
    ValidatesTopicDescriptor(topic *pb.TopicDescriptor) bool
    Validate(msg *pb.Message) bool
}
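
To make option 2 concrete, here is a minimal self-contained sketch under a few assumptions: `message` stands in for `pb.Message`, topic descriptors are left out, and `prefixValidator` and `accept` are hypothetical names. Each registered validator decides whether it cares about a topic, and runs once per message regardless of how many subscribers exist.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a simplified stand-in for pb.Message.
type message struct {
	Topic string
	Data  []byte
}

// validator follows the proposed interface, minus topic descriptors.
type validator interface {
	Validates(topic string) bool
	Validate(msg *message) bool
}

// prefixValidator applies a check to every topic under a given prefix.
type prefixValidator struct {
	prefix string
	check  func([]byte) bool
}

func (v prefixValidator) Validates(topic string) bool {
	return strings.HasPrefix(topic, v.prefix)
}

func (v prefixValidator) Validate(msg *message) bool { return v.check(msg.Data) }

// pubsub holds validators per instance, not per subscription.
type pubsub struct {
	validators []validator
}

func (p *pubsub) RegisterValidator(v validator) {
	p.validators = append(p.validators, v)
}

// accept runs every interested validator exactly once for the message.
func (p *pubsub) accept(msg *message) bool {
	for _, v := range p.validators {
		if v.Validates(msg.Topic) && !v.Validate(msg) {
			return false
		}
	}
	return true
}

func main() {
	p := &pubsub{}
	p.RegisterValidator(prefixValidator{
		prefix: "/signed/",
		check:  func(d []byte) bool { return len(d) > 0 },
	})
	fmt.Println(p.accept(&message{Topic: "/signed/blocks"})) // false
	fmt.Println(p.accept(&message{Topic: "/open/chat"}))     // true
}
```

A prefix-based validator like this one also hints at how option 1 (namespaced topics) could be layered on top of option 2.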

Maximum number of nodes in a gossipsub network.

For the latest Ethereum sharding + Casper spec, the theoretical maximum number of validators (assuming a supply cap of 128m ETH) is 4m. This conflicts with the target size of 10,000 in https://github.com/libp2p/specs/tree/master/pubsub/gossipsub#membership-management-protocol. 10,000 won't even be OK initially, since there are 16,825 nodes as I write this. Dynamic sizing is needed: support for up to 4,000,000 (or more) will be needed eventually, and 20,000 will be needed initially.

Two models have been proposed for the network layer: one network for each shard, or a torus-shaped network in which a node mostly communicates with nodes in the same shard, while propagation of messages between shards is limited to a certain intra-shard distance and to certain types of messages, such as new collations. For example, a node in shard 2018 (there could be up to 4096 shards in total) can relay new collations to notaries in shards 2013-2023, which may soon be shuffled to shard 2018 (notaries in one shard may possibly be shuffled to a shard up to n, say 5, shards away in the next period).

I'll cross-post this on the above ethresear.ch thread and in issue #77.

Assuming N = 4,000,000 then C_rand = 7.
