一款基于 Nacos 实现的 RPC 框架。网络传输实现了基于 Java 原生 Socket 与 Netty 版本,并且实现了多种序列化与负载均衡算法。

项目地址:https://gitcode.net/mirrors/cn-guoziyang/my-rpc-framework?utm_source=csdn_github_accelerator

文档记录:https://blog.csdn.net/qq_40856284/category_10138756.html

RPC框架图:

img

1、一个最简单的实现

大体思路

客户端和服务端都可以访问到通用的接口,但是只有服务端有这个接口的实现类,客户端调用这个接口的方式,是通过网络传输,告诉服务端我要调用这个接口,服务端收到之后找到这个接口的实现类,并且执行,将执行的结果返回给客户端,作为客户端调用接口方法的返回值。

需要考虑的:

客户端怎么知道服务端的地址?客户端怎么告诉服务端我要调用的接口?客户端怎么传递参数?只有接口客户端怎么生成实现类?

这一章,我们就来探讨一个最简单的实现。一个最简单的实现,基于这样一个假设,那就是客户端已经知道了服务端的地址,这部分会由后续的服务发现机制完善。

1.1、通用接口

我们先把通用的接口写好,然后再来看怎么实现客户端和服务端。

接口如下:

1
2
3
public interface HelloService {
String hello(HelloObject object);
}

hello方法需要传递一个对象,HelloObject对象,定义如下:

1
2
3
4
5
6
@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
private Integer id;
private String message;
}

注意这个对象需要实现Serializable接口,因为它需要在调用过程中从客户端传递给服务端。

接着我们在服务端对这个接口进行实现,实现的方式也很简单,返回一个字符串就行:

1
2
3
4
5
6
7
8
public class HelloServiceImpl implements HelloService {
private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
@Override
public String hello(HelloObject object) {
logger.info("接收到:{}", object.getMessage());
return "这是掉用的返回值,id=" + object.getId();
}
}

1.2、传输方式

我们来思考一下,服务端需要哪些信息,才能唯一确定服务端需要调用的接口的方法呢?

首先,就是接口的名字,和方法的名字,但是由于方法重载的缘故,我们还需要这个方法的所有参数的类型,最后,客户端调用时,还需要传递参数的实际值,那么服务端知道以上四个条件,就可以找到这个方法并且调用了。我们把这四个条件写到一个对象里,到时候传输时传输这个对象就行了。即RpcRequest对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Data
@Builder
public class RpcRequest implements Serializable {
/**
* 待调用接口名称
*/
private String interfaceName;
/**
* 待调用方法名称
*/
private String methodName;
/**
* 调用方法的参数
*/
private Object[] parameters;
/**
* 调用方法的参数类型
*/
private Class<?>[] paramTypes;
}

参数类型我是直接使用Class对象,其实用字符串也是可以的。

那么服务器调用完这个方法后,需要给客户端返回哪些信息呢?如果调用成功的话,显然需要返回值,如果调用失败了,就需要失败的信息,这里封装成一个RpcResponse对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Data
public class RpcResponse<T> implements Serializable {
/**
* 响应状态码
*/
private Integer statusCode;
/**
* 响应状态补充信息
*/
private String message;
/**
* 响应数据
*/
private T data;

public static <T> RpcResponse<T> success(T data) {
RpcResponse<T> response = new RpcResponse<>();
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}
public static <T> RpcResponse<T> fail(ResponseCode code) {
RpcResponse<T> response = new RpcResponse<>();
response.setStatusCode(code.getCode());
response.setMessage(code.getMessage());
return response;
}
}

这里还多写了两个静态方法,用于快速生成成功与失败的响应对象。其中,statusCode属性可以自行定义,客户端服务端一致即可。

1.3、客户端的实现——动态代理

客户端方面,由于在客户端这一侧我们并没有接口的具体实现类,就没有办法直接生成实例对象。这时,我们可以通过动态代理的方式生成实例,并且调用方法时生成需要的RpcRequest对象并且发送给服务端。

这里我们采用JDK动态代理,代理类是需要实现InvocationHandler接口的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RpcClientProxy implements InvocationHandler {
private String host;
private int port;

public RpcClientProxy(String host, int port) {
this.host = host;
this.port = port;
}

@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
}

我们需要传递host和port来指明服务端的位置。并且使用getProxy()方法来生成代理对象。

InvocationHandler接口需要实现invoke()方法,来指明代理对象的方法被调用时的动作。在这里,我们显然就需要生成一个RpcRequest对象,发送出去,然后返回从服务端接收到的结果即可:

1
2
3
4
5
6
7
8
9
10
11
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameters(args)
.paramTypes(method.getParameterTypes())
.build();
RpcClient rpcClient = new RpcClient();
return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData();
}

生成RpcRequest很简单,我使用Builder模式来生成这个对象。发送的逻辑我使用了一个RpcClient对象来实现,这个对象的作用,就是将一个对象发过去,并且接受返回的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RpcClient {

private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
try (Socket socket = new Socket(host, port)) {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
objectOutputStream.writeObject(rpcRequest);
objectOutputStream.flush();
return objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
logger.error("调用时有错误发生:", e);
return null;
}
}
}

我的实现很简单,直接使用Java的序列化方式,通过Socket传输。创建一个Socket,获取ObjectOutputStream对象,然后把需要发送的对象传进去即可,接收时获取ObjectInputStream对象,readObject()方法就可以获得一个返回的对象。

1.4、服务端的实现——反射调用

服务端的实现就简单多了,使用一个ServerSocket监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程中处理调用。这里创建线程采用线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class RpcServer {

private final ExecutorService threadPool;
private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

public RpcServer() {
int corePoolSize = 5;
int maximumPoolSize = 50;
long keepAliveTime = 60;
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
}

}

这里简化了一下,RpcServer暂时只能注册一个接口,即对外提供一个接口的调用服务,添加register方法,在注册完一个服务后立刻开始监听:

1
2
3
4
5
6
7
8
9
10
11
12
public void register(Object service, int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
logger.info("服务器正在启动...");
Socket socket;
while((socket = serverSocket.accept()) != null) {
logger.info("客户端连接!Ip为:" + socket.getInetAddress());
threadPool.execute(new WorkerThread(socket, service));
}
} catch (IOException e) {
logger.error("连接时有错误发生:", e);
}
}

这里向工作线程WorkerThread传入了socket和用于服务端实例service。

WorkerThread实现了Runnable接口,用于接收RpcRequest对象,解析并且调用,生成RpcResponse对象并传输回去。run方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
Object returnObject = method.invoke(service, rpcRequest.getParameters());
objectOutputStream.writeObject(RpcResponse.success(returnObject));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
logger.error("调用或发送时有错误发生:", e);
}
}

其中,通过class.getMethod方法,传入方法名和方法参数类型即可获得Method对象。如果你上面RpcRequest中使用String数组来存储方法参数类型的话,这里你就需要通过反射生成对应的Class数组了。通过method.invoke方法,传入对象实例和参数,即可调用并且获得返回值。

1.5、测试

服务端侧,我们已经在上面实现了一个HelloService的实现类HelloServiceImpl的实现类了,我们只需要创建一个RpcServer并且把这个实现类注册进去就行了:

1
2
3
4
5
6
7
public class TestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
RpcServer rpcServer = new RpcServer();
rpcServer.register(helloService, 9000);
}
}

服务端开放在9000端口。

客户端方面,我们需要通过动态代理,生成代理对象,并且调用,动态代理会自动帮我们向服务端发送请求的:

1
2
3
4
5
6
7
8
9
public class TestClient {
public static void main(String[] args) {
RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
HelloService helloService = proxy.getProxy(HelloService.class);
HelloObject object = new HelloObject(12, "This is a message");
String res = helloService.hello(object);
System.out.println(res);
}
}

我们这里生成了一个HelloObject对象作为方法的参数。

首先启动服务端,再启动客户端,服务端输出:

1
2
3
服务器正在启动...
客户端连接!Ip为:127.0.0.1
接收到:This is a message

客户端输出:

1
这是调用的返回值,id=12

2、注册多个服务

上一节中,我们使用 JDK 序列化和 Socket 实现了一个最基本的 RPC 框架,服务端测试时是这样的:

1
2
3
4
5
6
7
public class TestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
RpcServer rpcServer = new RpcServer();
rpcServer.register(helloService, 9000);
}
}

在注册完 helloService 后,服务器就自行启动了,也就是说,一个服务器只能注册一个服务,这个设计非常不好(毕竟是简易实现)。这一节,我们就将服务的注册和服务器启动分离,使得服务端可以提供多个服务。

2.1、服务注册表

我们需要一个容器,这个容器很简单,就是保存一些本地服务的信息,并且在获得一个服务名字的时候能够返回这个服务的信息。创建一个 ServiceRegistry 接口:

1
2
3
4
5
6
package top.guoziyang.rpc.registry;

public interface ServiceRegistry {
<T> void register(T service);
Object getService(String serviceName);
}

一目了然,一个register注册服务信息,一个getService获取服务信息。

我们新建一个默认的注册表类 DefaultServiceRegistry 来实现这个接口,提供服务注册服务,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class DefaultServiceRegistry implements ServiceRegistry {

private static final Logger logger = LoggerFactory.getLogger(DefaultServiceRegistry.class);

private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
private final Set<String> registeredService = ConcurrentHashMap.newKeySet();

@Override
public synchronized <T> void register(T service) {
String serviceName = service.getClass().getCanonicalName();
if(registeredService.contains(serviceName)) return;
registeredService.add(serviceName);
Class<?>[] interfaces = service.getClass().getInterfaces();
if(interfaces.length == 0) {
throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
}
for(Class<?> i : interfaces) {
serviceMap.put(i.getCanonicalName(), service);
}
logger.info("向接口: {} 注册服务: {}", interfaces, serviceName);
}

@Override
public synchronized Object getService(String serviceName) {
Object service = serviceMap.get(serviceName);
if(service == null) {
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
}
return service;
}
}

我们将服务名与提供服务的对象的对应关系保存在一个 ConcurrentHashMap 中,并且使用一个 Set 来保存当前有哪些对象已经被注册。

在注册服务时,默认采用这个对象实现的接口的完整类名作为服务名,例如某个对象 A 实现了接口 X 和 Y,那么将 A 注册进去后,会有两个服务名 X 和 Y 对应于 A 对象。这种处理方式也就说明了某个接口只能有一个对象提供服务。

2.2、其他处理

为了降低耦合度,我们不会把 ServiceRegistry 和某一个 RpcServer 绑定在一起,而是在创建 RpcServer 对象时,传入一个 ServiceRegistry 作为这个服务的注册表。

那么 RpcServer 这个类现在就变成了这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class RpcServer {

private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

private static final int CORE_POOL_SIZE = 5;
private static final int MAXIMUM_POOL_SIZE = 50;
private static final int KEEP_ALIVE_TIME = 60;
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private final ExecutorService threadPool;
private RequestHandler requestHandler = new RequestHandler();
private final ServiceRegistry serviceRegistry;

public RpcServer(ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, workingQueue, threadFactory);
}

public void start(int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
logger.info("服务器启动……");
Socket socket;
while((socket = serverSocket.accept()) != null) {
logger.info("消费者连接: {}:{}", socket.getInetAddress(), socket.getPort());
threadPool.execute(new RequestHandlerThread(socket, requestHandler, serviceRegistry));
}
threadPool.shutdown();
} catch (IOException e) {
logger.error("服务器启动时有错误发生:", e);
}
}
}

在创建 RpcServer 时需要传入一个已经注册好服务的 ServiceRegistry,而原来的 register 方法也被改成了 start 方法,因为服务的注册已经不由 RpcServer 处理了,它只需要启动就行了。

而在每一个请求处理线程(RequestHandlerThread)中也就需要传入 ServiceRegistry 了,这里把处理线程和处理逻辑分成了两个类:RequestHandlerThread 只是一个线程,从ServiceRegistry 获取到提供服务的对象后,就会把 RpcRequest 和服务对象直接交给 RequestHandler 去处理,反射等过程被放到了 RequestHandler 里。

RequesthandlerThread.java:处理线程,接受对象等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RequestHandlerThread implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

private Socket socket;
private RequestHandler requestHandler;
private ServiceRegistry serviceRegistry;

public RequestHandlerThread(Socket socket, RequestHandler requestHandler, ServiceRegistry serviceRegistry) {
this.socket = socket;
this.requestHandler = requestHandler;
this.serviceRegistry = serviceRegistry;
}

@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
String interfaceName = rpcRequest.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(rpcRequest, service);
objectOutputStream.writeObject(RpcResponse.success(result));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException e) {
logger.error("调用或发送时有错误发生:", e);
}
}
}

RequestHandler.java:通过反射进行方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RequestHandler {

private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

public Object handle(RpcRequest rpcRequest, Object service) {
Object result = null;
try {
result = invokeTargetMethod(rpcRequest, service);
logger.info("服务:{} 成功调用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
} catch (IllegalAccessException | InvocationTargetException e) {
logger.error("调用或发送时有错误发生:", e);
} return result;
}
private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws IllegalAccessException, InvocationTargetException {
Method method;
try {
method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
} catch (NoSuchMethodException e) {
return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND);
}
return method.invoke(service, rpcRequest.getParameters());
}
}

在这种情况下,客户端完全不需要做任何改动。

2.3、测试

服务端的测试

1
2
3
4
5
6
7
8
9
public class TestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
ServiceRegistry serviceRegistry = new DefaultServiceRegistry();
serviceRegistry.register(helloService);
RpcServer rpcServer = new RpcServer(serviceRegistry);
rpcServer.start(9000);
}
}

3、Netty传输和通用序列化接口

本节我们会将传统的 BIO 方式传输换成效率更高的 NIO 方式,当然不会使用 Java 原生的 NIO,而是采用更为简单的 Netty。本节还会实现一个通用的序列化接口,为多种序列化支持做准备,并且,本节还会自定义传输的协议。

3.1、Netty 服务端与客户端

首先就需要在 pom.xml 中加入 Netty 依赖:

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-version}</version>
</dependency>

netty 的最新版本可以在 mavenrepository查到,注意使用 netty 4 而不是 netty 5。

为了保证通用性,我们可以把 Server 和 Client 抽象成两个接口,分别是 RpcServer 和 RpcClient:

1
2
3
4
5
6
7
public interface RpcServer {
void start(int port);
}

public interface RpcClient {
Object sendRequest(RpcRequest rpcRequest);
}

而原来的 RpcServer 和 RpcClient 类实际上是上述两个接口的 Socket 方式实现类,改成 SocketServer 和 SocketClient 并实现上面两个接口即可,几乎不需要做什么修改。

我们的任务,就是要实现 NettyServer 和 NettyClient。

这里提一个改动,就是在 DefaultServiceRegistry.java 中,将包含注册信息的 serviceMap 和 registeredService 都改成了 static ,这样就能保证全局唯一的注册信息,并且在创建 RpcServer 时也就不需要传入了。

NettyServer的实现很传统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class NettyServer implements RpcServer {

private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

@Override
public void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 256)
.option(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CommonEncoder(new JsonSerializer()));
pipeline.addLast(new CommonDecoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();

} catch (InterruptedException e) {
logger.error("启动服务器时有错误发生: ", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

了解过 Netty 的同学可能知道,Netty 中有一个很重要的设计模式——责任链模式,责任链上有多个处理器,每个处理器都会对数据进行加工,并将处理后的数据传给下一个处理器。代码中的 CommonEncoder、CommonDecoder和NettyServerHandler 分别就是编码器,解码器和数据处理器。因为数据从外部传入时需要解码,而传出时需要编码,类似计算机网络的分层模型,每一层向下层传递数据时都要加上该层的信息,而向上层传递时则需要对本层信息进行解码。

而 NettyClient 的实现也很类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class NettyClient implements RpcClient {

private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

private String host;
private int port;
private static final Bootstrap bootstrap;

public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}

static {
EventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CommonDecoder())
.addLast(new CommonEncoder(new JsonSerializer()))
.addLast(new NettyClientHandler());
}
});
}

@Override
public Object sendRequest(RpcRequest rpcRequest) {
try {
ChannelFuture future = bootstrap.connect(host, port).sync();
logger.info("客户端连接到服务器 {}:{}", host, port);
Channel channel = future.channel();
if(channel != null) {
channel.writeAndFlush(rpcRequest).addListener(future1 -> {
if(future1.isSuccess()) {
logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
} else {
logger.error("发送消息时有错误发生: ", future1.cause());
}
});
channel.closeFuture().sync();
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = channel.attr(key).get();
return rpcResponse.getData();
}
} catch (InterruptedException e) {
logger.error("发送消息时有错误发生: ", e);
}
return null;
}
}

在静态代码块中就直接配置好了 Netty 客户端,等待发送数据时启动,channel 将 RpcRequest 对象写出,并且等待服务端返回的结果。注意这里的发送是非阻塞的,所以发送后会立刻返回,而无法得到结果。这里通过 AttributeKey 的方式阻塞获得返回结果:

1
2
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = channel.attr(key).get();

通过这种方式获得全局可见的返回结果,在获得返回结果 RpcResponse 后,将这个对象以 key 为 rpcResponse 放入 ChannelHandlerContext 中,这里就可以立刻获得结果并返回,我们会在 NettyClientHandler 中看到放入的过程。

3.2、自定义协议与编解码器

在传输过程中,我们可以在发送的数据上加上各种必要的数据,形成自定义的协议,而自动加上这个数据就是编码器的工作,解析数据获得原始数据就是解码器的工作。

我们定义的协议是这样的:

1
2
3
4
5
6
7
+---------------+---------------+-----------------+-------------+
| Magic Number | Package Type | Serializer Type | Data Length |
| 4 bytes | 4 bytes | 4 bytes | 4 bytes |
+---------------+---------------+-----------------+-------------+
| Data Bytes |
| Length: ${Data Length} |
+---------------------------------------------------------------+

首先是 4 字节魔数,表识一个协议包。接着是 Package Type,标明这是一个调用请求还是调用响应,Serializer Type 标明了实际数据使用的序列化器,这个服务端和客户端应当使用统一标准;Data Length 就是实际数据的长度,设置这个字段主要防止粘包,最后就是经过序列化后的实际数据,可能是 RpcRequest 也可能是 RpcResponse 经过序列化后的字节,取决于 Package Type。

规定好协议后,我们就可以来看看 CommonEncoder 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CommonEncoder extends MessageToByteEncoder {

private static final int MAGIC_NUMBER = 0xCAFEBABE;

private final CommonSerializer serializer;

public CommonEncoder(CommonSerializer serializer) {
this.serializer = serializer;
}

@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
out.writeInt(MAGIC_NUMBER);
if(msg instanceof RpcRequest) {
out.writeInt(PackageType.REQUEST_PACK.getCode());
} else {
out.writeInt(PackageType.RESPONSE_PACK.getCode());
}
out.writeInt(serializer.getCode());
byte[] bytes = serializer.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}

CommonEncoder 继承了MessageToByteEncoder 类,见名知义,就是把 Message(实际要发送的对象)转化成 Byte 数组。CommonEncoder 的工作很简单,就是把 RpcRequest 或者 RpcResponse 包装成协议包。 根据上面提到的协议格式,将各个字段写到管道里就可以了,这里serializer.getCode() 获取序列化器的编号,之后使用传入的序列化器将请求或响应包序列化为字节数组写入管道即可。

而 CommonDecoder 的工作就更简单了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CommonDecoder extends ReplayingDecoder {

private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
private static final int MAGIC_NUMBER = 0xCAFEBABE;

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magic = in.readInt();
if(magic != MAGIC_NUMBER) {
logger.error("不识别的协议包: {}", magic);
throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
}
int packageCode = in.readInt();
Class<?> packageClass;
if(packageCode == PackageType.REQUEST_PACK.getCode()) {
packageClass = RpcRequest.class;
} else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
packageClass = RpcResponse.class;
} else {
logger.error("不识别的数据包: {}", packageCode);
throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
}
int serializerCode = in.readInt();
CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
if(serializer == null) {
logger.error("不识别的反序列化器: {}", serializerCode);
throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
}
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
Object obj = serializer.deserialize(bytes, packageClass);
out.add(obj);
}
}

CommonDecoder 继承自 ReplayingDecoder ,与 MessageToByteEncoder 相反,它用于将收到的字节序列还原为实际对象。主要就是一些字段的校验,比较重要的就是取出序列化器的编号,以获得正确的反序列化方式,并且读入 length 字段来确定数据包的长度(防止粘包),最后读入正确大小的字节数组,反序列化成对应的对象。

3.3、序列化接口

序列化器接口(CommonSerializer)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface CommonSerializer {

byte[] serialize(Object obj);

Object deserialize(byte[] bytes, Class<?> clazz);

int getCode();

static CommonSerializer getByCode(int code) {
switch (code) {
case 1:
return new JsonSerializer();
default:
return null;
}
}
}

主要就是四个方法,序列化,反序列化,获得该序列化器的编号,以及根据编号获取序列化器,这里我已经写了一个示例的 JSON 序列化器,Kryo 序列化器会在后面讲解。

作为一个比较简单的例子,我写了一个 JSON 的序列化器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class JsonSerializer implements CommonSerializer {

private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);

private ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(Object obj) {
try {
return objectMapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
logger.error("序列化时有错误发生: {}", e.getMessage());
e.printStackTrace();
return null;
}
}

@Override
public Object deserialize(byte[] bytes, Class<?> clazz) {
try {
Object obj = objectMapper.readValue(bytes, clazz);
if(obj instanceof RpcRequest) {
obj = handleRequest(obj);
}
return obj;
} catch (IOException e) {
logger.error("反序列化时有错误发生: {}", e.getMessage());
e.printStackTrace();
return null;
}
}

/*
这里由于使用JSON序列化和反序列化Object数组,无法保证反序列化后仍然为原实例类型
需要重新判断处理
*/
private Object handleRequest(Object obj) throws IOException {
RpcRequest rpcRequest = (RpcRequest) obj;
for(int i = 0; i < rpcRequest.getParamTypes().length; i ++) {
Class<?> clazz = rpcRequest.getParamTypes()[i];
if(!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
byte[] bytes = objectMapper.writeValueAsBytes(rpcRequest.getParameters()[i]);
rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
}
}
return rpcRequest;
}

@Override
public int getCode() {
return SerializerCode.valueOf("JSON").getCode();
}

}

JSON 序列化工具我使用的是 Jackson,在 pom.xml 中添加依赖即可。序列化和反序列化都比较循规蹈矩,把对象翻译成字节数组,和根据字节数组和 Class 反序列化成对象。这里有一个需要注意的点,就是在 RpcRequest 反序列化时,由于其中有一个字段是 Object 数组,在反序列化时序列化器会根据字段类型进行反序列化,而 Object 就是一个十分模糊的类型,会出现反序列化失败的现象,这时就需要 RpcRequest 中的另一个字段 ParamTypes 来获取到 Object 数组中的每个实例的实际类,辅助反序列化,这就是 handleRequest() 方法的作用。

上面提到的这种情况不会在其他序列化方式中出现,因为其他序列化方式是转换成字节数组,会记录对象的信息,而 JSON 方式本质上只是转换成 JSON 字符串,会丢失对象的类型信息。

3.4、NettyServerHandler 和 NettyClientHandler

NettyServerHandler 和 NettyClientHandler 都分别位于服务器端和客户端责任链的尾部,直接和 RpcServer 对象或 RpcClient 对象打交道,而无需关心字节序列的情况。

NettyServerhandler 用于接收 RpcRequest,并且执行调用,将调用结果返回封装成 RpcResponse 发送出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static RequestHandler requestHandler;
private static ServiceRegistry serviceRegistry;

static {
requestHandler = new RequestHandler();
serviceRegistry = new DefaultServiceRegistry();
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
try {
logger.info("服务器接收到请求: {}", msg);
String interfaceName = msg.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(msg, service);
ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result));
future.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("处理过程调用时有错误发生:");
cause.printStackTrace();
ctx.close();
}

}

处理方式和 Socket 中的逻辑基本一致,不做讲解。

NettyClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {

private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
try {
logger.info(String.format("客户端接收到消息: %s", msg));
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
ctx.channel().attr(key).set(msg);
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("过程调用时有错误发生:");
cause.printStackTrace();
ctx.close();
}
}

这里只需要处理收到的消息,即 RpcResponse 对象,由于前面已经有解码器解码了,这里就直接将返回的结果放入 ctx 中即可。

3.5、测试

这里我们主要测试 Netty 方式。

NettyTestServer 如下:

1
2
3
4
5
6
7
8
9
10
11
public class NettyTestServer {

public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
ServiceRegistry registry = new DefaultServiceRegistry();
registry.register(helloService);
NettyServer server = new NettyServer();
server.start(9999);
}

}

NettyTestClient如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class NettyTestClient {

public static void main(String[] args) {
RpcClient client = new NettyClient("127.0.0.1", 9999);
RpcClientProxy rpcClientProxy = new RpcClientProxy(client);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
HelloObject object = new HelloObject(12, "This is a message");
String res = helloService.hello(object);
System.out.println(res);
}

}

注意这里 RpcClientProxy 通过传入不同的 Client(SocketClient、NettyClient)来切换客户端不同的发送方式。

执行后可以获得与之前类似的结果。

4、Kryo序列化

上一节我们实现了一个通用的序列化框架,使得序列化方式具有了较高的扩展性,并且实现了一个基于 JSON 的序列化器。

但是,我们也提到过,这个基于 JSON 的序列化器有一个毛病,就是在某个类的属性反序列化时,如果属性声明为 Object 的,就会造成反序列化出错,通常会把 Object 属性直接反序列化成 String 类型,就需要其他参数辅助序列化。并且,JSON 序列化器是基于字符串(JSON 串)的,占用空间较大且速度较慢。

另外,我们在用过的RPC通信框架中,很少会发现使用JDK提供的序列化,主要是因为JDK默认的序列化存在着如下一些缺陷:无法跨语言易被攻击序列化后的流太大序列化性能太差等

这一节我们就来实现一个基于 Kryo 的序列化器。那么,什么是 Kryo?

Kryo 是一个快速高效的 Java 对象序列化框架,主要特点是高性能、高效和易用。最重要的两个特点,一是基于字节的序列化,对空间利用率较高,在网络传输时可以减小体积;二是序列化时记录属性对象的类型信息,这样在反序列化时就不会出现之前的问题了。

4.1、实现接口

首先添加 kryo 的依赖

1
2
3
4
5
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>

我们在上一节定义了一个通用的序列化接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface CommonSerializer {

byte[] serialize(Object obj);

Object deserialize(byte[] bytes, Class<?> clazz);

int getCode();

static CommonSerializer getByCode(int code) {
switch (code) {
case 0:
return new KryoSerializer();
case 1:
return new JsonSerializer();
default:
return null;
}
}
}

根据接口,我们的主要任务就是实现其中的主要两个方法,serialize() 和 deserialize() ,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class KryoSerializer implements CommonSerializer {

private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);

private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
kryo.setReferences(true);
kryo.setRegistrationRequired(false);
return kryo;
});

@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)){
Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
logger.error("序列化时有错误发生:", e);
throw new SerializeException("序列化时有错误发生");
}
}

@Override
public Object deserialize(byte[] bytes, Class<?> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return o;
} catch (Exception e) {
logger.error("反序列化时有错误发生:", e);
throw new SerializeException("反序列化时有错误发生");
}
}

@Override
public int getCode() {
return SerializerCode.valueOf("KRYO").getCode();
}
}

这里 Kryo 可能存在线程安全问题,文档上是推荐放在 ThreadLocal 里,一个线程一个 Kryo。在序列化时,先创建一个 Output 对象(Kryo 框架的概念),接着使用 writeObject 方法将对象写入 Output 中,最后调用 Output 对象的 toByte() 方法即可获得对象的字节数组。反序列化则是从 Input 对象中直接 readObject,这里只需要传入对象的类型,而不需要具体传入每一个属性的类型信息。

最后 getCode 方法中事实上是把序列化的编号写在一个枚举类 SerializerCode 里了:

1
2
3
4
5
public enum SerializerCode {
KRYO(0),
JSON(1);
private final int code;
}

4.2、替换序列化器并测试

我们只需要把 NettyServer 和 NettyClient 责任链中的 CommonEncoder 传入的参数改成 KryoSerializer 即可使用 Kryo 序列化。

1
2
-                             pipeline.addLast(new CommonEncoder(new JsonSerializer()));
+ pipeline.addLast(new CommonEncoder(new KryoSerializer()));

5、基于 Nacos 的服务器注册与发现

我们目前实现的框架看起来工作的还不错,但是有一个问题:我们的服务端地址是固化在代码中的,也就是说,对于一个客户端,它只会去寻找那么一个服务提供者,如果这个提供者挂了或者换了地址,那就没有办法了。

在分布式架构中,有一个重要的组件,就是服务注册中心,它用于保存多个服务提供者的信息,每个服务提供者在启动时都需要向注册中心注册自己所拥有的服务。这样客户端在发起 RPC 时,就可以直接去向注册中心请求服务提供者的信息,如果拿来的这个挂了,还可以重新请求,并且在这种情况下可以很方便地实现负载均衡。

常见的注册中心有 Eureka、Zookeeper 和 Nacos。

获得 Nacos

Nacos 是阿里开发的一款服务注册中心,在 SpringCloud Alibaba 逐步替代原始的 SpringCloud 的过程中,Nacos 逐步走红,所以我们就是用 Nacos 作为我们的注册中心。

下载解压的过程略过。注意 Nacos 是依赖数据库的,所以我们需要在配置文件中配置 Mysql 的信息。

为了简单,我们先以单机模式运行:

1
sh startup.sh -m standalone

启动后可以访问 Nacos 的web UI,地址 http://127.0.0.1:8848/nacos/index.html

默认的用户名和密码都是 nacos

5.1、在项目中使用 Nacos

引入 nacos-client 依赖:

1
2
3
4
5
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.3.0</version>
</dependency>

这里我们修正之前的概念,第二节把本地保存服务的类称为 ServiceRegistry,现在更改为 ServiceProvider,而 ServiceRegistry 作为远程注册表(Nacos)使用,对应的类名也有修改。

这里我们实现一个接口 ServiceRegistry:

1
2
3
4
public interface ServiceRegistry {
void register(String serviceName, InetSocketAddress inetSocketAddress);
InetSocketAddress lookupService(String serviceName);
}

两个方法很好理解,register 方法将服务的名称和地址注册进服务注册中心,lookupService 方法则是根据服务名称从注册中心获取到一个服务提供者的地址。

接口有了,我们就可以写实现类了,我们实现一个 Nacos 作为注册中心的实现类:NacosServiceRegistry,我们也可以使用 ZooKeeper 作为注册中心,实现接口就可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class NacosServiceRegistry implements ServiceRegistry {

private static final Logger logger = LoggerFactory.getLogger(NacosServiceRegistry.class);

private static final String SERVER_ADDR = "127.0.0.1:8848";
private static final NamingService namingService;

static {
try {
namingService = NamingFactory.createNamingService(SERVER_ADDR);
} catch (NacosException e) {
logger.error("连接到Nacos时有错误发生: ", e);
throw new RpcException(RpcError.FAILED_TO_CONNECT_TO_SERVICE_REGISTRY);
}m,.
}

@Override
public void register(String serviceName, InetSocketAddress inetSocketAddress) {
try {
namingService.registerInstance(serviceName, inetSocketAddress.getHostName(), inetSocketAddress.getPort());
} catch (NacosException e) {
logger.error("注册服务时有错误发生:", e);
throw new RpcException(RpcError.REGISTER_SERVICE_FAILED);
}
}

@Override
public InetSocketAddress lookupService(String serviceName) {
try {
List<Instance> instances = namingService.getAllInstances(serviceName);
Instance instance = instances.get(0);
return new InetSocketAddress(instance.getIp(), instance.getPort());
} catch (NacosException e) {
logger.error("获取服务时有错误发生:", e);
}
return null;
}
}

Nacos 的使用很简单,通过 NamingFactory 创建 NamingService 连接 Nacos(连接的时候没有找到修改用户名密码的方式……是不需要吗),连接的过程写在了静态代码块中,在类加载时自动连接。namingService 提供了两个很方便的接口,registerInstance 和 getAllInstances 方法,前者可以直接向 Nacos 注册服务,后者可以获得提供某个服务的所有提供者的列表。所以接口的这两个方法只需要包装一下就好了。

在 lookupService 方法中,通过 getAllInstance 获取到某个服务的所有提供者列表后,需要选择一个,这里就涉及了负载均衡策略,这里我们先选择第 0 个,后面某节会详细讲解负载均衡。

5.2、注册服务

我们修改 RpcServer 接口,新增一个方法 publishService,用于向 Nacos 注册服务:

1
<T> void publishService(Object service, Class<T> serviceClass);

接着只需要实现这个方法即可,以 NettyServer 的实现为例,NettyServer 在创建时需要创建一个 ServiceRegistry 了:

1
2
3
4
5
6
public NettyServer(String host, int port) {
this.host = host;
this.port = port;
serviceRegistry = new NacosServiceRegistry();
serviceProvider = new ServiceProviderImpl();
}

接着实现 publishService 方法即可:

1
2
3
4
5
6
7
8
9
public <T> void publishService(Object service, Class<T> serviceClass) {
if(serializer == null) {
logger.error("未设置序列化器");
throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
}
serviceProvider.addServiceProvider(service);
serviceRegistry.register(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));
start();
}

publishService 需要将服务保存在本地的注册表,同时注册到 Nacos 上。我这里的实现是注册完一个服务后直接调用 start() 方法,这是个不太好的实现……导致一个服务端只能注册一个服务,之后可以多注册几个然后再手动调用 start() 方法。

5.3、发现服务

客户端的修改就更简单了,以 NettyClient 为例,在过去创建 NettyClient 时,需要传入 host 和 port,现在这个 host 和 port 是通过 Nacos 获取的,sendRequest 修改如下:

1
2
3
4
5
6
7
8
9
10
public Object sendRequest(RpcRequest rpcRequest) {
if(serializer == null) {
logger.error("未设置序列化器");
throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
}
AtomicReference<Object> result = new AtomicReference<>(null);
try {
InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName());
Channel channel = ChannelProvider.get(inetSocketAddress, serializer);
...

重点是最后两句,过去是直接使用传入的 host 和 port 直接构造 channel,现在是首先从 ServiceRegistry 中获取到服务的地址和端口,再构造。

5.4、测试

NettyTestClient 如下:

1
2
3
4
5
6
7
8
9
10
11
public class NettyTestClient {
public static void main(String[] args) {
RpcClient client = new NettyClient();
client.setSerializer(new ProtobufSerializer());
RpcClientProxy rpcClientProxy = new RpcClientProxy(client);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
HelloObject object = new HelloObject(12, "This is a message");
String res = helloService.hello(object);
System.out.println(res);
}
}

构造 RpcClient 时不再需要传入地址和端口。

NettyTestServer 如下:

1
2
3
4
5
6
7
8
public class NettyTestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
NettyServer server = new NettyServer("127.0.0.1", 9999);
server.setSerializer(new ProtobufSerializer());
server.publishService(helloService, HelloService.class);
}
}

我这里是把 start 写在了 publishService 中,实际应当分离,否则只能注册一个服务。

分别启动,可以看到和之前相同的结果。

这里如果通过修改不同的端口,启动两个服务的话,会看到即使客户端多次调用,也只是由同一个服务端提供服务,这是因为在 NacosServiceRegistry 中,我们直接选择了服务列表的第 0 个,这个会在之后讲解负载均衡时作出修改。

6、自动注销服务和负载均衡策略

6.1、自动注销服务

上一节我们实现了服务的自动注册和发现,但是有些细心的同学就可能会发现,如果你启动完成服务端后把服务端给关闭了,并不会自动地注销 Nacos 中对应的服务信息,这样就导致了当客户端再次向 Nacos 请求服务时,会获取到已经关闭的服务端信息,最终就有可能因为连接不到服务器而调用失败。

那么我们就需要一种办法,在服务端关闭之前自动向 Nacos 注销服务。但是有一个问题,我们不知道什么时候服务器会关闭,也就不知道这个方法调用的时机,就没有办法手工去调用。这时,我们就需要钩子。

钩子是什么呢?是在某些事件发生后自动去调用的方法。那么我们只需要把注销服务的方法写到关闭系统的钩子方法里就行了。

首先先写向 Nacos 注销所有服务的方法,这部分被放在了 NacosUtils 中作为一个静态方法,NacosUtils 是一个 Nacos 相关的工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void clearRegistry() {
if(!serviceNames.isEmpty() && address != null) {
String host = address.getHostName();
int port = address.getPort();
Iterator<String> iterator = serviceNames.iterator();
while(iterator.hasNext()) {
String serviceName = iterator.next();
try {
namingService.deregisterInstance(serviceName, host, port);
} catch (NacosException e) {
logger.error("注销服务 {} 失败", serviceName, e);
}
}
}
}

所有的服务名称都被存储在 NacosUtils 类中的 serviceNames 中,在注销时只需要用迭代器迭代所有服务名,调用 deregisterInstance 即可。

接着就是钩子了,新建一个类,ShutdownHook:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ShutdownHook {

private static final Logger logger = LoggerFactory.getLogger(ShutdownHook.class);

private final ExecutorService threadPool = ThreadPoolFactory.createDefaultThreadPool("shutdown-hook");
private static final ShutdownHook shutdownHook = new ShutdownHook();

public static ShutdownHook getShutdownHook() {
return shutdownHook;
}

public void addClearAllHook() {
logger.info("关闭后将自动注销所有服务");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
NacosUtil.clearRegistry();
threadPool.shutdown();
}));
}

}

使用了单例模式创建其对象,在 addClearAllHook 中,Runtime 对象是 JVM 虚拟机的运行时环境,调用其 addShutdownHook 方法增加一个钩子函数,创建一个新线程调用 clearRegistry 方法完成注销工作。这个钩子函数会在 JVM 关闭之前被调用。

这样在 RpcServer 启动之前,只需要调用 addClearAllHook,就可以注册这个钩子了。例如在 NettyServer 中:

1
2
3
             ChannelFuture future = serverBootstrap.bind(host, port).sync();
+ ShutdownHook.getShutdownHook().addClearAllHook();
future.channel().closeFuture().sync();

启动服务端后再关闭,就会发现 Nacos 中的注册信息都被注销了。

6.2、负载均衡策略

负载均衡大家应该都熟悉,在上一节中客户端在 lookupService 方法中,从 Nacos 获取到的是所有提供这个服务的服务端信息列表,我们就需要从中选择一个,这便涉及到客户端侧的负载均衡策略。我们新建一个接口:LoadBalancer:

1
2
3
public interface LoadBalancer {
Instance select(List<Instance> instances);
}

接口中的 select 方法用于从一系列 Instance 中选择一个。这里我就实现两个比较经典的算法:随机和转轮。

随机算法顾名思义,就是随机选一个,毫无技术含量:

1
2
3
4
5
6
7
8
public class RandomLoadBalancer implements LoadBalancer {

@Override
public Instance select(List<Instance> instances) {
return instances.get(new Random().nextInt(instances.size()));
}

}

而转轮算法大家也应该了解,按照顺序依次选择第一个、第二个、第三个……这里就需要一个变量来表示当前选到了第几个:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RoundRobinLoadBalancer implements LoadBalancer {

private int index = 0;

@Override
public Instance select(List<Instance> instances) {
if(index >= instances.size()) {
index %= instances.size();
}
return instances.get(index++);
}

}

index 就表示当前选到了第几个服务器,并且每次选择后都会自增一。

最后在 NacosServiceRegistry 中集成就可以了,这里选择外部传入的方式传入 LoadBalancer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class NacosServiceDiscovery implements ServiceDiscovery {
private final LoadBalancer loadBalancer;

public NacosServiceDiscovery(LoadBalancer loadBalancer) {
if(loadBalancer == null) this.loadBalancer = new RandomLoadBalancer();
else this.loadBalancer = loadBalancer;
}

public InetSocketAddress lookupService(String serviceName) {
try {
List<Instance> instances = NacosUtil.getAllInstance(serviceName);
Instance instance = loadBalancer.select(instances);
return new InetSocketAddress(instance.getIp(), instance.getPort());
} catch (NacosException e) {
logger.error("获取服务时有错误发生:", e);
}
return null;
}
}

而这个负载均衡策略,也可以在创建客户端时指定,例如无参构造 NettyClient 时就用默认的策略,也可以有参构造传入策略,具体的实现留给大家。

7、服务端自动注册服务

到目前为止,客户端看起来挺完美了,但是在服务端,我们却需要手动创建服务对象,并且手动进行注册,如果服务端提供了很多服务,这个操作就会变得很繁琐。本节就会介绍如何基于注解进行服务的自动注册。

本节需要一些反射知识。

7.1、定义注解

首先我们需要定义两个注解:Service 和 ServiceScan:

Service.java

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Service {

public String name() default "";

}

ServiceScan.java

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ServiceScan {

public String value() default "";

}

@Service 放在一个类上,标识这个类提供一个服务,@ServiceScan 放在启动的入口类上(main 方法所在的类),标识服务的扫描的包的范围。Service 注解的值定义为该服务的名称,默认值是该类的完整类名,而 ServiceScan 的值定义为扫描范围的根包,默认值为入口类所在的包,扫描时会扫描该包及其子包下所有的类,找到标记有 Service 的类,并注册。

7.2、工具类 ReflectUtil

这个类是一系列工具方法,不做讲解,只说用途,感兴趣的可以研究研究具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
public class ReflectUtil {

public static String getStackTrace() {
StackTraceElement[] stack = new Throwable().getStackTrace();
return stack[stack.length - 1].getClassName();
}

public static Set<Class<?>> getClasses(String packageName) {
Set<Class<?>> classes = new LinkedHashSet<>();
boolean recursive = true;
String packageDirName = packageName.replace('.', '/');
Enumeration<URL> dirs;
try {
dirs = Thread.currentThread().getContextClassLoader().getResources(
packageDirName);
// 循环迭代下去
while (dirs.hasMoreElements()) {
// 获取下一个元素
URL url = dirs.nextElement();
// 得到协议的名称
String protocol = url.getProtocol();
// 如果是以文件的形式保存在服务器上
if ("file".equals(protocol)) {
// 获取包的物理路径
String filePath = URLDecoder.decode(url.getFile(), "UTF-8");
// 以文件的方式扫描整个包下的文件 并添加到集合中
findAndAddClassesInPackageByFile(packageName, filePath,
recursive, classes);
} else if ("jar".equals(protocol)) {
// 如果是jar包文件
// 定义一个JarFile
JarFile jar;
try {
// 获取jar
jar = ((JarURLConnection) url.openConnection())
.getJarFile();
// 从此jar包 得到一个枚举类
Enumeration<JarEntry> entries = jar.entries();
// 同样的进行循环迭代
while (entries.hasMoreElements()) {
// 获取jar里的一个实体 可以是目录 和一些jar包里的其他文件 如META-INF等文件
JarEntry entry = entries.nextElement();
String name = entry.getName();
// 如果是以/开头的
if (name.charAt(0) == '/') {
// 获取后面的字符串
name = name.substring(1);
}
// 如果前半部分和定义的包名相同
if (name.startsWith(packageDirName)) {
int idx = name.lastIndexOf('/');
// 如果以"/"结尾 是一个包
if (idx != -1) {
// 获取包名 把"/"替换成"."
packageName = name.substring(0, idx)
.replace('/', '.');
}
// 如果可以迭代下去 并且是一个包
if ((idx != -1) || recursive) {
// 如果是一个.class文件 而且不是目录
if (name.endsWith(".class")
&& !entry.isDirectory()) {
// 去掉后面的".class" 获取真正的类名
String className = name.substring(
packageName.length() + 1, name
.length() - 6);
try {
// 添加到classes
classes.add(Class
.forName(packageName + '.'
+ className));
} catch (ClassNotFoundException e) {
// log
// .error("添加用户自定义视图类错误 找不到此类的.class文件");
e.printStackTrace();
}
}
}
}
}
} catch (IOException e) {
// log.error("在扫描用户定义视图时从jar包获取文件出错");
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}

return classes;
}

private static void findAndAddClassesInPackageByFile(String packageName,
String packagePath, final boolean recursive, Set<Class<?>> classes) {
// 获取此包的目录 建立一个File
File dir = new File(packagePath);
// 如果不存在或者 也不是目录就直接返回
if (!dir.exists() || !dir.isDirectory()) {
// log.warn("用户定义包名 " + packageName + " 下没有任何文件");
return;
}
// 如果存在 就获取包下的所有文件 包括目录
File[] dirfiles = dir.listFiles(new FileFilter() {
// 自定义过滤规则 如果可以循环(包含子目录) 或则是以.class结尾的文件(编译好的java类文件)
public boolean accept(File file) {
return (recursive && file.isDirectory())
|| (file.getName().endsWith(".class"));
}
});
// 循环所有文件
for (File file : dirfiles) {
// 如果是目录 则继续扫描
if (file.isDirectory()) {
findAndAddClassesInPackageByFile(packageName + "."
+ file.getName(), file.getAbsolutePath(), recursive,
classes);
} else {
// 如果是java类文件 去掉后面的.class 只留下类名
String className = file.getName().substring(0,
file.getName().length() - 6);
try {
// 添加到集合中去
//classes.add(Class.forName(packageName + '.' + className));
//经过回复同学的提醒,这里用forName有一些不好,会触发static方法,没有使用classLoader的load干净
classes.add(Thread.currentThread().getContextClassLoader().loadClass(packageName + '.' + className));
} catch (ClassNotFoundException e) {
// log.error("添加用户自定义视图类错误 找不到此类的.class文件");
e.printStackTrace();
}
}
}
}

}

主要就是 getClasses 方法,传入一个包名,用于扫描该包及其子包下所有的类,并将其 Class 对象放入一个 Set 中返回。

7.3、扫描服务

由于扫描服务这一步是一个比较公共的方法,无论是 Socket 还是 Netty 的服务端都需要这个方法,于是我对项目做了一点重构,使用了一个抽象类 AbstractRpcServer 实现了 RpcServer 接口,而 NettyServer 和 SocketServer 继承自 AbstractRpcServer,将 scanServices 方法放在抽象类中,而 start 方法则由具体实现类来实现。

scanServices 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void scanServices() {
String mainClassName = ReflectUtil.getStackTrace();
Class<?> startClass;
try {
startClass = Class.forName(mainClassName);
if(!startClass.isAnnotationPresent(ServiceScan.class)) {
logger.error("启动类缺少 @ServiceScan 注解");
throw new RpcException(RpcError.SERVICE_SCAN_PACKAGE_NOT_FOUND);
}
} catch (ClassNotFoundException e) {
logger.error("出现未知错误");
throw new RpcException(RpcError.UNKNOWN_ERROR);
}
String basePackage = startClass.getAnnotation(ServiceScan.class).value();
if("".equals(basePackage)) {
basePackage = mainClassName.substring(0, mainClassName.lastIndexOf("."));
}
Set<Class<?>> classSet = ReflectUtil.getClasses(basePackage);
for(Class<?> clazz : classSet) {
if(clazz.isAnnotationPresent(Service.class)) {
String serviceName = clazz.getAnnotation(Service.class).name();
Object obj;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
logger.error("创建 " + clazz + " 时有错误发生");
continue;
}
if("".equals(serviceName)) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> oneInterface: interfaces){
publishService(obj, oneInterface.getCanonicalName());
}
} else {
publishService(obj, serviceName);
}
}
}
}

我们首先需要获得要扫描的包的范围,就需要获取到 ServiceScan 注解的值,而我们前面说过,这个注解是加在启动类上的,那么,我们怎么知道启动类是哪一个呢?答案是通过调用栈。方法的调用和返回是通过方法调用栈来实现的,当调用一个方法时,该方法入栈,该方法返回时,该方法出站,控制回到栈顶的方法。那么,main 方法一定位于调用栈的最底端,在 ReflectUtils 中,我写了一个 getStackTrace 方法(名字起得不好),用于获取 main 所在的类。通过 Class 对象的 isAnnotationPresent 方法来判断该类是否有 ServiceScan 注解。如果有,通过startClass.getAnnotation(ServiceScan.class).value(); 获取注解的值。

当获得扫描的范围后,就可以通过ReflectUtil.getClasses(basePackage) 获取到所有的 Class 了,逐个判断是否有 Service 注解,如果有的话,通过反射创建该对象,并且调用 publishService 注册即可。

7.4、开启自动注册并测试

以 NettyServer 为例,在 NettyServer 的构造方法最后,调用 scanServices 方法,即可自动注册所有服务:

1
2
3
4
5
6
7
8
public NettyServer(String host, int port, Integer serializer) {
this.host = host;
this.port = port;
serviceRegistry = new NacosServiceRegistry();
serviceProvider = new ServiceProviderImpl();
this.serializer = CommonSerializer.getByCode(serializer);
scanServices();
}

不要忘了在 HelloServiceImpl 类上加上 @service 注解:

1
2
3
4
5
6
7
@Service
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "Hello, " + name;
}
}

并且在服务器启动类上加上注解:

1
2
3
4
5
6
7
@ServiceScan
public class NettyTestServer {
public static void main(String[] args) {
NettyServer server = new NettyServer("127.0.0.1", 9999, CommonSerializer.PROTOBUF_SERIALIZER);
server.start();
}
}

直接使用启动类所在的包作为扫描根包。

启动类变得无比简洁!启动后应该能看到和之前相同的结果。



本站总访问量