dps's Introduction

DPS (Data Processing System)

Note: there are two frameworks for running Spark-based processing jobs in DPS

  • An RDD-based framework, which is described in this README
  • A DataFrame-based framework, described in a separate document

Requirements

  • python 3.8

How to run DPS?

python setup.py install
python bin/sparkapp.py {job_name} {params}

# Example
# python bin/sparkapp.py sample_job --config_path=./configs/sample_job.yaml

DPS job list

job          description                                          param options
sample_job   Sample jsonl data from text files in directories     yaml configs
dedup_job    De-duplicate jsonl data using the MinHash method     yaml configs
korean_job   Refine jsonl data in the Korean language              yaml configs

Development guides

Test Run

This is a test run of the sample_job job.

1. Set up the dps package

python setup.py install

2. Check config file and dataset

cat configs/sample_job.yaml
ls datasets/test_sample_jsonl_data

3. Run the sample_job job with bin/sparkapp.py

python bin/sparkapp.py sample_job --config_path=./configs/sample_job.yaml

4. Check output file

cat datasets/test_output_data/part-00000

Add your own job

Implement your job function

  1. Make an issue on the EleutherAI/dps repository

    • Describe your job first
    • Define its inputs and outputs, with examples
  2. Go to dps/spark/jobs and create a Python script file, your_own_job.py.

  3. Make a function to run your job. Here's a template to start from.

    from pyspark import SparkContext
    from pyspark.rdd import RDD

    from dps.spark.spark_session import spark_session
    from dps.spark.utils.io_utils import read_line, to_json


    def your_own_job(input_path, output_path):
        with spark_session('your own job') as spark:
            # The Spark context runs your Spark application
            sc: SparkContext = spark.sparkContext

            # Read all files under the input path (a directory or a single file)
            proc_rdd: RDD = sc.textFile(input_path) \
                .repartition(10) \
                .flatMap(read_line)

            # Write the data that you processed
            proc_rdd \
                .repartition(1) \
                .flatMap(to_json) \
                .saveAsTextFile(output_path)
  4. Register your job into dps/spark/run.py

    import fire

    from .jobs.sample_job import sample_job
    from .jobs.your_own_job import your_own_job


    def run():
        fire.Fire({'sample_job': sample_job,
                   'your_own_job': your_own_job,
                   })
  5. Test run your job

    python bin/sparkapp.py your_own_job --input_path='{input_your_data_dir_or_file}' \
                                        --output_path='{output_path}'

dps's People

Contributors

acul3, chris-ha458, donggrii, fujiki-1emon, hyunwoongko, jason9693, josemlopez, kaeun-lee, mrorii, ohwi, paulovn, polm, polm-stability, ronalmoo, skjang54, skytmddus27, taekyoon


dps's Issues

Bug in the function `remove_repeated_text`

The function remove_repeated_text shows unexpected behavior when the sentence is short.

Below is my test with the function.

{before} -> {after}

connue sous le nom de "mort par coeur brisé". -> connue sous brisé".
remontant à 150 ans, -> remontant à ans,
Je m'occupais des gens qui mouraient, et de leurs familles, -> Je m'occupais familles,
une femme qui mourait de démence. -> une femme démence.
qui prenait soin d'elle. -> qui prenait d'elle.
par exemple, pendant la première année. -> par exemple, année.
Ce qui m'est arrivé, c'est que je m'occupais d'une patiente, -> Ce qui patiente,
dans la banlieue sud de Chicago. -> dans la Chicago.
Dans mon labo, j'étudiais l'effet de veuvage, -> Dans mon veuvage,
tout au long de leur fin de vie. -> tout au vie.
c'est sa fille -> c'est sa fille
Et la fille était épuisée par les soins qu'elle apportait à sa mère. -> Et la mère.
Ainsi, quand je mourrai, le risque de décès de ma femme est doublé -> Ainsi, quand doublé
Pour moi, cette histoire a commencé il y a 15 ans, -> Pour moi, ans,
quand j'étais médecin en soins palliatifs à l'Université de Chicago. -> quand j'étais Chicago.
Et j'observais ce qui se passait pour ces gens et leurs familles -> Et j'observais familles
lui aussi était malade, -> lui aussi malade,
Et dans ce cas, contrairement à ce couple, -> Et dans couple,
qui est une très ancienne idée pour les sciences sociales, -> qui est sociales,

Add statistics by data category

with @Kaeun-Lee

objective

  • help to determine which data is proper to train by inspecting data distribution

input

  • preprocessed train dataset
  • data statistics grouped by data category
  • tokenizer

output

  1. excel data which shows text length statistics by data category all at once
    • number of datasets per category
    • min
    • max
    • mean
    • median
    • 25th percentile
    • 75th percentile
    • std
  2. boxplot images which show length distribution by data category (a sketch of computing these statistics follows below)
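
A minimal sketch of computing these per-category length statistics with pandas (the library choice and field names are assumptions, not necessarily what the job would use):

# Sketch: per-category text-length statistics; library choice and field names are assumptions.
import pandas as pd

records = [
    {"category": "news", "text": "example news article ..."},
    {"category": "web", "text": "example crawled page ..."},
]

df = pd.DataFrame(records)
df["length"] = df["text"].str.len()  # character length; could also be a token count

# min / max / mean / median / 25th / 75th percentile / std, grouped by category
stats = df.groupby("category")["length"].describe(percentiles=[0.25, 0.5, 0.75])
stats.to_excel("length_stats_by_category.xlsx")

# Boxplot of the length distribution per category (requires matplotlib)
ax = df.boxplot(column="length", by="category")
ax.figure.savefig("length_boxplot_by_category.png")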

MassiveText Quality Filtering

How about adding some heuristic filters similar to MassiveText's quality filtering?
They could be helpful for web-crawled datasets.
The numbers/values may vary depending on the language, though. A sketch of two of these heuristics appears after the list.

  • any doc that does not contain between 50 and 100,000 words
  • any doc whose mean word length is outside the range of 3 to 10 characters
  • any doc with a symbol-to-word ratio greater than 0.1 for either the hash symbol or the ellipsis
  • any doc with more than 90% of lines starting with a bullet point, or more than 30% ending with an ellipsis
  • any doc in which fewer than 80% of the words contain at least one alphabetic character
  • any doc that does not contain at least two of the following English words: the, be, to, of, and, that, have, with (language-specific words may be needed)
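
A minimal sketch of two of these heuristics in plain Python; the thresholds come from the list above, but the whitespace tokenization is an assumption, not the MassiveText implementation:

# Sketch of two MassiveText-style heuristics; whitespace tokenization is an assumption.
def word_count_filter(text, min_words=50, max_words=100_000):
    """Keep docs whose word count lies within [min_words, max_words]."""
    n_words = len(text.split())
    return min_words <= n_words <= max_words


def mean_word_len_filter(text, min_len=3, max_len=10):
    """Keep docs whose mean word length lies within [min_len, max_len] characters."""
    words = text.split()
    if not words:
        return False
    mean_len = sum(len(w) for w in words) / len(words)
    return min_len <= mean_len <= max_len


# Usage on an RDD of {"text": ...} records:
# proc_rdd.filter(lambda x: word_count_filter(x["text"])) \
#         .filter(lambda x: mean_word_len_filter(x["text"]))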

[ja] `.filter` is used instead of `.map` for non-filter methods

On

.filter(lambda x: japanese_bad_words_filter(x[use_column]))
.filter(lambda x: doc_len_filter(x[use_column], conf["min_doc_len"], conf["max_doc_len"]))
.filter(lambda x: japanese_mean_word_len_filter(x[use_column], conf["min_mean_word_len"], conf["max_mean_word_len"]))
.filter(lambda x: japanese_symbol_to_word_ratio_filter(x[use_column], conf["symbol_to_word_ratio"]))
.filter(lambda x: bullet_ellipsis_filter(x[use_column], conf["bullet_point_ratio"], conf["ellipsis_ratio"]))
.filter(lambda x: japanese_word_ratio_filter(x[use_column], conf["japanese_word_ratio"]))
.filter(lambda x: dict(text=preprocess_text(x[use_column])))
.filter(lambda x: doc_len_filter(x[use_column], conf["min_doc_len"], conf["max_doc_len"]))
.filter(lambda x: japanese_frequent_char_existence_filter(x[use_column], conf["freq_char_cnt"]))
.filter(lambda x: reduce_japanese_emoticon(x[use_column]))
.filter(lambda x: many_separators_filter(x[use_column], conf["separator_ratio"]))
.filter(lambda x: remove_symbols(x[use_column]))
there are several cases where we use .filter when it should instead be .map.

For example

.filter(lambda x: reduce_japanese_emoticon(x[use_column]))

calls
def reduce_japanese_emoticon(text):
    text = re.sub("w{3,}", "www", text)
    text = re.sub("笑{2,}", "笑", text)
    return text

but in effect this does nothing, because the expression within .filter is always truthy as long as the text is non-empty:

>>> def reduce_japanese_emoticon(text):
...     text = re.sub("w{3,}", "www", text)
...     text = re.sub("笑{2,}", "笑", text)
...     return text
>>> rdd = sc.parallelize([{'text': 'wwwwasdf'}, {'text': '1234笑笑笑'}, {'text': ''}])
>>> rdd.filter(lambda x: reduce_japanese_emoticon(x['text'])).collect()
[{'text': 'wwwwasdf'}, {'text': '1234笑笑笑'}]

Thus, I think the following cases of .filter are simply doing nothing instead of the intended preprocessing:

The remaining calls to methods that end with _filter (e.g. japanese_bad_words_filter, doc_len_filter, etc.) are actually filter methods that return booleans so they should be OK.
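
A minimal sketch of the intended fix for one of those transform-style calls (the helper below is illustrative, not code from the repository):

# Sketch: transforms should go through .map so the returned text replaces the original value.
def apply_text_transform(record, column, transform):
    """Return a copy of the record with `transform` applied to record[column]."""
    return {**record, column: transform(record[column])}


# Instead of:  proc_rdd.filter(lambda x: reduce_japanese_emoticon(x[use_column]))
# Use:         proc_rdd.map(lambda x: apply_text_transform(x, use_column, reduce_japanese_emoticon))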

Need to ignore null or empty text during Korean text processing

Agenda

  • Some raw datasets can contain empty or null text.
  • Using a filter method on the RDD or DataFrame, texts like "" need to be ignored during this process.

How to solve this problem?

  • Add a filter step after loading the data (a sketch follows below)
  • In the filter, check whether the text data is null or empty
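
A minimal sketch of such a filter step right after loading, assuming the records are dicts with a text field:

# Sketch: drop records whose text is null, empty, or whitespace-only, right after loading.
def is_non_empty_text(record, column="text"):
    text = record.get(column)
    return isinstance(text, str) and text.strip() != ""


# proc_rdd = sc.textFile(input_path).flatMap(read_line).filter(is_non_empty_text)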

Chinese dedup memory error

There is a memory error when deduplicating Chinese data.

23/04/19 19:44:17 WARN MemoryStore: Not enough space to cache rdd_7_0 in memory! (computed 176.2 MiB so far)
23/04/19 19:44:17 WARN BlockManager: Block rdd_7_0 could not be removed as it was not found on disk or in memory
23/04/19 19:44:17 WARN BlockManager: Putting block rdd_7_0 failed
23/04/19 19:44:17 WARN MemoryStore: Not enough space to cache rdd_7_2 in memory! (computed 176.2 MiB so far)
23/04/19 19:44:17 WARN BlockManager: Block rdd_7_2 could not be removed as it was not found on disk or in memory
23/04/19 19:44:17 WARN BlockManager: Putting block rdd_7_2 failed
23/04/19 19:44:17 WARN MemoryStore: Not enough space to cache rdd_7_9 in memory! (computed 176.3 MiB so far)
23/04/19 19:44:17 WARN BlockManager: Block rdd_7_9 could not be removed as it was not found on disk or in memory
23/04/19 19:44:17 WARN BlockManager: Putting block rdd_7_9 failed
23/04/19 19:44:19 WARN MemoryStore: Not enough space to cache rdd_7_7 in memory! (computed 176.6 MiB so far)
23/04/19 19:44:19 WARN BlockManager: Block rdd_7_7 could not be removed as it was not found on disk or in memory
23/04/19 19:44:19 WARN BlockManager: Putting block rdd_7_7 failed
23/04/19 19:44:41 WARN BlockManager: Block rdd_7_6 could not be removed as it was not found on disk or in memory
23/04/19 19:44:42 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 30545)
java.lang.OutOfMemoryError: Java heap space
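
The log shows cached RDD blocks no longer fitting in executor memory during dedup. A generic mitigation sketch (the memory values are placeholders, not a confirmed fix for this job) is to raise driver/executor memory and let cached blocks spill to disk:

# Sketch: generic Spark memory mitigations; values are placeholders, not tested against dedup_job.
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("dedup_job")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

# If an intermediate RDD is cached, MEMORY_AND_DISK lets blocks spill to disk instead of failing.
# proc_rdd.persist(StorageLevel.MEMORY_AND_DISK)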

Add general text refinement job

Task

  • Filter empty or short text
  • Deduplicate text cases
  • Remove generally removable text patterns, defined manually by regex
  • Replace privacy-related text patterns with tags like <phone_number>, <credit_card_number> (a sketch of this replacement step follows below)

Data should be output per category.
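
A minimal sketch of the tag-replacement step with an illustrative phone-number pattern (the regex is a placeholder, not the pattern the job would ship with):

# Sketch: replace privacy-sensitive patterns with tags; the regex below is illustrative only.
import re

PHONE_RE = re.compile(r"\b\d{2,3}[-.\s]?\d{3,4}[-.\s]?\d{4}\b")


def replace_phone_numbers(text):
    """Replace phone-number-like spans with a <phone_number> tag."""
    return PHONE_RE.sub("<phone_number>", text)


# replace_phone_numbers("call 010-1234-5678")  ->  "call <phone_number>"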

Add pre-processing for Japanese texts

Background

  • We want a large-scale Japanese corpus of roughly 1TB or more.
  • We've already found that we can build a 1TB-sized Japanese corpus by aggregating publicly available Japanese corpora.
    • e.g. mC4, OSCAR
  • However, just by aggregating these corpora, we might not end up with a good-quality corpus.
    • There might be quality issues.
      • e.g. duplication.

What to do

  • Apply some pre-processing (e.g. deduplication) to the corpus to improve its quality.
    • We can borrow ideas for the pre-processing from DeepMind's Gopher paper.
    • Refer to the Korean pre-processing to build the Japanese one.
  • If we add newly crawled data to the corpus, re-apply the pre-processing to the combination of the existing corpus and the new data.
  • Repeat the process above until we get a corpus good enough in quality and quantity.

[ja] reduce emoticon

Background

  • After the current Japanese quality filtering, as far as I can see, there do not seem to be many bad-quality texts such as ones with lots of repeated emoticons.
  • However, when we check the quality-filtered datasets in depth, and/or when we add other datasets to the current one, we might find such repetitive emoticons.
  • So, we might need to implement some pre-processing like japanese_reduce_emoticon, referring to the Korean one.

Remove `soynlp` library

Objective

  • We are using soynlp to normalize emoticon characters.
  • Remove the soynlp library and implement a custom emoticon-normalization function (a sketch follows below).
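
A minimal regex-based sketch of such a custom function (it does not reproduce soynlp's exact behaviour):

# Sketch: collapse characters repeated 3+ times down to `num_repeats` copies (e.g. "ㅋㅋㅋㅋㅋ" -> "ㅋㅋ").
import re

REPEAT_RE = re.compile(r"(.)\1{2,}", flags=re.DOTALL)


def normalize_repeats(text, num_repeats=2):
    return REPEAT_RE.sub(lambda m: m.group(1) * num_repeats, text)


# normalize_repeats("ㅋㅋㅋㅋㅋ 좋아요!!!!!")  ->  "ㅋㅋ 좋아요!!"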

[ja] spam word filter

Background

  • Similar background to #50
  • We might need to implement japanese_spam_words_filter on an as-needed basis (a sketch follows below).
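
A minimal sketch of what japanese_spam_words_filter could look like (the word list is a placeholder, not taken from the repository):

# Sketch: keep docs that contain none of the known spam phrases; the word list is a placeholder.
JAPANESE_SPAM_WORDS = ["無断転載禁止"]  # "unauthorized reproduction prohibited"


def japanese_spam_words_filter(text, spam_words=JAPANESE_SPAM_WORDS):
    return not any(word in text for word in spam_words)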

Task consideration

  1. Deduplication: MinHash with Jaccard similarity

  2. URL / Email

    • for match in re.finditer("email" ... "url")
    • Drop matches with 70% probability.
    • @hyunwoongko
  3. Replace the HTML parser

  4. Unicode correction (mac/linux => NFD / win => NFC)

    • Add a normalizer and retrain the tokenizer.
    • @bzantium

    NFC => '가', '발' (precomposed syllables)
    NFD => 'ㄱ', 'ㅏ', 'ㅂ', 'ㅏ', 'ㄹ' => decomposed jamo (displayed like old Hangul)
    NFKC / NFKD => same as NFC / NFD, but differ for some characters.

    e.g. ㈆
    NFC  => '㈆'
    NFKC => '(', 'ㅅ', ')'

    A sketch of this normalization step follows after this list.
  5. Bad word filtering (e.g. 무단배포금지, "unauthorized distribution prohibited")
    • spam = ['무단배포금지', '무단전제금지']
    • spam not in re.sub("[ \n\r\t...etc.]", "", text)
    • @hyunwoongko
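
A minimal sketch of the Unicode correction step (item 4 above) using the standard library; normalizing everything to NFC is an assumption about the desired target form:

# Sketch: normalize text to NFC so macOS/Linux (NFD) and Windows (NFC) inputs agree.
import unicodedata


def to_nfc(text):
    return unicodedata.normalize("NFC", text)


# unicodedata.normalize("NFD", "가") decomposes the syllable into jamo; to_nfc() recomposes it:
# to_nfc(unicodedata.normalize("NFD", "가")) == "가"  # True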

[ja] replace Japanese PII

Background

  • It seems we don't have to implement much pre-processing to replace Japanese PII,
    • because there is already some PII pre-processing in the language-agnostic processing.
  • But we might need additional processing to replace some Japanese-specific PII.

TODOs

Add huggingface tokenizers for data length statistics

with @DongChan-Lee

objective

  • add a Hugging Face AutoTokenizer to the statistics by data category so that text lengths can be compared in tokens (a sketch follows below)

input

  • preprocessed train dataset
  • huggingface tokenizers

output

  1. excel data which shows text length statistics tokenized by various tokenizers
  2. boxplot images which show length distribution tokenized by various tokenizers
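
A minimal sketch of computing token lengths with a Hugging Face tokenizer (the model name is a placeholder):

# Sketch: token-length statistics with a Hugging Face AutoTokenizer; the model name is a placeholder.
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")


def token_length(text):
    return len(tokenizer.encode(text))


texts = ["short example", "a somewhat longer example sentence for the statistics"]
lengths = [token_length(t) for t in texts]
print(min(lengths), max(lengths), sum(lengths) / len(lengths))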

Add job to separate train and validate data

Content

  • Add a job to separate train and validation data from the original data (a sketch follows below)
  • The dataset format should be like this:
{"text": "this is text"}
{"text": "this is text"}
{"text": "this is text"}
{"text": "this is text"}
{"text": "this is text"}

dedup_job java.lang.UnsatisfiedLinkError

Hello, I am getting this error while running dedup_job.

I am able to run sample_job and korean_job, but I get this error in dedup_job.

I am using a conda env with Spark 3.0.1, Hadoop 2.7, and Java SDK 8 on Windows. My system variables are set properly.

Need help

Kindly add a deduplication job for the Korean dataset.

py4j.protocol.Py4JJavaError: An error occurred while calling o52.partitions.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Ljava/lang/String;)Lorg/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat;
        at org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:608)
        at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfoByNativeIO(RawLocalFileSystem.java:934)
        at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:848)
        at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:816)
        at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:52)
        at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2199)
        at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2179)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:101)
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
        at org.apache.spark.rdd.RDD.$anonfun$dependencies$2(RDD.scala:264)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:260)
        at org.apache.spark.rdd.ShuffledRDD.getPreferredLocations(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
        at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2529)
        at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2503)
        at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1898)
        at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.$anonfun$getAllPrefLocs$1(CoalescedRDD.scala:198)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.getAllPrefLocs(CoalescedRDD.scala:197)
        at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.<init>(CoalescedRDD.scala:190)
        at org.apache.spark.rdd.DefaultPartitionCoalescer.coalesce(CoalescedRDD.scala:391)
        at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:90)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
        at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
        at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
        at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
