在pom中添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>emg</groupId>
    <artifactId>emg.spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- Spark artifacts are the Scala 2.11 builds, matching scala-library 2.11.8 below.
             The commented-out <scope>provided</scope> keeps Spark out of the shaded jar;
             enable it if the cluster already supplies Spark at runtime. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <!-- HiveServer2 JDBC driver (org.apache.hive.jdbc.HiveDriver). -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <!-- Plugin that compiles Scala sources -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- Plugin that compiles Java sources -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <!-- Bound to process-resources, i.e. before the compile phase,
                         so Scala classes are compiled before javac runs. -->
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Builds a self-contained (shaded) jar during the package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <!-- Drop signature files copied from signed dependencies:
                                     they no longer match the merged jar and make the JVM
                                     reject it ("Invalid signature file digest"). -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Main-Class written to the jar manifest; set it to your own entry point -->
                                    <mainClass>emg.branchs.EmgFilterDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
方式1：使用 Spark SQL 直接连接 Hive
经自己测试：当 Hive 的 metastore 服务启动不了、只启动了 HiveServer2 时，这种方式一直报错，提示找不到 Hive 的元数据库。也就是说，该方式要求 metastore 服务（hive.metastore.uris 所指的地址）可以连接。
/**
 * Approach 1: write query results into a partitioned Hive table through Spark SQL's Hive support.
 *
 * Spark resolves tables through the Hive metastore configured by `hive.metastore.uris`, so that
 * service must be reachable; with only HiveServer2 running this approach is reported to fail
 * to find the Hive metadata.
 *
 * Flow: register the Parquet input as a temp view, make sure the target partition exists,
 * materialize the query into a scratch table, then INSERT OVERWRITE the partition from it.
 * The scratch table is dropped afterwards even when the insert fails, and the session is stopped.
 *
 * @param args exactly three values: `inpath` (Parquet input path), then `dt` and `hour`
 *             (values of the target partition)
 */
def main(args: Array[String]): Unit = {
  // Give a readable usage error instead of a MatchError on a wrong argument count.
  require(args.length == 3, s"Usage: <inpath> <dt> <hour> (got ${args.length} argument(s))")
  val Array(inpath, dt, hour) = args

  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    //.setMaster("local[*]")
    .setMaster("spark://192.168.40.52:7077")
  val session = SparkSession.builder()
    .config(conf)
    // Hive metastore Thrift endpoint (default port 9083; see hive-site.xml).
    .config("hive.metastore.uris", "thrift://192.168.40.51:9083")
    // Hive warehouse directory on HDFS.
    .config("spark.sql.warehouse.dir", "hdfs://192.168.40.51:9000/user/hive/warehouse")
    // Use the Hive catalog directly.
    .enableHiveSupport()
    .getOrCreate()

  try {
    session.read.parquet(inpath).createOrReplaceTempView("tmp_app_log_test")
    // The real transformation query is omitted here; `select *` is a placeholder.
    val sql1 =
      """
        |select *
        |from tmp_app_log_test
      """.stripMargin

    val hive_table = "dwb2.fact_mbk_offline_log_mbk_app_action_event_v2_i_h"
    session.sql(s"alter table $hive_table add if not exists partition ( dt='$dt',hour='$hour')")

    // Unquoted table names may only contain letters, digits and '_'; a value such as
    // dt=2018-05-01 would otherwise yield an unparsable identifier.
    val suffix = s"${dt}_${hour}".replaceAll("[^A-Za-z0-9_]", "_")
    val tmp_table = s"tmp.app_log_$suffix"
    session.sql(s"drop table IF EXISTS $tmp_table")
    try {
      // Write the result to the scratch table first ...
      session.sql(sql1).write.saveAsTable(tmp_table)
      // ... then overwrite the target partition from it.
      session.sql(
        s"""INSERT OVERWRITE TABLE $hive_table
           |PARTITION( dt='$dt',hour='$hour')
           | select * from $tmp_table """.stripMargin)
    } finally {
      // Always remove the scratch table, even if the write or the insert failed.
      session.sql(s"drop table IF EXISTS $tmp_table")
    }
  } finally {
    session.stop()
  }
}
方式2：使用 JDBC 连接 Hive
经自己测试：当 Hive 的 metastore 服务启动不了、只启动了 HiveServer2 时，JDBC 连接方式仍然可以正常使用。
/**
 * Approach 2: load Spark output into a Hive table through the HiveServer2 JDBC driver.
 *
 * Intended for the case where only HiveServer2 is running and the Hive metastore service is
 * unavailable. Instead of using Spark's Hive catalog, the job sends
 * `LOAD DATA INPATH ... INTO TABLE result01` over JDBC.
 *
 * The statement, the JDBC connection and the Spark session are always closed. A failure of the
 * LOAD statement itself is printed and does not abort the job. A missing driver class or a
 * failed connection propagates to the caller.
 *
 * @param args unused
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    //.setMaster("local[*]")
    // "**" is a placeholder host; replace it with the real Spark master address.
    .setMaster("spark://192.168.40.**:7077")
  val session = SparkSession.builder()
    .config(conf)
    .getOrCreate()

  // NOTE: the HiveServer2 URL format reportedly differs in newer Hive versions (1.3+);
  // check the documentation for the version you run. "**" is a placeholder host.
  val url = "jdbc:hive2://192.168.40.**:10000/emg"
  // NOTE(review): hard-coded credentials; consider supplying them via configuration.
  val username = "root"
  val password = "123456"
  val driverName = "org.apache.hive.jdbc.HiveDriver"

  // HDFS directory holding this run's output, built from the project's CurrentTime helpers.
  val paths = "/user/emg/cxb_out/" + CurrentTime.getMonthDate() + "/" + CurrentTime.getYesterday() +
    "/" + CurrentTime.getHourDate() + "/"
  // The metastore is unreachable, so the result is loaded into the Hive table over JDBC.
  // LOAD DATA INPATH moves the source files into the table, so it must be executed exactly once.
  val sql2 = "load data inpath '" + paths + "' into table result01"

  try {
    // Fail fast with ClassNotFoundException instead of a later "No suitable driver" error.
    Class.forName(driverName)
    val con: Connection = DriverManager.getConnection(url, username, password)
    try {
      val state = con.createStatement()
      try {
        state.execute(sql2)
        println("===============================存入hvie成功==========================")
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        state.close()
      }
    } finally {
      con.close()
    }
  } finally {
    session.close()
  }

  // Alternative (not executed): expose the output directory through an external table
  // instead of moving the files with LOAD DATA, e.g.
  //   create external table zz_result(id bigint, lat float, lon float, utc bigint, tags int)
  //   row format delimited fields terminated by '\t' location '/user/hive/zz'
}
继续阅读与本文标签相同的文章
上一篇 :
关于前端请求文件的处理
-
花旗投资现金流量预测公司Cashforce,拟新添增值服务
2026-05-18栏目: 教程
-
开发者必读 · 周报 | 003期
2026-05-18栏目: 教程
-
科技巨头正在合作解决自动驾驶标准!
2026-05-18栏目: 教程
-
人工智能帮助设计自行车并打破竞速纪录
2026-05-18栏目: 教程
-
分层存储超详细解读,为什么大数据时代它已不可或缺
2026-05-18栏目: 教程
