diff --git a/01jvm/README.md b/01jvm/README.md
index fc2d2818..51c885e3 100644
--- a/01jvm/README.md
+++ b/01jvm/README.md
@@ -31,7 +31,7 @@
2. 编译代码, 执行命令: `javac -g HelloNum.java`
3. 查看反编译的代码。
- 3.1 可以安装并使用idea的jclasslib插件, 选中 [HelloNum.java](./HelloNum.java) 文件, 选择 `View --> Show Bytecode With jclasslib` 即可。
- - 3.2 或者直接通过命令行工具 javap, 执行命令: `javap -v HelloNum.class`
+ - 3.2 或者直接通过命令行工具 javap, 执行命令: `javap -c -v -p -l HelloNum.class`
4. 分析相关的字节码。【此步骤需要各位同学自己进行分析】
diff --git a/01jvm/jvm/HelloClassLoader.java b/01jvm/jvm/HelloClassLoader.java
index ab080620..51d906f1 100644
--- a/01jvm/jvm/HelloClassLoader.java
+++ b/01jvm/jvm/HelloClassLoader.java
@@ -6,12 +6,12 @@ public class HelloClassLoader extends ClassLoader {
public static void main(String[] args) throws Exception {
- new HelloClassLoader().findClass("jvm.lib.Hello").newInstance();
+ new HelloClassLoader().findClass("lib.Hello").newInstance();
}
@Override
protected Class> findClass(String name) throws ClassNotFoundException {
- String helloBase64 = "yv66vgAAADQAHAoABgAOCQAPABAIABEKABIAEwcAFAcAFQEABjxpbml0PgEAAygpVgEABENvZGUBAA9MaW5lTnVtYmVyVGFibGUBAAg8Y2xpbml0PgEAClNvdXJjZUZpbGUBAApIZWxsby5qYXZhDAAHAAgHABYMABcAGAEAGEhlbGxvIENsYXNzIEluaXRpYWxpemVkIQcAGQwAGgAbAQAJanZtL0hlbGxvAQAQamF2YS9sYW5nL09iamVjdAEAEGphdmEvbGFuZy9TeXN0ZW0BAANvdXQBABVMamF2YS9pby9QcmludFN0cmVhbTsBABNqYXZhL2lvL1ByaW50U3RyZWFtAQAHcHJpbnRsbgEAFShMamF2YS9sYW5nL1N0cmluZzspVgAhAAUABgAAAAAAAgABAAcACAABAAkAAAAdAAEAAQAAAAUqtwABsQAAAAEACgAAAAYAAQAAAAMACAALAAgAAQAJAAAAJQACAAAAAAAJsgACEgO2AASxAAAAAQAKAAAACgACAAAABQAIAAYAAQAMAAAAAgAN";
+ String helloBase64 = "yv66vgAAADQAHwoABwAQCQARABIIABMKABQAFQgAFgcAFwcAGAEABjxpbml0PgEAAygpVgEABENvZGUBAA9MaW5lTnVtYmVyVGFibGUBAAVoZWxsbwEACDxjbGluaXQ+AQAKU291cmNlRmlsZQEACkhlbGxvLmphdmEMAAgACQcAGQwAGgAbAQAdSGVsbG8gY2xhc3Mgc2F5IGhlbGxvIG1ldGhvZC4HABwMAB0AHgEAGEhlbGxvIENsYXNzIEluaXRpYWxpemVkIQEACWxpYi9IZWxsbwEAEGphdmEvbGFuZy9PYmplY3QBABBqYXZhL2xhbmcvU3lzdGVtAQADb3V0AQAVTGphdmEvaW8vUHJpbnRTdHJlYW07AQATamF2YS9pby9QcmludFN0cmVhbQEAB3ByaW50bG4BABUoTGphdmEvbGFuZy9TdHJpbmc7KVYAIQAGAAcAAAAAAAMAAQAIAAkAAQAKAAAAHQABAAEAAAAFKrcAAbEAAAABAAsAAAAGAAEAAAADAAEADAAJAAEACgAAACUAAgABAAAACbIAAhIDtgAEsQAAAAEACwAAAAoAAgAAAAgACAAJAAgADQAJAAEACgAAACUAAgAAAAAACbIAAhIFtgAEsQAAAAEACwAAAAoAAgAAAAUACAAGAAEADgAAAAIADw==+AQAKU291cmNlRmlsZQEACkhlbGxvLmphdmEMAAgACQcAGQwAGgAbAQAdSGVsbG8gY2xhc3Mgc2F5IGhlbGxvIG1ldGhvZC4HABwMAB0AHgEAGEhlbGxvIENsYXNzIEluaXRpYWxpemVkIQEACWxpYi9IZWxsbwEAEGphdmEvbGFuZy9PYmplY3QBABBqYXZhL2xhbmcvU3lzdGVtAQADb3V0AQAVTGphdmEvaW8vUHJpbnRTdHJlYW07AQATamF2YS9pby9QcmludFN0cmVhbQEAB3ByaW50bG4BABUoTGphdmEvbGFuZy9TdHJpbmc7KVYAIQAGAAcAAAAAAAMAAQAIAAkAAQAKAAAAHQABAAEAAAAFKrcAAbEAAAABAAsAAAAGAAEAAAADAAEADAAJAAEACgAAACUAAgABAAAACbIAAhIDtgAEsQAAAAEACwAAAAoAAgAAAAgACAAJAAgADQAJAAEACgAAACUAAgAAAAAACbIAAhIFtgAEsQAAAAEACwAAAAoAAgAAAAUACAAGAAEADgAAAAIADw==";
byte[] bytes = decode(helloBase64);
return defineClass(name,bytes,0,bytes.length);
}
diff --git a/01jvm/lib/Hello.java b/01jvm/lib/Hello.java
index 79a24651..5d951d97 100644
--- a/01jvm/lib/Hello.java
+++ b/01jvm/lib/Hello.java
@@ -1,11 +1,12 @@
-//package lib;
-//
-//public class Hello {
-// static {
-// System.out.println("Hello Class Initialized!");
-// }
-// public void hello() {
-// System.out.println("Hello class say hello method.");
-// }
-//
-//}
\ No newline at end of file
+package lib;
+
+public class Hello {
+ static {
+ System.out.println("Hello Class Initialized!");
+ }
+ public void hello() {
+ System.out.println("Hello class say hello method.");
+ System.gc(); // JMX MBean server
+ }
+
+}
\ No newline at end of file
diff --git a/01jvm/out/production/01jvm/README.md b/01jvm/out/production/01jvm/README.md
index fc2d2818..51c885e3 100644
--- a/01jvm/out/production/01jvm/README.md
+++ b/01jvm/out/production/01jvm/README.md
@@ -31,7 +31,7 @@
2. 编译代码, 执行命令: `javac -g HelloNum.java`
3. 查看反编译的代码。
- 3.1 可以安装并使用idea的jclasslib插件, 选中 [HelloNum.java](./HelloNum.java) 文件, 选择 `View --> Show Bytecode With jclasslib` 即可。
- - 3.2 或者直接通过命令行工具 javap, 执行命令: `javap -v HelloNum.class`
+ - 3.2 或者直接通过命令行工具 javap, 执行命令: `javap -c -v -p -l HelloNum.class`
4. 分析相关的字节码。【此步骤需要各位同学自己进行分析】
diff --git a/02nio/nio02/pom.xml b/02nio/nio02/pom.xml
index 005de90a..c10848f8 100644
--- a/02nio/nio02/pom.xml
+++ b/02nio/nio02/pom.xml
@@ -29,7 +29,7 @@
io.netty
netty-all
- 4.1.45.Final
+ 4.1.104.Final
diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java
index e67b7961..1adda64a 100644
--- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java
+++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java
@@ -2,7 +2,10 @@
import io.github.kimmking.gateway.inbound.HttpInboundServer;
+import io.netty.util.internal.PlatformDependent;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
import java.util.Arrays;
public class NettyServerApplication {
@@ -12,6 +15,13 @@ public class NettyServerApplication {
public static void main(String[] args) {
+// sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
+// System.out.println(unsafe.addressSize());
+
+
+ System.out.println("PlatformDependent.hasUnsafe = " + PlatformDependent.javaVersion());
+ System.out.println("PlatformDependent.hasUnsafe = " + PlatformDependent.hasUnsafe());
+
String proxyPort = System.getProperty("proxyPort","8888");
// 这是之前的单个后端url的例子
diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpResponseFilter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpResponseFilter.java
index 53493fb4..12fe310a 100644
--- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpResponseFilter.java
+++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpResponseFilter.java
@@ -1,10 +1,24 @@
package io.github.kimmking.gateway.filter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
public class HeaderHttpResponseFilter implements HttpResponseFilter {
@Override
public void filter(FullHttpResponse response) {
response.headers().set("kk", "java-1-nio");
+ response.setStatus(HttpResponseStatus.CREATED);
+// byte[] array = response.content().array();
+// String content = new String(array);
+// System.out.println(content);
+// content = content + ",kimmking";
+ byte[] bytes = "hello,kimm.".getBytes();
+ //response.headers().setInt("Content-Length", bytes.length);
+ ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
+ ByteBuf content = response.content();
+ content.clear();
+ content.writeBytes(byteBuf);
}
}
diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java
index 69b40fde..1cd13ca2 100644
--- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java
+++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java
@@ -39,9 +39,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
// if (uri.contains("/test")) {
// handlerTest(fullRequest, ctx);
// }
-
- handler.handle(fullRequest, ctx, filter);
-
+
+ String uri = fullRequest.getUri();
+ System.out.println(" uri ==>> " + uri);
+ if(uri.contains("/netty/info")) {
+ NettyInfoHandler.INSTANCE.handle(fullRequest, ctx);
+ } else {
+ handler.handle(fullRequest, ctx, filter);
+ }
} catch(Exception e) {
e.printStackTrace();
} finally {
diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/NettyInfoHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/NettyInfoHandler.java
new file mode 100644
index 00000000..f179cc82
--- /dev/null
+++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/NettyInfoHandler.java
@@ -0,0 +1,83 @@
+package io.github.kimmking.gateway.inbound;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.util.internal.PlatformDependent;
+import lombok.SneakyThrows;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/5/31 下午7:13
+ */
+public class NettyInfoHandler {
+
+ public final static NettyInfoHandler INSTANCE = new NettyInfoHandler();
+
+ public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) {
+ System.out.println("NettyInfoHandler.handle...");
+ Map infos = new HashMap<>();
+ infos.put("netty.usedDirectMemory", ""+getNettyUsedDirectMemory());
+ infos.put("netty.directMemoryLimit", ""+getNettyDirectMemoryLimit());
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ infos.forEach((k, v) -> {
+ sb.append("\"").append(k).append("\"")
+ .append(":")
+ .append("\"").append(v).append("\"").append(",");
+ });
+ if(sb.length()>1) {
+ sb.deleteCharAt(sb.length()-1);
+ }
+ sb.append("}");
+
+ byte[] body = ("{\"code\":200,\"msg\":\"success\",\"data\":" + sb +"}").getBytes();
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(body));
+
+ response.headers().set("Content-Type", "application/json");
+ response.headers().setInt("Content-Length", body.length);
+ response.headers().set("kk.gw.hanlder", "netty.info");
+
+ if (fullRequest != null) {
+ if (!HttpUtil.isKeepAlive(fullRequest)) {
+ ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ //response.headers().set(CONNECTION, KEEP_ALIVE);
+ ctx.write(response);
+ }
+ }
+ ctx.flush();
+ //ctx.close();
+
+ }
+
+ @SneakyThrows
+ private static long getNettyUsedDirectMemory() {
+ Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
+ field.setAccessible(true);
+ AtomicLong o = (AtomicLong)field.get(null);
+ return o.get();
+ }
+
+ @SneakyThrows
+ private static long getNettyDirectMemoryLimit() {
+ Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_LIMIT");
+ field.setAccessible(true);
+ return (Long)field.get(null);
+ }
+
+}
diff --git a/04fx/spring01/pom.xml b/04fx/spring01/pom.xml
index 505009a7..3075aaf2 100644
--- a/04fx/spring01/pom.xml
+++ b/04fx/spring01/pom.xml
@@ -10,7 +10,7 @@
- 4.3.29.RELEASE
+ 4.3.30.RELEASE
@@ -19,8 +19,8 @@
org.apache.maven.plugins
maven-compiler-plugin
- 8
- 8
+ 11
+ 11
diff --git a/04fx/spring01/src/main/java/io/kimmking/spring02/SpringDemo11.java b/04fx/spring01/src/main/java/io/kimmking/spring02/SpringDemo11.java
new file mode 100644
index 00000000..bcbc7e30
--- /dev/null
+++ b/04fx/spring01/src/main/java/io/kimmking/spring02/SpringDemo11.java
@@ -0,0 +1,49 @@
+package io.kimmking.spring02;
+
+import org.springframework.cglib.proxy.Enhancer;
+import org.springframework.cglib.proxy.MethodInterceptor;
+import org.springframework.cglib.proxy.MethodProxy;
+
+import java.lang.reflect.Method;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/22 18:01
+ */
+public class SpringDemo11 {
+
+ public static void main(String[] args) {
+ long s = System.currentTimeMillis();
+ Enhancer enhancer = new Enhancer();
+ enhancer.setInterfaces(new Class[]{IAction.class});
+ enhancer.setCallback(new MI());
+ enhancer.setUseCache(true);
+ IAction demo = (IAction) enhancer.create();
+ for (int i = 0; i < 5; i++) {
+ long ss = System.currentTimeMillis();
+ System.out.println(demo.action());
+ System.out.println( i + " *****====> invoke proxy " + (System.currentTimeMillis() - ss) + " ms");
+ }
+ System.out.println(" *****====> enhancer proxy " + (System.currentTimeMillis() - s) + " ms");
+
+ }
+
+ public interface IAction {
+ Object action();
+ }
+
+
+ static class MI implements MethodInterceptor {
+ @Override
+ public Object intercept(Object obj, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
+ long s = System.currentTimeMillis();
+ System.out.println(" *****==MI==> " + s + " " +"Before:"+method.getName());
+ Object result = "S-" + s;//methodProxy.invokeSuper(obj, objects);
+ System.out.println(" *****==MI==> " + (System.currentTimeMillis() - s) + " ms After:"+method.getName());
+ return result;
+ }
+ }
+
+}
diff --git a/06db/shardingsphere/init.sql b/06db/shardingsphere/init.sql
index 7c1adb0c..9b930268 100644
--- a/06db/shardingsphere/init.sql
+++ b/06db/shardingsphere/init.sql
@@ -29,3 +29,9 @@ CREATE TABLE IF NOT EXISTS demo_ds_0.t_order_item_1 (order_item_id BIGINT NOT NU
CREATE TABLE IF NOT EXISTS demo_ds_1.t_order_item_0 (order_item_id BIGINT NOT NULL AUTO_INCREMENT, order_id BIGINT NOT NULL, user_id INT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_item_id));
CREATE TABLE IF NOT EXISTS demo_ds_1.t_order_item_1 (order_item_id BIGINT NOT NULL AUTO_INCREMENT, order_id BIGINT NOT NULL, user_id INT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_item_id));
+
+
+
+
+# CREATE TABLE t_order (order_id BIGINT NOT NULL AUTO_INCREMENT, user_id INT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_id));
+# CREATE TABLE t_order_item (order_item_id BIGINT NOT NULL AUTO_INCREMENT, order_id BIGINT NOT NULL, user_id INT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_item_id));
diff --git a/07rpc/rpc01/client-rest.http b/07rpc/rpc01/client-rest.http
new file mode 100644
index 00000000..cfe742ec
--- /dev/null
+++ b/07rpc/rpc01/client-rest.http
@@ -0,0 +1 @@
+http://127.0.0.1:8091/api/hello
\ No newline at end of file
diff --git a/07rpc/rpc01/pom.xml b/07rpc/rpc01/pom.xml
index 62b29db1..8c924dcf 100644
--- a/07rpc/rpc01/pom.xml
+++ b/07rpc/rpc01/pom.xml
@@ -5,7 +5,7 @@
org.springframework.boot
spring-boot-starter-parent
- 2.0.9.RELEASE
+ 2.7.3
io.kimmking
diff --git a/07rpc/rpc01/rpcfx-core/pom.xml b/07rpc/rpc01/rpcfx-core/pom.xml
index 4570a59d..5a1eeac3 100644
--- a/07rpc/rpc01/rpcfx-core/pom.xml
+++ b/07rpc/rpc01/rpcfx-core/pom.xml
@@ -21,7 +21,7 @@
com.alibaba
fastjson
- 1.2.70
+ 1.2.83
@@ -45,7 +45,7 @@
org.apache.curator
- curator-framework
+ curator-recipes
5.1.0
@@ -70,6 +70,7 @@
-
+
+
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxReference.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxReference.java
new file mode 100644
index 00000000..570b8ab8
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxReference.java
@@ -0,0 +1,17 @@
+package io.kimmking.rpcfx.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/1 20:00
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+@Inherited
+public @interface RpcfxReference {
+
+}
\ No newline at end of file
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxService.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxService.java
new file mode 100644
index 00000000..9db3c7e3
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxService.java
@@ -0,0 +1,18 @@
+package io.kimmking.rpcfx.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/1 20:00
+ */
+
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@Inherited
+public @interface RpcfxService {
+
+}
\ No newline at end of file
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java
index 64f3b99d..29060ace 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java
@@ -2,7 +2,9 @@
public interface Filter {
- boolean filter(RpcfxRequest request);
+ RpcfxResponse prefilter(RpcfxRequest request);
+
+ RpcfxResponse postfilter(RpcfxRequest request, RpcfxResponse response);
// Filter next();
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/LoadBalancer.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/LoadBalancer.java
index eccb66f5..5ac4fab2 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/LoadBalancer.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/LoadBalancer.java
@@ -1,9 +1,11 @@
package io.kimmking.rpcfx.api;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+
import java.util.List;
public interface LoadBalancer {
- String select(List urls);
+ InstanceMeta select(List instances);
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Router.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Router.java
index 594aeff5..a4ed3225 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Router.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Router.java
@@ -1,8 +1,10 @@
package io.kimmking.rpcfx.api;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+
import java.util.List;
public interface Router {
- List route(List urls);
+ List route(List instances);
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcContext.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcContext.java
new file mode 100644
index 00000000..79a65bde
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcContext.java
@@ -0,0 +1,41 @@
+package io.kimmking.rpcfx.api;
+
+import io.kimmking.rpcfx.meta.ProviderMeta;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/13 20:34
+ */
+public class RpcContext {
+
+ @Getter
+ private final MultiValueMap providerHolder = new LinkedMultiValueMap<>();
+
+ @Getter
+ private final Map consumerHolder = new HashMap<>();
+
+ @Getter
+ private final Map parameters = new HashMap<>();
+
+ @Getter
+ @Setter
+ private Router router;
+
+ @Getter
+ @Setter
+ private LoadBalancer loadBalancer;
+
+ @Getter
+ @Setter
+ private Filter[] filters;
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java
index 5ee7b9e1..1e9edfb4 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java
@@ -5,6 +5,6 @@
@Data
public class RpcfxRequest {
private String serviceClass;
- private String method;
+ private String methodSign;
private Object[] params;
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java
deleted file mode 100644
index f7c48068..00000000
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package io.kimmking.rpcfx.api;
-
-public interface RpcfxResolver {
-
- Object resolve(String serviceClass);
-
-}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java
deleted file mode 100644
index 5d1ae517..00000000
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package io.kimmking.rpcfx.client;
-
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.parser.ParserConfig;
-import io.kimmking.rpcfx.api.*;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.List;
-
-public final class Rpcfx {
-
- static {
- ParserConfig.getGlobalInstance().addAccept("io.kimmking");
- }
-
- public static T createFromRegistry(final Class serviceClass, final String zkUrl, Router router, LoadBalancer loadBalance, Filter filter) {
-
- // 加filte之一
-
- // curator Provider list from zk
- List invokers = new ArrayList<>();
- // 1. 简单:从zk拿到服务提供的列表
- // 2. 挑战:监听zk的临时节点,根据事件更新这个list(注意,需要做个全局map保持每个服务的提供者List)
-
- List urls = router.route(invokers);
-
- String url = loadBalance.select(urls); // router, loadbalance
-
- return (T) create(serviceClass, url, filter);
-
- }
-
- public static T create(final Class serviceClass, final String url, Filter... filters) {
-
- // 0. 替换动态代理 -> 字节码生成
- return (T) Proxy.newProxyInstance(Rpcfx.class.getClassLoader(), new Class[]{serviceClass}, new RpcfxInvocationHandler(serviceClass, url, filters));
-
- }
-
- public static class RpcfxInvocationHandler implements InvocationHandler {
-
- public static final MediaType JSONTYPE = MediaType.get("application/json; charset=utf-8");
-
- private final Class> serviceClass;
- private final String url;
- private final Filter[] filters;
-
- public RpcfxInvocationHandler(Class serviceClass, String url, Filter... filters) {
- this.serviceClass = serviceClass;
- this.url = url;
- this.filters = filters;
- }
-
- // 可以尝试,自己去写对象序列化,二进制还是文本的,,,rpcfx是xml自定义序列化、反序列化,json: code.google.com/p/rpcfx
- // int byte char float double long bool
- // [], data class
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] params) throws Throwable {
-
- // 加filter地方之二
- // mock == true, new Student("hubao");
-
- RpcfxRequest request = new RpcfxRequest();
- request.setServiceClass(this.serviceClass.getName());
- request.setMethod(method.getName());
- request.setParams(params);
-
- if (null!=filters) {
- for (Filter filter : filters) {
- if (!filter.filter(request)) {
- return null;
- }
- }
- }
-
- RpcfxResponse response = post(request, url);
-
- // 加filter地方之三
- // Student.setTeacher("cuijing");
-
- // 这里判断response.status,处理异常
- // 考虑封装一个全局的RpcfxException
-
- return JSON.parse(response.getResult().toString());
- }
-
- private RpcfxResponse post(RpcfxRequest req, String url) throws IOException {
- String reqJson = JSON.toJSONString(req);
- System.out.println("req json: "+reqJson);
-
- // 1.可以复用client
- // 2.尝试使用httpclient或者netty client
- OkHttpClient client = new OkHttpClient();
- final Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(JSONTYPE, reqJson))
- .build();
- String respJson = client.newCall(request).execute().body().string();
- System.out.println("resp json: "+respJson);
- return JSON.parseObject(respJson, RpcfxResponse.class);
- }
- }
-}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/ConsumerBootstrap.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/ConsumerBootstrap.java
new file mode 100644
index 00000000..a69fe083
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/ConsumerBootstrap.java
@@ -0,0 +1,110 @@
+package io.kimmking.rpcfx.consumer;
+
+import io.kimmking.rpcfx.annotation.RpcfxReference;
+import io.kimmking.rpcfx.api.RpcContext;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import io.kimmking.rpcfx.registry.RegistryConfiguration;
+import io.kimmking.rpcfx.stub.StubSkeletonHelper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.PropertyValues;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;
+import org.springframework.context.annotation.Import;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/13 23:26
+ */
+@Slf4j
+@Component
+@Import({RegistryConfiguration.class})
+public class ConsumerBootstrap implements Closeable, InstantiationAwareBeanPostProcessor {
+
+ private RpcContext context = new RpcContext();
+
+ private String scanPackage = "io.kimmking";
+
+ @Value("${app.id:app1}")
+ public String app;
+ @Value("${app.namespace:public}")
+ public String ns;
+ @Value("${app.env:dev}")
+ public String env;
+ @Value("${app.mock:false}")
+ public boolean mock;
+ @Value("${app.cache:false}")
+ public boolean cache;
+ @Value("${app.retry:1}")
+ public int retry;
+
+ @Autowired
+ RegistryCenter rc;
+
+ @PostConstruct
+ public void init() {
+ this.context.getParameters().put("app.id", app);
+ this.context.getParameters().put("app.namespace", ns);
+ this.context.getParameters().put("app.env", env);
+ this.context.getParameters().put("app.mock", String.valueOf(mock));
+ this.context.getParameters().put("app.cache", String.valueOf(cache));
+ this.context.getParameters().put("app.retry", String.valueOf(retry));
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
+ if (bean.getClass().getPackage().getName().startsWith(scanPackage)) {
+ Field[] declaredFields = resolveAllField(bean.getClass()); // 解决父类里的注解扫描不到的问题
+
+ List consumers = Arrays.stream(declaredFields)
+ .filter(field -> field.isAnnotationPresent(RpcfxReference.class))
+ .collect(Collectors.toList());
+
+ consumers.stream().forEach(field -> {
+ Object consumer = createConsumer(field.getType());
+ try {
+ field.setAccessible(true);
+ field.set(bean, consumer);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage(), e);
+ }
+ });
+ }
+ return null;
+ }
+
+ private Field[] resolveAllField(Class> aClass) {
+ List res = new ArrayList<>(20);
+ while ( !Object.class.equals(aClass) ) {
+ Field[] fields = aClass.getDeclaredFields();
+ res.addAll(Arrays.asList(fields));
+ aClass = aClass.getSuperclass();
+ }
+ return res.toArray(new Field[0]);
+ }
+
+ private T createConsumer(Class clazz) {
+ ServiceMeta sm = ServiceMeta.builder().name(clazz.getCanonicalName())
+ .app(app).namespace(ns).env(env).build();
+ return (T) StubSkeletonHelper.createConsumer(sm, context, rc);
+ }
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxConsumerInvoker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxConsumerInvoker.java
new file mode 100644
index 00000000..66d9ea89
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxConsumerInvoker.java
@@ -0,0 +1,70 @@
+package io.kimmking.rpcfx.consumer;
+
+
+import com.alibaba.fastjson.parser.ParserConfig;
+import io.kimmking.rpcfx.api.*;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.List;
+
+public final class RpcfxConsumerInvoker {
+
+ static {
+ ParserConfig.getGlobalInstance().addAccept("io.kimmking");
+ }
+
+ RpcContext ctx;
+
+ RegistryCenter rc;
+
+ public RpcfxConsumerInvoker(RpcContext ctx, RegistryCenter rc) {
+ this.ctx = ctx;
+ this.rc = rc; //"localhost:2181"
+ }
+
+ public void start() {
+ this.rc.start();
+ }
+
+ public void stop() {
+ this.rc.stop();
+ }
+
+ public T createFromRegistry(final ServiceMeta sm, RpcContext ctx) {
+
+ String service = sm.getName();//"io.kimking.rpcfx.demo.api.UserService";
+ System.out.println("====> "+service);
+ List invokers = new ArrayList<>();
+ Class> serviceClass = null;
+ try {
+
+ serviceClass = Class.forName(service);
+
+ List insts = rc.fetchInstances(sm);
+ if(insts != null && insts.size()>0) invokers.addAll(insts);
+ rc.subscribe(sm, e -> {
+ invokers.clear();
+ invokers.addAll((List)e.getData());
+ });
+
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw new RuntimeException(ex);
+ }
+
+ return (T) create(serviceClass, invokers, ctx);
+
+ }
+
+ private T create(Class serviceClass, List invokers, RpcContext ctx) {
+ RpcfxInvocationHandler invocationHandler
+ = new RpcfxInvocationHandler(serviceClass, invokers, ctx);
+ return (T) Proxy.newProxyInstance(RpcfxConsumerInvoker.class.getClassLoader(),
+ new Class[]{serviceClass}, invocationHandler);
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvocationHandler.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvocationHandler.java
new file mode 100644
index 00000000..0325e0c7
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvocationHandler.java
@@ -0,0 +1,140 @@
+package io.kimmking.rpcfx.consumer;
+
+import com.alibaba.fastjson.JSON;
+import io.kimmking.rpcfx.api.*;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.stub.StubSkeletonHelper;
+import io.kimmking.rpcfx.utils.MethodUtils;
+import okhttp3.*;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class RpcfxInvocationHandler implements InvocationHandler {
+
+ public final Object target = new Object();
+
+ public static final MediaType JSONTYPE = MediaType.get("application/json; charset=utf-8");
+
+ private final Class> serviceClass;
+ private final List invokers;
+
+ private final RpcContext context;
+
+ public RpcfxInvocationHandler(Class serviceClass, List invokers, RpcContext ctx) {
+ this.serviceClass = serviceClass;
+ this.invokers = invokers;
+ this.context = ctx;
+ }
+
+ // 可以尝试,自己去写对象序列化,二进制还是文本的,,,rpcfx是xml自定义序列化、反序列化,json: code.google.com/p/rpcfx
+ // int byte char float double long bool
+ // [], data class
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] params) throws Throwable {
+
+ long start = System.currentTimeMillis();
+
+ if (!StubSkeletonHelper.checkRpcMethod(method)){
+ return method.invoke(target, params);
+ }
+
+
+ int retry = 2;
+ while (retry-- > 0) {
+ System.out.println("retry:" + retry);
+ try {
+
+ // check mock, 挡板功能 TODO 3
+
+ List insts = context.getRouter().route(invokers);
+// System.out.println("router.route => ");
+// urls.forEach(System.out::println);
+ InstanceMeta instance = context.getLoadBalancer().select(insts); // router, loadbalance
+// System.out.println("loadBalance.select => ");
+// System.out.println("final => " + url);
+
+ if (instance == null) {
+ throw new RuntimeException("No available providers from registry center.");
+ }
+
+
+ RpcfxRequest request = new RpcfxRequest();
+ request.setServiceClass(this.serviceClass.getName());
+ request.setMethodSign(MethodUtils.methodSign(method));
+ request.setParams(params);
+
+ Filter[] filters = context.getFilters();
+
+ if (null != filters) {
+ for (Filter filter : filters) {
+ RpcfxResponse response = filter.prefilter(request);
+ if (response != null) {
+ return JSON.parse(response.getResult().toString());
+ }
+ }
+ }
+
+ // 没有控制超时,可能会很久 TODO 2
+ RpcfxResponse response = post(request, instance);
+
+ if (null != filters) {
+ for (Filter filter : filters) {
+ RpcfxResponse postResponse = filter.postfilter(request, response);
+ if (postResponse!=null) {
+ response = postResponse;
+ }
+ }
+ }
+
+ System.out.println("Invoke spend " + (System.currentTimeMillis()-start) + " ms");
+
+ // 加filter地方之三
+ // Student.setTeacher("cuijing");
+
+ // 这里判断response.status,处理异常
+ // 考虑封装一个全局的RpcfxException
+
+ return JSON.parse(response.getResult().toString());
+
+ } catch (RuntimeException ex) {
+ ex.printStackTrace();
+ if(! (ex.getCause() instanceof SocketTimeoutException)) {
+ break;
+ }
+ }
+ }
+ return null;
+
+ }
+
+ OkHttpClient client = new OkHttpClient.Builder()
+ .connectionPool(new ConnectionPool(128, 60, TimeUnit.SECONDS))
+// .dispatcher(dispatcher)
+ .readTimeout(1, TimeUnit.SECONDS)
+ .writeTimeout(1, TimeUnit.SECONDS)
+ .connectTimeout(1, TimeUnit.SECONDS)
+ .build();
+
+ private RpcfxResponse post(RpcfxRequest req, InstanceMeta instance) throws Exception {
+ String reqJson = JSON.toJSONString(req);
+ System.out.println("req json: "+reqJson);
+
+ final Request request = new Request.Builder()
+ .url(instance.toString())
+ .post(RequestBody.create(JSONTYPE, reqJson))
+ .build();
+ String respJson;
+ try {
+ respJson = client.newCall(request).execute().body().string();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ System.out.println("resp json: "+respJson);
+ return JSON.parseObject(respJson, RpcfxResponse.class);
+ }
+}
\ No newline at end of file
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/InstanceMeta.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/InstanceMeta.java
new file mode 100644
index 00000000..e1388143
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/InstanceMeta.java
@@ -0,0 +1,48 @@
+package io.kimmking.rpcfx.meta;
+
+import com.google.common.base.Strings;
+import lombok.*;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 19:46
+ */
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(of = {"scheme", "host", "port", "context"})
+public class InstanceMeta {
+
+ private String scheme;
+ private String host;
+ private Integer port;
+ private String context;
+ private boolean status;
+ private Map metadata;
+
+ public static InstanceMeta from(String instance) {
+ URI uri = URI.create(instance);
+ String path = uri.getPath();
+ path = Strings.isNullOrEmpty(path) ? "" : path.substring(1);
+ return InstanceMeta.builder()
+ .scheme(uri.getScheme())
+ .host(uri.getHost())
+ .port(uri.getPort())
+ .context(path)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return scheme + "://" + host + ":" + port + "/" + context;
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ProviderMeta.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ProviderMeta.java
new file mode 100644
index 00000000..f0b9e0ae
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ProviderMeta.java
@@ -0,0 +1,19 @@
+package io.kimmking.rpcfx.meta;
+
+import lombok.Data;
+
+import java.lang.reflect.Method;
+
+/**
+ * @author lirui
+ */
+@Data
+public class ProviderMeta {
+
+ private Object serviceImpl;
+
+ private Method method;
+
+ private String methodSign;
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServerMeta.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServerMeta.java
new file mode 100644
index 00000000..239042b8
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServerMeta.java
@@ -0,0 +1,24 @@
+package io.kimmking.rpcfx.meta;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/4/13 21:43
+ */
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(of = {"url"})
+public class ServerMeta {
+ private String url;
+ private boolean leader;
+ private boolean status;
+ private long version;
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServiceMeta.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServiceMeta.java
new file mode 100644
index 00000000..c443a272
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServiceMeta.java
@@ -0,0 +1,25 @@
+package io.kimmking.rpcfx.meta;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 19:46
+ */
+@Data
+@Builder
+public class ServiceMeta {
+
+ private String app;
+ private String namespace;
+ private String env;
+ private String name;
+
+ @Override
+ public String toString() {
+ return String.format("%s_%s_%s_%s", app, namespace, env, name);
+ }
+}
\ No newline at end of file
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/ProviderBootstrap.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/ProviderBootstrap.java
new file mode 100644
index 00000000..ce714bb0
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/ProviderBootstrap.java
@@ -0,0 +1,154 @@
+package io.kimmking.rpcfx.provider;
+
+import io.kimmking.rpcfx.annotation.RpcfxService;
+import io.kimmking.rpcfx.api.RpcContext;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import io.kimmking.rpcfx.registry.RegistryConfiguration;
+import io.kimmking.rpcfx.stub.StubSkeletonHelper;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
+import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.core.annotation.Order;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/13 20:27
+ */
+
+@Component
+@Import({RegistryConfiguration.class})
+public class ProviderBootstrap {
+
+ @Autowired
+ private ApplicationContext applicationContext;
+
+ @Autowired
+ Environment environment;
+
+ @Value("${app.id:app1}")
+ public String app;
+ @Value("${app.namespace:public}")
+ public String ns;
+ @Value("${app.env:dev}")
+ public String env;
+
+ private final RpcContext context = new RpcContext();
+
+ @Getter
+ private final RpcfxProviderInvoker invoker = new RpcfxProviderInvoker(context);;
+
+ private static String SCHEME = "http";
+ private static String ip;
+ private static int port;
+
+ @Autowired
+ RegistryCenter registry;// = new KKRegistryCenter();
+
+ @SneakyThrows
+ @PostConstruct
+ public void start(){
+ System.out.println("build all services from annotation...");
+ buildProvider();
+
+ System.out.println("get IP and PORT...");
+ ip = InetAddress.getLocalHost().getHostAddress();
+ port = Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port")));
+ }
+
+ private void buildProvider() {
+ String[] beansName = applicationContext.getBeanDefinitionNames();
+ for (int i = 0; i < beansName.length; i++) {
+ String beanName = beansName[i];
+ Object bean = applicationContext.getBean(beanName);
+ RpcfxService provider = AnnotationUtils.findAnnotation(bean.getClass(), RpcfxService.class);
+ if (provider == null) {
+ continue;
+ }
+ Class>[] classes = bean.getClass().getInterfaces();
+ if (classes == null || classes.length == 0) {
+ continue;
+ }
+ Arrays.stream(classes).forEach(c -> this.createProvider(c, bean));
+ }
+ }
+
+ private void createProvider(Class> clazz, Object bean) {
+ StubSkeletonHelper.createProvider(clazz, bean, context); // 初始化了holder
+ }
+
+ @Order(Integer.MIN_VALUE)
+ @Bean
+ public ApplicationRunner run() throws Exception {
+ return x -> registerServices();
+ }
+
+ private void registerServices() {
+
+ registry.start();
+
+ System.out.println("registry all services from RegistryCenter...");
+ context.getProviderHolder().forEach( (x, y) ->
+ {
+ System.out.println(" register " + x);
+ ServiceMeta sm = ServiceMeta.builder().name(x)
+ .app(app).namespace(ns).env(env).build();
+
+ InstanceMeta im = InstanceMeta.builder()
+ .scheme(SCHEME).host(ip).port(port).context("").build();
+ try {
+ registry.registerService(sm, im);
+ registry.heartbeat(sm, im);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+ }
+
+ @PreDestroy
+ public void stop() {
+ unregisterServices();
+ }
+
+ private void unregisterServices() {
+ System.out.println("unregistry all services from RegistryCenter...");
+ context.getProviderHolder().forEach( (x, y) ->
+ {
+ System.out.println(" unregister " + x);
+ ServiceMeta sm = ServiceMeta.builder().name(x)
+ .app(app).namespace(ns).env(env).build();
+ InstanceMeta im = InstanceMeta.builder()
+ .scheme(SCHEME).host(ip).port(port).context("").build();
+ try {
+ registry.unregisterService(sm, im);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+
+ registry.stop();
+
+ }
+
+
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxProviderInvoker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxProviderInvoker.java
new file mode 100644
index 00000000..6842dfe6
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxProviderInvoker.java
@@ -0,0 +1,61 @@
+package io.kimmking.rpcfx.provider;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import io.kimmking.rpcfx.api.RpcContext;
+import io.kimmking.rpcfx.api.RpcfxRequest;
+import io.kimmking.rpcfx.api.RpcfxResponse;
+import io.kimmking.rpcfx.meta.ProviderMeta;
+import org.springframework.util.CollectionUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Optional;
+
+public class RpcfxProviderInvoker {
+
+ RpcContext context;
+
+ public RpcfxProviderInvoker(RpcContext context) {
+ this.context = context;
+ }
+
+ public RpcfxResponse invoke(RpcfxRequest request) {
+ RpcfxResponse response = new RpcfxResponse();
+ String serviceClass = request.getServiceClass();
+
+ ProviderMeta meta = findProvider(serviceClass, request.getMethodSign());
+
+ try {
+ Method method = meta.getMethod();
+ // 没有控制超时,所以可能会很久 TODO 1
+ Object result = method.invoke(meta.getServiceImpl(), request.getParams()); // dubbo, fastjson,
+ // 两次json序列化能否合并成一个
+ response.setResult(JSON.toJSONString(result, SerializerFeature.WriteClassName));
+ response.setStatus(true);
+ return response;
+ } catch ( IllegalAccessException | InvocationTargetException e) {
+
+ // 3.Xstream
+
+ // 2.封装一个统一的RpcfxException
+ // 客户端也需要判断异常
+ e.printStackTrace();
+ response.setException(e);
+ response.setStatus(false);
+ return response;
+ }
+ }
+
+ protected ProviderMeta findProvider(String interfaceName, String methodSign) {
+ List providerMetas = context.getProviderHolder().get(interfaceName);
+ if (!CollectionUtils.isEmpty(providerMetas)) {
+ Optional providerMeta = providerMetas.stream()
+ .filter(provider -> methodSign.equals(provider.getMethodSign())).findFirst();
+ return providerMeta.orElse(null);
+ }
+ return null;
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/ChangedListener.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/ChangedListener.java
new file mode 100644
index 00000000..971cbcda
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/ChangedListener.java
@@ -0,0 +1,13 @@
+package io.kimmking.rpcfx.registry;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 20:19
+ */
+public interface ChangedListener {
+
+ void fireEvent(Event e);
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/Event.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/Event.java
new file mode 100644
index 00000000..19a92cd4
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/Event.java
@@ -0,0 +1,29 @@
+package io.kimmking.rpcfx.registry;
+
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 20:20
+ */
+public interface Event {
+
+ T getData();
+
+ static Event> withData(List list) {
+ return new ChangedEvent(list);
+ }
+
+ @Data
+ @AllArgsConstructor
+ class ChangedEvent implements Event> {
+ List data;
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryCenter.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryCenter.java
new file mode 100644
index 00000000..871c1a7e
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryCenter.java
@@ -0,0 +1,35 @@
+package io.kimmking.rpcfx.registry;
+
+import io.kimmking.rpcfx.api.ServiceProviderDesc;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.CreateMode;
+
+import java.util.List;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 15:23
+ */
+public interface RegistryCenter {
+
+ void start();
+
+ void stop();
+
+ void registerService(ServiceMeta service, InstanceMeta instance) throws Exception;
+
+ void unregisterService(ServiceMeta service, InstanceMeta instance) throws Exception;
+
+ List fetchInstances(ServiceMeta service) throws Exception;
+
+ void subscribe(ServiceMeta service, ChangedListener> listener);
+
+ void heartbeat(ServiceMeta service, InstanceMeta instance);
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryConfiguration.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryConfiguration.java
new file mode 100644
index 00000000..8391c4aa
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryConfiguration.java
@@ -0,0 +1,24 @@
+package io.kimmking.rpcfx.registry;
+
+import io.kimmking.rpcfx.registry.kkregistry.KKRegistryCenter;
+import io.kimmking.rpcfx.registry.zookeeper.ZookeeperRegistryCenter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/9 01:05
+ */
+
+@Configuration
+public class RegistryConfiguration {
+
+ @Bean
+ RegistryCenter createRC() {
+ return new KKRegistryCenter();
+ //return new ZookeeperRegistryCenter(); //KKRegistryCenter();
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKHeathChecker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKHeathChecker.java
new file mode 100644
index 00000000..bbf0e17e
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKHeathChecker.java
@@ -0,0 +1,42 @@
+package io.kimmking.rpcfx.registry.kkregistry;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/1 06:18
+ */
+public class KKHeathChecker {
+
+ final int interval = 5_000;
+
+ final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");
+
+ public void check(Callback callback) {
+ executor.scheduleWithFixedDelay(() -> {
+ System.out.println("start to check kk health ...[" + DTF.format(LocalDateTime.now()) + "]");
+ try {
+ callback.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, interval, interval, TimeUnit.MILLISECONDS);
+ }
+
+ public void stop() {
+ this.executor.shutdown();
+ }
+
+ public interface Callback {
+ void call() throws Exception;
+ }
+
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKRegistryCenter.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKRegistryCenter.java
new file mode 100644
index 00000000..001a3854
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKRegistryCenter.java
@@ -0,0 +1,209 @@
+package io.kimmking.rpcfx.registry.kkregistry;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServerMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.ChangedListener;
+import io.kimmking.rpcfx.registry.Event;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import lombok.SneakyThrows;
+import okhttp3.ConnectionPool;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static io.kimmking.rpcfx.consumer.RpcfxInvocationHandler.JSONTYPE;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 15:25
+ */
+public class KKRegistryCenter implements RegistryCenter {
+
+ public String RC_Server = "http://localhost:8485";
+ private ServerMeta leader;
+ private List servers;
+ private Map TV = new HashMap<>();
+
+ OkHttpClient client;
+ @SneakyThrows
+ @Override
+ public void start() {
+ client = new OkHttpClient.Builder()
+ .connectionPool(new ConnectionPool(128, 60, TimeUnit.SECONDS))
+// .dispatcher(dispatcher)
+ .readTimeout(65, TimeUnit.SECONDS)
+ .writeTimeout(65, TimeUnit.SECONDS)
+ .connectTimeout(3, TimeUnit.SECONDS)
+ .build();
+
+ String url = RC_Server + "/cluster";
+ boolean init = false;
+ while(!init) {
+ System.out.println("===============>> cluster info from :" + url);
+ List new_servers = null;
+ ServerMeta new_leader = null;
+ try {
+ String respJson = get(url);
+ new_servers = JSON.parseObject(respJson, new TypeReference>() {
+ });
+ new_leader = new_servers.stream().filter(ServerMeta::isStatus)
+ .filter(ServerMeta::isLeader).findFirst().orElse(null);
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ }
+
+ if(new_leader == null) {
+ System.out.println("===============>> no leader, 500ms later and retry.");
+ Thread.sleep(500);
+ Random random = new Random();
+ if(new_servers !=null && new_servers.size() > 1) {
+ url = new_servers.get(random.nextInt(new_servers.size())).getUrl() + "/cluster";
+ } else if((new_servers ==null || new_servers.isEmpty()) && !servers.isEmpty()) {
+ url = servers.get(random.nextInt(servers.size())).getUrl() + "/cluster";
+ }
+ } else {
+ this.servers = new_servers;
+ this.leader = new_leader;
+ init = true;
+ System.out.println("===============>> init ok, new_leader = " + new_leader);
+ System.out.println("===============>> init ok, new_servers = " + new_servers);
+ }
+ }
+ }
+
+ @Override
+ public void stop() {
+ this.checker.stop();
+ }
+
+ @Override
+ public void registerService(ServiceMeta service, InstanceMeta instance) throws Exception {
+ instance.setStatus(true);
+ String reqJson = JSON.toJSONString(instance);
+ String url = leader.getUrl() + "/reg?service=" + service;
+ post(url, reqJson);
+// String reqJson = "{\n" +
+// " \"scheme\": \"http\",\n" +
+// " \"ip\": \"" + instance.getIp() + "\",\n" +
+// " \"port\": \"" + instance.getPort() + "\",\n" +
+// " \"context\": \"\",\n" +
+// " \"status\": \"online\",\n" +
+// " \"metadata\": {\n" +
+// " \"env\": \"dev\",\n" +
+// " \"tag\": \"RED\"\n" +
+// " }\n" +
+// "}";
+// final Request request = new Request.Builder()
+// .url("http://localhost:8484/reg?service=" + service)
+// .post(RequestBody.create(JSONTYPE, reqJson))
+// .build();
+// String respJson = client.newCall(request).execute().body().string();
+// System.out.println(respJson);
+ }
+
+ private String post(String url, String reqJson) throws IOException {
+ System.out.println(" ====> request: " + url);
+ final Request request = new Request.Builder()
+ .url(url)
+ .post(RequestBody.create(JSONTYPE, reqJson))
+ .build();
+ String respJson = client.newCall(request).execute().body().string();
+ System.out.println(" ====> response: " + respJson);
+ return respJson;
+ }
+
+ private String get(String url) throws IOException {
+ System.out.println(" ====> request: " + url);
+ final Request request = new Request.Builder()
+ .url(url)
+ .get()
+ .build();
+ String respJson = client.newCall(request).execute().body().string();
+ System.out.println(" ====> response: " + respJson);
+ return respJson;
+ }
+
+ @Override
+ public void unregisterService(ServiceMeta service, InstanceMeta instance) throws Exception {
+ String reqJson = "{\n" +
+ " \"scheme\": \"http\",\n" +
+ " \"host\": \"" + instance.getHost() + "\",\n" +
+ " \"port\": \"" + instance.getPort() + "\",\n" +
+ " \"context\": \"\"\n" +
+ "}";
+ String url = leader.getUrl() + "/unreg?service=" + service;
+ post(url, reqJson);
+ }
+
+ public List fetchInstances(ServiceMeta service) throws Exception {
+ String url = RC_Server + "/findAll?service=" + service;
+ String respJson = get(url);
+ List instances = JSON.parseObject(respJson, new TypeReference>() {
+ });
+ return instances;
+ }
+
+ KKHeathChecker checker = new KKHeathChecker();
+
+ // for Consumer
+ public void subscribe(ServiceMeta service, final ChangedListener> listener) {
+ checker.check( () -> {
+ if(hb(service)) {
+ List instances = fetchInstances(service);
+ Event> e = Event.withData(instances);
+ listener.fireEvent(e);
+ }
+ });
+
+ // 定时器轮询
+ // 保存上一次的TV
+ // 如果有差异就fire
+ }
+
+ private boolean hb(ServiceMeta service) throws Exception {
+ String svc = service.toString();
+ String url = RC_Server + "/version?service=" + svc;
+ String respJson = get(url);
+ Long v = Long.valueOf(respJson);
+ Long o = TV.getOrDefault(svc, -1L);
+ if ( v > o) {
+ TV.put(svc, v);
+ return o > -1L;
+ }
+ return false;
+ }
+
+
+ // for Provider
+ public void heartbeat(ServiceMeta service, InstanceMeta instance) {
+ checker.check( () -> {
+ heart(service, instance);
+ });
+ }
+
+ Long heart(ServiceMeta service, InstanceMeta instance) throws Exception {
+ String reqJson = "{\n" +
+ " \"scheme\": \"http\",\n" +
+ " \"host\": \"" + instance.getHost() + "\",\n" +
+ " \"port\": \"" + instance.getPort() + "\",\n" +
+ " \"context\": \"\",\n" +
+ " \"status\": true\n" +
+ "}";
+ String url = leader.getUrl() + "/renew?service=" + service;
+ String respJson = post(url, reqJson);
+ return Long.valueOf(respJson);
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/zookeeper/ZookeeperRegistryCenter.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/zookeeper/ZookeeperRegistryCenter.java
new file mode 100644
index 00000000..37f7b6ea
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/zookeeper/ZookeeperRegistryCenter.java
@@ -0,0 +1,101 @@
+package io.kimmking.rpcfx.registry.zookeeper;
+
+import io.kimmking.rpcfx.api.ServiceProviderDesc;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.ChangedListener;
+import io.kimmking.rpcfx.registry.Event;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.CreateMode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/13 20:16
+ */
+public class ZookeeperRegistryCenter implements RegistryCenter {
+
+// private final List listeners = new ArrayList<>();
+// public void addListener(ChangedListener listener) {
+// this.listeners.add(listener);
+// }
+
+ CuratorFramework client = null;
+ public void start() {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ client = CuratorFrameworkFactory.builder().connectString("localhost:2181").namespace("rpcfx").retryPolicy(retryPolicy).build();
+ client.start();
+ }
+
+ public void stop(){
+ client.close();
+ }
+
+ public void registerService(ServiceMeta service, InstanceMeta instance) throws Exception {
+ ServiceProviderDesc userServiceSesc = ServiceProviderDesc.builder()
+ .host(instance.getHost())
+ .port(instance.getPort()).serviceClass(service.getName()).build();
+ // String userServiceSescJson = JSON.toJSONString(userServiceSesc);
+
+ try {
+ if ( null == client.checkExists().forPath("/" + service)) {
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/" + service, "service".getBytes());
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+
+ client.create().withMode(CreateMode.EPHEMERAL).
+ forPath( "/" + service + "/" + userServiceSesc.getHost() + "_" + userServiceSesc.getPort(), "provider".getBytes());
+ }
+
+ public void unregisterService(ServiceMeta service, InstanceMeta instance) throws Exception {
+
+ if (null == client.checkExists().forPath("/" + service)) {
+ return;
+ }
+ System.out.println("delete " + "/" + service + "/" + instance.getHost() + "_" + instance.getPort());
+ client.delete().quietly().
+ forPath( "/" + service + "/" + instance.getHost() + "_" + instance.getPort());
+ }
+
+ public List fetchInstances(ServiceMeta service) throws Exception {
+ List services = client.getChildren().forPath("/" + service);
+ List instances = new ArrayList<>();
+ for (String svc : services) {
+ System.out.println(svc);
+ String url = svc.replace("_", ":");
+ instances.add(InstanceMeta.from("http://" + url));
+ }
+ return instances;
+ }
+
+ public void subscribe(ServiceMeta service, ChangedListener listener) {
+ final TreeCache treeCache = TreeCache.newBuilder(client, "/" + service).setCacheData(true).setMaxDepth(2).build();
+ treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
+ System.out.println("treeCacheEvent: "+treeCacheEvent);
+ List instances = fetchInstances(service);
+ listener.fireEvent(Event.withData(instances));
+ });
+ try {
+ treeCache.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void heartbeat(ServiceMeta service, InstanceMeta instance) {
+ // do nothing
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java
deleted file mode 100644
index a6f77dac..00000000
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package io.kimmking.rpcfx.server;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import io.kimmking.rpcfx.api.RpcfxRequest;
-import io.kimmking.rpcfx.api.RpcfxResolver;
-import io.kimmking.rpcfx.api.RpcfxResponse;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-
-public class RpcfxInvoker {
-
- private RpcfxResolver resolver;
-
- public RpcfxInvoker(RpcfxResolver resolver){
- this.resolver = resolver;
- }
-
- public RpcfxResponse invoke(RpcfxRequest request) {
- RpcfxResponse response = new RpcfxResponse();
- String serviceClass = request.getServiceClass();
-
- // 作业1:改成泛型和反射
- Object service = resolver.resolve(serviceClass);//this.applicationContext.getBean(serviceClass);
-
- try {
- Method method = resolveMethodFromClass(service.getClass(), request.getMethod());
- Object result = method.invoke(service, request.getParams()); // dubbo, fastjson,
- // 两次json序列化能否合并成一个
- response.setResult(JSON.toJSONString(result, SerializerFeature.WriteClassName));
- response.setStatus(true);
- return response;
- } catch ( IllegalAccessException | InvocationTargetException e) {
-
- // 3.Xstream
-
- // 2.封装一个统一的RpcfxException
- // 客户端也需要判断异常
- e.printStackTrace();
- response.setException(e);
- response.setStatus(false);
- return response;
- }
- }
-
- private Method resolveMethodFromClass(Class> klass, String methodName) {
- return Arrays.stream(klass.getMethods()).filter(m -> methodName.equals(m.getName())).findFirst().get();
- }
-
-}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/MockHandler.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/MockHandler.java
new file mode 100644
index 00000000..9fe5269c
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/MockHandler.java
@@ -0,0 +1,30 @@
+package io.kimmking.rpcfx.stub;
+
+import io.kimmking.rpcfx.utils.MockUtils;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/11 02:57
+ */
+public class MockHandler implements InvocationHandler {
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Class> type = method.getReturnType();
+ System.out.println("invoke by mock handler...");
+ return MockUtils.mock(type, null);
+ }
+
+ public static T createMock(Class serviceClass) {
+ //final ServiceMeta sm, Router router, LoadBalancer loadBalance, Filter filter) {
+ return (T) Proxy.newProxyInstance(MockHandler.class.getClassLoader(),
+ new Class[]{serviceClass}, new MockHandler());
+
+ }
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/StubSkeletonHelper.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/StubSkeletonHelper.java
new file mode 100644
index 00000000..e77215a8
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/StubSkeletonHelper.java
@@ -0,0 +1,186 @@
+package io.kimmking.rpcfx.stub;
+
+import io.kimmking.rpcfx.api.*;
+import io.kimmking.rpcfx.consumer.RpcfxConsumerInvoker;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ProviderMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import io.kimmking.rpcfx.utils.MethodUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.MultiValueMap;
+
+import java.lang.reflect.Method;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author lirui
+ */
+public class StubSkeletonHelper {
+
+ public static void createProvider(Class> clazz, Object serviceImpl, RpcContext rpcContext) {
+ String clazzName = clazz.getName();
+ Class> callClass = serviceImpl.getClass();
+
+ Method[] methodList = callClass.getMethods();
+ for (Method method : methodList) {
+ if (!checkRpcMethod(method)) {
+ continue;
+ }
+ ProviderMeta providerMeta = buildProviderMeta(method, serviceImpl);
+
+ MultiValueMap providerHolder = rpcContext.getProviderHolder();
+ providerHolder.add(clazzName, providerMeta);
+ }
+ }
+
+ private static ProviderMeta buildProviderMeta(Method method, Object serviceImpl) {
+ String methodSign = MethodUtils.methodSign(method);
+ ProviderMeta providerMeta = new ProviderMeta();
+ providerMeta.setMethod(method);
+ providerMeta.setServiceImpl(serviceImpl);
+ providerMeta.setMethodSign(methodSign);
+ return providerMeta;
+ }
+
+ public static boolean checkRpcMethod(final Method method) {
+ //本地方法不代理
+ if ("toString".equals(method.getName()) ||
+ "hashCode".equals(method.getName()) ||
+ "notifyAll".equals(method.getName()) ||
+ "equals".equals(method.getName()) ||
+ "wait".equals(method.getName()) ||
+ "getClass".equals(method.getName()) ||
+ "notify".equals(method.getName())) {
+ return false;
+ }
+ return true;
+ }
+
+ public static T createConsumer(ServiceMeta sm, RpcContext ctx, RegistryCenter rc) {
+ String clazzName = sm.getName();
+ Class> serviceClass = null;
+ try {
+ serviceClass = Class.forName(clazzName);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ T proxyHandler = (T) ctx.getConsumerHolder().get(clazzName);
+ if (proxyHandler == null) { // TODO configuration
+
+ ctx.setRouter(new TagRouter());
+ ctx.setLoadBalancer(new RoundRibbonLoadBalancer());
+ ctx.setFilters(createFilters(ctx));
+
+ T mockHandler = createMockHandler(ctx, serviceClass);
+ if(mockHandler != null) {
+ return mockHandler;
+ }
+
+ RpcfxConsumerInvoker consumerInvoker = new RpcfxConsumerInvoker(ctx, rc);
+ consumerInvoker.start();
+ proxyHandler = consumerInvoker.createFromRegistry(sm, ctx);
+ ctx.getConsumerHolder().put(clazzName, proxyHandler);
+ }
+ return proxyHandler;
+ }
+
+ private static Filter[] createFilters(RpcContext ctx) {
+ String cache = ctx.getParameters().getOrDefault("app.cache", "false");
+ Filter[] filters = null;
+ if("true".equalsIgnoreCase(cache)) {
+ filters = new Filter[]{new CuicuiFilter(), new CacheFilter()};
+ } else {
+ filters = new Filter[]{new CuicuiFilter()};
+ }
+ return filters;
+ }
+
+ private static T createMockHandler(RpcContext ctx, Class> serviceClass) {
+ String mock = ctx.getParameters().getOrDefault("app.mock", "false");
+ if("true".equalsIgnoreCase(mock)) {
+ return (T) MockHandler.createMock(serviceClass);
+ }
+ return null;
+ }
+
+
+ private static class TagRouter implements Router {
+ @Override
+ public List route(List instances) {
+ return instances;
+ }
+ }
+
+ private static class RoundRibbonLoadBalancer implements LoadBalancer {
+ private final AtomicInteger count = new AtomicInteger(0);
+ @Override
+ public InstanceMeta select(List instances) {
+ if(instances.isEmpty()) return null;
+ return instances.get((count.getAndIncrement() & Integer.MAX_VALUE) % instances.size());
+ }
+ }
+
+ private static class RandomLoadBalancer implements LoadBalancer {
+ private final Random random = new Random();
+ @Override
+ public InstanceMeta select(List instances) {
+ if(instances.isEmpty()) return null;
+ return instances.get(random.nextInt(instances.size()));
+ }
+ }
+
+ @Slf4j
+ private static class CuicuiFilter implements Filter {
+ @Override
+ public RpcfxResponse prefilter(RpcfxRequest request) {
+ //log.info("filter {} -> {}", this.getClass().getName(), request.toString());
+ //System.out.printf("filter %s -> %s%n", this.getClass().getName(), request.toString());
+ return null;
+ }
+
+ @Override
+ public RpcfxResponse postfilter(RpcfxRequest request, RpcfxResponse response) {
+ return response;
+ }
+
+ }
+
+ private static class CacheFilter implements Filter {
+
+ static Map CACHE = new HashMap<>();
+
+ @Override
+ public RpcfxResponse prefilter(RpcfxRequest request) {
+ RpcfxResponse response = CACHE.get(genKey(request));
+ if(response != null) {
+ System.out.println("CacheFilter.prefilter hit! => request: \n" + request + "\n =>response: \n" + response);
+ }
+ return response;
+ //log.info("filter {} -> {}", this.getClass().getName(), request.toString());
+ //System.out.printf("filter %s -> %s%n", this.getClass().getName(), request.toString());
+ }
+
+ @Override
+ public RpcfxResponse postfilter(RpcfxRequest request, RpcfxResponse response) {
+ String key = genKey(request);
+ if(!CACHE.containsKey(key)) {
+ CACHE.put(key, response);
+ }
+ return response;
+ }
+
+ }
+
+ public static String genKey(RpcfxRequest request) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(request.getServiceClass());
+ sb.append("@");
+ sb.append(request.getMethodSign());
+ //sb.append("");
+ Arrays.stream(request.getParams()).forEach(x -> sb.append("_"+x.toString()));
+ return sb.toString();
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MethodUtils.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MethodUtils.java
new file mode 100644
index 00000000..f17a102f
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MethodUtils.java
@@ -0,0 +1,30 @@
+package io.kimmking.rpcfx.utils;
+
+import org.springframework.util.DigestUtils;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+public class MethodUtils {
+
+ public static String methodSign(Method method) {
+ if (method != null) {
+ StringBuilder builder = new StringBuilder();
+ String name = method.getName();
+ builder.append(name);
+ builder.append("@");
+ int count = method.getParameterCount();
+ builder.append(count);
+ builder.append("_");
+ if (count > 0) {
+ Class>[] classes = method.getParameterTypes();
+ Arrays.stream(classes).forEach(c -> builder.append(c.getName() + ","));
+ }
+ return builder.toString();
+// String string = builder.toString();
+// return DigestUtils.md5DigestAsHex(string.getBytes());
+ }
+ return "";
+ }
+
+}
\ No newline at end of file
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MockUtils.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MockUtils.java
new file mode 100644
index 00000000..8d22536a
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MockUtils.java
@@ -0,0 +1,143 @@
+package io.kimmking.rpcfx.utils;
+
+import lombok.Data;
+import org.springframework.util.ClassUtils;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.*;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/11 03:15
+ */
+public class MockUtils {
+
+ public static Object mock(Class> clazz, Type[] generics) {
+ boolean primitiveOrWrapper = ClassUtils.isPrimitiveOrWrapper(clazz);
+ if(primitiveOrWrapper) return mockPrimitive(clazz);
+ if(String.class.equals(clazz)) return mockString();
+ if (Number.class.isAssignableFrom(clazz)) {
+ return 10;
+ }
+ if(clazz.isArray()) {
+ return mockArray(clazz.getComponentType());
+ }
+ if(List.class.isAssignableFrom(clazz)) {
+ return mockList(clazz, generics[0]);
+ }
+ if(Map.class.isAssignableFrom(clazz)) {
+ return mockMap(clazz, generics[1]);
+ }
+ return mockPojo(clazz);
+ }
+
+ private static Object mockMap(Class> clazz, Type generic) {
+ HashMap