package Test
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * Joins the IP addresses found in an access log against a table of IP ranges
 * using Spark SQL, then prints how many log entries fall in each province,
 * most frequent first.
 *
 * Both inputs are loaded in full as Datasets, so this approach applies when
 * the complete rule set and the complete access log are available up front.
 */
object SQLIIpLocation1 {

  /**
   * Converts a dotted IP string into a single number.
   *
   * Segments are consumed left to right: the running value is shifted left by
   * 8 bits and the next segment is OR-ed in, e.g. "1.2.3.4" becomes 16909060.
   * Neither the number of segments nor their range is validated.
   *
   * @param ip dotted-decimal text such as "192.168.1.1"
   * @return the segments packed into a Long, first segment most significant
   * @throws NumberFormatException if a segment cannot be parsed as a Long
   */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, segment) => segment.toLong | (acc << 8))

  /**
   * Entry point.
   *
   * @param args args(0) is the path of the IP rule file, args(1) is the path
   *             of the access log file
   * @throws IllegalArgumentException if fewer than two paths are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message before any Spark resources are created.
    require(
      args.length >= 2,
      "Usage: SQLIIpLocation1 <ip-rules-path> <access-log-path>"
    )

    // The master is hard-coded to local mode.
    // NOTE(review): a master set on the builder presumably takes precedence
    // over spark-submit's --master; confirm before running on a cluster.
    val spark = SparkSession.builder()
      .appName("SQLIIpLocation1")
      .master("local[*]")
      .getOrCreate()

    try {
      // Brings the encoders and the toDF conversions into scope.
      import spark.implicits._

      // Load the IP rules and keep (range start, range end, province) taken
      // from pipe-separated columns 2, 3 and 6. A line with fewer columns or
      // non-numeric bounds fails the job.
      val ipRulesLines: Dataset[String] = spark.read.textFile(args(0))
      val tpDs: Dataset[(Long, Long, String)] = ipRulesLines.map { line =>
        val fields = line.split("[|]")
        (fields(2).toLong, fields(3).toLong, fields(6))
      }

      // Expose the rules to SQL as the view v_ip_rules.
      val ipRulesDF: DataFrame = tpDs.toDF("start_num", "end_num", "province")
      ipRulesDF.createOrReplaceTempView("v_ip_rules")

      // Load the access log; pipe-separated column 1 holds the client IP,
      // which is stored in numeric form so it can be range-compared.
      val accessLog: Dataset[String] = spark.read.textFile(args(1))
      val ipLogs: DataFrame = accessLog
        .map(line => ip2Long(line.split("[|]")(1)))
        .toDF("ip")

      // Expose the numeric IPs to SQL as the view v_ip_logs.
      ipLogs.createOrReplaceTempView("v_ip_logs")

      // Range join: each log IP is matched to every rule whose
      // [start_num, end_num] interval contains it, then counted per province.
      val result = spark.sql("SELECT province,Count(1) counts From v_ip_rules a,v_ip_logs b WHERE b.ip >=a.start_num and b.ip <= a.end_num GROUP BY province ORDER BY counts desc")
      result.show()
    } finally {
      // Release the session even when reading, parsing or the query fails.
      spark.stop()
    }
  }
}