Use-Case
Many users have external systems which write their logs to Amazon S3. These users want to use OpenSearch to analyze these logs. Data Prepper is an ingestion tool which can aid teams in extracting these logs from S3 and sending them to OpenSearch or elsewhere.
This proposal is to receive events from S3 notifications, read the objects from S3, and create Log Events from them.
Basic Configuration
This plugin will be a single source plugin which:
- Polls a configured SQS standard queue which should hold S3 Event messages.
- Reads S3 objects which the message indicates as created.
- Uses a configured codec to parse the S3 object into Log Events.
- Writes the Log Events into the Data Prepper buffer.
The following example shows what a basic configuration would look like.

```yaml
source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
    codec:
      single-line:
processor:
  grok:
    match:
      message: [ "%{COMMONAPACHELOG}" ]
```
Detailed Process
The S3 Source will start a new thread for reading from S3. (The number of threads can be configured.)
This thread will perform the following steps repeatedly until shutdown:
- Use the SQS `ReceiveMessage` API to receive messages from SQS.
- For each Message from SQS, it will:
  a. Parse the Message as an S3Event.
  b. Download the S3 Object which the S3Event indicates was created.
  c. Decompress the object if configured to do so.
  d. Parse the decompressed file using the configured codec into a list of Log Event objects.
  e. Write the Log objects into the Data Prepper buffer.
- Perform a `DeleteMessageBatch` with all of the messages which were successfully processed.
- Repeat
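The notification-parsing part of the loop (steps a and b above) can be sketched as follows. This is a minimal Python illustration assuming the standard S3 Event notification JSON structure; the actual plugin is implemented in Java, and the function name here is hypothetical.

```python
import json

def created_objects(message_body):
    """Parse an SQS message body as an S3 Event notification and return
    (bucket, key) pairs for the objects the event reports as created."""
    notification = json.loads(message_body)
    results = []
    for record in notification.get("Records", []):
        # Records carry eventName values such as "ObjectCreated:Put".
        if record.get("eventName", "").startswith("ObjectCreated"):
            s3 = record["s3"]
            results.append((s3["bucket"]["name"], s3["object"]["key"]))
    return results

body = json.dumps({
    "Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "my-bucket"},
            "object": {"key": "application1/instance200/2022-05-11.log"}
        }
    }]
})
print(created_objects(body))
# [('my-bucket', 'application1/instance200/2022-05-11.log')]
```

Each returned pair would then drive an S3 `GetObject` call for the download step.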
Error Handling
The S3 Source will suppress exceptions which occur during processing. Any Message which is not processed correctly will not be included in the `DeleteMessageBatch`
request. Thus, the message will appear in the SQS queue again. Data Prepper expects that the SQS queue is correctly configured with a DLQ or MessageRetentionPeriod to prevent the SQS queue from filling up with invalid messages.
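That bookkeeping can be sketched as a simple filter, assuming the message dictionaries use the field names returned by the SQS `ReceiveMessage` API:

```python
def delete_entries(messages, failed_ids):
    """Build DeleteMessageBatch entries for messages that were processed
    successfully. Failed messages are left on the queue, so they become
    visible again after the visibility timeout expires."""
    return [
        {"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]}
        for m in messages
        if m["MessageId"] not in failed_ids
    ]

messages = [
    {"MessageId": "m-1", "ReceiptHandle": "rh-1"},
    {"MessageId": "m-2", "ReceiptHandle": "rh-2"},
]
print(delete_entries(messages, failed_ids={"m-2"}))
# [{'Id': 'm-1', 'ReceiptHandle': 'rh-1'}]
```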
Codecs
The S3 Source will use configurable codecs to support multiple data formats in the S3 objects. Initially, two codecs are planned:
- `single-line` - This is used for logs which should be separated by a newline.
- `json` - A codec for parsing JSON logs.
Single Line
The `single-line` codec has no configuration items.
Below is an example S3 object.

```
POST /search
POST /index
PUT /document/12345
```

With `single-line`, the S3 source will produce 3 Events, each with the following structure.
```json
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.log",
  "message" : "POST /search"
}
```

```json
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.log",
  "message" : "POST /index"
}
```

```json
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.log",
  "message" : "PUT /document/12345"
}
```
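The codec's behavior amounts to a split on newlines. A minimal Python sketch (the function name is hypothetical; field names follow the example above):

```python
def single_line(bucket, key, content):
    """Produce one Log Event per non-empty line of the object."""
    return [
        {"bucket": bucket, "key": key, "message": line}
        for line in content.splitlines()
        if line
    ]

events = single_line(
    "my-bucket",
    "application1/instance200/2022-05-11.log",
    "POST /search\nPOST /index\nPUT /document/12345\n",
)
print(len(events))           # 3
print(events[0]["message"])  # POST /search
```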
JSON
The `json` codec supports reading a JSON file and will create Events for each JSON object in an array. This S3 plugin starts with the expectation that the incoming JSON is formed as a large JSON array of JSON objects. Each JSON object in that array is an Event. Thus, this codec will find the first JSON array in the JSON and output the objects within that array as Events.
Future iterations of this plugin could allow for more customization. One possibility is to use JSON Pointer. However, the first iteration should meet many use-cases and allow for streaming the JSON to support parsing large JSON objects.
Given the following S3 object:
```json
{
  "http_requests" : [
    { "status" : 200, "path" : "/search", "method" : "POST" },
    { "status" : 200, "path" : "/index", "method" : "POST" },
    { "status" : 200, "path" : "/document/12345", "method" : "PUT" }
  ]
}
```
The S3 source will output 3 Log events:
```json
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.json",
  "message" : { "status" : 200, "path" : "/search", "method" : "POST" }
}
```

```json
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.json",
  "message" : { "status" : 200, "path" : "/index", "method" : "POST" }
}
```

```json
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.json",
  "message" : { "status" : 200, "path" : "/document/12345", "method" : "PUT" }
}
```
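The find-the-first-array behavior can be sketched as below. For clarity this version loads the whole document with the standard `json` module; the real codec would stream the JSON to handle large objects, and the function names here are hypothetical.

```python
import json

def first_array(node):
    """Depth-first search for the first JSON array in a parsed document."""
    if isinstance(node, list):
        return node
    if isinstance(node, dict):
        for value in node.values():
            found = first_array(value)
            if found is not None:
                return found
    return None

def json_codec(bucket, key, content):
    """Emit one Log Event per object in the first JSON array found."""
    array = first_array(json.loads(content)) or []
    return [{"bucket": bucket, "key": key, "message": obj} for obj in array]

content = '{"http_requests": [{"status": 200, "path": "/search", "method": "POST"}]}'
events = json_codec("my-bucket", "application1/instance200/2022-05-11.json", content)
print(events[0]["message"]["path"])  # /search
```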
Compression
The S3 Source will support three configurations for compression:
- `none` - The object will be treated as uncompressed.
- `gzip` - The object will be decompressed using the gzip decompression algorithm.
- `automatic` - The S3 Source will examine the object key to guess if it is compressed or not. If the key ends with `.gz`, the S3 Source will attempt to decompress it using gzip. It can support other heuristics to determine if the file is compressed in future iterations.
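A sketch of these three modes, with the `automatic` key-suffix heuristic (the function name is hypothetical):

```python
import gzip

def decompress(key, raw, compression="automatic"):
    """Return object bytes, decompressing per the compression setting.
    In automatic mode, a key ending in .gz is assumed to be gzip data."""
    if compression == "gzip" or (compression == "automatic" and key.endswith(".gz")):
        return gzip.decompress(raw)
    return raw

compressed = gzip.compress(b"POST /search\n")
print(decompress("2022-05-11.log.gz", compressed))       # b'POST /search\n'
print(decompress("2022-05-11.log", b"POST /search\n"))   # b'POST /search\n'
```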
Full Configuration Options
| Option | Type | Required | Description |
|---|---|---|---|
| notification_type | Enum: sqs | Yes | Only SQS is supported. SNS may be a future option. |
| compression | Enum: none, gzip, automatic | No | Default is none. |
| codec | Codec | Yes | See Codecs section above. |
| sqs.queue_url | String - URL | Yes | The queue URL of the SQS queue. |
| sqs.maximum_messages | Integer | No | Directly related to SQS input. Default is 10. |
| sqs.visibility_timeout | Duration | No | Directly related to SQS input. Default is TBD. |
| sqs.wait_time | Duration | No | Directly related to SQS input. Default is TBD. |
| sqs.poll_delay | Duration | No | An optional delay between iterations of the process. Default is 0 seconds. |
| sqs.thread_count | Integer | No | Number of threads polling S3. Default is 1. |
| region | String | Yes | The AWS Region. TBD. |
| sts_role_arn | String | No | Role used for accessing S3 and SQS. |
| access_key_id | String | No | Static credentials for accessing S3 and SQS. |
| secret_key_id | String | No | Static credentials for accessing S3 and SQS. |
| buckets | String List | No | If provided, only read objects from the buckets provided in the list. |
| account_ids | String List | No | If provided, only read objects from buckets owned by an accountId in this list. |
S3 Events
The S3 Source will parse all SQS Messages according to the S3 Event message structure.
The S3 Source will also filter out any event types which are not `s3:ObjectCreated:*`. These events will be silently ignored. That is, the S3 Source will remove them from the SQS queue and will not create any Events for them.
Additionally, this source will have optional `buckets` and `account_ids` lists. If supplied by the pipeline author, Data Prepper will only read objects for S3 events which match those lists. For the `buckets` list, only S3 buckets in the list are used. For the `account_ids` list, only buckets owned by accounts with matching Ids are used. If these lists are not provided, Data Prepper will read from any bucket which is owned by the accountId of the SQS queue.
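A sketch of that filtering against a single S3 event record. Reading the owner from the record's `ownerIdentity` field is an assumption here; which field actually identifies the owning account is an implementation detail.

```python
def passes_filters(record, buckets=None, account_ids=None):
    """Apply the optional buckets and account_ids filters to one record."""
    bucket = record["s3"]["bucket"]
    if buckets is not None and bucket["name"] not in buckets:
        return False
    if account_ids is not None:
        # Assumed field; the S3 event's owner identifier may differ.
        owner = bucket.get("ownerIdentity", {}).get("principalId")
        if owner not in account_ids:
            return False
    return True

record = {"s3": {"bucket": {"name": "my-bucket",
                            "ownerIdentity": {"principalId": "123456789012"}}}}
print(passes_filters(record, buckets=["my-bucket"]))         # True
print(passes_filters(record, account_ids=["999999999999"]))  # False
```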
AWS Permissions Needed
The S3 Source will require the following permissions:
| Action | Resource |
|---|---|
| s3:GetObject | The S3 bucket and key path for any object needed |
| sqs:ReceiveMessage | The ARN of the SQS queue specified by sqs.queue_url |
| sqs:DeleteMessage | The ARN of the SQS queue specified by sqs.queue_url (the DeleteMessageBatch API requires this permission) |
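An IAM policy granting these permissions might look like the following. The bucket name, region, account ID, and queue name are placeholders.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::my-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
      "Resource": "arn:aws:sqs:us-east-2:123456789012:MyS3EventQueue"
    }
  ]
}
```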
Possible Future Enhancements
Direct SNS Notification
The `notification_type` currently only supports SQS. Some teams may want Data Prepper to receive notifications directly from SNS and thus remove the need for an SQS queue. The `notification_type` could support an `sns` value in the future.
Additional Codecs
As needed, Data Prepper can support other codecs. Some possible candidates to consider are:
Metrics
- messagesReceived (Counter)
- messagesDeleted (Counter)
- messagesFailed (Counter)
- eventsCreated (Counter)
- requestsDuration (Timer)
Not Included
- This proposal is focused only on reading S3 objects starting with a notification. Thus, any use-case for replay is not part of this scope. Also, use-cases for reading existing logs are not covered. These use-cases can have their own issue.
- Updated S3 objects are not part of the scope. This work will only support use-cases when a log file is written once.
- Configuration of the SQS queue to receive notifications from SNS topics should be done externally. Data Prepper will not manage this.
Tasks