diff --git a/02nio/nio02/pom.xml b/02nio/nio02/pom.xml
index 005de90a..c10848f8 100644
--- a/02nio/nio02/pom.xml
+++ b/02nio/nio02/pom.xml
@@ -29,7 +29,7 @@
io.netty
netty-all
- 4.1.45.Final
+ 4.1.104.Final
diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java
index e67b7961..1adda64a 100644
--- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java
+++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java
@@ -2,7 +2,10 @@
import io.github.kimmking.gateway.inbound.HttpInboundServer;
+import io.netty.util.internal.PlatformDependent;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
import java.util.Arrays;
public class NettyServerApplication {
@@ -12,6 +15,13 @@ public class NettyServerApplication {
public static void main(String[] args) {
+// sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
+// System.out.println(unsafe.addressSize());
+
+
+ System.out.println("PlatformDependent.hasUnsafe = " + PlatformDependent.javaVersion());
+ System.out.println("PlatformDependent.hasUnsafe = " + PlatformDependent.hasUnsafe());
+
String proxyPort = System.getProperty("proxyPort","8888");
// 这是之前的单个后端url的例子
diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpResponseFilter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpResponseFilter.java
index 53493fb4..12fe310a 100644
--- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpResponseFilter.java
+++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpResponseFilter.java
@@ -1,10 +1,24 @@
package io.github.kimmking.gateway.filter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
public class HeaderHttpResponseFilter implements HttpResponseFilter {
@Override
public void filter(FullHttpResponse response) {
response.headers().set("kk", "java-1-nio");
+ response.setStatus(HttpResponseStatus.CREATED);
+// byte[] array = response.content().array();
+// String content = new String(array);
+// System.out.println(content);
+// content = content + ",kimmking";
+ byte[] bytes = "hello,kimm.".getBytes();
+ //response.headers().setInt("Content-Length", bytes.length);
+ ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
+ ByteBuf content = response.content();
+ content.clear();
+ content.writeBytes(byteBuf);
}
}
diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java
index 69b40fde..1cd13ca2 100644
--- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java
+++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java
@@ -39,9 +39,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
// if (uri.contains("/test")) {
// handlerTest(fullRequest, ctx);
// }
-
- handler.handle(fullRequest, ctx, filter);
-
+
+ String uri = fullRequest.getUri();
+ System.out.println(" uri ==>> " + uri);
+ if(uri.contains("/netty/info")) {
+ NettyInfoHandler.INSTANCE.handle(fullRequest, ctx);
+ } else {
+ handler.handle(fullRequest, ctx, filter);
+ }
} catch(Exception e) {
e.printStackTrace();
} finally {
diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/NettyInfoHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/NettyInfoHandler.java
new file mode 100644
index 00000000..f179cc82
--- /dev/null
+++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/NettyInfoHandler.java
@@ -0,0 +1,83 @@
+package io.github.kimmking.gateway.inbound;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.util.internal.PlatformDependent;
+import lombok.SneakyThrows;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/5/31 下午7:13
+ */
+public class NettyInfoHandler {
+
+ public final static NettyInfoHandler INSTANCE = new NettyInfoHandler();
+
+ public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) {
+ System.out.println("NettyInfoHandler.handle...");
+ Map infos = new HashMap<>();
+ infos.put("netty.usedDirectMemory", ""+getNettyUsedDirectMemory());
+ infos.put("netty.directMemoryLimit", ""+getNettyDirectMemoryLimit());
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ infos.forEach((k, v) -> {
+ sb.append("\"").append(k).append("\"")
+ .append(":")
+ .append("\"").append(v).append("\"").append(",");
+ });
+ if(sb.length()>1) {
+ sb.deleteCharAt(sb.length()-1);
+ }
+ sb.append("}");
+
+ byte[] body = ("{\"code\":200,\"msg\":\"success\",\"data\":" + sb +"}").getBytes();
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(body));
+
+ response.headers().set("Content-Type", "application/json");
+ response.headers().setInt("Content-Length", body.length);
+ response.headers().set("kk.gw.hanlder", "netty.info");
+
+ if (fullRequest != null) {
+ if (!HttpUtil.isKeepAlive(fullRequest)) {
+ ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ //response.headers().set(CONNECTION, KEEP_ALIVE);
+ ctx.write(response);
+ }
+ }
+ ctx.flush();
+ //ctx.close();
+
+ }
+
+ @SneakyThrows
+ private static long getNettyUsedDirectMemory() {
+ Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
+ field.setAccessible(true);
+ AtomicLong o = (AtomicLong)field.get(null);
+ return o.get();
+ }
+
+ @SneakyThrows
+ private static long getNettyDirectMemoryLimit() {
+ Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_LIMIT");
+ field.setAccessible(true);
+ return (Long)field.get(null);
+ }
+
+}
diff --git a/04fx/spring01/pom.xml b/04fx/spring01/pom.xml
index 505009a7..3075aaf2 100644
--- a/04fx/spring01/pom.xml
+++ b/04fx/spring01/pom.xml
@@ -10,7 +10,7 @@
- 4.3.29.RELEASE
+ 4.3.30.RELEASE
@@ -19,8 +19,8 @@
org.apache.maven.plugins
maven-compiler-plugin
- 8
- 8
+ 11
+ 11
diff --git a/04fx/spring01/src/main/java/io/kimmking/spring02/SpringDemo11.java b/04fx/spring01/src/main/java/io/kimmking/spring02/SpringDemo11.java
new file mode 100644
index 00000000..bcbc7e30
--- /dev/null
+++ b/04fx/spring01/src/main/java/io/kimmking/spring02/SpringDemo11.java
@@ -0,0 +1,49 @@
+package io.kimmking.spring02;
+
+import org.springframework.cglib.proxy.Enhancer;
+import org.springframework.cglib.proxy.MethodInterceptor;
+import org.springframework.cglib.proxy.MethodProxy;
+
+import java.lang.reflect.Method;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/22 18:01
+ */
+public class SpringDemo11 {
+
+ public static void main(String[] args) {
+ long s = System.currentTimeMillis();
+ Enhancer enhancer = new Enhancer();
+ enhancer.setInterfaces(new Class[]{IAction.class});
+ enhancer.setCallback(new MI());
+ enhancer.setUseCache(true);
+ IAction demo = (IAction) enhancer.create();
+ for (int i = 0; i < 5; i++) {
+ long ss = System.currentTimeMillis();
+ System.out.println(demo.action());
+ System.out.println( i + " *****====> invoke proxy " + (System.currentTimeMillis() - ss) + " ms");
+ }
+ System.out.println(" *****====> enhancer proxy " + (System.currentTimeMillis() - s) + " ms");
+
+ }
+
+ public interface IAction {
+ Object action();
+ }
+
+
+ static class MI implements MethodInterceptor {
+ @Override
+ public Object intercept(Object obj, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
+ long s = System.currentTimeMillis();
+ System.out.println(" *****==MI==> " + s + " " +"Before:"+method.getName());
+ Object result = "S-" + s;//methodProxy.invokeSuper(obj, objects);
+ System.out.println(" *****==MI==> " + (System.currentTimeMillis() - s) + " ms After:"+method.getName());
+ return result;
+ }
+ }
+
+}
diff --git a/07rpc/rpc01/client-rest.http b/07rpc/rpc01/client-rest.http
index 5808372d..cfe742ec 100644
--- a/07rpc/rpc01/client-rest.http
+++ b/07rpc/rpc01/client-rest.http
@@ -1 +1 @@
-http://127.0.0.1:8080/api/hello
\ No newline at end of file
+http://127.0.0.1:8091/api/hello
\ No newline at end of file
diff --git a/07rpc/rpc01/rpcfx-core/pom.xml b/07rpc/rpc01/rpcfx-core/pom.xml
index d8b62724..5a1eeac3 100644
--- a/07rpc/rpc01/rpcfx-core/pom.xml
+++ b/07rpc/rpc01/rpcfx-core/pom.xml
@@ -21,7 +21,7 @@
com.alibaba
fastjson
- 1.2.70
+ 1.2.83
@@ -45,7 +45,7 @@
org.apache.curator
- curator-framework
+ curator-recipes
5.1.0
@@ -70,11 +70,7 @@
-
- org.apache.curator
- curator-recipes
- 5.1.0
-
+
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java
index 64f3b99d..29060ace 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java
@@ -2,7 +2,9 @@
public interface Filter {
- boolean filter(RpcfxRequest request);
+ RpcfxResponse prefilter(RpcfxRequest request);
+
+ RpcfxResponse postfilter(RpcfxRequest request, RpcfxResponse response);
// Filter next();
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/LoadBalancer.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/LoadBalancer.java
index eccb66f5..5ac4fab2 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/LoadBalancer.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/LoadBalancer.java
@@ -1,9 +1,11 @@
package io.kimmking.rpcfx.api;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+
import java.util.List;
public interface LoadBalancer {
- String select(List urls);
+ InstanceMeta select(List instances);
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Router.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Router.java
index 594aeff5..a4ed3225 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Router.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Router.java
@@ -1,8 +1,10 @@
package io.kimmking.rpcfx.api;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+
import java.util.List;
public interface Router {
- List route(List urls);
+ List route(List instances);
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcContext.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcContext.java
index e5365ab7..79a65bde 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcContext.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcContext.java
@@ -2,6 +2,7 @@
import io.kimmking.rpcfx.meta.ProviderMeta;
import lombok.Getter;
+import lombok.Setter;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@@ -17,9 +18,24 @@
public class RpcContext {
@Getter
- private MultiValueMap providerHolder = new LinkedMultiValueMap<>();
+ private final MultiValueMap providerHolder = new LinkedMultiValueMap<>();
@Getter
- private Map consumerHolder = new HashMap<>();
+ private final Map consumerHolder = new HashMap<>();
+
+ @Getter
+ private final Map parameters = new HashMap<>();
+
+ @Getter
+ @Setter
+ private Router router;
+
+ @Getter
+ @Setter
+ private LoadBalancer loadBalancer;
+
+ @Getter
+ @Setter
+ private Filter[] filters;
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java
index 5ee7b9e1..1e9edfb4 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxRequest.java
@@ -5,6 +5,6 @@
@Data
public class RpcfxRequest {
private String serviceClass;
- private String method;
+ private String methodSign;
private Object[] params;
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/ConsumerBootstrap.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/ConsumerBootstrap.java
index 16527507..a69fe083 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/ConsumerBootstrap.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/ConsumerBootstrap.java
@@ -2,16 +2,24 @@
import io.kimmking.rpcfx.annotation.RpcfxReference;
import io.kimmking.rpcfx.api.RpcContext;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import io.kimmking.rpcfx.registry.RegistryConfiguration;
import io.kimmking.rpcfx.stub.StubSkeletonHelper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyValues;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;
+import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Component;
+import javax.annotation.PostConstruct;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -24,12 +32,39 @@
*/
@Slf4j
@Component
+@Import({RegistryConfiguration.class})
public class ConsumerBootstrap implements Closeable, InstantiationAwareBeanPostProcessor {
- private RpcContext rpcContext = new RpcContext();
+ private RpcContext context = new RpcContext();
private String scanPackage = "io.kimmking";
+ @Value("${app.id:app1}")
+ public String app;
+ @Value("${app.namespace:public}")
+ public String ns;
+ @Value("${app.env:dev}")
+ public String env;
+ @Value("${app.mock:false}")
+ public boolean mock;
+ @Value("${app.cache:false}")
+ public boolean cache;
+ @Value("${app.retry:1}")
+ public int retry;
+
+ @Autowired
+ RegistryCenter rc;
+
+ @PostConstruct
+ public void init() {
+ this.context.getParameters().put("app.id", app);
+ this.context.getParameters().put("app.namespace", ns);
+ this.context.getParameters().put("app.env", env);
+ this.context.getParameters().put("app.mock", String.valueOf(mock));
+ this.context.getParameters().put("app.cache", String.valueOf(cache));
+ this.context.getParameters().put("app.retry", String.valueOf(retry));
+ }
+
@Override
public void close() throws IOException {
@@ -38,14 +73,17 @@ public void close() throws IOException {
@Override
public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
if (bean.getClass().getPackage().getName().startsWith(scanPackage)) {
- Field[] declaredFields = bean.getClass().getDeclaredFields();
- List consumers = Arrays.stream(declaredFields).filter(field -> field.isAnnotationPresent(RpcfxReference.class)).collect(Collectors.toList());
+ Field[] declaredFields = resolveAllField(bean.getClass()); // 解决父类里的注解扫描不到的问题
- consumers.stream().forEach(consumer -> {
- Object consumer1 = createConsumer(consumer.getType());
+ List consumers = Arrays.stream(declaredFields)
+ .filter(field -> field.isAnnotationPresent(RpcfxReference.class))
+ .collect(Collectors.toList());
+
+ consumers.stream().forEach(field -> {
+ Object consumer = createConsumer(field.getType());
try {
- consumer.setAccessible(true);
- consumer.set(bean, consumer1);
+ field.setAccessible(true);
+ field.set(bean, consumer);
} catch (IllegalAccessException e) {
log.error(e.getMessage(), e);
}
@@ -54,7 +92,19 @@ public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, Str
return null;
}
+ private Field[] resolveAllField(Class> aClass) {
+ List res = new ArrayList<>(20);
+ while ( !Object.class.equals(aClass) ) {
+ Field[] fields = aClass.getDeclaredFields();
+ res.addAll(Arrays.asList(fields));
+ aClass = aClass.getSuperclass();
+ }
+ return res.toArray(new Field[0]);
+ }
+
private T createConsumer(Class clazz) {
- return StubSkeletonHelper.createConsumer(clazz, rpcContext);
+ ServiceMeta sm = ServiceMeta.builder().name(clazz.getCanonicalName())
+ .app(app).namespace(ns).env(env).build();
+ return (T) StubSkeletonHelper.createConsumer(sm, context, rc);
}
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxConsumerInvoker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxConsumerInvoker.java
new file mode 100644
index 00000000..66d9ea89
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxConsumerInvoker.java
@@ -0,0 +1,70 @@
+package io.kimmking.rpcfx.consumer;
+
+
+import com.alibaba.fastjson.parser.ParserConfig;
+import io.kimmking.rpcfx.api.*;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.List;
+
+public final class RpcfxConsumerInvoker {
+
+ static {
+ ParserConfig.getGlobalInstance().addAccept("io.kimmking");
+ }
+
+ RpcContext ctx;
+
+ RegistryCenter rc;
+
+ public RpcfxConsumerInvoker(RpcContext ctx, RegistryCenter rc) {
+ this.ctx = ctx;
+ this.rc = rc; //"localhost:2181"
+ }
+
+ public void start() {
+ this.rc.start();
+ }
+
+ public void stop() {
+ this.rc.stop();
+ }
+
+ public T createFromRegistry(final ServiceMeta sm, RpcContext ctx) {
+
+ String service = sm.getName();//"io.kimking.rpcfx.demo.api.UserService";
+ System.out.println("====> "+service);
+ List invokers = new ArrayList<>();
+ Class> serviceClass = null;
+ try {
+
+ serviceClass = Class.forName(service);
+
+ List insts = rc.fetchInstances(sm);
+ if(insts != null && insts.size()>0) invokers.addAll(insts);
+ rc.subscribe(sm, e -> {
+ invokers.clear();
+ invokers.addAll((List)e.getData());
+ });
+
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw new RuntimeException(ex);
+ }
+
+ return (T) create(serviceClass, invokers, ctx);
+
+ }
+
+ private T create(Class serviceClass, List invokers, RpcContext ctx) {
+ RpcfxInvocationHandler invocationHandler
+ = new RpcfxInvocationHandler(serviceClass, invokers, ctx);
+ return (T) Proxy.newProxyInstance(RpcfxConsumerInvoker.class.getClassLoader(),
+ new Class[]{serviceClass}, invocationHandler);
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvocationHandler.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvocationHandler.java
index 5c755596..0325e0c7 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvocationHandler.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvocationHandler.java
@@ -2,31 +2,32 @@
import com.alibaba.fastjson.JSON;
import io.kimmking.rpcfx.api.*;
+import io.kimmking.rpcfx.meta.InstanceMeta;
import io.kimmking.rpcfx.stub.StubSkeletonHelper;
+import io.kimmking.rpcfx.utils.MethodUtils;
import okhttp3.*;
-import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RpcfxInvocationHandler implements InvocationHandler {
+ public final Object target = new Object();
+
public static final MediaType JSONTYPE = MediaType.get("application/json; charset=utf-8");
private final Class> serviceClass;
- private final List invokers;
- private final Router router;
- private final LoadBalancer loadBalance;
- private final Filter[] filters;
+ private final List invokers;
+
+ private final RpcContext context;
- public RpcfxInvocationHandler(Class serviceClass, List invokers, Router router, LoadBalancer loadBalance, Filter... filters) {
+ public RpcfxInvocationHandler(Class serviceClass, List invokers, RpcContext ctx) {
this.serviceClass = serviceClass;
this.invokers = invokers;
- this.router = router;
- this.loadBalance = loadBalance;
- this.filters = filters;
+ this.context = ctx;
}
// 可以尝试,自己去写对象序列化,二进制还是文本的,,,rpcfx是xml自定义序列化、反序列化,json: code.google.com/p/rpcfx
@@ -36,66 +37,104 @@ public RpcfxInvocationHandler(Class serviceClass, List invokers,
@Override
public Object invoke(Object proxy, Method method, Object[] params) throws Throwable {
+ long start = System.currentTimeMillis();
+
if (!StubSkeletonHelper.checkRpcMethod(method)){
- return null ;
+ return method.invoke(target, params);
}
- List urls = router.route(invokers);
+
+ int retry = 2;
+ while (retry-- > 0) {
+ System.out.println("retry:" + retry);
+ try {
+
+ // check mock, 挡板功能 TODO 3
+
+ List insts = context.getRouter().route(invokers);
// System.out.println("router.route => ");
// urls.forEach(System.out::println);
- String url = loadBalance.select(urls); // router, loadbalance
+ InstanceMeta instance = context.getLoadBalancer().select(insts); // router, loadbalance
// System.out.println("loadBalance.select => ");
// System.out.println("final => " + url);
- if (url == null) {
- throw new RuntimeException("No available providers from registry center.");
- }
+ if (instance == null) {
+ throw new RuntimeException("No available providers from registry center.");
+ }
- // 加filter地方之二
- // mock == true, new Student("hubao");
- RpcfxRequest request = new RpcfxRequest();
- request.setServiceClass(this.serviceClass.getName());
- request.setMethod(method.getName());
- request.setParams(params);
+ RpcfxRequest request = new RpcfxRequest();
+ request.setServiceClass(this.serviceClass.getName());
+ request.setMethodSign(MethodUtils.methodSign(method));
+ request.setParams(params);
- if (null!=filters) {
- for (Filter filter : filters) {
- if (!filter.filter(request)) {
- return null;
+ Filter[] filters = context.getFilters();
+
+ if (null != filters) {
+ for (Filter filter : filters) {
+ RpcfxResponse response = filter.prefilter(request);
+ if (response != null) {
+ return JSON.parse(response.getResult().toString());
+ }
+ }
}
- }
- }
- RpcfxResponse response = post(request, url);
+ // 没有控制超时,可能会很久 TODO 2
+ RpcfxResponse response = post(request, instance);
+
+ if (null != filters) {
+ for (Filter filter : filters) {
+ RpcfxResponse postResponse = filter.postfilter(request, response);
+ if (postResponse!=null) {
+ response = postResponse;
+ }
+ }
+ }
+
+ System.out.println("Invoke spend " + (System.currentTimeMillis()-start) + " ms");
+
+ // 加filter地方之三
+ // Student.setTeacher("cuijing");
- // 加filter地方之三
- // Student.setTeacher("cuijing");
+ // 这里判断response.status,处理异常
+ // 考虑封装一个全局的RpcfxException
- // 这里判断response.status,处理异常
- // 考虑封装一个全局的RpcfxException
+ return JSON.parse(response.getResult().toString());
+
+ } catch (RuntimeException ex) {
+ ex.printStackTrace();
+ if(! (ex.getCause() instanceof SocketTimeoutException)) {
+ break;
+ }
+ }
+ }
+ return null;
- return JSON.parse(response.getResult().toString());
}
OkHttpClient client = new OkHttpClient.Builder()
.connectionPool(new ConnectionPool(128, 60, TimeUnit.SECONDS))
// .dispatcher(dispatcher)
-// .readTimeout(httpClientConfig.getReadTimeout(), TimeUnit.SECONDS)
-// .writeTimeout(httpClientConfig.getWriteTimeout(), TimeUnit.SECONDS)
-// .connectTimeout(httpClientConfig.getConnectTimeout(), TimeUnit.SECONDS)
+ .readTimeout(1, TimeUnit.SECONDS)
+ .writeTimeout(1, TimeUnit.SECONDS)
+ .connectTimeout(1, TimeUnit.SECONDS)
.build();
- private RpcfxResponse post(RpcfxRequest req, String url) throws IOException {
+ private RpcfxResponse post(RpcfxRequest req, InstanceMeta instance) throws Exception {
String reqJson = JSON.toJSONString(req);
-// System.out.println("req json: "+reqJson);
+ System.out.println("req json: "+reqJson);
final Request request = new Request.Builder()
- .url(url)
+ .url(instance.toString())
.post(RequestBody.create(JSONTYPE, reqJson))
.build();
- String respJson = client.newCall(request).execute().body().string();
-// System.out.println("resp json: "+respJson);
+ String respJson;
+ try {
+ respJson = client.newCall(request).execute().body().string();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ System.out.println("resp json: "+respJson);
return JSON.parseObject(respJson, RpcfxResponse.class);
}
}
\ No newline at end of file
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvoker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvoker.java
deleted file mode 100644
index 356c7fe3..00000000
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/consumer/RpcfxInvoker.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package io.kimmking.rpcfx.consumer;
-
-
-import com.alibaba.fastjson.parser.ParserConfig;
-import io.kimmking.rpcfx.api.*;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.List;
-
-public final class RpcfxInvoker {
-
- static {
- ParserConfig.getGlobalInstance().addAccept("io.kimmking");
- }
- CuratorFramework client;
- String zkUrl = null;
-
- public RpcfxInvoker(String zkUrl) {
- this.zkUrl = zkUrl; //"localhost:2181"
- this.start();
- }
-
- public void start() {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- client = CuratorFrameworkFactory.builder().connectString(this.zkUrl).namespace("rpcfx").retryPolicy(retryPolicy).build();
- client.start();
- }
-
- public void stop() {
- client.close();
- }
-
- public T createFromRegistry(final Class serviceClass, Router router, LoadBalancer loadBalance, Filter filter) {
-
- String service = serviceClass.getCanonicalName();//"io.kimking.rpcfx.demo.api.UserService";
- System.out.println("====> "+service);
- List invokers = new ArrayList<>();
-
- try {
-
- if ( null == client.checkExists().forPath("/" + service)) {
- return null;
- }
-
- fetchInvokers(client, service, invokers);
-
- final TreeCache treeCache = TreeCache.newBuilder(client, "/" + service).setCacheData(true).setMaxDepth(2).build();
- treeCache.getListenable().addListener(new TreeCacheListener() {
- public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
- System.out.println("treeCacheEvent: "+treeCacheEvent);
- fetchInvokers(client, service, invokers);
- }
- });
- treeCache.start();
-
- } catch (Exception ex) {
- ex.printStackTrace();
- }
-
- return (T) create(serviceClass, invokers, router, loadBalance, filter);
-
- }
-
-
-
- private void fetchInvokers(CuratorFramework client, String service, List invokers) throws Exception {
- List services = client.getChildren().forPath("/" + service);
- invokers.clear();
- for (String svc : services) {
- System.out.println(svc);
- String url = svc.replace("_", ":");
- invokers.add("http://" + url);
- }
- }
-
- private T create(Class serviceClass, List invokers, Router router, LoadBalancer loadBalance, Filter... filters) {
- RpcfxInvocationHandler invocationHandler
- = new RpcfxInvocationHandler(serviceClass, invokers, router, loadBalance, filters);
- return (T) Proxy.newProxyInstance(RpcfxInvoker.class.getClassLoader(),
- new Class[]{serviceClass}, invocationHandler);
- }
-
-}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/discovery/DiscoveryClient.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/discovery/DiscoveryClient.java
deleted file mode 100644
index 46f89433..00000000
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/discovery/DiscoveryClient.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package io.kimmking.rpcfx.discovery;
-
-/**
- * Description for this class.
- *
- * @Author : kimmking(kimmking@apache.org)
- * @create 2024/1/13 20:17
- */
-public class DiscoveryClient {
-}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/InstanceMeta.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/InstanceMeta.java
new file mode 100644
index 00000000..e1388143
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/InstanceMeta.java
@@ -0,0 +1,48 @@
+package io.kimmking.rpcfx.meta;
+
+import com.google.common.base.Strings;
+import lombok.*;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 19:46
+ */
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(of = {"scheme", "host", "port", "context"})
+public class InstanceMeta {
+
+ private String scheme;
+ private String host;
+ private Integer port;
+ private String context;
+ private boolean status;
+ private Map metadata;
+
+ public static InstanceMeta from(String instance) {
+ URI uri = URI.create(instance);
+ String path = uri.getPath();
+ path = Strings.isNullOrEmpty(path) ? "" : path.substring(1);
+ return InstanceMeta.builder()
+ .scheme(uri.getScheme())
+ .host(uri.getHost())
+ .port(uri.getPort())
+ .context(path)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return scheme + "://" + host + ":" + port + "/" + context;
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServerMeta.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServerMeta.java
new file mode 100644
index 00000000..239042b8
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServerMeta.java
@@ -0,0 +1,24 @@
+package io.kimmking.rpcfx.meta;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/4/13 21:43
+ */
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(of = {"url"})
+public class ServerMeta {
+ private String url;
+ private boolean leader;
+ private boolean status;
+ private long version;
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServiceMeta.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServiceMeta.java
new file mode 100644
index 00000000..c443a272
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/meta/ServiceMeta.java
@@ -0,0 +1,25 @@
+package io.kimmking.rpcfx.meta;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 19:46
+ */
+@Data
+@Builder
+public class ServiceMeta {
+
+ private String app;
+ private String namespace;
+ private String env;
+ private String name;
+
+ @Override
+ public String toString() {
+ return String.format("%s_%s_%s_%s", app, namespace, env, name);
+ }
+}
\ No newline at end of file
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/ProviderBootstrap.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/ProviderBootstrap.java
index 5b541a34..ce714bb0 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/ProviderBootstrap.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/ProviderBootstrap.java
@@ -2,14 +2,19 @@
import io.kimmking.rpcfx.annotation.RpcfxService;
import io.kimmking.rpcfx.api.RpcContext;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
import io.kimmking.rpcfx.registry.RegistryCenter;
+import io.kimmking.rpcfx.registry.RegistryConfiguration;
import io.kimmking.rpcfx.stub.StubSkeletonHelper;
import lombok.Getter;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
@@ -29,6 +34,7 @@
*/
@Component
+@Import({RegistryConfiguration.class})
public class ProviderBootstrap {
@Autowired
@@ -37,16 +43,24 @@ public class ProviderBootstrap {
@Autowired
Environment environment;
- private RpcContext rpcContext = new RpcContext();
+ @Value("${app.id:app1}")
+ public String app;
+ @Value("${app.namespace:public}")
+ public String ns;
+ @Value("${app.env:dev}")
+ public String env;
+
+ private final RpcContext context = new RpcContext();
@Getter
- private RpcfxInvoker invoker;
+ private final RpcfxProviderInvoker invoker = new RpcfxProviderInvoker(context);;
- private static String PROTO = "http";
+ private static String SCHEME = "http";
private static String ip;
private static int port;
- RegistryCenter registry = new RegistryCenter();
+ @Autowired
+ RegistryCenter registry;// = new KKRegistryCenter();
@SneakyThrows
@PostConstruct
@@ -77,8 +91,7 @@ private void buildProvider() {
}
private void createProvider(Class> clazz, Object bean) {
- StubSkeletonHelper.createProvider(clazz, bean, rpcContext); // 初始化了holder
- this.invoker = new RpcfxInvoker(rpcContext);
+ StubSkeletonHelper.createProvider(clazz, bean, context); // 初始化了holder
}
@Order(Integer.MIN_VALUE)
@@ -91,13 +104,18 @@ private void registerServices() {
registry.start();
- System.out.println("registry all services from zk...");
- rpcContext.getProviderHolder().forEach( (x,y) ->
+ System.out.println("registry all services from RegistryCenter...");
+ context.getProviderHolder().forEach( (x, y) ->
{
System.out.println(" register " + x);
+ ServiceMeta sm = ServiceMeta.builder().name(x)
+ .app(app).namespace(ns).env(env).build();
+ InstanceMeta im = InstanceMeta.builder()
+ .scheme(SCHEME).host(ip).port(port).context("").build();
try {
- registry.registerService(x, ip, port);
+ registry.registerService(sm, im);
+ registry.heartbeat(sm, im);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -111,12 +129,16 @@ public void stop() {
}
private void unregisterServices() {
- System.out.println("unregistry all services from zk...");
- rpcContext.getProviderHolder().forEach( (x,y) ->
+ System.out.println("unregistry all services from RegistryCenter...");
+ context.getProviderHolder().forEach( (x, y) ->
{
System.out.println(" unregister " + x);
+ ServiceMeta sm = ServiceMeta.builder().name(x)
+ .app(app).namespace(ns).env(env).build();
+ InstanceMeta im = InstanceMeta.builder()
+ .scheme(SCHEME).host(ip).port(port).context("").build();
try {
- registry.unregisterService(x, ip, port);
+ registry.unregisterService(sm, im);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxInvoker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxProviderInvoker.java
similarity index 87%
rename from 07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxInvoker.java
rename to 07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxProviderInvoker.java
index 4767e046..6842dfe6 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxInvoker.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/provider/RpcfxProviderInvoker.java
@@ -13,11 +13,11 @@
import java.util.List;
import java.util.Optional;
-public class RpcfxInvoker {
+public class RpcfxProviderInvoker {
RpcContext context;
- public RpcfxInvoker(RpcContext context) {
+ public RpcfxProviderInvoker(RpcContext context) {
this.context = context;
}
@@ -25,10 +25,11 @@ public RpcfxResponse invoke(RpcfxRequest request) {
RpcfxResponse response = new RpcfxResponse();
String serviceClass = request.getServiceClass();
- ProviderMeta meta = findProvider(serviceClass, request.getMethod());
+ ProviderMeta meta = findProvider(serviceClass, request.getMethodSign());
try {
Method method = meta.getMethod();
+ // 没有控制超时,所以可能会很久 TODO 1
Object result = method.invoke(meta.getServiceImpl(), request.getParams()); // dubbo, fastjson,
// 两次json序列化能否合并成一个
response.setResult(JSON.toJSONString(result, SerializerFeature.WriteClassName));
@@ -50,7 +51,8 @@ public RpcfxResponse invoke(RpcfxRequest request) {
protected ProviderMeta findProvider(String interfaceName, String methodSign) {
List providerMetas = context.getProviderHolder().get(interfaceName);
if (!CollectionUtils.isEmpty(providerMetas)) {
- Optional providerMeta = providerMetas.stream().filter(provider -> methodSign.equals(provider.getMethodSign())).findFirst();
+ Optional providerMeta = providerMetas.stream()
+ .filter(provider -> methodSign.equals(provider.getMethodSign())).findFirst();
return providerMeta.orElse(null);
}
return null;
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/ChangedListener.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/ChangedListener.java
new file mode 100644
index 00000000..971cbcda
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/ChangedListener.java
@@ -0,0 +1,13 @@
+package io.kimmking.rpcfx.registry;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 20:19
+ */
+public interface ChangedListener {
+
+ void fireEvent(Event e);
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/Event.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/Event.java
new file mode 100644
index 00000000..19a92cd4
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/Event.java
@@ -0,0 +1,29 @@
+package io.kimmking.rpcfx.registry;
+
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 20:20
+ */
+public interface Event {
+
+ T getData();
+
+ static Event> withData(List list) {
+ return new ChangedEvent(list);
+ }
+
+ @Data
+ @AllArgsConstructor
+ class ChangedEvent implements Event> {
+ List data;
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryCenter.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryCenter.java
index f7ec83bf..871c1a7e 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryCenter.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryCenter.java
@@ -1,57 +1,35 @@
package io.kimmking.rpcfx.registry;
import io.kimmking.rpcfx.api.ServiceProviderDesc;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
+import java.util.List;
+
/**
* Description for this class.
*
* @Author : kimmking(kimmking@apache.org)
- * @create 2024/1/13 20:16
+ * @create 2024/2/8 15:23
*/
-public class RegistryCenter {
-
- CuratorFramework client = null;
- public void start() {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- client = CuratorFrameworkFactory.builder().connectString("localhost:2181").namespace("rpcfx").retryPolicy(retryPolicy).build();
- client.start();
- }
-
- public void stop(){
- client.close();
- }
-
- public void registerService(String service, String host, int port) throws Exception {
- ServiceProviderDesc userServiceSesc = ServiceProviderDesc.builder()
- .host(host)
- .port(port).serviceClass(service).build();
- // String userServiceSescJson = JSON.toJSONString(userServiceSesc);
-
- try {
- if ( null == client.checkExists().forPath("/" + service)) {
- client.create().withMode(CreateMode.PERSISTENT).forPath("/" + service, "service".getBytes());
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- }
-
- client.create().withMode(CreateMode.EPHEMERAL).
- forPath( "/" + service + "/" + userServiceSesc.getHost() + "_" + userServiceSesc.getPort(), "provider".getBytes());
- }
-
- public void unregisterService(String service, String host, int port) throws Exception {
-
- if (null == client.checkExists().forPath("/" + service)) {
- return;
- }
- System.out.println("delete " + "/" + service + "/" + host + "_" + port);
- client.delete().quietly().
- forPath( "/" + service + "/" + host + "_" + port);
- }
+public interface RegistryCenter {
+
+ void start();
+
+ void stop();
+
+ void registerService(ServiceMeta service, InstanceMeta instance) throws Exception;
+
+ void unregisterService(ServiceMeta service, InstanceMeta instance) throws Exception;
+
+ List fetchInstances(ServiceMeta service) throws Exception;
+
+ void subscribe(ServiceMeta service, ChangedListener> listener);
+
+ void heartbeat(ServiceMeta service, InstanceMeta instance);
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryConfiguration.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryConfiguration.java
new file mode 100644
index 00000000..8391c4aa
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/RegistryConfiguration.java
@@ -0,0 +1,24 @@
+package io.kimmking.rpcfx.registry;
+
+import io.kimmking.rpcfx.registry.kkregistry.KKRegistryCenter;
+import io.kimmking.rpcfx.registry.zookeeper.ZookeeperRegistryCenter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/9 01:05
+ */
+
+@Configuration
+public class RegistryConfiguration {
+
+ @Bean
+ RegistryCenter createRC() {
+ return new KKRegistryCenter();
+ //return new ZookeeperRegistryCenter(); //KKRegistryCenter();
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKHeathChecker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKHeathChecker.java
new file mode 100644
index 00000000..bbf0e17e
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKHeathChecker.java
@@ -0,0 +1,42 @@
+package io.kimmking.rpcfx.registry.kkregistry;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/1 06:18
+ */
+public class KKHeathChecker {
+
+ final int interval = 5_000;
+
+ final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");
+
+ public void check(Callback callback) {
+ executor.scheduleWithFixedDelay(() -> {
+ System.out.println("start to check kk health ...[" + DTF.format(LocalDateTime.now()) + "]");
+ try {
+ callback.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, interval, interval, TimeUnit.MILLISECONDS);
+ }
+
+ public void stop() {
+ this.executor.shutdown();
+ }
+
+ public interface Callback {
+ void call() throws Exception;
+ }
+
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKRegistryCenter.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKRegistryCenter.java
new file mode 100644
index 00000000..001a3854
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/kkregistry/KKRegistryCenter.java
@@ -0,0 +1,209 @@
+package io.kimmking.rpcfx.registry.kkregistry;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServerMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.ChangedListener;
+import io.kimmking.rpcfx.registry.Event;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import lombok.SneakyThrows;
+import okhttp3.ConnectionPool;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static io.kimmking.rpcfx.consumer.RpcfxInvocationHandler.JSONTYPE;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/8 15:25
+ */
+public class KKRegistryCenter implements RegistryCenter {
+
+ public String RC_Server = "http://localhost:8485";
+ private ServerMeta leader;
+ private List servers;
+ private Map TV = new HashMap<>();
+
+ OkHttpClient client;
+ @SneakyThrows
+ @Override
+ public void start() {
+ client = new OkHttpClient.Builder()
+ .connectionPool(new ConnectionPool(128, 60, TimeUnit.SECONDS))
+// .dispatcher(dispatcher)
+ .readTimeout(65, TimeUnit.SECONDS)
+ .writeTimeout(65, TimeUnit.SECONDS)
+ .connectTimeout(3, TimeUnit.SECONDS)
+ .build();
+
+ String url = RC_Server + "/cluster";
+ boolean init = false;
+ while(!init) {
+ System.out.println("===============>> cluster info from :" + url);
+ List new_servers = null;
+ ServerMeta new_leader = null;
+ try {
+ String respJson = get(url);
+ new_servers = JSON.parseObject(respJson, new TypeReference>() {
+ });
+ new_leader = new_servers.stream().filter(ServerMeta::isStatus)
+ .filter(ServerMeta::isLeader).findFirst().orElse(null);
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ }
+
+ if(new_leader == null) {
+ System.out.println("===============>> no leader, 500ms later and retry.");
+ Thread.sleep(500);
+ Random random = new Random();
+ if(new_servers !=null && new_servers.size() > 1) {
+ url = new_servers.get(random.nextInt(new_servers.size())).getUrl() + "/cluster";
+ } else if((new_servers ==null || new_servers.isEmpty()) && !servers.isEmpty()) {
+ url = servers.get(random.nextInt(servers.size())).getUrl() + "/cluster";
+ }
+ } else {
+ this.servers = new_servers;
+ this.leader = new_leader;
+ init = true;
+ System.out.println("===============>> init ok, new_leader = " + new_leader);
+ System.out.println("===============>> init ok, new_servers = " + new_servers);
+ }
+ }
+ }
+
+ @Override
+ public void stop() {
+ this.checker.stop();
+ }
+
+ @Override
+ public void registerService(ServiceMeta service, InstanceMeta instance) throws Exception {
+ instance.setStatus(true);
+ String reqJson = JSON.toJSONString(instance);
+ String url = leader.getUrl() + "/reg?service=" + service;
+ post(url, reqJson);
+// String reqJson = "{\n" +
+// " \"scheme\": \"http\",\n" +
+// " \"ip\": \"" + instance.getIp() + "\",\n" +
+// " \"port\": \"" + instance.getPort() + "\",\n" +
+// " \"context\": \"\",\n" +
+// " \"status\": \"online\",\n" +
+// " \"metadata\": {\n" +
+// " \"env\": \"dev\",\n" +
+// " \"tag\": \"RED\"\n" +
+// " }\n" +
+// "}";
+// final Request request = new Request.Builder()
+// .url("http://localhost:8484/reg?service=" + service)
+// .post(RequestBody.create(JSONTYPE, reqJson))
+// .build();
+// String respJson = client.newCall(request).execute().body().string();
+// System.out.println(respJson);
+ }
+
+ private String post(String url, String reqJson) throws IOException {
+ System.out.println(" ====> request: " + url);
+ final Request request = new Request.Builder()
+ .url(url)
+ .post(RequestBody.create(JSONTYPE, reqJson))
+ .build();
+ String respJson = client.newCall(request).execute().body().string();
+ System.out.println(" ====> response: " + respJson);
+ return respJson;
+ }
+
+ private String get(String url) throws IOException {
+ System.out.println(" ====> request: " + url);
+ final Request request = new Request.Builder()
+ .url(url)
+ .get()
+ .build();
+ String respJson = client.newCall(request).execute().body().string();
+ System.out.println(" ====> response: " + respJson);
+ return respJson;
+ }
+
+ @Override
+ public void unregisterService(ServiceMeta service, InstanceMeta instance) throws Exception {
+ String reqJson = "{\n" +
+ " \"scheme\": \"http\",\n" +
+ " \"host\": \"" + instance.getHost() + "\",\n" +
+ " \"port\": \"" + instance.getPort() + "\",\n" +
+ " \"context\": \"\"\n" +
+ "}";
+ String url = leader.getUrl() + "/unreg?service=" + service;
+ post(url, reqJson);
+ }
+
+ public List fetchInstances(ServiceMeta service) throws Exception {
+ String url = RC_Server + "/findAll?service=" + service;
+ String respJson = get(url);
+ List instances = JSON.parseObject(respJson, new TypeReference>() {
+ });
+ return instances;
+ }
+
+ KKHeathChecker checker = new KKHeathChecker();
+
+ // for Consumer
+ public void subscribe(ServiceMeta service, final ChangedListener> listener) {
+ checker.check( () -> {
+ if(hb(service)) {
+ List instances = fetchInstances(service);
+ Event> e = Event.withData(instances);
+ listener.fireEvent(e);
+ }
+ });
+
+ // 定时器轮询
+ // 保存上一次的TV
+ // 如果有差异就fire
+ }
+
+ private boolean hb(ServiceMeta service) throws Exception {
+ String svc = service.toString();
+ String url = RC_Server + "/version?service=" + svc;
+ String respJson = get(url);
+ Long v = Long.valueOf(respJson);
+ Long o = TV.getOrDefault(svc, -1L);
+ if ( v > o) {
+ TV.put(svc, v);
+ return o > -1L;
+ }
+ return false;
+ }
+
+
+ // for Provider
+ public void heartbeat(ServiceMeta service, InstanceMeta instance) {
+ checker.check( () -> {
+ heart(service, instance);
+ });
+ }
+
+ Long heart(ServiceMeta service, InstanceMeta instance) throws Exception {
+ String reqJson = "{\n" +
+ " \"scheme\": \"http\",\n" +
+ " \"host\": \"" + instance.getHost() + "\",\n" +
+ " \"port\": \"" + instance.getPort() + "\",\n" +
+ " \"context\": \"\",\n" +
+ " \"status\": true\n" +
+ "}";
+ String url = leader.getUrl() + "/renew?service=" + service;
+ String respJson = post(url, reqJson);
+ return Long.valueOf(respJson);
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/zookeeper/ZookeeperRegistryCenter.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/zookeeper/ZookeeperRegistryCenter.java
new file mode 100644
index 00000000..37f7b6ea
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/registry/zookeeper/ZookeeperRegistryCenter.java
@@ -0,0 +1,101 @@
+package io.kimmking.rpcfx.registry.zookeeper;
+
+import io.kimmking.rpcfx.api.ServiceProviderDesc;
+import io.kimmking.rpcfx.meta.InstanceMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.ChangedListener;
+import io.kimmking.rpcfx.registry.Event;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.CreateMode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/1/13 20:16
+ */
+public class ZookeeperRegistryCenter implements RegistryCenter {
+
+// private final List listeners = new ArrayList<>();
+// public void addListener(ChangedListener listener) {
+// this.listeners.add(listener);
+// }
+
+ CuratorFramework client = null;
+ public void start() {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ client = CuratorFrameworkFactory.builder().connectString("localhost:2181").namespace("rpcfx").retryPolicy(retryPolicy).build();
+ client.start();
+ }
+
+ public void stop(){
+ client.close();
+ }
+
+ public void registerService(ServiceMeta service, InstanceMeta instance) throws Exception {
+ ServiceProviderDesc userServiceSesc = ServiceProviderDesc.builder()
+ .host(instance.getHost())
+ .port(instance.getPort()).serviceClass(service.getName()).build();
+ // String userServiceSescJson = JSON.toJSONString(userServiceSesc);
+
+ try {
+ if ( null == client.checkExists().forPath("/" + service)) {
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/" + service, "service".getBytes());
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+
+ client.create().withMode(CreateMode.EPHEMERAL).
+ forPath( "/" + service + "/" + userServiceSesc.getHost() + "_" + userServiceSesc.getPort(), "provider".getBytes());
+ }
+
+ public void unregisterService(ServiceMeta service, InstanceMeta instance) throws Exception {
+
+ if (null == client.checkExists().forPath("/" + service)) {
+ return;
+ }
+ System.out.println("delete " + "/" + service + "/" + instance.getHost() + "_" + instance.getPort());
+ client.delete().quietly().
+ forPath( "/" + service + "/" + instance.getHost() + "_" + instance.getPort());
+ }
+
+ public List fetchInstances(ServiceMeta service) throws Exception {
+ List services = client.getChildren().forPath("/" + service);
+ List instances = new ArrayList<>();
+ for (String svc : services) {
+ System.out.println(svc);
+ String url = svc.replace("_", ":");
+ instances.add(InstanceMeta.from("http://" + url));
+ }
+ return instances;
+ }
+
+ public void subscribe(ServiceMeta service, ChangedListener listener) {
+ final TreeCache treeCache = TreeCache.newBuilder(client, "/" + service).setCacheData(true).setMaxDepth(2).build();
+ treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
+ System.out.println("treeCacheEvent: "+treeCacheEvent);
+ List instances = fetchInstances(service);
+ listener.fireEvent(Event.withData(instances));
+ });
+ try {
+ treeCache.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void heartbeat(ServiceMeta service, InstanceMeta instance) {
+ // do nothing
+ }
+
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/MockHandler.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/MockHandler.java
new file mode 100644
index 00000000..9fe5269c
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/MockHandler.java
@@ -0,0 +1,30 @@
+package io.kimmking.rpcfx.stub;
+
+import io.kimmking.rpcfx.utils.MockUtils;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/11 02:57
+ */
+public class MockHandler implements InvocationHandler {
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Class> type = method.getReturnType();
+ System.out.println("invoke by mock handler...");
+ return MockUtils.mock(type, null);
+ }
+
+ public static T createMock(Class serviceClass) {
+ //final ServiceMeta sm, Router router, LoadBalancer loadBalance, Filter filter) {
+ return (T) Proxy.newProxyInstance(MockHandler.class.getClassLoader(),
+ new Class[]{serviceClass}, new MockHandler());
+
+ }
+}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/StubSkeletonHelper.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/StubSkeletonHelper.java
index ebe5f7ab..e77215a8 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/StubSkeletonHelper.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/stub/StubSkeletonHelper.java
@@ -1,14 +1,18 @@
package io.kimmking.rpcfx.stub;
import io.kimmking.rpcfx.api.*;
-import io.kimmking.rpcfx.consumer.RpcfxInvoker;
+import io.kimmking.rpcfx.consumer.RpcfxConsumerInvoker;
+import io.kimmking.rpcfx.meta.InstanceMeta;
import io.kimmking.rpcfx.meta.ProviderMeta;
+import io.kimmking.rpcfx.meta.ServiceMeta;
+import io.kimmking.rpcfx.registry.RegistryCenter;
+import io.kimmking.rpcfx.utils.MethodUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.MultiValueMap;
import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author lirui
@@ -32,7 +36,7 @@ public static void createProvider(Class> clazz, Object serviceImpl, RpcContext
}
private static ProviderMeta buildProviderMeta(Method method, Object serviceImpl) {
- String methodSign = method.getName();//MethodUtils.methodSign(method);
+ String methodSign = MethodUtils.methodSign(method);
ProviderMeta providerMeta = new ProviderMeta();
providerMeta.setMethod(method);
providerMeta.setServiceImpl(serviceImpl);
@@ -54,43 +58,129 @@ public static boolean checkRpcMethod(final Method method) {
return true;
}
- public static T createConsumer(Class clazz, RpcContext rpcContext) {
- String clazzName = clazz.getName();
- T proxyHandler = (T) rpcContext.getConsumerHolder().get(clazzName);
+ public static T createConsumer(ServiceMeta sm, RpcContext ctx, RegistryCenter rc) {
+ String clazzName = sm.getName();
+ Class> serviceClass = null;
+ try {
+ serviceClass = Class.forName(clazzName);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ T proxyHandler = (T) ctx.getConsumerHolder().get(clazzName);
if (proxyHandler == null) { // TODO configuration
- proxyHandler = new RpcfxInvoker("localhost:2181")
- .createFromRegistry(clazz, new TagRouter(),
- new RandomLoadBalancer(), new CuicuiFilter());
- rpcContext.getConsumerHolder().put(clazzName, proxyHandler);
+
+ ctx.setRouter(new TagRouter());
+ ctx.setLoadBalancer(new RoundRibbonLoadBalancer());
+ ctx.setFilters(createFilters(ctx));
+
+ T mockHandler = createMockHandler(ctx, serviceClass);
+ if(mockHandler != null) {
+ return mockHandler;
+ }
+
+ RpcfxConsumerInvoker consumerInvoker = new RpcfxConsumerInvoker(ctx, rc);
+ consumerInvoker.start();
+ proxyHandler = consumerInvoker.createFromRegistry(sm, ctx);
+ ctx.getConsumerHolder().put(clazzName, proxyHandler);
+ }
+ return proxyHandler;
+ }
+
+ private static Filter[] createFilters(RpcContext ctx) {
+ String cache = ctx.getParameters().getOrDefault("app.cache", "false");
+ Filter[] filters = null;
+ if("true".equalsIgnoreCase(cache)) {
+ filters = new Filter[]{new CuicuiFilter(), new CacheFilter()};
+ } else {
+ filters = new Filter[]{new CuicuiFilter()};
}
- return (T) proxyHandler;
+ return filters;
+ }
+
+ private static T createMockHandler(RpcContext ctx, Class> serviceClass) {
+ String mock = ctx.getParameters().getOrDefault("app.mock", "false");
+ if("true".equalsIgnoreCase(mock)) {
+ return (T) MockHandler.createMock(serviceClass);
+ }
+ return null;
}
private static class TagRouter implements Router {
@Override
- public List route(List urls) {
- return urls;
+ public List route(List instances) {
+ return instances;
+ }
+ }
+
+ private static class RoundRibbonLoadBalancer implements LoadBalancer {
+ private final AtomicInteger count = new AtomicInteger(0);
+ @Override
+ public InstanceMeta select(List instances) {
+ if(instances.isEmpty()) return null;
+ return instances.get((count.getAndIncrement() & Integer.MAX_VALUE) % instances.size());
}
}
private static class RandomLoadBalancer implements LoadBalancer {
private final Random random = new Random();
@Override
- public String select(List urls) {
- if(urls.isEmpty()) return null;
- return urls.get(random.nextInt(urls.size()));
+ public InstanceMeta select(List instances) {
+ if(instances.isEmpty()) return null;
+ return instances.get(random.nextInt(instances.size()));
}
}
@Slf4j
private static class CuicuiFilter implements Filter {
@Override
- public boolean filter(RpcfxRequest request) {
+ public RpcfxResponse prefilter(RpcfxRequest request) {
+ //log.info("filter {} -> {}", this.getClass().getName(), request.toString());
+ //System.out.printf("filter %s -> %s%n", this.getClass().getName(), request.toString());
+ return null;
+ }
+
+ @Override
+ public RpcfxResponse postfilter(RpcfxRequest request, RpcfxResponse response) {
+ return response;
+ }
+
+ }
+
+ private static class CacheFilter implements Filter {
+
+ static Map CACHE = new HashMap<>();
+
+ @Override
+ public RpcfxResponse prefilter(RpcfxRequest request) {
+ RpcfxResponse response = CACHE.get(genKey(request));
+ if(response != null) {
+ System.out.println("CacheFilter.prefilter hit! => request: \n" + request + "\n =>response: \n" + response);
+ }
+ return response;
//log.info("filter {} -> {}", this.getClass().getName(), request.toString());
//System.out.printf("filter %s -> %s%n", this.getClass().getName(), request.toString());
- return true;
}
+
+ @Override
+ public RpcfxResponse postfilter(RpcfxRequest request, RpcfxResponse response) {
+ String key = genKey(request);
+ if(!CACHE.containsKey(key)) {
+ CACHE.put(key, response);
+ }
+ return response;
+ }
+
+ }
+
+ public static String genKey(RpcfxRequest request) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(request.getServiceClass());
+ sb.append("@");
+ sb.append(request.getMethodSign());
+ //sb.append("");
+ Arrays.stream(request.getParams()).forEach(x -> sb.append("_"+x.toString()));
+ return sb.toString();
}
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MethodUtils.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MethodUtils.java
index 8158ccff..f17a102f 100644
--- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MethodUtils.java
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MethodUtils.java
@@ -9,10 +9,10 @@ public class MethodUtils {
public static String methodSign(Method method) {
if (method != null) {
- StringBuilder builder = new StringBuilder("method:");
+ StringBuilder builder = new StringBuilder();
String name = method.getName();
builder.append(name);
- builder.append("_");
+ builder.append("@");
int count = method.getParameterCount();
builder.append(count);
builder.append("_");
@@ -20,8 +20,9 @@ public static String methodSign(Method method) {
Class>[] classes = method.getParameterTypes();
Arrays.stream(classes).forEach(c -> builder.append(c.getName() + ","));
}
- String string = builder.toString();
- return DigestUtils.md5DigestAsHex(string.getBytes());
+ return builder.toString();
+// String string = builder.toString();
+// return DigestUtils.md5DigestAsHex(string.getBytes());
}
return "";
}
diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MockUtils.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MockUtils.java
new file mode 100644
index 00000000..8d22536a
--- /dev/null
+++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/MockUtils.java
@@ -0,0 +1,143 @@
+package io.kimmking.rpcfx.utils;
+
+import lombok.Data;
+import org.springframework.util.ClassUtils;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.*;
+
+/**
+ * Description for this class.
+ *
+ * @Author : kimmking(kimmking@apache.org)
+ * @create 2024/2/11 03:15
+ */
+public class MockUtils {
+
+ public static Object mock(Class> clazz, Type[] generics) {
+ boolean primitiveOrWrapper = ClassUtils.isPrimitiveOrWrapper(clazz);
+ if(primitiveOrWrapper) return mockPrimitive(clazz);
+ if(String.class.equals(clazz)) return mockString();
+ if (Number.class.isAssignableFrom(clazz)) {
+ return 10;
+ }
+ if(clazz.isArray()) {
+ return mockArray(clazz.getComponentType());
+ }
+ if(List.class.isAssignableFrom(clazz)) {
+ return mockList(clazz, generics[0]);
+ }
+ if(Map.class.isAssignableFrom(clazz)) {
+ return mockMap(clazz, generics[1]);
+ }
+ return mockPojo(clazz);
+ }
+
+ private static Object mockMap(Class> clazz, Type generic) {
+ HashMap