Tutorial for Cloud Dataflow

This is a collection of tutorial-style Dataflow exercises based on the Dataflow gaming example and inspired by the Beam tutorial.

In the gaming scenario, many users play a fictional game, as members of different teams, over the course of a day, and their events are logged for processing.

The exercises read either batch data from CSV files on GCS or streaming data from a PubSub topic (generated by the included Injector program). All exercises write their output to BigQuery.
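
Each input record is a single CSV line describing one game event. The exact columns are defined by the exercise code, but in the upstream Beam gaming example a record carries the user, team, score, event timestamp in milliseconds, and a human-readable time; treat the layout below as illustrative:

    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224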

Set up your environment

Tools

  1. Download and install Git.
  2. Install Python 2.7. If you use Linux, install it through apt or yum. Otherwise download it from the website.
  3. Install virtualenv.
  4. Install Google Cloud SDK.
  5. Install an IDE that supports Python (optional).

Set up a project

  1. Go to https://cloud.google.com/console.
  2. Enable billing and create a project. For this project:
    1. Enable Google Dataflow API and BigQuery API.
    2. Create a GCS bucket and two folders inside the bucket:
      1. A staging folder.
      2. A temp folder.
    3. Create a BigQuery dataset to store the results. (Example gsutil/bq commands are sketched after this list.)
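
As an alternative to the console UI, the bucket and the BigQuery dataset can also be created from the command line; the names below are placeholders for the ones you choose:

    $ gsutil mb gs://YOUR-BUCKET
    $ bq mk --project_id=YOUR-PROJECT-ID YOUR-DATASET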

Prepare your environment

  1. Create your SME environment. Open a shell (terminal or console) and cd into your home folder. There, run:

    $ virtualenv dataflowsme
    $ source dataflowsme/bin/activate
    (dataflowsme) $
  2. Authenticate to Google Cloud using the gcloud command and set the default credentials and default project. You will need to replace YOUR-PROJECT-ID with the id of the project you created before:

    $ gcloud auth login
    $ gcloud auth application-default login
    $ gcloud config set project YOUR-PROJECT-ID
  3. Get the project ID with gcloud and set it as an environment variable:

    $ export PROJECT=`gcloud config get-value project`
  4. Set the other environment variables:

    $ export STAGING_FOLDER=gs://<path of the bucket and staging folder that you created before>
    $ export TEMP_FOLDER=gs://<path of the bucket and temp folder that you created before>
    $ export BIGQUERY_DATASET=<name of the dataset that you created before>
    $ export USER=`whoami`
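
Depending on how the repository's setup.py declares its dependencies, you may also need to install the Apache Beam SDK (with the GCP extras) into the virtualenv before running the exercises; treat this as an assumption and prefer whatever requirements the repository itself specifies:

    (dataflowsme) $ pip install 'apache-beam[gcp]'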

Download the code

Clone the GitHub repository:

$ git clone https://github.com/nahuellofeudo/DataflowSME-Python.git
$ cd DataflowSME-Python

Exercise 0 (prework)

Goal: Use the provided Dataflow pipeline to import the input events from a file in GCS to BigQuery and run simple queries on the result.

Procedure:

  1. Run the pipeline:

    $ python2.7 exercise0.py \
        --project=$PROJECT \
        --setup_file=./setup.py \
        --input=gs://dataflow-samples/game/gaming_data1.csv \
        --output_dataset=$BIGQUERY_DATASET \
        --output_table_name=events \
        --runner=DataflowRunner \
        --temp_location=$TEMP_FOLDER \
        --staging_location=$STAGING_FOLDER 
  2. Open https://console.cloud.google.com and navigate to the Dataflow UI.

  3. Once the pipeline finishes (should take about 15-20 minutes), the Job Status on the UI changes to Succeeded.

  4. After the pipeline finishes, check the value of the ParseGameEvent/ParseErrors aggregator in the UI (scroll down in the Summary tab to find it).

  5. Check the number of distinct teams in the created BigQuery table.

    $ bq query --project_id=$PROJECT \
        "select count(distinct team) from $BIGQUERY_DATASET.events;"

Exercise 1

Goal: Use Dataflow to calculate per-user scores and write them to BigQuery.
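
At its core this is a key/value aggregation: map each event to a (user, score) pair and sum per key. A minimal sketch of that idea with the Beam Python SDK is shown below; the CSV parsing, placeholder names and BigQuery schema are illustrative assumptions, not the repository's actual solution:

    import apache_beam as beam

    def parse_user_score(line):
        # Assumed CSV layout: user,team,score,timestamp,... (illustrative only).
        fields = line.split(',')
        return fields[0].strip(), int(fields[2])

    with beam.Pipeline() as p:
        (p
         | 'ReadEvents' >> beam.io.ReadFromText('gs://dataflow-sme-tutorial/gaming_data0.csv')
         | 'ExtractUserScore' >> beam.Map(parse_user_score)   # -> (user, score)
         | 'SumPerUser' >> beam.CombinePerKey(sum)            # -> (user, total_score)
         | 'FormatRows' >> beam.Map(lambda kv: {'user': kv[0], 'total_score': kv[1]})
         | 'WriteUserScores' >> beam.io.WriteToBigQuery(
               'user_scores',
               dataset='YOUR_DATASET',
               project='YOUR_PROJECT',
               schema='user:STRING,total_score:INTEGER'))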

Procedure

  1. Modify exercise1.py

  2. Run the pipeline (using Direct runner):

    $ python2.7 exercise1.py \
               --project=$PROJECT \
               --setup_file=./setup.py \
               --input=gs://dataflow-sme-tutorial/gaming_data0.csv \
               --output_dataset=$BIGQUERY_DATASET \
               --output_table_name=user_scores \
               --runner=DirectRunner \
               --temp_location=$TEMP_FOLDER \
               --staging_location=$STAGING_FOLDER 
  3. Once the pipeline finishes successfully, check the score for 'user0_AmberDingo':

    $ bq query --project_id=$PROJECT \
        "select total_score from $BIGQUERY_DATASET.user_scores \
         where user = \"user0_AmberDingo\";"
  4. Rerun the pipeline on the Dataflow service, but remove the BigQuery table first:

    $ bq rm --project_id=$PROJECT $BIGQUERY_DATASET.user_scores

    and then re-run the above python2.7 command with

        --runner=DataflowRunner

Exercise 2

Goal: Use Dataflow to calculate per-hour team scores and write them to BigQuery.
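
Compared to Exercise 1, the new ingredient is event-time windowing: each parsed event carries its own timestamp, and the per-team sums are computed inside fixed one-hour windows. A minimal sketch of that part (field positions and timestamp units are assumptions, not the repository's solution):

    import apache_beam as beam
    from apache_beam.transforms.window import FixedWindows, TimestampedValue

    def parse_team_score(line):
        # Assumed CSV layout: user,team,score,timestamp_in_ms,... (illustrative only).
        fields = line.split(',')
        return fields[1].strip(), int(fields[2]), int(fields[3]) / 1000.0

    with beam.Pipeline() as p:
        (p
         | 'ReadEvents' >> beam.io.ReadFromText('gs://dataflow-sme-tutorial/gaming_data0.csv')
         | 'ParseEvents' >> beam.Map(parse_team_score)
         # Attach each event's own timestamp so windowing uses event time, not read time.
         | 'AddTimestamps' >> beam.Map(lambda e: TimestampedValue((e[0], e[1]), e[2]))
         | 'HourlyWindows' >> beam.WindowInto(FixedWindows(60 * 60))
         | 'SumPerTeam' >> beam.CombinePerKey(sum))
        # The result is one (team, total_score) pair per team and per hour; formatting the
        # window start and writing to BigQuery is left to the exercise code.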

Procedure:

  1. Modify exercise2.py

  2. Run the pipeline:

    $ python2.7 exercise2.py \
                     --project=$PROJECT \
                     --setup_file=./setup.py \
                     --input=gs://dataflow-sme-tutorial/gaming_data0.csv \
                     --output_dataset=$BIGQUERY_DATASET \
                     --output_table_name=hourly_team_scores \
                     --runner=DataflowRunner \
                     --temp_location=$TEMP_FOLDER \
                     --staging_location=$STAGING_FOLDER
  3. Once the pipeline finishes successfully, check the score for team 'AmberDingo':

    $ bq query --project_id=$PROJECT \
        "select total_score from $BIGQUERY_DATASET.hourly_team_scores \
         where team = \"AmberDingo\" and window_start = \"2017-03-18 16:00:00 UTC\";"

Exercise 3

Goal: Convert the previous pipeline to run in streaming mode.

First, you need to set up the injector to publish scores via PubSub.

  1. Create and download a JSON key for Google Application Credentials (see the Google Cloud IAM documentation for instructions). Make sure that the key's service account has at least the following role (an example of creating such a key with gcloud is sketched after this list):

    • Pub/Sub --> Editor
  2. Open a second terminal window. In this terminal, run the commands listed in steps 2, 3 and 4 of the section "Prepare your environment" to set the same variables as in the first terminal (you do not need to do step 1).

  3. In the new terminal set the new credentials by running:

    $ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json
  4. Create a new topic:

    $ gcloud pubsub topics create game_events_$USER
  5. In the second terminal run the injector:

    $ python2.7 utils/injector.py $PROJECT game_events_$USER none
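
For step 1 above, one possible way to create the service account and its JSON key from the command line (the account name and key file name below are placeholders):

    $ gcloud iam service-accounts create injector-sa
    $ gcloud projects add-iam-policy-binding $PROJECT \
        --member="serviceAccount:injector-sa@$PROJECT.iam.gserviceaccount.com" \
        --role="roles/pubsub.editor"
    $ gcloud iam service-accounts keys create credentials-key.json \
        --iam-account=injector-sa@$PROJECT.iam.gserviceaccount.com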

Now complete the exercise so that it runs the pipeline from Exercise 2 in either batch or streaming mode.
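
Conceptually, the only change on the input side is swapping the bounded text source for an unbounded Pub/Sub source and enabling streaming; the windowed per-team aggregation stays the same. A minimal sketch of that switch (option handling and names are assumptions, not the repository's solution):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    def read_events(p, input_path=None, topic=None):
        # Batch mode: read the CSV file. Streaming mode: read raw messages from Pub/Sub.
        if topic:
            return p | 'ReadPubSub' >> beam.io.ReadFromPubSub(topic=topic)
        return p | 'ReadText' >> beam.io.ReadFromText(input_path)

    options = PipelineOptions(streaming=True)  # omit streaming=True for batch mode
    with beam.Pipeline(options=options) as p:
        events = read_events(p, topic='projects/YOUR_PROJECT/topics/game_events_YOUR_USER')
        # ...then parse, window and sum per team, and write to BigQuery as in Exercise 2.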

Procedure:

  1. Modify exercise3.py

  2. Run the pipeline in batch mode:

    $ python2.7 exercise3.py \
                        --project=$PROJECT \
                        --setup_file=./setup.py \
                        --input=gs://dataflow-sme-tutorial/gaming_data0.csv \
                        --output_dataset=$BIGQUERY_DATASET \
                        --output_table_name=streaming_team_scores \
                        --runner=DataflowRunner \
                        --temp_location=$TEMP_FOLDER \
                        --staging_location=$STAGING_FOLDER
  3. Once the pipeline finishes successfully, check the score for team 'AmberDingo':

    $ bq query --project_id=$PROJECT \
        "select window_start, total_score from $BIGQUERY_DATASET.streaming_team_scores \
         where team = \"AmberDingo\";"
  4. Delete the table so that the streaming job can create a new one:

    $ bq rm --project_id=$PROJECT $BIGQUERY_DATASET.streaming_team_scores
  5. Run the pipeline in streaming mode (make sure that the injector is still running first!):

    $ python2.7 exercise3.py \
                        --project=$PROJECT \
                        --setup_file=./setup.py \
                        --topic=projects/$PROJECT/topics/game_events_$USER \
                        --output_dataset=$BIGQUERY_DATASET \
                        --output_table_name=streaming_team_scores \
                        --runner=DataflowRunner \
                        --temp_location=$TEMP_FOLDER \
                        --staging_location=$STAGING_FOLDER \
                        --streaming
  6. Once the pipeline starts, let it run for approximately 5 to 10 minutes. Then stop (cancel) the job.

  7. Check the new scores. Since teams and windows are dynamically generated, we can't simply query for a single team, so we query the whole table:

    $ bq query --project_id=$PROJECT \
        "select team, window_start, total_score from $BIGQUERY_DATASET.streaming_team_scores \
         order by window_start desc;"
