Giter Club home page Giter Club logo

rayfed's Introduction

RayFed

docs building test on many rays test on ray 1.13.0

A multiple parties joint, distributed execution engine based on Ray, to help build your own federated learning frameworks in minutes.

Overview

Note: This project is now in actively developing.

RayFed is a distributed computing framework for cross-parties federated learning. Built in the Ray ecosystem, RayFed provides a Ray native programming pattern for federated learning so that users can build a distributed program easily.

It provides users the role of "party", thus users can write code belonging to the specific party explicitly imposing more clear data perimeters. These codes will be restricted to execute within the party.

As for the code execution, RayFed introduces the multi-controller architecture: The code view in each party is exactly the same, but the execution differs based on the declared party of code and the current party of executor.

Features

  • Ray Native Programming Pattern

    Let you write your federated and distributed computing applications like a single-machine program.

  • Multiple Controller Execution Mode

    The RayFed job can be run in the single-controller mode for developing and debugging and the multiple-controller mode for production without code change.

  • Very Restricted and Clear Data Perimeters

    Because of the PUSH-BASED data transferring mechanism and multiple controller execution mode, the data transmission authority is held by the data owner rather than the data demander.

  • Very Large Scale Federated Computing and Training

    Powered by the scalabilities and the distributed abilities from Ray, large scale federated computing and training jobs are naturally supported.

Supported Ray Versions

RayFed Versions ray-1.13.0 ray-2.4.0 ray-2.5.1 ray-2.6.3 ray-2.7.1 ray-2.8.1 ray-2.9.0
0.1.0
0.2.0 not released not released not released not released not released not released not released

Installation

Install it from pypi.

pip install -U rayfed

Install the nightly released version from pypi.

pip install -U rayfed-nightly

Quick Start

This example shows how to aggregate values across two participators.

Step 1: Write an Actor that Generates Value

The MyActor increment its value by num. This actor will be executed within the explicitly declared party.

import sys
import ray
import fed

@fed.remote
class MyActor:
    def __init__(self, value):
        self.value = value

    def inc(self, num):
        self.value = self.value + num
        return self.value

Step 2: Define Aggregation Function

The below function collects and aggragates values from two parties separately, and will also be executed within the declared party.

@fed.remote
def aggregate(val1, val2):
    return val1 + val2

Step 3: Create the actor and call methods in a specific party

The creation code is similar with Ray, however, the difference is that in RayFed the actor must be explicitly created within a party:

actor_alice = MyActor.party("alice").remote(1)
actor_bob = MyActor.party("bob").remote(1)

val_alice = actor_alice.inc.remote(1)
val_bob = actor_bob.inc.remote(2)

sum_val_obj = aggregate.party("bob").remote(val_alice, val_bob)

The above codes:

  1. Create two MyActors separately in each party, i.e. 'alice' and 'bob';
  2. Increment by '1' in alice and '2' in 'bob';
  3. Execute the aggregation function in party 'bob'.

Step 4: Declare Cross-party Cluster & Init

def main(party):
    ray.init(address='local', include_dashboard=False)

    addresses = {
        'alice': '127.0.0.1:11012',
        'bob': '127.0.0.1:11011',
    }
    fed.init(addresses=addresses, party=party)

This first declares a two-party cluster, whose addresses corresponding to '127.0.0.1:11012' in 'alice' and '127.0.0.1:11011' in 'bob'. And then, the fed.init create a cluster in the specified party. Note that fed.init should be called twice, passing in the different party each time.

When executing codes in step 1~3, the 'alice' cluster will only execute functions whose "party" are also declared as 'alice'.

Put it together !

Save below codes as demo.py:

import sys
import ray
import fed


@fed.remote
class MyActor:
    def __init__(self, value):
        self.value = value

    def inc(self, num):
        self.value = self.value + num
        return self.value


@fed.remote
def aggregate(val1, val2):
    return val1 + val2


def main(party):
    ray.init(address='local', include_dashboard=False)

    addresses = {
        'alice': '127.0.0.1:11012',
        'bob': '127.0.0.1:11011',
    }
    fed.init(addresses=addresses, party=party)

    actor_alice = MyActor.party("alice").remote(1)
    actor_bob = MyActor.party("bob").remote(1)

    val_alice = actor_alice.inc.remote(1)
    val_bob = actor_bob.inc.remote(2)

    sum_val_obj = aggregate.party("bob").remote(val_alice, val_bob)
    result = fed.get(sum_val_obj)
    print(f"The result in party {party} is {result}")

    fed.shutdown()
    ray.shutdown()


if __name__ == "__main__":
    assert len(sys.argv) == 2, 'Please run this script with party.'
    main(sys.argv[1])

Run The Code.

Open a terminal and run the code as alice. It's recommended to run the code with Ray TLS enabled (please refer to Ray TLS)

RAY_USE_TLS=1 \
RAY_TLS_SERVER_CERT='/path/to/the/server/cert/file' \
RAY_TLS_SERVER_KEY='/path/to/the/server/key/file' \
RAY_TLS_CA_CERT='/path/to/the/ca/cert/file' \
python test.py alice

In the mean time, open another terminal and run the code as bob.

RAY_USE_TLS=1 \
RAY_TLS_SERVER_CERT='/path/to/the/server/cert/file' \
RAY_TLS_SERVER_KEY='/path/to/the/server/key/file' \
RAY_TLS_CA_CERT='/path/to/the/ca/cert/file' \
python test.py bob

Then you will get The result in party alice is 5 on the first terminal screen and The result in party bob is 5 on the second terminal screen.

Figure shows the execution under the hood:

Figure
## Running untrusted codes As a general rule: Always execute untrusted codes inside a sandbox (e.g., [nsjail](https://github.com/google/nsjail)).

Who use us

Ant Chain Morse SecretFlow

rayfed's People

Contributors

anakinxc avatar fengsp avatar ian-huu avatar jovany-wang avatar krout0n avatar nkcqx avatar wp19991 avatar zhouaihui avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rayfed's Issues

Imporve README page.

The following sections needs to be added:

  • the table of Ray versions that we've been supporting
  • Contact us
  • How to contribute
  • Feature key words
  • ?

Change ray dependency version & release name

  1. The current used ray dependency called "secretflow-ray", is it possible to just use "ray" ?
  2. Our released package name is "secretflow-rayfed" rather than "rayfed", rename to "rayfed" can be easier to understand

Remove jax dependency

We only use jax for the tree_flatten and tree_unflatten utilities, while introducing the heavy dependency.

Let us remove the jax dependency and use an alternative utilities.

[RFC] support a global barrier to sync the process in all parties.

Currently, it's hard to control the process due to the asymmetrical workloads. So let's propose a global barrier global_sync to make sure all parties are in here.

o1 = f.party("ALICE").remote()
o2 = g.party("BOB").remote()
global_sync()
h.party("ALICE").remote() # `h` will be invoked after the global_sync getting invoked in all parties.

KeyError raised if we don't provide the key for cert_clients.

Reproduced script:

    cert_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "/tmp/rayfed/test-certs/")
    ca_config = {
                "ca_cert": os.path.join(cert_dir, "server.crt"),
                "cert": os.path.join(cert_dir, "server.crt"),
                "key": os.path.join(cert_dir, "server.key"),
    }
    ca_config_client = { # we don't provide a key for cert client
                "ca_cert": os.path.join(cert_dir, "server.crt"),
                "cert": os.path.join(cert_dir, "server.crt"),
    }
    tls_config_alice = { "cert": ca_config, "client_certs": { "bob": ca_config_client }}
    tls_config_bob = { "cert": ca_config, "client_certs": { "alice": ca_config_client }}
    tls_config = tls_config_alice if party == "alice" else tls_config_bob

    cluster = {
        'alice': {'address': '127.0.0.1:11010'},
        'bob': {'address': '127.0.0.1:11011'},
    }
    fed.init(address='local', cluster=cluster, party=party, tls_config=tls_config)

And then the following KeyError will be raised.

ValueError: Failed to look up actor with name 'RecverProxyActor-alice'. This could because 1. You are trying to look up a named actor you didn't create. 2. The named actor died. 3. You did not use a namespace matching the namespace of the actor.
2023-02-20 15:35:49 INFO cleanup.py:102 [bob] --  Notify check sending thread to exit.
2023-02-20 15:35:50 WARNING cleanup.py:59 [bob] --  Failed to send ObjectRef(82891771158d68c1f3f79669d0327c1f4900e1ae0100000001000000) with error: ray::SendProxyActor.send() (pid=67928, ip=127.0.0.1, repr=<fed.barriers.SendProxyActor object at 0x7fc800129e50>)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/Users/qingwang/workspace/opensource/RayFed/fed/barriers.py", line 213, in send
    response = await send_data_grpc(
  File "/Users/qingwang/workspace/opensource/RayFed/fed/barriers.py", line 127, in send_data_grpc
    ca_cert, private_key, cert_chain = fed_utils.load_client_certs(
  File "/Users/qingwang/workspace/opensource/RayFed/fed/utils.py", line 132, in load_client_certs
    return _load_from_cert_config(client_cert_config)
  File "/Users/qingwang/workspace/opensource/RayFed/fed/utils.py", line 109, in _load_from_cert_config
    private_key_file = cert_config["key"]
KeyError: 'key'
2023-02-20 15:35:50 INFO cleanup.py:65 [bob] --  Check sending thread was exited.

[Feature] handle the large start time difference.

The problem

The start time of every party can differ very much in real production leading sending failure. The cross-silo grpc retry is not a good way to handle this since it's designed for network jitter.

Suggestion

We can wait for all others until their grpc services are all ready at the end of fed.init.

Pass grpc metadata

The python grpc code:

import grpc
from your_grpc_module import your_pb2, your_pb2_grpc

# Create gRPC channel
channel = grpc.insecure_channel('localhost:50051')
stub = your_pb2_grpc.YourServiceStub(channel)

# Define the metadata
metadata = [
    ('header-key1', 'header-value1'),
    ('header-key2', 'header-value2'),
]

# RPC call within metadata
response = stub.YourRpcMethod(your_pb2.YourRequest(), metadata=metadata)

[bug] cross_silo_grpc_retry_policy doesn't work.

It seems that the argument maxAttempts: 360 is not able to be set correctly. Please see the following error, it shows that the value is 5 instead of 360(I set it to 360).
644fb478915f14696b86ef4409cb61f8

reproduced script:
image

Code quality improvements

for the following actions:

  • the abstractions and architectures
  • for some local code paths
  • the doc-strings and comments
  • logging messages
  • unit-tests infra
  • ?

[benchmark] Introduce benchmark infra

We should add our baselines on:

  • The memory usage baseline.
  • The parties number baseline.
  • The performance baseline on the task scheduling across parties.
  • The performance baseline on the communication between parties.

For the baselines, we should introduce benchmark infra first.

Add user friendly debug log

debug info like:

2023-02-10 07:06:00,812	WARNING worker.py:1851 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffff90dafcf9eb358393d790d3d503000000 Worker ID: a3ecdc4fd2ea61036a0ff1d373a1d904fef544dec5e55a7b1b0ba0f3 Node ID: ae60bbcf8816aaa352738f655f6066e8dc9c1cecf951a0f62cef41b9 Worker IP address: 172.16.201.146 Worker port: 10014 Worker PID: 1377 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2023-02-10 07:06:00 WARNING fed.cleanup [alice] --  Failed to send ObjectRef(4984f7d6f9a761ad90dafcf9eb358393d790d3d50300000001000000) with error: The actor died unexpectedly before finishing this task.
	class_name: SendProxyActor
	actor_id: 90dafcf9eb358393d790d3d503000000
	pid: 1377
	name: SendProxyActor
	namespace: c2ec2b3b-3cf8-46b1-8c9f-3e9985157c6d
	ip: 172.16.201.146
The actor is dead because its worker process has died. Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2023-02-10 07:06:00 WARNING fed.cleanup [alice] --  Signal self to exit.
*** SIGTERM received at time=1676012760 on cpu 10 ***
2023-02-10 07:06:00 INFO fed.cleanup [alice] --  Check sending thread was exited.
PC: @     0x7f9f2f4f174a  (unknown)  pthread_cond_timedwait@@GLIBC_2.3.2
    @     0x7f9f2f4f5c20  (unknown)  (unknown)
[2023-02-10 07:06:00,816 E 1214 1214] logging.cc:361: *** SIGTERM received at time=1676012760 on cpu 10 ***
[2023-02-10 07:06:00,817 E 1214 1214] logging.cc:361: PC: @     0x7f9f2f4f174a  (unknown)  pthread_cond_timedwait@@GLIBC_2.3.2
[2023-02-10 07:06:00,817 E 1214 1214] logging.cc:361:     @     0x7f9f2f4f5c20  (unknown)  (unknown)
(secretflow) bash-4.4# python sl_test_alice.py

should provide a more user friendly log show connection error.

Build manylinux wheels

  • Build manylinux wheels in Github actions
  • Add nightly release process in Github actions

Failed to execute `pip install -e .`

The error message of executing pip install -e . -v:

-> % pip install -e . -v
Using pip 22.2.2 from /Users/qingwang/anaconda3/lib/python3.9/site-packages/pip (python 3.9)
Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
Obtaining file:///Users/qingwang/workspace/opensource/RayFed
  Running command pip subprocess to install build dependencies
  Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
  Collecting flit_core<4,>=3.2
    Downloading https://mirrors.aliyun.com/pypi/packages/04/ac/f91a0219e0d4162762f218e4876bf328f16e4d283e691cd7f42a02b22634/flit_core-3.8.0-py3-none-any.whl (62 kB)
       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.6/62.6 kB 272.8 kB/s eta 0:00:00
  Installing collected packages: flit_core
  Successfully installed flit_core-3.8.0
  Installing build dependencies ... done
  Running command Checking if build backend supports build_editable
  Checking if build backend supports build_editable ... done
  Running command Getting requirements to build editable
  Traceback (most recent call last):
    File "/Users/qingwang/anaconda3/lib/python3.9/site-packages/pip/_vendor/pep517/in_process/_in_process.py", line 363, in <module>
      main()
    File "/Users/qingwang/anaconda3/lib/python3.9/site-packages/pip/_vendor/pep517/in_process/_in_process.py", line 345, in main
      json_out['return_val'] = hook(**hook_input['kwargs'])
    File "/Users/qingwang/anaconda3/lib/python3.9/site-packages/pip/_vendor/pep517/in_process/_in_process.py", line 144, in get_requires_for_build_editable
      return hook(config_settings)
    File "/private/var/folders/q4/6c8j2fsx34v45njdq0vdkhlm0000gp/T/pip-build-env-svvukwe7/overlay/lib/python3.9/site-packages/flit_core/buildapi.py", line 31, in get_requires_for_build_wheel
      module = Module(info.module, Path.cwd())
    File "/private/var/folders/q4/6c8j2fsx34v45njdq0vdkhlm0000gp/T/pip-build-env-svvukwe7/overlay/lib/python3.9/site-packages/flit_core/common.py", line 59, in __init__
      raise ValueError("No file/folder found for module {}".format(name))
  ValueError: No file/folder found for module RayFed
  error: subprocess-exited-with-error

  × Getting requirements to build editable did not run successfully.
  │ exit code: 1
  ╰─> See above for output.

  note: This error originates from a subprocess, and is likely not a problem with pip.
  full command: /Users/qingwang/anaconda3/bin/python /Users/qingwang/anaconda3/lib/python3.9/site-packages/pip/_vendor/pep517/in_process/_in_process.py get_requires_for_build_editable /var/folders/q4/6c8j2fsx34v45njdq0vdkhlm0000gp/T/tmp3fvd7gfx
  cwd: /Users/qingwang/workspace/opensource/RayFed
  Getting requirements to build editable ... error
error: subprocess-exited-with-error

× Getting requirements to build editable did not run successfully.
│ exit code: 1
╰─> See above for output.

note: This error originates from a subprocess, and is likely not a problem with pip.

[RFC] Support running in different modes.

I'm proposing that support running rayfed job in single-controller mode.

I'd like to propose 2 options on how we startup the single-controller cluster and how we connect to the cluster and run our jobs.

option 1

Add a new cli toolkit to start the cluster, it just wrapper the ray cli toolkit, for example:

A. running single-controller mode

> rayfed start --head --mode=single-controller --party=ALICE  # node1, listening on 1.2.3.4:5555
> rayfed start --address="1.2.3.4:5555" --party=ALICE  # node2, connecting to the node1
> rayfed start --address="1.2.3.4:5555" --party=BOB  # node3, connecting to the node1
> rayfed start --address="1.2.3.4:5555" --party=BOB  # node4, connecting to the node1

And then, the job could be run in single controller mode automatically:

# main.py
fed.init(address="1.2.3.4:5555", xxx)
# Nothing need to be changed in this job script.

B. running multiple-controller mode

> rayfed start --head --mode=multiple-controller --party=ALICE  # node1, listening on 1.2.3.4:5555
> rayfed start --address="1.2.3.4:5555" --party=ALICE  # node2, connecting to the node1
> rayfed start --head --mode=multiple-controller --party=BOB  # node3, listening on 5.6.7.8:6666
> rayfed start --address="5.6.7.8:6666" --party=BOB  # node4, connecting to the node3

And then, you run the following script in 2 clusters:

# main.py
fed.init(address="1.2.3.4:5555", xxx)
# nothing need to be changed in this job script.
# in node2
> python main.py --party=ALICE
# in node3
> python main.py --party=BOB

option 2

No need to add a new toolkit, but we should tell users that add some extra arguments when starting up the Ray cluster.
For example,

A. running single-controller mode

> ray start --head --resources={"_PARTY_ALICE", 9999}  # node1, listening on 1.2.3.4:5555
> ray start --address="1.2.3.4:5555" --resources={"_PARTY_ALICE", 9999}  # node2, connecting to the node1
> ray start --address="1.2.3.4:5555" --resources={"_PARTY_BOB", 9999}  # node3, connecting to the node1
> ray start --address="1.2.3.4:5555" --resources={"_PARTY_BOB", 9999}  # node4, connecting to the node1

And then, add the extra mode info when fed.init():

# main.py
fed.init(address="1.2.3.4:5555", mode="single-controller", xxx)
# Nothing need to be changed in this job script.

A. running multiple-controller mode

> ray start --head # node1, listening on 1.2.3.4:5555
> ray start --address="1.2.3.4:5555" # node2, connecting to the node1
> ray start --head # node3, listening on 5.6.7.8:6666
> ray start --address="5.6.7.8:6666" # node4, connecting to the node3

And then, add the extra mode info when fed.init()(And we could ignore it if we provide a default value):

# main.py
fed.init(address="1.2.3.4:5555", mode="multiple-controller", xxx)
# Nothing need to be changed in this job script.

Refactor fed configuration system.

Currently, the Fed Configuration system is mess. It's not easy to introduce a new configuration item. To introduce a new item, we should change many places, such as putting a new item(k-v pair) to internal kv, fetching via internal_kv, and passing to other places.

For further developing, we'd like to refactor the fed configuration system, to let developers have a simple operation for that.

In the new system, we'd like to introduce cluster configurations and job configurations. All of the above will be written into internal kv.

Enhance compatibility with protobuf >=4.0

We tries to enhance the compatibility for protobuf, see #101
rayfed is compatible with all protobuf 3.x versions, but it's incompatible with protobuf 4.x.

We should generated 2 files for different protobuf versions.

[bug] `exit_on_failure_cross_silo_sending` won't work as expected

The param exit_on_failure_cross_silo_sending of fed.init is designed to decide whether to exit on cross-silo sending failure.
But the main thread will exit once cross-silo sending failed, since SendProxyActor does not catch any exception and exits as soon as failed to send.

2 nodes connecting failed when RAY_TLS is enabled.

Node status is OK once TLS is enabled:

======== Autoscaler status: 2023-02-07 10:08:08.216389 ========
Node status
---------------------------------------------------------------
Healthy:
 1 node_2a321ba103a914821571f010bd317d56425a7552405a068dc7f245b7
 1 node_06e9368d851e3ba77f7a8a91076ff977fad89b5ba2f37b75588ce718
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/4.0 CPU
 0.00/25.844 GiB memory
 0.00/11.929 GiB object_store_memory
 0.0/8.0 promoter
 0.0/8.0 provider

Demands:
 (no resource demands)

The following script failed:

error message is :

2023-02-07 10:09:57.143117: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-07 10:10:04,480 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:10:11,496 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:10:18,512 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:10:25,526 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:10:32,540 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:10:39,558 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:10:46,577 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:10:53,592 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:00,606 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:07,621 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:14,637 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:21,653 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:28,668 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:35,684 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:42,701 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:49,717 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:11:56,734 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:12:03,751 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:12:10,765 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
2023-02-07 10:12:17,780 WARNING utils.py:1242 -- Unable to connect to GCS at 192.168.227.195:6379. Check that (1) Ray GCS with matching version started successfully at the specified address, and (2) there is no firewall setting preventing access.
Traceback (most recent call last):
  File "test-get-data.py", line 7, in <module>
    sf.init(address='192.168.227.195:6379')
  File "/root/miniconda3/envs/wefe-python-3.8/lib/python3.8/site-packages/secretflow/device/driver.py", line 215, in init
    ray.init(
  File "/root/miniconda3/envs/wefe-python-3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/root/miniconda3/envs/wefe-python-3.8/lib/python3.8/site-packages/ray/worker.py", line 1101, in init
    _global_node = ray.node.Node(
  File "/root/miniconda3/envs/wefe-python-3.8/lib/python3.8/site-packages/ray/node.py", line 185, in __init__
    session_name = ray._private.utils.internal_kv_get_with_retry(
  File "/root/miniconda3/envs/wefe-python-3.8/lib/python3.8/site-packages/ray/_private/utils.py", line 1258, in internal_kv_get_with_retry
    raise RuntimeError(
RuntimeError: Could not read 'session_name' from GCS. Did GCS start successfully?

versions:
secretflow-ray 2.0.0.dev1

[bug] The logging format is not applied to SendProxy and RecverProxy

The logging format not takes effect on SendProxy and RecverProxy

(RecverProxyActor pid=3610565) INFO:fed.barriers:Successfully start Grpc service without credentials.
2023-03-16 16:14:21 INFO barriers.py:373 [alice] --  SendProxy was successfully created.
(SendProxyActor pid=3610738) DEBUG:fed.barriers:Sending data to seq_id 7 from 5#0 without credentials.

Are there more details about the `simple and controllable protocols` for cross-solis Ray?

从开源的代码里看rayfed在初始化时在ray的内部kv缓存中确定可以被反序列化的白名单类,然后就是普通的grpc+tls传输,不知道我的理解是否贴切?
想进一步了解的一是ray内部缓存这个中间件是experimental,这和rayfed想表达的生产级别安全性是否有一点点不搭?
二是目前代码块发布的版本展示了这个白名单类可以配置一些numpy ndarry之类的数据结构,但是隐语在做MPC、PSI这些操作时传输的数据结构是这样简单的吗,据群里专家介绍应该是通过pybind11包装了yet another crypto library里的数据结构吧,那么白名单该怎么写,或者说整个基于rayfed的MPC和PSI该如何实现,是否可以提供一些demo呢?
三是grpc+tls这里tls如果要使用SM2那么应该如何配置呢?据我了解似乎基于openssl是没法支持SM2的,在python grpc中实现这个有一定难度。

KeyError raised if we don't provide the key for cert_clients.

Reproduced script:

    cert_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "/tmp/rayfed/test-certs/")
    ca_config = {
                "ca_cert": os.path.join(cert_dir, "server.crt"),
                "cert": os.path.join(cert_dir, "server.crt"),
                "key": os.path.join(cert_dir, "server.key"),
    }
    ca_config_client = { # we don't provide a key for cert client
                "ca_cert": os.path.join(cert_dir, "server.crt"),
                "cert": os.path.join(cert_dir, "server.crt"),
    }
    tls_config_alice = { "cert": ca_config, "client_certs": { "bob": ca_config_client }}
    tls_config_bob = { "cert": ca_config, "client_certs": { "alice": ca_config_client }}
    tls_config = tls_config_alice if party == "alice" else tls_config_bob

    cluster = {
        'alice': {'address': '127.0.0.1:11010'},
        'bob': {'address': '127.0.0.1:11011'},
    }
    fed.init(address='local', cluster=cluster, party=party, tls_config=tls_config)

And then the following KeyError will be raised.

ValueError: Failed to look up actor with name 'RecverProxyActor-alice'. This could because 1. You are trying to look up a named actor you didn't create. 2. The named actor died. 3. You did not use a namespace matching the namespace of the actor.
2023-02-20 15:35:49 INFO cleanup.py:102 [bob] --  Notify check sending thread to exit.
2023-02-20 15:35:50 WARNING cleanup.py:59 [bob] --  Failed to send ObjectRef(82891771158d68c1f3f79669d0327c1f4900e1ae0100000001000000) with error: ray::SendProxyActor.send() (pid=67928, ip=127.0.0.1, repr=<fed.barriers.SendProxyActor object at 0x7fc800129e50>)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/Users/qingwang/workspace/opensource/RayFed/fed/barriers.py", line 213, in send
    response = await send_data_grpc(
  File "/Users/qingwang/workspace/opensource/RayFed/fed/barriers.py", line 127, in send_data_grpc
    ca_cert, private_key, cert_chain = fed_utils.load_client_certs(
  File "/Users/qingwang/workspace/opensource/RayFed/fed/utils.py", line 132, in load_client_certs
    return _load_from_cert_config(client_cert_config)
  File "/Users/qingwang/workspace/opensource/RayFed/fed/utils.py", line 109, in _load_from_cert_config
    private_key_file = cert_config["key"]
KeyError: 'key'
2023-02-20 15:35:50 INFO cleanup.py:65 [bob] --  Check sending thread was exited.

Support running multiple actor instances in one process.

In the research and simulation scenarios, users may need to run large workloads within limited resources.
If we use raw Ray actors as fed instances, it may limit the workloads in a single machine.

We'd like to introduce the Ray parallel actor concept to archive this goal.

Some references to Ray concurrency group and Ray parallel actor concepts:
google-doc: Ray ConcurrencyGroup Proposal
google-doc: Ray Concurrency Group API V2-latest
Ray Java Parallel Actor PR: ray-project/ray#21701

It's still one of the #110

quickstart example fail on two hosts

quick start(script)

https://github.com/ray-project/rayfed#quick-start

Open two ssh terminal on single host, it's OK:

2023-02-01 08:31:01,790 INFO worker.py:1538 -- Started a local Ray instance.
2023-02-01 08:31:04 INFO fed.barriers [alice] --  RecverProxy was successfully created.
(RecverProxyActor pid=692) INFO:fed.barriers:Successfully start Grpc service without credentials.
2023-02-01 08:31:05 INFO fed.barriers [alice] --  SendProxy was successfully created.
2023-02-01 08:31:05 INFO fed.cleanup [alice] --  Start check sending thread.
2023-02-01 08:31:05 INFO fed.cleanup [alice] --  Start check sending monitor thread.
The result in party alice is 5
2023-02-01 08:31:43 INFO fed.cleanup [alice] --  Notify check sending thread to exit.
2023-02-01 08:31:44 INFO fed.cleanup [alice] --  Check sending thread was exited.
2023-02-01 08:31:46 INFO fed.api [alice] --  Shutdowned ray.

quick start(interactively)

single host

at 192.168.12.14,two terminal & python:alice & bob

//define task
import sys
import fed
@fed.remote
class MyActor:
    def __init__(self, value):
        self.value = value
    def inc(self, num):
        self.value = self.value + num
        return self.value

@fed.remote
def aggregate(val1, val2):
    return val1 + val2
    
//define and init RayFed(note:different party id in fed.init)
//@alice
cluster = {'alice': {'address': '**192.168.12.14**:11013'},'bob': {'address': '**192.168.12.14**:11014'}}
fed.init(address='local', cluster=cluster, party='alice')

//@bob
cluster = {'alice': {'address': '**192.168.12.14**:11013'},'bob': {'address': '**192.168.12.14**:11014'}}
fed.init(address='local', cluster=cluster, party='bob')

//define DAG(job)
actor_alice = MyActor.party("alice").remote(1)
actor_bob = MyActor.party("bob").remote(1)
val_alice = actor_alice.inc.remote(1)
val_bob = actor_bob.inc.remote(2)
sum_val_obj = aggregate.party("bob").remote(val_alice, val_bob)
result = fed.get(sum_val_obj)
print(f"The result is {result}")

fed.shutdown()

and it's OK:

>>> print(f"The result is {result}")
The result is 5

two hosts

only change the cluster config: 192.168.12.13192.168.12.14

//define and init RayFed(note:different party id in fed.init)
//@alice
cluster = {'alice': {'address': '**192.168.12.13**:11013'},'bob': {'address': '**192.168.12.14**:11014'}}
fed.init(address='local', cluster=cluster, party='alice')

//@bob
cluster = {'alice': {'address': '**192.168.12.13**:11013'},'bob': {'address': '**192.168.12.14**:11014'}}
fed.init(address='local', cluster=cluster, party='bob')

alice raise exceptions,and bob hang:

>>> actor_alice = MyActor.party("alice").remote(1)
>>> actor_bob = MyActor.party("bob").remote(1)
>>> val_alice = actor_alice.inc.remote(1)
>>> val_bob = actor_bob.inc.remote(2)
>>> sum_val_obj = aggregate.party("bob").remote(val_alice, val_bob)
2023-02-01 19:09:46 INFO fed.cleanup [alice] --  Start check sending thread.
2023-02-01 19:09:46 INFO fed.cleanup [alice] --  Start check sending monitor thread.
>>> result = fed.get(sum_val_obj)
2023-02-01 19:11:03,377 WARNING worker.py:1851 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffff1ee563b44ac31096b7ba4a6a01000000 Worker ID: a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5 Node ID: d65f0ff24c8edac9d08637fddca92f06ee0ba4713d0d58d4589b9bac Worker IP address: 192.168.12.13 Worker port: 33132 Worker PID: 27136 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2023-02-01 19:11:03 WARNING fed.cleanup [alice] --  Failed to send ObjectRef(82891771158d68c11ee563b44ac31096b7ba4a6a0100000001000000) with error: The actor died unexpectedly before finishing this task.
        class_name: SendProxyActor
        actor_id: 1ee563b44ac31096b7ba4a6a01000000
        pid: 27136
        name: SendProxyActor
        namespace: e629d53d-6ac0-4a94-ab75-ff336c42125c
        ip: 192.168.12.13
The actor is dead because its worker process has died. Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.

found the logs:

# ls | grep a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5
python-core-worker-a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5_27136.log
worker-a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5-01000000-27136.err
worker-a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5-01000000-27136.out

nothing in effect:

# cat worker-a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5-01000000-27136.out
:actor_name:SendProxyActor

# cat worker-a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5-01000000-27136.err
:actor_name:SendProxyActor

# cat python-core-worker-a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5_27136.log
[2023-02-01 19:09:28,847 I 27136 27136] core_worker_process.cc:107: Constructing CoreWorkerProcess. pid: 27136
[2023-02-01 19:09:28,856 I 27136 27136] io_service_pool.cc:35: IOServicePool is running with 1 io_service.
[2023-02-01 19:09:28,859 I 27136 27136] grpc_server.cc:120: worker server started, listening on port 33132.
[2023-02-01 19:09:28,865 I 27136 27136] core_worker.cc:215: Initializing worker at address: 192.168.12.13:33132, worker ID a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5, raylet d65f0ff24c8edac9d08637fddca92f06ee0ba4713d0d58d4589b9bac
[2023-02-01 19:09:28,868 I 27136 27136] core_worker.cc:559: Adjusted worker niceness to 15
[2023-02-01 19:09:28,868 I 27136 27163] core_worker.cc:510: Event stats:


Global stats: 13 total (9 active)
Queueing time: mean = 39.493 us, max = 158.184 us, min = 98.811 us, total = 513.407 us
Execution time:  mean = 19.139 us, total = 248.813 us
Event stats:
        PeriodicalRunner.RunFnPeriodically - 7 total (4 active, 1 running), CPU time: mean = 17.478 us, total = 122.348 us
        UNKNOWN - 2 total (2 active), CPU time: mean = 0.000 s, total = 0.000 s
        CoreWorker.deadline_timer.flush_profiling_events - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
        WorkerInfoGcsService.grpc_client.AddWorkerInfo - 1 total (0 active), CPU time: mean = 126.465 us, total = 126.465 us
        InternalPubSubGcsService.grpc_client.GcsSubscriberPoll - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
        InternalPubSubGcsService.grpc_client.GcsSubscriberCommandBatch - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s


[2023-02-01 19:09:28,870 I 27136 27163] accessor.cc:612: Received notification for node id = d65f0ff24c8edac9d08637fddca92f06ee0ba4713d0d58d4589b9bac, IsAlive = 1
[2023-02-01 19:09:28,879 I 27136 27136] direct_actor_task_submitter.cc:36: Set max pending calls to -1 for actor 1ee563b44ac31096b7ba4a6a01000000
[2023-02-01 19:09:28,880 I 27136 27136] direct_actor_task_submitter.cc:229: Connecting to actor 1ee563b44ac31096b7ba4a6a01000000 at worker a3fdb717fc345cb5f211be2e9df4d422f4f17d9e6095063b5979c7a5
[2023-02-01 19:09:28,880 I 27136 27136] core_worker.cc:2322: Creating actor: 1ee563b44ac31096b7ba4a6a01000000
[2023-02-01 19:09:29,458 I 27136 27136] direct_actor_transport.cc:179: Actor creation task finished, task_id: ffffffffffffffff1ee563b44ac31096b7ba4a6a01000000, actor_id: 1ee563b44ac31096b7ba4a6a01000000
[2023-02-01 19:09:29,464 I 27136 27136] out_of_order_actor_scheduling_queue.cc:38: Setting actor as asyncio with max_concurrency=1000, and defined concurrency groups are:

[2023-02-01 19:10:28,869 I 27136 27163] core_worker.cc:510: Event stats:


Global stats: 882 total (8 active)
Queueing time: mean = 160.545 us, max = 2.805 ms, min = 22.485 us, total = 141.601 ms
Execution time:  mean = 46.259 us, total = 40.801 ms
Event stats:
        UNKNOWN - 734 total (6 active, 1 running), CPU time: mean = 43.714 us, total = 32.086 ms
        CoreWorker.deadline_timer.flush_profiling_events - 60 total (1 active), CPU time: mean = 28.091 us, total = 1.685 ms
        NodeManagerService.grpc_client.ReportWorkerBacklog - 60 total (0 active), CPU time: mean = 38.474 us, total = 2.308 ms
        CoreWorkerService.grpc_server.GetCoreWorkerStats - 12 total (0 active), CPU time: mean = 178.150 us, total = 2.138 ms
        PeriodicalRunner.RunFnPeriodically - 7 total (0 active), CPU time: mean = 173.018 us, total = 1.211 ms
        CoreWorkerService.grpc_server.PushTask - 3 total (0 active), CPU time: mean = 191.626 us, total = 574.879 us
        StatsGcsService.grpc_client.AddProfileData - 2 total (0 active), CPU time: mean = 105.188 us, total = 210.375 us
        InternalPubSubGcsService.grpc_client.GcsSubscriberPoll - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
        InternalPubSubGcsService.grpc_client.GcsSubscriberCommandBatch - 1 total (0 active), CPU time: mean = 265.986 us, total = 265.986 us
        WorkerInfoGcsService.grpc_client.AddWorkerInfo - 1 total (0 active), CPU time: mean = 126.465 us, total = 126.465 us
        NodeInfoGcsService.grpc_client.GetAllNodeInfo - 1 total (0 active), CPU time: mean = 194.388 us, total = 194.388 us

Loosen protobuf version

RayFed restricts protobuf>=3.9.2,<3.20, but without any solid reason, we should loosen it for better compatibility.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.