# Parquet文件

# Parquet 文件概述

Parquet 是一种开源的 列式存储文件格式,专为大数据处理场景设计。它通过高效的编码和压缩技术,优化了数据存储和查询性能,尤其适合 OLAP(联机分析处理)类任务。


# 核心优势

  1. 列式存储

    • 数据按列而非行存储,查询时仅需读取相关列,大幅减少 I/O 和内存开销。
    • 示例:若查询仅需 user_idtimestamp,Parquet 可跳过其他列数据。
  2. 高压缩率

    • 原因:同一列数据类型一致,重复值多(如枚举字段),支持高效的编码和压缩算法(如字典编码、RLE、Snappy、ZSTD)。
    • 效果:压缩率通常比 CSV/JSON 高 2~10 倍,存储成本显著降低。
  3. 高效查询性能

    • 列式存储结合元数据(如 Min/Max、统计信息),支持谓词下推(Predicate Pushdown),提前过滤无关数据块。
  4. 兼容性与扩展性

    • 支持复杂嵌套数据结构(通过 Dremel 算法),兼容 Avro、Thrift 等数据模型。
    • 跨平台(Hadoop、Spark、云服务)通用,生态工具丰富。

# 压缩率高的原因

  1. 列局部性:同列数据值相似,利于压缩算法发现重复模式。
  2. 智能编码
    • 字典编码:将重复值映射为短整数(如 "Male"=1, "Female"=2)。
    • 位打包:对整型数据紧凑存储。
    • Delta 编码:存储相邻值的差值(适用于时间序列)。
  3. 可选压缩算法:支持 Snappy(速度优先)、GZIP(压缩率优先)、ZSTD(平衡型)等。

# 典型应用场景

  1. 大数据分析

    • 场景:数据仓库(如 Hive)、交互式查询(如 Presto)、ETL 处理(如 Spark)。
    • 案例:分析十亿级日志数据,仅扫描 error_code 列快速定位问题。
  2. 云数据湖

    • 结合 Amazon S3、Azure Data Lake,用于低成本存储海量数据。
  3. 机器学习特征存储

    • 高效读取部分特征列,加速模型训练。

# 常用集成工具与平台

类别 工具/平台
计算引擎 Apache Spark, Apache Hive, Presto, Impala, Flink
云服务 AWS Athena/Glue, Google BigQuery, Azure Synapse, Databricks
文件系统 HDFS, Amazon S3, Google Cloud Storage
序列化框架 Apache Avro, Apache Thrift(定义数据结构)
查询优化 Apache Arrow(内存加速), Parquet-MR(Hadoop 读写库)

# Python处理Parquet

以下是使用 Python 操作 Parquet 文件的完整指南,包含创建、读取和切割的代码示例:


# 一、环境准备

# 安装必要库
pip install pandas pyarrow fastparquet dask

# 二、创建 Parquet 文件

# 方法 1: 使用 Pandas

import pandas as pd
import numpy as np

# 创建示例 DataFrame
data = {
    'id': range(1, 6),
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 40, 45],
    'timestamp': pd.date_range('2023-01-01', periods=5)
}
df = pd.DataFrame(data)

# 写入 Parquet 文件(默认使用 snappy 压缩)
df.to_parquet('data.parquet', engine='pyarrow')  # 或 engine='fastparquet'

# 方法 2: 使用 PyArrow

import pyarrow as pa
import pyarrow.parquet as pq

# 创建 PyArrow Table
table = pa.Table.from_pandas(df)

# 写入文件(指定压缩算法)
pq.write_table(table, 'data.parquet', compression='ZSTD')

# 三、读取 Parquet 文件

# 基本读取

# 读取整个文件
df = pd.read_parquet('data.parquet', engine='pyarrow')

# 只读取特定列
df = pd.read_parquet('data.parquet', columns=['id', 'name'])

# 分块读取(大型文件)

# 使用 PyArrow 分块读取
parquet_file = pq.ParquetFile('large_data.parquet')

for batch in parquet_file.iter_batches(batch_size=1000):
    chunk_df = batch.to_pandas()
    # 处理每个分块...

# 四、切割 Parquet 文件

# 场景 1: 按行数切割

# 将 DataFrame 分为两个文件
df1 = df.iloc[:3]  # 前3行
df2 = df.iloc[3:]  # 剩余行

df1.to_parquet('split_part1.parquet')
df2.to_parquet('split_part2.parquet')

# 场景 2: 按条件筛选

# 按年龄筛选
young_df = df[df['age'] < 35]
old_df = df[df['age'] >= 35]

# 写入不同文件
young_df.to_parquet('young_data.parquet')
old_df.to_parquet('old_data.parquet')

# 场景 3: 按列值分割(高效方法)

# 使用 PyArrow 直接操作(避免加载全量数据)
table = pq.read_table('data.parquet', columns=['id', 'age'])

# 过滤年龄大于30的数据
filtered_table = table.filter(pa.compute.greater(table['age'], 30))
pq.write_table(filtered_table, 'filtered_data.parquet')

# 五、处理大型文件(Dask 示例)

import dask.dataframe as dd

# 分块读取大型 Parquet
ddf = dd.read_parquet('large_data.parquet', engine='pyarrow')

# 并行筛选和写入
filtered_ddf = ddf[ddf['age'] > 30]
filtered_ddf.to_parquet('output_dir/', partition_on='age')  # 按年龄分区存储

# 六、关键参数说明

参数 作用 常用值
engine 指定读写引擎 'pyarrow', 'fastparquet'
compression 压缩算法 'snappy', 'gzip', 'zstd'
columns 指定读取列 ['col1', 'col2']
partition_on 按列分区存储(目录结构) 'date', 'category'

# 七、注意事项

  1. 引擎选择

    • pyarrow:性能更好,支持最新 Parquet 格式
    • fastparquet:兼容性更佳,适合旧系统
  2. 压缩算法

    • snappy:快速压缩/解压(默认推荐)
    • gzip:高压缩率,速度较慢
    • zstd:平衡型(需 PyArrow>=2.0)
  3. 内存管理

    • 超过 1GB 的数据建议使用 Dask 分块处理
    • 使用 pd.read_parquet(..., filters=...) 提前过滤数据

以上代码可直接在 Jupyter Notebook 或 Python 脚本中运行,根据数据规模选择合适的方法。

# Java处理Parquet

以下是使用 Java 操作 Parquet 文件的完整代码示例,包含创建、读取和切割操作:


# 一、环境依赖 (Maven)

<dependencies>
    <!-- Parquet Core -->
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.13.1</version>
    </dependency>
    
    <!-- Avro 数据模型 -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.3</version>
    </dependency>
    
    <!-- Hadoop Common(文件操作需要) -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.6</version>
    </dependency>
</dependencies>

# 二、创建 Parquet 文件

# 步骤 1:定义 Avro Schema

// 创建 schema.avsc 文件
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "timestamp", "type": "long"}
  ]
}

# 步骤 2:生成 Avro 类

java -jar avro-tools-1.11.3.jar compile schema schema.avsc .

# 步骤 3:写入 Parquet 文件

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriterExample {
    public static void main(String[] args) throws IOException {
        Path path = new Path("users.parquet");
        Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
        
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(path)
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            
            // 写入示例数据
            for (int i = 1; i <= 5; i++) {
                GenericRecord record = new GenericData.Record(schema);
                record.put("id", i);
                record.put("name", "User" + i);
                record.put("age", 20 + i);
                record.put("timestamp", System.currentTimeMillis());
                writer.write(record);
            }
        }
    }
}

# 三、读取 Parquet 文件

# 基本读取

import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ParquetReaderExample {
    public static void main(String[] args) throws IOException {
        Path path = new Path("users.parquet");
        
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(path)
                .build()) {
            
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println("Read record: " + record);
            }
        }
    }
}

# 四、切割 Parquet 文件

# 场景 1:按条件过滤切割

public class ParquetSplitter {
    public static void splitByAge(Path inputPath, Path outputPath1, Path outputPath2, int ageThreshold) 
            throws IOException {
            
        // 创建两个写入器
        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputPath).build();
             ParquetWriter<GenericRecord> writer1 = createWriter(outputPath1);
             ParquetWriter<GenericRecord> writer2 = createWriter(outputPath2)) {
            
            GenericRecord record;
            while ((record = reader.read()) != null) {
                int age = (Integer) record.get("age");
                if (age < ageThreshold) {
                    writer1.write(record);
                } else {
                    writer2.write(record);
                }
            }
        }
    }

    private static ParquetWriter<GenericRecord> createWriter(Path path) throws IOException {
        return AvroParquetWriter.<GenericRecord>builder(path)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
    }

    public static void main(String[] args) throws IOException {
        splitByAge(
            new Path("users.parquet"),
            new Path("young_users.parquet"),
            new Path("old_users.parquet"),
            25
        );
    }
}

# 场景 2:按行数分块切割

public class ParquetRowSplitter {
    public static void splitByRowCount(Path inputPath, String outputPrefix, int chunkSize) 
            throws IOException {
        
        int fileCounter = 0;
        int recordCounter = 0;
        ParquetWriter<GenericRecord> currentWriter = null;
        
        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputPath).build()) {
            
            GenericRecord record;
            while ((record = reader.read()) != null) {
                if (recordCounter % chunkSize == 0) {
                    if (currentWriter != null) currentWriter.close();
                    currentWriter = createWriter(new Path(outputPrefix + fileCounter + ".parquet"));
                    fileCounter++;
                }
                currentWriter.write(record);
                recordCounter++;
            }
        } finally {
            if (currentWriter != null) currentWriter.close();
        }
    }

    // main 方法省略...
}

# 五、高级操作

# 1. 使用投影读取部分列

// 只读取 id 和 name 列
ParquetReader<GenericRecord> reader = AvroParquetReader
    .<GenericRecord>builder(path)
    .withProjection(Schema.createRecord(Arrays.asList(
        new Schema.Field("id", Schema.create(Schema.Type.INT), "", null),
        new Schema.Field("name", Schema.create(Schema.Type.STRING), "", null)
    )))
    .build();

# 2. 使用谓词下推过滤

// 使用 Filter API 提前过滤
FilterPredicate filter = FilterApi.gt(FilterApi.intColumn("age"), 25);

ParquetReader<GenericRecord> reader = AvroParquetReader
    .<GenericRecord>builder(path)
    .withFilter(filter)
    .build();

# 六、性能优化建议

  1. 批量写入:使用 ParquetWriter 的自动缓冲机制(默认行组大小 128MB)
  2. 压缩选择
   .withCompressionCodec(CompressionCodecName.ZSTD) // 更高压缩率
  1. 列式处理:优先使用投影减少 I/O
  2. 内存管理:处理大文件时使用分块读取(示例中的切割代码)

# 七、常见问题解决

  1. Schema 不匹配:确保读写使用相同的 Avro Schema
  2. Hadoop 依赖:若不在 Hadoop 环境运行,需添加 hadoop-common 依赖
  3. 版本冲突:保持 Parquet/Avro/Hadoop 版本兼容性

以上代码示例展示了 Java 中 Parquet 文件的核心操作流程,可直接用于大数据处理流水线的开发。对于超大规模数据,建议结合 Apache Spark 进行分布式处理。


# 总结

Parquet 凭借 列式存储、高压缩率、高性能查询,成为大数据生态的核心存储格式,尤其适合需要低成本存储、快速分析海量数据的场景。与 Spark、Presto 等工具深度集成,是构建数据湖和现代数仓的首选格式。

parquet