

elastic-agent-shipper

⚠️ Development of the Elastic Agent data shipper has been paused. The functionality it intended to provide will be implemented in other ways.

Data shipper for the Elastic Agent - a single, unified way to add monitoring for logs, metrics, and other types of data to a host.

The data shipper is a new process in the Elastic agent system designed to centralize local data processing, queueing, and publishing to the target output (Elasticsearch, Logstash, etc.).

The data shipper is a part of a larger effort to rearchitect the Elastic agent. In the initial Elastic agent architecture each underlying data collector (e.g. Filebeat) was required to implement its own processing, queueing, and output connection(s) for each supported output type. The data shipper simplifies this architecture by allowing data collectors to implement a single gRPC client to process, queue, and publish data. The initial design goals of the data shipper are to:

  • Remove the need for processing, queueing, and output protocols to be reimplemented in each input.
  • Minimize the number of output connections required in Elastic agent deployments.
  • Simplify configuration and performance tuning of Elastic agent data pipelines.
  • Make Elastic agent data pipelines more observable and easier to debug.
  • Improve on or maintain the performance of the existing Beats outputs.
  • Define the event publishing interface all current and future Elastic agent data inputs will use.

Each output in an agent policy will map to a separate instance of the shipper process.

[Diagram: Elastic Agent Data Shipper]

Client Development

Data shipper clients must implement the shipper gRPC API. The reference client is the Beats shipper output, which is used by Beats like Filebeat and Metricbeat when they are started by Elastic agent integrations.
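
For illustration, here is a minimal sketch of what a standalone shipper client could look like in Go. It assumes a hypothetical generated package pb built from api/shipper.proto; the import path, listen address, credentials, and request fields are placeholders rather than the actual API.

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "example.com/elastic-agent-shipper/api" // hypothetical import path for the generated API package
)

func main() {
	// Address and credentials are placeholders; the real shipper endpoint and
	// TLS settings are provided by the Elastic agent control protocol.
	conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	client := pb.NewProducerClient(conn) // assumes the generated client for the Producer service

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// PublishEvents blocks until the events are processed and queued; a partial
	// reply means the remaining events should be retried in a later request.
	reply, err := client.PublishEvents(ctx, &pb.PublishRequest{ /* events to publish */ })
	if err != nil {
		log.Fatalf("publish: %v", err)
	}
	log.Printf("publish reply: %+v", reply)
}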

Data shipper support in the Elastic Agent is under active development.

The shipper can currently be run in two modes:

  • Under Elastic agent (managed mode): the main mode for running in production environments.
  • Using a local config file (unmanaged mode): intended for local development and testing.

To run the shipper in unmanaged mode, use the -c flag on the built binary:

./elastic-agent-shipper run -c elastic-agent-shipper.yml

Where elastic-agent-shipper.yml is a path to a local configuration file. The reference shipper configuration file defines the available configuration options.

Contributing

The process for contributing to any of the Elastic repositories is similar.

  1. Please make sure you have signed our Contributor License Agreement. We are not asking you to assign copyright to us, but to give us the right to distribute your code without restriction. We ask this of all contributors in order to assure our users of the origin and continuing existence of the code. You only need to sign the CLA once.

  2. Send a pull request! Push your changes to your fork of the repository and submit a pull request. New PRs go to the main branch. The development team will backport your PR to previous release branches if necessary. In the pull request, describe what your changes do and mention any bugs/issues related to the pull request.

Developing

The data shipper is developed in Go, so install the Go version currently used for shipper development. A reliable way to install the correct Go version is to use the GVM Go version manager.

The data shipper primarily uses the mage build system. The list of supported mage commands can be obtained by running mage -l from the root of the repository. The most commonly used commands are:

  • mage build to build the data shipper binary.
  • mage check to check license files and dependencies.
  • mage lint to lint the source code using golangci-lint.
  • mage package to produce release artifacts.
  • go test ./... to run all tests.

Run mage commands with the -v flag for more detailed output, for example mage -v check.


elastic-agent-shipper's Issues

Libbeat disk queue should support custom serialization

Currently libbeat's disk queue (de)serialization (https://github.com/elastic/beats/blob/main/libbeat/publisher/queue/diskqueue/serialize.go) assumes the underlying event type is publisher.Event, which needs to change before we can support the disk queue in the shipper. The disk queue should accept a serializer as part of its configuration (possibly defaulting to its current behavior for simplicity in libbeat), so we can use it to store shipper events.

Evaluate defining the PublishEvents() RPC as a stream

The primary RPC for sending events to the shipper is PublishEvents, currently defined as:

// Publishes a list of events via the Elastic agent shipper.
// Blocks until all processing steps complete and data is written to the queue.
// The order of `PublishRequest.events` always matches `PublishReply.results`.
//
// Returns the `codes.ResourceExhausted` gRPC status code if the queue is full and none of the events
// can be accepted at the moment.
//
// If the queue could accept some events from the request, this returns a successful response
// containing results for the first K events that were accepted by the queue.
// The client is expected to retry sending the rest of the events in a separate request later.
//
// Inputs may execute multiple concurrent Produce requests for independent data streams.
// The order in which concurrent requests complete is not guaranteed. Use sequential requests to
// control ordering.
rpc PublishEvents(PublishRequest) returns (PublishReply);

We should evaluate whether it would be more efficient to implement this RPC as a stream, or provide a streaming version of the RPC. The gRPC performance best practices guide (https://grpc.io/docs/guides/performance/) suggests using streaming RPCs for long lived exchanges, like those between most inputs and the shipper:

Use streaming RPCs when handling a long-lived logical flow of data from the client-to-server, server-to-client, or in both directions. Streams can avoid continuous RPC initiation, which includes connection load balancing at the client-side, starting a new HTTP/2 request at the transport layer, and invoking a user-defined method handler on the server side.

Streams, however, cannot be load balanced once they have started and can be hard to debug for stream failures. They also might increase performance at a small scale but can reduce scalability due to load balancing and complexity, so they should only be used when they provide substantial performance or simplicity benefit to application logic. Use streams to optimize the application, not gRPC.

Side note: This does not apply to Python (see Python section for details).

(Special topic) Each gRPC channel uses 0 or more HTTP/2 connections and each connection usually has a limit on the number of concurrent streams. When the number of active RPCs on the connection reaches this limit, additional RPCs are queued in the client and must wait for active RPCs to finish before they are sent. Applications with high load or long-lived streaming RPCs might see performance issues because of this queueing. There are two possible solutions:

Create a separate channel for each area of high load in the application.

Use a pool of gRPC channels to distribute RPCs over multiple connections (channels must have different channel args to prevent re-use so define a use-specific channel arg such as channel number).

Side note: The gRPC team has plans to add a feature to fix these performance issues (see grpc/grpc#21386 for more info), so any solution involving creating multiple channels is a temporary workaround that should eventually not be needed.

Note that the agent is planning to migrate the gRPC connection from using TCP to named pipes/Unix domain sockets which may affect our choices here: elastic/elastic-agent#218

Implement the shipper's gRPC API

Implement the shipper's gRPC API, which is currently a skeleton defined in https://github.com/elastic/elastic-agent-shipper/blob/main/server/server.go

The API specification is currently defined in https://github.com/elastic/elastic-agent-shipper/blob/main/api/shipper.proto

Acceptance Criteria:

  • #84
  • The API specification is updated with any changes or improvements that were necessary.
  • A reference API client implementation exists and integrates with the server side implementation.
  • An integration test suite exists to verify that the client and server implementations interact correctly, with emphasis on error conditions: partially accepted batches, full queues, server and client side restarts, etc.

Correctly handle enqueued events affected by agent policy changes

We need to think through all the edge cases that can arise when events in the shipper queue are affected by an agent policy change. For a concrete example, consider the case where a user removes an integration but events collected by that integration still reside in the shipper queue:

  1. User creates Agent policy rev. 1 containing integration A and integration B.
  2. Fleet server generates an API key with append permission to write to the data streams for integrations A and B.
  3. Elastic Agent receives and runs Agent policy rev. 1.
  4. Elastic Agent needs to persist events to disk (events from integrations A and B are persisted on disk).
  5. User removes integration B, and the Agent policy is updated to rev. 2.
  6. Fleet server generates an API key with append permission to write to the data stream for integration A.
  7. Elastic Agent receives and runs Agent policy rev. 2.
  8. Elastic Agent acknowledges the configuration.
  9. Fleet Server invalidates the Elasticsearch API key.

In the case above the events for the removed integration B will never be able to be ingested by Elasticsearch after the API key has been changed. This series of events is worse with the disk queue because the number of events can be larger, but this situation would apply to the memory queue as well.

We must also consider that not every policy change causes a problem. For example, changing the number of output workers does not affect events in the queue.

For policy changes that do affect enqueued events, there are several paths forward we could take to solve this problem:

  1. Decide that it is safe to drop events for integration B, and have a mechanism to do so reliably when the API key changes. This option is complicated by the shipper pipeline being unaware of agent policy changes and the ability to configure infinite retry of failed events.
  2. Ensure all affected events have been successfully sent and removed from the queue before acknowledging the policy change. In the V2 agent control protocol the agent could send the shipper an expected state of "stopped", which the shipper can take as a signal to flush all events. The agent doesn't consider the change done and the policy rolled out until that unit reports back an observed state of "stopped". So as soon as the shipper gets "stopped" as the expected state, it reports "stopping" (i.e. starting the flush), then "stopped" (i.e. completely flushed).

Option 2 avoids data loss, but is the most complex path forward. There are multiple ways we could ensure all events affected by a policy change are drained from the shipper queue before acknowledging the policy change:

  1. Have the agent provision a second instance of the shipper process, with new events routed to the new second instance. The policy change is considered acknowledged when the original shipper exits successfully after flushing all events to the output. The system would need to handle the case where the first shipper never exits successfully. The primary downside of this solution is that it temporarily doubles the number of queues and connections made to the output.
  2. Have the shipper internally provision a second instance of its data pipeline, with all new events routed to the new pipeline. This is the same as the first option but with the pipeline duplicated in a single shipper process. The number of connections can be kept constant but the number of queues is doubled.
  3. A policy change emits a special meta event into the pipeline. When this event is read at the output the shipper knows all affected events have been flushed through the queue and it acknowledges the policy change. This avoids duplicating the queues and connections.

This is a complex issue with many possible solutions. Evaluate each of the proposed solutions (and consider new ones) to decide which path we should take to solve this issue. The outcome of this issue should be a meta issue with an implementation plan for solving this problem.

[Meta] Elastic Agent Shipper Project

Experimental, Beta, and GA Criteria

Blockers for Beta Milestone 1 - Elasticsearch output and memory queue

NEW - Convert the shipper to a Filebeat input

Bugs

Features

Elasticsearch Output V2

Monitoring

Performance

Testing

Technical Debt

Documentation

  • Document the shipper process and how it interacts with agent policy:
  • Document the shipper feature flag and the current stability guarantees.
  • Document why a user would want to enable the shipper.
  • Document how the number of shippers (and therefore output connections) can be controlled via the agent policy.
  • Document configuration of the shipper queues.
  • Document any backwards compatibility or migration issues when moving from per beat outputs and queues to the centralized shipper.

Future Feature Milestones

Kafka Output

Logstash Output

Global Processors

Endpoint Security Support

  • #213
  • #275
  • Require use of an encrypted disk queue when Elastic Defend is installed

Disk Queue

Future Enhancements - Not Yet Planned

Allow the Beats' shipper output to depend on the shipper client directly.

The Beats' shipper output currently clones the shipper's generated protobuf client to avoid a dependency cycle between the shipper and beats: https://github.com/elastic/beats/tree/main/libbeat/outputs/shipper/api

We need to break this dependency cycle and ensure the API is only defined in a single place. Decide on an approach and eliminate the duplication of the shipper API between beats and the shipper. Initial options include but are not limited to:

  1. Removing the shipper's dependency on beats by moving the code it needs to https://github.com/elastic/elastic-agent-libs. In the long term we should do this regardless, but it may be easier to wait until the shipper prototype is complete to avoid delaying implementation with package moves: #43
  2. Move the shipper API definitions to a separate repository specifically for the API definition and any reusable client code. The most obvious place is https://github.com/elastic/elastic-agent-client which is currently used for the agent control protocol. This would give developers a single repository to consume for working with the agent. The elastic-agent-client repository is currently owned by the agent control plane team.

Prototype running the shipper under agent using the V1 protocol

The agent V2 protocol is what the final version of the shipper should use, but given that it is still under development we should attempt to integrate with the agent as it exists today with the V1 protocol. The V1 version of the protocol is defined in the same proto file as V2: https://github.com/elastic/elastic-agent-client/blob/main/elastic-agent-client.proto

For reference the beats V1 client implementation can be found in the x-pack/libbeat directory: https://github.com/elastic/beats/tree/main/x-pack/libbeat/management

To allow the agent to run the shipper binary, we must define a spec file for it that describes how to start the process and how to transform the agent policy into a format the shipper can understand. See the one for metricbeat as an example: https://github.com/elastic/elastic-agent/blob/main/internal/spec/metricbeat.yml

The shipper should be provided the monitoring and output sections of a policy verbatim.

Investigate changing the pipeline order to improve performance

Let's consider the case where the shipper is configured to use a disk queue with the Elasticsearch output. Let's also assume we use the default protobuf encoding over gRPC. If we reuse the existing structure of the beats publishing pipeline, the data flow will look like:

flowchart LR

A[Input] -->|Protobuf| B[Server] 
B --> C[Processors] 
C -->|CBOR| D[Disk Queue] 
D -->|JSON| E[Elasticsearch]

The diagram shows that the data must be serialized multiple times:

  1. To the protobuf wire format when the input sends events to the shipper using gRPC. This could optionally be replaced with JSON, but we would likely still need to deserialize it regardless.
  2. To CBOR when writing to the disk queue.
  3. To JSON when writing to Elasticsearch.

It seems extremely worthwhile to restructure the pipeline to reduce the number of times the data must be serialized:

flowchart LR

A[Input] -->|Protobuf| B[Server] 
B -->|Protobuf| C[Disk Queue] 
C --> D[Processors] 
D -->|JSON| E[Elasticsearch]

In this case we would change the disk queue's serialization format to protobuf, deferring deserialization until after data has been read from the queue. This leaves us with a single transformation from protobuf, to the shipper's internal data format, and then back to JSON (or whatever encoding the output requires).

If the memory queue were used instead of the disk queue, we could use the same strategy of storing serialized events in the memory queue and only decoding them when they are read from the queue. This would give us a way to deterministically calculate the number of bytes stored in the memory queue. Currently the memory queue size must be specified in events.

The output of this issue should be a proof of concept demonstrating that this reordering of the pipeline is possible and has the expected benefits. At minimum the work will need to include:

  1. Modifying the gRPC server in the shipper to stop deserializing messages so they can be passed directly to the queue. The ideal option would be to keep the existing RPC definitions but implement a no-op codec (see the sketch after this list). See the gRPC encoding documentation. We may need to write a custom set of RPC handlers instead of generating them:
    func _Producer_PublishEvents_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    A fallback option is to use messages that simply wrap a bytes payload, with the required message type and serialization documented in the RPC call.
  2. Modify the disk queue to use protobuf serialization. At minimum this depends on #41 and possibly some of the work for #33 to use the new disk queue headers.
  3. Ensure we can still return errors back to clients (after deserialization or processing, for example). #9 should provide a mechanism for this.
  4. Benchmark the performance of the modified pipeline and compare it to the original configuration. We do not have a set of repeatable performance tests yet, so we may choose to defer this work until we do.
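
As a starting point for item 1, here is a minimal sketch of a passthrough codec built on grpc-go's encoding.Codec interface. The RawBytes type is an assumption for illustration, and registering the codec under the default "proto" name affects every RPC served by the process, so a real implementation would need to scope this more carefully and pair it with custom handlers that accept raw bytes.

package passthrough

import (
	"fmt"

	"google.golang.org/grpc/encoding"
)

// RawBytes holds an already-serialized protobuf message so the server can
// hand the wire bytes straight to the queue without deserializing them.
type RawBytes []byte

// Codec is a no-op codec: Marshal and Unmarshal copy bytes instead of
// running the protobuf (un)marshaller.
type Codec struct{}

func (Codec) Name() string { return "proto" } // replaces the default proto codec when registered

func (Codec) Marshal(v interface{}) ([]byte, error) {
	b, ok := v.(RawBytes)
	if !ok {
		return nil, fmt.Errorf("passthrough codec: unexpected type %T", v)
	}
	return b, nil
}

func (Codec) Unmarshal(data []byte, v interface{}) error {
	b, ok := v.(*RawBytes)
	if !ok {
		return fmt.Errorf("passthrough codec: unexpected type %T", v)
	}
	*b = append((*b)[:0], data...)
	return nil
}

func init() {
	// Registering under the "proto" name overrides the default codec for this
	// process; a custom codec name plus per-call options is an alternative.
	encoding.RegisterCodec(Codec{})
}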

Automatically set GOMAXPROCS

Describe the enhancement:
Automatically set GOMAXPROCS when beats are run in containers with a CFS quota set to minimize CPU throttling.

See elastic/apm-server#7967 and elastic/apm-server#8278 for more context. https://github.com/uber-go/automaxprocs can be used to easily add this feature.
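
A minimal sketch of what adopting automaxprocs looks like; the blank import adjusts GOMAXPROCS at startup based on the container's CPU quota.

package main

import (
	"log"
	"runtime"

	_ "go.uber.org/automaxprocs" // adjusts GOMAXPROCS to the CFS quota on import
)

func main() {
	// With a CPU limit of e.g. 2 inside a container, GOMAXPROCS reports 2
	// instead of the host's full CPU count, reducing throttling.
	log.Printf("GOMAXPROCS=%d", runtime.GOMAXPROCS(0))
	// ... start the shipper ...
}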

Describe a specific use case for the enhancement or feature:
Setting GOMAXPROCS based on the configured CPU quota makes performance more predictable in containerized environments. Kubernetes deployments are particularly likely to experience problems from unoptimized GOMAXPROCS configurations.

Clone of elastic/beats#31920 for the shipper.

Elastic agent metric dashboard for the shipper

Fleet exposes a metric dashboard for each enrolled agent, with output metrics broken down by each process making a connection to Elasticsearch. A screenshot of this dashboard is below:

[Screenshot: per-agent output metrics dashboard, 2022-06-13]

How should we update this dashboard when the agent is using the shipper? The output statistics are currently grouped by beat.type by default. Should we continue doing this? Will aggregating by beat type continue to work with the shipper implementation where the output is no longer per beat?

Should we group by data stream or input source instead and obsolete the beat.type parameter?


Design document: https://docs.google.com/document/d/18bYC5IshBhVoIRXmbDCA8Ga8UD1mrrgtUlmhPeoWqoU/edit

Define the initial shipper gRPC interface

Define the initial shipper gRPC service and use protoc to generate the client and server implementations. The interface should be defined in the https://github.com/elastic/elastic-agent-client repository to ensure the entire elastic agent interface is defined in one place and can be imported without depending on its internal implementation.

The proposal specifies an initial event protocol. Decide whether that specification should be taken as the starting point, or whether we can start with a minimal version of it (a Publish RPC with empty request and response messages) and add to it as we add features to the shipper.

The shipper gRPC client and server should be an importable Go package once this issue is complete.

The PublishEvents RPC should block when the queue is full

Implementation issue following the discussion in #81.

Specifically the RPC should block until at least one event is accepted into the queue. The RPC should not block until all events in the batch have been accepted into the queue.

The shipper queue's publish interface already blocks when the queue is full based on the underlying Beats' memory and disk queue implementations:

	producer := eventQueue.Producer(beatsqueue.ProducerConfig{})
	return &Queue{eventQueue: eventQueue, producer: producer}, nil
}

func (queue *Queue) Publish(event *messages.Event) (EntryID, error) {
	if !queue.producer.Publish(event) {
		return EntryID(0), ErrQueueIsFull
	}

  1. Beats' memory queue publish implementation: https://github.com/elastic/beats/blob/e6db9a53f2fff60b350153b03407bc38b21bf0b1/libbeat/publisher/queue/memqueue/produce.go#L128-L136
  2. Beats' disk queue publish implementation: https://github.com/elastic/beats/blob/e6db9a53f2fff60b350153b03407bc38b21bf0b1/libbeat/publisher/queue/diskqueue/producer.go#L52-L62

One missing piece of the existing Publish method is that it does not accept a context.Context as input. This means that when a PublishEvents RPC call is made with a timeout, that timeout is ignored while the RPC is blocked in the queue's Publish method. We will need to modify the queue interface to accept a context.Context and propagate the RPC context to it in the PublishEvents implementation.
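
A rough sketch of a context-aware wrapper, reusing the Queue, EntryID, and ErrQueueIsFull names from the snippet above and assuming it lives in the same package. The real fix is to thread the context down into the Beats queue implementations; the goroutine workaround shown here leaves a race where the event can still be enqueued after cancellation.

func (queue *Queue) PublishContext(ctx context.Context, event *messages.Event) (EntryID, error) {
	type result struct {
		id  EntryID
		err error
	}
	done := make(chan result, 1)
	go func() {
		// Calls the existing blocking Publish shown above.
		id, err := queue.Publish(event)
		done <- result{id: id, err: err}
	}()
	select {
	case r := <-done:
		return r.id, r.err
	case <-ctx.Done():
		// The goroutine may still enqueue the event after cancellation; real
		// cancellation support has to live inside the queue implementations.
		return EntryID(0), ctx.Err()
	}
}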

Implement more efficient output tuning parameters to manage throughput

Beats have many knobs that allow the user to modify output-related parameters in order to increase throughput. These parameters are convoluted and sometimes contradict one another. With the new shipper design we have the opportunity to simplify them and define more meaningful parameters for users.

Performance Tuning Proposal

  1. Rename bulk_max_size to maximum_batch_size to be more meaningful. maximum_batch_size is the total batch size in bytes.
  2. Allow the user to modify maximum_batch_size in the UI. Specify maximum_batch_size in bytes rather than events.
    a. Bytes are easier to reason about.
    b. Bytes are also easier to map to data seen on the wire.
    c. On the Elasticsearch ingest side, the maximum document size is configured in bytes.
  3. Introduce a NEW variable, output_queue_flush_timeout.
    a. Upon expiry the output queue is flushed and the data is written to the output.
    b. Users can lower this timeout to reduce the delay in collecting data.

In summary, tuning the output will now involve two variables: maximum_batch_size and output_queue_flush_timeout.
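
A sketch of how the two proposed parameters might be represented on the shipper side; the struct name and config tags are illustrative only, not the shipper's actual configuration schema.

package config

import "time"

// OutputTuning is a hypothetical configuration section holding the two
// proposed knobs.
type OutputTuning struct {
	// MaximumBatchSize is the total size of a publish batch in bytes
	// (replacing bulk_max_size, which counts events).
	MaximumBatchSize uint64 `config:"maximum_batch_size"`
	// OutputQueueFlushTimeout flushes the output queue and writes pending
	// data to the output when it expires.
	OutputQueueFlushTimeout time.Duration `config:"output_queue_flush_timeout"`
}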

[Design] Should the `PublishEvents` RPC block when the queue is full?

The shipper PublishEvents documentation currently specifies that it should block until processing is completed and events have been enqueued:

service Producer {
 // Publishes a list of events via the Elastic agent shipper.
 // Blocks until all processing steps complete and data is written to the queue.
 //
 // If the queue could not accept some events from the request, this returns a successful response
 // containing a count of events that were accepted by the queue.
 // The client is expected to retry sending the rest of the events in a separate request.
 // 
 // The client is also expected to have some kind of backoff strategy
 //	in case of a reply with an accepted count < the amount of sent events.
 rpc PublishEvents(messages.PublishRequest) returns (messages.PublishReply)

The initial gRPC server implementation however does not implement this behaviour: #76 (comment).

The scope of this issue is to decide whether the PublishEvents RPC should block when the queue is full, and then update either the RPC documentation or the server implementation based on the outcome.

There were two example scenarios provided in the original discussion:

Example 1. The queue filled up during the request (logic already in place)

  1. The client requests to publish 50 events
  2. The queue can accept only 5 and we get "queue is full" error back from publishing the 6th
  3. The server replies to the client with the current AcceptedCount and AcceptedIndex values partially accepting the event batch

Example 2. The queue is full from the beginning (needs to be implemented)

  1. The client requests to publish 50 events
  2. When publishing the first event, the queue returns "queue is full" error and cannot accept even a single event
  3. The server blocks the client in the underlying queue's Publish call.
  4. Once at least one event is accepted by the queue, this switches to the behaviour in Example 1.

The behaviour proposed in the examples above is that PublishEvents would block until at least one event is accepted, and then return with the accepted count indicating how many events were accepted. The alternative would be to block until all events are accepted.

Build 31 for main with status FAILURE

💔 Build Failed



Build stats

  • Start Time: 2022-08-15T18:16:42.863+0000

  • Duration: 20 min 23 sec

Test stats 🧪

  • Failed: 0
  • Passed: 20
  • Skipped: 0
  • Total: 20

Steps errors 2


Restore files previously stashed
  • Took 12 min 45 sec
  • Description: source
Checks if running on a Unix-like node
  • Took 0 min 0 sec

Support delaying shutdown until the shipper queue is empty

To minimize data loss, the shipper should support delaying shutdown and process exit until there are no longer any active Publish RPCs and all events remaining in the queue have been written to the output.

Specifically, when the agent control protocol indicates the shipper's expected state is STOPPING, the shipper should not acknowledge the STOPPING state and exit until the number of events in the queue is zero.

Ideally the agent will stop the shipper after all agent inputs are confirmed to have stopped, so that the only in-flight data resides inside the shipper. This likely requires a change to the agent control plane, but the shipper could also infer whether the inputs are stopped based on the amount of RPC activity.

The maximum time to wait before exiting regardless of the number of events in the queue should be added to the shipper configuration file (and possibly the agent policy). When a disk queue is configured, the shipper can exit after all active RPC calls to publish data are terminated. The events in the queue will have been persisted to disk.
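
A rough sketch of the drain-and-exit wait loop, assuming hypothetical activeRPCs and queuedEvents callbacks; in practice the trigger and acknowledgement would be driven by the agent control protocol rather than a standalone helper.

package shutdown

import (
	"context"
	"time"
)

// WaitForDrain blocks until there are no active publish RPCs and the queue is
// empty, or until the configured maximum wait elapses.
func WaitForDrain(ctx context.Context, maxWait time.Duration, activeRPCs, queuedEvents func() int) {
	deadline := time.NewTimer(maxWait)
	defer deadline.Stop()
	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			if activeRPCs() == 0 && queuedEvents() == 0 {
				return // safe to acknowledge STOPPING and exit
			}
		case <-deadline.C:
			return // maximum wait exceeded; exit regardless of queued events
		case <-ctx.Done():
			return
		}
	}
}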

  • Relates to #49

Implement shipper gRPC server metrics

Once the gRPC API implementation in #76 is merged we need to instrument it to ensure metrics are available for debugging.

The server metrics should be available from the shipper's HTTP interface (when enabled) and should be reported every 30s in the logs. The initial scaffolding for reporting metrics exists in https://github.com/elastic/elastic-agent-shipper/tree/main/monitoring.

The following set of metrics should be made available where reasonable to implement (a rough instrumentation sketch follows this list):

  • The number of currently active RPC connections.
  • Count of events and ideally bytes received, broken down by input or client ID.
  • Cumulative count of messages and bytes received since start, broken down by input or client ID.
  • Latency of each gRPC call, ideally broken down by input or client ID.
  • Count of the number of RPCs that accepted partial data because the queue is full.
  • Count of the number of RPCs that rejected a request because the shipper UUID did not match.
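
As one possible instrumentation approach, here is a minimal sketch that counts unary RPCs with a gRPC server interceptor and exposes the counters through expvar (the same mechanism behind a /debug/vars style endpoint). The metric names are illustrative only.

package monitoring

import (
	"context"
	"expvar"

	"google.golang.org/grpc"
)

// Illustrative counters exposed via the expvar-based /debug/vars endpoint.
var (
	rpcTotal  = expvar.NewInt("grpc.rpc_total")
	rpcFailed = expvar.NewInt("grpc.rpc_failed_total")
)

// UnaryMetrics counts every unary RPC and its failures. Per-client breakdowns
// would additionally read an input or client ID from the request metadata.
func UnaryMetrics(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	rpcTotal.Add(1)
	resp, err := handler(ctx, req)
	if err != nil {
		rpcFailed.Add(1)
	}
	return resp, err
}

// Usage: grpc.NewServer(grpc.UnaryInterceptor(UnaryMetrics))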

[Meta][Feature] Implement encrypted disk queue

Summary

This is a feature meta issue to implement the encrypted disk queue for the shipper. The scope is restricted to implementation of encryption for the existing beats disk queue, changing how the queue interacts with producers, consumers and pipelines is out of scope. Design doc can be found here.

Milestones

This feature is considered complete when the following conditions are met:

  • benchmark existing beats disk queue
  • Implement encrypted queue in beats repo
  • Add end to end test that exercises encrypted queue in beats repo
  • Add to shipper binary
  • benchmark encrypted disk queue

Related Issues

  • elastic/obs-dc-team#729

[Meta] Implement shipper performance testing

The Elastic agent data shipper is actively under development and we need a way to benchmark its performance as part of the agent system. Specifically we are interested in benchmarking the achievable throughput of a single agent using the shipper along with its CPU, memory, and disk IOPS overhead. Users care about the performance of the agent and we need a way to measure and improve it.

Design

The proposed solution is to develop a new load generating input for the agent, which can be installed and configured as a standard agent integration. The test scenario can be changed by modifying the integration configuration or agent policy. Metrics will be collected using the existing agent monitoring features. Where the existing agent monitoring is not adequate, it should be enhanced so that all data necessary to diagnose performance issues is also available in the field. For example, all performance data should be available in the existing agent metrics dashboard.

[Diagram: Data Shipper Performance Testing]

The new load generating input should be developed as one of the first non-beat inputs in the V2 agent input architecture. The load generator should be packaged into an agent load testing integration developed using the existing Elastic package tooling. Any agent is then capable of being load tested via installing the necessary integration.

Automated deployment and provisioning can ideally reuse the same tools used to provision Fleet managed agents for end-to-end testing with minimal extra work. When testing Elasticsearch, ideally the instance used for fleet and monitoring data is separate from the instance receiving data from the shipper to avoid introducing instability into Fleet itself during stress tests.

The performance metrics resulting from each test can be queried out of the agent monitoring indices at the conclusion of each test. Profiles can be periodically collected via agent diagnostics or the /debug/pprof endpoint of the shipper.

The initial version of the agent load testing package will implement only a shipper client, which it will use to write simulated or pre-recorded events at a configurable rate. Multiple tools exist that could be integrated into the load generator input to generate data on demand: stream, the integration corpus generator, spigot, or flog.

Future versions of the load testing package can be developed with the load generator input configured to act as the data source for other inputs to pull from. For example a filebeat instance could be started and configured to consume data from the load generator using the syslog protocol, enabling tests of the entire agent ingestion system. Stream is already used to test integrations with elastic-package today and could serve as the starting point for this functionality.

Implementation Plan

TBD. Insert a development plan with linked issues, including at least the following high level tasks:

  1. Develop a load generator agent input, possibly based on https://github.com/elastic/stream and integrating synthetic data generation.
  2. Develop and publish an agent load testing integration. Allow local testing of the load generator input using the elastic-package tool (see https://github.com/elastic/integrations/blob/main/CONTRIBUTING.md).
  3. Allow running performance tests locally, and collecting test results into a report document that can be ingested into Elasticsearch and tracked over time. Use the APM benchmark output format as a reference: elastic/apm-server#7540
  4. Update the existing agent metrics dashboard to include all relevant performance metrics if they are not already present.
  5. Automate running performance tests on a daily basis. The key to integrating performance testing into CI will be creating repeatable hardware conditions, something several teams in Elastic have already solved.
  6. Allow running performance tests on a PR basis, possibly triggered by a dedicated label or as part of the existing E2E test suite.

Beats shipper output shouldn't keep pending events in the queue

ACK handling is being implemented in the shipper and its clients. This means Beats inputs that feed into the shipper output don't receive their acknowledgment callbacks until the shipper has sent the events upstream.

This means that currently, pending events are enqueued both in Beats and in the shipper. Neither one can safely remove them, because it isn't known until the response comes back whether it will need to retry.

There isn't a real need for Beats to do that in this case, though, because as soon as the event has been handed off to the shipper it's already in a local queue with proper retry behavior. The Beats pipeline should accommodate this case by separating producer acknowledgments (which are how inputs track progress) from queue allocations.

One way to do this that would probably require minimal modification of existing APIs is to create a proxy object implementing queue.Queue, used only for the shipper output, that instead of genuinely queueing would synchronously Publish as batches are assembled, discarding the event data after a successful handoff but still propagating event acknowledgment back to its producers afterwards. (This is essentially the same pattern we expect standalone shipper clients to follow.)

One complication is that batch assembly typically preserves pointers to the events inside the batch, so even if an event isn't explicitly enqueued, its memory is still referenced. That will probably require some extra handling or a special call in the shipper output to separate freeing the memory from acknowledgment.

Implement an MVP of the Elasticsearch output

Once #7 is complete, implement basic support for the Elasticsearch output.

The simplest solution is to use the existing beats Elasticsearch output, but we should evaluate the effort required to switch to the official go-elasticsearch client (#14) with the initial output. If the effort is too high then switching the underlying Elasticsearch client can be deferred into a future iteration.

This feature is considered complete when at least the following criteria are satisfied:

  • A test exists proving that an Elasticsearch output can be configured, and data published to the shipper is written to Elasticsearch.

Include the shipper in the Elastic Agent diagnostic bundle

The shipper state should be captured by the Elastic agent diagnostic command, documented here: https://www.elastic.co/guide/en/fleet/current/fleet-troubleshooting.html#_collect_elastic_agent_diagnostics_bundle

The diagnostic archive collects the following information:

  • Elastic Agent versions numbers
  • Beats (and other process) version numbers and process metadata
  • Local configuration, elastic-agent policy, and the configuration that is rendered and passed to Beats and other processes
  • Elastic Agent’s local log files

The diagnostics command is also able to collect pprof data in the archive if the --pprof flag is passed.

Ensure that the diagnostic command captures all the expected information when run, and add tests to ensure we do not break compatibility with the diagnostic command in the future.

[Meta][Feature] Enable filebeat and metricbeat to publish data to the shipper

This is a feature meta issue to allow filebeat and metricbeat to publish data to the shipper when run under Elastic agent. All other beats are out of scope.

An output that publishes to the shipper gRPC interface should be implemented for existing beats. When the shipper gRPC output is used, the beat's output pipeline should be configured to be as simple as possible. Using a per-beat disk queue with the shipper is forbidden. A memory queue may be used with the shipper output, but how users should configure it will require careful consideration. Ideally any necessary queue configuration can be made automatic.

Removing processors from beats is out of scope for this issue. Processors will be removed in a later issue.


This feature is considered complete when at least the following criteria are satisfied for both filebeat and metricbeat:

  • A test exists proving data ingested by the beat is published to the shipper.
  • A test exists proving there is no data loss when the shipper process restarts while the beat is publishing.
  • A test exists proving there is no data loss when the shipper backpressures the beat (because the shipper queue is full for example).

The assignee of this issue is expected to create the development plan with all child issues for this feature. The following set of tasks should be included in the initial issues at a minimum:

  • Creating a beats output that publishes to the shipper gRPC interface.
  • Defining a standard configuration for using a beat with the shipper that the control plane can easily apply: processors disabled, queues disabled, etc.
  • Creating an integration test suite for the beat and shipper interactions.

UPD by @rdner

I split this into the following steps:

  • #22
  • #23
    • An event batch is published from an input to the shipper gRPC server
    • An event batch is not dropped when the gRPC server is not available but starts later
    • An event batch is not dropped when ResourceExhausted code is returned from the gRPC server, TTL does not decrease in this case
  • #24
  • #34

[Meta] Shipper 8.5 - Experimental integration with Filebeat and Metricbeat

The 8.5 release milestone covers the minimum feature set necessary to use the shipper with metricbeat and filebeat: only the memory queue and the Elasticsearch output are supported, and processors remain in the beats. The shipper can be enabled by setting an environment variable when starting the agent. A feature flag will be added to the agent policy and Fleet UI in a future iteration.


Dependencies

The primary dependency of the phase 1 data shipper is the agent control plane. The control plane work to support the phase 1 shipper is tracked in:

Implementation

The implementation tasks for the shipper are broken into several independent feature groups, listed below. Tasks will be converted to issues as the implementation progresses.

QA / Testing Needs

Deferred to 8.6

Documentation

N/A - The shipper will be experimental and enabled via an environment variable when starting the agent.

Build 29 for main with status FAILURE

💔 Tests Failed



Build stats

  • Start Time: 2022-08-12T19:08:06.418+0000

  • Duration: 10 min 21 sec

Test stats 🧪

  • Failed: 3
  • Passed: 37
  • Skipped: 0
  • Total: 40

Test errors 3


Test / Matrix - PLATFORM = "ubuntu-20.04 && immutable" / Test / TestPublish/should_return_validation_errors/no_timestamp – github.com/elastic/elastic-agent-shipper/server

    === RUN   TestPublish/should_return_validation_errors/no_timestamp
        server_test.go:243:
            Error Trace:    server_test.go:243
            Error:          Not equal:
                            expected: "timestamp: proto:\u00a0invalid nil Timestamp"
                            actual  : "timestamp: proto: invalid nil Timestamp"
                            Diff:
                            --- Expected
                            +++ Actual
                            @@ -1 +1 @@
                            -timestamp: proto: invalid nil Timestamp
                            +timestamp: proto: invalid nil Timestamp
            Test:           TestPublish/should_return_validation_errors/no_timestamp
        --- FAIL: TestPublish/should_return_validation_errors/no_timestamp (0.00s)

Test / Matrix - PLATFORM = "ubuntu-20.04 && immutable" / Test / TestPublish/should_return_validation_errors – github.com/elastic/elastic-agent-shipper/server

    === RUN   TestPublish/should_return_validation_errors
        --- FAIL: TestPublish/should_return_validation_errors (0.00s)

Test / Matrix - PLATFORM = "ubuntu-20.04 && immutable" / Test / TestPublish – github.com/elastic/elastic-agent-shipper/server

    === RUN   TestPublish
    --- FAIL: TestPublish (0.01s)

Steps errors 1


Running Go tests
  • Took 0 min 7 sec
  • Description: gotestsum --format testname --junitfile junit-report.xml -- -v ./...

[Meta][Project] Implement the Elastic Agent Data Shipper

Goal

This is the project issue tracking the implementation of the Elastic agent data shipper, as designed in the proposal.


Motivation

The shipper is part of the Elastic Agent v2 project: elastic/elastic-agent#189

The goals of the Elastic agent data shipper are to:

  • Remove the need for processing, queueing, and output protocols to be reimplemented in each input.
  • Minimize the number of output connections required in Elastic agent deployments.
  • Simplify configuration and performance tuning of Elastic agent output pipelines.
  • Make Elastic agent output pipelines more observable and easier to debug.
  • Define the event publishing interface all Elastic agent inputs will use.
  • Improve on or maintain the performance of the existing beats outputs.

Development Plan

The rollout of the shipper to customers will occur in incremental phases:

  • #15
  • #16
  • Phase 3: Migrate remaining beats and agent inputs. TBD.

Add "elastic-agent' product origin header when running under the Elastic agent.

Describe the enhancement:

In elastic/beats#29966 the Elastic product origin header was added to all requests made to Elasticsearch with the hard coded value "beats". When the code involved is used by or run under the Elastic agent the header value should be changed to "elastic-agent" to properly attribute the source of the request.

This will require adding a new value to the list of known product origins, currently defined here: https://github.com/elastic/kibana/blob/main/x-pack/plugins/upgrade_assistant/common/constants.ts#L50

Describe a specific use case for the enhancement or feature:

Requests made by or under the supervision of the Elastic agent should not be attributed to the beats product.

Create a skeleton shipper executable

Create a skeleton shipper executable that:

  • Logs to stdout per the control plane pipeline specification.
  • Starts a gRPC server when run, that exposes the event gRPC service.
  • Can start an HTTP server based on a command line flag that:
    • Exposes the /debug/vars endpoint for metrics (or similar, like the beats /stats endpoint).
    • Exposes the /debug/pprof endpoint for profiling.
  • Consumes a configuration file that can configure the gRPC and HTTP interfaces at a minimum. The configuration file will be a subsection of the Elastic agent policy and should use the same format.

Attempt to reuse as much of the agent code as we can for these tasks: elastic/elastic-agent#276

Add integration tests for the shipper output

  • An event batch is published from an input to the shipper gRPC server
  • An event batch is not dropped when the gRPC server is not available but starts later
  • An event batch is not dropped when ResourceExhausted code is returned from the gRPC

Increase the default number of workers from 1

Describe the enhancement:

Currently the default for worker is 1. This means that for each configured host, only bulk_max_size events can be in flight at a given time.

Describe a specific use case for the enhancement or feature:

A default of 1 can be a problem when an input has a high volume of data. Elasticsearch will have 2x the number of CPUs in write threads, so with a default of 2 libbeat could always have twice bulk_max_size events in flight.

https://github.com/elastic/beats/blob/d62ecf0d68a17dc0a569b27a3bffb55ccb31d7bd/libbeat/outputs/hosts.go#L30

Tune / finalize the queue's batch and timing parameters

Right now the shipper's internal Beats memory queue uses fixed defaults for its initialization parameters (batch minimum event count and timeout). These are placeholders to bootstrap the shipper, and should be replaced once things are fleshed out enough to run a comparison of the options. This may involve removing those parameters entirely as we rework internal batch handling, but if not, we should decide on a baseline we're happy with and expose configuration hooks if appropriate.

Use the go-elasticsearch client for the Elasticsearch output

The existing shipper Elasticsearch output uses the legacy Beats Elasticsearch client, https://github.com/elastic/beats/tree/main/libbeat/esleg.

The shipper should switch to the official go-elasticsearch client and use its BulkIndexer for writing events to Elasticsearch. In particular, the BulkIndexer will give us the desired set of tuning parameters described in #28 out of the box via the FlushBytes and FlushInterval parameters.

The APM server team has already done this migration successfully, with the caveat that they forked the BulkIndexer into a ModelIndexer specialized to the APM data types.

The code for the APM output is at https://github.com/elastic/apm-server/tree/02740f761049d6c04a283ef4708cdeb51c997b74/internal/model/modelindexer. The main entrypoint is ProcessBatch. The initial sequence of PRs are here for reference:

Exposing the new go-elasticsearch configuration will be handled in follow up issues. The scope of this issue is to switch the existing MVP Elasticsearch output implementation from #137 to use the go-elasticsearch client and its BulkIndexer.
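
For reference, a minimal sketch of the esutil.BulkIndexer usage this switch would build on; the index name, document body, and tuning values are placeholders.

package main

import (
	"bytes"
	"context"
	"log"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("client: %v", err)
	}
	// FlushBytes and FlushInterval map naturally onto the byte-based batch
	// size and flush timeout tuning parameters discussed in #28.
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:        es,
		FlushBytes:    5 << 20, // 5 MB
		FlushInterval: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("bulk indexer: %v", err)
	}
	err = bi.Add(context.Background(), esutil.BulkIndexerItem{
		Action: "create",
		Index:  "logs-shipper-default", // placeholder data stream name
		Body:   bytes.NewReader([]byte(`{"@timestamp":"2022-01-01T00:00:00Z","message":"hello"}`)),
	})
	if err != nil {
		log.Fatalf("add: %v", err)
	}
	if err := bi.Close(context.Background()); err != nil {
		log.Fatalf("close: %v", err)
	}
}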

[Meta][Feature] Implement end to end acknowledgement

This is a feature meta issue to support end to end acknowledgement in the shipper. Inputs must be able to be notified when their data has successfully been written to the target output to avoid data loss. End to end acknowledgement is only necessary when using the memory queue. Data published to the disk queue will be acknowledged immediately as it cannot be lost once persisted.

The delivery guarantees section of the shipper proposal describes the high level implementation of end to end acknowledgement in the shipper system. The Event Protocol section includes a preliminary definition of the acknowledgement streaming RPC that can be taken as a starting point.

The feature is considered complete when:

  • A test exists to prove that data published to the shipper is asynchronously acknowledged when:
    • Data is successfully written to the output system using any retry policy.
    • Data fails to write and then succeeds when using the infinite or max retry policies.
    • Data fails to write and then is dropped when using a max retry policy.
  • A test exists to prove that end to end acknowledgement behaves correctly when the shipper is restarted during publishing. Specifically, acknowledgement or queue identifiers are not reused between restarts of the process to avoid acknowledgement collisions.

The assignee of this issue is expected to create the development plan with all child issues for this feature. The following set of tasks should be included in the initial issues at a minimum:

  • elastic/beats#32541
  • #27
  • Implement a mechanism to avoid acknowledgement collisions across shipper restarts. The queue ID sequence should not restart from zero when the shipper restarts for example.
  • Implement the acknowledgement stream RPC to notify inputs when their messages have been acknowledged.
  • elastic/beats#32329
  • #97

[Meta][Feature] Implement the memory queue and output pipeline

This is a feature meta issue to implement the memory queue to output pipeline in the shipper. The scope is restricted to implementation of the memory queue and an output with no external dependencies (the console or file output for example). The disk queue, Elasticsearch/Kafka/Logstash outputs, and processors are explicitly out of scope.


This feature is considered complete when at least the following criteria are satisfied:

  • A test exists to prove that data written to the shipper event gRPC interface is published to the output. The test should write single events and batches, including batches that are as large as the configured size of the queue to prove it does not block.
  • A test exists to prove that the shipper will backpressure the producer when the queue has been filled. Ideally this means the producer will block until there is enough space in the queue. The backpressure should stop once the queue begins to drain.

The assignee of this issue is expected to create the development plan with all child issues for this feature. The following set of tasks should be included in the initial issues at a minimum:

  • Implementing the event publishing RPC and have it write to the queue.
  • Add queue and output sections to the shipper configuration file. The format must match the format used in agent policy output sections today.
  • Create the queue and output pipeline based on the provided configuration. Allow the configuration to be refreshed.
  • Creation of an integration test suite for the shipper process.

Important milestones:

  • Adapt the memory queue to accept shipper types (elastic/beats#31307)
  • Create a memory queue in the shipper binary and propagate input events through it
  • Create a test output that can confirm events received from the queue
  • Create an integration test that invokes the gRPC publishing interface and verifies its handling via the test output

Allow google.protobuf.Timestamp in shipper Event fields and metadata

The shipper Event payload fields are currently defined as google.protobuf.Struct, which requires encoding timestamps as strings:

// Metadata JSON object (map[string]google.protobuf.Value)
google.protobuf.Struct metadata = 5;
// Field JSON object (map[string]google.protobuf.Value)
google.protobuf.Struct fields = 6;
// Note: The google.protobuf.Value type should be extended (or re-implemented) with a
// google.protobuf.Timestamp to avoid the cost of encoding timestamps as strings and then
// parsing them back into a date representation.

Following from the note in the event definition, we should extend the google.protobuf.Struct type to allow google.protobuf.Timestamp as one of the Value cases. The agent always writes to data streams, which require a timestamp; forcing timestamps to be encoded as strings means we inefficiently parse them back into a date representation for every message received.

It should be enough to clone the existing google.protobuf.Struct definition and add the new message case in the Value oneof type, for example:

message Value {
  // The kind of value.
  oneof kind {
    // Represents a null value.
    NullValue null_value = 1;
    // Represents a double value.
    double number_value = 2;
    // Represents a string value.
    string string_value = 3;
    // Represents a boolean value.
    bool bool_value = 4;
    // Represents a structured value.
    Struct struct_value = 5;
    // Represents a repeated `Value`.
    ListValue list_value = 6;
    // Represents a timestamp.
    google.protobuf.Timestamp timestamp = 7;
  }
}

Setup the shipper's build, lint, and CI process

Set up the shipper repository with the standard mage targets (check, build, unitTest, etc.). The mage and lint targets should match those set up for https://github.com/elastic/elastic-agent-libs and use the same golangci-lint configuration. The lint step in CI should use a GitHub action; see https://github.com/elastic/beats/pull/30985/files for the setup.

Jenkins should be configured to run the lint, build, and test steps on each supported platform. The output of the build should initially be an elastic-agent-shipper binary, which can start as an empty main.go file. The https://github.com/elastic/elastic-agent repository can be used as a reference for a build process that produces a binary artifact.

Automated Go version bumps should be configured: https://github.com/elastic/observability-robots/blob/main/docs/bump-versions.md#go-bump

The CI onboarding Jenkins job has already been run, resulting in the following PRs that should be reviewed and merged:

We should attempt to reuse the mage targets from the elastic agent where possible: elastic/elastic-agent#321

Measure and expose shipper queue performance metrics

Add metrics counters and gauges for the following set of metrics, and ensure they are exposed on the shipper's HTTP monitoring interface under /debug/vars:

  • Report the following set of queue metrics:
    • The current configured maximum size of the queue:
    • The current utilization of the queue in events:
    • #145

The eventual goal is to expose these metrics on the agent details page (for example https://$kibana/app/fleet/agents/$agentID) in Fleet via the agent monitoring metrics.

Allow running the shipper in standalone mode independently of the agent

The shipper's default mode of operation is to be started by the agent, expecting to be passed the address of the agent control protocol server to connect to. The control protocol is then responsible for sending the shipper its runtime configuration.

To make local development and testing easier, allow the shipper to run in a standalone mode independent of the agent. Add a new command line flag that causes the shipper to read its configuration file from disk and start without waiting to connect to the agent control protocol server.

Add initial repository documentation

Shipper development has progressed enough that we should add all of the standard repository documentation, and also some more details on how the shipper is implemented:

  1. Add a more detailed README file explaining what the shipper is. Include an architecture diagram showing how it fits in with the agent.
  2. Add a CONTRIBUTING guide explaining the development process: building, linting, testing, etc.
  3. Create a top level docs folder containing an initial architecture.md file that explains the implementation details of the shipper in detail. Include how it runs under agent, the event pipeline, the queues, etc. Where it exists link to the individual component documentation.

Create something like the fleet dev_docs directory: https://github.com/elastic/kibana/tree/main/x-pack/plugins/fleet/dev_docs

Add a basic shipper integration test.

Most of our ongoing work can or should be verified using integration tests that interact with the shipper process as a whole. This work can happen as part of several of our ongoing issues, but it is easier to manage when separated out into its own issue.

Add the first shipper integration test that starts the shipper process (or invokes its main entry point from a Go test), publishes data to its gRPC API, monitors the progress of the data through the shipper, and verifies the data is produced to the configured output with the expected format.

The simplest initial test will produce a batch using the console output. A file output can be added instead if it is simpler to implement.

The first tests can be modelled after the filestream integration tests in beats. Once we introduce the Elasticsearch output we can rely on docker-compose to start the ES container as filebeat does.

Dependencies:

Allow the Elastic Agent to run the shipper

The data shipper must be runnable by the Elastic agent. This issue captures the shipper side work for elastic/elastic-agent#214.

The shipper executable must implement the Elastic agent V2 control protocol, defined here: https://github.com/elastic/elastic-agent-client. Note that the control protocol is being updated as part of the Agent V2 architecture in elastic/elastic-agent-client#29. The control protocol features to be implemented must be coordinated with the control plane team.

While we should use the V2 protocol in the final version of the shipper, initial prototyping can be done using the V1 implementation to unblock progress while the control plane team finalizes the V2 protocol:

This issue is considered complete when:

  • A test proves that the Elastic agent can successfully start and monitor the data shipper, including logs, metrics, and runtime health.
  • A test proves that the Elastic agent can upgrade the data shipper.
  • A test proves that the Elastic agent can detect when the shipper is unhealthy or stopped.
