Spark RAPIDS Benchmarks – benchmark sets and utilities for the RAPIDS Accelerator for Apache Spark
License: Apache License 2.0
command:
python3 nds_gen_data.py hdfs 10 10 /data/full_bench_raw_sf10_1 --overwrite_output --update 1
error:
2022-08-15 07:29:39,299 INFO mapreduce.Job: Task Id : attempt_1660543957812_0005_m_000009_2, Status : FAILED
Error: java.lang.InterruptedException: Process failed with status code 139
at org.notmysock.tpcds.GenTable$DSDGen.map(GenTable.java:256)
at org.notmysock.tpcds.GenTable$DSDGen.map(GenTable.java:225)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
I cannot reproduce this error on our internal YARN cluster.
However, the following commands work well:
python3 nds_gen_data.py hdfs 10 5 /data/full_bench_raw_sf10_1 --overwrite_output --update 1
python3 nds_gen_data.py hdfs 20 5 /data/full_bench_raw_sf10_1 --overwrite_output --update 1
python3 nds_gen_data.py hdfs 10 9 /data/full_bench_raw_sf10_1 --overwrite_output --update 1
The benchmark now supports Iceberg; we plan to support Delta Lake as well.
In #34, a new field "queryValidationStatus" was introduced to the JSON summary. The logic for updating it is broken: when there are no unmatched queries, this field is not added.
I am trying to convert CSV to another file format. My input data has paths ending in .dat. According to the documentation, I can use the --input-suffix argument to handle this:
--input_suffix INPUT_SUFFIX
text to append to every input filename (e.g., ".dat"; the default is empty)
However, this does not seem to be implemented.
I am running this:
```
$ ./spark-submit-template convert_submit_gpu.template nds_transcode.py --input-suffix .dat --output_format json /mnt/bigdata/tpcds/sf100-tbl/ /mnt/bigdata/tpcds/sf100-jso00-json/ report.txt
```
It fails with:
```
nds_transcode.py: error: unrecognized arguments: --input-suffix report.txt
```
In the document, the "nds_transcode.py" and "nds_maintenance.py" steps use the same submit template "convert_submit_cpu_iceberg" in the examples. Actually, the 'spark_catalog' warehouse is needed in the nds_maintenance step and should be removed in the nds_transcode step, so the two steps should use different submit templates.
Could you clarify this in the document? Otherwise it could be misleading.
The setup_tables function in nds_power.py is hard-coded to read Parquet files. I would like to be able to specify running against other formats (CSV, ORC, JSON, etc.).
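A minimal sketch of one way the format could be parameterized. The function name and option handling here are assumptions for illustration, not code from nds_power.py; the '|' separator matches the dsdgen raw data format described elsewhere in this repository.

```python
# Hypothetical helper: map a user-supplied input format to the arguments a
# spark.read call would need. Text-based formats require an explicit schema.
SUPPORTED_FORMATS = ('parquet', 'orc', 'csv', 'json', 'avro')

def build_reader_args(input_format, schema=None):
    """Return (format, options) for a Spark reader; raise on unsupported input."""
    fmt = input_format.lower()
    if fmt not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported input format: {input_format}")
    options = {}
    if fmt == 'csv':
        # dsdgen raw data is '|'-delimited and needs an explicit schema
        if schema is None:
            raise ValueError("csv input requires a schema")
        options['sep'] = '|'
    if fmt == 'json' and schema is None:
        raise ValueError("json input requires a schema")
    return fmt, options
```

setup_tables could then call `spark.read.format(fmt).options(**options)` instead of hard-coding `spark.read.parquet`.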
When loading data sources for the Power Run, the code requires a schema for all types of content, including Parquet and ORC. The schema should only be required for text-based data sources.
If the --json_summary_folder
arg is not specified, the JSON summary files are saved to the default folder "json-summary".
When we run the test a second time, we detect whether the json_summary folder has any files inside; if so, the code raises an exception to protect the existing summaries.
Some users complain this is really annoying and that they don't need those summary files.
Also, when running the Throughput Test, if this json_summary argument is not given a different value per run, the run will fail due to a duplicated summary folder name.
Data Maintenance applies N refresh runs with different refresh data.
The current implementation requires the user to transcode the refresh data from raw CSV into Iceberg.
It would be more convenient to allow users to transcode the refresh data into independent Parquet/ORC files instead of Iceberg tables, so that a user can easily specify only the refresh data path to do a refresh run.
It would be nice if the NDS query number/name showed up in the Spark SQL UI under the description column.
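One plausible approach (an assumption, not the repository's implementation): set the job description on the SparkContext before each query, since the SQL UI surfaces that description. setJobDescription is a real SparkContext API; the run_query wrapper below is hypothetical.

```python
# Hypothetical wrapper: tag each query execution with its NDS query name so
# it appears in the Spark UI description column.
def run_query(spark, query_name, query_text):
    # The SQL tab picks up the description set on the SparkContext, so
    # setting it per query surfaces e.g. "query66" in the UI.
    spark.sparkContext.setJobDescription(query_name)
    try:
        return spark.sql(query_text)
    finally:
        # reset so later, untagged work doesn't inherit the label
        spark.sparkContext.setJobDescription(None)
```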
====== Run LF_CR ======
...
...
22/08/17 17:12:04 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.3 in stage 23.0 (TID 26) (csputil-icmo-w-1.c.rapids-spark.internal executor 5): java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'cr_returned_date_sk=2451972' in spec [
1000: cr_returned_date_sk: identity(1)
]
at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:95)
at org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:34)
at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:641)
at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:616)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
22/08/17 17:12:04 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 23.0 failed 4 times; aborting job
22/08/17 17:12:04 ERROR org.apache.spark.sql.execution.datasources.v2.AppendDataExec: Data source write support IcebergBatchWrite(table=spark_catalog.default.catalog_returns, format=PARQUET) is aborting.
22/08/17 17:12:04 ERROR org.apache.spark.sql.execution.datasources.v2.AppendDataExec: Data source write support IcebergBatchWrite(table=spark_catalog.default.catalog_returns, format=PARQUET) aborted.
An error occurred while calling o60.sql.
: org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:46)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23.0 (TID 26) (csputil-icmo-w-1.c.rapids-spark.internal executor 5): java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'cr_returned_date_sk=2451972' in spec [
1000: cr_returned_date_sk: identity(1)
]
at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:95)
at org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:34)
at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:641)
at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:616)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
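The IllegalStateException above says the incoming records are not clustered by partition. Two commonly suggested mitigations (assumptions based on Iceberg's documented behavior, not fixes taken from this repository) are to sort the incoming data by the partition column before writing, or to enable Iceberg's fanout writers on the target table:

```python
# Hypothetical mitigation: enable Iceberg fanout writers, which keep one open
# file per partition and so do not require input clustered by partition.
# 'write.spark.fanout.enabled' is an Iceberg table property.
def enable_fanout(spark, table="spark_catalog.default.catalog_returns"):
    spark.sql(
        f"ALTER TABLE {table} "
        "SET TBLPROPERTIES ('write.spark.fanout.enabled'='true')"
    )
```

Fanout writers trade memory (more files open at once) for not needing a sort, so sorting by cr_returned_date_sk before the write may be preferable at large scale.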
We need a script to combine all separate tests into a whole NDS run, including Power Run, Throughput Run, Data Maintenance, etc.
The instructions for generating queries state that the --template
and --streams
arguments are optional, but if both are missing then the command fails with an unhelpful stack trace, e.g.:
$ python -i nds_gen_query_stream.py $TPCDS_HOME/query_templates 100 ./queries
Traceback (most recent call last):
File "nds_gen_query_stream.py", line 122, in <module>
generate_query_streams(args, tool_path)
File "nds_gen_query_stream.py", line 67, in generate_query_streams
subprocess.run(cmd, check=True, cwd=str(work_dir))
File "/home/jlowe/miniconda3/envs/cudf_dev/lib/python3.8/subprocess.py", line 493, in run
with Popen(*popenargs, **kwargs) as process:
File "/home/jlowe/miniconda3/envs/cudf_dev/lib/python3.8/subprocess.py", line 858, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "/home/jlowe/miniconda3/envs/cudf_dev/lib/python3.8/subprocess.py", line 1639, in _execute_child
self.pid = _posixsubprocess.fork_exec(
TypeError: expected str, bytes or os.PathLike object, not NoneType
Arguably this mode should work and be treated as if the user specified --streams 1. Otherwise the usage statement needs to be updated to show these are not truly optional and that one or the other must be specified, e.g.: {--template <template_file> | --streams <stream_count>}
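If the second option is chosen, argparse can enforce and document the constraint directly with a required mutually exclusive group, so the script reports a clear usage error instead of crashing with a TypeError. The option names match the issue; the overall parser layout is an assumption about nds_gen_query_stream.py, not its actual code.

```python
import argparse

def build_parser():
    # Sketch: positional arguments plus a required either/or choice between
    # --template and --streams; argparse renders this in the usage line as
    # (--template TEMPLATE | --streams STREAMS)
    parser = argparse.ArgumentParser(prog='nds_gen_query_stream.py')
    parser.add_argument('template_dir')
    parser.add_argument('scale')
    parser.add_argument('output_dir')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--template', help='generate queries from one template')
    group.add_argument('--streams', type=int, help='number of query streams')
    return parser
```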
In nds/convert_submit_gpu.template, the configs below no longer exist in the plugin.
"--conf" "spark.rapids.sql.csv.read.date.enabled=true"
"--conf" "spark.rapids.sql.csvTimestamps.enabled=false"
"--conf" "spark.rapids.sql.csv.read.integer.enabled=true"
Let's clean up the configs used in template files.
Config settings have quotation marks around each setting and the conf keyword: https://github.com/NVIDIA/spark-rapids-benchmarks/blob/branch-22.08/nds/power_run_gpu.template#L20-L42.
Is it possible to remove the quotation marks for easier usage and readability? Previous iterations of the benchmark scripts supported template files without quotations.
With data validation, we can check whether the query output produced by the GPU is the same as that produced by the CPU. Thus we want to add more status flags to show whether a query "both successfully finished and had its output validated", "successfully finished but output not valid", etc.
It is necessary to compare the query output between GPU and CPU runs to make sure the GPU plugin produces correct results.
The repository contains multiple functional parts but lacks test code to detect regressions.
Parameters:
scale_factor=5
parallel=10
transcode with Iceberg (Parquet)
power run with Iceberg
Detail message:
=== Comparing Query: query78 ===
Collected 100 rows in 0.19057893753051758 seconds
Row 7:
[255, 2.03, 81, Decimal('58.95'), Decimal('4.66'), 40, Decimal('63.34'), Decimal('1.36')]
[255, 2.02, 81, Decimal('58.95'), Decimal('4.66'), 40, Decimal('63.34'), Decimal('1.36')]
Processed 100 rows
There were 1 errors
Collected 100 rows in 0.1670541763305664 seconds
Processed 100 rows
Results match
=== Unmatch Queries: ['query78'] ===
Query 78 in stream:
-- start query 26 in stream 1 using template query78.tpl
with ws as
(select d_year AS ws_sold_year, ws_item_sk,
ws_bill_customer_sk ws_customer_sk,
sum(ws_quantity) ws_qty,
sum(ws_wholesale_cost) ws_wc,
sum(ws_sales_price) ws_sp
from web_sales
left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk
join date_dim on ws_sold_date_sk = d_date_sk
where wr_order_number is null
group by d_year, ws_item_sk, ws_bill_customer_sk
),
cs as
(select d_year AS cs_sold_year, cs_item_sk,
cs_bill_customer_sk cs_customer_sk,
sum(cs_quantity) cs_qty,
sum(cs_wholesale_cost) cs_wc,
sum(cs_sales_price) cs_sp
from catalog_sales
left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk
join date_dim on cs_sold_date_sk = d_date_sk
where cr_order_number is null
group by d_year, cs_item_sk, cs_bill_customer_sk
),
ss as
(select d_year AS ss_sold_year, ss_item_sk,
ss_customer_sk,
sum(ss_quantity) ss_qty,
sum(ss_wholesale_cost) ss_wc,
sum(ss_sales_price) ss_sp
from store_sales
left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk
join date_dim on ss_sold_date_sk = d_date_sk
where sr_ticket_number is null
group by d_year, ss_item_sk, ss_customer_sk
)
select
ss_item_sk,
round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,
ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,
coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,
coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,
coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price
from ss
left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)
left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)
where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2001
order by
ss_item_sk,
ss_qty desc, ss_wc desc, ss_sp desc,
other_chan_qty,
other_chan_wholesale_cost,
other_chan_sales_price,
ratio
LIMIT 100;
-- end query 26 in stream 1 using template query78.tpl
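The only mismatched value above is the ratio column (2.03 vs 2.02), which query78 computes as round() over a floating-point division, so a last-digit difference between CPU and GPU evaluation is plausible. A hedged sketch of comparing rows with a small absolute tolerance on float columns (the helper name and default tolerance are assumptions, not nds_validate.py code):

```python
# Hypothetical row comparator: exact match for non-float values, absolute
# tolerance for floats so rounding jitter in derived columns doesn't flag
# an otherwise matching row.
def rows_match(row_a, row_b, float_tol=0.011):
    if len(row_a) != len(row_b):
        return False
    for a, b in zip(row_a, row_b):
        if isinstance(a, float) and isinstance(b, float):
            # tolerance just above one unit in the last rounded digit,
            # to absorb binary float representation error
            if abs(a - b) > float_tol:
                return False
        elif a != b:
            return False
    return True
```

With this comparator, the 2.03 vs 2.02 row would be treated as a match while genuinely different values would still be reported.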
After #57, the customized listener is registered via the Spark config. It would be better to register it via code at runtime, to have better lifecycle control of it.
When using nds_validate.py to compare runs on my desktop, I don't necessarily need the JSON file to be updated, and it is an extra parameter I need to specify.
It would be nice if --json_summary_folder were optional.
When running the Data Maintenance step, the code parses the DM query content to split the DELETE and INSERT queries on the ';' mark.
In the previous PR9, the LICENSE content was added with a ';' in it, and this breaks our split code.
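A minimal sketch of a more robust split: strip full-line SQL comments (such as a license header whose lines start with '--') before splitting on ';'. This is an assumed fix for illustration, not the actual nds_maintenance.py code, and it intentionally does not handle ';' inside string literals.

```python
# Hypothetical splitter: drop '--' comment lines first, then split on ';'
# so semicolons inside a license/comment header no longer produce bogus
# query fragments.
def split_queries(sql_text):
    lines = [ln for ln in sql_text.splitlines()
             if not ln.lstrip().startswith('--')]
    body = '\n'.join(lines)
    return [q.strip() + ';' for q in body.split(';') if q.strip()]
```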
When executing PowerRun on Dataproc, the following error is thrown:
22/06/28 08:48:35 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@4d149fe9{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
22/06/28 08:49:05 ERROR org.apache.spark.rpc.netty.Inbox: Ignoring error
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:176)
at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:193)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:564)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:600)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.$anonfun$reset$1(CoarseGrainedSchedulerBackend.scala:578)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.$anonfun$reset$1$adapted(CoarseGrainedSchedulerBackend.scala:576)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:335)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:1111)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.reset(CoarseGrainedSchedulerBackend.scala:576)
at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.reset(YarnSchedulerBackend.scala:254)
at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:326)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 153876 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 5 with no recent heartbeats: 157542 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 7 with no recent heartbeats: 158587 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 158708 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 4 with no recent heartbeats: 161529 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 6 with no recent heartbeats: 159709 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 3 with no recent heartbeats: 157897 ms exceeds timeout 120000 ms
22/06/28 08:51:09 ERROR org.apache.spark.network.client.TransportClient: Failed to send RPC RPC 4870712985705646405 to /10.138.0.56:39672: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
22/06/28 08:51:09 ERROR org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Sending KillExecutors(List(2)) to AM was unsuccessful
java.io.IOException: Failed to send RPC RPC 4870712985705646405 to /10.138.0.56:39672: java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:363)
at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:340)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
... 12 more
This error doesn't impact the performance numbers for the Power Run, as all queries finished successfully before the shutdown process.
====== Power Test Time: 2905709 milliseconds ======
====== Total Time: 2972227 milliseconds ======
Command to reproduce this error:
sudo /usr/lib/spark/bin/spark-submit \
--master yarn \
--conf spark.rapids.sql.batchSizeBytes=1GB \
--conf spark.driver.maxResultSize=2GB \
--conf spark.executor.cores=16 \
--conf spark.locality.wait=0 \
--conf spark.rapids.sql.concurrentGpuTasks=2 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.0625 \
--conf spark.executor.memory=24G \
--conf spark.driver.memory=100G \
--conf spark.sql.files.maxPartitionBytes=256mb \
--conf spark.rapids.memory.host.spillStorageSize=32G \
--conf spark.sql.adaptive.enabled=true \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.memory.pinnedPool.size=8g \
--conf spark.rapids.sql.incompatibleOps.enabled=true \
--conf spark.executor.instances=8 \
--conf spark.executor.memoryOverhead=24G \
--conf spark.scheduler.minRegisteredResourcesRatio=1.0 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.eventLog.compress=true \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.extraListeners="" \
--files /usr/lib/spark/scripts/gpu/getGpusResources.sh \
--jars jars/rapids-4-spark_2.12-22.08.0-20220626.145727-18-cuda11.jar \
nds_power.py \
gs://nds2/parquet_sf3k_decimal \
query_0.sql \
time-1656322357.csv
Use
and clean up current branch-X.Y after all new branches and CI work fine
As per Specification 4.3:
4.3.1 Each query has one or more substitution parameters. Dsqgen must be used to generate executable query texts for the
query streams. In order to generate the required number of query streams, dsqgen must be used with the
RNGSEED, INPUT and STREAMS options. The value for the RNGSEED option, <SEED>, is selected as the
timestamp of the end of the database load time (Load End Time) expressed in the format mmddhhmmsss as defined
in Clause 7.4.3.8. The value for the STREAMS option, <S>, is two times the number of streams, Sq, to be executed
during each Throughput Test (S=2* Sq). The value of the INPUT option, <input.txt>, is a file containing the location
of all 99 query templates in numerical order.
Comment: RNGSEED guarantees that the query substitution parameter values are not known prior to running
the power and throughput tests. Called with a value of <S> for the STREAMS parameter, dsqgen generates S+1
files, named query_0.sql through query_[S].sql. Each file contains a different permutation of the 99 queries
4.3.2 Query_0.sql is the sequence of queries to be executed during the Power Test; files query_1.sql through
query_[Sq].sql are the sequences of queries to be executed during the first Throughput Test; and files
query_[Sq+1].sql through query_[2*Sq].sql are the sequences of queries to be executed during the second
Throughput Test.
We need to add this seed option to our gen script.
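A hedged sketch of carrying the RNGSEED option through to the dsqgen invocation. The -RNGSEED, -INPUT, -STREAMS and -OUTPUT_DIR flags follow TPC-DS toolkit conventions; how this integrates with nds_gen_query_stream.py is an assumption.

```python
# Hypothetical command builder for dsqgen with an optional RNGSEED.
def build_dsqgen_cmd(template_list, scale, output_dir, streams, rngseed=None):
    cmd = ['./dsqgen',
           '-INPUT', template_list,
           '-SCALE', str(scale),
           '-OUTPUT_DIR', output_dir,
           '-STREAMS', str(streams)]
    if rngseed is not None:
        # per spec 4.3.1, the seed is the Load End Time as mmddhhmmsss
        cmd += ['-RNGSEED', str(rngseed)]
    return cmd
```

The returned list could be passed to subprocess.run from the generation script; omitting rngseed keeps today's behavior.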
I would like to be able to use nds_validate.py to compare two runs that used different output formats (e.g., Parquet vs ORC).
Currently there is only one --input_format parameter. Ideally, I would not have to specify this at all, and the tool would figure out what format the data files are in. But in the absence of that, maybe provide an optional way to specify different formats for input1 and input2.
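The auto-detection half of the request could look something like the sketch below: infer each input's format from its data file extensions. This is an assumed approach for illustration, not nds_validate.py code.

```python
import os

# Hypothetical extension map; .dat covers dsdgen's raw '|'-delimited output.
_EXT_TO_FORMAT = {'.parquet': 'parquet', '.orc': 'orc',
                  '.json': 'json', '.csv': 'csv', '.dat': 'csv'}

def infer_format(filenames):
    """Infer a single data format from file names, ignoring markers like _SUCCESS."""
    formats = {_EXT_TO_FORMAT[os.path.splitext(f)[1]]
               for f in filenames
               if os.path.splitext(f)[1] in _EXT_TO_FORMAT}
    if len(formats) != 1:
        raise ValueError(f"cannot infer a unique format: {sorted(formats)}")
    return formats.pop()
```

Running this once per input path would let input1 and input2 use different formats with no flags at all.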
Data Maintenance is explained in detail in the TPC-DS Specification, Section 5. To simplify, the following 3 steps are needed for it:
1. Generate the refresh data (see the "tests" folder in DSGen-software-code-3.2.0rc1) to update table data.
2. Run the new SQL files (similar to how nds_power.py reads SQL stream files) to update the existing table data.
3. $TPCDS_HOME/tools/tpcds_source.sql
Step 2 is called "Data Maintenance"; the SQL files are "lf_*.sql" and "dm_*.sql". Note, we need to make them Spark-compatible.
Steps 1+2+3 together are called a Refresh Run.
Problem:
Some files, like nds_transcode.py and nds_bench.py, are too big to read.
Expectation:
We should move some parts of these big files out into new files.
For example, get_schemas in nds_transcode.py: we can create a new file that only defines schema-related code, since it is also referenced by multiple Python scripts.
In nds_bench.py, we can move the functions that analyze the report file out to a utils file, keeping only the step functions and the main process; this should make it clearer to read.
Currently, Power Run is based on TempView created upon parquet data files. The setup_tables step should be skipped if Power Run is based on Iceberg.
CLI to generate iceberg refreshed datasets:
./spark-submit-template convert_submit_gpu.template $EXTRA_CONFS \
nds_transcode.py \
$PWD/raw_refresh_sf1 \
$PWD/parquet_refresh_sf1 \
report.txt \
--output_format parquet \
--output_mode overwrite \
--update
CLI to run Iceberg datasets maintenance:
./spark-submit-template convert_submit_cpu_iceberg.template nds_maintenance.py $PWD/parquet_refresh_sf1 ./data_maintenance time.csv
Error Log:
took 0.033919 s
Traceback (most recent call last):
File "/spark-rapids-benchmarks/nds/nds_maintenance.py", line 234, in <module>
valid_queries)
File "/spark-rapids-benchmarks/nds/nds_maintenance.py", line 125, in get_maintenance_queries
q_content = [ c.decode('utf-8').strip() + ';' for c in f.read().split(';')[1:-1]]
File "/usr/lib/python3.6/encodings/ascii.py", line 26, in decode
return codecs.ascii_decode(input, self.errors)[0]
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position 985: ordinal not in range(128)
22/08/17 07:07:19 INFO SparkContext: Invoking stop() from shutdown hook
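The failure in the ascii codec suggests the maintenance SQL file contains non-ASCII bytes (0xe2 begins a UTF-8 curly quote or dash) while the file is being read with the platform default encoding. A hedged fix is to open the file with an explicit UTF-8 encoding; the function below mirrors the split logic from the traceback but is an assumed sketch, not the actual nds_maintenance.py code.

```python
# Hypothetical fix: force UTF-8 when reading the maintenance SQL so the
# decode no longer depends on the environment's default (ASCII) codec.
def read_queries(path):
    with open(path, 'r', encoding='utf-8') as f:
        # same shape as the original split: drop the header chunk before the
        # first ';' and the trailing chunk after the last ';'
        return [c.strip() + ';' for c in f.read().split(';')[1:-1]]
```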
nds_gen_data.py enables data generation locally and on HDFS. Allow options to generate data on GCS, S3 and Azure (in that priority order).
In nds_power.py, setup_tables uses get_schemas to provide all schemas. In case the TPC-DS data was not generated using this tool, one of the folders, dbgen_version, would not be present.
This causes an error unless the following code block is commented out:
spark-rapids-benchmarks/nds/nds_transcode.py
Lines 66 to 71 in 952444f
It would be nice to avoid this constraint so that other data sources can be used as well.
When running NDS 2.0 on Dataproc, the Spark eventlog may have duplicate query IDs.
For example, the Spark eventlog may show as:
query66
query66
query23_part2
query23_part2
This is very confusing. I can reproduce each time on Dataproc.
After #44, we added some code to allow the user to specify the Iceberg warehouse path directly as a command line argument, instead of setting it in the template file, for the Power Run; we should apply the same approach to the transcode script.
When following the steps to generate data on HDFS in EMR, the data gen script fails with this exception:
[hadoop@ip-172-31-17-43 nds]$ python3 nds_gen_data.py hdfs 3000 128 vr/raw_sf3000 --overwrite_output
Exception in thread "main" java.lang.IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3278)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3323)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3362)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3413)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3381)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:234)
at org.notmysock.tpcds.GenTable.genInput(GenTable.java:192)
at org.notmysock.tpcds.GenTable.run(GenTable.java:118)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.notmysock.tpcds.GenTable.main(GenTable.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
Steps to reproduce:
The README.md in the top-level directory refers to the 22.02 jar; it should be updated to point to the latest version of the jar.
The "usage", "positional arguments" and "optional arguments" sections of the nds_validate.py help output need to be reformatted.
cleaned due to policy.
The nds_transcode.py script is currently only responsible for converting the raw CSV data to other data source formats such as Parquet or ORC. To implement the Data Maintenance test (#4), we need to set up Iceberg base tables before running the "maintenance" operations, including INSERT, UPDATE and DELETE.
According to the Iceberg documentation, we can do something like:
df = # raw data frame
df.writeTo("local.db.sample_table").create()
Note: this requires DataSourceV2 API.
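For reference, enabling an Iceberg catalog requires Spark session configuration along these lines (taken from the Iceberg Spark quick-start docs; the catalog name "local" and the warehouse path are placeholders to adapt to our setup):

```
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type=hadoop
spark.sql.catalog.local.warehouse=/path/to/warehouse
```

With such a catalog configured, df.writeTo("local.db.sample_table").create() goes through the DataSourceV2 write path.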
After #77, refresh data should be read as independent files, but our code that fetches the delete and inventory_delete table data is still based on Iceberg; we need to update it to handle the new read path.
Error message:
Traceback (most recent call last):
File "/usr/workspace/deploy/temp/spark-rapids-benchmarks/nds/nds_maintenance.py", line 207, in <module>
query_dict = get_maintenance_queries(args.maintenance_queries_folder,
File "/usr/workspace/deploy/temp/spark-rapids-benchmarks/nds/nds_maintenance.py", line 98, in get_maintenance_queries
delete_date_dict = get_delete_date(spark)
File "/usr/workspace/deploy/temp/spark-rapids-benchmarks/nds/nds_maintenance.py", line 65, in get_delete_date
delete_dates = spark_session.sql("select * from delete").collect()
File "/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.AnalysisException: Table or view not found: delete; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [delete], [], false
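Since after #77 the delete dates come from raw files rather than a registered table, one possible direction is to read the generated files directly instead of issuing spark.sql("select * from delete"). Below is a minimal, hypothetical sketch in plain Python (the delete/ subdirectory layout and the pipe-delimited .dat format are assumptions based on how dsdgen writes its output):

```python
import glob
import os

def read_delete_dates(data_dir):
    """Read delete-date rows from pipe-delimited .dat files under
    <data_dir>/delete/ instead of querying a `delete` table."""
    rows = []
    for path in sorted(glob.glob(os.path.join(data_dir, "delete", "*.dat*"))):
        with open(path) as f:
            for line in f:
                # dsdgen rows are pipe-delimited and end with a trailing '|'
                fields = line.rstrip("\n").rstrip("|").split("|")
                rows.append(fields)
    return rows
```

The same idea applies to inventory_delete; in the real script this would more likely use spark.read.csv with an explicit schema.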
When running the power run with nds_power.py, the following error is thrown:
Traceback (most recent call last):
File "/opt/sparkRapidsPlugin/spark-raplab-cicd/jenkins-nds2-test-raplab-70/nds_power.py", line 288, in <module>
args.output_format)
File "/opt/sparkRapidsPlugin/spark-raplab-cicd/jenkins-nds2-test-raplab-70/nds_power.py", line 201, in run_query_stream
execution_time_list)
File "/opt/sparkRapidsPlugin/spark-raplab-cicd/jenkins-nds2-test-raplab-70/nds_power.py", line 99, in setup_tables
spark_session.read.format(input_format).schema(get_schemas(use_decimal)['table_name']).load(
KeyError: 'table_name'
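The KeyError suggests the schema lookup passes the literal string 'table_name' instead of the loop variable holding the actual table name. A minimal sketch of the likely fix (get_schemas and the table list are stubbed here purely for illustration; the real function returns Spark schemas):

```python
def get_schemas(use_decimal):
    # Stub: the real get_schemas returns a schema object per table name.
    return {"store_sales": "ss_schema", "item": "item_schema"}

def setup_tables(table_names, use_decimal):
    schemas = get_schemas(use_decimal)
    loaded = {}
    for table_name in table_names:
        # Bug was schemas['table_name'] (a literal key, always missing);
        # the lookup must use the table_name variable instead.
        loaded[table_name] = schemas[table_name]
    return loaded
```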
The current TaskFailureListener is implemented entirely on the Python side. This has caused a number of unexpected issues like #37, and errors like:
22/06/28 08:09:58 ERROR org.apache.spark.scheduler.AsyncEventQueue: Dropping event from queue shared. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
We need to implement this listener on the JVM side and have it listen only to the specific "TaskEnd" event, to reduce the traffic between the JVM and Python when transferring data via the py4j gateway.
When running python3 nds_gen_data.py hdfs 10 100 /user/sraheja/raw_sf1 --overwrite_output on an HDFS cluster, I got the following error message at the end:
mv: `/user/sraheja/raw_sf1/delete_1.dat-m-00000': No such file or directory
Traceback (most recent call last):
File "/home/sraheja/dev/spark-rapids-benchmarks/nds/nds_gen_data.py", line 282, in <module>
generate_data(args)
File "/home/sraheja/dev/spark-rapids-benchmarks/nds/nds_gen_data.py", line 246, in generate_data
generate_data_hdfs(args, jar_path)
File "/home/sraheja/dev/spark-rapids-benchmarks/nds/nds_gen_data.py", line 172, in generate_data_hdfs
move_delete_date_tables(args.data_dir)
File "/home/sraheja/dev/spark-rapids-benchmarks/nds/nds_gen_data.py", line 124, in move_delete_date_tables
subprocess.run(move, check=True)
File "/home/sraheja/.pyenv/versions/3.10.4/lib/python3.10/subprocess.py", line 524, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['hadoop', 'fs', '-mv', '/user/sraheja/raw_sf1/delete_1.dat-m-00000', '/user/sraheja/raw_sf1/delete/']' returned non-zero exit status 1.
The following files were written to HDFS, and the subdirectories appear to have data:
$ hdfs dfs -ls /user/sraheja/raw_sf1
Found 27 items
-rw-r--r-- 3 sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/_SUCCESS
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/call_center
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/catalog_page
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/catalog_returns
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/catalog_sales
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/customer
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/customer_address
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/customer_demographics
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/date_dim
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/dbgen_version
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/delete
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/household_demographics
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/income_band
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/inventory
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/item
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/promotion
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/reason
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/ship_mode
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/store
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/store_returns
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/store_sales
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/time_dim
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/warehouse
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/web_page
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/web_returns
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/web_sales
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/web_site
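The mv failure suggests move_delete_date_tables assumes a fixed part-file name (delete_1.dat-m-00000) that may not exist at higher parallelism. A hedged sketch of a more defensive version that lists matching files first and only moves what actually exists (the hadoop fs -ls -C / -mv invocations are real shell commands, but the helper names are illustrative and the runner is injectable so it can be stubbed):

```python
import subprocess

def list_matching(pattern, run=subprocess.run):
    """Return HDFS paths matching pattern, or [] if nothing matches."""
    proc = run(["hadoop", "fs", "-ls", "-C", pattern],
               capture_output=True, text=True)
    if proc.returncode != 0:
        return []
    return [p for p in proc.stdout.splitlines() if p]

def move_delete_date_tables(data_dir, run=subprocess.run):
    for table in ("delete", "inventory_delete"):
        pattern = f"{data_dir}/{table}_*.dat*"
        paths = list_matching(pattern, run=run)
        if not paths:
            # Nothing to move for this table: skip rather than
            # failing the whole data generation run.
            continue
        run(["hadoop", "fs", "-mv", *paths, f"{data_dir}/{table}/"],
            check=True)
```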
cc: @anchorbob
The benchmarking framework is now using a JVM listener to track the status of query execution. This issue is to track the missing doc and build process.
Currently result.csv uses "application_id,query,time/s" as its header.
However, the per-query times are actually in milliseconds, while the last two rows, "Power Test Time" and "Total Time", are in seconds.
We should use milliseconds for all rows in result.csv so there is no confusion.
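Until the writer is fixed, normalizing a parsed result.csv is straightforward. A small sketch, assuming only the two trailing summary rows are in seconds (row shape is my assumption of the current format):

```python
def normalize_to_millis(rows):
    """rows: list of (application_id, query, time) tuples with time as a
    string. Convert the seconds-based summary rows to milliseconds so
    every row in result.csv shares a single unit."""
    seconds_rows = {"Power Test Time", "Total Time"}
    out = []
    for app_id, query, t in rows:
        value = float(t)
        if query in seconds_rows:
            value *= 1000.0  # seconds -> milliseconds
        out.append((app_id, query, value))
    return out
```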
Both CPU and GPU runs for query65 are producing indeterministic results, more details in #7 (comment). We need to track this and think about how to fix it.
When generating update data, folders are not created for two tables: delete and inventory_delete.