phoenixframework / phoenix_pubsub
Distributed PubSub and Presence platform for the Phoenix Framework
Home Page: http://www.phoenixframework.org/
License: MIT License
When Phoenix.Tracker nodes connect for the first time, and both nodes already hold presences, their state will not merge correctly.
Pull https://github.com/szlend/tracker_test
iex --sname n1 --cookie test -S mix
for n <- 1..51, do: Phoenix.Tracker.track(TrackerTest.Tracker, self(), "test", "#{inspect Node.self()}_#{n}", %{})
iex --sname n2 --cookie test -S mix
for n <- 1..51, do: Phoenix.Tracker.track(TrackerTest.Tracker, self(), "test", "#{inspect Node.self()}_#{n}", %{})
Node.connect :"n1@your_hostname"
Wait a while for Phoenix.Tracker to do its thing, then:
Phoenix.Tracker.list(TrackerTest.Tracker, "test") |> Enum.count
# Node 1
> 51
# Node 2
> 102
I can only reproduce this issue when the number of tracked presences is above 50 on each node. Looking at the debug logs, I can see one difference:
When N > 50, I get:
sending delta generation 1
When N <= 50, I get:
falling back to sending entire crdt
With regards to phoenixframework/phoenix#3027
I have tracked it down to shard.ex
@spec list(pid, topic) :: [presence]
def list(server_pid, topic) do
server_pid
|> GenServer.call({:list, topic})
|> State.get_by_topic(topic)
end
The spec needs to be updated to @spec list(pid | atom, topic) :: [presence]
In addition, there are several other Dialyzer issues in the shard.ex file that I have fixes for.
I notice that the dialyzer package is no longer supported, could I also switch to the recommended https://github.com/Comcast/dialyzex or https://github.com/jeremyjh/dialyxir?
Hey @chrismccord !
Some folks are running into the following issue. Given a supervision tree that looks like:
children = [
supervisor(App.Endpoint, []),
supervisor(Absinthe.Subscription, [App.Endpoint]),
]
Absinthe.Subscription starts a set of proxy processes which, as part of their boot process, do:
def init({pubsub, shard}) do
:ok = pubsub.subscribe(topic(shard))
{:ok, %__MODULE__{pubsub: pubsub}}
end
pubsub in this case is the App.Endpoint, per the argument to the Absinthe.Subscription child spec.
This produces the following error:
=INFO REPORT==== 13-Jul-2017::15:12:30 ===
application: logger
exited: stopped
type: temporary
** (Mix) Could not start application frontend: Frontend.Application.start(:normal, []) returned an error: shutdown: failed to start child: Absinthe.Subscription
** (EXIT) shutdown: failed to start child: Absinthe.Subscription.ProxySupervisor
** (EXIT) shutdown: failed to start child: 1
** (EXIT) an exception was raised:
** (ArgumentError) argument error
(stdlib) :ets.lookup(nil, :subscribe)
(phoenix_pubsub) lib/phoenix/pubsub.ex:288: Phoenix.PubSub.call/3
(absinthe) lib/absinthe/subscription/proxy.ex:19: Absinthe.Subscription.Proxy.init/1
(stdlib) gen_server.erl:365: :gen_server.init_it/2
(stdlib) gen_server.erl:333: :gen_server.init_it/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
I haven't run into this myself so it's possibly config related, but I'm having difficulty tracking down what might be wrong.
I am using pubsub version 1.1.2 without Phoenix, and I have some tests for the tracker behaviour. Now and then the tests fail with the following message:
** (UndefinedFunctionError) function Phoenix.PubSub.broadcast_from!/4 is undefined or private
(phoenix_pubsub) Phoenix.PubSub.broadcast_from!(:results, #PID<0.340.0>, "phx_presence:Elixir.Styx.PubSub.Tracker_shard3", {:pub, :heartbeat, {:nonode@nohost, 1564048528061830}, %Phoenix.Tracker.State{clouds: %{{:nonode@nohost, 1564048528061830} => #MapSet<[{{:nonode@nohost, 1564048528061830}, 1}, {{:nonode@nohost, 1564048528061830}, 2}]>}, context: %{}, delta: :unset, mode: :delta, pids: nil, range: {%{{:nonode@nohost, 1564048528061830} => 0}, %{{:nonode@nohost, 1564048528061830} => 2}}, replica: {:nonode@nohost, 1564048528061830}, replicas: %{}, values: %{}}, {{:nonode@nohost, 1564048528061830}, %{{:nonode@nohost, 1564048528061830} => 2}}})
(phoenix_pubsub) lib/phoenix/tracker/shard.ex:453: Phoenix.Tracker.Shard.broadcast_delta_heartbeat/1
(phoenix_pubsub) lib/phoenix/tracker/shard.ex:169: Phoenix.Tracker.Shard.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :heartbeat
In which cases can this happen?
If you have two channels, call them AChannel and BChannel defined as:
defmodule AChannel do
def join("room:lobby", _, socket) do
send self(), :after_join
{:ok, socket}
end
def handle_info(:after_join, socket) do
Presence.track(socket, "1234", %{})
{:noreply, socket}
end
end
defmodule BChannel do
def join("room:lobby", _, socket) do
{:ok, socket}
end
end
And you deploy this server in a release, then stopping or restarting the release (e.g. bin/myapp restart) has subtly different behavior:
In the AChannel case, when the server is restarted, the client will receive an onClose event.
In the BChannel case, when the server is restarted, the client will receive an onError event!
This results in the client for AChannel not attempting to reconnect to the channel when the server comes back up, but the client in BChannel will reconnect.
I believe this is due to this line:
https://github.com/phoenixframework/phoenix_pubsub/blob/master/lib/phoenix/tracker.ex#L402
I think that this causes the process in AChannel to get shutdown correctly when the supervision trees are brought down, so it sends a close message, whereas people are probably expecting the behavior exhibited by BChannel.
Not sure what the fix is here - setting trap_exit on the channel process seems like overkill. Perhaps the Tracker should only set up a monitor?
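To make the link-versus-monitor distinction concrete, here is a minimal standard-library sketch. It does not use Phoenix at all: the `LinkVsMonitor` module and the "tracker" and "channel" processes are hypothetical stand-ins, and it assumes the referenced line links the tracker to the tracked process. A linked process that does not trap exits goes down with its peer, while a monitoring process only receives a `:DOWN` message.

```elixir
defmodule LinkVsMonitor do
  # Hypothetical demo module; "tracker" and "channel" are plain processes,
  # not Phoenix.Tracker or Phoenix.Channel.

  # Returns what happens to the "channel" when the "tracker" shuts down.
  def observe(mode) when mode in [:link, :monitor] do
    parent = self()

    tracker =
      spawn(fn ->
        receive do
          :stop -> exit(:shutdown)
        end
      end)

    channel =
      spawn(fn ->
        case mode do
          :link -> Process.link(tracker)
          :monitor -> Process.monitor(tracker)
        end

        send(parent, :ready)

        # Only reachable in :monitor mode; a linked process that does not
        # trap exits is terminated by the exit signal instead.
        receive do
          {:DOWN, _ref, :process, ^tracker, reason} ->
            send(parent, {:channel_saw_down, reason})
        end
      end)

    channel_ref = Process.monitor(channel)

    receive do
      :ready -> :ok
    end

    send(tracker, :stop)

    result =
      receive do
        {:channel_saw_down, reason} -> {:channel_alive, reason}
        {:DOWN, ^channel_ref, :process, ^channel, reason} -> {:channel_exited, reason}
      after
        1_000 -> :timeout
      end

    Process.demonitor(channel_ref, [:flush])
    result
  end
end
```

Calling `LinkVsMonitor.observe(:link)` shows the watcher exiting along with the watched process, whereas `LinkVsMonitor.observe(:monitor)` shows it surviving long enough to react to the `:DOWN` message.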
Hi, I've been playing with phoenix_pubsub and its tracker feature. I've built an HTTP endpoint that shows how many tracked clients the system holds, using this:
`size = length( Phoenix.Tracker.list(My.Tracker, "bucket"))`
When the system has more than 10k tracked entries in the registry, the length call takes forever to process. Is there any way to call a count function over the Phoenix.PubSub.PG2 registry in a more efficient way?
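One possible direction, sketched here under assumptions rather than taken from the library, is to keep a running count per topic instead of listing every entry on each request. The `PresenceCounter` module below is hypothetical. It assumes diffs have the shape that Phoenix.Tracker passes to the handle_diff/2 callback, a map of `topic => {joins, leaves}`, and that the counter starts before any presences are tracked.

```elixir
defmodule PresenceCounter do
  # Hypothetical helper, not part of phoenix_pubsub.
  # `counts` is a map of topic => number of tracked entries.
  # `diff` is assumed to look like %{topic => {joins, leaves}}, where joins
  # and leaves are lists of {key, meta} tuples.
  def apply_diff(counts, diff) do
    Enum.reduce(diff, counts, fn {topic, {joins, leaves}}, acc ->
      delta = length(joins) - length(leaves)
      # Clamp at zero so a leave seen without its join cannot go negative.
      Map.update(acc, topic, max(delta, 0), &max(&1 + delta, 0))
    end)
  end
end
```

A tracker module could hold such a map in its state and update it from handle_diff/2, so the HTTP endpoint reads a single integer rather than copying 10k entries.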
As explained in the deprecated Supervisor.Spec documentation, it's recommended to use the new child specifications outlined in the Supervisor module instead. I think we should update the Tracker documentation to reflect this. I would love to make a PR.
I started to study Phoenix.PubSub a bit and added {:phoenix_pubsub, "~> 2.0"} as a dependency, as the front page instructs. But it seems the latest version is 1.1.2, and 2.0 is not available on hex.pm:
** (Mix) No matching version for phoenix_pubsub ~> 2.0 (from: mix.exs) in registry
The latest version is: 1.1.2
Should the GitHub repo be used directly? How long will it take to show up on hex.pm?
Tracker.Shard uses a GenServer.call which doesn't accept any options from the caller. This call can be expensive in a large shard or one which is under heavy load.
Is it worthwhile to expose a timeout option the entire way through the function calls? My thought is to set a low timeout value when writing a listeners? function and then just assume true if the call times out.
I'm happy to add this in but wanted to run it by the maintainers first. I could see a desire to expose this through all of the Tracker public APIs.
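The "assume true on timeout" idea can be sketched with plain GenServer calls. Everything below is illustrative: `SlowServer` is a hypothetical stand-in for a busy shard, and `Listeners.listeners?/2` is not an existing Tracker function.

```elixir
defmodule SlowServer do
  use GenServer

  # Hypothetical stand-in for a loaded shard: replies after `delay` ms.
  @impl true
  def init(delay), do: {:ok, delay}

  @impl true
  def handle_call(:listeners?, _from, delay) do
    Process.sleep(delay)
    {:reply, false, delay}
  end
end

defmodule Listeners do
  # Asks the server with a short timeout and treats a timeout as "yes",
  # on the assumption that a false positive is cheaper than waiting.
  def listeners?(server, timeout \\ 100) do
    GenServer.call(server, :listeners?, timeout)
  catch
    :exit, {:timeout, _} -> true
  end
end
```

With `{:ok, slow} = GenServer.start_link(SlowServer, 200)`, a call such as `Listeners.listeners?(slow, 50)` gives up after 50 ms and returns true, while a responsive server returns its real answer.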
I have two connected nodes. One runs a Phoenix app, the other is plain ol' Elixir. The Phoenix node has pubsub started automatically, as it is a dependency. The latter manually starts PubSub.
The Phoenix node broadcasts a message:
Phoenix.PubSub.broadcast!(
Hangman.Connector,
"hangman:game_stats",
{:game_stats, "123", "cat", true, ~w{ a b c }}
)
On the plain ol' Elixir node, I get an error:
14:27:13.889 [error] GenServer Hangman.Connector terminating
** (UndefinedFunctionError) function Phoenix.Channel.Server.fastlane/3 is undefined (module Phoenix.Channel.Server is not available)
Phoenix.Channel.Server.fastlane([{#PID<0.277.0>, nil}], :none, {:game_stats, "123", "cat", true, ["a", "b", "c"]})
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:101: anonymous fn/7 in Phoenix.PubSub.Local.broadcast/6
(elixir) lib/enum.ex:2940: Enum.reduce_range_inc/4
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:100: Phoenix.PubSub.Local.broadcast/6
(phoenix_pubsub) lib/phoenix/pubsub/pg2_server.ex:52: Phoenix.PubSub.PG2Server.handle_info/2
Last message: {:forward_to_local, Phoenix.Channel.Server, :none, "hangman:game_stats", {:game_stats, "123", "cat", true, ["a", "b", "c"]}}
State: %{name: Hangman.Connector, pool_size: 4}
I'm guessing the problem is that the Phoenix side assumes the other side is a Phoenix app and passes the Phoenix.Channel.Server module as the fastlane implementation (which doesn't exist on the POE node).
Is this bad config on my part?
Dave
https://hex.pm/packages/phoenix_pubsub
Was looking for the Phoenix.PubSub.Tracker docs on hex and saw they weren't published.
But we should still add code comments to them explaining their roles and responsibility. For example, what is the VNode, State, etc, etc.
We use Phoenix.Tracker for keeping the state of currently active calls. Each call has a certain state in the metadata of the tracker. When a process updates the state in the terminate callback, the update of the metadata is only published locally. The remote node doesn't see this updated metadata but only sees the process leave, since it terminated within the broadcast_window.
I made an example project that demoes the above scenario. You can find it here: https://github.com/tverlaan/presence_multinode
We discussed possible improvements and alternative solutions during ElixirConfEU, but we didn't come to a conclusion just yet.
To satisfy @josevalim's ocd :D
as Supervisor.Spec is deprecated?
At your earliest convenience @asonge. I can even handle the namespace rename if you just want to copy the impl and test files over. I just want to make sure your contribution stays intact :)
Presence works fine
I have three nodes with these hardware
2 * Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz
64 GB Ram
256 SSD
When each server handles ~300-400 channel requests per second and ~35000 user connections (system load 15/56), I get timeout errors from the list and track functions.
Presence.list error:
** (stop) exited in: GenServer.call(App.Presence, {:list, "user:e6728e0c-3170-4863-b8b0-8e7e0b00e813"}, 5000)
** (EXIT) time out
(elixir) lib/gen_server.ex:737: GenServer.call/3
lib/phoenix/tracker.ex:199: Phoenix.Tracker.list/2
(phoenix) lib/phoenix/presence.ex:236: Phoenix.Presence.list/2
(app) web/channels/presence.ex:8: App.Presence.user_online?/1
(app) web/channels/user_channel.ex:318: anonymous fn/1 in App.UserChannel.push_notification/1
(elixir) lib/task/supervised.ex:85: Task.Supervised.do_apply/2
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<2.63574933/0 in App.UserChannel./1>
Args: []
Presence.track error:
** (stop) exited in: GenServer.call(App.Presence, {:track, #PID<0.28571.6>, "user:9cab5aab-7737-49bc-a4a2-faba1380b95a", "online", %{}}, 5000)
** (EXIT) time out
(elixir) lib/gen_server.ex:737: GenServer.call/3
(app) web/channels/user_channel.ex:67: anonymous fn/1 in App.UserChannel.handle_info/2
(elixir) lib/task/supervised.ex:85: Task.Supervised.do_apply/2
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<1.63574933/0 in App.UserChannel.handle_info/2>
Args: []
This is the track code section:
with {:ok, _} <- Presence.track(socket, "online", %{}) do
### do somethings
end
{:noreply, socket}
This is the list code section:
def user_online?(user_id) do
"user:" <> user_id
|> Presence.list
|> Map.has_key?("online")
end
And less copying
We ran into a case where the process that published to a channel is also a subscriber of that channel. Is there an easy way to pass along a list of pids that should NOT receive the message even though they are subscribers? It would be pretty easy for us to hack this into phoenix_pubsub, but I was just wondering if there was already support for such an operation before we pulled out the machete :)
Thanks,
-Chris
I noticed that it's not possible to start Phoenix.PubSub.PG2 by using
children = [
{Phoenix.PubSub.PG2, [:pubsub_name, []]}
]
...
Supervisor.start_link(children, opts)
because it results in ** (UndefinedFunctionError) function Phoenix.PubSub.PG2Server.start_link/1 is undefined or private.
Is this the intended behaviour or am I missing anything?
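One possible workaround, offered as a sketch and not as the maintainers' answer, is to write the child specification as a map so that the name and options are passed as two separate arguments. It assumes the 1.x series, where Phoenix.PubSub.PG2 is started with start_link(name, opts), as the supervisor(Phoenix.PubSub.PG2, [name, opts]) form used with Supervisor.Spec implies.

```elixir
# Assumption: phoenix_pubsub 1.x, where Phoenix.PubSub.PG2.start_link/2
# takes the pubsub name and a keyword list of options.
children = [
  %{
    id: Phoenix.PubSub.PG2,
    # Two separate arguments, unlike the {Module, arg} tuple form, which
    # hands the whole list to child_spec/1 as a single argument.
    start: {Phoenix.PubSub.PG2, :start_link, [:pubsub_name, []]},
    type: :supervisor
  }
]

# With phoenix_pubsub available as a dependency, the tree would be started with:
# Supervisor.start_link(children, strategy: :one_for_one)
```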
Thanks!
In the documentation (https://hexdocs.pm/phoenix_pubsub/Phoenix.Tracker.html) under the heading "Implementing a Tracker" where it does:
defmodule MyTracker do
@behavior Phoenix.Tracker
...
It then goes on to define a start_link that calls GenServer.start_link.
Should that sample code not include use GenServer as well? Without it, the module does not define child_spec and, like #94, problems with the deprecation of Supervisor.Spec ensue.
(if the change is appropriate, I would be happy to put together a pull request to make it)
I can't find a way to make it work.
I.e.:
Phoenix.PubSub.subscribe(:freeling, "freeling:#{id}", fastlane: {some_module_pid, SomeModule, ["some_id1"]})
Normal subscriber always called, but not fastlane destination.
Also tried to print subscribers, all right, it has fastlanes:
[{#PID<0.959.0>,
{#PID<0.1006.0>, SomeModule,
["bdb59e6a-2a5d-11e6-be56-685b35999bd2"]}}]
[{#PID<0.959.0>,
{#PID<0.1007.0>, SomeModule,
["bdb59e6a-2a5d-11e6-be56-685b35999bd2"]}}]
Seems that sending to local pid is forced.
lib/phoenix/pubsub/local.ex#L115
defp do_broadcast(nil, pubsub_server, shard, from, topic, msg) do
pubsub_server
|> subscribers_with_fastlanes(topic, shard)
|> Enum.each(fn
{pid, _} when pid == from -> :noop
{pid, _} -> send(pid, msg)
end)
end
I've noticed that lasp/partisan enables Erlang/Elixir to scale up to 1k nodes; however, Elixir/Phoenix itself is built entirely on distributed Erlang. Does phoenix.pubsub.redis mitigate this problem? What if we somehow want to scale Elixir/Phoenix up to 1k nodes?
I think I found a memory leak in Phoenix.PubSub.Local with monitor refs and long running subscribers that join topics frequently.
Each time a process subscribes to a topic, Process.monitor is called at phoenix_pubsub/lib/phoenix_pubsub/local.ex, line 194 in 9e0e079.
The ref isn't captured when it's created and isn't demonitored when the subscriber unsubscribes from the topic. It appears that the only way these monitors are cleaned up is if the subscriber or the Local GenServer process exits.
Below is an iex session using phoenix_pubsub master demonstrating the issue.
Interactive Elixir (1.2.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> {:ok, pubsub} = Phoenix.PubSub.LocalSupervisor.start_link(:monitor_leak_test, 1, [])
{:ok, #PID<0.128.0>}
iex(2)> [{_, shard?, _, _}] = Supervisor.which_children(pubsub)
[{0, #PID<0.129.0>, :supervisor, [Supervisor]}]
iex(3)> [{Phoenix.PubSub.Local, local, _, _}, _gc] = Supervisor.which_children(shard?)
[{Phoenix.PubSub.Local, #PID<0.131.0>, :worker, [Phoenix.PubSub.Local]},
{Phoenix.PubSub.GC, #PID<0.130.0>, :worker, [Phoenix.PubSub.GC]}]
iex(4)> subscriber = spawn fn -> receive do end end
#PID<0.135.0>
iex(5)> :erlang.process_info(local, :memory)
{:memory, 2824}
iex(6)> (1..10000) |> Enum.each(fn(_) -> Phoenix.PubSub.subscribe(:monitor_leak_test, subscriber, "foo") && Phoenix.PubSub.unsubscribe(:monitor_leak_test, subscriber, "foo") end)
:ok
iex(7)> :erlang.process_info(local, :memory)
{:memory, 736752}
iex(8)> :erlang.garbage_collect(local)
true
iex(9)> :erlang.process_info(local, :memory)
{:memory, 722824}
iex(10)> Process.exit(subscriber, :kill)
true
iex(11)> :erlang.garbage_collect(local)
true
iex(12)> :erlang.process_info(local, :memory)
{:memory, 2824}
As always thanks for all the work you do on Phoenix!
We're using Phoenix Presence (latest version in the master branch). We have a kubernetes set up where we have multiple pods running.
We've noticed that every time we restart a pod or do a rolling update then network traffic is up for 20 minutes.
I was able to replicate it in our beta environment when I had 10K online connections and I restarted one pod:
As you can see, traffic went up around 11:53 and came back down around 12:14.
I think it's related to the permdown_period setting, which by default is 20 minutes. I tried to replicate this with just the phoenix_pubsub library without a web server but wasn't able to. EDIT: It is related. When I changed it to 10, network traffic was up for only 10 minutes.
I also did a tcpdump inside one pod to see where the traffic is coming from/going to. It was all between the presence servers themselves. I think these are the state synchronization messages.
Do you have any suggestions what to look for or how to gather more information?
Just send a heartbeat if you haven't broadcast any delta in a while. Basically, send an empty delta if no deltas were sent in X seconds and X seconds is the heartbeat time. We can increase it to at least 15 seconds (Erlang would detect a failure after 60 seconds). http://erlang.org/doc/man/kernel_app.html
Every time the delta is sent, we should also include the dotted vector versions.
We're using this library in a non-phoenix project and are defining our supervision tree ourselves. Here's the children I'm defining under a supervisor:
[
supervisor(Task.Supervisor, [[name: MyApp.PubSubTracker.TaskSupervisor]]),
supervisor(Phoenix.PubSub.PG2, [MyApp.PubSub, []]),
worker(MyApp.PubSubTracker, [[
pubsub_server: MyApp.PubSub,
name: MyApp.PubSubTracker,
task_sup: MyApp.PubSubTracker.TaskSupervisor,
]]),
]
We're trying to figure out what restart strategy to use. When I'm dealing with GenServers I've written, I can generally figure out if :one_for_one is safe or if I need to use :rest_for_one. Here I'm not sure, because I don't know the details of how the tracker and pubsub processes interact. Is it safe for them to be restarted individually (allowing us to use :one_for_one)? Or do we need to use :rest_for_one?
If possible, it would be nice for this to be in the docs.
Thanks!
Hey!
I am developing a small experimental app for a project I am working on.
I planned on using Phoenix.PubSub for setting up subscriptions in a cluster of machines so a game process can subscribe to the events it cares about.
Due to the high scalability requirements I tested adding and deleting lots of processes early on and saw a huge bottleneck in the Phoenix.PubSub.GC module.
Subscribing my 50k test processes to 10 topics is done in well under 2s, while removing their subscriptions takes ~50s with all schedulers maxing out: https://dsh.re/19075
This is obviously a super synthetic benchmark but I was wondering if the GC part has been looked at with a performance perspective in mind.
When a replica is restarted, permdown is invoked on the other nodes and the old instance is removed from the State. However, it seems that not all of the old instance info is cleaned up, and sometimes the old instance resurfaces. Eventually, this old instance info finds its way back to the restarted replica and the zombie pids are made available again.
This is easier to reproduce with many nodes and low broadcast periods.
Here node4 out of 4 nodes is restarted, and the old instance {:"[email protected]", 1473221321103120} shows up along with invalid pids.
AudiaMBP:Dht gabiz$ iex --name [email protected] -S mix
Erlang/OTP 19 [erts-8.0] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]
Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)
iex([email protected])1>
21:08:45.127 [debug] [email protected]: transfer_req from :"[email protected]"
21:08:45.129 [debug] {:"[email protected]", 1473221325050827}: sending delta generation 1
21:08:45.129 [debug] [email protected]: transfer_req from :"[email protected]"
21:08:45.129 [debug] {:"[email protected]", 1473221325050827}: sending delta generation 1
21:08:45.141 [debug] [email protected]: transfer_req from :"[email protected]"
21:08:45.141 [debug] {:"[email protected]", 1473221325050827}: sending delta generation 1
21:08:45.177 [debug] [email protected]: replica up from :"[email protected]"
21:08:45.177 [debug] [email protected]: replica up from :"[email protected]"
21:08:45.194 [debug] [email protected]: replica up from :"[email protected]"
21:08:45.210 [debug] [email protected]: transfer_req from [email protected]
21:08:45.212 [debug] [email protected]: transfer_ack from :"[email protected]"
iex([email protected])1> GenServer.call(Dispatch.Registry, {:list, "dht"})
%Phoenix.Tracker.State{cloud: #MapSet<[]>,
context: %{{:"[email protected]", 1473221166666092} => 1,
{:"[email protected]", 1473221261730782} => 1,
{:"[email protected]", 1473221317066008} => 1,
{:"[email protected]", 1473221321103120} => 1,
{:"[email protected]", 1473221325050827} => 1},
delta: %Phoenix.Tracker.State{cloud: #MapSet<[]>, context: %{}, delta: :unset,
mode: :delta, pids: nil,
range: {%{{:"[email protected]", 1473221325050827} => 1},
%{{:"[email protected]", 1473221325050827} => 1}},
replica: {:"[email protected]", 1473221325050827}, replicas: %{}, values: %{}},
mode: :normal, pids: 245817, range: {%{}, %{}},
replica: {:"[email protected]", 1473221325050827},
replicas: %{{:"[email protected]", 1473221166666092} => :up,
{:"[email protected]", 1473221261730782} => :up,
{:"[email protected]", 1473221317066008} => :up,
{:"[email protected]", 1473221325050827} => :up}, values: 241720}
iex([email protected])3> Phoenix.Tracker.list(Dispatch.Registry, "dht")
[{#PID<15449.228.0>,
%{node: :"[email protected]", phx_ref: "Pe6xs3WtaDM=", state: :online}},
{#PID<15551.232.0>,
%{node: :"[email protected]", phx_ref: "PGUKQ2hd0fI=", state: :online}},
{#PID<0.236.0>,
%{node: :"[email protected]", phx_ref: "m1qD8cn+eMc=", state: :online}},
{#PID<0.236.0>,
%{node: :"[email protected]", phx_ref: "wDLkDaoxAL8=", state: :online}},
{#PID<15448.268.0>,
%{node: :"[email protected]", phx_ref: "teoKZJQijs8=", state: :online}}]
Should broadcast you are leaving, which is picked up and immediately triggers a permdown on other nodes.
We are depending on Elixir 1.2, aren't we?
We need to validate and raise for violations. For example, down_period must be at least 2x broadcast_period, and permdown_period must be greater than down_period.
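The two rules above can be sketched as a small validation function. `TrackerConfig.validate!/1` is a hypothetical name. The option keys come from the text, and the sketch assumes all three periods are given explicitly in the same unit.

```elixir
defmodule TrackerConfig do
  # Hypothetical validator, not part of phoenix_pubsub.
  # Raises ArgumentError on a violation, otherwise returns opts unchanged.
  def validate!(opts) do
    broadcast = Keyword.fetch!(opts, :broadcast_period)
    down = Keyword.fetch!(opts, :down_period)
    permdown = Keyword.fetch!(opts, :permdown_period)

    if down < 2 * broadcast do
      raise ArgumentError,
            "down_period (#{down}) must be at least 2x broadcast_period (#{broadcast})"
    end

    if permdown <= down do
      raise ArgumentError,
            "permdown_period (#{permdown}) must be greater than down_period (#{down})"
    end

    opts
  end
end
```

For example, `TrackerConfig.validate!(broadcast_period: 1_500, down_period: 30_000, permdown_period: 1_200_000)` returns the options unchanged, while a down_period of 2_000 with the same broadcast_period raises.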
Hello,
I just noticed that while you can use Phoenix.PubSub.subscribe/2 to subscribe to presence changes, you may only do so when your topic is of binary type.
On the other hand, one can track presence with arbitrary things as topic and presence listing does work fine (haven't tested multi-node setup)
So this does work:
iex(dingen@localhost)15> Phoenix.PubSub.subscribe(Dingen.PubSub, "dingen:12345")
:ok
iex(dingen@localhost)7> DingenWeb.Presence.track(self(), "dingen:12345", "dingen", %{})
{:ok, "SMcm2taTxLw="}
iex(dingen@localhost)17> flush
%Phoenix.Socket.Broadcast{event: "presence_diff",
payload: %{joins: %{"dingen" => %{metas: [%{phx_ref: "SMcm2taTxLw="}]}},
leaves: %{}}, topic: "dingen:12345"}
:ok
While this doesn't
iex(dingen@localhost)15> Phoenix.PubSub.subscribe(Dingen.PubSub, {"game", 12345})
** (FunctionClauseError) no function clause matching in Phoenix.PubSub.subscribe/2
The following arguments were given to Phoenix.PubSub.subscribe/2:
# 1
Dingen.PubSub
# 2
{"game", 12345}
Attempted function clauses (showing 1 out of 1):
def subscribe(server, topic) when is_atom(server) and -is_binary(topic)-
(phoenix_pubsub) lib/phoenix/pubsub.ex:151: Phoenix.PubSub.subscribe/2
iex(dingen@localhost)4> DingenWeb.Presence.track(self(), {"game", 12345}, "dingen", %{})
{:ok, "RY19Y4RqB8Q="}
On IRC I heard that I should not count on the behaviour of Presence track allowing for arbitrary topic names so maybe some guard clause is missing. Or maybe the other way around that Presence does support it by design and maybe PubSub should too?
Anyway, I thought I'd rather report it.
Best regards
Currently it is possible to track the same {pid, topic, key} tuple multiple times, which causes an error on untrack (since ets returns more than one row). I believe that operation should be idempotent (except for phx_ref), replacing the old record with the new meta. As it stands, this can crash the GenServer, which probably should not be the case.
Hey all,
We have integrated phoenix_pubsub into our application and it is great. I noticed in the Phoenix.Tracker.State module that the "values" ets table is created via the following command:
values: :ets.new(:values, [:ordered_set])
When one calls the Phoenix.Tracker.list function, it returns a copy of the State object and then calls :ets.select on the :values tid. Since this query happens in the caller process and not in the GenServer managing the state object, I was wondering why {read_concurrency, true} is not given as an option?
For our use case, when a user logs in they pull a friends list and check it against the Tracker system, so we are doing a lot of concurrent reads.
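For reference, this is roughly what the suggested option looks like with plain :ets. Only the table name :values comes from the issue. The row layout and the match specification are illustrative assumptions and not the tracker's actual schema.

```elixir
# Create the table with read_concurrency enabled. ETS tables are :protected by
# default, so any process can read while only the owner writes.
table = :ets.new(:values, [:ordered_set, read_concurrency: true])

# Illustrative row shape: {{topic, pid, key}, meta}
:ets.insert(table, {{"topic", self(), "key"}, %{}})

# Read from a different process, as a Phoenix.Tracker.list caller would.
task =
  Task.async(fn ->
    :ets.select(table, [{{{"topic", :_, :"$1"}, :_}, [], [:"$1"]}])
  end)

keys = Task.await(task)
```

Whether the option helps depends on the workload: according to the ETS documentation, read_concurrency makes switching between reads and writes more expensive, so it pays off mainly when reads come in long bursts with few interleaved writes.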
Thanks!
Should message ordering be preserved when a broadcast call returns before another broadcast call begins? I've done some testing and found some counterexamples -- not sure whether it's a bug or not.
When using pubsub with multi-node, if the nodes are configured with different pool sizes, the following situations can occur:
pool_size is lower than the broadcaster's: this results in the error below
pool_size is greater than the broadcaster's: this results in missed messages
The issue stems from forwarding the pool_size based on the broadcaster - https://github.com/phoenixframework/phoenix_pubsub/blob/master/lib/phoenix/pubsub/pg2_server.ex#L36
Error:
14:41:19.297 [error] Task #PID<0.5811.0> started from Phoenix.PubSub.PubSub terminating
** (MatchError) no match of right hand side value: lib/phoenix/pubsub/local.ex:228: Phoenix.PubSub.Local.pools_for_shard/2
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:223: Phoenix.PubSub.Local.local_for_shard/2
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:152: Phoenix.PubSub.Local.subscribers_with_fastlanes/3
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:112: Phoenix.PubSub.Local.do_broadcast/6
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:103: anonymous fn/7 in Phoenix.PubSub.Local.broadcast/6
(elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:45: Task.Supervised.reply/5
(stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: #Function<4.20419525/0 in Phoenix.PubSub.Local.broadcast/6>
Args: []
I am getting this:
Enum.into/2 or "for" comprehensions with an :into option is incorrect when collecting into non-empty lists. If you're collecting into a non-empty keyword list, consider using Keyword.merge/2 instead. If you're collecting into a non-empty list, consider concatenating the two lists with the ++ operator.
(elixir) lib/collectable.ex:83: Collectable.List.into/1
(phoenix_pubsub) lib/phoenix/tracker/state.ex:380: Phoenix.Tracker.State.merge_deltas/2
(phoenix_pubsub) lib/phoenix/tracker/delta_generation.ex:34: Phoenix.Tracker.DeltaGeneration.do_push/4
(phoenix_pubsub) lib/phoenix/tracker/shard.ex:519: Phoenix.Tracker.Shard.push_delta_generation/2
(phoenix_pubsub) lib/phoenix/tracker/shard.ex:169: Phoenix.Tracker.Shard.handle_info/2
With Elixir 1.8 and phoenix_pubsub 1.1.1
Hi guys, I'm using phoenix_pubsub in a project. I've pushed it to a production app and I'm doing some load testing. Sadly, I'm getting a lot of errors; one of these is:
function Enumerable.MapSet.reduce/3 is undefined or private. Did you mean one of:
* reduce/3
The trace says the error comes from lib/phoenix/tracker/state.ex:362.
I then removed the build and recompiled everything; the same error persists, but with a different trace:
function Enumerable.MapSet.reduce/3 is undefined (module Enumerable.MapSet is not available)
I'm using:
Erlang/OTP 19 [erts-8.1] [source] [64-bit] [async-threads:10] [kernel-poll:false]
Elixir 1.3.4