Giter Club home page Giter Club logo

Comments (25)

misterion avatar misterion commented on August 18, 2024

@umpirsky thanks for good words. I think the problem is forking in php. I mean the forking process in php has some limitations and restrictions. One of them - resource sharing between forked process but in your code problem is $channel->close(); in first forked process. Then you open connection with $connection->channel(); you create socket in master process. Then you fork process you share this socket and its state between 3 separete process - master, child 1 and child 2. Then you send you message in first forked process you close channel but actualy change state of socket in all processes. So the child 2 now in undefined behaviour state. The right strategy in this case - has one connection per consumer process opened in it context:

$processManager = new ProcessManager();

$processManager->fork(function (Process $process) use ($channel) {
    echo 'Publishing from '.$process->getPid().PHP_EOL;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('q1');
    $channel->basic_publish(new AMQPMessage('Hello from '.$process->getPid()), null, 'q1');
    $channel->close();
});

$processManager->fork(function (Process $process) use ($channel) {
    echo 'Consuming from '.$process->getPid().PHP_EOL;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('q1');
    $channel->basic_consume('q1', '', false, false, false, false, function ($message) {
        echo 'Consuming message: '.$message->body.PHP_EOL;
    });

    while (count($channel->callbacks)) {
        $channel->wait();
    }
});

$processManager->wait();

More one thing about this - sending message with AMQP is very lightweight operation. The most costly part of it - open connection to AMQP server. So it looks like overhead to fork process just for message pushing. Also it may be interestion for you look at https://github.com/misterion/ko-worker project (based on ko-process). Ko-worker project created to simplify work with workers based on AMQP.

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Thanks, that worked like a charm! 👍

That means that connection must always be created inside forked process, which kinda makes it imposible to inject connection or channel like I was planning to.

ko-worker looks interesting too, thanks for sharig.

Keep doing great work! 👍

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

For the reference, see https://github.com/alanxz/rabbitmq-c#threading.

from ko-process.

misterion avatar misterion commented on August 18, 2024

Yes, if using fork in php you should open resources in fork context. In your case you can inject parameters to create connection in forked process. Or use some kind of ioc container in which connection is not actualy esteblished.

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Exactly my idea.

I am playing with Ko\AmqpBroker now. Because he can do exactly that, provide fresh producer/consumer based on config.

I am trying again to run producer and consumer in separate child processes:

$broker = new Ko\AmqpBroker($config);
$processManager = new ProcessManager();

$processManager->fork(function(Process $process) use ($broker) {
    $producer = $broker->getProducer('extractor');
    $message = 'Hello world ' . time();
    echo 'Sending message `' . $message . '`' . PHP_EOL ;
    $producer->publish($message);

});

$processManager->fork(function(Process $process) use ($broker) {
    $consumer = $broker->getConsumer('extractor');
    $message = 'Hello world ' . time();
    echo 'Receiving message `' . $message . '`' . PHP_EOL ;
    $consumer->consume(function (AMQPEnvelope $envelope) {
    echo 'Receive `' . $envelope->getBody() . PHP_EOL;

    return true;
    }, AMQP_AUTOACK);
});

$processManager->wait();

I get:

Sending message `Hello world 1454345755`
Receiving message `Hello world 1454345755`

Looks like it is never consumed. The strange thing is that the queue is empty.

Am I missing something again?

Full example on:
https://github.com/umpirsky/Extraload/blob/feature/queue/examples/test.php

from ko-process.

misterion avatar misterion commented on August 18, 2024

Could you post your config?

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Sure, see https://github.com/umpirsky/Extraload/blob/feature/queue/examples/config.yml.

Thanks.

from ko-process.

misterion avatar misterion commented on August 18, 2024

Please change your durability to true (https://www.rabbitmq.com/tutorials/tutorial-two-php.html) or start the consumer first. In RabbitMQ is queue is not durable all messages would be lost if here is no any consumers on it.

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Thanks. That is true in deed, nice catch.

But, after changing both umpirsky/Extraload@b9fecca still same result. Must be something else as well.

Looks like message never reach the queue, I see consumer is active, but looks like there are no messages to consume.

I narrow down my script to:

$broker->getProducer('extractor')->publish('Hello');

But looks like this message never gets to queue.

selection_088

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Any opinion on this?

from ko-process.

misterion avatar misterion commented on August 18, 2024

Sorry @umpirsky, i looze your previous message so just see it. Did you try with durable queue?

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Yes, changed in umpirsky/Extraload@b9fecca#diff-8061b7ac4547367eeedccf63f38d5f2cR16, same result. Thanks.

from ko-process.

misterion avatar misterion commented on August 18, 2024

In your sample all looks fine. Did you try to execute them without forking? P.s. i can try to play with it myself in monday - now have no avcess to pc with linux on board.

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion I did before, it didn't work.

Now I tried it again, after:

$broker->getProducer('extractor')->publish('Hello');

No messages in queue, which is strange.

And consume ends up with Segmentation fault (core dumped).

If you can try it next week that would be cool.

I know I can always achieve this with ko-process and videlalvaro/php-amqplib, but I wanted to give ko-worker a try, since it does all this for me.

Thanks.

from ko-process.

misterion avatar misterion commented on August 18, 2024

I look at your code - the problem is consumer routing-keys defined as *. You mean this as any message from extractor_exchange but actualy (you are using direct type of excange) this mean only full match routing keys are supported with no masks (as in topic). Look at https://www.rabbitmq.com/getstarted.html for more details. To fast fix your sample just change routing-keys: '*' to routing-keys: '' in config.yaml. Or change exchange_options type to topic and send non empty routing key with publish method.

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

Thanks @misterion! 👍

umpirsky/Extraload@441b5a0 fixed it in deed.

I also tried to change exchange_options type to topic and send non empty routing key with publish method, but I got AMQPExchangeException: 'Server channel error: 406, message: PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'extractor_exchange' in vhost '/': received 'topic' but current is 'direct''.

Probably something else need to be changed as well.

However, thank you very much for you help. Really appreciate it.

from ko-process.

misterion avatar misterion commented on August 18, 2024

In RabbitMQ you can`t redeclare exchange type without recreating it. So if you change type from 'direct' to 'topic' you should delete exchange first.

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Oh, I see. Thanks.

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion I just came across very strange use case, which makes ko-worker/ko-process combo do not work for me.

My test script https://github.com/umpirsky/Extraload/blob/feature/queue/examples/test.php works and output is:

Receiving message `Hello world 1455114747`
Sending message `Hello world 1455114747`
Receive `Hello world 1455114747

So far so good.

But adding any use statements, like:

diff --git a/examples/test.php b/examples/test.php
index b2a9be5..42eceb5 100644
--- a/examples/test.php
+++ b/examples/test.php
@@ -3,6 +3,7 @@

 require __DIR__.'/../vendor/autoload.php';

+use Symfony\Component\Yaml\Yaml;
 use Ko\ProcessManager;
 use Ko\Process;

makes it stop consuming messages and the output is:

Receiving message `Hello world 1455114886`
Sending message `Hello world 1455114886`

I spent a lot of time narrowing it down to this, but why it happens is a total mistery for me.

If you have any clue, please hint me. It's very strange.

Thanks for your assistance so far.

from ko-process.

misterion avatar misterion commented on August 18, 2024

Looks very strange. I`l try to run it myself tomorrow and write the results. The last used code in your feature/queue branch? Could you also provide you php and AMQP extension versions?

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Yes, the linked one is good, by applying diff from my previous comment it stops working.

$ php -v
PHP 5.6.11-1ubuntu3.1 (cli) 
Copyright (c) 1997-2015 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2015 Zend Technologies
    with Zend OPcache v7.0.6-dev, Copyright (c) 1999-2015, by Zend Technologies
    with Xdebug v2.3.3, Copyright (c) 2002-2015, by Derick Rethans

AMQP version: 1.6.1

Thanks!

from ko-process.

misterion avatar misterion commented on August 18, 2024

Hi @umpirsky. Just test your code on php 5.6.10 (my dev stand), 5.6.11 (local dev stand at my work) both with AMQP 1.6.1 and 1.7.0. Have same output (with diff time) for all 4 cases:

Receiving message `Hello world 1455201401`
Sending message `Hello world 1455201401`
Receive `Hello world 1455201401

Here is full source i test with:

#!/usr/bin/env php
<?php

require __DIR__.'/../vendor/autoload.php';

use Symfony\Component\Yaml\Yaml;
use Ko\ProcessManager;
use Ko\Process;

$config = Yaml::parse(file_get_contents(__DIR__.'/config.yml'));
$broker = new Ko\AmqpBroker($config);
$processManager = new ProcessManager();

$processManager->fork(function(Process $process) use ($broker) {
    $consumer = $broker->getConsumer('extractor');
    $message = 'Hello world ' . time();
    echo 'Receiving message `' . $message . '`' . PHP_EOL ;

    $consumer->consume(function (AMQPEnvelope $envelope) {
        echo 'Receive `' . $envelope->getBody() . PHP_EOL;
        return true;
    }, AMQP_AUTOACK);
});
$processManager->fork(function(Process $process) use ($broker) {
    $producer = $broker->getProducer('extractor');
    $message = 'Hello world ' . time();
    echo 'Sending message `' . $message . '`' . PHP_EOL ;
    $producer->publish($message);
});

$processManager->wait();

And my config file:

connections:
  default:
      host: 'localhost'
      port: 5672
      login: 'guest'
      password: 'guest'
producers:
  extractor:
    connection: default
    exchange_options: {name: 'extractor_exchange', type: direct, durable: 1, passive: 0}
consumers:
  extractor:
    connection: default
    queue_options:
      name: 'extractor_queue'
      durable: 1
      autodelete: 0
      binding: {name: 'extractor_exchange', routing-keys: ''}

I also asked my devops friend about this. He suggest to try to reboot RabbitMQ and your dev stand to "purity of the experiment". Could you try to reboot you dev server?

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Thanks for trying it out.

And seems like your devops friend was right (say hello from me).

So, I just run it again, and if I wait for some time (which I usually don't do because I am not patient), I got:

ubuntu_105

Then I noticed durable: 1 in your config and I did:

diff --git a/examples/config.yml b/examples/config.yml
index 33ee89b..45ed4b5 100644
--- a/examples/config.yml
+++ b/examples/config.yml
@@ -7,7 +7,7 @@ connections:
 producers:
   extractor:
     connection: default
-    exchange_options: {name: 'extractor_exchange', type: direct, durable: 0, passive: 0}
+    exchange_options: {name: 'extractor_exchange', type: direct, durable: 1, passive: 0}

Then some errors like durable is false, expected true...

Then I tried to restart RabbitMQ, got some erlang segmentation faults. Then I reboot my machine.

And guess what? It finally works.

So, I assume that there are some issues with my current setup, but if experiment is pure, it should work.

Thanks again for your help, you will get credits in https://github.com/umpirsky/Extraload readme. ;)

from ko-process.

misterion avatar misterion commented on August 18, 2024

Thanks for using my labrary ;) It`s realy nice to know that something else need it :)

from ko-process.

umpirsky avatar umpirsky commented on August 18, 2024

@misterion Oh I need it. :)

from ko-process.

Related Issues (14)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.