Comments (14)
Hey @JohnRoesler thanks for trying it out. I'm assuming this is due to not having SASL auth set up yet, it should be quick to add.
from connect.
Hey @mintbridge, I don't see any major reason not to include it as an option. Since the protocols are so different I imagine the configuration fields we would expose would be completely different, in which case it ought to be a separate type within Benthos. Obviously, that puts us in an awkward position with amqp
being 0.91 and then some other type being called amqp_1_0
.
Any idea which broker implementations supporting 1.0 are most common? Would be nice to get an idea of use statistics and future prospects for both versions and maybe do some breaking config changes if we go down this route.
from connect.
Marking this as help wanted, right now I just want to collect use cases and opinions.
from connect.
yeah, i was thinking maybe do something with a version option, and just swap out the implementation underneath but yeah the config would be totally different
this has good list https://github.com/xinchen10/awesome-amqp but the one i'm after in particular is azure service bus
from connect.
This project looks wonderful, but without a more modern version of AMQP, I cannot use it. I assume no updates on this?
from connect.
Hey @holmanbph, the Go libraries on offer for AMQP 1.0 still seem a little premature, https://github.com/vcabbage/amqp still seems to be the safest bet but it's still not at version 1. However, I'm not opposed to adding it in as a beta component.
from connect.
I've renamed the existing amqp
(0.91) components to amqp_0_9
and deprecated the existing ones, they'll get removed in V4. This opens us up to add amqp_1
(and potentially amqp_0_10
as well).
from connect.
👍
from connect.
Thanks for the update!
from connect.
It looks like the amqp 1.0 library was forked by azure https://github.com/Azure/go-amqp
from connect.
@mintbridge, @holmanb, @JohnRoesler, I finally got around to this 4f12998
I've added some very basic components and labelled them BETA, they're currently tested against ActiveMQ. There's work needed for supporting metadata/headers. I'll probably put a release out this evening. Is anyone interested in being a guinea pig and trying them out?
from connect.
@Jeffail wow, thanks for getting this going!
Doing some preliminary testing, I am not able to get it working with the benthos plugin to consume from an Azure Service Bus. However, if I use the underlying Go library I am able to consume. See snippets below.
input:
type: amqp_1
amqp_1:
source_address: "/<EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/<PartitionNumber>"
url: "amqp://<mydomain>.servicebus.windows.net/;SharedAccessKeyName=<name>;SharedAccessKey=<key>;"
output:
type: stdout
stdout:
delimiter: ""
{"@timestamp":"2020-06-29T23:05:33-05:00","@service":"benthos","level":"ERROR","component":"benthos.input","message":"Failed to connect to amqp_1: *Error{Condition: amqp:unauthorized-access, Description: Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://<mydomain>.servicebus.windows.net/<event_hub>'. TrackingId:<tracking_id>, SystemTracker:gateway5, Timestamp:2020-06-30T04:05:32, Info: map[]}"}
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
// Create client
client, err := amqp.Dial("amqps://<mydomain>.servicebus.windows.net",
amqp.ConnSASLPlain("access-key-name", "access-key"),
)
if err != nil {
log.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
if err != nil {
log.Fatal("Creating AMQP session:", err)
}
ctx := context.Background()
// Continuously read messages
{
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkSourceAddress("/<EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/<PartitionNumber>"),
amqp.LinkCredit(10),
)
if err != nil {
log.Fatal("Creating receiver link:", err)
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
receiver.Close(ctx)
cancel()
}()
for {
// Receive next message
msg, err := receiver.Receive(ctx)
if err != nil {
log.Fatal("Reading message from AMQP:", err)
}
// Accept message
msg.Accept()
fmt.Printf("Message received: %s\n", msg.GetData())
}
}
}
The Azure Service Bus was provided to me, so I do not have access to tweak the settings for it. It appears to be a partitioned topic, so my understanding is that I would need to create multiple receivers - one for each partition (0,1,2,3).
from connect.
Confirmed the 3.20.0 release works 👍
input:
type: amqp_1
amqp_1:
source_address: "/<EventHubName>/ConsumerGroups/<ConsumerGroupName>/Partitions/0"
url: "amqps://<mybus>.servicebus.windows.net"
sasl:
mechanism: plain
user: <user>
password: "<password>"
from connect.
🎉
from connect.
Related Issues (20)
- redis_streams: support for `XAUTOCLAIM`
- Docs typo in Configuration: Templating
- sql_insert - high CPU usage mainly due to GC cycles and allocations. HOT 6
- Global options no longer work via rpk connect HOT 5
- Elasticsearch output backoff should honor HTTP code `429`
- Log rotation is extra aggressive on removing older log files
- Kafka_franz info HOT 1
- Emit `kafka_lag` metadata for the `kafka_franz` input similarly to the `kafka` input
- Add connector support levels to the connector source and templates
- S3 output missing header since aws-sdk-go-v2 upgrade HOT 1
- Contributing link leads nowhere
- Stored procedure output HOT 2
- How to overwrite the value of tracparent in tracing_span().traceparent
- Bloblang `with` method is not properly filtering arrays HOT 3
- Enhance the http_client output with additional error checking HOT 4
- Incorrect Usage: flag provided but not defined: -r when in streams mode HOT 2
- Failed to recover from broker unavailable using kafka_franz HOT 2
- Is there any option to retrieve error result from underlying output component that is wrapper inside Retry output component? HOT 1
- sql_insert ORA-01461: can bind a LONG value only for insert into a LONG column HOT 3
- add name to the mongo input/output for tracking w/ mongodb
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from connect.