Comments (3)

hangc0276 commented on July 18, 2024

@hangc0276 Thanks for raising this issue. Would you like to contribute to this feature?

Yes, I have developed a first version and run it in my gray-release environment. I will contribute this feature.
Note that Flink 1.11 has changed some interfaces, so the code may be incompatible with Flink 1.9.1.

from pulsar-flink.

jiazhai commented on July 18, 2024

@hangc0276 Thanks for raising this issue. Would you like to contribute to this feature?

hangc0276 commented on July 18, 2024
set global.disable.operator.chain = true;

create table test_flink_sql(
    `rip` VARCHAR,
    `rtime` VARCHAR,
    `uid` bigint,
    `rchannel` VARCHAR,
    -- NOTE: `be_time` is declared twice as posted; column names must be unique,
    -- so one of these two lines presumably stands for a different column.
    `be_time` bigint,
    `be_time` VARCHAR,
    `activity_id` VARCHAR,
    `country_code` VARCHAR,
    `os` VARCHAR,
    `recv_time` bigint,
    `remark` VARCHAR,
    `client_ip` VARCHAR,
    -- computed columns derived from rtime
    `day` as TO_DATE(rtime),
    `hour` as date_format(rtime, 'HH')
) with (
    'connector.type' = 'pulsar',
    'connector.version' = '1',
    'connector.topic' = 'persistent://test/test-gray/test_flink_sql',
    'connector.service-url' = 'pulsar://xxx',
    'connector.admin-url' = 'http://xxx',
    'connector.startup-mode' = 'external-subscription',
    'connector.sub-name' = 'test_flink_sql_v1',
    'connector.properties.0.key' = 'pulsar.reader.readerName',
    'connector.properties.0.value' = 'test_flink_sql_v1',
    'connector.properties.1.key' = 'pulsar.reader.subscriptionRolePrefix',
    'connector.properties.1.value' = 'test_flink_sql_v1',
    'connector.properties.2.key' = 'pulsar.reader.receiverQueueSize',
    'connector.properties.2.value' = '1000',
    'connector.properties.3.key' = 'partitiondiscoveryintervalmillis',
    'connector.properties.3.value' = '5000',
    'format.type' = 'json',
    'format.derive-schema' = 'true',
    'format.ignore-parse-errors' = 'true',
    'update-mode' = 'append'
);

insert into hive.test.test_flink_sql
select
    rip,
    rtime,
    if (uid is null, 0, uid) as uid,
    if (activity_id is null, '', activity_id) as activity_id,
    if (country_code is null, '', country_code) as country_code,
    if (os is null, '', os) as os,
    if (recv_time is null, 0, recv_time) as recv_time,
    if (remark is null, '', remark) as remark,
    if (client_ip is null, '', client_ip) as client_ip,
    cast(`day` as string) as `day`,
    cast(`hour` as string) as `hour`
from test_flink_sql;
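
As an aside, the null-defaulting pattern `if (x is null, d, x)` used in the query above can usually be written more compactly with the standard SQL `COALESCE` function. The following is only a sketch: it assumes the Flink planner in use accepts `COALESCE` with these argument types, and it has not been run against the setup described in this thread. Table and column names are taken from the post.

```sql
-- Sketch: same projection as the insert above, with COALESCE supplying the defaults.
-- COALESCE(x, d) returns x when x is not null, otherwise d.
insert into hive.test.test_flink_sql
select
    rip,
    rtime,
    coalesce(uid, 0) as uid,
    coalesce(activity_id, '') as activity_id,
    coalesce(country_code, '') as country_code,
    coalesce(os, '') as os,
    coalesce(recv_time, 0) as recv_time,
    coalesce(remark, '') as remark,
    coalesce(client_ip, '') as client_ip,
    cast(`day` as string) as `day`,
    cast(`hour` as string) as `hour`
from test_flink_sql;
```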
