Comments (19)
This functionality is also useful for implementing generic protocols such as Erlang's gen_server. For instance, we can implement a semi-typed gen_server as in https://gist.github.com/4025934. However, provided that we add functionality

    wrap :: Serializable a => a -> AbstractMessage
    matchAbstractMessage :: AbstractMessage -> [Match b] -> Process (Maybe b)

and make AbstractMessage serializable (different from forwarding!) we could change that GenServer example as follows:

    data Reply :: * where
      Reply :: Serializable a => a -> Reply

    data Server = Server {
        init       :: Process ()
      , handleCall :: [Match Reply]
      }

    start :: Name -> Process Server -> Process ()
    start name createServer = do
      server <- createServer
      pid <- spawnLocal $ do
        init server
        forever $ do
          (them, request) <- expect
          mReply <- matchAbstractMessage request (handleCall server)
          case mReply of
            Nothing            -> undefined -- deal with error
            Just (Reply reply) -> send them reply
      register name pid

    call :: (Serializable a, Serializable b) => Name -> a -> Process b
    call name request = do
      us <- getSelfPid
      nsend name (us, wrap request)
      expect

(Although for the specific case of GenServer I'm not 100% sure this is an improvement over the semi-typed version. But it illustrates the ideas.)
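To make the wrap / matchAbstractMessage idea above a bit more concrete, here is a minimal sketch that models it with Data.Dynamic from base rather than with distributed-process itself. Everything in it (AbstractMsg, Handler, handler, matchAbstract, describe) is a hypothetical stand-in; the real proposal would run in the Process monad and require Serializable rather than just Typeable.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Maybe (listToMaybe, mapMaybe)
import Data.Typeable (Typeable)

-- Hypothetical stand-in for AbstractMessage: the payload type is only
-- known at run time.
newtype AbstractMsg = AbstractMsg Dynamic

-- Hypothetical analogue of the proposed 'wrap'.
wrap :: Typeable a => a -> AbstractMsg
wrap = AbstractMsg . toDyn

-- A handler hides the payload type it accepts, much like 'Match b' does.
newtype Handler b = Handler (Dynamic -> Maybe b)

-- Lift a typed function into a handler; it only fires when the payload
-- has the type the function expects.
handler :: Typeable a => (a -> b) -> Handler b
handler f = Handler (fmap f . fromDynamic)

-- Hypothetical analogue of 'matchAbstractMessage': the first handler whose
-- type matches wins, otherwise the result is Nothing.
matchAbstract :: AbstractMsg -> [Handler b] -> Maybe b
matchAbstract (AbstractMsg d) hs =
  listToMaybe (mapMaybe (\(Handler h) -> h d) hs)

-- Example: the caller never needs to know which payload type arrived.
describe :: AbstractMsg -> Maybe String
describe msg = matchAbstract msg [handler showInt, handler showStr]
  where
    showInt :: Int -> String
    showInt n = "int " ++ show n

    showStr :: String -> String
    showStr s = "string " ++ s
```

With these definitions, describe (wrap (3 :: Int)) picks the Int handler, and a payload that no handler understands (say a Bool) yields Nothing, which is the case the server loop above has to deal with.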
from distributed-process.
I do think this would represent an improvement, even though it weakens the guarantees the type system is able to make. The reason I see this as valuable is that library and infrastructure code often cares little about the content of data, but does need to deal with routing, concurrency and error handling, delegating the 'functional' aspects of an application (or integration scenario) to other code that does care about the specific types involved. So having the ability to interact with AbstractMessage at this level is useful in some cases (where it simplifies the code required to deal with non-functional concerns), and the more specific types available via the higher level API (where Serializable data is coerced into its actual runtime type based on Typeable information) are better for application authors who're simply trying to leverage CH to build cool stuff.
I may attempt to contribute a pull request for this at some point in the future, though I have no idea when. :)
from distributed-process.
So I think this looks fine. We just need to think of sensible names for these things. Perhaps:

    data Message
    wrapMessage :: Serializable a => a -> Message
    matchMessage :: Message -> [Match a] -> Process (Maybe a)

from distributed-process.
Note that the Message type already exists; it is the internal wrapper that contains a Fingerprint and a ByteString. It is precisely the thing that AbstractMessage provides a layer over. Initially I didn't want to expose the Message type directly because I thought it would be confusing if we regarded Message as itself Serializable, since the difference between sending a Message and forwarding it might then be unclear. But if we add support for serializability anyway we can just export the internal type.
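As a rough illustration of why a type tag plus an encoded payload is enough to forward a message but not to decode it, here is a small sketch using only base. It is a model, not the library's representation: Msg, encodeMsg, decodeAs and forwardTo are hypothetical names, TypeRep stands in for the Fingerprint, and a String produced by show stands in for the ByteString.

```haskell
import Data.Proxy (Proxy (..))
import Data.Typeable (TypeRep, Typeable, typeOf, typeRep)
import Text.Read (readMaybe)

-- Hypothetical model of the internal message: a type tag and an opaque,
-- already-encoded payload.
data Msg = Msg
  { msgType    :: TypeRep
  , msgPayload :: String
  }

-- Encoding records the sender's type alongside the payload.
encodeMsg :: (Typeable a, Show a) => a -> Msg
encodeMsg x = Msg (typeOf x) (show x)

-- Decoding only succeeds when the caller names the type that was sent.
decodeAs :: (Typeable a, Read a) => Proxy a -> Msg -> Maybe a
decodeAs proxy (Msg tag payload)
  | tag == typeRep proxy = readMaybe payload
  | otherwise            = Nothing

-- Forwarding needs no type knowledge at all: the message is passed on
-- untouched (a list stands in for the target mailbox).
forwardTo :: Msg -> [Msg] -> [Msg]
forwardTo msg mailbox = mailbox ++ [msg]
```

In this model a message built from an Int decodes with Proxy :: Proxy Int but not with Proxy :: Proxy Integer, even though both would parse the same text, because the tag is checked first.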
from distributed-process.
I'm not particularly advocating that we expose an internal type, just trying to think of a sensible name for the public API.
from distributed-process.
No, I understand, but there is no reason not to export Message (the type at least, not the constructor).
from distributed-process.
I haven't gone the whole hog with implementing this, however I'd like to draw your attention to this branch which I just pushed. I am not going to merge this without having taken some guidance from you guys first, so I'd appreciate it if you could take a peek and see what you think.
Whilst implementing generic server processes, I noticed that sometimes I want to dequeue a message from the mailbox, but not handle it myself. This is typical of generic servers where the process mailbox is managed by the implementation, and the actual handling of messages is supplied by the user as a callback of sorts.
For GenProcess it is not necessary to expose Message as Serializable or otherwise, but we do need a way to provide a handle_info equivalent, and I achieved this by providing a maybeHandleMessage operation on AbstractMessage which (providing the if condition matches) defers type checking until the handler attempts to run and returns a Maybe so you can determine whether or not the handler was applied. This always-dequeue behaviour makes it possible to implement handle_info really neatly.
The key thing about this code is that I do want to dequeue the message, but I don't necessarily know if I can handle it. That latter part is up to the user callbacks, so I take the AbstractMessage and see if the user can handle it or not. Because I have an array of handlers, I'm sort-of duplicating the loop-through-available-matches concept here, but I have variable policies in place to decide what to do with unmatched messages. Unlike vanilla Cloud Haskell, the gen-server needs to decide what to do with these...

    -- | Policy for handling unexpected messages, i.e., messages which are not
    -- sent using the 'call' or 'cast' APIs, and which are not handled by any of the
    -- 'handleInfo' handlers.
    data UnhandledMessagePolicy =
        Terminate
      | DeadLetter ProcessId
      | Drop
      | ReQueue

    data InfoDispatcher s = InfoDispatcher {
        dispatchInfo :: s -> AbstractMessage -> Process (Maybe (ProcessAction s))
      }

Each info handler is a pretty simple function, with the wrapper simply deferring to maybeHandleMessage:

    handleInfo :: forall s a. (Serializable a)
               => (s -> a -> Process (ProcessAction s))
               -> InfoDispatcher s
    handleInfo h = InfoDispatcher { dispatchInfo = doHandleInfo h }
      where
        doHandleInfo :: forall s2 a2. (Serializable a2)
                     => (s2 -> a2 -> Process (ProcessAction s2))
                     -> s2
                     -> AbstractMessage
                     -> Process (Maybe (ProcessAction s2))
        doHandleInfo h' s msg = maybeHandleMessage msg (h' s)

The real meat of the work is in the process implementation, which walks all the info handlers and makes a policy-based decision if it gets to the end without having handled the message. This is work in progress, but I hope it gives a good idea of how maybeHandleMessage can be used:

    -- Process Implementation

    applyPolicy :: s
                -> UnhandledMessagePolicy
                -> AbstractMessage
                -> Process (ProcessAction s)
    applyPolicy s p m =
      case p of
        Terminate      -> stop (TerminateOther "unexpected-input")
        DeadLetter pid -> forward m pid >> continue s
        Drop           -> continue s

    initLoop :: Behaviour s -> s -> Delay -> Process TerminateReason
    initLoop b s w =
      let p   = unhandledMessagePolicy b
          t   = timeoutHandler b
          ms  = map (matchMessage p s) (dispatchers b)
          ms' = ms ++ addInfoAux p s (infoHandlers b)
      in loop ms' t s w
      where
        addInfoAux :: UnhandledMessagePolicy
                   -> s
                   -> [InfoDispatcher s]
                   -> [Match (ProcessAction s)]
        addInfoAux p ps ds = [matchAny (infoHandler p ps ds)]

        infoHandler :: UnhandledMessagePolicy
                    -> s
                    -> [InfoDispatcher s]
                    -> AbstractMessage
                    -> Process (ProcessAction s)
        infoHandler pol st [] msg = applyPolicy st pol msg
        infoHandler pol st (d:ds :: [InfoDispatcher s]) msg
          | length ds > 0 = let dh = dispatchInfo d in do
              -- NB: we *do not* want to terminate/dead-letter messages until
              -- we've exhausted all the possible info handlers
              m <- dh st msg
              case m of
                Nothing  -> infoHandler pol st ds msg
                Just act -> return act
          -- but here we *do* let the policy kick in
          | otherwise = let dh = dispatchInfo d in do
              m <- dh st msg
              case m of
                Nothing  -> applyPolicy st pol msg
                Just act -> return act

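The walk-the-handlers-then-apply-policy logic above can be tried out in isolation with a pure model. The sketch below is only that: it uses Data.Dynamic from base in place of AbstractMessage, plain values in place of Process actions, and simplified hypothetical types (Policy, Action, InfoHandler, dispatch, exampleHandlers) that merely echo the names used in the branch. It also assumes that a single recursive case is sufficient, since the empty-list case already applies the policy.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Typeable (Typeable)

-- Simplified, hypothetical versions of the policy and action types.
data Policy = Terminate | Drop | DeadLetter String

data Action s
  = Continue s
  | Stop String
  | Forwarded String s -- message handed to the named dead-letter target
  deriving (Eq, Show)

-- An info handler sees the state and an untyped message and reports
-- whether it was applicable.
newtype InfoHandler s = InfoHandler (s -> Dynamic -> Maybe (Action s))

-- Type checking is deferred until the handler is actually tried.
handleInfo :: Typeable a => (s -> a -> Action s) -> InfoHandler s
handleInfo h = InfoHandler (\s msg -> fmap (h s) (fromDynamic msg))

-- What to do once every handler has declined the message.
applyPolicy :: s -> Policy -> Action s
applyPolicy _ Terminate           = Stop "unexpected-input"
applyPolicy s Drop                = Continue s
applyPolicy s (DeadLetter target) = Forwarded target s

-- Try each handler in turn; the policy only kicks in after the last one.
dispatch :: Policy -> s -> [InfoHandler s] -> Dynamic -> Action s
dispatch pol st [] _ = applyPolicy st pol
dispatch pol st (InfoHandler h : hs) msg =
  case h st msg of
    Just act -> act
    Nothing  -> dispatch pol st hs msg

-- Example handlers over an Int state: Ints are added, Strings reset it.
exampleHandlers :: [InfoHandler Int]
exampleHandlers = [handleInfo addInt, handleInfo resetOn]
  where
    addInt :: Int -> Int -> Action Int
    addInt total n = Continue (total + n)

    resetOn :: Int -> String -> Action Int
    resetOn _ _ = Continue 0
```

For example, dispatch Drop 1 exampleHandlers (toDyn (5 :: Int)) runs the first handler, while a Bool message falls through both handlers and is resolved by whichever policy is in force.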
The generic process implementation looks quite neat with this in place:

    data Reset = Reset
      deriving (Typeable)
    $(derive makeBinary ''Reset)

    type MyState = [String]

    demo :: Behaviour MyState
    demo = Behaviour {
        dispatchers = [
            handleCall add
          , handleCast reset
          ]
      , infoHandlers = [handleInfo handleMonitorSignal]
      , timeoutHandler = onTimeout
      , terminateHandler = undefined
      , unhandledMessagePolicy = Drop
      }

    add :: MyState -> String -> Process (ProcessReply MyState String)
    add s x =
      let s' = (x:s)
      in reply "ok" s'

    reset :: MyState -> Reset -> Process (ProcessAction MyState)
    reset _ Reset = continue []

    handleMonitorSignal :: MyState -> ProcessMonitorNotification -> Process (ProcessAction MyState)
    handleMonitorSignal s (ProcessMonitorNotification _ _ _) = continue s

    onTimeout :: TimeoutHandler MyState
    onTimeout _ _ = stop $ TerminateOther "timeout"

from distributed-process.
And the canonical use-case of course, is in my supervisor implementation: https://github.com/haskell-distributed/distributed-process-platform/blob/supervisor/src/Control/Distributed/Process/Platform/Supervisor.hs#L162
I'm wondering whether the API should be more like what you suggested above though. I guess the main difference is that I'm not so interested in sending the messages on elsewhere. Even if the Message type is exported, without the messageFingerprint being available there's no way to decode it, and I actually need to decode the Message in the callback routines, but not in the code that calls receive - that code doesn't care what the decoded type will be, and I think this is possibly a better solution to handling input messages in a generic fashion. If you're going to decode, someone somewhere needs to know the type.
from distributed-process.
> I'm wondering whether the API should be more like what you suggested above though.
I'm not convinced that issue #71 really was a duplicate. This issue talks about being able to use expect and receive primitives to obtain a Message (or some other type that contains a Message in it) and then forward that Message, which is a useful feature. Issue #71, however, is about layering of code that does not care about message types with code that does. Exporting Message and allowing for matchMessage and/or wrapMessage would not solve the latter case AFAICT.
from distributed-process.
> I'm not convinced that issue #71 really was a duplicate.
To that end, I've re-opened it and submitted a patch (see pull request #116) to resolve it. This issue remains open as implementing a proxy in the way @edsko describes above would still require more work (i.e., the approach suggested here).
from distributed-process.
I've decided to still go ahead with what @edsko proposed here as well - I need the ability to pull messages out of the mailbox without processing them immediately, and this fits in nicely with that requirement.
One thing that does frustrate here is that once we expose Message like this, AbstractMessage will quickly become pointless. I don't, however, want to wait for the next major version bump to release this, as I really need it for the ManagedProcess (i.e., gen-server) API in d-p-platform. I think I'll just deprecate AbstractMessage instead.
from distributed-process.
> I think I'll just deprecate AbstractMessage instead.
That's precisely what I've needed to do, which according to http://www.haskell.org/haskellwiki/Package_versioning_policy means this requires a major version bump. Re-assigning to 0.5.0, again.
from distributed-process.
Implementation is parked in https://github.com/haskell-distributed/distributed-process/compare/expose-message. We can't merge this until 0.5.0 because it removes AbstractMessage.
from distributed-process.
I am not sure what, if anything, still needs to be decided here. Can you give me a three line summary of the current approach?
from distributed-process.
Sure.
Make Message serializable, export the type (not the constructors) and provide an API for converting between Message and other serializable types. Make matchAny work with Message instead of AbstractMessage and continue to provide a forward operation on Message.
from distributed-process.
Right, sounds like a plan :)
from distributed-process.
> Right, sounds like a plan :)
Awesome - this will get merged into 0.5.0 then! :D
from distributed-process.
@hyperthunk As far as I understand, everything in this task was merged and published. Can we close it?
from distributed-process.
Yes, we can close. Sorry for the massive delay...
from distributed-process.