4mc - Four More Compression

About

4MC (4 More Compression) is a library for Hadoop providing a new splittable compressed file format (4mc), which lets you leverage the power of the LZ4 and ZSTD algorithms. It was designed to add more features to existing big data solutions like Hadoop/ElephantBird, addressing the following major points:

  • Performance: LZ4 isn't just the world's fastest compression algorithm; with the medium/high codecs it can also seamlessly reach much higher compression ratios, very near GZIP. The only cost is some extra compression CPU time, since decompression speed stays the same or even improves. The newer ZSTD achieves even better compression ratios with excellent performance, making it a great fit for long-term big data storage.
  • Hadoop/EB: the current state-of-the-art solution needs an external index file to split big files and process them in parallel with local mappers. The 4mc format was designed for big data from the start: the block index is internal, so no external file or preprocessing of the input data is needed, and any 4mc/4mz file is ready for parallel processing.
  • Licensing: LZ4, ZSTD and 4mc licenses are BSD!

License

BSD 2-Clause License - http://www.opensource.org/licenses/bsd-license.php

4MC package content

4MC is composed of the following items, included in the source code repository:

  • hadoop-4mc - Java library to be used with Hadoop to leverage the 4mc format and the LZ4/ZSTD codecs
  • hadoop-4mc native lib - JNI bindings leveraging LZ4/ZSTD compression/decompression (embedded in the jar, no need to recompile)
  • 4mc - command-line tool for compression/decompression of your files - written in C, works on Linux/macOS/Windows (available in the tool folder)

Compression speed and levels

4mc comes with 4 compression levels and 2 compression algorithms: the 4mc format leverages the standard LZ4 library, while the 4mz format leverages the ZSTD library. Both the 4mc command-line tool and the Java Hadoop classes provide codecs for these 4 levels.

  • 4mc Fast (LZ4) Compression: the default, using LZ4 fast
  • 4mc Medium (LZ4) Compression: LZ4 MC
  • 4mc High (LZ4) Compression: LZ4 HC lvl 4
  • 4mc Ultra (LZ4) Compression: LZ4 HC lvl 8
  • 4mz Fast (zstd) Compression: ZSTD lvl 1
  • 4mz Medium (zstd) Compression: ZSTD lvl 3
  • 4mz High (zstd) Compression: ZSTD lvl 6
  • 4mz Ultra (zstd) Compression: ZSTD lvl 12

Benchmark with the Silesia corpus on macOS (OS X El Capitan) - Intel(R) Core i7 64-bit CPU @ 2.5GHz

 Algorithm      Compression Speed     Decompression Speed      Ratio
 ZSTD-Fast               225 MB/s                330 MB/s      2.873
 ZSTD-Medium             140 MB/s                301 MB/s      3.151
 ZSTD-High                62 MB/s                307 MB/s      3.341
 ZSTD-Ultra               16 MB/s                326 MB/s      3.529
 LZ4-Fast                270 MB/s                460 MB/s      2.084
 LZ4-Medium              135 MB/s                460 MB/s      2.340
 LZ4-High                 57 MB/s                495 MB/s      2.630
 LZ4-Ultra                31 MB/s                502 MB/s      2.716

Please note that the 4mc/4mz compression codecs can also be used at any stage of an M/R job. ZSTD wins over LZ4 in almost all use cases, except for (near) real-time scenarios where you do not need long-term storage.

Releases and change history

Releases with artifacts are available at https://github.com/fingltd/4mc/releases - the attached artifacts contain the jar with the embedded native library for Windows/Linux/macOS. You can still compile the JNI bindings for your own platform and override the embedded ones. The 4mc CLI tool for all platforms is now available at https://github.com/fingltd/4mc/tree/master/tool

  • 4mc 3.0.0 - Updated native libraries: LZ4 1.9.4 and ZSTD 1.5.2; package rename
  • 4mc 2.2.0 - Updated native libraries: LZ4 1.9.2 and ZSTD 1.4.4
  • 4mc 2.1.0 - Compatibility with newer Hadoop (2.7.x) and Spark (2.4.3)
  • 4mc 2.0.0 - 4mz to support ZSTD (zstandard https://github.com/facebook/zstd)
  • 4mc 1.4.0 - Native libraries are now embedded in the jar, so the hadoop-4mc library can be used without manual configuration on Hadoop/Spark/Flink/etc.
  • 4mc 1.3.0 - Introduced a direct buffer pool, to cope with "java.lang.OutOfMemoryError: Direct buffer memory"
  • 4mc 1.1.0 - Support for both hadoop-1 and hadoop-2
  • 4mc 1.0.0 - Very first version of 4mc

Build

  • Native: for the 4mc command-line tool and the hadoop-4mc native library (JNI codecs), a Makefile is provided for Unix/Linux/macOS; CMake can also be used (the best choice on Windows).

  • Java: the hadoop-4mc library for Hadoop can be built with Maven, using the provided pom.

  • Java Native: see above; make sure JAVA_HOME is set.

Hadoop configuration

You only have to make sure that your jobs depend on the hadoop-4mc jar and ship it as a shared library needed for cluster execution. Enabling the codecs works as usual, i.e. by adding them to the configuration XML (core-site.xml):

    <property>
        <name>io.compression.codecs</name>
        <value>
            <!-- standard and lzo codecs -->
            org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,
            com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,
            <!-- 4mc codecs -->
            com.fing.compression.fourmc.Lz4Codec,com.fing.compression.fourmc.Lz4MediumCodec,com.fing.compression.fourmc.Lz4HighCodec,com.fing.compression.fourmc.Lz4UltraCodec,
            com.fing.compression.fourmc.FourMcCodec,com.fing.compression.fourmc.FourMcMediumCodec,com.fing.compression.fourmc.FourMcHighCodec,com.fing.compression.fourmc.FourMcUltraCodec,
            <!-- 4mz codecs -->
            com.fing.compression.fourmc.FourMzCodec,com.fing.compression.fourmc.FourMzMediumCodec,com.fing.compression.fourmc.FourMzHighCodec,com.fing.compression.fourmc.FourMzUltraCodec
        </value>
    </property>

Please note that the snippet above enables all the codecs provided in the library, as follows:

  • 4mc codecs to read and write splittable LZ4 compressed files: FourMcCodec FourMcMediumCodec FourMcHighCodec FourMcUltraCodec
  • 4mz codecs to read and write splittable ZSTD compressed files: FourMzCodec FourMzMediumCodec FourMzHighCodec FourMzUltraCodec
  • straight LZ4 codecs usable in your intermediate job outputs or as alternate compression for your solution (e.g. in SequenceFile): Lz4Codec Lz4MediumCodec Lz4HighCodec Lz4UltraCodec
  • straight ZSTD codecs usable in your intermediate job outputs or as alternate compression for your solution (e.g. in SequenceFile): ZstdCodec ZstdMediumCodec ZstdHighCodec ZstdUltraCodec

Why so many different codecs, rather than the usual single codec reading its level from the configuration? The aim is to have a way to programmatically tune your M/R engine at any stage. Example use case: an M/R job that wants a fast/medium codec for intermediate map output and a high codec for the final output, because that data will be kept for a long time. Remember once again that the compression level in both ZSTD and LZ4 is transparent to the decompressor: compressing harder affects not only the output size but also the decompression speed, which gets even faster.
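
A minimal sketch of that use case, assuming the com.fing.compression.fourmc package names of 4mc 3.x (earlier releases used com.hadoop.compression.fourmc) and the standard Hadoop 2.x job properties:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.fing.compression.fourmc.FourMcHighCodec;
import com.fing.compression.fourmc.Lz4Codec;

public class PerStageCompressionExample {

    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();

        // Intermediate map output: fast LZ4, since this data is short-lived.
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                Lz4Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf, "4mc per-stage compression");

        // Final job output: splittable 4mc with the high LZ4 codec,
        // since this data will be kept around for a long time.
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, FourMcHighCodec.class);

        return job;
    }
}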

Java examples

The Maven module examples is a separate module providing several usage examples with Hadoop Map/Reduce and with Spark. Flink examples will be added soon, but usage is as straightforward as with Spark. As the examples show, 4mc can be used with text input/output, and it can also leverage the ElephantBird framework to process protobuf-encoded binary data.
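
For a flavor of what the examples module covers, a bare-bones new-API MapReduce driver reading splittable 4mc text input could look like the sketch below; the input format class name assumes the 3.x com.fing.mapreduce package (earlier releases shipped it as com.hadoop.mapreduce.FourMcTextInputFormat), and the mapper simply echoes each line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.fing.mapreduce.FourMcTextInputFormat;

public class FourMcReadDriver {

    // Pass-through mapper: each line of the .4mc file arrives as the Text value.
    public static class EchoMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context ctx)
                throws java.io.IOException, InterruptedException {
            ctx.write(offset, line);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "read 4mc text input");
        job.setJarByClass(FourMcReadDriver.class);

        // Splittable 4mc input: the internal block index drives the input splits.
        job.setInputFormatClass(FourMcTextInputFormat.class);
        job.setMapperClass(EchoMapper.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}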

PySpark Example

Use sc.newAPIHadoopFile to load your data. This will leverage the splittable feature of 4mc and load your data into many partitions.

filepath = 'gs://data/foo.4mc'

# This will read the file and partition it as it loads
data = sc.newAPIHadoopFile(
    filepath
,   'com.fing.mapreduce.FourMcTextInputFormat'
,   'org.apache.hadoop.io.LongWritable'
,   'org.apache.hadoop.io.Text'
)
data.getNumPartitions()
# -> 24

# This is what the RDD looks like after it's loaded
data.take(1)
# -> [(0, 'first line')]

You may use sc.textFile or any other method to load the data. However, the data will be loaded in one partition only.

data = sc.textFile(filepath)
data.getNumPartitions()
# -> 1

How To Contribute

Bug fixes, features, and documentation improvements are welcome!

Contributors

Major contributors are listed below.

  • Carlo Medas - author of the 4mc format and library
  • Yann Collet - mentor, author of LZ4 and ZSTD compression libraries

4mc's Issues

null pointer exception

Not sure if this is the right place to report such issues. I just installed hadoop-4mc-2.1.0 on my Hadoop 2.7.7 cluster, but I get a null pointer exception when it is apparently trying to decompress the input for a streaming job. The same job with the uncompressed input foo.txt runs fine.

foo.txt.4mc.gz

packageJobJar: [/tmp/hadoop-unjar2959120489988360905/] [] /tmp/streamjob6915837012525200457.jar tmpDir=null
19/08/20 17:46:36 INFO impl.TimelineClientImpl: Timeline service address: ...
19/08/20 17:46:36 INFO client.RMProxy: Connecting to ResourceManager at ...
19/08/20 17:46:36 INFO client.AHSProxy: Connecting to Application History server at ...
19/08/20 17:46:36 INFO impl.TimelineClientImpl: Timeline service address: ...
19/08/20 17:46:36 INFO client.RMProxy: Connecting to ResourceManager at ...
19/08/20 17:46:36 INFO client.AHSProxy: Connecting to Application History server at ...
19/08/20 17:46:37 INFO fourmc.FourMcNativeCodeLoader: hadoop-4mc: loaded native library (embedded)
19/08/20 17:46:37 INFO fourmc.Lz4Codec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.ZstdCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4HighCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4MediumCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4UltraCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO mapred.FileInputFormat: Total input paths to process : 1
19/08/20 17:46:37 INFO mapreduce.JobSubmitter: number of splits:1
19/08/20 17:46:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1565620832498_0009
19/08/20 17:46:38 INFO impl.YarnClientImpl: Submitted application application_1565620832498_0009
19/08/20 17:46:38 INFO mapreduce.Job: The url to track the job: ...
19/08/20 17:46:38 INFO mapreduce.Job: Running job: job_1565620832498_0009
19/08/20 17:46:44 INFO mapreduce.Job: Job job_1565620832498_0009 running in uber mode : false
19/08/20 17:46:44 INFO mapreduce.Job:  map 0% reduce 0%
19/08/20 17:47:03 INFO mapreduce.Job:  map 67% reduce 0%
19/08/20 17:47:04 INFO mapreduce.Job: Task Id : attempt_1565620832498_0009_m_000000_0, Status : FAILED
Error: java.lang.NullPointerException
	at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
	at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:230)
	at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:288)
	at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:210)
	at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1979)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:468)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

What is the idea behind design of footer?

I've noticed that, to make the stream splittable, we have to read the footer section to determine the block indexes.
But in some contexts it is hard to retrieve the file length, so we cannot easily seek to a position relative to the end of the file.
In those contexts, an additional field in the HEADER indicating the footer offset would be handy.

Can this be solved?

Problem with Java on windows7 64-bit

Hello, I took the 2.0.0 version, rebuilt it and tried to use it in my project, and I got the following exception:

Exception in thread "Thread-1" java.lang.UnsatisfiedLinkError: C:\Users\User\AppData\Local\Temp\libhadoop-4mc4717395728020324256.dll: Can't load IA 32-bit .dll on a AMD 64-bit platform
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1937)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1822)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at com.hadoop.compression.fourmc.FourMcNativeCodeLoader.loadLibrary(FourMcNativeCodeLoader.java:140)
at com.hadoop.compression.fourmc.FourMcNativeCodeLoader.(FourMcNativeCodeLoader.java:166)

I checked that the temporary dll file is byte-for-byte equal to win32/amd64/libhadoop-4mc.dll

release-3.0.0 make error on centos7.

The native code in ./native compiles successfully with cmake, but the plain Makefile gives an error.

  • using the make command in the native directory, I got this error message:
     /tmp/ccBlDcLt.o: In function `HUF_decompress4X2_usingDTable_internal_bmi2_asm':
     huf_decompress.c:(.text+0x2fba): undefined reference to `HUF_decompress4X2_usingDTable_internal_bmi2_asm_loop'
     /tmp/ccBlDcLt.o: In function `HUF_decompress4X1_usingDTable_internal_bmi2_asm':
     huf_decompress.c:(.text+0x3e29): undefined reference to `HUF_decompress4X1_usingDTable_internal_bmi2_asm_loop'
     /usr/bin/ld: libhadoop-4mc.so.2.0.0: hidden symbol `HUF_decompress4X1_usingDTable_internal_bmi2_asm_loop' isn't defined
     /usr/bin/ld: final link failed: Bad value
     collect2: error: ld returned 1 exit status
     make: *** [libhadoop-4mc] Error 1

NullPointerException after reading first file

Hello,

I'm trying to use hadoop-4mc with my AWS EMR cluster and am using a product called Hunk to interface with the cluster. Whenever I run a search job in Hunk, the results for the first file are returned fine, but at the end of reading the first file I get a NullPointerException with the stack trace shown below.

Any idea what might be causing this? Let me know if you need any additional information.

(This is with Hadoop 2.7.2-amzn-3 and hadoop-4mc-1.4.0.)

2016-08-13 15:38:38,547 FATAL [IPC Server handler 44 on 44360] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1470333745198_0062_m_000000_1 - exited : java.lang.NullPointerException
at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:224)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:235)
at com.splunk.mr.input.SplunkLineRecordReader.close(SplunkLineRecordReader.java:21)
at com.splunk.mr.SplunkBaseMapper$RecReader.close(SplunkBaseMapper.java:246)
at com.splunk.mr.SplunkBaseMapper.runImpl(SplunkBaseMapper.java:305)
at com.splunk.mr.SplunkSearchMapper.runImpl(SplunkSearchMapper.java:419)
at com.splunk.mr.SplunkBaseMapper.run(SplunkBaseMapper.java:164)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:796)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

DirectBuffer grows larger than available space.

When using this decompressor while decompressing the map outputs I run into problems with large files and it looks like the DirectBuffer grows too large. I have tried changing the maximum direct buffer size but it seems to require too large of a size to be practical ( -XX:MaxDirectMemorySize=2g didn't work and I stopped there). The following exception is thrown:

2015-01-21 16:54:39,137 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: Error while doing final merge 
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:160)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
    at com.hadoop.compression.fourmc.Lz4Decompressor.<init>(Lz4Decompressor.java:101)
    at com.hadoop.compression.fourmc.Lz4MediumCodec.createDecompressor(Lz4MediumCodec.java:156)
    at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:178)
    at org.apache.hadoop.mapred.IFile$Reader.<init>(IFile.java:345)
    at org.apache.hadoop.mapred.Merger$Segment.init(Merger.java:302)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:634)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:191)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:796)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.close(MergeManagerImpl.java:363)
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:158)
    ... 6 more

I am not sure if there is something that can be done on decompressor side to fix this or if I am just not setting the options correctly. Any help would be appreciated.

Update lz4 and zstd version?

Hi, I was looking for some backported codecs to hadoop 2.7 and your amazing work just did what I was looking for.

I noticed project versions are:
zstd: 1.0.1 (4 years old)
lz4: 1.3.0 (6 years old)

Would it please be possible to upgrade them?

I would help, but I am no expert in JNI.

Warning in pyspark: NullPointerException in RecordReader.close()

I compressed a file (~30mb, just for testing) using the 4mc tool:

$ 4mc data.txt 
Compressed filename will be : data.txt.4mc 
Compression: LZ4
Compressed (fast) 30288541 bytes into 2865501 bytes ==> 9.46% (Ratio=10.570)   

Then I tried to open the compressed file in (py)spark:

$ pyspark --master local[1]
>>> sc.textFile('file:///data.txt.4mc').count()
17/01/24 09:28:04 WARN org.apache.spark.rdd.HadoopRDD: Exception in RecordReader.close()
java.lang.NullPointerException
        at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
        at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:224)
        at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:288)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:276)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1953)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
4634

I got the result, but it warned me about a null pointer exception. Thought you guys might want to know!

physical memory issues on EMR

@carlomedas maybe you can help me here ...

When I'm running my EMR mapreduce and want the output compressed with 4mz I get multiple errors like:
... is running beyond physical memory limits. Current usage: 4.6 GB of 4 GB physical memory used; 6.3 GB of 20 GB virtual memory used. Killing container. ...

I tried increasing the memory limit and I still get these errors (only later on the reducer processing).

Do you have an idea why the moment I started compressing with 4mz I started getting these errors? (When I compressed with lzo or bz I wasn't getting it)

Thanks!

TextInputFormat with EMR Streaming

Hey,

@carlomedas

Since EMR Streaming is using the old format of FileInputFormat class (required the old mapred package name), we can't find a way to read the compressed files within the EMR Streaming steps.

Is there a wrapper to FourMzTextInputFormat using the older api?

how to use 4mc tool for linux fs batch processing and logging

I'm having difficulty using the command-line tool (Linux) to process a directory of uncompressed text files into a directory of compressed files, with stdout redirected to a log file.

4mc [input] seems fine with wildcards; however, I cannot get wildcards working with [output] names, and I cannot get the stdout to log to a file.

4mc -vz2 ./*.txt ./* >> log.txt

It is good to have a command-line tool for testing, but it seems very limited for batch workloads on the local filesystem before uploading to HDFS. I was expecting bash- or gzip-style basic input/output/log operations to work with a similar command syntax. If these operations are supported, could some documentation be added to describe the syntax, ideally in the -h help?

Issue creating output stream

I am using ZstdCodec as follows:
given an existing (uncompressed) outputStream:
new ZstdCodec().createOutputStream(outputStream);

However, this line runs into:
cannot access org.apache.hadoop.conf.Configurable
class file for org.apache.hadoop.conf.Configurable not found
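
For what it's worth, that compiler error usually means hadoop-common is missing from the compile classpath rather than anything 4mc-specific: the codec implements Hadoop's Configurable, so that class must be resolvable at compile time. A hedged sketch of standalone usage, assuming hadoop-common is on the classpath and using the pre-3.x com.hadoop.compression.fourmc package named in this issue (the output filename is arbitrary):

import java.io.FileOutputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import com.hadoop.compression.fourmc.ZstdCodec;

public class ZstdCodecWriteSketch {
    public static void main(String[] args) throws Exception {
        ZstdCodec codec = new ZstdCodec();
        // The codec is Configurable (hence the compile error above when
        // hadoop-common is absent); give it a Configuration before use.
        codec.setConf(new Configuration());

        try (OutputStream raw = new FileOutputStream("example.out");
             CompressionOutputStream out = codec.createOutputStream(raw)) {
            out.write("hello zstd".getBytes("UTF-8"));
            out.finish(); // flush the final compressed block
        }
    }
}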

Create DataSet using SparkSession

Hi,
We have 4mc format files in our Hadoop cluster. We are trying to read these files and create a DataSet directly (instead of creating an RDD and then a DataSet) in Spark 2.0. Can you please help us do this?

Decompressing from stdin

4mc is able to compress from stdin, like this:
cat filename | ./4mc
Maybe I have just not specified the necessary flag, but I have not figured out how to decompress from stdin:
cat filename | ./4mc -d returns the help text.
It would be very convenient, and I hope we will see this feature soon.

How to use 4mc in Hive?

I want to create a Hive table on a compressed file. When reading the file, it will be given to multiple mappers rather than just one only if the compressed file is splittable.
How do I do this with 4mc? Is the only option to change the MapReduce API to the old one, which is what Hive supports?

Request: upload maven artifact for java library

I would like to use the hadoop-4mc library in a Gradle project, but there is no artifact in Maven Central or any other repository. In addition, jitpack.io cannot grab the information to auto-generate an artifact, because there is no build file (pom.xml or build.gradle) at the top level, since this is a mixed repository (see https://jitpack.io/#carlomedas/4mc logs).

Would it be possible to upload an artifact for every release? Thank you in advance!

4mc codecs should implement SplittableCompressionCodec

The implementation of Codec and InputFormat seems to follow the pattern from ElephantBird. However, this isn't a good pattern in my opinion. In the spirit of Hadoop, the concepts of compression and file format should be decoupled. We should be able to change compression formats without needing to change the way those files are read.

Currently, if we change the compression from e.g. gz to 4mc, we need to change the InputFormat that is used to read the files, and we wouldn't be able to change the compression again. To do this gracefully, we would need to code defensively and dynamically change the InputFormats based on what files are in the input location. I don't think this strategy would work if you have a directory that has files that have been compressed with different formats.

In order to support this type of flexibility, the 4mc codecs should implement the SplittableCompressionCodec interface. This provides existing formats the ability to gracefully handle the new compression formats.
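
For reference, the interface the issue asks the 4mc codecs to implement is part of hadoop-common; abridged, it looks roughly like the sketch below (the BYBLOCK read mode is what lets a reader start at an arbitrary split boundary):

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;

// Abridged restatement of org.apache.hadoop.io.compress.SplittableCompressionCodec,
// shown only to illustrate what implementing it would entail for the 4mc codecs.
public interface SplittableCompressionCodecSketch extends CompressionCodec {

    // BYBLOCK allows a reader to start at an arbitrary offset and sync forward
    // to the next compressed block, which is what makes splitting possible.
    enum READ_MODE { CONTINUOUS, BYBLOCK }

    SplitCompressionInputStream createInputStream(InputStream seekableIn,
            Decompressor decompressor, long start, long end, READ_MODE readMode)
            throws IOException;
}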

Can't load AMD 64-bit .so on a AARCH64-bit platform

I am trying to use 4mc compression on AWS Graviton machines, which are powered by the aarch64 platform.

I tried to build the native libraries but ran into some issues, so I am wondering if there are any plans to support the aarch64 platform.

PySpark Example doesn't work

I tried the example from the doc.
I ran pyspark with the flag --jars hadoop-4mc-2.0.0.jar and got this exception:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.io.IOException: Codec for file file:/media/netanel/Windows/Documents and Settings/Netanel/Downloads/people_json.4mc not found, cannot run
	at com.hadoop.mapreduce.FourMcLineRecordReader.initialize(FourMcLineRecordReader.java:134)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:182)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:179)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:134)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
	at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203)
	at org.apache.spark.api.python.PythonRDD$.newAPIHadoopFile(PythonRDD.scala:561)
	at org.apache.spark.api.python.PythonRDD.newAPIHadoopFile(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Codec for file file:/media/netanel/Windows/Documents and Settings/Netanel/Downloads/people_json.4mc not found, cannot run
	at com.hadoop.mapreduce.FourMcLineRecordReader.initialize(FourMcLineRecordReader.java:134)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:182)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:179)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:134)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

Can not specify output file/folder name using command line tool (darwin)

These commands are executed back to back on the same input file; it only works when I do not specify any output file at all.

UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz /tmp/delete/split_0000.json
Unrecognized header : not a 4mc file
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz /tmp/delete/split_0000
Unrecognized header : not a 4mc file
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz /tmp/delete/split_0000.4mc
Unrecognized header : not a 4mc file
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz /tmp/delete
Warning : /tmp/delete already exists
Overwrite ? (Y/N) : y
Cannot open output file: /tmp/delete
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz
Decoding file split_0000 
Compression: ZSTD
Successfully decoded 32517745 bytes                                            
UK-HAV-M4NAG8WN:zstd-fast yucelm01$

edit:
it also fails with the same error if there is a dot (.) anywhere in the filename, for example:
4mc -d split_0000.json.4mz

merge to hadoop?

I'm surprised it's not yet part of the Apache Hadoop project :)
LZO is a pain to index, plus it has some licensing issues.
Great project.

hadoop-4mc with AWS EMR?

Hey Carlo!

Great work on this repository; I'm very excited about the potential.

I'm spinning up some AWS EMR clusters for a production workflow and I'm hoping to incorporate your compression. I'm curious about whether or not there would be an easy way to configure AWS EMR to pull in the hadoop-4mc library when it's started, since it'll be a bit of a pain to go in after the fact and install the library across the cluster.

Do you have any advice or suggestions for how to implement hadoop-4mc on an AWS EMR cluster, and if so could you add it to your documentation?

Cheers and thanks in advance!

Fix Typo in FourMzTextInputFormat

Hi, there's a typo in FourMzTextInputFormat where it extends from the wrong class: Mc should be Mz.

#27 fixes this problem, but for some reason it hasn't been merged since October. There's also the issue of the wrong extends:

  • FourMcTextInputFormat should extend from FourMcInputFormat<LongWritable, Text> instead of just FourMcInputFormat.
  • Same thing applies to FourMzTextInputFormat.

very curious about the design principles

I'm using the LZO format in Hadoop, and I need to create and read an index file to support splitting, which can cause some bizarre problems.
I have looked at the formats of lz4 and lzo and found they are very similar, but I don't have a clue how you make lz4 splittable.
Could you give me a short brief on how you did it?
Thank you.

how to use 4mc/4mz when write to json or parquet

I am able to load json files compressed by 4mz/c into spark. But writing does not work. Is this expected?

In [53]: df.write.mode('overwrite').option("codec","com.hadoop.compression.fourmc.ZstdCodec").csv('foo')

IllegalArgumentException: 'Codec [com.hadoop.compression.fourmc.ZstdCodec] is not available. Known codecs are bzip2, deflate, uncompressed, lz4, gzip, snappy, none.'

Can't make it work on HDP 2.4.0.

Hello, I'm trying to use the 4mc codecs. I copied the .so libraries and the .jar file to ../lib and /lib/native on all nodes. I also set core-site.xml on all nodes, see below. Other codecs residing in these paths work well, except fourmc. Is there something I forgot to do? It looks like Hadoop cannot find where the classes are located. Running HDP 2.4.0.

I did not compile anything myself, I just downloaded the .so and .jar files. Excuse my naivety, I am new to this.

Any help appreciated. Many thanks!

cp 4mc-master/java/hadoop-4mc/src/main/resources/com/hadoop/compression/fourmc/linux/amd64/libhadoop-4mc.so /usr/hdp/2.4.0.0-169/hadoop/lib/native
cp 4mc/hadoop-4mc-2.0.0.jar /usr/hdp/2.4.0.0-169/hadoop/lib

<name>io.compression.codecs</name> <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,com.hadoop.compression.fourmc.Lz4Codec,com.hadoop.compression.fourmc.Lz4MediumCodec,com.hadoop.compression.fourmc.Lz4HighCodec,com.hadoop.compression.fourmc.Lz4UltraCodec,com.hadoop.compression.fourmc.FourMcCodec,com.hadoop.compression.fourmc.FourMcMediumCodec,com.hadoop.compression.fourmc.FourMcHighCodec,com.hadoop.compression.fourmc.FourMcUltraCodec,com.hadoop.compression.fourmc.FourMzCodec,com.hadoop.compression.fourmc.FourMzMediumCodec,com.hadoop.compression.fourmc.FourMzHighCodec,com.hadoop.compression.fourmc.FourMzUltraCodec</value> </property>

When trying to use the codec within the sqoop tool, I see the following output:

17/03/27 15:41:58 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
17/03/27 15:41:58 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 0d76fa3d79ee46cc5bc1f2bdad607fb18978826d]
17/03/27 15:41:58 INFO fourmc.FourMcNativeCodeLoader: hadoop-4mc: loaded native library (embedded)
17/03/27 15:41:58 INFO fourmc.Lz4Codec: Successfully loaded & initialized native-4mc library
17/03/27 15:41:58 INFO fourmc.Lz4MediumCodec: Successfully loaded & initialized native-4mc library
17/03/27 15:41:58 INFO fourmc.Lz4HighCodec: Successfully loaded & initialized native-4mc library
17/03/27 15:41:58 INFO fourmc.Lz4UltraCodec: Successfully loaded & initialized native-4mc library
17/03/27 15:41:58 INFO fourmc.ZstdCodec: Successfully loaded & initialized native-4mc library

Error: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.fourmc.Lz4Codec was not found.
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCompressorClass(FileOutputFormat.java:122)
at org.apache.sqoop.mapreduce.RawKeyTextOutputFormat.getRecordWriter(RawKeyTextOutputFormat.java:89)
at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.(MapTask.java:647)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.fourmc.Lz4Codec not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCompressorClass(FileOutputFormat.java:119)
... 9 more

Jan

Consider changing license to Apache?

Great project !

Would you consider changing license to Apache?

There are some many use cases in Big Data ecosystem where most of the projects are Apache license based.

Thank you.

4mz - Zstd - cannot load csv.4mz into Spark (4mc works correctly)

Hello,

First of all, thank you for your hard work delivering this solution - it is just fantastic.
Second - sorry for posting in the issues, but I did not find any other place to do so.
The issue I have is with loading a properly compressed 4mz file (by properly compressed I mean one that gets properly decompressed by your 4mc.exe application, which gives me confidence that I have created a proper file) into a Spark DataFrame.

The setup is Spark 2.4.3 for Hadoop 2.7.
The latest 4mc version (2.0.0), with LD_LIBRARY_PATH set to the proper folder containing the Hadoop native libraries.

The code (Scala):

import com.hadoop.mapreduce.FourMcTextInputFormat
import com.hadoop.mapreduce.FourMzTextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import spark.implicits._

var conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "\r\n")
conf.set("io.compression.codecs","org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.fourmc.Lz4Codec,com.hadoop.compression.fourmc.Lz4MediumCodec,com.hadoop.compression.fourmc.Lz4HighCodec,com.hadoop.compression.fourmc.Lz4UltraCodec,com.hadoop.compression.fourmc.FourMcCodec,com.hadoop.compression.fourmc.FourMcMediumCodec,com.hadoop.compression.fourmc.FourMcHighCodec,com.hadoop.compression.fourmc.FourMcUltraCodec,com.hadoop.compression.fourmc.FourMzCodec,com.hadoop.compression.fourmc.FourMzMediumCodec,com.hadoop.compression.fourmc.FourMzHighCodec,com.hadoop.compression.fourmc.FourMzUltraCodec")

var rdd = sc.newAPIHadoopFile("file:/home/me/part-m-20190605121832.4mz", classOf[FourMzTextInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[LongWritable, Text]]), classOf[LongWritable],classOf[Text], conf);
val dset = rdd.map(_._2.toString).toDS
val frame = spark.read.option("header", false).option("inferSchema",true).option("delimiter","\u0001").csv(dset)
frame.printSchema()
frame.show()

Every time, the rdd is empty, so the frame is empty as well. No matter what I do it always ends up empty, with no exception raised and no debug/warning messages (when I set the log level to DEBUG I get lots of messages from Spark but none from 4mc saying anything is wrong). Would you be so kind as to suggest what I am doing wrong, or which external references (native libraries or something else?) I have set up incorrectly?

The code for 4mc compression works flawlessly and is almost the same (a different input file of course and a different TextInputFormat):

import com.hadoop.mapreduce.FourMcTextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import spark.implicits._

var conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "\r\n")
conf.set("io.compression.codecs","org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.fourmc.Lz4Codec,com.hadoop.compression.fourmc.Lz4MediumCodec,com.hadoop.compression.fourmc.Lz4HighCodec,com.hadoop.compression.fourmc.Lz4UltraCodec,com.hadoop.compression.fourmc.FourMcCodec,com.hadoop.compression.fourmc.FourMcMediumCodec,com.hadoop.compression.fourmc.FourMcHighCodec,com.hadoop.compression.fourmc.FourMcUltraCodec,com.hadoop.compression.fourmc.FourMzCodec,com.hadoop.compression.fourmc.FourMzMediumCodec,com.hadoop.compression.fourmc.FourMzHighCodec,com.hadoop.compression.fourmc.FourMzUltraCodec")

var rdd = sc.newAPIHadoopFile("file:/home/me/part-m-20190605121825.4mc", classOf[FourMcTextInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[LongWritable, Text]]), classOf[LongWritable],classOf[Text], conf)
val dset = rdd.map(_._2.toString).toDS
val frame = spark.read.option("header", false).option("inferSchema",true).option("delimiter","\u0001").csv(dset)
frame.printSchema()
frame.show()

Thank you very much in advance.
Best regards.

AVRO support

Hi
Couple of doubts

  1. How do I convert a text file to the 4mc file format on Linux so that it can be used as input to my MapReduce program?
  2. Can 4mc compression be used on the AVRO file format?

Thanks

Kind Regards
Akhil Dogra

BUG: NPE when closing SequenceFile.Writer

version: 2.0.0
os: centos x64
hadoop: 2.7.1

scenario: Simply generate a compressed sequence file with FourMzUltraCodec.

Exception:

2017-06-16 22:32:48 [INFO ](c.h.c.f.FourMcNativeCodeLoader     :142) hadoop-4mc: loaded native library (embedded)
2017-06-16 22:32:48 [INFO ](c.h.c.f.ZstdCodec                  :84 ) Successfully loaded & initialized native-4mc library
2017-06-16 22:32:49 [INFO ](o.a.h.i.c.CodecPool                :153) Got brand-new compressor [.4mz]
Exception in thread "main" java.lang.NullPointerException
	at com.hadoop.compression.fourmc.ZstdCompressor.reset(ZstdCompressor.java:267)
	at org.apache.hadoop.io.compress.CodecPool.returnCompressor(CodecPool.java:204)
	at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1275)
	at org.apache.hadoop.io.SequenceFile$BlockCompressWriter.close(SequenceFile.java:1504)
	at toolbox.analyzer2.TryFourMzUltra.run(TryFourMzUltra.java:44)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at toolbox.analyzer2.TryFourMzUltra.main(TryFourMzUltra.java:23)

my code

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lisn
 */
public class TryFourMzUltra  extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),  new TryFourMzUltra(), args);
    }

    @Override
    public int run(String[] args) throws Exception {

        CompressionCodec codec = new com.hadoop.compression.fourmc.FourMzUltraCodec();

        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);

        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path("/tmp/testFourMzUltra")),
                SequenceFile.Writer.keyClass(LongWritable.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(
                        SequenceFile.CompressionType.BLOCK,
                        codec
                ));
        writer.append(new LongWritable(1), new Text("12341234"));
        writer.close();  // exception is raised from here

        return 0;
    }
}

Commit 17c36e08653162f6956ef27f55c96c20425df4b8 breaks unit tests

testZstCodec(com.hadoop.compression.fourmc.TestFourMcCodec)  Time elapsed: 0.034 sec  <<< ERROR!
java.lang.UnsatisfiedLinkError: com.hadoop.compression.fourmc.zstd.Zstd.cStreamInSize()J
        at com.hadoop.compression.fourmc.zstd.Zstd.cStreamInSize(Native Method)
        at com.hadoop.compression.fourmc.zstd.ZstdStreamCompressor.<clinit>(ZstdStreamCompressor.java:66)
        at com.hadoop.compression.fourmc.ZstCodec.<clinit>(ZstCodec.java:69)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)

$ rpm -qa | grep zstd
libzstd-1.3.1-1.el7.x86_64
zstd-1.3.1-1.el7.x86_64

FourMcOutputStream.compress doesn't handle incompressible data correctly

Suppose input is a 4MB buffer and is incompressible. The corresponding compressed size would be lz4_compressBound(4MB) > 4MB.

FourMcOutputStream.compress() will trigger the Lz4Compressor to compress the input, which returns 4MB in

        int len = compressor.compress(buffer, 0, buffer.length);

FourMcOutputStream detects that it should use the uncompressed data directly. After fetching the raw uncompressed data, there is still compressed data left in compressor.compressedDirectBuf, so compressor.finished() returns false. This triggers another call of FourMcOutputStream.compress() from FourMcOutputStream.finish(), which tries to fetch the uncompressed data again and results in a BufferUnderflowException.

    @Override
    public void finish() throws IOException {
        if (!compressor.finished()) {
            compressor.finish();
            while (!compressor.finished()) {
                compress();
            }
        }
    }

A quick fix to this problem would be allocating a big enough buffer when creating FourMcOutputStream. When the buffer is big enough to hold all the compressed data, line 196

        int len = compressor.compress(buffer, 0, buffer.length);

in FourMcOutputStream will transfer all the data in compressedDirectBuf into the buffer, so compressor.finished() returns true.

The fix would be as follows in FourCodec.java:

    @Override
    public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException {
        int bufferPlusOverhead = Lz4Compressor.compressBound(FOURMC_MAX_BLOCK_SIZE);
        return new FourMcOutputStream(out, compressor, bufferPlusOverhead);
    }

If you believe this fix is OK to accept, I can send a PR later. However, I do think there should be a better way to solve this problem, as we really don't need a buffer larger than 4MB.

why is FourMcTextInputFormat not an InputFormat?

So I try to run a streaming job with a .4mc compressed input like

> hadoop jar $hadoop_streaming_jar -libjars $hadoop_4mc_jar -input txt.4mc -inputformat com.hadoop.mapreduce.FourMcTextInputFormat -output /test/out -mapper mapper.sh -reducer reducer.sh

and get an error:

Exception in thread "main" java.lang.RuntimeException: class com.hadoop.mapreduce.FourMcTextInputFormat not org.apache.hadoop.mapred.InputFormat

But

  [4mc-2.2.0]$ grep extends java/hadoop-4mc/src/main/java/com/hadoop/mapreduce/FourMcInputFormat.java
   public abstract class FourMcInputFormat<K, V> extends FileInputFormat<K, V> {

which as per the API doc in turn extends InputFormat:

  org.apache.hadoop.mapred
  Class FileInputFormat<K,V>
  java.lang.Object
  org.apache.hadoop.mapred.FileInputFormat<K,V>
  All Implemented Interfaces:
  InputFormat<K,V>

So why does this not work? Is this not supposed to work?

Ideas to improve the handling of direct buffers

This is a continuation of the discussion which started on a LZ4 issue.

To summarize, I proposed:

  1. dispensing with the direct buffer on the input side because the copying from byte[] to natively accessible memory is inevitable either way;
  2. using direct, but non-public, JDK API call to free the direct buffer's memory.

@carlomedas stated that the second approach still suffered from OOMEs. The main point with this approach is that there must be a clear demarcation of the direct buffer's lifecycle. Usually a job of some kind is started, causing the allocation of the direct buffer, and later the job is ended, providing the opportunity to clean up. If this aspect is missing and the only cleanup mechanism is the finalizer method, then this approach will be of no use. On the other hand, invoking cleaner.clean() will, to the best of my knowledge, unconditionally free the native memory block.
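
To make the second proposal concrete, the non-public cleanup hook mentioned above looks roughly like the sketch below on Java 8 (JDK 9+ hides it behind sun.misc.Unsafe.invokeCleaner); this is only an illustration of the idea, not code taken from the library:

import java.nio.ByteBuffer;

public class DirectBufferFreeSketch {

    // Frees a direct buffer's native memory immediately instead of waiting for
    // GC/finalization. Java 8 only: relies on the non-public
    // sun.nio.ch.DirectBuffer / sun.misc.Cleaner API mentioned in the discussion.
    static void free(ByteBuffer buffer) {
        if (buffer == null || !buffer.isDirect()) {
            return;
        }
        sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
        if (cleaner != null) {
            cleaner.clean(); // unconditionally releases the native memory block
        }
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocateDirect(4 * 1024 * 1024);
        // ... use the buffer as a compression/decompression scratch area ...
        free(buf); // explicit end-of-job cleanup, the demarcation point discussed above
    }
}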

4mc files not splitting

I am running Hadoop 2.4.1, and I run my jobs through mrjob (if that matters). When I run against an uncompressed file, splits happen and I automatically get more mappers than files. However, when I run against *.4mc files no splitting occurs. Running hadoop fs -text file.4mc works, so I know decompression is fine, and running a job against *.4mc files works too - it's just that no splitting occurs.

One other thing I noticed: if I use the files with the .lz4_uc extension, running hadoop fs -text file.lz4_uc gives the following error:

15/01/23 01:47:58 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
Exception in thread "main" java.lang.InternalError: LZ4_decompress_safe returned: -2

I am not sure if that's related or not.
