av激情亚洲男人的天堂国语,日韩欧美精品一中文字幕,无码av一区二区三区无码,国产又色又爽又刺激的a片,国产又色又爽又刺激的a片

FlinkCDC里flink1.17寫doris的代碼怎么做?

在Flink CDC中,將數(shù)據(jù)從Flink 1.17寫入Doris,需要遵循以下步驟:

創(chuàng)新互聯(lián)公司公司2013年成立,先為巴林左旗等服務(wù)建站,巴林左旗等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為巴林左旗企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。

1、添加依賴

在項(xiàng)目的pom.xml文件中添加Flink CDC和Doris的依賴:


    org.apache.flink
    flinkconnectordoris_2.11
    1.13.2


    org.apache.flink
    flinkconnectormysqlcdc
    2.1.0

2、創(chuàng)建Flink CDC Source

創(chuàng)建一個(gè)Flink CDC Source,用于從MySQL數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)變更事件:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
public class FlinkCDCSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SourceFunction sourceFunction = MySqlSource.builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb") // 監(jiān)聽(tīng)的數(shù)據(jù)庫(kù)名
                .tableList("mydb.mytable") // 監(jiān)聽(tīng)的表名
                .username("root")
                .password("password")
                .deserializer(new StringDebeziumDeserializationSchema()) // 反序列化方式
                .build();
        env.addSource(sourceFunction).print();
        env.execute("Flink CDC Example");
    }
}

3、創(chuàng)建Doris Sink

創(chuàng)建一個(gè)Doris Sink,用于將數(shù)據(jù)寫入Doris數(shù)據(jù)庫(kù):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.doris.DorisSink;
import org.apache.flink.streaming.connectors.doris.DorisStreamLoadOptions;
import org.apache.flink.types.Row;
public class DorisSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 假設(shè)從Flink CDC Source獲取的數(shù)據(jù)流為dataStream
        DataStream dataStream = ...;
        DorisSink dorisSink = DorisSink.builder()
                .setDorisTable("mydb.mytable") // Doris表名
                .setUsername("root")
                .setPassword("password")
                .setFenodes("localhost:8030") // Doris FE節(jié)點(diǎn)地址
                .setLoadProps(DorisStreamLoadOptions.DEFAULT_LOAD_PROPS) // 加載屬性
                .build();
        dataStream.addSink(dorisSink);
        env.execute("Doris Sink Example");
    }
}

4、整合Flink CDC Source和Doris Sink

將Flink CDC Source和Doris Sink整合到一起,實(shí)現(xiàn)從MySQL數(shù)據(jù)庫(kù)到Doris數(shù)據(jù)庫(kù)的數(shù)據(jù)同步:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.doris.DorisSink;
import org.apache.flink.streaming.connectors.doris.DorisStreamLoadOptions;
import org.apache.flink.types.Row;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
public class FlinkCDCToDorisExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SourceFunction sourceFunction = MySqlSource.builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb") // 監(jiān)聽(tīng)的數(shù)據(jù)庫(kù)名
                .tableList("mydb.mytable") // 監(jiān)聽(tīng)的表名
                .username("root")
                .password("password")
                .deserializer(new StringDebeziumDeserializationSchema()) // 反序列化方式
                .build();
        DataStream dataStream = env.addSource(sourceFunction);
        // 將數(shù)據(jù)流轉(zhuǎn)換為Row類型,以便寫入Doris
        DataStream rowDataStream = dataStream.map(json > {
            JsonObject jsonObject = new JsonParser().parse(json).getAsJsonObject();
            String before = jsonObject.get("before").getAsString();
            String after = jsonObject.get("after").getAsString();
            return Row.of(before, after);
        }).returns(new RowTypeInfo(Types.STRING, Types.STRING));
        DorisSink dorisSink = DorisSink.builder()
                .setDorisTable("mydb.mytable") // Doris表名
                .setUsername("root")
                .setPassword("password")
                .setFenodes("localhost:8030") // Doris FE節(jié)點(diǎn)地址
                .setLoadProps(DorisStreamLoadOptions.DEFAULT_LOAD_PROPS) // 加載屬性
                .build();
        rowDataStream.addSink(dorisSink);
        env.execute("Flink CDC to Doris Example");
    }
}

這樣,就完成了使用Flink CDC將數(shù)據(jù)從MySQL數(shù)據(jù)庫(kù)同步到Doris數(shù)據(jù)庫(kù)的過(guò)程。


分享文章:FlinkCDC里flink1.17寫doris的代碼怎么做?
本文地址:http://uogjgqi.cn/article/cdecesc.html
掃二維碼與項(xiàng)目經(jīng)理溝通

我們?cè)谖⑿派?4小時(shí)期待你的聲音

解答本文疑問(wèn)/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流