# Parquet文件
# Parquet 文件概述
Parquet 是一种开源的 列式存储文件格式,专为大数据处理场景设计。它通过高效的编码和压缩技术,优化了数据存储和查询性能,尤其适合 OLAP(联机分析处理)类任务。
# 核心优势
列式存储
- 数据按列而非行存储,查询时仅需读取相关列,大幅减少 I/O 和内存开销。
- 示例:若查询仅需
user_id
和timestamp
,Parquet 可跳过其他列数据。
高压缩率
- 原因:同一列数据类型一致,重复值多(如枚举字段),支持高效的编码和压缩算法(如字典编码、RLE、Snappy、ZSTD)。
- 效果:压缩率通常比 CSV/JSON 高 2~10 倍,存储成本显著降低。
高效查询性能
- 列式存储结合元数据(如 Min/Max、统计信息),支持谓词下推(Predicate Pushdown),提前过滤无关数据块。
兼容性与扩展性
- 支持复杂嵌套数据结构(通过 Dremel 算法),兼容 Avro、Thrift 等数据模型。
- 跨平台(Hadoop、Spark、云服务)通用,生态工具丰富。
# 压缩率高的原因
- 列局部性:同列数据值相似,利于压缩算法发现重复模式。
- 智能编码:
- 字典编码:将重复值映射为短整数(如
"Male"=1, "Female"=2
)。 - 位打包:对整型数据紧凑存储。
- Delta 编码:存储相邻值的差值(适用于时间序列)。
- 字典编码:将重复值映射为短整数(如
- 可选压缩算法:支持 Snappy(速度优先)、GZIP(压缩率优先)、ZSTD(平衡型)等。
# 典型应用场景
大数据分析
- 场景:数据仓库(如 Hive)、交互式查询(如 Presto)、ETL 处理(如 Spark)。
- 案例:分析十亿级日志数据,仅扫描
error_code
列快速定位问题。
云数据湖
- 结合 Amazon S3、Azure Data Lake,用于低成本存储海量数据。
机器学习特征存储
- 高效读取部分特征列,加速模型训练。
# 常用集成工具与平台
类别 | 工具/平台 |
---|---|
计算引擎 | Apache Spark, Apache Hive, Presto, Impala, Flink |
云服务 | AWS Athena/Glue, Google BigQuery, Azure Synapse, Databricks |
文件系统 | HDFS, Amazon S3, Google Cloud Storage |
序列化框架 | Apache Avro, Apache Thrift(定义数据结构) |
查询优化 | Apache Arrow(内存加速), Parquet-MR(Hadoop 读写库) |
# Python处理Parquet
以下是使用 Python 操作 Parquet 文件的完整指南,包含创建、读取和切割的代码示例:
# 一、环境准备
# 安装必要库
pip install pandas pyarrow fastparquet dask
# 二、创建 Parquet 文件
# 方法 1: 使用 Pandas
import pandas as pd
import numpy as np
# 创建示例 DataFrame
data = {
'id': range(1, 6),
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'age': [25, 30, 35, 40, 45],
'timestamp': pd.date_range('2023-01-01', periods=5)
}
df = pd.DataFrame(data)
# 写入 Parquet 文件(默认使用 snappy 压缩)
df.to_parquet('data.parquet', engine='pyarrow') # 或 engine='fastparquet'
# 方法 2: 使用 PyArrow
import pyarrow as pa
import pyarrow.parquet as pq
# 创建 PyArrow Table
table = pa.Table.from_pandas(df)
# 写入文件(指定压缩算法)
pq.write_table(table, 'data.parquet', compression='ZSTD')
# 三、读取 Parquet 文件
# 基本读取
# 读取整个文件
df = pd.read_parquet('data.parquet', engine='pyarrow')
# 只读取特定列
df = pd.read_parquet('data.parquet', columns=['id', 'name'])
# 分块读取(大型文件)
# 使用 PyArrow 分块读取
parquet_file = pq.ParquetFile('large_data.parquet')
for batch in parquet_file.iter_batches(batch_size=1000):
chunk_df = batch.to_pandas()
# 处理每个分块...
# 四、切割 Parquet 文件
# 场景 1: 按行数切割
# 将 DataFrame 分为两个文件
df1 = df.iloc[:3] # 前3行
df2 = df.iloc[3:] # 剩余行
df1.to_parquet('split_part1.parquet')
df2.to_parquet('split_part2.parquet')
# 场景 2: 按条件筛选
# 按年龄筛选
young_df = df[df['age'] < 35]
old_df = df[df['age'] >= 35]
# 写入不同文件
young_df.to_parquet('young_data.parquet')
old_df.to_parquet('old_data.parquet')
# 场景 3: 按列值分割(高效方法)
# 使用 PyArrow 直接操作(避免加载全量数据)
table = pq.read_table('data.parquet', columns=['id', 'age'])
# 过滤年龄大于30的数据
filtered_table = table.filter(pa.compute.greater(table['age'], 30))
pq.write_table(filtered_table, 'filtered_data.parquet')
# 五、处理大型文件(Dask 示例)
import dask.dataframe as dd
# 分块读取大型 Parquet
ddf = dd.read_parquet('large_data.parquet', engine='pyarrow')
# 并行筛选和写入
filtered_ddf = ddf[ddf['age'] > 30]
filtered_ddf.to_parquet('output_dir/', partition_on='age') # 按年龄分区存储
# 六、关键参数说明
参数 | 作用 | 常用值 |
---|---|---|
engine | 指定读写引擎 | 'pyarrow' , 'fastparquet' |
compression | 压缩算法 | 'snappy' , 'gzip' , 'zstd' |
columns | 指定读取列 | ['col1', 'col2'] |
partition_on | 按列分区存储(目录结构) | 'date' , 'category' |
# 七、注意事项
引擎选择:
pyarrow
:性能更好,支持最新 Parquet 格式fastparquet
:兼容性更佳,适合旧系统
压缩算法:
snappy
:快速压缩/解压(默认推荐)gzip
:高压缩率,速度较慢zstd
:平衡型(需 PyArrow>=2.0)
内存管理:
- 超过 1GB 的数据建议使用 Dask 分块处理
- 使用
pd.read_parquet(..., filters=...)
提前过滤数据
以上代码可直接在 Jupyter Notebook 或 Python 脚本中运行,根据数据规模选择合适的方法。
# Java处理Parquet
以下是使用 Java 操作 Parquet 文件的完整代码示例,包含创建、读取和切割操作:
# 一、环境依赖 (Maven)
<dependencies>
<!-- Parquet Core -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.13.1</version>
</dependency>
<!-- Avro 数据模型 -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- Hadoop Common(文件操作需要) -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
# 二、创建 Parquet 文件
# 步骤 1:定义 Avro Schema
// 创建 schema.avsc 文件
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "timestamp", "type": "long"}
]
}
# 步骤 2:生成 Avro 类
java -jar avro-tools-1.11.3.jar compile schema schema.avsc .
# 步骤 3:写入 Parquet 文件
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ParquetWriterExample {
public static void main(String[] args) throws IOException {
Path path = new Path("users.parquet");
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(path)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build()) {
// 写入示例数据
for (int i = 1; i <= 5; i++) {
GenericRecord record = new GenericData.Record(schema);
record.put("id", i);
record.put("name", "User" + i);
record.put("age", 20 + i);
record.put("timestamp", System.currentTimeMillis());
writer.write(record);
}
}
}
}
# 三、读取 Parquet 文件
# 基本读取
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
public class ParquetReaderExample {
public static void main(String[] args) throws IOException {
Path path = new Path("users.parquet");
try (ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(path)
.build()) {
GenericRecord record;
while ((record = reader.read()) != null) {
System.out.println("Read record: " + record);
}
}
}
}
# 四、切割 Parquet 文件
# 场景 1:按条件过滤切割
public class ParquetSplitter {
public static void splitByAge(Path inputPath, Path outputPath1, Path outputPath2, int ageThreshold)
throws IOException {
// 创建两个写入器
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputPath).build();
ParquetWriter<GenericRecord> writer1 = createWriter(outputPath1);
ParquetWriter<GenericRecord> writer2 = createWriter(outputPath2)) {
GenericRecord record;
while ((record = reader.read()) != null) {
int age = (Integer) record.get("age");
if (age < ageThreshold) {
writer1.write(record);
} else {
writer2.write(record);
}
}
}
}
private static ParquetWriter<GenericRecord> createWriter(Path path) throws IOException {
return AvroParquetWriter.<GenericRecord>builder(path)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
}
public static void main(String[] args) throws IOException {
splitByAge(
new Path("users.parquet"),
new Path("young_users.parquet"),
new Path("old_users.parquet"),
25
);
}
}
# 场景 2:按行数分块切割
public class ParquetRowSplitter {
public static void splitByRowCount(Path inputPath, String outputPrefix, int chunkSize)
throws IOException {
int fileCounter = 0;
int recordCounter = 0;
ParquetWriter<GenericRecord> currentWriter = null;
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputPath).build()) {
GenericRecord record;
while ((record = reader.read()) != null) {
if (recordCounter % chunkSize == 0) {
if (currentWriter != null) currentWriter.close();
currentWriter = createWriter(new Path(outputPrefix + fileCounter + ".parquet"));
fileCounter++;
}
currentWriter.write(record);
recordCounter++;
}
} finally {
if (currentWriter != null) currentWriter.close();
}
}
// main 方法省略...
}
# 五、高级操作
# 1. 使用投影读取部分列
// 只读取 id 和 name 列
ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(path)
.withProjection(Schema.createRecord(Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.INT), "", null),
new Schema.Field("name", Schema.create(Schema.Type.STRING), "", null)
)))
.build();
# 2. 使用谓词下推过滤
// 使用 Filter API 提前过滤
FilterPredicate filter = FilterApi.gt(FilterApi.intColumn("age"), 25);
ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(path)
.withFilter(filter)
.build();
# 六、性能优化建议
- 批量写入:使用
ParquetWriter
的自动缓冲机制(默认行组大小 128MB) - 压缩选择:
.withCompressionCodec(CompressionCodecName.ZSTD) // 更高压缩率
- 列式处理:优先使用投影减少 I/O
- 内存管理:处理大文件时使用分块读取(示例中的切割代码)
# 七、常见问题解决
- Schema 不匹配:确保读写使用相同的 Avro Schema
- Hadoop 依赖:若不在 Hadoop 环境运行,需添加
hadoop-common
依赖 - 版本冲突:保持 Parquet/Avro/Hadoop 版本兼容性
以上代码示例展示了 Java 中 Parquet 文件的核心操作流程,可直接用于大数据处理流水线的开发。对于超大规模数据,建议结合 Apache Spark 进行分布式处理。
# 总结
Parquet 凭借 列式存储、高压缩率、高性能查询,成为大数据生态的核心存储格式,尤其适合需要低成本存储、快速分析海量数据的场景。与 Spark、Presto 等工具深度集成,是构建数据湖和现代数仓的首选格式。