This is a simple chatroom using Play and Websockets with the Java API.
This project makes use of dynamic streams from Akka Streams, notably BroadcastHub
and MergeHub
. By combining MergeHub and BroadcastHub, you can get publish/subscribe functionality.
The flow is defined once in the controller, and used everywhere from the chat
action:
public class HomeController extends Controller {
private final Flow userFlow;
@Inject
public HomeController(ActorSystem actorSystem,
Materializer mat) {
org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
LoggingAdapter logging = Logging.getLogger(actorSystem.eventStream(), logger.getName());
//noinspection unchecked
Source<String, Sink<String, NotUsed>> source = MergeHub.of(String.class)
.log("source", logging)
.recoverWithRetries(-1, new PFBuilder().match(Throwable.class, e -> Source.empty()).build());
Sink<String, Source<String, NotUsed>> sink = BroadcastHub.of(String.class);
Pair<Sink<String, NotUsed>, Source<String, NotUsed>> sinkSourcePair = source.toMat(sink, Keep.both()).run(mat);
Sink<String, NotUsed> chatSink = sinkSourcePair.first();
Source<String, NotUsed> chatSource = sinkSourcePair.second();
this.userFlow = Flow.fromSinkAndSource(chatSink, chatSource).log("userFlow", logging);
}
public Result index() {
Http.Request request = request();
String url = routes.HomeController.chat().webSocketURL(request);
return Results.ok(views.html.index.render(url));
}
public WebSocket chat() {
return WebSocket.Text.acceptOrResult(request -> {
if (sameOriginCheck(request)) {
return CompletableFuture.completedFuture(F.Either.Right(userFlow));
} else {
return CompletableFuture.completedFuture(F.Either.Left(forbidden()));
}
});
}
}
You will need JDK 1.8 and sbt installed.
sbt run
Go to http://localhost:9000 and open it in two different browsers. Typing into one browser will cause it to show up in another browser.
This project is originally taken from Johan Andrén's Akka-HTTP version:
Johan also has a blog post explaining dynamic streams in more detail: