LarryDpk
Published on 2024-12-24

Flink Read CSV and Sink to Parquet File

1. Setup Environment

Prerequisites

  • Apache Flink 1.16.3 installed
  • JDK 11 or later installed (the sample POM compiles with Java 17)
  • Maven or Gradle for dependency management
  • Access to a distributed filesystem like HDFS or a local filesystem for input/output paths

Maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.pkslow</groupId>
        <artifactId>bigdata</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>com.pkslow.bigdata</groupId>
    <artifactId>flink-csv-to-parquet</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.16.3</version>
        </dependency>

        <!-- Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.16.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.16.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.16.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>1.16.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.16.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.16.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.16.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.16.3</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>1.16.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.4.1</version>
        </dependency>
    </dependencies>

</project>

2. Define Tables (DDL)

Use Flink’s Table API to define a source table that reads from a CSV file.

Example DDL for CSV Source

CREATE TABLE csv_source (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/input.csv',  -- Replace with your CSV path
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'    -- Optional: ignores malformed rows
);

Explanation

  • connector: Specifies the connector type; here, it’s filesystem.
  • path: The path to your input CSV file. Use file:/// for the local filesystem, or an appropriate URI (e.g. hdfs://) for distributed filesystems like HDFS.
  • format: Specifies the data format; set to csv for CSV files.
  • csv.ignore-parse-errors: (Optional) Ignores rows that fail to parse, preventing job failure due to bad data.
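
For reference, a hypothetical input.csv matching this schema could look like the following (three comma-separated columns, no header row; the rows are made-up sample data):

1,Alice,30
2,Bob,25
3,Carol,41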

To write data to Parquet files, define a sink table that uses the filesystem connector with the parquet format. Rolling-policy options control how the output is split into part files.

Example DDL for Parquet Sink

CREATE TABLE parquet_sink (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/output/',  -- Replace with your desired output directory
    'format' = 'parquet',
    -- Optional: Configure rolling policies
    'sink.rolling-policy.file-size' = '134217728',          -- 128 MB
    'sink.rolling-policy.rollover-interval' = '15 min',     -- 15 minutes
    'sink.rolling-policy.inactivity-interval' = '5 min'     -- 5 minutes
);

Explanation

  • connector: Set to filesystem to write to a filesystem.
  • path: Directory where Parquet files will be written.
  • format: Set to parquet to write data in Parquet format.
  • Rolling Policies: These settings control when Flink rolls over to a new file based on size, time, or inactivity.

Note: Flink appends unique identifiers (the subtask index and a file counter) to each part file to ensure filename uniqueness. The DDL above uses the connector's default part-file naming; with a custom prefix and suffix, the resulting names look like pkslow-output-0-0.parquet (see the sketch below).
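
The Table API DDL above does not set a filename prefix or suffix. If you specifically want names like pkslow-output-0-0.parquet, one option is the DataStream API's FileSink with OutputFileConfig. The following is only a minimal, self-contained sketch under that assumption: the class name, POJO, sample records, and output path are placeholders rather than part of the pipeline above, and it may require the parquet-avro and avro dependencies in addition to flink-parquet.

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

public class ParquetFileSinkSketch {

    // Simple POJO mirroring the csv_source columns; Avro reflection derives its schema
    public static class Person {
        public int id;
        public String name;
        public int age;

        public Person() {}

        public Person(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Bulk formats like Parquet finalize part files on checkpoints in streaming mode
        env.enableCheckpointing(10_000);

        // Tiny in-memory stream with made-up records, just to keep the sketch runnable
        DataStream<Person> stream = env.fromElements(
                new Person(1, "Alice", 30),
                new Person(2, "Bob", 25));

        // FileSink with a custom part-file prefix and suffix
        FileSink<Person> sink = FileSink
                .forBulkFormat(new Path("file:///path/to/output/"),
                        AvroParquetWriters.forReflectRecord(Person.class))
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("pkslow-output")
                        .withPartSuffix(".parquet")
                        .build())
                .build();

        stream.sinkTo(sink);
        env.execute("parquet-filesink-sketch");
    }
}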


3. Execute the Data Pipeline

Combine the source and sink tables to transfer data from CSV to Parquet.

Complete Java Example

Here’s a complete Java program using Flink’s Table API to perform the desired operation:

package com.pkslow.bigdata;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCsvToParquet {
    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // Parallelism 1 keeps the output to a single part file (optional)

        // 2. Set up the Table environment on top of the streaming environment,
        //    so the parallelism configured above actually applies to the SQL job
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 3. Define the source table
        String sourceDDL = "CREATE TABLE csv_source (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "    'connector' = 'filesystem',\n" +
                "    'path' = 'file:///Users/larry/IdeaProjects/pkslow-samples/bigdata/flink-csv-to-parquet/src/main/resources/input.csv',\n" +  // Update this path
                "    'format' = 'csv',\n" +
                "    'csv.ignore-parse-errors' = 'true'\n" +
                ")";

        tableEnv.executeSql(sourceDDL);

        // 4. Define the sink table with custom filename settings
        String sinkDDL = "CREATE TABLE parquet_sink (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "    'connector' = 'filesystem',\n" +
                "    'path' = 'file:///Users/larry/IdeaProjects/pkslow-samples/bigdata/flink-csv-to-parquet/output/',\n" +  // Update this path
                "    'format' = 'parquet',\n" +
                "    'sink.rolling-policy.file-size' = '134217728',\n" +          // 128 MB
                "    'sink.rolling-policy.rollover-interval' = '15 min',\n" +     // 15 minutes
                "    'sink.rolling-policy.inactivity-interval' = '5 min'\n" +     // 5 minutes
                ")";

        tableEnv.executeSql(sinkDDL);

        // 5. Insert data from source to sink.
        //    executeSql() submits the INSERT job asynchronously; await() blocks
        //    until the bounded job finishes, which is convenient when running locally.
        String insertSQL = "INSERT INTO parquet_sink SELECT * FROM csv_source";
        tableEnv.executeSql(insertSQL).await();

        // Note: In the Table API, executing the INSERT statement triggers the job;
        // there is no need to call env.execute().
    }
}

Steps Explained

  1. Initialize Execution Environment:
    • Sets up the streaming environment with a specified parallelism. Setting parallelism to 1 ensures a single output file, but in distributed setups, multiple files will be created based on parallel instances.
  2. Initialize Table Environment:
    • Creates a StreamTableEnvironment on top of the execution environment, in streaming mode, so SQL DDL and DML statements run with the configured parallelism.
  3. Define Source Table:
    • Creates a table named csv_source that reads from a specified CSV file. Update 'path' with your actual CSV file location.
  4. Define Sink Table:
    • Creates a table named parquet_sink that writes data to Parquet files in the specified output directory. Adjust the 'path' to your desired output location.
  5. Data Ingestion:
    • Executes an INSERT INTO statement to transfer data from csv_source to parquet_sink. This action triggers the Flink job.

Running the Job

  1. Package Your Application:

    • Use Maven or Gradle to package your application into a JAR file.
    mvn clean package
    
  2. Submit to Flink Cluster:

    • Submit the JAR to your Flink cluster using the Flink CLI.
    flink run -c com.pkslow.bigdata.FlinkCsvToParquet path/to/your-jar.jar
    

    Adjust the main class and JAR path to match your own package name and build output.


4. Additional Considerations

Handling Multiple Output Files

In distributed environments with higher parallelism, multiple output files will be created. Ensure your downstream processes can handle this, or manage file consolidation as needed.

Rolling Policies

Adjusting rolling policies helps manage when Flink closes a file and starts writing to a new one. The provided settings:

  • file-size: Rolls over to a new file when the current file reaches 128 MB.
  • rollover-interval: Rolls over every 15 minutes regardless of size.
  • inactivity-interval: Rolls over if no new data is written for 5 minutes.

Tune these settings based on your data ingestion rate and file size requirements.
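
One related point from the Flink filesystem connector documentation: in streaming mode, bulk formats such as Parquet only finalize part files when a checkpoint completes. For an unbounded source you should therefore enable checkpointing, otherwise files can remain in an in-progress state. A minimal sketch, reusing the env from the example above (the 60-second interval is only an illustrative value):

        // Parquet part files are committed on checkpoints in streaming mode;
        // pick an interval that matches your latency and file-size requirements.
        env.enableCheckpointing(60_000);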

Schema Evolution

Ensure that the schema of your CSV source matches the schema expected by the Parquet sink. Mismatched schemas can lead to runtime errors.

Error Handling

Consider adding error handling mechanisms, such as:

  • csv.ignore-parse-errors: As shown, to skip malformed rows.
  • Logging: Implement logging to monitor job execution and catch issues early.
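
To illustrate the logging point, here is a small sketch using SLF4J (the logging facade Flink itself uses); the class name, helper method, and log messages are made up for illustration:

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InsertWithLogging {

    private static final Logger LOG = LoggerFactory.getLogger(InsertWithLogging.class);

    // Runs the INSERT and logs start/finish; failures surface as exceptions via await()
    static void runInsert(TableEnvironment tableEnv) throws Exception {
        LOG.info("Submitting CSV-to-Parquet insert job");
        TableResult result = tableEnv.executeSql("INSERT INTO parquet_sink SELECT * FROM csv_source");
        result.await();
        LOG.info("CSV-to-Parquet insert job finished");
    }
}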

Testing Locally

Before deploying to a production cluster, test your job locally to ensure it behaves as expected.
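
For a quick local sanity check, you can read the freshly written files back through the sink table's definition, since the filesystem connector also works as a source. A minimal sketch, assuming the tableEnv and parquet_sink table from the example above (suitable for small outputs only):

        // Scan the Parquet files in the sink path and print the rows to stdout
        tableEnv.executeSql("SELECT * FROM parquet_sink").print();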

5. Code

Code on GitHub: https://github.com/LarryDpk/pkslow-samples