Flink-08: Table & SQL (flink-table)

1. Module Overview

1.1 Module Responsibilities

The Table API & SQL layer is Flink's high-level, batch/stream-unified API, offering declarative data processing.

Core responsibilities (a minimal end-to-end sketch follows this list)

  • Provide two programming interfaces: the Table API and SQL
  • Unify batch and stream processing semantics
  • Manage metadata through catalogs
  • Parse and optimize SQL
  • Generate code and execute it
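
To make the pipeline concrete, here is a minimal end-to-end sketch (the table name and the datagen options are illustrative, not part of this module's API):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableQuickstart {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Catalog layer: register a source table backed by the datagen connector.
        tableEnv.executeSql(
            "CREATE TEMPORARY TABLE src (id INT, name STRING) " +
            "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");

        // The query below is parsed, validated, optimized, code-generated,
        // and executed as a DataStream job.
        tableEnv.executeSql("SELECT id, UPPER(name) AS name FROM src").print();
    }
}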

1.2 Table API Architecture

flowchart TB
    subgraph "API层"
        TableEnv[TableEnvironment]
        Table[Table]
        SQL[SQL]
    end
    
    subgraph "Catalog层"
        Catalog[Catalog]
        CatalogManager[CatalogManager]
        Database[Database]
        TableSchema[TableSchema]
    end
    
    subgraph "解析层"
        Parser[SQL Parser]
        Validator[Validator]
    end
    
    subgraph "优化层"
        Planner[Planner]
        Optimizer[Optimizer]
        RuleBasedOpt[Rule-Based Optimization]
        CostBasedOpt[Cost-Based Optimization]
    end
    
    subgraph "执行层"
        CodeGen[代码生成]
        Transformation[Transformation]
        DataStream[DataStream]
    end
    
    TableEnv --> Table
    TableEnv --> SQL
    TableEnv --> Catalog
    
    Catalog --> CatalogManager
    Catalog --> Database
    Catalog --> TableSchema
    
    SQL --> Parser
    Parser --> Validator
    
    Validator --> Planner
    Planner --> Optimizer
    Optimizer --> RuleBasedOpt
    Optimizer --> CostBasedOpt
    
    Optimizer --> CodeGen
    CodeGen --> Transformation
    Transformation --> DataStream

2. TableEnvironment: The Execution Environment

2.1 Creating an Execution Environment

2.1.1 Core API

public interface StreamTableEnvironment extends TableEnvironment {
    
    /**
     * Creates a streaming Table environment.
     */
    static StreamTableEnvironment create(StreamExecutionEnvironment env);
    
    /**
     * Creates a streaming Table environment with custom settings.
     */
    static StreamTableEnvironment create(
        StreamExecutionEnvironment env,
        EnvironmentSettings settings);
}

public interface TableEnvironment {
    
    /**
     * Creates a unified (batch/stream) Table environment.
     */
    static TableEnvironment create(EnvironmentSettings settings);
}

Note: the DataStream-bridging factory methods are declared on StreamTableEnvironment, not on TableEnvironment itself.

Usage examples

// 1. Streaming environment (recommended)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 2. Unified batch/stream environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()  // or .inBatchMode()
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// 3. Custom configuration
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .withConfiguration(config)
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

2.2 Registering and Querying Tables

2.2.1 Registering Tables

public interface StreamTableEnvironment extends TableEnvironment {
    
    /**
     * Creates a temporary view from a DataStream.
     */
    void createTemporaryView(String path, DataStream<?> dataStream);
    
    /**
     * Creates a temporary view from a DataStream with an explicit schema.
     */
    void createTemporaryView(String path, DataStream<?> dataStream, Schema schema);
}

public interface TableEnvironment {
    
    /**
     * Registers a temporary table from a descriptor.
     */
    void createTemporaryTable(String path, TableDescriptor descriptor);
    
    /**
     * Executes a single SQL statement, e.g. DDL.
     */
    TableResult executeSql(String statement);
}

Registration examples

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 1. Create a view from a DataStream.
// A Tuple2 stream exposes the physical columns f0/f1, so rename them
// instead of declaring a schema with different column names.
DataStream<Tuple2<String, Integer>> stream = env.fromElements(
    Tuple2.of("Alice", 25),
    Tuple2.of("Bob", 30)
);

Table users = tableEnv.fromDataStream(stream).as("name", "age");
tableEnv.createTemporaryView("users", users);

// 2. Create a table with SQL DDL
tableEnv.executeSql(
    "CREATE TEMPORARY TABLE orders (" +
    "  order_id STRING," +
    "  user_id STRING," +
    "  amount DOUBLE," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// 3. Create a table with the Table API
tableEnv.createTemporaryTable("products",
    TableDescriptor.forConnector("filesystem")
        .schema(Schema.newBuilder()
            .column("product_id", DataTypes.STRING())
            .column("name", DataTypes.STRING())
            .column("price", DataTypes.DOUBLE())
            .build())
        .option("path", "/path/to/products.csv")
        .format("csv")
        .build()
);

2.2.2 Querying Tables

public interface TableEnvironment {
    
    /**
     * Returns the table registered under the given path.
     */
    Table from(String path);
    
    /**
     * Evaluates a SQL query and returns the result as a Table.
     */
    Table sqlQuery(String query);
    
    /**
     * Executes a single SQL statement.
     */
    TableResult executeSql(String statement);
}

Query examples

// 1. Table API query
Table users = tableEnv.from("users");
Table result = users
    .select($("name"), $("age"))
    .where($("age").isGreater(25));

// 2. SQL query
Table sqlResult = tableEnv.sqlQuery(
    "SELECT name, age FROM users WHERE age > 25"
);

// 3. Mixing Table API and SQL
Table orders = tableEnv.from("orders");
tableEnv.createTemporaryView("recent_orders", orders
    .where($("order_time").isGreater(currentTimestamp().minus(lit(1).hours())))
);

Table summary = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as order_count, SUM(amount) as total_amount " +
    "FROM recent_orders " +
    "GROUP BY user_id"
);

3. Table API Operations

3.1 Basic Operations

3.1.1 select: Projection

// Select columns
Table result = table.select($("name"), $("age"));

// Rename a column
Table result = table.select($("name").as("user_name"), $("age"));

// Computed column
Table result = table.select(
    $("name"),
    $("age"),
    $("age").plus(1).as("next_age")
);

3.1.2 where/filter: Filtering

// Simple filter
Table result = table.where($("age").isGreater(25));

// Compound condition
Table result = table.where(
    $("age").isGreater(25)
        .and($("name").isNotNull())
        .and($("city").in("Beijing", "Shanghai"))
);

// filter (equivalent to where)
Table result = table.filter($("age").isGreaterOrEqual(18));

3.1.3 groupBy: Grouping

// Grouped aggregation
Table result = table
    .groupBy($("city"))
    .select(
        $("city"),
        $("age").avg().as("avg_age"),
        $("*").count().as("user_count")
    );

// Grouping by multiple columns
Table result = table
    .groupBy($("city"), $("gender"))
    .select(
        $("city"),
        $("gender"),
        $("salary").sum().as("total_salary")
    );

3.2 Window Operations

3.2.1 Tumbling Windows

// Time-based tumbling window
Table result = table
    .window(Tumble.over(lit(10).minutes()).on($("order_time")).as("w"))
    .groupBy($("user_id"), $("w"))
    .select(
        $("user_id"),
        $("w").start().as("window_start"),
        $("w").end().as("window_end"),
        $("amount").sum().as("total_amount")
    );

// Row-count tumbling window
Table result = table
    .window(Tumble.over(rowInterval(100L)).on($("proctime")).as("w"))
    .groupBy($("user_id"), $("w"))
    .select($("user_id"), $("*").count().as("order_count"));

3.2.2 Sliding Windows

// Time-based sliding window
Table result = table
    .window(Slide.over(lit(10).minutes())
                 .every(lit(5).minutes())
                 .on($("order_time"))
                 .as("w"))
    .groupBy($("user_id"), $("w"))
    .select(
        $("user_id"),
        $("w").start(),
        $("w").end(),
        $("amount").sum().as("total_amount")
    );

3.2.3 Session Windows

// Session window
Table result = table
    .window(Session.withGap(lit(15).minutes()).on($("order_time")).as("w"))
    .groupBy($("user_id"), $("w"))
    .select(
        $("user_id"),
        $("w").start(),
        $("w").end(),
        $("*").count().as("event_count")
    );

3.3 Join Operations

3.3.1 Inner Join

// Inner join. Column names must be unique across the two inputs,
// so rename the clashing join key on one side first.
Table orders = tableEnv.from("orders");
Table products = tableEnv.from("products")
    .select($("product_id").as("p_id"), $("name"), $("price"));

Table result = orders
    .join(products)
    .where($("product_id").isEqual($("p_id")))
    .select(
        $("order_id"),
        $("name"),
        $("amount")
    );

3.3.2 Left Outer Join

// Left outer join (products renamed as in 3.3.1)
Table result = orders
    .leftOuterJoin(products, $("product_id").isEqual($("p_id")))
    .select(
        $("order_id"),
        $("name"),
        $("amount")
    );

3.3.3 Interval Join

// Interval join (streaming). Rename shipment columns to keep names unique.
Table shipmentsRenamed = shipments.select(
    $("order_id").as("ship_order_id"),
    $("ship_time"));

Table result = orders
    .join(shipmentsRenamed)
    .where(
        $("order_id").isEqual($("ship_order_id"))
        .and($("ship_time").isGreaterOrEqual($("order_time")))
        .and($("ship_time").isLess($("order_time").plus(lit(2).days())))
    )
    .select($("order_id"), $("amount"), $("ship_time"));

3.4 Aggregate Functions

// Built-in aggregate functions
Table result = table
    .groupBy($("user_id"))
    .select(
        $("user_id"),
        $("amount").sum().as("total"),
        $("amount").avg().as("avg"),
        $("amount").min().as("min"),
        $("amount").max().as("max"),
        $("*").count().as("count"),
        $("amount").stddevPop().as("stddev")
    );

// DISTINCT aggregation
Table result = table
    .groupBy($("user_id"))
    .select(
        $("user_id"),
        $("order_id").count().distinct().as("unique_orders")
    );

4. SQL Queries

4.1 Basic Queries

// SELECT
Table result = tableEnv.sqlQuery(
    "SELECT name, age FROM users WHERE age > 25"
);

// JOIN
Table result = tableEnv.sqlQuery(
    "SELECT o.order_id, p.name, o.amount " +
    "FROM orders o " +
    "JOIN products p ON o.product_id = p.product_id"
);

// GROUP BY
Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as order_count, SUM(amount) as total " +
    "FROM orders " +
    "GROUP BY user_id"
);

// HAVING
Table result = tableEnv.sqlQuery(
    "SELECT user_id, SUM(amount) as total " +
    "FROM orders " +
    "GROUP BY user_id " +
    "HAVING SUM(amount) > 1000"
);

4.2 Window SQL

// Tumbling window
Table result = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id, " +
    "  TUMBLE_START(order_time, INTERVAL '10' MINUTE) as window_start, " +
    "  TUMBLE_END(order_time, INTERVAL '10' MINUTE) as window_end, " +
    "  SUM(amount) as total_amount " +
    "FROM orders " +
    "GROUP BY user_id, TUMBLE(order_time, INTERVAL '10' MINUTE)"
);

// Sliding window
Table result = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id, " +
    "  HOP_START(order_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) as window_start, " +
    "  HOP_END(order_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) as window_end, " +
    "  SUM(amount) as total_amount " +
    "FROM orders " +
    "GROUP BY user_id, HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)"
);

// Session window
Table result = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id, " +
    "  SESSION_START(order_time, INTERVAL '15' MINUTE) as window_start, " +
    "  SESSION_END(order_time, INTERVAL '15' MINUTE) as window_end, " +
    "  COUNT(*) as event_count " +
    "FROM orders " +
    "GROUP BY user_id, SESSION(order_time, INTERVAL '15' MINUTE)"
);
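
Since Flink 1.13, windowing table-valued functions (TVFs) are the recommended replacement for the legacy group-window functions above. The tumbling-window query rewritten as a TVF (same orders schema assumed):

Table result = tableEnv.sqlQuery(
    "SELECT user_id, window_start, window_end, SUM(amount) as total_amount " +
    "FROM TABLE(" +
    "  TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) " +
    "GROUP BY user_id, window_start, window_end"
);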

4.3 Time Attributes

// Event time
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  amount DOUBLE," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
    ") WITH (...)"
);

// Processing time
tableEnv.executeSql(
    "CREATE TABLE clicks (" +
    "  user_id STRING," +
    "  url STRING," +
    "  click_time AS PROCTIME()" +  // 处理时间
    ") WITH (...)"
);
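
Time attributes can also be declared when bridging from the DataStream API. A sketch, assuming a hypothetical POJO stream clickStream whose ClickEvent type has a java.time.Instant field click_time:

tableEnv.createTemporaryView("clicks_view", clickStream,
    Schema.newBuilder()
        .columnByExpression("rowtime", "CAST(click_time AS TIMESTAMP_LTZ(3))")
        .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
        .build());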

5. Catalog Management

5.1 Catalog Concepts

classDiagram
    class Catalog {
        +String getDefaultDatabase()
        +List~String~ listDatabases()
        +List~String~ listTables(String database)
        +CatalogTable getTable(ObjectPath path)
        +createDatabase()
        +createTable()
    }
    
    class CatalogManager {
        +Catalog getCatalog(String name)
        +void registerCatalog(String name, Catalog catalog)
        +String getCurrentCatalog()
        +String getCurrentDatabase()
    }
    
    class GenericInMemoryCatalog
    class HiveCatalog
    class JdbcCatalog
    
    Catalog <|-- GenericInMemoryCatalog
    Catalog <|-- HiveCatalog
    Catalog <|-- JdbcCatalog
    CatalogManager --> Catalog

5.2 Using Catalogs

// 1. Register a catalog
tableEnv.executeSql(
    "CREATE CATALOG my_catalog WITH (" +
    "  'type' = 'hive'," +
    "  'hive-conf-dir' = '/path/to/hive/conf'" +
    ")"
);

// 2. Switch to the catalog and database
tableEnv.executeSql("USE CATALOG my_catalog");
tableEnv.executeSql("USE my_database");

// 3. List catalogs
String[] catalogs = tableEnv.listCatalogs();

// 4. List databases
String[] databases = tableEnv.listDatabases();

// 5. List tables
String[] tables = tableEnv.listTables();

// 6. Query with fully qualified names
Table result = tableEnv.sqlQuery(
    "SELECT * FROM my_catalog.my_database.my_table"
);

5.3 Hive Integration

// Configure a Hive catalog
tableEnv.executeSql(
    "CREATE CATALOG hive_catalog WITH (" +
    "  'type' = 'hive'," +
    "  'hive-conf-dir' = '/path/to/hive/conf'," +
    "  'hadoop-conf-dir' = '/path/to/hadoop/conf'" +
    ")"
);

tableEnv.executeSql("USE CATALOG hive_catalog");

// Read a Hive table
Table hiveTable = tableEnv.from("hive_catalog.default.my_hive_table");

// Write to a Hive table
Table result = tableEnv.sqlQuery("SELECT * FROM orders WHERE amount > 100");
result.executeInsert("hive_catalog.default.high_value_orders");

6. Connectors

6.1 Kafka Connector

// Reading from Kafka
tableEnv.executeSql(
    "CREATE TABLE kafka_source (" +
    "  user_id STRING," +
    "  event_type STRING," +
    "  event_time TIMESTAMP(3) METADATA FROM 'timestamp'," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'my-group'," +
    "  'scan.startup.mode' = 'latest-offset'," +
    "  'format' = 'json'" +
    ")"
);

// Writing to Kafka
tableEnv.executeSql(
    "CREATE TABLE kafka_sink (" +
    "  user_id STRING," +
    "  result STRING" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'results'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

tableEnv.executeSql("INSERT INTO kafka_sink SELECT user_id, result FROM ...");

6.2 JDBC Connector

// JDBC Source
tableEnv.executeSql(
    "CREATE TABLE jdbc_source (" +
    "  id INT," +
    "  name STRING," +
    "  age INT" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
    "  'table-name' = 'users'," +
    "  'username' = 'root'," +
    "  'password' = 'password'" +
    ")"
);

// JDBC sink (upsert mode, driven by the primary key)
tableEnv.executeSql(
    "CREATE TABLE jdbc_sink (" +
    "  user_id STRING PRIMARY KEY NOT ENFORCED," +
    "  total_amount DOUBLE" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
    "  'table-name' = 'user_summary'," +
    "  'username' = 'root'," +
    "  'password' = 'password'" +
    ")"
);
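
JDBC sink throughput is largely governed by its buffering options. A sketch of a tuned sink; the flush values are illustrative starting points, not recommendations:

tableEnv.executeSql(
    "CREATE TABLE jdbc_sink_tuned (" +
    "  user_id STRING PRIMARY KEY NOT ENFORCED," +
    "  total_amount DOUBLE" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
    "  'table-name' = 'user_summary'," +
    "  'username' = 'root'," +
    "  'password' = 'password'," +
    "  'sink.buffer-flush.max-rows' = '500'," +
    "  'sink.buffer-flush.interval' = '1s'," +
    "  'sink.max-retries' = '3'" +
    ")"
);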

6.3 Filesystem Connector

// CSV files
tableEnv.executeSql(
    "CREATE TABLE csv_table (" +
    "  user_id STRING," +
    "  name STRING," +
    "  age INT" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/data.csv'," +
    "  'format' = 'csv'" +
    ")"
);

// Parquet files
tableEnv.executeSql(
    "CREATE TABLE parquet_table (" +
    "  user_id STRING," +
    "  events ARRAY<ROW<event_type STRING, event_time TIMESTAMP(3)>>" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/data'," +
    "  'format' = 'parquet'" +
    ")"
);

7. UDFs: User-Defined Functions

7.1 Scalar Function

// Define a scalar function
public class AddFunction extends ScalarFunction {
    public Integer eval(Integer a, Integer b) {
        return a + b;
    }
}

// Register
tableEnv.createTemporarySystemFunction("add", AddFunction.class);

// Use
Table result = tableEnv.sqlQuery(
    "SELECT user_id, add(score1, score2) as total_score FROM scores"
);

7.2 Table Function

// Define a table function (one row in, multiple rows out)
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitFunction extends TableFunction<Row> {
    
    public void eval(String str) {
        for (String word : str.split(" ")) {
            collect(Row.of(word));
        }
    }
}

// Register
tableEnv.createTemporarySystemFunction("split", SplitFunction.class);

// Use with LATERAL TABLE
Table result = tableEnv.sqlQuery(
    "SELECT user_id, word " +
    "FROM users, LATERAL TABLE(split(content)) AS T(word)"
);
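
Note that the comma (implicit cross join) form drops input rows for which the function emits no output. To keep such rows with NULL padding, use LEFT JOIN LATERAL with the mandatory ON TRUE:

Table result = tableEnv.sqlQuery(
    "SELECT user_id, word " +
    "FROM users " +
    "LEFT JOIN LATERAL TABLE(split(content)) AS T(word) ON TRUE"
);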

7.3 Aggregate Function

// Define an aggregate function
public class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {
    
    @Override
    public Double getValue(WeightedAvgAccumulator acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }
    
    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }
    
    public void accumulate(WeightedAvgAccumulator acc, Double value, Integer weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }
}

public class WeightedAvgAccumulator {
    public double sum = 0;
    public int count = 0;
}

// Register and use
tableEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);

Table result = tableEnv.sqlQuery(
    "SELECT category, weighted_avg(price, quantity) as avg_price " +
    "FROM products " +
    "GROUP BY category"
);
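
If WeightedAvg consumes an updating input (for example, downstream of a non-windowed aggregation, or in bounded OVER windows), Flink additionally requires a retract method that reverses accumulate. A sketch of the extra method on WeightedAvg:

// Called for retraction messages; undoes a previous accumulate() call.
public void retract(WeightedAvgAccumulator acc, Double value, Integer weight) {
    acc.sum -= value * weight;
    acc.count -= weight;
}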

8. Emitting Results

8.1 executeInsert: Writing to Tables

// Insert into a registered table
Table result = tableEnv.sqlQuery("SELECT * FROM orders WHERE amount > 100");
result.executeInsert("high_value_orders");

// SQL INSERT
tableEnv.executeSql(
    "INSERT INTO high_value_orders " +
    "SELECT * FROM orders WHERE amount > 100"
);
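
When one pipeline fans out to several sinks, bundle the INSERTs into a single job with a StatementSet; executing them separately would launch one job per statement. (low_value_orders is a hypothetical second sink table.)

StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql(
    "INSERT INTO high_value_orders SELECT * FROM orders WHERE amount > 100");
stmtSet.addInsertSql(
    "INSERT INTO low_value_orders SELECT * FROM orders WHERE amount <= 100");
stmtSet.execute();  // one Flink job covering both inserts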

8.2 Converting to a DataStream

// Table to DataStream
Table table = tableEnv.from("orders");
DataStream<Row> stream = tableEnv.toDataStream(table);

// With an explicit target type
DataStream<Order> typedStream = tableEnv.toDataStream(table, Order.class);

// Changelog stream (streaming)
Table aggResult = tableEnv.sqlQuery(
    "SELECT user_id, SUM(amount) as total FROM orders GROUP BY user_id"
);

DataStream<Row> changelogStream = tableEnv.toChangelogStream(aggResult);

8.3 collect: Fetching Results to the Client

// Collect results to the client (small result sets only)
Table result = tableEnv.sqlQuery("SELECT * FROM users LIMIT 10");

try (CloseableIterator<Row> it = result.execute().collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        System.out.println(row);
    }
}

9. Best Practices

9.1 Performance Tuning

// 1. Enable mini-batch aggregation
tableEnv.getConfig().set(
    ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
tableEnv.getConfig().set(
    ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5));
tableEnv.getConfig().set(
    ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);

// 2. Enable local-global (two-phase) aggregation
tableEnv.getConfig().set(
    OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

// 3. Set the default parallelism
tableEnv.getConfig().set(
    ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);

// 4. Configure external buffer memory
tableEnv.getConfig().set(
    ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY, MemorySize.parse("256mb"));
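
Object reuse itself (not to be confused with the buffer option above) is enabled on the underlying execution environment and reduces serialization overhead, provided user functions do not cache input records:

// 5. Enable object reuse (env is the StreamExecutionEnvironment from 2.1)
env.getConfig().enableObjectReuse();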

9.2 State Cleanup

// Set the idle-state retention (state TTL)
tableEnv.getConfig().set(
    ExecutionConfigOptions.IDLE_STATE_RETENTION, Duration.ofHours(24));

// Or use a SQL hint (Flink 1.18+)
Table result = tableEnv.sqlQuery(
    "SELECT /*+ STATE_TTL('orders' = '24h') */ " +
    "  user_id, COUNT(*) as cnt " +
    "FROM orders " +
    "GROUP BY user_id"
);

9.3 Debugging and Monitoring

// Print the execution plan
Table result = tableEnv.sqlQuery("SELECT * FROM orders");
System.out.println(result.explain());

// Include the JSON execution plan in the output
System.out.println(result.explain(ExplainDetail.JSON_EXECUTION_PLAN));

10. Summary

The Table API & SQL layer provides powerful declarative data processing capabilities:

Core components

  • TableEnvironment: execution environment and configuration
  • Table: the table abstraction and its operations
  • Catalog: metadata management
  • Connector: data sources and sinks

Key features

  • Unified batch/stream semantics
  • Rich SQL support
  • Window operations
  • Joins and aggregations
  • UDF extensibility

Optimization capabilities

  • Rule-based optimization (projection pushdown, predicate pushdown, etc.)
  • Cost-based optimization (join reordering, etc.)
  • Code generation (runtime performance)
  • Mini-batch and local-global optimizations

Best practices

  • Set parallelism appropriately
  • Enable mini-batch optimization
  • Configure state TTL
  • Use changelog streams to handle updates

The Table API & SQL layer is the preferred way to build high-level Flink applications.