代码实例
package com.epoint.com.mysql_mpp_full;

import java.io.FileNotFoundException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

/**
 * Command-line tool that recreates the tables of a MySQL schema in an MPP database and then
 * copies the rows of the {@code code_main} table from MySQL to MPP in batches, over JDBC.
 *
 * <p>The class keeps its "current" connections, statements and result sets in static fields that
 * are assigned by {@link #getMYSQLConnection()} / {@link #getMPPConnection()} and closed by
 * {@link #MYSQLReleaseResource()} / {@link #MPPReleaseResource()}. The row-copy path
 * ({@link #tableInput()}, {@link #executeManySql(List)}) uses its own local connections instead,
 * so it never leaves a connection behind in those fields.
 *
 * <p>NOTE(review): connection URLs and credentials are hard-coded below; consider moving them to
 * configuration.
 */
public class AutoMysqltoMPP {
    private static final String MYSQLUSERNAME = "root";
    private static final String MYSQLPASSWORD = "Gepoint";
    private static final String MYSQLDRIVER = "com.mysql.jdbc.Driver";
    private static final String MYSQLURL = "jdbc:mysql://100.2.5.221:3307/dep_fr_db";
    /** Name of the MySQL schema to migrate; also used as the MPP database name. */
    private static final String MYSQLDATA = "dep_fr_db";
    private static final String MPPDRIVER = "com.MPP.jdbc.Driver";
    private static final String MPPURL = "jdbc:MPP://100.2.5.1:5258/dep_fr_db";
    private static final String MPPUSERNAME = "mpp";
    private static final String MPPPASSWORD = "h3c";

    /** Number of rows buffered in memory before they are flushed to MPP in one transaction. */
    private static final int BATCH_SIZE = 10000;
    /** Columns of code_main, in the order they are read from MySQL and inserted into MPP. */
    private static final String[] CODE_MAIN_COLUMNS = {
        "CODEID", "CODENAME", "LEVNUM", "CATEGORYNUM", "description", "isfromsoa"
    };
    private static final String SELECT_CODE_MAIN =
        "SELECT CODEID,CODENAME,LEVNUM,CATEGORYNUM,description,isfromsoa FROM code_main";
    private static final String INSERT_CODE_MAIN = "insert into code_main values (?,?,?,?,?,?)";

    private static Connection mysqlconn = null;
    private static Statement mysqlpstm = null;
    private static ResultSet mysqlrs = null;
    private static Connection mppconn = null;
    private static Statement mppstm = null;
    private static ResultSet mpprs = null;

    String sql1 = " ";
    String sql2 = " ";
    String sql3 = " ";
    String sql4 = " ";
    String sql5 = " ";
    String sql6 = " ";

    /**
     * Runs the migration: checks that both databases are reachable, creates the MPP tables,
     * truncates {@code code_main} on MPP, copies its rows from MySQL and prints the elapsed time
     * of the truncate-and-copy step.
     *
     * @param args unused
     * @throws Exception if truncating the target table or opening a connection fails
     */
    public static void main(String[] args) throws Exception {
        AutoMysqltoMPP aidth = new AutoMysqltoMPP();

        // Open and immediately release one connection per database so that an unreachable
        // server or a missing driver is reported before any work starts.
        getMYSQLConnection();
        aidth.MYSQLReleaseResource();
        getMPPConnection();
        aidth.MPPReleaseResource();

        aidth.CreateMPPTable();
        // ImportDataToMPP() is intentionally not part of the default run.
        System.out.println("表已创建完毕,赶紧去查看表吧!!");

        // The elapsed milliseconds are rendered as a time of day, so the formatter must use UTC;
        // any other zone (or DST) would add its offset to the printed duration.
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss:SS");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));

        long startTime = System.currentTimeMillis();
        truncateCodeMain();
        tableInput();
        long endTime = System.currentTimeMillis();
        System.out.println("用时:" + sdf.format(new Date(endTime - startTime)));
    }

    /**
     * Generates one {@code create table if not exists} statement per table of the MySQL schema
     * from {@code information_schema.COLUMNS} and executes them on MPP, after creating the MPP
     * database itself. {@code timestamp} columns are mapped to {@code datetime} and
     * {@code bit(1)} to {@code int(1)}.
     *
     * <p>SQL errors are printed and swallowed; all connections held in the static fields are
     * released before returning.
     */
    public void CreateMPPTable() {
        try {
            // Acquire inside the try so that a failure on the second connection still releases
            // the first one in the finally block.
            mysqlconn = getMYSQLConnection();
            mppconn = getMPPConnection();
            mppstm = mppconn.createStatement();
            mysqlpstm = mysqlconn.createStatement();

            mysqlrs = mysqlpstm.executeQuery(buildCreateTableQuery());
            // Each row is one fragment of a CREATE TABLE statement; concatenated in order they
            // form the full DDL script. Start from scratch so repeated calls do not replay
            // statements collected by an earlier call.
            StringBuilder ddl = new StringBuilder();
            while (mysqlrs.next()) {
                sql1 = mysqlrs.getString(3);
                ddl.append(sql1);
            }
            sql2 = ddl.toString();

            sql3 = "create database IF NOT EXISTS " + MYSQLDATA;
            mppstm.execute(sql3);
            System.out.println("-------------------建mpp表,表结构的语句为:" + sql2);

            for (String m : sql2.split(";")) {
                if (m.trim().length() == 0) {
                    continue; // nothing between two separators, or an empty schema
                }
                mppstm.execute(m);
            }
            System.out.println("----------------------------------------建mpp表已结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            MYSQLReleaseResource();
            MPPReleaseResource();
        }
    }

    /**
     * Builds the MySQL query whose third column yields, row by row, the pieces of the
     * CREATE TABLE statements for every table in {@link #MYSQLDATA}.
     *
     * <p>The first column of a table opens the statement, middle columns add
     * {@code `name` type,} and the last column closes it with {@code );}. Because the caller
     * concatenates rows in result order, the outer query is explicitly ordered by table and
     * column position; the ORDER BY inside the derived table alone does not guarantee the order
     * of the outer result.
     */
    private static String buildCreateTableQuery() {
        return "SELECT table_schema\n"
            + " ,table_name\n"
            + " ,(\n"
            + " CASE\n"
            + " WHEN ORDINAL_POSITION = mincol\n"
            + " AND ORDINAL_POSITION < maxcol\n"
            + " THEN CONCAT (\"create table if not exists \"\n"
            + " ,table_schema\n"
            + " ,\".\"\n"
            + " ,table_name\n"
            + " ,\"(`\"\n"
            + " ,column_name\n"
            + " ,\"` \"\n"
            + " ,COLUMN_TYPE\n"
            + " ,\",\"\n"
            + " )\n"
            + " WHEN ORDINAL_POSITION = mincol\n"
            + " AND ORDINAL_POSITION = maxcol\n"
            + " THEN CONCAT (\"create table if not exists \"\n"
            + " ,table_schema\n"
            + " ,\".\"\n"
            + " ,table_name\n"
            + " ,\"(`\"\n"
            + " ,column_name\n"
            + " ,\"` \"\n"
            + " ,COLUMN_TYPE\n"
            + " ,\");\"\n"
            + " )\n"
            + " WHEN ORDINAL_POSITION > mincol\n"
            + " AND ORDINAL_POSITION < maxcol\n"
            + " THEN CONCAT (\n"
            + " \"`\"\n"
            + " ,column_name\n"
            + " ,\"` \"\n"
            + " ,COLUMN_TYPE\n"
            + " ,\",\"\n"
            + " )\n"
            + " WHEN ORDINAL_POSITION = maxcol\n"
            + " THEN CONCAT (\n"
            + " \"`\"\n"
            + " ,column_name\n"
            + " ,\"` \"\n"
            + " ,COLUMN_TYPE\n"
            + " ,\");\"\n"
            + " )\n"
            + " END\n"
            + " ) AS statement\n"
            + " ,ORDINAL_POSITION\n"
            + " ,maxcol\n"
            + " ,mincol\n"
            + "FROM (\n"
            + " SELECT b.table_schema,b.table_name,b.ORDINAL_POSITION,b.column_name,\n"
            + " (case\n"
            + " when column_type = 'timestamp' then 'datetime'\n"
            + " when column_type = 'bit(1)' then 'int(1)'\n"
            + " else\n"
            + " column_type\n"
            + " end ) AS column_type\n"
            + " ,a.maxcol\n"
            + " ,a.mincol\n"
            + " FROM (\n"
            + " SELECT table_schema\n"
            + " ,table_name\n"
            + " ,max(ORDINAL_POSITION) maxcol\n"
            + " ,min(ORDINAL_POSITION) mincol\n"
            + " FROM information_schema.COLUMNS\n"
            + " GROUP BY table_schema\n"
            + " ,table_name\n"
            + " ) a\n"
            + " JOIN (\n"
            + " SELECT table_schema\n"
            + " ,table_name\n"
            + " ,ORDINAL_POSITION\n"
            + " ,column_name\n"
            + " ,COLUMN_TYPE\n"
            + " FROM information_schema.COLUMNS\n"
            + " ORDER BY table_schema\n"
            + " ,table_name\n"
            + " ,ORDINAL_POSITION ASC\n"
            + " ) b ON a.table_schema = b.table_schema\n"
            + " AND a.table_name = b.table_name\n"
            + " ) c\n"
            + "WHERE table_schema = '" + MYSQLDATA + "'\n"
            + "ORDER BY table_name, ORDINAL_POSITION";
    }

    /**
     * For every table name returned by the source query, runs
     * {@code insert into <db>.<table> select * from <db>_ex.<table>} on MPP, printing each
     * statement before it is executed. {@code $} characters are removed from table names.
     *
     * <p>SQL errors are printed and swallowed; the connections held in the static fields are
     * released before returning.
     *
     * <p>NOTE(review): the source query reads {@code user_tables} / {@code num_rows}, which look
     * like Oracle dictionary names, yet it is executed on the MySQL connection - confirm that
     * this view exists on the source before enabling this step.
     */
    public void ImportDataToMPP() {
        String sql = "select table_name from user_tables where num_rows > 0 order by table_name asc";
        int i = 0;
        try {
            mysqlconn = getMYSQLConnection();
            mppconn = getMPPConnection();
            mysqlpstm = mysqlconn.createStatement();
            mysqlrs = mysqlpstm.executeQuery(sql);
            mppstm = mppconn.createStatement();
            while (mysqlrs.next()) {
                i = i + 1;
                // Plain-text replace: '$' is a regex metacharacter, so replaceAll is avoided.
                String table_name = mysqlrs.getString("table_name").replace("$", "");
                String sql7 = "insert into " + MYSQLDATA + "." + table_name
                    + " select * from " + MYSQLDATA + "_ex." + table_name;
                System.out.println("现在插入第" + i + "个表:" + sql7);
                mppstm.execute(sql7);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            MYSQLReleaseResource();
            MPPReleaseResource();
        }
    }

    /**
     * Copies all rows of {@code code_main} from MySQL to MPP. Rows are buffered and flushed to
     * MPP every {@link #BATCH_SIZE} rows (the running row count is printed after each flush),
     * and the remainder is flushed at the end. One MySQL and one MPP connection are used for the
     * whole copy and both are closed before returning.
     *
     * @return the rows of the final, partial batch (empty if the row count is a multiple of the
     *     batch size), or {@code null} if an {@link SQLException} occurred; in that case the
     *     stack trace is printed and the failing batch has been rolled back
     * @throws FileNotFoundException never thrown; kept for signature compatibility
     * @throws SQLException declared for signature compatibility; SQL errors are reported as
     *     described under the return value
     */
    public static List<List<String>> tableInput() throws FileNotFoundException, SQLException {
        List<List<String>> FindList = new ArrayList<List<String>>();
        Connection source = null;
        Connection target = null;
        PreparedStatement pre = null;
        ResultSet resultSet = null;
        try {
            source = openMysqlConnection();
            target = openMppConnection();
            pre = source.prepareStatement(SELECT_CODE_MAIN);
            resultSet = pre.executeQuery();
            int i = 0;
            while (resultSet.next()) {
                List<String> minList = new ArrayList<String>(CODE_MAIN_COLUMNS.length);
                for (String each : CODE_MAIN_COLUMNS) {
                    minList.add(resultSet.getString(each));
                }
                FindList.add(minList);
                i++;
                if (i % BATCH_SIZE == 0) {
                    insertBatch(target, FindList);
                    FindList.clear();
                    System.out.println(i);
                }
            }
            // Flush whatever is left after the last full batch.
            insertBatch(target, FindList);
            return FindList;
        } catch (SQLException e) {
            e.printStackTrace();
            return null;
        } finally {
            // Null-safe: any of these may be unset if an earlier step failed.
            closeQuietly(resultSet);
            closeQuietly(pre);
            closeQuietly(source);
            closeQuietly(target);
        }
    }

    /**
     * Inserts the given rows into {@code code_main} on MPP as one JDBC batch in one transaction.
     * Opens its own MPP connection and always closes it, so that repeated calls do not exhaust
     * the server's connection limit. A {@code null} or empty list is a no-op and opens no
     * connection.
     *
     * @param FindList rows to insert; each row holds the column values in table order
     * @throws SQLException if the insert fails; the transaction is rolled back first
     */
    public static void executeManySql(List<List<String>> FindList) throws SQLException {
        if (FindList == null || FindList.isEmpty()) {
            return;
        }
        Connection target = openMppConnection();
        try {
            insertBatch(target, FindList);
        } finally {
            closeQuietly(target);
        }
    }

    /** Inserts the rows through the given MPP connection as one batch and commits; rolls back on failure. */
    private static void insertBatch(Connection target, List<List<String>> rows) throws SQLException {
        if (rows == null || rows.isEmpty()) {
            return;
        }
        target.setAutoCommit(false);
        PreparedStatement pst = null;
        try {
            pst = target.prepareStatement(INSERT_CODE_MAIN);
            for (List<String> minList : rows) {
                for (int i = 0; i < minList.size(); i++) {
                    pst.setString(i + 1, minList.get(i));
                }
                pst.addBatch();
            }
            pst.executeBatch();
            target.commit();
        } catch (SQLException e) {
            // Do not leave a half-applied batch pending on the connection.
            try {
                target.rollback();
            } catch (SQLException rollbackFailure) {
                rollbackFailure.printStackTrace();
            }
            throw e;
        } finally {
            closeQuietly(pst);
        }
    }

    /** Empties code_main on MPP using a short-lived connection. */
    private static void truncateCodeMain() throws SQLException {
        Connection target = openMppConnection();
        Statement truncate = null;
        try {
            truncate = target.createStatement();
            truncate.execute("TRUNCATE table code_main");
        } finally {
            closeQuietly(truncate);
            closeQuietly(target);
        }
    }

    /**
     * Opens a new MySQL connection and remembers it as the current one, to be closed by
     * {@link #MYSQLReleaseResource()}.
     *
     * @return the newly opened connection
     * @throws RuntimeException if the driver class is missing or the connection cannot be opened
     */
    public static Connection getMYSQLConnection() {
        mysqlconn = openMysqlConnection();
        return mysqlconn;
    }

    /** Closes the current MySQL result set, statement and connection, printing any close error. */
    public void MYSQLReleaseResource() {
        closeQuietly(mysqlrs);
        mysqlrs = null;
        closeQuietly(mysqlpstm);
        mysqlpstm = null;
        closeQuietly(mysqlconn);
        mysqlconn = null;
    }

    /**
     * Opens a new MPP connection and remembers it as the current one, to be closed by
     * {@link #MPPReleaseResource()}.
     *
     * @return the newly opened connection
     * @throws RuntimeException if the driver class is missing or the connection cannot be opened
     */
    public static Connection getMPPConnection() {
        mppconn = openMppConnection();
        return mppconn;
    }

    /** Closes the current MPP result set, statement and connection, printing any close error. */
    public void MPPReleaseResource() {
        closeQuietly(mpprs);
        mpprs = null;
        closeQuietly(mppstm);
        mppstm = null;
        closeQuietly(mppconn);
        mppconn = null;
    }

    /** Opens a MySQL connection without touching the static fields. */
    private static Connection openMysqlConnection() {
        try {
            Class.forName(MYSQLDRIVER);
            return DriverManager.getConnection(MYSQLURL, MYSQLUSERNAME, MYSQLPASSWORD);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("class not find !", e);
        } catch (SQLException e) {
            throw new RuntimeException("get connection error!", e);
        }
    }

    /** Opens an MPP connection without touching the static fields. */
    private static Connection openMppConnection() {
        try {
            Class.forName(MPPDRIVER);
            return DriverManager.getConnection(MPPURL, MPPUSERNAME, MPPPASSWORD);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("class not find !", e);
        } catch (SQLException e) {
            throw new RuntimeException("get connection error!", e);
        }
    }

    /** Closes the result set if it is not null; a close failure is printed, not thrown. */
    private static void closeQuietly(ResultSet resultSet) {
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes the statement if it is not null; a close failure is printed, not thrown. */
    private static void closeQuietly(Statement statement) {
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes the connection if it is not null; a close failure is printed, not thrown. */
    private static void closeQuietly(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
// 继续阅读与本文标签相同的文章
上一篇 :
监控目录中文件上传HDFS中
下一篇 :
PHP代码的解析过程
-
让身边河流变清澈,“小河长”面向杭州中小学招募啦
2026-05-25栏目: 教程
-
揭秘阿里机器翻译团队:拿下5项全球冠军,每天帮商家翻译7.5亿次
2026-05-25栏目: 教程
-
专访刘津:唯有快速成长,让我们不再焦虑
2026-05-25栏目: 教程
-
使用 TensorFlow 和 DLTK 进行生物医学影像分析
2026-05-25栏目: 教程
-
DataV 4.0 功能简介
2026-05-25栏目: 教程
