Flink Read CSV and Sink to Parquet File
1. Setup Environment
Prerequisites
- Apache Flink 1.16.3 installed
- JDK 17 installed (the build below targets Java 17)
- Maven or Gradle for dependency management
- Access to a distributed filesystem like HDFS or a local filesystem for input/output paths
Maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.pkslow</groupId>
        <artifactId>bigdata</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>com.pkslow.bigdata</groupId>
    <artifactId>flink-csv-to-parquet</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.16.3</version>
        </dependency>
        <!-- Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>1.16.3</version>
        </dependency>
        <!-- Connectors and formats -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>1.16.3</version>
        </dependency>
        <!-- Runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.16.3</version>
        </dependency>
        <!-- Hadoop classes required by the Parquet format -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.4.1</version>
        </dependency>
    </dependencies>
</project>
2. Define Tables (DDL)
Use Flink’s Table API to define a source table that reads from a CSV file.
Example DDL for CSV Source
CREATE TABLE csv_source (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/input.csv',   -- Replace with your CSV path
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'      -- Optional: ignores malformed rows
);
Explanation
- connector: Specifies the connector type; here it is filesystem.
- path: The path to your input CSV file. Use file:/// for the local filesystem or an appropriate URI for distributed filesystems such as HDFS.
- format: Specifies the data format; set to csv for CSV files.
- csv.ignore-parse-errors: (Optional) Ignores rows that fail to parse, preventing job failure due to bad data.
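For reference, an input.csv matching this schema contains plain data rows. The values below are purely illustrative; the csv format treats every line as a record, so a header row is not expected (with csv.ignore-parse-errors enabled, a header line that fails to parse would simply be skipped):

1,Alice,25
2,Bob,31
3,Carol,42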
Next, define a sink table that writes the data out as Parquet files. Rolling-policy options control when Flink closes one output file and starts a new one; custom filename prefixes and suffixes are discussed in the note below.
Example DDL for Parquet Sink
CREATE TABLE parquet_sink (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/output/',                   -- Replace with your desired output directory
    'format' = 'parquet',
    -- Optional: configure rolling policies
    'sink.rolling-policy.file-size' = '134217728',        -- 128 MB
    'sink.rolling-policy.rollover-interval' = '15 min',   -- 15 minutes
    'sink.rolling-policy.inactivity-interval' = '5 min'   -- 5 minutes
);
Explanation
- connector: Set to filesystem to write to a filesystem.
- path: Directory where the Parquet files will be written.
- format: Set to parquet to write the data in Parquet format.
- Rolling policies: These settings control when Flink rolls over to a new file based on size, time, or inactivity.
Note: Flink appends the subtask index and a file counter to the configured part-file prefix and suffix to keep filenames unique, so a prefix of pkslow-output and a suffix of .parquet result in names like pkslow-output-0-0.parquet.
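The SQL filesystem sink names its part files automatically; a custom prefix and suffix like the pkslow-output example are typically configured through the DataStream FileSink's OutputFileConfig rather than in the DDL. A minimal sketch, assuming the flink-parquet dependency above (parquet-avro and avro may additionally be needed on the classpath) and an illustrative Person POJO:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

public class ParquetFileNaming {

    // Illustrative POJO matching the table schema (id, name, age).
    public static class Person {
        public int id;
        public String name;
        public int age;
    }

    // Builds a Parquet FileSink whose part files are named
    // pkslow-output-<subtask>-<counter>.parquet
    public static FileSink<Person> buildSink(String outputDir) {
        OutputFileConfig fileNames = OutputFileConfig.builder()
                .withPartPrefix("pkslow-output")   // custom filename prefix
                .withPartSuffix(".parquet")        // custom filename suffix
                .build();

        return FileSink
                .forBulkFormat(new Path(outputDir),
                        AvroParquetWriters.forReflectRecord(Person.class))
                .withOutputFileConfig(fileNames)
                .build();
    }
}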
3. Execute the Data Pipeline
Combine the source and sink tables to transfer data from CSV to Parquet.
Complete Java Example
Here’s a complete Java program using Flink’s Table API to perform the desired operation:
package com.pkslow.bigdata;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCsvToParquet {
    public static void main(String[] args) {
        // 1. Set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // Set parallelism to 1 for a single output file (optional)
        env.enableCheckpointing(10_000); // Required in streaming mode so the filesystem sink can finalize part files

        // 2. Set up the Table environment on top of the streaming environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 3. Define the source table
        String sourceDDL = "CREATE TABLE csv_source (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'file:///Users/larry/IdeaProjects/pkslow-samples/bigdata/flink-csv-to-parquet/src/main/resources/input.csv',\n" + // Update this path
                "  'format' = 'csv',\n" +
                "  'csv.ignore-parse-errors' = 'true'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // 4. Define the sink table
        String sinkDDL = "CREATE TABLE parquet_sink (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'file:///Users/larry/IdeaProjects/pkslow-samples/bigdata/flink-csv-to-parquet/output/',\n" + // Update this path
                "  'format' = 'parquet',\n" +
                "  'sink.rolling-policy.file-size' = '134217728',\n" + // 128 MB
                "  'sink.rolling-policy.rollover-interval' = '15 min',\n" + // 15 minutes
                "  'sink.rolling-policy.inactivity-interval' = '5 min'\n" + // 5 minutes
                ")";
        tableEnv.executeSql(sinkDDL);

        // 5. Insert data from source to sink
        String insertSQL = "INSERT INTO parquet_sink SELECT * FROM csv_source";
        tableEnv.executeSql(insertSQL);

        // Note: executeSql on an INSERT statement submits the job;
        // there is no need to call env.execute().
    }
}
Steps Explained
- Initialize Execution Environment: Sets up the streaming environment with a specified parallelism and enables checkpointing, which the filesystem sink needs in streaming mode to finalize files. Setting the parallelism to 1 produces a single output file; in distributed setups with higher parallelism, multiple files will be created, one per parallel instance.
- Initialize Table Environment: Creates a StreamTableEnvironment in streaming mode on top of the execution environment to execute SQL-like DDL and DML statements.
- Define Source Table: Creates a table named csv_source that reads from a specified CSV file. Update 'path' with your actual CSV file location.
- Define Sink Table: Creates a table named parquet_sink that writes data to Parquet files in the specified output directory. Adjust 'path' to your desired output location.
- Data Ingestion: Executes an INSERT INTO statement to transfer data from csv_source to parquet_sink. This statement triggers the Flink job.
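When running the program locally (for example directly from the IDE), it can help to block the main method until the bounded CSV-to-Parquet job has finished. A hedged variant of the last step, assuming main declares or handles the checked exceptions and org.apache.flink.table.api.TableResult is imported:

// Wait for the INSERT job to complete before main() returns (useful for local runs).
TableResult result = tableEnv.executeSql("INSERT INTO parquet_sink SELECT * FROM csv_source");
result.await(); // blocks until the bounded job finishes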
Running the Job
- Package your application: Use Maven or Gradle to package your application into a JAR file.
mvn clean package
- Submit to the Flink cluster: Submit the JAR to your Flink cluster using the Flink CLI.
flink run -c com.pkslow.bigdata.FlinkCsvToParquet path/to/your-jar.jar
Replace the main class and the JAR path with your actual package, class name, and build artifact.
4. Additional Considerations
Handling Multiple Output Files
In distributed environments with higher parallelism, multiple output files will be created. Ensure your downstream processes can handle this, or manage file consolidation as needed.
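If consolidation should happen inside Flink itself, the filesystem sink also offers small-file compaction (in streaming mode it relies on checkpointing). A hedged sketch of a sink definition with the compaction options added; the table name is illustrative and the option names follow the Flink filesystem connector documentation:

CREATE TABLE parquet_sink_compacted (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/output/',
    'format' = 'parquet',
    'auto-compaction' = 'true',            -- merge small files when a checkpoint completes
    'compaction.file-size' = '134217728'   -- target size for compacted files (128 MB)
);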
Rolling Policies
Adjusting rolling policies helps manage when Flink closes a file and starts writing to a new one. The provided settings:
- file-size: Rolls over to a new file when the current file reaches 128 MB.
- rollover-interval: Rolls over every 15 minutes regardless of size.
- inactivity-interval: Rolls over if no new data is written for 5 minutes.
Tune these settings based on your data ingestion rate and file size requirements.
Schema Evolution
Ensure that the schema of your CSV source matches the schema expected by the Parquet sink. Mismatched schemas can lead to runtime errors.
Error Handling
Consider adding error handling mechanisms, such as:
- csv.ignore-parse-errors: As shown, to skip malformed rows.
- Logging: Implement logging to monitor job execution and catch issues early.
Testing Locally
Before deploying to a production cluster, test your job locally to ensure it behaves as expected.
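One straightforward local check is to read the freshly written Parquet files back in the same Table API session and print them. A minimal sketch, assuming it is appended to the main method above after the insert has completed; parquet_check is an illustrative table name:

// Read the Parquet output back and print it as a quick local sanity check.
String checkDDL = "CREATE TABLE parquet_check (\n" +
        "  id INT,\n" +
        "  name STRING,\n" +
        "  age INT\n" +
        ") WITH (\n" +
        "  'connector' = 'filesystem',\n" +
        "  'path' = 'file:///path/to/output/',\n" + // same directory as parquet_sink
        "  'format' = 'parquet'\n" +
        ")";
tableEnv.executeSql(checkDDL);
tableEnv.executeSql("SELECT * FROM parquet_check").print(); // prints the rows to stdout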
5. Code
Code on GitHub: https://github.com/LarryDpk/pkslow-samples