Amazon SNS/SQS client library that enables sending and receiving messages with payload larger than 256KiB via Amazon S3.
License: Apache License 2.0
sns-sqs-big-payload/src/sqs-consumer.ts
Lines 163 to 165 in d5fb153
When integrating with Lambda, the above catch clause catches the error, which causes the message to be removed from the queue even though there was an error. Can you fix this?
Btw, really nice work on this package!
Is it possible to use the same schema as the Java libraries for SNS and SQS:
https://docs.aws.amazon.com/sns/latest/dg/large-message-payloads.html
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
so that one side can produce with Node.js and consume with Java, and vice versa?
They use the following JSON:
[ "software.amazon.payloadoffloading.PayloadS3Pointer", { "s3BucketName": "extended-client-bucket", "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx" } ]
or at least maybe make it configurable
Thanks
I'm trying to use SqsProducer to send an SQS message inside my Lambda function, but my function times out even when I'm using the very simple code from the readme.
const sqsProducer = SqsProducer.create({
queueUrl: "https://sqs.us-east-1.amazonaws.com/1234567890/my-sqs",
region: process.env.AWS_REGION,
largePayloadThoughS3: true,
extendedLibraryCompatibility: true,
s3Bucket: "my-s3",
});
Any idea why it may happen?
The SNS publish method used in the readme file is wrong: await snsProducer.sendJSON({
Fix: await snsProducer.publishJSON({
Hi, AWS Lambda makes all the keys in the event object lowercase, but your Consumer class, for example, tries to access message.Body. This is undefined, because the key Body is actually body, so an error Invalid JSON at position 0 character 'u' is thrown.
I think you cannot change it to 'body' directly, since the current implementation works in environments outside Lambda. What I would suggest is to pass a key like integration: 'lambda' when an instance of the class is created, so the class knows it is being used in the context of Lambda, i.e. with lowercase record (event) keys.
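In the meantime, a minimal workaround sketch (the helper name and record shape are mine, not part of the library): normalize the Lambda record keys back to the SDK's casing before handing the record to the consumer.

```typescript
// Hypothetical helper, not part of sns-sqs-big-payload: Lambda delivers SQS
// records with lowercase keys (messageId, body, ...), while the consumer
// reads the AWS SDK casing (MessageId, Body, ...). This maps one to the other.
interface LambdaSqsRecord {
    messageId: string;
    receiptHandle: string;
    body: string;
}

function normalizeLambdaRecord(record: LambdaSqsRecord) {
    return {
        MessageId: record.messageId,
        ReceiptHandle: record.receiptHandle,
        Body: record.body,
    };
}

const normalized = normalizeLambdaRecord({
    messageId: '238bba99-76e3-4fe2-b777-0000',
    receiptHandle: 'AQEB...',
    body: '{"hello":"world"}',
});
console.log(normalized.Body); // '{"hello":"world"}'
```

The normalized record could then be passed to the consumer instead of the raw Lambda record.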
To whom it may concern,
Your code is helpful.
It may give the user more flexibility if they can define which directory in the S3 bucket the files are uploaded to.
May I suggest adding
private s3BucketPrefix: string;
this.s3BucketPrefix = s3BucketPrefix;
to the SqsProducer and SnsProducer classes, and in the publishJSON or sendJSON function, modifying payloadKey:
const payloadKey = `${this.s3BucketPrefix}/${payloadId}.json`;
Regards,
Arthur
Since AWS is ending v2 support*, I was wondering if there is any plan to migrate to the latest v3 of the AWS SDK for Node.
Otherwise, we could potentially work on that migration, should that be something you would be open to?
Btw: thank you for this project, it's been really helpful to us.
*
NOTE: We are formalizing our plans to enter AWS SDK for JavaScript (v2) into maintenance mode in 2023.
Please migrate your code to use AWS SDK for JavaScript (v3).
For more information, check the migration guide at https://a.co/7PzMCcy
Hi,
I noticed everything works fine except when I send any data that goes through S3; then I get nothing back.
On handle message, I did:
handleMessage: async ({ payload }) => { console.log({ payload }) },
Payload would return undefined. This only happens when I send any data over 256KB and requires S3. When I check SQS messages and poll, I can see something like:
{"S3Payload":{"Id":"3eaff789-c556-4cbd-8381-acde38e2508a","Bucket":"sqs-event-payload","Key":"3eaff789-c556-4cbd-8381-acde38e2508a.json","Location":"https://event-payload.s3.amazonaws.com/3eaff789-c556-4cbd-8381-acde38e2508a.json"}}
I have getPayloadFromS3 set to true, but it doesn't seem like the message is downloaded. Using transformMessageBody returns the raw message as saved on SQS, that is, an S3Payload object containing Id, Bucket, Key and Location.
But on calling handleMessage, I get nothing. Any idea what I am doing wrong?
Thanks to all of the maintainers for creating and maintaining such a helpful library. There is a similar open issue for SNS, and we need the same functionality for SQS as well, as we are using AWS SQS as a trigger to Lambda and filtering out messages based on the sent MessageAttributes.
The sendJSON method in the SQS producer doesn't support the MessageAttributes and MessageSystemAttributes sent by the caller as of now.
Can we add this support as well? I would be more than happy to contribute & raise an MR for the fix.
Thanks!
I'd like to use this inside of a Lambda that's triggered from an SQS queue. How would I do that? I don't think polling is an appropriate solution here as AWS has already provided me the SQS message.
Hi again!
I'd like to cover the functionality of the error handlers of my implementation with unit tests, but some of the error handlers are not reachable. AFAIK it's not possible to test them with unit tests currently, as we can't reproduce error scenarios with the public interface of SqsConsumer.
Please see the code below for examples of unreachable code blocks:
sqsConsumer.on(SqsConsumerEvents.error, err => {
errorHandler('error', queueType, err, logger);
});
sqsConsumer.on(SqsConsumerEvents.connectionError, err => {
errorHandler('connection_error', queueType, err, logger);
});
sqsConsumer.on(SqsConsumerEvents.s3PayloadError, err => {
errorHandler('s3_payload_error', queueType, err, logger);
});
To make libraries which depend on event handlers more test-friendly, the classes that implement EventEmitter could either publicly expose the EventEmitter instance or inherit from EventEmitter directly.
Once such a change is in place, it would be possible to trigger the events and hence test the event handlers, as in the example below:
sqsConsumer.emit(SqsConsumerEvents.error, new Error('test'));
Please let me know what you think, and if you'd be interested in a PR.
Cheers!
The same problem was reported earlier on issue #19, but the fix does not make sns-sqs-big-payload compatible with the JSON schema used by https://github.com/awslabs/amazon-sqs-java-extended-client-lib.
Amazon S3 reference in the message body of sns-sqs-big-payload does not match with the JSON schema of the Java library.
The JSON structure that the Java client produces is in the following format:
[
"software.amazon.payloadoffloading.PayloadS3Pointer",
{
"s3BucketName": "extended-client-bucket",
"s3Key": "xxxx-xxxxx-xxxxx-xxxxxx"
}
]
Whereas the format that sns-sqs-big-payload expects when extendedLibraryCompatibility is enabled is as follows:
{
"s3BucketName": "extended-client-bucket",
"s3Key": "xxxx-xxxxx-xxxxx-xxxxxx.json"
}
There are two notable differences:
1. The Java client wraps the S3 pointer in a JSON array whose first element is the class name software.amazon.payloadoffloading.PayloadS3Pointer, while this library expects a bare object.
2. The Java client does not append a .json extension to the value of s3Key; since this library expects one, it cannot resolve the object from S3.
Assuming there is an object in S3 with the name 7ac71e8a-35bd-420f-b644-97abf2317e1d.json, running the following script will help reproduce the errors in the library.
import { SQS } from 'aws-sdk';
// sqsConfig is assumed to hold your own { region, queueUrl }
const sqs = new SQS({ region: sqsConfig.region });
await sqs
.sendMessage({
QueueUrl: sqsConfig.queueUrl,
MessageBody:
'["software.amazon.payloadoffloading.PayloadS3Pointer",{"s3BucketName":"extended-client-bucket","s3Key":"7ac71e8a-35bd-420f-b644-97abf2317e1d"}]',
MessageAttributes: {
SQSLargePayloadSize: {
StringValue: '5198',
StringListValues: [],
BinaryListValues: [],
DataType: 'Number',
},
},
})
.promise();
The first error you'd hit would be There were 2 validation errors:\n* MissingRequiredParameter: Missing required key 'Bucket' in params\n* MissingRequiredParameter: Missing required key 'Key' in params, because the lib cannot map the Key and Bucket fields in the payload due to the JSON schema mismatch.
And if you get past that with a manual payload transformation, the lib will hit a NoSuchKey: The specified key does not exist error, due to the missing .json extension in the S3 key name.
When the opt-in extendedLibraryCompatibility option is used, the library should respect the JSON schema which the AWS client library produces, and it should handle the .json postfix itself while fetching the S3 object.
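To make the expected mapping concrete, here is a parser sketch (the function and type names are mine, chosen for illustration, not the library's API) that accepts the Java array format and yields the Bucket/Key pair an S3 getObject call needs. Note that no .json suffix is appended, matching the Java client's behavior:

```typescript
// Hypothetical parser, for illustration only: convert the message body
// produced by amazon-sqs-java-extended-client-lib into the { Bucket, Key }
// pair needed to fetch the offloaded payload from S3.
interface S3Pointer {
    Bucket: string;
    Key: string;
}

function parseJavaExtendedPointer(body: string): S3Pointer {
    // The Java client emits: ["software.amazon.payloadoffloading.PayloadS3Pointer", { ... }]
    const parsed = JSON.parse(body);
    if (!Array.isArray(parsed) || parsed[0] !== 'software.amazon.payloadoffloading.PayloadS3Pointer') {
        throw new Error('Not a PayloadS3Pointer message');
    }
    const pointer = parsed[1];
    // Deliberately no '.json' suffix: the Java client stores the key as-is.
    return { Bucket: pointer.s3BucketName, Key: pointer.s3Key };
}

const example =
    '["software.amazon.payloadoffloading.PayloadS3Pointer",{"s3BucketName":"extended-client-bucket","s3Key":"7ac71e8a-35bd-420f-b644-97abf2317e1d"}]';
console.log(parseJavaExtendedPointer(example));
// Bucket: extended-client-bucket, Key: 7ac71e8a-35bd-420f-b644-97abf2317e1d
```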
Hello, I tried to use the package with my Lambda functions. SqsProducer works fine, it sends a JSON payload to the S3 bucket, but my problem is consuming a message that was sent via S3. I followed this example: https://github.com/aspecto-io/sns-sqs-big-payload/blob/HEAD/docs/usage-in-lambda.md
But the handleMessage function never runs and an undefined response is always returned. transformMessageBody runs, but with an undefined body param too.
My function getMessage should return the message payload.
async getMessage (messageProps) {
const sqsConsumer = SqsConsumer.create({
region: 'us-east-1',
getPayloadFromS3: true,
s3Bucket: 'MY-S3-BUCKET',
transformMessageBody: (body) => {
console.log('transformMessageBody', body)
const snsMessage = JSON.parse(body);
return snsMessage.Message;
},
parsePayload: (raw) => JSON.parse(raw),
handleMessage: async ({ payload }) => {
console.log('handleMessage', payload)
},
});
const result = await sqsConsumer.processMessage(messageProps);
return result;
}
This is my entire message payload
{
"Records": [
{
"messageId": "238bba99-76e3-4fe2-b777-****",
"receiptHandle": "BLABLABAL",
"body": "{\"S3Payload\":{\"Id\":\"a51bed3c-e0d2-45f1-a37a-***\",\"Bucket\":\"MY-S3-BUCKET\",\"Key\":\"a51bed3c-e0d2-45f1-a37a-*****.json\",\"Location\":\"https://MY-S3-BUCKET.s3.amazonaws.com/a51bed3c-e0d2-45f1-a37a-****.json\"}}",
"attributes": {
"ApproximateReceiveCount": "8",
"SentTimestamp": "1604921763468",
"SenderId": "AROA4VG6N6C5XCF3YA7S3:BABALBALBAL",
"ApproximateFirstReceiveTimestamp": "1604921763468"
},
"messageAttributes": {},
"md5OfBody": "4a125225a9e7016440****",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:*****:queueName",
"awsRegion": "us-east-1"
}
]
}
I send the first object of the Records array, like this:
eventBody = await this.queueService.getMessage(event.Records[0]);
I really believe that I missed something, but I don't know where. Could you help me?
Thanks.
sns-sqs-big-payload/src/sqs-consumer.ts
Line 292 in fb283ee
sns-sqs-big-payload/src/sqs-consumer.ts
Line 281 in fb283ee
Pretty sure that it should be receiptHandle with a lowercase r, and not ReceiptHandle.
Btw, keep up the good work!
Hi,
I want to prefix the JSON file with custom value.
async sendJSON(message: unknown, options: SqsMessageOptions = {}, jsonPrefix: string): Promise {
    const messageBody = JSON.stringify(message);
    const msgSize = Buffer.byteLength(messageBody, 'utf-8');
    if ((msgSize > this.messageSizeThreshold && this.largePayloadThoughS3) || this.allPayloadThoughS3) {
        const payloadId = uuid();
        const payloadKey = `${jsonPrefix}${payloadId}.json`;
Thanks for making this library available, it is really useful. We are looking to implement this library in our project and came across a small issue. We currently have an events driven architecture, where we use a lot of SNS + SQS to route and fan out messages to different consumers. As multiple SQS queues might be subscribed to an SNS topic, we use MessageAttributes to filter out only the events the consumer needs to process and reduce the amount of lambda invocations.
We currently publish this way:
sns.publish({
TopicArn: routerTopicArn,
Message: JSON.stringify(event),
MessageAttributes: {
eventName: {
DataType: 'String',
StringValue: event.eventName || '',
},
},
});
And our SQS subscription filter policy to SNS looks something like this:
{
"eventName": [
"validation.create",
"activity.process"
]
}
When looking closer at your SNSProducer
we noticed you expose a sendJSON method that creates the JSON string and publishes the message, but does not allow us to pass any other inputs to the core SNS publish method, in our case MessageAttributes
.
Would it be possible to add this support? I am happy to provide a patch for this if you accept contributions.
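To make the idea concrete, here is a rough sketch of what we have in mind; the function name and option shape below are hypothetical, not the library's current API:

```typescript
// Hypothetical sketch, not the library's current API: build the input for
// sns.publish() so callers can pass MessageAttributes through publishJSON,
// enabling SQS subscription filter policies on the SNS topic.
interface MessageAttributeValue {
    DataType: string;
    StringValue?: string;
}

interface PublishInput {
    TopicArn: string;
    Message: string;
    MessageAttributes?: Record<string, MessageAttributeValue>;
}

function buildPublishInput(
    topicArn: string,
    message: unknown,
    messageAttributes?: Record<string, MessageAttributeValue>,
): PublishInput {
    return {
        TopicArn: topicArn,
        Message: JSON.stringify(message),
        // Only include the field when the caller actually provided attributes.
        ...(messageAttributes ? { MessageAttributes: messageAttributes } : {}),
    };
}

const input = buildPublishInput(
    'arn:aws:sns:us-east-1:123456789012:router',
    { eventName: 'validation.create' },
    { eventName: { DataType: 'String', StringValue: 'validation.create' } },
);
console.log(input.MessageAttributes?.eventName.StringValue); // 'validation.create'
```

The resulting object would then be handed to the underlying sns.publish() call instead of the hard-coded { TopicArn, Message } the producer builds today.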
I already posted this as an idea within the discussions category, but I am posting it here to make it more visible to other folks.
I was very much impressed with this library and almost decided to use it for our use case, which is similar to yours at Aspecto, but we currently use the SQS.sendMessageBatch function to send a batch of messages. It would be nice to implement the same functionality within the SQS producer of this library, so we can send messages to SQS in batches, helping us with fewer network calls/connections.
amazon-sqs-java-extended-client-lib already supports this functionality: https://github.com/awslabs/amazon-sqs-java-extended-client-lib/blob/master/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java#L759
Let me know your thoughts about this idea.
I've created this issue as a general one for Lambda issues.
Since we don't use Lambda mode in our organization, it was never properly tested.
I think the main two issues are lowercased keys and error handling.
Lowercased keys can be fixed by an additional callback; error handling is a matter of documentation.
We may not have spare time to fix this in the nearest future, but we'll gladly accept a PR.
From a quick skim of the source code, it doesn't look like the JSON is compressed before sending it.
This seems like a simple new feature which would allow more messages to go via SQS without needing to be uploaded to S3.
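For illustration, a sketch of the idea using Node's built-in zlib (none of this exists in the library today): gzip the JSON and base64-encode it so it survives as an SQS string body, then reverse the steps on the consumer side.

```typescript
import { gzipSync, gunzipSync } from 'zlib';

// Sketch only, not implemented in the library: compress the JSON body before
// the 256 KiB size check so more messages fit directly in SQS without S3.
function compressBody(message: unknown): string {
    // base64 so the compressed bytes survive as an SQS string body
    return gzipSync(JSON.stringify(message)).toString('base64');
}

function decompressBody(body: string): unknown {
    return JSON.parse(gunzipSync(Buffer.from(body, 'base64')).toString('utf-8'));
}

const original = { items: new Array(1000).fill('repetitive payload data') };
const compressed = compressBody(original);
console.log(compressed.length < JSON.stringify(original).length); // true for repetitive JSON
```

The base64 step inflates the compressed bytes by about a third, so the net win depends on how compressible the payload is; highly repetitive JSON gains the most.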
Hi there,
as you can see, I started using and extending your library (#26).
I looked into the license, because I thought some of the consumer interface/options/code is kind of similar to the BBC consumer: https://github.com/bbc/sqs-consumer
If you took some of the code to start this project, maybe you should consider stating that in the README, or it might even be necessary to include their license, or at least their lines, in your license file.
Copyright (c) 2017-present British Broadcasting Corporation
All rights reserved
(http://www.bbc.co.uk) and sqs-consumer Contributors
But maybe you just happen to have very similar naming of options and such because it's obviously derived from the same AWS docs.
Nonetheless, looking at your license file I'm seeing:
Copyright [yyyy] [name of copyright owner]
I wasn't sure if I should even open this issue, because I don't care too much personally. But I thought that if more and more people start using it, you might want to know about this, so you can clean it up.
Cheers and thanks for the great library.