LarryDpk
Published on 2024-12-25 / 11 reads
0

Flink Sink to ClickHouse with Table API with Connector

Flink Sink to ClickHouse with Table API with Connector

Get the code:

  
git clone git@github.com:itinycheng/flink-connector-clickhouse.git  
  

Update the Maven Shade plugin version in the connector's POM, then build and install it into your local Maven repository:

  
<maven-shade-plugin.version>3.6.0</maven-shade-plugin.version>  
  
  
mvn clean install  
  

Add the connector library to the project's `pom.xml`:

  
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.pkslow</groupId>
        <artifactId>bigdata</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>com.pkslow.bigdata</groupId>
    <artifactId>flink-clickhouse-table-api-with-connector</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.3</flink.version>
    </properties>

    <dependencies>
        <!-- Flink base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Filesystem connector and CSV format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Provides the ExecutorFactory needed to run jobs locally -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- JDBC connector (bundled with Flink up to 1.16) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- ClickHouse JDBC driver -->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.7.1</version>
        </dependency>

        <!-- Third-party ClickHouse Table API connector, built from
             github.com/itinycheng/flink-connector-clickhouse -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.16.0-SNAPSHOT</version>
        </dependency>

        <!-- Logging backend -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.5.11</version>
        </dependency>
    </dependencies>
</project>

Java code:

package com.pkslow.bigdata;  
  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.table.api.EnvironmentSettings;  
import org.apache.flink.table.api.TableEnvironment;  
  
/**
 * Reads rows from a local CSV file with the Flink Table API, echoes them to
 * the 'print' connector for debugging, and inserts them into a ClickHouse
 * table through the third-party flink-connector-clickhouse sink.
 */
public class FlinkCsvToClickHouseWithConnectorMain {

    public static void main(String[] args) {
        // Pure Table API job: TableEnvironment.create(settings) is all that is
        // required. The original code also built a StreamExecutionEnvironment
        // that was never used; it has been removed.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Source table backed by a CSV file on the local filesystem.
        // Update 'path' to match your machine.
        String sourceDDL = "CREATE TABLE csv_source (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "    'connector' = 'filesystem',\n" +
                "    'path' = 'file:///Users/larry/IdeaProjects/pkslow-samples/bigdata/flink-csv-to-parquet/src/main/resources/input.csv',\n" +
                "    'format' = 'csv',\n" +
                "    'csv.ignore-parse-errors' = 'true'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // Debug sink that prints every row to the TaskManager log.
        // Fix: the original DDL string ended with ");" — the stray semicolon
        // inside the statement is removed for consistency with the other
        // executeSql calls (executeSql takes exactly one statement).
        String printTable = "CREATE TABLE print_table (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";
        tableEnv.executeSql(printTable);

        tableEnv.executeSql("INSERT INTO print_table SELECT * FROM csv_source");

        // ClickHouse sink via the itinycheng flink-connector-clickhouse.
        // 'id' is BIGINT here while the source column is INT; the INSERT
        // below relies on Flink's implicit widening cast.
        // NOTE(review): a 'driver' option ('com.clickhouse.jdbc.ClickHouseDriver')
        // was present but commented out in the original; add it back if the
        // connector cannot auto-detect the driver in your setup.
        tableEnv.executeSql(
                "CREATE TABLE clickhouse_sink (\n" +
                        "    id BIGINT,\n" +
                        "    name STRING,\n" +
                        "    age INT\n" +
                        ") WITH (\n" +
                        "    'connector' = 'clickhouse',\n" +
                        "    'url' = 'clickhouse://localhost:8123',\n" +
                        "    'database-name' = 'flink',\n" +
                        "    'table-name' = 'persons',\n" +
                        "    'username' = 'larry',\n" +
                        "    'password' = 'larrydpk'\n" +
                        ")");

        tableEnv.executeSql(
                "INSERT INTO clickhouse_sink\n" +
                        "SELECT id, name, age FROM csv_source");
    }
}
  

Code

Code on GitHub: https://github.com/LarryDpk/pkslow-samples