LarryDpk
发布于 2024-12-25 / 35 阅读
0

Flink: Sinking Data to ClickHouse with the Table API and a Connector

This post shows how to write (sink) data from Flink to ClickHouse using the Table API and the open-source flink-connector-clickhouse connector.

Clone the connector source code:


git clone git@github.com:itinycheng/flink-connector-clickhouse.git

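In the connector's pom.xml, update the Maven Shade plugin version, then build and install the connector into your local Maven repository: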


<maven-shade-plugin.version>3.6.0</maven-shade-plugin.version>


mvn clean install

Add the connector library to your project's pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Flink CSV -> ClickHouse sample that writes through flink-connector-clickhouse.
     The connector is not taken from Maven Central here: it is the SNAPSHOT produced by running
     `mvn clean install` in the cloned itinycheng/flink-connector-clickhouse project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.pkslow</groupId>
        <artifactId>bigdata</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>com.pkslow.bigdata</groupId>
    <artifactId>flink-clickhouse-table-api-with-connector</artifactId>

    <properties>
        <!-- NOTE(review): the Flink 1.16 docs list Java 8 and 11 as supported runtimes;
             confirm Flink 1.16.3 runs correctly when compiled/run on Java 17. -->
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single Flink version shared by every org.apache.flink dependency below. -->
        <flink.version>1.16.3</flink.version>
    </properties>

    <dependencies>
<!--        Flink Base: core Java and DataStream APIs-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

<!--        Flink Table API: Java bridge, planner and table runtime-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

<!--        Filesystem and csv: back the 'filesystem' connector with 'csv' format used by csv_source-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

<!--        ExecutorFactory: flink-clients is needed to execute the job (e.g. locally from the IDE)-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

<!--        JDBC-->
        <!-- NOTE(review): the sample's DDL only uses the 'filesystem', 'print' and 'clickhouse'
             connectors; confirm whether flink-connector-jdbc is still required. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- ClickHouse JDBC Driver -->
        <!-- NOTE(review): flink-connector-clickhouse may declare its own clickhouse-jdbc version;
             check `mvn dependency:tree` for a conflict with 0.7.1. -->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.7.1</version>
        </dependency>

        <!-- Provides the 'clickhouse' Table API connector; this SNAPSHOT only exists in the local
             Maven repository after building the cloned connector project. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.16.0-SNAPSHOT</version>
        </dependency>

        <!-- Logging backend -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.5.11</version>
        </dependency>


    </dependencies>

</project>

Java code:

package com.pkslow.bigdata;

import java.util.concurrent.ExecutionException;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Flink Table API job that reads person rows ({@code id, name, age}) from a CSV file and writes
 * them to two sinks: the {@code print} connector and a ClickHouse table through the
 * {@code clickhouse} connector provided by flink-connector-clickhouse.
 *
 * <p>Usage: {@code FlinkCsvToClickHouseWithConnectorMain [csvPath]}. When no path is given,
 * {@link #DEFAULT_CSV_PATH} is used.
 */
public class FlinkCsvToClickHouseWithConnectorMain {

    /** CSV file read when no path is passed on the command line; update it for your machine. */
    private static final String DEFAULT_CSV_PATH =
            "file:///Users/larry/IdeaProjects/pkslow-samples/bigdata/flink-csv-to-parquet/src/main/resources/input.csv";

    /** Sink that prints every row, handy for checking what is sent to ClickHouse. */
    private static final String PRINT_SINK_DDL = """
            CREATE TABLE print_table (
              id INT,
              name STRING,
              age INT
            ) WITH (
              'connector' = 'print'
            )""";

    /**
     * ClickHouse sink writing to {@code flink.persons}.
     *
     * <p>NOTE(review): presumably the ClickHouse table must already exist — confirm.
     * NOTE(review): demo credentials are hard-coded here; load real ones from configuration.
     */
    private static final String CLICKHOUSE_SINK_DDL = """
            CREATE TABLE clickhouse_sink (
                id BIGINT,
                name STRING,
                age INT
            ) WITH (
                'connector' = 'clickhouse',
                'url' = 'clickhouse://localhost:8123',
                'database-name' = 'flink',
                'table-name' = 'persons',
                'username' = 'larry',
                'password' = 'larrydpk'
            )""";

    /**
     * Registers the source and sink tables, then runs both INSERT statements as one Flink job and
     * waits for it to finish.
     *
     * @param args optional; {@code args[0]} overrides the CSV source path
     * @throws IllegalStateException if the job fails or the wait is interrupted
     */
    public static void main(String[] args) {
        String csvPath = (args.length > 0 && !args[0].isBlank()) ? args[0] : DEFAULT_CSV_PATH;

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // DDL statements are executed immediately and only register catalog tables.
        tableEnv.executeSql(csvSourceDdl(csvPath));
        tableEnv.executeSql(PRINT_SINK_DDL);
        tableEnv.executeSql(CLICKHOUSE_SINK_DDL);

        // Each executeSql("INSERT ...") would submit its own asynchronous job, each scanning the CSV.
        // A StatementSet submits both inserts as a single job, and await() keeps main() alive until
        // the bounded CSV has been fully written; otherwise a local run can exit too early.
        var inserts = tableEnv.createStatementSet();
        inserts.addInsertSql("INSERT INTO print_table SELECT id, name, age FROM csv_source");
        // The sink declares id as BIGINT while the source has INT: widen explicitly.
        inserts.addInsertSql(
                "INSERT INTO clickhouse_sink\n"
                        + "SELECT CAST(id AS BIGINT), name, age FROM csv_source");

        try {
            inserts.execute().await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the Flink job to finish", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Flink job writing to ClickHouse failed", e);
        }
    }

    /**
     * Builds the DDL of the CSV source table.
     *
     * @param path URI/path of the CSV file; single quotes are doubled so it stays a valid SQL literal
     * @return a {@code CREATE TABLE csv_source} statement using the filesystem connector
     */
    private static String csvSourceDdl(String path) {
        return """
                CREATE TABLE csv_source (
                    id INT,
                    name STRING,
                    age INT
                ) WITH (
                    'connector' = 'filesystem',
                    'path' = '%s',
                    'format' = 'csv',
                    'csv.ignore-parse-errors' = 'true'
                )""".formatted(path.replace("'", "''"));
    }
}

5 Code

Code on GitHub: https://github.com/LarryDpk/pkslow-samples