jupyter-server / enterprise_gateway

A lightweight, multi-tenant, scalable and secure gateway that enables Jupyter Notebooks to share resources across distributed clusters such as Apache Spark, Kubernetes and others.

Home Page: https://jupyter-enterprise-gateway.readthedocs.io/en/latest/

License: Other

Makefile 3.33% Jupyter Notebook 3.99% Python 74.39% HTML 1.85% Shell 6.17% R 1.55% Scala 2.76% Dockerfile 2.70% JavaScript 1.07% SCSS 1.34% Jinja 0.84%
enterprise gateway hacktoberfest jupyter jupyter-enterprise-gateway jupyter-kernels jupyter-notebook kernel kubernetes remote-kernels spark spark-on-kubernetes yarn

enterprise_gateway's People

Contributors

akchinstc, amarinder-bindra, bgerrity, blink1073, bloomsa, carreau, charlieeeeeee, ckadner, dependabot[bot], echarles, ellisonbg, esevan, jtyberg, kevin-bates, kiersten-stokes, ktong, lk-tmac1, lresende, lull3rskat3r, luong-komorebi, minrk, parente, poplav, pre-commit-ci[bot], rahul26goyal, reverie99, rolweber, sanjay-saxena, telamonian, willingc

enterprise_gateway's Issues

Implement socket-based connection file mode

We currently support two modes for conveying the connection information between Elyra and the kernel: Push Mode and Pull Mode.

Push Mode is the default and uses ports generated on the Elyra server in the hope that they will still be valid when used on the selected destination. This mode is the default because it requires no changes on the kernel side, so it should just work provided the ports are valid. That said, it's a huge leap of faith regarding port validity.

Pull Mode can be used when the kernel knows how to produce the connection file itself. For example, the IPython kernel will "fill in" a connection file if the associated parameter does not correspond to a physical file (just its name). In this case, the Elyra server, monitoring the state of the Yarn application, will pull the specified file once the application is in the RUNNING state and the file is observed to exist (via a remote shell command). Pull Mode is preferable to Push Mode since the port derivation occurs on the actual target system, which tremendously reduces the odds of port conflicts.

Many kernels do not have those "create on the fly" semantics, so implementing Pull Mode for them would require a launcher application that creates the connection file and then launches the desired kernel. There are many advantages to having a launcher application besides creating the connection file (e.g., it provides a place for inserting custom code). As a result, if the Elyra server indicated where to send the connection information, it would be easy to introduce a third mode of connection file communication: Socket Mode.

Socket Mode has a couple of advantages over Pull Mode.

  1. It eliminates the need to copy (pull) the file from the remote system.
  2. Because the connection info is sent immediately when available, it eliminates the need to monitor for the file's existence.

In addition, we could choose to pass other information as well.

Elyra will convey the response address (IP and port) to which the connection info JSON should be sent via an environment variable, KERNEL_RESPONSE_ADDRESS, of the form <ip>:<port>.
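For illustration, here is a minimal sketch of the Elyra-side listener that would back KERNEL_RESPONSE_ADDRESS (the function name and single-connection exchange are assumptions, not the final design):

import json
import socket

def receive_connection_info(port):
    # <elyra-ip>:<port> is what gets published in KERNEL_RESPONSE_ADDRESS;
    # a single inbound connection delivers the connection info JSON.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('0.0.0.0', port))
    server.listen(1)
    try:
        conn, _ = server.accept()
        try:
            data = conn.recv(8192)
        finally:
            conn.close()
    finally:
        server.close()
    return json.loads(data.decode('utf-8'))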

This information will be conveyed to the launcher via another environment variable, LAUNCH_OPTS, built within the kernel.json file; it will consist of the argument name --response_address followed by the value of KERNEL_RESPONSE_ADDRESS. (See below.)

As part of this issue, we should extend the kernelspec's remote_process_proxy_class indicator to include the connection file mode. This will then allow each kernel to specify its own process proxy behavior.

This change will be accomplished by introducing a process_proxy stanza in place of the remote_process_proxy entry, which can then specify both the class and the connection file mode, as follows:

{
  "language": "python",
  "display_name": "Spark 2.1 - Python (YARN Cluster Mode)",
  "process_proxy": {
    "class_name": "kernel_gateway.services.kernels.processproxy.YarnProcessProxy",
    "connection_file_mode": "socket"
  },
  "env": {
    "SPARK_HOME": "/usr/iop/current/spark2-client",
    "SPARK_OPTS": "--master yarn --deploy-mode cluster --name ${KERNEL_ID:-ERROR__NO__KERNEL_ID} --proxy-user ${KERNEL_USERNAME:-ERROR__NO__KERNEL_USERNAME}",
    "LAUNCH_OPTS": "--response_address ${KERNEL_RESPONSE_ADDRESS:-ERROR__NO__KERNEL_RESPONSE_ADDRESS}"
  },
  "argv": [
    "/usr/local/share/jupyter/kernels/spark_2.1_python_yarn_cluster/bin/run.sh",
    "{connection_file}"
  ]
}

When not specified, the default process proxy class will be kernel_gateway.services.kernels.processproxy.LocalProcessProxy and the default connection file mode will be None, although each process proxy implementation is free to specify its own default (e.g., the default connection file mode for StandaloneProcessProxy will be Push Mode).
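A sketch of how a server might interpret the new stanza (the helper name and fallback behavior are illustrative):

DEFAULT_PROXY_CLASS = 'kernel_gateway.services.kernels.processproxy.LocalProcessProxy'

def get_process_proxy_config(kernelspec):
    # Read the process_proxy stanza, falling back to LocalProcessProxy
    # and a connection file mode of None when unspecified.
    stanza = kernelspec.get('process_proxy', {})
    class_name = stanza.get('class_name', DEFAULT_PROXY_CLASS)
    file_mode = stanza.get('connection_file_mode')  # None unless specified
    return class_name, file_mode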

Improve time-to-connect times for yarn-cluster kernels

After a yarn-cluster kernel is launched, Elyra polls its status at one-second intervals. Then, depending on the connection-file-mode policy, additional delays can be introduced. I believe we can trim 1 to 2 seconds off the kernel launch and the time it takes for a kernel to reach the 'connected' state (the point at which cells can be submitted, although they won't be executed until the kernel is 'ready') by doing the following (a sketch combining these changes follows the list):

  1. Begin looking for the connection file as soon as a host is assigned. Currently, we wait until the Yarn application state is RUNNING, but launchers could be optimized to create the connection file prior to that. (See Issue #64.)
  2. Combine the Yarn queries for state and host into a single query that returns a tuple. Currently, we use curl to get just the state; then, until there is a host, we turn around and use the Yarn API to get the host. We should flip this: use the Yarn API to get state and host together, then, once we have a host, just use curl (assuming it's much faster).
  3. Cut the sleep interval from 1 second to 0.5 seconds. This will improve cases where the connection file became available just after the sleep started.
  4. Increase the socket timeout from 1 second to 5 seconds. Since this is a blocking call that is interrupted as soon as data arrives, it will optimize socket mode: we expect data to arrive within 5 seconds of host assignment, and we would have just spent that time polling status anyway.
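A rough sketch combining these changes, where query_state_and_host and the response_socket attribute are illustrative names rather than existing APIs:

import time

POLL_INTERVAL = 0.5     # item 3: was 1 second
SOCKET_TIMEOUT = 5.0    # item 4: was 1 second

def wait_for_host(proxy):
    # Item 2: a single Yarn API call returns both state and host, so we
    # can begin watching for the connection file (item 1) as soon as a
    # host is assigned rather than waiting for the RUNNING state.
    state, host = proxy.query_state_and_host()
    while not host:
        time.sleep(POLL_INTERVAL)
        state, host = proxy.query_state_and_host()
    # Blocking recv returns as soon as data arrives, so the longer
    # timeout costs nothing in the common case.
    proxy.response_socket.settimeout(SOCKET_TIMEOUT)
    return state, host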

Toree is taking 10 seconds to respond to initial kernel info request

When debugging kernel launches, I noticed that Python kernels start about 10 seconds faster than Toree kernels. @aazhou1 found similar timings in #42.

The time difference appears to be between Elyra/JKG issuing the kernel info request and the kernel's response. In these cases, Toree takes about 10 seconds while IPython takes milliseconds.

We should get an understanding of where this time is being consumed in Toree.

Toree: 10.637 secs

[D 2017-07-11 10:44:35.184 KernelGatewayApp] Requesting kernel info from 786c1a04-5724-4dc9-a6f7-a652d010b932
[D 2017-07-11 10:44:35.184 KernelGatewayApp] Connecting to: tcp://172.16.187.129:40313
[W 2017-07-11 10:44:45.186 KernelGatewayApp] Timeout waiting for kernel_info reply from 786c1a04-5724-4dc9-a6f7-a652d010b932
[I 170711 10:44:45 web:2063] 101 GET /api/kernels/786c1a04-5724-4dc9-a6f7-a652d010b932/channels (9.108.95.126) 10007.74ms
[D 2017-07-11 10:44:45.188 KernelGatewayApp] Opening websocket /api/kernels/786c1a04-5724-4dc9-a6f7-a652d010b932/channels
[D 2017-07-11 10:44:45.188 KernelGatewayApp] Connecting to: tcp://172.16.187.129:40313
[D 2017-07-11 10:44:45.189 KernelGatewayApp] Connecting to: tcp://172.16.187.129:40804
[D 2017-07-11 10:44:45.189 KernelGatewayApp] Connecting to: tcp://172.16.187.129:33483
[D 2017-07-11 10:44:45.811 KernelGatewayApp] activity on 786c1a04-5724-4dc9-a6f7-a652d010b932: status
[D 2017-07-11 10:44:45.821 KernelGatewayApp] Received kernel info: {u'implementation': u'spark', u'protocol_version': u'5.0', u'language_info': {u'mimetype': u'text/x-scala', u'name': u'scala', u'pygments_lexer': u'scala', u'version': u'2.11.8', u'file_extension': u'.scala', u'codemirror_mode': u'text/x-scala'}, u'implementation_version': u'0.2.0.dev1-incubating-SNAPSHOT', u'banner': u'Apache Toree'}

IPython: 0.007 secs

[D 2017-07-11 10:45:14.266 KernelGatewayApp] Requesting kernel info from 5042b31c-a6a0-43da-8450-da985a68b49b
[D 2017-07-11 10:45:14.266 KernelGatewayApp] Connecting to: tcp://172.16.187.129:37035
[D 2017-07-11 10:45:14.271 KernelGatewayApp] activity on 5042b31c-a6a0-43da-8450-da985a68b49b: status
[D 2017-07-11 10:45:14.273 KernelGatewayApp] Received kernel info: {u'status': u'ok', u'language_info': {u'mimetype': u'text/x-python', u'nbconvert_exporter': u'python', u'name': u'python', u'pygments_lexer': u'ipython2', u'version': u'2.7.5', u'file_extension': u'.py', u'codemirror_mode': {u'version': 2, u'name': u'ipython'}}, u'implementation': u'ipython', u'implementation_version': u'5.4.1', u'protocol_version': u'5.1', u'banner': u'Python 2.7.5 (default, Aug  2 2016, 04:20:16) \nType "copyright", "credits" or "license" for more information.\n\nIPython 5.4.1 -- An enhanced Interactive Python.\n?         -> Introduction and overview of IPython\'s features.\n%quickref -> Quick reference.\nhelp      -> Python\'s own help system.\nobject?   -> Details about \'object\', use \'object??\' for extra details.\n', u'help_links': [{u'url': u'http://docs.python.org/2.7', u'text': u'Python'}, {u'url': u'http://ipython.org/documentation.html', u'text': u'IPython'}, {u'url': u'http://docs.scipy.org/doc/numpy/reference/', u'text': u'NumPy'}, {u'url': u'http://docs.scipy.org/doc/scipy/reference/', u'text': u'SciPy'}, {u'url': u'http://matplotlib.org/contents.html', u'text': u'Matplotlib'}, {u'url': u'http://docs.sympy.org/latest/index.html', u'text': u'SymPy'}, {u'url': u'http://pandas.pydata.org/pandas-docs/stable/', u'text': u'pandas'}]}

Detect missing culling functionality during load of persisted kernel sessions

When a previously persisted kernel session is loaded during Elyra's startup, the idle-kernel culling functionality must be initialized for each "revived" kernel session. If the underlying notebook package does not contain the idle-kernel culling functionality, a missing-attribute exception is thrown and Elyra fails to start. Instead, such missing functionality should be detected and gracefully ignored: Elyra should issue a warning indicating that an older version of notebook is in use, but continue.
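A minimal sketch of the intended behavior, assuming a hypothetical initialize_culler() hook on the kernel manager (the actual attribute that raises will depend on the notebook version in use):

import logging

log = logging.getLogger(__name__)

def init_culling(kernel_manager):
    # Older notebook releases lack idle-kernel culling; trap the missing
    # attribute, warn, and let Elyra continue starting.
    try:
        kernel_manager.initialize_culler()
    except AttributeError:
        log.warning("The installed notebook package does not support idle kernel "
                    "culling; revived kernel sessions will not be culled. "
                    "Upgrade to notebook >= 5.1 to enable culling.")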

Should consider adding a namespace to launcher options

We may want to consider adding a namespace indicator to launcher options sooner rather than later. Right now we only have --response-address, but we might want others, and for each option there's a chance it collides with a pre-existing parameter. This can be avoided by prefixing all parameters with a namespace, which is considered good practice anyway.

Some proposals include:

  • Elyra, e.g., --Elyra.response-address
  • ElyraLauncher, e.g., --ElyraLauncher.response-address
  • KernelLauncher, e.g., --KernelLauncher.response-address

This would require changes to each of the launchers such that the appropriate parameter is processed.
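For example, a launcher could process the namespaced form with argparse (shown here with the KernelLauncher proposal; parse_known_args lets unrelated parameters pass through untouched):

import argparse

parser = argparse.ArgumentParser()
# The dotted prefix namespaces the option, avoiding collisions with
# pre-existing kernel parameters.
parser.add_argument('--KernelLauncher.response-address', dest='response_address',
                    metavar='<ip>:<port>', help='Where to send connection info')
arguments, unrecognized = parser.parse_known_args()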

Investigate removing --response-address parameter from kernel.json

Currently, the response address parameter used for socket mode exists in the kernel.json file and is passed to launchers regardless of connection file mode. Instead, we may want to set the --response-address parameter only when 'socket' mode is used. This makes the parameter specific to connection-file mode - where it belongs.

We'd still keep the current LAUNCH_OPTS entry in place, but it would be empty by default. It could then be used to convey other information to the launchers.

Create kernel launch script for R kernels to enable pull-mode

Similar to the launch script for the PySpark kernel (see launch_ipkernel.py), the R kernel invocations could also utilize a launch script responsible for constructing the necessary information for location-relative connection files, thereby dramatically decreasing the likelihood of port conflicts. This would include the identification of local ports and the public IP, as well as generation of the key used to sign socket traffic.

Move runtime and data directories under installation folder

Currently the JUPYTER_DATA_DIR and JUPYTER_RUNTIME_DIR values point to /var/lib/elyra and /var/run/elyra/runtime. Since Elyra will likely not be "ambarified" for the CHS release and we need to support Elyra in all configurations, it would be best to maintain Elyra's file structure under its installation folder of {{ install_dir }}/elyra (e.g., /opt/elyra).

One proposal is for JUPYTER_DATA_DIR to correspond to /opt/elyra/data and JUPYTER_RUNTIME_DIR to /opt/elyra/runtime, with the requirement that JUPYTER_RUNTIME_DIR exist on all nodes.

This issue was prompted after learning that /var/run/elyra was removed from all nodes following cluster restarts.

Opensource dependency checklist

We currently rely on various third-party libraries and packages that were forked from open-source projects and modified for our needs without contributing the changes back to the respective upstream repositories.

This issue serves as a checklist of sorts to track progress on getting our open-source dependency house back in order. Individual "sub-tasks" should be linked into this list.

  • Yarn-Client: HTTPS support from here. (Note: Version 0.2.3 has been deemed sufficient.)
  • Apache Toree: need a SNAPSHOT or release with fixes since 0.2.0-dev1, i.e., ASM jar shading and support for Yarn Cluster mode --> Issue #40
    • update Toree's Python dependencies ... jupyter_client<5.0,>=4.0 should be >=4.0 since it works with jupyter_client=5.1
    • Toree builds after June 7, 2017 won't work in Yarn Cluster mode (the Toree Main application gets stuck in the ACCEPTED state and times out without a SparkContext being utilized after spark.yarn.am.waitTime, default of 100 seconds) -- see Toree commit 5cfbc83 (TOREE-390: Lazily initialize spark sessions)
  • NB2KG: variables for request timeouts and user authentication mentioned here
  • NB2KG docker image: need to rebuild and host under neutral name on Dockerhub - see elyra/NB2KG
  • Notebook: Kernel culling will first appear in the 5.1.0 release of Jupyter Notebook which is now available.
  • ...

Investigate user environment setup

We need to make sure kernels come up in a well-defined environment that is sufficiently isolated from that of other users. Considerations should include the user's home folder in HDFS or GPFS and data persistence across kernel sessions.

Environments could be isolated ...

  • on a kernel-by-kernel basis
  • on a user-by-user basis
  • or made "fresh" each time a kernel is launched

For Python kernels, the mechanism could be virtualenv or Anaconda.

For Java/Scala kernels, the submitted jar files constitute the runtime environment.

For R kernels, it could be Renv or Packrat.

Investigate: Permission denied: u'/var/run/elyra/runtime/kernel-xxxxx.json'

It looks like there are some intermittent cases where we get "permission denied" when creating/updating the kernel connection json file:

[D 2017-07-16 15:56:15.197 KernelGatewayApp] RemoteKernelManager: Writing connection file with ip=169.45.103.144, control=33001, hb=45662, iopub=44204, stdin=33960, shell=44543
Traceback (most recent call last):
  File "/opt/anaconda2/bin/jupyter-kernelgateway", line 11, in <module>
    sys.exit(launch_instance())
  File "/opt/anaconda2/lib/python2.7/site-packages/jupyter_core/application.py", line 267, in launch_instance
    return super(JupyterApp, cls).launch_instance(argv=argv, **kwargs)
  File "/opt/anaconda2/lib/python2.7/site-packages/traitlets/config/application.py", line 657, in launch_instance
    app.initialize(argv)
  File "/opt/anaconda2/lib/python2.7/site-packages/kernel_gateway/gatewayapp.py", line 352, in initialize
    self.init_configurables()
  File "/opt/anaconda2/lib/python2.7/site-packages/kernel_gateway/gatewayapp.py", line 402, in init_configurables
    self.kernel_session_manager.start_sessions()
  File "/opt/anaconda2/lib/python2.7/site-packages/kernel_gateway/services/sessions/kernelsessionmanager.py", line 84, in start_sessions
    if not self._start_session(kernel_session):
  File "/opt/anaconda2/lib/python2.7/site-packages/kernel_gateway/services/sessions/kernelsessionmanager.py", line 97, in _start_session
    launch_args=kernel_session['launch_args'])
  File "/opt/anaconda2/lib/python2.7/site-packages/kernel_gateway/services/kernels/remotemanager.py", line 46, in start_kernel_from_session
    km.write_connection_file()
  File "/opt/anaconda2/lib/python2.7/site-packages/kernel_gateway/services/kernels/remotemanager.py", line 143, in write_connection_file
    return super(RemoteKernelManager, self).write_connection_file()
  File "/opt/anaconda2/lib/python2.7/site-packages/jupyter_client/connect.py", line 431, in write_connection_file
    kernel_name=self.kernel_name
  File "/opt/anaconda2/lib/python2.7/site-packages/jupyter_client/connect.py", line 136, in write_connection_file
    with open(fname, 'w') as f:
IOError: [Errno 13] Permission denied: u'/var/run/elyra/runtime/kernel-7388b059-8a68-48a0-811c-2d9803f04250.json'

This seems related to some files being created as the elyra user and others as yarn:hadoop:

[root@spark-02 runtime]# pwd
/var/run/elyra/runtime
[root@spark-02 runtime]# ls -la
total 12
drwxrwxrwx. 2 elyra elyra  100 Jul 16 15:44 .
drwxr-xr-x. 3 elyra elyra   80 Jul 14 18:48 ..
-rw-r--r--. 1 yarn  hadoop 292 Jul 16 15:44 kernel-7388b059-8a68-48a0-811c-2d9803f04250.json
-rw-rw-r--. 1 elyra elyra  308 Jul 15 02:31 kernel-aed5c8b7-dcee-4872-8dff-ae46ea4fec61.json
-rw-r--r--. 1 yarn  hadoop 292 Jul 15 02:32 kernel-f4ff41f9-64f9-45c4-b239-31a63288b416.json

Minimize per kernel footprint on Elyra server

When issuing a spark-submit call that runs the driver application in yarn-cluster mode, there is still some cost associated with each kernel, since each will have an associated JVM process. This "submit process" can be avoided once the driver application has reached the RUNNING state by setting the configuration flag --conf spark.yarn.submit.waitAppCompletion=false. When set, there are two immediate benefits:

  1. Once the kernel is in the Yarn RUNNING state, there is no "submit-process" JVM overhead.
  2. The Elyra log does not fill with per-second log entries indicating the status of the running application - for each kernel - which would otherwise lead to premature rollover and unnecessary noise.

On the other hand, there is then no local process through which status can be obtained manually or rogue kernels could be killed.

A workaround for status is to query the kernel persistence repository (file or DB), and rogue kernels (yarn applications) can be monitored and killed via the Yarn management facility.

As a result, I believe the positives outweigh the negatives and we should add --conf spark.yarn.submit.waitAppCompletion=false to kernelspecs using YarnProcessProxy.

Because there is no local process, we should log the command that is started. This can be done by extending the run.sh scripts appropriately (i.e., set -x). Special care needs to be taken to mask potentially sensitive user information (e.g., KG_HTTP_PASSWORD in the user env).
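A sketch of masking sensitive values before the launch environment is logged; the pattern and helper name are illustrative:

import re

SENSITIVE = re.compile(r'PASSWORD|SECRET|TOKEN', re.IGNORECASE)

def masked_env(env):
    # Redact values of sensitive-looking variables (e.g., KG_HTTP_PASSWORD)
    # so they never appear in the launch log.
    return {name: ('****' if SENSITIVE.search(name) else value)
            for name, value in env.items()}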

Make Kernel Lifecycle management pluggable

In troubleshooting the Yarn process proxy (Issue #4), I found that it was being affected by Standalone process proxy functionality that relies on the connection file having been copied to the remote system (into the /tmp directory by default). That file is not present for yarn-based kernels because their actual launch is a local operation.

This functionality was keyed off the presence of a remote-process-proxy-class defined in the kernelspec, but was performed before the process proxy class had been instantiated. Since instantiation of the process proxy class currently occurs at launch time (well after the formatting of the cmd and generation of the temp file), we should create the process proxy class sooner and let the actual process creation occur via a method (rather than the constructor). With this approach, different forms of process proxies can have better control over their environment.

This issue will look into creating the process proxy class when its kernel manager instance is created (if possible).

Honor KERNEL_LAUNCH_TIMEOUT from kwargs[env]

YarnProcessProxy should be modified to first look for KERNEL_LAUNCH_TIMEOUT in kwargs['env'] and, if not set, defer to ELYRA_KERNEL_LAUNCH_TIMEOUT.

This will allow notebook users to specify their own launch timeout values prior to starting their Notebook servers by setting KERNEL_LAUNCH_TIMEOUT, since all env vars prefixed with KERNEL_ flow to JKG/Elyra.
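The lookup order might look like the following (the 30-second fallback is illustrative):

import os

def get_launch_timeout(env):
    # Prefer the user's KERNEL_LAUNCH_TIMEOUT (flowed through kwargs['env']),
    # then the server-wide ELYRA_KERNEL_LAUNCH_TIMEOUT, then a default.
    return int(env.get('KERNEL_LAUNCH_TIMEOUT',
                       os.environ.get('ELYRA_KERNEL_LAUNCH_TIMEOUT', 30)))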

IPython launcher no longer creates connection file when pull mode is configured.

PR #43 introduced socket mode for conveying the connection information of the remote kernel back to Elyra. The launcher change included in that PR only creates the connection file if the --response-address parameter is present. However, if the kernelspec were updated such that pull mode is enabled (and --response-address is removed), the connection file would not be created. That is, the connection file should be created whenever it doesn't exist; its contents should then be returned via the response address only when that parameter exists.
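Decoupled, the relevant portion of the launcher would look roughly like this, where write_connection_file comes from jupyter_client and return_connection_info is the helper defined in the launcher script shown later in this document:

import os
from jupyter_client.connect import write_connection_file

def ensure_connection_file(connection_file, ip, key, response_addr):
    # Create a missing connection file regardless of mode (pull or socket);
    # send its contents back only when a response address was given.
    if not os.path.isfile(connection_file):
        write_connection_file(fname=connection_file, ip=ip, key=key)
        if response_addr:
            return_connection_info(connection_file, ip, response_addr)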

Implement initial session persistence for kernels

As noted in Issue #19 the initial implementation of kernel sessions should track the following...

  1. Kernel id (fundamental primary key)
  2. Connection information - IP, the 5 assigned ports, key and signature scheme, kernel name (i.e., the ability to produce the content of the connection file - probably just saving the connection file itself)
  3. Kernel user (via KERNEL_USERNAME)
  4. Process proxy instance information. We need the kernel name to know the process proxy. Each process proxy will produce information relative to itself (preferably in a readable format, but that's up to the provider). For Yarn, we need the application id, local pid(?), etc. (although the application id could be determined from the kernel_id).
  5. Some portion of the environment (the launch arguments) used during the original launch.

Kernel sessions should be persisted immediately following the kernel's start and removed during its shutdown. Persisted kernel sessions will be loaded at startup, and each will 'hydrate' a kernel manager instance from the persisted state - provided the kernel is still active. Kernel activity will be a function of a successful poll using the appropriate process-proxy instance (also instantiated from the persisted state). Sessions that yield an active poll response will be added to the set of managed kernels, while sessions that indicate inactivity will be removed from the persistent repository.

Initial implementation can take place via files and directories - where directories will eventually map to database/table and files will map to rows.
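An initial file-based sketch, where the directory location and field names are illustrative (one file per kernel id, i.e., one future row):

import json
import os

SESSION_DIR = '/var/lib/elyra/sessions'  # illustrative location

def persist_session(kernel_id, connection_info, username, proxy_info, launch_args):
    # Persist immediately after kernel start; the directory maps to a
    # future database/table, each file to a row.
    if not os.path.exists(SESSION_DIR):
        os.makedirs(SESSION_DIR)
    session = {
        'kernel_id': kernel_id,            # fundamental primary key
        'connection_info': connection_info,
        'username': username,              # from KERNEL_USERNAME
        'process_proxy': proxy_info,
        'launch_args': launch_args,
    }
    with open(os.path.join(SESSION_DIR, kernel_id + '.json'), 'w') as f:
        json.dump(session, f)

def remove_session(kernel_id):
    # Remove during kernel shutdown, or when a revived session fails to poll.
    path = os.path.join(SESSION_DIR, kernel_id + '.json')
    if os.path.exists(path):
        os.remove(path)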

Investigate kernel prespawning (pooling)

In order to address kernel startup times, we should get an idea of how the current kernel prespawn logic works in JKG. This is enabled via two command-line options (or env vars): --KernelGatewayApp.prespawn_count=<n> and --KernelGatewayApp.default_kernel_name=<kernel_name>.

Because the code path to prespawn a kernel is entirely different from the normal route, changes will likely be necessary to get the basics working with yarn-cluster kernels and session persistence.

Obviously, we'd need to tie the pooling to some form of user configuration where each user could have their own prespawn count, kernel type, etc.

Integrate IRkernel with Elyra

  • Create and test an R wrapper for spark-submit.
  • Verify the end-to-end scenario, including start/stop/execution via notebook.

Christian tested and found that spark-submit is able to take a SparkR script directly:

$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster application.R

  1. The R script will take a kernel connection file as a command-line argument.
  2. It will import the necessary libraries and create the Spark context.

Spark APIs are not working in Elyra

A more complex notebook with Spark code fails to run in Elyra (but works OK on a test server via toree make dev).

The issue seems to be related to how we are creating the context in the Toree Launcher, as I can get things working again if I remove the creation of the context in the Toree Launcher and go back to before the "delay context initialization" commit.

Being fixed in Toree by TOREE-425.

See the sample code and exception details below:

import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset

val sqc = spark.sqlContext
import sqc.implicits._

val bankText = sc.parallelize(
    IOUtils.toString(
        new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"),
        Charset.forName("utf8")).split("\n"))
        
case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)

val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
    s => Bank(s(0).toInt, 
            s(1).replaceAll("\"", ""),
            s(2).replaceAll("\"", ""),
            s(3).replaceAll("\"", ""),
            s(5).replaceAll("\"", "").toInt
        )
).toDF()

bank.printSchema()

bank.show(10)

Exception:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Elyra needs to recognize loopback scenario in socket mode

On configurations where the Elyra server is also a worker node, socket-mode requests will time out if the kernel lands on the Elyra node, because the server never receives the connection information. This is because the kernel launcher received a reference to an existing file and assumed 'push' mode.

What is really happening is that, because the assignment landed on the Elyra server, the connection information created by default is valid. Since the file is not empty, the launcher behaves as if the connection file mode is 'push' and simply uses the file it's given. In this case, Elyra should break out of its loop waiting for a socket response (assuming it's in the RUNNING state) and just continue, since the original file is valid.
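A sketch of the loopback check inside the socket-mode wait loop; the function and parameter names are assumptions, and a timeout is assumed to have been set on the response socket:

import json
import os
import socket

def wait_for_connection_info(response_socket, assigned_host, local_hosts, connection_file):
    # If the kernel was assigned to the Elyra server itself, the locally
    # generated connection file is already valid - stop waiting for a
    # socket response and use it directly.
    while True:
        if assigned_host in local_hosts and os.path.exists(connection_file) \
                and os.path.getsize(connection_file) > 0:
            with open(connection_file) as f:
                return json.load(f)
        try:
            conn, _ = response_socket.accept()
        except socket.timeout:
            continue  # re-check the loopback condition before waiting again
        try:
            return json.loads(conn.recv(8192).decode('utf-8'))
        finally:
            conn.close()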

Investigate async creation of spark session in launchers

By instrumenting the Python launcher with simple timestamps, it was discovered that Spark session creation takes 6 seconds to complete. The rest of the launcher code (creating the connection file, sending it back on a socket, etc.) is sub-second. By performing the session creation (and the subsequent lines that depend on it) asynchronously, we should trim the kernel startup time substantially.

Assuming this approach works, we'll want to do the same for the R and Toree kernels.
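A sketch of the asynchronous variant for the Python launcher; only the names local to this sketch are new, and the rest of the launcher would be unchanged:

import threading
from pyspark.sql import SparkSession

result = {}

def init_spark():
    # Runs in the background while the connection file is created and
    # returned over the socket (the sub-second portion of the launcher).
    result['spark'] = SparkSession.builder.getOrCreate()

spark_thread = threading.Thread(target=init_spark)
spark_thread.start()

# ... create and/or return the connection file here ...

spark_thread.join()      # block only once the session is actually needed
spark = result['spark']
sc = spark.sparkContext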

Here's the output of a timestamp-decorated launch script:

A: 2017-07-11 20:09:28.425
B: 2017-07-11 20:09:28.426
C: 2017-07-11 20:09:34.529
D: 2017-07-11 20:09:34.530
E: 2017-07-11 20:09:34.530
F: 2017-07-11 20:09:34.530
G: 2017-07-11 20:09:34.531
H: 2017-07-11 20:09:34.531
HA: 2017-07-11 20:09:34.531
HB: 2017-07-11 20:09:34.531
HC: 2017-07-11 20:09:34.531
I: 2017-07-11 20:09:34.531
J: 2017-07-11 20:09:34.531

followed by the script itself:

import os.path
import socket
import json
import uuid
import argparse
from ipython_genutils.py3compat import str_to_bytes
from jupyter_client.connect import write_connection_file
from IPython import embed_kernel
from pyspark.sql import SparkSession
from datetime import datetime


def return_connection_info(connection_file, ip, response_addr):
    response_parts = response_addr.split(":")
    if len(response_parts) != 2:
        print("Invalid format for response address '{}'.  Assuming 'pull' mode...".format(response_addr))
        return

    response_ip = response_parts[0]
    try:
        response_port = int(response_parts[1])
    except ValueError:
        print("Invalid port component found in response address '{}'.  Assuming 'pull' mode...".format(response_addr))
        return

    print("HA: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
    with open(connection_file) as fp:
        cf_json = json.load(fp)

    print("HB: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((response_ip, response_port))
        s.send(json.dumps(cf_json).encode(encoding='utf-8'))
    finally:
        s.close()
    print("HC: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])


if __name__ == "__main__":
    """
        Usage: spark-submit launch_ipykernel [connection_file] [--response-address <response_addr>]
    """

    print("A: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])

    parser = argparse.ArgumentParser()
    parser.add_argument('connection_file', help='Connection file to write connection info')
    parser.add_argument('--response-address', nargs='?', metavar='<ip>:<port>', help='Connection address (<ip>:<port>) for returning connection file')
    arguments = vars(parser.parse_args())
    connection_file = arguments['connection_file']
    response_addr = arguments['response_address']  # Although argument uses dash, argparse converts to underscore.

    print("B: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
    # create a Spark session
    spark = SparkSession.builder.getOrCreate()

    print("C: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
    # setup Spark session variables
    sc = spark.sparkContext
    sql = spark.sql

    print("D: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
    # setup Spark legacy variables for compatibility
    sqlContext = spark._wrapped
    sqlCtx = sqlContext

    print("E: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
    ip = "0.0.0.0"

    # If the connection file doesn't exist, then we're using 'pull' or 'socket' mode - otherwise 'push' mode.
    # If 'pull' or 'socket', create the file, then return it to server (if response address provided, i.e., 'socket')
    if not os.path.isfile(connection_file):
        key = str_to_bytes(str(uuid.uuid4()))
        print("F: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
        write_connection_file(fname=connection_file, ip=ip, key=key)
        print("G: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])

        if response_addr:
            print("H: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
            return_connection_info(connection_file, ip, response_addr)
            print("I: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])

    print("J: " + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
    # launch the IPython kernel instance
    embed_kernel(connection_file=connection_file, ip=ip)

    # stop the SparkContext after the kernel is stopped/killed
    spark.stop()

Re-instate --proxy-user in kernel files (enforce Kerberos)

We temporarily removed the --proxy-user ... option from the etc/kernels/../kernel.json files in PR #51.

We should reinstate the impersonation requirement once we have a working Kerberos setup and/or an install script that lays down the kernel files with an option to enable/disable impersonation with Kerberos.

Create kernel launch application for Toree Scala kernels to enable pull-mode

Similar to the launch script for the PySpark kernel (see launch_ipkernel.py), the Toree/Scala kernel invocations could also utilize a launch application responsible for constructing the necessary information for location-relative connection files, thereby dramatically decreasing the likelihood of port conflicts. This would include the identification of local ports and the public IP, as well as generation of the key used to sign socket traffic.

One approach could be to modify Toree itself to detect a non-existent connection file and produce the applicable information at startup, writing that information to the specified file; however, a launch application provides a place for other forms of logic (e.g., workspace management) that we would not have otherwise.

Add support for pull and socket mode to StandaloneProcessProxy

The StandaloneProcessProxy currently supports only push mode for connection files. Now that we have launchers, we can leverage them for Standalone (yarn-client) kernels. However, once we support impersonation, Standalone kernels will expose too many issues and will probably need to be prevented. As a result, we need to decide the priority of this work.

Scala kernels don't connect on HDP when waitAppCompletion=false and socket mode

This is related to #48. During testing on HDP, I found that Toree kernels could not complete their initial connection sequence when configured with --conf spark.yarn.submit.waitAppCompletion=false and connection-file-mode = socket - and only on HDP clusters.

If any of those three conditions changes (i.e., waitAppCompletion is removed, the connection file mode is changed to pull, or we run on an IOP cluster), the kernel works fine.

@liukun1016 I'm hoping you can take a look - first by reproducing, then probably by adding some tracing in the launcher, etc.

Due to this issue, I plan on changing the connection file mode on Scala kernels to pull when delivering changes for #48.

Here's an excerpt from the log showing the reception of the JSON connection info with no subsequent kernel activity entries:

[D 2017-07-10 08:08:57.949 KernelGatewayApp] Waiting for application_1499374582831_0027 to connect back to receive connection info...
[D 2017-07-10 08:08:58.967 KernelGatewayApp] KernelID=b351d958-8361-4ced-9cf5-1bcf947deb03, ApplicationID=application_1499374582831_0027, AssignedHost=elyra-fyi-node-5.fyre.ibm.com, CurrentState=RUNNING, Attempt=14
[D 2017-07-10 08:08:58.967 KernelGatewayApp] Connected to ('172.16.190.45', 48714)...
[D 2017-07-10 08:08:58.968 KernelGatewayApp] Connected to ('172.16.190.45', 48714)...
[D 2017-07-10 08:08:58.969 KernelGatewayApp] RemoteKernelManager: Writing connection file with ip=172.16.190.45, control=33269, hb=36826, iopub=33802, stdin=41918, shell=33198
[D 2017-07-10 08:08:58.969 KernelGatewayApp] Successfully updated connection file '/var/run/elyra/runtime/kernel-b351d958-8361-4ced-9cf5-1bcf947deb03.json'.
[D 2017-07-10 08:08:58.971 KernelGatewayApp] Connecting to: tcp://172.16.190.45:33269
[D 2017-07-10 08:08:58.972 KernelGatewayApp] Connecting to: tcp://172.16.190.45:33802
[I 2017-07-10 08:08:58.974 KernelGatewayApp] Kernel started: b351d958-8361-4ced-9cf5-1bcf947deb03
[D 2017-07-10 08:08:58.974 KernelGatewayApp] Kernel args: {'kernel_name': u'spark_2.1_scala_yarn_cluster', 'env': {'HOSTNAME': 'elyra-fyi-node-1.fyre.ibm.com', 'NO_PROXY': 'localhost,127.0.0.1,.fyre.ibm.com', 'http_proxy': 'http://proxy1.fyre.ibm.com:3128', 'FTP_PROXY': 'http://proxy1.fyre.ibm.com:3128', 'LESSOPEN': '||/usr/bin/lesspipe.sh %s', 'ELYRA_REMOTE_USER': 'elyra', 'ELYRA_REMOTE_HOSTS': 'elyra-fyi-node-1,elyra-fyi-node-2,elyra-fyi-node-3,elyra-fyi-node-4,elyra-fyi-node-5', 'LOGNAME': 'elyra', 'USER': 'elyra', 'QTDIR': '/usr/lib64/qt-3.3', 'PATH': '/opt/anaconda2/bin:/usr/lib64/qt-3.3/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/home/elyra/.local/bin:/home/elyra/bin', 'LANG': 'en_US.UTF-8', 'QTLIB': '/usr/lib64/qt-3.3/lib', 'TERM': 'xterm-256color', 'SHELL': '/bin/bash', 'ELYRA_PROXY_LAUNCH_LOG': '/var/log/elyra/proxy_launch_2017-07-09.log', 'SHLVL': '2', 'HISTSIZE': '1000', 'KERNEL_ID': u'b351d958-8361-4ced-9cf5-1bcf947deb03', 'ELYRA_YARN_ENDPOINT': 'http://elyra-fyi-node-1:8088/ws/v1/cluster', 'HOME': '/home/elyra', 'no_proxy': 'localhost,127.0.0.1,.fyre.ibm.com', 'QT_GRAPHICSSYSTEM': 'native', 'JUPYTER_DATA_DIR': '/var/lib/elyra', 'ftp_proxy': 'http://proxy1.fyre.ibm.com:3128', u'KERNEL_USERNAME': u'Foo', 'ELYRA_KERNEL_LAUNCH_TIMEOUT': '90', 'MAIL': '/var/spool/mail/elyra', 'QT_GRAPHICSSYSTEM_CHECKED': '1', 'XDG_SESSION_ID': '987', '_': '/opt/anaconda2/bin/jupyter', 'HTTP_PROXY': 'http://proxy1.fyre.ibm.com:3128', 'HISTCONTROL': 'ignoredups', 'QTINC': '/usr/lib64/qt-3.3/include', 'PWD': '/home/elyra', 'JUPYTER_RUNTIME_DIR': '/var/run/elyra/runtime'}}
[I 170710 08:08:58 web:2063] 201 POST /api/kernels (9.108.95.126) 16935.83ms
[I 170710 08:08:59 web:2063] 200 GET /api/kernels/b351d958-8361-4ced-9cf5-1bcf947deb03 (9.108.95.126) 1.10ms
[D 2017-07-10 08:08:59.201 KernelGatewayApp] Initializing websocket connection /api/kernels/b351d958-8361-4ced-9cf5-1bcf947deb03/channels
[W 2017-07-10 08:08:59.203 KernelGatewayApp] No session ID specified
[D 2017-07-10 08:08:59.204 KernelGatewayApp] Requesting kernel info from b351d958-8361-4ced-9cf5-1bcf947deb03
[D 2017-07-10 08:08:59.204 KernelGatewayApp] Connecting to: tcp://172.16.190.45:33198
[D 2017-07-10 08:09:06.398 KernelGatewayApp] Polling every 30 seconds for kernels idle > 600 seconds...
[D 2017-07-10 08:09:06.398 KernelGatewayApp] kernel_id=b351d958-8361-4ced-9cf5-1bcf947deb03, kernel_name=spark_2.1_scala_yarn_cluster, last_activity=2017-07-10 15:08:58.972078+00:00
[W 2017-07-10 08:09:09.206 KernelGatewayApp] Timeout waiting for kernel_info reply from b351d958-8361-4ced-9cf5-1bcf947deb03
[I 170710 08:09:09 web:2063] 101 GET /api/kernels/b351d958-8361-4ced-9cf5-1bcf947deb03/channels (9.108.95.126) 10006.78ms
[D 2017-07-10 08:09:09.208 KernelGatewayApp] Opening websocket /api/kernels/b351d958-8361-4ced-9cf5-1bcf947deb03/channels
[D 2017-07-10 08:09:09.209 KernelGatewayApp] Connecting to: tcp://172.16.190.45:33198
[D 2017-07-10 08:09:09.209 KernelGatewayApp] Connecting to: tcp://172.16.190.45:33802
[D 2017-07-10 08:09:09.210 KernelGatewayApp] Connecting to: tcp://172.16.190.45:41918
[D 2017-07-10 08:09:36.397 KernelGatewayApp] Polling every 30 seconds for kernels idle > 600 seconds...
[D 2017-07-10 08:09:36.398 KernelGatewayApp] kernel_id=b351d958-8361-4ced-9cf5-1bcf947deb03, kernel_name=spark_2.1_scala_yarn_cluster, last_activity=2017-07-10 15:08:58.972078+00:00
