
reactor-netty's Introduction

Reactor Netty


Reactor Netty offers non-blocking and backpressure-ready TCP/HTTP/UDP/QUIC clients and servers based on the Netty framework.

Getting it

Reactor Netty requires Java 8 or later to run.

With Gradle from repo.spring.io or Maven Central repositories (stable releases only):

    repositories {
      //maven { url 'https://repo.spring.io/snapshot' }
      maven { url 'https://repo.spring.io/milestone' }
      mavenCentral()
    }

    dependencies {
      //implementation "io.projectreactor.netty:reactor-netty-core:1.2.0-SNAPSHOT"
      implementation "io.projectreactor.netty:reactor-netty-core:1.2.0-M1"
      //implementation "io.projectreactor.netty:reactor-netty-http:1.2.0-SNAPSHOT"
      implementation "io.projectreactor.netty:reactor-netty-http:1.2.0-M1"
    }

See the Reference documentation for more information on getting it (e.g. using Maven, or how to get milestones and snapshots).

Getting Started

New to Reactor Netty? Check out the Reactor Netty Workshop and the Reference documentation.

Here is a very simple HTTP server and the corresponding HTTP client example:

DisposableServer server =
        HttpServer.create()   // Prepares an HTTP server ready for configuration
                  .port(0)    // Configures the port number to zero; the system will pick up
                              // an ephemeral port when binding the server
                  .route(routes ->
                              // The server will respond only to POST requests
                              // where the path starts with /test followed by a path parameter
                          routes.post("/test/{param}", (request, response) ->
                                  response.sendString(request.receive()
                                                             .asString()
                                                             .map(s -> s + ' ' + request.param("param") + '!')
                                                             .log("http-server"))))
                  .bindNow(); // Starts the server in a blocking fashion, and waits for it to finish its initialization
HttpClient.create()             // Prepares an HTTP client ready for configuration
          .port(server.port())  // Obtains the server's port and provides it as a port to which this
                                // client should connect
          .post()               // Specifies that POST method will be used
          .uri("/test/World")   // Specifies the path
          .send(ByteBufFlux.fromString(Flux.just("Hello")))  // Sends the request body
          .responseContent()    // Receives the response body
          .aggregate()
          .asString()
          .log("http-client")
          .block();

Getting help

Having trouble with Reactor Netty? We'd like to help!

Reporting Issues

Reactor Netty uses GitHub’s integrated issue tracking system to record bugs and feature requests. If you want to raise an issue, please follow the recommendations below:

  • Before you log a bug, please search the issue tracker to see if someone has already reported the problem.
  • If the issue doesn't already exist, create a new issue.
  • Please provide as much information as possible with the issue report; we like to know the version of Reactor Netty that you are using, as well as your operating system and JVM version.
  • If you want to raise a security vulnerability, please review our Security Policy for more details.

Contributing

See our Contributing Guide for information about contributing to Reactor Netty.

Building from Source

You don't need to build from source to use Reactor Netty (binaries are available in repo.spring.io), but if you want to try out the latest and greatest, Reactor Netty can be easily built with the Gradle wrapper. You also need JDK 8.

$ git clone https://github.com/reactor/reactor-netty.git
$ cd reactor-netty
$ ./gradlew build

If you want to publish the artifacts to your local Maven repository use:

$ ./gradlew publishToMavenLocal
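The locally published artifacts can then be consumed from another Gradle build by resolving against the local Maven repository (a minimal sketch; use whatever version your build produced):

    repositories {
      mavenLocal()   // resolves artifacts published via publishToMavenLocal
      mavenCentral()
    }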

Javadoc

https://projectreactor.io/docs/netty/release/api/


License

Reactor Netty is Open Source Software released under the Apache License 2.0

reactor-netty's People

Contributors

akiraly, aneveu, anshlykov, bclozel, bsideup, chemicl, ctlove0523, dependabot-preview[bot], dependabot[bot], ericbottard, jameschenx, jchenga, jdaru, lhotari, liyixin95, olegdokuka, pderop, quanticc, raycoarana, roggenbrot, rstoyanchev, samueldlightfoot, simonbasle, smaldini, spring-builds, spring-operator, sullis, tamasperlaki, violetagg, yuzawa-san


reactor-netty's Issues

ByteBuf is too unqualified for WebSocket messages

For WebSocket, NettyInbound.receive(), which provides ByteBufs, is too unqualified to be useful because there is no way of knowing the boundaries of a complete WebSocket message, nor whether it is binary, text, or a pong. It's possible to use NettyInbound.receiveObject().cast(WebSocketFrame.class), but it would be nicer if a handler were given something a little less generic and more tailored to a WebSocket interaction -- e.g. a receive method that provides WebSocketFrames.

A similar improvement on the outbound side would remove the need to specify text vs. binary when calling one of the HttpOutbound#upgradeToWebSocketXxx methods, since the input would already be WebSocketFrames.

onComplete of All Data Received Signal Available Without Receive

Given the code

AtomicLong startTimeHolder = new AtomicLong();

httpClient.get(uri)
            .log("stream.beforeTiming")
            .doOnSubscribe(s -> startTimeHolder.set(System.currentTimeMillis()))
            .doFinally(signal -> System.out.println(System.currentTimeMillis() - startTimeHolder.get()))
            .log("stream.afterTiming")

notification of all "completion-style" events (doOnSuccess(), doOnTerminate(), doAfterTerminate(), and doFinally()) happens before the last of the data is passed to a subscriber of .receive(). You can see an example of this here, where onComplete() is signaled followed by quite a lot more data.

Now, I'm not sure that this is strictly wrong. The Mono<HttpClient> is clearly done sending items, so it makes a certain kind of sense. However, this makes it exceedingly difficult for any party other than the final subscriber of the actual data to get this information.

There should be a way for the caller of the HttpClient operations (.get() et al) to be signaled when all data has been received.

Question: Half Closed Connections

I'm wondering if there is a way in reactor-netty to shut down the output (from the client to the server) once the data has been written to the channel. Shutting down the output means calling SocketChannel#shutdownOutput().

Essentially I have a Publisher that is producing the data to send. Shutting down the output when the Publisher finishes might be too early because there might be pending writes. Is there a way to intercept when all the writes have been completed?
My TCP server (over which I have no control) waits for the shutdownOutput() signal before it starts sending the response.
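For reference, the half-close behaviour being asked about can be reproduced with plain NIO channels (a standalone sketch, not the Reactor Netty API): the client writes its data and calls `shutdownOutput()`; the server then observes end-of-stream (`read` returns -1) while the connection stays open for the response.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class HalfCloseDemo {

    /** Runs the demo and returns what the client read back from the server. */
    static String run() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap("request".getBytes(StandardCharsets.UTF_8)));
                client.shutdownOutput(); // half-close: no more outbound data, inbound still open

                try (SocketChannel peer = server.accept()) {
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    while (peer.read(buf) != -1) { } // -1 arrives only because of shutdownOutput()
                    // the server can still respond on the half-closed connection
                    peer.write(ByteBuffer.wrap("response".getBytes(StandardCharsets.UTF_8)));
                }

                ByteBuffer resp = ByteBuffer.allocate(64);
                while (client.read(resp) != -1) { }
                resp.flip();
                return StandardCharsets.UTF_8.decode(resp).toString();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("client got: " + run()); // prints "client got: response"
    }
}
```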

ByteBufFlux skipping bytes when reading from file channel

In ByteBufFlux.java, line 144, the code erroneously updates the file channel's read position after reading a chunk. The position is already updated during the read (line 140), so there is no need to update it again.
The current buggy implementation skips bytes as a result.

You can have a look at my fork here, specifically ByteBufFluxTest.java, which replicates the bug, and ByteBufFlux.java, where I added a comment above the buggy line.
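The underlying behaviour is easy to verify with plain JDK classes: `FileChannel.read(ByteBuffer)` already advances the channel position by the number of bytes read, so bumping the position again afterwards (as the reported line did) skips data. A standalone illustration (file contents and chunk size are arbitrary):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileChannelPositionDemo {

    // Correct: rely on read(...) to advance the position.
    static String readAll(Path file, int chunkSize) throws IOException {
        StringBuilder out = new StringBuilder();
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(chunkSize);
            while (ch.read(buf) != -1) {
                buf.flip();
                while (buf.hasRemaining()) out.append((char) buf.get());
                buf.clear();
            }
        }
        return out.toString();
    }

    // Buggy: also advances the position manually, as the reported code did.
    static String readAllBuggy(Path file, int chunkSize) throws IOException {
        StringBuilder out = new StringBuilder();
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(chunkSize);
            int n;
            while ((n = ch.read(buf)) != -1) {
                ch.position(ch.position() + n); // double-advance: skips the next chunk
                buf.flip();
                while (buf.hasRemaining()) out.append((char) buf.get());
                buf.clear();
            }
        }
        return out.toString();
    }

    /** Writes "abcdefgh" to a temp file and reads it back both ways, in 2-byte chunks. */
    static String[] demo() throws IOException {
        Path file = Files.createTempFile("demo", ".txt");
        Files.write(file, "abcdefgh".getBytes());
        return new String[] { readAll(file, 2), readAllBuggy(file, 2) };
    }

    public static void main(String[] args) throws Exception {
        String[] results = demo();
        System.out.println(results[0]); // abcdefgh
        System.out.println(results[1]); // abef -- every other chunk skipped
    }
}
```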

I can fix the bug and create a pull request if you wish.

Chained Mono.fromRunnable is not executed (+Workaround)

If you create a Mono with Mono.fromRunnable(...) and chain it using .then(...), it will be executed in 3.0.4.RELEASE but not in 3.0.5.RELEASE.
Workaround
Attach a .doOnSuccess(_void_ -> {}) to your runnable Mono and it will be executed.

The code below produces the following output:
3.0.4.RELEASE
1: Mono1
2: Mono2
3: Mono1
4: Mono2

3.0.5.RELEASE
1: Mono1
2: Mono2
3: Mono1
4:

@Test
public void test_chained_runnable_monos() {

	Mono<Void> mono1 = Mono.fromRunnable(() -> {
		System.out.println("Mono1");
	}).doOnSuccess(_void_ -> {});
	Mono<Void> mono2 = Mono.fromRunnable(() -> {
		System.out.println("Mono2");
	});
	Mono<Void> monoEmpty = Mono.empty();
	
	System.out.print("1: ");
	mono1.subscribe();
	System.out.print("2: ");
	mono2.subscribe();
	System.out.print("3: ");
	monoEmpty.then(mono1).subscribe();
	System.out.print("4: ");
	monoEmpty.then(mono2).subscribe();
	
}

Let the `NettyInbound.onReadIdle` and `NettyOutbound.onWriteIdle` replace existing idle handlers

java.lang.IllegalArgumentException: Duplicate handler name: onChannelReadIdle
        at io.netty.channel.DefaultChannelPipeline.checkDuplicateName(DefaultChannelPipeline.java:1062)
        at io.netty.channel.DefaultChannelPipeline.filterName(DefaultChannelPipeline.java:293)
        at io.netty.channel.DefaultChannelPipeline.addBefore(DefaultChannelPipeline.java:250)
        at io.netty.channel.DefaultChannelPipeline.addBefore(DefaultChannelPipeline.java:240)
        at reactor.ipc.netty.channel.ChannelOperations.addHandler(ChannelOperations.java:141)
        at reactor.ipc.netty.channel.ChannelOperations.addHandler(ChannelOperations.java:64)
        at reactor.ipc.netty.NettyInbound.onReadIdle(NettyInbound.java:66)
        at org.springframework.messaging.tcp.reactor.ReactorNettyTcpConnection.onReadInactivity(ReactorNettyTcpConnection.java:67)
        at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.initHeartbeats(StompBrokerRelayMessageHandler.java:706)
...

For context, on a TCP connection to a STOMP broker we use heartbeats to keep the connection from hanging. However, heartbeats have to be negotiated first through an exchange of CONNECT and CONNECTED frames, so there is a possibility of hanging before the heartbeats have started, leading to a resource leak. This is why we first register one task to check whether we have received CONNECTED, and then register a heartbeat task after.

This should not cause onChannelReadIdle to fail. One option would be to replace the onChannelReadIdle handler first (my current workaround), or the API could provide a clear choice as to whether the new task should be added or used as a replacement.

Default 404 handler doesn't send content-length or transfer-encoding header.

The default implementation of HttpServer's 404 handler is the following.

request.delegate()
       .writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1,
               HttpResponseStatus.NOT_FOUND));

This response includes neither a Content-Length header nor a Transfer-Encoding header. As a result, an HTTP client can't determine the end of the response. An HTTP/1.1 server must return Content-Length or Transfer-Encoding (iirc).
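For comparison, a minimal 404 that a client can frame unambiguously only needs `Content-Length: 0` (a raw-bytes sketch, not the Reactor Netty API):

```java
public class NotFoundResponse {

    /** Builds a minimal, correctly framed HTTP/1.1 404 response. */
    static String build() {
        return "HTTP/1.1 404 Not Found\r\n"
             + "Content-Length: 0\r\n"   // tells the client the body is empty
             + "\r\n";
    }

    public static void main(String[] args) {
        System.out.print(build());
    }
}
```

With this header present, curl would report the size instead of falling back to "Assume close to signal end".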

Curl's verbose output is here:

$ curl -vvv http://localhost:8011/jkjk
*   Trying ::1...
* connect to ::1 port 8011 failed: Connection refused
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8011 (#0)
> GET /jkjk HTTP/1.1
> Host: localhost:8011
> User-Agent: curl/7.49.1
> Accept: */*
> 
< HTTP/1.1 404 Not Found
* no chunk, no close, no size. Assume close to signal end
< 

HTTP Server keep alive - Netty Server unable to process TCP incoming requests

We noted that the Reactor HTTP server application we are developing, running on Netty at IP:PORT, is failing to process incoming requests.

We see that the TCP client repeatedly attempts to send an HTTP GET request to the application; however, the application does not start processing all the messages sent.

The first request results in a successful interaction; the next one fails, and while the logs do not show any errors, the application categorically refuses to process the next incoming request, and the cycle begins again.

After an in-depth investigation, we noted that when the failure happens the client is re-routed to a different peer at IP:PORT, which then results in the next successful interaction. Clients do not receive errors; however, they experience more than 60 seconds of delay for the synchronous HTTP invocation, and the connection is timed out or aborted.

If the failure is not forced by a subsequent incoming request, the server takes 60 seconds from the previous request to process successfully. Any request received in this time period will ultimately fail.

No way to wait for shutdown of TcpClient resources

Following the model of TcpResources I'm trying to create my own resources:

this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("reactor-netty-tcp-client");
this.poolResources = PoolResources.fixed("reactor-netty-tcp-pool");

TcpClient.create(opts -> opts
		.channelGroup(this.channelGroup)
		.loopResources(this.loopResources)
		.poolResources(this.poolResources)
		.preferNative(false));

So I can then shut them down:

ChannelGroupFuture future = this.channelGroup.close();
Mono<Void> completion = FutureMono.from(future)
		.doAfterTerminate((x, e) -> {
			this.loopResources.dispose();
			this.poolResources.dispose();
		});

Note, however, that unlike ChannelGroup there is no option to wait for the completion of the shutdown of the resources: their dispose methods return void and don't wait.
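The pattern being requested here -- turning a void, fire-and-forget dispose into something awaitable -- can be sketched in plain Java with a CompletableFuture (a standalone illustration using an ExecutorService as the stand-in resource, not the Reactor Netty API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitableDispose {

    /** Disposes the resource and returns a future that completes when shutdown has finished. */
    static CompletableFuture<Void> disposeLater(ExecutorService resource) {
        return CompletableFuture.runAsync(() -> {
            resource.shutdown(); // the void, fire-and-forget part
            try {
                // ...and the "wait for completion" part that is missing from the resources above
                if (!resource.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("shutdown timed out");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    /** Creates a resource, disposes it, and reports whether it fully terminated. */
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        disposeLater(pool).join(); // blocks until the resource is fully shut down
        return pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + demo()); // prints "terminated: true"
    }
}
```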

`httpServerResponse.sendString(Mono.error(...))` hangs

reactor-netty doesn't handle httpServerResponse.sendString(Mono.error(new IllegalArgumentException())) correctly.

Reproduction code is here:

package reactor.ipc.netty.http;

import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyState;
import reactor.ipc.netty.config.ClientOptions;

import java.nio.charset.StandardCharsets;

public class HttpErrorTests {
    @Test
    public void test() {
        NettyState server = HttpServer.create(0).newRouter(httpServerRoutes
                -> httpServerRoutes.get("/", (httpServerRequest, httpServerResponse) -> {
            return httpServerResponse.sendString(Mono.error(new IllegalArgumentException()));
        })).block();
        HttpClient client = HttpClient.create(ClientOptions.to("localhost", server.address().getPort()));
        client.get("/")
                .flatMap(httpClientResponse -> {
                    return httpClientResponse.receive()
                            .asString(StandardCharsets.UTF_8)
                            .collectList();
                })
                .doOnNext(it -> System.out.println("GOT!!!!!!" + it))
                .doOnError(Throwable::printStackTrace)
                .blockLast();
        server.dispose();
    }
}

Output log is:

11:13:52.025 [main] DEBUG r.ipc.netty.util.NettyNativeDetector - Default Netty Epoll support : false
11:13:52.028 [main] DEBUG reactor.ipc.netty.tcp.TcpServer - Server is not managed (Not directly introspectable)
11:13:52.488 [reactor-tcp-server-select-1] DEBUG reactor.ipc.netty.tcp.TcpServer - BIND OK /0:0:0:0:0:0:0:0:65069
11:13:52.800 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022] REGISTERED
11:13:52.801 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022] CONNECT: localhost/127.0.0.1:65069
11:13:52.807 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.tcp.TcpServer - CONNECT [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070]
11:13:52.809 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.tcp.TcpClient - CONNECT OK localhost/127.0.0.1:65069
11:13:52.809 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022, L:/127.0.0.1:65070 - R:localhost/127.0.0.1:65069] ACTIVE
11:13:52.817 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] REGISTERED
11:13:52.817 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] ACTIVE
11:13:52.838 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022, L:/127.0.0.1:65070 - R:localhost/127.0.0.1:65069] WRITE: 48B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 68 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 0d |host: localhost.|
|00000020| 0a 61 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d 0a |.accept: */*....|
+--------+-------------------------------------------------+----------------+
11:13:52.839 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022, L:/127.0.0.1:65070 - R:localhost/127.0.0.1:65069] FLUSH
11:13:52.843 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] RECEIVED: 48B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 68 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 0d |host: localhost.|
|00000020| 0a 61 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d 0a |.accept: */*....|
+--------+-------------------------------------------------+----------------+
11:13:52.861 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] WRITE: 66B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 74 72 61 6e 73 66 65 72 2d 65 6e 63 6f 64 69 |.transfer-encodi|
|00000020| 6e 67 3a 20 63 68 75 6e 6b 65 64 0d 0a 63 6f 6e |ng: chunked..con|
|00000030| 6e 65 63 74 69 6f 6e 3a 20 63 6c 6f 73 65 0d 0a |nection: close..|
|00000040| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
11:13:52.861 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] FLUSH
11:13:52.867 [reactor-tcp-client-io-1] DEBUG reactor.ipc.netty.http.HttpClient - [id: 0x26b03022, L:/127.0.0.1:65070 - R:localhost/127.0.0.1:65069] RECEIVED: 66B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 74 72 61 6e 73 66 65 72 2d 65 6e 63 6f 64 69 |.transfer-encodi|
|00000020| 6e 67 3a 20 63 68 75 6e 6b 65 64 0d 0a 63 6f 6e |ng: chunked..con|
|00000030| 6e 65 63 74 69 6f 6e 3a 20 63 6c 6f 73 65 0d 0a |nection: close..|
|00000040| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
11:13:52.869 [reactor-tcp-client-io-1] DEBUG r.i.netty.http.HttpClientOperations - Received response (auto-read:false) : io.netty.handler.codec.http.DefaultHttpHeaders@bfebfb1b
Got response
200 OK
11:13:52.881 [reactor-tcp-server-io-2] ERROR r.ipc.netty.channel.NettyOperations - Write error
java.lang.IllegalArgumentException: null
	at reactor.ipc.netty.http.HttpErrorTests.lambda$null$0(HttpErrorTests.java:16) ~[test/:na]
	at reactor.ipc.netty.http.DefaultHttpServerRoutes$HttpRouteHandler.apply(DefaultHttpServerRoutes.java:80) ~[main/:na]
	at reactor.ipc.netty.http.DefaultHttpServerRoutes$HttpRouteHandler.apply(DefaultHttpServerRoutes.java:60) ~[main/:na]
	at reactor.ipc.netty.http.HttpServer.routeRequestResponse(HttpServer.java:214) ~[main/:na]
	at reactor.ipc.netty.http.HttpServer.lambda$newRouter$0(HttpServer.java:139) ~[main/:na]
	at reactor.ipc.netty.http.HttpServerOperations.onNext(HttpServerOperations.java:199) ~[main/:na]
	at reactor.ipc.netty.channel.NettyChannelHandler.channelRead(NettyChannelHandler.java:69) [main/:na]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) [netty-all-4.1.3.Final.jar:4.1.3.Final]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
11:13:52.883 [reactor-tcp-server-io-2] DEBUG r.ipc.netty.channel.NettyOperations - Pausing read due to lack of request
11:13:52.883 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] WRITE: 0B
11:13:52.884 [reactor-tcp-server-io-2] DEBUG reactor.ipc.netty.http.HttpServer - [id: 0xc8e06f1d, L:/127.0.0.1:65069 - R:/127.0.0.1:65070] FLUSH
11:13:52.885 [reactor-tcp-server-io-2] ERROR r.i.netty.http.HttpServerOperations - Error processing connection. Closing the channel.
java.lang.IllegalArgumentException: null
	at reactor.ipc.netty.http.HttpErrorTests.lambda$null$0(HttpErrorTests.java:16) ~[test/:na]
	at reactor.ipc.netty.http.DefaultHttpServerRoutes$HttpRouteHandler.apply(DefaultHttpServerRoutes.java:80) ~[main/:na]
	at reactor.ipc.netty.http.DefaultHttpServerRoutes$HttpRouteHandler.apply(DefaultHttpServerRoutes.java:60) ~[main/:na]
	at reactor.ipc.netty.http.HttpServer.routeRequestResponse(HttpServer.java:214) ~[main/:na]
	at reactor.ipc.netty.http.HttpServer.lambda$newRouter$0(HttpServer.java:139) ~[main/:na]
	at reactor.ipc.netty.http.HttpServerOperations.onNext(HttpServerOperations.java:199) ~[main/:na]
	at reactor.ipc.netty.channel.NettyChannelHandler.channelRead(NettyChannelHandler.java:69) ~[main/:na]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_77]
11:13:52.886 [reactor-tcp-client-io-1] DEBUG r.ipc.netty.channel.NettyOperations - Pausing read due to lack of request
11:13:52.886 [reactor-tcp-client-io-1] DEBUG r.ipc.netty.channel.NettyOperations - Subscribing inbound receiver [pending: 0, done: false]

The log says Error processing connection. Closing the channel. but it doesn't actually close the socket.

As a result, the client request hangs forever.

$ curl -vvv http://localhost:65069/
*   Trying ::1...
* Connected to localhost (::1) port 65069 (#0)
> GET / HTTP/1.1
> Host: localhost:65069
> User-Agent: curl/7.49.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< connection: close
<

https://gyazo.com/d6b920d1126f423bb97d2c2a273aa290

No way to set sub-protocols on WebSocket client

HttpClientOperations delegates to withWebSocketSupport, which accepts sub-protocols, but that is not exposed. Publicly, HttpClientRequest only has sendWebSocket().

HttpClientResponse has receiveWebSocket, which does accept sub-protocols, but that makes no sense on a response, since the sub-protocol is a request header.

Provide method to close WebSocket connection outside of a Flux Subscriber's Cancellation

A Subscriber to the Flux of input WebSocket messages can use the Cancellation it receives to close the WebSocket connection.

It would be very useful to expose a close() method that can be called by something other than the Flux Subscriber. For example a 3rd party WebSocketSession abstraction built around the Reactor Netty WebSocket support that itself does not subscribe to the input Flux.

Semantically this should be possible, assuming there is only one Subscriber for a given WebSocket connection? Potentially some WebSocket handlers may not even subscribe to the inbound stream, effectively ignoring it and only broadcasting messages -- even in those cases it should be possible to close the connection somehow.

There should also be a way to specify a close status, e.g. close(int status).

HttpClient blocked after a failed request, error "unexpected message type: DefaultHttpRequest"

Version:
spring-boot-starter-web-reactive:0.1.0.BUILD-SNAPSHOT => reactor-netty-0.6.0.BUILD-20161219.135933-150

We have been using spring-web-reactive in our application for a while and it worked fine. Today, we got an exception in our test client, and the process is somehow blocked due to this error:

2016-12-19 18:49:24,359 [reactor-http-nio-1] ERROR reactor.ipc.netty.channel.ChannelOperations - [HttpClient] Error processing connection. Requesting close the channel
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:749)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:812)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:805)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.doWrite(ChannelOperationsHandler.java:267)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:368)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:150)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:787)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:813)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:805)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:842)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1032)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:296)
	at reactor.ipc.netty.http.HttpOperations.lambda$sendHeaders$0(HttpOperations.java:98)
	at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114)
	at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:152)
	at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:69)
	at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:41)
	at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:421)
	at reactor.ipc.netty.http.client.HttpClientOperations.onChannelActive(HttpClientOperations.java:452)
	at reactor.ipc.netty.channel.PooledClientContextHandler.connectOrAcquire(PooledClientContextHandler.java:171)
	at reactor.ipc.netty.channel.PooledClientContextHandler.lambda$operationComplete$0(PooledClientContextHandler.java:118)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:454)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
	at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:69)
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)

After this error, the process hangs forever. To reproduce this, one can set up a Spring Boot project using the version mentioned above with the following classes:

Application.java

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
AnyController.java

@Controller
@RequestMapping("/any")
public class AnyController {
    @RequestMapping(value = "/thing", method = RequestMethod.HEAD)
    public Mono<ResponseEntity<Void>> testAny() {
        return Mono.just(ResponseEntity.notFound().build());
    }
}
AnyControllerTest.java

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class AnyControllerTest {
    @LocalServerPort
    int port;
    String url = "http://localhost:{port}/any/thing";
    WebClient webClient = WebClient.create(new ReactorClientHttpConnector());
    @Test
    public void aTest() {
        Mono<ClientResponse> response = webClient.exchange(ClientRequest.HEAD(url, port).build());
        assert response.block().statusCode() == HttpStatus.NOT_FOUND;
    }
    @Test
    public void anotherTest() {
        Mono<ClientResponse> response = webClient.exchange(ClientRequest.HEAD(url, port).build());
        assert response.block().statusCode() == HttpStatus.NOT_FOUND;
    }
}

When one runs the test, the first test case will succeed without any problem; however, the second one will simply block and hang forever.

HTTP service and TCP client bridge

One of my Spring WebFlux controller needs to access a remote TCP/IP server (legacy). How could I bridge the HTTP response with the TcpClient provided by reactor-netty?

Or maybe, would it be easier not to use Spring WebFlux at all? If so, is there some code I could use as an example?

Thanks for your help,
--nick
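
For what it's worth, the data flow being asked about can be sketched with blocking stdlib sockets (all names here are illustrative, not reactor-netty API; a real bridge would use TcpClient and stay non-blocking end to end):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Sketch of the HTTP-to-legacy-TCP bridge: the HTTP handler forwards the
// request payload to the legacy TCP server and relays the reply back.
class LegacyTcpBridge {
    static String askLegacyServer(String host, int port, String request) {
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), "UTF-8"))) {
            PrintWriter out = new PrintWriter(
                    new OutputStreamWriter(socket.getOutputStream(), "UTF-8"), true);
            out.println(request);  // forward the HTTP payload downstream
            return in.readLine();  // relay the legacy server's reply upstream
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Tiny echo "legacy server" so the bridge can be exercised end to end.
    static int startEchoServer() {
        try {
            ServerSocket server = new ServerSocket(0);
            Thread t = new Thread(() -> {
                try (Socket client = server.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(client.getInputStream(), "UTF-8"))) {
                    PrintWriter out = new PrintWriter(
                            new OutputStreamWriter(client.getOutputStream(), "UTF-8"), true);
                    out.println("echo:" + in.readLine());
                } catch (IOException ignored) {
                }
            });
            t.setDaemon(true);
            t.start();
            return server.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The reactive version would replace askLegacyServer with a TcpClient exchange whose result Publisher is returned from the WebFlux handler, so no thread ever blocks.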

Document encoding/decoding API and Spring 5 codecs

<dependency>
        <groupId>io.projectreactor.ipc</groupId>
	<artifactId>reactor-netty</artifactId>
	<version>0.5.2.RELEASE</version>
</dependency>
WARNING: An exception 'java.lang.NoClassDefFoundError: reactor/ipc/codec/Codec' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.NoClassDefFoundError: reactor/ipc/codec/Codec
	at reactive.net.Netty.lambda$0(Netty.java:26)
	at reactor.ipc.netty.common.NettyChannelHandler.channelActive(NettyChannelHandler.java:104)

Consider removing default transfer-encoding header?

In the NettyHttpChannel class, we set the transfer-encoding header to chunked by default. This means that consumers need to call responseTransfer() or removeTransferEncodingChunked() to disable this behavior. Should we instead let clients decide whether they want to set the transfer-encoding or not?

At the moment, the spring-web-reactive class ReactorServerHttpResponse never calls responseTransfer() or removeTransferEncodingChunked(), so all reactive responses produced by spring-web are forced to include the transfer-encoding header, which should be optional behavior.

Alternatively, we could consider updating spring-web's ReactorServerHttpResponse to call responseTransfer() or removeTransferEncodingChunked() to disable transfer-encoding in the presence of the content-length header.

https://github.com/reactor/reactor-netty/blob/master/src/main/java/reactor/ipc/netty/http/NettyHttpChannel.java#L91

https://github.com/spring-projects/spring-framework/blob/master/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java#L87

p.s. I've created https://jira.spring.io/browse/SPR-14643 to potentially address this issue upstream.
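
The proposed default could be sketched as follows (a hypothetical helper, not reactor-netty code; it follows the RFC 7230 rule that a message should not carry both Content-Length and Transfer-Encoding):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the header policy discussed above: chunking is applied only
// when the body length is unknown, instead of being forced on by default.
class TransferEncodingPolicy {
    static Map<String, String> finalizeHeaders(Map<String, String> headers) {
        Map<String, String> result = new HashMap<>(headers);
        if (result.containsKey("Content-Length")) {
            // Length is known: a message must not carry both headers.
            result.remove("Transfer-Encoding");
        } else if (!result.containsKey("Transfer-Encoding")) {
            // Length unknown and nothing set by the user: fall back to chunking.
            result.put("Transfer-Encoding", "chunked");
        }
        return result;
    }
}
```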

HTTP Client - EncoderException: unexpected message type: DefaultHttpRequest

After updating to reactor-netty 0.6.0.RELEASE we started to get errors in the HTTP client; everything was working fine before. The error happens after the first request is made and the same behavior keeps repeating: one request is processed successfully and the next one fails. This issue happens when setting the HTTP content-length header in POST requests, with the following exception:

io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:749)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:812)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:805)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.doWrite(ChannelOperationsHandler.java:245)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:372)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:161)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:787)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:813)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:805)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:842)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1032)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:296)
	at reactor.ipc.netty.http.HttpOperations.lambda$sendHeaders$0(HttpOperations.java:96)
	at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:38)
	at reactor.core.publisher.MonoThenIgnore$MonoThenIgnoreMain.drain(MonoThenIgnore.java:166)
	at reactor.core.publisher.MonoThenIgnore.subscribe(MonoThenIgnore.java:54)
	at reactor.ipc.netty.NettyOutbound.subscribe(NettyOutbound.java:267)
	at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:465)
	at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:415)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:312)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
	at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:69)
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)
	... 27 common frames omitted

Please, we need a fix for this, because we have to be able to work with the content-length header in HTTP client requests.

Support client SNI

Currently, Reactor uses the same code for creating an SslHandler regardless of whether the Netty channel is a client or a server connection. (https://github.com/reactor/reactor-netty/blob/master/src/main/java/reactor/ipc/netty/options/NettyOptions.java#L219)

Clients should use the newHandler(ByteBufAllocator alloc, String peerHost, int peerPort) variant so that the server name is sent in the SSL handshake and the server knows which certificate to present to the client. See netty/netty#3801.
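
Netty aside, the mechanism being requested is standard TLS SNI behavior, which the JDK exposes directly; a minimal stdlib sketch (class and method names here are illustrative, not reactor-netty API):

```java
import java.util.Collections;
import java.util.List;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLParameters;

// Client-side SNI with the JDK's TLS API: the client advertises the peer
// host name in the handshake so the server can pick the matching
// certificate. The reactor-netty fix amounts to the same thing: passing
// peerHost/peerPort through to Netty's SslContext#newHandler on the
// client path.
class SniExample {
    static SSLParameters clientParamsFor(String peerHost) {
        SSLParameters params = new SSLParameters();
        // server_name extension sent during the TLS handshake
        params.setServerNames(
                Collections.<SNIServerName>singletonList(new SNIHostName(peerHost)));
        return params;
    }
}
```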

IllegalReferenceCountException with reactor-netty 0.6.1

I use Reactor's TcpClient to connect to a server.
While exchanging messages, I very often get an IllegalReferenceCountException (see below) from Netty.
I didn't have this problem with [core 3.0.1.RELEASE, netty 0.5.2.RELEASE];
it started after upgrading to [core 3.0.5.RELEASE, netty 0.6.1.RELEASE].

Is it a bug or am I doing something wrong?
Maybe someone can give me a hint from the attached logs...

IllegalReferenceCountException.txt

100-Continue bug?

Following https://gitter.im/reactor/reactor?at=58c928a5872fc8ce62083bae

This doesn't work when a 100-Continue is issued by the client:

return HttpServer.create(port).newHandler((request, response) -> request.receive()
	.doOnNext(ByteBuf::retain)
	.compose(bb -> response.send(bb)));

The server responds with a 200 OK instead of a 100 Continue.

> POST / HTTP/1.1
> User-Agent: curl/7.26.0
> Host: localhost:9100
> Accept: */*
> Content-Length: 1828
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
>
* additional stuff not fine transfer.c:1037: 0 0
* HTTP 1.1 or later with persistent connection, pipelining supported
< HTTP/1.1 200 OK
< transfer-encoding: chunked
<
* additional stuff not fine transfer.c:1037: 0 0
* Done waiting for 100-continue
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* additional stuff not fine transfer.c:1037: 0 0
* Operation timed out after 5000 milliseconds with 0 bytes received
* Closing connection #0
curl: (28) Operation timed out after 5000 milliseconds with 0 bytes received

If I do the following, it works though:

return HttpServer.create(port).newHandler((request, response) -> request.receive()
	.doOnNext(ByteBuf::retain)
	.flatMap(bb -> response.send(Mono.just(bb))));

I'm using version 0.6.2.RELEASE
Also please note that if the client doesn't expect a 100-Continue (for instance, the sent content is small enough to fit in one chunk), both solutions work.

Add Mechanism to obtain port from HttpServer

It is very convenient that HttpServer.create allows passing in 0 for the port to dynamically allocate an available port. However, I do not see a way to obtain the dynamically allocated port, which makes using it for writing tests difficult.

It would be nice if there were a way to access the port on HttpServer.
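
For reference, the underlying "port 0" idiom with stdlib sockets: bind to port 0, then read the assigned port back from the bound socket. A reactor-netty equivalent would need to expose the bound address on whatever handle the server returns (the names below are illustrative only):

```java
import java.io.IOException;
import java.net.ServerSocket;

// Binding to port 0 asks the OS for an ephemeral port; the actual port
// must then be read back from the bound socket, which is exactly the
// accessor this issue asks HttpServer to provide.
class EphemeralPort {
    static int bindAndReport() {
        try (ServerSocket server = new ServerSocket(0)) {
            return server.getLocalPort(); // the port the OS actually assigned
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```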

Provide a simpler embedded server API

The current Reactor Netty API is not very straightforward when it comes to just running a server from a main method; see for example this code sample, where an AtomicReference<NettyContext> is needed.

Could we have something like a startAndAwait() method, and something to add handlers in a friendlier fashion?

Rework NettyContext user pipeline API

The current API has confusing semantics, misleading javadocs and a buggy implementation.

The ultimate goal of the addHandler/addDecoder methods is to let the users simply add handlers in the "user part" of the ChannelPipeline, which could be represented as the middle part of the pipeline:

 [ [reactor codecs], [<- user ENCODERS added here, user DECODERS added here ->], [reactor handlers] ]

The addHandler method should be removed, and an addEncoder method added.
Detection of where to place handlers can be achieved more easily by prefixing the NettyPipeline names with reactor.left or reactor.right:

  • addEncoder will put its handler just after the last reactor.left. handler in the pipeline
  • addDecoder will put its handler just before the first reactor.right. handler in the pipeline
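
The placement rule above can be sketched as plain list insertion over handler names (illustrative code, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the prefix-based placement rule: encoders go just after the
// last "reactor.left." handler, decoders just before the first
// "reactor.right." handler, so user handlers always land in the middle
// section of the pipeline.
class PipelinePlacement {
    static List<String> addEncoder(List<String> pipeline, String name) {
        List<String> result = new ArrayList<>(pipeline);
        int idx = 0;
        for (int i = 0; i < result.size(); i++) {
            if (result.get(i).startsWith("reactor.left.")) {
                idx = i + 1; // just after the last reactor.left.* handler
            }
        }
        result.add(idx, name);
        return result;
    }

    static List<String> addDecoder(List<String> pipeline, String name) {
        List<String> result = new ArrayList<>(pipeline);
        int idx = result.size();
        for (int i = 0; i < result.size(); i++) {
            if (result.get(i).startsWith("reactor.right.")) {
                idx = i; // just before the first reactor.right.* handler
                break;
            }
        }
        result.add(idx, name);
        return result;
    }
}
```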

Have `addEncoder`/`addDecoder` skip if handler already exist, like legacy `addHandler`

addHandler used to check whether there was an existing handler with the provided name, and skip adding the new handler if that was the case.
Since the move to addEncoder/addDecoder, this is no longer the case.

This should be fixed, for minimal consistency with the previous API (even though the methods no longer add at the same position as before).

Note that this is the inverse of #22 which would introduce a way of replacing instead of skipping.

NettyState dispose deadlock

The dispose() call never returns, as shown with this simple test:

	@Test
	public void testHang() throws Exception {
		NettyState httpServer = HttpServer
				.create(ServerOptions.on("0.0.0.0", 0)
						.eventLoopGroup(new NioEventLoopGroup(10)))
				.newRouter(r -> r.get("/data", (request, response) -> {
					return response.send(Mono.empty());
				})).block();
		httpServer.dispose();
	}

From stack dump:

"nioEventLoopGroup-2-1" #23 prio=10 os_prio=0 tid=0x00007fb8f0a36800 nid=0x7441 in Object.wait() [0x00007fb8cf2e1000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000672ef4af8> (a io.netty.util.concurrent.DefaultPromise)
	at java.lang.Object.wait(Object.java:502)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:239)
	- locked <0x0000000672ef4af8> (a io.netty.util.concurrent.DefaultPromise)
	at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:340)
	at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:34)
	at reactor.ipc.netty.tcp.TcpServer.lambda$channel$1(TcpServer.java:313)
	at reactor.ipc.netty.tcp.TcpServer$$Lambda$11/84739718.dispose(Unknown Source)
	at reactor.ipc.netty.channel.ChannelConnectHandler.lambda$operationComplete$0(ChannelConnectHandler.java:54)
	at reactor.ipc.netty.channel.ChannelConnectHandler$$Lambda$14/870848850.operationComplete(Unknown Source)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:488)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:111)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1062)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:686)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:664)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1276)
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:634)
	at io.netty.channel.AbstractChannelHandlerContext.access$1100(AbstractChannelHandlerContext.java:38)
	at io.netty.channel.AbstractChannelHandlerContext$13.run(AbstractChannelHandlerContext.java:623)
	at io.netty.util.concurrent.SingleThreadEventExecutor.safeExecute(SingleThreadEventExecutor.java:451)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)


"main" #1 prio=5 os_prio=0 tid=0x00007fb8f000e800 nid=0x741f in Object.wait() [0x00007fb8f8d22000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000677aae4e0> (a io.netty.channel.DefaultChannelPromise)
	at java.lang.Object.wait(Object.java:502)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:239)
	- locked <0x0000000677aae4e0> (a io.netty.channel.DefaultChannelPromise)
	at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:129)
	at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:28)
	at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:340)
	at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:117)
	at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:28)
	at reactor.ipc.netty.channel.ChannelConnectHandler.dispose(ChannelConnectHandler.java:77)
	at reactor.ipc.netty.channel.ChannelState.dispose(ChannelState.java:69)
	at org.springframework.cloud.stream.app.gpfdist.sink.FooTests.testHang(FooTests.java:40)

Connections should be acquired on send, not on subscribe

Currently, connections are acquired from the pool (or opened in non-pooling scenarios) on subscription. This means that the connection is either unusable by other flows or consuming resources during a period of time that it is not in use. I'd like to see connections acquired on send instead of subscribe.

It may seem like this distinction is quite small in practical terms, but the Cloud Foundry Java Client has a number of extended flows, nested flows, etc., where a connection is acquired on subscription and then, further up the flow, another connection is acquired for a different request. This means that the first (lower) connection sits idle, but unusable, until it has been returned. Worse still, with small pool sizes (say, one connection) there is a deadlock: the second (higher) request attempts to acquire a connection currently reserved by the first (lower) one, and it will never be able to do so.

I believe that it's a reasonable (if slow) requirement that any flow written should be able to use a one-connection pool successfully.

receiveWebSocket results in "Failed to upgrade to websocket"

HttpClientResponse#receiveWebSocket delegates to withWebsocketSupport, which checks whether headers have been sent and, if so, returns an ISE. Of course, at this stage, when we already have a response, the headers have been sent, so as far as I can see there is no way this could possibly work.

Path parameter parsing should allow dots in segments and ignore query parameters

There seems to be a problem with pattern matching for paths in HTTP requests in reactor-netty 0.6.0.RELEASE. Please consider the four examples below. They outline two different problems, both related to (at least for me) unexpected behaviour of the pattern matcher. In summary, they are:

  1. Dots in parameterized segments break pattern matching (/{x} won't match /a.b)
  2. Query parameters break pattern matching, even without path params (/search won't match /search?q=reactor)

The last example with /test/{order} nicely illustrates how broken the implementation is as soon as query parameters come into play (given a path /test/foo?q=bar, the value for order will be foo?q=bar).

package reactor.ipc.netty.http.server;
import org.junit.Test;
import reactor.ipc.netty.http.server.HttpPredicate.UriPathTemplate;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class UriPathTemplateTest {

    @Test
    public void patternShouldMatchPathWithOnlyLetters() {
        UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}");
        // works as expected
        assertThat(uriPathTemplate.match("/test/1").get("order"), is("1"));
    }

    @Test
    public void patternShouldMatchPathWithDots() {
        UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}");
        // does not match, the dot in the segment parameter breaks matching
        // expected: a map containing {"order": "2.0"}, found: empty map
        assertThat(uriPathTemplate.match("/test/2.0").get("order"), is("2.0"));
    }

    @Test
    public void staticPatternShouldMatchPathWithQueryParams() {
        UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/3");
        // does not match, the query parameter breaks matching
        // expected: true, found: false
        assertThat(uriPathTemplate.matches("/test/3?q=reactor"), is(true));
    }

    @Test
    public void parameterizedPatternShouldMatchPathWithQueryParams() {
        UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}");
        // does not match, the query parameter breaks matching
        // expected: a map containing {"order": "3"}, found: a map containing {"order": "3?q=reactor"}
        assertThat(uriPathTemplate.match("/test/3?q=reactor").get("order"), is("3"));
    }

}
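
The behavior the tests above expect can be sketched with a plain-regex template matcher (a hypothetical helper, not the actual UriPathTemplate): a {name} segment matches anything except '/', so dots are fine, and the query string is stripped before matching.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of a path template matcher with the expected semantics.
class SimplePathTemplate {
    private final Pattern pattern;
    private final List<String> names = new ArrayList<>();

    SimplePathTemplate(String template) {
        Matcher m = Pattern.compile("\\{([^/]+?)\\}").matcher(template);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            names.add(m.group(1));
            m.appendReplacement(sb, "([^/]+)"); // a segment param may contain dots
        }
        m.appendTail(sb);
        pattern = Pattern.compile(sb.toString());
    }

    private static String stripQuery(String uri) {
        int q = uri.indexOf('?');
        return q < 0 ? uri : uri.substring(0, q); // query never takes part in matching
    }

    boolean matches(String uri) {
        return pattern.matcher(stripQuery(uri)).matches();
    }

    Map<String, String> match(String uri) {
        Map<String, String> params = new HashMap<>();
        Matcher m = pattern.matcher(stripQuery(uri));
        if (m.matches()) {
            for (int i = 0; i < names.size(); i++) {
                params.put(names.get(i), m.group(i + 1));
            }
        }
        return params;
    }
}
```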

No way to shut down TcpClient resources

Currently, TcpClient is created as follows:

ClientOptions clientOptions = ClientOptions.create();
clientOptions.loopResources(TcpResources.get()).poolResources(TcpResources.get());
options.accept(clientOptions);
return new TcpClient(clientOptions.duplicate());

Even if I provide my own resources through ClientOptions, the global resources are created anyway, and after that there is no way to shut them down, since dispose() is empty and _dispose() is protected.

Please support the use case of creating a TcpClient and then shutting down all associated resources. If I provide my own resources, it shouldn't trigger creation of the global ones. There should also be a way to shut down the global resources through an explicit method designed for that. Consider, for example, that when running in a server like Tomcat, on shutdown you get ugly messages about active threads.

It would also be very useful for ClientOptions to expose what an option is currently set to, so that I can accept external options and add my own only if others are not provided -- the same thing TcpClient#create would have to do.

ClosedChannelException on NettyOutbound.sendFile(…) with epoll

Using epoll channels with NettyOutbound.sendFile(…) to send files from a ZIP file system causes a ClosedChannelException.

Versions:

  • Reactor Core 3.0.5.RELEASE
  • Reactor Netty 0.6.2.RELEASE

Trace:

java.nio.channels.ClosedChannelException
	at io.netty.channel.epoll.Native.sendfile(...)(Unknown Source)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoError] :
	reactor.core.publisher.Mono.error(Mono.java:274)
	reactor.ipc.netty.FutureMono.from(FutureMono.java:48)
	reactor.ipc.netty.NettyOutbound.lambda$sendFile$1(NettyOutbound.java:184)
	reactor.core.publisher.MonoUsing.subscribe(MonoUsing.java:87)
	reactor.core.publisher.MonoSource.subscribe(MonoSource.java:65)
	reactor.core.publisher.MonoThenIgnore$MonoThenIgnoreMain.drain(MonoThenIgnore.java:151)
	reactor.core.publisher.MonoThenIgnore.subscribe(MonoThenIgnore.java:54)
	reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:78)
	reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:378)
	reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:354)
	io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:304)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
Error has been observed by the following operator(s):
	|_	Mono.error(FutureMono.java:48)
	|_	Mono.using(NettyOutbound.java:183)
	|_	Mono.then(ReactorNetty.java:246)
	|_	Mono.thenEmpty(ReactorNetty.java:246)

Code to reproduce: https://gist.github.com/mp911de/e0cf842ca27fb56a4e478d705dcfe34a
(Note: the file must be served from the JAR file to reproduce the error. Serving from the classes directory will not reveal the issue.)
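
The ZIP-file-system half of the reproduction can be shown with the stdlib alone (illustrative code; a plausible explanation, stated here as an assumption, is that epoll's sendfile(2) path needs a real file descriptor, which a ZIP file-system entry cannot provide):

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;

// Create a ZIP, mount it as a java.nio FileSystem, and resolve a Path
// inside it. Paths like this one back NettyOutbound.sendFile(...) in the
// report: they are readable through the Files API, but they are not
// regular files on disk.
class ZipFsDemo {
    static String readFromZip() {
        Path zip = null;
        try {
            zip = Files.createTempFile("demo", ".zip");
            Files.delete(zip); // let the zipfs provider create the archive itself
            Map<String, String> env = Collections.singletonMap("create", "true");
            URI uri = URI.create("jar:" + zip.toUri());
            try (FileSystem zipFs = FileSystems.newFileSystem(uri, env)) {
                Path entry = zipFs.getPath("/hello.txt");
                Files.write(entry, "hello".getBytes("UTF-8"));
                return new String(Files.readAllBytes(entry), "UTF-8");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (zip != null) {
                try { Files.deleteIfExists(zip); } catch (IOException ignored) { }
            }
        }
    }
}
```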
