
Swift Cluster Membership

This library aims to help Swift make ground in a new space: clustered multi-node distributed systems.

With this library we provide reusable, runtime-agnostic membership protocol implementations which can be adopted in various clustering use cases.

Background

Cluster membership protocols are a crucial building block for distributed systems, such as computation intensive clusters, schedulers, databases, key-value stores and more. With the announcement of this package, we aim to make building such systems simpler, as they no longer need to rely on external services to handle service membership for them. We would also like to invite the community to collaborate on and develop additional membership protocols.

At their core, membership protocols need to provide an answer for the question "Who are my (live) peers?". This seemingly simple task turns out to be not so simple at all in a distributed system where delayed or lost messages, network partitions, and unresponsive but still "alive" nodes are the daily bread and butter. Providing a predictable, reliable answer to this question is what cluster membership protocols do.

There are various trade-offs one can make while implementing a membership protocol, and it continues to be an interesting area of research and continued refinement. As such, the cluster-membership package intends not to focus on a single implementation, but to serve as a collaboration space for various distributed algorithms in this space.

🏊🏾‍♀️🏊🏻‍♀️🏊🏾‍♂️🏊🏼‍♂️ SWIMming with Swift

High-level Protocol Description

For a more in-depth discussion of the protocol and modifications in this implementation we suggest reading the SWIM API Documentation, as well as the associated papers linked below.

This library implements the Scalable Weakly-consistent Infection-style process group Membership algorithm (also known as "SWIM"), along with a few notable protocol extensions as documented in the 2018 Lifeguard: Local Health Awareness for More Accurate Failure Detection paper.

SWIM is a gossip protocol in which peers periodically exchange bits of information about their observations of other nodes’ statuses, eventually spreading the information to all other members in a cluster. This category of distributed algorithms is very resilient against arbitrary message loss, network partitions, and similar issues.

At a high level, SWIM works like this:

  • A member periodically pings a "randomly" selected peer it is aware of. It does so by sending a .ping message to that peer, expecting an .ack to be sent back. See how A probes B initially in the diagram below.
    • The exchanged messages also carry a gossip payload, which is (partial) information about what other peers the sender of the message is aware of, along with their membership status (.alive, .suspect, etc.)
  • If it receives an .ack, the peer is considered still .alive. Otherwise, the target peer might have terminated/crashed or be unresponsive for other reasons.
    • In order to double-check whether the peer really is dead, the origin asks a few other peers about the state of the unresponsive peer, by sending .pingRequest messages to a configured number of other peers, which then issue direct pings to that peer (probing peer E in the diagram below).
  • If those pings also fail (no .acks arrive in time), the peer is marked as .suspect.
    • In this situation, our protocol implementation additionally uses .nack ("negative acknowledgement") messages, to inform the ping request origin that the intermediary did receive the .pingRequest message, but the target does not appear to have responded. We use this information to adjust a Local Health Multiplier, which affects how timeouts are calculated. To learn more, refer to the API docs and the Lifeguard paper. (A sketch of these message shapes follows the diagram below.)

SWIM: Messages Examples
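
To make those exchanges concrete, below is a minimal sketch of the message shapes described above. It is purely illustrative: the library's real message types carry richer information (typed peers, node UIDs, etc.), so treat every name here as an assumption.

enum SWIMMessageSketch {
    /// Probe a peer directly; the receiver should reply with an `.ack`.
    case ping(gossip: GossipSketch, sequenceNumber: UInt32)
    /// Positive reply to a `.ping`: the sender is alive.
    case ack(gossip: GossipSketch, sequenceNumber: UInt32)
    /// Ask an intermediary to probe an unresponsive target on our behalf.
    case pingRequest(target: String, gossip: GossipSketch, sequenceNumber: UInt32)
    /// Negative acknowledgement: the intermediary received our `.pingRequest`,
    /// but the target did not respond in time (used to adjust the Local Health Multiplier).
    case nack(sequenceNumber: UInt32)
}

/// Partial view of the sender's membership observations, piggybacked on every message.
struct GossipSketch {
    enum Status { case alive, suspect, dead }
    var members: [(node: String, status: Status)]
}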

The above mechanism serves not only as a failure detection mechanism, but also as a gossip mechanism, which carries information about known members of the cluster. This way members eventually learn about the status of their peers, even without having them all listed upfront. It is worth pointing out however that this membership view is weakly-consistent, which means there is no guarantee (nor a way to know, without additional information) whether all members have the same exact view of the membership at any given point in time. However, it is an excellent building block for higher-level tools and systems to build their stronger guarantees on top of.

Once the failure detection mechanism detects an unresponsive node, that node is eventually marked as .dead, resulting in its irrevocable removal from the cluster. Our implementation offers an optional extension, adding an .unreachable state to the possible states; however, most users will not find it necessary and it is disabled by default. For details and rules about legal status transitions refer to SWIM.Status or the following diagram:

SWIM: Lifecycle Diagram
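
As a rough illustration of those transition rules, consider the sketch below. It is not the library's actual SWIM.Status type (which also tracks incarnation numbers, used to refute suspicions); it only models the idea that, within one incarnation, statuses move "forward" and .dead is terminal.

enum StatusSketch: Int {
    case alive = 0, suspect, unreachable, dead

    /// Within a single incarnation a member may only move "forward" through
    /// the statuses; `.dead` is terminal, since removal is irrevocable.
    /// (Becoming `.alive` again requires a *higher* incarnation, not modeled here.)
    func canTransition(to next: StatusSketch) -> Bool {
        guard self != .dead else { return false } // dead stays dead
        return next.rawValue > self.rawValue
    }
}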

The way Swift Cluster Membership implements protocols is by offering "Instances" of them. For example, the SWIM implementation is encapsulated in the runtime-agnostic SWIM.Instance, which needs to be “driven” or “interpreted” by some glue code between a networking runtime and the instance itself. We call those glue pieces of an implementation "Shells", and the library ships with a SWIMNIOShell implemented using SwiftNIO’s DatagramChannel, which performs all messaging asynchronously over UDP. Alternative implementations can use completely different transports, or piggyback SWIM messages on some other existing gossip system, etc.

The SWIM instance also has built-in support for emitting metrics (using swift-metrics) and can be configured to log details about its internal state by passing a swift-log Logger.
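
For example, wiring up a logger when configuring the NIO-based shell might look like the sketch below; the property names are taken from the example module and may differ between versions.

import Logging
import SWIMNIOExample

var settings = SWIMNIO.Settings()
settings.logger = Logger(label: "swim")   // swift-log Logger used for internal logging
settings.logger.logLevel = .trace         // increase verbosity while debugging the protocol
// Metrics are emitted via swift-metrics once a metrics backend is bootstrapped
// in the process, e.g. MetricsSystem.bootstrap(...) with a factory of your choice.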

Example: Reusing the SWIM protocol logic implementation

The primary purpose of this library is to share the SWIM.Instance implementation across various runtimes which need some form of in-process membership service. Implementing a custom runtime is documented in depth in the project’s README (https://github.com/apple/swift-cluster-membership/), so please have a look there if you are interested in implementing SWIM over some different transport.

Implementing a new transport boils down to a “fill in the blanks” exercise:

First, one has to implement the Peer protocols (https://github.com/apple/swift-cluster-membership/blob/main/Sources/SWIM/Peer.swift) using one’s target transport:

/// SWIM peer with which contact can be initiated, by sending ping or ping request messages.
public protocol SWIMPeer: SWIMAddressablePeer {
    /// Perform a probe of this peer by sending a `ping` message.
    /// 
    /// <... more docs here - please refer to the API docs for the latest version ...>
    func ping(
        payload: SWIM.GossipPayload,
        from origin: SWIMPingOriginPeer,
        timeout: DispatchTimeInterval,
        sequenceNumber: SWIM.SequenceNumber
    ) async throws -> SWIM.PingResponse
    
    // ... 
}

Which usually means wrapping some connection, channel, or other identity with the ability to send messages and invoke the appropriate callbacks when applicable.
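
For illustration, such a wrapper might look like the following sketch. Only the SWIMPeer shape comes from the library; MyConnection and its sendPing method are hypothetical stand-ins for whatever your transport provides.

import ClusterMembership
import Dispatch
import SWIM

/// Hypothetical transport abstraction assumed by this sketch.
protocol MyConnection {
    func sendPing(
        payload: SWIM.GossipPayload,
        origin: SWIMPingOriginPeer,
        timeout: DispatchTimeInterval,
        sequenceNumber: SWIM.SequenceNumber
    ) async throws -> SWIM.PingResponse
}

struct MyTransportPeer: SWIMPeer {
    var node: Node               // the peer's address (SWIMAddressablePeer requirement)
    let connection: MyConnection // hypothetical transport handle

    func ping(
        payload: SWIM.GossipPayload,
        from origin: SWIMPingOriginPeer,
        timeout: DispatchTimeInterval,
        sequenceNumber: SWIM.SequenceNumber
    ) async throws -> SWIM.PingResponse {
        // Serialize and send a .ping over the wire; the transport either returns
        // the matching .ack/.nack reply or throws once the timeout expires.
        try await self.connection.sendPing(
            payload: payload,
            origin: origin,
            timeout: timeout,
            sequenceNumber: sequenceNumber
        )
    }

    // ... pingRequest(...) and the remaining requirements follow the same pattern
}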

Then, on the receiving end of a peer, one has to implement receiving those messages and invoke all the corresponding on<SomeMessage>(...) callbacks defined on the SWIM.Instance (grouped under SWIMProtocol).

A piece of the SWIMProtocol is listed below to give you an idea about it:

public protocol SWIMProtocol {

    /// MUST be invoked periodically, in intervals of `self.swim.dynamicLHMProtocolInterval`.
    ///
    /// MUST NOT be scheduled using a "repeated" task/timer, as the interval is dynamic and may change as the algorithm proceeds.
    /// Implementations should schedule each next tick by handling the returned directive's `scheduleNextTick` case,
    /// which includes the appropriate delay to use for the next protocol tick.
    ///
    /// This is the heart of the protocol, as each tick corresponds to a "protocol period" in which:
    /// - suspect members are checked if they're overdue and should become `.unreachable` or `.dead`,
    /// - decisions are made to `.ping` a random peer for fault detection,
    /// - and some internal house keeping is performed.
    ///
    /// Note: This means that effectively all decisions are made in intervals of protocol periods.
    /// It would be possible to have a secondary periodic or more ad-hoc interval to speed up
    /// some operations, however this is currently not implemented and the protocol follows the fairly
    /// standard mode of simply carrying payloads in periodic ping messages.
    ///
    /// - Returns: `SWIM.Instance.PeriodicPingTickDirective` which must be interpreted by a shell implementation
    mutating func onPeriodicPingTick() -> [SWIM.Instance.PeriodicPingTickDirective]

    mutating func onPing( ... ) -> [SWIM.Instance.PingDirective]

    mutating func onPingRequest( ... ) -> [SWIM.Instance.PingRequestDirective]

    mutating func onPingResponse( ... ) -> [SWIM.Instance.PingResponseDirective]

    // ... 
}
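
For instance, the periodic tick could be driven roughly like this. This is a sketch only: the directive case names are abridged, and self.timer stands in for whatever scheduling facility your runtime offers.

func handlePeriodicProtocolTick() {
    for directive in self.swim.onPeriodicPingTick() {
        switch directive {
        case .sendPing(let target, let payload, let timeout, let sequenceNumber):
            self.sendPing(to: target, payload: payload, timeout: timeout, sequenceNumber: sequenceNumber)
        case .scheduleNextTick(let delay):
            // Never a repeated timer: each tick schedules only the *next* one,
            // because the interval is dynamic (the Local Health Multiplier may change it).
            self.timer.scheduleOnce(after: delay) { self.handlePeriodicProtocolTick() }
        default:
            break // membership changes, metrics, etc. handled as needed
        }
    }
}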

These calls perform all SWIM protocol-specific tasks internally, and return directives: simple-to-interpret “commands” telling an implementation how it should react to the message. For example, upon receiving a .pingRequest message, the returned directive may instruct a shell to send a ping to some nodes. The directive prepares the appropriate target, timeout, and additional information, which makes it simple to follow its instructions and implement the call correctly, e.g. like this:

self.swim.onPingRequest(
    target: target,
    pingRequestOrigin: pingRequestOrigin,            
    payload: payload,
    sequenceNumber: sequenceNumber
).forEach { directive in
    switch directive {
    case .gossipProcessed(let gossipDirective):
        self.handleGossipPayloadProcessedDirective(gossipDirective)

    case .sendPing(let target, let payload, let pingRequestOriginPeer, let pingRequestSequenceNumber, let timeout, let sequenceNumber):
        self.sendPing(
            to: target,
            payload: payload,
            pingRequestOrigin: pingRequestOriginPeer,
            pingRequestSequenceNumber: pingRequestSequenceNumber,
            timeout: timeout,
            sequenceNumber: sequenceNumber
        )
    }
}

In general this allows all the tricky "what to do when" logic to be encapsulated within the protocol instance, and a Shell only has to follow its instructions. Actual implementations will often need to perform more involved concurrency and networking tasks, like awaiting a sequence of responses and handling them in a specific way, however the general outline of the protocol is orchestrated by the instance's directives.

For detailed documentation about each of the callbacks, when to invoke them, and how all this fits together, please refer to the API Documentation.

Example: SWIMming with Swift NIO

The repository contains an end-to-end example and an example implementation called SWIMNIOExample, which makes use of the SWIM.Instance to enable a simple UDP-based peer monitoring system. This allows peers to gossip and notify each other about node failures using the SWIM protocol, by sending datagrams driven by SwiftNIO.

📘 The SWIMNIOExample implementation is offered only as an example and has not been implemented with production use in mind; however, with some amount of effort it could certainly serve some use cases well. If you are interested in learning more about cluster membership algorithms, scalability benchmarking, or SwiftNIO itself, this is a great module to get your feet wet with. Perhaps once the module is mature enough we could consider making it not only an example, but a reusable component for SwiftNIO-based clustered applications.

In its simplest form, combining the provided SWIM instance and NIO shell to build a simple server, one can embed the provided handlers in a typical NIO channel pipeline, as shown below:

let bootstrap = DatagramBootstrap(group: group)
    .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
    .channelInitializer { channel in
        channel.pipeline
            // first install the SWIM handler, which contains the SWIMNIOShell:
            .addHandler(SWIMNIOHandler(settings: settings)).flatMap {
                // then install some user handler, it will receive SWIM events:
                channel.pipeline.addHandler(SWIMNIOExampleHandler())
            }
    }

bootstrap.bind(host: host, port: port)

The example handler can then receive and handle SWIM cluster membership change events:

final class SWIMNIOExampleHandler: ChannelInboundHandler {
    public typealias InboundIn = SWIM.MemberStatusChangedEvent
    
    let log = Logger(label: "SWIMNIOExampleHandler")
    
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let change: SWIM.MemberStatusChangedEvent = self.unwrapInboundIn(data)

        self.log.info("Membership status changed: [\(change.member.node)] is now [\(change.status)]", metadata: [    
            "swim/member": "\(change.member.node)",
            "swim/member/status": "\(change.status)",
        ])
    }
}

If you are interested in contributing and polishing up the SWIMNIO implementation please head over to the issues and pick up a task or propose an improvement yourself!

Additional Membership Protocol Implementations

We are generally interested in fostering discussions and implementations of additional membership protocols using a similar "Instance" style.

If you are interested in such algorithms, and have a favourite protocol that you'd like to see implemented, please do not hesitate to reach out here via issues or the Swift forums.

Contributing

Experience reports, feedback, improvement ideas and contributions are greatly encouraged! We look forward to hearing from you.

Please refer to the CONTRIBUTING guide to learn about the process of submitting pull requests, and to the HANDBOOK for terminology and other useful tips for working with this library.


swift-cluster-membership's Issues

timeout for a ping initiated by `pingReq` should be shorter than initiator's ping timeout

Reported by @avolokhov

In the current protocol we issue indirect ping requests when the initial ping fails.

For these indirect pings we use a timeout calculated as: static timeout * LHA multiplier, on the ping-issuing node.
Indirect pings then use a timeout calculated with the same formula, but on the indirect pinger nodes. We expect indirect pinger nodes to send a nack if their ping attempt fails. In reality, if the LHA multiplier is the same or higher on an indirect pinger node, a nack will be issued only after the pingReq timeout on the initial pinger node has expired, which makes the LHA multiplier effectively useless.
In order to fix this, we can't just apply a shortening multiplier on the indirect prober node, because the LHA multiplier on that node can be higher than on the initial pinger node.

Original: see footnote [5] on page 5 of the Lifeguard paper: https://arxiv.org/pdf/1707.00788.pdf


The memberlist implementation scales probe timeouts for direct ping ack/nack:
https://github.com/hashicorp/memberlist/blob/c192837f8fd6d494ac641880d1356804b21503a3/state.go#L305

and unscaled for indirect ping requests:
https://github.com/hashicorp/memberlist/blob/c192837f8fd6d494ac641880d1356804b21503a3/net.go#L584

We may either implement something similar, or go with the solution suggested in the paper and further scale the pingReq timeout down to 80% of the initial ping timeout.
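
A minimal sketch of the arithmetic involved, following the paper's suggestion (all names are illustrative, not library API):

// Direct probe timeouts scale *up* with the Local Health Multiplier (Lifeguard):
func directPingTimeout(base: Double, localHealthMultiplier: Int) -> Double {
    base * Double(1 + localHealthMultiplier)
}

// Indirect probes (issued on behalf of a .pingRequest) must complete before the
// initiator's own pingReq timeout does, so scale them *down*, e.g. to 80%:
func indirectPingTimeout(initiatorPingTimeout: Double) -> Double {
    initiatorPingTimeout * 0.8
}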

Use Swift 5.2.5 in the 5.2 docker because of Codable fixes

We use top-level encoders in tests, and it would be nice to use Swift 5.2.5 in docker so we don't have to reintroduce the workarounds for 5.2.4.

CI is on 5.2.5 so that's good -- this is just about getting

docker-compose -f docker/docker-compose.yaml -f docker/docker-compose.1804.53.yaml run test

to also pass.

5.3 is fine.

SWIMNIO: Small cleanup: Move callback handling into Shell from Handler

Currently we manage the callbacks in the SWIMNIOHandler; they're keyed by sequence numbers and invoked when we get "replies" -- since of course there's no such thing as a "reply" in UDP itself, we emulate them this way.

This could sit in the Shell rather than in the Handler, which should really only do serialization and message dispatch.

// TODO: move callbacks into the shell?
struct PendingResponseCallbackIdentifier: Hashable {
    let peerAddress: SocketAddress // FIXME: UID as well...?
    let sequenceNumber: SWIM.SequenceNumber
}

A small ticket to be picked up by someone interested, I think.

Offer explicit API on SWIM.Instance for "start monitoring this node"

Expected behavior

Shells today kind of work around this and implement it manually, each in its own slightly different way.

E.g. SWIMNIOShell does:

let sequenceNumber = self.swim.nextSequenceNumber()
targetPeer.ping(payload: self.swim.makeGossipPayload(to: nil), from: self.peer, timeout: .seconds(1), sequenceNumber: sequenceNumber) { (result: Result<SWIM.PingResponse, Error>) in

which is a bit hacky, since we have to reach into the instance more than we really should; explicitly making both a sequence number and a gossip payload should not be needed.

Alternatively, this can be solved via swim.addMember, just blindly adding the node there so swim starts monitoring it.

This honestly is probably just fine, but I was wondering if we should expose addMember or not, since in some implementations where nodes do not have a known UID upfront this adds more complexity than kicking off a ping, which will return with the UID.

Actual behavior

Some explicit call to be made on the instance which tells us how to deal with "start monitoring that node", including retries perhaps, etc.?
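
Concretely, the explicit call might look something like this strawman sketch (the name, shape and return type are hypothetical, not existing API):

extension SWIM.Instance {
    /// HYPOTHETICAL: begin monitoring the given node. Would internally produce
    /// the sequence number and gossip payload, and kick off an initial ping
    /// (which also discovers the node's UID when it is not known upfront),
    /// possibly including retries.
    public mutating func startMonitoring(node: Node) -> [PeriodicPingTickDirective] {
        fatalError("sketch only -- see discussion above")
    }
}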

Version/commit hash

0.1.0

SwiftUI Visualization App

It would be fantastic if someone with better SwiftUI skills than myself could help contribute a SwiftUI-based visualization for these protocols, initially SWIM. It'd be a fantastic learning resource, very useful for explaining how the dynamics of the protocol work.

This also shows how Swift is well positioned for building such small helper / visualization apps for projects one is working on, which is kind of unique among distributed-systems languages.

We'd want a visualization similar to what ShiViz is able to do:

https://bestchai.bitbucket.io/shiviz/


So we'd want a "timeline" for each node, with lines between nodes when they send messages to each other; the lines would represent message sends, and the dots the messages themselves.

We could store the membership on every such call as well, and thanks to that, on mouse-over over a given node, see what the membership state on it currently is.

This would likely want to use swift-tracing I suppose, though in reality the sequenceNumber is enough to perform the correlation, especially in SWIM where the messages are very simple and well known.

It would be great if we could build this as some form of visualizer.emit(message), since then we could most likely implement it as an Instrument (from swift-tracing!), and running with the visualizer would be the same as running any normal app, just with the visualizer "enabled" (i.e. the VisualizerInstrument bootstrapped).

Happy to discuss ideas!

Other membership implementations of interest

No commitment on actually working on them, but here's a few membership algorithms I'd be interested to explore some time:

  • HyParView, which is interesting because of its partial view property; it is used by quite a few systems, including e.g. Partisan.
  • Rapid could be quite interesting to look into as well; I know @manuelbernhardt has been exploring it a little bit in the past :)

Bring back embedded SWIMNIO tests, which are hard without async-aware "Embedded" infra

The tests were:

They were pretty rough to get right, with all the dancing around async and embedded channels... so for now we're dropping them; we have real clustered tests, and the SWIM.Instance is heavily tested by swift-distributed-actors -- the SWIMNIO impl always was just an example, so the clustered tests are good enough here for now.

import ClusterMembership
import struct Dispatch.DispatchTime
import enum Dispatch.DispatchTimeInterval
import NIO
@testable import SWIM
@testable import SWIMNIOExample
import XCTest

final class SWIMNIOEmbeddedTests: EmbeddedClusteredXCTestCase {
    let firstNode = Node(protocol: "test", host: "127.0.0.1", port: 7001, uid: 1111)
    let secondNode = Node(protocol: "test", host: "127.0.0.1", port: 7002, uid: 1111)

    func test_embedded_schedulingPeriodicTicksWorks() async throws {
        let first = self.makeEmbeddedShell("first") { settings in
            settings.swim.initialContactPoints = [secondNode]
            settings.swim.node = firstNode
        }
        let second = self.makeEmbeddedShell("second") { settings in
            settings.swim.initialContactPoints = []
            settings.swim.node = secondNode
        }

        var unfulfilledCallbacks = UnfulfilledNIOPeerCallbacks()

        for _ in 0 ... 5 {
            self.loop.advanceTime(by: .seconds(1))
            await self.exchangeMessages(first, second, unfulfilledCallbacks: &unfulfilledCallbacks)
        }

        unfulfilledCallbacks.complete()

        XCTAssertEqual(first.swim.allMemberCount, 2)
        XCTAssertEqual(second.swim.allMemberCount, 2)
    }

    func test_embedded_suspicionsBecomeDeadNodesAfterTime() async throws {
        let first = self.makeEmbeddedShell("first") { settings in
            settings.swim.initialContactPoints = [secondNode]
            settings.swim.node = firstNode
        }
        let second = self.makeEmbeddedShell("second") { settings in
            settings.swim.initialContactPoints = []
            settings.swim.node = secondNode
        }

        var unfulfilledCallbacks = UnfulfilledNIOPeerCallbacks()

        for _ in 0 ... 5 {
            self.loop.advanceTime(by: .seconds(1))
            await self.exchangeMessages(first, second, unfulfilledCallbacks: &unfulfilledCallbacks)
        }

        unfulfilledCallbacks.complete()

        XCTAssertEqual(first.swim.allMemberCount, 2)
        XCTAssertEqual(second.swim.allMemberCount, 2)

        // --- simulate cluster partition ---
        var foundSuspects = false
        var rounds = 1
        while !foundSuspects {
            self.loop.advanceTime(by: .seconds(1))
            await self.timeoutPings(first, second)
            // the nodes can't send each other messages, and thus eventually emit dead warnings
            foundSuspects = first.swim.suspects.count == 1 && second.swim.suspects.count == 1
            rounds += 1
        }

        print("  Becoming suspicious of each other after a cluster partition took: \(rounds) rounds")
    }

    func test_embedded_handleMissedNacks_whenTimingOut() async throws {
        let thirdNode = Node(protocol: "test", host: "127.0.0.1", port: 7003, uid: 1111)
        let unreachableNode = Node(protocol: "test", host: "127.0.0.1", port: 7004, uid: 1111)

        let first = self.makeEmbeddedShell("first") { settings in
            settings.swim.node = firstNode
            // FIXME: EmbeddedChannel is not thread-safe
            // Don't set contact points to prevent initial pings from getting sent. We will send them ourselves below.
            settings.swim.initialContactPoints = [] // [secondNode, thirdNode, unreachableNode]
            settings._startPeriodicPingTimer = false
            settings.swim.lifeguard.maxLocalHealthMultiplier = 8
            settings.swim.unreachability = .enabled
        }

        let second = self.makeEmbeddedShell("second") { settings in
            settings.swim.node = secondNode
            settings.swim.initialContactPoints = []
            settings._startPeriodicPingTimer = false
            settings.swim.unreachability = .enabled
        }

        let third = self.makeEmbeddedShell("third") { settings in
            settings.swim.node = thirdNode
            settings.swim.initialContactPoints = []
            settings._startPeriodicPingTimer = false
            settings.swim.unreachability = .enabled
        }

        let unreachable = self.makeEmbeddedShell("unreachable") { settings in
            settings.swim.node = unreachableNode
            settings.swim.initialContactPoints = []
            settings._startPeriodicPingTimer = false
            settings.swim.unreachability = .enabled
        }

        // Allow initial message passing to let first node recognize peers as SWIMMembers
        await self.pingAndResponse(origin: first, target: second, sequenceNumber: 1)
        await self.pingAndResponse(origin: first, target: third, sequenceNumber: 2)
        await self.pingAndResponse(origin: first, target: unreachable, sequenceNumber: 3)

        try await self.assertLocalHealthMultiplier(first, expected: 0)

        // FIXME: is this test flow correct?

        self.sendPing(origin: first, target: unreachable, payload: .none, pingRequestOrigin: nil, pingRequestSequenceNumber: nil, sequenceNumber: 4)
        // push .ping; the .timeout ping response would trigger .pingRequest
        await self.timeoutPings(first, unreachable)
        try await self.assertLocalHealthMultiplier(first, expected: 1)

        self.sendPing(origin: first, target: unreachable, payload: .none, pingRequestOrigin: first.peer, pingRequestSequenceNumber: 1, sequenceNumber: 5)
        // miss a nack
        await self.timeoutPings(first, unreachable, pingRequestOrigin: first.peer, pingRequestSequenceNumber: 1)
        try await self.assertLocalHealthMultiplier(first, expected: 1)

        // .pingRequest sent to second and third (random order)
        let (_, done2) = await self.timeoutPings(first, second) // push .pingRequest through
        if done2 {
            try await self.assertLocalHealthMultiplier(first, expected: 2)
        }

        _ = await self.timeoutPings(first, third) // push .pingRequest through
        try await self.assertLocalHealthMultiplier(first, expected: done2 ? 3 : 2)

        // We don't know in which order the .pingRequests are sent, so in case third receives before second, check second again
        if !done2 {
            _ = await self.timeoutPings(first, second) // push .pingRequest through
            try await self.assertLocalHealthMultiplier(first, expected: 3)
        }
    }

    func test_embedded_handleNacks_whenPingTimeout() async throws {
        let thirdNode = Node(protocol: "test", host: "127.0.0.1", port: 7003, uid: 1111)
        let unreachableNode = Node(protocol: "test", host: "127.0.0.1", port: 7004, uid: 1111)

        let first = self.makeEmbeddedShell("first") { settings in
            // FIXME: EmbeddedChannel is not thread-safe
            // Don't set contact points to prevent initial pings from getting sent. We will send them ourselves below.
            settings.swim.initialContactPoints = [] // [secondNode, thirdNode, unreachableNode]
            settings._startPeriodicPingTimer = false
            settings.swim.lifeguard.maxLocalHealthMultiplier = 8
            settings.swim.node = firstNode
            settings.swim.unreachability = .enabled
        }

        let second = self.makeEmbeddedShell("second") { settings in
            settings.swim.initialContactPoints = []
            settings._startPeriodicPingTimer = false
            settings.swim.node = secondNode
        }

        let third = self.makeEmbeddedShell("third") { settings in
            settings.swim.initialContactPoints = []
            settings._startPeriodicPingTimer = false
            settings.swim.node = thirdNode
        }

        let unreachable = self.makeEmbeddedShell("unreachable") { settings in
            settings.swim.initialContactPoints = []
            settings._startPeriodicPingTimer = false
            settings.swim.node = unreachableNode
        }

        // Allow initial message passing to let first node recognize peers as SWIMMembers
        await self.pingAndResponse(origin: first, target: second, sequenceNumber: 1)
        await self.pingAndResponse(origin: first, target: third, sequenceNumber: 2)
        await self.pingAndResponse(origin: first, target: unreachable, sequenceNumber: 3)

        try await self.assertLocalHealthMultiplier(first, expected: 0)

        // FIXME: is this test flow correct?

        self.sendPing(origin: first, target: unreachable, payload: .none, pingRequestOrigin: first.peer, pingRequestSequenceNumber: 1, sequenceNumber: 4)
        // push .ping; the .timeout ping response would trigger .pingRequest
        // Non-nil pingRequestOrigin would cause nack
        await self.timeoutPings(first, unreachable, pingRequestOrigin: first.peer, pingRequestSequenceNumber: 1)
        try await self.assertLocalHealthMultiplier(first, expected: 0)

        // .pingRequest sent to second and third (random order)
        let (_, done2) = await self.timeoutPings(first, second, pingRequestOrigin: first.peer, pingRequestSequenceNumber: 1) // push .pingRequest through
        if done2 {
            try await self.assertLocalHealthMultiplier(first, expected: 0)
        }

        _ = await self.timeoutPings(first, third, pingRequestOrigin: first.peer, pingRequestSequenceNumber: 1) // push .pingRequest through
        try await self.assertLocalHealthMultiplier(first, expected: 0)

        // We don't know in which order the .pingRequests are sent, so in case third receives before second, check second again
        if !done2 {
            _ = await self.timeoutPings(first, second) // push .pingRequest through
            try await self.assertLocalHealthMultiplier(first, expected: 0)
        }
    }

    private func assertLocalHealthMultiplier(_ node: SWIMNIOShell, expected: Int, within: DispatchTimeInterval = .milliseconds(100), line: UInt = #line) async throws {
        let deadline = DispatchTime.now() + within

        while DispatchTime.now().uptimeNanoseconds < deadline.uptimeNanoseconds {
            if node.swim.localHealthMultiplier == expected {
                return
            }
            try await Task.sleep(nanoseconds: 10_000_000)
        }

        XCTAssertEqual(node.swim.localHealthMultiplier, expected, line: line)
    }

    /// Returns unfulfilled callback after each round of exchange
    private func exchangeMessages(_ first: SWIMNIOShell, _ second: SWIMNIOShell, unfulfilledCallbacks: inout UnfulfilledNIOPeerCallbacks) async {
        let firstEmbeddedChannel = first.channel as! EmbeddedChannel
        let secondEmbeddedChannel = second.channel as! EmbeddedChannel

        let writeCommand1 = try! await firstEmbeddedChannel.readOutboundWriteCommand()
        if let writeCommand1 = writeCommand1 {
            if case .response = writeCommand1.message, let replyCallback2 = unfulfilledCallbacks.second.popFirst() {
                replyCallback2(.success(writeCommand1.message))
            } else {
                second.receiveMessage(message: writeCommand1.message)
            }

            if let replyCallback1 = writeCommand1.replyCallback {
                unfulfilledCallbacks.first.append(replyCallback1)
            }
        }

        let writeCommand2 = try! await secondEmbeddedChannel.readOutboundWriteCommand()
        if let writeCommand2 = writeCommand2 {
            if case .response = writeCommand2.message, let replyCallback1 = unfulfilledCallbacks.first.popFirst() {
                replyCallback1(.success(writeCommand2.message))
            } else {
                first.receiveMessage(message: writeCommand2.message)
            }

            if let replyCallback2 = writeCommand2.replyCallback {
                unfulfilledCallbacks.second.append(replyCallback2)
            }
        }
    }

    private func sendMessage(from first: SWIMNIOShell, to second: SWIMNIOShell) async {
        let firstEmbeddedChannel = first.channel as! EmbeddedChannel

        if let writeCommand = try! await firstEmbeddedChannel.readOutboundWriteCommand() {
            second.receiveMessage(message: writeCommand.message)
        }
    }

    private func sendPing(
        origin: SWIMNIOShell,
        target: SWIMNIOShell,
        payload: SWIM.GossipPayload<SWIM.NIOPeer>,
        pingRequestOrigin: SWIM.NIOPeer?,
        pingRequestSequenceNumber: SWIM.SequenceNumber?,
        sequenceNumber: SWIM.SequenceNumber
    ) {
        Task {
            await origin.sendPing(
                to: target.peer,
                payload: payload,
                pingRequestOrigin: pingRequestOrigin,
                pingRequestSequenceNumber: pingRequestSequenceNumber,
                timeout: .milliseconds(100),
                sequenceNumber: sequenceNumber
            )
        }
    }

    private func pingAndResponse(origin: SWIMNIOShell, target: SWIMNIOShell, payload: SWIM.GossipPayload<SWIM.NIOPeer> = .none, sequenceNumber: SWIM.SequenceNumber) async {
        self.sendPing(origin: origin, target: target, payload: .none, pingRequestOrigin: nil, pingRequestSequenceNumber: nil, sequenceNumber: sequenceNumber)

        let targetEmbeddedChannel = target.channel as! EmbeddedChannel

        // origin invokes ping on target's channel, so it's target that writes and receives the command
        guard let pingCommand = try! await targetEmbeddedChannel.readOutboundWriteCommand() else {
            return XCTFail("Expected \(target) to receive ping from \(origin)")
        }
        target.receiveMessage(message: pingCommand.message)

        // target sends ping response to origin on its own channel
        guard let pingResponse = try! await targetEmbeddedChannel.readOutboundWriteCommand() else {
            return XCTFail("Expected \(target) to send ack to \(origin)")
        }
        guard let pingCallback = pingCommand.replyCallback else {
            return XCTFail("Expected ping to have callback")
        }
        pingCallback(.success(pingResponse.message))
    }

    /// Timeout pings between nodes
    @discardableResult
    private func timeoutPings(
        _ first: SWIMNIOShell,
        _ second: SWIMNIOShell,
        pingRequestOrigin: SWIM.NIOPeer? = nil,
        pingRequestSequenceNumber: SWIM.SequenceNumber? = nil
    ) async -> (Bool, Bool) {
        if pingRequestOrigin != nil && pingRequestSequenceNumber == nil ||
            pingRequestOrigin == nil && pingRequestSequenceNumber != nil {
            fatalError("either both or none pingRequest parameters must be set, was: \(String(reflecting: pingRequestOrigin)), \(String(reflecting: pingRequestSequenceNumber))")
        }

        let firstEmbeddedChannel = first.channel as! EmbeddedChannel
        let secondEmbeddedChannel = second.channel as! EmbeddedChannel

        var firstPingResponse = false
        var secondPingResponse = false

        if let writeCommand1 = try! await firstEmbeddedChannel.readOutboundWriteCommand() {
            switch writeCommand1.message {
            case .ping(_, _, let sequenceNumber), .pingRequest(_, _, _, let sequenceNumber):
                let response = SWIM.PingResponse.timeout(target: second.peer, pingRequestOrigin: pingRequestOrigin, timeout: .milliseconds(1), sequenceNumber: sequenceNumber)
                if let replyCallback = writeCommand1.replyCallback {
                    replyCallback(.success(.response(response)))
                } else {
                    first.receivePingResponse(
                        response: response,
                        pingRequestOriginPeer: pingRequestOrigin,
                        pingRequestSequenceNumber: pingRequestSequenceNumber
                    )
                }
                firstPingResponse = true
            default:
                // deliver others as usual
                second.receiveMessage(message: writeCommand1.message)
            }
        }

        if let writeCommand2 = try! await secondEmbeddedChannel.readOutboundWriteCommand() {
            switch writeCommand2.message {
            case .ping(_, _, let sequenceNumber), .pingRequest(_, _, _, let sequenceNumber):
                let response = SWIM.PingResponse.timeout(target: second.peer, pingRequestOrigin: pingRequestOrigin, timeout: .milliseconds(1), sequenceNumber: sequenceNumber)
                if let replyCallback = writeCommand2.replyCallback {
                    replyCallback(.success(.response(response)))
                } else {
                    second.receivePingResponse(
                        response: response,
                        pingRequestOriginPeer: pingRequestOrigin,
                        pingRequestSequenceNumber: pingRequestSequenceNumber
                    )
                }
                secondPingResponse = true
            default:
                // deliver others as usual
                first.receiveMessage(message: writeCommand2.message)
            }
        }

        return (firstPingResponse, secondPingResponse)
    }
}

private struct UnfulfilledNIOPeerCallbacks {
    typealias ReplyCallback = (Result<SWIM.Message, Error>) -> Void

    var first: [ReplyCallback] = []
    var second: [ReplyCallback] = []

    func complete() {
        self.first.forEach {
            $0(.failure(EmbeddedShellError.noReply))
        }
        self.second.forEach {
            $0(.failure(EmbeddedShellError.noReply))
        }
    }

    mutating func reset() {
        self.first = []
        self.second = []
    }
}

private extension Array where Element == UnfulfilledNIOPeerCallbacks.ReplyCallback {
    mutating func popFirst() -> Element? {
        guard !self.isEmpty else {
            return nil
        }
        return self.removeFirst()
    }
}

private extension EmbeddedChannel {
    func readOutboundWriteCommand(within: DispatchTimeInterval = .milliseconds(500)) async throws -> SWIMNIOWriteCommand? {
        let deadline = DispatchTime.now() + within

        while DispatchTime.now().uptimeNanoseconds < deadline.uptimeNanoseconds {
            if let writeCommand = try self.readOutbound(as: SWIMNIOWriteCommand.self) {
                return writeCommand
            }

            try await Task.sleep(nanoseconds: 10_000_000)
        }

        return nil
    }
}

private enum EmbeddedShellError: Error {
    case noReply
}

Compiling SWIMNIOExample fails (logs included)

Compiling SWIMNIOExample fails with the below log

[1/10] Compiling CNIOLinux ifaddrs-android.c
[2/10] Compiling CNIOWindows WSAStartup.c
[3/10] Compiling CNIOWindows shim.c
[4/10] Compiling CNIOLinux shim.c
[5/16] Compiling CNIODarwin shim.c
[6/16] Compiling CNIOSHA1 c_nio_sha1.c
[7/16] Compiling Logging Locks.swift
[8/17] Merging module ClusterMembership
[11/17] Compiling CoreMetrics Metrics.swift
[12/17] Compiling Logging Logging.swift
[14/19] Merging module Logging
[15/19] Merging module CoreMetrics
[16/20] Compiling c-nioatomics.c
[17/20] Compiling c-atomics.c
[18/23] Compiling NIOConcurrencyHelpers NIOAtomic.swift
[19/23] Compiling NIOConcurrencyHelpers lock.swift
[20/23] Compiling NIOConcurrencyHelpers atomics.swift
[21/24] Merging module NIOConcurrencyHelpers
[22/95] Compiling NIO AddressedEnvelope.swift
[23/96] Compiling NIO ThreadWindows.swift
[24/96] Compiling NIO TypeAssistedChannelHandler.swift
[25/96] Compiling NIO UniversalBootstrapSupport.swift
[26/96] Compiling NIO Utilities.swift
[27/96] Merging module Metrics
[28/108] Compiling NIO PriorityQueue.swift
[29/108] Compiling NIO RecvByteBufferAllocator.swift
[30/108] Compiling NIO Resolver.swift
[31/108] Compiling NIO Selectable.swift
[32/108] Compiling NIO Linux.swift
[33/108] Compiling NIO LinuxCPUSet.swift
[34/108] Compiling NIO MarkedCircularBuffer.swift
[35/108] Compiling NIO MulticastChannel.swift
[36/108] Compiling SWIM Events.swift
[37/108] Compiling SWIM Member.swift
[38/108] Compiling SWIM Metrics.swift
[39/108] Compiling SWIM Peer.swift
[40/108] Compiling SWIM SWIM.swift
[41/108] Compiling NIO IO.swift
[42/108] Compiling NIO IOData.swift
[43/108] Compiling NIO IntegerTypes.swift
[44/108] Compiling NIO Interfaces.swift
[45/108] Compiling NIO NIOAny.swift
[46/108] Compiling NIO NIOCloseOnErrorHandler.swift
[47/108] Compiling NIO NIOThreadPool.swift
[48/108] Compiling NIO NonBlockingFileIO.swift
[49/108] Compiling NIO SocketProtocols.swift
[50/108] Compiling NIO System.swift
[51/108] Compiling NIO Thread.swift
[52/108] Compiling NIO ThreadPosix.swift
[53/108] Compiling NIO FileHandle.swift
[54/108] Compiling NIO FileRegion.swift
[55/108] Compiling NIO GetaddrinfoResolver.swift
[56/108] Compiling NIO HappyEyeballs.swift
[57/108] Compiling NIO Heap.swift
[58/108] Compiling NIO PendingDatagramWritesManager.swift
[59/108] Compiling NIO PendingWritesManager.swift
[60/108] Compiling NIO PipeChannel.swift
[61/108] Compiling NIO PipePair.swift
[62/108] Compiling NIO Codec.swift
[63/108] Compiling NIO ControlMessage.swift
[64/108] Compiling NIO ConvenienceOptionSupport.swift
[65/108] Compiling NIO DatagramVectorReadManager.swift
[66/108] Compiling NIO DeadChannel.swift
[67/108] Compiling NIO Socket.swift
[68/108] Compiling NIO SocketAddresses.swift
[69/108] Compiling NIO SocketChannel.swift
[70/108] Compiling NIO SocketOptionProvider.swift
[71/108] Compiling SWIM String+Extensions.swift
[72/108] Compiling SWIM _PrettyLog.swift
[73/108] Compiling NIO ByteBuffer-views.swift
[74/108] Compiling NIO Channel.swift
[75/108] Compiling NIO ChannelHandler.swift
[76/108] Compiling NIO SelectableEventLoop.swift
[77/108] Compiling NIO Selector.swift
[78/108] Compiling NIO ServerSocket.swift
[79/108] Compiling NIO SingleStepByteToMessageDecoder.swift
[80/108] Compiling NIO ChannelHandlers.swift
[81/108] Compiling NIO ChannelInvoker.swift
[82/108] Compiling NIO ChannelOption.swift
[83/108] Compiling NIO ChannelPipeline.swift
[84/108] Compiling NIO CircularBuffer.swift
[85/108] Compiling NIO DispathQueue+WithFuture.swift
[86/108] Compiling NIO Embedded.swift
[87/108] Compiling NIO EventLoop.swift
[88/108] Compiling NIO EventLoopFuture.swift
[89/108] Compiling NIO FileDescriptor.swift
[90/108] Compiling SWIM SWIMInstance.swift
[91/108] Compiling SWIM Settings.swift
[92/108] Compiling SWIM Status.swift
[93/108] Compiling SWIM Dispatch+Extensions.swift
[94/108] Compiling SWIM Heap.swift
[95/109] Merging module NIO
[96/126] Compiling NIOExtras JSONRPCFraming.swift
[97/126] Compiling NIOExtras FixedLengthFrameDecoder.swift
[98/126] Compiling NIOExtras NIOExtrasError.swift
[99/126] Compiling NIOExtras DebugInboundEventsHandler.swift
[100/126] Compiling NIOExtras DebugOutboundEventsHandler.swift
[101/126] Compiling NIOExtras RequestResponseHandler.swift
[102/126] Compiling NIOExtras QuiescingHelper.swift
[103/126] Compiling NIOExtras LineBasedFrameDecoder.swift
[104/126] Compiling NIOExtras PCAPRingBuffer.swift
[105/126] Compiling NIOExtras LengthFieldBasedFrameDecoder.swift
[106/126] Compiling NIOExtras LengthFieldPrepender.swift
[107/126] Compiling NIOExtras JSONRPCFraming+ContentLengthHeader.swift
[108/126] Compiling NIOExtras WritePCAPHandler.swift
[110/127] Merging module NIOExtras
[123/128] Merging module NIOFoundationCompat
[125/129] Merging module SWIM
[126/141] Compiling SWIMNIOExample Coding.swift
[127/142] Compiling SWIMNIOExample Message.swift
[128/142] Compiling SWIMNIOExample Logging.swift
[129/142] Compiling SWIMNIOExample Settings.swift
[130/142] Compiling SWIMNIOExample Metrics+Extensions.swift
[131/142] Compiling SWIMNIOExample String+Extensions.swift
[132/142] Merging module it_Clustered_swim_suspension_reachability
[133/142] Compiling SWIMNIOExample SWIMNIOShell.swift
/private/tmp/.nio-release-tools_YZyUPi/swift-cluster-membership/Sources/SWIMNIOExample/SWIMNIOShell.swift:398:45: error: ambiguous use of 'recordInterval'
self.swim.metrics.shell.pingRequestResponseTimeAll.recordInterval(since: pingRequestSentAt)
^
/private/tmp/.nio-release-tools_YZyUPi/swift-cluster-membership/Sources/SWIMNIOExample/Metrics+Extensions.swift:21:10: note: found this candidate
func recordInterval(since: DispatchTime, now: DispatchTime = .now()) {
^
/private/tmp/.nio-release-tools_YZyUPi/swift-cluster-membership/.build/checkouts/swift-metrics/Sources/Metrics/Metrics.swift:42:10: note: found this candidate
func recordInterval(since: DispatchTime, end: DispatchTime = .now()) {
^
/private/tmp/.nio-release-tools_YZyUPi/swift-cluster-membership/Sources/SWIMNIOExample/SWIMNIOShell.swift:426:41: error: ambiguous use of 'recordInterval'
self.swim.metrics.shell.pingRequestResponseTimeFirst.recordInterval(since: startedSendingPingRequestsSentAt)
^
/private/tmp/.nio-release-tools_YZyUPi/swift-cluster-membership/Sources/SWIMNIOExample/Metrics+Extensions.swift:21:10: note: found this candidate
func recordInterval(since: DispatchTime, now: DispatchTime = .now()) {
^
/private/tmp/.nio-release-tools_YZyUPi/swift-cluster-membership/.build/checkouts/swift-metrics/Sources/Metrics/Metrics.swift:42:10: note: found this candidate
func recordInterval(since: DispatchTime, end: DispatchTime = .now()) {
^
[134/142] Compiling SWIMNIOExample NIOPeer.swift
[135/142] Linking it_Clustered_swim_suspension_reachability
[136/142] Compiling SWIMNIOExample time.swift
[138/142] Compiling SWIMNIOExample SWIMNIOHandler.swift

Cleanup: combine pingRequestOrigin + pingRequestSequenceNumber into a single PingRequestOrigin param?

We very often have to pass around pingRequestOrigin: SWIMPingRequestOriginPeer? and pingRequestSequenceNumber: SWIM.SequenceNumber?; really, they are only ever passed together, and then both should be set.

This should be changed into

public struct SWIMPingRequestOrigin {
    let peer: SWIMPingRequestOriginPeer
    let pingRequestSequenceNumber: SWIM.SequenceNumber
}

and we should change all APIs to use:

-    public func onPingResponse(response: SWIM.PingResponse, pingRequestOrigin: SWIMPingRequestOriginPeer?, pingRequestSequenceNumber: SWIM.SequenceNumber?) -> [PingResponseDirective] {
+    public func onPingResponse(response: SWIM.PingResponse, pingRequest: SWIMPingRequestOrigin?) -> [PingResponseDirective] {

Is nack handling wrong in the SWIMNIO impl?

We must be able to accept a "reply" nack that is later followed by another reply; i.e. a nack should not remove the callback from the handler which keeps them.

Something like:

if message.isNack, let callback = self.pendingReplyCallbacks[callbackKey] {
    callback(.success(message))
} else if let callback = self.pendingReplyCallbacks.removeValue(forKey: callbackKey) {
    // TODO: UIDs of nodes matter
    callback(.success(message))
}

should do as a simple solution; we'll still get a timeout or an ack which will definitely remove the callback later, so we can keep them around like this.

Compilation fails

swift test --enable-test-discovery

in docker gives me

/private/tmp/.nio-release-tools_3bMiDW/swift-cluster-membership/.build/x86_64-apple-macosx/debug/SWIM.build/module.modulemap:2:12: error: header '/private/tmp/.nio-release-tools_3bMiDW/swift-cluster-membership/.build/x86_64-apple-macosx/debug/SWIM.build/SWIM-Swift.h' not found
    header "/private/tmp/.nio-release-tools_3bMiDW/swift-cluster-membership/.build/x86_64-apple-macosx/debug/SWIM.build/SWIM-Swift.h"
           ^
/private/tmp/.nio-release-tools_3bMiDW/swift-cluster-membership/Tests/SWIMTestKit/TestMetrics.swift:32:18: error: could not build Objective-C module 'SWIM'
@testable import SWIM
                 ^
/private/tmp/.nio-release-tools_3bMiDW/swift-cluster-membership/.build/x86_64-apple-macosx/debug/SWIM.build/module.modulemap:2:12: error: header '/private/tmp/.nio-release-tools_3bMiDW/swift-cluster-membership/.build/x86_64-apple-macosx/debug/SWIM.build/SWIM-Swift.h' not found
    header "/private/tmp/.nio-release-tools_3bMiDW/swift-cluster-membership/.build/x86_64-apple-macosx/debug/SWIM.build/SWIM-Swift.h"
           ^
/private/tmp/.nio-release-tools_3bMiDW/swift-cluster-membership/Tests/SWIMTestKit/TestMetrics.swift:32:18: error: could not build Objective-C module 'SWIM'
@testable import SWIM
                 ^

Reduce the namespacing nesting in SWIM?

We're nesting a bit too much, and types end up looking like:

SWIM.SWIM.Member(..., status: SWIM.SWIM.Status.unreachable(incarnation: 0), protocolPeriod: 1, suspicionStartedAt: 

We can skip the SWIM namespace enum, I believe.

SWIM: once a node is dead, set a TTL for the tombstone and remove it later

We must schedule a periodic cleanup of the dead members, otherwise the list would grow indefinitely.

Dead members we do remove today; it's just unreachables that stay around.

We may want to allow "keeping dead nodes as tombstones".

It depends on the runtime whether it needs this feature or not; SWIMNIO, for example, would need it to be a serious impl.
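
A sketch of what such tombstones might look like (illustrative names only):

import Dispatch
import ClusterMembership

/// Hypothetical tombstone kept after a member is confirmed `.dead`, so that
/// stale gossip about the node can be ignored until the TTL expires.
struct MemberTombstone: Hashable {
    let node: Node            // includes the UID, so a restarted node is distinct
    let deadlineNanos: UInt64 // DispatchTime uptime at which the tombstone expires
}

/// Periodic sweep bounding memory use: drop tombstones whose TTL has passed.
func sweepTombstones(_ tombstones: inout Set<MemberTombstone>, now: DispatchTime = .now()) {
    tombstones = tombstones.filter { $0.deadlineNanos > now.uptimeNanoseconds }
}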

main doesn't compile on Xcode beta 5

Expected behavior

Latest main should compile and run tests fine with the latest Xcode 14 seed.

Actual behavior

Building for debugging...
/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/module.modulemap:2:12: error: header '/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/SWIM-Swift.h' not found
    header "/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/SWIM-Swift.h"
           ^
/Users/davidnadoba/Repositories/swift-cluster-membership/Tests/SWIMTestKit/TestMetrics.swift:32:18: error: could not build Objective-C module 'SWIM'
@testable import SWIM
                 ^
/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/module.modulemap:2:12: error: header '/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/SWIM-Swift.h' not found
    header "/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/SWIM-Swift.h"
           ^
/Users/davidnadoba/Repositories/swift-cluster-membership/Tests/SWIMTestKit/TestMetrics.swift:32:18: error: could not build Objective-C module 'SWIM'
@testable import SWIM
                 ^
/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/module.modulemap:2:12: error: header '/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/SWIM-Swift.h' not found
    header "/Users/davidnadoba/Repositories/swift-cluster-membership/.build/arm64-apple-macosx/debug/SWIM.build/SWIM-Swift.h"
           ^
/Users/davidnadoba/Repositories/swift-cluster-membership/Tests/SWIMTestKit/TestMetrics.swift:32:18: error: could not build Objective-C module 'SWIM'
@testable import SWIM
                 ^

error: fatalError

Steps to reproduce

git clone git@github.com:apple/swift-cluster-membership.git
cd swift-cluster-membership
swift test

Version/commit hash

latest main
68b2da6afe4491fca3be3f1265a1ee2697f33ff3

Swift & OS version (output of swift --version && uname -a)

Xcode: Version 14.0 beta 5 (14A5294e)

swift-driver version: 1.62.3 Apple Swift version 5.7 (swiftlang-5.7.0.123.8 clang-1400.0.29.50)
Target: arm64-apple-macosx13.0
Darwin Davids-MBP.fritz.box 22.0.0 Darwin Kernel Version 22.0.0: Wed Jul 20 01:53:07 PDT 2022; root:xnu-8792.0.188.141.3~2/DEVELOPMENT_ARM64_T6000 arm64

More efficient coding for SWIMNIO payloads

In the SWIMNIO example implementation we use Codable and its JSON implementation. This is sub-optimal for gossip systems, which constantly exchange messages in the background.

Reusing the Codable types but implementing an efficient binary serialization format would be preferable.
It could well be a hand-woven implementation, since the payloads and messages are very simple and it's just an example app.

This matters if we'd like to compare bytes-per-message and bytes-per-second (or per minute) of operation with other implementations, which is an important measure in general for gossip systems.
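
A hand-rolled format could be as simple as a one-byte message tag followed by fixed-width fields. The sketch below is illustrative only, not the module's actual wire format:

import NIO

// Hypothetical frame layout: [tag: UInt8][sequenceNumber: UInt32, big endian][payload bytes]
func encodePing(sequenceNumber: UInt32, payload: ByteBuffer, into buffer: inout ByteBuffer) {
    buffer.writeInteger(UInt8(0x01))                      // 0x01 = .ping (hypothetical tag)
    buffer.writeInteger(sequenceNumber, endianness: .big) // fixed-width, no JSON overhead
    var payload = payload
    buffer.writeBuffer(&payload)                          // gossip payload bytes
}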

Handle node "replacements"

Expected behavior

When a node dies and immediately restarts, we may need to perform a replacement, i.e. treat the new node as alive and kick out the old one as dead.

Actual behavior

Both can linger around

Document all directives and callbacks well enough

By well enough we mean "people should be able to know how to implement this and what it does" without having to read the paper in depth.

The SWIM instance guides people to do the right thing, and it should explain when to call it and what it does.

De-prioritize pinging unreachable nodes (when using that mode)

We have "unreachable" status in addition to dead (if enabled).

Unreachable is used to keep trying even though it should be dead and waiting for another system confirm that yes we should confirm it dead.

That keep trying makes it the same as suspect today; while in reality it should be "less of a priority".
We could reflect this somehow in how we ping unreachables.

SWIM: Remove ack incarnation, use gossip about self instead?

Proposal by @avolokhov


Remove incarnation from ack payload and put information about pinged instance into response payload.
This will deduplicate logic of membership dissemination code and condense it in gossip payload processing.

Motivation:

The current SWIM protocol has two ways to notify about group membership: membership about everyone except the pinged instance is disseminated in the gossip payload, and membership about self is stored in a dedicated incarnation field. This forces us to have essentially the same membership-processing code in two places. This change removes the duplication and makes the gossip payload the only way to pass group membership information.

Modifications:

  • The SWIM Ack ping response no longer contains an incarnation field.
  • The SWIM gossip payload will always carry information about the pinged instance.
  • SWIMShell will only modify group membership information based on the gossip payload.

We need to consider the deeper impact of such a change.

Add anti-entropy mechanism ("state sync")

Because in the raw SWIM protocol we have here, nodes may not be able to discover each other in a very flaky network (this is especially problematic in the UDP implementation).

The gossips are count-limited when they are spread, and as such, when cluster partitions happen, the split may not be recovered from (in general raw SWIM is not great at dealing with this, and we don't necessarily want to solve it entirely at this level, but we should do better than just ignoring the topic in the UDP impl). A periodically triggered "sync all members" with some random nodes is used to avoid these problems; alternatively, a higher-level membership protocol sitting on top of SWIM can achieve the same results.

This is more of a shell detail than an instance thing though, perhaps?
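
A sketch of what a shell-side anti-entropy tick could look like (all names hypothetical):

// Run far less often than the regular protocol period, e.g. every 30 seconds:
func onAntiEntropyTick() {
    // Pick one random peer and push the FULL membership to it, bypassing the
    // count-limited gossip, so that partitions and flaky links eventually heal.
    guard let peer = self.knownPeers.randomElement() else { return }
    self.send(.fullStateSync(members: self.allMembers), to: peer)
}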

SWIM: Tune default probing timeouts

Best to put it onto some Amazon instances, poke around a bit, and select a realistic / good "default"; something like a few seconds is likely fine.

Implement the `useUnreachableState` flag

We can operate in two modes: one with unreachability, and one with only the classic failure-detection .dead mode.

The unreachable state pattern is not useful for most systems and is to be disabled by default.

The implementation today uses the unreachable state, emits events about unreachable and awaits that someone calls confirm dead. We should only do this if useUnreachableState is true.

        /// Optional SWIM Protocol Extension: `SWIM.MemberStatus.unreachable`
        ///
        /// This is a custom extension to the standard SWIM statuses which first moves a member into unreachable state,
        /// while still trying to ping it, while awaiting for a final "mark it `.dead` now" from an external system.
        ///
        /// This allows for collaboration between external and internal monitoring systems before committing a node as `.dead`.
        /// The `.unreachable` state IS gossiped throughout the cluster same as alive/suspect are, while a `.dead` member is not gossiped anymore,
        /// as it is effectively removed from the membership. This allows for additional spreading of the unreachable observation throughout
        /// the cluster, as an observation, but not as an action (of removing given member).
        ///
        /// The `.unreachable` state therefore from a protocol perspective, is equivalent to a `.suspect` member status.
        ///
        /// Unless you _know_ you need unreachability, do not enable this mode, as it requires additional actions to be taken,
        /// to confirm a node as dead, complicating the failure detection and node pruning.
        ///
        /// By default this option is disabled, and the SWIM implementation behaves same as documented in the papers,
        /// meaning that when a node remains unresponsive for an exceeded amount of time it is marked as `.dead` immediately.
        public var useUnreachableState: Bool = false
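
For reference, the embedded tests earlier on this page already opt into the extension via settings, e.g.:

var settings = SWIMNIO.Settings()
settings.swim.unreachability = .enabled // opt in to the optional .unreachable extension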

Integrate swift-tracing into SWIM

It would be quite nice to optionally be able to pass around baggage context in such a way that tracing will just work:

  • The Instance should return baggage context whenever it yields a send... directive
  • the Peer protocols should gain a context parameter (BaggageContext)
  • implementations should pass the context always
    • if a tracer is enabled, this will contain the trace ids so tracing "just works"

We'd get small, fun traces of the pings and pingReqs (the latter is more interesting since it performs a "hop" through a node, so that's nice to view).

We should also visualize a pingRequest as containing all of the pings this causes.

Tracing: https://github.com/slashmo/gsoc-swift-tracing

Cleanup the PingRequestResponseDirective

It is not easy to act on, and implementations end up using default: to ignore most cases:

    public enum PingRequestResponseDirective {
        case gossipProcessed(GossipProcessedDirective)

        case alive(previousStatus: SWIM.Status) // TODO: offer a membership change option rather?
        case nackReceived
        /// Indicates that the `target` of the ping response is not known to this peer anymore,
        /// it could be that we already marked it as dead and removed it.
        ///
        /// No additional action, except optionally some debug logging should be performed.
        case unknownMember
        case newlySuspect(previousStatus: SWIM.Status, suspect: SWIM.Member)
        case alreadySuspect
        case alreadyUnreachable
        case alreadyDead
        /// The incoming gossip is older than already known information about the target peer (by incarnation), and was (safely) ignored.
        /// The current status of the peer is as returned in `currentStatus`.
        case ignoredDueToOlderStatus(currentStatus: SWIM.Status)
    }

It should rather have gossipProcessed and emit a change (if a change is to be emitted) with additional, purely informational detail about what the status now is.
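
That is, something closer to this strawman shape (not existing API):

public enum PingRequestResponseDirective {
    case gossipProcessed(GossipProcessedDirective)
    /// Emitted only when the membership actually changed; the resulting status
    /// is carried purely as information, so most shells need not switch on it.
    case memberStatusChanged(member: SWIM.Member, previousStatus: SWIM.Status?)
}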

Fix contributions stats

Equalize the contributions -- the first commit here should have (and previously did) attributed all 3 main authors, but it seems GitHub doesn't count it 🤔
