From 0c11ff5f4a0316ae55dd8f69f63e909766e8b052 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=AA=E9=B9=8F?= Date: Mon, 14 Dec 2020 19:09:57 +0800 Subject: [PATCH] practice before lesson --- .../main/java/java0/nio01/HttpServer01.java | 2 +- .../main/java/java0/nio01/HttpServer02.java | 2 +- .../main/java/java0/nio01/HttpServer03.java | 14 ++- 02nio/nio02/pom.xml | 12 ++ .../gateway/NettyServerApplication.java | 2 +- .../gateway/filter/HttpRequestFilter.java | 2 +- .../gateway/filter/ReqHeadFilter.java | 15 +++ .../gateway/inbound/HttpInboundHandler.java | 52 ++++++--- .../gateway/inbound/HttpInboundServer.java | 6 +- .../outbound/netty4/NettyHttpClient.java | 105 ++++++++++-------- .../NettyHttpClientOutboundHandler.java | 32 ++++-- .../gateway/outbound/okhttp/HttpUtils.java | 44 ++++++++ .../okhttp/OkhttpOutboundHandler.java | 104 +++++++++++++++++ .../kimmking/gateway/router/RandowRouter.java | 21 ++++ .../java/java0/conc0301/DaemonThread.java | 5 +- .../java/java0/conc0301/sync/Counter.java | 2 +- .../java/java0/conc0301/sync/Thread3.java | 4 +- .../java0/conc0302/atomic/AtomicMain.java | 2 +- .../java0/conc0302/lock/ConditionDemo.java | 64 ++++++++--- .../java0/conc0302/lock/LockSupportDemo.java | 2 +- .../threadpool/ExecutorServiceDemo.java | 2 + .../collection/ConcurrentHashMapDemo.java | 2 + .../collection/CopyOnWriteArrayListDemo2.java | 2 +- .../future/CompletableFutureDemo.java | 1 + .../conc0303/stream/StreamParallelDemo.java | 1 + .../conc0303/tool/CyclicBarrierDemo2.java | 7 +- .../java0/conc0303/tool/SemaphoreDemo2.java | 4 +- .../java0/conc0303/tool/SemaphoreDemo3.java | 4 +- .../src/main/java/io/kimmking/java8/A.java | 6 + .../java/io/kimmking/java8/ForeachDemo.java | 11 +- .../java/io/kimmking/java8/GenericDemo.java | 16 +++ .../java/io/kimmking/java8/GuavaDemo.java | 28 ++++- .../java/io/kimmking/java8/LambdaDemo.java | 2 +- .../java/io/kimmking/spring01/GuavaDemo.java | 2 +- .../src/main/resources/applicationContext.xml | 4 +- .../src/main/resources/springjms-receiver.xml | 2 +- .../src/main/resources/springjms-sender.xml | 2 +- .../kimmking/springboot01/jms/BeanConfig.java | 2 +- .../springboot01/jms/ProducerController.java | 4 +- .../src/main/resources/application.yml | 2 +- 40 files changed, 472 insertions(+), 124 deletions(-) create mode 100644 02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/ReqHeadFilter.java create mode 100644 02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/okhttp/HttpUtils.java create mode 100644 02nio/nio02/src/main/java/io/github/kimmking/gateway/router/RandowRouter.java diff --git a/02nio/nio01/src/main/java/java0/nio01/HttpServer01.java b/02nio/nio01/src/main/java/java0/nio01/HttpServer01.java index c9cc060f..7b0e9847 100644 --- a/02nio/nio01/src/main/java/java0/nio01/HttpServer01.java +++ b/02nio/nio01/src/main/java/java0/nio01/HttpServer01.java @@ -24,7 +24,7 @@ private static void service(Socket socket) { PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); printWriter.println("HTTP/1.1 200 OK"); printWriter.println("Content-Type:text/html;charset=utf-8"); - String body = "hello,nio"; + String body = "hello,nio1"; printWriter.println("Content-Length:" + body.getBytes().length); printWriter.println(); printWriter.write(body); diff --git a/02nio/nio01/src/main/java/java0/nio01/HttpServer02.java b/02nio/nio01/src/main/java/java0/nio01/HttpServer02.java index 9b51cf29..20f1bfc7 100644 --- a/02nio/nio01/src/main/java/java0/nio01/HttpServer02.java +++ b/02nio/nio01/src/main/java/java0/nio01/HttpServer02.java @@ -26,7 +26,7 @@ private static void service(Socket socket) { PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); printWriter.println("HTTP/1.1 200 OK"); printWriter.println("Content-Type:text/html;charset=utf-8"); - String body = "hello,nio"; + String body = "hello,nio2"; printWriter.println("Content-Length:" + body.getBytes().length); printWriter.println(); printWriter.write(body); diff --git a/02nio/nio01/src/main/java/java0/nio01/HttpServer03.java b/02nio/nio01/src/main/java/java0/nio01/HttpServer03.java index fad136a8..7f4f5bc2 100644 --- a/02nio/nio01/src/main/java/java0/nio01/HttpServer03.java +++ b/02nio/nio01/src/main/java/java0/nio01/HttpServer03.java @@ -1,6 +1,7 @@ package java0.nio01; import java.io.IOException; +import java.io.InputStream; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; @@ -27,7 +28,7 @@ private static void service(Socket socket) { PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); printWriter.println("HTTP/1.1 200 OK"); printWriter.println("Content-Type:text/html;charset=utf-8"); - String body = "hello,nio"; + String body = "hello,nio3"; printWriter.println("Content-Length:" + body.getBytes().length); printWriter.println(); printWriter.write(body); @@ -37,4 +38,15 @@ private static void service(Socket socket) { e.printStackTrace(); } } + + private static void handleInputStreanm(Socket socket) throws IOException { + InputStream inputStream = socket.getInputStream(); + byte[] buffer = new byte[4096]; + StringBuilder sb = new StringBuilder(); + int tmp = 0; + while ((tmp = inputStream.read(buffer))!=-1){ + sb.append(new String(buffer,"UTF-8")); + } + // + } } \ No newline at end of file diff --git a/02nio/nio02/pom.xml b/02nio/nio02/pom.xml index 6cbbeffd..5de040a3 100644 --- a/02nio/nio02/pom.xml +++ b/02nio/nio02/pom.xml @@ -26,6 +26,18 @@ + + org.jetbrains.kotlin + kotlin-stdlib + 1.3.70 + + + + com.squareup.okhttp3 + okhttp + 4.9.0 + + io.netty netty-all diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java index 870b2d4f..30484f6e 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java @@ -9,7 +9,7 @@ public class NettyServerApplication { public final static String GATEWAY_VERSION = "1.0.0"; public static void main(String[] args) { - String proxyServer = System.getProperty("proxyServer","http://localhost:8088"); + String proxyServer = System.getProperty("proxyServer","http://localhost:8801,http://localhost:8802,http://localhost:8803"); String proxyPort = System.getProperty("proxyPort","8888"); // http://localhost:8888/api/hello ==> gateway API diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilter.java index 31253b40..1bb4c43b 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilter.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilter.java @@ -5,6 +5,6 @@ public interface HttpRequestFilter { - void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx); + void filter(FullHttpRequest fullRequest); } diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/ReqHeadFilter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/ReqHeadFilter.java new file mode 100644 index 00000000..dc1dc7d0 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/ReqHeadFilter.java @@ -0,0 +1,15 @@ +package io.github.kimmking.gateway.filter; + + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaders; + +public class ReqHeadFilter implements HttpRequestFilter{ + + @Override + public void filter(FullHttpRequest fullRequest) { + HttpHeaders headers = fullRequest.headers(); + headers.set("nio","wp"); + } +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java index 22fb2525..1d01d5a4 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java @@ -1,22 +1,48 @@ package io.github.kimmking.gateway.inbound; +import io.github.kimmking.gateway.filter.HttpRequestFilter; +import io.github.kimmking.gateway.filter.ReqHeadFilter; import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler; +import io.github.kimmking.gateway.outbound.netty4.NettyHttpClient; +import io.github.kimmking.gateway.outbound.okhttp.OkhttpOutboundHandler; +import io.github.kimmking.gateway.router.RandowRouter; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpUtil; import io.netty.util.ReferenceCountUtil; +import okhttp3.OkHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Random; + +import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + public class HttpInboundHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class); private final String proxyServer; - private HttpOutboundHandler handler; - +// private OkhttpOutboundHandler handler; + private NettyHttpClient handler; + private HttpRequestFilter filter; + private RandowRouter randowRouter; + public HttpInboundHandler(String proxyServer) { this.proxyServer = proxyServer; - handler = new HttpOutboundHandler(this.proxyServer); + randowRouter = new RandowRouter(); + String route = randowRouter.route(Arrays.asList(proxyServer.split(","))); +// handler = new OkhttpOutboundHandler(route); + handler = new NettyHttpClient(route); + filter = new ReqHeadFilter(); + } @Override @@ -27,16 +53,16 @@ public void channelReadComplete(ChannelHandlerContext ctx) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { - //logger.info("channelRead流量接口请求开始,时间为{}", startTime); - FullHttpRequest fullRequest = (FullHttpRequest) msg; -// String uri = fullRequest.uri(); -// //logger.info("接收到的请求url为{}", uri); -// if (uri.contains("/test")) { -// handlerTest(fullRequest, ctx); -// } - - handler.handle(fullRequest, ctx); - + if (msg instanceof FullHttpRequest){ + FullHttpRequest fullRequest = (FullHttpRequest) msg; + if (fullRequest.uri().contains("/favicon.ico")){ + return; + } + filter.filter(fullRequest); + + handler.handle(fullRequest, ctx); + + } } catch(Exception e) { e.printStackTrace(); } finally { diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java index 071fa9bc..b8d2f580 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java @@ -13,16 +13,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; + public class HttpInboundServer { private static Logger logger = LoggerFactory.getLogger(HttpInboundServer.class); private int port; - + private String proxyServer; public HttpInboundServer(int port, String proxyServer) { - this.port=port; + this.port = port; this.proxyServer = proxyServer; } diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java index 79aeb148..7d208d84 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java @@ -1,51 +1,62 @@ -//package io.github.kimmking.gateway.outbound; -// -//import io.netty.bootstrap.Bootstrap; -//import io.netty.channel.ChannelFuture; -//import io.netty.channel.ChannelInitializer; -//import io.netty.channel.ChannelOption; -//import io.netty.channel.EventLoopGroup; -//import io.netty.channel.nio.NioEventLoopGroup; -//import io.netty.channel.socket.SocketChannel; -//import io.netty.channel.socket.nio.NioSocketChannel; -//import io.netty.handler.codec.http.HttpRequestEncoder; -//import io.netty.handler.codec.http.HttpResponseDecoder; -// -//public class NettyHttpClient { -// public void connect(String host, int port) throws Exception { -// EventLoopGroup workerGroup = new NioEventLoopGroup(); -// +package io.github.kimmking.gateway.outbound.netty4; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.*; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.util.Map; + +public class NettyHttpClient { + + Bootstrap b = new Bootstrap(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + private String host; + private int port; + public NettyHttpClient(String backendUrl){ + backendUrl = backendUrl.endsWith("/")?backendUrl.substring(0,backendUrl.length()-1):backendUrl; + String[] split = backendUrl.substring(backendUrl.indexOf("://") + 3).split(":"); + host = split[0]; + port = Integer.parseInt(split[1]); +// connect(host,port); + b.group(workerGroup); + b.channel(NioSocketChannel.class); + b.option(ChannelOption.SO_KEEPALIVE, true); + } + + public void connect(String host, int port,final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) throws InterruptedException { + // try { -// Bootstrap b = new Bootstrap(); -// b.group(workerGroup); -// b.channel(NioSocketChannel.class); -// b.option(ChannelOption.SO_KEEPALIVE, true); -// b.handler(new ChannelInitializer() { -// @Override -// public void initChannel(SocketChannel ch) throws Exception { -// // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码 -// ch.pipeline().addLast(new HttpResponseDecoder()); -// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码 -// ch.pipeline().addLast(new HttpRequestEncoder()); -// ch.pipeline().addLast(new HttpClientOutboundHandler()); -// } -// }); -// -// // Start the client. -// ChannelFuture f = b.connect(host, port).sync(); -// -// -// f.channel().write(request); -// f.channel().flush(); -// f.channel().closeFuture().sync(); + + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码 + //客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码 + ch.pipeline().addLast(new HttpResponseDecoder()); + ch.pipeline().addLast(new HttpRequestEncoder()); + ch.pipeline().addLast(new NettyHttpClientOutboundHandler(ctx.channel())); + ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); + } + }); + // Start the client. + Channel channel = b.connect(host, port).sync().channel(); + channel.writeAndFlush(fullRequest); // } finally { // workerGroup.shutdownGracefully(); -// } // -// } -// -// public static void main(String[] args) throws Exception { -// NettyHttpClient client = new NettyHttpClient(); -// client.connect("127.0.0.1", 8844); -// } -//} \ No newline at end of file +// } + + } + + public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) throws Exception { + System.out.println(fullRequest.headers().get("nio")); + + connect(host,port,fullRequest,ctx); + } +} \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientOutboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientOutboundHandler.java index 6730cd5a..a78ffa3d 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientOutboundHandler.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientOutboundHandler.java @@ -1,22 +1,30 @@ package io.github.kimmking.gateway.outbound.netty4; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.util.CharsetUtil; + +import java.nio.charset.Charset; public class NettyHttpClientOutboundHandler extends ChannelInboundHandlerAdapter { - + + private Channel serverChannel; + + public NettyHttpClientOutboundHandler(Channel serverChannel){ + this.serverChannel = serverChannel; + } + + @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { - - + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - - - + public void channelRead(ChannelHandlerContext ctx, Object msg){ + System.out.println("client channelRead.."+msg); + serverChannel.writeAndFlush(msg); } } \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/okhttp/HttpUtils.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/okhttp/HttpUtils.java new file mode 100644 index 00000000..6c1d06ef --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/okhttp/HttpUtils.java @@ -0,0 +1,44 @@ +package io.github.kimmking.gateway.outbound.okhttp; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Objects; + +/** + * @Description: HttpClientUtils + * @Date: 2020/10/28 18:04 + * @Author: wp + **/ +public class HttpUtils { + + public static final String url ="http://localhost:8088"; + public static final int success = 200; + + public static String callGet(String url) throws IOException { + OkHttpClient httpClient = new OkHttpClient(); + Request request = new Request.Builder().url(url).build(); + Response response = httpClient.newCall(request).execute(); + if (response.isSuccessful()){ + return Objects.requireNonNull(response.body()).string(); + } + return null; + } + + public static void main(String[] args) throws IOException, URISyntaxException { + String response = HttpUtils.callGet(url); + if (response == null) { + throw new RuntimeException("调用失败"); + } + System.out.println(response); + } +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/okhttp/OkhttpOutboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/okhttp/OkhttpOutboundHandler.java index 5f194588..415bb088 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/okhttp/OkhttpOutboundHandler.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/okhttp/OkhttpOutboundHandler.java @@ -1,4 +1,108 @@ package io.github.kimmking.gateway.outbound.okhttp; +import io.github.kimmking.gateway.outbound.httpclient4.NamedThreadFactory; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; +import okhttp3.*; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.*; + +import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + public class OkhttpOutboundHandler { + +// private ExecutorService proxyService; + private String backendUrl; + private OkHttpClient okHttpClient; + + public OkhttpOutboundHandler(String backendUrl){ + this.backendUrl = backendUrl.endsWith("/")?backendUrl.substring(0,backendUrl.length()-1):backendUrl; +// int cores = Runtime.getRuntime().availableProcessors() * 2; +// long keepAliveTime = 1000; +// int queueSize = 2048; +// RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();//.DiscardPolicy(); +// proxyService = new ThreadPoolExecutor(cores, cores, +// keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize), +// new NamedThreadFactory("proxyService"), handler); + okHttpClient = new OkHttpClient().newBuilder() + .connectTimeout(5,TimeUnit.SECONDS) + .readTimeout(5, TimeUnit.SECONDS) + .writeTimeout(5,TimeUnit.SECONDS) + .build(); + } + + public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) { + final String url = this.backendUrl + fullRequest.uri(); + fetchGet(fullRequest, ctx, url); +// proxyService.submit(()->fetchGet(fullRequest, ctx, url)); + } + + private void fetchGet(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final String url) { + String resStr = "Request fail!!!"; + try { + Request.Builder header = new Request.Builder().get().url(url); + for (Map.Entry head : fullRequest.headers()) { + header.addHeader(head.getKey(),head.getValue()); + } + Request request = header.build(); + System.out.println("nio => "+request.header("nio")); + Response response = okHttpClient.newCall(request).execute(); + if (response.isSuccessful()){ + resStr = Objects.requireNonNull(response.body()).string(); + }else { + resStr = "Request error!!!"; + } + + } catch (IOException e) { + e.printStackTrace(); + } + + handleResponse(fullRequest,ctx, resStr); + } + + private void handleResponse(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final String resStr){ + FullHttpResponse response = null; + try { + + response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(resStr.getBytes())); + response.headers().set("Content-Type", "application/json"); + response.headers().setInt("Content-Length", resStr.getBytes().length); + + } catch (Exception e) { + e.printStackTrace(); + response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT); + exceptionCaught(ctx, e); + } finally { + if (fullRequest != null) { + if (!HttpUtil.isKeepAlive(fullRequest)) { + ctx.write(response).addListener(ChannelFutureListener.CLOSE); + } else { + //response.headers().set(CONNECTION, KEEP_ALIVE); + ctx.write(response); + } + } + ctx.flush(); + //ctx.close(); + } + + } + + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } } diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/RandowRouter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/RandowRouter.java new file mode 100644 index 00000000..d0723a74 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/RandowRouter.java @@ -0,0 +1,21 @@ +package io.github.kimmking.gateway.router; + +import java.util.List; +import java.util.Random; + +public class RandowRouter implements HttpEndpointRouter{ + + private Random random; + + public RandowRouter(){ + random = new Random(); + } + + @Override + public String route(List endpoints) { + if (endpoints == null || endpoints.size() == 0) { + throw new IllegalArgumentException(String.format("Illegal argument endpoints:%s",endpoints)); + } + return endpoints.get(random.nextInt(endpoints.size())); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0301/DaemonThread.java b/03concurrency/0301/src/main/java/java0/conc0301/DaemonThread.java index 5a00ab24..50beb0b0 100644 --- a/03concurrency/0301/src/main/java/java0/conc0301/DaemonThread.java +++ b/03concurrency/0301/src/main/java/java0/conc0301/DaemonThread.java @@ -2,7 +2,7 @@ public class DaemonThread { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { Runnable task = new Runnable() { @Override public void run() { @@ -15,9 +15,10 @@ public void run() { System.out.println("当前线程:" + t.getName()); } }; + Thread.sleep(10000); Thread thread = new Thread(task); thread.setName("test-thread-1"); - thread.setDaemon(false); + thread.setDaemon(true); thread.start(); } diff --git a/03concurrency/0301/src/main/java/java0/conc0301/sync/Counter.java b/03concurrency/0301/src/main/java/java0/conc0301/sync/Counter.java index e60845a6..68d869f6 100644 --- a/03concurrency/0301/src/main/java/java0/conc0301/sync/Counter.java +++ b/03concurrency/0301/src/main/java/java0/conc0301/sync/Counter.java @@ -7,7 +7,7 @@ public class Counter { public static int B=10; private volatile int sum = 0; - public synchronized void incr() { + public void incr() { sum=sum+1; } public int getSum() { diff --git a/03concurrency/0301/src/main/java/java0/conc0301/sync/Thread3.java b/03concurrency/0301/src/main/java/java0/conc0301/sync/Thread3.java index f9d5d742..0c027e27 100644 --- a/03concurrency/0301/src/main/java/java0/conc0301/sync/Thread3.java +++ b/03concurrency/0301/src/main/java/java0/conc0301/sync/Thread3.java @@ -33,7 +33,9 @@ private void m4t1(Inner inner) { } private void m4t2(Inner inner) { - inner.m4t2(); + synchronized (inner) { + inner.m4t2(); + } } public static void main(String[] args) { diff --git a/03concurrency/0301/src/main/java/java0/conc0302/atomic/AtomicMain.java b/03concurrency/0301/src/main/java/java0/conc0302/atomic/AtomicMain.java index de251e57..ab62dc50 100644 --- a/03concurrency/0301/src/main/java/java0/conc0302/atomic/AtomicMain.java +++ b/03concurrency/0301/src/main/java/java0/conc0302/atomic/AtomicMain.java @@ -4,7 +4,7 @@ public class AtomicMain { public static void main(String[] args) { - final AtomicCount count = new AtomicCount(); + final SyncCount count = new SyncCount(); for (int i = 0; i < 100; i++) { new Thread(new Runnable() { @Override diff --git a/03concurrency/0301/src/main/java/java0/conc0302/lock/ConditionDemo.java b/03concurrency/0301/src/main/java/java0/conc0302/lock/ConditionDemo.java index 33ad0bab..9ab342d5 100644 --- a/03concurrency/0301/src/main/java/java0/conc0302/lock/ConditionDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0302/lock/ConditionDemo.java @@ -6,40 +6,70 @@ class ConditionDemo { final Lock lock = new ReentrantLock(); - final Condition notFull = lock.newCondition(); + final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); - + final Object[] items = new Object[20]; - int putptr, takeptr, count; - + int putptr = 0, takeptr = 0, count = 0; + public void put(Object x) throws InterruptedException { lock.lock(); try { // 当count等于数组的大小时,当前线程等待,直到notFull通知,再进行生产 - while (count == items.length) - notFull.await(); - items[putptr] = x; - if (++putptr == items.length) putptr = 0; - ++count; - notEmpty.signal(); + while (count < items.length) { + items[putptr] = x; + System.out.println(Thread.currentThread().getName() + " product " + putptr++); + ++count; + notEmpty.signal(); + + } + putptr = 0; + System.out.println("货仓已满,等待消费"); + notFull.await(); } finally { lock.unlock(); } } - + public Object take() throws InterruptedException { lock.lock(); try { // 当count为0,进入等待,直到notEmpty通知,进行消费。 - while (count == 0) - notEmpty.await(); - Object x = items[takeptr]; - if (++takeptr == items.length) takeptr = 0; - --count; - notFull.signal(); + Object x = new Object(); + while (count > 0) { + x = items[takeptr]; + System.out.println(Thread.currentThread().getName() + " consume " + takeptr++); + --count; + notFull.signal(); + + } + takeptr = 0; + System.out.println("货仓已空,等待生产"); + notEmpty.await(); return x; } finally { lock.unlock(); } } + + public static void main(String[] args) throws InterruptedException { + ConditionDemo conditionDemo = new ConditionDemo(); + new Thread(() -> { + try { + conditionDemo.put(new Object()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }, "t1").start(); + Thread.sleep(1000); + new Thread(() -> { + try { + conditionDemo.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }, "t2").start(); + Thread.sleep(1000); + Thread.currentThread().getThreadGroup().list(); + } } \ No newline at end of file diff --git a/03concurrency/0301/src/main/java/java0/conc0302/lock/LockSupportDemo.java b/03concurrency/0301/src/main/java/java0/conc0302/lock/LockSupportDemo.java index 18f598f1..6ab2b3de 100644 --- a/03concurrency/0301/src/main/java/java0/conc0302/lock/LockSupportDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0302/lock/LockSupportDemo.java @@ -30,7 +30,7 @@ public static void main(String[] args) throws InterruptedException { t2.start(); Thread.sleep(3000L); t1.interrupt(); - LockSupport.unpark(t2); + LockSupport.unpark(t1); t1.join(); t2.join(); } diff --git a/03concurrency/0301/src/main/java/java0/conc0302/threadpool/ExecutorServiceDemo.java b/03concurrency/0301/src/main/java/java0/conc0302/threadpool/ExecutorServiceDemo.java index 664b6942..f439d4b7 100644 --- a/03concurrency/0301/src/main/java/java0/conc0302/threadpool/ExecutorServiceDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0302/threadpool/ExecutorServiceDemo.java @@ -18,6 +18,8 @@ public String call() throws Exception { }).get(); System.out.println("str=" + str); + Thread.currentThread().getThreadGroup().list(); + executorService.shutdownNow(); } catch (Exception e) { e.printStackTrace(); } diff --git a/03concurrency/0301/src/main/java/java0/conc0303/collection/ConcurrentHashMapDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/collection/ConcurrentHashMapDemo.java index 1ec7f12b..6fe10cdb 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/collection/ConcurrentHashMapDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/collection/ConcurrentHashMapDemo.java @@ -21,9 +21,11 @@ public void run() { for (int i = 0; i < 5; i++) { oldValue = count.get("a"); if (null == oldValue) { + System.out.println(Thread.currentThread().getName()+"第一个null:"+i); AtomicInteger zeroValue = new AtomicInteger(0); oldValue = count.putIfAbsent("a", zeroValue); if (null == oldValue) { + System.out.println(Thread.currentThread().getName()+"第二个null:"+i); oldValue = zeroValue; } } diff --git a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo2.java b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo2.java index 94ae2427..89a1b7ca 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo2.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo2.java @@ -10,7 +10,7 @@ public static void main(String[] args) { // 这个例子再次证明, // 多个步骤的操作,不能保证原子性 // list.size() 获取到的数,再继续用list时,可能已经变了 - // + // test(); } public static void test(){ diff --git a/03concurrency/0301/src/main/java/java0/conc0303/future/CompletableFutureDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/future/CompletableFutureDemo.java index c5d1e900..9b4e8477 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/future/CompletableFutureDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/future/CompletableFutureDemo.java @@ -7,6 +7,7 @@ public class CompletableFutureDemo { public static void main(String[] args){ // 1.变换结果 + System.out.println(System.setProperty("user.dir","默认值")); System.out.println("=====>1.变换结果"); String result1 = CompletableFuture.supplyAsync(()->{return "Hello ";}).thenApplyAsync(v -> v + "world").join(); System.out.println(result1); diff --git a/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java index aaf9567d..4eecdddb 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java @@ -28,6 +28,7 @@ public static void main(String[] args) { } }); System.out.println("blockingQueue:" + blockingQueue.toString()); + System.out.println("blockingQueueSize:" + blockingQueue.size()); } diff --git a/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo2.java b/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo2.java index 33e88260..947b4bea 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo2.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo2.java @@ -6,7 +6,12 @@ public class CyclicBarrierDemo2 { public static void main(String[] args) { int N = 4; - CyclicBarrier barrier = new CyclicBarrier(N); + CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() { + @Override + public void run() { + System.out.println(Thread.currentThread().getName() + "已完成一个周期循环"); + } + }); for(int i=0;i { try { - semaphore.acquire(3); // 获取全部许可,退化成串行执行 + semaphore.acquire(); // 获取全部许可,退化成串行执行 test(threadNum); - semaphore.release(3); // 释放多个许可 + semaphore.release(); // 释放多个许可 } catch (Exception e) { e.printStackTrace(); } diff --git a/03concurrency/0301/src/main/java/java0/conc0303/tool/SemaphoreDemo3.java b/03concurrency/0301/src/main/java/java0/conc0303/tool/SemaphoreDemo3.java index 3a02dc78..09592f65 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/tool/SemaphoreDemo3.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/tool/SemaphoreDemo3.java @@ -29,7 +29,7 @@ public void run() { while (true) { try { buffer.put(n); - System.out.println(">" + n); + System.out.println("Producer>" + n); // 速度较快。休息10毫秒 Thread.sleep(10); } catch (InterruptedException e) { @@ -45,7 +45,7 @@ static class Consumer implements Runnable { public void run() { while (true) { try { - System.out.println("<" + buffer.take()); + System.out.println(" { + e=2; + System.out.println("foreach作用域内"+list.get(0)); x=2; - //y=2; // can't be compiled +// y=2; + // can't be compiled }); + System.out.println("foreach作用域外"+list.get(0)); } } diff --git a/04fx/java8/src/main/java/io/kimmking/java8/GenericDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/GenericDemo.java index d6851736..3affcbb8 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/GenericDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/GenericDemo.java @@ -1,9 +1,18 @@ package io.kimmking.java8; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; public class GenericDemo { + + static EventBus eventBus = new EventBus("test"); + static { + eventBus.register(new A()); + } + public static void main(String[] args) { Demo demo = new Demo(); Class clazz = demo.getClass(); @@ -27,4 +36,11 @@ public static class Person { public static class Demo extends Person { } + + @Subscribe + public void handle(GuavaDemo.AEvent ae){ + System.out.println(this.getClass().toString() + ",msg:" + ae + " is running."); + ae.setState(1); + eventBus.post(ae); + } } diff --git a/04fx/java8/src/main/java/io/kimmking/java8/GuavaDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/GuavaDemo.java index a2540278..7e5aee20 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/GuavaDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/GuavaDemo.java @@ -12,6 +12,7 @@ import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.SneakyThrows; @@ -24,6 +25,7 @@ public class GuavaDemo { static EventBus bus = new EventBus(); static { bus.register(new GuavaDemo()); + bus.register(new GenericDemo()); } @@ -46,10 +48,14 @@ private static void testEventBus() { // EventBus // SPI+service loader // Callback/Listener - // + // 观察者 责任链模式 Student student2 = new Student(2, "KK02"); System.out.println("I want " + student2 + " run now."); bus.post(new AEvent(student2)); + + Student student1 = new Student(1, "KK01"); + System.out.println("I want " + student1 + " run now."); + bus.post(new AEvent(student1)); } private static void testBiMap(List lists) { @@ -105,15 +111,27 @@ private static void print(Object obj) { @Data @AllArgsConstructor + @Builder public static class AEvent{ - private Student student; + Student student; + int state = 0; + + public AEvent(Student student){ + this.student = student; + } + } - + + @Subscribe + public void handle1(AEvent ae){ + System.out.println("handle1 is running."); + } + @Subscribe public void handle(AEvent ae){ - System.out.println(ae.student + " is running."); + System.out.println(this.getClass().toString() + ",msg:" +ae + " is running."); } - + } diff --git a/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java index f1dffef0..8a984571 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java @@ -71,7 +71,7 @@ interface GreetingService { void sayMessage(String message); } - private T operate(int a, int b, MathOperation mathOperation){ + private T operate(int a, int b, MathOperation mathOperation){ return mathOperation.operation(a, b); } diff --git a/04fx/spring01/src/main/java/io/kimmking/spring01/GuavaDemo.java b/04fx/spring01/src/main/java/io/kimmking/spring01/GuavaDemo.java index 98ef1c51..576de7f1 100644 --- a/04fx/spring01/src/main/java/io/kimmking/spring01/GuavaDemo.java +++ b/04fx/spring01/src/main/java/io/kimmking/spring01/GuavaDemo.java @@ -48,7 +48,7 @@ public static void main(String[] args) throws IOException { // 更强的集合操作 // 简化 创建 - List list = Lists.newArrayList(4,2,3,5,1,2,2,7,6); + List list = Lists.newArrayList(4,2,3,5,1,2,2,7,6,9); List> list1 = Lists.partition(list,3); print(list1); diff --git a/04fx/spring01/src/main/resources/applicationContext.xml b/04fx/spring01/src/main/resources/applicationContext.xml index 681cf409..89755bd3 100644 --- a/04fx/spring01/src/main/resources/applicationContext.xml +++ b/04fx/spring01/src/main/resources/applicationContext.xml @@ -6,7 +6,9 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context - http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop https://www.springframework.org/schema/aop/spring-aop.xsd"> + http://www.springframework.org/schema/context/spring-context-3.2.xsd + http://www.springframework.org/schema/aop + https://www.springframework.org/schema/aop/spring-aop.xsd"> diff --git a/04fx/spring01/src/main/resources/springjms-receiver.xml b/04fx/spring01/src/main/resources/springjms-receiver.xml index e0d9fbc6..c0408832 100644 --- a/04fx/spring01/src/main/resources/springjms-receiver.xml +++ b/04fx/spring01/src/main/resources/springjms-receiver.xml @@ -14,7 +14,7 @@ - + diff --git a/04fx/spring01/src/main/resources/springjms-sender.xml b/04fx/spring01/src/main/resources/springjms-sender.xml index 119ec07a..a443cd17 100644 --- a/04fx/spring01/src/main/resources/springjms-sender.xml +++ b/04fx/spring01/src/main/resources/springjms-sender.xml @@ -11,7 +11,7 @@ - + diff --git a/04fx/springboot01/src/main/java/io/kimmking/springboot01/jms/BeanConfig.java b/04fx/springboot01/src/main/java/io/kimmking/springboot01/jms/BeanConfig.java index fd19a424..0bed8a68 100644 --- a/04fx/springboot01/src/main/java/io/kimmking/springboot01/jms/BeanConfig.java +++ b/04fx/springboot01/src/main/java/io/kimmking/springboot01/jms/BeanConfig.java @@ -23,7 +23,7 @@ public class BeanConfig { @Value("${spring.activemq.user}") private String username; - @Value("${spring.activemq.topic-name}") + @Value("${spring.activemq.password}") private String password; @Value("${spring.activemq.queue-name}") diff --git a/04fx/springboot01/src/main/java/io/kimmking/springboot01/jms/ProducerController.java b/04fx/springboot01/src/main/java/io/kimmking/springboot01/jms/ProducerController.java index 92894be3..20e932b9 100644 --- a/04fx/springboot01/src/main/java/io/kimmking/springboot01/jms/ProducerController.java +++ b/04fx/springboot01/src/main/java/io/kimmking/springboot01/jms/ProducerController.java @@ -22,7 +22,7 @@ public class ProducerController private Topic topic; - // curl http://localhost:8080/queue/test -X POST -d "testququuquqq" + // curl http://192.168.100.117:8080/queue/test -X POST -d "testququuquqq" @PostMapping("/queue/test") public String sendQueue(@RequestBody String str) { this.sendMessage(this.queue, str); @@ -30,7 +30,7 @@ public String sendQueue(@RequestBody String str) { } - //curl http://localhost:8080/topic/test -X POST -d "testtopiccccc" + //curl http://192.168.100.117:8080/topic/test -X POST -d "testtopiccccc" @PostMapping("/topic/test") public String sendTopic(@RequestBody String str) { this.sendMessage(this.topic, str); diff --git a/04fx/springboot01/src/main/resources/application.yml b/04fx/springboot01/src/main/resources/application.yml index fa5370a3..f9233d38 100644 --- a/04fx/springboot01/src/main/resources/application.yml +++ b/04fx/springboot01/src/main/resources/application.yml @@ -3,7 +3,7 @@ server: spring: activemq: - broker-url: tcp://127.0.0.1:61616 + broker-url: tcp://192.168.199.117:61616 user: admin password: admin close-timeout: 15s # 在考虑结束之前等待的时间