当前位置:网站首页>Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)

Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)

2020-11-08 12:06:33 osc_15vyay19

Flink提供了各种数据的转换操作,但实际业务过程中有很多业务上需要处理的数据结构、规则等等,需要自己写自己的业务代码,这时候就用到的flink提供的函数类(Function Class)

Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类),例如MapFunction,FilterFunction,ProcessFunction等。

一个小栗子,要筛选数据中以sensor3为开头的数据
还是在com.mafei.apitest新建一个scala Object UDFTest1
其他代码跟之前一样,读取文件做些简单处理,这里增加了一个自定义的函数类MyFilterFunction,在使用时,只需要在逻辑处增加.filter方法即可,

package com.mafei.apitest

import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

//获取传感器数据

case class SensorReadingTest1(id: String,timestamp: Long, temperature: Double)

object UdfTest1 {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    case class Person(name: String, age: Int)

    val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)

//    inputStream.print()
    //先转换成样例类类型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割数据,获取结果
        SensorReadingTest1(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
        //      }).filter(new MyFilterFunction)
        //      }).filter(_.id.startsWith("sensor1"))   //如果特别简单的逻辑,也可以匿名类直接这样子写,和写一个函数是一样的效果

        //      }).filter(new RichFilterFunction[SensorReadingTest1] {
        //      override def filter(t: SensorReadingTest1): Boolean =
        //        t.id.startsWith("sensor3")
        //    })   //匿名类的实现效果,和上面2种效果都是一样的

      }).filter(new KeywordFilterFunction("sensor3"))  //也可以把要过滤的参数传进去

    dataStream.print()
    env.execute("udf test")

  }

}

//自定义一个函数类,做过滤,实现接口中的filter方法即可
class MyFilterFunction extends FilterFunction[SensorReadingTest1] {

  override def filter(t: SensorReadingTest1): Boolean = t.id.startsWith("sensor3")

}

//自定义的函数类,和上面一样,增加了传参,
class KeywordFilterFunction(keyword: String) extends FilterFunction[SensorReadingTest1]{
  override def filter(t: SensorReadingTest1): Boolean =
    t.id.startsWith(keyword)
}

代码结构及运行效果图

Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)

RichMap

主要做一些数据处理等操作,代码演示了 MapperDemo和RichMapDemo的区别及运行效果

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichMapFunction.html

package com.mafei.apitest

import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

//获取传感器数据

case class SensorReadingTest2(id: String,timestamp: Long, temperature: Double)

object UdfTest2 {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    case class Person(name: String, age: Int)

    val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)

//    inputStream.print()
    //先转换成样例类类型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割数据,获取结果
        SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
      }).map(new RichMapDemo())

    dataStream.print()
    env.execute("udf test")
  }

}

class MapperDemo extends MapFunction[SensorReadingTest2, String]{
  override def map(t: SensorReadingTest2): String = t.id+"测试加一些字符串"
}

//富函数,比上面类多了open和close等方法,可以做些数据库连接等操作
class RichMapDemo extends RichMapFunction[SensorReadingTest2, String]{

  //这里主要是一些初始化操作,启动调用时,整个过程只会调用一次,类似于类初始化加载的变量,像数据库连接等等
  override def open(parameters: Configuration): Unit = {
    println("进行了数据库连接。。。。。。。。。。")
    //获取运行时上下文
    getRuntimeContext()
  }

  //每条数据都会经过这个方法
  override def map(in: SensorReadingTest2): String = in.id+"测试富函数加一些字符串"

  override def close(): Unit = {
    //跟open类似,当任务停止时会执行,可以做一些如释放数据库连接等等
    print("关闭了数据库连接。。。。。。")
  }
}

运行效果:可以看到,整个过程中,只有一次数据库连接操作

进行了数据库连接。。。。。。。。。。
sensor1测试富函数加一些字符串
sensor2测试富函数加一些字符串
sensor3测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
关闭了数据库连接。。。。。。






版权声明
本文为[osc_15vyay19]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4368960/blog/4708103