Flink Sink to ClickHouse Using the Table API and a Connector
Build flink-connector-clickhouse
Get the code:
git clone git@github.com:itinycheng/flink-connector-clickhouse.git
Update the maven plugin version:
<maven-shade-plugin.version>3.6.0</maven-shade-plugin.version>
mvn clean install
Add the connector library to the project:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.pkslow</groupId> <artifactId>bigdata</artifactId> <version>1.0-SNAPSHOT</version> </parent>
<groupId>com.pkslow.bigdata</groupId> <artifactId>flink-clickhouse-table-api-with-connector</artifactId>
<properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.16.3</flink.version> </properties>
<dependencies><!-- Flink Base-->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency>
<!-- Flink Table API-->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> </dependency>
<!-- Filesystem and csv-->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency>
<!-- ExecutorFactory-->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency>
<!-- JDBC-->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>${flink.version}</version> </dependency>
<!-- ClickHouse JDBC Driver --> <dependency> <groupId>com.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.7.1</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-clickhouse</artifactId> <version>1.16.0-SNAPSHOT</version> </dependency>
<dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.5.11</version> </dependency>
</dependencies>
</project>
Java code:
package com.pkslow.bigdata;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
/**
 * Reads rows from a local CSV file and writes them to two sinks: a 'print'
 * table (stdout, for debugging) and a ClickHouse table via the
 * flink-connector-clickhouse connector.
 */
public class FlinkCsvToClickHouseWithConnectorMain {

    public static void main(String[] args) {
        // The whole pipeline is expressed as SQL, so only a TableEnvironment
        // is needed. (The original also created a StreamExecutionEnvironment
        // that was never used; it has been removed.)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Source table: CSV rows from the local filesystem.
        // NOTE: update the 'path' value to point at your own input file.
        String sourceDDL = "CREATE TABLE csv_source (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'file:///Users/larry/IdeaProjects/pkslow-samples/bigdata/flink-csv-to-parquet/src/main/resources/input.csv',\n" +
                "  'format' = 'csv',\n" +
                "  'csv.ignore-parse-errors' = 'true'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // Debug sink: echoes every source row to stdout.
        // The trailing ';' inside the original DDL string was dropped:
        // executeSql() takes a single statement and the semicolon is at best
        // redundant, at worst a parse error depending on the Flink version.
        String printTable = "CREATE TABLE print_table (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";
        tableEnv.executeSql(printTable);
        tableEnv.executeSql("INSERT INTO print_table SELECT * FROM csv_source");

        // ClickHouse sink. The connector resolves the JDBC driver itself, so
        // no explicit 'driver' option is required here.
        // NOTE(review): credentials are hard-coded for this demo; externalize
        // them (program args / environment) for anything beyond local testing.
        tableEnv.executeSql(
                "CREATE TABLE clickhouse_sink (\n" +
                "  id BIGINT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'clickhouse',\n" +
                "  'url' = 'clickhouse://localhost:8123',\n" +
                "  'database-name' = 'flink',\n" +
                "  'table-name' = 'persons',\n" +
                "  'username' = 'larry',\n" +
                "  'password' = 'larrydpk'\n" +
                ")");

        // Kick off the actual CSV -> ClickHouse job.
        tableEnv.executeSql(
                "INSERT INTO clickhouse_sink\n" +
                "SELECT id, name, age FROM csv_source");
    }
}
Code
Code on GitHub: https://github.com/LarryDpk/pkslow-samples