libp2p / go-libp2p-pubsub
The PubSub implementation for go-libp2p
Home Page: https://github.com/libp2p/specs/tree/master/pubsub
License: Other
The latest iteration is: https://github.com/libp2p/pubsub-notes/blob/master/flooding/flooding.proto, while https://github.com/libp2p/go-floodsub/blob/master/pb/rpc.proto is the first commit of that protobuf.
@whyrusleeping could you update
Must be some sort of race condition, but this test hangs for me maybe one in ten times
Is it possible to reshape message signing in pubsub so that signing can be turned on or off per topic, rather than for the entire pubsub instance?
The reason behind this question is that some topics may not need signing, and the signing/validation process induces unwanted overhead. As a practical example, one might implement a topic for broadcasting blockchain transactions that are known to be tamper-proof, where the originating peer node simply doesn't matter.
Of course there is a complication: the msgID function would need to compute the message ID in a different way (perhaps a hash of the payload when the message is not signed?).
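As an illustration only (none of this is an existing option in the repo), a content-derived message ID for unsigned topics could be as simple as hashing the payload; the function name and wiring below are hypothetical:
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// contentMsgID derives a deterministic message ID from the payload alone,
// which is what an unsigned topic could use instead of (from, seqno).
func contentMsgID(payload []byte) string {
	h := sha256.Sum256(payload)
	return hex.EncodeToString(h[:])
}

func main() {
	fmt.Println(contentMsgID([]byte("a tamper-proof transaction")))
}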
I have a test failing in my code (below) and I think it shouldn't. Perhaps I'm missing something though - was wondering if this is a bug, or I have incorrect assumptions somewhere:
func TestPubKey(t *testing.T) {
priv, _, _ := crypto.GenerateEd25519Key(mrand.New(mrand.NewSource(0)))
id1, _ := peer.IDFromEd25519PublicKey(priv.GetPublic())
id2, _ := peer.IDFromPrivateKey(priv)
assert.Equal(t, id1.Pretty(), id2.Pretty(), "should be equal?") // fails...
}
p.s. I found that IDFromPublicKey works instead of IDFromEd25519PublicKey, but the difference still seems strange...
Our validation pipeline is asynchronous: it spawns a goroutine for each incoming message, which may then get throttled by the per-topic and global throttle limits.
This creates a performance problem for message signature validation (#97), as we now have to enter the validation pipeline for every message when signing is in use (planned to become the default in ipfs and filecoin).
Proposed action:
Hello
I have the following use case:
Peer A connects to peer B. Peer B does not wish to retain the connection to peer A and closes it (calling net.Conn.Close() on peer A's newly opened connection). After a while, peer A tries to connect again to peer B and the following message appears on peer B:
09:54:07.886 ERROR pubsub: already have connection to peer: <peer.ID 16*E91Gqs> pubsub.go:205
Searching through the source code, the type PubSubNotif only handles the Connected event. In my above-mentioned use case I have used floodsub as a means to disseminate data. FloodSub uses PubSub, which has a channel called peerDead that is never used.
I have also tried using GossipSub with the same error being displayed.
Am I missing something? Big thanks!
Instead of infinitely buffering by using goroutines, we should disconnect slow peers (peers that develop a large outbound queue).
Hi,
a user of cluster reports a panic coming from pubsub on a BeagleBone:
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x4 pc=0x11540]
goroutine 46 [running]:
sync/atomic.addUint64(0x131c005c, 0x1, 0x0, 0x131c9150, 0x8)
/usr/lib64/go/1.9/src/sync/atomic/64bit_arm.go:31 +0x4c
gx/ipfs/QmduQtUFqdq2RhM84yM2mYsdgJRAH8Aukb15viDxWkZtvP/go-libp2p-floodsub.(*PubSub).Publish(0x131c0000, 0x716b54, 0xf, 0x13124630, 0x5d, 0xa2, 0x0, 0x1e)
gx/ipfs/QmduQtUFqdq2RhM84yM2mYsdgJRAH8Aukb15viDxWkZtvP/go-libp2p-floodsub/pubsub.go:556 +0x64
github.com/ipfs/ipfs-cluster/monitor/pubsubmon.(*Monitor).PublishMetric(0x12d5caa0, 0x70dc03, 0x4, 0x12d16c60, 0x22, 0x0, 0x0, 0xa5f1d075, 0x1531f22e, 0x1, ...)
github.com/ipfs/ipfs-cluster/monitor/pubsubmon/pubsubmon.go:193 +0x39c
github.com/ipfs/ipfs-cluster.(*Cluster).pushPingMetrics(0x12d64320)
github.com/ipfs/ipfs-cluster/cluster.go:232 +0xc0
created by github.com/ipfs/ipfs-cluster.(*Cluster).run
github.com/ipfs/ipfs-cluster/cluster.go:340 +0x48
Might this come from the warning here: https://golang.org/pkg/sync/atomic/#pkg-note-BUG ? @vyzo
I'll be testing with my own RPis next week, maybe I can provide more info.
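For context, the sync/atomic BUG note says that on 32-bit ARM the caller must arrange 64-bit alignment for 64-bit words accessed atomically, and only the first word of an allocated struct is guaranteed to be aligned. A minimal sketch of the safe layout (field names are illustrative, not the actual PubSub struct):
package alignsketch

import "sync/atomic"

// counters keeps the atomically-updated 64-bit field first so it is
// 8-byte aligned even on 32-bit ARM; placing it after a 32-bit field can
// produce exactly the SIGSEGV shown in the trace above.
type counters struct {
	seqno uint64 // must stay the first field
	flags uint32
}

func nextSeqno(c *counters) uint64 {
	return atomic.AddUint64(&c.seqno, 1)
}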
Sometimes, messages get lost. To allow the receiver to recover lost messages, we could start systematically referencing previous messages when constructing them. That way, when the receiver receives the next message, it will find an unknown message reference and can ask its peers about it (or possibly fetch it like a regular ipfs/ipld object?)
The system I propose is the one used by OrbitDB. Every peer maintains a list of messages that have not been referenced yet (DAG leaves, so to speak). Once a new message is received, all message IDs referenced by the new message are removed from the list, and the new message's ID is added.
Once we author a message, we add a field "after": [...], containing the leaf list. Afterwards our leaf list contains only the ID of the message we just authored.
If we have something like the above, nodes already keep state about the "newest" messages. If we send these to peers sending us a subscription request for that topic, they can start fetching old messages. How many that would be depends on the application: in Orbit it would suffice to fetch one screen's worth of backlog, but in other applications you may want to fetch the entire history.
This solves the problem described in #19.
Note: I want to make this optional. Whether or not this is used depends on the topic configuration.
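A rough sketch of the proposed bookkeeping, assuming application-level string message IDs and an "after" field; all names here are illustrative, not an existing API:
package leafsketch

// Msg stands in for an application message carrying the proposed "after" references.
type Msg struct {
	ID    string
	After []string
}

// LeafSet tracks message IDs that no later message has referenced yet.
type LeafSet map[string]struct{}

// OnReceive removes every referenced ID from the leaf set and adds the
// newly received message's ID.
func (l LeafSet) OnReceive(m Msg) {
	for _, id := range m.After {
		delete(l, id)
	}
	l[m.ID] = struct{}{}
}

// OnAuthor snapshots the current leaves into the new message's "after"
// field and resets the set to contain only that message.
func (l LeafSet) OnAuthor(id string) Msg {
	after := make([]string, 0, len(l))
	for leaf := range l {
		after = append(after, leaf)
		delete(l, leaf)
	}
	l[id] = struct{}{}
	return Msg{ID: id, After: after}
}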
$ protoc --rust_out . rpc.proto
[libprotobuf WARNING google/protobuf/compiler/parser.cc:546] No syntax specified for the proto file: rpc.proto. Please use 'syntax = "proto2";' or 'syntax = "proto3";' to specify a syntax version. (Defaulted to proto2 syntax.)
It's called just floodsub at the moment.
I've been exploring the recently added Validator functionality, which is very nice - however there is one (probably unintended) aspect of it that I found interesting. If I need to validate the contents of a message - say it's protobuf encoded - then I need to parse it once when validating, and potentially another time after actually receiving the ([]byte) data through subscription.Next(). I don't think it's a major issue, and there might not be a clean way to solve it.
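A sketch of one possible workaround under the current API, assuming the application controls both the validator and the consumer: decode once in the validator, cache the result keyed by a hash of the raw payload, and look it up after Next(). Everything here (MyRecord, the cache) is hypothetical application-level code, not part of pubsub:
package decodeonce

import (
	"crypto/sha256"
	"encoding/json"
	"sync"
)

// MyRecord stands in for whatever the application actually encodes.
type MyRecord struct{ Value string }

var decodedCache sync.Map // payload hash -> *MyRecord

// validatePayload is what a topic validator would call on msg.GetData():
// it decodes once and caches the result if the payload is valid.
func validatePayload(data []byte) bool {
	var rec MyRecord
	if err := json.Unmarshal(data, &rec); err != nil {
		return false
	}
	decodedCache.Store(sha256.Sum256(data), &rec)
	return true
}

// afterNext retrieves the already-decoded record for a payload received
// from subscription.Next(); the second result is false if it was never cached.
func afterNext(data []byte) (*MyRecord, bool) {
	v, ok := decodedCache.Load(sha256.Sum256(data))
	if !ok {
		return nil, false
	}
	return v.(*MyRecord), true
}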
This will be a bit of a pain due to how imports work in go but we should do it eventually.
I propose adding something like associated data. It is kept up-to-date using application-specific logic and sent to peers that subscribe. This could be a string that describes what the topic is about, or anything else. Basically an interface{} value.
In Orbit we face the problem that a peer kind of sits in the dark after subscribing to a topic. It doesn't have access to history until someone who has witnessed previous messages publishes a new message. That message will contain references to one or more of the most recently seen messages. The new peer can follow these references and reconstruct history. However, it needs someone to post something to do that.
Instead it would be nice to get that information at subscribe time. That is why I would like to see an AD field that can be set by the application that subscribes to this.
I talked to @haadcode about this and he likes the idea.
Not sure. I don't think I really got the auth part yet. Is there a hard-ish spec somewhere for encrypted/auth'd topics?
Anyway, I suspect if we do things the right way, the worst a byzantine node could do is to withhold updates to the AD field, which I would rate equal to not forwarding messages, which can still happen. So it shouldn't be a big problem.
Sorry in advance for bringing this as an issue, as it's probably a better candidate for a wiki. I have a question: is it intended that the floodsub and gossipsub protocols should be compatible? I.e., if you develop an application that uses one, and you substitute the other for half the nodes, will the network still operate? I understand that there are practical considerations about traffic volumes, etc.; however, my point is to ask whether the messages are compatible, such that a gossiped message sent by a node using floodsub and received by a node which uses gossipsub will be propagated by the second node, and whether that message can be propagated further. Is this a design goal of the system?
I think it would be really valuable to add an 'answer' type message.
Imagine a pubsub swarm where someone publishes a message asking for a certain value. That message gets propagated to the entire swarm as usual. Now if a peer wants to respond to that question, they essentially have to broadcast their answer to the entire swarm. Instead, I propose they send an 'answer' message, referencing the question message, back to whatever peer they received the question message from. The peer they responded to should check if they were the asker, and if not, continue sending the answer back until it reaches its target. Since we already keep a cache of recent messages to prevent cyclic propagation, this shouldn't require too much (if any) extra overhead.
Use cases that this would be great for:
So I'd like to use this library to create a p2p pubsub capability, embedded in an app I'm writing. ATM, the price of admission is that I have to figure out how to
ATM I love the modularity that libp2p provides compared to *ipfs.IpfsNode, but I'd like to learn how to use it. I'd like an example that shows how to create a *floodsub.PubSub instance that can publish some []bytes and that interested nodes could join, leave, etc.
My issue here is that floodsub seems to require a ton of other imports to use, and I can't embed a *floodsub.PubSub without also embedding a host, a PeerStore, and maybe a []*floodsub.Subscription. Doing all this is doubtless no problem - if only I knew what to do, or had some examples...
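Not an official example, but a minimal sketch of the wiring, using the same calls that appear in the longer programs later in this thread (gx/import paths abbreviated, error handling trimmed; note the Subscribe/Next timing caveat reported further down):
package main

import (
	"context"
	"fmt"
	"time"

	floodsub "github.com/libp2p/go-floodsub"
	libp2p "github.com/libp2p/go-libp2p"
)

func main() {
	ctx := context.Background()

	// A default host is enough to get started; it brings its own peerstore.
	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	ps, err := floodsub.NewFloodSub(ctx, h)
	if err != nil {
		panic(err)
	}

	sub, err := ps.Subscribe("example-topic")
	if err != nil {
		panic(err)
	}
	// Give the event loop a moment to register the subscription
	// (see the Subscribe/Next report later in this thread).
	time.Sleep(100 * time.Millisecond)

	if err := ps.Publish("example-topic", []byte("hello")); err != nil {
		panic(err)
	}

	msg, err := sub.Next(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Printf("got %q from %s\n", msg.GetData(), msg.GetFrom().Pretty())
}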
I ran the command on both macOS High Sierra (10.13.6) and Ubuntu 16.04 and get this issue only on macOS. I removed the entire $GOPATH/src and performed go get again. The issue is still there.
$ gx-go rw
$ go test -v
...
=== RUN TestSparseGossipsub
--- PASS: TestSparseGossipsub (3.00s)
=== RUN TestDenseGossipsub
--- FAIL: TestDenseGossipsub (0.91s)
floodsub_test.go:54: dial attempt failed: <peer.ID eQ7kYF> --> <peer.ID SwqzNr> dial attempt failed: EOF
=== RUN TestGossipsubFanout
--- FAIL: TestGossipsubFanout (1.01s)
floodsub_test.go:54: dial attempt failed: <peer.ID dMgtPZ> --> <peer.ID SCdsq4> dial attempt failed: EOF
=== RUN TestGossipsubFanoutMaintenance
--- FAIL: TestGossipsubFanoutMaintenance (0.95s)
floodsub_test.go:54: dial attempt failed: <peer.ID f8X5mz> --> <peer.ID edj6pD> dial attempt failed: EOF
=== RUN TestGossipsubFanoutExpiry
--- PASS: TestGossipsubFanoutExpiry (4.53s)
=== RUN TestGossipsubGossip
--- FAIL: TestGossipsubGossip (1.02s)
floodsub_test.go:54: dial attempt failed: <peer.ID Y2LATb> --> <peer.ID cmZNWD> dial attempt failed: EOF
=== RUN TestGossipsubGossipPiggyback
--- FAIL: TestGossipsubGossipPiggyback (1.06s)
floodsub_test.go:54: dial attempt failed: <peer.ID TixfJG> --> <peer.ID QkahvD> dial attempt failed: EOF
...
After debugging for a while, I still don't have any idea about this issue. I would really appreciate any direction on how to solve it.
We can end up with multiple connections to the same peer so we should handle them better. Currently, if two peers connect to each other at the same time, they can end up not communicating.
Probably an issue with stream closing versus resetting.
What is a flooding pubsub? Why is it of interest to libp2p?
The event loop dispatches messages to subscriptions in notifySubs, which puts the new message on the subscription's channel.
This channel is buffered for 32 messages, after which the put blocks -- which blocks the entire event loop.
Still not sure if this is a bug or a feature, but I don't like slow subscribers blocking the router. Sadly, there is no good solution to the problem other than dropping messages on overflow, just like we do with our output channels.
@Stebalien @whyrusleeping thoughts on the issue?
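For reference, the drop-on-overflow alternative mentioned above boils down to a non-blocking send; Message below is just a stand-in for the real message type:
package dropsketch

// Message stands in for the pubsub message type.
type Message struct{ Data []byte }

// deliver attempts a non-blocking send into the subscriber's channel and
// reports whether the message was delivered; a full buffer means the
// message is dropped instead of stalling the event loop.
func deliver(out chan<- *Message, m *Message) bool {
	select {
	case out <- m:
		return true
	default:
		return false
	}
}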
When #45 is in, we'll need to find a way to throttle parallel validator executions, so if they take a long time we don't run out of memory.
I plan on doing this by passing them all a buffered chan struct{} that they write to before they are started and read from when they finish. The question is: should we just block the event loop, or should we drop a message when the maximum number of validators is running? I think we already drop messages in a similar situation, so I'd propose going the same way here.
This leads us to the next problem, which is that validators that block very long need to be cancelled because otherwise they'll start piling up until either the event loop blocks or we drop all packets. So, how large should the timeout be? 200ms?
ping @vyzo @whyrusleeping
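Roughly what the buffered-channel throttle could look like, with the 200ms figure from above; the limit, the drop-when-full policy, and all names are illustrative and up for discussion:
package throttlesketch

import (
	"context"
	"time"
)

// validateThrottle is a counting semaphore bounding concurrent validators.
var validateThrottle = make(chan struct{}, 8) // illustrative limit

// runValidator acquires a slot (or drops the message when at capacity),
// then runs the validator under a timeout so slow validators cannot pile up.
func runValidator(ctx context.Context, validate func(context.Context) bool) bool {
	select {
	case validateThrottle <- struct{}{}:
	default:
		return false // at capacity: drop, mirroring the output-channel behaviour
	}
	defer func() { <-validateThrottle }()

	ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()
	return validate(ctx)
}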
Hello, I have a question. What happens if one (malicious) node starts sending a lot of messages and sets the From field to another node's ID (let's call it node_victim)? When node_victim starts to send its own messages, they will get discarded, as the func seenMessage will return true on all other nodes. The malicious node also needs to know the victim's current sequence number, but that is trivial to find (get its last message and increment the sequence number).
Since I have opened this issue, I also want to ask if there is a way to make the Message an interface rather than a struct. That would give the code the ability to introduce new features like message signing (and possibly an easier way to circumvent the above-mentioned problem).
TestGossipsubFanoutMaintenance returned dial attempt failed: <peer.ID aAAbZQ> --> <peer.ID W49xHM> dial attempt failed: connected to wrong peer. That's really weird.
Given that floodsub isn't widely deployed, I believe it's best to discuss this in the open (more input).
Currently, there are several protocol-level DoS attack vectors against floodsub that we'll need to address before relying on it more widely (note: none of these require botnets or even high-bandwidth internet connections).
Currently, every message is identified by an (author, seqno) pair, and peers drop all messages with IDs that they've already seen. This means that an attacker could silence messages by sending messages with colliding IDs. Two possible fixes are: (1) having authors sign all messages or (2) identifying messages by a hash of their content.
Currently, an attacker that knows of some well-connected peers subscribed to a target topic could rapidly send large messages to individual peers. Each peer will then rebroadcast these messages to all of their peers, thus dramatically amplifying the traffic. Worse, peers can't simply blacklist the attacker because the attacker isn't sending a high volume of traffic to any individual peer. The core problem here is that peers are forwarding traffic from untrusted sources: topic authentication can't be optional.
Assuming we had some way to authenticate messages on topics, attackers could still perform the traffic amplification attack by replaying messages older than 30s (the current timeout on our message deduplication cache). That is, they could just store enough old messages that they never have to repeat a message within a 30s window. Unfortunately, simply removing this timeout would lead to memory exhaustion, so that's not an option. As far as I can tell, there are a few solutions (not necessarily exclusive):
There are threads discussing this repo on discuss.ipfs.io, and I feel a little confused so far. Would it be possible to have a design doc in the repo to clarify the current design and where it is going?
Thanks!
GossipSubRouter.sendRPC calls gs.piggybackGossip on a shared RPC object. It should probably copy it first.
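Illustration of the aliasing hazard with a stand-in struct (not the actual pb.RPC): appending gossip to a shared object leaks it to every peer that object is later sent to, whereas copying first keeps the piggybacked gossip private to one peer:
package rpccopysketch

// rpc is a stand-in for the shared RPC object.
type rpc struct {
	gossip []string
}

// withGossip copies the shared RPC (including a fresh slice) before adding
// peer-specific gossip, so the shared object is never mutated.
func withGossip(shared *rpc, extra []string) *rpc {
	out := *shared
	out.gossip = append(append([]string(nil), shared.gossip...), extra...)
	return &out
}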
Hi,
I am trying to create a star network with floodsub. The code is below.
I create 3 nodes: A, B, C (actually I want 6/7, but it fails at three).
I started A, then started B by giving it the seed address of A, and started C by giving it the seed address of A.
The 2 nodes work fine A<-->B, but as soon as I start C, every node stops receiving subscription topic messages.
Is there a particular way to start the network?
Best Regards
package main
import (
"context"
"crypto/rand"
"encoding/json"
"flag"
"fmt"
"strconv"
"io"
"io/ioutil"
"log"
mrand "math/rand"
"os"
"strings"
"time"
floodsub "gx/ipfs/QmeRgVyVFj3bpgF9vrLepT3AMA8pU5EN8ykbspTo6kcjoz/go-libp2p-floodsub"
libp2p "github.com/libp2p/go-libp2p"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
host "gx/ipfs/QmaSfSMvc1VPZ8JbMponFs4WHvF9FgEruF56opm5E1RgQA/go-libp2p-host"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
crypto "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
)
var myipfsID string
var MainNode=false
type LogWriter struct{ io.Writer }
func (w *LogWriter) EnableLogs() { w.Writer = os.Stdout }
func (w *LogWriter) DisableLogs() { w.Writer = ioutil.Discard }
//For DAG network
type DagNet struct {
ps *floodsub.PubSub
host host.Host
ctx context.Context
subs []*floodsub.Subscription
}
var TheDagNet DagNet
func makeDAGBlockHost(floodPort int, secio bool, randseed int64) (host.Host, error) {
file, err := os.Open("testconfig.json")
var priv crypto.PrivKey
loadNew := true
if err != nil {
loadNew = true
fmt.Println("Creating New Keys")
}else{
loadNew = false
type Configuration struct {
fh string
bh string
key crypto.PrivKey
}
var configuration Configuration
decoder := json.NewDecoder(file)
err = decoder.Decode(&configuration)
if err != nil {
loadNew = true
fmt.Println("Creating New Keys")
//os.Exit(1)
}
priv = (configuration.key)
}
if loadNew{
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}
// Generate a key pair for this host. We will use it
// to obtain a valid host ID.
priv, _, err = crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
}
// If the seed is zero, use real cryptofloodic randomness. Otherwise, use a
// deterministic randomness source to make generated keys stay the same
// across multiple runs
opts := []libp2p.Option{
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", floodPort)),
libp2p.Identity(priv),
}
if !secio {
opts = append(opts, libp2p.NoEncryption())
}
/******Graph Host*********/
floodHost, err := libp2p.New(context.Background(), opts...)
if err != nil {
return nil, err
}
// Build floodhost multiaddress
ghostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", floodHost.ID().Pretty() ))
myipfsID = floodHost.ID().Pretty()
gaddr := floodHost.Addrs()[0]
gfullAddr := gaddr.Encapsulate(ghostAddr)
////////////////////////////////////////////////////////////////
if loadNew{
privb,_ := priv.Bytes()
mapD := map[string]string{"d": gfullAddr.String(),"key": string(privb) }
jtadd, _ := json.Marshal(mapD)
//fmt.Println("::>" , string(jtadd) )
err = ioutil.WriteFile("testconfig.json", jtadd, 0644)
}
log.Printf("I am %s \n", myipfsID)
log.Println("Now run go run floodtest.go -g ",floodPort+1," -d " , gfullAddr )
///////////////////
//os.Exit(1)
return floodHost, nil
}
var GIndex = 1
//Beacon for current Gindex
func Beacon() {
//instant := true
for{
go func(){
nxts := time.Now().Unix()
blocktime := nxts%5
if blocktime >= 0 {
GIndex = GIndex + 1
signal := []string{strconv.Itoa(int(nxts)), strconv.Itoa(GIndex), myipfsID }
beam, _ := json.Marshal(signal)
TheDagNet.ps.Publish("0", beam)
//fmt.Println("Beamed "+strconv.Itoa(GIndex))
}
}();
time.Sleep(1*time.Second)
}
}
func main(){
// Parse options from the command line
listenG := flag.Int("g", 0, "wait for incoming graph layer connections")
target := flag.String("d", "", "target peer to dial, comes with both graph and block dialins")
secio := flag.Bool("secio", false, "enable secio")
seed := flag.Int64("seed", 0, "set random seed for id generation")
flag.Parse()
if *listenG == 0 {
//log.Fatal("Please provide a port to bind for Block layer and Graph layer on with -b <port> -g <port>")
fmt.Println("Using Default ports of 6991 and 7991 for DAG and Blocklayer")
*listenG = 7991
}
gh, err := makeDAGBlockHost(*listenG, *secio, *seed) //graph host and
if err != nil {
log.Fatal(err.Error())
}
if *target == "" {
MainNode = true
fmt.Println("listening for connections")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub
if err != nil {
return
}
tsubs, _ := ps.Subscribe("0")
TheDagNet = DagNet{ps: ps, host: gh,ctx:ctx, subs: []*floodsub.Subscription{tsubs}}
}else{
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
adrs := strings.Split(*target, ",")
targg := adrs[0]
gipfsaddr, err := ma.NewMultiaddr(targg)
if err != nil {
log.Fatalln(err.Error())
}
gpid, err := gipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err.Error())
}
gpeerid, err := peer.IDB58Decode(gpid)
if err != nil {
log.Fatalln(err.Error())
}
// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
gtargetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(gpeerid)))
gtargetAddr := gipfsaddr.Decapsulate(gtargetPeerAddr)
// We have a peer ID and a targetAddr so we add it to the peerstore
// so LibP2P knows how to contact it
gh.Peerstore().AddAddr(gpeerid, gtargetAddr, pstore.PermanentAddrTTL)
//nh,err := pstore.InfoFromP2pAddr(gtargetPeerAddr)
pi := pstore.PeerInfo{gpeerid, []ma.Multiaddr{gtargetAddr} }
//gh.Peerstore().AddAddr(pi.ID, gtargetPeerAddr, pstore.PermanentAddrTTL)
//fmt.Println("::>",gpeerid,nh.ID, "|", gtargetAddr, "|", gtargetPeerAddr)
ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub
gerr := gh.Connect(ctx, pi )
if gerr != nil {
log.Fatalln(gerr)
panic(gerr)
}
fmt.Println("opening flood stream" )
if err != nil {
return
}
//TheDagNet = DagNet{ps: ps, host: gh, ctx:ctx}
tsubs, _ := ps.Subscribe("0")
TheDagNet = DagNet{ps: ps, host: gh,ctx:ctx, subs: []*floodsub.Subscription{tsubs}}
}
go Beacon()
go sync()
select {} // hang forever
}
//Synchronize Blockchain for current SuperBlock
func sync() bool{
fmt.Println("SYC0.0:")
//var seed int
//conf := 0
for {
blch,_ := TheDagNet.ps.Subscribe("0") //[0] //The Channel for Blockchain layer
ctx5, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
go func () {
select {
case <-time.After(10 * time.Second):
fmt.Println("overslept")
case <-ctx5.Done():
//os.Exit(2)
fmt.Println("Trying again to find state") // prints "context deadline exceeded"
//conf = 0
cancel()
//ctx5.Close()
return
}
}()
btxbin, _ := blch.Next(TheDagNet.ctx)
if btxbin.GetFrom().Pretty() == myipfsID{
fmt.Println("Self Echo ")
continue
}
fmt.Println("OK ")
beam := (btxbin.GetData())
light := []string{}
err := json.Unmarshal(beam, &light)
if err != nil{
continue
}
sender := light[2]
sindex := light[1]
ts := light[0]
fmt.Println("Got First Quanta",sender,sindex,ts,"@me=",myipfsID)
time.Sleep(500 *time.Millisecond)
continue
}
fmt.Println("No Light Yet")
return false
}
//End
Hello
I have found a new issue when receiving a message larger than 1048438 bytes (almost 1MB). The message gets trimmed and the signature will not appear valid. The issue comes from line 29 in comm.go, where there is a delimiter of 2^20 bytes (1MB message size). Is this delimiter really necessary? The underlying host object should take care of message fragmentation/flood protection/muxing and others.
Thanks.
When creating a small cluster of PubSubs around a topic in some test code I wrote, I found I needed to add a little time.Sleep just after the calls to Subscribe().
Subscribe() returns right away with a subscription and a nil error; however, if I don't sleep a bit, calling subscription.Next() blocks forever. Could this be a deadlock issue?
I've noticed that if a pubsub is subscribed to a topic, and it also publishes on that topic, it will receive its own message. Is this intended behavior?
If so, I'd be interested to understand the thinking. In my usage so far, I have had to explicitly tag publications, so I can ignore messages from my own node. It's not a huge burden, but it did lead to some unexpected issues.
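For what it's worth, filtering self-published messages only takes comparing the message origin with our own ID, using the same GetFrom()/ID() calls the code elsewhere in this thread already relies on (import paths indicative):
package selffilter

import (
	floodsub "github.com/libp2p/go-floodsub"
	host "github.com/libp2p/go-libp2p-host"
)

// fromSelf reports whether a received message is our own publication
// echoed back to us.
func fromSelf(msg *floodsub.Message, h host.Host) bool {
	return msg.GetFrom() == h.ID()
}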
Hey!
Is the multiaddr of the publisher available to the subscriber? What I can understand from the source is that Subscription.Next can get us the peer.ID but not the multiaddr.
Use case: pubsub is used for peer discovery and the subscriber wants to connect directly to the publisher.
Thanks
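Not a pubsub feature as such, but the publisher's addresses can usually be recovered from the peerstore once the peer.ID is known; this only works if the host has already learned those addresses (e.g. via an existing connection or discovery). Import paths are indicative:
package pubaddrs

import (
	floodsub "github.com/libp2p/go-floodsub"
	host "github.com/libp2p/go-libp2p-host"
	ma "github.com/multiformats/go-multiaddr"
)

// publisherAddrs looks up the known multiaddrs of the peer that authored the message.
func publisherAddrs(h host.Host, msg *floodsub.Message) []ma.Multiaddr {
	return h.Peerstore().Addrs(msg.GetFrom())
}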
Currently this is a TODO.
A way to aggregate multiple messages into one/fewer may be a nice option for apps operating on CRDTs or other data types which are nice to 'merge'. It could enable massive savings in terms of network bandwidth and cpu processing for certain use cases.
One particular use case I have in mind is voice-over-pubsub app, where an Aggregator would mix multiple sound channels into one (sort of audio-crdt), reducing network/cpu load for participants of big group calls.
I'm not sure about implementation details; one problem is probably the trade-off between efficiency and latency (e.g. for voice apps we ideally don't want to increase latency by more than the length of one audio chunk), so the optimal interface would let the aggregator control that well.
type Aggregator interface {
Handle(<-chan Message) <-chan Message
}
Aggregator isn't the best name, and the interface optimally wouldn't require it to run in a separate goroutine. It would probably plug in next to the Validator stuff (and could be used to implement it).
I've just found in js-libp2p-floodsub that messages might be ignored when they shouldn't be.
We're doing something similar to:
Lines 460 to 462 in 107d351
...but in JS, converting the seqno to a utf8 string can sometimes result in the same string for different buffers. The following are two different seqnos that js-ipfs received from go-ipfs that demonstrate the problem:
$ node
> const buf2 = Buffer.from('15603533e990dfe0', 'hex')
> const buf1 = Buffer.from('15603533e990dfde', 'hex')
> buf1.toString('utf8')
'\u0015`53���'
> buf2.toString('utf8')
'\u0015`53���'
> buf1.toString('utf8') === buf2.toString('utf8')
true
>
So sometimes we think we've seen the message when we haven't! 🤦‍♂️
I'm flagging this here in case this could be a problem in Go as well... please feel free to close if you can determine it's not a problem :)
FYI, for the PR to js-libp2p-floodsub I'll probably convert the seqno buffer to a hex-encoded string instead.
As a developer using libp2p, I understand that libp2p provides a publish-subscribe message passing system. I shouldn't need to know or care how it is implemented (flooding). However, by using the word "flood" in the name of objects and imports, an implementation detail about the use of flooding is being leaked through to me.
It would be preferable to remove the word flood in interfaces exposed to higher-level code.
Thoughts?
In tests that use gossipsub, we have to sleep for a bit to wait for the heartbeats to set everything up properly. This is pretty inconvenient, as adding two seconds of wait time to every single test gets pretty expensive.
Is there a way to force this to happen immediately?
cc @vyzo
Not all subscribers will want to relay (bandwidth reasons) and not all relays will want to subscribe (they may just want to forward values without looking at them).
The Gossipsub spec contains a membership management protocol which doesn't seem to be implemented here. Is this still on the roadmap? If not, what is the alternative? Also, is the spec itself final already or is it work in progress?
We've been running into some hiccups in OrbitDB recently as we've been upgrading from go-ipfs 0.4.13 to 0.4.16 (and now 0.4.17).
We've observed a behaviour where pubsub peers don't always connect to each other. Detailed information on the behaviour and what we've discovered so far in orbitdb/orbitdb#417. To summarize: the tests are run with two local go-ipfs nodes, both subscribe to the same pubsub topic, but sometimes (about 50% of test runs) they don't seem to connect to each other on pubsub level even though they're connected on swarm level (see this comment orbitdb/orbitdb#417 (comment)).
Looking at the commit log in this repo, I noticed that the pubsub implementation was changed some time ago (yay, great to see the gossip pubsub coming together!), so most likely this is a regression. I believe 0.4.13 was working fine, but we haven't verified which version after that stopped working.
If anyone has an idea why this is happening and could work on a fix, I would highly appreciate it. We're (finally) adding back support for go-ipfs in OrbitDB, but the tests are not running reliably due to this bug, so we're hesitant to make a new release until it is fixed (alternatively we could instruct users to use <= 0.4.13, but that's no fun).
If there's any information I can provide to solve this, please let me know!
We need to start tagging connections that are useful to us through the connection manager
For example, in the DHT, we tag any peer that goes into our routing table with a value of 5.
We should likely tag peers that are in pubsub topics that we are subscribed to, or that we publish to.
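A sketch of what that tagging could look like, mirroring the DHT example above; the tag string and weight are illustrative:
package topictag

import (
	host "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
)

// tagTopicPeer marks a peer as useful for a topic so the connection manager
// prefers to keep that connection alive.
func tagTopicPeer(h host.Host, p peer.ID, topic string) {
	h.ConnManager().TagPeer(p, "pubsub:"+topic, 5)
}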
Hi !
I might be missing something, but I cannot find a way to unregister a topic validator.
Indeed, in my case I have multiple services that use the floodsub layer. A service subscribes to a topic and registers a validator for that topic. When stopping the service, I can call Cancel() on the Subscription, but there is no way to remove the topic validator from PubSub.topicVals.
Maybe I am doing this wrong, but it feels necessary to have one of these:
C# test and JS test are intermittently failing. Both are publishing multiple messages and then counting the subscribed messages.
It appears that some published messages are not being relayed to the subscribed channels. In the C# test both publisher and subscriber are on the same peer.
NOTE: these tests are running on Windows. This was first detected in ipfs-inactive/js-ipfs-http-client#648
Hi,
I am refactoring a general p2p package to use floodsub, but I only see the node's own messages being received. Am I missing something in the floodsub setup? Here is the attached code:
Command: go run flood2test.go -g 4330 -secio false
TIA
package main
import (
"context"
"crypto/rand"
"flag"
"fmt"
"io"
//"io/ioutil"
"log"
mrand "math/rand"
//"strings"
"time"
floodsub "gx/ipfs/QmeRgVyVFj3bpgF9vrLepT3AMA8pU5EN8ykbspTo6kcjoz/go-libp2p-floodsub"
libp2p "github.com/libp2p/go-libp2p"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
//net "gx/ipfs/QmXoz9o2PT3tEzf7hicegwex5UgVP54n3k82K7jrWFyN86/go-libp2p-net"
host "gx/ipfs/QmaSfSMvc1VPZ8JbMponFs4WHvF9FgEruF56opm5E1RgQA/go-libp2p-host"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
crypto "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
uuid "gx/ipfs/QmcBWojPoNh4qm7zvv4qiepvCnnc7ALS9qcp7TNwwxT1gT/go.uuid"
)
var GID = uuid.Must(uuid.NewV4()).String() //Unique node id for alpha
type DagNet struct {
ps *floodsub.PubSub
host host.Host
ctx context.Context
}
var Floodnet DagNet
func makeDAGBlockHost(floodPort int, secio bool, randseed int64) (host.Host, error) {
// If the seed is zero, use real cryptofloodic randomness. Otherwise, use a
// deterministic randomness source to make generated keys stay the same
// across multiple runs
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}
// Generate a key pair for this host. We will use it
// to obtain a valid host ID.
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
opts := []libp2p.Option{
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", floodPort)),
libp2p.Identity(priv),
}
if !secio {
opts = append(opts, libp2p.NoEncryption())
}
/******Graph Host*********/
floodHost, err := libp2p.New(context.Background(), opts...)
if err != nil {
return nil, err
}
// Build floodhost multiaddress
ghostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", floodHost.ID().Pretty()))
// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
gaddr := floodHost.Addrs()[0]
gfullAddr := gaddr.Encapsulate(ghostAddr)
////////////////////////////////////////////////////////////////
/*************************/
log.Printf("I am %s flood & %s block Node\n", gfullAddr)
if secio {
log.Printf("Now run \"go run flood2test.go -g %d -d %s -secio false \" on a different terminal\n", floodPort+1, gfullAddr)
} else {
log.Printf("Now run \"go run flood2test.go -g %d -d %s \" on a different terminal\n", floodPort+1, gfullAddr)
}
///////////////////
return floodHost, nil
}
func main() {
//log.DisableLogs()
mrand.Seed(int64(time.Second))
//t := time.Now()
// Parse options from the command line
listenG := flag.Int("g", 0, "wait for incoming flood layer connections")
target := flag.String("d", "", "target peer to dial, comes with both flood and block dialins")
secio := flag.Bool("secio", false, "enable secio")
seed := flag.Int64("seed", 0, "set random seed for id generation")
flag.Parse()
if *listenG == 0 {
log.Fatal("Please provide a port to bind for -g <port>")
}
gh, err := makeDAGBlockHost(*listenG, *secio, *seed)
if err != nil {
log.Fatal(err)
}
if *target == "" {
log.Println("listening for connections")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub
if err != nil {
return
}
Floodnet = DagNet{ps: ps, host: gh,ctx:ctx}
for{
ch, err := Floodnet.ps.Subscribe("floodtest")
if err != nil {
log.Fatalln("FOps:",err)
}
time.Sleep(time.Millisecond * 10)
msg := []byte("floodtest: "+GID)
err = Floodnet.ps.Publish("floodtest", msg)
if err != nil {
log.Fatalln("T696",err)
}
msg2 := []byte("floodtest: "+GID)
err = Floodnet.ps.Publish("floodtest", msg2)
if err != nil {
log.Fatalln("T706",err)
}
testbin, _ := ch.Next(Floodnet.ctx)
fmt.Println("Test1: ",string(testbin.GetData()) )
testbin, _ = ch.Next(Floodnet.ctx)
fmt.Println("Test2: ",string(testbin.GetData()) )
time.Sleep(5 * 1000*1000*1000)
}
}else{
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil {
log.Fatalln(err)
}
gpid, err := gipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
gpeerid, err := peer.IDB58Decode(gpid)
if err != nil {
log.Fatalln(err)
}
// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
gtargetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(gpeerid)))
gtargetAddr := gipfsaddr.Decapsulate(gtargetPeerAddr)
// We have a peer ID and a targetAddr so we add it to the peerstore
// so LibP2P knows how to contact it
gh.Peerstore().AddAddr(gpeerid, gtargetAddr, pstore.PermanentAddrTTL)
nh,err := pstore.InfoFromP2pAddr(gtargetPeerAddr)
fmt.Println("::>",*nh)
gerr := gh.Connect(ctx, *nh )
if gerr != nil {
log.Fatalln(gerr)
}
log.Println("opening flood stream")
ps, err := floodsub.NewFloodSub(ctx, gh) //pubsub -- created here so ps is in scope below
/*
/////test///
s, err := gh.NewStream(context.Background(), gpeerid, "/p2p/1.0.0")
if err != nil {
log.Fatalln("T238",err)
}
_, err = s.Write([]byte("Hello, world!\n"))
if err != nil {
log.Fatalln(err)
}
out, err := ioutil.ReadAll(s)
if err != nil {
log.Fatalln(err)
}
log.Printf("read reply: %q\n", out)
*/
//////////////
// make a new stream from host B to host A
// it should be handled on host A by the handler we set above because
// we use the same /p2p/1.0.0 protocol
if err != nil {
return
}
Floodnet = DagNet{ps: ps, host: gh,ctx:ctx}
//Start the flood miner
for{
ch, err := Floodnet.ps.Subscribe("floodtest")
if err != nil {
log.Fatalln("FOps:",err)
}
time.Sleep(time.Millisecond * 10)
msg := []byte("floodtest: "+GID)
err = Floodnet.ps.Publish("floodtest", msg)
if err != nil {
log.Fatalln("T696",err)
}
msg2 := []byte("floodtest: "+GID)
err = Floodnet.ps.Publish("floodtest", msg2)
if err != nil {
log.Fatalln("T706",err)
}
testbin, _ := ch.Next(Floodnet.ctx)
fmt.Println("Test1: ",string(testbin.GetData()) )
testbin, _ = ch.Next(Floodnet.ctx)
fmt.Println("Test2: ",string(testbin.GetData()) )
time.Sleep(5 * 1000*1000*1000)
}
select {} // hang forever
}
}
//End
$ ipfs version --all
go-ipfs version: 0.4.18-
Repo version: 7
System version: amd64/linux
Golang version: go1.11.1
Peer A subscribes to a topic 'test_topic', and then peer B publishes data for peer A to 'test_topic'. At the beginning, peer A can receive the data published by B, but after about 8 hours, when peer B publishes data again, peer A no longer receives it.
(Note: during those 8 hours peer B did not publish any data to peer A. I executed ipfs swarm peers on peer B and could see the connection to peer A, but ipfs pubsub peers on peer B showed no output.)
Then, on peer B, I executed ipfs swarm disconnect <peer A> followed by ipfs swarm connect <peer A>. At this point, when peer B re-publishes data, peer A can receive it, and ipfs pubsub peers on peer B shows the ID of peer A.
I suspect that after the peers connected, some non-network error occurred while writing data to the peer, causing the peer to be removed from the pubsub routing table. But since the swarm peers (DHT route?) still show a connection to that peer, no new connection is made that would cause the pubsub router to re-register it.
Reference source code:
To avoid future confusion such as -- #67 (comment) --. I want to propose that we adopt the following:
This will make it very clear that libp2p/IPFS support multiple pubsub implementations and that we encourage everyone to try new ones.
In #55, validation is per-subscription. Unfortunately, this means:
Really, validation is a property of the topic, not the subscription. Eventually, I'd like to be able to tell pubsub to relay a topic without explicitly subscribing (#28).
In terms of implementation, we have a few choices:
2 is probably the easiest but not the cleanest. Possible interface:
// Register a validator with the pubsub.
func (p *PubSub) RegisterValidator(v Validator)
type Validator interface {
// Apply this validator to this topic.
Validates(topic string) bool // or `ValidatorFor(topic string) TopicValidator`?
// Apply this validator to this topic by descriptor. Do we even need topic descriptors anymore? We don't use them... Having both of these is annoying...
ValidatesTopicDescriptor(topic *pb.TopicDescriptor) bool
Validate(msg *Message) bool
}
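To make the shape concrete, here is a hypothetical validator filling in the sketched interface (a per-topic size check); none of this exists in the package, it only illustrates the proposal:
// maxSizeValidator accepts messages on one topic only if they are small enough.
type maxSizeValidator struct {
	topic string
	limit int
}

func (v *maxSizeValidator) Validates(topic string) bool {
	return topic == v.topic
}

func (v *maxSizeValidator) ValidatesTopicDescriptor(td *pb.TopicDescriptor) bool {
	return td.GetName() == v.topic
}

func (v *maxSizeValidator) Validate(msg *Message) bool {
	return len(msg.GetData()) <= v.limit
}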
For the latest Ethereum sharding + Casper spec, the theoretical maximum number of validators (assuming a supply cap of 128m ETH) is 4m. This conflicts with the target size of 10,000 in https://github.com/libp2p/specs/tree/master/pubsub/gossipsub#membership-management-protocol. 10,000 won't even be OK initially, since there are 16,825 nodes as I write this. Dynamic sizing will be needed to support up to 4,000,000 (or more) eventually, and 20,000 initially.
Two models have been proposed for the network layer: one network for each shard, or a torus-shaped network where a node mostly communicates with nodes in the same shard, while propagation of messages between shards is limited to a certain intra-shard distance and to certain types of messages such as new collations. E.g., a node in shard 2018 (there could be up to 4096 shards in total) can relay new collations to notaries between shards 2013-2023, while these notaries may be shuffled to shard 2018 soon (and notaries in one shard may possibly be shuffled to a shard up to n, say 5, shards away in the next period).
I'll cross-post this on the above ethresear.ch thread and in issue #77.
Assuming N = 4,000,000 then C_rand = 7.