

knit's Issues

JVM fails to report back for custom env

I pass an env with the path to my current environment to DaskYARNCluster. I can see the zip file being built and uploaded, but then I get the error below saying that the JVM failed to report back. Passing the channel conda-forge instead starts up the environment correctly, but then loading a parquet file from hdfs fails because that environment lacks hdfs3.

My environment should be a superset of the other environment, with the same versions of dask and distributed, both from the conda-forge channel.

Exception                                 Traceback (most recent call last)
<ipython-input-12-267836867704> in <module>()
     13                                 'rm_port': resource_manager_port})
     14 client = Client(cluster)
---> 15 cluster.start(2, cpus=1, memory=500)
     16 
     17 #future = client.submit(lambda x: x + 1, 10)

/nas/isg_prodops_work/jlord/conda/envs/dask/lib/python3.6/site-packages/knit-0.2.2-py3.6.egg/knit/dask_yarn.py in start(self, n_workers, cpus, memory, checks, **kwargs)
    127         app_id = self.knit.start(command, env=self.env,
    128                                  num_containers=n_workers, virtual_cores=cpus,
--> 129                                  memory=memory, checks=checks, **kwargs)
    130         self.app_id = app_id
    131         return app_id

/nas/isg_prodops_work/jlord/conda/envs/dask/lib/python3.6/site-packages/knit-0.2.2-py3.6.egg/knit/core.py in start(self, cmd, num_containers, virtual_cores, memory, env, files, app_name, queue, checks)
    294  - that the cluster is otherwise unhealthy - check the RM and NN logs
    295    (use k.yarn_api.system_logs() to find these on a one-node system
--> 296 """)
    297         master_rpchost = self.client.masterRPCHost()
    298 

Exception: The application master JVM process failed to report back. This can mean:
 - that the YARN cluster cannot scheduler adequate resources - check
   k.yarn_api.cluster_metrics() and other diagnostic methods;
 - that the ApplicationMaster crashed - check the application logs, k.logs();
 - that the cluster is otherwise unhealthy - check the RM and NN logs 
   (use k.yarn_api.system_logs() to find these on a one-node system

Worker restarted until killed

I believe the key error is the one below. The response looks fine ('status': 'OK'), yet the distributed/worker.py module treats it as unexpected.

"/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/worker.py", line 248, in _register_with_scheduler
    raise ValueError("Unexpected response from register: %r" % (resp,))
ValueError: Unexpected response from register: {'status': 'OK', 'time': 1507305239.205065}
distributed.nanny - WARNING - Restarting worker

Below is more of the container log. It continues restarting until it is killed. The final error when it is killed is at the bottom.

Container: container_e110_1506861552726_19299_01_000002 on hostname.allstate.com_8041
======================================================================================
LogType:stderr
Log Upload Time:Fri Oct 06 10:54:03 -0500 2017
LogLength:22808
Log Contents:
/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/config.py:55: UserWarning: Could not write default config file to '/home/.dask/config.yaml'. Received error [Errno 13] Permission denied: '/home/.dask'
  UserWarning)
distributed.nanny - INFO -         Start Nanny at: 'tcp://10.195.102.32:45126'
/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/config.py:55: UserWarning: Could not write default config file to '/home/.dask/config.yaml'. Received error [Errno 13] Permission denied: '/home/.dask'
  UserWarning)
distributed.worker - INFO -       Start worker at:  tcp://10.195.102.32:37045
distributed.worker - INFO -          Listening to:  tcp://10.195.102.32:37045
distributed.worker - INFO -              nanny at:        10.195.102.32:45126
distributed.worker - INFO -               http at:        10.195.102.32:32817
distributed.worker - INFO -              bokeh at:         10.195.102.32:8789
distributed.worker - INFO - Waiting to connect to: tcp://10.195.208.190:40025
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                    0.50 GB
distributed.worker - INFO -       Local Directory:            worker-nfomhqoz
distributed.worker - INFO - -------------------------------------------------
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/nanny.py", line 467, in run
    yield worker._start(*worker_start_args)
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/worker.py", line 319, in _start
    yield self._register_with_scheduler()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/worker.py", line 248, in _register_with_scheduler
    raise ValueError("Unexpected response from register: %r" % (resp,))
ValueError: Unexpected response from register: {'status': 'OK', 'time': 1507305239.205065}
distributed.nanny - WARNING - Restarting worker

Final error:

 tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f96cdcfb510>, <tornado.concurrent.Future object at 0x7f96ce9c5978>)
Traceback (most recent call last):
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/nanny.py", line 138, in _start
    response = yield self.instantiate()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/nanny.py", line 205, in instantiate
    yield self.process.start()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/nanny.py", line 311, in start
    yield self._wait_until_running()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/hadoop02/yarn/nm/usercache/jlord/appcache/application_1506861552726_19299/container_e110_1506861552726_19299_01_000002/PYTHON_DIR/dask-35d2a1ee201208ae9fca6905fa88ea9e54557b58/lib/python3.6/site-packages/distributed/nanny.py", line 397, in _wait_until_running
    raise ValueError("Worker not started")
ValueError: Worker not started

Auto Upload Jar

rambling should be capable of bootstrapping itself, similarly to Spark. For example, Spark apps build a staging directory such as /user/ubuntu/.sparkStaging/application_1452033885404_0005 with the following contents:

  • __spark_conf__3428001626499023137.zip
  • py4j-0.8.2.1-src.zip
  • pyspark.zip
  • spark-assembly-1.5.2-hadoop2.6.0.jar

ApplicationMaster out of memory?

I can't seem to start a cluster on elastic mapreduce from the head node - I think the ApplicationMaster is consuming too much vmem and being killed before the cluster can start.

Example:

In [2]: knit.__version__
Out[2]: '0.2.3'
In [3]: cluster = DaskYARNCluster(packages=['python=3', 'numpy', 'pandas', 'dask', 'distributed'])
In [4]: cluster.start()

Code thinks everything is working, but app logs suggest otherwise:

Application application_1509543030224_0006 failed 2 times due to AM Container for appattempt_1509543030224_0006_000002 exited with exitCode: -103
For more detailed output, check application tracking page:http://localhost:8088/cluster/app/application_1509543030224_0006Then, click on links to logs of each attempt.
Diagnostics: Container [pid=11934,containerID=container_1509543030224_0006_02_000001] is running beyond virtual memory limits. Current usage: 178.7 MB of 320 MB physical memory used; 2.0 GB of 1.6 GB virtual memory used. Killing container.
Dump of the process-tree for container_1509543030224_0006_02_000001 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 11934 11933 11934 11934 (bash) 0 0 115822592 678 /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Xmx256M io.continuum.knit.ApplicationMaster 1>/var/log/hadoop-yarn/containers/application_1509543030224_0006/container_1509543030224_0006_02_000001/stdout 2>/var/log/hadoop-yarn/containers/application_1509543030224_0006/container_1509543030224_0006_02_000001/stderr
|- 11938 11934 11934 11934 (java) 464 26 2083704832 45057 /usr/lib/jvm/java-openjdk/bin/java -Xmx256M io.continuum.knit.ApplicationMaster
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Failing this attempt. Failing the application.

I think the relevant default params for EMR are here:

<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>32</value>
<source>java.io.BufferedInputStream@79e2c065</source>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>true</value>
<source>yarn-default.xml</source>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>5</value>
<source>java.io.BufferedInputStream@79e2c065</source>
</property>

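And in Client.scala there is a request for 300 MB of memory, which (I assume) is rounded up to 320 MB, a multiple of the 32 MB minimum allocation. That gives the 320 * 5 = 1600 MB (about 1.6 GB) vmem limit that the container exceeds before being killed.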

I'm not sure if it's just python libs using up so much vmem, or something else...? I would suspect this isn't usually an issue since the default for yarn.scheduler.minimum-allocation-mb is 1024 and the request for 300mb is ignored, but in EMR this is lower for hbase I guess.
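The arithmetic above can be checked with a short sketch. It assumes the scheduler rounds a request up to the next multiple of yarn.scheduler.minimum-allocation-mb and that the virtual memory limit is the physical limit times yarn.nodemanager.vmem-pmem-ratio; the function name is illustrative only and is not part of knit or YARN.

```python
import math


def container_limits_mb(requested_mb, min_allocation_mb, vmem_pmem_ratio):
    """Return (physical_mb, virtual_mb) limits for a container request.

    Assumption: the request is rounded up to a multiple of the
    minimum allocation before the vmem ratio is applied.
    """
    steps = max(1, math.ceil(requested_mb / min_allocation_mb))
    physical = steps * min_allocation_mb
    virtual = physical * vmem_pmem_ratio
    return physical, virtual


# EMR settings quoted above: minimum allocation 32 MB, ratio 5.
emr_limits = container_limits_mb(300, 32, 5)

# Same request with a 1024 MB minimum allocation (ratio kept at 5 for comparison).
stock_limits = container_limits_mb(300, 1024, 5)

print(emr_limits, stock_limits)
```

With the EMR values the limits come out as 320 MB physical and 1600 MB virtual, matching the diagnostics in the log; with a 1024 MB minimum the same request would get a much larger virtual limit.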

Thoughts?

Shorten method names?

Thoughts on removing _application_ from method names?

  • start_application -> start
  • get_application_logs -> get_logs
  • get_application_status -> get_status
  • kill_application -> kill

?
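If the rename goes ahead, one option is to keep the old names for a while as deprecated aliases. The sketch below uses a hypothetical stand-in class (KnitSketch is not knit's actual class) to show the pattern for one method.

```python
import warnings


class KnitSketch:
    """Hypothetical stand-in used only to illustrate the aliasing pattern."""

    def start(self, cmd):
        # New, shorter name.
        return "started: " + cmd

    def start_application(self, cmd):
        # Old name kept as a thin wrapper that warns and delegates.
        warnings.warn(
            "start_application is deprecated; use start",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start(cmd)
```

The same wrapper shape would apply to get_application_logs, get_application_status and kill_application.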

Knit on YARN... error with continuous integration readme ?

Following the continuous integration README

Everything went fine until I tried to test knit on YARN. Below is the error I get:

bash-4.1# hadoop jar /knit/knit_jvm/target/knit-1.0-SNAPSHOT.jar io.continuum.knit.Client hdfs://localhost:9000/jars/knit-1.0-SNAPSHOT.jar 1 "python -c 'import sys; print(sys.path); import random; print>
Error: Unknown argument 'hdfs://localhost:9000/jars/knit-1.0-SNAPSHOT.jar'
Error: Unknown argument '1'
Error: Unknown argument 'python -c 'import sys; print(sys.path); import random; print(str(random.random()))''

Documentation comments

Some comments on the current knit documentation (thanks @jreback!)

  • use conda install knit -c blaze (or whatever it should be)
  • show a full usage of .start rather than just saying it has other parameters
  • ideally show a response from the start command to see what it looks like (like you do for .status)
  • quickstart is not showing on the left hand menu
  • split the quickstart into 2 subsections and commands and python
  • usage: explain the purpose of the Python section (e.g. what does this do)
  • is there a command to essentially run conda list / conda info remotely? E.g. after you create the env, show the output of this (perhaps, as an enhancement, add actual commands for this, e.g. knit.list_env())

Resource Respect

Launching a python process in a yarn container can effectively break out of the requirements set by AMRM. Python processes should properly respect the cores, memory, etc. constraints.

Submitter

The ApplicationMaster needs to know who submitted the job; otherwise the AM will look for hdfs://.../user/yarn/.knitDeps

hdfs_home no longer respected

#101 broke support for alternate hdfs_home locations. When starting an application with alternative hdfs_home, the file is uploaded to the correct location, but the download address is incorrect.

import knit
k = knit.Knit(hdfs_home='/tmp/knit')
k.start('ls', files=['hdfs://path/to/my/file.zip'])
# Container starts, but fails to download file

In the logs I see java exceptions like:

java.io.FileNotFoundException: File does not exist: hdfs://HOSTNAME:8020/user/yarn/.knitDeps/file.zip

Note that while HOSTNAME is a stand-in for the actual address, the rest is the actual error. It looks like the code takes the basename of the passed-in path and uses the relative path .knitDeps/basename for all files.

Almost certainly related to #104.
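The expected behaviour can be sketched as a small path-resolution helper. This is only an illustration of what the report asks for, not knit's implementation; the function name is hypothetical.

```python
import posixpath


def download_address(path, hdfs_home):
    """Return the HDFS location a container should fetch `path` from."""
    if path.startswith("hdfs://"):
        # Already on HDFS: use the address as given instead of
        # rewriting it to .knitDeps/<basename>.
        return path
    # Local file: it is uploaded under the configured hdfs_home,
    # so the download address must use the same prefix.
    return posixpath.join(hdfs_home, ".knitDeps", posixpath.basename(path))
```

For example, with hdfs_home='/tmp/knit' a local myenv.zip would resolve to /tmp/knit/.knitDeps/myenv.zip, while hdfs://path/to/my/file.zip would be passed through unchanged.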

Split Commands

Currently the shell command is only executed in non-AM containers. We should:

  1. Allow commands to be executed on the AM
  2. Allow for a different command to be executed on the AM compared to other containers

Container naming assumptions in ``yarn_api`` seem incorrect

According to ContainerId API docs, its string representation is constructed according to the following format:

[...] container_e*epoch*_*clusterTimestamp*_*appId*_*attemptId*_*containerId* when epoch is larger than 0 (e.g. container_e17_1410901177871_0001_01_000005). epoch is increased when RM restarts or fails over. When epoch is 0, epoch is omitted (e.g. container_1410901177871_0001_01_000005).

# container_1452274436693_0001_01_000001

It seems that the current version of the code ignores the fact that the epoch can be non-zero, and could therefore return an empty list of containers even though there are running containers for the given application.
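A parser that accepts both forms could look like the sketch below. It follows the format quoted from the ContainerId docs; the function names are illustrative and not taken from yarn_api.

```python
import re

# Optional "e<epoch>_" prefix, then cluster timestamp, app id,
# attempt id and container id, as in the format quoted above.
CONTAINER_ID = re.compile(
    r"^container_(?:e(?P<epoch>\d+)_)?"
    r"(?P<cluster_ts>\d+)_(?P<app>\d+)_(?P<attempt>\d+)_(?P<container>\d+)$"
)


def parse_container_id(text):
    """Split a container id into integer fields; epoch is 0 when omitted."""
    match = CONTAINER_ID.match(text)
    if match is None:
        raise ValueError("not a container id: %r" % (text,))
    return {key: int(value) for key, value in match.groupdict(default="0").items()}


def application_id(container_id):
    """Return the application id that a container belongs to."""
    parts = parse_container_id(container_id)
    return "application_%d_%04d" % (parts["cluster_ts"], parts["app"])
```

Matching containers to an application through application_id(), rather than through a fixed string prefix, would treat container_e17_... and container_... ids the same way.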

add timeouts to requests calls

Depending on the network setup, calls such as requests.get can hang indefinitely. There should always be timeouts, and methods like kill() should decide what to do when, for example, the application status is unavailable because of a timeout.
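A minimal sketch of the idea follows. To stay dependency-free it uses the standard library's urlopen rather than requests; with requests the equivalent is passing timeout= to requests.get and catching requests.exceptions.Timeout. The function names and the policy in kill_needed are assumptions for illustration, and the JSON layout assumes the YARN REST response for a single application ({"app": {"state": ...}}).

```python
import json
import socket
import urllib.error
import urllib.request

TERMINAL_STATES = {"FINISHED", "FAILED", "KILLED"}


def fetch_app_state(url, timeout=5.0, opener=urllib.request.urlopen):
    """Return the application state, or None if the request timed out."""
    try:
        with opener(url, timeout=timeout) as resp:
            return json.load(resp)["app"]["state"]
    except socket.timeout:
        # Read timed out: report "unknown" instead of hanging or crashing.
        return None
    except urllib.error.URLError as exc:
        # Connection timeouts arrive wrapped in URLError.
        if isinstance(exc.reason, socket.timeout):
            return None
        raise


def kill_needed(state):
    """Decide whether kill() should still send a kill request."""
    # Assumed policy: if the state is unknown, err on the side of killing.
    return state is None or state not in TERMINAL_STATES
```

The opener parameter exists only so the timeout path can be exercised without a real ResourceManager.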

Dask-yarn command line interface

I would like to have access to a dask-yarn CLI

$ dask-yarn --nworkers 10 --threads-per-worker 4 --env "python=3 numpy pandas dask distributed"

Where is the right place for this command line interface to exist? The dask/distributed repository? dask/knit (here)? dask/dask-yarn (a new project)?
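Wherever it ends up living, the argument handling for the command above could be sketched with argparse as follows. Only the three flags come from the example; the defaults and the parse helper are assumptions.

```python
import argparse


def build_parser():
    """Build a parser for the dask-yarn flags shown in the example."""
    parser = argparse.ArgumentParser(prog="dask-yarn")
    parser.add_argument("--nworkers", type=int, default=1,
                        help="number of worker containers (default is an assumption)")
    parser.add_argument("--threads-per-worker", type=int, default=1,
                        help="threads per worker (default is an assumption)")
    parser.add_argument("--env", default="",
                        help="space-separated conda package specs")
    return parser


def parse(argv):
    """Parse argv and split --env into a list of package specs."""
    args = build_parser().parse_args(argv)
    args.packages = args.env.split()
    return args
```

The resulting packages list has the same shape as the packages= argument passed to DaskYARNCluster elsewhere on this page.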

Option to set HDFS working directory

The path reported by fs.getHomeDirectory() is not necessarily writable for the user or even extant. Should provide the option to set a custom location for staging (e.g., in /tmp/...). This may also get around permission problems.

Knit applications fail if files are uploaded in new release.

Using knit at commit 6c2550c, the following succeeds:

knit = Knit(hdfs_home='/tmp/knit')
knit.start('env', env='myenv.zip')

Using the new knit 0.2.3 release, the following (should be equivalent) starts the application, but the application fails.

knit = Knit(hdfs_home='/tmp/knit')
knit.start('env', files=['myenv.zip'])

Note that this succeeds if I omit the files kwarg. Also note that the files are uploaded to the proper location on hdfs, and the log lines reference the proper hdfs locations.

The logs seem unhelpful for debugging here. I'm not sure whether this is just how yarn is or whether knit could do something to make debugging easier. There are no java tracebacks; the application just fails.

The few lines I found that may help with debugging:

  • directory.info ends without ever listing the uploaded files. Note that in both the working version and this version the broken symlinks line exists.
ls -l:
total 20
-rw-------. 1 yarn hadoop  166 Oct 30 12:13 container_tokens
-rwx------. 1 yarn hadoop  704 Oct 30 12:13 default_container_executor_session.sh
-rwx------. 1 yarn hadoop  758 Oct 30 12:13 default_container_executor.sh
lrwxrwxrwx. 1 yarn hadoop   66 Oct 30 12:13 knit.jar -> /hdfs/disk03/hadoop/yarn/local/filecache/166/knit-1.0-SNAPSHOT.jar
-rwx------. 1 yarn hadoop 4901 Oct 30 12:13 launch_container.sh
drwx--x---. 2 yarn hadoop   10 Oct 30 12:13 tmp
find -L . -maxdepth 5 -ls:
8597569075    4 drwx--x---   3 yarn     hadoop       4096 Oct 30 12:13 .
14182360386    0 drwx--x---   2 yarn     hadoop         10 Oct 30 12:13 ./tmp
8597569076    4 -rw-------   1 yarn     hadoop        166 Oct 30 12:13 ./container_tokens
8597569077    4 -rw-------   1 yarn     hadoop         12 Oct 30 12:13 ./.container_tokens.crc
8599231880    8 -rwx------   1 yarn     hadoop       4901 Oct 30 12:13 ./launch_container.sh
8599231881    4 -rw-------   1 yarn     hadoop         48 Oct 30 12:13 ./.launch_container.sh.crc
8599231882    4 -rwx------   1 yarn     hadoop        704 Oct 30 12:13 ./default_container_executor_session.sh
8599231883    4 -rw-------   1 yarn     hadoop         16 Oct 30 12:13 ./.default_container_executor_session.sh.crc
8599231888    4 -rwx------   1 yarn     hadoop        758 Oct 30 12:13 ./default_container_executor.sh
8599231889    4 -rw-------   1 yarn     hadoop         16 Oct 30 12:13 ./.default_container_executor.sh.crc
12884939857 25320 -r-xr-xr-x   1 yarn     hadoop   25925992 Oct 30 12:13 ./knit.jar
broken symlinks(find -L . -maxdepth 5 -type l -ls):

End of LogType:directory.info
  • The Container completed ContainerStatus: [...] line indicates exit code -1000. Googling this code doesn't turn up any results, so I'm not sure what it means.

I'm not sure what else I can do to help debug here. For now I'm relying on a custom build of an older commit.

No upload option for env

I have been mulling over writing a PR to allow users to set environment variables (CONDA_PREFIX and PYTHON_BIN) and not upload the environment at all. This seems like it wouldn't be too difficult based on what I have already observed with knit, but I wanted to see if this is possible and discuss API choices if it is.

Custom MINICONDA_URL

For the sake of security, our machines are not connected to the internet.
So when the CondaCreator class (see env.py) is called, it cannot reach the internet to install miniconda.
Would it be possible to check whether an environment variable (say MINICONDA_URL) is set, use it if so, and otherwise continue the way it is done so far?
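The lookup being asked for is small. A sketch, assuming a helper that CondaCreator could call; the helper name and the default URL shown here are assumptions, not what env.py currently contains.

```python
import os

# Assumed public default; the value knit actually uses may differ.
DEFAULT_MINICONDA_URL = "https://repo.anaconda.com/miniconda/"


def miniconda_url(environ=None):
    """Return MINICONDA_URL from the environment, else the default."""
    environ = os.environ if environ is None else environ
    # An empty value is treated the same as an unset variable.
    return environ.get("MINICONDA_URL") or DEFAULT_MINICONDA_URL
```

On an air-gapped machine, MINICONDA_URL could then point at an internal mirror.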

bypass java

The YARN REST API appears complete enough that, if we can use hdfs3 to manage file-based resources (i.e., a minimal python) and create tokens, the client and AM could both be completely python, but still be able to successfully manage delegation tokens (including their renewal)

Dask - Yarn with jupyter notebook as client

Context:

  • cluster where YARN jobs should be run
  • jupyter notebook on a different machine (not on the cluster)

From my jupyter notebook, I would like to start up a dask scheduler on the cluster namenode, and then add some dask workers in YARN containers.
Working only on the cluster (no jupyter notebook), it works just fine. However, I'd like to drive this from a remote machine. The machine with the jupyter notebook could read the yarn-site.xml from the cluster, remotely run the "hadoop jar" command to spawn the dask workers, and eventually run "yarn application -kill " when the job is done.
It seems that "knit" expects all these commands to be installed on the remote machine, which is a bit restrictive.

What would you suggest?
Thx... and happy 2017 ;)

config parsing for HA

Parsing of config files can fail. This should never cause an error; knit should fall back to default parameters (which may not work).
If parameters are provided explicitly, there should be no attempt to replace them with values from the config files.
Specifically for HA setups, the port should not be set, as there may be several for the various resource/name nodes.
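The first two points can be sketched as a tolerant parser plus an explicit precedence rule. The function names are hypothetical, and the rm / rm_port keys merely echo the parameter names seen elsewhere on this page; HA handling is not covered.

```python
import xml.etree.ElementTree as ET


def parse_hadoop_xml(text):
    """Return {name: value} from a Hadoop-style XML config.

    Returns an empty dict instead of raising if the text cannot be parsed.
    """
    try:
        root = ET.fromstring(text)
    except ET.ParseError:
        return {}
    conf = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        value = prop.findtext("value")
        if name is not None and value is not None:
            conf[name.strip()] = value.strip()
    return conf


def resolve(explicit, from_file, defaults):
    """Merge settings: defaults < config file < explicitly passed values."""
    merged = dict(defaults)
    merged.update(from_file)
    # Explicit parameters always win; None means "not provided".
    merged.update({k: v for k, v in explicit.items() if v is not None})
    return merged
```

With this ordering, a value passed by the user is never overwritten by whatever the config files contain.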

Dynamic Containers

Add functionality and interface to dynamically increase/decrease containers. It is especially useful and standard for YARN applications to monitor containers, and on failure, request a new container

Specify conda environment via hdfs path

Currently if you specify an environment it must exist locally as a .zip file, and is always stored in hdfs by knit in a .knitDeps repo. If a file of the same name/path already exists on hdfs, has the same size, and the timestamp is after the local file, then uploading of the environment is skipped.
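The skip rule described above amounts to the following check. This is a sketch of the rule as stated, not knit's code; the dictionary keys for the remote file are hypothetical.

```python
def needs_upload(local_size, local_mtime, remote):
    """Return True if the local environment zip must be uploaded.

    `remote` is None when no file of that name exists on hdfs, else a
    dict with hypothetical keys 'size' (bytes) and 'mtime' (seconds).
    """
    if remote is None:
        return True
    same_size = remote["size"] == local_size
    remote_is_newer = remote["mtime"] > local_mtime
    # Upload is skipped only when both conditions hold.
    return not (same_size and remote_is_newer)
```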

This is problematic for us for a few reasons:

  • Managing local zipped conda environments is a delicate process
  • We'd like to store them in our own central location, with permissions managed across a few users
  • We'd like the code not to need a local build of the environment.
  • We'd like to manage environments separate from managing deployments. These are related concepts, but tying them together in that way is limiting.

As such, it would be nice if env could also be specified as an hdfs path. This could be detected by looking for a 'hdfs://' prefix on the specified path.
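The prefix check could be as simple as the sketch below; the function name and return convention are assumptions, and the .zip requirement is carried over from the current behaviour described above.

```python
def classify_env(env):
    """Return ('hdfs', env) or ('local', env) for an environment spec."""
    if not env.endswith(".zip"):
        raise ValueError("environment must be a .zip file: %r" % (env,))
    if env.startswith("hdfs://"):
        # Already on hdfs: no local build or upload is needed.
        return ("hdfs", env)
    return ("local", env)
```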

Use Py4j to communicate between Python and Client.scala

Py4j is used in PySpark to communicate between the python process and the yarn client. I think a similar setup would provide a lot of benefits for knit.

My suggestion is to remove the argument parsing, and instead start a gateway server here:
https://github.com/dask/knit/blob/master/knit_jvm/src/main/scala/io/continuum/knit/Client.scala#L35

This would be a combination of:
https://github.com/apache/spark/blob/d83c2f9f0b08d6d5d369d9fae04cdb15448e7f0d/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala
and
https://www.py4j.org/

Basically, this registers the Client object as the entry point, thereby allowing python to simply call a method in Client to start the AM, set the number of cores, etc.

Similar to PySpark, we'll need a socket over which to communicate the port of the gateway server. This should be similar to https://github.com/apache/spark/blob/d83c2f9f0b08d6d5d369d9fae04cdb15448e7f0d/python/pyspark/java_gateway.py

After this is done, we can start thinking of implementing #40, as we'll be able to communicate to the yarn client/rm ~directly.

@quasiben what do you think?

CondaCreator on a different platform

This might as well be a conda-specific question, but I'll give it a try anyway. Is there a way to use CondaCreator to assemble an environment for a platform other than the host? e.g. build a linux-64 environment on osx-64.

language settings for containers

Click is sometimes not OK with C.UTF-8; anecdotally, en_US.utf-8 can work where the former doesn't. We should make this a configuration parameter and try to base the default on the setting in the host environment.
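One possible order of precedence is sketched here: an explicit configuration value, then the host's LANG, then a fixed fallback. The function name, parameter names and fallback are assumptions.

```python
import os


def container_lang(configured=None, environ=None, fallback="C.UTF-8"):
    """Pick the LANG value to export inside containers."""
    environ = os.environ if environ is None else environ
    # Explicit setting first, then the host environment, then the fallback.
    return configured or environ.get("LANG") or fallback
```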

Problems specifying resource manager and running with CDH

I'm running knit alongside a CDH 5.5 cluster with the default YARN setup. Running knit with the default settings gives an error:

>>> from knit import Knit
>>> k = Knit(autodetect=True)
>>> cmd = 'date'
>>> app_id = k.start(cmd)
>>> k.status(app_id)

...

ConnectionError: HTTPConnectionPool(host='localhost', port=8088): Max retries exceeded with url: 
/ws/v1/cluster/apps/ (Caused by 
NewConnectionError('<requests.packages.urllib3.connection.HTTPConnection object at 
0x7f66c93b3160>: Failed to establish a new connection: [Errno 111] Connection refused',))

But I'm unable to configure the resourcemanager, even though it's falling back to the localhost config option:

>>> k = Knit(rm='ip-172-31-24-75.ec2.internal')

...

HDFSConfigException: Possible Resource Manager hostname mismatch.  Detected localhost

So, a couple of issues:

  1. I should be able to override the autodetect settings even if they don't match config files or the fallback option.

  2. How can we make this work with CDH 5.5 out of the box? I don't see any *HADOOP* env variables defined on my CDH node. Should I have to define these myself, or can we use another signal or search for the yarn-site.xml file?

Rename rambling

rambling provides (or will provide):

  • Distributed shell functionality
  • Log fetching for remote YARN jobs
  • Resource allocation of containers (e.g., CPU, memory)
  • Deploy conda environments via YARN job submission
  • Container restart/dynamic allocation

Names of other YARN-related projects:

  • py-yarn
  • yarn-api-client
  • Llama
  • Kitten
  • Apache Twill
  • Apache Slider

With an hdfs instance, addresses already on hdfs cause python exception before start

Looks like check_needs_upload doesn't properly handle hdfs:// paths.

import hdfs3, knit
hdfs = hdfs3.HDFileSystem()
k = knit.Knit(hdfs=hdfs, hdfs_home='/tmp/knit')
k.start('ls', files=['hdfs://tmp/knit/.knitDeps/py36.zip'], checks=False)
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
<ipython-input-4-93a64cfab03e> in <module>()
----> 1 k.start('ls', files=['hdfs://tmp/knit/.knitDeps/py36.zip'], checks=False)
/.../.conda/py36/lib/python3.6/site-packages/knit/core.py in start(self, cmd, num_containers, virtual_cores, memory, files, envvars, app_name, queue, checks)
    318         self.client_gateway = gateway
    319         files = [(f if self.check_needs_upload(f) else ('hdfs://' + f))
--> 320                  for f in files]
    321         jfiles = ListConverter().convert(files, gateway._gateway_client)
    322         jenv = MapConverter().convert(envvars, gateway._gateway_client)
/.../.conda/py36/lib/python3.6/site-packages/knit/core.py in <listcomp>(.0)
    318         self.client_gateway = gateway
    319         files = [(f if self.check_needs_upload(f) else ('hdfs://' + f))
--> 320                  for f in files]
    321         jfiles = ListConverter().convert(files, gateway._gateway_client)
    322         jenv = MapConverter().convert(envvars, gateway._gateway_client)
/.../.conda/py36/lib/python3.6/site-packages/knit/core.py in check_needs_upload(self, path)
    588         fn = (self.hdfs_home + '/.knitDeps/' + os.path.basename(path))
    589         if self.hdfs and self.hdfs.exists(fn):
--> 590             st = os.stat(path)
    591             size = st.st_size
    592             t = st.st_mtime
FileNotFoundError: [Errno 2] No such file or directory: 'hdfs://tmp/knit/.knitDeps/py36.zip'
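
A possible guard, sketched under assumptions: skip the local `os.stat` entirely when the path already carries the hdfs:// scheme. `needs_upload` is a hypothetical standalone stand-in for `check_needs_upload`, and `remote_exists` / `remote_size` are placeholder callables for whatever the hdfs object provides. The size comparison is a simplification; the real method also looks at the modification time.

```python
import os


def needs_upload(path, hdfs_home, remote_exists, remote_size):
    """Decide whether a file must be copied to HDFS before starting.

    remote_exists and remote_size are hypothetical callables that take
    an HDFS path; they stand in for the filesystem object.
    """
    if path.startswith("hdfs://"):
        # Already on HDFS: there is no local file to stat or upload.
        return False
    fn = hdfs_home + "/.knitDeps/" + os.path.basename(path)
    if not remote_exists(fn):
        return True
    # Re-upload only when the local copy differs in size from the remote.
    return os.stat(path).st_size != remote_size(fn)
```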

Log fetching

Currently users have to fetch logs with the yarn binary:

yarn logs -applicationId <applicationId>

This could probably be nicer.
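
As a stopgap, the CLI call could be wrapped from Python. This is only a sketch: `yarn_logs_command` and `fetch_logs` are made-up names, it assumes the yarn binary is on the PATH, and the command generally only returns logs when log aggregation is enabled on the cluster.

```python
import subprocess


def yarn_logs_command(app_id):
    """Build the argument list for the yarn CLI."""
    return ["yarn", "logs", "-applicationId", app_id]


def fetch_logs(app_id, timeout=60):
    """Run the yarn CLI and return the aggregated logs as text."""
    result = subprocess.run(
        yarn_logs_command(app_id),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        timeout=timeout,
    )
    if result.returncode != 0:
        # Surface the CLI's own message instead of failing silently.
        raise RuntimeError("yarn logs failed: " + result.stderr.strip())
    return result.stdout
```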

Requirements For Build

Users may not need the full jdk installed but it does resolve build issues like the following:

[ERROR] Failed to execute goal on project knit: Could not resolve dependencies for project io.continuum:knit:jar:1.0-SNAPSHOT: Could not find
 artifact jdk.tools:jdk.tools:jar:1.7 at specified path /usr/lib/jvm/java-7-openjdk-amd64/jre/../lib/tools.jar ->

On Ubuntu:

aptitude install openjdk-7-jdk

It would be good to find the minimum set of packages: openjdk-7-jre-lib and ant and something else?

File paths already on hdfs fail to download

In #101 support for passing in files already on hdfs was added. This doesn't seem to work.

import knit
k = knit.Knit()
k.start('ls', files=['hdfs://path/to/my/file.zip'])
# Container starts, but fails to download file

In the logs I see java tracebacks like:

java.io.FileNotFoundException: File does not exist: hdfs://HOSTNAME:8020/user/yarn/.knitDeps/file.zip

Note that while HOSTNAME is a stand-in for the actual address, the rest of it is the actual error. It looks like the code takes the basename of the passed-in path and uses the relative path .knitDeps/basename for all files, including ones that already live elsewhere on hdfs.
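
What I would expect instead, roughly: paths that already name an hdfs location are passed through untouched, and only local files are mapped into .knitDeps. `remote_path` is a hypothetical helper for illustration, not something in knit.

```python
import posixpath


def remote_path(path, hdfs_home):
    """Return the HDFS location a container should download from."""
    if path.startswith("hdfs://"):
        # Keep the full address the caller gave us.
        return path
    # Local files are uploaded under <hdfs_home>/.knitDeps/<basename>.
    return posixpath.join(hdfs_home, ".knitDeps", posixpath.basename(path))
```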

Resource Options

Allows users to define virtual cores and memory (defining the number of containers is already implemented).

Uninformative errors on naive connection

I've started up YARN (I think) on my local machine using @martindurant's docker setup:

$ docker run -p 8020:8020 -p 8088:8088 mdurant/hadoop
...
# localhost SSH-2.0-OpenSSH_6.6.1p1 Ubuntu-2ubuntu2.8
# localhost SSH-2.0-OpenSSH_6.6.1p1 Ubuntu-2ubuntu2.8
# 0.0.0.0 SSH-2.0-OpenSSH_6.6.1p1 Ubuntu-2ubuntu2.8
# 0.0.0.0 SSH-2.0-OpenSSH_6.6.1p1 Ubuntu-2ubuntu2.8
Starting namenodes on [0.0.0.0]
0.0.0.0: starting namenode, logging to /opt/hadoop/logs/hadoop-root-namenode-73187ceb633b.out
localhost: starting datanode, logging to /opt/hadoop/logs/hadoop-root-datanode-73187ceb633b.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /opt/hadoop/logs/hadoop-root-secondarynamenode-73187ceb633b.out
starting yarn daemons
starting resourcemanager, logging to /opt/hadoop/logs/yarn--resourcemanager-73187ceb633b.out
localhost: starting nodemanager, logging to /opt/hadoop/logs/yarn-root-nodemanager-73187ceb633b.out

Then on the same machine (but not in the docker container) I try to connect using knit.dask_yarn

In [1]: from knit.dask_yarn import DaskYARNCluster

In [2]: cluster = DaskYARNCluster?

In [3]: cluster = DaskYARNCluster()

In [4]: cluster
Out[4]: <knit.dask_yarn.DaskYARNCluster at 0x7fb9b5936e10>

In [5]: cluster.workers
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-5-7d7af4d87d9e> in <module>()
----> 1 cluster.workers

/home/mrocklin/workspace/knit/knit/dask_yarn.py in workers(self)
    150         # should not be remove or counted as a worker
    151 
--> 152         containers = self.knit.get_containers()
    153         containers.sort()
    154         self.application_master_container = containers.pop(0)

/home/mrocklin/workspace/knit/knit/core.py in get_containers(self)
    272 
    273         """
--> 274         return self.client.getContainers().split(',')
    275 
    276     def get_container_statuses(self):

AttributeError: 'NoneType' object has no attribute 'getContainers'

I probably did something wrong, but from this error I don't know what it is.
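
A friendlier failure might look like the sketch below. It assumes `client` is None simply because no application has been started yet; `ClusterSketch` and `NotStartedError` are invented names for illustration.

```python
class NotStartedError(RuntimeError):
    """Raised when the cluster is queried before an application exists."""


class ClusterSketch:
    def __init__(self, client=None):
        # client stays None until an application has been submitted.
        self.client = client

    def get_containers(self):
        if self.client is None:
            raise NotStartedError(
                "No application is running yet; call start() before "
                "asking for containers."
            )
        return self.client.getContainers().split(',')
```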

Container ENV settings

Allows users to set environment variables:

r.start_application(cmd, env={'ENV_A': '/foo/bar', 'ENV_B': 2})

Reuse the same environment

If I create a Knit object with a conda environment file it goes through all the trouble of resolving dependencies, downloading packages, and zipping them up for me on the client-side.

When I create an application it sends the zipped environment to Yarn (presumably HDFS) and uses HDFS to scatter out that data to the different machines.

>>> appId = k.start(cmd, env='<full-path>/dev.zip')

If I then create a second application with the same environment, do we pay the cost of moving the data from the client to HDFS a second time?

kerberos on rest endpoints

It is possible to secure HTTP REST endpoints with Kerberos. We could talk to these by either:

  • depending on requests-kerberos to do the right thing
  • grabbing a token from our client, which talks RPC (this might be easy to implement, but hurts our chances of implementing more calls over HTTP instead of relying on the client). An HDFS token would probably do too.
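
For the first option, a minimal sketch of what the call might look like, assuming requests and requests-kerberos are installed and a valid ticket already exists (e.g. from kinit). `rm_apps_url` and `fetch_apps` are hypothetical names; the port default of 8088 is taken from the docker example elsewhere in these issues.

```python
def rm_apps_url(host, port=8088, scheme="http"):
    """Build the Resource Manager REST URL for listing applications."""
    return "{}://{}:{}/ws/v1/cluster/apps".format(scheme, host, port)


def fetch_apps(host, port=8088):
    """Query the apps endpoint using Kerberos (SPNEGO) authentication."""
    # Imported lazily so the URL helper works without these packages.
    import requests
    from requests_kerberos import HTTPKerberosAuth, OPTIONAL

    auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
    response = requests.get(rm_apps_url(host, port), auth=auth, timeout=10)
    response.raise_for_status()
    return response.json()
```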

troubleshooting

Establish a page in the docs giving pointers

Sources of information:

  • console feedback
  • view the application/container logs
  • view the system info
  • view the RM/NM logs, if possible

Specific issues

  • don't make a .zip of the conda root
  • newer java version if .zip > 2GB
  • same version/channels on client and workers
  • REST access to YARN
  • IP of scheduler
  • console language of workers for click
  • system constraints (e.g., disc < 90% full)
  • be sure to build .jar if running from source
  • investigate the configuration
  • install and use hdfs3

@quasiben @jcrist @quartox @mrocklin - please add to this list!

More robust testing

Several issues appeared after the merging of #101. It might be good to think about how to improve test coverage for this library. A few notes:

  • Looking at the travis logs, sometimes errors appear but the test still passes (perhaps unintentionally): https://travis-ci.org/dask/knit/jobs/291657224.
  • There are several possible code-paths that don't currently have tests (e.g. file names prefixed with hdfs://). It might be good to enumerate the possible cases and try to cover all/most of them.
  • test_hdfs_home is always skipped as hdfs3 isn't installed on travis. Is this intentional?

It might also be good to document how a novice developer (myself :)) can get a test environment set up locally.
