Flink Sink to ClickHouse with the Table API and a Connector
Build flink-connector-clickhouse
Get the code:
git clone git@github.com:itinycheng/flink-connector-clickhouse.git
Update the Maven Shade plugin version in the connector's pom.xml:
<maven-shade-plugin.version>3.6.0</maven-shade-plugin.version>
Then build the connector and install it into the local Maven repository:
mvn clean install
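If the connector's tests fail in your environment, they can be skipped with the standard Maven flag (the resulting artifact is the same):
mvn clean install -DskipTests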
Add the connector as a dependency in the project's pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.pkslow</groupId>
        <artifactId>bigdata</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>com.pkslow.bigdata</groupId>
    <artifactId>flink-clickhouse-table-api-with-connector</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.3</flink.version>
    </properties>

    <dependencies>
        <!-- Flink base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Filesystem and CSV -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- ExecutorFactory for local execution -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- JDBC -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- ClickHouse JDBC driver -->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.7.1</version>
        </dependency>
        <!-- The connector built and installed above -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.16.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.5.11</version>
        </dependency>
    </dependencies>
</project>
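Before running the job, the target database and table must exist in ClickHouse. Below is a minimal sketch that creates them over plain JDBC, reusing the clickhouse-jdbc driver declared above. The class name CreateClickHouseTableMain is hypothetical, and the MergeTree layout is an assumption chosen to line up with the sink schema further down; adjust names and credentials to your setup.
package com.pkslow.bigdata;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateClickHouseTableMain {
    public static void main(String[] args) throws Exception {
        // Connect to the default database first; host/port and credentials
        // are assumed to match the sink DDL in the Flink job below
        String url = "jdbc:clickhouse://localhost:8123";
        try (Connection conn = DriverManager.getConnection(url, "larry", "larrydpk");
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS flink");
            // Int64/Int32 line up with the BIGINT/INT columns of the Flink sink table
            stmt.execute("CREATE TABLE IF NOT EXISTS flink.persons (" +
                    "  id Int64," +
                    "  name String," +
                    "  age Int32" +
                    ") ENGINE = MergeTree ORDER BY id");
        }
    }
}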
Java code:
package com.pkslow.bigdata;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvToClickHouseWithConnectorMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // CSV source table backed by the filesystem connector
        String sourceDDL = "CREATE TABLE csv_source (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'file:///Users/larry/IdeaProjects/pkslow-samples/bigdata/flink-csv-to-parquet/src/main/resources/input.csv',\n" + // Update this path
                "  'format' = 'csv',\n" +
                "  'csv.ignore-parse-errors' = 'true'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // Print sink for debugging: echoes every row to stdout
        String printTable = "CREATE TABLE print_table (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";
        tableEnv.executeSql(printTable);

        // executeSql submits the INSERT as a job asynchronously; await() blocks until it finishes
        String insertSQL = "INSERT INTO print_table SELECT * FROM csv_source";
        tableEnv.executeSql(insertSQL).await();

        // ClickHouse sink table via the flink-connector-clickhouse built above
        tableEnv.executeSql(
                "CREATE TABLE clickhouse_sink (\n" +
                        "  id BIGINT,\n" +
                        "  name STRING,\n" +
                        "  age INT\n" +
                        ") WITH (\n" +
                        "  'connector' = 'clickhouse',\n" +
                        "  'url' = 'clickhouse://localhost:8123',\n" +
                        "  'database-name' = 'flink',\n" +
                        "  'table-name' = 'persons',\n" +
                        // "  'driver' = 'com.clickhouse.jdbc.ClickHouseDriver',\n" +
                        "  'username' = 'larry',\n" +
                        "  'password' = 'larrydpk'\n" +
                        ")"
        );

        tableEnv.executeSql(
                "INSERT INTO clickhouse_sink\n" +
                        "SELECT id, name, age FROM csv_source"
        ).await();
    }
}
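After the job finishes, the rows can be read back to confirm the sink worked. A small JDBC check, using the same assumed connection details as above (the class name VerifyClickHouseSinkMain is hypothetical):
package com.pkslow.bigdata;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyClickHouseSinkMain {
    public static void main(String[] args) throws Exception {
        // Connect directly to the flink database written to by the job
        String url = "jdbc:clickhouse://localhost:8123/flink";
        try (Connection conn = DriverManager.getConnection(url, "larry", "larrydpk");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name, age FROM persons ORDER BY id")) {
            while (rs.next()) {
                System.out.printf("%d %s %d%n",
                        rs.getLong("id"), rs.getString("name"), rs.getInt("age"));
            }
        }
    }
}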
Code
Code on GitHub: https://github.com/LarryDpk/pkslow-samples