Giter Club home page Giter Club logo

google-pubsub's Introduction

GooglePubSub

Build Status

This library lets your agent code connect to Google’s Cloud Pub/Sub service. It makes use of the Google Cloud Pub/Sub REST API.

To add this library to your project, add #require "GooglePubSub.agent.lib.nut:1.1.0" to the top of your agent code.

The Google Cloud Pub/Sub Service

Google Cloud Pub/Sub is a publish/subscribe service — a messaging service where the senders of messages are decoupled from the receivers of those messages. There are five main entities used by the Pub/Sub service:

  • Message The data (with optional attributes) that moves through the service.
  • Topic A named entity that represents a feed of messages.
  • Subscription A named entity that represents an interest in receiving messages on a particular topic.
  • Publisher An entity that creates messages and sends (publishes) them to the messaging service on a specified topic.
  • Subscriber An entity that receives messages on a specified subscription.

Communication between publishers and subscribers can be one-to-many, many-to-one or many-to-many.

Before working with Google Cloud Pub/Sub Service you need to:

Google Cloud Projects

A Google Cloud Project is the component of the Google Cloud Platform which allows users to create, configure and use all Cloud Platform resources and services, including Pub/Sub. All Pub/Sub topics and subscriptions are owned by a specific project; the library’s classes needs to instanced for each project you work with.

You can view example that will show you how to create and configure a Google Cloud Project here. For more information, please see Google Cloud Project resource description and Creating and Managing Projects.

Library Usage

The library API is described in detail in a separate document.

Main Classes

The library consists of five independent classes. You can instantiate and use any of these classes in your agent code as required by your application. They are:

  • GooglePubSub.Topics — Provides Pub/Sub topics management. One instance of this class is enough to manage all of the topics in one project.
  • GooglePubSub.Publisher: — Allows an agent to publish messages to a topic. One instance of this class allows your code to publish messages to one topic.
  • GooglePubSub.Subscriptions — Provides Pub/Sub subscriptions management. One instance of this class is enough to manage all the subscriptions in one project.
  • GooglePubSub.PullSubscriber — Allows your code to receive messages from a pull subscription. One instance of this class allows your code to receive messages from one pull subscription.
  • GooglePubSub.PushSubscriber — Allows your code to receive messages from a push subscription configured with the agent URL as a push endpoint. One instance of this class allows your code to receive messages from one push subscription.

To instantiate any of these classes you need to have:

Access Token Provider

The library requires an external provider of access tokens to gain access to Google Cloud services. The provider must contain an acquireAccessToken() method that takes one required parameter: a handler that is called when an access token is acquired or an error occurs. The handler itself has two required parameters: token, a string representation of the access token, and error, a string with error details (or null if no error occurred). You can either write the provider code yourself or use an external library such as Electric Imp’s OAuth2.JWTProfile.Client OAuth2 library, which contains the required acquireAccessToken() method.

For information about Google Cloud Pub/Sub service authentication, see this page.

Example: OAuth2 JWTProfile Access Token Provider
// GooglePubSub library
#require "GooglePubSub.agent.lib.nut:1.1.0"

// OAuth 2.0 library
#require "OAuth2.agent.lib.nut:2.0.0"

// Substitute with real values
const GOOGLE_ISS = "...";
const GOOGLE_SECRET_KEY = "...";

// configuration for OAuth2
local config = {
    "iss"         : GOOGLE_ISS,
    "jwtSignKey"  : GOOGLE_SECRET_KEY,
    "scope"       : "https://www.googleapis.com/auth/pubsub"
};

// Obtain the Access Tokens Provider
local oAuthTokenProvider = OAuth2.JWTProfile.Client(OAuth2.DeviceFlow.GOOGLE, config);

// Instantiation of topics and publisher components
const PROJECT_ID = "<your_google_cloud_project_id>";
topics <- GooglePubSub.Topics(PROJECT_ID, oAuthTokenProvider);

const TOPIC_NAME = "<your_topic_name>";
publisher <- GooglePubSub.Publisher(PROJECT_ID, oAuthTokenProvider, TOPIC_NAME);

Callbacks and Error Processing

All requests that are made to the Google Cloud Pub/Sub service occur asynchronously. Every method that sends a request has an optional parameter which takes a callback function that will be called when the operation is completed, successfully or not. The callbacks’ parameters are listed in the corresponding method documentation, but every callback has at least one parameter, error. If error is null, the operation has been executed successfully. Otherwise, error is an instance of the GooglePubSub.Error class and contains the details of the error, accessed through the following properties:

  • type — Error type (see below).
  • details — Human readable details of the error.
  • httpStatus — The returned HTTP status code (not present for all error types).
  • httpResponse — Response body of the failed request (not present for all error types).

The error type will be one of the following:

  • PUB_SUB_ERROR.LIBRARY_ERROR — This is reported if the library has been wrongly initialized or invalid arguments are passed into a method. Usually it indicates an issue during an application development which should be fixed during debugging and therefore should not occur after the application has been deployed. Includes the GooglePubSub.Error.details property.

  • PUB_SUB_ERROR.PUB_SUB_REQUEST_FAILED — This is reported if the HTTP request to Google Cloud Pub/Sub service fails. This error may occur during the normal execution of an application. The application logic should process this error. Includes the GooglePubSub.Error.details, GooglePubSub.Error.httpStatus and GooglePubSub.Error.httpResponse properties.

  • PUB_SUB_ERROR.PUB_SUB_UNEXPECTED_RESPONSE — This indicates an unexpected behavior by the Google Cloud Pub/Sub service, such as a response which does not correspond to the Google Cloud Pub/Sub REST API specification. Includes the GooglePubSub.Error.details and GooglePubSub.Error.httpResponse fields.

Message Class

GooglePubSub.Message is an auxiliary class that represents a Google Cloud Pub/Sub message, which is a combination of data (of any type) and attributes (a table) that moves through the service. Both data and attributes are optional.

GooglePubSub.Message instances are retrieved using the GooglePubSub.PullSubscriber and/or GooglePubSub.PushSubscriber classes.

GooglePubSub.Message instances may be sent to the Pub/Sub service using the GooglePubSub.Publisher class.

local gMsg = GooglePubSub.Message("Test message", { "attr1" : "value1", "attr2" : "value2" });

Topics Class

A topic is a named entity that represents a message channel. Each message is sent to the Google Cloud Pub/Sub service via a specified topic. Each topic name must be unique to its project.

The GooglePubSub.Topics class can be used to the check the existence of a topic within a project; create and delete topics from the project; and obtain a list of a project’s topics.

The GooglePubSub.Topics class is not intended to send messages — use the GooglePubSub.Publisher class.

Examples

const TOPIC_NAME = "<my_topic_name>";

topics <- GooglePubSub.Topics(PROJECT_ID, oAuthTokenProvider);

// Create topic if it doesn't exist
topics.obtain(TOPIC_NAME, { "autoCreate" : true }, function(error) {
    if (error) {
        server.error(error.details);
    } else {
        // The topic exists or has been created.
        // You can now publish or receive messages
    }
});

// Remove topic
topics.remove(TOPIC_NAME, function(error) {
    if (error) {
        server.error(error.details);
    } else {
        // The topic was removed successfully
    }
});

// Get the list of all topics
topics.list({ "paginate" : false }, function(error, topicNames, nextOptions) {
    if (error) {
        server.error(error.details);
    } else {
        // 'topicNames' contains names of all the topics registered to the project
        foreach (topic in topicNames) {
            // Process topics individually
        }
    }
});

// Callback for paginated list of topics
function topicsListCallback(error, topicNames, nextOptions) {
    if (error) {
        server.error(error.details);
    } else {
        // 'topicNames' contains limited number of topic names
        foreach (topic in topicNames) {
            // Process topics individually
        }
    }

    if (nextOptions) {
        // More topics exist, so continue listing
        topics.list(nextOptions, topicsListCallback);
    }
}

// Retrieve a paginated list of topics
topics.list({ "paginate" : true, "pageSize" : 5 }, topicsListCallback);

Publisher Class

The GooglePubSub.Publisher class allows the agent to publish messages to a specific topic. One instance of this class publishes messages to one of a project’s topics. Both the project and topic are specified in the class constructor.

The class provides only one method, GooglePubSub.Publisher.publish(), which accepts the following data as a message:

  • Raw data of any type. This may be used to send a data value without attributes.
  • An instance of the GooglePubSub.Message class. It may be used to send a message with attributes, with or without a data value.
  • An array of raw values or an array of GooglePubSub.Message instances. It may be used to send several messages with one request.

Examples

const TOPIC_NAME = "<my_topic_name>";
publisher <- GooglePubSub.Publisher(PROJECT_ID, oAuthTokenProvider, TOPIC_NAME);

// Publish a simple message
publisher.publish("Hello!", function(error, messageIds) {
    if (!error) {
        // Message published successfully
    }
});

// Publish an array of compound messages
publisher.publish(
    [
        { "temperature" : 36.6, "humidity" : 80 },
        { "temperature" : 37.2, "humidity" : 75 }
    ],
    function(error, messageIds) {
        if (!error) {
            // Message published successfully
        }
    });

// Publish a message with attributes using GooglePubSub.Message class
publisher.publish(
    GooglePubSub.Message("Test message", { "attr1" : "value1", "attr2" : "value2" }),
    function(error, messageIds) {
        if (!error) {
            // Message published successfully
        }
    });

Subscriptions Class

A subscription is a named entity that represents an interest in receiving messages on a particular topic. Messages are received from a specified Google Cloud Pub/Sub subscription. Subscription names must be unique within the project.

There are two types of subscription:

  • Pull — A subscriber that wants to receive messages manually initiates requests to the Pub/Sub service to retrieve messages. The received messages should be explicitly acknowledged.
  • Push — The Pub/Sub service sends each message as an HTTP request to the subscriber at a pre-configured endpoint. The endpoint acknowledges the message by returning an HTTP success status code.

The class allows your to manage both pull and push subscriptions. It is possible to change a push subscription to a pull one, or vice versa, using the GooglePubSub.Subscriptions.modifyPushConfig() method.

One instance of GooglePubSub.Subscriptions is sufficient to manage all of the subscriptions belonging to the project specified in the class constructor. It can be used to check the existence of a subscription; to create, configure and delete the project’s subscriptions; and to obtain a list of the subscriptions registered to the project or related to a topic.

A subscription configuration is encapsulated in an instance of the GooglePubSub.SubscriptionConfig class. Additional configuration parameters for a push subscription are encapsulated in an instance of GooglePubSub.PushConfig which has a pushEndpoint property: the URL of the endpoint that messages should be pushed to.

Receiving Messages

The GooglePubSub.Subscriptions class allows to the agent to manage Pub/Sub subscriptions but is not intended to receive messages. To receive messages, the library provides two other components:

  • To receive messages from a pull subscription, use the GooglePubSub.PullSubscriber class.
  • To receive messages from a push subscription, use the GooglePubSub.PushSubscriber class. This only works for a subscription configured with a push endpoint which is based on the URL of the agent where the library is running. The method GooglePubSub.Subscriptions.getImpAgentEndpoint() may be used to generate such an URL. To create and use any push subscription, the push endpoint must be registered with the Google Cloud Platform as described in the ‘Registering endpoints’ section of the Google Cloud Pub/Sub Push Subscriber Guide.

Examples

const TOPIC_NAME = "<my_topic_name>";
const SUBSCR_NAME = "<my_pull_subscription_name>";
const SUBSCR_NAME_2 = "<my_other_pull_subscription_name>";
const PUSH_SUBSCR_NAME = "<my_push_subscription_name>";

subscrs <- GooglePubSub.Subscriptions(PROJECT_ID, oAuthTokenProvider);

// Check if the subscription exists
subscrs.obtain(SUBSCR_NAME, null, function(error, subscrConfig) {
    if (error) {
        if (error.type == PUB_SUB_ERROR.PUB_SUB_REQUEST_FAILED && error.httpStatus == 404) {
            // The subscription doesn't exist
        } else {
            // A different error occurs
            server.error(error.details);
        }
    } else {
        // The subscription exists
    }
});

// Get or create a pull subscription
subscrs.obtain(
    SUBSCR_NAME_2,
    { "autoCreate" : true, "subscrConfig" : GooglePubSub.SubscriptionConfig(TOPIC_NAME) },
    function(error, subscrConfig) {
        if (error) {
            server.error(error.details);
        } else {
            // The subscription is obtained
        }
    });

// Get or create a push subscription based on the agent URL
subscrs.obtain(
    PUSH_SUBSCR_NAME,
    {
        "autoCreate" : true,
        "subscrConfig" : GooglePubSub.SubscriptionConfig(
            TOPIC_NAME,
            10,    // ackDeadlineSeconds
            GooglePubSub.PushConfig(subscrs.getImpAgentEndpoint()))
    },
    function(error, subscrConfig) {
        if (error) {
            server.error(error.details);
        } else {
            // The subscription is obtained
        }
    });

// Remove a subscription
subscrs.remove(SUBSCR_NAME_2, function(error) {
    if (error) {
        server.error(error.details);
    } else {
        // the subscription removed successfully
    }
});

// Get a list of subscriptions related to the topic
subscrs.list({ "topicName" : TOPIC_NAME }, function(error, subscrNames, nextOptions) {
    if (error) {
        server.error(error.details);
    } else {
        // 'subscrNames' contains names of all subscriptions related to the topic
        foreach (subscr in subscrNames) {
            // Process subscriptions individually
        }
    }
});

PullSubscriber Class

The GooglePubSub.PullSubscriber class allows your code to receive messages from a pull subscription and to acknowledge their receipt. One instance of this class receives messages from one of the project’s pull subscription. Both project and subscription are specified in the class constructor.

The received messages are provided as instances of the GooglePubSub.Message class.

The GooglePubSub.PullSubscriber class provides three types of pull operation:

  • One-shot — A basic pull operation. Call the GooglePubSub.PullSubscriber.pull() method. This checks for new messages and triggers the callback immediately, with or without the messages. It might be used in a universal or in any custom use case, or when other pull operations are not convenient.

  • Periodic — Call the GooglePubSub.PullSubscriber.periodicPull() method. This periodically checks for new messages and triggers the callback if new messages are available. It might be used when an application does not need to deal with new messages as quickly as possible. If the required period is small, consider using a pending pull (see below).

  • Pending (waiting) — Call the GooglePubSub.PullSubscriber.pendingPull() method. This waits for new messages and triggers the callback when new messages appear. Optionally, it may automatically recall the same pending pull operation after the callback is executed. This operation might be used in a case when an application needs to deal with new messages as quickly as possible.

Only one pull operation per subscription can be active at a time. An attempt to call a new pull operation while another one is active will fail with a PUB_SUB_ERROR.LIBRARY_ERROR error.

Periodic and pending pull operations may be cancelled by calling the method GooglePubSub.PullSubscriber.stopPull().

Every pull method has an optional parameter to specify the maximum number of messages returned in the callback. Note that the Google Cloud Pub/Sub service may return fewer messages than the specified maximum number even if there are more messages currently available in the subscription.

Message Acknowledgement

There are two ways to acknowledge the receipt of a message:

  • Every pull method has an optional autoAck parameter. When it is set to true, the received messages are automatically acknowledged by the library. This is the recommended setting.
  • Call GooglePubSub.PullSubscriber.ack() to manually acknowledge messages. The method accepts the instances of the GooglePubSub.Message class which are received using the pull methods, as well as message acknowledgment IDs, which may be obtained from the received messages.

Examples

const SUBSCR_NAME = "<my_pull_subscription_name>";
pullSubscriber <- GooglePubSub.PullSubscriber(PROJECT_ID, oAuthTokenProvider, SUBSCR_NAME);

// One-shot pulling
pullSubscriber.pull({ "autoAck" : true }, function(error, messages) {
    if (!error && messages.len() > 0) {
        foreach (msg in messages) {
            // Process messages individually
            server.log(format("Message received: %s: %s", msg.publishTime, msg.data));
        }
    }
});

// Periodic pulling with manual acknowledge
pullSubscriber.periodicPull(5.0, null, function(error, messages) {
    if (!error) {
        pullSubscriber.ack(messages, function(error) {
            if (!error) {
                // Messages acknowledged successfully
            }
        });
    }
});

// Pending pulling
pullSubscriber.pendingPull({ "repeat" : true, "autoAck" : true }, function(error, messages) {
    // The callback is executed every time new messages arrive
    if (!error) {
        foreach (msg in messages) {
            // Process messages individually
        }
    }
});

PushSubscriber Class

The GooglePubSub.PushSubscriber class lets the agent receive messages from a push subscription configured with a push endpoint based on the URL of the agent where the library is running. One instance of this class receives messages from one push subscription. Both the subscription and the project it belongs to, are specified in the class constructor.

The class provides only one method: GooglePubSub.PushSubscriber.setMessagesHandler(). This checks if the specified subscription is configured with an appropriate endpoint URL and sets the specified handler function to be executed every time new messages are received.

The received messages are provided as instances of the GooglePubSub.Message class and are automatically acknowledged by the library.

Examples

const PUSH_SUBSCR_NAME = "<my_push_subscription_name>";
pushSubscriber <- GooglePubSub.PushSubscriber(PROJECT_ID, oAuthTokenProvider, PUSH_SUBSCR_NAME);

function messageHandler(error, messages) {
    // The handler is executed when incoming messages are received
    if (!error) {
        foreach (msg in messages) {
            // Process messages individually
            server.log(format("Message received: %s: %s", msg.publishTime, msg.data));
        }
    }
}

pushSubscriber.setMessagesHandler(messageHandler, function(error) {
    if (!error) {
        // Push messages handler set successfully
    } else if (error.type == PUB_SUB_ERROR.LIBRARY_ERROR) {
        // PushSubscriber cannot be used for the specified subscription
        // (e.g. the subscription's push endpoint does not match the agent URL)
    }
});

IAM Class

Google’s Identity and Access Management (IAM) functionality allows your code to manage access control by defining who (members) has what access (role) for which resource. For example:

  • Grant access to any operation for a particular topic or subscription to a specific user or group of users.
  • Grant access with limited capabilities, such as only publish messages to a certain topic, or only consume messages from a certain subscription, but not to delete the topic or the subscription.

For a detailed description of IAM and its features, please see the Google Cloud Identity and Access Management Overview.

GooglePubSub.IAM is the class that provides IAM functionality for individual Google Cloud Pub/Sub resources (topics and subscriptions). This class should not be instantiated by a user directly; instead the GooglePubSub.Topics.iam() and GooglePubSub.Subscriptions.iam() methods are used to get the instances and to execute IAM methods for topics and subscriptions, respectively.

One instance of GooglePubSub.IAM class is sufficient to manage access to either all the topics or all the subscriptions belonging to one project.

IAM policy representation is encapsulated in instances of the GooglePubSub.IAM.Policy class.

Examples

const TOPIC_NAME = "<my_topic_name>";
topics <- GooglePubSub.Topics(PROJECT_ID, oAuthTokenProvider);

// Get the access control policy for the topic
topics.iam().getPolicy(TOPIC_NAME, function(error, policy) {
    if (error) {
        server.error(error.details);
    } else {
        // The policy was obtained successfully
        foreach (binding in policy.bindings) {
            // Process policy bindings
            server.log(format(
                "Binding %s : %s",
                binding.role,
                binding.members.reduce(function(prev, curr) { return (prev + ", " + curr); })));
        }
    }
});

const SUBSCR_NAME = "<my_subscription_name>";
subscrs <- GooglePubSub.Subscriptions(PROJECT_ID, oAuthTokenProvider);

local testedPermissions = ["pubsub.subscriptions.get", "pubsub.subscriptions.delete"];

// Test subscription permissions
subscrs.iam().testPermissions(
    SUBSCR_NAME,
    testedPermissions,
    function(error, returnedPermissions) {
        if (!error) {
            // 'returnedPermissions' contains the subset of 'testedPermissions' that allowed for the subscription
        }
    });

Further Examples

Working examples are provided in the Examples directory and described here.

License

The GooglePubSub library is licensed under the MIT License

google-pubsub's People

Contributors

ekaterina-nbl avatar alexey-nbl avatar electricimpsamplecode avatar

Watchers

James Cloos avatar

Forkers

ppetrosh

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.