vaquarkhan/flink-parquet-writer issues
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
I am trying to consume Avro generic records from Kafka, write them out as Parquet files, and store them in S3, but the Flink job fails with this error:
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
Flink version: 1.16.3
Java version: 15
Gradle version: 6.4.1
My build.gradle file:
```groovy
plugins {
    id 'application'
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    implementation group: 'org.apache.flink', name: 'flink-java', version: '1.16.0'
    implementation group: 'org.apache.flink', name: 'flink-streaming-java', version: '1.16.0'
    implementation group: 'org.apache.flink', name: 'flink-clients', version: '1.16.0'
    implementation group: 'org.apache.flink', name: 'flink-connector-kafka', version: '1.16.0'
    implementation group: 'org.apache.flink', name: 'flink-avro', version: '1.16.0'
    implementation group: 'org.apache.avro', name: 'avro', version: '1.6.0'
    implementation group: 'software.amazon.glue', name: 'schema-registry-flink-serde', version: '1.1.19'
    implementation group: 'org.apache.flink', name: 'flink-parquet', version: '1.16.0'
    implementation(group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0') {
        exclude group: 'org.apache.hadoop', module: 'hadoop-client'
        exclude group: 'it.unimi.dsi', module: 'fastutil'
    }
    implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.8.3'
    implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.29'

    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    testImplementation group: 'junit', name: 'junit', version: '4.12'
    testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.6.2'
    testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.6.2'
    testImplementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.36'
}

jar {
    // Build a fat jar containing every runtime dependency
    from {
        configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
    }
    duplicatesStrategy(DuplicatesStrategy.EXCLUDE)
    zip64 = true
}

application {
    mainClass = 'demo.App'
}

tasks.named('test') {
    useJUnitPlatform()
}
```
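To check whether the class named in the error actually ended up in the fat jar built by the `jar` task, a small JDK-only diagnostic like the sketch below can be run with that jar on the classpath. As far as I know, `org.apache.hadoop.mapreduce.lib.output.FileOutputFormat` is packaged in `hadoop-mapreduce-client-core` rather than `hadoop-common`, which is the only Hadoop artifact the build above declares directly; treat that as an assumption to verify. The `ClasspathCheck` class and its `locate` helper are hypothetical names used only for this check.

```java
import java.security.CodeSource;
import java.util.Optional;

public class ClasspathCheck {

    /**
     * Returns where a class was loaded from, or Optional.empty() if it
     * cannot be loaded from the current classpath.
     */
    public static Optional<String> locate(String className) {
        try {
            // initialize = false: only load the class, do not run its static initializers
            Class<?> clazz = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes from the JDK runtime itself have no code source location
            if (source == null || source.getLocation() == null) {
                return Optional.of("<jdk runtime>");
            }
            return Optional.of(source.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers classes whose own dependencies are missing
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] classes = {
            "org.apache.hadoop.mapreduce.lib.output.FileOutputFormat",
            "org.apache.hadoop.conf.Configuration"
        };
        for (String name : classes) {
            System.out.println(name + " -> " + locate(name).orElse("NOT FOUND"));
        }
    }
}
```

If the first line prints `NOT FOUND` while `Configuration` resolves, the jar contains `hadoop-common` but not the MapReduce client classes.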
I have also added the necessary S3 configuration to the flink-conf.yaml file:
```yaml
s3.access-key: <your-access-key-id>
s3.secret-key: <your-secret-access-key>
s3.endpoint.region: ap-south-1
fs.s3a.access.key: <your-access-key-id>
fs.s3a.secret.key: <your-secret-access-key>
fs.s3a.endpoint: s3.ap-south-1.amazonaws.com
```
Flink job code:
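```java
package demo;

import com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroDeserializationSchema;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App {
    private static final Logger logger = LoggerFactory.getLogger(App.class);

    // Must be declared before formSubmitSchema: reading it in an earlier
    // static initializer is an illegal forward reference and does not compile.
    private static final String formSubmitSchemaString = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"FormSubmitEvent\",\n" +
            "  \"fields\": [\n" +
            "    { \"name\": \"event\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"tenantId\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"formData\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"formID\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"startTime\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"year\", \"type\": [\"null\", \"int\"] },\n" +
            "    { \"name\": \"month\", \"type\": [\"null\", \"int\"] },\n" +
            "    { \"name\": \"day\", \"type\": [\"null\", \"int\"] }\n" +
            "  ]\n" +
            "}\n";

    private static final Schema formSubmitSchema = new Schema.Parser().parse(formSubmitSchemaString);

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

            // Kafka consumer settings
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("auto.offset.reset", "earliest");

            // AWS Glue Schema Registry settings for deserializing Avro GenericRecords
            Map<String, Object> configs = new HashMap<>();
            configs.put(AWSSchemaRegistryConstants.AWS_REGION, "ap-south-1");
            configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
            configs.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "KafkaSchemaRegistry");
            configs.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "testSchema");
            configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

            FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                    "testTopic1",
                    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(formSubmitSchema, configs),
                    properties);
            DataStream<GenericRecord> stream = see.addSource(consumer);

            // The s3:// scheme needs an S3 filesystem plugin (e.g. flink-s3-fs-hadoop)
            // in Flink's plugins/ directory.
            Path outputBasePath = new Path("s3://qa-nsl-ge-data/topics/");

            // Bulk formats such as Parquet roll part files on checkpoints, so files
            // are only finalized when checkpointing is enabled.
            final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                    .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(formSubmitSchema))
                    .withBucketAssigner(new BucketAssigner<GenericRecord, String>() {
                        @Override
                        public String getBucketId(GenericRecord record, Context context) {
                            // year/month/day are ["null", "int"] unions; a null value
                            // makes these (int) casts throw NullPointerException.
                            int year = (int) record.get("year");
                            int month = (int) record.get("month");
                            int day = (int) record.get("day");
                            logger.info("year: {}, month: {}, day: {}", year, month, day);
                            return String.format("year=%d/month=%d/day=%d", year, month, day);
                        }

                        @Override
                        public SimpleVersionedSerializer<String> getSerializer() {
                            return SimpleVersionedStringSerializer.INSTANCE;
                        }
                    })
                    .build();

            stream.print();
            logger.info("flink job for tagmanager is started");
            stream.addSink(sink);
            see.execute();
        } catch (Exception e) {
            logger.error("Flink job failed", e);
        }
    }
}
```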
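Because `year`, `month` and `day` are declared as `["null", "int"]` unions, `record.get(...)` can return `null`, and the `(int)` casts in `getBucketId` would then throw a `NullPointerException` inside the sink. A null-tolerant version of the bucket-path formatting might look like the sketch below; the `BucketIds` class, the `bucketId` helper and the `unknown` fallback bucket are hypothetical choices, not part of the original job.

```java
public class BucketIds {
    // Hypothetical fallback bucket for records missing a date component
    public static final String UNKNOWN_BUCKET = "unknown";

    /**
     * Builds a Hive-style partition path such as "year=2023/month=5/day=7".
     * Takes Object because GenericRecord.get returns Object, and the Avro
     * fields are nullable unions (null or Integer).
     */
    public static String bucketId(Object year, Object month, Object day) {
        // instanceof is false for null, so this also rejects missing values
        if (!(year instanceof Integer) || !(month instanceof Integer) || !(day instanceof Integer)) {
            return UNKNOWN_BUCKET;
        }
        return String.format("year=%d/month=%d/day=%d", year, month, day);
    }

    public static void main(String[] args) {
        System.out.println(bucketId(2023, 5, 7));
        System.out.println(bucketId(null, 5, 7));
    }
}
```

Inside `getBucketId`, this would be called as `bucketId(record.get("year"), record.get("month"), record.get("day"))`.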
Please help me resolve this issue.