1 源码

  • 清空 HBase 表
    \"在这里插入图片描述\"
  • StatStreamingApp.scala
package streamingproject

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import streamingproject.dao.CourseClickCountDAO
import streamingproject.domian.{ClickLog, CourseClickCount}
import streamingproject.utils.DateUtils

import scala.collection.mutable.ListBuffer

/**
 * Processes access-log lines delivered by Kafka with Spark Streaming.
 *
 * Pipeline (one batch every 60 seconds):
 *   1. receive messages with the receiver-based `KafkaUtils.createStream`;
 *   2. clean each tab-separated log line into a [[ClickLog]], keeping only course pages;
 *   3. count clicks per `<day>_<courseId>` key and hand each partition's counts to
 *      `CourseClickCountDAO.save`.
 *
 * Usage: `StatStreamingApp <zkQuorum> <group> <topics> <numThreads>`, where `topics` is a
 * comma-separated list and `numThreads` is the receiver thread count used for every topic.
 */
object StatStreamingApp {

  import scala.util.Try

  def main(args: Array[String]): Unit = {

    if (args.length != 4) {
      System.err.println("Usage:<zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, groupId, topics, numThreads) = args

    // The Kafka receiver permanently occupies one core, so local mode needs at least
    // two threads for batches to be processed at all.
    val sparkConf = new SparkConf().setAppName("StatStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    val messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)

    // Test 1: check that data is being received.
    //messages.map(_._2).count().print()

    // Test 2: data cleaning. Lines that cannot be parsed are dropped instead of failing
    // the whole batch; lines that are not course pages are dropped as well.
    val cleanData = messages.map(_._2).flatMap(line => parseClickLog(line).toList)

    //cleanData.print()

    // Test 3: accumulate page views of hands-on courses up to now.
    cleanData
      .map { click =>
        // Row key "<first 8 chars of time>_<courseId>", e.g. 20181111_88.
        // NOTE(review): assumes parseToMinute yields at least 8 chars (presumably yyyyMMdd...) — confirm.
        (s"${click.time.substring(0, 8)}_${click.courseId}", 1)
      }
      .reduceByKey(_ + _)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partitionRecords =>
          val list = new ListBuffer[CourseClickCount]
          list ++= partitionRecords.map { case (rowKey, count) => CourseClickCount(rowKey, count) }
          // Skip the DAO call (and whatever connection it opens) for empty partitions.
          if (list.nonEmpty) CourseClickCountDAO.save(list)
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses one tab-separated log line into a [[ClickLog]].
   *
   * Field 2 is a request line such as `GET /class/128.html HTTP/1.1`; the course id is the
   * file name of a `/class...` path with its extension removed (`128`). Fields 0, 1, 3 and 4
   * are passed to [[ClickLog]] positionally, with field 1 converted by
   * `DateUtils.parseToMinute` and field 3 parsed as an Int.
   *
   * @param line raw log line taken from the Kafka message value
   * @return the parsed record, or None when the URL is not a course page, the course id is 0,
   *         or any field is missing or malformed
   */
  private def parseClickLog(line: String): Option[ClickLog] =
    Try {
      val infos = line.split("\t")
      val url = infos(2).split(" ")(1)
      if (!url.startsWith("/class")) {
        None
      } else {
        // "/class/128.html".split("/") == Array("", "class", "128.html")
        val courseIdHTML = url.split("/")(2)
        val courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
        if (courseId == 0) None
        else Some(ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4)))
      }
    }.toOption.flatten
}

\"在这里插入图片描述\"

收藏 打印