fingltd / 4mc
4mc - splittable lz4 and zstd in hadoop/spark/flink
License: Other
These commands were executed back to back on the same input file; decompression only works when I do not specify any output file at all:
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz /tmp/delete/split_0000.json
Unrecognized header : not a 4mc file
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz /tmp/delete/split_0000
Unrecognized header : not a 4mc file
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz /tmp/delete/split_0000.4mc
Unrecognized header : not a 4mc file
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz /tmp/delete
Warning : /tmp/delete already exists
Overwrite ? (Y/N) : y
Cannot open output file: /tmp/delete
UK-HAV-M4NAG8WN:zstd-fast yucelm01$ 4mc -d split_0000.4mz
Decoding file split_0000
Compression: ZSTD
Successfully decoded 32517745 bytes
UK-HAV-M4NAG8WN:zstd-fast yucelm01$
edit:
It also fails with the same error if there is a dot (.) anywhere in the filename, for example:
4mc -d split_0000.json.4mz
Hello,
I'm trying to use hadoop-4mc with my AWS EMR cluster and am using a product called Hunk to interface with the cluster. Whenever I run a search job in Hunk, the results for the first file are returned fine, but at the end of reading the first file I get a NullPointerException with the stack trace shown below.
Any idea what might be causing this? Let me know if you need any additional information.
(This is with Hadoop 2.7.2-amzn-3 and hadoop-4mc-1.4.0.)
2016-08-13 15:38:38,547 FATAL [IPC Server handler 44 on 44360] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1470333745198_0062_m_000000_1 - exited : java.lang.NullPointerException
at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:224)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:235)
at com.splunk.mr.input.SplunkLineRecordReader.close(SplunkLineRecordReader.java:21)
at com.splunk.mr.SplunkBaseMapper$RecReader.close(SplunkBaseMapper.java:246)
at com.splunk.mr.SplunkBaseMapper.runImpl(SplunkBaseMapper.java:305)
at com.splunk.mr.SplunkSearchMapper.runImpl(SplunkSearchMapper.java:419)
at com.splunk.mr.SplunkBaseMapper.run(SplunkBaseMapper.java:164)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:796)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Not sure if this is the right place to report such issues. I just installed hadoop-4mc-2.1.0 on my Hadoop 2.7.7 cluster, but I get a null pointer exception when it is apparently trying to decompress the input for a streaming job. The same job with the uncompressed input foo.txt runs fine.
packageJobJar: [/tmp/hadoop-unjar2959120489988360905/] [] /tmp/streamjob6915837012525200457.jar tmpDir=null
19/08/20 17:46:36 INFO impl.TimelineClientImpl: Timeline service address: ...
19/08/20 17:46:36 INFO client.RMProxy: Connecting to ResourceManager at ...
19/08/20 17:46:36 INFO client.AHSProxy: Connecting to Application History server at ...
19/08/20 17:46:36 INFO impl.TimelineClientImpl: Timeline service address: ...
19/08/20 17:46:36 INFO client.RMProxy: Connecting to ResourceManager at ...
19/08/20 17:46:36 INFO client.AHSProxy: Connecting to Application History server at ...
19/08/20 17:46:37 INFO fourmc.FourMcNativeCodeLoader: hadoop-4mc: loaded native library (embedded)
19/08/20 17:46:37 INFO fourmc.Lz4Codec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.ZstdCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4HighCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4MediumCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4UltraCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO mapred.FileInputFormat: Total input paths to process : 1
19/08/20 17:46:37 INFO mapreduce.JobSubmitter: number of splits:1
19/08/20 17:46:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1565620832498_0009
19/08/20 17:46:38 INFO impl.YarnClientImpl: Submitted application application_1565620832498_0009
19/08/20 17:46:38 INFO mapreduce.Job: The url to track the job: ...
19/08/20 17:46:38 INFO mapreduce.Job: Running job: job_1565620832498_0009
19/08/20 17:46:44 INFO mapreduce.Job: Job job_1565620832498_0009 running in uber mode : false
19/08/20 17:46:44 INFO mapreduce.Job: map 0% reduce 0%
19/08/20 17:47:03 INFO mapreduce.Job: map 67% reduce 0%
19/08/20 17:47:04 INFO mapreduce.Job: Task Id : attempt_1565620832498_0009_m_000000_0, Status : FAILED
Error: java.lang.NullPointerException
at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:230)
at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:288)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:210)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1979)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:468)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
4mc is able to compress from stdin, like this:
cat filename | ./4mc
Maybe I have just not specified the necessary flag, but I have not figured out how to decompress from stdin:
cat filename | ./4mc -d
just returns the help text.
It would be very convenient, and I hope we will see this feature soon.
Hello, I am trying to use the 4mc codecs. I copied the .so libraries and the .jar file to ../lib and /lib/native on all nodes, and I also set core-site.xml on all nodes (see below). Other codecs residing in these paths work well, but fourmc does not. Is there something I forgot to do? It looks like Hadoop cannot find where the classes are located. Running HDP 2.4.0.
I did not compile it myself, I just downloaded the .so and .jar files. Excuse my naivety, I am new to this.
Any help appreciated. Many thanks!
cp 4mc-master/java/hadoop-4mc/src/main/resources/com/hadoop/compression/fourmc/linux/amd64/libhadoop-4mc.so /usr/hdp/2.4.0.0-169/hadoop/lib/native
cp 4mc/hadoop-4mc-2.0.0.jar /usr/hdp/2.4.0.0-169/hadoop/lib
<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,com.hadoop.compression.fourmc.Lz4Codec,com.hadoop.compression.fourmc.Lz4MediumCodec,com.hadoop.compression.fourmc.Lz4HighCodec,com.hadoop.compression.fourmc.Lz4UltraCodec,com.hadoop.compression.fourmc.FourMcCodec,com.hadoop.compression.fourmc.FourMcMediumCodec,com.hadoop.compression.fourmc.FourMcHighCodec,com.hadoop.compression.fourmc.FourMcUltraCodec,com.hadoop.compression.fourmc.FourMzCodec,com.hadoop.compression.fourmc.FourMzMediumCodec,com.hadoop.compression.fourmc.FourMzHighCodec,com.hadoop.compression.fourmc.FourMzUltraCodec</value>
</property>
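When the codec classes still cannot be found despite a correct core-site.xml, the jar is usually missing from Hadoop's classpath. A minimal sketch, assuming the HDP 2.4 paths used above (adjust for your layout, e.g. in hadoop-env.sh on every node):

```shell
# Assumption: paths match the HDP 2.4.0.0-169 layout described above.
# Make the 4mc jar visible to Hadoop daemons and clients:
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:/usr/hdp/2.4.0.0-169/hadoop/lib/hadoop-4mc-2.0.0.jar"
# Native library search path for the JNI .so:
export JAVA_LIBRARY_PATH="${JAVA_LIBRARY_PATH}:/usr/hdp/2.4.0.0-169/hadoop/lib/native"
```

Note that hadoop-4mc also embeds its native library inside the jar, so the classpath entry alone is often enough.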
When trying to use codec within sqoop tool I see following output
17/03/27 15:41:58 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
17/03/27 15:41:58 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 0d76fa3d79ee46cc5bc1f2bdad607fb18978826d]
17/03/27 15:41:58 INFO fourmc.FourMcNativeCodeLoader: hadoop-4mc: loaded native library (embedded)
17/03/27 15:41:58 INFO fourmc.Lz4Codec: Successfully loaded & initialized native-4mc library
17/03/27 15:41:58 INFO fourmc.Lz4MediumCodec: Successfully loaded & initialized native-4mc library
17/03/27 15:41:58 INFO fourmc.Lz4HighCodec: Successfully loaded & initialized native-4mc library
17/03/27 15:41:58 INFO fourmc.Lz4UltraCodec: Successfully loaded & initialized native-4mc library
17/03/27 15:41:58 INFO fourmc.ZstdCodec: Successfully loaded & initialized native-4mc library
Error: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.fourmc.Lz4Codec was not found.
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCompressorClass(FileOutputFormat.java:122)
at org.apache.sqoop.mapreduce.RawKeyTextOutputFormat.getRecordWriter(RawKeyTextOutputFormat.java:89)
at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.<init>(MapTask.java:647)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.fourmc.Lz4Codec not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCompressorClass(FileOutputFormat.java:119)
... 9 more
Jan
I would like to use the hadoop-4mc library in a gradle project, but there is no artifact in Maven Central or any other repository. In addition, jitpack.io cannot grab the information to auto-generate an artifact, because there is no build file (pom.xml or build.gradle) at the top level of this mixed repository (see https://jitpack.io/#carlomedas/4mc logs).
Would it be possible to upload an artifact for every release? Thank you in advance!
I want to create a Hive table on a compressed file. When reading this file, Hadoop will hand it to multiple mappers rather than just one only if the compressed file is splittable.
How can I do this with 4mc? Is the only way to change the mapreduce API to the old one that Hive supports?
I compressed a file (~30mb, just for testing) using the 4mc tool:
$ 4mc data.txt
Compressed filename will be : data.txt.4mc
Compression: LZ4
Compressed (fast) 30288541 bytes into 2865501 bytes ==> 9.46% (Ratio=10.570)
Then I tried to open the compressed file in (py)spark:
$ pyspark --master local[1]
>>> sc.textFile('file:///data.txt.4mc').count()
17/01/24 09:28:04 WARN org.apache.spark.rdd.HadoopRDD: Exception in RecordReader.close()
java.lang.NullPointerException
at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:224)
at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:288)
at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:276)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1953)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
4634
I got the result, but it warned me about a null pointer exception. Thought you guys might want to know!
This is a continuation of the discussion which started on a LZ4 issue.
To summarize, I proposed two approaches (copying from a byte[] to natively accessible memory is inevitable either way); @carlomedas stated that the second approach still suffered from OOMEs. The main point with this approach is that there must be a clear demarcation of the direct buffer's lifecycle. Usually a job of some kind is started, causing the allocation of the direct buffer, and later the job is ended, providing the opportunity to clean up. If this aspect is missing and the only cleanup mechanism is the finalizer method, then this approach will be of no use. On the other hand, invoking cleaner.clean() will, to the best of my knowledge, unconditionally free the native memory block.
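The lifecycle demarcation argued for here can be sketched without touching JDK internals: bind the direct buffer to an AutoCloseable scope so that end-of-life is deterministic (job end) rather than left to finalization. This is an illustrative sketch, not 4mc code; the class name and shape are my own.

```java
import java.nio.ByteBuffer;

// Illustrative sketch: a direct buffer whose lifetime is bound to a job scope.
// close() marks a deterministic end-of-life; after that no caller may touch
// the buffer, and dropping the only reference makes the native block
// reclaimable right away instead of at an arbitrary finalization time.
class DirectBufferScope implements AutoCloseable {
    private ByteBuffer buf;

    DirectBufferScope(int capacity) {
        this.buf = ByteBuffer.allocateDirect(capacity);
    }

    ByteBuffer buffer() {
        if (buf == null) throw new IllegalStateException("scope already closed");
        return buf;
    }

    boolean isClosed() {
        return buf == null;
    }

    @Override
    public void close() {
        buf = null; // demarcates the end of the buffer's lifecycle
    }
}
```

With try-with-resources, the allocation and release then bracket the job exactly as described above.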
Hey Carlo!
Great work on this repository; I'm very excited about the potential.
I'm spinning up some AWS EMR clusters for a production workflow and hoping to incorporate your compression. I'm curious whether there's an easy way to configure AWS EMR to pull in the hadoop-4mc library when a cluster starts, since it'll be a bit of a pain to go in after the fact and install the library across the cluster.
Do you have any advice or suggestions for how to implement hadoop-4mc on an AWS EMR cluster, and if so could you add it to your documentation?
Cheers and thanks in advance!
When using this decompressor while decompressing the map outputs, I run into problems with large files, and it looks like the DirectBuffer grows too large. I have tried changing the maximum direct buffer size, but it seems to require too large a size to be practical (-XX:MaxDirectMemorySize=2g didn't work and I stopped there). The following exception is thrown:
2015-01-21 16:54:39,137 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: Error while doing final merge
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:160)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
at com.hadoop.compression.fourmc.Lz4Decompressor.<init>(Lz4Decompressor.java:101)
at com.hadoop.compression.fourmc.Lz4MediumCodec.createDecompressor(Lz4MediumCodec.java:156)
at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:178)
at org.apache.hadoop.mapred.IFile$Reader.<init>(IFile.java:345)
at org.apache.hadoop.mapred.Merger$Segment.init(Merger.java:302)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:634)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:191)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:796)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.close(MergeManagerImpl.java:363)
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:158)
... 6 more
I am not sure if there is something that can be done on decompressor side to fix this or if I am just not setting the options correctly. Any help would be appreciated.
I've noticed that, to make the stream splittable, we have to read the footer section to determine the block indexes.
But in some contexts it is hard to retrieve the file length, so we cannot easily seek to a position relative to the end of the file.
In these contexts, an extra field in the HEADER indicating the footer offset would be handy.
Can this be solved?
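For context, the seek-from-end pattern that the request would eliminate looks roughly like the sketch below, assuming the file length is available. The field sizes here are illustrative assumptions, not the actual 4mc footer layout.

```java
import java.io.IOException;
import java.io.RandomAccessFile;

// Illustrative sketch of locating a trailing footer when the file length IS
// known: read a fixed-size trailer at EOF that records the total footer size,
// then compute where the footer starts. A footer-offset field in the header,
// as proposed above, would remove the need for the backwards seek entirely.
class FooterLocator {
    static final int TRAILER_BYTES = 4; // assumed: 4-byte footer size stored at EOF

    static long footerStart(RandomAccessFile f) throws IOException {
        long len = f.length();
        f.seek(len - TRAILER_BYTES);
        int footerSize = f.readInt(); // total footer size, trailer included
        return len - footerSize;
    }
}
```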
I am able to load json files compressed by 4mz/c into spark. But writing does not work. Is this expected?
In [53]: df.write.mode('overwrite').option("codec","com.hadoop.compression.fourmc.ZstdCodec").csv('foo')
IllegalArgumentException: 'Codec [com.hadoop.compression.fourmc.ZstdCodec] is not available. Known codecs are bzip2, deflate, uncompressed, lz4, gzip, snappy, none.'
I'm using the lzo format in Hadoop, and I need to create and read an index file to support splitting, which can cause some bizarre problems.
I have looked at the formats of lz4 and lzo and found they are very similar, but I don't have a clue how you made lz4 splittable.
Could you give me a brief explanation of how you did it?
Thank you.
I'm surprised it's not yet part of the Apache Hadoop project :)
LZO is a pain to index, and it has some licensing issues.
Great project.
Hello, I downloaded version 2.0.0, rebuilt it, and tried to use it in my project, and I got the following exception:
Exception in thread "Thread-1" java.lang.UnsatisfiedLinkError: C:\Users\User\AppData\Local\Temp\libhadoop-4mc4717395728020324256.dll: Can't load IA 32-bit .dll on a AMD 64-bit platform
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1937)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1822)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at com.hadoop.compression.fourmc.FourMcNativeCodeLoader.loadLibrary(FourMcNativeCodeLoader.java:140)
at com.hadoop.compression.fourmc.FourMcNativeCodeLoader.<clinit>(FourMcNativeCodeLoader.java:166)
I checked that the temporary dll file is byte-for-byte equal to win32/amd64/libhadoop-4mc.dll.
I am running Hadoop 2.4.1 and run my jobs through mrjob (if that matters). When I run against an uncompressed file, splits happen and I automatically get more mappers than files. However, when I run against *.4mc files, no splitting occurs. Running hadoop fs -text file.4mc works, so I know it decompresses okay, and running a job against *.4mc files works, just with no splitting.
One other thing I noticed: if I use the files with the .lz4_uc extension, hadoop fs -text file.lz4_uc gives the following error:
15/01/23 01:47:58 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
Exception in thread "main" java.lang.InternalError: LZ4_decompress_safe returned: -2
I am not sure if that's related or not.
I tried the example from the doc.
I ran pyspark with the flag --jars hadoop-4mc-2.0.0.jar and got this exception:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.io.IOException: Codec for file file:/media/netanel/Windows/Documents and Settings/Netanel/Downloads/people_json.4mc not found, cannot run
at com.hadoop.mapreduce.FourMcLineRecordReader.initialize(FourMcLineRecordReader.java:134)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:182)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:179)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:134)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopFile(PythonRDD.scala:561)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Codec for file file:/media/netanel/Windows/Documents and Settings/Netanel/Downloads/people_json.4mc not found, cannot run
at com.hadoop.mapreduce.FourMcLineRecordReader.initialize(FourMcLineRecordReader.java:134)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:182)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:179)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:134)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Great project !
Would you consider changing the license to Apache?
There are so many use cases in the Big Data ecosystem, where most projects are Apache licensed.
Thank you.
Hi
Couple of doubts
Thanks
Kind Regards
Akhil Dogra
Could you provide an example for Flink reading 4mc data from HDFS files?
I'm having difficulty using the command line tool (Linux) to process a directory of uncompressed text files into a directory of compressed files, with stdout redirected to a log file.
4mc [input] seems fine with wildcards; however, I cannot get wildcards working with [output] names, and cannot get the stdout logged to a file.
4mc -vz2 ./*.txt ./* >> log.txt
It is good to have a command line tool for testing, but it seems very limited for doing batch workloads on the local filesystem before uploading to HDFS. I was expecting bash- or gzip-style basic input/output/log operations to work with a similar command syntax. If these operations are supported, could some documentation be added describing the syntax, ideally in the -h help?
Suppose the input is a 4MB buffer and is incompressible. The corresponding compressed size would be lz4_compressBound(4MB) > 4MB.
FourMcOutputStream.compress() triggers the Lz4Compressor to compress the input, and returns 4MB in
int len = compressor.compress(buffer, 0, buffer.length);
FourMcOutputStream then detects that it should use the uncompressed data directly. After getting the raw uncompressed data, there is still compressed data left in compressor.compressedDirectBuf, so compressor.finished() returns false. This triggers another call of FourMcOutputStream.compress() in FourMcOutputStream.finish(), which tries to get the uncompressed data again and results in a BufferUnderflowException.
@Override
public void finish() throws IOException {
    if (!compressor.finished()) {
        compressor.finish();
        while (!compressor.finished()) {
            compress();
        }
    }
}
A quick fix would be to allocate a big enough buffer when creating the FourMcOutputStream. When the buffer is big enough to hold all the compressed data, line 196 in FourMcOutputStream,
int len = compressor.compress(buffer, 0, buffer.length);
will transfer all the data in compressedDirectBuf into buffer, and compressor.finished() becomes true.
The fix would be as follows in FourMcCodec.java:
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException {
    int bufferPlusOverhead = Lz4Compressor.compressBound(FOURMC_MAX_BLOCK_SIZE);
    return new FourMcOutputStream(out, compressor, bufferPlusOverhead);
}
If you believe this fix is acceptable, I will send a PR later. However, I do think there should be a better way to solve this, as we really don't need a buffer larger than 4MB.
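For reference, LZ4's documented worst case is LZ4_COMPRESSBOUND(isize) = isize + isize/255 + 16 (from lz4.h), so for a 4MB incompressible block the compressed output can exceed the input by roughly 16KB, which is exactly why a buffer sized to the input underflows:

```java
// Mirrors lz4.h's LZ4_COMPRESSBOUND macro: isize + isize/255 + 16.
// This worst-case bound explains why a buffer sized exactly to the input
// cannot hold the whole compressed result for incompressible data.
class Lz4Bound {
    static int compressBound(int isize) {
        return isize + isize / 255 + 16;
    }
}
```

For a 4MB block this gives 4,194,304 + 16,448 + 16 = 4,210,768 bytes, about 16KB of overhead.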
@carlomedas maybe you can help me here ...
When I'm running my EMR mapreduce job and want the output compressed with 4mz, I get multiple errors like:
... is running beyond physical memory limits. Current usage: 4.6 GB of 4 GB physical memory used; 6.3 GB of 20 GB virtual memory used. Killing container. ...
I tried increasing the memory limit and I still get these errors (only later, during the reducer processing).
Do you have any idea why I started getting these errors the moment I started compressing with 4mz? (When I compressed with lzo or bz I wasn't getting them.)
Thanks!
The native code in ./native compiles successfully with cmake, but fails with the plain Makefile. Running the make command in the native directory gives this error message:
/tmp/ccBlDcLt.o: In function `HUF_decompress4X2_usingDTable_internal_bmi2_asm':
huf_decompress.c:(.text+0x2fba): undefined reference to `HUF_decompress4X2_usingDTable_internal_bmi2_asm_loop'
/tmp/ccBlDcLt.o: In function `HUF_decompress4X1_usingDTable_internal_bmi2_asm':
huf_decompress.c:(.text+0x3e29): undefined reference to `HUF_decompress4X1_usingDTable_internal_bmi2_asm_loop'
/usr/bin/ld: libhadoop-4mc.so.2.0.0: hidden symbol `HUF_decompress4X1_usingDTable_internal_bmi2_asm_loop' isn't defined
/usr/bin/ld: final link failed: Bad value
collect2: error: ld returned 1 exit status
make: *** [libhadoop-4mc] Error 1
Hi,
We have 4mc format files in our Hadoop cluster. We are trying to read these files and create a DataSet (instead of creating an RDD and then a DataSet) in spark-2.0. Can you please help us do the same?
Hey,
Since EMR Streaming uses the old form of the FileInputFormat class (it requires the old mapred package name), we can't find a way to read the compressed files within the EMR Streaming steps.
Is there a wrapper for FourMzTextInputFormat using the older API?
Hi, there's a typo in FourMzTextInputFormat where it extends from the wrong class: Mc should be Mz.
#27 fixes this problem, but for some reason it hasn't been merged since October. There's also the issue of the wrong extends: FourMcTextInputFormat should extend FourMcInputFormat<LongWritable, Text> instead of just FourMcInputFormat, and likewise for FourMzTextInputFormat.
So I try to run a streaming job with a .4mc compressed input like
> hadoop jar $hadoop_streaming_jar -libjars $hadoop_4mc_jar -input txt.4mc -inputformat com.hadoop.mapreduce.FourMcTextInputFormat -output /test/out -mapper mapper.sh -reducer reducer.sh
and get an error:
Exception in thread "main" java.lang.RuntimeException: class com.hadoop.mapreduce.FourMcTextInputFormat not org.apache.hadoop.mapred.InputFormat
But
[4mc-2.2.0]$ grep extends java/hadoop-4mc/src/main/java/com/hadoop/mapreduce/FourMcInputFormat.java
public abstract class FourMcInputFormat<K, V> extends FileInputFormat<K, V> {
which as per the API doc in turn extends InputFormat:
org.apache.hadoop.mapred
Class FileInputFormat<K,V>
java.lang.Object
org.apache.hadoop.mapred.FileInputFormat<K,V>
All Implemented Interfaces:
InputFormat<K,V>
So why does this not work? Is this not supposed to work?
I am trying to use 4mc compression on AWS Graviton machines, which are powered by the aarch64 platform.
I tried to build the native libraries but ran into some issues, so I am wondering if there are any plans to support the aarch64 platform.
There doesn't seem to be a CI configured for the project.
Hi, I was looking for some backported codecs to hadoop 2.7 and your amazing work just did what I was looking for.
I noticed project versions are:
zstd: 1.0.1 (4 years old)
lz4: 1.3.0 (6 years old)
Would it please be possible to upgrade them?
I would help, but I am no expert in JNI.
The implementation of Codec and InputFormat seems to follow the pattern from Elephant Bird. However, this isn't a good pattern in my opinion. In the spirit of Hadoop, the concepts of compression and file format should be decoupled: we should be able to change compression formats without needing to change the way those files are read.
Currently, if we change the compression from e.g. gz to 4mc, we need to change the InputFormat that is used to read the files, and we wouldn't be able to change the compression again. To do this gracefully, we would need to code defensively and dynamically choose the InputFormat based on what files are in the input location, and I don't think that strategy would work for a directory containing files compressed with different formats.
In order to support this type of flexibility, the 4mc codecs should implement the SplittableCompressionCodec interface. This gives existing formats the ability to gracefully handle the new compression formats.
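For context, the shape of the interface being proposed looks roughly like the sketch below. This is a paraphrase, not a drop-in copy: the real interface lives in org.apache.hadoop.io.compress, extends CompressionCodec, and its createInputStream also takes a Decompressor argument.

```java
import java.io.IOException;
import java.io.InputStream;

// Paraphrased shape of Hadoop's SplittableCompressionCodec. A codec
// implementing this lets a generic input format open a compressed stream
// positioned at an arbitrary split boundary, removing the need for a
// codec-specific InputFormat.
interface SplittableCodecSketch {
    enum ReadMode { CONTINUOUS, BYBLOCK }

    // Returns a stream positioned at the first record boundary at or after
    // `start`, reading no further than `end`.
    InputStream createInputStream(InputStream seekable,
                                  long start, long end,
                                  ReadMode mode) throws IOException;
}
```

With this in place, the codec (chosen by file extension) carries the split logic, and the same TextInputFormat can read gz, bz2, and 4mc files side by side.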
testZstCodec(com.hadoop.compression.fourmc.TestFourMcCodec) Time elapsed: 0.034 sec <<< ERROR!
java.lang.UnsatisfiedLinkError: com.hadoop.compression.fourmc.zstd.Zstd.cStreamInSize()J
at com.hadoop.compression.fourmc.zstd.Zstd.cStreamInSize(Native Method)
at com.hadoop.compression.fourmc.zstd.ZstdStreamCompressor.<clinit>(ZstdStreamCompressor.java:66)
at com.hadoop.compression.fourmc.ZstCodec.<clinit>(ZstCodec.java:69)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
$ rpm -qa | grep zstd
libzstd-1.3.1-1.el7.x86_64
zstd-1.3.1-1.el7.x86_64
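Note that installing a system libzstd does not affect this error: the `UnsatisfiedLinkError` means the JNI bindings for the zstd streaming functions were not found in the library that actually got loaded, and hadoop-4mc loads its own embedded native library (which bundles both the JNI glue and zstd). One way to check whether a native library for your platform is embedded in the jar is to list its contents; the jar name/version below is an assumption for illustration.

```shell
# List any native libraries embedded in the hadoop-4mc jar
jar tf hadoop-4mc-2.0.0.jar | grep -E '\.(so|dylib|dll)$'
```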
I am using ZstdCodec as follows:
given an existing (uncompressed) outputStream:
new ZstdCodec().createOutputStream(outputStream);
However, this line fails to compile with:
cannot access org.apache.hadoop.conf.Configurable
class file for org.apache.hadoop.conf.Configurable not found
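This compile error typically means hadoop-common, which provides `org.apache.hadoop.conf.Configurable`, is missing from the compile classpath; the 4mc codec classes reference Hadoop types in their public API, so the Hadoop client jars must be visible at compile time. A minimal sketch of the dependency, assuming a Maven build (match the version to your cluster):

```xml
<!-- hadoop-common supplies org.apache.hadoop.conf.Configurable;
     the version below is an assumption, not a requirement -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.1</version>
</dependency>
```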
version: 2.0.0
os: centos x64
hadoop: 2.7.1
scenario: Simply generate a compressed sequence file with FourMzUltraCodec.
Exception:
2017-06-16 22:32:48 [INFO ](c.h.c.f.FourMcNativeCodeLoader :142) hadoop-4mc: loaded native library (embedded)
2017-06-16 22:32:48 [INFO ](c.h.c.f.ZstdCodec :84 ) Successfully loaded & initialized native-4mc library
2017-06-16 22:32:49 [INFO ](o.a.h.i.c.CodecPool :153) Got brand-new compressor [.4mz]
Exception in thread "main" java.lang.NullPointerException
at com.hadoop.compression.fourmc.ZstdCompressor.reset(ZstdCompressor.java:267)
at org.apache.hadoop.io.compress.CodecPool.returnCompressor(CodecPool.java:204)
at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1275)
at org.apache.hadoop.io.SequenceFile$BlockCompressWriter.close(SequenceFile.java:1504)
at toolbox.analyzer2.TryFourMzUltra.run(TryFourMzUltra.java:44)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at toolbox.analyzer2.TryFourMzUltra.main(TryFourMzUltra.java:23)
my code:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lisn
 */
public class TryFourMzUltra extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new TryFourMzUltra(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        CompressionCodec codec = new com.hadoop.compression.fourmc.FourMzUltraCodec();
        Configuration conf = getConf();
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path("/tmp/testFourMzUltra")),
                SequenceFile.Writer.keyClass(LongWritable.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(
                        SequenceFile.CompressionType.BLOCK,
                        codec
                ));
        writer.append(new LongWritable(1), new Text("12341234"));
        writer.close(); // exception raised from here
        return 0;
    }
}
```
Hi,
Great project. I was wondering if it's possible to write to standard output when decompressing?
Hello,
First of all thank you for your hard work delivering this solution - it is just fantastic.
Second - sorry for posting in issues, but I did not find any other place to do so.
The issue I have is with loading a properly compressed 4mz file (by properly compressed I mean one that gets correctly decompressed by your 4mc.exe application, which gives me confidence that I have created a proper file) into a Spark DataFrame.
The setup is Spark 2.4.3 for Hadoop 2.7, with the latest 4mc version (2.0.0) and LD_LIBRARY_PATH set to the folder containing the Hadoop native libraries.
The code (Scala):
```scala
import com.hadoop.mapreduce.FourMcTextInputFormat
import com.hadoop.mapreduce.FourMzTextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import spark.implicits._
var conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "\r\n")
conf.set("io.compression.codecs","org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.fourmc.Lz4Codec,com.hadoop.compression.fourmc.Lz4MediumCodec,com.hadoop.compression.fourmc.Lz4HighCodec,com.hadoop.compression.fourmc.Lz4UltraCodec,com.hadoop.compression.fourmc.FourMcCodec,com.hadoop.compression.fourmc.FourMcMediumCodec,com.hadoop.compression.fourmc.FourMcHighCodec,com.hadoop.compression.fourmc.FourMcUltraCodec,com.hadoop.compression.fourmc.FourMzCodec,com.hadoop.compression.fourmc.FourMzMediumCodec,com.hadoop.compression.fourmc.FourMzHighCodec,com.hadoop.compression.fourmc.FourMzUltraCodec")
var rdd = sc.newAPIHadoopFile("file:/home/me/part-m-20190605121832.4mz", classOf[FourMzTextInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[LongWritable, Text]]), classOf[LongWritable],classOf[Text], conf);
val dset = rdd.map(_._2.toString).toDS
val frame = spark.read.option("header", false).option("inferSchema",true).option("delimiter","\u0001").csv(dset)
frame.printSchema()
frame.show()
```
Every time, the rdd is empty, so the frame is empty as well. No matter what I do it always ends up empty, without any exception raised and without any debug/warning messages (when I set the log level to DEBUG I get lots of messages from Spark, but none from 4mc indicating anything wrong). Would you be so kind as to suggest what I am doing wrong, or which external references (native libraries or something else?) I have set up incorrectly?
The code for reading 4mc files works flawlessly and is almost the same (a different input file, of course, and a different TextInputFormat):
```scala
import com.hadoop.mapreduce.FourMcTextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import spark.implicits._
var conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "\r\n")
conf.set("io.compression.codecs","org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.fourmc.Lz4Codec,com.hadoop.compression.fourmc.Lz4MediumCodec,com.hadoop.compression.fourmc.Lz4HighCodec,com.hadoop.compression.fourmc.Lz4UltraCodec,com.hadoop.compression.fourmc.FourMcCodec,com.hadoop.compression.fourmc.FourMcMediumCodec,com.hadoop.compression.fourmc.FourMcHighCodec,com.hadoop.compression.fourmc.FourMcUltraCodec,com.hadoop.compression.fourmc.FourMzCodec,com.hadoop.compression.fourmc.FourMzMediumCodec,com.hadoop.compression.fourmc.FourMzHighCodec,com.hadoop.compression.fourmc.FourMzUltraCodec")
var rdd = sc.newAPIHadoopFile("file:/home/me/part-m-20190605121825.4mc", classOf[FourMcTextInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[LongWritable, Text]]), classOf[LongWritable],classOf[Text], conf)
val dset = rdd.map(_._2.toString).toDS
val frame = spark.read.option("header", false).option("inferSchema",true).option("delimiter","\u0001").csv(dset)
frame.printSchema()
frame.show()
```
Thank you very much in advance.
Best regards.