diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpHeadersRequestFilter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpHeadersRequestFilter.java new file mode 100644 index 00000000..bf618f85 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpHeadersRequestFilter.java @@ -0,0 +1,24 @@ +package io.github.kimmking.gateway.filter; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.FullHttpRequest; + +public class HttpHeadersRequestFilter extends ChannelInboundHandlerAdapter implements HttpRequestFilter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof FullHttpRequest) { + FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; + filter(fullHttpRequest, ctx); + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx) { + fullRequest.headers().add("nio", "linboxuan"); + ctx.fireChannelRead(fullRequest); + } +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java index 69b40fde..28069cf8 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java @@ -1,8 +1,6 @@ package io.github.kimmking.gateway.inbound; -import io.github.kimmking.gateway.filter.HeaderHttpRequestFilter; -import io.github.kimmking.gateway.filter.HttpRequestFilter; -import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler; +import io.github.kimmking.gateway.outbound.netty4.HttpOutboundHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.FullHttpRequest; @@ -10,18 +8,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - public class HttpInboundHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class); - private final List proxyServer; private HttpOutboundHandler handler; - private HttpRequestFilter filter = new HeaderHttpRequestFilter(); - public HttpInboundHandler(List proxyServer) { - this.proxyServer = proxyServer; - this.handler = new HttpOutboundHandler(this.proxyServer); + public HttpInboundHandler(String proxyServer) { + handler = new HttpOutboundHandler(proxyServer); } @Override @@ -34,14 +27,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { try { //logger.info("channelRead流量接口请求开始,时间为{}", startTime); FullHttpRequest fullRequest = (FullHttpRequest) msg; -// String uri = fullRequest.uri(); -// //logger.info("接收到的请求url为{}", uri); + String uri = fullRequest.uri(); + logger.info("接收到的请求url为{}", uri); // if (uri.contains("/test")) { // handlerTest(fullRequest, ctx); // } - handler.handle(fullRequest, ctx, filter); - + handler.handle(fullRequest, ctx); } catch(Exception e) { e.printStackTrace(); } finally { diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundInitializer.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundInitializer.java index 1d651fb1..a41c2a2b 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundInitializer.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundInitializer.java @@ -1,21 +1,17 @@ package io.github.kimmking.gateway.inbound; -import io.github.kimmking.gateway.filter.HttpRequestFilter; -import io.netty.channel.ChannelHandlerContext; +import io.github.kimmking.gateway.filter.HttpHeadersRequestFilter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; -import java.util.List; - public class HttpInboundInitializer extends ChannelInitializer { - private List proxyServer; + private String proxyServer; - public HttpInboundInitializer(List proxyServer) { + public HttpInboundInitializer(String proxyServer) { this.proxyServer = proxyServer; } @@ -28,6 +24,7 @@ public void initChannel(SocketChannel ch) { p.addLast(new HttpServerCodec()); //p.addLast(new HttpServerExpectContinueHandler()); p.addLast(new HttpObjectAggregator(1024 * 1024)); + p.addLast(new HttpHeadersRequestFilter()); p.addLast(new HttpInboundHandler(this.proxyServer)); } } diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/NettyHttpClient.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/NettyHttpClient.java new file mode 100644 index 00000000..5ac32f2b --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/NettyHttpClient.java @@ -0,0 +1,76 @@ +package io.github.kimmking.gateway.outbound; + +import io.github.kimmking.gateway.outbound.netty4.NettyHttpClientInboundHandler; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.*; +import io.netty.util.concurrent.FutureListener; + +import java.net.URI; +import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class NettyHttpClient { + + private final Bootstrap b; + private final EventLoopGroup workerGroup; + + public NettyHttpClient(){ + workerGroup = new NioEventLoopGroup(); + b = new Bootstrap() + .group(workerGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true); + } + + public void connect(final Consumer httpResponseConsumer, final HttpHeaders headers, final String backendUrl) { + Matcher matcher = Pattern.compile("http://(.+):([0-9]+)").matcher(backendUrl); + String host; + int port; + if (matcher.find()) { + host = matcher.group(1); + port = Integer.parseInt(matcher.group(2)); + } else { + throw new IllegalArgumentException("illegal backendUrl"); + } + try { + // Start the client. + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码 + ch.pipeline().addLast(new HttpResponseDecoder()); +// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码 + ch.pipeline().addLast(new HttpRequestEncoder()) + .addLast(new NettyHttpClientInboundHandler(httpResponseConsumer)); +// ch.pipeline().addLast(new HttpClientOutboundHandler(ctx, inbound)); + } + }); + ChannelFuture f = b.connect(host, port).sync(); + String url = "http://" + host + ":" + port + "/api/hello"; + URI uri = new URI(url); + DefaultFullHttpRequest request = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), + Unpooled.buffer(0), headers, new DefaultHttpHeaders(true)); + // 构建http请求 + request.headers().set(HttpHeaderNames.HOST, host); + request.headers().set(HttpHeaderNames.CONNECTION, + HttpHeaderNames.CONNECTION); + request.headers().set(HttpHeaderNames.CONTENT_LENGTH, + request.content().readableBytes()); + f.channel().write(request); + f.channel().flush(); + f.channel().closeFuture().sync(); + }catch (Exception e){ + throw new RuntimeException(e.getCause()); + } finally{ + workerGroup.shutdownGracefully(); + } + } + +} \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/HttpOutboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/HttpOutboundHandler.java new file mode 100644 index 00000000..00eef264 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/HttpOutboundHandler.java @@ -0,0 +1,88 @@ +package io.github.kimmking.gateway.outbound.netty4; + +import io.github.kimmking.gateway.outbound.NettyHttpClient; +import io.github.kimmking.gateway.outbound.httpclient4.NamedThreadFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.*; + +import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +public class HttpOutboundHandler { + private ExecutorService proxyService; + private String backendUrl; + private NettyHttpClient httpclient; + + public HttpOutboundHandler(String backendUrl) { + this.backendUrl = backendUrl.endsWith("/") ? backendUrl.substring(0, backendUrl.length() - 1) : backendUrl; + int cores = Runtime.getRuntime().availableProcessors() * 2; + long keepAliveTime = 1000; + int queueSize = 2048; + RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); + proxyService = new ThreadPoolExecutor(cores, cores, + keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize), + new NamedThreadFactory("proxyService"), handler); + + httpclient = new NettyHttpClient(); + } + + public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) { + final String url = this.backendUrl + fullRequest.uri(); + proxyService.submit(() -> fetchGet(fullRequest, ctx, url)); + } + + private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext ctx, final String url) { + httpclient.connect(httpResponse -> handleResponse(inbound, ctx, httpResponse), inbound.headers(), url); + } + + private void handleResponse(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final Object msg) { + FullHttpResponse fullHttpResponse = null; + try { + String value = "hello,kimmking"; + fullHttpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes(StandardCharsets.UTF_8))); + fullHttpResponse.headers().set("Content-Type", "application/json"); + fullHttpResponse.headers().setInt("Content-Length", value.length()); + + + if (msg instanceof HttpResponse) + { + HttpResponse response = (HttpResponse) msg; + System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + } + if(msg instanceof HttpContent) + { + HttpContent content = (HttpContent)msg; + ByteBuf buf = content.content(); + System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8)); + buf.release(); + } + } catch (Exception e) { + e.printStackTrace(); + fullHttpResponse = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT); + exceptionCaught(ctx, e); + } finally { + if (fullRequest != null) { + if (!HttpUtil.isKeepAlive(fullRequest)) { + ctx.write(fullHttpResponse).addListener(ChannelFutureListener.CLOSE); + } else { + ctx.write(fullHttpResponse); + } + } + ctx.flush(); + //ctx.close(); + } + + } + + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java deleted file mode 100644 index 79aeb148..00000000 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java +++ /dev/null @@ -1,51 +0,0 @@ -//package io.github.kimmking.gateway.outbound; -// -//import io.netty.bootstrap.Bootstrap; -//import io.netty.channel.ChannelFuture; -//import io.netty.channel.ChannelInitializer; -//import io.netty.channel.ChannelOption; -//import io.netty.channel.EventLoopGroup; -//import io.netty.channel.nio.NioEventLoopGroup; -//import io.netty.channel.socket.SocketChannel; -//import io.netty.channel.socket.nio.NioSocketChannel; -//import io.netty.handler.codec.http.HttpRequestEncoder; -//import io.netty.handler.codec.http.HttpResponseDecoder; -// -//public class NettyHttpClient { -// public void connect(String host, int port) throws Exception { -// EventLoopGroup workerGroup = new NioEventLoopGroup(); -// -// try { -// Bootstrap b = new Bootstrap(); -// b.group(workerGroup); -// b.channel(NioSocketChannel.class); -// b.option(ChannelOption.SO_KEEPALIVE, true); -// b.handler(new ChannelInitializer() { -// @Override -// public void initChannel(SocketChannel ch) throws Exception { -// // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码 -// ch.pipeline().addLast(new HttpResponseDecoder()); -// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码 -// ch.pipeline().addLast(new HttpRequestEncoder()); -// ch.pipeline().addLast(new HttpClientOutboundHandler()); -// } -// }); -// -// // Start the client. -// ChannelFuture f = b.connect(host, port).sync(); -// -// -// f.channel().write(request); -// f.channel().flush(); -// f.channel().closeFuture().sync(); -// } finally { -// workerGroup.shutdownGracefully(); -// } -// -// } -// -// public static void main(String[] args) throws Exception { -// NettyHttpClient client = new NettyHttpClient(); -// client.connect("127.0.0.1", 8844); -// } -//} \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientInboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientInboundHandler.java new file mode 100644 index 00000000..c27e078b --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientInboundHandler.java @@ -0,0 +1,21 @@ +package io.github.kimmking.gateway.outbound.netty4; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import java.util.function.Consumer; + +public class NettyHttpClientInboundHandler extends SimpleChannelInboundHandler { + private final Consumer httpResponseConsumer; + + + public NettyHttpClientInboundHandler(Consumer httpResponseConsumer) { + this.httpResponseConsumer = httpResponseConsumer; + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, Object msg) { + httpResponseConsumer.accept(msg); + } + +} \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientOutboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientOutboundHandler.java deleted file mode 100644 index 6730cd5a..00000000 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientOutboundHandler.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.github.kimmking.gateway.outbound.netty4; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; - -public class NettyHttpClientOutboundHandler extends ChannelInboundHandlerAdapter { - - @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { - - - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - - - - } -} \ No newline at end of file