
dask-kubernetes's Introduction

Dask


Dask is a flexible parallel computing library for analytics. See documentation for more information.

LICENSE

New BSD. See License File.

dask-kubernetes's People

Contributors

athornton, baswelsh, bolliger32, bstadlbauer, dbalabka, dependabot[bot], droctothorpe, fbergroth, graingert, ig248, jacobtomlinson, jameslamb, jcrist, jmif, jo-migo, john-jam, jonded94, jrbourbeau, ksaur, matt711, mriduls, mrocklin, ogrisel, quasiben, samdyzon, scharlottej13, skirui-source, tomaugspurger, yuhuishishishi, yuvipanda


dask-kubernetes's Issues

cluster.adapt does not behave when nprocs is > 1

If the number of Dask worker processes in the pod spec is not 1, the cluster's adapt method does not behave well.

For example, if you define a cluster with a spec that has nprocs=4 and then try to adapt between 1 and 16 workers, scaling up and down is triggered indefinitely, since the cluster gets either 4 workers or 0.

Is there a good solution for this problem that I'm not grasping?
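One plausible reading of the loop above is that the adaptive logic reasons in worker processes while the cluster can only add or remove whole pods, each of which brings nprocs workers. The sketch below shows the conversion a pod-aware scaler would need, assuming each pod always starts exactly `procs_per_pod` workers; `pods_for_workers` is a hypothetical helper, not part of dask-kubernetes.

```python
import math


def pods_for_workers(target_workers, procs_per_pod):
    """Smallest pod count such that pods * procs_per_pod >= target_workers.

    Hypothetical helper: assumes every pod runs exactly procs_per_pod
    worker processes (the nprocs value from the pod spec).
    """
    if procs_per_pod < 1:
        raise ValueError("procs_per_pod must be >= 1")
    if target_workers < 0:
        raise ValueError("target_workers must be non-negative")
    return math.ceil(target_workers / procs_per_pod)


# With nprocs=4, asking for 16 workers means 4 pods, while asking for 1
# worker still means one pod, i.e. 4 workers.
print(pods_for_workers(16, 4), pods_for_workers(1, 4))
```

Rounding up means a request for 1 worker yields 4; if the scaler then compares "4 running" against "1 wanted" without knowing about nprocs, it will try to scale down again, which matches the oscillation described.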

add_worker() got an unexpected keyword argument 'cpu'

Just testing dask-kubernetes with current versions of dask and distributed. I'm seeing an error from the scheduler's add_worker handler:

distributed.core - ERROR - add_worker() got an unexpected keyword argument 'cpu'
Traceback (most recent call last):
  File "/home/circleci/miniconda/envs/dask-kubernetes-test/lib/python3.6/site-packages/distributed/core.py", line 340, in handle_comm
    result = yield result
  File "/home/circleci/miniconda/envs/dask-kubernetes-test/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/circleci/miniconda/envs/dask-kubernetes-test/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
    result = func(*args, **kwargs)
TypeError: add_worker() got an unexpected keyword argument 'cpu'

https://circleci.com/gh/dask/dask-kubernetes/154?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link
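The traceback starts in distributed's handle_comm, which dispatches each incoming message to a handler by its "op" field and passes the remaining fields as keyword arguments. If the sender includes a field (here cpu) that the receiving handler's signature does not accept, Python raises exactly this TypeError, which usually points to mismatched distributed versions between worker and scheduler. The sketch below reproduces the mechanism in isolation; the handler signature is hypothetical and only roughly mirrors distributed's dispatch.

```python
def add_worker(comm=None, address=None, ncores=None, memory_limit=None):
    # Hypothetical older handler signature that knows nothing about 'cpu'
    return "OK"


def handle_message(handlers, msg):
    """Dispatch a message dict to a handler, roughly like distributed's handle_comm."""
    msg = dict(msg)                   # copy so the caller's message is untouched
    op = msg.pop("op")                # the operation name selects the handler
    handler = handlers[op]
    return handler(comm=None, **msg)  # remaining fields become keyword arguments


handlers = {"add_worker": add_worker}
print(handle_message(handlers, {"op": "add_worker", "address": "tcp://10.0.0.1:1234", "ncores": 2}))
# Adding {"cpu": ...} to the message raises:
# TypeError: add_worker() got an unexpected keyword argument 'cpu'
```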

Help diagnosing local testing networking issues with minikube

I'm no longer able to get my local environment to run tests. Workers can connect to the scheduler running in the local process, but the host process can't connect to the workers running in minikube. Historically I think I've resolved this with ip route; however, the instructions in the README no longer help.

mrocklin@carbon:~/workspace/dask-kubernetes$ sudo ip route add 172.17.0.0/16 via $(minikube ip)
[sudo] password for mrocklin: 
RTNETLINK answers: File exists
mrocklin@carbon:~/workspace/dask-kubernetes$ sudo cat /etc/docker/daemon.json
{
   "bip": "172.19.1.1/16"
}

This is stopping me from making progress on this library.

cc @yuvipanda in case he has suggestions
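"RTNETLINK answers: File exists" means the kernel already has a route for 172.17.0.0/16, possibly left over from an earlier `ip route add` whose gateway is a stale minikube IP. The daemon.json above moves the host Docker bridge to 172.19.0.0/16, so it should not be the conflicting entry. The sketch below scans `ip route show` output for routes that overlap a given network, which makes the existing entry easy to spot; the sample route lines are illustrative. If the existing route points at an old address, `sudo ip route replace 172.17.0.0/16 via $(minikube ip)` updates it in place.

```python
import ipaddress


def find_overlapping_routes(route_table, network):
    """Return lines of `ip route show` output whose destination overlaps `network`."""
    target = ipaddress.ip_network(network, strict=False)
    matches = []
    for line in route_table.splitlines():
        fields = line.split()
        # Skip blank lines and the default route, which has no CIDR destination
        if not fields or fields[0] == "default":
            continue
        try:
            dest = ipaddress.ip_network(fields[0], strict=False)
        except ValueError:
            continue  # not an IP destination (e.g. 'unreachable', 'broadcast')
        if dest.overlaps(target):
            matches.append(line.strip())
    return matches


# Illustrative route table; on a real host pass the output of `ip route show`
sample = (
    "default via 192.168.1.1 dev wlp2s0 proto dhcp metric 600\n"
    "172.17.0.0/16 via 192.168.39.10 dev virbr1\n"
    "172.19.0.0/16 dev docker0 proto kernel scope link src 172.19.1.1 linkdown\n"
)
print(find_overlapping_routes(sample, "172.17.0.0/16"))
```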

AttributeError("'NoneType' object has no attribute 'get'")

I am running a Pangeo-like deployment on GKE. I am attempting to use an adaptive cluster to spin up many workers that each request a URL on a NASA server, download the data to local storage, and then save it to Google Cloud Storage using gcsfs. Dask works wonderfully with a local cluster, but when I use KubeCluster I get the error:

AttributeError("'NoneType' object has no attribute 'get'")

My worker and scheduler pass the check_versions=True test.

Is there anything obvious that I am missing?

Pickling issues when using classes and object oriented Python

... or at least I suspect this is the problem. Forgive me if this is more of a Dask distributed issue and not one necessarily tied to dask-kubernetes, but as I'm running into this problem using the latter, I thought I'd post here.

At any rate, this is related to https://github.com/pangeo-data/storage-benchmarks for the Pangeo project. We're using Airspeed Velocity (asv), which is object oriented. I've set up the tests so that storage setup/teardown lives in one set of classes and the benchmarks themselves in another.

For example, this synthetic write benchmark instantiates a Zarr storage object and times a write to it:

import numpy as np
import dask.array as da
from subprocess import call

import target_zarr  # storage-benchmarks helper module; exact import path assumed


class IOWrite_Zarr:
    timeout = 300
    # number = 1
    warmup_time = 0.0
    params = ['POSIX', 'GCS', 'FUSE']
    param_names = ['backend']

    def setup(self, backend):
        chunksize = (10, 100, 100)
        self.da = da.random.normal(10, 0.1, size=(100, 100, 100),
                                   chunks=chunksize)
        self.da_size = np.round(self.da.nbytes / 1024**2, 2)  # array size in MiB
        self.target = target_zarr.ZarrStore(backend=backend, dask=True,
                                            chunksize=chunksize, shape=self.da.shape,
                                            dtype=self.da.dtype)
        self.target.get_temp_filepath()

        if backend == 'GCS':
            # Remove any existing objects at the GCS target path before writing
            gsutil_arg = "gs://%s" % self.target.gcs_zarr
            call(["gsutil", "-q", "-m", "rm", "-r", gsutil_arg])

    def time_synthetic_write(self, backend):
        # asv times this method: store the random array into the Zarr target
        self.da.store(self.target.storage_obj)

    def teardown(self, backend):
        self.target.rm_objects()

When I put code anywhere in there to start up my dask pods,

from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yml')
cluster.adapt()
from dask.distributed import Client
client = Client(cluster)

My benchmarks die a horrible death with pickle error messages (error messages truncated for brevity):

                For parameters: 'GCS'
                Traceback (most recent call last):
                  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 38, in dumps
                    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
                TypeError: can't pickle _thread.lock objects
...

                During handling of the above exception, another exception occurred:

                Traceback (most recent call last):
                  File "/home/jovyan/.local/lib/python3.6/site-packages/asv/benchmark.py", line 795, in <module>
                    commands[mode](args)
                  File "/home/jovyan/.local/lib/python3.6/site-packages/asv/benchmark.py", line 772, in main_run
                    result = benchmark.do_run()
                  File "/home/jovyan/.local/lib/python3.6/site-packages/asv/benchmark.py", line 456, in do_run
                    return self.run(*self._current_params)
                  File "/home/jovyan/.local/lib/python3.6/site-packages/asv/benchmark.py", line 548, in run
                    all_runs.extend(timer.repeat(repeat, number))
                  File "/opt/conda/lib/python3.6/timeit.py", line 206, in repeat
                    t = self.timeit(number)
                  File "/opt/conda/lib/python3.6/timeit.py", line 178, in timeit
                    timing = self.inner(it, self.timer)
                  File "<timeit-src>", line 6, in inner
                  File "/home/jovyan/.local/lib/python3.6/site-packages/asv/benchmark.py", line 512, in <lambda>
                    func = lambda: self.func(*param)
                  File "/home/jovyan/dev/storage-benchmarks-kai/benchmarks/IO_dask.py", line 57, in time_synthetic_write
                    self.da.store(self.target.storage_obj)
                  File "/opt/conda/lib/python3.6/site-packages/dask/array/core.py", line 1211, in store
                    r = store([self], [target], **kwargs)
                  File "/opt/conda/lib/python3.6/site-packages/dask/array/core.py", line 955, in store
                    result.compute(**kwargs)
                  File "/opt/conda/lib/python3.6/site-packages/dask/base.py", line 155, in compute
                    (result,) = compute(self, traverse=False, **kwargs)
                  File "/opt/conda/lib/python3.6/site-packages/dask/base.py", line 404, in compute
                    results = get(dsk, keys, **kwargs)
                  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 2064, in get
                    resources=resources)
                  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 2021, in _graph_to_futures
                    'tasks': valmap(dumps_task, dsk3),
                  File "cytoolz/dicttoolz.pyx", line 165, in cytoolz.dicttoolz.valmap
                  File "cytoolz/dicttoolz.pyx", line 190, in cytoolz.dicttoolz.valmap
                  File "/opt/conda/lib/python3.6/site-packages/distributed/worker.py", line 718, in dumps_task
                    'args': warn_dumps(task[1:])}
                  File "/opt/conda/lib/python3.6/site-packages/distributed/worker.py", line 727, in warn_dumps
                    b = dumps(obj)
                  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 51, in dumps
                    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
                  File "/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 881, in dumps
                    cp.dump(obj)
                  File "/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 268, in dump
                    return Pickler.dump(self, obj)
                  File "/opt/conda/lib/python3.6/pickle.py", line 409, in dump
                    self.save(obj)
                  File "/opt/conda/lib/python3.6/pickle.py", line 476, in save
                    f(self, obj) # Call unbound method with explicit self
                  File "/opt/conda/lib/python3.6/pickle.py", line 751, in save_tuple
                    save(element)
                  File "/opt/conda/lib/python3.6/pickle.py", line 496, in save
                    rv = reduce(self.proto)
                TypeError: can't pickle _thread.lock objects
                Using mount point: /tmp/tmpi1hpqq5w

I've found a workaround by putting everything into a single function, and that seems to work OK; however, it will lead to some messy and redundant code. I'm hoping there's a straightforward(ish) way to get classes to work with dask_kubernetes.
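The traceback shows the failure happening while distributed serializes the task arguments of `self.da.store(self.target.storage_obj)` (dumps_task, then warn_dumps), so the store target, or something it references, holds a `_thread.lock`. Classes themselves generally pickle fine; what matters is what the pickled object references. A common remedy, sketched below under the assumption that you control the class holding the lock, is to drop the lock in `__getstate__` and recreate it in `__setstate__`. `LockedStore` is a hypothetical stand-in, not part of storage-benchmarks.

```python
import pickle
import threading


class LockedStore:
    """Hypothetical storage wrapper that guards writes with a thread lock."""

    def __init__(self, path):
        self.path = path
        self.lock = threading.Lock()  # thread locks cannot be pickled

    def __getstate__(self):
        # Copy the instance attributes and drop the unpicklable lock
        state = self.__dict__.copy()
        del state["lock"]
        return state

    def __setstate__(self, state):
        # Restore attributes and create a fresh lock in the receiving process
        self.__dict__.update(state)
        self.lock = threading.Lock()


store = LockedStore("gs://example-bucket/bench.zarr")
clone = pickle.loads(pickle.dumps(store))
print(clone.path)
```

If the lock lives inside a third-party object (for example a filesystem or mapping used as `storage_obj`), the equivalent options are to build that object inside the task on the worker or to pass a picklable description (such as a path) instead of the live object.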

Worker Pod configuration

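I need to configure my worker pods to have elevated privileges. I think I can do this with the following:

from kubernetes import client

# Hypothetical minimal worker pod; the container name and image are illustrative
pod = client.V1Pod(
    spec=client.V1PodSpec(
        containers=[client.V1Container(name="dask-worker", image="daskdev/dask:latest")]
    )
)
# code taken from @yuvipanda: run the first container privileged with SYS_ADMIN
pod.spec.containers[0].security_context = client.V1SecurityContext(
    privileged=True,
    capabilities=client.V1Capabilities(add=["SYS_ADMIN"]),
)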

However, this isn't appropriate to add to the basic pod template in this project, so it should probably be made configurable. I propose that a new keyword, pod=, accept the following options:

  1. Nothing, in which case we provide our current mechanism
  2. A kubernetes.client.V1Pod object
  3. A dictionary spec that we can turn into a kubernetes.client.V1Pod object using kubernetes.client.ApiClient.deserialize
  4. A filename that we can load with yaml to obtain 3

If any of the latter three options is selected, then none of the threads/image/etc. options should be set.
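The proposal above amounts to a small dispatch on the type of pod=. The sketch below normalizes options 2 to 4 into the dictionary form of option 3 (from which a V1Pod could then be built with ApiClient.deserialize, as proposed) and enforces the rule that pod= excludes the threads/image/etc. options. `resolve_pod_template` is a hypothetical helper; PyYAML and the kubernetes client are imported lazily so only the branches that need them require them.

```python
def resolve_pod_template(pod=None, **worker_options):
    """Normalize the proposed pod= keyword into a dict spec (hypothetical helper).

    Returns None for option 1, meaning "use the existing threads/image/etc. path".
    """
    if pod is None:
        return None  # Option 1: current mechanism
    if any(value is not None for value in worker_options.values()):
        raise ValueError("pass either pod= or threads/image/etc. options, not both")
    if isinstance(pod, str):
        # Option 4: a filename containing a YAML pod spec
        import yaml
        with open(pod) as f:
            pod = yaml.safe_load(f)
    if isinstance(pod, dict):
        return pod  # Option 3: already a dict spec
    from kubernetes import client  # only needed for V1Pod objects
    if isinstance(pod, client.V1Pod):
        # Option 2: serialize to the same camelCase dict form as option 3
        return client.ApiClient().sanitize_for_serialization(pod)
    raise TypeError("pod= must be None, a V1Pod, a dict, or a YAML filename")


spec = resolve_pod_template({"kind": "Pod", "spec": {"containers": []}})
print(spec["kind"])
```

Normalizing to one intermediate form keeps the rest of the cluster code to a single path, whichever input the user supplies.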

KubeCluster creation hangs in Jupyter

OK, so sorry for the really newbie issue here.
I've just deployed a Pangeo-style cluster on GCE, and I am trying to start a KubeCluster as in the Pangeo-provided examples, for instance dask-array.ipynb.
So far no luck: the notebook cell is shown as running, but nothing happens.

Should I pass more arguments than just

from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=10)
cluster

like a pod_spec for instance?

I'm not sure how this is supposed to work (or how it works on Pangeo), since in the default config from this repo the pod spec is empty.
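Since the default config ships an empty pod spec, one thing worth trying is to pass an explicit worker pod template. The sketch below builds one as a plain dict in the same shape as a worker-spec.yml file; the image, thread count, and memory values are illustrative assumptions, and `worker_pod_spec` is a hypothetical helper. It assumes the classic KubeCluster API of this era (KubeCluster.from_dict / from_yaml); newer dask-kubernetes releases changed that interface.

```python
def worker_pod_spec(image="daskdev/dask:latest", nthreads=2, memory="6G"):
    """Build a dict worker pod template (hypothetical helper, illustrative defaults)."""
    return {
        "kind": "Pod",
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                {
                    "name": "dask-worker",
                    "image": image,
                    "args": [
                        "dask-worker",
                        "--nthreads", str(nthreads),
                        "--memory-limit", memory,
                        "--death-timeout", "60",
                    ],
                    # Match Kubernetes resources to what the worker is told to use
                    "resources": {
                        "limits": {"cpu": str(nthreads), "memory": memory},
                        "requests": {"cpu": str(nthreads), "memory": memory},
                    },
                }
            ],
        },
    }


# With the classic API this would be used roughly as:
#   from dask_kubernetes import KubeCluster
#   cluster = KubeCluster.from_dict(worker_pod_spec())
#   cluster.scale(10)
print(worker_pod_spec()["spec"]["containers"][0]["args"])
```

If pods are created but the cell still hangs, `kubectl get pods` and `kubectl describe pod <name>` usually show whether the workers are stuck pending (for example on image pulls or insufficient resources).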

Cannot scale down

This might be related to #5, but when manually scaling a cluster I cannot remove workers by scaling to a lower number.


I created a simple cluster and used the widget to scale up to two workers. But when I change the number to zero or one and click Scale again, nothing happens. I have to clean up those pods manually with kubectl.
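The expected behavior is that scaling from two to zero or one deletes the excess worker pods. The bookkeeping part of that is small, and the sketch below shows it, assuming the pod names are listed oldest first and that the newest pods are the ones to drop; `pods_to_remove` is a hypothetical helper. In a real cluster the corresponding workers should be retired through the scheduler first (distributed's Client.retire_workers exists for this) so that their data is moved before the pods are deleted.

```python
def pods_to_remove(pod_names, target):
    """Return the worker pods to delete so that at most `target` remain.

    Assumes pod_names is ordered oldest first; the newest pods are removed.
    """
    if target < 0:
        raise ValueError("target must be non-negative")
    excess = len(pod_names) - target
    if excess <= 0:
        return []  # already at or below the target, nothing to delete
    return list(pod_names[-excess:])


print(pods_to_remove(["dask-worker-1", "dask-worker-2"], 1))
```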

Using environment variables to update values in worker-spec.yml

In the dask configuration module, environment variables can be used to update configuration values. That doesn't currently seem possible here, because worker-spec.yml is actually a Kubernetes spec rather than a dask config file. Is there a way to do this now, or would it be feasible to add such functionality?

Somewhat related to #88, #45

Background for my particular use case: jupyterhub/kubespawner#193, jupyterhub/repo2docker#363
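Until something like this exists in the library, one workaround is to expand environment variables in the loaded spec before handing it to the cluster. The sketch below walks a loaded worker-spec.yml structure and applies os.path.expandvars to every string; `expand_env` is a hypothetical helper, and the usage line assumes the classic KubeCluster.from_dict API and PyYAML.

```python
import os


def expand_env(obj):
    """Recursively expand $VAR and ${VAR} in every string of a loaded pod spec.

    References to unset variables are left unchanged, as os.path.expandvars does.
    """
    if isinstance(obj, str):
        return os.path.expandvars(obj)
    if isinstance(obj, dict):
        return {key: expand_env(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [expand_env(item) for item in obj]
    return obj  # numbers, booleans and None pass through untouched


# Usage, assuming PyYAML and the classic API:
#   with open("worker-spec.yml") as f:
#       spec = expand_env(yaml.safe_load(f))
#   cluster = KubeCluster.from_dict(spec)
```

Expanding after parsing, rather than on the raw file text, keeps YAML quoting intact even when a variable's value contains characters such as colons.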

bokeh/tornado Errors

I am using dask-kubernetes in a setup very similar to Pangeo. I can adaptively increase and decrease the size of my cluster, and the pods show up when I run kubectl get pods. However, when I execute some code, I get the error below (the same error repeats several times in the log). The dashboard renders, but the workers tab returns a 500: Internal Server Error.

distributed.utils - ERROR - unexpected attribute 'index_position' to DataTable, possible attributes are columns, css_classes, disabled, editable, fit_columns, height, js_event_callbacks, js_property_callbacks, name, reorderable, row_headers, scroll_to_selection, selectable, sizing_mode, sortable, source, subscribed_events, tags, view or width
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 623, in log_errors
    yield
  File "/opt/conda/lib/python3.6/site-packages/distributed/bokeh/scheduler.py", line 1082, in workers_doc
    table = WorkerTable(scheduler)
  File "/opt/conda/lib/python3.6/site-packages/distributed/bokeh/scheduler.py", line 963, in __init__
    reorderable=True, sortable=True, width=width, **dt_kwargs
  File "/opt/conda/lib/python3.6/site-packages/bokeh/models/widgets/tables.py", line 493, in __init__
    super(TableWidget, self).__init__(**kw)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/model.py", line 219, in __init__
    super(Model, self).__init__(**kwargs)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/core/has_props.py", line 236, in __init__
    setattr(self, name, value)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/core/has_props.py", line 271, in __setattr__
    (name, self.__class__.__name__, text, nice_join(matches)))
AttributeError: unexpected attribute 'index_position' to DataTable, possible attributes are columns, css_classes, disabled, editable, fit_columns, height, js_event_callbacks, js_property_callbacks, name, reorderable, row_headers, scroll_to_selection, selectable, sizing_mode, sortable, source, subscribed_events, tags, view or width
tornado.application - ERROR - Uncaught exception GET /workers (::1)
HTTPServerRequest(protocol='http', host='35.185.240.76', method='GET', uri='/workers', version='HTTP/1.1', remote_ip='::1')
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/tornado/web.py", line 1543, in _execute
    result = yield result
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/server/views/doc_handler.py", line 21, in get
    session = yield self.get_session()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/server/views/session_handler.py", line 43, in get_session
    session = yield self.application_context.create_session_if_needed(session_id, self.request)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/server/application_context.py", line 182, in create_session_if_needed
    self._application.initialize_document(doc)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/util/api.py", line 190, in wrapper
    return obj(*args, **kw)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/application/application.py", line 179, in initialize_document
    h.modify_document(doc)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/util/api.py", line 190, in wrapper
    return obj(*args, **kw)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/application/handlers/function.py", line 131, in modify_document
    self._func(doc)
  File "/opt/conda/lib/python3.6/site-packages/distributed/bokeh/scheduler.py", line 1082, in workers_doc
    table = WorkerTable(scheduler)
  File "/opt/conda/lib/python3.6/site-packages/distributed/bokeh/scheduler.py", line 963, in __init__
    reorderable=True, sortable=True, width=width, **dt_kwargs
  File "/opt/conda/lib/python3.6/site-packages/bokeh/models/widgets/tables.py", line 493, in __init__
    super(TableWidget, self).__init__(**kw)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/model.py", line 219, in __init__
    super(Model, self).__init__(**kwargs)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/core/has_props.py", line 236, in __init__
    setattr(self, name, value)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/core/has_props.py", line 271, in __setattr__
    (name, self.__class__.__name__, text, nice_join(matches)))
AttributeError: unexpected attribute 'index_position' to DataTable, possible attributes are columns, css_classes, disabled, editable, fit_columns, height, js_event_callbacks, js_property_callbacks, name, reorderable, row_headers, scroll_to_selection, selectable, sizing_mode, sortable, source, subscribed_events, tags, view or width
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 623, in log_errors
    yield
  File "/opt/conda/lib/python3.6/site-packages/distributed/bokeh/scheduler.py", line 1082, in workers_doc
    table = WorkerTable(scheduler)
  File "/opt/conda/lib/python3.6/site-packages/distributed/bokeh/scheduler.py", line 963, in __init__
    reorderable=True, sortable=True, width=width, **dt_kwargs
  File "/opt/conda/lib/python3.6/site-packages/bokeh/models/widgets/tables.py", line 493, in __init__
    super(TableWidget, self).__init__(**kw)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/model.py", line 219, in __init__
    super(Model, self).__init__(**kwargs)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/core/has_props.py", line 236, in __init__
    setattr(self, name, value)
  File "/opt/conda/lib/python3.6/site-packages/bokeh/core/has_props.py", line 271, in __setattr__
    (name, self.__class__.__name__, text, nice_join(matches)))
AttributeError: unexpected attribute 'index_position' to DataTable, possible attributes are columns, css_classes, disabled, editable, fit_columns, height, js_event_callbacks, js_property_callbacks, name, reorderable, row_headers, scroll_to_selection, selectable, sizing_mode, sortable, source, subscribed_events, tags, view or width


Management widget

We might consider having the KubeCluster object repr itself in notebooks with an IPython widget that allows users to modify parameters and scale workers.

Add ability to explicitly specify Kubernetes cluster

Currently, KubeCluster() is hardcoded to launch Dask workers in the "current" Kubernetes cluster: either the cluster in which the current process is running ("in-cluster config") or, as a fallback, the default cluster in the kubeconfig file. As this may not be the right strategy for all use cases, it would be nice to be able to optionally specify more explicitly which cluster to use. Here are the choices I currently see as possible:

  • in-cluster
  • default from kubeconfig
  • non-default cluster (other context) from kubeconfig
  • other cluster (URL to Kubernetes API server + TLS certificate for connection... is anything else needed?)
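For the last option, a URL and CA certificate are usually not enough on their own: the client also needs credentials, such as a bearer token or a client certificate. A minimal sketch of how the four choices could map onto calls that exist in the kubernetes Python client (load_incluster_config(), load_kube_config(), new_client_from_config() with a context, and a hand-built Configuration) is below. make_api_client and its mode names are hypothetical, not part of dask-kubernetes; the returned ApiClient could then be passed to kubernetes.client.CoreV1Api(...).

```python
MODES = ('auto', 'in-cluster', 'kubeconfig', 'explicit')


def make_api_client(mode='auto', config_file=None, context=None,
                    host=None, ca_cert=None, token=None):
    """Hypothetical helper: build a kubernetes ApiClient for the chosen cluster.

    auto        in-cluster config, falling back to the default kubeconfig
                (the current KubeCluster behaviour described above)
    in-cluster  in-cluster config only
    kubeconfig  a kubeconfig file, optionally a non-default `context`
    explicit    API server URL `host`, CA bundle `ca_cert`, bearer `token`
    """
    if mode not in MODES:
        raise ValueError('mode must be one of %s, got %r' % (MODES, mode))
    if mode == 'explicit' and not host:
        raise ValueError("mode='explicit' requires host")

    # Imported lazily so that argument errors surface before any I/O.
    import kubernetes.client
    import kubernetes.config

    if mode == 'explicit':
        configuration = kubernetes.client.Configuration()
        configuration.host = host
        configuration.ssl_ca_cert = ca_cert
        if token:
            configuration.api_key = {'authorization': token}
            configuration.api_key_prefix = {'authorization': 'Bearer'}
        return kubernetes.client.ApiClient(configuration)
    if mode == 'kubeconfig':
        # Selects any context from the kubeconfig, not just the default one
        return kubernetes.config.new_client_from_config(
            config_file=config_file, context=context)
    if mode == 'in-cluster':
        kubernetes.config.load_incluster_config()
        return kubernetes.client.ApiClient()
    # mode == 'auto': try in-cluster first, then fall back to kubeconfig
    try:
        kubernetes.config.load_incluster_config()
    except kubernetes.config.ConfigException:
        kubernetes.config.load_kube_config(config_file=config_file, context=context)
    return kubernetes.client.ApiClient()
```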

wild adapt

Hey guys, we love this! We integrated it with Azure, Kubernetes and node autoscaling, and it's very cool. We tried the adapt method too, which was hilarious: it maxed out our Azure quota by spawning 1000+ workers and got the autoscaler very excited.

I was wondering, would an adapt_withinreason be welcome, or a pull request to add some parameters to the adapt method?

In any case, epic work
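Bounds can be passed to adaptive scaling as cluster.adapt(minimum=..., maximum=...), the form used in the "Scaling up only works initially" report. One way to choose maximum is to derive it from the cloud quota. The helper below is a hypothetical sketch: it divides the cores left under the quota by the cores each worker requests and ignores per-node overhead such as system pods, so treat the result as an upper bound.

```python
def max_workers_for_quota(quota_cores: int, cores_per_worker: float,
                          reserved_cores: int = 0) -> int:
    """Hypothetical helper: largest worker count that fits in a core quota.

    reserved_cores covers anything else that must fit in the same quota,
    e.g. the node running JupyterHub.
    """
    if cores_per_worker <= 0:
        raise ValueError('cores_per_worker must be positive')
    usable = max(quota_cores - reserved_cores, 0)
    return int(usable // cores_per_worker)
```

With a 100-core quota, 4 cores reserved and 2-core workers this gives 48, which could then be used as cluster.adapt(minimum=1, maximum=48).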

Use of "pods" vs "workers"

I have found that people using our system have no concept of what Kubernetes is, and frankly there isn't really a requirement for them to know the details, other than that it is a kind of scheduler.

But then I get questions like "How do I get the worker logs?", and my answer includes a call to cluster.pods(). At which point I have to explain what a pod is and its relationship to a worker.

I would find it useful to remove the Kubernetes-specific language for end users. It would make more sense for them to call cluster.workers().

I would be keen to hear others' thoughts on the matter.

Unable to parse requirement: invalid label value

Trying to use dask-kubernetes on a Pangeo-like JupyterHub deployment on Google Cloud. Using dask-kubernetes 0.4.0. When I run this code:

from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=10)

...I get this API exception:

---------------------------------------------------------------------------
ApiException                              Traceback (most recent call last)
<ipython-input-26-00757e3889c1> in <module>()
      1 from dask_kubernetes import KubeCluster
----> 2 cluster = KubeCluster(n_workers=10)
      3 cluster

/opt/conda/lib/python3.6/site-packages/dask_kubernetes/core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, **kwargs)
    202 
    203         if n_workers:
--> 204             self.scale(n_workers)
    205 
    206     @classmethod

/opt/conda/lib/python3.6/site-packages/dask_kubernetes/core.py in scale(self, n)
    334         KubeCluster.scale_down
    335         """
--> 336         pods = self._cleanup_succeeded_pods(self.pods())
    337         if n >= len(pods):
    338             return self.scale_up(n, pods=pods)

/opt/conda/lib/python3.6/site-packages/dask_kubernetes/core.py in pods(self)
    296         return self.core_api.list_namespaced_pod(
    297             self.namespace,
--> 298             label_selector=format_labels(self.pod_template.metadata.labels)
    299         ).items
    300 

/opt/conda/lib/python3.6/site-packages/kubernetes/client/apis/core_v1_api.py in list_namespaced_pod(self, namespace, **kwargs)
  12308             return self.list_namespaced_pod_with_http_info(namespace, **kwargs)
  12309         else:
> 12310             (data) = self.list_namespaced_pod_with_http_info(namespace, **kwargs)
  12311             return data
  12312 

/opt/conda/lib/python3.6/site-packages/kubernetes/client/apis/core_v1_api.py in list_namespaced_pod_with_http_info(self, namespace, **kwargs)
  12411                                         _preload_content=params.get('_preload_content', True),
  12412                                         _request_timeout=params.get('_request_timeout'),
> 12413                                         collection_formats=collection_formats)
  12414 
  12415     def list_namespaced_pod_template(self, namespace, **kwargs):

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, async, _return_http_data_only, collection_formats, _preload_content, _request_timeout)
    319                                    body, post_params, files,
    320                                    response_type, auth_settings,
--> 321                                    _return_http_data_only, collection_formats, _preload_content, _request_timeout)
    322         else:
    323             thread = self.pool.apply_async(self.__call_api, (resource_path, method,

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in __call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout)
    153                                      post_params=post_params, body=body,
    154                                      _preload_content=_preload_content,
--> 155                                      _request_timeout=_request_timeout)
    156 
    157         self.last_response = response_data

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in request(self, method, url, query_params, headers, post_params, body, _preload_content, _request_timeout)
    340                                         _preload_content=_preload_content,
    341                                         _request_timeout=_request_timeout,
--> 342                                         headers=headers)
    343         elif method == "HEAD":
    344             return self.rest_client.HEAD(url,

/opt/conda/lib/python3.6/site-packages/kubernetes/client/rest.py in GET(self, url, headers, query_params, _preload_content, _request_timeout)
    229                             _preload_content=_preload_content,
    230                             _request_timeout=_request_timeout,
--> 231                             query_params=query_params)
    232 
    233     def HEAD(self, url, headers=None, query_params=None, _preload_content=True, _request_timeout=None):

/opt/conda/lib/python3.6/site-packages/kubernetes/client/rest.py in request(self, method, url, query_params, headers, body, post_params, _preload_content, _request_timeout)
    220 
    221         if not 200 <= r.status <= 299:
--> 222             raise ApiException(http_resp=r)
    223 
    224         return r

ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '5be2a28d-47a8-4138-a4fb-7bb99a38ff7a', 'Content-Type': 'application/json', 'Date': 'Mon, 03 Sep 2018 14:36:51 GMT', 'Content-Length': '480'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"unable to parse requirement: invalid label value: \"[email protected]\": a valid label must be an empty string or consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyValue',  or 'my_value',  or '12345', regex used for validation is '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?')","reason":"BadRequest","code":400}

I think the fundamental problem is that the label "[email protected]" contains an "@" character, which is disallowed. This label is being generated from a unique ID passed through via our authentication system. I have colleagues who do not have an "@" in their ID and they do not get this error.

Maybe some sanitisation of label values is needed inside dask-kubernetes? Happy to make a PR if given some guidance on the appropriate place to make changes.

cc @slejdops
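Until dask-kubernetes sanitises label values itself, a caller could clean them first. The sketch below is a hypothetical sanitize_label_value helper built on the validation regex quoted in the API error above plus Kubernetes' 63-character limit on label values: it replaces disallowed characters such as @ with _ and trims non-alphanumeric characters from both ends. The example ID is made up.

```python
import re

# Valid label value: empty, or at most 63 characters from [-A-Za-z0-9_.]
# that start and end with an alphanumeric (regex from the API error message).
LABEL_VALUE = re.compile(r'^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$')
MAX_LABEL_LEN = 63


def sanitize_label_value(value: str) -> str:
    """Hypothetical helper: coerce an arbitrary string into a valid label value."""
    cleaned = re.sub(r'[^-A-Za-z0-9_.]', '_', value)  # e.g. '@' -> '_'
    cleaned = cleaned[:MAX_LABEL_LEN]
    # Labels must start and end with an alphanumeric; trim after truncating.
    return re.sub(r'^[^A-Za-z0-9]+|[^A-Za-z0-9]+$', '', cleaned)


print(sanitize_label_value('jane.doe@example.com'))  # jane.doe_example.com
```

Distinct IDs can collapse to the same value this way, so if labels must stay unique per user a short hash suffix would also be needed.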

Scaling up only works initially

I'm trying to do some batch jobs using dask-kubernetes. Initially I run:

cluster.adapt(minimum=1, maximum=6)

Which works fine, and I soon have 6 worker-pods running.

However, when a pod dies, another one isn't recreated, instead I get flooded with these messages (about every second):

2018-08-03 08:24:50 [INFO] distributed.deploy.adaptive: CPU limit exceeded [132485 occupancy / 5 cores]
2018-08-03 08:24:50 [INFO] distributed.deploy.adaptive: Scaling up to 6 workers

Any idea what's going on?

Get scheduler logs

I'm having issues getting the adaptive functionality to work. There are tasks on the queue but no workers are being provisioned.

To troubleshoot this I would like to be able to see the logs for the scheduler but am unsure of how to access them.
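In the dask-kubernetes versions discussed here the scheduler typically runs inside the notebook process (the distributed.deploy.adaptive messages in the previous report show up there), so its messages go through Python's logging module under the logger name distributed.scheduler. Assuming that in-process setup, the sketch below keeps those records in memory so they can be printed on demand; ListHandler and capture_logs are hypothetical helpers. If your version of distributed provides Client.get_scheduler_logs(), that is the more direct route.

```python
import logging


class ListHandler(logging.Handler):
    """Keep formatted log records in memory for later inspection."""

    def __init__(self, level=logging.NOTSET):
        super().__init__(level)
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


def capture_logs(name='distributed.scheduler', level=logging.INFO):
    """Attach a ListHandler to logger `name` and lower its level to `level`."""
    handler = ListHandler(level)
    handler.setFormatter(
        logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s'))
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(handler)
    return handler
```

Call capture_logs() before creating the cluster, then print('\n'.join(handler.lines)) when the adaptive behaviour looks wrong.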

Improve logging during fail cases

Currently we fail silently when launching pods fails. We might consider waiting for the pods to start before leaving scale_up. This would require an asynchronous solution.

Logging generally could probably be improved.
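An asynchronous scale_up could poll the pods it just created and log the ones that fail, instead of returning immediately. The sketch below assumes a hypothetical async callable get_phase(name) that returns the pod's status.phase string (in practice it might run CoreV1Api.read_namespaced_pod in an executor). It returns the names of pods that did not reach Running before the timeout.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def wait_for_pods(pod_names, get_phase, timeout=60.0, poll_interval=1.0):
    """Wait until every pod is Running; log and return the ones that are not.

    `get_phase` is a hypothetical async callable: pod name -> Kubernetes phase
    ('Pending', 'Running', 'Succeeded', 'Failed' or 'Unknown').
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    pending, failed = set(pod_names), set()
    while pending:
        for name in list(pending):
            phase = await get_phase(name)
            if phase == 'Running':
                pending.discard(name)
            elif phase in ('Failed', 'Succeeded'):  # a worker pod should not exit
                logger.error('Pod %s ended in phase %s', name, phase)
                pending.discard(name)
                failed.add(name)
        if pending and loop.time() >= deadline:
            logger.error('Timed out waiting for pods: %s', sorted(pending))
            break
        if pending:
            await asyncio.sleep(poll_interval)
    return failed | pending
```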

Exporting an environment

Is it worth adding a couple of utility scripts that would export the user's current environment (conda or virtual env) and build a Docker image on top of daskdev/dask:latest?

Being able to pass a list of conda/pip packages is fine for relatively simple environments and prototyping, but I can see value in something slightly more stable. Building a new image (which shouldn't need to be done too often) will increase the connection time to the KubeCluster, but will reduce the worker start-up time.

I have a basic proof of concept of this, which I am currently using, that looks something like this:

  • In JupyterLab, build my conda env and validate it locally in a notebook
  • Export the environment
  • Build a docker image on top of daskdev/dask:latest, using something like
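import pathlib
import subprocess
import tempfile

import docker
from dask_kubernetes import KubeCluster, make_pod_spec

DOCKER_HUB_REPO = 'myrepo'  # placeholder: your Docker Hub user or organisation
docker_client = docker.from_env()

dockerfile_template = (
    'FROM daskdev/dask:latest\n'
    'ADD {environment_file} /opt/app/environment.yml\n'
    'RUN /opt/conda/bin/conda env update -n dask -f /opt/app/environment.yml && \\\n'
    '    conda clean -tipsy\n'
)


def export_conda_env(env_name, env_file, conda_bin='conda'):
    # Write the named conda environment out as an environment.yml file
    subprocess.run([conda_bin, 'env', 'export', '-n', env_name, '-f', str(env_file)], check=True)


def build_publish_dockerfile(context_dir, dockerfile_txt, tag):
    # The Dockerfile must live in the build context next to environment.yml
    pathlib.Path(context_dir).joinpath('Dockerfile').write_text(dockerfile_txt)
    repository = '%s/%s' % (DOCKER_HUB_REPO, tag)
    docker_client.images.build(path=str(context_dir), dockerfile='Dockerfile', tag=repository, nocache=True)
    docker_client.images.push(repository)  # publish so the Kubernetes nodes can pull it
    return repository


def image_from_conda_env(env_name, tag, conda_bin='conda'):
    with tempfile.TemporaryDirectory() as tmp_dir:
        env_file = pathlib.Path(tmp_dir).joinpath('environment.yml')
        export_conda_env(env_name, env_file, conda_bin)
        # ADD paths are relative to the build context, so use the bare file name
        dockerfile = dockerfile_template.format(environment_file=env_file.name)
        return build_publish_dockerfile(tmp_dir, dockerfile_txt=dockerfile, tag=tag)


image = image_from_conda_env('myenv', 'dask-worker-myenv')

k = KubeCluster(make_pod_spec(image=image))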

Is this in the works? Or any thoughts on the above?

Not robust to odd characters

I had the JupyterHub username matt!, which broke dask-kubernetes when my username was included in the names of my Kubernetes resources. We should escape such characters accordingly.
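Kubernetes object names are stricter than label values. A conservative target is the DNS-1123 label format (lowercase alphanumerics and -, starting and ending with an alphanumeric, at most 63 characters), which is also acceptable for pod names. The hypothetical escape_k8s_name helper below lowercases the input, replaces every other character with -, and appends a short hash whenever it had to change anything, so that matt! and matt do not end up with the same name.

```python
import hashlib
import re

# DNS-1123 label: lowercase alphanumerics or '-', starting and ending with an
# alphanumeric character, at most 63 characters.
DNS1123_LABEL = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$')
MAX_LEN = 63


def escape_k8s_name(raw: str) -> str:
    """Hypothetical helper: map any string (e.g. a username) to a valid name."""
    name = re.sub(r'[^a-z0-9-]+', '-', raw.lower()).strip('-')
    if name == raw and 0 < len(name) <= MAX_LEN:
        return name  # already valid, leave it untouched
    # Something was changed or dropped: add a hash of the original so that
    # different inputs (e.g. 'matt!' and 'matt') stay distinct.
    digest = hashlib.sha1(raw.encode('utf-8')).hexdigest()[:8]
    prefix = name[: MAX_LEN - len(digest) - 1].rstrip('-')
    return f'{prefix}-{digest}' if prefix else digest
```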

Release?

I plan to push something out to PyPI tomorrow if there are no objections.

Documentation for launching on Google Kubernetes

I'm following the Google Kubernetes docs for launching a cluster via the command line, and everything works. When I attempt to start a KubeCluster, I don't seem to have a worker_spec.yml that actually works. Perhaps I'm missing information in the yaml?

Launching a cluster:
gcloud container clusters create dask-cluster
Out:

Created [https://container.googleapis.com/v1/projects/phrasal-fire-198816/zones/us-east1-d/clusters/dask-cluster].
WARNING: environment variable HOME or KUBECONFIG must be set to store credentials for kubectl
NAME          ZONE        MASTER_VERSION  MASTER_IP       MACHINE_TYPE   NODE_VERSION  NUM_NODES  STATUS
dask-cluster  us-east1-d  1.8.8-gke.0     35.196.157.198  n1-standard-1  1.8.8-gke.0   3          RUNNING

Then in a Jupyter notebook I used:

from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yaml')

And I get this nice error:

---------------------------------------------------------------------------
ConfigException                           Traceback (most recent call last)
~\Anaconda3\lib\site-packages\dask_kubernetes\core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, **kwargs)
    150         try:
--> 151             kubernetes.config.load_incluster_config()
    152         except kubernetes.config.ConfigException:

~\Anaconda3\lib\site-packages\kubernetes\config\incluster_config.py in load_incluster_config()
     92     InClusterConfigLoader(token_filename=SERVICE_TOKEN_FILENAME,
---> 93                           cert_filename=SERVICE_CERT_FILENAME).load_and_set()

~\Anaconda3\lib\site-packages\kubernetes\config\incluster_config.py in load_and_set(self)
     44     def load_and_set(self):
---> 45         self._load_config()
     46         self._set_config()

~\Anaconda3\lib\site-packages\kubernetes\config\incluster_config.py in _load_config(self)
     50                 SERVICE_PORT_ENV_NAME not in self._environ):
---> 51             raise ConfigException("Service host/port is not set.")
     52 

ConfigException: Service host/port is not set.

During handling of the above exception, another exception occurred:

FileNotFoundError                         Traceback (most recent call last)
<ipython-input-8-fc55ef0ba3d4> in <module>()
----> 1 cluster = KubeCluster.from_yaml('worker-spec.yaml')

~\Anaconda3\lib\site-packages\dask_kubernetes\core.py in from_yaml(cls, yaml_path, **kwargs)
    248         with open(yaml_path) as f:
    249             d = yaml.safe_load(f)
--> 250             return cls.from_dict(d, **kwargs)
    251 
    252     @property

~\Anaconda3\lib\site-packages\dask_kubernetes\core.py in from_dict(cls, pod_spec, **kwargs)
    213         KubeCluster.from_yaml
    214         """
--> 215         return cls(make_pod_from_dict(pod_spec), **kwargs)
    216 
    217     @classmethod

~\Anaconda3\lib\site-packages\dask_kubernetes\core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, **kwargs)
    151             kubernetes.config.load_incluster_config()
    152         except kubernetes.config.ConfigException:
--> 153             kubernetes.config.load_kube_config()
    154 
    155         self.core_api = kubernetes.client.CoreV1Api()

~\Anaconda3\lib\site-packages\kubernetes\config\kube_config.py in load_kube_config(config_file, context, client_configuration, persist_config)
    356     loader = _get_kube_config_loader_for_yaml_file(
    357         config_file, active_context=context,
--> 358         config_persister=config_persister)
    359     if client_configuration is None:
    360         config = type.__call__(Configuration)

~\Anaconda3\lib\site-packages\kubernetes\config\kube_config.py in _get_kube_config_loader_for_yaml_file(filename, **kwargs)
    313 
    314 def _get_kube_config_loader_for_yaml_file(filename, **kwargs):
--> 315     with open(filename) as f:
    316         return KubeConfigLoader(
    317             config_dict=yaml.load(f),

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Brendan/.kube/config'

My worker spec yaml, which only has the "name" attribute changed from the dask-kubernetes docs:

kind: Pod
metadata:
  name: dask-cluster
  labels:
    app: dask-ml
    
spec:
  restartPolicy: Never
  containers:
  - name: aksbdgka
    image: daskdev/dask:latest
    args: [dask-worker, --nthreads, '2', --no-bokeh, --memory-limit, 6GB, --death-timeout, '60']
    env:
      - name: EXTRA_PIP_PACKAGES
        value: fastparquet git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G

I'm not really sure how the KubeCluster actually interfaces with gcloud, so I have no idea what to look for to solve the problem and get the cluster running with dask.
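The FileNotFoundError is the fallback path: in-cluster config is unavailable on a laptop, so the client looks for a kubeconfig file, and the gcloud warning above (HOME or KUBECONFIG must be set) suggests that none was written. The hypothetical find_kubeconfig helper below checks the locations such a lookup typically uses ($KUBECONFIG if set, otherwise ~/.kube/config) and returns the ones that exist.

```python
import os
from pathlib import Path


def find_kubeconfig(environ=None):
    """Hypothetical helper: kubeconfig files that exist in the usual locations.

    Assumes $KUBECONFIG (possibly several paths joined by os.pathsep) takes
    precedence, with ~/.kube/config as the default.
    """
    environ = os.environ if environ is None else environ
    raw = environ.get('KUBECONFIG')
    candidates = raw.split(os.pathsep) if raw else [os.path.join('~', '.kube', 'config')]
    paths = (Path(c).expanduser() for c in candidates if c)
    return [p for p in paths if p.is_file()]
```

If find_kubeconfig() returns an empty list, running gcloud container clusters get-credentials dask-cluster --zone us-east1-d (cluster name and zone from the output above) with HOME or KUBECONFIG set should create the missing file.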

dd.read_parquet("gs:") fails to read data from GCS

I created a Kubernetes cluster on Google Cloud and deployed Dask using Helm from the stable/dask chart. The Jupyter notebook provided in the examples folder does not work because the gcsfs package is not included.

So I tried it my own way: start a local Jupyter notebook and connect to the Dask cluster in Google Cloud.

import dask
from dask.distributed import Client, progress
client = Client("IP:8786")
client

Works perfectly.

To avoid access problems, I initialize gcsfs beforehand:

import gcsfs
gcs = gcsfs.GCSFileSystem(project='NAME', token=r"PATH")

This also works, and I can ls the files.

Then I want to read the parquet:

import dask.dataframe as dd
df = dd.read_parquet('gs://anaconda-public-data/nyc-taxi/nyc.parquet')
df = df.persist()
progress(df)

This fails with error:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'urllib3'
distributed.worker - WARNING - Could not deserialize task
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/worker.py", line 1266, in add_task
    self.tasks[key] = _deserialize(function, args, kwargs, task)
  File "/opt/conda/lib/python3.6/site-packages/distributed/worker.py", line 644, in _deserialize
    args = pickle.loads(args)
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'urllib3'

However, urllib3 should be installed. In order to read the Parquet data and provide snappy, I had to edit config.yaml as follows:

# config.yaml
scheduler:
  replicas: 1
  env:
    - name: EXTRA_CONDA_PACKAGES
      value: numba xarray fastparquet pyarrow snappy python-snappy gcsfs s3fs dask-ml urllib3 -c conda-forge
    - name: EXTRA_PIP_PACKAGES
      value: urllib3
worker:
  replicas: 8
  resources:
    limits:
      cpu: 2
      memory: 7.5G
    requests:
      cpu: 2
      memory: 7.5G
  env:
    - name: EXTRA_CONDA_PACKAGES
      value: numba xarray pyarrow fastparquet snappy python-snappy gcsfs s3fs dask-ml urllib3 -c conda-forge
    - name: EXTRA_PIP_PACKAGES
      value: urllib3 # dask-ml --upgrade
# We want to keep the same packages on the worker and jupyter environments
jupyter:
  enabled: true
  env:
    - name: EXTRA_CONDA_PACKAGES
      value: numba xarray pyarrow fastparquet snappy python-snappy gcsfs s3fs dask-ml urllib3 matplotlib -c conda-forge
    - name: EXTRA_PIP_PACKAGES
      value: urllib3 # dask-ml --upgrade
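A ModuleNotFoundError while deserializing a task means a worker's environment differs from the client's. One quick way to compare them, sketched here with a hypothetical missing_modules helper, is to check which imports fail in each process: Client.run(missing_modules, [...]) executes it on every worker and returns a dict keyed by worker address, and Client.run_on_scheduler does the same on the scheduler.

```python
import importlib.util


def missing_modules(names):
    """Hypothetical helper: the module names that cannot be imported here."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):  # e.g. a dotted name whose parent is absent
            found = False
        if not found:
            missing.append(name)
    return missing
```

For example, client.run(missing_modules, ['urllib3', 'gcsfs', 'snappy']) lists, per worker, which of those packages still need to be added to EXTRA_CONDA_PACKAGES or EXTRA_PIP_PACKAGES.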

Configure cluster defaults from environment variables

It would be great if we, as the admins of our JupyterHub cluster, could set environment variables that specify the default values for the kwargs in KubeCluster.

For example, it would be nice to be able to set the name of the cluster to the value of JUPYTERHUB_USER.
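A minimal sketch of the idea, assuming hypothetical variable names (DASK_KUBERNETES_NAME and the others below are not read by dask-kubernetes): each variable maps to one of the KubeCluster keywords visible in the constructor signature in the earlier traceback (name, namespace, n_workers), and string values may reference other variables, so an admin could set DASK_KUBERNETES_NAME to dask-{JUPYTERHUB_USER}.

```python
import os

# Hypothetical mapping: KubeCluster keyword -> (environment variable, converter)
ENV_DEFAULTS = {
    'name': ('DASK_KUBERNETES_NAME', str),
    'namespace': ('DASK_KUBERNETES_NAMESPACE', str),
    'n_workers': ('DASK_KUBERNETES_N_WORKERS', int),
}


def cluster_defaults_from_env(environ=None):
    """Build KubeCluster keyword defaults from environment variables.

    Values are expanded with str.format_map against the environment, so
    'dask-{JUPYTERHUB_USER}' becomes 'dask-alice' for user alice.
    """
    environ = os.environ if environ is None else environ
    kwargs = {}
    for kwarg, (var, convert) in ENV_DEFAULTS.items():
        if var in environ:
            kwargs[kwarg] = convert(environ[var].format_map(environ))
    return kwargs
```

Explicit arguments would still win if the defaults are merged first, e.g. KubeCluster(**{**cluster_defaults_from_env(), **user_kwargs}).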

Deployment instead of single pods

Pods are currently how dask-kubernetes defines a worker.
They are very easy to spawn and kill, but they are not managed by Kubernetes.

Since there is nothing controlling them in the k8s layer:

  • the pods cannot be rescheduled in the case a node dies.
  • the pods cannot be moved by the cluster autoscaler to scale down a node.

I sometimes find myself with half a dozen huge nodes, each running one or two small dask workers, wasting a lot of resources. If they could be rescheduled to other nodes, even if that meant re-starting a task, it would be a good trade-off.

Of course, it would still be possible to prevent a worker from being rescheduled by the autoscaler by annotating the worker pods with cluster-autoscaler.kubernetes.io/safe-to-evict: "false".

I'd like to propose changing the worker scheduling method from single pods to a deployment or a simple replication controller.
This would give us the ability to scale up and down by setting the number of replicas.

On the downside, I think it would make it difficult to scale down a single worker, since there is no way (that I'm aware of, at least) to tell which node to scale down when the size of a replication controller or deployment changes.

Ideas and insights needed 🙂
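To make the trade-off concrete, here is a sketch of the worker Deployment such a change might create, written as a plain manifest dict. make_worker_deployment, the label keys and the two-thread worker command are assumptions. Two details come from Kubernetes itself: a Deployment's pod template must use restartPolicy: Always (the default), unlike the restartPolicy: Never in the worker spec shown earlier, and the autoscaler opt-out is the pod-template annotation cluster-autoscaler.kubernetes.io/safe-to-evict: "false".

```python
def make_worker_deployment(name, image, replicas, scheduler_address, evictable=True):
    """Hypothetical helper: a Deployment manifest (as a dict) for Dask workers."""
    labels = {'app': 'dask', 'component': 'dask-worker', 'dask-cluster': name}
    annotations = {}
    if not evictable:
        # Asks the cluster autoscaler not to evict these pods when removing nodes
        annotations['cluster-autoscaler.kubernetes.io/safe-to-evict'] = 'false'
    return {
        'apiVersion': 'apps/v1',
        'kind': 'Deployment',
        'metadata': {'name': name, 'labels': labels},
        'spec': {
            'replicas': replicas,
            # The selector must match the pod template's labels
            'selector': {'matchLabels': labels},
            'template': {
                'metadata': {'labels': labels, 'annotations': annotations},
                'spec': {
                    # restartPolicy is left at its default, Always, which is
                    # the only value a Deployment accepts
                    'containers': [{
                        'name': 'dask-worker',
                        'image': image,
                        'args': ['dask-worker', scheduler_address, '--nthreads', '2'],
                    }],
                },
            },
        },
    }
```

Scaling would then amount to patching spec.replicas, for example with AppsV1Api.patch_namespaced_deployment_scale, which is exactly where the "which worker goes away" problem described above shows up.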

Errors when closing a cluster

When destroying a cluster either by calling KubeCluster.close() or using the context manager which calls KubeCluster.__exit__() a bunch of tornado errors are sprayed into the notebook.

Here's a simple example to demonstrate.

import os
import dask
import distributed
from daskernetes.core import KubeCluster

with KubeCluster(name=os.environ.get('JUPYTERHUB_USER'),
                 namespace='dask',
                 worker_image='informaticslab/singleuser-notebook:latest',
                 n_workers=5,
                 threads_per_worker=1,
                 host='0.0.0.0',
                 port=8786) as cluster:
    client = distributed.Client(cluster.scheduler_address)
    
    cube = lambda x: x**3
    cubed_numbers = client.map(cube, range(1000))
    total = client.submit(sum, cubed_numbers)
    print(total.result())

This results in the following output, which is repeated every few seconds.

249500250000
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fa76c285d90>, <tornado.concurrent.Future object at 0x7fa76c2cb128>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 174, in read
    n_frames = yield stream.read_bytes(8)
  File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 324, in read_bytes
    self._try_inline_read()
  File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 709, in _try_inline_read
    self._check_closed()
  File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 925, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 464, in send_recv_from_rpc
    result = yield send_recv(comm=comm, op=key, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 350, in send_recv
    response = yield comm.read()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
    yielded = next(result)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 188, in read
    convert_stream_closed_error(self, e)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.6/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 782, in _update_scheduler_info
    self._scheduler_identity = yield self.scheduler.identity()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 467, in send_recv_from_rpc
    % (e, key,))
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe: while trying to call remote method 'identity'
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fa774bedb70>, <tornado.concurrent.Future object at 0x7fa76c14fbe0>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/core.py", line 185, in connect
    quiet_exceptions=EnvironmentError)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
tornado.gen.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.6/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 782, in _update_scheduler_info
    self._scheduler_identity = yield self.scheduler.identity()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 463, in send_recv_from_rpc
    comm = yield self.live_comm()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 439, in live_comm
    connection_args=self.connection_args)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/core.py", line 194, in connect
    _raise(error)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/core.py", line 177, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://100.96.53.45:8786' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x7fa76c288550>: ConnectionRefusedError: [Errno 111] Connection refused

Clearing all cell output does not help. The error keeps recurring, and only a kernel restart stops it.

ValueError in make_pod_from_dict when template image has no "name" attribute

With kubernetes client 4.0.0, I get the following exception if I do not define a name attribute in the template (in the containers segment, at the same level as the image attribute).

This name attribute will later be overridden by a dynamically generated name. However, parsing the template fails without one (see details).

Possible solutions:

  • include a dummy name placeholder in the sample templates in the documentation.
  • automatically insert a dummy name attribute in make_pod_from_dict prior to calling SERIALIZATION_API_CLIENT.deserialize.
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-2-725209b39003> in <module>()
----> 1 KubeCluster()

/opt/conda/lib/python3.6/site-packages/dask_kubernetes/core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, **kwargs)
    137                 with open(config['kubernetes-worker-template-path']) as f:
    138                     d = yaml.safe_load(f)
--> 139                 pod_template = make_pod_from_dict(d)
    140             else:
    141                 msg = ("Worker pod specification not provided. See KubeCluster "

/opt/conda/lib/python3.6/site-packages/dask_kubernetes/objects.py in make_pod_from_dict(dict_)
    177     return SERIALIZATION_API_CLIENT.deserialize(
    178         _FakeResponse(data=json.dumps(dict_)),
--> 179         client.V1Pod
    180     )
    181 

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in deserialize(self, response, response_type)
    234             data = response.data
    235 
--> 236         return self.__deserialize(data, response_type)
    237 
    238     def __deserialize(self, data, klass):

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in __deserialize(self, data, klass)
    274             return self.__deserialize_datatime(data)
    275         else:
--> 276             return self.__deserialize_model(data, klass)
    277 
    278     def call_api(self, resource_path, method,

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in __deserialize_model(self, data, klass)
    618                    and isinstance(data, (list, dict)):
    619                     value = data[klass.attribute_map[attr]]
--> 620                     kwargs[attr] = self.__deserialize(value, attr_type)
    621 
    622         instance = klass(**kwargs)

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in __deserialize(self, data, klass)
    274             return self.__deserialize_datatime(data)
    275         else:
--> 276             return self.__deserialize_model(data, klass)
    277 
    278     def call_api(self, resource_path, method,

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in __deserialize_model(self, data, klass)
    618                    and isinstance(data, (list, dict)):
    619                     value = data[klass.attribute_map[attr]]
--> 620                     kwargs[attr] = self.__deserialize(value, attr_type)
    621 
    622         instance = klass(**kwargs)

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in __deserialize(self, data, klass)
    252                 sub_kls = re.match('list\[(.*)\]', klass).group(1)
    253                 return [self.__deserialize(sub_data, sub_kls)
--> 254                         for sub_data in data]
    255 
    256             if klass.startswith('dict('):

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in <listcomp>(.0)
    252                 sub_kls = re.match('list\[(.*)\]', klass).group(1)
    253                 return [self.__deserialize(sub_data, sub_kls)
--> 254                         for sub_data in data]
    255 
    256             if klass.startswith('dict('):

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in __deserialize(self, data, klass)
    274             return self.__deserialize_datatime(data)
    275         else:
--> 276             return self.__deserialize_model(data, klass)
    277 
    278     def call_api(self, resource_path, method,

/opt/conda/lib/python3.6/site-packages/kubernetes/client/api_client.py in __deserialize_model(self, data, klass)
    620                     kwargs[attr] = self.__deserialize(value, attr_type)
    621 
--> 622         instance = klass(**kwargs)
    623 
    624         if hasattr(instance, 'get_real_child_model'):

/opt/conda/lib/python3.6/site-packages/kubernetes/client/models/v1_container.py in __init__(self, args, command, env, env_from, image, image_pull_policy, lifecycle, liveness_probe, name, ports, readiness_probe, resources, security_context, stdin, stdin_once, termination_message_path, termination_message_policy, tty, volume_devices, volume_mounts, working_dir)
    123         if liveness_probe is not None:
    124           self.liveness_probe = liveness_probe
--> 125         self.name = name
    126         if ports is not None:
    127           self.ports = ports

/opt/conda/lib/python3.6/site-packages/kubernetes/client/models/v1_container.py in name(self, name)
    354         """
    355         if name is None:
--> 356             raise ValueError("Invalid value for `name`, must not be `None`")
    357 
    358         self._name = name

ValueError: Invalid value for `name`, must not be `None`
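The second proposed fix (inserting a placeholder name before deserialization) could look roughly like the sketch below. It works on the plain dict loaded from YAML, before that dict is handed to make_pod_from_dict. The helper name `ensure_container_names` and the `dask-worker` placeholder are hypothetical and are not part of dask-kubernetes.

```python
import copy


def ensure_container_names(pod_dict, placeholder="dask-worker"):
    """Return a copy of a pod template dict in which every container has a name.

    Hypothetical helper: containers that already define ``name`` are left alone;
    the others get ``<placeholder>-<index>`` so names stay unique within the pod.
    """
    result = copy.deepcopy(pod_dict)  # never mutate the caller's template
    spec = result.get("spec") or {}
    for index, container in enumerate(spec.get("containers") or []):
        container.setdefault("name", f"{placeholder}-{index}")
    return result


template = {
    "kind": "Pod",
    "spec": {"containers": [{"image": "daskdev/dask:latest", "args": ["--nthreads", "2"]}]},
}
patched = ensure_container_names(template)
print(patched["spec"]["containers"][0]["name"])  # dask-worker-0
```

The patched dict could then go through make_pod_from_dict as before. According to the issue, KubeCluster overrides the name later, so the placeholder value itself should not matter.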

Add default toleration to worker pod

I have set up an instance of dask-kubernetes that uses taints to prevent kube-system components from scheduling onto the nodes. This really helps when using Google's autoscaler, since empty nodes are easily batch deleted.

The configuration that I needed to add is here:
https://github.com/pangeo-data/atmos.pangeo.io-deploy/blob/0ddab66e1a37f9f2a9ac85c8fdad7301b9e3882d/deployments/atmos.pangeo.io/image/.dask/config.yaml#L36-L44

      tolerations:
      - key: "k8s.dask.org_dedicated"
        operator: "Equal"
        value: "worker"
        effect: "NoSchedule"
      - key: "k8s.dask.org/dedicated"
        operator: "Equal"
        value: "worker"
        effect: "NoSchedule"

It might be nice to add a default toleration, so we standardize what taints are recommended for using dask-kubernetes.
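The sketch below shows one way a default toleration could be merged into a worker pod template given as a plain dict. It copies the two tolerations quoted above. `DEFAULT_TOLERATIONS` and `add_default_tolerations` are hypothetical names, not an existing dask-kubernetes API. Any tolerations the user already defined are kept.

```python
import copy

# Hypothetical defaults, copied from the two tolerations quoted in the issue.
DEFAULT_TOLERATIONS = [
    {"key": "k8s.dask.org/dedicated", "operator": "Equal", "value": "worker", "effect": "NoSchedule"},
    {"key": "k8s.dask.org_dedicated", "operator": "Equal", "value": "worker", "effect": "NoSchedule"},
]


def add_default_tolerations(pod_dict, defaults=DEFAULT_TOLERATIONS):
    """Return a copy of ``pod_dict`` whose spec includes every default toleration.

    Existing tolerations are kept and duplicates are skipped, so applying the
    function twice gives the same result as applying it once.
    """
    result = copy.deepcopy(pod_dict)
    spec = result.get("spec") or {}
    result["spec"] = spec
    tolerations = list(spec.get("tolerations") or [])
    for toleration in defaults:
        if toleration not in tolerations:
            tolerations.append(dict(toleration))
    spec["tolerations"] = tolerations
    return result


pod = add_default_tolerations({"kind": "Pod", "spec": {"containers": []}})
print(len(pod["spec"]["tolerations"]))  # 2
```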

Pods not assigned to nodes in autoscaling worker-pool

Hi,

Thanks for the effort you are putting into this project! I was able to get pangeo running on a GCS cluster. I did pretty much everything as described in the tutorial, and now my hub is running and I can log in and open a notebook. Now the problem: when I generate the dask-kubernetes cluster with

from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('../worker-template.yaml')
cluster.scale_up(5)

I only get 2 workers, and no matter how long I wait, no more appear.
I can perform computations on those worker pods, though!
When I check the pods in the cluster, I can see that the missing pods are not started due to insufficient CPU/memory resources.
When I check the worker pool, I can see that the resources are already allocated.
So it seems as if the autoscaling works, but the pods are somehow not scheduled onto the nodes?

I would be thankful for a hint!

Move to dask org

At some point we might consider moving this out of @yuvipanda 's user account and into another github organization, presumably dask.

Moving it to the official dask org raises some additional questions:

  1. Should it be renamed to something more standardized, like dask-kubernetes?
  2. Oops, that project name is already taken by a command-line tool focused (perhaps inappropriately) on gcloud. Do we merge the two projects, deprecate the old one, ... ?
  3. Given the conflict with an overly specialized project above, are we concerned about the specialization of this project?

At the moment my thought is to move the project over but keep the name. I suspect that there might be a few dask+kubernetes projects in the near future. We might consider a bazaar model with a few libraries marked experimental until one gains obvious dominance.

cc @martindurant @yuvipanda @mkjpryor-stfc

Clean up pods

I've noticed we're starting to accumulate a whole load of pods which are in the Terminated: Completed state.

Ideally these should be cleaned up in the same way JupyterHub cleans up pods after the user has logged off.
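kubectl typically shows such pods as Completed, and their status has `phase: Succeeded`. Below is a minimal sketch of a cleanup pass over pod objects represented as plain dicts. One example input is the `items` list from `kubectl get pods -o json`. The function name is hypothetical. The `delete` callback is an assumption standing in for the real deletion call. With the official kubernetes Python client, it could wrap `CoreV1Api().delete_namespaced_pod(name, namespace)`.

```python
from typing import Callable, Iterable, List


def cleanup_completed_pods(
    pods: Iterable[dict],
    delete: Callable[[str, str], None],
) -> List[str]:
    """Delete every pod whose phase is ``Succeeded`` and return the deleted names.

    ``delete(name, namespace)`` performs the actual deletion (hypothetical hook).
    """
    deleted = []
    for pod in pods:
        if (pod.get("status") or {}).get("phase") != "Succeeded":
            continue  # running, pending, or failed pods are left alone
        metadata = pod.get("metadata") or {}
        name = metadata["name"]
        namespace = metadata.get("namespace", "default")
        delete(name, namespace)
        deleted.append(name)
    return deleted
```

Injecting `delete` keeps the selection logic testable without a cluster. It also lets the same function run from the notebook pod or from a separate cron-style job.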

Update cpu and memory requests after cluster creation

On our Pangeo deployment users get a default worker-template.yaml which allows them to create clusters by simply running cluster = KubeCluster() without having to worry about what a kubernetes even is.

However, on some occasions people want to be able to update the memory and CPU of their workers depending on what they are running. The current workflow for this is either to specify the whole template as a dict, or to copy the default worker-template.yaml, understand it, update the values and then use KubeCluster.from_yaml().

Personally I ended up writing a couple of helper functions in my notebook which look like this:

def update_worker_memory(cluster, new_limit):
    # The worker is assumed to be the first container in the pod template
    container = cluster.pod_template.spec.containers[0]
    container.resources.limits["memory"] = new_limit
    container.resources.requests["memory"] = new_limit
    # Keep dask-worker's own --memory-limit flag in sync with the pod limit
    if container.args and '--memory-limit' in container.args:
        index = container.args.index('--memory-limit')
        container.args[index + 1] = str(new_limit)
    return cluster

def update_worker_cpu(cluster, new_limit):
    container = cluster.pod_template.spec.containers[0]
    container.resources.limits["cpu"] = new_limit
    container.resources.requests["cpu"] = new_limit
    # --nthreads expects an integer, so this only makes sense for whole-core limits
    if container.args and '--nthreads' in container.args:
        index = container.args.index('--nthreads')
        container.args[index + 1] = str(new_limit)
    return cluster

This allows me to adjust the worker template after the cluster has been created and all new workers will follow the updated values.

I'm considering how to add this functionality into the core project. I'm inspired by the dask-jobqueue SLURMCluster which allows you to specify cores and memory as kwargs. Therefore perhaps @mrocklin, @jhamman or @guillaumeeb have thoughts.

Before I go charging in to raise a PR I would like to discuss options.

  • Would it be useful to add methods to the KubeCluster object to update sizes after creation as I am above?
  • Should we add kwargs to the cluster init and if so should they create the cluster and use the helpers or update the config before creation?
  • Are there any other ways of specifying memory and cpu that I haven't captured in the examples above?
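For the kwargs option, here is a rough sketch of what a `cores`/`memory` interface in the style of dask-jobqueue could generate. It is a pod template dict in which the Kubernetes requests/limits and the dask-worker flags are derived from the same two values. `make_worker_pod_dict` is a hypothetical helper, not part of dask-kubernetes. Its output is the kind of dict the issue says can already be used to specify the whole template.

```python
def make_worker_pod_dict(image: str, cores: int, memory: str) -> dict:
    """Build a worker pod template whose resources and dask-worker flags agree.

    Hypothetical helper modelled on dask-jobqueue's ``cores``/``memory`` keywords.
    """
    if cores < 1:
        raise ValueError("cores must be at least 1")
    resources = {"cpu": str(cores), "memory": memory}
    return {
        "kind": "Pod",
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                {
                    "name": "dask-worker",
                    "image": image,
                    # One thread per requested core, memory limit matching the pod limit
                    "args": ["--nthreads", str(cores), "--memory-limit", memory],
                    # requests == limits, as in the helpers above
                    "resources": {"limits": dict(resources), "requests": dict(resources)},
                }
            ],
        },
    }


worker_pod = make_worker_pod_dict("daskdev/dask:latest", cores=2, memory="6G")
```

Deriving everything from `cores` and `memory` avoids the drift the helpers above work around, where the pod limit and the `--memory-limit`/`--nthreads` flags can disagree.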

Configure worker pods

Currently our pod spec is relatively simple:

    def _make_pod(self):
        return client.V1Pod(
            metadata=client.V1ObjectMeta(
                generate_name=self.name + '-',
                labels=self.worker_labels
            ),
            spec=client.V1PodSpec(
                restart_policy='Never',
                containers=[
                    client.V1Container(
                        name='dask-worker',
                        image=self.worker_image,
                        args=[
                            'dask-worker',
                            self.scheduler_address,
                            '--nthreads', str(self.threads_per_worker),
                        ]
                    )
                ]
            )
        )

For our needs this seems to work well. However, will this be sufficient for broader use? Is there a way we can provide an escape hatch for people to define their own pod template, using either the kubernetes Python API or some YAML?
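One possible escape hatch is sketched below. The user supplies a full pod template as a dict (for example loaded from YAML), and the cluster fills in only the fields that `_make_pod` above sets itself: the generated name, the labels, and the dask-worker command with the scheduler address. The function name is hypothetical. Two more assumptions: the first container runs the worker, and any `args` in the template are extra dask-worker flags.

```python
import copy


def pod_from_template(template: dict, name: str, scheduler_address: str, labels: dict) -> dict:
    """Fill in the per-cluster fields of a user-supplied pod template (hypothetical)."""
    pod = copy.deepcopy(template)  # user settings (resources, volumes, ...) are preserved
    metadata = pod.get("metadata") or {}
    pod["metadata"] = metadata
    metadata["generateName"] = name + "-"
    metadata["labels"] = {**(metadata.get("labels") or {}), **labels}
    spec = pod.get("spec") or {}
    pod["spec"] = spec
    spec.setdefault("restartPolicy", "Never")  # keep the user's choice if given
    container = spec["containers"][0]  # assume the first container runs the worker
    container["args"] = ["dask-worker", scheduler_address] + list(container.get("args") or [])
    return pod


user_template = {
    "spec": {"containers": [{"name": "worker", "image": "daskdev/dask:latest", "args": ["--nthreads", "2"]}]}
}
pod = pod_from_template(user_template, "dask-alice", "tcp://10.0.0.1:8786", {"app": "dask"})
```

Because the template is deep-copied, one template can be reused for every worker pod the cluster launches.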

Doc unclear on actual usage scenarios that will work

I've been trying to use Dask-kubernetes with a local Jupyter notebook on my laptop while the Dask workers run on a Google Cloud Kubernetes cluster. I've been struggling a bit with the docs to figure out whether this is supported, and how to do it, although it may not help that I'm learning Kubernetes at the same time. Then I found this #58 (comment), which made everything a lot clearer: "It [Dask-kubernetes] is designed to be run from a pod on a Kubernetes cluster that has permissions to launch other pods."

So this issue is about clarifying the docs, to help people like me in the future. I'm willing to do the pull request, but we should probably first figure out whether my understanding is accurate. On top of something like the above sentence (in the docs and in the KubeCluster docstring), here are the things it would have helped me to have the docs clarify:

  • kubectl doesn't need to be installed (because the Kubernetes Python API is used directly). Pretty sure this is true as well...?
  • If .kube/config is configured correctly to be able to launch pods, it will be used, and workers will be launched on the default context. (Is that correct? If yes, it would allow the use case I'm looking for. If not, is there a way to influence the Kubernetes cluster used by Dask-kubernetes?)

If the second bullet is supported, it would also be nice to be able to specify a Kubernetes context other than the default, but that should probably be a separate issue...
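For reference, here is a sketch of how a Python process can choose between in-cluster service-account credentials and a kubeconfig file (optionally with a non-default context) using the official `kubernetes` client. This is not dask-kubernetes' actual code, and the function names are hypothetical. It relies on Kubernetes injecting the `KUBERNETES_SERVICE_HOST` environment variable into pods. `config.load_incluster_config()` and `config.load_kube_config(context=...)` are the client's own loaders.

```python
import os
from typing import Mapping, Optional


def config_source(environ: Optional[Mapping[str, str]] = None) -> str:
    """Guess which Kubernetes configuration applies (hypothetical helper).

    Inside a pod, Kubernetes sets KUBERNETES_SERVICE_HOST, so the in-cluster
    service-account config applies; elsewhere fall back to ~/.kube/config.
    """
    environ = os.environ if environ is None else environ
    return "incluster" if "KUBERNETES_SERVICE_HOST" in environ else "kubeconfig"


def load_kubernetes_config(context: Optional[str] = None) -> str:
    """Load credentials with the official client; ``context=None`` means current-context."""
    from kubernetes import config  # requires the `kubernetes` package

    source = config_source()
    if source == "incluster":
        config.load_incluster_config()
    else:
        config.load_kube_config(context=context)
    return source
```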

Thanks!

Proxy for worker and scheduler dashboards

One slightly tricky bit with running dask in a kubernetes environment (we're doing JupyterHub + JupyterLab + k8s) is that the dashboards are all on the internal, private network, and there's only the one https endpoint that leads you to the hub and then proxies you to your container.

If you want to use bokeh inside JupyterLab in this sort of environment, you end up using nbserverproxy anyway to map local ports to a route mapped into your https endpoint.

Turns out you can do a similar trick for the scheduler proxy. This is how we do it at LSST, and I think with a little work we could drop the LSST-specific environmental bits and make it a generic part of dask-kubernetes:

https://github.com/lsst-sqre/jupyterlabutils/blob/master/jupyterlabutils/notebook/lsstdaskclient.py

All that does is to replace the representation of the scheduler dashboard endpoint with its equivalent mapped through the hub user proxy and the nbserver proxy.

The other piece is a little more interesting. What this does is to collect the worker status dashboard URLs, create local-to-the-scheduler-and-therefore-notebook TCP proxies to the worker host/bokeh-port, hook those into the existing IOLoop, and then return the proxy addresses pointing to those ports to the user. That lets you hit the worker dashboards from your browser, despite the fact that everything is running on the k8s private network:

https://github.com/lsst-sqre/jupyterlabutils/blob/master/jupyterlabutils/notebook/clusterproxy.py

Both of these in their current form depend on environment variables I'm injecting as part of the LSST Lab setup; I'd like to discuss how we can make them generic and roll them into dask-kubernetes.
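A generic version of the first piece mostly comes down to URL rewriting. Below is a minimal sketch that maps a private dashboard URL to the user's public notebook prefix, following nbserverproxy's `<prefix>/proxy/<port>/...` route. The function name and the example base URL are hypothetical. A generic implementation would still need a way to discover the prefix without LSST-specific environment variables. This only reaches ports on the notebook host itself, which is why the worker dashboards need the TCP proxies described above.

```python
from urllib.parse import urlparse


def proxied_dashboard_url(internal_url: str, external_base: str) -> str:
    """Rewrite a private dashboard URL into its nbserverproxy equivalent.

    ``external_base`` is the user's public notebook prefix, e.g. the
    hypothetical ``https://hub.example.org/user/alice``.
    """
    parsed = urlparse(internal_url)
    if parsed.port is None:
        raise ValueError(f"no port in {internal_url!r}")
    path = parsed.path or "/"
    return f"{external_base.rstrip('/')}/proxy/{parsed.port}{path}"


print(proxied_dashboard_url("http://10.0.0.5:8787/status", "https://hub.example.org/user/alice"))
# https://hub.example.org/user/alice/proxy/8787/status
```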

Spawn Dask scheduler on separate Kubernetes pod

This feature request is a followup to #82. Having at least the option to spawn the Dask scheduler on a remote Kubernetes pod would enable more use cases for Dask-kubernetes, including at least the "run Jupyter on my notebook, but do the Dask computation on a remote cluster" one.

Does it work for anyone?

Hello,

I'm trying to set up Dask and Kubernetes. Dask is version 0.17.5.
Following the instructions, I installed dask-kubernetes. It fails with

from dask_kubernetes import KubeCluster
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/evg/Library/Python/2.7/lib/python/site-packages/dask_kubernetes/__init__.py", line 1, in <module>
    from .core import KubeCluster
  File "/Users/evg/Library/Python/2.7/lib/python/site-packages/dask_kubernetes/core.py", line 7, in <module>
    from urllib.parse import urlparse
ImportError: No module named parse

Ok, I'm on Python 2.7, so in core.py I changed

from urllib.parse import urlparse

to

try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

and now it fails with

from dask_kubernetes import KubeCluster
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Library/Python/2.7/site-packages/dask_kubernetes-0.3.0-py2.7.egg/dask_kubernetes/__init__.py", line 1, in <module>
    from . import config
  File "/Library/Python/2.7/site-packages/dask_kubernetes-0.3.0-py2.7.egg/dask_kubernetes/config.py", line 10, in <module>
    dask.config.ensure_file(source=fn)
AttributeError: 'module' object has no attribute 'config'

Release

I plan to issue a small release today if there are no objections.

Robust Cleanup of pods

We should ensure that pods don't get left behind. We can do this in two ways:

  1. When the KubeCluster or Python process shut down we can remove the pods that we created
  2. When the launching pod dies we can remove all resources that it produced. Ideally we also restrict this pod to a specific namespace so that it is well isolated.
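The first option could be sketched as below. A small tracker remembers every pod the cluster created and deletes them when `close()` is called or the Python process exits, using the standard-library `atexit` module. `PodTracker` and its `delete` callback are hypothetical. In practice `delete` would wrap a kubernetes client call. The second option (cleanup when the launching pod dies) cannot be handled from inside the dying process, so it is not covered here.

```python
import atexit
from typing import Callable, List


class PodTracker:
    """Hypothetical helper: remember created pods and delete them on shutdown."""

    def __init__(self, delete: Callable[[str], None]):
        self._delete = delete
        self._pods: List[str] = []
        self._closed = False
        atexit.register(self.close)  # runs at normal interpreter exit

    def created(self, name: str) -> None:
        self._pods.append(name)

    def close(self) -> None:
        if self._closed:
            return  # idempotent: explicit close plus atexit is safe
        self._closed = True
        while self._pods:
            name = self._pods.pop()
            try:
                self._delete(name)
            except Exception:
                pass  # best effort: keep deleting the remaining pods
```

Deletion is best-effort so that one failing API call does not leave the remaining pods behind.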

JupyterLab extension

It would be useful to have a JupyterLab extension to launch Daskernetes clusters.

Unable to retrieve pod logs

I am trying to understand the relationship between the workers and pods. To this end, I have tried a few methods and have been unsuccessful.

The first method using kubectl:
[Screenshot of the kubectl output]

?...This is confusing.

The second method using dask_kubernetes cluster.log(pod) raises what looks like an rbac error.
Here is the output from cluster.log(cluster.pods()[0]):

[Screenshot of the cluster.log(cluster.pods()[0]) output]

It's not clear to me where to modify RBAC privileges so that the single-user notebook can access the worker logs.

Resolve local testing approach

There are a couple options:

  1. Use telepresence like the following:

     telepresence --expose 8786 --run py.test daskernetes
    
  2. Use ip route as shown in the README. If so, we should probably include instructions to reverse these steps.

Scaling behavior permission in namespace issues

I've got a cluster up and running with dask_kubernetes installed. I am using this as a guide:

When I go to initialize my scaling behavior I get the following error:

from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yml')
cluster.scale_up(3)

....
ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '7d4dba0b-e4e2-40c8-bedc-33e93a3801a1', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Tue, 20 Mar 2018 00:45:44 GMT', 'Content-Length': '307'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods is forbidden: User \"system:serviceaccount:default:default\" cannot list pods in the namespace \"default\": Unknown user \"system:serviceaccount:default:default\"","reason":"Forbidden","details":{"kind":"pods"},"code":403}

My worker-spec.yml looks like this:

kind: Pod
metadata:
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:latest
    name: worker
    args: [--nthreads, '2', --no-bokeh, --memory-limit, 6GB, --death-timeout, '60']
    env:
      - name: EXTRA_PIP_PACKAGES
        value: fastparquet git+https://github.com/dask/distributed

This is the same behavior as pangeo-data/pangeo#167
I have copied dask-kubernetes-serviceaccount.yml file and run kubectl create -f dask-kubernetes-serviceaccount.yml.

Is there anything obvious I am missing?
