
spark-connector-test

A step-by-step guide on how to use the Pulsar Spark Connector.

The Pulsar Spark Connector was open-sourced on July 9, 2019. See the source code and user guide here.

Environment

The following example uses the Homebrew package manager to download and install software on macOS; you can use another package manager depending on your requirements and operating system.

  1. Install Homebrew.
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
  2. Install Java 8 or a higher version.

    This example uses Homebrew to install JDK8.

brew tap adoptopenjdk/openjdk
brew cask install adoptopenjdk8
  3. Install Apache Spark 2.4.0 or higher.

    Download Spark 2.4.3 from the official website and decompress it.

tar xvfz spark-2.4.3-bin-hadoop2.7.tgz
  4. Download Apache Pulsar 2.4.0.

    Download Pulsar 2.4.0 from the official website.

wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
tar xvfz apache-pulsar-2.4.0-bin.tar.gz
  5. Install Apache Maven.
brew install maven
  6. Set up the development environment.

    This example creates a Maven project called connector-test.

  (1) Build the framework for a Scala project using the archetype provided by the Scala Maven Plugin.

mvn archetype:generate

In the list that appears, select the latest version of net.alchim31.maven:scala-archetype-simple (1.7 at the time of writing), and specify the groupId, artifactId, and version for the new project. This example uses:

groupId: com.example
artifactId: connector-test
version: 1.0-SNAPSHOT

After the above steps, the skeleton of a Maven Scala project is set up.
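
For reference, the generated layout, assuming the coordinates above, looks roughly like this (the exact sample files depend on the archetype version):

connector-test
├── pom.xml
└── src
    ├── main
    │   └── scala
    │       └── com
    │           └── example
    │               └── App.scala
    └── test
        └── scala (sample test suites)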

 (2) Add the Spark and Pulsar Spark Connector dependencies to pom.xml in the project root directory, and use the maven-shade-plugin to package the project.

    a. Define the version information of the dependent package.

  <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <spark.version>2.4.3</spark.version>
        <pulsar-spark-connector.version>2.4.0</pulsar-spark-connector.version>
        <spec2.version>4.2.0</spec2.version>
        <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
  </properties>

    b. Add the Spark and Pulsar Spark Connector dependencies.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-spark-connector_${scala.compat.version}</artifactId>
        <version>${pulsar-spark-connector.version}</version>
    </dependency>

    c. Add a Maven repository that contains pulsar-spark-connector.

    <repositories>
      <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
      </repository>
      <repository>
        <id>bintray-streamnative-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
      </repository>
    </repositories>

    d. Package the sample classes together with pulsar-spark-connector using the maven-shade-plugin.

    <plugin>
          <!-- Shade all the dependencies to avoid conflicts -->
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>${maven-shade-plugin.version}</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
                <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
                <minimizeJar>false</minimizeJar>

                <artifactSet>
                  <includes>
                    <include>io.streamnative.connectors:*</include>
                  </includes>
                </artifactSet>
                <filters>
                  <filter>
                    <artifact>*:*</artifact>
                    <excludes>
                      <exclude>META-INF/*.SF</exclude>
                      <exclude>META-INF/*.DSA</exclude>
                      <exclude>META-INF/*.RSA</exclude>
                    </excludes>
                  </filter>
                </filters>
                <transformers>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
                </transformers>
              </configuration>
            </execution>
          </executions>
        </plugin>

Read from and write to Pulsar in Spark programs

The example project includes the following programs:

  1. Read data from Pulsar (the app is named StreamRead).
  2. Write data to Pulsar (the app is named BatchWrite).

Build a stream processing job to read data from Pulsar

  1. In StreamRead, create a SparkSession.
val spark = SparkSession
    .builder()
    .appName("data-read")
    .config("spark.cores.max", 2)
    .getOrCreate()
  2. To connect to Pulsar, specify service.url and admin.url when building the DataFrame, and specify the topic to read from.
val ds = spark.readStream
    .format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8088")
    .option("topic", "topic-test")
    .load()
ds.printSchema()  // print the schema of `topic-test` as a validation step
  3. Output ds to the console to start the job (a complete sketch of StreamRead follows these steps).
val query = ds.writeStream
    .outputMode("append")
    .format("console")
    .start()
query.awaitTermination()
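
Putting the pieces together, here is a minimal sketch of the complete StreamRead app. The package and object names (com.example.StreamRead) are assumptions matching the Maven coordinates chosen earlier and the spark-submit command used later; the body is the code from the steps above.

package com.example

import org.apache.spark.sql.SparkSession

object StreamRead {
  def main(args: Array[String]): Unit = {
    // Step 1: create the SparkSession.
    val spark = SparkSession
      .builder()
      .appName("data-read")
      .config("spark.cores.max", 2)
      .getOrCreate()

    // Step 2: build a streaming DataFrame over `topic-test`.
    val ds = spark.readStream
      .format("pulsar")
      .option("service.url", "pulsar://localhost:6650")
      .option("admin.url", "http://localhost:8088")
      .option("topic", "topic-test")
      .load()
    ds.printSchema()  // validate by printing the topic's schema

    // Step 3: write to the console and block until the query terminates.
    val query = ds.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}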

Write data to Pulsar

  1. Similarly, in BatchWrite, first create a SparkSession.
val spark = SparkSession
    .builder()
    .appName("data-sink")
    .config("spark.cores.max", 2)
    .getOrCreate()
  2. Create a Dataset from the numbers 1 to 10 and write it to Pulsar (a complete sketch of BatchWrite follows).
import spark.implicits._
spark.createDataset(1 to 10)
    .write
    .format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8088")
    .option("topic", "topic-test")
    .save()
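
Likewise, a minimal sketch of the complete BatchWrite app. Again, the package and object names are assumptions matching the spark-submit command below, and a spark.stop() is added so the batch job exits cleanly.

package com.example

import org.apache.spark.sql.SparkSession

object BatchWrite {
  def main(args: Array[String]): Unit = {
    // Step 1: create the SparkSession.
    val spark = SparkSession
      .builder()
      .appName("data-sink")
      .config("spark.cores.max", 2)
      .getOrCreate()

    // Step 2: write the numbers 1 to 10 to `topic-test`.
    import spark.implicits._
    spark.createDataset(1 to 10)
      .write
      .format("pulsar")
      .option("service.url", "pulsar://localhost:6650")
      .option("admin.url", "http://localhost:8088")
      .option("topic", "topic-test")
      .save()

    spark.stop()  // not in the original snippet; stops the session cleanly
  }
}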

Running the program

First configure and start single-node Spark and Pulsar clusters, then package the sample project, submit the two jobs via spark-submit, and finally observe the results.

  1. Modify the log level of Spark (optional).
  cd ${spark.dir}/conf
  cp log4j.properties.template log4j.properties

  In a text editor, change the log level to WARN.

  log4j.rootCategory=WARN, console
  2. Start the Spark cluster.
cd ${spark.dir}
sbin/start-all.sh
  3. Modify the Pulsar web service port to 8088 (edit ${pulsar.dir}/conf/standalone.conf) to avoid conflicts with the Spark port.
webServicePort=8088
  4. Start the Pulsar cluster.
cd ${pulsar.dir}
bin/pulsar standalone
  5. Package the sample project.
cd ${connector_test.dir}
mvn package
  6. Start StreamRead to monitor data changes in topic-test.
${spark.dir}/bin/spark-submit --class com.example.StreamRead --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar
  7. In another terminal window, start BatchWrite to write the numbers 1 to 10 to topic-test in a single batch.
${spark.dir}/bin/spark-submit --class com.example.BatchWrite --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar
  8. At this point, you should see output similar to the following in the terminal where StreamRead is running.
root
 |-- value: integer (nullable = false)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)

Batch: 0
+-----+-----+-------+-----------+-------------+-----------+
|value|__key|__topic|__messageId|__publishTime|__eventTime|
+-----+-----+-------+-----------+-------------+-----------+
+-----+-----+-------+-----------+-------------+-----------+

Batch: 1
+-----+-----+--------------------+--------------------+--------------------+-----------+
|value|__key|             __topic|         __messageId|       __publishTime|__eventTime|
+-----+-----+--------------------+--------------------+--------------------+-----------+
|    6| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    7| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    8| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    9| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|   10| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    1| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    2| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    3| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    4| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    5| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
+-----+-----+--------------------+--------------------+--------------------+-----------+
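
The __-prefixed columns are metadata that the connector attaches to each message. If you only need the payload, you can project the columns you care about before writing to the console; a hypothetical variation on step 3 of StreamRead:

// Keep only the payload and publish time; the other metadata
// columns (__key, __topic, __messageId, __eventTime) are dropped.
val query = ds
  .select("value", "__publishTime")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()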

To recap: we started Pulsar and Spark, built the skeleton of the sample project, and used the Pulsar Spark Connector to write data to Pulsar and read it back, observing the final result in Spark.


Issues

Writing data into Pulsar fails: requirement failed: Literal must have a corresponding value to string, but class String found.

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Literal must have a corresponding value to string, but class String found.


def main(args: Array[String]): Unit = {
  val sparkSession = SparkSession.builder().appName("test-pulsar").master("local").getOrCreate()

  val startingOffsets = topicOffsets(Map(
    "persistent://public/default/my-topic" -> MessageId.fromByteArray(Array(8, 33, 16, 8))))

  import sparkSession.implicits._
  val ds = sparkSession.createDataset(1 to 10)
    .write
    .format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8080")
    .option("topic", "persistent://public/default/my-topic-2")
    .save()

  sparkSession.stop()
}

