Giter Club home page Giter Club logo

sync's Introduction

amphp/sync

AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind. amphp/sync specifically provides synchronization primitives such as locks and semaphores for asynchronous and concurrent programming.

Latest Release MIT License

Installation

This package can be installed as a Composer dependency.

composer require amphp/sync

Usage

The weak link when managing concurrency is humans; so amphp/sync provides abstractions to hide some complexity.

Mutex

Mutual exclusion can be achieved using Amp\Sync\synchronized() and any Mutex implementation, or by manually using the Mutex instance to acquire a Lock.

As long as the resulting Lock object isn't released using Lock::release() or by being garbage collected, the holder of the lock can exclusively run some code as long as all other parties running the same code also acquire a lock before doing so.

function writeExclusively(Amp\Sync\Mutex $mutex, string $filePath, string $data) {
    $lock = $mutex->acquire();
    
    try {
        Amp\File\write($filePath, $data);
    } finally {
        $lock->release();
    }
}
function writeExclusively(Amp\Sync\Mutex $mutex, string $filePath, string $data) {
    Amp\Sync\synchronized($mutex, fn () => Amp\File\write($filePath, $data));
}

Semaphore

Semaphores are another synchronization primitive in addition to mutual exclusion.

Instead of providing exclusive access to a single party, they provide access to a limited set of N parties at the same time. This makes them great to control concurrency, e.g. limiting an HTTP client to X concurrent requests, so the HTTP server doesn't get overwhelmed.

Similar to Mutex, Lock instances can be acquired using Semaphore::acquire(). Please refer to the Mutex documentation for additional usage documentation, as they're basically equivalent except for the fact that Mutex is always a Semaphore with a count of exactly one party.

In many cases you can use amphp/pipeline instead of directly using a Semaphore.

Parcel

A Parcel is used to synchronize access to a value across multiple execution contexts, such as multiple coroutines or multiple processes. The example below demonstrates using a LocalParcel to share an integer between two coroutines.

use Amp\Future;
use Amp\Sync\LocalMutex;
use Amp\Sync\LocalParcel;
use function Amp\async;
use function Amp\delay;

$parcel = new LocalParcel(new LocalMutex(), 42);

$future1 = async(function () use ($parcel): void {
    echo "Coroutine 1 started\n";

    $value = $parcel->synchronized(function (int $value): int {
        delay(1); // Delay for 1s to simulate I/O.
        return $value * 2;
    });

    echo "Value after access in coroutine 1: ", $value, "\n";
});

$future2 = async(function () use ($parcel): void {
    echo "Coroutine 2 started\n";

    $value = $parcel->synchronized(function (int $value): int {
        delay(1); // Delay again in this coroutine.
        return $value + 8;
    });

    echo "Value after access in coroutine 2: ", $value, "\n";
});

Future\await([$future1, $future2]);

Channels

Channels are used to send data between execution contexts, such as multiple coroutines or multiple processes. The example below shares two Channel between two coroutines. These channels are connected. Data sent on a channel is received on the paired channel and vice-versa.

use Amp\Future;
use function Amp\async;
use function Amp\delay;

[$left, $right] = createChannelPair();

$future1 = async(function () use ($left): void {
    echo "Coroutine 1 started\n";
    delay(1); // Delay to simulate I/O.
    $left->send(42);
    $value = $left->receive();
    echo "Received ", $value, " in coroutine 1\n";
});

$future2 = async(function () use ($right): void {
    echo "Coroutine 2 started\n";
    $value = $right->receive();
    echo "Received ", $value, " in coroutine 2\n";
    delay(1); // Delay to simulate I/O.
    $right->send($value * 2);
});

Future\await([$future1, $future2]);

Sharing data between processes

To share data between processes in PHP, the data must be serializable and use external storage or an IPC (inter-process communication) channel.

Parcels in external storage

SharedMemoryParcel uses shared memory conjunction with PosixSemaphore wrapped in SemaphoreMutex (though another cross-context mutex implementation may be used, such as RedisMutex in amphp/redis).

Note ext-shmop and ext-sysvmsg are required for SharedMemoryParcel and PosixSemaphore respectively.

amphp/redis provides RedisParcel for storing shared data in Redis.

Channels over pipes

Channels between processes can be created by layering serialization (Native PHP serialization, JSON serialization, etc.) on a pipe between those processes.

StreamChannel in amphp/byte-stream creates a channel from any ReadableStream and WritableStream. This allows a channel to be created from a variety of stream sources, such as sockets or process pipes.

ProcessContext and task Execution objects in amphp/parallel provide a Channel to send data between processes and tasks.

Concurrency Approaches

Given you have a list of URLs you want to crawl, let's discuss a few possible approaches. For simplicity, we will assume a fetch function already exists, which takes a URL and returns the HTTP status code (which is everything we want to know for these examples).

Approach 1: Sequential

Simple loop using non-blocking I/O, but no concurrency while fetching the individual URLs; starts the second request as soon as the first completed.

$urls = [...];

$results = [];

foreach ($urls as $url) {
    $results[$url] = fetch($url);
}

var_dump($results);

Approach 2: Everything Concurrently

Almost the same loop, but awaiting all operations at once; starts all requests immediately. Might not be feasible with too many URLs.

$urls = [...];

$results = [];

foreach ($urls as $url) {
    $results[$url] = Amp\async(fetch(...), $url);
}

$results = Amp\Future\await($results);

var_dump($results);

Approach 3: Concurrent Chunks

Splitting the jobs into chunks of ten; all requests within a chunk are made concurrently, but each chunk sequentially, so the timing for each chunk depends on the slowest response; starts the eleventh request as soon as the first ten requests completed.

$urls = [...];

$results = [];

foreach (\array_chunk($urls, 10) as $chunk) {
    $futures = [];

    foreach ($chunk as $url) {
        $futures[$url] = Amp\async(fetch(...), $url);
    }

    $results = \array_merge($results, Amp\Future\await($futures));
}

var_dump($results);

Approach 4: ConcurrentIterator

The amphp/pipeline library provides concurrent iterators which can be used to process and consume data concurrently in multiple fibers.

use Amp\Pipeline\Pipeline;
use function Amp\delay;

$urls = [...];

$results = Pipeline::fromIterable($urls)
    ->concurrent(10) // Process up to 10 URLs concurrently
    ->unordered() // Results may arrive out of order
    ->map(fetch(...)) // Map each URL to fetch(...)
    ->toArray();

var_dump($results);

See the documentation in amphp/pipeline for more information on using Pipelines for concurrency.

Versioning

amphp/sync follows the semver semantic versioning specification like all other amphp packages.

Security

If you discover any security related issues, please email [email protected] instead of using the issue tracker.

License

The MIT License (MIT). Please see LICENSE for more information.

sync's People

Contributors

b1rdex avatar danog avatar enumag avatar iggyvolz avatar kelunik avatar peter279k avatar prolic avatar rybakit avatar thgs avatar trowski avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

sync's Issues

Issue with serializing callables for parallel processing.

I have a bunch of background tasks in a Symfony application and I'm trying to process it parallel using this library. My background tasks are Symfony services with an execute function.

The following code contains a sample implementation of my task engine.

        $noOfTasks = 100;
        $engine = $this;
        wait(each(
            Iterator\fromIterable($this->load_tasks_from_db($noOfTasks)),
            new LocalSemaphore(10),
            static function ($task) use($engine) {
                $service = $engine->container->get($task->getProcessor());
                yield Worker\enqueueCallable([$service, 'execute'], $task);
            }
        ));

Here when the $service contains a lot of dependencies Amp library throwing the following error

app.CRITICAL: The given data could not be serialized: Serialization of 'SimpleXMLElement' is not allowed

It's is coming from NativeSerializer, even if I try the following code also I'm getting the above error.

$service = $this->container->get($task->getProcessor());
try {
     \serialize($service);
} catch (\Throwable $exception) {
    dump(sprintf('The given data could not be serialized: %s', $exception->getMessage()));
}

This is not a problem with Amp library but my question is how we can execute callables parallelly if it contains a lot of dependencies?

Missing example

Can you give me an example how to actually use this library? The documentation only contains the interfaces but nothing else. I assume that I don't actually need to implement them since this package already contains implementations. So how do I use them?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.