derekkraan / delta_crdt_ex Goto Github PK
View Code? Open in Web Editor NEWUse DeltaCrdt to build distributed applications in Elixir
License: MIT License
Use DeltaCrdt to build distributed applications in Elixir
License: MIT License
I noticed that the `DeltaCrdt.Storage` behaviour used for persistence specifies a type called `storage_format` with the following representation:
defmodule DeltaCrdt.Storage do
@moduledoc """
This behaviour can be used to enable persistence of the CRDT.
This can be helpful in the event of crashes.
To use, implement this behaviour in a module, and pass it to your CRDT with the `storage_module` option.
"""
@type t :: module()
@opaque storage_format ::
{node_id :: term(), sequence_number :: integer(), crdt_state :: term()}
@callback write(name :: term(), storage_format()) :: :ok
@callback read(name :: term()) :: storage_format() | nil
end
Note that the type is basically composed of {node_id :: term(), sequence_number :: integer(), crdt_state :: term()}. However, when checking where this behaviour is called, I noticed that the return pattern actually expected from the read function differs from the type specified in the behaviour. See DeltaCrdt.CausalCrdt:
defp read_from_storage(state) do
case state.storage_module.read(state.name) do
nil ->
state
{node_id, sequence_number, crdt_state, merkle_map} ->
Map.put(state, :sequence_number, sequence_number)
|> Map.put(:crdt_state, crdt_state)
|> Map.put(:merkle_map, merkle_map)
|> Map.put(:node_id, node_id)
|> remove_crdt_state_keys()
end
end
In this case the expected return type pattern is {node_id, sequence_number, crdt_state, merkle_map}.
My first question is what is the correct format?
I think it's better to open an issue here rather than in `merkle_map` or `horde`, because `merkle_map` is just another kind of map and `horde` leverages this library.
It looks like `DeltaCrdt` uses atom-type keys to propagate data, like `%{"CRDT" => "is magic!"}` (src).
That means that whenever a mutation occurs, a new atom can be created across the nodes, and atoms are never garbage collected.
How about broadcasting data across the nodes to their own Agent to avoid bloating the atom table?
Hi Derek,
I am using this via your horde library. I am getting errors every 20 seconds when running - two messages each time about discarding messages. Are these important, or is there a way I can switch them off? They are generating a lot of noise in the logs.
This is being run on my home cluster of 36 Raspberry Pis with horde. Horde seems happy clustering the registry and the supervisors, but delta_crdt has started throwing errors. It never did this previously, as far as I have noticed. I am using the horde version you pointed me to on git, not the hex version.
Thanks
Tom
PS sorry for the long error trail below.
11:15:28.237 [error] Discarding message {delta,{<0.206.0>,<0.206.0>,#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>#{'struct'=>'Elixir.DeltaCrdt.CausalContext',dots=>#{'struct'=>'Elixir.MapSet',map=>#{<8860>{<820>{[{723330950,0}],[{259073618,0}]},[{886054732,0}],[{90994585,0}],[{212397740,0}]},{<908>{[{224541245,0}],[{354815956,0}],[{501767019,0}]},[{644668926,0}],[{884763753,0}],[{680172344,0}]},<1020>{<4080>{[{608052787,0}],[{87023067,0}]},<2804>{[{992209432,0}],[{85815313,0}],[{107656322,0}]}},<10a1>{[{761337020,0}],[{57681452,0}],[{312110877,0}],[{561210601,0}]},<8244>{[{149594868,0}],[{529351108,0}],[{231569336,0}],<2210>{[{477061609,0}],[{617103734,0}],[{270793753,0}]}},<8d00>{[{220838726,0}],[{893132601,0}],[{678561457,0}],[{79767290,0}]},<6502>{[{560938324,0}],[{823104459,0}],<81>{[{768249285,0}],[{453473496,0}]},[{471979944,0}],[{465765109,0}]},<961>{[{154501068,0}],[{57131885,0}],[{832826349,0}],[{574622686,0}],[{853838025,0}]},<68>{[{231644276,0}],[{909871415,0}],[{584834511,0}]},[{978167393,0}],<2509>{[{283646442,0}],[{368901591,0}],[{980630187,0}],[{695829055,0}],[{421117301,0}]},<6093>{<420>{[{261846720,0}],[{236507057,0}]},[{151955626,0}],<8010>{[{256799718,0}],[{915189138,0}]},[{730824296,0}],[{879988380,0}],<3000>{[{865384709,0}],[{254696481,0}]}},<8101>{{[{918232091,0}],[{162637692,0}]},[{292687244,0}],[{728559197,0}]},<132>{[{764991069,0}],<82>{[{881771793,0}],[{64290749,0}]},[{470913543,0}],[{516674924,0}]},<6810>{[{1309607,0}],[{168459593,0}],[{176508547,0}],[{811735860,0}]},<1900>{[{128620661,0}],[{505929147,0}],[{665961286,0}]}},version=>2},maxima=>#{[212397740|0],<4831>{[730824296|0],[224541245|0],[680172344|0],[107656322|0],[695829055|0]},<8285>{[915189138|0],[678561457|0],[893132601|0],<1000>{<420>{[236507057|0],[884763753|0]}},[879988380|0]},<41>{[978167393|0],[881771793|0]},<8402>{[259073618|0],[561210601|0],<8040>{[254696481|0],[477061609|0]}},<820>{[270793753|0],[761337020|0]},<908>{[256799718|0],[421117301|0],[832826349|0]
},<8304>{<108>{[918232091|0],[617103734|0]},[231644276|0],[465765109|0],[723330950|0]},<100>{<200>{<4040>{[176508547|0],[980630187|0]}}},<4>{{[220838726|0],[168459593|0]}},<622>{[768249285|0],[453473496|0],[64290749|0],[853838025|0]},{<1140>{[644668926|0],[823104459|0],[728559197|0]},<1020>{[283646442|0],[354815956|0]},<4200>{[162637692|0],[811735860|0]},[529351108|0],[471979944|0],{[231569336|0],[470913543|0]},[151955626|0],[261846720|0]},{<4021>{[865384709|0],[368901591|0],[574622686|0]},[312110877|0],[128620661|0],[886054732|0],[57131885|0],[501767019|0]},<8419>{[57681452|0],[1309607|0],[584834511|0],<10a>{[87023067|0],[154501068|0],[560938324|0]},[992209432|0]},<9e00>{<80>{<600>{[764991069|0],[149594868|0]}},<440>{[90994585|0],[665961286|0]},[292687244|0],[79767290|0],[608052787|0]},<806>{[909871415|0],<402>{[85815313|0],[505929147|0]},[516674924|0]}}},keys=>#{'struct'=>'Elixir.MapSet',map=>#{<86b>{[<<"E7IwHHDqhp6vvpRUM6MlMA==">>],[<<"7KBMQSaU5KWiya9f55tqfw==">>],[<<"JncEdzW3VrscpQ2JFTxvjw==">>],<8200>{[<<"omUBFLuljKPTngAuCrdcgQ==">>],[<<"KYi1jL9KgYYJmVhUg7VEEA==">>]},[<<"L4J/xHaWgMc5GP/jH+Z7Cg==">>],[<<"vIOI2DtF+dFSxkUb6LJjzQ==">>]},<62>{[<<"N4D4LXoPQtXZQ/LcDhYPAg==">>],[<<"xI0J2/7T7doP1S/g/STsOA==">>],[<<"VaAiyjQsJA40klMf/eUXAQ==">>]},<13dc>{[<<"vs6XYFGMO4c0xWXOHoranQ==">>],[<<"EG6Dn8s2BkP8hs4vSHhhog==">>],[<<"GH5qGpg3YtbvAzst4L+S7Q==">>],[<<"aegaOhJy+xYveh4QiTV9Zw==">>],[<<"+giKoQjVw1PZ+qckZutBGg==">>],[<<"HoaL35929EgB4G8VV9qGcg==">>],[<<"QCl2IRQvsoUjCkUgtv97Bw==">>],[<<"Gc7BF/gsPWlDxROSKpWGFQ==">>]},<124>{[<<"lWfnOUfDuihsDvx8Ptg0Cw==">>],[<<"kHSSYH8OBR90cGsUDgomgQ==">>],[<<"WXYHwgNkIo1zEmmhg1XxuA==">>]},<8398>{[<<"i3KHrvu/RYcRa+F8bMq/kA==">>],<4100>{[<<"KVmpcyqCWb6sjK2bBtNVkg==">>],[<<"5NXXgn+6QsHlpdHFhczkyQ==">>]},[<<"AL2KAUkHUJIAioHEHNylWg==">>],[<<"IAXuuAwVgOb6Hs5d66Ez7g==">>],[<<"z4pTEg4uZAgOHrfkggYtUQ==">>],[<<"2YymVa5kjrdxoX670G71ug==">>]},<81>{[<<"QsBS2ijaCj6/Me1atTAjcw==">>],[<<"s0UN5TTaPEjIuuClNWYvbQ==">>]},<4004>{[<<"Bw3eewoJ391zBrw3jdeBtQ==">>],[<
<"ct8GOYAqUw4yoZdnraGcpg==">>]},<200c>{<9040>{[<<"L7Zf1Bctzu+cJ++DMuFlEA==">>],[<<"tc4SkitIUX7i3plC0c2EdQ==">>],[<<"bPi3efdC6COk0MdlZCAYHw==">>]},[<<"ihgQIxasjv3Sz16Fo3+Spw==">>],[<<"Ayil53y8WMhg9VpG769onw==">>]},<3020>{[<<"X5+ezgknmNKlzQeYm7mNsw==">>],[<<"paOMJws/ADcpR0T0tI4uSA==">>],<6>{[<<"gfPJsQIhFvPQtRh8pHcpYg==">>],[<<"8JR30qjVqfQDhXjsRag2nA==">>]}},{[<<"RLCPyVMsvvppsHESrLmB8w==">>],[<<"MogCb6XFLvfYSgUe4t2b0g==">>],[<<"iZzEyKzfmhHEU11SWQI6rA==">>],[<<"FLTu2ntWXclkPdQ9lE5oPQ==">>],[<<"19GGsvcmcFAbIzfTDJiodQ==">>],[<<"UANIPQ3h+1wQ6bO9WeitXw==">>],[<<"JwvHI0Ag7F+r1vlXASxe2g==">>],[<<"w6u/5Bj3AObv/mZBwbzOGw==">>]},{[<<"kHmw2fwtBkFQfbDstJQw0A==">>],[<<"FOj542MooTBHj1RC8gq7wg==">>],[<<"9rTi4B1JHGcvhEWKlzl2zA==">>],[<<"pugBOnOyEdZ3g6rjlkPzTQ==">>],[<<"bXH1+JGYkBQFVxLncuI1Uw==">>],[<<"aRYNumxyw17M2zeaH5NkhQ==">>]},<41ab>{<810>{[<<"2KkSf8K+nDiYvnCuz2SxOQ==">>],[<<"fbUHyegwmW9s4DwsI8ULMg==">>]},[<<"X16Tox5Eb2VDB46ZhtnI+A==">>],[<<"iBzucoTTtteDOYLroe9IUA==">>],[<<"t5eVQRT17UEaaIWpSWHf6w==">>],[<<"NJ7t+md6d03vQA0aiboMmw==">>],[<<"N2O5RUXRX3iepElrj4bkNQ==">>],[<<"tDnpkgddLS8hs4Eo//RP+g==">>]},<2281>{<120>{[<<"u/A1k4eVPXp2IGC650elrg==">>],[<<"NmgLfaeL9oXtNf/0xtxa8A==">>]},[<<"blls+PxiO5+pLtK2+J83KQ==">>],[<<"/VnIAo0aXHie97K3aKg8IQ==">>],[<<"MWtaw57inYi8HkqFECtPtw==">>]},<8200>{[<<"bkw5LG/wZITD2APxXdX1/g==">>],[<<"ZzpJWrIJLyL5tn+K/6ylCg==">>]},<138>{[<<"/lrbghCe8w1yEOSYAwwo1g==">>],[<<"lwuNVhRGi1MWO2nNYWontw==">>],[<<"dNmIwNZ1zH4B2dHzMn4G1A==">>],[<<"YJeav1KsYVbWQ4hh7HtxEg==">>]},[<<"aapzz1MFCJiejHustgK5hw==">>]},version=>2},state=>#{<86b>{[<<"E7IwHHDqhp6vvpRUM6MlMA==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16436.256.0>,<16436.260.0>},1534759592748297642}=>[]},version=>2},state=>#{{{<16436.256.0>,<16436.260.0>},1534759592748297642}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{87023067,0}=>[]},version=>2}}}}],[<<"7KBMQSaU5KWiya9f55tqfw==">>|#{'
struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16418.349.0>,<16418.353.0>},1534759694496164977}=>[]},version=>2},state=>#{{{<16418.349.0>,<16418.353.0>},1534759694496164977}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{992209432,0}=>[]},version=>2}}}}],[<<"JncEdzW3VrscpQ2JFTxvjw==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16407.349.0>,<16407.355.0>},1534759700289044705}=>[]},version=>2},state=>#{{{<16407.349.0>,<16407.355.0>},1534759700289044705}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{79767290,0}=>[]},version=>2}}}}],<8200>{[<<"omUBFLuljKPTngAuCrdcgQ==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16426.187.0>,<16426.191.0>},1534759134590351593}=>[]},version=>2},state=>#{{{<16426.187.0>,<16426.191.0>},1534759134590351593}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{453473496,0}=>[]},version=>2}}}}],[<<"KYi1jL9KgYYJmVhUg7VEEA==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16434.187.0>,<16434.191.0>},1534759584572105562}=>[]},version=>2},state=>#{{{<16434.187.0>,<16434.191.0>},1534759584572105562}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{283646442,0}=>[]},version=>2}}}}]},[<<"L4J/xHaWgMc5GP/jH+Z7Cg==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16436.187.0>,<16436.191.0>},1534759132671404719}=>[]},version=>2},state=>#{{{<16436.187.0>,<16436.191.0>},1534759132671404719}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,stat (truncated)
11:15:48.245 [error] Discarding message {delta,{<0.206.0>,<0.249.0>,#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>#{'struct'=>'Elixir.DeltaCrdt.CausalContext',dots=>#{'struct'=>'Elixir.MapSet',map=>#{<8860>{<820>{[{723330950,0}],[{259073618,0}]},[{886054732,0}],[{90994585,0}],[{212397740,0}]},{<908>{[{224541245,0}],[{354815956,0}],[{501767019,0}]},[{644668926,0}],[{884763753,0}],[{680172344,0}]},<1020>{<4080>{[{608052787,0}],[{87023067,0}]},<2804>{[{992209432,0}],[{85815313,0}],[{107656322,0}]}},<10a1>{[{761337020,0}],[{57681452,0}],[{312110877,0}],[{561210601,0}]},<8244>{[{149594868,0}],[{529351108,0}],[{231569336,0}],<2210>{[{477061609,0}],[{617103734,0}],[{270793753,0}]}},<8d00>{[{220838726,0}],[{893132601,0}],[{678561457,0}],[{79767290,0}]},<6502>{[{560938324,0}],[{823104459,0}],<81>{[{768249285,0}],[{453473496,0}]},[{471979944,0}],[{465765109,0}]},<961>{[{154501068,0}],[{57131885,0}],[{832826349,0}],[{574622686,0}],[{853838025,0}]},<68>{[{231644276,0}],[{909871415,0}],[{584834511,0}]},[{978167393,0}],<2509>{[{283646442,0}],[{368901591,0}],[{980630187,0}],[{695829055,0}],[{421117301,0}]},<6093>{<420>{[{261846720,0}],[{236507057,0}]},[{151955626,0}],<8010>{[{256799718,0}],[{915189138,0}]},[{730824296,0}],[{879988380,0}],<3000>{[{865384709,0}],[{254696481,0}]}},<8101>{{[{918232091,0}],[{162637692,0}]},[{292687244,0}],[{728559197,0}]},<132>{[{764991069,0}],<82>{[{881771793,0}],[{64290749,0}]},[{470913543,0}],[{516674924,0}]},<6810>{[{1309607,0}],[{168459593,0}],[{176508547,0}],[{811735860,0}]},<1900>{[{128620661,0}],[{505929147,0}],[{665961286,0}]}},version=>2},maxima=>#{[212397740|0],<4831>{[730824296|0],[224541245|0],[680172344|0],[107656322|0],[695829055|0]},<8285>{[915189138|0],[678561457|0],[893132601|0],<1000>{<420>{[236507057|0],[884763753|0]}},[879988380|0]},<41>{[978167393|0],[881771793|0]},<8402>{[259073618|0],[561210601|0],<8040>{[254696481|0],[477061609|0]}},<820>{[270793753|0],[761337020|0]},<908>{[256799718|0],[421117301|0],[832826349|0]
},<8304>{<108>{[918232091|0],[617103734|0]},[231644276|0],[465765109|0],[723330950|0]},<100>{<200>{<4040>{[176508547|0],[980630187|0]}}},<4>{{[220838726|0],[168459593|0]}},<622>{[768249285|0],[453473496|0],[64290749|0],[853838025|0]},{<1140>{[644668926|0],[823104459|0],[728559197|0]},<1020>{[283646442|0],[354815956|0]},<4200>{[162637692|0],[811735860|0]},[529351108|0],[471979944|0],{[231569336|0],[470913543|0]},[151955626|0],[261846720|0]},{<4021>{[865384709|0],[368901591|0],[574622686|0]},[312110877|0],[128620661|0],[886054732|0],[57131885|0],[501767019|0]},<8419>{[57681452|0],[1309607|0],[584834511|0],<10a>{[87023067|0],[154501068|0],[560938324|0]},[992209432|0]},<9e00>{<80>{<600>{[764991069|0],[149594868|0]}},<440>{[90994585|0],[665961286|0]},[292687244|0],[79767290|0],[608052787|0]},<806>{[909871415|0],<402>{[85815313|0],[505929147|0]},[516674924|0]}}},keys=>#{'struct'=>'Elixir.MapSet',map=>#{<86b>{[<<"E7IwHHDqhp6vvpRUM6MlMA==">>],[<<"7KBMQSaU5KWiya9f55tqfw==">>],[<<"JncEdzW3VrscpQ2JFTxvjw==">>],<8200>{[<<"omUBFLuljKPTngAuCrdcgQ==">>],[<<"KYi1jL9KgYYJmVhUg7VEEA==">>]},[<<"L4J/xHaWgMc5GP/jH+Z7Cg==">>],[<<"vIOI2DtF+dFSxkUb6LJjzQ==">>]},<62>{[<<"N4D4LXoPQtXZQ/LcDhYPAg==">>],[<<"xI0J2/7T7doP1S/g/STsOA==">>],[<<"VaAiyjQsJA40klMf/eUXAQ==">>]},<13dc>{[<<"vs6XYFGMO4c0xWXOHoranQ==">>],[<<"EG6Dn8s2BkP8hs4vSHhhog==">>],[<<"GH5qGpg3YtbvAzst4L+S7Q==">>],[<<"aegaOhJy+xYveh4QiTV9Zw==">>],[<<"+giKoQjVw1PZ+qckZutBGg==">>],[<<"HoaL35929EgB4G8VV9qGcg==">>],[<<"QCl2IRQvsoUjCkUgtv97Bw==">>],[<<"Gc7BF/gsPWlDxROSKpWGFQ==">>]},<124>{[<<"lWfnOUfDuihsDvx8Ptg0Cw==">>],[<<"kHSSYH8OBR90cGsUDgomgQ==">>],[<<"WXYHwgNkIo1zEmmhg1XxuA==">>]},<8398>{[<<"i3KHrvu/RYcRa+F8bMq/kA==">>],<4100>{[<<"KVmpcyqCWb6sjK2bBtNVkg==">>],[<<"5NXXgn+6QsHlpdHFhczkyQ==">>]},[<<"AL2KAUkHUJIAioHEHNylWg==">>],[<<"IAXuuAwVgOb6Hs5d66Ez7g==">>],[<<"z4pTEg4uZAgOHrfkggYtUQ==">>],[<<"2YymVa5kjrdxoX670G71ug==">>]},<81>{[<<"QsBS2ijaCj6/Me1atTAjcw==">>],[<<"s0UN5TTaPEjIuuClNWYvbQ==">>]},<4004>{[<<"Bw3eewoJ391zBrw3jdeBtQ==">>],[<
<"ct8GOYAqUw4yoZdnraGcpg==">>]},<200c>{<9040>{[<<"L7Zf1Bctzu+cJ++DMuFlEA==">>],[<<"tc4SkitIUX7i3plC0c2EdQ==">>],[<<"bPi3efdC6COk0MdlZCAYHw==">>]},[<<"ihgQIxasjv3Sz16Fo3+Spw==">>],[<<"Ayil53y8WMhg9VpG769onw==">>]},<3020>{[<<"X5+ezgknmNKlzQeYm7mNsw==">>],[<<"paOMJws/ADcpR0T0tI4uSA==">>],<6>{[<<"gfPJsQIhFvPQtRh8pHcpYg==">>],[<<"8JR30qjVqfQDhXjsRag2nA==">>]}},{[<<"RLCPyVMsvvppsHESrLmB8w==">>],[<<"MogCb6XFLvfYSgUe4t2b0g==">>],[<<"iZzEyKzfmhHEU11SWQI6rA==">>],[<<"FLTu2ntWXclkPdQ9lE5oPQ==">>],[<<"19GGsvcmcFAbIzfTDJiodQ==">>],[<<"UANIPQ3h+1wQ6bO9WeitXw==">>],[<<"JwvHI0Ag7F+r1vlXASxe2g==">>],[<<"w6u/5Bj3AObv/mZBwbzOGw==">>]},{[<<"kHmw2fwtBkFQfbDstJQw0A==">>],[<<"FOj542MooTBHj1RC8gq7wg==">>],[<<"9rTi4B1JHGcvhEWKlzl2zA==">>],[<<"pugBOnOyEdZ3g6rjlkPzTQ==">>],[<<"bXH1+JGYkBQFVxLncuI1Uw==">>],[<<"aRYNumxyw17M2zeaH5NkhQ==">>]},<41ab>{<810>{[<<"2KkSf8K+nDiYvnCuz2SxOQ==">>],[<<"fbUHyegwmW9s4DwsI8ULMg==">>]},[<<"X16Tox5Eb2VDB46ZhtnI+A==">>],[<<"iBzucoTTtteDOYLroe9IUA==">>],[<<"t5eVQRT17UEaaIWpSWHf6w==">>],[<<"NJ7t+md6d03vQA0aiboMmw==">>],[<<"N2O5RUXRX3iepElrj4bkNQ==">>],[<<"tDnpkgddLS8hs4Eo//RP+g==">>]},<2281>{<120>{[<<"u/A1k4eVPXp2IGC650elrg==">>],[<<"NmgLfaeL9oXtNf/0xtxa8A==">>]},[<<"blls+PxiO5+pLtK2+J83KQ==">>],[<<"/VnIAo0aXHie97K3aKg8IQ==">>],[<<"MWtaw57inYi8HkqFECtPtw==">>]},<8200>{[<<"bkw5LG/wZITD2APxXdX1/g==">>],[<<"ZzpJWrIJLyL5tn+K/6ylCg==">>]},<138>{[<<"/lrbghCe8w1yEOSYAwwo1g==">>],[<<"lwuNVhRGi1MWO2nNYWontw==">>],[<<"dNmIwNZ1zH4B2dHzMn4G1A==">>],[<<"YJeav1KsYVbWQ4hh7HtxEg==">>]},[<<"aapzz1MFCJiejHustgK5hw==">>]},version=>2},state=>#{<86b>{[<<"E7IwHHDqhp6vvpRUM6MlMA==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16436.256.0>,<16436.260.0>},1534759592748297642}=>[]},version=>2},state=>#{{{<16436.256.0>,<16436.260.0>},1534759592748297642}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{87023067,0}=>[]},version=>2}}}}],[<<"7KBMQSaU5KWiya9f55tqfw==">>|#{'
struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16418.349.0>,<16418.353.0>},1534759694496164977}=>[]},version=>2},state=>#{{{<16418.349.0>,<16418.353.0>},1534759694496164977}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{992209432,0}=>[]},version=>2}}}}],[<<"JncEdzW3VrscpQ2JFTxvjw==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16407.349.0>,<16407.355.0>},1534759700289044705}=>[]},version=>2},state=>#{{{<16407.349.0>,<16407.355.0>},1534759700289044705}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{79767290,0}=>[]},version=>2}}}}],<8200>{[<<"omUBFLuljKPTngAuCrdcgQ==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16426.187.0>,<16426.191.0>},1534759134590351593}=>[]},version=>2},state=>#{{{<16426.187.0>,<16426.191.0>},1534759134590351593}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{453473496,0}=>[]},version=>2}}}}],[<<"KYi1jL9KgYYJmVhUg7VEEA==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16434.187.0>,<16434.191.0>},1534759584572105562}=>[]},version=>2},state=>#{{{<16434.187.0>,<16434.191.0>},1534759584572105562}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{283646442,0}=>[]},version=>2}}}}]},[<<"L4J/xHaWgMc5GP/jH+Z7Cg==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16436.187.0>,<16436.191.0>},1534759132671404719}=>[]},version=>2},state=>#{{{<16436.187.0>,<16436.191.0>},1534759132671404719}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,stat (truncated)
11:15:48.248 [error] Discarding message {delta,{<0.206.0>,<0.206.0>,#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>#{'struct'=>'Elixir.DeltaCrdt.CausalContext',dots=>#{'struct'=>'Elixir.MapSet',map=>#{<8860>{<820>{[{723330950,0}],[{259073618,0}]},[{886054732,0}],[{90994585,0}],[{212397740,0}]},{<908>{[{224541245,0}],[{354815956,0}],[{501767019,0}]},[{644668926,0}],[{884763753,0}],[{680172344,0}]},<1020>{<4080>{[{608052787,0}],[{87023067,0}]},<2804>{[{992209432,0}],[{85815313,0}],[{107656322,0}]}},<10a1>{[{761337020,0}],[{57681452,0}],[{312110877,0}],[{561210601,0}]},<8244>{[{149594868,0}],[{529351108,0}],[{231569336,0}],<2210>{[{477061609,0}],[{617103734,0}],[{270793753,0}]}},<8d00>{[{220838726,0}],[{893132601,0}],[{678561457,0}],[{79767290,0}]},<6502>{[{560938324,0}],[{823104459,0}],<81>{[{768249285,0}],[{453473496,0}]},[{471979944,0}],[{465765109,0}]},<961>{[{154501068,0}],[{57131885,0}],[{832826349,0}],[{574622686,0}],[{853838025,0}]},<68>{[{231644276,0}],[{909871415,0}],[{584834511,0}]},[{978167393,0}],<2509>{[{283646442,0}],[{368901591,0}],[{980630187,0}],[{695829055,0}],[{421117301,0}]},<6093>{<420>{[{261846720,0}],[{236507057,0}]},[{151955626,0}],<8010>{[{256799718,0}],[{915189138,0}]},[{730824296,0}],[{879988380,0}],<3000>{[{865384709,0}],[{254696481,0}]}},<8101>{{[{918232091,0}],[{162637692,0}]},[{292687244,0}],[{728559197,0}]},<132>{[{764991069,0}],<82>{[{881771793,0}],[{64290749,0}]},[{470913543,0}],[{516674924,0}]},<6810>{[{1309607,0}],[{168459593,0}],[{176508547,0}],[{811735860,0}]},<1900>{[{128620661,0}],[{505929147,0}],[{665961286,0}]}},version=>2},maxima=>#{[212397740|0],<4831>{[730824296|0],[224541245|0],[680172344|0],[107656322|0],[695829055|0]},<8285>{[915189138|0],[678561457|0],[893132601|0],<1000>{<420>{[236507057|0],[884763753|0]}},[879988380|0]},<41>{[978167393|0],[881771793|0]},<8402>{[259073618|0],[561210601|0],<8040>{[254696481|0],[477061609|0]}},<820>{[270793753|0],[761337020|0]},<908>{[256799718|0],[421117301|0],[832826349|0]
},<8304>{<108>{[918232091|0],[617103734|0]},[231644276|0],[465765109|0],[723330950|0]},<100>{<200>{<4040>{[176508547|0],[980630187|0]}}},<4>{{[220838726|0],[168459593|0]}},<622>{[768249285|0],[453473496|0],[64290749|0],[853838025|0]},{<1140>{[644668926|0],[823104459|0],[728559197|0]},<1020>{[283646442|0],[354815956|0]},<4200>{[162637692|0],[811735860|0]},[529351108|0],[471979944|0],{[231569336|0],[470913543|0]},[151955626|0],[261846720|0]},{<4021>{[865384709|0],[368901591|0],[574622686|0]},[312110877|0],[128620661|0],[886054732|0],[57131885|0],[501767019|0]},<8419>{[57681452|0],[1309607|0],[584834511|0],<10a>{[87023067|0],[154501068|0],[560938324|0]},[992209432|0]},<9e00>{<80>{<600>{[764991069|0],[149594868|0]}},<440>{[90994585|0],[665961286|0]},[292687244|0],[79767290|0],[608052787|0]},<806>{[909871415|0],<402>{[85815313|0],[505929147|0]},[516674924|0]}}},keys=>#{'struct'=>'Elixir.MapSet',map=>#{<86b>{[<<"E7IwHHDqhp6vvpRUM6MlMA==">>],[<<"7KBMQSaU5KWiya9f55tqfw==">>],[<<"JncEdzW3VrscpQ2JFTxvjw==">>],<8200>{[<<"omUBFLuljKPTngAuCrdcgQ==">>],[<<"KYi1jL9KgYYJmVhUg7VEEA==">>]},[<<"L4J/xHaWgMc5GP/jH+Z7Cg==">>],[<<"vIOI2DtF+dFSxkUb6LJjzQ==">>]},<62>{[<<"N4D4LXoPQtXZQ/LcDhYPAg==">>],[<<"xI0J2/7T7doP1S/g/STsOA==">>],[<<"VaAiyjQsJA40klMf/eUXAQ==">>]},<13dc>{[<<"vs6XYFGMO4c0xWXOHoranQ==">>],[<<"EG6Dn8s2BkP8hs4vSHhhog==">>],[<<"GH5qGpg3YtbvAzst4L+S7Q==">>],[<<"aegaOhJy+xYveh4QiTV9Zw==">>],[<<"+giKoQjVw1PZ+qckZutBGg==">>],[<<"HoaL35929EgB4G8VV9qGcg==">>],[<<"QCl2IRQvsoUjCkUgtv97Bw==">>],[<<"Gc7BF/gsPWlDxROSKpWGFQ==">>]},<124>{[<<"lWfnOUfDuihsDvx8Ptg0Cw==">>],[<<"kHSSYH8OBR90cGsUDgomgQ==">>],[<<"WXYHwgNkIo1zEmmhg1XxuA==">>]},<8398>{[<<"i3KHrvu/RYcRa+F8bMq/kA==">>],<4100>{[<<"KVmpcyqCWb6sjK2bBtNVkg==">>],[<<"5NXXgn+6QsHlpdHFhczkyQ==">>]},[<<"AL2KAUkHUJIAioHEHNylWg==">>],[<<"IAXuuAwVgOb6Hs5d66Ez7g==">>],[<<"z4pTEg4uZAgOHrfkggYtUQ==">>],[<<"2YymVa5kjrdxoX670G71ug==">>]},<81>{[<<"QsBS2ijaCj6/Me1atTAjcw==">>],[<<"s0UN5TTaPEjIuuClNWYvbQ==">>]},<4004>{[<<"Bw3eewoJ391zBrw3jdeBtQ==">>],[<
<"ct8GOYAqUw4yoZdnraGcpg==">>]},<200c>{<9040>{[<<"L7Zf1Bctzu+cJ++DMuFlEA==">>],[<<"tc4SkitIUX7i3plC0c2EdQ==">>],[<<"bPi3efdC6COk0MdlZCAYHw==">>]},[<<"ihgQIxasjv3Sz16Fo3+Spw==">>],[<<"Ayil53y8WMhg9VpG769onw==">>]},<3020>{[<<"X5+ezgknmNKlzQeYm7mNsw==">>],[<<"paOMJws/ADcpR0T0tI4uSA==">>],<6>{[<<"gfPJsQIhFvPQtRh8pHcpYg==">>],[<<"8JR30qjVqfQDhXjsRag2nA==">>]}},{[<<"RLCPyVMsvvppsHESrLmB8w==">>],[<<"MogCb6XFLvfYSgUe4t2b0g==">>],[<<"iZzEyKzfmhHEU11SWQI6rA==">>],[<<"FLTu2ntWXclkPdQ9lE5oPQ==">>],[<<"19GGsvcmcFAbIzfTDJiodQ==">>],[<<"UANIPQ3h+1wQ6bO9WeitXw==">>],[<<"JwvHI0Ag7F+r1vlXASxe2g==">>],[<<"w6u/5Bj3AObv/mZBwbzOGw==">>]},{[<<"kHmw2fwtBkFQfbDstJQw0A==">>],[<<"FOj542MooTBHj1RC8gq7wg==">>],[<<"9rTi4B1JHGcvhEWKlzl2zA==">>],[<<"pugBOnOyEdZ3g6rjlkPzTQ==">>],[<<"bXH1+JGYkBQFVxLncuI1Uw==">>],[<<"aRYNumxyw17M2zeaH5NkhQ==">>]},<41ab>{<810>{[<<"2KkSf8K+nDiYvnCuz2SxOQ==">>],[<<"fbUHyegwmW9s4DwsI8ULMg==">>]},[<<"X16Tox5Eb2VDB46ZhtnI+A==">>],[<<"iBzucoTTtteDOYLroe9IUA==">>],[<<"t5eVQRT17UEaaIWpSWHf6w==">>],[<<"NJ7t+md6d03vQA0aiboMmw==">>],[<<"N2O5RUXRX3iepElrj4bkNQ==">>],[<<"tDnpkgddLS8hs4Eo//RP+g==">>]},<2281>{<120>{[<<"u/A1k4eVPXp2IGC650elrg==">>],[<<"NmgLfaeL9oXtNf/0xtxa8A==">>]},[<<"blls+PxiO5+pLtK2+J83KQ==">>],[<<"/VnIAo0aXHie97K3aKg8IQ==">>],[<<"MWtaw57inYi8HkqFECtPtw==">>]},<8200>{[<<"bkw5LG/wZITD2APxXdX1/g==">>],[<<"ZzpJWrIJLyL5tn+K/6ylCg==">>]},<138>{[<<"/lrbghCe8w1yEOSYAwwo1g==">>],[<<"lwuNVhRGi1MWO2nNYWontw==">>],[<<"dNmIwNZ1zH4B2dHzMn4G1A==">>],[<<"YJeav1KsYVbWQ4hh7HtxEg==">>]},[<<"aapzz1MFCJiejHustgK5hw==">>]},version=>2},state=>#{<86b>{[<<"E7IwHHDqhp6vvpRUM6MlMA==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16436.256.0>,<16436.260.0>},1534759592748297642}=>[]},version=>2},state=>#{{{<16436.256.0>,<16436.260.0>},1534759592748297642}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{87023067,0}=>[]},version=>2}}}}],[<<"7KBMQSaU5KWiya9f55tqfw==">>|#{'
struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16418.349.0>,<16418.353.0>},1534759694496164977}=>[]},version=>2},state=>#{{{<16418.349.0>,<16418.353.0>},1534759694496164977}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{992209432,0}=>[]},version=>2}}}}],[<<"JncEdzW3VrscpQ2JFTxvjw==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16407.349.0>,<16407.355.0>},1534759700289044705}=>[]},version=>2},state=>#{{{<16407.349.0>,<16407.355.0>},1534759700289044705}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{79767290,0}=>[]},version=>2}}}}],<8200>{[<<"omUBFLuljKPTngAuCrdcgQ==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16426.187.0>,<16426.191.0>},1534759134590351593}=>[]},version=>2},state=>#{{{<16426.187.0>,<16426.191.0>},1534759134590351593}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{453473496,0}=>[]},version=>2}}}}],[<<"KYi1jL9KgYYJmVhUg7VEEA==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16434.187.0>,<16434.191.0>},1534759584572105562}=>[]},version=>2},state=>#{{{<16434.187.0>,<16434.191.0>},1534759584572105562}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,state=>#{'struct'=>'Elixir.MapSet',map=>#{{283646442,0}=>[]},version=>2}}}}]},[<<"L4J/xHaWgMc5GP/jH+Z7Cg==">>|#{'struct'=>'Elixir.DeltaCrdt.CausalDotMap',causal_context=>nil,keys=>#{'struct'=>'Elixir.MapSet',map=>#{{{<16436.187.0>,<16436.191.0>},1534759132671404719}=>[]},version=>2},state=>#{{{<16436.187.0>,<16436.191.0>},1534759132671404719}=>#{'struct'=>'Elixir.DeltaCrdt.CausalDotSet',causal_context=>nil,stat (truncated)
Hi, thanks for the library! This is probably more my lack of understanding than an issue with your code ^^
I've noticed the following:
iex> {:ok, c1} = DeltaCrdt.start_link(AWLWWMap)
iex> {:ok, c2} = DeltaCrdt.start_link(AWLWWMap)
iex> DeltaCrdt.set_neighbours(c1, [c2])
iex> DeltaCrdt.set_neighbours(c2, [c1])
iex> for id <- 1..100_000, do: DeltaCrdt.mutate(c1, :add, [id, %{}])
iex> DeltaCrdt.read(c1) |> map_size()
18429
I would've expected 100000 - I've waited until the scheduler utilization was low but the number didn't change after this.
Then I've tried it without setting up a neighbor:
iex> # Recreating c2:
iex> {:ok, c2} = DeltaCrdt.start_link(AWLWWMap)
iex> for id <- 1..100_000, do: DeltaCrdt.mutate(c2, :add, [id, %{}])
iex> DeltaCrdt.read(c2) |> map_size()
100000
That was fast and all elements are there. When connecting this new `c2` instance to `c1`, I expected those 100000 entries to propagate from `c2` to `c1`, but instead:
iex(26)> DeltaCrdt.read(c2) |> map_size()
100000
iex(27)> DeltaCrdt.read(c1) |> map_size()
18429
iex(28)> DeltaCrdt.set_neighbours c2, [c1]
:ok
iex(29)> DeltaCrdt.set_neighbours c1, [c2]
:ok
iex(30)> DeltaCrdt.read(c1) |> map_size()
17629
iex(31)> DeltaCrdt.read(c2) |> map_size()
98400
iex(32)> DeltaCrdt.read(c2) |> map_size()
97400
...
So the new entries in `c2` are actually removed from the map and only the 18429 items in `c1` remain.
What's going on here? :)
We recently began using delta_crdt (version 0.5.1 currently) in our project, and while typically it seems to work fine, under higher mutate loads everything starts timing out. For our app this typically happens on startup, and I've seen this behavior during load tests as well.
Here's an example of what we're seeing:
| GenStateMachine {MyApp.HordeRegistry, {MyApp.Automation.TaskRunner, 21393}}
terminating ** (exit) exited in: GenServer.call(MyApp.Data.Store, {:operation, {:add, [:my_key, 1]}}, 5000)
** (EXIT) time out
(elixir) lib/gen_server.ex:1009: GenServer.call/3
(my_app) lib/routing.ex:31: anonymous fn/2 in Routing.lodge_data/1
(elixir) lib/enum.ex:1948: Enum."-reduce/3-lists^foldl/2-0-"/3
(my_app) lib/routing.ex:30: Routing.lodge_data/1
(my_app) lib/my_app/data/task_runner.ex:128: MyApp.Automation.TaskRunner.handle_event/4
(stdlib) gen_statem.erl:1147: :gen_statem.loop_state_callback/11
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
-- | --
(i changed some of the names and data for privacy concerns)
The issue itself could just be something documentation related or might require some additional code.
We are using the default of 50 milliseconds for the sync_interval, and the default timeout for mutate/3. I was also digging through the horde code and noticed you use a sync_interval of 300 in that project.
Is raising our sync_interval to be more on par with horde something that could reduce mutate timeouts, or is there a better solution here? If that's the case, perhaps some more information in the docs about how to choose the sync_interval would be warranted.
I'm also wondering whether adding batch-add functionality to the AWLWWMap could be used to mitigate the load spikes we're putting on the delta_crdt.
These function calls (and perhaps others) can cause freezing in a netsplit scenario. A possible solution is to move synchronization to separate processes (one per neighbour perhaps, so that one neighbour's netsplit doesn't cause problems for synchronization to other neighbours) so that we can isolate the impact of a slow call to a netsplitted neighbour.
CC @frekw
I'm running into an issue (via Horde) where I make some changes to a CRDT and then stop the local copy. In some cases, the final changes are lost (i.e. don't get shipped out to the neighbors before the local copy stops). I'm wondering if there's a way to have a stronger guarantee.
I think what I'm observing is this: Currently, `DeltaCrdt.CausalCrdt.terminate/2` calls `sync_interval_or_state_to_all/1` in an attempt to flush all changes. However, this ships deltas only to neighbors for which we aren't currently waiting for an outstanding ack. So it is possible for the changes not to get propagated if the CRDT is still waiting for acks.
Currently I am solving this downstream by inserting a delay between making my final change to the CRDT, and stopping the CRDT, enough time for acks to flush and the change to ship. However, it seems it would be cleaner if there was a way to "flush" all changes at the CRDT level.
First things first: I went through the source code and noticed that the `AWLWWMap` not only offers `read/1` to get the whole map but also `read/2` to read a subset of keys.
I'd like to open a PR which uses `read/2` to read a single value from the `AWLWWMap`, similar to `Map.get/2`. But from where I'm standing, I'm under the impression that the `DeltaCrdt` module has been explicitly designed to not assume any particular data structure "under the hood", which means that a `DeltaCrdt.get/2` would break that assumption.
As such I'd like to ask: what could an API look like that allows using a "more sophisticated" `read` operation on the underlying data structure, such as `read/2` on `AWLWWMap`?
What I imagine could work is something like this:
# Default timeout (in milliseconds) passed to GenServer.call/3 when the caller gives none.
@timeout 5_000
#...
# read/1: no arguments and the default timeout.
def read(crdt), do: read(crdt, [], @timeout)
# read/2 with a list: the list is forwarded as the read arguments, default timeout.
# This clause must stay ahead of the unguarded read/2 below so lists are not
# mistaken for a timeout.
def read(crdt, args) when is_list(args), do: read(crdt, args, @timeout)
# read/2 with anything else: the second argument is treated as the timeout,
# which keeps the pre-existing read(crdt, timeout) call shape working.
def read(crdt, timeout), do: read(crdt, [], timeout)
# read/3: sends {:read, args} to the CRDT process and returns its reply.
def read(crdt, args, timeout) do
GenServer.call(crdt, {:read, args}, timeout)
end
And adjusting the causal_crdt
GenServer
to accept a {:read, args}
message. This in turn would keep the previous API intact - making for a non breaking change - but allow for the following API usage:
iex> DeltaCrdt.mutate(crdt, :add, ["my-key", "my-value"])
iex> DeltaCrdt.mutate(crdt, :add, ["another-key", "another-value"])
iex> DeltaCrdt.read(crdt, ["my-key"])
%{"my-key" => "my-value"}
Any thoughts on this?
The tricky thing is making this work with network partitions. If a neighbouring node goes away and later comes back, and there is a gap in the deltas that we have, then we need to be able to recover from this situation.
It would be nice to be able to use DeltaCrdt transparently with the stdlib Enum module.
Hi Derek,
I was trying to write a distributed process group on top of your awesome DeltaCrdt package.
But it seems AWSet implementation is not ready yet.
I tried to add an element to an AWSet and it failed with this error: function DeltaCrdt.AWSet.minimum_deltas/2 is undefined or private
Also I can not read the state of an AWSet because read
function is not implemented for AWSet.
Am I doing something wrong or it's not ready yet?
I thought maybe I can help if you give me some hints.
Thanks for this library, super cool. I'm working towards using it and testing it out with some example scenarios.
I have the basics working by using a node monitor to handle connect/disconnect between the cluster. This means that a joined node will automatically get the AWLWW Map.
I'm doing code that looks like this:
# One pass adds every key in 1..10000 to the CRDT, using the key as its own value.
add_all_keys = fn ->
  Enum.each(1..10000, &DeltaCrdt.mutate(AccountLookupCache, :add, [&1, &1]))
end

# Repeat the pass five times.
Enum.each(1..5, fn _pass -> add_all_keys.() end)

# Read the CRDT back with a 30 second timeout and count its entries.
AccountLookupCache
|> DeltaCrdt.read(30_000)
|> Enum.count()
The read is instant on node 1, but times out after 30s on node 2. It will eventually catch up, but may take 1-2 minutes. It seems to be spending most of its time in join decomposition
Do you have any tips for this?
There doesn't seem to be a handler for the {:get, keys}
message in DeltaCrdt.CausalCrdt
, causing DeltaCrdt.get/3
to always crash. It looks like the GenServer expects to get a message in the form of {:read, keys}
instead.
Example:
iex> DeltaCrdt.get(App.Crdt, :data)
[error] GenServer App.Crdt terminating
** (FunctionClauseError) no function clause matching in DeltaCrdt.CausalCrdt.handle_call/3
(delta_crdt 0.6.2) lib/delta_crdt/causal_crdt.ex:188: DeltaCrdt.CausalCrdt.handle_call({:get, [:data]}, {#PID<0.496.0>, #Reference<0.1390650936.464519170.185368>}, %DeltaCrdt.CausalCrdt{crdt_module: DeltaCrdt.AWLWWMap, crdt_state: %DeltaCrdt.AWLWWMap{dots: %{308441501 => 1, 832333321 => 1}, value: %{data: %{{%{}, -576460750822203482} => #MapSet<[{308441501, 1}]>, {%{}, -576460750512409489} => #MapSet<[{832333321, 1}]>}}}, max_sync_size: 200, merkle_map: %MerkleMap{map: %{data: %{{%{}, -576460750822203482} => #MapSet<[{308441501, 1}]>, {%{}, -576460750512409489} => #MapSet<[{832333321, 1}]>}}, merkle_tree: %MerkleMap.MerkleTree{tree: {<<242, 89, 53, 225>>, {<<5, 163, 213, 97>>, %{data: %{{%{}, -576460750822203482} => #MapSet<[{308441501, 1}]>, {%{}, -576460750512409489} => #MapSet<[{832333321, 1}]>}}}}}}, name: App.Crdt, neighbour_monitors: %{{App.Crdt, :app@node} => #Reference<0.1390650936.464519173.183426>}, neighbours: #MapSet<[{App.Crdt, :app@node}]>, node_id: 308441501, on_diffs: nil, outstanding_syncs: %{}, sequence_number: 0, storage_module: nil, sync_interval: 200})
(stdlib 3.14.1) gen_server.erl:715: :gen_server.try_handle_call/4
(stdlib 3.14.1) gen_server.erl:744: :gen_server.handle_msg/6
(stdlib 3.14.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message (from App.Client): {:get, [:data]}
iex> GenServer.call(App.Crdt, {:read, [:data]})
%{data: %{}}
To improve the performance it would be useful to have a performance suite that benchmarked the basic operations (read, add, remove, clear, and join).
Join decomposition, as described in https://arxiv.org/pdf/1803.02750.pdf will reduce the problem of redundant deltas being sent around in the network.
I haven't quite grokked it 100% yet, so if anyone has a good understanding of it and wants to help, then discussion is VERY welcome (or a PR).
The following module simulates the scenario with several concurrent processes writing to CRDT at some constant rate. It looks like when running the script on 2 or more nodes with the interval between writes close to sync_interval
, some keys will often be lost.
defmodule DeltaCrdt.Test do
  @moduledoc """
  Reproduction helper: several concurrent processes write unique keys to a
  locally registered `DeltaCrdt.AWLWWMap` at a constant rate.
  """

  # Name under which the CRDT process is registered; also used to address the
  # same CRDT on the other nodes.
  @crdt_test :crdt_test

  @doc """
  Starts the CRDT under the name `:crdt_test` with the given `sync_interval`
  and sets every other node returned by `:net_adm.world/0` as a neighbour.
  """
  def init(sync_interval \\ 200) do
    {:ok, crdt} =
      DeltaCrdt.start_link(DeltaCrdt.AWLWWMap,
        sync_interval: sync_interval,
        name: @crdt_test
      )

    :net_adm.world()
    |> Enum.reject(fn node -> node == Node.self() end)
    |> set_neighbours(crdt)
  end

  @doc """
  Starts the writer processes.

  Options:

    * `:processes` - number of concurrent writers (default `10`)
    * `:requests_per_process` - writes performed by each writer (default `1000`)
    * `:interval` - milliseconds slept after each write (default `200`)
  """
  def run(opts \\ []) do
    write_to_crdt(
      Keyword.get(opts, :processes, 10),
      Keyword.get(opts, :requests_per_process, 1000),
      Keyword.get(opts, :interval, 200)
    )
  end

  @doc """
  Returns the number of keys currently held by the local CRDT.
  """
  def crdt_length() do
    # map_size/1 avoids building the intermediate key list.
    map_size(DeltaCrdt.to_map(@crdt_test))
  end

  @doc """
  Sends an exit signal with reason `:normal` to the registered CRDT process.

  Raises if no process is registered under the CRDT name.
  """
  def stop() do
    Process.exit(Process.whereis(@crdt_test), :normal)
  end

  # Points the local CRDT at the process registered as @crdt_test on each given node.
  defp set_neighbours(nodes, crdt) do
    DeltaCrdt.set_neighbours(
      crdt,
      Enum.map(nodes, fn node -> {@crdt_test, node} end)
    )
  end

  ## 'process_num' processes write `requests_per_process` times to CRDT every 'interval' milliseconds
  # The tasks are never awaited, so this returns as soon as they are spawned.
  defp write_to_crdt(process_num, requests_per_process, interval) do
    Enum.each(1..process_num, fn _process_id ->
      Task.async(fn -> write(requests_per_process, interval) end)
    end)
  end

  # Puts a fresh reference as key (so every write is a new key) with the
  # current Unix time in milliseconds as value, then sleeps for `interval` ms.
  defp write(requests_per_process, interval) do
    Enum.each(1..requests_per_process, fn _request_id ->
      DeltaCrdt.put(
        @crdt_test,
        :erlang.make_ref(),
        DateTime.to_unix(DateTime.utc_now(), :millisecond)
      )

      :timer.sleep(interval)
    end)
  end
end
Steps to reproduce the issue (you might need to run several times):
iex --sname node1 --cookie delta_crdt -S mix
iex --sname node2 --cookie delta_crdt -S mix
import DeltaCrdt.Test
## This will initialize crdt on the node with 'sync_interval = 200' and then run 10 concurrent processes,
## each writing 1000 times, with 100 msecs between writes.
init(200); run(interval: 100)
crdt_length
The expected value is 20000 (that is, 10 * 1000 * 2), but you'll likely see a smaller number, which indicates that some records were lost.
You will likely get 20000 if running with much larger interval, i.e.
run(interval: 500)
, but it gets worse for 3 and more nodes, that is, even much larger interval does not prevent an occasional loss of records.
Note: If you want to run multiple tests within the same IEx session, you can kill current crdt by running:
stop
I can write the PR
Horde has the helpful NodeListener
genserver that I've been copy-pasting into toy projects where I'm playing with DeltaCrdt. My approach to libraries built on top of DeltaCrdt has been to start the DeltaCrdt
process under a supervisor, with a downstream child that bootstraps node membership. It'd be nice to have an optional NodeListener
baked into DeltaCrdt
- or even a setup like mine that uses a supervisor.
Here's a ReplicatedMap implementation I came up with:
defmodule ReplicatedMap do
  @moduledoc """
  A supervisor that runs a `DeltaCrdt.AWLWWMap` together with a child that
  configures the CRDT's neighbours, plus thin map-like wrappers around the
  `DeltaCrdt` API.

  Options accepted by `start_link/1`:

    * `:name` - name of the CRDT process (defaults to `ReplicatedMap`)
    * `:members` - either `:auto` (the default), which starts a
      `ReplicatedMap.NodeWatcher`, or an explicit list of neighbours
  """
  use Supervisor

  alias ReplicatedMap.NodeWatcher

  @doc """
  Starts the supervisor, registered as `:"<name>.Supervisor"`.
  """
  def start_link(opts) do
    opts = Keyword.put_new(opts, :name, __MODULE__)
    Supervisor.start_link(__MODULE__, opts, name: :"#{opts[:name]}.Supervisor")
  end

  @impl true
  def init(opts) do
    name = opts[:name]
    members = opts[:members] || :auto

    children = [
      {DeltaCrdt, [name: name, crdt: DeltaCrdt.AWLWWMap]},
      with_members(members, name)
    ]

    # Restart the node watcher when the ReplicatedMap crashes
    # so that node membership is properly resync'd w/ the DeltaCrdt.
    Supervisor.init(children, strategy: :rest_for_one)
  end

  @doc """
  Replaces the neighbours of `map` with `members`.
  """
  @spec set_members(map :: module(), members :: [GenServer.server()]) :: :ok
  def set_members(map, members) when is_list(members) do
    DeltaCrdt.set_neighbours(map, members)
  end

  @doc """
  Stores `value` under `key`.
  """
  @spec put(map :: module(), key :: any(), value :: any()) :: module()
  def put(map \\ __MODULE__, key, value) do
    DeltaCrdt.put(map, key, value)
  end

  @doc """
  Removes `key`.
  """
  @spec delete(map :: module(), key :: any()) :: module()
  def delete(map \\ __MODULE__, key) do
    DeltaCrdt.delete(map, key)
  end

  @doc """
  Removes every key in `keys`.
  """
  @spec drop(map :: module(), keys :: [any()]) :: module()
  def drop(map \\ __MODULE__, keys) when is_list(keys) do
    DeltaCrdt.drop(map, keys)
  end

  @doc """
  Returns the value stored under `key`, or `nil` when the read result does not
  contain `key`.
  """
  @spec get(map :: module(), key :: any()) :: nil | any()
  def get(map \\ __MODULE__, key) do
    case DeltaCrdt.read(map, [key]) do
      %{^key => val} -> val
      _ -> nil
    end
  end

  # :auto membership: child spec for a NodeWatcher named after the CRDT.
  defp with_members(:auto, crdt_name) do
    {NodeWatcher, [name: :"#{crdt_name}.NodeWatcher", crdt_name: crdt_name]}
  end

  # Explicit membership: a one-off task that sets the given neighbours.
  defp with_members(members, crdt_name) when is_list(members) do
    {Task, fn -> DeltaCrdt.set_neighbours(crdt_name, members) end}
  end
end
This library has been so fun to play with, and I use it to show off "the power of Elixir/Erlang" whenever I have the chance.
I can provide the PR
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.