Flink-04: DataStream API Programming Guide (flink-streaming-java)
1. Module Overview
1.1 Module Responsibilities
The DataStream API is the core user-facing API for Flink stream processing. It provides high-level abstractions for building streaming applications.
Core responsibilities:
- Provide streaming abstractions (DataStream, KeyedStream)
- Support a rich set of transformations (map, filter, window, etc.)
- Manage the execution environment and job configuration
- Generate the StreamGraph (the job topology)
- Support event-time and processing-time semantics
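To make these responsibilities concrete, here is a minimal, self-contained word-count sketch (the class name and input data are illustrative):
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountSketch {
    public static void main(String[] args) throws Exception {
        // Create the environment (responsibility: environment and config management)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello world", "hello flink")      // source
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String w : line.split(" ")) {
                    out.collect(Tuple2.of(w, 1));           // transformation
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))  // type hint for the lambda
            .keyBy(t -> t.f0)                               // keyed partitioning
            .sum(1)                                         // running count per word
            .print();                                       // sink
        // execute() builds the StreamGraph and submits the job
        env.execute("WordCount Sketch");
    }
}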
1.2 DataStream API Architecture
flowchart TB
subgraph "执行环境层"
Env[StreamExecutionEnvironment]
LocalEnv[LocalStreamEnvironment]
RemoteEnv[RemoteStreamEnvironment]
end
subgraph "数据流抽象层"
DataStream[DataStream]
SingleOutput[SingleOutputStreamOperator]
KeyedStream[KeyedStream]
WindowedStream[WindowedStream]
ConnectedStream[ConnectedStreams]
BroadcastStream[BroadcastStream]
end
subgraph "转换操作层"
Transformation[Transformation]
SourceTransformation[SourceTransformation]
OneInputTransformation[OneInputTransformation]
TwoInputTransformation[TwoInputTransformation]
PartitionTransformation[PartitionTransformation]
end
subgraph "图生成层"
StreamGraphGenerator[StreamGraphGenerator]
StreamGraph[StreamGraph]
StreamNode[StreamNode]
StreamEdge[StreamEdge]
end
Env --> LocalEnv
Env --> RemoteEnv
Env --> DataStream
DataStream --> SingleOutput
DataStream --> KeyedStream
KeyedStream --> WindowedStream
DataStream --> ConnectedStream
DataStream --> BroadcastStream
DataStream -->|holds internally| Transformation
Transformation --> SourceTransformation
Transformation --> OneInputTransformation
Transformation --> TwoInputTransformation
Transformation --> PartitionTransformation
Env -->|execute()| StreamGraphGenerator
StreamGraphGenerator --> StreamGraph
StreamGraph --> StreamNode
StreamGraph --> StreamEdge
2. StreamExecutionEnvironment - Execution Environment
2.1 Overview
StreamExecutionEnvironment is the entry point of every streaming program. It is responsible for:
- Creating data sources
- Configuring execution parameters (parallelism, checkpointing, etc.)
- Tracking the list of Transformations
- Triggering job execution
2.2 Core APIs
2.2.1 Creating an Execution Environment
public class StreamExecutionEnvironment {
    /**
     * Creates an execution environment (auto-detects the runtime context).
     */
    public static StreamExecutionEnvironment getExecutionEnvironment() {
        return getExecutionEnvironment(new Configuration());
    }
    /**
     * Creates a local execution environment.
     * @param parallelism the parallelism
     */
    public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
        Configuration configuration = new Configuration();
        return createLocalEnvironment(parallelism, configuration);
    }
    /**
     * Creates a remote execution environment.
     * @param host JobManager host
     * @param port JobManager port
     * @param jarFiles user JAR files
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(
            String host, int port, String... jarFiles) {
        return new RemoteStreamEnvironment(host, port, null, jarFiles);
    }
}
Usage examples:
// 1. Auto-detect the environment (recommended)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Local environment (development and testing)
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(4);
// 3. Remote environment (explicitly targeting a cluster)
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createRemoteEnvironment(
        "localhost", 8081, "/path/to/my.jar");
2.2.2 Configuring Execution Parameters
public class StreamExecutionEnvironment {
    // Set the global parallelism
    public StreamExecutionEnvironment setParallelism(int parallelism);
    // Set the maximum parallelism (number of key groups for keyed state)
    public StreamExecutionEnvironment setMaxParallelism(int maxParallelism);
    // Set the network buffer timeout (milliseconds)
    public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis);
    // Enable checkpointing
    public StreamExecutionEnvironment enableCheckpointing(long interval);
    // Set the restart strategy
    public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration strategy);
    // Set the state backend
    public StreamExecutionEnvironment setStateBackend(StateBackend backend);
}
Configuration example:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parallelism
env.setParallelism(4);
env.setMaxParallelism(128);
// Checkpointing
env.enableCheckpointing(60000); // every 60 seconds
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(30000);
config.setCheckpointTimeout(600000);
// Restart strategy
env.setRestartStrategy(
    RestartStrategies.fixedDelayRestart(
        3,                // number of restart attempts
        Time.seconds(10)  // delay between attempts
    )
);
// State backend
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
2.2.3 Creating Data Sources
public class StreamExecutionEnvironment {
    /**
     * Creates a stream from a collection.
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data);
    /**
     * Creates a stream from the given elements.
     */
    public <OUT> DataStreamSource<OUT> fromElements(OUT... data);
    /**
     * Creates a stream from a Source (new unified API).
     */
    public <OUT> DataStreamSource<OUT> fromSource(
        Source<OUT, ?, ?> source,
        WatermarkStrategy<OUT> timestampsAndWatermarks,
        String sourceName);
    /**
     * Creates a stream from a SourceFunction (legacy API).
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function);
    /**
     * Creates a stream from a text file.
     */
    public DataStreamSource<String> readTextFile(String filePath);
    /**
     * Creates a stream from a socket.
     */
    public DataStreamSource<String> socketTextStream(String hostname, int port);
}
Source examples:
// 1. From a collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStream<Integer> stream = env.fromCollection(numbers);
// 2. From elements
DataStream<String> stream = env.fromElements("hello", "world");
// 3. From a Source (recommended)
DataStream<String> kafkaStream = env.fromSource(
    KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("my-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build(),
    WatermarkStrategy.noWatermarks(),
    "Kafka Source"
);
// 4. From a file
DataStream<String> fileStream = env.readTextFile("input.txt");
// 5. From a socket (for testing)
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
// 6. A custom SourceFunction (legacy API)
DataStream<Long> customStream = env.addSource(new SourceFunction<Long>() {
    private volatile boolean isRunning = true;
    private long count = 0;
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count++);
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        isRunning = false;
    }
});
2.2.4 Executing the Job
public class StreamExecutionEnvironment {
    /**
     * Synchronous execution (blocks until the job finishes).
     */
    public JobExecutionResult execute(String jobName) throws Exception;
    /**
     * Asynchronous execution (returns a JobClient immediately).
     */
    public JobClient executeAsync(String jobName) throws Exception;
    /**
     * Executes a given StreamGraph.
     */
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception;
}
Execution examples:
// 1. Synchronous execution (suited to bounded/batch jobs)
JobExecutionResult result = env.execute("My Job");
System.out.println("Job took " + result.getNetRuntime() + " ms");
// 2. Asynchronous execution (suited to long-running streaming jobs)
JobClient jobClient = env.executeAsync("My Streaming Job");
System.out.println("Job submitted with ID: " + jobClient.getJobID());
// Monitor the job status
jobClient.getJobStatus().thenAccept(status -> {
    System.out.println("Job status: " + status);
});
3. DataStream - The Data Stream Abstraction
3.1 The DataStream Base Class
public class DataStream<T> {
    protected final StreamExecutionEnvironment environment;
    protected final Transformation<T> transformation;
    /**
     * Returns the type information of this stream.
     */
    public TypeInformation<T> getType();
    /**
     * Returns the execution environment.
     */
    public StreamExecutionEnvironment getExecutionEnvironment();
}
// Note: setParallelism/setMaxParallelism are not defined on DataStream itself;
// they live on SingleOutputStreamOperator, the subclass returned by transformations:
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    /**
     * Sets the parallelism of this operator.
     */
    public SingleOutputStreamOperator<T> setParallelism(int parallelism);
    /**
     * Sets the maximum parallelism (number of key groups).
     */
    public SingleOutputStreamOperator<T> setMaxParallelism(int maxParallelism);
}
3.2 Basic Transformations
3.2.1 map - One-to-One Transformation
public class DataStream<T> {
    /**
     * Map transformation.
     */
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper);
}
Usage examples:
// Lambda style
DataStream<String> input = env.fromElements("1", "2", "3");
DataStream<Integer> output = input.map(Integer::parseInt);
// Anonymous class style
DataStream<User> users = events.map(new MapFunction<Event, User>() {
    @Override
    public User map(Event event) {
        return new User(event.getUserId(), event.getUserName());
    }
});
// RichMapFunction (for access to the runtime context; note that the keyed
// state used here additionally requires the stream to be keyed first)
DataStream<String> enriched = events
    .keyBy(Event::getUserId)
    .map(new RichMapFunction<Event, String>() {
        private transient ValueState<Long> countState;
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Long.class, 0L);
            countState = getRuntimeContext().getState(descriptor);
        }
        @Override
        public String map(Event event) throws Exception {
            Long count = countState.value();
            count++;
            countState.update(count);
            return event.toString() + ", count=" + count;
        }
    });
3.2.2 flatMap - One-to-Many Transformation
public class DataStream<T> {
    /**
     * FlatMap transformation.
     */
    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper);
}
Usage examples:
// Tokenization
DataStream<String> sentences = env.fromElements(
    "hello world",
    "flink streaming"
);
DataStream<String> words = sentences.flatMap(
    (String sentence, Collector<String> out) -> {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
).returns(Types.STRING);
// Filter and transform in one step
DataStream<Event> events = ...;
DataStream<Alert> alerts = events.flatMap(
    new FlatMapFunction<Event, Alert>() {
        @Override
        public void flatMap(Event event, Collector<Alert> out) {
            if (event.isAnomaly()) {
                out.collect(new Alert(event, "Anomaly detected"));
            }
        }
    }
);
3.2.3 filter - Filtering
public class DataStream<T> {
    /**
     * Filter transformation.
     */
    public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter);
}
Usage examples:
// Simple predicate
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
// Composite predicate
DataStream<Event> events = ...;
DataStream<Event> importantEvents = events.filter(
    new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getPriority() >= 5 &&
                event.getType().equals("CRITICAL");
        }
    }
);
3.2.4 keyBy - Partitioning
public class DataStream<T> {
    /**
     * KeyBy with a KeySelector (returns a KeyedStream).
     */
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key);
    /**
     * KeyBy on field names (POJO and Tuple only; deprecated in favor of KeySelectors).
     */
    public KeyedStream<T, Tuple> keyBy(String... fields);
}
Usage examples:
// Key by a single field
DataStream<Event> events = ...;
KeyedStream<Event, String> keyedByUser = events.keyBy(Event::getUserId);
// Key by multiple fields (the explicit key type avoids Tuple type erasure)
KeyedStream<Event, Tuple2<String, String>> keyedByUserAndType = events.keyBy(
    event -> Tuple2.of(event.getUserId(), event.getType()),
    Types.TUPLE(Types.STRING, Types.STRING)
);
// Field name (deprecated)
DataStream<Tuple2<String, Integer>> tuples = ...;
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = tuples.keyBy("f0");
3.3 Aggregations
public class KeyedStream<T, KEY> extends DataStream<T> {
    /** Reduce */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer);
    /** Sum (Tuple position / POJO field only) */
    public SingleOutputStreamOperator<T> sum(int positionToSum);
    public SingleOutputStreamOperator<T> sum(String field);
    /** Minimum */
    public SingleOutputStreamOperator<T> min(int positionToMin);
    public SingleOutputStreamOperator<T> min(String field);
    /** Maximum */
    public SingleOutputStreamOperator<T> max(int positionToMax);
    public SingleOutputStreamOperator<T> max(String field);
}
// Note: aggregate(AggregateFunction) is not defined on KeyedStream; it is a
// WindowedStream method and must follow a window(...) call, as shown below.
Aggregation examples:
// Reduce
DataStream<Event> events = ...;
DataStream<Event> maxValues = events
    .keyBy(Event::getUserId)
    .reduce((e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);
// Aggregate (requires a window; a sample AverageAggregate is sketched below)
DataStream<Double> averages = events
    .keyBy(Event::getUserId)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new AverageAggregate());
// Sum (Tuple)
DataStream<Tuple2<String, Integer>> wordCounts = ...;
DataStream<Tuple2<String, Integer>> totals = wordCounts
    .keyBy(t -> t.f0)  // key by the first field
    .sum(1);           // sum the second field
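The AverageAggregate used above (and again in the window examples) is never defined in this guide. A plausible implementation, assuming Event exposes a numeric getValue(), could look like this:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// A possible AverageAggregate: the accumulator is (sum, count), the result is the mean.
// Event and its getValue() accessor are assumed types from the surrounding examples.
public class AverageAggregate
        implements AggregateFunction<Event, Tuple2<Double, Long>, Double> {
    @Override
    public Tuple2<Double, Long> createAccumulator() {
        return Tuple2.of(0.0, 0L);
    }
    @Override
    public Tuple2<Double, Long> add(Event event, Tuple2<Double, Long> acc) {
        return Tuple2.of(acc.f0 + event.getValue(), acc.f1 + 1);
    }
    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
    }
    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}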
3.4 Window Operations
3.4.1 Window Types
public class KeyedStream<T, KEY> extends DataStream<T> {
    /** Tumbling time window (deprecated; prefer window(TumblingEventTimeWindows.of(...))) */
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size);
    /** Sliding time window (deprecated; prefer window(SlidingEventTimeWindows.of(...))) */
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide);
    /** Tumbling count window */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size);
    /** Sliding count window */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide);
    /** Arbitrary window assigner; session windows are created this way,
        e.g. window(EventTimeSessionWindows.withGap(gap)) */
    public <W extends Window> WindowedStream<T, KEY, W> window(
        WindowAssigner<? super T, W> assigner);
}
Window examples:
// 1. Tumbling event-time window (5 minutes)
DataStream<Event> events = ...;
DataStream<Event> counts = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce((e1, e2) -> new Event(e1.count + e2.count));
// 2. Sliding event-time window (10-minute window, sliding every 5 minutes)
DataStream<Double> averages = events
    .keyBy(Event::getUserId)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .aggregate(new AverageAggregate());
// 3. Count window
DataStream<Event> countWindows = events
    .keyBy(Event::getUserId)
    .countWindow(100) // one window per 100 elements
    .reduce((e1, e2) -> new Event(e1.count + e2.count));
// 4. Session window (15-minute gap); a possible SessionProcessFunction is sketched below
DataStream<SessionStats> sessions = events
    .keyBy(Event::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
    .process(new SessionProcessFunction());
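The SessionProcessFunction referenced above is likewise not defined here; one possible implementation (SessionStats is an assumed result type) could be:
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch of a session statistics function: the merged session window itself
// carries the session bounds, so only the element count needs computing.
public class SessionProcessFunction
        extends ProcessWindowFunction<Event, SessionStats, String, TimeWindow> {
    @Override
    public void process(
            String userId,
            Context ctx,
            Iterable<Event> events,
            Collector<SessionStats> out) {
        long count = 0;
        for (Event e : events) {
            count++;
        }
        // SessionStats(userId, sessionStart, sessionEnd, eventCount) is assumed
        out.collect(new SessionStats(
            userId, ctx.window().getStart(), ctx.window().getEnd(), count));
    }
}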
3.4.2 Window Functions
public class WindowedStream<T, K, W extends Window> {
    /** Incremental reduce */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function);
    /** Incremental aggregate */
    public <ACC, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, R> aggregateFunction);
    /** Process function (most flexible; sees all window elements) */
    public <R> SingleOutputStreamOperator<R> process(
        ProcessWindowFunction<T, R, K, W> processFunction);
    /** Apply function (legacy; prefer process) */
    public <R> SingleOutputStreamOperator<R> apply(
        WindowFunction<T, R, K, W> function);
}
Window function examples:
// ProcessWindowFunction (buffers all elements of the window)
DataStream<String> result = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
        @Override
        public void process(
                String key,
                Context context,
                Iterable<Event> elements,
                Collector<String> out) {
            long count = 0;
            for (Event event : elements) {
                count++;
            }
            out.collect(String.format(
                "Window [%d - %d] for user %s: %d events",
                context.window().getStart(),
                context.window().getEnd(),
                key,
                count
            ));
        }
    });
// Aggregate + Process (incremental aggregation plus window metadata)
DataStream<Result> optimized = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(
        new AverageAggregate(), // incremental pre-aggregation
        new ProcessWindowFunction<Double, Result, String, TimeWindow>() {
            @Override
            public void process(
                    String key,
                    Context ctx,
                    Iterable<Double> elements,
                    Collector<Result> out) {
                Double avg = elements.iterator().next();
                out.collect(new Result(key, avg, ctx.window()));
            }
        }
    );
3.5 Multi-Stream Operations
3.5.1 union - Merging Streams
public class DataStream<T> {
    /**
     * Unions multiple streams (all must have the same type).
     */
    public DataStream<T> union(DataStream<T>... streams);
}
Union example:
DataStream<Event> stream1 = ...;
DataStream<Event> stream2 = ...;
DataStream<Event> stream3 = ...;
DataStream<Event> combined = stream1.union(stream2, stream3);
3.5.2 connect - Connecting Streams
public class DataStream<T> {
    /**
     * Connects two streams (their types may differ).
     */
    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream);
}
public class ConnectedStreams<IN1, IN2> {
    /**
     * Processes with a CoMapFunction.
     */
    public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper);
    /**
     * Processes with a CoFlatMapFunction.
     */
    public <R> SingleOutputStreamOperator<R> flatMap(
        CoFlatMapFunction<IN1, IN2, R> coFlatMapper);
    /**
     * Processes with a CoProcessFunction (most flexible).
     */
    public <R> SingleOutputStreamOperator<R> process(
        CoProcessFunction<IN1, IN2, R> coProcessFunction);
}
Connect example:
// Connect two streams of different types
DataStream<Event> events = ...;
DataStream<Rule> rules = ...;
DataStream<Alert> alerts = events
    .connect(rules)
    .keyBy(Event::getUserId, Rule::getUserId)
    .process(new CoProcessFunction<Event, Rule, Alert>() {
        private ValueState<Rule> ruleState;
        @Override
        public void open(Configuration parameters) {
            ruleState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("rule", Rule.class));
        }
        @Override
        public void processElement1(Event event, Context ctx, Collector<Alert> out)
                throws Exception {
            // Elements from the Event stream
            Rule rule = ruleState.value();
            if (rule != null && event.violates(rule)) {
                out.collect(new Alert(event, rule));
            }
        }
        @Override
        public void processElement2(Rule rule, Context ctx, Collector<Alert> out)
                throws Exception {
            // Elements from the Rule stream
            ruleState.update(rule);
        }
    });
3.5.3 split / Side Outputs - Splitting Streams
// Legacy API (removed in Flink 1.12): split
DataStream<Event> events = ...;
SplitStream<Event> split = events.split(new OutputSelector<Event>() {
    @Override
    public Iterable<String> select(Event event) {
        return event.isImportant()
            ? Collections.singletonList("important")
            : Collections.singletonList("normal");
    }
});
// Current API (recommended): side outputs
final OutputTag<Event> importantTag = new OutputTag<Event>("important"){};
final OutputTag<Event> normalTag = new OutputTag<Event>("normal"){};
SingleOutputStreamOperator<Event> mainStream = events.process(
    new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            if (event.isImportant()) {
                ctx.output(importantTag, event);
            } else {
                ctx.output(normalTag, event);
            }
        }
    }
);
DataStream<Event> importantEvents = mainStream.getSideOutput(importantTag);
DataStream<Event> normalEvents = mainStream.getSideOutput(normalTag);
3.6 Advanced Operations
3.6.1 process - Process Functions
public class DataStream<T> {
    /**
     * Process function (the lowest-level API).
     */
    public <R> SingleOutputStreamOperator<R> process(
        ProcessFunction<T, R> processFunction);
}
public class KeyedStream<T, KEY> extends DataStream<T> {
    /**
     * KeyedProcessFunction (adds keyed state and timers).
     */
    public <R> SingleOutputStreamOperator<R> process(
        KeyedProcessFunction<KEY, T, R> keyedProcessFunction);
}
Process function example:
// A KeyedProcessFunction with timers
DataStream<Alert> alerts = events
    .keyBy(Event::getUserId)
    .process(new KeyedProcessFunction<String, Event, Alert>() {
        private ValueState<Long> countState;
        private ValueState<Long> timerState;
        @Override
        public void open(Configuration parameters) {
            countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class, 0L));
            timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer", Long.class));
        }
        @Override
        public void processElement(Event event, Context ctx, Collector<Alert> out)
                throws Exception {
            // Update the count
            Long count = countState.value();
            count++;
            countState.update(count);
            // Register a processing-time timer to fire in 5 seconds
            Long timer = timerState.value();
            if (timer == null) {
                long fireTime = ctx.timerService().currentProcessingTime() + 5000;
                ctx.timerService().registerProcessingTimeTimer(fireTime);
                timerState.update(fireTime);
            }
            // Emit immediately once the threshold is reached
            if (count >= 10) {
                out.collect(new Alert(ctx.getCurrentKey(), count));
                countState.clear();
                if (timer != null) {
                    ctx.timerService().deleteProcessingTimeTimer(timer);
                    timerState.clear();
                }
            }
        }
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out)
                throws Exception {
            // Timer fired: flush whatever has been counted so far
            Long count = countState.value();
            if (count > 0) {
                out.collect(new Alert(ctx.getCurrentKey(), count));
            }
            countState.clear();
            timerState.clear();
        }
    });
3.6.2 Async I/O
// The AsyncDataStream utility; note that open()/close() require
// RichAsyncFunction. DatabaseClient is a hypothetical async client
// used purely for illustration.
DataStream<String> enriched = AsyncDataStream.unorderedWait(
    events,
    new RichAsyncFunction<Event, String>() {
        private transient DatabaseClient client;
        @Override
        public void open(Configuration parameters) {
            client = new DatabaseClient();
        }
        @Override
        public void asyncInvoke(Event event, ResultFuture<String> resultFuture) {
            // Query the database asynchronously
            CompletableFuture<String> future = client.asyncQuery(event.getUserId());
            future.whenComplete((result, throwable) -> {
                if (throwable != null) {
                    resultFuture.completeExceptionally(throwable);
                } else {
                    resultFuture.complete(Collections.singletonList(result));
                }
            });
        }
        @Override
        public void close() {
            client.close();
        }
    },
    60000, // timeout (milliseconds)
    TimeUnit.MILLISECONDS
);
4. StreamGraph Generation
4.1 StreamGraph Structure
classDiagram
class StreamGraph {
+Map~Integer, StreamNode~ streamNodes
+Set~StreamEdge~ edges
+addSource()
+addOperator()
+addEdge()
}
class StreamNode {
+int id
+String operatorName
+StreamOperatorFactory operatorFactory
+int parallelism
+TypeInformation outputType
}
class StreamEdge {
+int sourceId
+int targetId
+StreamPartitioner partitioner
+OutputTag outputTag
}
StreamGraph --> StreamNode
StreamGraph --> StreamEdge
StreamNode --> StreamOperatorFactory
StreamEdge --> StreamPartitioner
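A quick way to inspect the generated graph is the JSON execution plan. The sketch below (with a throwaway pipeline) prints it; the output can be pasted into the Flink plan visualizer to see the nodes and edges:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3)
    .map(i -> i * 2)
    .print();
// getExecutionPlan() renders the StreamGraph as JSON
System.out.println(env.getExecutionPlan());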
4.2 Generation Process
// User code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> counts = env
    .fromElements("a", "b", "c")
    .map(String::toUpperCase)
    .filter(s -> s.length() > 0)
    .map(s -> Tuple2.of(s, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(t -> t.f0)
    .sum(1);
// execute() triggers StreamGraph generation
env.execute("My Job");
// Internal flow (simplified sketch; method names are condensed)
public JobExecutionResult execute(String jobName) throws Exception {
    // 1. Build the StreamGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    // 2. Obtain a PipelineExecutor
    PipelineExecutor executor = getPipelineExecutor();
    // 3. Execute (the graph is further translated into a JobGraph)
    CompletableFuture<JobClient> jobClientFuture =
        executor.execute(streamGraph, configuration, userClassloader);
    return jobClientFuture.get().getJobExecutionResult().get();
}
5. Best Practices
5.1 Setting Parallelism
// 1. Global parallelism (applies to all operators by default)
env.setParallelism(4);
// 2. Operator-level parallelism (takes precedence over the global setting)
DataStream<String> result = stream
    .map(String::toUpperCase).setParallelism(8)
    .filter(s -> s.length() > 0).setParallelism(4);
// 3. Source and sink parallelism
DataStream<String> source = env
    .fromElements("a", "b", "c")
    .setParallelism(1); // fromElements is a non-parallel source, so parallelism 1
result.print().setParallelism(1); // a single sink avoids interleaved output
5.2 State Management
// Access state from a RichFunction on a keyed stream
DataStream<String> result = stream
    .keyBy(Event::getUserId)
    .map(new RichMapFunction<Event, String>() {
        // transient so the handle is not serialized with the function
        private transient ValueState<Long> state;
        @Override
        public void open(Configuration parameters) {
            // Initialize state in open()
            ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("myState", Long.class, 0L);
            state = getRuntimeContext().getState(descriptor);
        }
        @Override
        public String map(Event event) throws Exception {
            Long value = state.value();
            value++;
            state.update(value);
            return event.toString() + ", count=" + value;
        }
    });
5.3 Performance Tuning
// 1. Disable operator chaining globally (for debugging)
env.disableOperatorChaining();
// 2. Disable chaining for one operator
stream.map(...).disableChaining();
// 3. Start a new chain at this operator
stream.map(...).startNewChain();
// 4. Assign a slot sharing group
stream.map(...).slotSharingGroup("group1");
// 5. Set the buffer timeout (lower latency at the cost of throughput)
env.setBufferTimeout(100); // 100 ms
5.4 Watermark Strategies
// 1. Bounded out-of-orderness watermarks (fixed delay)
WatermarkStrategy<Event> strategy1 = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
DataStream<Event> stream1 = env
    .fromSource(source, strategy1, "Source");
// 2. Monotonically increasing timestamps
WatermarkStrategy<Event> strategy2 = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
// 3. Custom watermark generator (a possible implementation is sketched below)
WatermarkStrategy<Event> strategy3 = WatermarkStrategy
    .forGenerator(ctx -> new CustomWatermarkGenerator())
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
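CustomWatermarkGenerator above is a placeholder name. A possible generator, which tracks the highest timestamp seen and periodically emits it minus a fixed lag, might look like this:
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Sketch of a periodic watermark generator; Event is the assumed element type
// and the 3-second lag is an arbitrary illustrative value.
public class CustomWatermarkGenerator implements WatermarkGenerator<Event> {
    private static final long MAX_LAG_MS = 3_000;
    // offset the initial value to avoid underflow in onPeriodicEmit
    private long maxTimestamp = Long.MIN_VALUE + MAX_LAG_MS + 1;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Called at the configured auto-watermark interval
        output.emitWatermark(new Watermark(maxTimestamp - MAX_LAG_MS - 1));
    }
}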
6. Summary
The DataStream API is the core of Flink stream processing:
Core components:
- StreamExecutionEnvironment: execution environment and configuration
- DataStream: the data stream abstraction
- KeyedStream: a partitioned stream
- WindowedStream: a windowed stream
Transformations:
- Basic: map, flatMap, filter
- Aggregation: reduce, aggregate, sum/min/max
- Partitioning: keyBy, rebalance, shuffle
- Windowing: tumbling/sliding time windows, count windows, session windows
- Multi-stream: union, connect, join
Advanced features:
- ProcessFunction: the lowest-level API, full control
- Async I/O: efficient interaction with external systems
- State management: keyed state and operator state
- Timers: event time and processing time
Best practices:
- Set parallelism appropriately
- Use state correctly
- Choose a suitable watermark strategy
- Tune operator chaining
A solid understanding of the DataStream API is essential for developing Flink streaming applications.