openwhisk-package-kafka's Introduction

Apache OpenWhisk package for communication with Kafka or IBM Message Hub

This project is an OpenWhisk package that allows you to communicate with Kafka or IBM Message Hub instances for publishing and consuming messages using the native high-performance Kafka API.

OpenWhisk is a cloud-first, distributed, event-based programming service: an event-action platform that allows you to execute code in response to an event. These events can come from many different sources, such as Object Storage, direct HTTP, a Cloudant database, and so on. Message Hub and Kafka are among the most popular event sources; both can also be used as an effective instrument to publish events from OpenWhisk to the rest of the world, enabling highly scalable and performant asynchronous communication for event-driven applications.

Using the Messaging package

This package allows you to create triggers that react when messages are posted to either an IBM Message Hub instance or to a generic Kafka instance. Since the parameters required for each of these situations are different, there are two separate feeds to handle them: /messaging/messageHubFeed and /messaging/kafkaFeed.

Additionally, two actions are included which allow you to produce messages to either Message Hub or generic Kafka instances. These are /messaging/messageHubProduce and /messaging/kafkaProduce, respectively.

Creating a Trigger that listens to an IBM Message Hub instance

In order to create a trigger that reacts when messages are posted to a Message Hub instance, you need to use the feed named /messaging/messageHubFeed. This feed action supports the following parameters:

Name | Type | Description
kafka_brokers_sasl | JSON Array of Strings | This parameter is an array of <host>:<port> strings which comprise the brokers in your Message Hub instance
user | String | Your Message Hub user name
password | String | Your Message Hub password
topic | String | The topic you would like the trigger to listen to
kafka_admin_url | URL String | The URL of the Message Hub admin REST interface
isJSONData | Boolean | (Optional - default=false) When set to true this will cause the provider to attempt to parse the message value as JSON before passing it along as the trigger payload.
isBinaryKey | Boolean | (Optional - default=false) When set to true this will cause the provider to encode the key value as Base64 before passing it along as the trigger payload.
isBinaryValue | Boolean | (Optional - default=false) When set to true this will cause the provider to encode the message value as Base64 before passing it along as the trigger payload.

While this list of parameters may seem daunting, they can be automatically set for you by using the package refresh CLI command:

  1. Create an instance of Message Hub service under your current organization and space that you are using for OpenWhisk.

  2. Verify that the topic you want to listen to already exists in Message Hub or create a new topic, for example mytopic.

  3. Refresh the packages in your namespace. The refresh automatically creates a package binding for the Message Hub service instance that you created.

$ wsk package refresh
created bindings:
Bluemix_Message_Hub_Credentials-1
$ wsk package list
packages
/myBluemixOrg_myBluemixSpace/Bluemix_Message_Hub_Credentials-1 private

Your package binding now contains the credentials associated with your Message Hub instance.

  4. Now all you need to do is create a Trigger that will be fired when new messages are posted to your Message Hub topic.
$ wsk trigger create MyMessageHubTrigger -f /myBluemixOrg_myBluemixSpace/Bluemix_Message_Hub_Credentials-1/messageHubFeed -p topic mytopic

Setting up a Message Hub package outside Bluemix

If you're not using OpenWhisk in Bluemix or if you want to set up your Message Hub outside of Bluemix, you must manually create a package binding for your Message Hub service. You need the Message Hub service credentials and connection information.

  1. Create a package binding that is configured for your Message Hub service.
$ wsk package bind /whisk.system/messaging myMessageHub -p kafka_brokers_sasl "[\"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093\", \"kafka02-prod01.messagehub.services.us-south.bluemix.net:9093\", \"kafka03-prod01.messagehub.services.us-south.bluemix.net:9093\"]" -p user <your Message Hub user> -p password <your Message Hub password> -p kafka_admin_url https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443
  2. Now you can create a Trigger using your new package that will be fired when new messages are posted to your Message Hub topic.
$ wsk trigger create MyMessageHubTrigger -f myMessageHub/messageHubFeed -p topic mytopic -p isJSONData true

Creating a Trigger that listens to a Generic Kafka instance

In order to create a trigger that reacts when messages are posted to an unauthenticated Kafka instance, you need to use the feed named /messaging/kafkaFeed. This feed supports the following parameters:

Name | Type | Description
brokers | JSON Array of Strings | This parameter is an array of <host>:<port> strings which comprise the brokers in your Kafka cluster
topic | String | The topic you would like the trigger to listen to
isJSONData | Boolean | (Optional - default=false) When set to true this will cause the provider to attempt to parse the message value as JSON before passing it along as the trigger payload.
isBinaryKey | Boolean | (Optional - default=false) When set to true this will cause the provider to encode the key value as Base64 before passing it along as the trigger payload.
isBinaryValue | Boolean | (Optional - default=false) When set to true this will cause the provider to encode the message value as Base64 before passing it along as the trigger payload.

Example:

$ wsk trigger create MyKafkaTrigger -f /whisk.system/messaging/kafkaFeed -p brokers "[\"mykafkahost:9092\", \"mykafkahost:9093\"]" -p topic mytopic -p isJSONData true

Using a separate Kafka feed provider for each user

Sometimes users may not want to expose their Kafka instance to the shared feed providers run by the OpenWhisk cloud provider. They can run their own providers and use their own CouchDB/Cloudant instance by passing the following additional parameters when creating/updating/getting/deleting triggers:

Name | Type | Description
dedicated | Boolean | (Optional - default=false) When set to true, dedicated Kafka feed providers and a dedicated CouchDB/Cloudant database are used
DB_URL | URL | The base URL (including username:password) for persistent storage (either CouchDB or Cloudant)
DB_NAME | String | The database name for triggers
workers | JSON Array of Strings | An array of the IDs of the running worker instances, with each ID of the form workerX, e.g. ["worker0", "worker1"]

Example:

$ wsk trigger create MyKafkaTrigger -f /whisk.system/messaging/kafkaFeed -p brokers "[\"mykafkahost:9092\", \"mykafkahost:9093\"]" -p topic mytopic -p isJSONData true -p dedicated true -p DB_URL http://admin:admin@localhost:5984 -p DB_NAME dedicated_triggers -p workers "[\"worker0\"]"

Listening for messages

After creating a trigger, the system will monitor the specified topic in your messaging service. When new messages are posted, the trigger will be fired.

The payload of that trigger will contain a messages field which is an array of messages that have been posted since the last time your trigger fired. Each message object in the array will contain the following fields:

  • topic
  • partition
  • offset
  • key
  • value

In Kafka terms, these fields should be self-evident. However, the key supports an optional isBinaryKey parameter that allows it to carry binary data. Additionally, the value requires special consideration: the optional isJSONData and isBinaryValue parameters are available to handle JSON and binary messages, respectively. These two parameters cannot be used in conjunction with each other.

As an example, if isBinaryKey was set to true when the trigger was created, the key will be encoded as a Base64 string when returned from the payload of a fired trigger.

For example, if a key of Some key is posted with isBinaryKey set to true, the trigger payload will resemble the below:

{
    "messages": [
        {
            "partition": 0,
            "key": "U29tZSBrZXk=",
            "offset": 421760,
            "topic": "mytopic",
            "value": "Some value"
        }
    ]
}

If the isJSONData parameter was set to false (or not set at all) when the trigger was created, the value field will be the raw value of the posted message. However, if isJSONData was set to true when the trigger was created, the system will attempt to parse this value as a JSON object, on a best-effort basis. If parsing is successful, then the value in the trigger payload will be the resulting JSON object.

For example, if a message of {"title": "Some string", "amount": 5, "isAwesome": true} is posted with isJSONData set to true, the trigger payload might look something like this:

{
  "messages": [
    {
      "partition": 0,
      "key": null,
      "offset": 421760,
      "topic": "mytopic",
      "value": {
          "amount": 5,
          "isAwesome": true,
          "title": "Some string"
      }
    }
  ]
}

However, if the same message content is posted with isJSONData set to false, the trigger payload would look like this:

{
  "messages": [
    {
      "partition": 0,
      "key": null,
      "offset": 421761,
      "topic": "mytopic",
      "value": "{\"title\": \"Some string\", \"amount\": 5, \"isAwesome\": true}"
    }
  ]
}

Similar to isJSONData, if isBinaryValue was set to true during trigger creation, the resultant value in the trigger payload will be encoded as a Base64 string.

For example, if a value of Some data is posted with isBinaryValue set to true, the trigger payload might look something like this:

{
  "messages": [
    {
      "partition": 0,
      "key": null,
      "offset": 421760,
      "topic": "mytopic",
      "value": "U29tZSBkYXRh"
    }
  ]
}

If the same message is posted without isBinaryValue set to true, the trigger payload would resemble the below example:

{
  "messages": [
    {
      "partition": 0,
      "key": null,
      "offset": 421760,
      "topic": "mytopic",
      "value": "Some data"
    }
  ]
}
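
If your action needs the original bytes back, it can Base64 decode these fields itself. Below is a minimal sketch of an OpenWhisk Python action that does so; it assumes, purely for illustration, that the trigger was created with both isBinaryKey and isBinaryValue set to true.

import base64

def main(params):
    decoded = []
    for message in params.get('messages', []):
        # key may be null if the producer did not set one
        key = base64.b64decode(message['key']) if message.get('key') else None
        value = base64.b64decode(message['value'])
        decoded.append({'key': key, 'value': value})
    # ... do something with the decoded bytes ...
    return {'count': len(decoded)}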

Messages are batched

You will notice that the trigger payload contains an array of messages. This means that if you are producing messages to your messaging system very quickly, the feed will attempt to batch up the posted messages into a single firing of your trigger. This allows the messages to be posted to your trigger more rapidly and efficiently.

Please keep in mind, when coding actions that are fired by your trigger, that the number of messages in the payload will always be greater than 0. While there is technically no upper limit on the number of messages fired, limits are in place to ensure that each trigger payload stays below the payload size limit defined by your OpenWhisk deployment.

Here is an example of a batched trigger payload (please note the change in the offset value):

{
  "messages": [
      {
        "partition": 0,
        "key": null,
        "offset": 100,
        "topic": "mytopic",
        "value": {
            "amount": 5
        }
      },
      {
        "partition": 0,
        "key": null,
        "offset": 101,
        "topic": "mytopic",
        "value": {
            "amount": 1
        }
      },
      {
        "partition": 0,
        "key": null,
        "offset": 102,
        "topic": "mytopic",
        "value": {
            "amount": 999
        }
      }
  ]
}
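
When coding such an action, iterate over the entire messages array rather than assuming a single message. Here is a minimal sketch of an OpenWhisk Python action along those lines; the amount field simply mirrors the example payload above and assumes the trigger was created with isJSONData set to true.

def main(params):
    messages = params.get('messages', [])
    # The feed batches messages, so expect one or more entries per firing
    total = sum(msg['value'].get('amount', 0) for msg in messages)
    return {'messageCount': len(messages), 'totalAmount': total}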

Checking the status and configuration of a trigger

The status and configuration of a feed trigger can be retrieved using wsk trigger get.

Example:

$ wsk trigger get myTopicTrigger

The response will contain a result object with the status of the trigger along with its configuration information.

e.g.

{
  "result": {
      "config": {
          "isBinaryKey": false,
          "isBinaryValue": false,
          "isJSONData": false,
          "kafka_admin_url": ...,
          "kafka_brokers_sasl": [
              ...
          ],
          "user": ...,
          "password": ...,
          "topic": "myTopic",
          "triggerName": "/myNamespace/myTopicTrigger"
      },
      "status": {
          "active": true,
          "dateChanged": 1517245917340,
          "dateChangedISO": "2018-01-29T17:11:57Z"
      }
  }
}

Triggers may become inactive when certain exceptional behavior occurs, for example when there is an error firing the trigger or it is not possible to connect to the Kafka brokers. When a trigger becomes inactive, the status object will contain additional information about the cause.

e.g.

{
   "status": {
       "active": false,
       "dateChanged": 1517936358359,
       "dateChangedISO": "2018-02-06T16:59:18Z",
       "reason": {
           "kind": "AUTO",
           "message": "Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts",
           "statusCode": 403
       }
   }
}

Updating the configuration of a trigger

It is possible to update a limited set of configuration parameters for a trigger. The updatable parameters are:

  • isBinaryKey
  • isBinaryValue
  • isJSONData

These parameters can be updated using wsk trigger update:

Examples:

$ wsk trigger update myTopicTrigger -p isJSONData true
$ wsk trigger update myTopicTrigger -p isJSONData false -p isBinaryKey true -p isBinaryValue true

Producing messages to Message Hub

The /messaging/messageHubProduce Action is deprecated and will be removed at a future date. To maintain optimal performance, migrate your usage of the /messaging/messageHubProduce Action to use a persistent connection, for example, by deploying a non-OpenWhisk component which contains a Message Hub client.
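
For illustration only, here is a minimal sketch of such a non-OpenWhisk component using the kafka-python client; the broker address, credentials, and topic are placeholders, and any Kafka client that supports SASL_SSL would serve equally well.

from kafka import KafkaProducer

# Placeholder connection details for a SASL-secured Message Hub / Kafka cluster
producer = KafkaProducer(
    bootstrap_servers=['kafka01-prod01.messagehub.services.us-south.bluemix.net:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='<your Message Hub user>',
    sasl_plain_password='<your Message Hub password>')

# The producer holds its connection open, avoiding the per-invocation
# connection cost of the deprecated produce actions
producer.send('mytopic', value=b'This is the content of my message')
producer.flush()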

The deprecated /messaging/messageHubProduce takes the following parameters:

Name | Type | Description
kafka_brokers_sasl | JSON Array of Strings | This parameter is an array of <host>:<port> strings which comprise the brokers in your Message Hub instance
user | String | Your Message Hub user name
password | String | Your Message Hub password
topic | String | The topic you would like to produce the message to
value | String | The value for the message you would like to produce
key | String | (Optional) The key for the message you would like to produce
base64DecodeValue | Boolean | (Optional - default=false) If true, the message will be produced with a Base64 decoded version of the value parameter
base64DecodeKey | Boolean | (Optional - default=false) If true, the message will be produced with a Base64 decoded version of the key parameter

While the first three parameters can be automatically bound by using wsk package refresh, here is an example of invoking the action with all required parameters:

wsk action invoke /messaging/messageHubProduce -p kafka_brokers_sasl "[\"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093\", \"kafka02-prod01.messagehub.services.us-south.bluemix.net:9093\", \"kafka03-prod01.messagehub.services.us-south.bluemix.net:9093\"]" -p topic mytopic -p user <your Message Hub user> -p password <your Message Hub password> -p value "This is the content of my message"

Producing messages to a generic Kafka instance

👉 Note: The /messaging/kafkaProduce Action is deprecated and will be removed at a future date. To maintain optimal performance, migrate your usage of the /messaging/kafkaProduce Action to use a persistent connection, for example, by deploying a non-OpenWhisk component which contains a Kafka Producer.

The deprecated /messaging/kafkaProduce takes the following parameters:

Name | Type | Description
brokers | JSON Array of Strings | This parameter is an array of <host>:<port> strings which comprise the brokers in your Kafka cluster
topic | String | The topic you would like to produce the message to
value | String | The value for the message you would like to produce
key | String | (Optional) The key for the message you would like to produce
base64DecodeValue | Boolean | (Optional - default=false) If true, the message will be produced with a Base64 decoded version of the value parameter
base64DecodeKey | Boolean | (Optional - default=false) If true, the message will be produced with a Base64 decoded version of the key parameter

Here is an example of invoking the action with all required parameters:

wsk action invoke /messaging/kafkaProduce -p brokers "[\"mykafkahost:9092\", \"mykafkahost:9093\"]" -p topic mytopic -p value "This is the content of my message"

Producing Messages with Binary Content

You may find that you want to use one of the above actions to produce a message whose key and/or value is binary data. The problem is that invoking an OpenWhisk action inherently involves a REST call to the OpenWhisk server, which may require any binary parameter values of the action invocation to be Base64 encoded. How do you handle this?

The action caller (you, or your code) must first Base64 encode the data, for example, the value of the message you want to produce. Pass this encoded data as the value parameter to the produce action. However, to ensure that the produced message's value contains the original bytes, you must also set the base64DecodeValue parameter to true. This will cause the produce action to first Base64 decode the value parameter before producing the message. The same procedure applies to producing messages with a binary key, using the base64DecodeKey parameter set to true in conjunction with a Base64 encoded key parameter.
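
As a small illustration, the Python snippet below shows the encoding step for a binary value; the encoded string is then passed as the value parameter (together with base64DecodeValue set to true), for example via wsk action invoke. The sample bytes are invented for the example.

import base64

binary_value = b'\x00\x01\x02 arbitrary binary payload'

# Base64 encode the bytes so they can travel through the REST call
encoded_value = base64.b64encode(binary_value).decode('ascii')

# Pass encoded_value as the value parameter of the produce action, and set
# base64DecodeValue to true so the original bytes are restored before producing
print(encoded_value)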

Examples

Integrating OpenWhisk with IBM Message Hub, Node Red, IBM Watson IoT, IBM Object Storage, and IBM Data Science Experience

An example that integrates OpenWhisk with IBM Message Hub, Node Red, IBM Watson IoT, IBM Object Storage, and the IBM Data Science Experience (Spark) service can be found here.

Architecture

For architecture documentation and diagrams, please refer to the Architecture Docs.

Development and Testing

If you wish to deploy the feed service yourself, please refer to the Development Guide.

References

Building from Source

To build this package from source, execute the command ./gradlew distDocker

openwhisk-package-kafka's Issues

Thorough logging

Logging should be implemented to not only help in debugging any issues that may arise, but also to help in performance analysis. Interesting events to log include:

  • If an active consumer is programmatically shut down due to an error (e.g. 404 firing a trigger)
  • When a consumer must retry firing a trigger
  • Start and end times when successfully consuming a message
  • Size of consumed messages (in aggregate per each run of poll() is probably fine)
  • When consumers are successfully created or deleted by user request
  • Authentication failures
  • Existing consumers are recreated during start up

Propose the design model for the message provider service

This design continues the previous description of the message consumer service #19.

The provider service has two major responsibilities: monitoring the changes of the event source and sending the message into the messaging service.

Integration with the event sources:
The provider service provides different templates to monitor different event sources. For example, object storage has one template, the database has another template, and so on. Users can launch a provider instance based on the template to monitor changes in the event source. Since each provider instance serves one user for one particular event source, the user can define which trigger will be fired in each provider instance. When an event happens, the provider generates the message and sends it to the Kafka service.

In order to generate the message consumed by the consumer service, the provider needs to be compliant with the following format:
ID: used to save the message ID
event_source: used to save the event source, which generates the change.
event_type: the type of the event should be specified.
trigger_id: trigger ID or name
trigger_uri: URI of the trigger
namespace: namespace of the trigger
apikey: authentication to fire the trigger
payload: a JSON-formatted string used as the input of the trigger or action.
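
A hypothetical example of a message following this proposed format (all values below are invented for illustration) could look like:

# Illustrative only: a message the provider service might publish to Kafka
provider_message = {
    'ID': 'msg-000123',
    'event_source': 'object_storage',
    'event_type': 'object_created',
    'trigger_id': 'myTrigger',
    'trigger_uri': 'https://openwhisk.example.com/api/v1/namespaces/myNamespace/triggers/myTrigger',
    'namespace': 'myNamespace',
    'apikey': '<auth key used to fire the trigger>',
    'payload': '{"bucket": "images", "object": "cat.png"}'
}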

When the user wants to listen to an event source such as object storage for changes, they need to choose the object storage provider template, specify the trigger information, and launch the instance.

action to post a message to Kafka

The Kafka/Message Hub package should include an action that allows the caller to post a message to some topic in some Kafka or Message Hub instance.

Guard against creating duplicate consumers

While it is not really possible to use the CLI to create duplicate triggers, the triggers PUT endpoint should guard against direct invocations that might try to create duplicate consumers. A simple check in the consumers dictionary should be sufficient.

split feed action into two

Rather than have one feed action that can service both Message Hub and Kafka instances, let's split it into two separate actions, one for each type of instance.

This will make not only the naming of the actions clear (MessageHubFeed and KafkaFeed) but it will also make describing the parameters much simpler as well.

Make wsk package refresh work

From what I understand, there is some black magic that needs to happen when the Message Hub feed action is installed. This will tell package refresh which bmx service to use as the basis for new package bindings.

Document How to Delete Trigger

The feed package_endpoint parameter must be specified when deleting a trigger. Otherwise, the consumer will not be removed from the service.

update Kafka Log file name for monitoring

Looks like we need to have the log file in a certain format in order to be picked up by the log forwarder and eventually viewable in Kibana. Here is the format for the Kafka logs:

kafkaTriggers_log.log

here is how I think they are currently being stored as:

kafkatriggers.log

You also need to print the lines in a format that our current logstash forwarder can parse and send to ELK.

Message Hub consumer set to auto commit offsets

When initializing the KafkaConsumer for Message Hub, the auto commit flag is set to True even though we definitely want to manually commit offsets only after successfully firing the trigger.
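
A sketch of the intended behavior, assuming the kafka-python client used elsewhere in this provider (fire_trigger is a hypothetical helper), would disable auto commit and only commit after the trigger fires:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'mytopic',
    bootstrap_servers=['mykafkahost:9092'],
    group_id='my-trigger-consumer',
    enable_auto_commit=False)  # never auto commit offsets

for message in consumer:
    fired = fire_trigger(message)  # hypothetical helper that POSTs to the trigger URL
    if fired:
        consumer.commit()  # commit only after the trigger fired successfully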

consumers do not recover from a network outage

I happened to be running the feed service on my laptop when the network completely went out (I pulled the plug). As you would expect, the one consumer I had started freaking out at not being able to connect to Message Hub. However, it also did not recover once the network connection was restored.

The following messages were repeated indefinitely until I restarted the entire service:

WARNING:kafka.client:Node 1 connection failed -- refreshing metadata
ERROR:kafka.coordinator:Error sending GroupCoordinatorRequest_v0 to node 1 [NodeNotReadyError: 1]
WARNING:kafka.coordinator:Coordinator unknown during heartbeat -- will retry
WARNING:kafka.coordinator:Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError: ); retrying

Feed Action Displays a String Instead of JSON for Request Responses

If there is an error in a feed action, a string is printed as the response body. The information should, however, be displayed as JSON instead of a string.

Actual output:

"error": {
     "response": "{\n  \"error\": \"not authorized\", \n  \"success\": false\n}\n",
     "statusCode": 401
}

Expected output:

"error": {
     "response": {
          "error":  "not authorized",
          "success": false
     },
     "statusCode": 401
}

build system

Implement a build system based on gradle. This system should be capable of:

  • building the Docker image
  • starting a Docker container with test-only Cloudant credentials
  • running automated tests against that running container

Disable SSL checking for local development

Often when developing/testing locally, OpenWhisk uses a self-signed certificate. This causes problems for this feed provider which (by default) insists on checking the validity of the SSL cert. With checking enabled during local development, GET and POST calls to OpenWhisk fail.

Fix will be to optionally disable SSL cert checking, controlled by the use of an environment variable.

Consumer becomes unresponsive without exception - socket disconnected

It has been observed that a Consumer will get into a state where it is indefinitely repeating "socket disconnected" messages to the log. In the meantime, it is not consuming new messages.

This problem seems to come from the kafka-python client itself, where (I suppose) it is getting into an error-recovery loop.

handle when MH instance is deleted

When a trigger exists and the corresponding Message Hub instance is deleted, I believe an exception is thrown (likely from poll()) and the consumer thread crashes. While the rest of the service continues, this is a pretty clumsy way to handle the situation.

When this happens, the exception should be caught, a message logged, and the consumer shut down and cleaned up gracefully.

In the future, there may be a way to indicate this back to the OpenWhisk backend, but for now I think this should suffice.

Retry vs abort for triggers

When firing a trigger, there are some clear error cases where we want to simply retry firing the trigger. For example, when the user is throttled by firing triggers too quickly, the return code is 429. In this case, it is sufficient to pause and retry firing the trigger.

However, other error conditions warrant completely stopping/deleting the consumer entirely. For example if a 404 is hit when firing the trigger, it is clear that the trigger has been deleted and the consumer should be stopped entirely.

There are certainly other possible errors when firing triggers, which should each be handled appropriately with either a retry or complete abort.
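
A sketch of one possible classification (only 429 retry and 404 abort are explicitly called out above; the other status codes are assumptions that would need to be refined):

def should_retry(status_code):
    # Throttling (429) and transient server-side errors: pause and retry firing
    if status_code in (408, 429, 500, 503):
        return True
    # 404 means the trigger was deleted; treat it and other client errors as
    # fatal and stop/delete the consumer entirely
    return False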

/health endpoint should not expose namespaces

Currently, the /health endpoint shows a list of all triggers by fully qualified name. The trouble is that the namespace value can often include personal information as they tend to be based on the user's email address.

One solution could be to generate a UUID for each consumer and store that in the DB. Then, the health endpoint could replace the namespace in each trigger's FQN with the UUID.

api_key is a redundant parameter

api_key == username + password which means that asking for all three as parameters on the Message Hub feed action is just redundant.

Expose health information to trigger owner

In cases where the consumer can no longer continue (message hub instance removed, authentication changed, etc.) the last thing the consumer should do before shutting down is to post to the trigger URL. The body of this post should include useful information to inform the OW user that their trigger is being disabled and, if possible, provide a reason why.

automated test for health monitoring

We need an automated test that will be used to monitor the health of the system. This test should GET the /health endpoint and parse the output for any signs of sickness.

Increase poll timeout limit

I am experiencing a situation where the time to post a trigger to OW is taking more than 2 seconds. Since this time contributes to the entire poll() time, simply posting a trigger results in a poll timeout and consumer restart.

Increase the timeout to something more reasonable.

Remove Invalid Consumers from Dictionary

Consumers whose Message Hub credentials have been invalidated, whose trigger has been deleted, or that fail to invoke a trigger more than ten times must be removed from the main dictionary of consumers. Otherwise, these consumers will exist until the Docker container is restarted.

Create default route

This default route can/will be used by deployment/monitoring scripts to ensure that the service is alive (in the most basic way).

restart consumers when time since last poll is too high

I have confirmed an instance where a single consumer stopped polling due to issues communicating with Kafka and the last successful poll time was significantly greater than the regular poll interval. This proves that a single consumer crash can be detected with this measurement.

Knowing this, we now need to gracefully handle this scenario and restart the consumer to recover from the error.

limit size of trigger payload

As the code currently exists, there is potentially no limit to the size of the payload we will attempt to post to a trigger. This is bad.

I believe there are limits we may be able to set with consumer parameters that may do this automatically for us (see max_partition_fetch_bytes though this may involve some partition math). However, failing that, we can examine the size of each consumed message and use that to actively limit the amount of data posted to the trigger.

http://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
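
A sketch of the consumer-side safeguard, assuming the kafka-python client (max_partition_fetch_bytes is a real consumer setting; the 1 MB cap and the fire_trigger helper are illustrative):

from kafka import KafkaConsumer

MAX_PAYLOAD_BYTES = 1048576  # example cap; the real limit comes from the OpenWhisk deployment

consumer = KafkaConsumer(
    'mytopic',
    bootstrap_servers=['mykafkahost:9092'],
    max_partition_fetch_bytes=MAX_PAYLOAD_BYTES)

batch, batch_size = [], 0
for message in consumer:
    size = len(message.value or b'') + len(message.key or b'')
    if batch and batch_size + size > MAX_PAYLOAD_BYTES:
        fire_trigger(batch)  # hypothetical helper that posts the batch to the trigger
        batch, batch_size = [], 0
    batch.append(message)
    batch_size += size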

special optional handling for JSON formatted messages

Although kafka message content is, strictly speaking, always a byte array, it seems that JSON formatted strings will be so common that it may warrant special handling.

It has been proposed that if the trigger creator specifies that the message format is a JSON formatted string (using a new isJSONData parameter to the feed action), then the feed service should make a reasonable attempt at parsing the message content and passing it appropriately as the trigger payload.

If isJSONData is set to true and the message cannot be parsed as valid JSON, the result is the same as if isJSONData were set to false - which is the current behavior. In this case, the feed service should log a warning message.
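
A minimal sketch of that best-effort behavior (names are illustrative):

import json
import logging

def parse_value(raw_value, is_json_data):
    if not is_json_data:
        return raw_value
    try:
        return json.loads(raw_value)
    except ValueError:
        # Fall back to the raw value, exactly as if isJSONData were false
        logging.warning('isJSONData is set but the message is not valid JSON')
        return raw_value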

@csantanapr

observed consumer crash

It appears there was either a network glitch or the kafka instance was unavailable during the poll. That being said, our code should handle this situation more gracefully.

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/KafkaFeedProvider/consumer.py", line 89, in run
    partition = self.consumer.poll(1000)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 514, in poll
    records = self._poll_once(remaining)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 541, in _poll_once
    self._coordinator.ensure_coordinator_known()
  File "/usr/local/lib/python2.7/site-packages/kafka/coordinator/base.py", line 221, in ensure_coordinator_known
    self._client.poll(future=metadata_update)
  File "/usr/local/lib/python2.7/site-packages/kafka/client_async.py", line 483, in poll
    self._maybe_connect(node_id)
  File "/usr/local/lib/python2.7/site-packages/kafka/client_async.py", line 322, in _maybe_connect
    conn.connect()
  File "/usr/local/lib/python2.7/site-packages/kafka/conn.py", line 304, in connect
    if self._try_handshake():
  File "/usr/local/lib/python2.7/site-packages/kafka/conn.py", line 366, in _try_handshake
    self._sock.do_handshake()
  File "/usr/local/lib/python2.7/ssl.py", line 830, in do_handshake
    self._sslobj.do_handshake()
error: [Errno 110] Operation timed out

/health endpoint

Implement an endpoint that returns data regarding the health of the feed service provider. Information should include things like:

  • average CPU load (past hour? array of averages for each of the past 24 hours?)
  • memory usage
  • number of active consumers

The caller of this endpoint is interested in determining how much work is being done by the feed service, and how much more work can it do before it is overloaded.

authentication when creating consumers

In order to avoid creating consumers for unauthorized trigger URLs, before creating a new consumer we should somehow authenticate with the OpenWhisk system using the credentials given for the trigger.

Do not error when feed action is invoked with namespace `_`

Have the feed action go get the value for the namespace if the one passed as a parameter is _

We are instructing users to unset the namespace here:
https://console.ng.bluemix.net/openwhisk/learn/cli

wsk property unset --namespace

The feed action will need to handle the case where the user uses the new auth key method and no namespace is configured in the CLI.

Options:

  1. Have the feed action do an OW API call to get the namespace for the authkey
  2. Have the runtime provide the namespace string associated with the authkey being used to invoke the feed action. Today there is a namespace environment variable, but it doesn't have the value we need; it has the value of the owner of the action, not the namespace associated with the authkey.

Additional Tests Cases Must be Created

We will use this issue to track which tests need to be created.

  • Ensure proper error messages and status codes are returned when creating Kafka, and Message Hub tests with missing or invalid parameters. This will need to be done through creating feeds and posting triggers directly to the container.

  • Do not allow duplicate triggers in the database (Whisk won't allow duplicate trigger creation)

  • Do not allow a trigger that does not exist to be deleted

  • Allow for deletion of a trigger that exists

  • Ensure a user has access to a trigger before creating it

  • Ensure a user has access to a trigger before deleting it (Whisk doesn't allow trigger access to unauthenticated triggers)

  • Have a consumer encounter an HTTP status code other than 503, 500, 408, 429, or 200 to ensure retries occur in the polling process and that the consumer recovers on purpose

  • Have a consumer encounter an HTTP status code other than 503, 500, 408, 429, or 200 to ensure retries occur in the polling process and that the consumer gets shut down on purpose

  • Have the consumer encounter 503, 500, 408, and 429 error codes to make sure the consumer is shut down without retries

  • Post triggers to the service, shutdown the Docker container then restart it to make sure everything recovers properly

  • Ensure doctor restarts a consumer that polls for too long (ypKafkaHeartBeat, ys1KafkaHeartBeat)

whisk object deprecated

The actions in this package use the now-deprecated whisk object; migration to promises (and the npm openwhisk package) should be in order.

Propose a lightweight model for the message consumer service

OpenWhisk manages different components in different Docker containers, each of which is dedicated to hosting one service that takes care of the workload for all users. The current in-progress messaging (Kafka) package service, designed in the same model, launches one Docker container to manage all the consumers and triggers created by all users. The major pitfall of this design is that it constrains all the resources to one container, which directly leads to a single point of failure and a lack of scalability.

Digging into the code a little bit, we can see that an internal hashmap or dict is created to save all the consumers and their mapping relations with the triggers, and each new consumer/trigger has to create a new record in the database for consistency and recovery in case the service needs to restart.

The following issues are what we can predict with the current design:

  • The container becomes a single point of failure. If this service container crashes, all the services are shut down. No service HA can be ensured.
  • If we have to restart the service container, we have to load everything saved in the database to bring the service back online; e.g. each time the service provider for the messaging (Kafka) package restarts, all the triggers for every user have to be reloaded and reinitialized. What if the database information is lost? We lose all the information about the consumers and triggers. No service HA can be ensured.
  • No matter how much resource we allocate to this container, it is finite, so the container will be exhausted if too many triggers are created. We have already run into this issue: OpenWhisk developers have been asked to determine the maximum number of triggers that can be created with a given amount of resources. We are running a service with a hard limit, which is not a model suited to running in the cloud.

If the number of triggers exceeds this limit, what can we do without changing the implementation? Some workarounds are possible, but there is no ideal solution at this moment.

  • We can add a new consumer service to listen to the Message Hub. The drawback is that we only have a limited number of topics to listen to for the Kafka messages. Even if we add one more consumer, we cannot make sure that the triggers beyond 1200, created in the second consumer, pick up the message, because both consumers listen to the same topic. Also, each consumer service maintains a separate and different internal map, and they are still single points of failure; if we lose one of them, we lose all the information in its internal map. Third, how can we ensure the message is consumed by the consumer service that has the internal map we need? If the message needs to be consumed by trigger A, how do we know it will go to the consumer service with the mapping information for trigger A? Very difficult. We could send the message to all the consumer services, but in that case it is a waste of resources.
  • We can add a new Message Hub service. In this case, we will end up managing multiple messaging services, and it will be very complex to manage which trigger/consumer maps to which Message Hub.

We can see that the internal map of the registry is really an issue: it limits the number of triggers we are able to handle and makes the service rather difficult to scale.

In order to resolve the above issues, we have to reconsider the design model for the message consumer service. I would like to propose a more lightweight model, in which the consumer service only takes care of listening to the messaging service, picking up the message from the specific topic, and consuming it. We are able to launch more consumer services in HA mode, since they all have the same responsibilities.

Where do we get the information of the trigger to be fired?
From OpenWhisk's perspective, we are already able to create the actions and triggers and associate a rule between them, so the only thing the consumer service needs is the information for the trigger. Instead of maintaining the relationship between the consumer and the trigger in an internal map/dict and the database, we can offload the trigger information to the provider service. When the provider discovers a change from the event source, it sends a message containing the trigger information (e.g. the trigger ID and trigger URI), the credentials, and the payload of the final action into the messaging service. When the consumer service receives it, it simply fires the trigger at that URI with the payload.

In this model, the consumer service has been tailored into a lightweight module which listens to the topic, receives the message, and fires the trigger with the payload and credentials, since all the information is prepared by the provider service and available in the message. There is no database involved and no specific information to be saved. The service is able to scale, since we can launch as many consumer services as we need. As long as at least one consumer service is running, our service can guarantee availability.

The only thing we need to define is the message format to be consumed by the consumer service. I will describe the message format in detail in the design of the provider service: #20.

refactor feed actions for code reuse

The two feed actions contain a lot of duplicate code. These should be refactored so that common code can go into a common file that is shared between the two actions. This will require deploying the feed actions as zip files.

Set an explicit timeout for posting triggers

I ran into a situation where there was a broken pipe posting a trigger to OW. However, it took an extremely long time (maybe a minute or more?) before the post failed. We should set an explicit timeout on the post to keep this to a reasonable time.
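
A sketch of the fix, assuming the provider posts triggers with the Python requests library (the 10 second value is only an example):

import requests

def fire_trigger(trigger_uri, auth_key, payload):
    # auth_key is the OpenWhisk 'user:password' style key; split it for basic auth
    user, password = auth_key.split(':', 1)
    # An explicit timeout keeps a broken connection from blocking the consumer for minutes
    return requests.post(trigger_uri, auth=(user, password), json=payload, timeout=10.0)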

Feed action needs to error if namespace is underscore

The namespace needs to be a unique string; it cannot be the OpenWhisk _ default.

This is required to create a unique DB id for the trigger when it is stored in the provider DB.

    var namespace = encodeURIComponent(triggerComponents[1]);

The feed action needs to reject/error if the namespace is not a string, and it cannot be `_`.
For both actions:
https://github.com/openwhisk/openwhisk-package-kafka/blob/master/action/kafkaFeed.js#L17
https://github.com/openwhisk/openwhisk-package-kafka/blob/master/action/kafkaFeed.js#L17

Include transaction ID in logged messages

Right now, logged messages are completely interleaved and it is hard to follow individual events through the system (creating a trigger, responding to a message, etc.).

If the logged messages used a transaction ID the way the OpenWhisk core system does, it would be a lot easier to follow events through the log.

Handle Trigger Hosts that do not Respond

An error will occur while attempting to authenticate a trigger if the host does not respond. This error should be caught, and a proper response sent back to the user.
