
Pulsar Flink Connector

The Pulsar Flink connector implements elastic data processing using Apache Pulsar and Apache Flink.

To read the documentation in Chinese, click here.

Prerequisites

  • Java 8 or later
  • Flink 1.13.0 or later
  • Pulsar 2.8.0 or later

Basic information

This section describes basic information about the Pulsar Flink connector.

Client

We have changed our project version definition; the Flink and Pulsar support matrix is shown below.

| Flink version | Pulsar client version (or above) | Connector branch |
|---------------|----------------------------------|------------------|
| 1.11.x        | 2.6.x                            | release-1.11     |
| 1.12.x        | 2.7.x                            | release-1.12     |
| 1.13.x        | 2.8.x                            | release-1.13     |
| 1.14.x        | 2.9.x                            | release-1.14     |

Note
Since Flink's API has changed greatly across versions, we mainly develop new features for the latest released Flink version and fix bugs for older releases.

Releases prior to 1.10.x are no longer maintained. Users on older Flink versions are recommended to upgrade to 1.11 or later.

Version definitions

Since the JAR package is published to Maven Central, you can use this connector with Maven, Gradle, or sbt. There are two variants of the connector: pulsar-flink-connector_2.11 for Scala 2.11 and pulsar-flink-connector_2.12 for Scala 2.12. This naming style is the same as Flink's. The version of this project has a four-part form: the first three parts are the underlying Flink version, and the last part is the connector's patch version.

This versioning scheme makes it simple for users to choose the right connector. We do not shade pulsar-client-all into the distribution; instead, we declare it as a plain Maven dependency. You can override the dependent pulsar-client-all as long as its version is higher than the one listed in the support matrix.

Maven projects

For Maven projects, add the following dependency to your pom.xml. scala.binary.version follows the Flink dependency style; you can define it in the properties field of your pom.xml. ${pulsar-flink-connector.version} can be changed to your desired version, or defined in the properties field as well.

<dependency>
    <groupId>io.streamnative.connectors</groupId>
    <artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>
    <version>${pulsar-flink-connector.version}</version>
</dependency>
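
If you need a newer Pulsar client than the one pulled in transitively, you can override pulsar-client-all explicitly. A minimal sketch, assuming you pin version 2.8.1 (any version at or above the matrix entry works):

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-all</artifactId>
    <!-- must be at least the client version listed in the support matrix -->
    <version>2.8.1</version>
</dependency>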

For Maven projects, you can use the following shade plugin definition template to build an application JAR package that contains all the dependencies required for the client library and Pulsar Flink connector.

<plugin>
  <!-- Shade all the dependencies to avoid conflicts -->
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>${maven-shade-plugin.version}</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
        <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
        <minimizeJar>false</minimizeJar>

        <artifactSet>
          <includes>
            <include>io.streamnative.connectors:*</include>
            <include>org.apache.pulsar:*</include>
            <!-- more libs to include here -->
          </includes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
          <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

Gradle projects

For Gradle projects, make sure Maven Central is added to your build.gradle, as shown below.

repositories {
    mavenCentral()
}

For Gradle projects, you can use the following Shadow plugin definition template to build an application JAR package that contains all the dependencies required for the client library and the Pulsar Flink connector.

buildscript {
    repositories {
        gradlePluginPortal() // a repository that hosts the Shadow plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:6.0.0'
    }
}

apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'java'
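
The connector dependency itself is declared as usual. A minimal sketch, assuming scalaBinaryVersion and pulsarFlinkVersion are placeholder properties defined in your gradle.properties:

dependencies {
    // same coordinates as in the Maven example above
    implementation "io.streamnative.connectors:pulsar-flink-connector_${scalaBinaryVersion}:${pulsarFlinkVersion}"
}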

Build Pulsar Flink connector

To build the Pulsar Flink connector for reading data from Pulsar or writing the results to Pulsar, follow these steps.

  1. Check out the source code.

    git clone https://github.com/streamnative/pulsar-flink.git
    cd pulsar-flink
  2. Install Docker.

    The Pulsar Flink connector uses Testcontainers for integration tests. To run the integration tests, make sure Docker is installed. For details about how to install Docker, see here.

  3. Set the Java version.

    Modify java.version and java.binary.version in pom.xml.

    Note
    Ensure that the Java version is identical to the Java version used by the Pulsar Flink connector.

  4. Build the project.

    mvn clean install -DskipTests
  5. Run the test.

    mvn clean install

After the Pulsar Flink connector is installed, a JAR package that contains all the dependencies is generated in both the local Maven repository and the target directory.

Deploy Pulsar Flink connector

This section describes how to deploy the Pulsar Flink connector.

Client library

For any Flink application, use the ./bin/flink run command to compile and start your application.

If you have already built a JAR package with dependencies using the above shade plugin, you can use the --classpath option to add your JAR package.

Note
The path must be specified with a protocol prefix (such as file://), and the path must be accessible on all nodes.

Example

./bin/flink run -c com.example.entry.point.ClassName file://path/to/jars/your_fat_jar.jar

Scala REPL

The Scala REPL is a tool (scala) for evaluating expressions in Scala. Use the bin/start-scala-shell.sh command to deploy the Pulsar Flink connector on the Scala client. You can use the --addclasspath option to add the pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar package.

Example

./bin/start-scala-shell.sh remote <hostname> <portnumber>
 --addclasspath pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar

For more information on submitting applications through the CLI, see Command-Line Interface.

SQL client

The SQL Client is used to write SQL queries for manipulating data in Pulsar. You can use the --jar option (as in the example below) to add the pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar package.

Example

./bin/sql-client.sh embedded --jar pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar

Note
If you put the JAR package of our connector under $FLINK_HOME/lib, do not use --jar again to specify the package of the connector.

By default, the SQL client reads its configuration from the ./conf/sql-client-defaults.yaml environment file. To use the Pulsar catalog in the SQL client and register it automatically at startup, add the Pulsar catalog to the catalogs section of this YAML file, as shown below.

catalogs:
  - name: pulsarcatalog
    type: pulsar
    default-database: tn/ns
    service-url: "pulsar://localhost:6650"
    admin-url: "http://localhost:8080"
    format: json

Usage

This section describes how to use the Pulsar Flink connector in the stream environment and table environment.

Stream environment

This section describes how to use the Pulsar Flink connector in the stream environment.

Source

In the Pulsar Flink connector, the Pulsar consumer is called FlinkPulsarSource<T>. It provides access to one or more Pulsar topics.

Its constructor takes the following parameters.

  • serviceUrl (service address) and adminUrl (administrative address): used to connect to the Pulsar instance.
  • PulsarDeserializationSchema<T>: required when the FlinkPulsarSource is used.
  • Properties: used to configure the behavior of the Pulsar consumer, including the topic, topics, and topicsPattern options. Exactly one of topic, topics, or topicsPattern must be set to specify which topics to consume. (The topics parameter takes multiple topics separated by commas (,), and the topicsPattern parameter is a Java regular expression that matches a number of topics.)
  • setStartFromLatest, setStartFromEarliest, setStartFromSpecificOffsets, or setStartFromSubscription: these methods configure the consumption start position. When the setStartFromSubscription mode is used, the checkpoint function must be enabled.

Example

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("topic", "test-source-topic");
props.setProperty("partition.discovery.interval-millis", "5000");

FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()), props);

// or setStartFromLatest, setStartFromSpecificOffsets, setStartFromSubscription
source.setStartFromEarliest();

DataStream<String> stream = see.addSource(source);

// chain operations on dataStream of String and sink the output
// end method chaining

see.execute();

Sink

The Pulsar producer uses the FlinkPulsarSink instance. It allows writing record streams to one or more Pulsar topics.

Example

PulsarSerializationSchema<Person> pulsarSerialization = new PulsarSerializationSchemaWrapper.Builder<>(JsonSer.of(Person.class))
    .usePojoMode(Person.class, RecordSchemaType.JSON)
    .setTopicExtractor(person -> null)
    .build();
FlinkPulsarSink<Person> sink = new FlinkPulsarSink<>(
    serviceUrl,
    adminUrl,
    Optional.of(topic), // mandatory target topic, or use `Optional.empty()` if sinking to a different topic for each record
    props,
    pulsarSerialization
);

stream.addSink(sink);

PulsarDeserializationSchema

PulsarDeserializationSchema is a connector-defined Flink DeserializationSchema wrapper that allows flexible manipulation of Pulsar messages.

PulsarDeserializationSchemaWrapper is a simple implementation of PulsarDeserializationSchema with two parameters: Flink DeserializationSchema and information about the decoded message type.

new PulsarDeserializationSchemaWrapper<>(new SimpleStringSchema(), DataTypes.STRING())

Note
The DataTypes type comes from Flink's table-common module.
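
A minimal sketch wiring this wrapper into a source (serviceUrl, adminUrl, and props are assumed to be defined as in the earlier source example):

FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
    serviceUrl,
    adminUrl,
    // decode message bodies as Strings, with the decoded type declared via DataTypes
    new PulsarDeserializationSchemaWrapper<>(new SimpleStringSchema(), DataTypes.STRING()),
    props);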

PulsarSerializationSchema

PulsarSerializationSchema is a wrapper for Flink SerializationSchema that provides more functionality. In most cases, users do not need to implement PulsarSerializationSchema by themselves. PulsarSerializationSchemaWrapper is provided to wrap a Flink SerializationSchema as PulsarSerializationSchema.

PulsarSerializationSchema uses the builder pattern and you can call setKeyExtractor or setTopicExtractor to extract the key and customize the target topic from each message.

In particular, since Pulsar maintains its own schema information internally, our messages must be able to export SchemaInfo when they are written to Pulsar. The useSpecialMode, useAtomicMode, usePojoMode, and useRowMode methods help you quickly build the schema information required by Pulsar. You must choose one of these four modes; a sketch follows the list below.

  • SpecialMode: specify the Schema<?> directly. Ensure that this Schema is compatible with the configured Flink SerializationSchema.
  • AtomicMode: for atomic types, pass the AtomicDataType, such as DataTypes.INT(), which corresponds to Schema<Integer> in Pulsar.
  • PojoMode: pass a custom class object and either a JSON or Avro schema type to specify how to build a composite-type schema, such as usePojoMode(Person.class, RecordSchemaType.JSON).
  • RowMode: in general, it is used by our internal Table & SQL API implementation.
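
For example, a minimal AtomicMode sketch, mirroring the builder usage that appears in the fault-tolerance example later in this document (Schema here is Pulsar's org.apache.pulsar.client.api.Schema):

PulsarSerializationSchema<Integer> intSerialization =
    new PulsarSerializationSchemaWrapper.Builder<>(
            (SerializationSchema<Integer>) element -> Schema.INT32.encode(element))
        .useAtomicMode(DataTypes.INT()) // DataTypes.INT() corresponds to Pulsar's Schema<Integer>
        .build();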

Fault tolerance

With Flink's checkpointing enabled, FlinkPulsarSink can provide at-least-once and exactly-once delivery guarantees.

In addition to enabling checkpointing for Flink, you should also configure the setLogFailuresOnly(boolean) and setFlushOnCheckpoint(boolean) parameters.

Note
setFlushOnCheckpoint(boolean): by default, it is set to true. When it is enabled, pending records are flushed to Pulsar during the checkpoint's snapshotState. This ensures that all records before the checkpoint are written to Pulsar, and it is required for the at-least-once guarantee.
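
A minimal sketch of configuring these flags, assuming sink is the FlinkPulsarSink built in the earlier example and that both setters are exposed on the sink as the note above suggests:

// fail the job on write errors instead of only logging them
sink.setLogFailuresOnly(false);
// flush pending records at each checkpoint (the default), required for at-least-once
sink.setFlushOnCheckpoint(true);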

Table environment

The Pulsar Flink connector supports all the Table features, as listed below.

  • SQL and DDL
  • Catalog

SQL and DDL

The following section describes SQL configurations and DDL configurations.

SQL configurations

CREATE TABLE pulsar (
  `physical_1` STRING,
  `physical_2` INT,
  `eventTime` TIMESTAMP(3) METADATA,
  `properties` MAP<STRING, STRING> METADATA,
  `topic` STRING METADATA VIRTUAL,
  `sequenceId` BIGINT METADATA VIRTUAL,
  `key` STRING,
  `physical_3` BOOLEAN
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/topic82547611',
  'key.format' = 'raw',
  'key.fields' = 'key',
  'value.format' = 'avro',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'scan.startup.mode' = 'earliest'
)

INSERT INTO pulsar 
VALUES
 ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k11', 'v11', 'k12', 'v12'], 'key1', TRUE),
 ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', MAP['k21', 'v21', 'k22', 'v22'], 'key2', FALSE),
 ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k31', 'v31', 'k32', 'v32'], 'key3', TRUE)
 
SELECT * FROM pulsar

SQL supports configuring physical fields, computed columns, watermarks, METADATA, and other features.

DDL configurations

| Parameter | Default value | Description | Required or not |
|---|---|---|---|
| connector | null | The connector type. Available options are pulsar and upsert-pulsar. | Yes |
| topic | null | The input or output topic. Separate multiple topics with commas. Use either this or topic-pattern. | No |
| topic-pattern | null | A regular expression used to match topics. | No |
| service-url | null | The Pulsar broker service address. | Yes |
| admin-url | null | The Pulsar administration service address. | Yes |
| scan.startup.mode | latest | The source's startup mode. Available options are earliest, latest, external-subscription, and specific-offsets. | No |
| scan.startup.specific-offsets | null | Required when scan.startup.mode is set to specific-offsets. | No |
| scan.startup.sub-name | null | Required when scan.startup.mode is set to external-subscription. | No |
| discovery topic interval | null | The time interval for partition discovery, in milliseconds. | No |
| sink.message-router | key-hash | The routing method for writing messages to Pulsar partitions. Available options are key-hash, round-robin, and a custom MessageRouter. | No |
| sink.semantic | at-least-once | The delivery guarantee for the sink. Available options are at-least-once, exactly-once, and none. | No |
| properties | empty | Pulsar's optional configurations, in the format properties.key='value'. For details, see Configuration parameters. | No |
| key.format | null | The serialization format for the Pulsar message key. No key format is set by default; options include raw, Avro, JSON, etc. | No |
| key.fields | null | The SQL schema fields used when serializing the key. Separate multiple fields with commas. | No |
| key.fields-prefix | null | A custom prefix for all fields of the key format, to avoid name conflicts with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, the table schema and key.fields use prefixed names; when constructing the data type of the key format, the prefix is removed and the non-prefixed names are used within the key format. | No |
| format or value.format | null | The serialization format for the Pulsar message value, e.g. JSON or Avro. For more information, see the Flink format documentation. | Yes |
| value.fields-include | ALL | The policy for which fields the Pulsar message value contains. Options are ALL and EXCEPT_KEY. | No |

Metadata configurations

The METADATA flag is used to read and write metadata in Pulsar messages. The supported metadata fields are listed below.

Note
The R/W column defines whether a metadata field is readable (R) and/or writable (W). Read-only columns must be declared VIRTUAL to exclude them during an INSERT INTO operation.

| Key | Data type | Description | R/W |
|---|---|---|---|
| topic | STRING NOT NULL | Topic name of the Pulsar message. | R |
| messageId | BYTES NOT NULL | Message ID of the Pulsar message. | R |
| sequenceId | BIGINT NOT NULL | Sequence ID of the Pulsar message. | R |
| publishTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Publish time of the Pulsar message. | R |
| eventTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Generation (event) time of the Pulsar message. | R/W |
| properties | MAP<STRING, STRING> NOT NULL | Extension information of the Pulsar message. | R/W |

Catalog

Flink always searches for tables, views, and UDFs in the current catalog and database. To use the Pulsar catalog and treat Pulsar topics as Flink tables, use the pulsarcatalog that has been defined in ./conf/sql-client-defaults.yaml.

tableEnv.useCatalog("pulsarcatalog")
tableEnv.useDatabase("public/default")
tableEnv.scan("topic0")

Flink SQL> USE CATALOG pulsarcatalog;
Flink SQL> USE `public/default`;
Flink SQL> SELECT * FROM topic0;

The following configuration is optional in the environment file, or it can be overridden in the SQL client session using the SET command.

| Option | Value | Default | Description |
|---|---|---|---|
| `default-database` | Default database name | public/default | When using the Pulsar catalog, a Pulsar topic is treated as a table in Flink; therefore, `database` is another name for `tenant/namespace`. The database is the base path for table lookup or creation. |
| `table-default-partitions` | Default topic partitions | 5 | When using the Pulsar catalog, a Pulsar topic is treated as a table in Flink, and this sets the number of partitions when a topic is created. |

For more details, see DDL configurations.

Note
In Catalog, you cannot delete tenant/namespace or topic.

Advanced features

This section describes advanced features supported by Pulsar Flink connector.

Pulsar primitive types

Pulsar provides some primitive native types. The connector supports them in the following ways.

Stream API environment

PulsarPrimitiveSchema is an implementation of the PulsarDeserializationSchema and PulsarSerializationSchema interfaces.

You can create the required instance in a similar way, for example new PulsarPrimitiveSchema(String.class).
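
A minimal sketch of a source for Pulsar's native String type, assuming serviceUrl, adminUrl, and props are defined as in the earlier source example:

// PulsarPrimitiveSchema implements both the deserialization and serialization interfaces
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
    serviceUrl,
    adminUrl,
    new PulsarPrimitiveSchema(String.class),
    props);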

Table environment

We have created a new Flink format called atomic that you can use as the SQL format. In the source, it translates the Pulsar native type into a single column of RowData. In the sink, it translates the first column of RowData into the Pulsar native type and writes it to Pulsar.
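
A minimal DDL sketch, assuming the format is selected with 'format' = 'atomic' and that the topic name and addresses below are placeholders:

CREATE TABLE pulsar_primitive (
  `value` STRING
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/primitive-topic',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'atomic'
)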

Upsert Pulsar

There is an increasing demand for Upsert-mode message queues, for three main reasons.

  • A Pulsar topic can be interpreted as a changelog stream, where records with keys are treated as upsert events.
  • As part of a real-time pipeline, multiple streams are joined for enrichment and the results are stored in a Pulsar topic for further computation. However, the results may contain update events.
  • As part of a real-time pipeline, a data stream is aggregated and the results are stored in a Pulsar topic for further computation. However, the results may contain update events.

Based on these requirements, we support Upsert Pulsar. With this feature, users can read data from and write data to Pulsar topics in an Upsert fashion.

In the SQL DDL definition, you can set the connector to upsert-pulsar to use the Upsert Pulsar connector.

In terms of configuration, the primary key of the table must be specified, and key.fields and key.fields-prefix cannot be used.

As a source, the Upsert Pulsar connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of the last value for the same key, if the key exists (if the corresponding key does not exist yet, the update is treated as an INSERT). Using the table analogy, a data record in the changelog stream is interpreted as an UPSERT, also known as INSERT/UPDATE, because any existing row with the same key is overwritten. A message with a null value is treated as a DELETE message.

As a sink, the Upsert Pulsar connector can consume changelog streams. It writes INSERT/UPDATE_AFTER data as normal Pulsar messages and writes DELETE data as Pulsar messages with a null value (indicating that the key is deleted). Flink partitions the data based on the value of the primary key, so the messages for a given primary key are ordered; consequently, UPDATE/DELETE messages with the same primary key fall into the same partition. A DDL sketch follows.
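
A minimal DDL sketch, assuming the upsert-pulsar connector takes the same addressing options as the pulsar connector and derives the key from the declared primary key (the table name, topic, and addresses are placeholders):

CREATE TABLE pulsar_upsert (
  `user_id` STRING,
  `page_views` BIGINT,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-pulsar',
  'topic' = 'persistent://public/default/page-views',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'key.format' = 'raw',
  'value.format' = 'json'
)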

Key-Shared subscription mode

In some scenarios, users need strictly guaranteed message order to ensure correct business processing. Usually, to preserve strict order, only one consumer can consume messages at a time, which significantly reduces message throughput. Pulsar designed the Key-Shared subscription mode for such scenarios: by adding keys to messages and routing messages with the same key hash to the same consumer, it preserves message order while improving throughput.

The Pulsar Flink connector supports this feature as well. It can be enabled by setting the enable-key-hash-range=true parameter. When enabled, the range of key hashes processed by each consumer is divided based on the parallelism of the task.
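
A minimal sketch of enabling it, assuming props is the same Properties object passed to FlinkPulsarSource:

// each source subtask consumes its own slice of the key-hash range,
// divided according to the task parallelism
props.setProperty("enable-key-hash-range", "true");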

Fault tolerance

Pulsar Flink connector 2.7.0 provides different semantics for source and sink.

Source

For the Pulsar source, Pulsar Flink connector 2.7.0 provides exactly-once semantics.

Sink

Pulsar Flink connector 2.4.12 only supports at-least-once semantics for the sink. Based on transactions introduced in Pulsar 2.7.0 and the Flink TwoPhaseCommitSinkFunction API, Pulsar Flink connector 2.7.0 supports both exactly-once and at-least-once semantics for the sink. For more information, see here.

Before enabling exactly-once semantics for a sink, you need to make the following configuration changes.

  1. In Pulsar, transaction-related features are disabled by default. You need to set transactionCoordinatorEnabled = true in the broker configuration file (conf/standalone.conf or conf/broker.conf).

  2. When creating a sink, set PulsarSinkSemantic.EXACTLY_ONCE. The default value of PulsarSinkSemantic is AT_LEAST_ONCE.

    Example

    SinkFunction<Integer> sink = new FlinkPulsarSink<>(
          adminUrl,
          Optional.of(topic),
          clientConfigurationData,
          new Properties(),
          new PulsarSerializationSchemaWrapper.Builder<>
                  ((SerializationSchema<Integer>) element -> Schema.INT32.encode(element))
                  .useAtomicMode(DataTypes.INT())
                  .build(),
          PulsarSinkSemantic.EXACTLY_ONCE
    );
    

    Additionally, you can set transaction-related configurations as below.

    | Parameter | Description | Default value |
    |---|---|---|
    | PulsarOptions.TRANSACTION_TIMEOUT | Timeout for transactions in Pulsar. If exceeded, the transaction operation fails. | 360000 ms |
    | PulsarOptions.MAX_BLOCK_TIME_MS | Maximum time to wait for a transaction to commit or abort. If exceeded, the operator throws an exception. | 100000 ms |

    Alternatively, you can override these configurations in the Properties object and pass it into the sink constructor, for example:
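
    A minimal sketch, assuming the PulsarOptions constants above are String keys that can be set directly on the Properties object:

    Properties props = new Properties();
    // extend the transaction timeout to 10 minutes (assumed key constant)
    props.setProperty(PulsarOptions.TRANSACTION_TIMEOUT, "600000");
    // wait up to 200 seconds for a commit or abort (assumed key constant)
    props.setProperty(PulsarOptions.MAX_BLOCK_TIME_MS, "200000");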

Configuration parameters

These parameters correspond to the Properties object passed to the FlinkPulsarSource and FlinkPulsarSink constructors in the Stream API, and to the properties configuration parameter in Table mode.

| Parameter | Default value | Description | Effective range |
|---|---|---|---|
| topic | null | A Pulsar topic. | source |
| topics | null | Multiple Pulsar topics separated by commas. | source |
| topicspattern | null | A Java regular expression matching multiple Pulsar topics. | source |
| partition.discovery.interval-millis | -1 | Automatically discover added or removed topics, in milliseconds. A value of -1 disables discovery. | source |
| clientcachesize | 100 | The number of cached Pulsar clients. | source, sink |
| auth-params | null | The authentication parameters for Pulsar clients. | source, sink |
| auth-plugin-classname | null | The authentication class name for Pulsar clients. | source, sink |
| flushoncheckpoint | true | Flush outstanding messages to Pulsar topics on each checkpoint. | sink |
| failonwrite | false | When a sink error occurs and this is false, the error is logged and the message is still confirmed. | sink |
| polltimeoutms | 120000 | The timeout for waiting for the next message, in milliseconds. | source |
| pulsar.reader.fail-on-data-loss | true | Fail the operation when data is lost. | source |
| pulsar.reader.use-earliest-when-data-loss | false | When data is lost, reset the offset to earliest. | source |
| commitmaxretries | 3 | The maximum number of retries when committing offsets for Pulsar messages. | source |
| send-delay-millisecond | 0 | Delay message delivery by the given number of milliseconds. Table API only; in the Stream API, use PulsarSerializationSchema.setDeliverAtExtractor instead. | sink |
| scan.startup.mode | null | The position from which to start consuming, such as earliest or latest. Required. | source |
| enable-key-hash-range | false | Enable the Key-Shared subscription mode. | source |
| pulsar.reader.* | | For details about Pulsar reader configurations, see Pulsar reader. | source |
| pulsar.reader.subscriptionRolePrefix | flink-pulsar- | The prefix of the automatically created subscription name when no subscriber is specified. | source |
| pulsar.reader.receiverQueueSize | 1000 | The receive queue size. | source |
| pulsar.producer.* | | For details about Pulsar producer configurations, see Pulsar producer. | sink |
| pulsar.producer.sendTimeoutMs | 30000 | The timeout for sending a message, in milliseconds. | sink |
| pulsar.producer.blockIfQueueFull | false | When the producer queue is full, block the send method instead of throwing an exception. | sink |

pulsar.reader.* and pulsar.producer.* pass through more detailed configuration of Pulsar's behavior; the asterisk (*) is replaced by the configuration name in Pulsar. For details, see Pulsar reader and Pulsar producer.
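
In the Stream API, the same keys are set directly on the Properties object passed to the source or sink. A minimal sketch (the values are illustrative):

Properties props = new Properties();
// passed through to the underlying Pulsar reader
props.setProperty("pulsar.reader.receiverQueueSize", "2000");
// passed through to the underlying Pulsar producer
props.setProperty("pulsar.producer.blockIfQueueFull", "true");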

In DDL statements, these keys are prefixed with properties., as in the following sample.

'properties.pulsar.reader.subscriptionRolePrefix' = 'pulsar-flink-',
'properties.pulsar.producer.sendTimeoutMs' = '30000',

Authentication configuration

For Pulsar instances configured with authentication, the Pulsar Flink connector can be configured in a similar way to the regular Pulsar client.

  1. For FlinkPulsarSource and FlinkPulsarSink in the Java API, you can use one of the following ways to set up authentication.

    • Set the Properties parameter.

      props.setProperty(PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY, "org.apache.pulsar.client.impl.auth.AuthenticationToken");
      props.setProperty(PulsarOptions.AUTH_PARAMS_KEY, "token:abcdefghijklmn");
    • Set the ClientConfigurationData parameter, which has a higher priority than the Properties parameter.

      ClientConfigurationData conf = new ClientConfigurationData();
      conf.setServiceUrl(serviceUrl);
      conf.setAuthPluginClassName(className);
      conf.setAuthParams(params);
  2. For the Table and SQL API, you can use the following way to set up authentication.

    CREATE TABLE pulsar (
      `physical_1` STRING,
      `physical_2` INT,
      `eventTime` TIMESTAMP(3) METADATA,
      `properties` MAP<STRING, STRING> METADATA,
      `topic` STRING METADATA VIRTUAL,
      `sequenceId` BIGINT METADATA VIRTUAL,
      `key` STRING,
      `physical_3` BOOLEAN
    ) WITH (
      'connector' = 'pulsar',
      'topic' = 'persistent://public/default/topic82547611',
      'key.format' = 'raw',
      'key.fields' = 'key',
      'value.format' = 'avro',
      'service-url' = 'pulsar://localhost:6650',
      'admin-url' = 'http://localhost:8080',
      'scan.startup.mode' = 'earliest',
      'properties.auth-plugin-classname' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
      'properties.auth-params' = 'token:xxxxxxxxxx'
    )

For details about authentication configuration, see Pulsar Security.

ProtoBuf

Note

Currently, ProtoBuf is an experimental feature.

This feature is based on this PR, which has not been merged yet. Therefore, its source code is temporarily included in this repository for packaging and dependency purposes.

Example

create table pulsar (
  a INT,
  b BIGINT,
  c BOOLEAN,
  d FLOAT,
  e DOUBLE,
  f VARCHAR(32),
  g BYTES,
  h VARCHAR(32),
  f_abc_7d INT,
  `eventTime` TIMESTAMP(3) METADATA,
  compute as a + 1,
  watermark for eventTime as eventTime
) with (
  'connector' = 'pulsar',
  'topic' = 'test-protobuf',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'scan.startup.mode' = 'earliest',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'org.apache.flink.formats.protobuf.testproto.SimpleTest'
)

INSERT INTO pulsar VALUES (1,2,false,0.1,0.01,'haha', ENCODE('1', 'utf-8'), 'IMAGES',1, TIMESTAMP '2020-03-08 13:12:11.123');

The SimpleTest class must extend GeneratedMessageV3.


pulsar-flink's Issues

Deployment is too troublesome and keeps reporting errors. Is there a problem with the code?

Describe the bug
A clear and concise description of what the bug is.

To Reproduce
Steps to reproduce the behavior:

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

Expected behavior
A clear and concise description of what you expected to happen.

Screenshots
If applicable, add screenshots to help explain your problem.

Additional context
Add any other context about the problem here.

[BUG] Getting java.lang.IllegalStateException: InjectionManagerFactory not found

Describe the bug
Running a simple Flink job on AWS EMR to read from a Pulsar topic produces an error:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
	... 23 more
Caused by: java.lang.IllegalStateException: InjectionManagerFactory not found.
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.lambda$lookupInjectionManagerFactory$0(Injections.java:98)
	at java.util.Optional.orElseThrow(Optional.java:290)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.lookupInjectionManagerFactory(Injections.java:98)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.createInjectionManager(Injections.java:68)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.initRuntime(ClientConfig.java:432)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.util.collection.Values$LazyValueImpl.get(Values.java:341)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig.getRuntime(ClientConfig.java:826)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getConfiguration(ClientRequest.java:285)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.validateHttpMethodAndEntity(JerseyInvocation.java:143)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:112)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:108)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:99)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$AsyncInvoker.method(JerseyInvocation.java:706)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$AsyncInvoker.get(JerseyInvocation.java:566)
	at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:283)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:267)
	at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:347)
	at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:337)
	at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:118)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:350)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

To Reproduce
Steps to reproduce the behavior:

  1. AWS EMR cluster 5.29.0 (Hadoop 2.8.5, Flink 1.9.1, Scala 2.11)
  2. Dependencies in build.sbt:
    ThisBuild / scalaVersion := "2.11.12"
    val flinkVersion = "1.9.1"
    val pulsarVersion = "2.4.9"
    val flinkDependencies = Seq(
      "org.scalactic" %% "scalactic" % "3.1.1",
      "org.scalatest" %% "scalatest" % "3.1.1" % "test",
      "org.apache.flink" %% "flink-scala" % flinkVersion,
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
      "io.streamnative.connectors" %% "pulsar-flink-connector" % pulsarVersion,
    )
  3. Read from a Pulsar topic with:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(
      new FlinkPulsarRowSource(admin_url, conf, props).setStartFromEarliest()
    ).print()
  4. See error

[BUG] Restart the program to verify whether it continues consumption from the last consumed position

val source: FlinkPulsarSource[Array[ComeDetailBean]] = new FlinkPulsarSource(serverUrl,adminUrl,desc,props)
source.setStartFromSubscription("gpp_come")

I set the subscription name to verify whether the program will consume from the last position after it restarts.

But I get the following error log:

2020-07-15 15:45:52,745 INFO  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-1][gpp_come-reader-5b05e44e2c] Subscribing to topic on cnx [id: 0xf19c64e0, L:/10.52.10.21:59634 - R:112.124.209.229/112.124.209.229:16656]
2020-07-15 15:45:52,746 INFO  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-2][gpp_come-reader-d9f531ffe6] Subscribing to topic on cnx [id: 0x15ab56f5, L:/10.52.10.21:59638 - R:112.124.209.229/112.124.209.229:16656]
2020-07-15 15:45:52,749 INFO  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-0][gpp_come-reader-db3e0364c5] Subscribing to topic on cnx [id: 0xdb18977d, L:/10.52.10.21:59636 - R:112.124.209.229/112.124.209.229:16656]
2020-07-15 15:45:52,750 INFO  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-2][gpp_come-reader-d9f531ffe6] Subscribed to topic on 112.124.209.229/112.124.209.229:16656 -- consumer: 13
2020-07-15 15:45:52,758 INFO  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-2][gpp_come-reader-d9f531ffe6] Get topic last message Id
2020-07-15 15:45:52,761 INFO  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-2][gpp_come-reader-d9f531ffe6] Successfully getLastMessageId 698660:0
2020-07-15 15:45:52,762 ERROR org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread  - the start message id is beyond the last commit message id, with topic:persistent://cd/dev/flink_gpp_come_1-partition-2
2020-07-15 15:45:52,762 ERROR org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread  - Error while closing Pulsar reader org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
2020-07-15 15:45:52,765 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed (1/1) (6ed38ee2c0456a1c6cc6f9fff2ce0b46) switched from RUNNING to FAILED.
java.lang.RuntimeException: start message id beyond the last commit
	at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.skipFirstMessageIfNeeded(ReaderThread.java:153)
	at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.run(ReaderThread.java:98)
2020-07-15 15:45:52,765 INFO  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-2] [gpp_come-reader-d9f531ffe6] Closed consumer
2020-07-15 15:45:52,766 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed (1/1) (6ed38ee2c0456a1c6cc6f9fff2ce0b46).
2020-07-15 15:45:52,766 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed (1/1) (6ed38ee2c0456a1c6cc6f9fff2ce0b46) [FAILED]
2020-07-15 15:45:52,766 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed(1/1) 6ed38ee2c0456a1c6cc6f9fff2ce0b46.
2020-07-15 15:46:02,146 WARN  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-1][gpp_come-reader-f1be7bfb17] Failed to subscribe to topic on 112.124.209.229/112.124.209.229:16656
2020-07-15 15:46:02,146 WARN  org.apache.pulsar.client.impl.PulsarClientImpl                - [persistent://cd/dev/flink_gpp_come_1-partition-1] Failed to get create topic reader
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 39 lookup request timedout after ms 30000
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:700)
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1026)
	at org.apache.pulsar.client.impl.ClientCnx.lambda$channelActive$0(ClientCnx.java:187)
	at org.apache.pulsar.shade.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
	at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 39 lookup request timedout after ms 30000
	at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1025)
	... 10 more
2020-07-15 15:46:02,147 WARN  org.apache.pulsar.client.impl.ClientCnx                       - [id: 0xf19c64e0, L:/10.52.10.21:59634 - R:112.124.209.229/112.124.209.229:16656] request 39 timed out after 30000 ms
2020-07-15 15:46:02,147 WARN  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-0][gpp_come-reader-fcce1ceb6e] Failed to subscribe to topic on 112.124.209.229/112.124.209.229:16656
2020-07-15 15:46:02,147 WARN  org.apache.pulsar.client.impl.PulsarClientImpl                - [persistent://cd/dev/flink_gpp_come_1-partition-0] Failed to get create topic reader
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 41 lookup request timedout after ms 30000
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:700)
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1026)
	at org.apache.pulsar.client.impl.ClientCnx.lambda$channelActive$0(ClientCnx.java:187)
	at org.apache.pulsar.shade.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
	at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:387)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 41 lookup request timedout after ms 30000
	at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1025)
	... 10 more

=============================================
I use pulsar-flink-connector_2.11-2.4.23, and this problem still exists.

2020-07-15 17:38:20,951 INFO  org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader  - Committing offset 698658:-1:0 to topic persistent://cd/dev/flink_gpp_come_1-partition-0
2020-07-15 17:38:20,966 INFO  org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader  - Cannot commit cursor offset %d since the topic 698658:-1:0 has been deleted during execution
2020-07-15 17:38:20,966 INFO  org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader  - Committing offset 698659:-1:1 to topic persistent://cd/dev/flink_gpp_come_1-partition-1
2020-07-15 17:38:20,976 INFO  org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader  - Cannot commit cursor offset %d since the topic 698659:-1:1 has been deleted during execution
2020-07-15 17:38:20,976 INFO  org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader  - Committing offset 698660:2:2 to topic persistent://cd/dev/flink_gpp_come_1-partition-2
2020-07-15 17:38:20,985 INFO  org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader  - Successfully committed offset 698660:2:2 to topic persistent://cd/dev/flink_gpp_come_1-partition-2
2020-07-15 17:38:23,536 WARN  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-0][gpp_come-reader-7dbd7e0c82] Failed to subscribe to topic on 112.124.209.229/112.124.209.229:16656
2020-07-15 17:38:23,536 WARN  org.apache.pulsar.client.impl.PulsarClientImpl                - [persistent://cd/dev/flink_gpp_come_1-partition-0] Failed to get create topic reader
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 33 lookup request timedout after ms 30000
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:700)
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1026)
	at org.apache.pulsar.client.impl.ClientCnx.lambda$channelActive$0(ClientCnx.java:187)
	at org.apache.pulsar.shade.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
	at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 33 lookup request timedout after ms 30000
	at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1025)
	... 10 more
2020-07-15 17:38:23,536 WARN  org.apache.pulsar.client.impl.ClientCnx                       - [id: 0x51ad5f27, L:/10.52.10.22:50758 - R:112.124.209.229/112.124.209.229:16656] request 33 timed out after 30000 ms
2020-07-15 17:38:23,537 WARN  org.apache.pulsar.client.impl.ConsumerImpl                    - [persistent://cd/dev/flink_gpp_come_1-partition-1][gpp_come-reader-b549d31f3b] Failed to subscribe to topic on 112.124.209.229/112.124.209.229:16656
2020-07-15 17:38:23,537 WARN  org.apache.pulsar.client.impl.PulsarClientImpl                - [persistent://cd/dev/flink_gpp_come_1-partition-1] Failed to get create topic reader
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 34 lookup request timedout after ms 30000
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:700)
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1026)
	at org.apache.pulsar.client.impl.ClientCnx.lambda$channelActive$0(ClientCnx.java:187)
	at org.apache.pulsar.shade.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
	at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:387)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 34 lookup request timedout after ms 30000
	at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1025)
	... 10 more

Looking forward to your reply , thank you.

[BUG] unable to connect to pulsar source using token with consume permission

Describe the bug
I'm trying to use the FlinkPulsarSource to connect a simple Flink app to read messages from a persistent Pulsar topic. I'm authenticating using the org.apache.pulsar.client.impl.auth.AuthenticationToken plugin, and providing a JWT for a user with consume permissions on a specific topic.

As suggested in the documentation, I create the source and provide a client configuration with authentication:

FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
    adminUrlString, // The admin URL
    clientConfig, // The client config, with auth included
    new SimpleStringSchema(),
    sourceProperties
);

Unfortunately, the connector fails with:

org.apache.pulsar.client.admin.PulsarAdminException$NotAuthorizedException: HTTP 401 Unauthorized

I tracked this down to a rejected call to the admin API at /admin/v2/persistent/<tenant>/<namespace>/<topic>/partitions.

Expected behavior
Since the authentication token provided has consume permissions for the desired topic, I would expect the FlinkPulsarSource to correctly connect and start consuming messages.

Unless I'm missing something, the token used to authenticate will need to be a superuser for the entire tenant? Wouldn't that render publish/consume authorization at the topic level useless?

[BUG] snapshot state OOM for Flink 1.9

Describe the bug
We are seeing the error below in our Flink Pulsar connector use case:
2020-09-16 11:44:39.764 [Source: opr_event -> (opr_event_monitoring, invalid_data_topic_monitoring, Sink: invalid_data_topic, String_to_JsonNode -> filtered-input-stream -> (ITOM-Analytics-Event-Correlation
-OBM-filter_monitoring, mapped-input-stream -> ITOM-Analytics-Event-Correlation-OBM-mapper_monitoring)) (7/8)] DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl - Operator 140d9cc943c079ec895d010fe
c9ee23b has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={topic-partition-offset-states_subName=StateMetaInfo{offsets=[1106, 1127
, 1148, 1169, 1190, 1211, 1232, 1253, 1274, 1295, 1316, 1337, 1358, 1379, 1400, 1421, 1442, 1463, 1484, 1505, 1526, 1547, 1568, 1589, 1610, 1631, 1652, 1673, 1694, 1715, 1736, 1757, 1778, 1799, 1820,
It has millions of records like this, and state restoration is not happening. Can you please confirm whether the purge of offsets is happening or not?

To Reproduce
We found that the contents of unionSubscriptionNameStates are not cleaned up in PulsarFlinkSource#snapshotState, which causes the Flink state to grow until OOM.

[BUG] java.lang.NoClassDefFoundError: Could not initialize class org.apache.pulsar.common.compression.CompressionCodecProvider

Describe the bug

When using the 2.5.0 or 2.5.3 connector, an exception occurs when the connector JAR is placed under the Flink lib directory.

java.lang.NoClassDefFoundError: Could not initialize class org.apache.pulsar.common.compression.CompressionCodecProvider

To Reproduce

Expected behavior
A clear and concise description of what you expected to happen.

Screenshots
If applicable, add screenshots to help explain your problem.

Additional context
Add any other context about the problem here.

[BUG] Topic is created by Flink Pulsar Source [with non-existing topic] when auto topic creation is disabled in KoP broker.conf

Describe the bug
When auto topic creation is disabled in KoP, a non-partitioned topic is still created by the Flink Pulsar Source.

To Reproduce
Steps to reproduce the behavior:

  1. Disable auto topic creation in KoP using broker.conf
  2. Try to create a consumer using Flink Pulsar Source with non existing topic
  3. Do a topics list using pulsar admin to see a non partitioned topic is created

Expected behavior
Topics should not be created when constructing a FlinkPulsarSource [FlinkPulsarConsumer] while auto topic creation is turned off in KoP.

Please let us know the behavior of the Flink Pulsar Sink and Source in the absence of topics. Should they fail?

Additional context
Disabling auto topic creation is, however, honored by FlinkPulsarSink, and a clear exception is observed in the KoP logs. Please find more details below and in the attached logs [refer to kop-2].(
kop-1.log
kop-2.log
kop-0.log
)

Exception Details

Source topic: dp_moving_average_source_data_topic6 [getting created; not expected]
Sink topic: dp_moving_average_dest_data_topic6 [not getting created; expected]

Expected an exception, but the topic is getting created:

06:33:20.672 [pulsar-web-31-3] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [itomdipulsar-proxy][persistent://public/default/dp_moving_average_source_data_topic6] Creating subscription flink-pulsar-60659686-c796-4dec-b9b7-0dc7e85a28d9 at message id 9223372036854775807:9223372036854775807:-1
06:33:20.677 [pulsar-ordered-OrderedExecutor-1-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger public/default/persistent/dp_moving_average_source_data_topic6
06:33:20.679 [bookkeeper-ml-workers-OrderedExecutor-4-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper - Creating '/managed-ledgers/public/default/persistent/dp_moving_average_source_data_topic6'
06:33:20.686 [pulsar-ordered-OrderedExecutor-2-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperChildrenCache - [State:CONNECTED Timeout:30000 sessionid:0x3005fc2eacc0006 local:/172.16.16.160:60330 remoteserver:itomdipulsar-zookeeper-2.itomdipulsar-zookeeper/172.16.16.169:2181 lastZxid:4295022680 xid:16226 sent:16226 recv:16406 queuedpkts:0 pendingresp:0 queuedevents:1] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/managed-ledgers/public/default/persistent
06:33:20.688 [pulsar-ordered-OrderedExecutor-2-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /managed-ledgers/public/default/persistent
06:33:20.692 [bookkeeper-ml-workers-OrderedExecutor-4-0] WARN  org.apache.bookkeeper.client.BookieWatcherImpl - New ensemble: [itomdipulsar-bookkeeper-1.itomdipulsar-bookkeeper.pulsar-loader.svc.cluster.local.:3181, itomdipulsar-bookkeeper-2.itomdipulsar-bookkeeper.pulsar-loader.svc.cluster.local.:3181] is not adhering to Placement Policy. quarantinedBookies: []

Exception as expected in case of Flink Pulsar Sink :

06:33:20.576 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.service.ServerCnx - [/172.16.16.1:39496] **Failed to create topic persistent://public/default/dp_moving_average_dest_data_topic6**
java.util.concurrent.CompletionException: java.util.NoSuchElementException: No value present
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:618) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) ~[?:1.8.0_242]
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerFailed(BrokerService.java:926) ~[org.apache.pulsar-pulsar-broker-2.5.0-ad0224407.jar:2.5.0-ad0224407]
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$8(ManagedLedgerFactoryImpl.java:343) ~[org.apache.pulsar-managed-ledger-2.5.0-ad0224407.jar:2.5.0-ad0224407]
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_242]
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeFailed(ManagedLedgerFactoryImpl.java:336) ~[org.apache.pulsar-managed-ledger-2.5.0-ad0224407.jar:2.5.0-ad0224407]
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1.operationFailed(ManagedLedgerImpl.java:338) ~[org.apache.pulsar-managed-ledger-2.5.0-ad0224407.jar:2.5.0-ad0224407]
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$1(MetaStoreImplZookeeper.java:164) ~[org.apache.pulsar-managed-ledger-2.5.0-ad0224407.jar:2.5.0-ad0224407]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.5.0-ad0224407.jar:2.5.0-ad0224407]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_242]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_242]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.util.NoSuchElementException: No value present
        at java.util.Optional.get(Optional.java:135) ~[?:1.8.0_242]
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_242]
        ... 18 more

[BUG] Caused by: java.lang.NoClassDefFoundError: scala/Product$class

Describe the bug
Flink reports an error when running the app: Caused by: java.lang.NoClassDefFoundError: scala/Product$class. Searching for this error suggests: your Scala version is 2.12.x but you are using libraries compiled against Scala 2.11. I also checked the Flink Scala version, and the Docker image info shows that the Flink Scala version is 2.12:

SCALA_VERSION=2.12",
"GPG_KEY=E2C45417BED5C104154F341085BACB5AEFAE3202",
 "FLINK_HOME=/opt/flink",
"FLINK_URL_FILE_PATH=flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz",
"FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz",
"FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz.asc"

So I rebuilt the app after changing the POM to:

<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.1</scala.version>
<PULSAR_FLINK_VERSION>2.4.1</PULSAR_FLINK_VERSION>

But it seems this library does not publish any Scala 2.12 artifact to the repository.
So is there any plan to push a Scala 2.12 build to the repository?

[BUG] topic-partition-offset-states_subName State memory leak

Describe the bug
We are seeing the error below in our Flink Pulsar connector use case:
2020-09-16 11:44:39.764 [Source: opr_event -> (opr_event_monitoring, invalid_data_topic_monitoring, Sink: invalid_data_topic, String_to_JsonNode -> filtered-input-stream -> (XXXX-Analytics-Event-Correlation
-XXX-filter_monitoring, mapped-input-stream -> XXXX-Analytics-Event-Correlation-XXX-mapper_monitoring)) (7/8)] DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl - Operator 140d9cc943c079ec895d010fe
c9ee23b has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={topic-partition-offset-states_subName=StateMetaInfo{offsets=[1106, 1127
, 1148, 1169, 1190, 1211, 1232, 1253, 1274, 1295, 1316, 1337, 1358, 1379, 1400, 1421, 1442, 1463, 1484, 1505, 1526, 1547, 1568, 1589, 1610, 1631, 1652, 1673, 1694, 1715, 1736, 1757, 1778, 1799, 1820,
It has on the order of a million such records, and state restoration is not happening. Can you please confirm whether the purge is happening on these offsets?

Additional context
pulsar-flink connector 2.4.28
pulsar-flink connector 2.5.0, 2.5.1 for Flink 1.9

Using JsonDeser.of() and a custom schema to resolve a JSON-format message error

My code:

1. Using JsonDeser.of():

val desc: JsonDeser[MyBean] = JsonDeser.of(classOf[MyBean])
val source: FlinkPulsarSource[MyBean] = new FlinkPulsarSource(serverUrl, adminUrl, desc, props)

2. Using a custom schema:

val source: FlinkPulsarSource[MyBean] = new FlinkPulsarSource(serverUrl, adminUrl, new JsonSchema(), props)

val dataStream = env.addSource(source)
dataStream.print()
env.execute("pulsar_flink")

ERROR log
2020-06-24 17:48:28 ERROR ReaderThread [org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.run(ReaderThread.java:115)]: Error while closing Pulsar reader org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
2020-06-24 17:48:28 ERROR FlinkPulsarSource [org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:610)]: Failed to cancel the Pulsar Fetcher java.lang.RuntimeException:Failed to remove cursor for persistent://dev/dbtest/flink-partition-0
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:272)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.cancel(PulsarFetcher.java:419)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:608)
at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:157)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:508)
at org.apache.flink.runtime.taskmanager.Task.cancelInvokable(Task.java:1281)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:773)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Client instance has been closed.
at org.apache.pulsar.shade.org.glassfish.jersey.internal.guava.Preconditions.checkState(Preconditions.java:169)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyClient.checkNotClosed(JerseyClient.java:327)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.checkNotClosed(JerseyWebTarget.java:140)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:151)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:60)
at org.apache.pulsar.client.admin.internal.TopicsImpl.topicPath(TopicsImpl.java:962)
at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscriptionAsync(TopicsImpl.java:640)
at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscription(TopicsImpl.java:625)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:265)
... 9 more

2020-06-24 17:48:28 ERROR Task [org.apache.flink.runtime.taskmanager.Task.cancelInvokable(Task.java:1284)]: Error while canceling task Source: Custom Source -> Sink: Print to Std. Out (1/1).
java.lang.RuntimeException: java.lang.RuntimeException: Failed to remove cursor for persistent://dev/dbtest/flink-partition-0
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:611)
at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:157)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:508)
at org.apache.flink.runtime.taskmanager.Task.cancelInvokable(Task.java:1281)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:773)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to remove cursor for persistent://dev/dbtest/flink-partition-0
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:272)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.cancel(PulsarFetcher.java:419)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:608)
... 7 more
Caused by: java.lang.IllegalStateException: Client instance has been closed.
at org.apache.pulsar.shade.org.glassfish.jersey.internal.guava.Preconditions.checkState(Preconditions.java:169)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyClient.checkNotClosed(JerseyClient.java:327)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.checkNotClosed(JerseyWebTarget.java:140)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:151)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:60)
at org.apache.pulsar.client.admin.internal.TopicsImpl.topicPath(TopicsImpl.java:962)
at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscriptionAsync(TopicsImpl.java:640)
at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscription(TopicsImpl.java:625)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:265)
... 9 more

PS: I tried to print the type of the DataStream; it is GenericType<com.kt.bd.pulsar2flink.Driver.MyBean>.
When I use new SimpleStringSchema(), the type is String and it works correctly.
I'm not sure whether this information is useful.

Deleting and recreating topics for a running pipeline leads to job failure

Option to specify a start message Id when the persisted message Id is invalid

Is your feature request related to a problem? Please describe.
There are multiple scenarios that can cause the messageId persisted in a savepoint/checkpoint to become invalid. A few examples:

  1. The persistent store in Pulsar is recreated because of system failures.
  2. The topic was recreated because of accidental deletion.

There should be an option in the connector (like auto.offset.reset in Kafka) to indicate the start offset when the messageId in a Flink savepoint/checkpoint is no longer valid in Pulsar.

Describe the solution you'd like
An auto.offset.reset-style flag, as sketched below.
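
A rough illustration of where such a knob could sit; failOnDataLoss is an existing connector option referenced elsewhere in this tracker, while the reset option below is purely hypothetical:

import java.util.Properties;

public class StartOffsetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Existing option: tolerate a missing/aged-out start position.
        props.setProperty("failOnDataLoss", "false");
        // Hypothetical option this request asks for (name illustrative only):
        // props.setProperty("startMessageIdWhenInvalid", "earliest"); // or "latest"
    }
}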

[BUG] Compatibility of pulsar schema in flink 1.11

Describe the bug
In Table mode, Flink 1.11 requires the schema information to be set before startup. To set it, you need to fetch it from the MetadataReader in advance.

If the schema is set, it is converted to schema properties, causing the DataType to lose its bound physical type, for example DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class). This causes the startup check to fail.
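
For illustration, this is the kind of bridged DataType in question, built with the standard Flink Table API; the round trip through string properties keeps only the logical TIMESTAMP(3) and drops the conversion class:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class BridgedTypeSketch {
    public static void main(String[] args) {
        // Logical type TIMESTAMP(3) bound to java.sql.Timestamp as its physical
        // conversion class; restoring the schema from string properties yields
        // TIMESTAMP(3) without this binding, so the startup check fails.
        DataType bridged = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);
        System.out.println(bridged);
    }
}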

When restoring from a backed-up savepoint or checkpoint, unable to run transformation of data

[BUG] Using this connector on Flink 1.11.1, when I run the flink stop -p jobid command it times out and I cannot stop the job

Describe the bug
Pulsar version: 2.6.0
Flink version: 1.11.1
The Flink job is a source-connector job.

Additional context

2020-07-30 20:55:39
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: Failed to remove cursor for persistent://public/default/person-flink-test-topic
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:567)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:538)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Failed to remove cursor for persistent://public/default/person-flink-test-topic
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:616)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.close(FlinkPulsarSource.java:578)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
	at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
	... 8 more
Caused by: java.lang.RuntimeException: Failed to remove cursor for persistent://public/default/person-flink-test-topic
	at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:281)
	at org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.cancel(PulsarFetcher.java:429)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:613)
	... 20 more
Caused by: java.lang.IllegalStateException: Client instance has been closed.
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.guava.Preconditions.checkState(Preconditions.java:169)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyClient.checkNotClosed(JerseyClient.java:327)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.checkNotClosed(JerseyWebTarget.java:140)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:151)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:60)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.topicPath(TopicsImpl.java:962)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscriptionAsync(TopicsImpl.java:640)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscription(TopicsImpl.java:625)
	at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:274)
	... 22 more

[Task] Add a test case for backing up & restoring the connector state

Test Case

Flink state is backed up on system 1. Pulsar is installed on a separate system 2, where the topic is created, and the Flink state is restored on system 2. The cursor position in the Flink state does not correspond to the cursor position of the topic in the Pulsar on system 2. With failOnDataLoss set to false, the Flink job should run when started from the restored state. We have yet to validate this case.

[FEATURE]WIP: support Flink 1.11 and using flink-sql DDL to operate with pulsar

Is your feature request related to a problem? Please describe.
I use Pulsar in a Flink ETL pipeline, and I am trying to consume messages from Pulsar and write them to a Hive table (HDFS). One solution is to use Flink SQL DDL to submit a Flink SQL task for this.

However, the current pulsar-flink connector has following problems.

  1. It only supports Flink 1.9.1, and this version of Flink does not support SQL completely (e.g., it cannot write data to Hive using the Parquet format).
  2. Schema management does not work well.
  3. The design of message deserialization cannot support a schema defined in DDL.

[BUG] Flink Pipeline going to failing state with Error : start message id beyond the last commit

Describe the bug
Seeing the below error when following these steps.

  1. The pipeline was deployed and the Flink job was running.
  2. Streaming data to the source topic was started and was in progress during the steps below.
  3. A process was triggered to upgrade to the latest connector.
  4. failOnDataLoss was set to false before the connector upgrade.

2020-07-10 14:48:46.334 [...] INFO org.apache.flink.runtime.taskmanager.Task - Source: opr_event -> (...) (4/8) (38127628da3ccada63fb0ad1e0987ada) switched from RUNNING to FAILED.
java.lang.RuntimeException: start message id beyond the last commit
at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.skipFirstMessageIfNeeded(ReaderThread.java:156) ~[pulsar-flink-connector_2.11-2.4.23.jar:2.4.23]
at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.run(ReaderThread.java:98) ~[pulsar-flink-connector_2.11-2.4.23.jar:2.4.23]

[FEATURE] Let users use builder to create source and sink

Is your feature request related to a problem? Please describe.
When users create sinks and sources through Java code, configuring Pulsar via a Properties object is not intuitive. This makes maintenance difficult, and users do not know how to use it.

Describe the solution you'd like
Add builder to allow users to create sink and source through builder.

Describe alternatives you've considered
Replace the Properties-based configuration with a typed Java config.

Additional context

  • Users don’t know how to use topicsPattern #114
  • The user instantiates the source through its constructor, which is not easy to modify #113
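
As a sketch of what this request has in mind (every name below is hypothetical; no such builder exists in the connector yet):

// Hypothetical fluent builder replacing the Properties-based setup;
// the class and method names are illustrative only and do not compile
// against the current connector.
FlinkPulsarSource<String> source = FlinkPulsarSource.<String>builder()
        .serviceUrl("pulsar://localhost:6650")
        .adminUrl("http://localhost:8080")
        .topic("persistent://public/default/example")
        .deserializationSchema(new SimpleStringSchema())
        .build();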

[QUESTION] Error after running for a few hours

Hello, I am facing a problem with pulsar-flink where a NoClassDefFoundError: org/apache/pulsar/shade/com/yahoo/sketches/quantiles/DoublesAuxiliary exception is thrown after the job has run for some time. The job runs again if I restart it, but it randomly stops after a few hours with the same exception. Any idea what causes it?

Thanks!

2020-08-11 23:10:27,150 WARN  org.apache.pulsar.shade.io.netty.util.HashedWheelTimer        - An exception was thrown by TimerTask.
java.lang.NoClassDefFoundError: org/apache/pulsar/shade/com/yahoo/sketches/quantiles/DoublesAuxiliary
	at org.apache.pulsar.shade.com.yahoo.sketches.quantiles.DoublesSketch.constructAuxiliary(DoublesSketch.java:607)
	at org.apache.pulsar.shade.com.yahoo.sketches.quantiles.DoublesSketch.getQuantiles(DoublesSketch.java:233)
	at org.apache.pulsar.client.impl.ProducerStatsRecorderImpl.lambda$init$0(ProducerStatsRecorderImpl.java:129)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
2020-08-11 23:10:27,150 INFO  org.apache.pulsar.client.impl.ConnectionHandler               - [persistent://public/flink_u/occupancy-data-branch-19467] [pulsar-cluster-1-360-22497] Reconnecting after timeout
2020-08-11 23:10:27,150 WARN  org.apache.pulsar.client.impl.ConnectionHandler               - [persistent://public/flink_u/occupancy-data-branch-18499] [pulsar-cluster-1-362-42067] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException: Connection already closed
2020-08-11 23:10:27,150 INFO  org.apache.pulsar.client.impl.ConnectionHandler               - [persistent://public/flink_u/occupancy-data-branch-20293] [pulsar-cluster-1-314-36923] Reconnecting after connection was closed
2020-08-11 23:10:27,150 INFO  org.apache.pulsar.client.impl.ConnectionHandler               - [persistent://public/flink_u/occupancy-data-branch-18464] [pulsar-cluster-1-360-22500] Reconnecting after timeout
2020-08-11 23:10:27,150 WARN  org.apache.pulsar.client.impl.ConnectionHandler               - [persistent://public/flink_u/occupancy-data-branch-20293] [pulsar-cluster-1-314-36923] Exception thrown while getting connection: 
java.lang.NoClassDefFoundError: org/apache/pulsar/shade/com/google/common/cache/RemovalCause
	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.expireEntries(LocalCache.java:2593)
	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.tryExpireEntries(LocalCache.java:2574)
	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.getLiveValue(LocalCache.java:2714)
	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2030)
	at org.apache.pulsar.shade.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
	at org.apache.pulsar.shade.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3973)
	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957)
	at org.apache.pulsar.common.naming.TopicName.get(TopicName.java:88)
	at org.apache.pulsar.client.impl.PulsarClientImpl.getConnection(PulsarClientImpl.java:606)
	at org.apache.pulsar.client.impl.ConnectionHandler.grabCnx(ConnectionHandler.java:67)
	at org.apache.pulsar.client.impl.ConnectionHandler.lambda$reconnectLater$1(ConnectionHandler.java:101)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

[FEATURE] Unsupported features in Pulsar Catalog

  • Database
    • alterDatabase - currently a database is mapped to a Pulsar namespace; we are unable to store or change DB metadata in a key/value manner
  • View
    • listViews - views on top of a topic are not supported in Pulsar
  • Table
    • alterTable - key/value metadata of a topic cannot be stored in Pulsar, right?
    • renameTable - can we change the name of a topic after creation?
  • Partition - a partition in Apache Hive is extra columns whose values are stored as the partition path
    • listPartitions
    • listPartitions
    • getPartition
    • partitionExists
    • createPartition
    • dropPartition
    • alterPartition
  • Function - UDFs that handle fields calculation
    • listFunctions
    • getFunction
    • functionExists
    • createFunction
    • alterFunction
    • dropFunction
  • Statistics
    • getTableStatistics - such as row count, file count, total size, raw data size, etc.
    • getTableColumnStatistics - such as null count, max and min values.
    • alterTableStatistics
    • alterTableColumnStatistics
    • getPartitionStatistics - a partition is regarded as a sub-table and shares the same structure as table statistics
    • getPartitionColumnStatistics - same as column statistics
    • alterPartitionStatistics
    • alterPartitionColumnStatistics

[BUG] AlreadyClosedException in the Pulsar client at the start of a Flink Stream

Describe the bug
We are trying to start a Flink stream using the flink-connector, but there seems to be an issue with the CachedPulsarClient (AlreadyClosedException on the Pulsar client of the FlinkPulsarSink).

To Reproduce
The Flink stream has 2 sources (2 FlinkPulsarSource) and a FlinkPulsarSink as output. Each FlinkPulsarSource instantiates a PulsarClient in the CachedPulsarClient (one per source, whatever the parallelism value), while the FlinkPulsarSink instantiates one PulsarClient per slot used (3 instances with a parallelism of 3). The CachedPulsarClient has a maximum cache size of 5 by default, so up to a parallelism of 3 there is no issue (1 client for each source and 3 clients for the sink), but when the parallelism is set to a value greater than the cache size, we get this exception:

Caused by: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Client already closed : state = Closing
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:870)
	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.createProducer(FlinkPulsarSinkBase.java:264)

The client appears to be closed while still in use. We tried to change the clientcachesize parameter to increase the number of clients stored in the cache, but the parameter doesn't seem to work (only 5 clients appear to be stored while debugging). Just a guess: the setCacheSize method on CachedPulsarClient sets a static value, but as the guavaCache is also a static field of the same class, it is already built before the cacheSize can be changed.

Anyway, we don't know whether increasing the clientcachesize is really a good solution, or whether a single client should be reused from the cache across all the different tasks?
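
A minimal sketch of the initialization-order guess above, assuming a Guava cache held in a static field; the names are illustrative, not the connector's actual code:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class StaticCacheSketch {
    private static int cacheSize = 5; // default

    // Built once during class initialization, capturing the default size.
    private static final Cache<String, Object> guavaCache =
            CacheBuilder.newBuilder().maximumSize(cacheSize).build();

    public static void setCacheSize(int size) {
        // Updates the field, but guavaCache was already built with the old
        // value, so the effective maximum size never changes.
        cacheSize = size;
    }
}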

Expected behavior
No AlreadyClosedException at the start of the stream.

Additional context

  • flink-pulsar 2.5.3
  • flink 1.11.1
  • Scala 2.12
  • Pulsar 2.6.0 cluster

[FEATURE] Pulsar Flink Integration

This is the master issue for tracking all the tasks for pulsar.

Streaming Connector

  • Implement the connectors using Flink 1.9 with schema #4
  • Test case for various data types (serde)
  • Double check if caseInsensitive conf for client/producer/consumer exists for flink #6
  • Make CachedPulsarClient cache expire timeout configurable
  • Example on streaming source and sink

Table API

  • Enable Pulsar as table sink and source #5
  • Integration Test #5
  • Example on table api

TODOs

Batch Connector

State backend

Implement Pulsar / BookKeeper based state backend

[BUG] Pulsar Flink 2.5.3 source cannot be started

[FEATURE] Add batch support in flink connector

Add batch support in Pulsar-Flink to use segment reader

  • For completed segments use segment reader
  • For in-progress / active segments use stream (sub-stream) reader

This is a pre-task for the columnar offloader.

[BUG] NoClassDefFoundError: java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1

Describe the bug
While executing StreamRead in https://github.com/yjshen/flink-connector-test, in STDOUT, Flink would complain about NoClassDefFoundError: https://gist.github.com/yjshen/c4960d78f503a8ee54b9e364271812d4

However, the PoolArena$1.class does exist in our connector fat jar:

Yijie-MBP18 ॐ  ~/pulsar-project/pulsar-flink/target:
7251 ± jar tvf pulsar-flink-connector_2.11-2.4.0.jar| grep PoolArena
  1054 Wed Oct 09 15:47:52 CST 2019 org/apache/pulsar/shade/io/netty/buffer/PoolArenaMetric.class
  4409 Wed Oct 09 15:47:52 CST 2019 org/apache/pulsar/shade/io/netty/buffer/PoolArena$DirectArena.class
   872 Wed Oct 09 15:47:52 CST 2019 org/apache/pulsar/shade/io/netty/buffer/PoolArena$1.class
 21868 Wed Oct 09 15:47:52 CST 2019 org/apache/pulsar/shade/io/netty/buffer/PoolArena.class
  1318 Wed Oct 09 15:47:52 CST 2019 org/apache/pulsar/shade/io/netty/buffer/PoolArena$SizeClass.class
  3215 Wed Oct 09 15:47:52 CST 2019 org/apache/pulsar/shade/io/netty/buffer/PoolArena$HeapArena.class
  1504 Wed Oct 09 15:47:54 CST 2019 org/apache/pulsar/common/stats/AllocatorStats$PoolArenaStats.class

pulsar-flink: removing a subscription failed, the client instance is closed

Describe the bug

Flink runs in yarn-cluster mode.

An exception occurs after the program has been running for some time.

2020-09-18 09:58:03,879 INFO org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader - Removing subscription flink-pulsar-3dec829b-e11a-4958-922f-e8ce41e4a823 from topic persistent://2000003485-lctest/monitor-uic/7bd0d02c2aa04cc48718ea86014d70ce
2020-09-18 09:58:03,890 INFO org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader - Successfully removed subscription flink-pulsar-3dec829b-e11a-4958-922f-e8ce41e4a823 from topic persistent://2000003485-lctest/monitor-uic/7bd0d02c2aa04cc48718ea86014d70ce
2020-09-18 09:58:03,890 INFO org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader - Removing subscription flink-pulsar-3dec829b-e11a-4958-922f-e8ce41e4a823 from topic persistent://2000003485-lctest/monitor-uic/42656da6941542c1b8d84f9a37f3a6ba
2020-09-18 09:58:03,899 INFO org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader - Successfully removed subscription flink-pulsar-3dec829b-e11a-4958-922f-e8ce41e4a823 from topic persistent://2000003485-lctest/monitor-uic/42656da6941542c1b8d84f9a37f3a6ba
2020-09-18 09:58:03,915 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (73d1f8a89159119536e25ab184ea5a83) switched from RUNNING to FAILED.
java.lang.RuntimeException: Failed to set up cursor for persistent://2000003485-lctest/monitor-uic/7bd0d02c2aa04cc48718ea86014d70ce
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.setupCursor(PulsarMetadataReader.java:234)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.createAndStartReaderThread(PulsarFetcher.java:531)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.runFetchLoop(PulsarFetcher.java:259)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.runWithTopicsDiscovery(FlinkPulsarSource.java:514)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.run(FlinkPulsarSource.java:473)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: org.apache.pulsar.client.admin.PulsarAdminException$ConflictException: Subscription already exists for topic
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:218)
at org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:820)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.setupCursor(PulsarMetadataReader.java:230)
... 7 more
Caused by: org.apache.pulsar.shade.javax.ws.rs.ClientErrorException: HTTP 409 Conflict
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.createExceptionForFamily(JerseyInvocation.java:1122)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1105)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:883)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:767)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:316)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:298)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:229)
at org.apache.pulsar.shade.org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:414)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:765)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:456)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$Builder.put(JerseyInvocation.java:340)
at org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:818)
... 8 more
2020-09-18 09:58:03,916 INFO org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader - Removing subscription flink-pulsar-3dec829b-e11a-4958-922f-e8ce41e4a823 from topic persistent://2000003485-lctest/monitor-uic/7bd0d02c2aa04cc48718ea86014d70ce
2020-09-18 09:58:03,916 ERROR org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource - Failed to cancel the Pulsar Fetcher java.lang.RuntimeException: Failed to remove cursor for persistent://2000003485-lctest/monitor-uic/7bd0d02c2aa04cc48718ea86014d70ce
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:272)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.cancel(PulsarFetcher.java:418)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:593)
at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
at org.apache.flink.runtime.taskmanager.Task.cancelInvokable(Task.java:1216)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:775)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Client instance has been closed.
at org.apache.pulsar.shade.org.glassfish.jersey.internal.guava.Preconditions.checkState(Preconditions.java:169)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyClient.checkNotClosed(JerseyClient.java:327)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.checkNotClosed(JerseyWebTarget.java:140)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:151)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:60)
at org.apache.pulsar.client.admin.internal.TopicsImpl.topicPath(TopicsImpl.java:962)
at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscriptionAsync(TopicsImpl.java:640)
at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscription(TopicsImpl.java:625)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:265)
... 9 more

2020-09-18 09:58:03,917 ERROR org.apache.flink.runtime.taskmanager.Task - Error while canceling task Source: Custom Source -> Sink: Unnamed (1/1).
java.lang.RuntimeException: java.lang.RuntimeException: Failed to remove cursor for persistent://2000003485-lctest/monitor-uic/7bd0d02c2aa04cc48718ea86014d70ce
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:596)
at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
at org.apache.flink.runtime.taskmanager.Task.cancelInvokable(Task.java:1216)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:775)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to remove cursor for persistent://2000003485-lctest/monitor-uic/7bd0d02c2aa04cc48718ea86014d70ce
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:272)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.cancel(PulsarFetcher.java:418)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.cancel(FlinkPulsarSource.java:593)
... 7 more
Caused by: java.lang.IllegalStateException: Client instance has been closed.
at org.apache.pulsar.shade.org.glassfish.jersey.internal.guava.Preconditions.checkState(Preconditions.java:169)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyClient.checkNotClosed(JerseyClient.java:327)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.checkNotClosed(JerseyWebTarget.java:140)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:151)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.path(JerseyWebTarget.java:60)
at org.apache.pulsar.client.admin.internal.TopicsImpl.topicPath(TopicsImpl.java:962)
at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscriptionAsync(TopicsImpl.java:640)
at org.apache.pulsar.client.admin.internal.TopicsImpl.deleteSubscription(TopicsImpl.java:625)
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.removeCursor(PulsarMetadataReader.java:265)

pulsar-flink.log

[BUG] Pulsar Option failOnDataLoss parameter not working as expected

[BUG] DeserializationSchema::isEndOfStream does not terminate FlinkPulsarSource

Describe the bug
When using a Flink DeserializationSchema that has overridden the isEndOfStream method, the source does not terminate when isEndOfStream evaluates to true.

To Reproduce
Steps to reproduce the behavior:

  1. Create a DeserializationSchema that returns true for isEndOfStream
  2. Use FlinkPulsarSource with the schema
  3. Stream continues indefinitely

Expected behavior
The FlinkPulsarSource closes with a maximum timestamp to flush downstream nodes.

Additional context
I am currently attempting to use this in a very simple Flink job to export an event-time-bounded selection of data from Pulsar into an S3 bucket. The job will not finalize the file until the stream terminates. If I manually cancel the stream, the data shows up in S3.
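
A minimal schema of this shape, using the standard Flink DeserializationSchema interface (the "END" sentinel is an assumption for illustration):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class BoundedStringSchema implements DeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Per this report, FlinkPulsarSource never terminates even after
        // this returns true for a sentinel record.
        return "END".equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}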

[BUG] Fix CVE issues

Priority  Category  Description (CVE)  FileName
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-transport-native-epoll:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-transport-native-unix-common:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-transport:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-codec-dns:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-buffer:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-codec:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-handler:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-resolver:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-common:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-codec-http:4.1.45.Final)
Critical CWE-119 CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2020-11612 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-resolver-dns:4.1.45.Final)
High CWE-20 Improper Input Validation: CVE-2015-2156 pulsar-flink-connector_2.11-2.4.12.jar (shaded: com.typesafe.netty:netty-reactive-streams:2.0.0)
High CWE-20 Improper Input Validation: CVE-2015-2156 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-tcnative-boringssl-static:2.0.26.Final)
Medium CWE-444 Inconsistent Interpretation of HTTP Requests ('HTTP Request Smuggling'): CVE-2019-20445 pulsar-flink-connector_2.11-2.4.12.jar (shaded: com.typesafe.netty:netty-reactive-streams:2.0.0)
Medium CWE-444 Inconsistent Interpretation of HTTP Requests ('HTTP Request Smuggling'): CVE-2019-20444 pulsar-flink-connector_2.11-2.4.12.jar (shaded: com.typesafe.netty:netty-reactive-streams:2.0.0)
Medium CWE-444 Inconsistent Interpretation of HTTP Requests ('HTTP Request Smuggling'): CVE-2019-16869 pulsar-flink-connector_2.11-2.4.12.jar (shaded: com.typesafe.netty:netty-reactive-streams:2.0.0)
Medium CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2014-3488 pulsar-flink-connector_2.11-2.4.12.jar (shaded: com.typesafe.netty:netty-reactive-streams:2.0.0)
Medium CWE-444 Inconsistent Interpretation of HTTP Requests ('HTTP Request Smuggling'): CVE-2019-20445 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-tcnative-boringssl-static:2.0.26.Final)
Medium CWE-444 Inconsistent Interpretation of HTTP Requests ('HTTP Request Smuggling'): CVE-2019-20444 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-tcnative-boringssl-static:2.0.26.Final)
Medium CWE-444 Inconsistent Interpretation of HTTP Requests ('HTTP Request Smuggling'): CVE-2019-16869 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-tcnative-boringssl-static:2.0.26.Final)
Medium CWE-119 Improper Restriction of Operations within the Bounds of a Memory Buffer: CVE-2014-3488 pulsar-flink-connector_2.11-2.4.12.jar (shaded: io.netty:netty-tcnative-boringssl-static:2.0.26.Final)

extract-common

Is your feature request related to a problem? Please describe.
To support two Flink versions, two modules are used, but their common parts are not extracted, which complicates adding new Pulsar features.

Describe the solution you'd like
Extract the common part and add the common module.

Describe alternatives you've considered
Take small steps and gradually complete the extraction of the common part.

Additional context

See: support Flink 1.11 and using flink-sql DDL to operate with pulsar

Can FlinkPulsarSource specify a SubscriptionType? If so, how?

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource
import org.apache.pulsar.client.impl.conf.ClientConfigurationData

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(10000)

val prop = new Properties()
prop.put("topic", "persistent://ts/ns/mpc_test")
prop.put("pulsar.reader.subscriptionName", "ts.ns-profile")

val clientConf = new ClientConfigurationData()
clientConf.setAuthPluginClassName("org.apache.pulsar.client.impl.auth.AuthenticationToken")
clientConf.setAuthParams("eDkaliAfeGMsevPF0km6c1B741gxAWT2hgc")
clientConf.setServiceUrl("pulsar://127.0.0.1:6650")

val source1 = new FlinkPulsarSource[String]("http://127.0.0.1:8080", clientConf, new SimpleStringSchema(), prop)
source1.setStartFromSubscription(prop.getProperty("pulsar.reader.subscriptionName", "ts.ns-profile"))
val dd = env.addSource(source1)

dd.print("pulsar:").setParallelism(1)

env.execute(this.getClass.getName)

[BUG] Tests fail with mismatched messageIds against pulsar 2.5.0

Describe the bug
When changing this code to run against pulsar 2.5.0, the tests fail. The main error from the tests looks like so:

java.lang.Exception: java.lang.IllegalStateException: Potential Data Loss in reading persistent://public/default/topic-53-partition-4: intended to start at 96:0:4:303, actually we get 96:0:4:0
Some data may have been lost because they are not available in Pulsar any more; either the
data was aged out by Pulsar or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

As can be seen, the messageIds almost match, just the batchIndex is getting lost. It would appear that when we are creating new readers, the broker is responding with a subscription from the beginning of the batch, not at the requested batchIndex.

It isn't clear whether this is intended behavior in 2.5.0 or a regression in Pulsar. If it is intended, then this code needs to change to either skip messages or call seek on the reader again to reach a more specific position.
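
To make the id layout concrete, a small sketch assuming the standard Pulsar client classes; the numeric values are taken from the error above:

import org.apache.pulsar.client.impl.BatchMessageIdImpl;

public class MessageIdSketch {
    public static void main(String[] args) {
        // ledgerId : entryId : partitionIndex : batchIndex; the two ids in
        // the failure differ only in the final batch-index component.
        BatchMessageIdImpl intended  = new BatchMessageIdImpl(96L, 0L, 4, 303);
        BatchMessageIdImpl delivered = new BatchMessageIdImpl(96L, 0L, 4, 0);
        System.out.println(intended + " vs " + delivered);
    }
}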

To Reproduce
Steps to reproduce the behavior:

  1. Change the pulsar version to 2.5.0 and the test framework to 2.5.1
  2. Run the tests
  3. Observe the error

Expected behavior
Tests should pass

Sink data with JSON schema

I use FlinkPulsarSink to sink my objects into a Pulsar topic; however, they are written with an Avro schema.
Is there any way to set them to a JSON schema programmatically?
Thanks!
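
For what it's worth, a minimal sketch with the plain Pulsar client of the JSON schema being asked for; MyBean, the service URL, and the topic are placeholders, and how FlinkPulsarSink can be handed such a schema depends on the connector version:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class JsonSchemaSketch {
    public static class MyBean { public String value; } // placeholder POJO

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder broker URL
                .build();
        // Schema.JSON(...) publishes records under a JSON schema instead of Avro.
        Producer<MyBean> producer = client
                .newProducer(Schema.JSON(MyBean.class))
                .topic("persistent://public/default/example")
                .create();
        producer.close();
        client.close();
    }
}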
