使用Netty实现类似Dubbo的远程接口调用的实现方法
作者:顽石九变
一、Netty简介
Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
本文通过完整示例代码,详细介绍netty实现类似dubbo的远程网络通讯,如有错误欢迎指正!
实现步骤:
- 创建接口和实现类
- 创建客户端代码
- 通过动态代理模式,封装netty远程接口调用
- 通过异步线程等待/通知,实现异步转同步
- 创建服务端代码
- 自定义编码解码器
- 编写测试客户端发送请求代码
二、完整代码实现
工程依赖引入
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.75.Final</version> </dependency>
1、创建接口和实现类
定义简单接口和实现类。通过注解定义接口和服务实现,在后续代码中解析注解。
@ServiceEntry定义接口serviceId
@MyService定义服务实现类
public interface IHelloService { @ServiceEntry(serviceId = "001", name = "hello") String hello(String msg); } @MyService public class HelloServiceImpl implements IHelloService { @Override public String hello(String msg) { return "re:这里是服务端,已收到客户端消息:" + msg.hashCode(); } }
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface ServiceEntry { /** * 服务Id */ String serviceId(); /** * 服务名称 */ String name() default ""; } @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface MyService { String value() default ""; }
2、客户端代码实现及动态代理和异步转同步
1)创建客户端Handler,MyClientHandler,继承ChannelInboundHandlerAdapter,并实现Callable接口
- 客户端发送请求时,会调用call方法,在这里将异步转同步
- 将请求context放入map,并等待线程,在收到服务端返回时,异步通知线程执行,返回结果数据
- 收到服务端返回时,设置返回结果数据,并通知线程执行
public class MyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> { private ChannelHandlerContext ctx; private ConcurrentHashMap<String, SyncSendContext> syncSendContextMap = new ConcurrentHashMap<>(); private Object[] param; private String serviceId; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端和服务端链接成功"); this.ctx = ctx; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客户端收到服务端回复: " + msg); ResponseData data = (ResponseData) msg; String id = data.getId(); // 收到服务端返回时,设置返回结果数据,并通知线程执行 SyncSendContext context = syncSendContextMap.get(id); context.setResp(data); synchronized (context) { context.notify(); } } @Override public String call() throws Exception { System.out.println("客户端向服务端发送消息: " + param[0].toString()); String id = UUID.randomUUID().toString(); RequestData data = new RequestData(); data.setId(id); //强制设置参数1 data.setData(param[0].toString()); data.setServiceId(serviceId); SyncSendContext context = new SyncSendContext(); context.setRequest(data); // 将请求context放入map,并等待线程,在收到服务端返回时,异步通知线程执行,返回结果数据 syncSendContextMap.put(id, context); synchronized (context) { ctx.writeAndFlush(data); context.wait(); return (String) context.getResp().getData(); } } public void setParam(Object[] param) { this.param = param; } public void setServiceId(String serviceId) { this.serviceId = serviceId; } }
2)创建客户端代码,MyClient
- 通过动态代理,包装远程服务请求
- 初始化服务端链接,通过双检锁确保clientHandler是单例实现
- 发送请求时,通过线程池异步发送clientHandler
public class MyClient { private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private MyClientHandler clientHandler; // 通过动态代理,包装远程服务请求 public <T> T getServie(final Class<T> service) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{service}, (proxy, method, args) -> { if (clientHandler == null) { init("127.0.0.1", 7000); } ServiceEntry annotation = method.getAnnotation(ServiceEntry.class); if (annotation == null) { return null; } clientHandler.setParam(args); clientHandler.setServiceId(annotation.serviceId()); return executor.submit(clientHandler).get(); }); } // 初始化服务端链接,通过双检锁确保clientHandler是单例实现 private synchronized void init(String hostname, int port) { if (clientHandler != null) { return; } clientHandler = new MyClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap().group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new ResponseMessageCodec()); pipeline.addLast(new RequestMessageCodec()); pipeline.addLast(clientHandler); } }); bootstrap.connect(hostname, port).sync(); } catch (Exception e) { e.printStackTrace(); } } }
3、服务端代码实现
1)创建服务工程类ServiceFacatory,解析注解保存服务接口和实现类,调用的时候从Map直接获取
public class ServiceFacatory { private static final Map<String, Method> methodMap = new HashMap<>(); private static final Map<String, Object> serviceMap = new HashMap<>(); public static void init() throws Exception { // 要扫描的包 String packages = "com.hj.netty.dubbo.api"; Set<MethodInfo> methods = PackageUtils.findClassAnnotationMethods(packages, ServiceEntry.class); for (MethodInfo info : methods) { ServiceEntry serviceEntry = (ServiceEntry) info.getAnnotation(); methodMap.put(serviceEntry.serviceId(), info.getMethod()); String serviceName = info.getMethod().getDeclaringClass().getName(); if (!serviceMap.containsKey(serviceName)) { Object instance = info.getMethod().getDeclaringClass().newInstance(); serviceMap.put(serviceName, instance); } } } public static Object invoke(String serviceId, Object args) throws Exception { Method method = methodMap.get(serviceId); String serviceName = method.getDeclaringClass().getName(); Object instance = serviceMap.get(serviceName); Object result = method.invoke(instance, args); return result; } } @Data @AllArgsConstructor public class MethodInfo { private Annotation annotation; private Method method; }
2)包解析工具类,解析指定目录下的所有service类
import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.core.type.classreading.CachingMetadataReaderFactory; import org.springframework.core.type.classreading.MetadataReader; import org.springframework.core.type.classreading.MetadataReaderFactory; import org.springframework.util.SystemPropertyUtils; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.lang.reflect.Parameter; import java.util.*; public class PackageUtils { private final static Logger log = LoggerFactory.getLogger(PackageUtils.class); //扫描 scanPackages 下的文件的匹配符 protected static final String DEFAULT_RESOURCE_PATTERN = "**/*.class"; /** * 结合spring的类扫描方式 * 根据需要扫描的包路径及相应的注解,获取最终测method集合 * 仅返回public方法,如果方法是非public类型的,不会被返回 * 可以扫描工程下的class文件及jar中的class文件 * * @param scanPackages * @param annotation * @return */ public static Set<MethodInfo> findClassAnnotationMethods(String scanPackages, Class<? extends Annotation> annotation) { //获取所有的类 Set<String> clazzSet = findPackageClass(scanPackages); Set<MethodInfo> methods = new HashSet<>(); //遍历类,查询相应的annotation方法 for (String clazz : clazzSet) { try { Set<MethodInfo> ms = findAnnotationMethods(clazz, annotation); methods.addAll(ms); } catch (ClassNotFoundException ignore) { } } return methods; } public static Set<MethodInfo> findAnnotationMethods(String fullClassName, Class<? extends Annotation> anno) throws ClassNotFoundException { Set<MethodInfo> methodSet = new HashSet<>(); Class<?> clz = Class.forName(fullClassName); // 存储接口中定义的方法 Map<String, Method> mapMethodInf = new HashMap<>(); for (int i = 0; i < clz.getInterfaces().length; i++) { Class<?> inf = clz.getInterfaces()[i]; Method[] methods = inf.getDeclaredMethods(); for (Method method : methods) { String key = getMethodKey(method); mapMethodInf.put(key, method); } } Method[] methods = clz.getDeclaredMethods(); for (Method method : methods) { if (method.getModifiers() != Modifier.PUBLIC) { continue; } Annotation annotation = method.getAnnotation(anno); if (annotation != null) { methodSet.add(new MethodInfo(annotation,method)); } else { // 从接口中读取对应的方法 String key = getMethodKey(method); Method methodInf = mapMethodInf.get(key); annotation = methodInf.getAnnotation(anno); if (annotation != null) { methodSet.add(new MethodInfo(annotation,method)); } } } return methodSet; } /** * 根据扫描包的,查询下面的所有类 * * @param scanPackages 扫描的package路径 * @return */ private static Set<String> findPackageClass(String scanPackages) { if (StringUtils.isBlank(scanPackages)) { return Collections.EMPTY_SET; } //验证及排重包路径,避免父子路径多次扫描 Set<String> packages = checkPackage(scanPackages); ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver(); MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resourcePatternResolver); Set<String> clazzSet = new HashSet<String>(); for (String basePackage : packages) { if (StringUtils.isBlank(basePackage)) { continue; } String packageSearchPath = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + org.springframework.util.ClassUtils.convertClassNameToResourcePath(SystemPropertyUtils.resolvePlaceholders(basePackage)) + "/" + DEFAULT_RESOURCE_PATTERN; try { Resource[] resources = resourcePatternResolver.getResources(packageSearchPath); for (Resource resource : resources) { //检查resource,这里的resource都是class String clazz = loadClassName(metadataReaderFactory, resource); clazzSet.add(clazz); } } catch (Exception e) { log.error("获取包下面的类信息失败,package:" + basePackage, e); } } return clazzSet; } /** * 排重、检测package父子关系,避免多次扫描 * * @param scanPackages * @return 返回检查后有效的路径集合 */ private static Set<String> checkPackage(String scanPackages) { if (StringUtils.isBlank(scanPackages)) { return Collections.EMPTY_SET; } Set<String> packages = new HashSet<>(); //排重路径 Collections.addAll(packages, scanPackages.split(",")); String[] strings = packages.toArray(new String[packages.size()]); for (String pInArr : strings) { if (StringUtils.isBlank(pInArr) || pInArr.equals(".") || pInArr.startsWith(".")) { continue; } if (pInArr.endsWith(".")) { pInArr = pInArr.substring(0, pInArr.length() - 1); } Iterator<String> packageIte = packages.iterator(); boolean needAdd = true; while (packageIte.hasNext()) { String pack = packageIte.next(); if (pInArr.startsWith(pack + ".")) { //如果待加入的路径是已经加入的pack的子集,不加入 needAdd = false; } else if (pack.startsWith(pInArr + ".")) { //如果待加入的路径是已经加入的pack的父集,删除已加入的pack packageIte.remove(); } } if (needAdd) { packages.add(pInArr); } } return packages; } /** * 加载资源,根据resource获取className * * @param metadataReaderFactory spring中用来读取resource为class的工具 * @param resource 这里的资源就是一个Class */ private static String loadClassName(MetadataReaderFactory metadataReaderFactory, Resource resource) { try { if (resource.isReadable()) { MetadataReader metadataReader = metadataReaderFactory.getMetadataReader(resource); if (metadataReader != null) { return metadataReader.getClassMetadata().getClassName(); } } } catch (Exception e) { log.error("根据resource获取类名称失败", e); } return null; } private static String getMethodKey(Method method) { StringBuilder key = new StringBuilder(method.getName()); for (Parameter parameter : method.getParameters()) { key.append(parameter.getType().getName()) .append(parameter.getName()); } return key.toString(); } }
3)创建服务端Handler类,接收客户端请求,并调用服务实现类执行接口
public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端接入"); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到客户端消息:" + msg); RequestData req = (RequestData) msg; if (req != null) { String args = req.getData(); String serviceId = req.getServiceId(); // 调用服务实现类 Object res = ServiceFacatory.invoke(serviceId, args); ResponseData resp = new ResponseData(); resp.setData(res); resp.setId(req.getId()); ctx.writeAndFlush(resp); } System.out.println("----------响应结束----------" + req.getData()); } }
4)创建服务端启动类MyServer、ServerApp,启动端口监听;加入编解码器和服务端MyServerHandler
public class MyServer { public static void start(String hostname, int port) throws Exception { ServiceFacatory.init(); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new RequestMessageCodec()); pipeline.addLast(new ResponseMessageCodec()); pipeline.addLast(new MyServerHandler()); } }); ChannelFuture future = bootstrap.bind(hostname, port).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } public class ServerApp { public static void main(String[] args) throws Exception { MyServer.start("127.0.0.1", 7000); } }
4、自定义编码解码器
1)创建请求数据编解码器RequestMessageCodec,实现String和请求参数对象RequestData之间互相转换
public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> { @Override protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.encode 被调用 " + msg); String json = JSONObject.toJSONString(msg); out.add(json); } @Override protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.decode 被调用 " + msg); RequestData po = JSONObject.parseObject(msg, RequestData.class); out.add(po); } }
2)创建服务响应数据编解码器ResponseMessageCodec,实现String和响应数据对象ResponseData之间互相转换
public class ResponseMessageCodec extends MessageToMessageCodec<String, ResponseData> { @Override protected void encode(ChannelHandlerContext ctx, ResponseData msg, List<Object> out) throws Exception { System.out.println("ResponseMessageCodec.encode 被调用 " + msg); String json = JSONObject.toJSONString(msg); out.add(json); } @Override protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { System.out.println("ResponseMessageCodec.decode 被调用 " + msg); ResponseData po = JSONObject.parseObject(msg, ResponseData.class); out.add(po); } }
3)创建请求、响应VO
@Data public class RequestData { private String id; private String serviceId; private String data; } @Data public class ResponseData { private String id; private Object data; } @Data public class SyncSendContext { private ResponseData resp; private RequestData request; }
5、编写测试客户端发送请求代码
1)创建客户端请求类ClientTest,模拟发送数据
public class ClientTest { public static void main(String[] args) throws Exception { MyClient client = new MyClient(); IHelloService servie = client.getServie(IHelloService.class); for (int i = 0; i < 5; i++) { Thread.sleep(2 * 1000); String res = servie.hello("你好 服务端 ~~ " + i); System.out.println("service 得到服务端返回消息: " + res); System.out.println("-----------------------------" + i +" 结束"); } } }
2)运行ServerApp,启动服务端,运行ClientTest模拟客户端发送数据
客户端日志:
客户端和服务端链接成功
客户端向服务端发送消息: 你好 服务端 ~~ 0
RequestMessageCodec.encode 被调用 RequestData(id=d081f840-4367-42d3-909c-32f6a1654c60, serviceId=001, data=你好 服务端 ~~ 0)
ResponseMessageCodec.decode 被调用 {"data":"re:这里是服务端,已收到客户端消息:1845339960","id":"d081f840-4367-42d3-909c-32f6a1654c60"}
客户端收到服务端回复: ResponseData(id=d081f840-4367-42d3-909c-32f6a1654c60, data=re:这里是服务端,已收到客户端消息:1845339960)
service 得到服务端返回消息: re:这里是服务端,已收到客户端消息:1845339960
-----------------------------0 结束
客户端向服务端发送消息: 你好 服务端 ~~ 1
RequestMessageCodec.encode 被调用 RequestData(id=d49105b0-2624-43c2-bb19-c826987133f1, serviceId=001, data=你好 服务端 ~~ 1)
ResponseMessageCodec.decode 被调用 {"data":"re:这里是服务端,已收到客户端消息:1845339961","id":"d49105b0-2624-43c2-bb19-c826987133f1"}
客户端收到服务端回复: ResponseData(id=d49105b0-2624-43c2-bb19-c826987133f1, data=re:这里是服务端,已收到客户端消息:1845339961)
service 得到服务端返回消息: re:这里是服务端,已收到客户端消息:1845339961
-----------------------------1 结束
客户端向服务端发送消息: 你好 服务端 ~~ 2
RequestMessageCodec.encode 被调用 RequestData(id=13f82f4a-0a2f-41cc-8420-38ab20fab2d2, serviceId=001, data=你好 服务端 ~~ 2)
ResponseMessageCodec.decode 被调用 {"data":"re:这里是服务端,已收到客户端消息:1845339962","id":"13f82f4a-0a2f-41cc-8420-38ab20fab2d2"}
客户端收到服务端回复: ResponseData(id=13f82f4a-0a2f-41cc-8420-38ab20fab2d2, data=re:这里是服务端,已收到客户端消息:1845339962)
service 得到服务端返回消息: re:这里是服务端,已收到客户端消息:1845339962
-----------------------------2 结束
客户端向服务端发送消息: 你好 服务端 ~~ 3
RequestMessageCodec.encode 被调用 RequestData(id=f4576cbd-8ee5-438c-ae6d-810b836c177a, serviceId=001, data=你好 服务端 ~~ 3)
ResponseMessageCodec.decode 被调用 {"data":"re:这里是服务端,已收到客户端消息:1845339963","id":"f4576cbd-8ee5-438c-ae6d-810b836c177a"}
客户端收到服务端回复: ResponseData(id=f4576cbd-8ee5-438c-ae6d-810b836c177a, data=re:这里是服务端,已收到客户端消息:1845339963)
service 得到服务端返回消息: re:这里是服务端,已收到客户端消息:1845339963
-----------------------------3 结束
客户端向服务端发送消息: 你好 服务端 ~~ 4
RequestMessageCodec.encode 被调用 RequestData(id=68e67b0f-0c35-4ead-915e-e1890a0c0b53, serviceId=001, data=你好 服务端 ~~ 4)
ResponseMessageCodec.decode 被调用 {"data":"re:这里是服务端,已收到客户端消息:1845339964","id":"68e67b0f-0c35-4ead-915e-e1890a0c0b53"}
客户端收到服务端回复: ResponseData(id=68e67b0f-0c35-4ead-915e-e1890a0c0b53, data=re:这里是服务端,已收到客户端消息:1845339964)
service 得到服务端返回消息: re:这里是服务端,已收到客户端消息:1845339964
-----------------------------4 结束
服务端日志:
RequestMessageCodec.decode 被调用 {"data":"你好 服务端 ~~ 0","id":"f876eccf-a034-467a-8b5a-4c6dba80cee2","serviceId":"001"}
收到客户端消息:RequestData(id=f876eccf-a034-467a-8b5a-4c6dba80cee2, serviceId=001, data=你好 服务端 ~~ 0)
ResponseMessageCodec.encode 被调用 ResponseData(id=f876eccf-a034-467a-8b5a-4c6dba80cee2, data=re:这里是服务端,已收到客户端消息:1845339960)
----------响应结束----------你好 服务端 ~~ 0
RequestMessageCodec.decode 被调用 {"data":"你好 服务端 ~~ 1","id":"bcceaa9b-09be-4dcc-9135-ac14caa365d1","serviceId":"001"}
收到客户端消息:RequestData(id=bcceaa9b-09be-4dcc-9135-ac14caa365d1, serviceId=001, data=你好 服务端 ~~ 1)
ResponseMessageCodec.encode 被调用 ResponseData(id=bcceaa9b-09be-4dcc-9135-ac14caa365d1, data=re:这里是服务端,已收到客户端消息:1845339961)
----------响应结束----------你好 服务端 ~~ 1
RequestMessageCodec.decode 被调用 {"data":"你好 服务端 ~~ 2","id":"ab0181b1-b3fe-42b7-ae17-d2a533c56098","serviceId":"001"}
收到客户端消息:RequestData(id=ab0181b1-b3fe-42b7-ae17-d2a533c56098, serviceId=001, data=你好 服务端 ~~ 2)
ResponseMessageCodec.encode 被调用 ResponseData(id=ab0181b1-b3fe-42b7-ae17-d2a533c56098, data=re:这里是服务端,已收到客户端消息:1845339962)
----------响应结束----------你好 服务端 ~~ 2
RequestMessageCodec.decode 被调用 {"data":"你好 服务端 ~~ 3","id":"6a4e6061-9ebe-4250-b939-2e5f314096fc","serviceId":"001"}
收到客户端消息:RequestData(id=6a4e6061-9ebe-4250-b939-2e5f314096fc, serviceId=001, data=你好 服务端 ~~ 3)
ResponseMessageCodec.encode 被调用 ResponseData(id=6a4e6061-9ebe-4250-b939-2e5f314096fc, data=re:这里是服务端,已收到客户端消息:1845339963)
----------响应结束----------你好 服务端 ~~ 3
RequestMessageCodec.decode 被调用 {"data":"你好 服务端 ~~ 4","id":"69c726e6-a3f1-487a-8455-ada02b4e97ed","serviceId":"001"}
收到客户端消息:RequestData(id=69c726e6-a3f1-487a-8455-ada02b4e97ed, serviceId=001, data=你好 服务端 ~~ 4)
ResponseMessageCodec.encode 被调用 ResponseData(id=69c726e6-a3f1-487a-8455-ada02b4e97ed, data=re:这里是服务端,已收到客户端消息:1845339964)
----------响应结束----------你好 服务端 ~~ 4
代码地址
https://gitee.com/personal_practice/netty-demo
到此这篇关于使用Netty实现类似Dubbo的远程接口调用的实现方法的文章就介绍到这了,更多相关Netty实现类似Dubbo的远程调用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!