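`np.sort` fails on GPU data (`gpu=True`). The worker subtask raises `AssertionError` at `assert xp is cp` in `_sort` (`xorbits/_mars/tensor/base/psrs.py`, line 425), and the error propagates back to the client: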
In [10]: %time print(np.sort(np.random.rand(100_000_000, gpu=True)))
0%| | 0.00/100 [00:01<?, ?it/s]2023-01-30 09:27:22,974 xorbits._mars.services.scheduling.worker.execution 2867073 ERROR Failed to run subtask FvEe9WQxwQ3wEr5zzgnGcQe8 on band gpu-0
Traceback (most recent call last):
File "/home/xuyeqin/projects/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py", line 203, in _execute_operand
return execute(ctx, op)
File "/home/xuyeqin/projects/xorbits/python/xorbits/_mars/core/operand/core.py", line 491, in execute
result = executor(results, op)
File "/home/xuyeqin/projects/xorbits/python/xorbits/_mars/tensor/base/psrs.py", line 525, in execute
res = ctx[op.outputs[0].key] = _sort(a, op, xp)
File "/home/xuyeqin/projects/xorbits/python/xorbits/_mars/tensor/base/psrs.py", line 425, in _sort
assert xp is cp
AssertionError
2023-01-30 09:27:22,976 xorbits._mars.services.task.execution.mars.stage 2867073 ERROR Subtask FvEe9WQxwQ3wEr5zzgnGcQe8 errored
Traceback (most recent call last):
File "/home/xuyeqin/projects/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py", line 203, in _execute_operand
return execute(ctx, op)
File "/home/xuyeqin/projects/xorbits/python/xorbits/_mars/core/operand/core.py", line 491, in execute
result = executor(results, op)
File "/home/xuyeqin/projects/xorbits/python/xorbits/_mars/tensor/base/psrs.py", line 525, in execute
res = ctx[op.outputs[0].key] = _sort(a, op, xp)
File "/home/xuyeqin/projects/xorbits/python/xorbits/_mars/tensor/base/psrs.py", line 425, in _sort
assert xp is cp
AssertionError
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100.00/100 [00:01<00:00, 73.21it/s]
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
File <timed eval>:1
File ~/projects/xorbits/python/xorbits/utils.py:33, in safe_repr_str.<locals>.inn(self, *args, **kwargs)
31 return getattr(object, f.__name__)(self)
32 else:
---> 33 return f(self, *args, **kwargs)
File ~/projects/xorbits/python/xorbits/core/data.py:223, in DataRef.__str__(self)
221 return self.data._mars_entity.op.data.__str__()
222 else:
--> 223 run(self)
224 return self.data.__str__()
File ~/projects/xorbits/python/xorbits/core/execution.py:42, in run(obj)
40 if isinstance(obj, DataRef):
41 if need_to_execute(obj):
---> 42 mars_execute(_get_mars_entity(obj))
43 else:
44 refs_to_execute = [_get_mars_entity(ref) for ref in obj if need_to_execute(ref)]
File ~/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1875, in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
1873 session = get_default_or_create(**(new_session_kwargs or dict()))
1874 session = _ensure_sync(session)
-> 1875 return session.execute(
1876 tileable,
1877 *tileables,
1878 wait=wait,
1879 show_progress=show_progress,
1880 progress_update_interval=progress_update_interval,
1881 **kwargs,
1882 )
File ~/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1669, in SyncSession.execute(self, tileable, show_progress, warn_duplicated_execution, *tileables, **kwargs)
1667 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
1668 try:
-> 1669 execution_info: ExecutionInfo = fut.result(
1670 timeout=self._isolated_session.timeout
1671 )
1672 except KeyboardInterrupt: # pragma: no cover
1673 logger.warning("Cancelling running task")
File ~/miniconda3/envs/mars/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
444 raise CancelledError()
445 elif self._state == FINISHED:
--> 446 return self.__get_result()
447 else:
448 raise TimeoutError()
File ~/miniconda3/envs/mars/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File ~/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1855, in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
1852 else:
1853 # set cancelled to avoid wait task leak
1854 cancelled.set()
-> 1855 await execution_info
1856 else:
1857 return execution_info
File ~/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:106, in ExecutionInfo._ensure_future.<locals>.wait()
105 async def wait():
--> 106 return await self._aio_task
File ~/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:954, in _IsolatedSession._run_in_background(self, tileables, task_id, progress, profiling)
948 logger.warning(
949 "Profile task %s execution result:\n%s",
950 task_id,
951 json.dumps(task_result.profiling, indent=4),
952 )
953 if task_result.error:
--> 954 raise task_result.error.with_traceback(task_result.traceback)
955 if cancelled:
956 return
File ~/projects/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:373, in TaskProcessor.run(self)
371 async with self._executor:
372 async for stage_args in self._iter_stage_chunk_graph():
--> 373 await self._process_stage_chunk_graph(*stage_args)
374 except Exception as ex:
375 self.result.error = ex
File ~/projects/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:250, in TaskProcessor._process_stage_chunk_graph(self, stage_id, stage_profiler, chunk_graph)
244 tile_context = await asyncio.to_thread(
245 self._get_stage_tile_context,
246 {c for c in chunk_graph.result_chunks if not isinstance(c.op, Fetch)},
247 )
249 with Timer() as timer:
--> 250 chunk_to_result = await self._executor.execute_subtask_graph(
251 stage_id, subtask_graph, chunk_graph, tile_context
252 )
253 stage_profiler.set("run", timer.duration)
255 self._preprocessor.post_chunk_graph_execution()
File ~/projects/xorbits/python/xorbits/_mars/services/task/execution/mars/executor.py:208, in MarsTaskExecutor.execute_subtask_graph(self, stage_id, subtask_graph, chunk_graph, tile_context, context)
206 curr_tile_progress = self._tile_context.get_all_progress() - prev_progress
207 self._stage_tile_progresses.append(curr_tile_progress)
--> 208 return await stage_processor.run()
File ~/projects/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py:231, in TaskStageProcessor.run(self)
227 if self.subtask_graph.num_shuffles() > 0:
228 # disable scale-in when shuffle is executing so that we can skip
229 # store shuffle meta in supervisor.
230 await self._scheduling_api.disable_autoscale_in()
--> 231 return await self._run()
232 finally:
233 if self.subtask_graph.num_shuffles() > 0:
File ~/projects/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py:251, in TaskStageProcessor._run(self)
249 if self.error_or_cancelled():
250 if self.result.error is not None:
--> 251 raise self.result.error.with_traceback(self.result.traceback)
252 else:
253 raise asyncio.CancelledError()
File ~/projects/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py:203, in _execute_operand()
198 @enter_mode(build=False, kernel=True)
199 def _execute_operand(
200 self, ctx: Dict[str, Any], op: OperandType
201 ): # noqa: R0201 # pylint: disable=no-self-use
202 try:
--> 203 return execute(ctx, op)
204 except BaseException as ex:
205 # wrap exception in execution to avoid side effects
206 raise ExecutionError(ex).with_traceback(ex.__traceback__) from None
File ~/projects/xorbits/python/xorbits/_mars/core/operand/core.py:491, in execute()
487 else:
488 # Cast `UFuncTypeError` to `TypeError` since subclasses of the former is unpickleable.
489 # The `UFuncTypeError` was introduced by numpy#12593 since v1.17.0.
490 try:
--> 491 result = executor(results, op)
492 succeeded = True
493 if op.stage is not None:
File ~/projects/xorbits/python/xorbits/_mars/tensor/base/psrs.py:525, in execute()
522 if not op.return_indices:
523 if op.kind is not None:
524 # sort
--> 525 res = ctx[op.outputs[0].key] = _sort(a, op, xp)
526 else:
527 # do not sort, prepare for sample by `xp.partition`
528 kth = xp.linspace(
529 max(w - 1, 0), a.shape[op.axis] - 1, num=n, endpoint=False
530 ).astype(int)
File ~/projects/xorbits/python/xorbits/_mars/tensor/base/psrs.py:425, in _sort()
422 return method(axis=axis, kind=kind, order=order)
423 else: # pragma: no cover
424 # cupy does not support structure type
--> 425 assert xp is cp
426 assert order is not None
427 method = a.sort if inplace else partial(cp.sort, a)
AssertionError: