下载驱动:https://pan.baidu.com/s/1sV4XZbbmYtC0pAO6tewMTg

功能:将mysql中的数据表结构,自动在MPPDB中按照MPPDB语法批量创建表。

package com.epoint.HadoopAPIDemo;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.ResultSet Data;import java.sql.SQLException;import java.sql.Statement;public class MPPTestCreateTable {    private static String MYSQLUSERNAME = "root";    private static String MYSQLPASSWORD = "Gepoint";    private static String MYSQLDRIVER = "com.mysql.jdbc.Driver";    private static String MYSQLURL = "jdbc:mysql://100.2.5.221:3307/dep_fr_db";    private static String MYSQLDATA  = "dep_fr_db";    private static String MPPDRIVER = "com.MPP.jdbc.Driver";    private static String MPPURL = "jdbc:MPP://100.2.5.1:5258/";    private static String MPPUSERNAME = "mpp";    private static String MPPPASSWORD = "h3c";    Connection mysqlconn = null;    Statement mysqlpstm = null;    ResultSet mysqlrs = null;    Connection mppconn = null;    Statement mppstm = null;    ResultSet mpprs = null;    String sql1 = " ";    String sql2 = " ";    String sql3 = " ";    String sql4 = " ";    String sql5 = " ";    String sql6 = " ";    public static void main(String[] args) throws Exception {        MPPTestCreateTable aidth = new MPPTestCreateTable();        aidth.getMYSQLConnection();        aidth.MYSQLReleaseResource();        aidth.getMPPConnection();        aidth.MPPReleaseResource();        aidth.CreateMPPTable();//      aidth.ImportDataToMPP();        System.out.println("程序已经执行完毕!请去waterdrop验证结果吧!!");    }    public void CreateMPPTable() {        mysqlconn = getMYSQLConnection();        mppconn = getMPPConnection();        try {            mppstm = mppconn.createStatement();            mysqlpstm = mysqlconn.createStatement();            int i = 0;                String sql = "SELECT table_schema
" +                         "   ,table_name
" +                         "   ,(
" +                         "       CASE 
" +                         "           WHEN ORDINAL_POSITION = mincol
" +                         "               AND ORDINAL_POSITION < maxcol
" +                         "               THEN CONCAT ("create  table "
" +                         "                       ,table_schema
" +                         "                       ,"."
" +                         "                       ,table_name
" +                         "                       ,"(`"
" +                         "                       ,column_name
" +                         "                       ,"` "
" +                         "                       ,COLUMN_TYPE
" +                         "                       ,","
" +                         "                       )
" +                         "           WHEN ORDINAL_POSITION = mincol
" +                         "               AND ORDINAL_POSITION = maxcol
" +                         "               THEN CONCAT ("create  table "
" +                         "                       ,table_schema
" +                         "                       ,"."
" +                         "                       ,table_name
" +                         "                       ,"(`"
" +                         "                       ,column_name
" +                         "                       ,"` "
" +                         "                       ,COLUMN_TYPE
" +                         "                       ,");"
" +                         "                       )
" +                         "           WHEN ORDINAL_POSITION > mincol
" +                         "               AND ORDINAL_POSITION < maxcol
" +                         "               THEN CONCAT (
" +                         "                       "`"
" +                         "                       ,column_name
" +                         "                       ,"` "
" +                         "                       ,COLUMN_TYPE
" +                         "                       ,","
" +                         "                       )
" +                         "           WHEN ORDINAL_POSITION = maxcol
" +                         "               THEN CONCAT (
" +                         "                       "`"
" +                         "                       ,column_name
" +                         "                       ,"` "
" +                         "                       ,COLUMN_TYPE
" +                         "                       ,");"
" +                         "                       )
" +                         "           END
" +                         "       ) AS statement
" +                         "   ,ORDINAL_POSITION
" +                         "   ,maxcol
" +                         "   ,mincol
" +                         "FROM (
" +                         "   SELECT b.table_schema,b.table_name,b.ORDINAL_POSITION,b.column_name,
" +                         "   (case
" +                         "   when column_type = 'timestamp' then 'datetime'
" +                         "   when column_type = 'bit(1)' then 'int(1)'
" +                         "   else
" +                         "       column_type
" +                         "   end ) AS column_type
" +                         "       ,a.maxcol
" +                         "       ,a.mincol
" +                         "   FROM (
" +                         "       SELECT table_schema
" +                         "           ,table_name
" +                         "           ,max(ORDINAL_POSITION) maxcol
" +                         "           ,min(ORDINAL_POSITION) mincol
" +                         "       FROM information_schema.COLUMNS
" +                         "       GROUP BY table_schema
" +                         "           ,table_name
" +                         "       ) a
" +                         "   JOIN (
" +                         "       SELECT table_schema
" +                         "           ,table_name
" +                         "           ,ORDINAL_POSITION
" +                         "           ,column_name
" +                         "           ,COLUMN_TYPE
" +                         "       FROM information_schema.COLUMNS
" +                         "       ORDER BY table_schema
" +                         "           ,table_name
" +                         "           ,ORDINAL_POSITION ASC
" +                         "       ) b ON a.table_schema = b.table_schema
" +                         "       AND a.table_name = b.table_name
" +                         "   ) c
" +                         "WHERE table_schema = '"+MYSQLDATA +"'";                mysqlrs = mysqlpstm.executeQuery(sql);                while (mysqlrs.next()) {                    sql1 = mysqlrs.getString(3);                    sql2 = sql2 + sql1;                }            sql3 = "create data  IF NOT EXISTS " + MYSQLDATA ;            mppstm.execute(sql3);            System.out.println("-------------------建mpp表,表结构的语句为:" + sql2);            String[] sqls=sql2.split(";");            for (String m : sqls) {                mppstm.execute(m);            }            System.out.println("----------------------------------------建mpp表已结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");            mppstm.close();            mysqlpstm.close();        } catch (SQLException e) {            e.printStackTrace();        } finally {            MYSQLReleaseResource();            MPPReleaseResource();        }    }    public void ImportDataToMPP() {        mysqlconn = getMYSQLConnection();        mppconn = getMPPConnection();        String sql = "select table_name from user_tables where num_rows > 0 order by table_name asc";        int i = 0;        try {            mysqlpstm = mysqlconn.createStatement();            mysqlrs = mysqlpstm.executeQuery(sql);            mppstm = mppconn.createStatement();            while (mysqlrs.next()) {                i = i + 1;                String table_name = mysqlrs.getString("table_name").replaceAll("\$", "");                String sql7 = "insert into " + MYSQLDATA  + "." + table_name + " select * from " + MYSQLDATA                         + "_ex." + table_name;                System.out.println("现在插入第"+i+"个表:"+sql7);                mppstm.execute(sql7);            }        } catch (SQLException e) {            e.printStackTrace();        } finally {            MYSQLReleaseResource();            MPPReleaseResource();        }    }    public Connection getMYSQLConnection() {        try {            Class.forName(MYSQLDRIVER);            mysqlconn = DriverManager.getConnection(MYSQLURL, MYSQLUSERNAME, MYSQLPASSWORD);        } catch (ClassNotFoundException e) {            throw new RuntimeException("class not find !", e);        } catch (SQLException e) {            throw new RuntimeException("get connection error!", e);        }        return mysqlconn;    }    public void MYSQLReleaseResource() {        if (mysqlrs != null) {            try {                mysqlrs.close();            } catch (SQLException e) {                e.printStackTrace();            }        }        if (mysqlpstm != null) {            try {                mysqlpstm.close();            } catch (SQLException e) {                e.printStackTrace();            }        }        if (mysqlconn != null) {            try {                mysqlconn.close();            } catch (SQLException e) {                e.printStackTrace();            }        }    }    public Connection getMPPConnection() {        try {            Class.forName(MPPDRIVER);            mppconn = DriverManager.getConnection(MPPURL, MPPUSERNAME, MPPPASSWORD);        } catch (ClassNotFoundException e) {            throw new RuntimeException("class not find !", e);        } catch (SQLException e) {            throw new RuntimeException("get connection error!", e);        }        return mppconn;    }    public void MPPReleaseResource() {        if (mpprs != null) {            try {                mpprs.close();            } catch (SQLException e) {                e.printStackTrace();            }        }        if (mppstm != null) {            try {                mppstm.close();            } catch (SQLException e) {                e.printStackTrace();            }        }        if (mppconn != null) {            try {                mppconn.close();            } catch (SQLException e) {                e.printStackTrace();            }        }    }}
收藏 打印