2023學年完整公開課版Flink 實時APIDataSink_第1頁
2023學年完整公開課版Flink 實時APIDataSink_第2頁
2023學年完整公開課版Flink 實時APIDataSink_第3頁
2023學年完整公開課版Flink 實時APIDataSink_第4頁
2023學年完整公開課版Flink 實時APIDataSink_第5頁
已閱讀5頁,還剩6頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

Flink實時API-DataSinkDataStream——SinkDataStreamAPISourceTransformationsSinkDataStream——SinkDataStreamAPISourceTransformationsSink數據接收器Sink將DataStream中的數轉發到文件,套接字,外部系統或打印它們。1)writeAsText():-將元素按行寫為字符串。

通過調用每個元素的toString()方法獲得字符串。2)writeToSocket:根據一個元素將元素寫入套接字3)Print():打印每個元素值到標準輸出流中4)addSink:調用自定義接受器功能DataStream——Sink1)writeAsText():將元素按行寫為字符串。通過調用每個元素的toString()方法獲得字符串。2)writeToSocket:根據一個元素將元素寫入套接字DataStream——Sink

//換一個接受的節點env.socketTextStream("localhost",port=8989).writeToSocket("localhost",port=8989,newSimpleStringSchema())env.execute()valenv=StreamExecutionEnvironment.getExecutionEnvironment//多并行度env.socketTextStream("localhost",port=8989).writeAsText("D:\\project\\first_flink01\\src\\main\\resources")Connectors3)addSink:調用自定義接收器

Connectors即連接器,實現了某個組件的Source或Sink,我們可以使用官方內置Connectors實現對數據流的實時讀取或實時寫出。官方內置Connectors有如下:ApacheKafka(source/sink)ApacheCassandra(sink)Elasticsearch(sink)HadoopFileSystem(sink)RabbitMQ(source/sink)ApacheActiveMQ(source/sink)Redis(sink)Kafka

Source&Sink

Flink生產者算子轉換操作主題:t1KafkaSource消費者(主題:t1)KafkaSink生產者(主題:t2)消費者主題:t2算子轉換操作KafkaSource消費者(主題:t1)Kafka

Source&Sinkflink既可以從kafka中消費數據,也能將數據寫入kafka。官方已經內置好Kafka的Connector。使用時需要導入maven依賴讀取步驟:①獲取kafkaSource②調用env.addSource(kafkaSource)寫入步驟:①獲取kafkaSink②調用env.addSink(kafkaSink)Kafka

Source&Sinkflink既可以從kafka中消費數據,也能將數據寫入kafka。官方已經內置好Kafka的Connector。使用時需要導入maven依賴讀取步驟:①獲取kafkaSource②調用env.addSource(kafkaSource)寫入步驟:①獲取kafkaSink②調用env.addSink(kafkaSink)Kafka

Source&SinkobjectKafkaUtil{valprop=newProperties()valtopic="t1"http://設置kafka服務器地址

prop.setProperty("bootstrap.servers","ubuntu:9092")//設置kafka消費者

prop.setProperty("group.id","con1")//獲得KafkaSource,類似于消費者

defgetKafkaSource(topic:String):FlinkKafkaConsumer09[String]={newFlinkKafkaConsumer09[String](topic,newSimpleStringSchema(),prop)}

//獲得KafkaSink,此sink負責將數據寫入到Kafka,類似于生產者

defgetKafkaSink(topic:String):FlinkKafkaProducer09[String]={newFlinkKafkaProducer09[String](prop.getProperty("bootstrap.servers"),topic,newSimpleStringSchema())

}}Kafka

Source&SinkobjectKafkaSourceAndSinkDemo{defmain(args:Array[String]):Unit={valenv=StreamExecutionEnvironment.getExecutionEnvironment

//獲取KafkaSourcevalkafkaSource=KafkaUtil.getKafkaSource("event_log")//獲取KafkaSinkvalkafkaSink=

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論