
connectors's Introduction

Quick Start Guide: https://github.com/LeoPlatform/Leo

Index

Creating a checksum bot

Prerequisites

NPM requirements

  • leo-sdk: 1.1.0+
  • leo-connector-common: 1.1.2+
  • leo-connector-(mysql|postgres|sqlserver): 1.3.0+

Available Connectors

  1. leo-connector-mysql: npm install leo-connector-mysql
  2. leo-connector-postgres: npm install leo-connector-postgres
  3. leo-connector-sqlserver: npm install leo-connector-sqlserver

Create Database connectors

1: Create a secret key

If you are using AWS Secrets Manager, create a secret for each of your databases. The secret names will be used in step 2.
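
For example, a database secret typically stores the connection settings as JSON. The field names below are illustrative; check your connector's documentation for the exact keys it expects:

{
    "host": "mydb.cluster-abc123.us-east-1.rds.amazonaws.com",
    "port": 3306,
    "username": "leo_user",
    "password": "...",
    "database": "orders"
}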

2: Create a database connector

If you already have a connector set up for this database connection, skip this step.

Using the CLI, create a connector bot for each database you need to connect to. If one or more of your connections is an endpoint or a database type we don't support, see the Custom Connector (basicConnector) section below.

Syntax
leo-cli create leo-connector-{connector type} checksum {bot name}
Example
leo-cli create leo-connector-mysql checksum mysqlConnector

Now browse to your new bot (bots/mysqlConnector), open package.json, and replace the dbsecret key name with the secret name you created in AWS Secrets Manager.

If you are using a VPC for access to your database, or are using an AWS RDS instance, add the VpcConfig to the package.json under the config.leo object.

Example (config object only, from package.json)
"config": {
    "leo": {
        "type": "bot",
        "memory": 256,
        "timeout": 300,
        "role": "ApiRole",
        "env": {
            "dbsecret": "database_secret_key_name"
        },
        "VpcConfig": {
            "SecurityGroupIds": [
                "sg-123456ab"
            ],
            "SubnetIds": [
                "subnet-abc12345",
                "subnet-def67890",
                "subnet-ghi45679"
            ]
        }
    }
}

Repeat this step for each master or slave database you will run a checksum against.

3: Deploy the connectors

In your service, be sure to install the NPM modules for the connectors you are using.
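
For example, a service using the MySQL and Postgres connectors might list dependencies like these in its package.json (the versions shown are the minimums from the prerequisites above; adjust as needed):

"dependencies": {
    "leo-sdk": "^1.1.0",
    "leo-connector-common": "^1.1.2",
    "leo-connector-mysql": "^1.3.0",
    "leo-connector-postgres": "^1.3.0"
}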

Now publish and deploy the bots.

Congratulations! You now have connectors set up to run a checksum. Next we'll create a checksum runner.

Create a checksum runner (bot) with database connectors

1. Add the required modules

const leo = require('leo-sdk');
const checksum = require('leo-connector-common/checksum');
const moment = require('moment');

2. Connect to the master and slave connectors

Use lambdaConnector to connect to the 2 database connectors you created in the previous section, and build out the data you want to compare between them. For this example, we're using a MySQL connector for the master and a Postgres connector for the slave, comparing id and status from the orders table in both databases.

exports.handler = function(event, context, callback) {
    let db1 = checksum.lambdaConnector('MySQL DB Lead checksum', process.env.mysql_lambda, {
        sql: `SELECT id, status FROM orders WHERE id __IDCOLUMNLIMIT__`,
        table: 'orders',
        id_column: 'id',
        key_column: 'primary'
    });
    let db2 = checksum.lambdaConnector('Postgres DB Lead checksum', process.env.postgres_lambda, {
        sql: `SELECT id, status FROM orders WHERE id __IDCOLUMNLIMIT__`,
        table: 'orders',
        id_column: 'id',
        key_column: 'primary'
    });
    
    // checksum code in step 3 (below) goes here
}

3. Setup the checksum

Now create the checksum with parameters.

let system = 'default';
checksum.checksum(system, event.botId, db1, db2, {
    stopOnStreak: 1750000, // if the checksum finds this many identical records in sequence, it stops and marks itself completed
    stop_at: moment().add({minutes: 4}), // Lambda has a 5-minute limit, so stop at 4 minutes to leave time for cleanup; the bot restarts right after and continues where it left off
    limit: 20000, // initial number of records to compare per block between the 2 databases
    maxLimit: 500000, // if a block of 20,000 or more records is identical, grow the comparison block size from limit up to this maximum
    shouldDelete: false, // set to true to delete records that exist in the slave database but not in the master
    loadSize: 50000, // recommended load size
    reverse: true, // process records from highest to lowest; set to false to process from lowest to highest
    sample: true, // return a sample of the IDs that differ (custom connectors must implement the sample handler)
    queue: { // controls the queue that receives the IDs marked as missing from the slave database
        name: event.destination, // queue name
        transform: leo.streams.through((obj, done) => { // how to transform the IDs before sending them into the queue
            done(null, {
                Orders: obj.missing.concat(obj.incorrect)
            });
        })
    }
    //skipBatch: true, // only set to true if you need the 2 connectors to compare individual records instead of batches
    //showOutput: false
})
.then(data => { console.log(data); callback(); })
.catch(callback);

4. Configure the checksum bot package.json

Example package.json
{
    "name": "OrdersChecksum",
    "version": "1.0.0",
    "description": "Checksum for the Orders table",
    "main": "index.js",
    "directories": {
        "test": "test"
    },
    "scripts": {
        "test": "leo-cli test . "
    },
    "config": {
        "leo": {
            "type": "cron",
            "memory": 256,
            "timeout": 300,
            "role": "ApiRole",
            "env": {
                "mysql_lambda": {
                    "Fn::Sub": "${MysqlConnector}"
                },
                "postgres_lambda": {
                    "Fn::Sub": "${PostgresConnector}"
                }
            },
            "cron": {
                "settings": {
                    "source": "system:mysqlConnector",
                    "destination": "orderChanges",
                }
            },
            "time": "0 0 0 * * * "
        }
    }
}

5. Edit your cloudformation.json

Your CloudFormation role now needs permission to invoke Lambda. Skip this step if you already have this set up.

In your cloudformation.json, search for the configuration for the role you're using. In the package.json example above, we're using "ApiRole". Find the ApiRole in the Resources:

  "Resources": {
    "ApiRole": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": [...],
        "ManagedPolicyArns": [...],
        "Policies": [...Add Invoke Lambda policy here...]
      }
    },

Add policies to invoke Lambda and to access KMS and Secrets Manager.

Example
{
    "PolicyName": "Invoke_Lambda",
    "PolicyDocument": {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": "lambda:*",
                "Resource": "*"
            }
        ]
    }
},
{
    "PolicyName": "Secrets_Manager_Access",
    "PolicyDocument": {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret"
                ],
                "Resource": "arn:aws:secretsmanager:*:*:secret:*"
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": "secretsmanager:ListSecrets",
                "Resource": "*"
            }
        ]
    }
},
{
    "PolicyName": "KMS_Access",
    "PolicyDocument": {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:GetParametersForImport",
                    "kms:ListKeyPolicies",
                    "kms:GetKeyRotationStatus",
                    "kms:ListRetirableGrants",
                    "kms:GetKeyPolicy",
                    "kms:DescribeKey",
                    "kms:ListResourceTags",
                    "kms:ListGrants",
                    "kms:Decrypt"
                ],
                "Resource": "arn:aws:kms:*:*:key/*"
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "kms:ListKeys",
                    "kms:GenerateRandom",
                    "kms:ListAliases",
                    "kms:ReEncryptTo",
                    "kms:ReEncryptFrom"
                ],
                "Resource": "*"
            }
        ]
    }
}

6. Deploy the checksum runner

Make sure the checksum runner is not in a VPC (No VpcConfig in the package.json). Publish and deploy the checksum runner.

7. Running the checksum

You can either wait for the checksum to run at its scheduled cron time, or force it to run through botmon. After the bot has run once, a checksum tab appears when you open the bot in botmon, showing the current status while it's running, or the results of the last run.

Custom Connector

If one of your endpoints is not a database (e.g. an API), or is a database type we don't support, you can create a custom connector using basicConnector. A custom connector differs from a database connector in that you don't create a separate connector bot for it; instead, you add it directly into the runner.

You can use a custom connector as the master with a database connector as the slave; a database connector as the master with a custom connector as the slave; or custom connectors for both the master and the slave.

Example of the structure of a basic connector
let customConnector = checksum.basicConnector('< Checksum name >', {
    id_column: 'id'
}, {
    // custom handlers go here
});
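
Once its handlers are implemented (see below), the custom connector is passed to checksum.checksum() just like a database connector. A minimal sketch, reusing db2 and the options from the runner example above:

// use the custom connector as the master and a database connector as the slave
checksum.checksum('default', event.botId, customConnector, db2, {
    stop_at: moment().add({minutes: 4}),
    limit: 20000
})
.then(data => { console.log(data); callback(); })
.catch(callback);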

Now add the handlers to handle the data.

Available handlers for a master connector

Required handlers
  • batch
  • individual
  • range
  • nibble
  • delete
Optional handlers
  • sample (required if sample is set to true)
  • initialize
  • destroy

Available handlers for a slave connector

Required handlers
  • batch
  • individual
  • delete
Optional handlers
  • sample (required if sample is set to true)
  • initialize
  • destroy
  • range
  • nibble

Handlers

Initialize

Called when checksum starts (does not include restarts after a lambda 5-minute timeout)

/**
 * Called when checksum starts.
 * Used for situations such as when your endpoint requires authorization.
 * Called with data, return a session
 */
initialize: function(data) {
    return Promise.resolve({});
}
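
For example, if your endpoint requires authorization, initialize could authenticate once and return the resulting session. This is a sketch only; the api client and its login call are hypothetical placeholders for your endpoint's real auth mechanism:

initialize: function(data) {
    // hypothetical client; replace with your endpoint's actual auth call
    return api.login(process.env.api_user, process.env.api_key)
        .then(token => ({ token })); // the returned object becomes the session
}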
Range

Called after initialize. Range gets the max and min IDs, as well as the total number of IDs. These are stored in the session until the checksum completes; each restart after a Lambda timeout reuses the range stored in the session.

/**
 * Respond to a start and an end (both optional) and
 * return an object with min, max, and total.
 * @int start
 * @int end
 * @return object {min: int, max: int, total: int}
 */
range: function(start, end) {
    let min = null;
    let max = null;
    let total = 0;
    
    /************************************************
     * Begin example code to get min, max, and total.
     * This example loops through records returned
     * into "db" and creates a start and end from
     * the greatest and least IDs.
     ************************************************/
    // db: object containing records to compare
    let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ... */];
    Object.keys(db).forEach(key => {
        let id = db[key][this.settings.id_column];
        if ((start === undefined || id >= start) && (end === undefined || id <= end)) {
            total++;
            if (min == null || id < min) {
                min = id;
            }
            if (max == null || id > max) {
                max = id;
            }
        }
    });
    /**********************************************
    * End example code to get min, max, and total *
    ***********************************************/
    
    // return a min, max and total
    return Promise.resolve({
        min,
        max,
        total
    });
}
Batch

Gets a chunk of data between a specified start and end to compare against a master or slave set of data.

/**
 * Respond to a start and end, and build an array of the data returned into "db"
 * 
 * @int start
 * @int end
 * @return mixed (Stream|Array|hash)
 */
batch: function(start, end) {
    let data = [];
    // db: object containing records to compare
    let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ... */];
    
    /***********************************************************************************
     * Example code to put together an array of data using the data returned from "db" *
     ***********************************************************************************/
    for (const v of db) {
        data.push(v);
    }
    
    /**********************************************************************************************************
     * Alternatively, if you cannot pass in a start and end and just get a chunk of data back, build an array *
     * with the data having IDs between start and end                                                         *
     **********************************************************************************************************/
    for (let i = start; i <= end; i++) {
        if (typeof db[i] !== 'undefined') {
            data.push(db[i]);
        }
    }
    
    // return the array of data
    return Promise.resolve(data);
}
Nibble
/**
 * Nibble handler: responds to a start, end, limit, and reverse,
 * and returns a "next" and "current" to continue checking data.
 * @int start
 * @int end
 * @int limit
 * @bool reverse
 * @return object {current, next}
 */
nibble: function(start, end, limit, reverse) {
    // db: object containing records to compare
    let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ... */];
    let current = null;
    let next = null;
    let dir = 1;
    let ostart = start;
    let oend = end;
    if (reverse) {
        start = end;
        end = ostart;
        dir = -1;
    }
    let cnt = 0;
    for (let i = start; i >= ostart && i <= oend; i += dir) {
        if (typeof db[i] !== 'undefined') {
            let v = db[i];
            cnt++;
            if (cnt >= limit) {
                if (!current) {
                    current = v[this.settings.id_column];
                } else {
                    next = v[this.settings.id_column];
                    break;
                }
            }
        }
    }

    return Promise.resolve({
        current,
        next
    });
}
Individual

The code required is the same as "batch" above. If you have already created batch, just call it in the return and you're done. Otherwise, follow the example for batch, but name the handler individual.

individual: function(start, end) {
    return this.batch(start, end);
}
Delete

Delete records in the slave database that do not exist in the master database. This only runs if shouldDelete is set to true.

delete: function(ids) {
    // db: object containing records to compare
    let db = [{id: 1, name: 'foo', etc: 'etc'} /* , ... */];
    ids.map(id => {
        if (id in db) {
            delete db[id];
        }
    });
    return Promise.resolve();
}
Sample

Used to return a sample of the IDs that differ between the master and slave.

// Respond to ids
// Return Stream, Array
sample: function(ids) {
    let data = [];
    // db: object containing records to compare
    let db = [{id: 1, name: 'foo', etc: 'etc'}, {...}];
    ids.map(id => {
        let v = db[id];
        if (v !== undefined) {
            data.push(v);
            console.log(v);
        }
    });

    return Promise.resolve(data);
}
Destroy

Destroy runs once on checksum completion. Use this if you need to shut down a session or add additional logging.

destroy: function(data) {
    return Promise.resolve();
}

Create a Domain Object Loader bot

Example code
// include the required handlers
const leo = require("leo-sdk");
const helperFactory = require("leo-connector-common/botHelper");
// config requires leo-sdk 2.x
const config = require('leo-config');
// use the connector for your database type:
const connector = require('leo-connector-<dbtype>');

// use this handler wrapper for leo-sdk 2.x:
exports.handler = require("leo-sdk/wrappers/cron.js")(async function(event, context, callback) {
// or, for leo-sdk 1.x, use a plain handler instead:
// exports.handler = async function(event, context, callback) {

    // create the helper
    const helper = new helperFactory(event, context, leo);

    // ... bot code goes here (see the full example below)
});

Build domain objects

Example:

"use strict";

const leo = require("leo-sdk");
const ls = require("leo-streams");
const config = require("leo-config");
const connector = require('leo-connector-mysql');

exports.handler = require("leo-sdk/wrappers/cron.js")(async function (event, context, callback) {

    let source = 'system:mysql';
    let destination = 'people';
    // these settings are specific to snapshot. 
    let snapshotSettings = {
        table: 'people', // table name that has our primary keys
        pk: 'id', // primary key of the table we're using
        limit: 5000, // max number of id's to insert into the select query
    };

    let dol = connector.domainObjectBuilder(config.db); // "dol": domain object loader
    let snapshotter = connector.snapshotter(config.db);

    // translate IDs into IDs specific to the domain we're creating.
    // In the translation object below, we get people.id from an array of people IDs, as well as from an array of contact IDs.
    let tableIdTranslations = {
        people: true, // no translation needed; expects IDs in a format like: {payload.[update.]people: [1, 2, 3, 5, 25, 35, 49, 50, etc…]}
        contacts: data => `SELECT people_id FROM contacts WHERE id in (?)`, // gets the people.id from the contacts table for IDs in a format like: {payload.[update.]contacts: [1, 2, 3, 5, 25, 35, 49, 50, etc…]}
    };

    let domainObject = new dol.DomainObject(
        // for mysql, c.database contains the database name
        c => `SELECT
            t.id as _domain_id,
            t.*,
            'test' as myField
            FROM people AS t
            WHERE t.id IN (?)`
    );
    
    // if you have a one-to-many to join:
    domainObject.hasMany("sub_object", c => `SELECT related_table.pk as _domain_id, related_table.* from related_table where pk IN (?)`); // ? will be filled with the object primary key as laid out in the tableIdTranslation

    //Advanced Settings
    // if we're creating a snapshot
    if (process.env.snapshot) {
        ls.pipe(
            // read through primary keys
            snapshotter.read(snapshotSettings),

            // transform keys into domain objects
            dol.domainObjectTransform(domainObject),

            // load into S3, toLeo, and checkpoint
            snapshotter.write(context.botId, destination),

            // end
            err => {
                callback(err);
            }
        );
    } else {
        // regular domain object load
        ls.pipe(
            leo.read(context.botId, source),

            dol.translateIds(tableIdTranslations),
            dol.domainObjectTransform(domainObject),

            leo.load(context.botId, destination),
            (err) => {
                callback(err);
            }
        );
    }
    //End Advanced Settings
});

The query in the examples above is what builds the domain object. Here is an example query and the domain object it produces. Note the empty column aliased prefix_Foo: it tells the builder to nest the columns that follow it under a Foo sub-object.

Query example
let query = `SELECT a.id
        , a.name
        , a.foo_id
        , '' AS prefix_Foo
        , b.id
        , b.name
    FROM tableOne a
    JOIN tableTwo b ON (b.id = a.foo_id)    
    WHERE a.id IN (?)`;
Output example
{
    "id": 1,
    "name": "test",
    "foo_id": 5,
    "Foo": {
        "id": 5,
        "name": "bar"
    }
}

Support

Want to hire an expert, or need technical support? Reach out to the Leo team: https://leoinsights.com/contact

connectors's Issues

mysql listener: idle mysql database causes rogue listener

Description:
While working on a dev database with no changes happening in the db, the listener checkpoint gets stuck on a binlog file that no longer exists. This causes a rogue listener, which makes development difficult because you have to reset the checkpoint every hour or so to restart the listener.

How to reproduce:

  1. If I do show master logs; I get this:

     Log_name                    File_size
     mysql-bin-changelog.238847  578
     mysql-bin-changelog.238848  578
     mysql-bin-changelog.238849  578
     mysql-bin-changelog.238850  521

  2. I set the checkpoint to start at the earliest log position that exists (mysql-bin-changelog.238847::0), then restart the bot.

  3. If I make a change to a table that is being listened to, the listener works as expected and catches up to the last log location. The checkpoint points to a location in the last file mysql-bin-changelog.238850

  4. After an amount of time, the listener goes into rogue status with this error:

     Error: ER_MASTER_FATAL_ERROR_READING_BINLOG: Could not find first log file name in binary log index file
         at Binlog.Sequence._packetToError (/var/task/index.js:84094:14)
         at Binlog.Sequence.ErrorPacket (/var/task/index.js:84143:17)
         at Protocol._parsePacket (/var/task/index.js:79237:23)
         at Parser.write (/var/task/index.js:78556:12)
         at Protocol.write (/var/task/index.js:78997:16)
         at Socket. (/var/task/index.js:76656:28)
         at emitOne (events.js:116:13)
         at Socket.emit (events.js:211:7)
         at addChunk (_stream_readable.js:263:12)
         at readableAddChunk (_stream_readable.js:250:11)
     --------------------
         at Protocol._enqueue (/var/task/index.js:79103:48)
         at Immediate._start [as _onImmediate] (/var/task/index.js:20906:29)
         at runCallback (timers.js:794:20)
         at tryOnImmediate (timers.js:752:5)
         at processImmediate [as _immediateCallback] (timers.js:729:5)
     code: 'ER_MASTER_FATAL_ERROR_READING_BINLOG', errno: 1236,
     sqlMessage: 'Could not find first log file name in binary log index file', sqlState: 'HY000'

  5. Doing show master logs; indicates that the checkpoint location points to a file that has already been rotated out and deleted.

Expected behavior:
Our install is MySQL set up on RDS with a standard configuration. It rotates binlog files quickly even when there is no activity. I realize I can change MySQL parameters to rotate and delete less aggressively, and that this wouldn't be an issue in production, but I would expect the listener not to get stuck regardless of inactivity. Among other issues, it wastes a lot of development time to have to keep resetting the checkpoint.
