Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

  大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/

  本片博文为大家带来的是实时的分析处理用户对广告点击的行为数据(1) | 准备数据。
Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

目录一. 数据生成方式二. 数据生成模块1. 开启集群2. 创建 Topic3. 产生循环不断的数据到指定的 topic4. 确认 kafka 中数据是否生成成功

Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中
  本实战项目使用 Structured Streaming 来实时的分析处理用户对广告点击的行为数据.

一. 数据生成方式

  使用代码的方式持续的生成数据, 然后写入到 kafka 中.

  然后Structured Streaming 负责从 kafka 消费数据, 并对数据根据需求进行分析.

二. 数据生成模块

模拟出来的数据格式:

时间戳,地区,城市,用户 id,广告 id
1566035129449,华南,深圳,101,2


1. 开启集群

// 启动 zookeeper 和 Kafka
[bigdata@hadoop002 zookeeper-3.4.10]$ bin/start-allzk.sh
[bigdata@hadoop002 kafka]$ bin/start-kafkaall.sh

Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

2. 创建 Topic

在 kafka 中创建topic: ads_log0814

[bigdata@hadoop002 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop002:9092 -topic ads_log0814

Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

3. 产生循环不断的数据到指定的 topic

创建模块spark-realtime模块

  • 1. 导入依赖:
    // 尽量与Kafka版本保持一致

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
    </dependency>

    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

  • 2. 工具类: RandomNumUtil
    用于生成随机数

    package com.buwenbuhuo.data.mock.util
    import java.util.Random

    import scala.collection.mutable

    /**
    *
    * @author 不温卜火
    * @create 2020-08-14 12:12
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    * 随机生成整数的工具类
    *
    */
    object RandomNumUtil {
    val random = new Random()

    /**
    * 返回一个随机的整数 [from, to]
    *
    * @param from
    * @param to
    * @return
    */
    def randomInt(from: Int, to: Int): Int = {
    if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
    // [0, to - from) + from [form, to -from + from ]
    random.nextInt(to - from + 1) + from
    }

    /**
    * 随机的Long [from, to]
    *
    * @param from
    * @param to
    * @return
    */
    def randomLong(from: Long, to: Long): Long = {
    if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
    random.nextLong().abs % (to - from + 1) + from
    }

    /**
    * 生成一系列的随机值
    *
    * @param from
    * @param to
    * @param count
    * @param canReat 是否允许随机数重复
    */
    def randomMultiInt(from: Int, to: Int, count: Int, canReat: Boolean = true): List[Int] = {
    if (canReat) {
    (1 to count).map(_ => randomInt(from, to)).toList
    } else {
    val set: mutable.Set[Int] = mutable.Set[Int]()
    while (set.size < count) {
    set += randomInt(from, to)
    }
    set.toList
    }
    }

    def main(args: Array[String]): Unit = {
    println(randomMultiInt(1, 15, 10))
    println(randomMultiInt(1, 8, 10, false))
    }
    }

    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

  • 3. 工具类: RandomOptions
    用于生成带有比重的随机选项

    package com.buwenbuhuo.data.mock.util
    import scala.collection.mutable.ListBuffer
    /**
    *
    @author 不温卜火
    * @create 2020-08-14 12:12
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    *
    * 根据提供的值和比重, 来创建RandomOptions对象.
    * 然后可以通过getRandomOption来获取一个随机的预定义的值

    *
    */
    object RandomOptions {
    def apply[T](opts: (T, Int)*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    randomOptions.totalWeight = (0 /: opts) (_ + _._2) // 计算出来总的比重
    opts.foreach {
    case (value, weight) => randomOptions.options ++= (1 to weight).map(_ => value)
    }
    randomOptions
    }

    def main(args: Array[String]): Unit = {
    // 测试
    val opts = RandomOptions(("张三", 10), ("李四", 30), ("ww", 20))

    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
    }
    }

    // 工程师 10 程序猿 10 老师 20
    class RandomOptions[T] {
    var totalWeight: Int = _
    var options = ListBuffer[T]()
    /**
    * 获取随机的 Option 的值
    *
    * @return
    */
    def getRandomOption() = {
    options(RandomNumUtil.randomInt(0, totalWeight - 1))
    }
    }

    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

  • 4. 样例类: CityInfo
    package com.buwenbuhuo.data.mock.bean

    /**
    * 城市表
    *
    * @param city_id 城市 id
    * @param city_name 城市名
    * @param area 城市区域
    */
    case class CityInfo(city_id: Long,
    city_name: String,
    area: String)

  • 5. 生成模拟数据: MockRealtimeData
    package com.buwenbuhuo.data.mock
    import java.util.Properties

    import com.buwenbuhuo.data.mock.bean.CityInfo
    import com.buwenbuhuo.data.mock.util.{RandomNumUtil, RandomOptions}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    import scala.collection.mutable.ArrayBuffer

    /**
    *
    * @author 不温卜火
    * @create 2020-08-14 12:12
    * MyCSDN : https://buwenbuhuo.blog.csdn.net/
    *
    */

    /**
    * 生成实时的模拟数据
    */
    object MockRealtimeData {
    /*
    数据格式:
    timestamp area city userid adid
    某个时间点 某个地区 某个城市 某个用户 某个广告

    */
    def mockRealTimeData(): ArrayBuffer[String] = {
    // 存储模拟的实时数据
    val array = ArrayBuffer[String]()
    // 城市信息
    val randomOpts = RandomOptions(
    (CityInfo(1, "北京", "华北"), 30),
    (CityInfo(2, "上海", "华东"), 30),
    (CityInfo(3, "广州", "华南"), 10),
    (CityInfo(4, "深圳", "华南"), 20),
    (CityInfo(5, "杭州", "华中"), 10))
    (1 to 50).foreach {
    i => {
    val timestamp = System.currentTimeMillis()
    val cityInfo = randomOpts.getRandomOption()

    val area = cityInfo.area
    val city = cityInfo.city_name
    val userid = RandomNumUtil.randomInt(100, 105)
    val adid = RandomNumUtil.randomInt(1, 5)
    // 拼接成字符串
    array += s"$timestamp,$area,$city,$userid,$adid"
    Thread.sleep(10)
    }
    }
    array
    }

    def createKafkaProducer: KafkaProducer[String, String] = {
    val props: Properties = new Properties
    // Kafka服务端的主机名和端口号
    props.put("bootstrap.servers", "hadoop002:9092,hadoop003:9092,hadoop004:9092")
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
    }

    def main(args: Array[String]): Unit = {
    val topic = "ads_log0814"
    // top生产者
    val producer: KafkaProducer[String, String] = createKafkaProducer
    while (true) {
    mockRealTimeData().foreach {
    msg => {
    producer.send(new ProducerRecord(topic, msg))
    Thread.sleep(100)
    }
    }
    Thread.sleep(1000)
    }
    }
    }

  • 6. 先看一下随机生成的数据
    // 这时候需要注释MockRealtimeData中的这两行代码

    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中
    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

    4. 确认 kafka 中数据是否生成成功

    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中
      本次的分享就到这里了,

    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

      好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
      如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
      码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!

    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中
    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

    原创:https://www.panoramacn.com
    源码网提供WordPress源码,帝国CMS源码discuz源码,微信小程序,小说源码,杰奇源码,thinkphp源码,ecshop模板源码,微擎模板源码,dede源码,织梦源码等。

    专业搭建小说网站,小说程序,杰奇系列,微信小说系列,app系列小说

    Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

    免责声明,若由于商用引起版权纠纷,一切责任均由使用者承担。

    您必须遵守我们的协议,如果您下载了该资源行为将被视为对《免责声明》全部内容的认可-> 联系客服 投诉资源
    www.panoramacn.com资源全部来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。 敬请谅解! 侵权删帖/违法举报/投稿等事物联系邮箱:2640602276@qq.com
    未经允许不得转载:书荒源码源码网每日更新网站源码模板! » Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中
    关注我们小说电影免费看
    关注我们,获取更多的全网素材资源,有趣有料!
    120000+人已关注
    分享到:
    赞(0) 打赏
  • 评论抢沙发

    • 昵称 (必填)
    • 邮箱 (必填)
    • 网址

    您的打赏就是我分享的动力!

    支付宝扫一扫打赏

    微信扫一扫打赏