Spark学习案例——SparkSQL结合Kudu实现广告业务分析

下方有数据可免费下载

目录原始数据项目架构ETL处理业务一业务二业务三代码重构打包定时运行

源码地址https://github.com/chengyanban/spark-project/tree/master/广告数据分析

原始数据

下载数据: 请点击我.提取码:3bm9
有两个文件,一个广告业务的data-test.json,一个ip.txt文件

项目架构

Spark学习案例——SparkSQL结合Kudu实现广告业务分析

ETL处理

data-test.json文件中每行有ip地址,需通过ip.txt文件进行解析,解析出地名,运营商等信息。但是data-test.json中的ip格式为123.23.3.11,而ip.txt中的ip格式为16777472——16778239十进制的形式,需将ip转化为十进制,判断是否在其范围内。

def ip2Long(ip:String)={
val splits = ip.split("[.]")
var ipNum = 0L
for (i <- 0.until(splits.length)){
ipNum = splits(i).toLong | ipNum << 8L
}
ipNum
}

读取两个文件形成DF,再创造临时视图,再通过join产生ODS表

jsonDF.createOrReplaceTempView("logs")
ipRuleDF.createOrReplaceTempView("ips")

lazy val SQL = "select " +
"logs.ip ," +
...
...
...
"logs.sessionid," +
"from logs left join " +
"ips on logs.ip_long between ips.start_ip and ips.end_ip "

业务一

统计每个省每个城市的数量
select provincename, cityname, count(1) as cnt from ods group by provincename,cityname

object ProvinceCityStatProcess extends DataProcess{
override def process(spark:SparkSession): Unit = {

val sourceTableName = DateUtils.getTableName("ods", spark)
val masterAdress = "hadoop000"

//第一步,读取ODS表
val odsDF = spark.read.format("org.apache.kudu.spark.kudu")
.option("kudu.master", masterAdress)
.option("kudu.table", sourceTableName)
.load()
//odsDF.show()

//第二步,建立视图根据业务进行SQL分析
odsDF.createOrReplaceTempView("ods")
val result = spark.sql(SQLUtils.PROVINCE_CITY_SQL)
//result.show()

val KUDU_MASTERS = "hadoop000"
val tableName = DateUtils.getTableName("province_city_stat", spark)
val partitionID = "provincename"
val schema = SchemaUtils.ProvinceCitySchema

//第三步,将得到的DF写入到Kudu中
KuduUtils.sink(result,tableName, KUDU_MASTERS, schema,partitionID)
}

业务二

地区分布指标
Spark学习案例——SparkSQL结合Kudu实现广告业务分析

业务三

APP统计直播
Spark学习案例——SparkSQL结合Kudu实现广告业务分析

代码重构

1.创建一个trait文件

trait DataProcess {
def process(spark:SparkSession)
}

2.各种业务(object实例文件)只需要去实现process()这个函数就行,如上业务一,继承DataProcess,并实现process()。

object ProvinceCityStatProcess extends DataProcess{
override def process(spark:SparkSession): Unit = {
...
//第一步,读取ODS表
//第二步,建立视图根据业务进行SQL分析
//第三步,将得到的DF写入到Kudu中
...
}

3.程序mian入口

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().getOrCreate()

val time = spark.sparkContext.getConf.get("spark.time") // spark框架只认以spark.开头的参数,否则系统不识别
if(StringUtils.isBlank(time)) { // 如果是空,后续的代码就不应该执行了
logError("处理批次不能为空....")
System.exit(0)
}

//ETL,每个业务就运行process()函数,都有统一的SparkSession
LogETLProcessor.process(spark)
//业务一
ProvinceCityStatProcess.process(spark)
//业务二
AreaStatProcess.process(spark)
//业务三
AppStatProcess.process(spark)

spark.stop()
}

4.各种工具类只需调用即可

DateUtils           //表名后加日期,一天一处理
IPUtils //ip二进制转换
KuduUtils //写入到Kudu表
SchemaUtils //Kudu表的schema信息
SQLUtils //SQL语句

打包定时运行

//sparksql.sh:
time=20200903
${SPARK_HOME}/bin/spark-submit \
--class com.chengyanban.project.App \
--master local \
--jars /home/hadoop/lib/kudu-client-1.7.0.jar,/home/hadoop/lib/kudu-spark2_2.11-1.7.0.jar \
--conf spark.time=$time \
--conf spark.raw.path="hdfs://hadoop000:8020/muke/sparksql/$time" \
--conf spark.ip.path="hdfs://hadoop000:8020/muke/sparksql/ip.txt" \
/home/hadoop/lib/sparksql-train-1.0.jar

chmod -R 777 sparksql.sh 需要给执行权限
路径前要加hdfs://hadoop000:8020,不加crontab会用本地

crontab -e
28 */17 * * * /home/hadoop/script/sparksql.sh

原创:https://www.panoramacn.com
源码网提供WordPress源码,帝国CMS源码discuz源码,微信小程序,小说源码,杰奇源码,thinkphp源码,ecshop模板源码,微擎模板源码,dede源码,织梦源码等。

专业搭建小说网站,小说程序,杰奇系列,微信小说系列,app系列小说

Spark学习案例——SparkSQL结合Kudu实现广告业务分析

免责声明,若由于商用引起版权纠纷,一切责任均由使用者承担。

您必须遵守我们的协议,如果您下载了该资源行为将被视为对《免责声明》全部内容的认可-> 联系客服 投诉资源
www.panoramacn.com资源全部来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。 敬请谅解! 侵权删帖/违法举报/投稿等事物联系邮箱:2640602276@qq.com
未经允许不得转载:书荒源码源码网每日更新网站源码模板! » Spark学习案例——SparkSQL结合Kudu实现广告业务分析
关注我们小说电影免费看
关注我们,获取更多的全网素材资源,有趣有料!
120000+人已关注
分享到:
赞(0) 打赏

评论抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

您的打赏就是我分享的动力!

支付宝扫一扫打赏

微信扫一扫打赏