现在有这样一个场景：我们需要把 HBase 中的数据做成一个数据流（DataStream），而不是数据集（DataSet）。Flink 自带的 Flink-HBase 连接器只能帮我们把数据读成数据集，所以这里选择了重写 HBase 的数据源。

// ===== HBaseSource.java =====
package com.yjp.flink.demo11;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;

/**
 * Uses HBase as a streaming data source.
 *
 * <p>Instead of reading the table once as a bounded data set, this source repeatedly scans the
 * table for cells whose timestamps fall into consecutive time windows and emits every cell value
 * as a String.
 *
 * <p>Date : 9:50 2018/3/12
 */
public class HBaseSource implements SourceFunction<String> {

    private static final Logger LOG = LoggerFactory.getLogger(HBaseSource.class);
    private static final long serialVersionUID = 1;

    /** Cleared by {@link #cancel()}; checked by the polling loop and while draining a scanner. */
    private volatile boolean isRunning = true;
    /** Inclusive lower bound (epoch millis) of the next time window to scan. */
    private long startTime;
    /** Width of each polling window in millis; also the pause between two polls. */
    private long interval;
    /** Columns to read, each written as {@code family:qualifier}. */
    private ArrayList<String> columns;
    /** Name of the table to scan. */
    private String tableName;

    /**
     * @param startTime timestamp (epoch millis) of the first cell to read
     * @param interval  width of each polling window in millis
     * @param columns   columns to read, each as {@code family:qualifier}
     * @param tableName table to scan
     */
    public HBaseSource(long startTime, long interval, ArrayList<String> columns, String tableName) {
        this.startTime = startTime;
        this.interval = interval;
        this.columns = columns;
        this.tableName = tableName;
    }

    public HBaseSource() {
    }

    /**
     * Catches up on everything between {@code startTime} and one interval before now, then scans
     * one window of {@code interval} millis per iteration until {@link #cancel()} is called.
     *
     * @param out context used to emit the cell values
     */
    @Override
    public void run(SourceContext<String> out) {
        // One connection for the whole lifetime of the source instead of one per poll.
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
            long endTime = System.currentTimeMillis() - interval;
            // Only catch up when the range is non-empty: a reversed range makes setTimeRange throw.
            if (isRunning && endTime > startTime) {
                scanAndEmit(connection, startTime, endTime, out);
                startTime = endTime;
            }
            while (isRunning) {
                scanAndEmit(connection, startTime, startTime + interval, out);
                startTime += interval;
                try {
                    Thread.sleep(interval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // An interrupt after cancel() is part of shutting down, not an error.
                    if (!isRunning) {
                        return;
                    }
                    throw new RuntimeException("休眠异常", e);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("读取数据异常", e);
        }
    }

    /** Stops the polling loop and the scanner loop. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Scans the cells whose timestamps lie in [from, to) and emits their values. The table handle
     * and the scanner are closed after every poll.
     */
    private void scanAndEmit(Connection connection, long from, long to, SourceContext<String> out) {
        LOG.debug("Scanning table {} for timestamps [{}, {})", tableName, from, to);
        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner rs = table.getScanner(buildScan(from, to))) {
            transmitData(rs, out);
        } catch (IOException e) {
            throw new RuntimeException("读取数据异常", e);
        }
    }

    /** Builds a scan restricted to the configured columns and to the time range [from, to). */
    private Scan buildScan(long from, long to) throws IOException {
        Scan scan = new Scan();
        scan.setTimeRange(from, to);
        for (String column : columns) {
            int sep = column.indexOf(':');
            if (sep < 0) {
                throw new IllegalArgumentException("Column must be family:qualifier, got: " + column);
            }
            scan.addColumn(Bytes.toBytes(column.substring(0, sep)), Bytes.toBytes(column.substring(sep + 1)));
        }
        return scan;
    }

    /** Emits the value of every cell of every row in the scanner until it is drained or cancelled. */
    private void transmitData(ResultScanner rs, SourceContext<String> out) {
        try {
            Result result;
            while (isRunning && (result = rs.next()) != null) {
                for (Cell cell : result.rawCells()) {
                    // Bytes.toString decodes UTF-8, matching Bytes.toBytes(String) in AssignmentTuple.putData.
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    // Emit under the checkpoint lock, as recommended by the SourceFunction docs.
                    synchronized (out.getCheckpointLock()) {
                        out.collect(value);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("结果集遍历异常", e);
        }
    }
}

// Next: process the results and store them in HBase.

// ===== HBaseToHBase.java =====
package com.yjp.flink.hbase;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Demo job: streams Orders rows out of HBase, keeps a running sum of the amount per group id and
 * writes the result into the OrderResult table through {@link SinkHBase}.
 */
public class HBaseToHBase {
    public static final Logger logger = LoggerFactory.getLogger(HBaseToHBase.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.getConfig().disableSysoutLogging();

        // Columns to read from Orders; their naming scheme is interpreted by HBaseStreamDataSource
        // (not shown here).
        List<String> getColumns = new ArrayList<>(Arrays.asList("cf1_name", "cf2_amount", "cf3_groupId"));
        // Column families used to create OrderResult if it does not exist yet.
        List<String> columnFamily = new ArrayList<>(Arrays.asList("cf1", "cf2", "cf3"));
        // Target columns, matched positionally to tuple fields (column i <- field i).
        // NOTE(review): only one column is configured, so only tuple field 0 (the name) is written
        // to cf2:result, and every record overwrites the same row "result"; the summed amount
        // (field 1) is never stored. Confirm this is intended.
        List<String> setColumns = new ArrayList<>(Arrays.asList("cf2:result"));

        DataStreamSource<Orders> orderDataStream = sEnv.addSource(
                new HBaseStreamDataSource("Orders", 0L, 2000L, getColumns, Orders.class));

        // Exactly one tuple per order, so a map is sufficient.
        DataStream<Tuple3<String, Double, Integer>> dataStream = orderDataStream.map(
                new MapFunction<Orders, Tuple3<String, Double, Integer>>() {
                    @Override
                    public Tuple3<String, Double, Integer> map(Orders value) {
                        return new Tuple3<>(value.getCf1_name(), value.getCf2_amount(), value.getCf3_groupId());
                    }
                });

        // Key by group id (field 2) and keep a running sum of the amount (field 1).
        dataStream.keyBy(2).sum(1).addSink(
                new SinkHBase<Tuple3<String, Double, Integer>>("OrderResult", columnFamily, setColumns, "result"));

        sEnv.execute("test HBase");
    }
}

// ===== SinkHBase.java =====
package com.yjp.flink.hbase;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;

import java.util.List;

/**
 * Custom sink that writes every incoming Flink tuple as one HBase Put: tuple field i is stored in
 * column i of {@code columns}, always under the configured row key.
 *
 * <p>Date : 17:23 2018/3/12
 */
public class SinkHBase<T> extends RichSinkFunction<T> {
    private static final long serialVersionUID = 1L;

    /** Target table name. */
    private String tableName;
    /** Column families used only when the table has to be created; may be null if it exists. */
    private List<String> columnFamilies;
    /** Target columns as family:qualifier, matched one-to-one with the tuple fields. */
    private List<String> columns;
    /** Row key used for every write. */
    private String rowKey;

    /** Table handle opened in open() and closed in close(); never serialized with the function. */
    private transient Table table;
    /** Stateless writer helper, created in open(). */
    private transient AssignmentTuple writer;

    /**
     * @param tableName    target table
     * @param columnFamily column families to create if the table does not exist; not needed when it exists
     * @param columns      target columns as family:qualifier, one per tuple field
     * @param rowKey       row key used for every write
     */
    public SinkHBase(String tableName, List<String> columnFamily, List<String> columns, String rowKey) {
        this.tableName = tableName;
        this.columnFamilies = columnFamily;
        this.columns = columns;
        this.rowKey = rowKey;
    }

    /**
     * Constructor for an already existing table.
     *
     * @param tableName target table
     * @param columns   target columns as family:qualifier, one per tuple field
     * @param rowKey    row key used for every write
     */
    public SinkHBase(String tableName, List<String> columns, String rowKey) {
        this(tableName, null, columns, rowKey);
    }

    public SinkHBase() {
    }

    /**
     * Creates the table (with the configured column families) when it does not exist yet, then
     * opens the table handle used by every {@link #invoke} call.
     *
     * @param parameters passed through to the parent class
     * @throws Exception if the connection or table cannot be obtained or created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Connection connection = FactoryConnect.getConnection();
        TableName name = TableName.valueOf(tableName);
        try (Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(name)) {
                if (columnFamilies == null || columnFamilies.isEmpty()) {
                    throw new IllegalStateException(
                            "Table " + tableName + " does not exist and no column family was given to create it");
                }
                HTableDescriptor descriptor = new HTableDescriptor(name);
                for (String family : columnFamilies) {
                    descriptor.addFamily(new HColumnDescriptor(family));
                }
                try {
                    admin.createTable(descriptor);
                } catch (TableExistsException ignored) {
                    // Another parallel subtask created the table between the check and the create.
                }
            }
        }
        table = connection.getTable(name);
        writer = new AssignmentTuple();
    }

    /**
     * Writes one tuple into HBase.
     *
     * @param value the tuple to store; field i goes to column i
     * @throws IllegalArgumentException if the value is not a Flink tuple
     */
    @Override
    public void invoke(T value, Context context) throws Exception {
        if (!(value instanceof Tuple)) {
            throw new IllegalArgumentException("SinkHBase only accepts Flink tuples, got: "
                    + (value == null ? "null" : value.getClass().getName()));
        }
        writer.putData((Tuple) value, rowKey, columns, table);
    }

    /** Releases the table handle opened in {@link #open}; the shared connection stays open. */
    @Override
    public void close() throws Exception {
        try {
            if (table != null) {
                table.close();
            }
        } finally {
            super.close();
        }
    }
}

// ===== FactoryConnect.java =====
package com.yjp.flink.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;
import java.io.Serializable;

/**
 * Lazily creates a single HBase {@link Connection} per JVM (double-checked locking on a volatile
 * field) and hands the same instance to every caller.
 *
 * <p>Date : 16:45 2018/3/16
 */
public class FactoryConnect implements Serializable {
    private static final long serialVersionUID = 1L;

    private static volatile Connection connection;

    private FactoryConnect() {
    }

    /**
     * Returns the shared connection, creating it on first use.
     *
     * @return the shared connection, never null
     * @throws IOException if the connection cannot be created; the failure is not cached, so a
     *                     later call tries again
     */
    public static Connection getConnection() throws IOException {
        Connection result = connection;
        if (result == null) {
            synchronized (FactoryConnect.class) {
                result = connection;
                if (result == null) {
                    result = ConnectionFactory.createConnection(HBaseConfiguration.create());
                    connection = result;
                }
            }
        }
        return result;
    }
}

// ===== AssignmentTuple.java =====
package com.yjp.flink.hbase;

import org.apache.flink.api.java.tuple.*;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Stores the fields of a Flink tuple in HBase.
 *
 * <p>The {@code setTupleN} methods are per-arity entry points kept for existing callers; they all
 * delegate to {@link #putData}.
 *
 * <p>Date : 16:49 2018/3/12
 */
public class AssignmentTuple {

    /**
     * Stores a tuple of arity 1.
     *
     * @param tuple1  the tuple whose fields are stored
     * @param rowKey  row key of the Put
     * @param columns target columns as family:qualifier
     * @param table   table to write to
     */
    public void setTuple1(Tuple1<?> tuple1, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple1, rowKey, columns, table);
    }

    public void setTuple2(Tuple2<?, ?> tuple2, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple2, rowKey, columns, table);
    }

    public void setTuple3(Tuple3<?, ?, ?> tuple3, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple3, rowKey, columns, table);
    }

    public void setTuple4(Tuple4<?, ?, ?, ?> tuple4, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple4, rowKey, columns, table);
    }

    public void setTuple5(Tuple5<?, ?, ?, ?, ?> tuple5, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple5, rowKey, columns, table);
    }

    public void setTuple6(Tuple6<?, ?, ?, ?, ?, ?> tuple6,
                          String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple6, rowKey, columns, table);
    }

    public void setTuple7(Tuple7<?, ?, ?, ?, ?, ?, ?> tuple7,
                          String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple7, rowKey, columns, table);
    }

    public void setTuple8(Tuple8<?, ?, ?, ?, ?, ?, ?, ?> tuple8,
                          String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple8, rowKey, columns, table);
    }

    public void setTuple9(Tuple9<?, ?, ?, ?, ?, ?, ?, ?, ?> tuple9,
                          String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple9, rowKey, columns, table);
    }

    public void setTuple10(Tuple10<?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple10,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple10, rowKey, columns, table);
    }

    public void setTuple11(Tuple11<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple11,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple11, rowKey, columns, table);
    }

    public void setTuple12(Tuple12<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple12,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple12, rowKey, columns, table);
    }

    public void setTuple13(Tuple13<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple13,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple13, rowKey, columns, table);
    }

    public void setTuple14(Tuple14<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple14,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple14, rowKey, columns, table);
    }

    public void setTuple15(Tuple15<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple15,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple15, rowKey, columns, table);
    }

    public void setTuple16(Tuple16<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple16,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple16, rowKey, columns, table);
    }

    public void setTuple17(Tuple17<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple17,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple17, rowKey, columns, table);
    }

    public void setTuple18(Tuple18<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple18,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple18, rowKey, columns, table);
    }

    public void setTuple19(Tuple19<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple19,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple19, rowKey, columns, table);
    }

    public void setTuple20(Tuple20<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple20,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple20, rowKey, columns, table);
    }

    public void setTuple21(Tuple21<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple21,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple21, rowKey, columns, table);
    }

    public void setTuple22(Tuple22<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple22,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple22, rowKey, columns, table);
    }

    public void setTuple23(Tuple23<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple23,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple23, rowKey, columns, table);
    }

    public void setTuple24(Tuple24<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple24,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple24, rowKey, columns, table);
    }

    public void setTuple25(Tuple25<?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple25,
                           String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple25, rowKey, columns, table);
    }

    /**
     * Writes the fields of a tuple into one row: field i goes to column i. Null fields are skipped
     * (no cell is written for them); if every field is null nothing is written.
     *
     * @param tuple   the tuple whose fields are written; needs at least {@code columns.size()} fields
     * @param rowKey  row key of the Put
     * @param columns target columns as family:qualifier
     * @param table   table to write to
     * @throws IllegalArgumentException if a column is not family:qualifier or the tuple has fewer
     *                                  fields than there are columns
     * @throws RuntimeException         wrapping the IOException if the put fails
     */
    public void putData(Tuple tuple, String rowKey, List<String> columns, Table table) {
        if (columns.size() > tuple.getArity()) {
            throw new IllegalArgumentException("Tuple has " + tuple.getArity() + " fields but "
                    + columns.size() + " columns were configured");
        }
        Put put = new Put(Bytes.toBytes(rowKey));
        // All cells of this row share one timestamp.
        long timeStamp = Instant.now().toEpochMilli();
        for (int i = 0; i < columns.size(); i++) {
            String column = columns.get(i);
            int sep = column.indexOf(':');
            if (sep < 0) {
                throw new IllegalArgumentException("Column must be family:qualifier, got: " + column);
            }
            Object field = tuple.getField(i);
            if (field == null) {
                continue;
            }
            put.addColumn(Bytes.toBytes(column.substring(0, sep)), Bytes.toBytes(column.substring(sep + 1)),
                    timeStamp, Bytes.toBytes(field.toString()));
        }
        // HBase rejects a Put without any cell.
        if (put.isEmpty()) {
            return;
        }
        try {
            table.put(put);
        } catch (IOException e) {
            throw new RuntimeException("存放失败", e);
        }
    }
}

为了实现通用的数据源和数据存储，这里采用了反射的方法。

收藏 打印