Spark Streaming 项目实战 (2) | 从 Kafka中消费数据

  大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/

  本片博文为大家带来的是Spark Streaming 项目实战 (2) | 从 Kafka中消费数据。
Spark Streaming 项目实战 (2) | 从 Kafka中消费数据

目录一. 测试是否能够从Kafka消费到数据二. 完整程序源码3. 运行结果

Spark Streaming 项目实战 (2) | 从 Kafka中消费数据
  编写App, 从 kafka 读取数据

  新建一个Maven项目:spark-streaming-project

  在依赖选择上spark-streaming-kafka此次选用0-10_2.11而非0-08_2.11

    <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
</dependency>

Spark Streaming 项目实战 (2) | 从 Kafka中消费数据

一. 测试是否能够从Kafka消费到数据

  • 1. 新建APP(Trait)
    package com.buwenbuhuo.streaming.project.app

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    /**
    *
    * @author 不温卜火
    * @create 2020-08-14 13:41
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    *
    */
    trait App {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("App").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "bigdata0814",
    "auto.offset.reset" -> "latest",
    // 自动提交管理
    "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("ads_log0814")

    val sourceStream: DStream[String] = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent, // 标配
    Subscribe[String, String](topics, kafkaParams)
    ).map(_.value())
    sourceStream.print(1000)

    ssc.start()
    ssc.awaitTermination()
    }
    }

  • 2. 新建AreaTopAPP
    package com.buwenbuhuo.streaming.project.app

    /**
    *
    * @author 不温卜火
    * @create 2020-08-14 13:41
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    *
    */
    object AreaTopAPP extends App {

    }

  • 3. 运行AreaTopAPP

    Spark Streaming 项目实战 (2) | 从 Kafka中消费数据
      上述即为测试 ,但是其实在这个app内,有一部分可以专门封装成一个新的样例类

      测试能够成功得到所想要的结果,下面给出完善最终的程序源码

    二. 完整程序源码

    编写App, 从 kafka 读取数据

  • bean 类 AdsInfo
    package com.buwenbuhuo.streaming.project.bean
    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    import java.util.Date

    /**
    *
    * @author 不温卜火
    * @create 2020-08-14 17:45
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    *
    */
    case class AdsInfo(ts: Long,
    area: String,
    city: String,
    userId: String,
    adsId: String,
    var timestamp: Timestamp = null,
    var dayString: String = null, // 2012-8-14
    var hmString: String = null) { // 11:20

    timestamp = new Timestamp(ts)

    val date = new Date(ts)
    dayString = new SimpleDateFormat("yyyy-MM-dd").format(date)
    hmString = new SimpleDateFormat("HH:mm").format(date)
    }

  • 2. 工具类类 MyKafkaUtils
    package com.buwenbuhuo.streaming.project.util

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    /**
    *
    * @author 不温卜火
    * @create 2020-08-14 15:20
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    *
    */
    object MyKafkaUtils {

    val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "bigdata0814",
    "auto.offset.reset" -> "latest",
    // 自动提交管理
    "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    /*
    * 根据传入的参数,返回从kafka得到的流
    * @param ssc
    * @param topic
    * @return
    */
    def getKafkaSteam(ssc:StreamingContext,topics:String*): DStream[String] =

    KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent, // 标配
    Subscribe[String, String](topics.toIterable, kafkaParams)
    ).map(_.value())
    }

  • 3. 从kafka消费数据(APP)
    package com.buwenbuhuo.streaming.project.app

    import com.buwenbuhuo.streaming.project.bean.AdsInfo
    import com.buwenbuhuo.streaming.project.util.MyKafkaUtils
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
    *
    * @author 不温卜火
    * @create 2020-08-14 13:41
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    *
    */
    trait App {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("App").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))
    // 得到最原始的流
    val sourceStream: DStream[String] = MyKafkaUtils.getKafkaSteam(ssc,"ads_log0814")

    val adsInfoStream: DStream[AdsInfo] = sourceStream.map(s => {
    val spilt: Array[String] = s.split(",")
    AdsInfo(spilt(0).toLong, spilt(1), spilt(2), spilt(3), spilt(4))
    })

    adsInfoStream.print(1000)

    ssc.start()
    ssc.awaitTermination()
    }
    }

  • 4. AreaTopAPP
    package com.buwenbuhuo.streaming.project.app

    /**
    *
    * @author 不温卜火
    * @create 2020-08-14 13:41
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    *
    */
    object AreaTopAPP extends App {

    }


    3. 运行结果

    同时运行MockRealtimeData(数据生产者)和AreaTopAPP(数据消费者)
    Spark Streaming 项目实战 (2) | 从 Kafka中消费数据
    Spark Streaming 项目实战 (2) | 从 Kafka中消费数据
      本次的分享就到这里了,

    Spark Streaming 项目实战 (2) | 从 Kafka中消费数据

      好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
      如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
      码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!

    Spark Streaming 项目实战 (2) | 从 Kafka中消费数据
    Spark Streaming 项目实战 (2) | 从 Kafka中消费数据

    原创:https://www.panoramacn.com
    源码网提供WordPress源码,帝国CMS源码discuz源码,微信小程序,小说源码,杰奇源码,thinkphp源码,ecshop模板源码,微擎模板源码,dede源码,织梦源码等。

    专业搭建小说网站,小说程序,杰奇系列,微信小说系列,app系列小说

    Spark Streaming 项目实战 (2) | 从 Kafka中消费数据

    免责声明,若由于商用引起版权纠纷,一切责任均由使用者承担。

    您必须遵守我们的协议,如果您下载了该资源行为将被视为对《免责声明》全部内容的认可-> 联系客服 投诉资源
    www.panoramacn.com资源全部来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。 敬请谅解! 侵权删帖/违法举报/投稿等事物联系邮箱:2640602276@qq.com
    未经允许不得转载:书荒源码源码网每日更新网站源码模板! » Spark Streaming 项目实战 (2) | 从 Kafka中消费数据
    关注我们小说电影免费看
    关注我们,获取更多的全网素材资源,有趣有料!
    120000+人已关注
    分享到:
    赞(0) 打赏
  • 评论抢沙发

    • 昵称 (必填)
    • 邮箱 (必填)
    • 网址

    您的打赏就是我分享的动力!

    支付宝扫一扫打赏

    微信扫一扫打赏