
Flink (Scala) Learning, Part 1: Common Sources

2022-05-14 03:22:03 · 皮皮蝦不皮呀

Source: reading data from a file

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// Import the implicit conversions. Putting this import here is recommended to avoid misleading IDEA code hints.
import org.apache.flink.streaming.api.scala._

object FileReadSource {

  def main(args: Array[String]): Unit = {

    // Initialize the Flink streaming execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Read the data
    val dataStream = streamEnv.readTextFile("./data/trafficdata")
    val ds = dataStream.map(line => {
      val arr = line.split("\t")
      val info = s"${arr(0)}\t${arr(1)}\t${arr(2)}\t${arr(3)}\t${arr(4)}\t${arr(5)}\t${arr(6)}"
      info
    })

    ds.print()
    streamEnv.execute("flink start")
  }
}

The data layout: (screenshot omitted)
Final result: (screenshot omitted)
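
Note that arr(6) in the map above throws an ArrayIndexOutOfBoundsException on any line with fewer than seven tab-separated fields. A minimal defensive variant (my own sketch, not from the original post) filters malformed lines before re-joining:

    val ds = dataStream
      .map(_.split("\t"))
      .filter(_.length >= 7)                  // drop malformed lines instead of crashing
      .map(arr => arr.take(7).mkString("\t")) // re-join the first seven fields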

Source: reading data from Kafka

Plain String data

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.flink.streaming.api.scala._
import java.util.Properties


object KafkaSourceWithoutKey {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assemble the Kafka configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "172.16.254.4:9092,172.16.254.5:9092,172.16.254.6:9092")
    // props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "test01_group")
    // props.setProperty("auto.offset.reset", "latest") // optional; the default is flinkKafkaConsumer.setStartFromGroupOffsets(), and this property has no effect once a start position is set on the consumer

    // Read the data from Kafka
    val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("test01", new SimpleStringSchema(), props))
    lines.print()
    // Trigger execution
    env.execute()
  }
}
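
The commented-out auto.offset.reset property hints at start-position control, but with the Flink connector this is configured on the consumer object itself. A short sketch of the standard FlinkKafkaConsumer options (props is reused from the example above; only the last call before addSource takes effect):

    val consumer = new FlinkKafkaConsumer[String]("test01", new SimpleStringSchema(), props)
    consumer.setStartFromGroupOffsets() // default: resume from the committed group offsets
    consumer.setStartFromEarliest()     // ignore committed offsets, read from the beginning
    consumer.setStartFromLatest()       // ignore committed offsets, read only new records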

Produce a few test messages on the Kafka cluster: (screenshot omitted)
Printed result: (screenshot omitted)
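
The original test used the cluster's console producer. An equivalent sketch in Scala using the standard kafka-clients producer API (TestProducer and the message text are my own illustrative names; the broker addresses are reused from the consumer example):

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer
    import java.util.Properties

    object TestProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.setProperty("bootstrap.servers", "172.16.254.4:9092,172.16.254.5:9092,172.16.254.6:9092")
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)

        val producer = new KafkaProducer[String, String](props)
        producer.send(new ProducerRecord[String, String]("test01", "hello flink")) // fire a test message
        producer.flush()
        producer.close()
      }
    }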

Standard key-value data

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTuple2TypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.flink.streaming.api.scala._
import java.util.Properties

object KafkaSourceWithKey {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "172.16.254.4:9092,172.16.254.5:9092,172.16.254.6:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "test02_group")
    // props.setProperty("auto.offset.reset", "latest") // optional, because the start position can be set on the FlinkKafkaConsumer itself

    val flinkKafkaConsumer = new FlinkKafkaConsumer[(String, String)]("test02", new KafkaDeserializationSchema[(String, String)] {

      // Returning false means the stream never ends
      override def isEndOfStream(t: (String, String)): Boolean = false

      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        var key = "0"
        var value = "null"
        if (consumerRecord.key() != null) {
          key = new String(consumerRecord.key(), "UTF-8")
        }
        if (consumerRecord.value() != null) {
          value = new String(consumerRecord.value(), "UTF-8")
        }
        (key, value)
      }

      // Declare the produced tuple type; createTuple2TypeInformation requires the implicit conversions import
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props)

    // Start reading from the latest offset; the default is setStartFromGroupOffsets
    val infos: DataStream[(String, String)] = env.addSource(flinkKafkaConsumer.setStartFromLatest())
    // Print the result
    infos.print()
    // Trigger execution
    env.execute()
  }
}
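
The getProducedType override is required because KafkaDeserializationSchema is generic: after type erasure Flink cannot recover the element type of the stream on its own, so the schema must report the tuple's TypeInformation explicitly, which is what createTuple2TypeInformation builds here.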

Result: (screenshot omitted)

Source: reading data from a collection

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// Import the implicit conversions. Putting this import here is recommended to avoid misleading IDEA code hints.
import org.apache.flink.streaming.api.scala._

case class StationLog(sid: String, callOut: String, callIn: String, callType: String, callTime: Long, duration: Long)

object CollectionSource {

  def main(args: Array[String]): Unit = {

    // Initialize the Flink streaming execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Read the data; StationLog is a case class, so `new` is not needed
    val dataStream = streamEnv.fromCollection(Array(
      StationLog("001", "186", "189", "busy", 1577071519462L, 0),
      StationLog("002", "186", "188", "busy", 1577071520462L, 0),
      StationLog("003", "183", "188", "busy", 1577071521462L, 0),
      StationLog("004", "186", "188", "success", 1577071522462L, 32)
    ))

    dataStream.print()
    streamEnv.execute()
  }
}
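
For quick experiments there is also fromElements, which takes the values directly as varargs instead of wrapping them in a collection. A minimal sketch reusing the StationLog case class (the records here are made-up sample values):

    val dataStream2 = streamEnv.fromElements(
      StationLog("005", "187", "189", "success", 1577071523462L, 18),
      StationLog("006", "188", "186", "busy", 1577071524462L, 0)
    )
    dataStream2.print()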

Result: (screenshot omitted)

Source: reading data from a socket

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._

object SocketSource {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Get the input data from a local socket
    val text: DataStream[String] = env.socketTextStream("127.0.0.1", 6666)
    // Split each line into lower-case words, pair each with 1, and sum per word
    val counts = text.flatMap(_.toLowerCase().split("\\W+"))
      .map((_, 1)).keyBy(0).sum(1)

    counts.print()
    env.execute("Streaming Count")
  }
}
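
To test locally, start a text server before launching the job, for example with netcat (nc -lk 6666), then type lines into that terminal: each word is lower-cased, split on non-word characters, and counted per key.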

Result: (screenshot omitted)

Copyright notice
This article was written by [皮皮蝦不皮呀]. Please include a link to the original when reposting. Thank you.
https://cht.chowdera.com/2022/134/202205140319038565.html
