Flink-02 - Core Foundations and Utilities (flink-core)

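1. Module Overview

1.1 Module Responsibilities

The flink-core module provides the infrastructure and utility classes that the rest of the Flink framework builds on.

Core responsibilities

  • Configuration management (Configuration)
  • File system abstraction (FileSystem)
  • Memory management (MemorySegment)
  • Foundations of the serialization framework
  • Metrics system (Metrics)
  • Utility classes and helper functions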

1.2 Module Architecture

flowchart TB
    subgraph "Configuration Layer"
        Config[Configuration]
        ConfigOptions[ConfigOptions]
        GlobalConfig[GlobalConfiguration]
    end
    
    subgraph "File System Layer"
        FS[FileSystem]
        LocalFS[LocalFileSystem]
        HDFS[HadoopFileSystem]
        S3FS[S3FileSystem]
        Path[Path]
    end
    
    subgraph "Memory Management Layer"
        MemSegment[MemorySegment]
        MemManager[MemoryManager]
        Buffer[Buffer]
        BufferPool[BufferPool]
    end
    
    subgraph "Metrics Layer"
        MetricGroup[MetricGroup]
        Counter[Counter]
        Gauge[Gauge]
        Histogram[Histogram]
        Meter[Meter]
    end
    
    subgraph "Utility Layer"
        IOUtils[IOUtils]
        InstantiationUtil[InstantiationUtil]
        ClassUtils[ClassUtils]
        StringUtils[StringUtils]
    end
    
    Config --> ConfigOptions
    Config --> GlobalConfig
    
    FS --> LocalFS
    FS --> HDFS
    FS --> S3FS
    FS --> Path
    
    MemSegment --> MemManager
    MemSegment --> Buffer
    Buffer --> BufferPool
    
    MetricGroup --> Counter
    MetricGroup --> Gauge
    MetricGroup --> Histogram
    MetricGroup --> Meter

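2. Configuration Management

2.1 Configuration - Configuration Class

2.1.1 Overview

Configuration is Flink's core configuration class. It stores settings as key-value pairs.

Key features

  • Type-safe access to settings
  • Configuration inheritance and overriding
  • Serialization and deserialization
  • Default value support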
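
To make the idea of type-safe access with defaults concrete, here is a minimal sketch in plain Java. It is not Flink's implementation: `TypedOption` and `SketchConfig` are hypothetical names, and the parsing rule (string values are converted on read, typed values are returned as stored) is a simplifying assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, simplified model of a typed option; not Flink's ConfigOption. */
final class TypedOption<T> {
    final String key;
    final T defaultValue;
    final Function<String, T> parser; // converts a raw string into T

    TypedOption(String key, T defaultValue, Function<String, T> parser) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.parser = parser;
    }
}

/** Hypothetical key-value store that resolves values through a TypedOption. */
final class SketchConfig {
    private final Map<String, Object> data = new HashMap<>();

    /** Stores an already-typed value. */
    <T> SketchConfig set(TypedOption<T> option, T value) {
        data.put(option.key, value);
        return this;
    }

    /** Stores a raw string, as if it had been read from a config file. */
    void setRaw(String key, String value) {
        data.put(key, value);
    }

    /** Returns the typed value, parsing raw strings and falling back to the default. */
    @SuppressWarnings("unchecked")
    <T> T get(TypedOption<T> option) {
        Object raw = data.get(option.key);
        if (raw == null) {
            return option.defaultValue;
        }
        if (raw instanceof String) {
            return option.parser.apply((String) raw);
        }
        return (T) raw;
    }
}
```

A caller would declare `new TypedOption<>("rpc.port", 6123, Integer::parseInt)` once and read it with `get`, so the key, the type, and the default live in a single place.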

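2.1.2 Core API

public class Configuration implements Serializable, Cloneable {

    // Internal storage
    protected final HashMap<String, Object> confData;

    // Note: the String-keyed setters and getters below are deprecated in
    // recent Flink releases; prefer set/get with a ConfigOption.

    /** Sets a string value. */
    public void setString(String key, String value);

    /** Returns a string value, or defaultValue if the key is absent. */
    public String getString(String key, String defaultValue);

    /** Sets an integer value. */
    public void setInteger(String key, int value);

    /** Returns an integer value, or defaultValue if the key is absent. */
    public int getInteger(String key, int defaultValue);

    /** Sets a boolean value. */
    public void setBoolean(String key, boolean value);

    /** Returns a boolean value, or defaultValue if the key is absent. */
    public boolean getBoolean(String key, boolean defaultValue);

    /** Sets a value through a typed option. */
    public <T> Configuration set(ConfigOption<T> option, T value);

    /** Returns the value of a typed option, falling back to the option's default. */
    public <T> T get(ConfigOption<T> option);

    /** Copies all entries of other into this configuration, overwriting existing keys. */
    public void addAll(Configuration other);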
    
    /**
     * Converts the configuration to a map of string keys and values.
     */
    public Map<String, String> toMap();
}

Usage example

// Create a configuration
Configuration config = new Configuration();

// Set primitive values by string key
config.setString("jobmanager.rpc.address", "localhost");
config.setInteger("jobmanager.rpc.port", 6123);
config.setBoolean("jobmanager.high-availability.enabled", true);  // illustrative key, not a built-in option

// Type-safe configuration (recommended)
config.set(JobManagerOptions.ADDRESS, "localhost");
config.set(JobManagerOptions.PORT, 6123);
config.set(HighAvailabilityOptions.HA_MODE, "zookeeper");

// Read values by string key
String address = config.getString("jobmanager.rpc.address", "localhost");
int port = config.getInteger("jobmanager.rpc.port", 6123);

// Type-safe reads
String haMode = config.get(HighAvailabilityOptions.HA_MODE);
Duration timeout = config.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT);

// Merge configurations
Configuration base = new Configuration();
Configuration override = new Configuration();
base.addAll(override);  // entries in override replace same-key entries in base

2.2 ConfigOption - Configuration Options

2.2.1 Defining Configuration Options

/**
 * A configuration option definition (simplified view of the fields).
 */
public class ConfigOption<T> {

    private final String key;
    private final Class<T> clazz;
    private final T defaultValue;
    private final String description;
}

/**
 * Builder entry point. Options are created through ConfigOptions.key(...),
 * not through a static method on ConfigOption.
 */
public class ConfigOptions {

    public static OptionBuilder key(String key) {
        return new OptionBuilder(key);
    }
}

/**
 * Defining custom options
 */
public class MyOptions {

    // String option
    public static final ConfigOption<String> MY_STRING = 
        ConfigOptions.key("my.string")
            .stringType()
            .defaultValue("default")
            .withDescription("My string configuration");

    // Integer option
    public static final ConfigOption<Integer> MY_INT = 
        ConfigOptions.key("my.int")
            .intType()
            .defaultValue(100)
            .withDescription("My integer configuration");

    // Duration option
    public static final ConfigOption<Duration> MY_DURATION = 
        ConfigOptions.key("my.duration")
            .durationType()
            .defaultValue(Duration.ofSeconds(30))
            .withDescription("My duration configuration");

    // Memory size option
    public static final ConfigOption<MemorySize> MY_MEMORY = 
        ConfigOptions.key("my.memory")
            .memoryType()
            .defaultValue(MemorySize.parse("1gb"))
            .withDescription("My memory configuration");

    // Enum option (MyEnum is a user-defined enum)
    public static final ConfigOption<MyEnum> MY_ENUM = 
        ConfigOptions.key("my.enum")
            .enumType(MyEnum.class)
            .defaultValue(MyEnum.VALUE1)
            .withDescription("My enum configuration");
}

Usage example

// Set values
Configuration config = new Configuration();
config.set(MyOptions.MY_STRING, "custom value");
config.set(MyOptions.MY_INT, 200);
config.set(MyOptions.MY_DURATION, Duration.ofMinutes(5));
config.set(MyOptions.MY_MEMORY, MemorySize.parse("2gb"));

// Read values
String str = config.get(MyOptions.MY_STRING);
int num = config.get(MyOptions.MY_INT);
Duration duration = config.get(MyOptions.MY_DURATION);
MemorySize memory = config.get(MyOptions.MY_MEMORY);

2.3 GlobalConfiguration - Global Configuration

public final class GlobalConfiguration {

    /**
     * Loads the configuration file (flink-conf.yaml) from the given directory.
     */
    public static Configuration loadConfiguration(String configDir);

    /**
     * Loads the configuration file and merges the dynamic properties on top of it.
     */
    public static Configuration loadConfiguration(
        String configDir, 
        Configuration dynamicProperties);
}

Usage example

// Load flink-conf.yaml
Configuration config = GlobalConfiguration.loadConfiguration("/path/to/conf");

// Load and merge dynamic properties
Configuration dynamicConfig = new Configuration();
dynamicConfig.setInteger("parallelism.default", 4);

Configuration merged = GlobalConfiguration.loadConfiguration(
    "/path/to/conf", 
    dynamicConfig
);
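
The layering in the example above (values from the file first, dynamic properties applied on top) can be sketched with plain maps. This is an illustration only: `LayeredConfigSketch` and `merge` are hypothetical names, and real configuration loading also involves parsing and validation that are left out here.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of "file values first, dynamic properties on top". */
final class LayeredConfigSketch {

    /** Returns a new map in which dynamic entries replace file entries with the same key. */
    static Map<String, String> merge(Map<String, String> fromFile, Map<String, String> dynamic) {
        Map<String, String> merged = new LinkedHashMap<>(fromFile);
        merged.putAll(dynamic); // later entries win on key collisions
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> fromFile = new LinkedHashMap<>();
        fromFile.put("parallelism.default", "1");
        fromFile.put("jobmanager.rpc.port", "6123");

        Map<String, String> dynamic = Collections.singletonMap("parallelism.default", "4");

        // The dynamic value replaces the file value; untouched keys are kept.
        System.out.println(merge(fromFile, dynamic));
    }
}
```

Returning a fresh map keeps both inputs unchanged, which makes it easy to log what came from the file and what was overridden.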

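3. File System Abstraction

3.1 FileSystem - File System Interface

3.1.1 Overview

FileSystem provides a uniform abstraction over storage backends such as the local file system, HDFS, and S3. The concrete implementation is chosen from the scheme of the path's URI (for example file://, hdfs://, s3://).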
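
One way to picture this abstraction is a registry keyed by URI scheme. The sketch below is a simplified, hypothetical model (`Storage` and `StorageRegistry` are not Flink types), and falling back to "file" for URIs without a scheme is an assumption made for the example.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a storage backend; not a Flink type. */
interface Storage {
    String describe();
}

/** Hypothetical registry that picks a backend from the URI scheme. */
final class StorageRegistry {
    private final Map<String, Storage> byScheme = new HashMap<>();

    void register(String scheme, Storage storage) {
        byScheme.put(scheme, storage);
    }

    /** Resolves a backend; URIs without a scheme are treated as local files (assumption). */
    Storage resolve(URI uri) {
        String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
        Storage storage = byScheme.get(scheme);
        if (storage == null) {
            throw new IllegalArgumentException("No storage registered for scheme: " + scheme);
        }
        return storage;
    }

    public static void main(String[] args) {
        StorageRegistry registry = new StorageRegistry();
        registry.register("file", () -> "local file system");
        registry.register("hdfs", () -> "HDFS");

        System.out.println(registry.resolve(URI.create("hdfs://namenode:9000/data")).describe());
        System.out.println(registry.resolve(URI.create("/tmp/data.txt")).describe());
    }
}
```

Callers only ever see the `Storage` interface, so adding a backend means registering one more scheme rather than changing calling code.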

3.1.2 Core API

public abstract class FileSystem {

    /** Returns the file system instance for the given URI. */
    public static FileSystem get(URI uri) throws IOException;

    /** Returns the local file system. */
    public static FileSystem getLocalFileSystem();

    /** Returns the URI of the default file system, used for paths without a scheme. */
    public static URI getDefaultFsUri();

    /** Lists the contents of a directory. */
    public abstract FileStatus[] listStatus(Path path) throws IOException;

    /** Creates a file and returns an output stream to it. */
    public abstract FSDataOutputStream create(Path path, WriteMode overwrite) 
        throws IOException;

    /** Opens a file and returns an input stream to it. */
    public abstract FSDataInputStream open(Path path) throws IOException;

    /** Deletes a file or directory. */
    public abstract boolean delete(Path path, boolean recursive) throws IOException;

    /** Creates a directory, including missing parents. */
    public abstract boolean mkdirs(Path path) throws IOException;

    /** Renames a file or directory. */
    public abstract boolean rename(Path src, Path dst) throws IOException;

    /** Returns the status of a file. */
    public abstract FileStatus getFileStatus(Path path) throws IOException;
}

Usage example

// 1. Local file system
Path localPath = new Path("file:///tmp/data.txt");
FileSystem localFS = localPath.getFileSystem();

// FSDataOutputStream is a plain OutputStream (no writeUTF), so write raw bytes
byte[] payload = "Hello Flink".getBytes(StandardCharsets.UTF_8);
try (FSDataOutputStream out = localFS.create(localPath, WriteMode.OVERWRITE)) {
    out.write(payload);
}

// FSDataInputStream is a plain InputStream (no readUTF), so read raw bytes
byte[] readBack = new byte[payload.length];
try (FSDataInputStream in = localFS.open(localPath)) {
    IOUtils.readFully(in, readBack, 0, readBack.length);
}
String content = new String(readBack, StandardCharsets.UTF_8);

// 2. HDFS
Path hdfsPath = new Path("hdfs://namenode:9000/user/flink/data.txt");
FileSystem hdfs = hdfsPath.getFileSystem();

// List a directory
FileStatus[] files = hdfs.listStatus(new Path("hdfs://namenode:9000/user/flink/"));
for (FileStatus file : files) {
    System.out.println(file.getPath() + ", size=" + file.getLen());
}

// 3. S3 (requires an S3 file system plugin to be installed)
Path s3Path = new Path("s3://my-bucket/data/file.txt");
FileSystem s3 = s3Path.getFileSystem();

// Write to S3
try (FSDataOutputStream s3Out = s3.create(s3Path, WriteMode.OVERWRITE)) {
    s3Out.write("data".getBytes(StandardCharsets.UTF_8));
}

// 4. Delete a file
hdfs.delete(hdfsPath, false);

// 5. Create a directory
hdfs.mkdirs(new Path("hdfs://namenode:9000/user/flink/new-dir"));

// 6. Rename
hdfs.rename(
    new Path("hdfs://namenode:9000/user/flink/old.txt"),
    new Path("hdfs://namenode:9000/user/flink/new.txt")
);

3.2 Path - Path Class

public class Path implements Serializable {

    /** Constructors */
    public Path(String pathString);
    public Path(String parent, String child);
    public Path(Path parent, String child);

    /** Returns the parent path. */
    public Path getParent();

    /** Returns the final component of the path (the file name). */
    public String getName();

    /** Converts the path to a URI. */
    public URI toUri();

    /** Returns whether the path is absolute. */
    public boolean isAbsolute();
}

Usage example

// Create paths
Path path1 = new Path("/user/flink/data.txt");
Path path2 = new Path("hdfs://namenode:9000/user/flink/data.txt");
Path path3 = new Path("/user/flink", "data.txt");

// Path operations
String name = path1.getName();  // "data.txt"
Path parent = path1.getParent();  // "/user/flink"
URI uri = path2.toUri();
boolean absolute = path1.isAbsolute();  // true

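4. Memory Management

4.1 MemorySegment - Memory Segment

4.1.1 Overview

MemorySegment is the core of Flink's memory management. It provides one access interface over different kinds of memory.

Key features

  • A single abstraction over on-heap and off-heap memory
  • Efficient memory reads and writes
  • Bounds checking
  • Bulk operations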
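
The features above can be illustrated with the JDK's own `ByteBuffer`, which also offers heap and direct (off-heap) variants behind one API with bounds-checked absolute accessors. This is an analogy, not Flink's implementation: `SegmentSketch` is a hypothetical class, and details such as byte order and memory release differ from MemorySegment.

```java
import java.nio.ByteBuffer;

/** Hypothetical wrapper showing one access API over heap and off-heap memory. */
final class SegmentSketch {
    private final ByteBuffer buffer;

    private SegmentSketch(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** Backed by a byte[] on the JVM heap. */
    static SegmentSketch onHeap(int size) {
        return new SegmentSketch(ByteBuffer.allocate(size));
    }

    /** Backed by direct memory outside the JVM heap. */
    static SegmentSketch offHeap(int size) {
        return new SegmentSketch(ByteBuffer.allocateDirect(size));
    }

    // Absolute accessors: ByteBuffer throws IndexOutOfBoundsException
    // when the access would run past the end of the buffer.
    void putInt(int index, int value) { buffer.putInt(index, value); }
    int getInt(int index) { return buffer.getInt(index); }
    void putLong(int index, long value) { buffer.putLong(index, value); }
    long getLong(int index) { return buffer.getLong(index); }

    int size() { return buffer.capacity(); }
    boolean isOffHeap() { return buffer.isDirect(); }

    public static void main(String[] args) {
        SegmentSketch segment = SegmentSketch.offHeap(16);
        segment.putInt(0, 42);
        segment.putLong(4, 123456789L);
        System.out.println(segment.getInt(0) + " " + segment.getLong(4));
    }
}
```

Code that reads and writes through `SegmentSketch` does not need to know which kind of memory backs it, which is the point of the unified abstraction.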

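4.1.2 Core API

// Segments are allocated through MemorySegmentFactory rather than
// through static methods on MemorySegment itself.
public final class MemorySegmentFactory {

    /** Allocates an unpooled on-heap segment. */
    public static MemorySegment allocateUnpooledSegment(int size);

    /** Allocates an unpooled off-heap segment. */
    public static MemorySegment allocateUnpooledOffHeapMemory(int size);

    /** Wraps an existing byte array. */
    public static MemorySegment wrap(byte[] buffer);
}

// Abridged. The class is abstract in older releases; newer releases
// use a single concrete class.
public abstract class MemorySegment {

    /** Reads and writes primitive values at an absolute index. */
    public abstract byte get(int index);
    public abstract void put(int index, byte b);
    public abstract int getInt(int index);
    public abstract void putInt(int index, int value);
    public abstract long getLong(int index);
    public abstract void putLong(int index, long value);

    /** Bulk reads and writes. */
    public abstract void get(int index, byte[] dst, int offset, int length);
    public abstract void put(int index, byte[] src, int offset, int length);

    /** Copies numBytes from this segment to the target segment. */
    public void copyTo(int offset, MemorySegment target, int targetOffset, int numBytes);

    /** Returns the size of the segment in bytes. */
    public int size();

    /** Releases the segment's memory. */
    public abstract void free();
}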

Usage example

// 1. Allocate memory
MemorySegment heapSegment = MemorySegmentFactory.allocateUnpooledSegment(1024);
MemorySegment offHeapSegment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(1024);

// 2. Write data
heapSegment.putInt(0, 42);
heapSegment.putLong(4, 123456789L);
heapSegment.put(12, (byte) 'A');

// 3. Read data
int intValue = heapSegment.getInt(0);  // 42
long longValue = heapSegment.getLong(4);  // 123456789
byte byteValue = heapSegment.get(12);  // 'A'

// 4. Bulk operations (this overwrites the bytes written in step 2)
byte[] data = "Hello Flink".getBytes(StandardCharsets.UTF_8);
heapSegment.put(0, data, 0, data.length);

byte[] readData = new byte[data.length];
heapSegment.get(0, readData, 0, data.length);

// 5. Copy between segments
MemorySegment source = MemorySegmentFactory.allocateUnpooledSegment(100);
MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(100);
source.copyTo(0, target, 0, 100);

// 6. Release memory
offHeapSegment.free();

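4.2 Buffer - Network Buffer

/**
 * Buffer used for network transfer (abridged).
 * Note: this interface is defined in flink-runtime
 * (org.apache.flink.runtime.io.network.buffer); it wraps a MemorySegment from flink-core.
 */
public interface Buffer {

    /** Returns the underlying memory segment. */
    MemorySegment getMemorySegment();

    /** Returns the number of bytes written to the buffer. */
    int getSize();

    /** Sets the number of bytes written (the writer index). */
    void setSize(int writerIndex);

    /** Returns whether this holds data (true) or an event (false). */
    boolean isBuffer();

    /** Releases the buffer so that it can be recycled. */
    void recycleBuffer();
}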

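5. Metrics System

5.1 MetricGroup - Metric Group

5.1.1 Overview

MetricGroup organizes metrics into a hierarchy of named groups. The metric interfaces shown in this section are defined in the org.apache.flink.metrics package.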
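
A rough way to picture hierarchical metric management is a tree of groups that share one registry and prefix each metric name with the group's path. The sketch below is hypothetical (`GroupSketch` is not a Flink class), uses `AtomicLong` in place of a counter type, and returns the existing counter on a repeated name, which is an assumption of this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical group tree: child groups extend the scope, metrics share one registry. */
final class GroupSketch {
    private final String scope;                      // dotted path of this group
    private final Map<String, AtomicLong> registry;  // shared by the whole tree

    GroupSketch(String rootName) {
        this(rootName, new LinkedHashMap<>());
    }

    private GroupSketch(String scope, Map<String, AtomicLong> registry) {
        this.scope = scope;
        this.registry = registry;
    }

    /** Creates a child group whose metrics are scoped under this group. */
    GroupSketch addGroup(String name) {
        return new GroupSketch(scope + "." + name, registry);
    }

    /** Registers a counter under the fully scoped name, or returns the existing one. */
    AtomicLong counter(String name) {
        return registry.computeIfAbsent(scope + "." + name, key -> new AtomicLong());
    }

    /** Exposes all registered metrics by fully scoped name. */
    Map<String, AtomicLong> registered() {
        return registry;
    }

    public static void main(String[] args) {
        GroupSketch root = new GroupSketch("taskmanager");
        root.addGroup("myapp").counter("success").incrementAndGet();
        System.out.println(root.registered().keySet());
    }
}
```

Because names are scoped by group path, two operators can both register a metric called "success" without colliding.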

5.1.2 Core API

public interface MetricGroup {

    /** Creates and registers a Counter. */
    Counter counter(String name);

    /** Registers a Gauge and returns it. */
    <T, G extends Gauge<T>> G gauge(String name, G gauge);

    /** Registers a Histogram and returns it. */
    <H extends Histogram> H histogram(String name, H histogram);

    /** Registers a Meter and returns it. */
    <M extends Meter> M meter(String name, M meter);

    /** Creates a child group. */
    MetricGroup addGroup(String name);
}

Usage example

// Using metrics inside a RichFunction
public class MyMapFunction extends RichMapFunction<String, String> {

    private transient Counter counter;
    private transient Meter meter;
    private transient Histogram histogram;

    @Override
    public void open(Configuration parameters) {
        // Obtain the MetricGroup
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();

        // Register a Counter
        counter = metricGroup.counter("myCounter");

        // Register a Gauge
        metricGroup.gauge("myGauge", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return System.currentTimeMillis();
            }
        });

        // Register a Meter (the instance must be created by the caller)
        meter = metricGroup.meter("myMeter", new MeterView(60));

        // Register a Histogram (the instance must be created by the caller;
        // DescriptiveStatisticsHistogram comes from flink-runtime)
        histogram = metricGroup.histogram("myHistogram", 
            new DescriptiveStatisticsHistogram(1000));

        // Create a child group
        MetricGroup subGroup = metricGroup.addGroup("subMetrics");
        subGroup.counter("subCounter");
    }

    @Override
    public String map(String value) {
        counter.inc();  // increment the counter by 1
        meter.markEvent();  // record one event
        histogram.update(value.length());  // add a sample to the histogram
        return value.toUpperCase();
    }
}

5.2 Metric Types

5.2.1 Counter - Running Count

public interface Counter extends Metric {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

5.2.2 Gauge - Instantaneous Value

public interface Gauge<T> extends Metric {
    T getValue();
}

5.2.3 Histogram - Value Distribution

public interface Histogram extends Metric {
    void update(long value);
    long getCount();
    HistogramStatistics getStatistics();
}

5.2.4 Meter - Rate

public interface Meter extends Metric {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}
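
To show how a meter can turn a running count into a rate, here is a minimal sketch that samples the count at fixed ticks and divides the increase over a time span by the span length. `MeterSketch` is a hypothetical class, the caller is assumed to invoke `tick()` at the stated interval, and thread safety is ignored.

```java
/** Hypothetical rate meter: samples a running count on every tick. */
final class MeterSketch {
    private final long[] samples;   // ring buffer of cumulative counts, one per tick
    private final int spanSeconds;  // window over which the rate is averaged
    private int pos = 0;
    private long count = 0;
    private double rate = 0.0;

    /** spanSeconds should be a multiple of tickSeconds. */
    MeterSketch(int spanSeconds, int tickSeconds) {
        this.spanSeconds = spanSeconds;
        this.samples = new long[spanSeconds / tickSeconds + 1];
    }

    void markEvent() { count++; }
    void markEvent(long n) { count += n; }
    long getCount() { return count; }

    /** Events per second over the last span, as of the most recent tick. */
    double getRate() { return rate; }

    /** Call once per tick: store the current count and compare it with the oldest sample. */
    void tick() {
        pos = (pos + 1) % samples.length;
        samples[pos] = count;
        long oldest = samples[(pos + 1) % samples.length];
        rate = (double) (count - oldest) / spanSeconds;
    }
}
```

With a 10-second span and 5-second ticks, 50 events in each of two consecutive ticks yield a rate of 10 events per second; until the ring buffer has filled once, the rate underestimates because the missing samples count as zero.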

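6. Utility Classes

6.1 InstantiationUtil - Instantiation Utilities

public final class InstantiationUtil {

    /** Serializes an object to a byte array using Java serialization. */
    public static byte[] serializeObject(Object o) throws IOException;

    /** Deserializes an object, resolving classes through the given class loader. */
    public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) 
        throws IOException, ClassNotFoundException;

    /** Clones an object by serializing and deserializing it. */
    public static <T extends Serializable> T clone(T obj, ClassLoader classLoader) 
        throws IOException, ClassNotFoundException;

    /** Instantiates a class through its public no-argument constructor. */
    public static <T> T instantiate(Class<T> clazz);
}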

Usage example

// Serialize (MyObject must implement Serializable)
MyObject obj = new MyObject("data");
byte[] bytes = InstantiationUtil.serializeObject(obj);

// Deserialize
MyObject restored = InstantiationUtil.deserializeObject(
    bytes, 
    Thread.currentThread().getContextClassLoader()
);

// Clone
MyObject cloned = InstantiationUtil.clone(
    obj, 
    Thread.currentThread().getContextClassLoader()
);

// Instantiate (MyClass needs a public no-argument constructor)
MyClass instance = InstantiationUtil.instantiate(MyClass.class);
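
The serialize, deserialize, and clone operations above can be approximated with plain Java serialization. The following is a sketch under that assumption: `SerializationSketch` is a hypothetical class, and it leaves out the class-loader handling that the ClassLoader parameter in the listing above exists for.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper: deep copy through a serialization round trip. */
final class SerializationSketch {

    /** Writes the object into an in-memory byte array. */
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Reads an object back; classes are resolved by the default class loader. */
    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        }
    }

    /** Produces an independent copy of a serializable object graph. */
    static <T extends Serializable> T deepCopy(T obj) throws IOException, ClassNotFoundException {
        return deserialize(serialize(obj));
    }
}
```

The copy shares no mutable state with the original, at the cost of a full serialization pass, so this approach suits setup-time cloning rather than per-record work.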

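6.2 IOUtils - I/O Utilities

public final class IOUtils {

    /** Closes a resource, ignoring any exception. */
    public static void closeQuietly(AutoCloseable closeable);

    /** Closes several resources; attempts every close before rethrowing a failure. */
    public static void closeAll(AutoCloseable... closeables) throws Exception;

    /** Copies all bytes from in to out; this overload also closes both streams. */
    public static void copyBytes(InputStream in, OutputStream out) 
        throws IOException;

    /** Reads exactly len bytes into buf, starting at offset off. */
    public static void readFully(InputStream in, byte[] buf, int off, int len) 
        throws IOException;
}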
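
For readers who want to see what such helpers do internally, here is a minimal plain-Java sketch of a buffered stream copy and a quiet close. `IoSketch` is a hypothetical class; unlike the listing above, its `copy` returns the byte count and leaves both streams open, which is a choice made for this example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical helpers mirroring common I/O utility patterns. */
final class IoSketch {

    /** Copies everything from in to out and returns the number of bytes copied. */
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[4096];
        long total = 0;
        int read;
        // read() returns -1 at end of stream
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    /** Closes the resource if present and swallows any exception from close(). */
    static void closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception ignored) {
            // intentionally ignored: used on cleanup paths where failure is not actionable
        }
    }
}
```

A quiet close is best kept for cleanup after an earlier failure; on the normal path, try-with-resources reports close errors instead of hiding them.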

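6.3 StringUtils - String Utilities

public final class StringUtils {

    /** Converts a byte array to a hexadecimal string. */
    public static String byteToHexString(byte[] bytes);

    /** Converts a hexadecimal string to a byte array. */
    public static byte[] hexStringToByte(String hex);

    /** Converts an array (including primitive arrays) to a string. */
    public static String arrayToString(Object array);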
    
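    /**
     * Shortens a string to at most maxWidth characters.
     * Note: this signature matches Apache Commons Lang's StringUtils.abbreviate;
     * verify that the StringUtils of your Flink version provides it.
     */
    public static String abbreviate(String str, int maxWidth);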
}
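
The byte-to-hex and hex-to-byte conversions listed above are simple enough to sketch in plain Java. `HexSketch` is a hypothetical class, not Flink's code; it emits lowercase digits and rejects malformed input, both of which are choices made for this example.

```java
/** Hypothetical hex codec illustrating byte[] <-> hex string conversion. */
final class HexSketch {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    /** Encodes each byte as two lowercase hex digits. */
    static String toHex(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            int v = bytes[i] & 0xFF;       // treat the byte as unsigned
            out[2 * i] = HEX[v >>> 4];     // high nibble
            out[2 * i + 1] = HEX[v & 0x0F]; // low nibble
        }
        return new String(out);
    }

    /** Decodes a hex string; the length must be even and all characters hex digits. */
    static byte[] fromHex(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Hex string must have even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Not a hex digit at position " + (2 * i));
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }
}
```

Masking with `0xFF` matters because Java bytes are signed; without it, values above 0x7f would index the lookup table with a negative number.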

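6.4 MathUtils - Math Utilities

public final class MathUtils {

    /** Returns whether the value is a power of two. */
    public static boolean isPowerOf2(long value);

    /** Rounds the value up to the next power of two. */
    public static int roundUpToPowerOfTwo(int value);

    /** Computes the base-2 logarithm, rounded down. */
    public static int log2floor(int value);

    /** Computes the base-2 logarithm; the value must be a power of two. */
    public static int log2strict(int value);
}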
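
These helpers rely on standard bit tricks, sketched below in plain Java. `MathSketch` is a hypothetical class; its handling of edge cases (zero, negative input, overflow) is chosen for this example and may differ from Flink's MathUtils.

```java
/** Hypothetical bit-manipulation helpers for powers of two. */
final class MathSketch {

    /** A positive power of two has exactly one bit set, so value & (value - 1) is zero. */
    static boolean isPowerOf2(long value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Smallest power of two that is greater than or equal to value. */
    static int roundUpToPowerOfTwo(int value) {
        if (value <= 1) {
            return 1;
        }
        if (value > (1 << 30)) {
            throw new IllegalArgumentException("Result would overflow int: " + value);
        }
        // highestOneBit(value - 1) is the largest power of two below value
        return Integer.highestOneBit(value - 1) << 1;
    }

    /** floor(log2(value)): the index of the highest set bit. */
    static int log2floor(int value) {
        if (value <= 0) {
            throw new ArithmeticException("log2 is undefined for " + value);
        }
        return 31 - Integer.numberOfLeadingZeros(value);
    }
}
```

Power-of-two sizes are useful for hash tables and buffers because an index can be reduced with a bit mask (`hash & (size - 1)`) instead of a modulo.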

7. Best Practices

7.1 Configuration Management Best Practices

// 1. Use type-safe ConfigOptions
public class MyConfig {
    public static final ConfigOption<String> HOST = 
        ConfigOptions.key("my.host")
            .stringType()
            .defaultValue("localhost")
            .withDescription("Host address");
}

// 2. Validate configuration
// get() returns the option's default when the key is unset, so this
// check mainly guards against an explicitly empty value.
Configuration config = new Configuration();
String host = config.get(MyConfig.HOST);
if (host == null || host.isEmpty()) {
    throw new IllegalArgumentException("Host must be configured");
}

// 3. Configuration inheritance
Configuration base = new Configuration();
base.set(MyConfig.HOST, "base-host");

Configuration override = new Configuration();
override.set(MyConfig.HOST, "override-host");

base.addAll(override);  // values from override take precedence

7.2 File System Best Practices

// 1. Use try-with-resources
Path path = new Path("hdfs://namenode/data.txt");
try (FSDataOutputStream out = path.getFileSystem().create(path, WriteMode.OVERWRITE)) {
    // FSDataOutputStream is a plain OutputStream, so write raw bytes
    out.write("data".getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
    // handle the exception
}

// 2. List once, then filter in memory
FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"));
FileStatus[] files = fs.listStatus(new Path("/data"));

for (FileStatus file : files) {
    if (!file.isDir() && file.getLen() > 0) {
        // process the file
    }
}

// 3. Qualify paths (inputPath is a user-supplied String)
Path input = new Path(inputPath);
Path absolutePath = input.makeQualified(fs);

7.3 Metrics Best Practices

public class MetricExample extends RichMapFunction<String, String> {
    
    private transient Counter successCounter;
    private transient Counter failureCounter;
    private transient Histogram latencyHistogram;
    
    @Override
    public void open(Configuration parameters) {
        MetricGroup metricGroup = getRuntimeContext()
            .getMetricGroup()
            .addGroup("myapp");
        
        // Keep related metrics together in one group
        successCounter = metricGroup.counter("success");
        failureCounter = metricGroup.counter("failure");
        latencyHistogram = metricGroup.histogram("latency",
            new DescriptiveStatisticsHistogram(10000));
    }
    
    @Override
    public String map(String value) throws Exception {
        long startTime = System.nanoTime();
        
        try {
            // processValue stands for the application logic (not shown)
            String result = processValue(value);
            successCounter.inc();
            return result;
        } catch (Exception e) {
            failureCounter.inc();
            throw e;
        } finally {
            long latency = System.nanoTime() - startTime;
            latencyHistogram.update(latency / 1_000_000);  // convert nanoseconds to milliseconds
        }
    }
}

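8. Summary

The flink-core module provides Flink's infrastructure:

Configuration management

  • Configuration: the core configuration class
  • ConfigOption: type-safe configuration options
  • GlobalConfiguration: loading of the global configuration

File system

  • FileSystem: a uniform file system abstraction
  • Support for local storage, HDFS, S3, and other backends
  • Path: path operations

Memory management

  • MemorySegment: uniform memory access
  • On-heap and off-heap memory support
  • Buffer: network buffers

Metrics system

  • MetricGroup: hierarchical metric management
  • Counter, Gauge, Histogram, Meter
  • Integration with the runtime context

Utility classes

  • InstantiationUtil: serialization utilities
  • IOUtils: I/O operations
  • StringUtils: string handling
  • MathUtils: math helpers

These foundational components underpin the operation of the whole Flink framework.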