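Flink-01: Core API and Type System (flink-core-api)
1. Module Overview
1.1 Module Responsibilities
The core API module is Flink's foundation layer. It defines the core interfaces and the type system shared by all higher-level APIs.
Core responsibilities:
- Define the user function interfaces (MapFunction, FilterFunction, and others)
- Provide the type information and serialization framework
- Define the state API interfaces
- Provide the time and Watermark abstractions
- Define the connector interfaces (Source, Sink)
1.2 Module Architecture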
flowchart TB
subgraph "Function Interfaces"
Function[Function base interface]
MapFunction[MapFunction]
FilterFunction[FilterFunction]
FlatMapFunction[FlatMapFunction]
ReduceFunction[ReduceFunction]
AggregateFunction[AggregateFunction]
ProcessFunction[ProcessFunction]
end
subgraph "Type System"
TypeInfo[TypeInformation]
BasicType[BasicTypeInfo]
TupleType[TupleTypeInfo]
PojoType[PojoTypeInfo]
CollectionType[CollectionTypeInfo]
TypeSerializer[TypeSerializer]
end
subgraph "State API"
State[State interface]
ValueState[ValueState]
ListState[ListState]
MapState[MapState]
ReducingState[ReducingState]
AggregatingState[AggregatingState]
end
subgraph "Time & Watermark"
Time[Time]
Watermark[Watermark]
WatermarkStrategy[WatermarkStrategy]
TimestampAssigner[TimestampAssigner]
end
subgraph "Connector API"
Source[Source]
Sink[Sink]
SourceSplit[SourceSplit]
SplitEnumerator[SplitEnumerator]
SplitReader[SplitReader]
end
Function --> MapFunction
Function --> FilterFunction
Function --> FlatMapFunction
Function --> ReduceFunction
TypeInfo --> BasicType
TypeInfo --> TupleType
TypeInfo --> PojoType
TypeInfo --> TypeSerializer
State --> ValueState
State --> ListState
State --> MapState
WatermarkStrategy --> TimestampAssigner
WatermarkStrategy --> Watermark
Source --> SourceSplit
Source --> SplitEnumerator
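2. Core Components in Detail
2.1 Function - User Function Interfaces
2.1.1 The Base Function Interface
// Base interface for all user-defined functions
public interface Function extends java.io.Serializable {
    // Marker interface with no methods; sub-interfaces with a single abstract method can be written as lambdas
}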
2.1.2 MapFunction - Map Function
@FunctionalInterface
public interface MapFunction<T, O> extends Function {
    /**
     * Transforms one input element into exactly one output element.
     * @param value the input element
     * @return the output element
     */
    O map(T value) throws Exception;
}
Usage example:
// Lambda form
DataStream<String> names = events.map(event -> event.getName());
// Anonymous class form
DataStream<Integer> lengths = names.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        return value.length();
    }
});
2.1.3 FilterFunction - Filter Function
@FunctionalInterface
public interface FilterFunction<T> extends Function {
    /**
     * The filter predicate.
     * @param value the input element
     * @return true to keep the element, false to filter it out
     */
    boolean filter(T value) throws Exception;
}
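2.1.4 FlatMapFunction - Flat Map Function
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function {
    /**
     * Transforms one input element into zero, one, or more output elements.
     * @param value the input element
     * @param out the collector that receives the output elements
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}
Usage example:
DataStream<String> words = sentences
    .flatMap((String sentence, Collector<String> out) -> {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    })
    // The lambda loses the Collector's type argument to erasure, so declare the output type explicitly
    .returns(Types.STRING);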
2.1.5 ReduceFunction - Reduce Function
@FunctionalInterface
public interface ReduceFunction<T> extends Function {
    /**
     * Combines two elements into one element of the same type.
     * @param value1 the first value
     * @param value2 the second value
     * @return the combined result
     */
    T reduce(T value1, T value2) throws Exception;
}
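The section gives no usage example for ReduceFunction, so here is a minimal plain-Java sketch of how a two-argument reduce is applied repeatedly to a sequence. `ReduceFunctionSketch` and `ReduceSketch` are hypothetical stand-ins written for this illustration (the checked exception is dropped for brevity); they are not Flink classes and do not reflect how the runtime schedules the calls.

```java
import java.util.List;

// Local stand-in that mirrors the shape of ReduceFunction shown above (not the Flink interface).
interface ReduceFunctionSketch<T> {
    T reduce(T value1, T value2);
}

final class ReduceSketch {
    // Folds the values left to right: the running result is combined with each new element.
    // Returns null for an empty input.
    static <T> T rollingReduce(List<T> values, ReduceFunctionSketch<T> fn) {
        T current = null;
        for (T value : values) {
            current = (current == null) ? value : fn.reduce(current, value);
        }
        return current;
    }

    public static void main(String[] args) {
        Integer max = rollingReduce(List.of(3, 9, 4), (a, b) -> Math.max(a, b));
        System.out.println(max); // prints 9
    }
}
```

Because input and output share one type, a reduce suits running maxima, sums, and similar combinations; when the result type differs from the input type, AggregateFunction is the better fit.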
2.1.6 AggregateFunction - Aggregate Function
public interface AggregateFunction<IN, ACC, OUT> extends Function {
    /** Creates the initial accumulator */
    ACC createAccumulator();
    /** Adds an input element to the accumulator and returns the updated accumulator */
    ACC add(IN value, ACC accumulator);
    /** Extracts the final result from the accumulator */
    OUT getResult(ACC accumulator);
    /** Merges two accumulators */
    ACC merge(ACC a, ACC b);
}
Usage example:
public class AverageAggregate implements AggregateFunction<Event, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L); // (sum, count)
    }
    @Override
    public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {
        return Tuple2.of(
            accumulator.f0 + value.value,
            accumulator.f1 + 1L
        );
    }
    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }
    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}
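The contract behind merge is that combining partial accumulators must give the same result as accumulating everything in one pass. The sketch below checks this for the average example in plain Java, using a `long[]{sum, count}` array as a stand-in for `Tuple2<Long, Long>`; the class and method names are hypothetical and no Flink types are involved.

```java
import java.util.List;

final class AverageMergeSketch {
    // Accumulator layout: {sum, count}. A stand-in for Tuple2<Long, Long>.
    static long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    static long[] add(long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1L};
    }

    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    static double getResult(long[] acc) {
        return ((double) acc[0]) / acc[1];
    }

    // Accumulates all values into a single accumulator.
    static long[] accumulate(List<Long> values) {
        long[] acc = createAccumulator();
        for (long v : values) {
            acc = add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        double whole = getResult(accumulate(List.of(2L, 4L, 6L, 8L)));
        double merged = getResult(merge(accumulate(List.of(2L, 4L)), accumulate(List.of(6L, 8L))));
        System.out.println(whole + " " + merged); // prints 5.0 5.0
    }
}
```

Keeping sum and count in the accumulator, rather than a running average, is what makes the merge exact. Note that getResult on an empty accumulator divides zero by zero and yields NaN.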
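2.2 TypeInformation - Type System
2.2.1 Purpose
TypeInformation is the core of Flink's type system. It is used for:
- Type inference and validation
- Creating serializers
- Field access and projection
- Key extraction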
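2.2.2 Core Interface
public abstract class TypeInformation<T> implements Serializable {
    /**
     * Checks whether this type is a basic type.
     */
    public abstract boolean isBasicType();
    /**
     * Checks whether this type is a Tuple type.
     */
    public abstract boolean isTupleType();
    /**
     * Gets the arity of this type: the number of fields, not counting nested fields.
     */
    public abstract int getArity();
    /**
     * Gets the total number of logical fields, including the fields of nested composite types.
     */
    public abstract int getTotalFields();
    /**
     * Gets the Class of the type.
     */
    public abstract Class<T> getTypeClass();
    /**
     * Creates a serializer for this type.
     * (Newer Flink releases appear to favor an overload that takes a SerializerConfig; check the Javadoc of your version.)
     */
    public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
    /**
     * Checks whether the given object can be compared for equality with this type.
     */
    public abstract boolean canEqual(Object obj);
}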
2.2.3 Basic Types
// Basic types
public class BasicTypeInfo<T> extends TypeInformation<T> {
    public static final BasicTypeInfo<String> STRING_TYPE_INFO =
        new BasicTypeInfo<>(String.class, ...);
    public static final BasicTypeInfo<Boolean> BOOLEAN_TYPE_INFO =
        new BasicTypeInfo<>(Boolean.class, ...);
    public static final BasicTypeInfo<Byte> BYTE_TYPE_INFO =
        new BasicTypeInfo<>(Byte.class, ...);
    public static final BasicTypeInfo<Short> SHORT_TYPE_INFO =
        new BasicTypeInfo<>(Short.class, ...);
    public static final BasicTypeInfo<Integer> INT_TYPE_INFO =
        new BasicTypeInfo<>(Integer.class, ...);
    public static final BasicTypeInfo<Long> LONG_TYPE_INFO =
        new BasicTypeInfo<>(Long.class, ...);
    public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO =
        new BasicTypeInfo<>(Float.class, ...);
    public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO =
        new BasicTypeInfo<>(Double.class, ...);
}
2.2.4 Composite Types
Tuple types:
public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> {
    private final TypeInformation<?>[] types;
    public TypeInformation<?> getTypeAt(int pos) {
        return types[pos];
    }
    @Override
    public int getArity() {
        return types.length;
    }
}
POJO types:
public class PojoTypeInfo<T> extends TypeInformation<T> {
    private final PojoField[] fields;
    // Returns the index of the field with the given name, or -1 if there is no such field
    public int getFieldIndex(String fieldName) {
        for (int i = 0; i < fields.length; i++) {
            if (fields[i].getField().getName().equals(fieldName)) {
                return i;
            }
        }
        return -1;
    }
}
2.2.5 Type Hints
// Specify the type explicitly
DataStream<Tuple2<String, Integer>> result = stream
    .map(value -> Tuple2.of(value.name, value.count))
    .returns(Types.TUPLE(Types.STRING, Types.INT));
// Use a TypeHint
DataStream<MyGenericType<String>> genericResult = stream
    .map(...)
    .returns(new TypeHint<MyGenericType<String>>(){});
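The `new TypeHint<...>(){}` form creates an anonymous subclass, which has the shape of the general Java "super type token" idiom: a subclass records its generic superclass in class metadata, so the type argument survives erasure. The sketch below demonstrates that idiom in plain Java. `TypeTokenSketch` is a hypothetical class written for illustration; it is not Flink's TypeHint and makes no claim about how Flink implements it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for a type hint; not Flink's TypeHint class.
abstract class TypeTokenSketch<T> {
    // An anonymous subclass stores its generic superclass, including the type argument,
    // in class metadata, so the argument can be read back at runtime despite erasure.
    Type captured() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

final class TypeTokenDemo {
    static String describeListOfString() {
        // The trailing {} is essential: it creates the anonymous subclass.
        return new TypeTokenSketch<List<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(describeListOfString()); // prints java.util.List<java.lang.String>
    }
}
```

A lambda carries no such metadata for its generic return type, which is why the examples above call returns(...) after a lambda that produces a generic type.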
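2.3 TypeSerializer - Serializers
2.3.1 Purpose
A TypeSerializer is responsible for:
- Serializing objects to a byte stream
- Deserializing objects from a byte stream
- Copying objects
- Providing a snapshot of its configuration (used for state schema evolution)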
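2.3.2 Core Interface
public abstract class TypeSerializer<T> implements Serializable {
    /**
     * Creates a deep copy of the given element.
     */
    public abstract T copy(T from);
    /**
     * Creates a deep copy of the given element, reusing the target object where possible.
     */
    public abstract T copy(T from, T reuse);
    /**
     * Creates a new instance of the data type.
     */
    public abstract T createInstance();
    /**
     * Serializes the record to a DataOutputView.
     */
    public abstract void serialize(T record, DataOutputView target) throws IOException;
    /**
     * Deserializes a record from a DataInputView.
     */
    public abstract T deserialize(DataInputView source) throws IOException;
    /**
     * Deserializes a record from a DataInputView, reusing the given object where possible.
     */
    public abstract T deserialize(T reuse, DataInputView source) throws IOException;
    /**
     * Copies one serialized record from the source to the target without deserializing it into an object first.
     */
    public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
    /**
     * Gets the length of the serialized form (-1 means variable length).
     */
    public abstract int getLength();
    /**
     * Creates a snapshot of the serializer's configuration (used for state schema evolution).
     */
    public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
}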
2.4 State API - State Interfaces
2.4.1 The Base State Interface
public interface State {
    /** Clears the state for the current key */
    void clear();
}
2.4.2 ValueState
public interface ValueState<T> extends State {
    /** Gets the value for the current key */
    T value() throws IOException;
    /** Updates the value for the current key */
    void update(T value) throws IOException;
}
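The comments above all say "for the current key": a keyed state handle looks like a single value, but every read and write is scoped to the key of the element being processed. The plain-Java model below illustrates that scoping with a HashMap. `KeyedValueStateSketch` and its `setCurrentKey` method are hypothetical names for this illustration; Flink's state backends are not implemented this way.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of keyed value state; not how Flink's state backends work internally.
final class KeyedValueStateSketch<K, T> {
    private final Map<K, T> valuesByKey = new HashMap<>();
    private K currentKey;

    // In this model, the runtime sets the current key before each element is processed.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns null if nothing is stored for the current key.
    T value() {
        return valuesByKey.get(currentKey);
    }

    void update(T value) {
        valuesByKey.put(currentKey, value);
    }

    // Removes the entry of the current key only; other keys are untouched.
    void clear() {
        valuesByKey.remove(currentKey);
    }

    // Counts how often each key occurs, then returns the count stored for `probe`.
    static long countFor(String probe, String... keys) {
        KeyedValueStateSketch<String, Long> state = new KeyedValueStateSketch<>();
        for (String key : keys) {
            state.setCurrentKey(key);
            Long count = state.value();
            state.update(count == null ? 1L : count + 1L);
        }
        state.setCurrentKey(probe);
        Long result = state.value();
        return result == null ? 0L : result;
    }

    public static void main(String[] args) {
        System.out.println(countFor("a", "a", "b", "a")); // prints 2
    }
}
```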
2.4.3 AppendingState
public interface AppendingState<IN, OUT> extends State {
    /** Gets the current value for the current key */
    OUT get() throws Exception;
    /** Adds a value to the state */
    void add(IN value) throws Exception;
}
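2.4.4 ListState
// Simplified: in Flink, ListState extends MergingState, which in turn extends AppendingState
public interface ListState<T> extends AppendingState<T, Iterable<T>> {
    /** Replaces the state with the given list of values */
    void update(List<T> values) throws Exception;
    /** Appends multiple values */
    void addAll(List<T> values) throws Exception;
}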
2.4.5 MapState
public interface MapState<UK, UV> extends State {
    /** Gets the value for the given user key */
    UV get(UK key) throws Exception;
    /** Adds or updates a key-value pair */
    void put(UK key, UV value) throws Exception;
    /** Adds all entries of the given map */
    void putAll(Map<UK, UV> map) throws Exception;
    /** Removes the given user key */
    void remove(UK key) throws Exception;
    /** Checks whether the given user key is present */
    boolean contains(UK key) throws Exception;
    /** Gets all entries */
    Iterable<Map.Entry<UK, UV>> entries() throws Exception;
    /** Gets all keys */
    Iterable<UK> keys() throws Exception;
    /** Gets all values */
    Iterable<UV> values() throws Exception;
    /** Gets an iterator over all entries */
    Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
    /** Checks whether the map is empty */
    boolean isEmpty() throws Exception;
}
2.5 Watermark and Time
2.5.1 Watermark
public final class Watermark implements Serializable {
    private final long timestamp;
    /** Special watermark: no timestamp has been set yet */
    public static final Watermark UNINITIALIZED = new Watermark(Long.MIN_VALUE);
    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
    public long getTimestamp() {
        return timestamp;
    }
}
2.5.2 WatermarkStrategy
public interface WatermarkStrategy<T> extends
        TimestampAssignerSupplier<T>,
        WatermarkGeneratorSupplier<T> {
    /** Creates the timestamp assigner. It is a default method, so createWatermarkGenerator is the only abstract method and the lambdas below compile. */
    @Override
    default TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return new RecordTimestampAssigner<>();
    }
    /** Creates the watermark generator */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context);
    /** Bounded out-of-orderness strategy: watermarks trail the highest timestamp seen by a fixed delay */
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }
    /** Strategy for monotonically increasing timestamps */
    static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }
}
Usage example:
DataStream<Event> withWatermarks = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
    );
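To make the 5-second bound concrete, here is a plain-Java sketch of the arithmetic a bounded out-of-orderness generator performs: track the highest timestamp seen and report a watermark that trails it by the allowed delay. It follows my understanding of the approach and uses no Flink types; `BoundedOutOfOrdernessSketch` and its method names are hypothetical.

```java
import java.time.Duration;

// Simplified sketch of a bounded out-of-orderness watermark generator (no Flink types involved).
final class BoundedOutOfOrdernessSketch {
    private final long delayMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.delayMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark equals Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
    }

    // Called for every event: remember the highest timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called periodically: all events with a timestamp <= this value are treated as having arrived.
    long currentWatermark() {
        return maxTimestamp - delayMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(5));
        generator.onEvent(10_000L);
        generator.onEvent(7_000L); // out of order but within the bound; the watermark does not move back
        System.out.println(generator.currentWatermark()); // prints 4999
    }
}
```

A larger bound tolerates more disorder but delays every event-time result by the same amount, so the value is a latency versus completeness trade-off.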
2.6 Connector API - Connector Interfaces
2.6.1 Source Interface (v2)
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {
    /**
     * Gets the boundedness of this source (bounded or unbounded).
     */
    Boundedness getBoundedness();
    /**
     * Creates a new split enumerator.
     */
    SplitEnumerator<SplitT, EnumChkT> createEnumerator(
            SplitEnumeratorContext<SplitT> enumContext) throws Exception;
    /**
     * Restores a split enumerator from a checkpoint.
     */
    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext,
            EnumChkT checkpoint) throws Exception;
    /**
     * Creates a source reader.
     */
    SourceReader<T, SplitT> createReader(
            SourceReaderContext readerContext) throws Exception;
    /**
     * Gets the serializer for splits.
     */
    SimpleVersionedSerializer<SplitT> getSplitSerializer();
    /**
     * Gets the serializer for the enumerator checkpoint.
     */
    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}
Core components:
flowchart LR
Source[Source] --> Enumerator[SplitEnumerator]
Source --> Reader[SourceReader]
Enumerator --> Split[SourceSplit]
Reader --> Split
Reader --> Output[Data output]
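The diagram separates two roles: an enumerator that discovers splits and hands them out, and readers that turn their assigned splits into records. The plain-Java sketch below models only that division of labor. The round-robin policy, the string-valued splits, and every name in it are assumptions made for illustration; none of these are Flink classes, and real enumerators typically assign splits in response to reader requests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the enumerator/reader division of labor; none of these are Flink classes.
final class SplitAssignmentSketch {
    // Enumerator role: distribute the splits across the readers round-robin.
    static List<List<String>> assign(List<String> splits, int readerCount) {
        List<List<String>> perReader = new ArrayList<>();
        for (int i = 0; i < readerCount; i++) {
            perReader.add(new ArrayList<>());
        }
        for (int i = 0; i < splits.size(); i++) {
            perReader.get(i % readerCount).add(splits.get(i));
        }
        return perReader;
    }

    // Reader role: turn each assigned split into records (here, one record per split).
    static List<String> read(List<String> assignedSplits) {
        List<String> records = new ArrayList<>();
        for (String split : assignedSplits) {
            records.add("record-from-" + split);
        }
        return records;
    }

    public static void main(String[] args) {
        List<List<String>> assignment = assign(List.of("split-0", "split-1", "split-2"), 2);
        System.out.println(assignment);              // prints [[split-0, split-2], [split-1]]
        System.out.println(read(assignment.get(1))); // prints [record-from-split-1]
    }
}
```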
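2.6.2 Sink Interface (v2)
// Simplified view. CommT and GlobalCommT are type parameters that this listing does not declare:
// the committer-related methods follow the shape of the older Sink interface, where they are
// declared on the interface itself. In Sink V2 the base interface declares only createWriter,
// and two-phase commit support comes from a separate interface.
public interface Sink<InputT> extends Serializable {
    /**
     * Creates the writer.
     */
    SinkWriter<InputT> createWriter(InitContext context) throws IOException;
    /**
     * Creates the committer (two-phase commit).
     */
    Optional<Committer<CommT>> createCommitter() throws IOException;
    /**
     * Creates the global committer (two-phase commit).
     */
    Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;
    /**
     * Gets the serializer for committables.
     */
    Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();
}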
3. Key Data Structures
3.1 Tuple Types
// Tuple base class
public abstract class Tuple implements Serializable {
    public abstract int getArity();
    public abstract <T> T getField(int pos);
    public abstract <T> void setField(T value, int pos);
}
// Tuple2 (constructors and the getArity/getField/setField implementations are omitted here)
public class Tuple2<T0, T1> extends Tuple {
    public T0 f0;
    public T1 f1;
    public static <T0, T1> Tuple2<T0, T1> of(T0 f0, T1 f1) {
        return new Tuple2<>(f0, f1);
    }
}
// Usage example
Tuple2<String, Integer> tuple = Tuple2.of("hello", 42);
String first = tuple.f0;
Integer second = tuple.f1;
3.2 Row Type
public final class Row implements Serializable {
    private final RowKind kind; // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
    private final Object[] fields;
    public Object getField(int pos) {
        return fields[pos];
    }
    public void setField(int pos, Object value) {
        fields[pos] = value;
    }
    public int getArity() {
        return fields.length;
    }
}
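4. Best Practices
4.1 Choosing the Right Function Interface
// Simple transformation: use MapFunction or a lambda
DataStream<Integer> lengths = names.map(String::length);
// Multiple outputs per input: use FlatMapFunction
DataStream<String> words = sentences
    .flatMap((String sentence, Collector<String> out) -> {
        Arrays.stream(sentence.split(" ")).forEach(out::collect);
    })
    // Needed with a lambda: the Collector's type argument is lost to erasure
    .returns(Types.STRING);
// Complex aggregation: use AggregateFunction, which is applied to a windowed stream
// (the 1-minute tumbling window is an illustrative choice, not part of the original example)
DataStream<Stats> stats = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new StatsAggregateFunction());
// Access to state and timers: use ProcessFunction
DataStream<Result> results = events
    .keyBy(Event::getKey)
    .process(new MyProcessFunction());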
4.2 Type Information Best Practices
// 1. Guard against type erasure
// Bad: generic type information is lost
DataStream<List<String>> bad = ...;
// Good: use a TypeHint to preserve the generic type information
DataStream<List<String>> good = ...
    .returns(new TypeHint<List<String>>(){});
// 2. Requirements for POJO types
public class MyPojo {
    // 1) The class must be public
    // 2) It must have a public no-argument constructor
    public MyPojo() {}
    // 3) Every field must be public or have a public getter and setter
    public String name;
    private int value;
    public int getValue() { return value; }
    public void setValue(int value) { this.value = value; }
}
// 3. For complex types that Flink cannot analyze, register a Kryo serializer
env.getConfig().registerTypeWithKryoSerializer(
    MyComplexType.class, MyKryoSerializer.class);
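The three POJO requirements listed above can be checked mechanically with reflection. The sketch below is a simplified check of exactly those three rules in plain Java. `PojoRuleSketch`, `looksLikePojo`, and the two sample classes are hypothetical names for this illustration; Flink's own type analysis covers more cases (nested types and generic fields, for example) and should be treated as the authority.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Simplified check of the three rules listed above; Flink's own analysis covers more cases.
final class PojoRuleSketch {
    public static class Good {
        public String name;
        private int value;
        public Good() {}
        public int getValue() { return value; }
        public void setValue(int value) { this.value = value; }
    }

    public static class MissingSetter {
        private int value;
        public MissingSetter() {}
        public int getValue() { return value; }
    }

    static boolean looksLikePojo(Class<?> type) {
        // Rule 1: the class must be public.
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        // Rule 2: a public no-argument constructor must exist.
        try {
            type.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every instance field is public or has a public getter and setter.
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || field.isSynthetic() || Modifier.isPublic(mods)) {
                continue; // static and compiler-generated fields are not instance data (assumption of this sketch)
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);
                type.getMethod("set" + suffix, field.getType());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(Good.class));          // prints true
        System.out.println(looksLikePojo(MissingSetter.class)); // prints false
    }
}
```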
4.3 State Usage Best Practices
public class StatefulFunction extends KeyedProcessFunction<String, Event, Result> {
    // 1. Mark the state handle transient: it is obtained in open() and must not be serialized with the function
    private transient ValueState<Long> countState;
    @Override
    public void open(Configuration parameters) {
        // 2. Initialize state in open(); the descriptor constructor that takes a default value is deprecated
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("count", Long.class);
        // 3. Configure TTL
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(24))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        descriptor.enableTimeToLive(ttlConfig);
        countState = getRuntimeContext().getState(descriptor);
    }
    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out)
            throws Exception {
        // 4. Access state; value() returns null before the first update for this key and after TTL expiry
        Long count = countState.value();
        long updated = (count == null) ? 1L : count + 1L;
        countState.update(updated);
        out.collect(new Result(ctx.getCurrentKey(), updated));
    }
}
4.4 Custom Serializers
public class MyTypeSerializer extends TypeSerializer<MyType> {
    @Override
    public void serialize(MyType record, DataOutputView target) throws IOException {
        // Write the fields
        target.writeUTF(record.name);
        target.writeInt(record.value);
    }
    @Override
    public MyType deserialize(DataInputView source) throws IOException {
        // Read the fields in the same order they were written
        String name = source.readUTF();
        int value = source.readInt();
        return new MyType(name, value);
    }
    @Override
    public MyType copy(MyType from) {
        // Deep copy
        return new MyType(from.name, from.value);
    }
    @Override
    public TypeSerializerSnapshot<MyType> snapshotConfiguration() {
        // Return the serializer configuration snapshot (used for state schema evolution)
        return new MyTypeSerializerSnapshot();
    }
    // ... implementations of the remaining methods
}
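The read and write order in serialize/deserialize must match exactly, which is easiest to verify with a round trip. The sketch below does that in plain Java, using `java.io.DataOutputStream` and `DataInputStream` as stand-ins for DataOutputView and DataInputView (which extend `java.io.DataOutput` and `java.io.DataInput`). The nested `MyType` here is a hypothetical two-field record mirroring the one assumed above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Round trip using java.io streams as stand-ins for DataOutputView / DataInputView.
final class SerializerRoundTripSketch {
    // Hypothetical record type with the same two fields as MyType above.
    static final class MyType {
        final String name;
        final int value;

        MyType(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    static byte[] serialize(MyType record) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(record.name); // the field order written here...
            out.writeInt(record.value);
        }
        return bytes.toByteArray();
    }

    static MyType deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String name = in.readUTF(); // ...must match the order read here
            int value = in.readInt();
            return new MyType(name, value);
        }
    }

    // Convenience wrapper that converts the checked IOException into an unchecked one.
    static MyType roundTrip(MyType record) {
        try {
            return deserialize(serialize(record));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MyType copy = roundTrip(new MyType("flink", 42));
        System.out.println(copy.name + " " + copy.value); // prints flink 42
    }
}
```

A round-trip test like this is a cheap guard when fields are added later: a mismatch between the two methods surfaces immediately instead of as corrupted state after a restore.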
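5. Summary
The core API module provides Flink's foundational abstractions:
Function interfaces:
- Simple transformations: MapFunction, FilterFunction
- Complex processing: ProcessFunction
- Aggregations: AggregateFunction, ReduceFunction
Type system:
- Automatic type inference
- Support for multiple kinds of types (basic types, Tuples, POJOs)
- An efficient serialization framework
State API:
- Multiple state types (ValueState, ListState, MapState)
- State TTL support
- A uniform state access interface
Connector framework:
- Unified Source/Sink interfaces
- Support for parallel reads and writes
- A flexible checkpointing mechanism
Understanding the core API is the foundation for mastering Flink.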