flink-parquet-writer's People

Contributors

sjwiesman


Forkers

vaquarkhan

flink-parquet-writer's Issues

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

I am trying to consume Avro generic records from Kafka, write them out as Parquet files, and store them in S3, but the Flink job fails with:

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

Flink version: 1.16.3
Java version: 15
Gradle version: 6.4.1

My build.gradle file looks like this:
```groovy
plugins {
    id 'application'
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    implementation group: 'org.apache.flink', name: 'flink-java', version: '1.16.0'
    implementation group: 'org.apache.flink', name: 'flink-streaming-java', version: '1.16.0'
    implementation group: 'org.apache.flink', name: 'flink-clients', version: '1.16.0'
    implementation group: 'org.apache.flink', name: 'flink-connector-kafka', version: '1.16.0'
    implementation group: 'org.apache.flink', name: 'flink-avro', version: '1.16.0'
    implementation group: 'org.apache.avro', name: 'avro', version: '1.6.0'
    implementation group: 'software.amazon.glue', name: 'schema-registry-flink-serde', version: '1.1.19'
    implementation group: 'org.apache.flink', name: 'flink-parquet', version: '1.16.0'
    implementation(group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0') {
        exclude group: 'org.apache.hadoop', module: 'hadoop-client'
        exclude group: 'it.unimi.dsi', module: 'fastutil'
    }
    implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.8.3'
    implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.29'

    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    testImplementation group: 'junit', name: 'junit', version: '4.12'
    testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.6.2'
    testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.6.2'
    testImplementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.36'
}

jar {
    from {
        configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
    }
    duplicatesStrategy(DuplicatesStrategy.EXCLUDE)
    zip64 = true
}

application {
    mainClass = 'demo.App'
}

tasks.named('test') {
    useJUnitPlatform()
}
```
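The missing class, org.apache.hadoop.mapreduce.lib.output.FileOutputFormat, ships in the hadoop-mapreduce-client-core artifact rather than in hadoop-common, and the hadoop-client exclusion on parquet-avro removes the transitive copy that would otherwise supply it. A possible fix (an assumption on my part, with the version matched to the existing hadoop-common dependency) is to add:

```groovy
// Assumed fix: hadoop-mapreduce-client-core provides
// org.apache.hadoop.mapreduce.lib.output.FileOutputFormat,
// which hadoop-common alone does not contain.
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.8.3'
```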

I have also included the necessary configuration in the flink-conf.yaml file (keys redacted):

```yaml
s3.access-key: <redacted>
s3.secret-key: <redacted>
s3.endpoint.region: ap-south-1
fs.s3a.access.key: <redacted>
fs.s3a.secret.key: <redacted>
fs.s3a.endpoint: s3.ap-south-1.amazonaws.com
```
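Independently of the classpath error, Flink only resolves the s3:// scheme when one of its S3 filesystem plugins is present on the cluster. The usual layout is to copy the bundled jar from the distribution's opt/ directory into a plugins/ subdirectory (the exact jar version below is an assumption based on the Flink version stated above):

```
plugins/
└── s3-fs-hadoop/
    └── flink-s3-fs-hadoop-1.16.3.jar
```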

Flink job code:

```java
public class App {

// Note: the schema string must be declared before the Schema field that parses
// it; Java initialises static fields in declaration order, so parsing the
// string first (as originally written) fails at class load with a
// NullPointerException.
private static String formSubmitSchemaString = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"FormSubmitEvent\",\n" +
            "  \"fields\": [\n" +
            "    { \"name\": \"event\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"tenantId\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"formData\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"formID\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"startTime\", \"type\": [\"null\", \"string\"] },\n" +
            "    { \"name\": \"year\", \"type\": [\"null\", \"int\"] },\n" +
            "    { \"name\": \"month\", \"type\": [\"null\", \"int\"] },\n" +
            "    { \"name\": \"day\", \"type\": [\"null\", \"int\"] }\n" +
            "  ]\n" +
            "}\n";

private static Schema formSubmitSchema = new Schema.Parser().parse(formSubmitSchemaString);

private static Logger logger = LoggerFactory.getLogger(App.class);


@SuppressWarnings("deprecation")
public static void main(String[] args) {
    try {
        final StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("auto.offset.reset", "earliest");
        
        Map<String, Object> configs = new HashMap<>();
        configs.put(AWSSchemaRegistryConstants.AWS_REGION, "ap-south-1");
        configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
        configs.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "KafkaSchemaRegistry");
        configs.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "testSchema");
        configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
        
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            "testTopic1",
            GlueSchemaRegistryAvroDeserializationSchema.forGeneric(formSubmitSchema, configs),
            properties);
        
        DataStream<GenericRecord> stream = see.addSource(consumer);

        Path outputBasePath = new Path("s3://qa-nsl-ge-data/topics/");

        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(formSubmitSchema))
            .withBucketAssigner(new BucketAssigner<GenericRecord, String>() {
                
                @Override
                public String getBucketId(GenericRecord record, Context context) {
                    int year = (int) record.get("year");
                    int month = (int) record.get("month");
                    int day = (int) record.get("day");
                    logger.info("year: {}, month: {}, day: {}", year, month, day);
                    return String.format("year=%d/month=%d/day=%d", year, month, day);
                }

                @Override
                public SimpleVersionedSerializer<String> getSerializer() {
                    return SimpleVersionedStringSerializer.INSTANCE;
                }
            })
            .build();
        
        stream.print();
        logger.info("flink job for tagmanager is started");

        stream.addSink(sink);
        
        see.execute();
    } catch (Exception e) {
        // Log the full stack trace; e.getMessage() alone can be null.
        logger.error("Flink job failed", e);
    }
}

}
```
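The anonymous BucketAssigner above reduces to a single format string that buckets records into Hive-style year=/month=/day= partition directories under the S3 base path. A standalone sketch (class and method names are mine, for illustration only):

```java
public class BucketPathDemo {
    // Mirrors the getBucketId() logic in the job: one directory per day,
    // in Hive-style partition layout.
    static String bucketId(int year, int month, int day) {
        return String.format("year=%d/month=%d/day=%d", year, month, day);
    }

    public static void main(String[] args) {
        // e.g. a record with year=2023, month=7, day=15:
        System.out.println(bucketId(2023, 7, 15)); // prints year=2023/month=7/day=15
    }
}
```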

Please help me resolve this issue.
