Flink-06-RPC通信框架(flink-rpc)
一、模块概览
1.1 模块职责
RPC模块是Flink分布式通信的基础,负责组件间的远程方法调用。
核心职责:
- 提供异步RPC框架
- 管理RPC端点生命周期
- 处理消息路由和调度
- 支持多种RPC实现(Akka、Netty)
1.2 RPC架构
flowchart TB
subgraph "RPC接口层"
RpcGateway[RpcGateway]
JobMasterGateway[JobMasterGateway]
TaskExecutorGateway[TaskExecutorGateway]
ResourceManagerGateway[ResourceManagerGateway]
end
subgraph "RPC端点层"
RpcEndpoint[RpcEndpoint]
JobMaster[JobMaster]
TaskExecutor[TaskExecutor]
ResourceManager[ResourceManager]
end
subgraph "RPC服务层"
RpcService[RpcService]
AkkaRpcService[AkkaRpcService]
ActorSystem[Akka ActorSystem]
end
subgraph "消息处理层"
RpcServer[RpcServer]
AkkaInvocationHandler[AkkaInvocationHandler]
MessageDispatcher[消息分发]
end
RpcGateway --> JobMasterGateway
RpcGateway --> TaskExecutorGateway
RpcGateway --> ResourceManagerGateway
RpcEndpoint --> JobMaster
RpcEndpoint --> TaskExecutor
RpcEndpoint --> ResourceManager
JobMaster -->|实现| JobMasterGateway
TaskExecutor -->|实现| TaskExecutorGateway
ResourceManager -->|实现| ResourceManagerGateway
RpcService --> AkkaRpcService
AkkaRpcService --> ActorSystem
RpcEndpoint -->|使用| RpcService
RpcService --> RpcServer
RpcServer --> AkkaInvocationHandler
AkkaInvocationHandler --> MessageDispatcher
二、核心组件
2.1 RpcGateway - RPC接口
/**
* RPC网关基接口,所有RPC接口必须继承此接口
*/
public interface RpcGateway {
/**
* 获取网关地址
*/
String getAddress();
/**
* 获取网关主机名
*/
String getHostname();
}
实现示例:
public interface JobMasterGateway extends RpcGateway {
/**
* 提交任务到JobMaster
*/
CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,
Time timeout);
/**
* 更新任务执行状态
*/
CompletableFuture<Acknowledge> updateTaskExecutionState(
TaskExecutionState taskExecutionState);
/**
* 请求下一个输入分片
*/
CompletableFuture<SerializedInputSplit> requestNextInputSplit(
JobVertexID vertexID,
ExecutionAttemptID executionAttempt);
}
2.2 RpcEndpoint - RPC端点
/**
* RPC端点基类,所有分布式组件的基类
*/
public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
// RPC服务
private final RpcService rpcService;
// 端点ID
private final String endpointId;
// 主线程执行器
private final RpcServer rpcServer;
protected RpcEndpoint(RpcService rpcService, String endpointId) {
this.rpcService = rpcService;
this.endpointId = endpointId;
this.rpcServer = rpcService.startServer(this);
}
/**
* 启动端点
*/
public final void start() {
rpcServer.start();
}
/**
* 在主线程中执行Runnable
*/
protected void runAsync(Runnable runnable) {
rpcServer.execute(runnable);
}
/**
* 在主线程中执行Callable
*/
protected <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout) {
return rpcServer.callAsync(callable, timeout);
}
/**
* 创建到其他RpcGateway的连接
*/
protected <C extends RpcGateway> CompletableFuture<C> connect(
String address,
Class<C> clazz) {
return rpcService.connect(address, clazz);
}
/**
* 关闭端点
*/
@Override
public CompletableFuture<Void> closeAsync() {
return rpcServer.terminateAsync();
}
}
使用示例:
public class JobMaster extends RpcEndpoint implements JobMasterGateway {
public JobMaster(
RpcService rpcService,
JobMasterId jobMasterId,
Configuration configuration) throws Exception {
super(rpcService, "JobMaster_" + jobMasterId);
}
@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,
Time timeout) {
// 在主线程中执行
return callAsync(
() -> {
// 执行提交逻辑
deployTask(tdd);
return Acknowledge.get();
},
timeout
);
}
@Override
protected void onStart() throws Exception {
// 端点启动时的初始化逻辑
connectToResourceManager();
}
private void connectToResourceManager() {
// 连接到ResourceManager
CompletableFuture<ResourceManagerGateway> rmGatewayFuture =
connect(resourceManagerAddress, ResourceManagerGateway.class);
rmGatewayFuture.thenAccept(rmGateway -> {
// 使用ResourceManager网关
runAsync(() -> {
resourceManagerGateway = rmGateway;
registerAtResourceManager();
});
});
}
}
2.3 RpcService - RPC服务
public interface RpcService {
/**
* 获取RPC服务地址
*/
String getAddress();
/**
* 启动RPC Server
*/
<C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);
/**
* 连接到远程RpcGateway
*/
<C extends RpcGateway> CompletableFuture<C> connect(
String address,
Class<C> clazz);
/**
* 连接到远程RpcGateway(带超时)
*/
<C extends RpcGateway> CompletableFuture<C> connect(
String address,
Class<C> clazz,
Time timeout);
/**
* 创建固定延迟调度
*/
ScheduledFuture<?> scheduleRunnable(
Runnable runnable,
long initialDelay,
long period,
TimeUnit unit);
/**
* 停止RPC服务
*/
CompletableFuture<Void> stopService();
}
2.4 AkkaRpcService - Akka实现
public class AkkaRpcService implements RpcService {
private final ActorSystem actorSystem;
private final AkkaRpcServiceConfiguration configuration;
private final Executor executor;
@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
// 1. 创建Actor Props
Props akkaRpcActorProps = Props.create(
AkkaRpcActor.class,
rpcEndpoint,
configuration.getMaximumFramesize());
// 2. 创建Actor
ActorRef actorRef = actorSystem.actorOf(
akkaRpcActorProps,
rpcEndpoint.getEndpointId());
// 3. 创建RpcServer(InvocationHandler)
return new AkkaInvocationHandler(
actorRef,
rpcEndpoint.getClass(),
configuration.getTimeout(),
configuration.getMaximumFramesize(),
actorSystem.scheduler());
}
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(
String address,
Class<C> clazz) {
return callAsync(
() -> {
// 1. 解析Actor路径
ActorSelection actorSel = actorSystem.actorSelection(address);
// 2. 发送Identify消息
CompletableFuture<ActorIdentity> identifyFuture =
Patterns.ask(actorSel, new Identify(42), timeout)
.toCompletableFuture()
.thenApply(response -> (ActorIdentity) response);
// 3. 创建代理
return identifyFuture.thenApply(identity -> {
ActorRef actorRef = identity.getActorRef().get();
return (C) Proxy.newProxyInstance(
clazz.getClassLoader(),
new Class[]{clazz},
new AkkaInvocationHandler(actorRef, ...));
});
},
timeout
).thenCompose(Function.identity());
}
}
三、消息处理机制
3.1 消息类型
// RPC调用消息
public class RpcInvocation implements Serializable {
private final String methodName;
private final Class<?>[] parameterTypes;
private final Object[] args;
}
// 本地消息(直接在主线程执行)
public interface LocalRpcInvocation {
void invoke();
}
// 运行消息
public class RunAsync implements LocalRpcInvocation {
private final Runnable runnable;
@Override
public void invoke() {
runnable.run();
}
}
// 调用消息
public class CallAsync implements LocalRpcInvocation {
private final Callable<?> callable;
private final CompletableFuture<?> resultFuture;
@Override
public void invoke() {
try {
Object result = callable.call();
resultFuture.complete(result);
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
}
3.2 消息分发
sequenceDiagram
participant Client as 客户端
participant Proxy as RPC代理
participant Actor as Akka Actor
participant Endpoint as RpcEndpoint
participant MainThread as 主线程
Client->>Proxy: 调用方法
Proxy->>Proxy: 创建RpcInvocation
Proxy->>Actor: 发送消息
activate Actor
Actor->>Actor: 接收消息
Actor->>MainThread: 投递到主线程
deactivate Actor
activate MainThread
MainThread->>Endpoint: 执行方法
Endpoint-->>MainThread: 返回结果
MainThread-->>Actor: 完成Future
deactivate MainThread
Actor-->>Proxy: 返回CompletableFuture
Proxy-->>Client: 返回结果
3.3 AkkaRpcActor
public class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
private final T rpcEndpoint;
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RpcInvocation.class, this::handleRpcInvocation)
.match(LocalRpcInvocation.class, this::handleLocalRpcInvocation)
.match(ControlMessages.class, this::handleControlMessage)
.matchAny(this::handleUnknownMessage)
.build();
}
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
try {
// 1. 获取方法
Method method = lookupMethod(
rpcInvocation.getMethodName(),
rpcInvocation.getParameterTypes());
// 2. 执行方法
Object result = method.invoke(rpcEndpoint, rpcInvocation.getArgs());
// 3. 返回结果
if (result instanceof CompletableFuture) {
CompletableFuture<?> future = (CompletableFuture<?>) result;
Patterns.pipe(future, getContext().dispatcher())
.to(getSender());
} else {
getSender().tell(result, getSelf());
}
} catch (Exception e) {
getSender().tell(new Status.Failure(e), getSelf());
}
}
private void handleLocalRpcInvocation(LocalRpcInvocation invocation) {
// 在Actor线程中直接执行
invocation.invoke();
}
}
四、使用示例
4.1 定义RPC接口
/**
* 自定义RPC Gateway
*/
public interface MyServiceGateway extends RpcGateway {
/**
* 同步方法(返回CompletableFuture)
*/
CompletableFuture<String> getData(String key);
/**
* 异步方法
*/
CompletableFuture<Void> putData(String key, String value);
/**
* Fire-and-forget方法(@RpcMethod注解)
*/
void notifyEvent(Event event);
}
4.2 实现RPC端点
public class MyService extends RpcEndpoint implements MyServiceGateway {
private final Map<String, String> dataStore = new ConcurrentHashMap<>();
public MyService(RpcService rpcService) {
super(rpcService, "MyService");
}
@Override
public CompletableFuture<String> getData(String key) {
// 方法自动在主线程执行
return CompletableFuture.completedFuture(dataStore.get(key));
}
@Override
public CompletableFuture<Void> putData(String key, String value) {
// 异步操作
return CompletableFuture.runAsync(
() -> {
dataStore.put(key, value);
// 持久化到外部存储
persistToStorage(key, value);
},
getMainThreadExecutor()
);
}
@Override
public void notifyEvent(Event event) {
// 在主线程异步执行
runAsync(() -> {
handleEvent(event);
});
}
@Override
protected void onStart() throws Exception {
log.info("MyService started at {}", getAddress());
}
@Override
protected CompletableFuture<Void> onStop() {
log.info("MyService stopping");
return CompletableFuture.completedFuture(null);
}
}
4.3 使用RPC服务
public class RpcExample {
public static void main(String[] args) throws Exception {
// 1. 创建RpcService
AkkaRpcService rpcService = AkkaRpcServiceUtils.createRemoteRpcService(
configuration,
"localhost",
"0", // 自动分配端口
null,
null);
try {
// 2. 启动服务端
MyService myService = new MyService(rpcService);
myService.start();
String serviceAddress = myService.getAddress();
log.info("Service started at: {}", serviceAddress);
// 3. 客户端连接
CompletableFuture<MyServiceGateway> gatewayFuture =
rpcService.connect(serviceAddress, MyServiceGateway.class);
MyServiceGateway gateway = gatewayFuture.get();
// 4. 调用RPC方法
CompletableFuture<Void> putFuture = gateway.putData("key1", "value1");
putFuture.get();
CompletableFuture<String> getFuture = gateway.getData("key1");
String value = getFuture.get();
log.info("Retrieved value: {}", value);
// 5. Fire-and-forget调用
gateway.notifyEvent(new Event("test"));
// 6. 关闭
myService.closeAsync().get();
} finally {
rpcService.stopService().get();
}
}
}
五、高级特性
5.1 Fencing Token
/**
* 带Fencing的RPC Gateway(防止脑裂)
*/
public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {
F getFencingToken();
}
/**
* 使用Fencing Token
*/
public interface JobMasterGateway extends FencedRpcGateway<JobMasterId> {
CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId, // Fencing Token
Time timeout);
}
// 实现
public class JobMaster extends FencedRpcEndpoint<JobMasterId>
implements JobMasterGateway {
@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,
Time timeout) {
// 验证Fencing Token
validateRunsInMainThread();
return callAsync(() -> {
// 验证JobMasterId匹配
if (!getFencingToken().equals(jobMasterId)) {
throw new FlinkException("Invalid JobMasterId");
}
// 执行任务提交
return internalSubmitTask(tdd);
}, timeout);
}
}
5.2 消息批处理
/**
* 批量RPC调用
*/
public class BatchRpcCaller {
private final List<CompletableFuture<Void>> pendingCalls = new ArrayList<>();
private final MyServiceGateway gateway;
public void addCall(String key, String value) {
CompletableFuture<Void> future = gateway.putData(key, value);
pendingCalls.add(future);
}
public CompletableFuture<Void> flushAll() {
return CompletableFuture.allOf(
pendingCalls.toArray(new CompletableFuture[0])
);
}
}
5.3 超时与重试
/**
* 带超时和重试的RPC调用
*/
public class RpcWithRetry {
public static <T> CompletableFuture<T> callWithRetry(
Supplier<CompletableFuture<T>> rpcCall,
int maxRetries,
Duration timeout) {
return callWithRetryInternal(rpcCall, maxRetries, timeout, 0);
}
private static <T> CompletableFuture<T> callWithRetryInternal(
Supplier<CompletableFuture<T>> rpcCall,
int maxRetries,
Duration timeout,
int currentAttempt) {
CompletableFuture<T> resultFuture = rpcCall.get();
return resultFuture
.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.exceptionally(throwable -> {
if (currentAttempt < maxRetries) {
// 重试
log.warn("RPC call failed, retrying (attempt {}/{})",
currentAttempt + 1, maxRetries, throwable);
return callWithRetryInternal(
rpcCall, maxRetries, timeout, currentAttempt + 1);
} else {
// 失败
throw new CompletionException(throwable);
}
})
.thenCompose(Function.identity());
}
}
六、性能优化
6.1 Akka配置调优
akka {
actor {
default-dispatcher {
# 线程池类型
type = "Dispatcher"
executor = "fork-join-executor"
fork-join-executor {
# 并行度(通常为CPU核心数)
parallelism-min = 8
parallelism-factor = 3.0
parallelism-max = 64
}
# 吞吐量(每次处理消息数)
throughput = 5
}
}
remote {
# 最大帧大小
maximum-frame-size = 10485760b # 10MB
# 传输配置
netty.tcp {
# 发送缓冲区
send-buffer-size = 10485760b
# 接收缓冲区
receive-buffer-size = 10485760b
# 最大帧大小
maximum-frame-size = 10485760b
}
}
}
6.2 批量消息处理
/**
* 使用批量消息减少RPC调用次数
*/
public interface BatchGateway extends RpcGateway {
CompletableFuture<List<Result>> batchProcess(List<Request> requests);
}
public class BatchProcessor extends RpcEndpoint implements BatchGateway {
@Override
public CompletableFuture<List<Result>> batchProcess(List<Request> requests) {
return callAsync(() -> {
List<Result> results = new ArrayList<>(requests.size());
for (Request request : requests) {
results.add(processRequest(request));
}
return results;
}, timeout);
}
}
七、故障排查
7.1 常见问题
问题1:RPC超时
// 增加超时时间
AkkaRpcServiceConfiguration configuration =
AkkaRpcServiceConfiguration.fromConfiguration(config)
.withTimeout(Time.minutes(5)); // 增加到5分钟
问题2:消息过大
// 增加最大帧大小
config.setString("akka.remote.netty.tcp.maximum-frame-size", "20971520b"); // 20MB
问题3:Actor死锁
// 避免在主线程中阻塞等待
// Bad:
Object result = someRpcCall().get(); // 阻塞主线程
// Good:
someRpcCall().thenAccept(result -> {
runAsync(() -> {
// 在主线程中处理结果
handleResult(result);
});
});
八、总结
Flink RPC框架提供了:
核心特性:
- 基于Akka的异步RPC
- 主线程执行模型(保证线程安全)
- Fencing机制(防止脑裂)
- 灵活的超时和重试
最佳实践:
- 使用CompletableFuture进行异步编程
- 避免在主线程中阻塞等待
- 合理配置超时和帧大小
- 使用批量消息减少RPC次数
性能优化:
- 调优Akka线程池
- 增大缓冲区大小
- 批量处理消息
- 合理设置超时时间
理解RPC框架对于理解Flink分布式协调机制至关重要。