Hive谓词解析过程分析

小编 2026-07-05 阅读:816 评论:0
where col1 = 100 and abs(col2) > 0在Hive中的处理过程 where过滤条件称为谓词predicate。 以上where过滤条件在经过Hive的语法解析...

where col1 = 100 and abs(col2) > 0在Hive中的处理过程

where过滤条件称为谓词predicate。

以上where过滤条件在经过Hive的语法解析后,生成如下的语法树:

TOK_WHERE
AND
=
TOK_TABLE_OR_COL
c1
100
>
TOK_FUNCTION
ABS
TOK_TABLE_OR_COL
c2
0
有了语法树之后,最终的目的是生成predicate每个节点对应的ExprNodeDesc,即描述对应的节点:

public Map<ASTNode, ExprNodeDesc> genAllExprNodeDesc(ASTNode expr, RowResolver input,
TypeCheckCtx tcCtx) throws SemanticException {

Map<ASTNode, ExprNodeDesc> nodeOutputs =
TypeCheckProcFactory.genExprNode(expr, tcCtx);

生成的过程是对上述语法树的一个深度优先遍历的过程,Hive中大量对树的遍历的代码,在遍历过程中根据指定的规则或对语法树进行修改,或输出相应的结果。

Hive中有一个默认的深度优先遍历的实现DefaultGraphWalker。

这个遍历器的实现部分代码如下:

public void walk(Node nd) throws SemanticException {
// Push the node in the stack
opStack.push(nd);

// While there are still nodes to dispatch...
while (!opStack.empty()) {
  Node node = opStack.peek();

  if (node.getChildren() == null ||
          getDispatchedList().containsAll(node.getChildren())) {
    // Dispatch current node
    if (!getDispatchedList().contains(node)) {
      dispatch(node, opStack);
      opQueue.add(node);
    }
    opStack.pop();
    continue;
  }

  // Add a single child and restart the loop
  for (Node childNode : node.getChildren()) {
    if (!getDispatchedList().contains(childNode)) {
      opStack.push(childNode);
      break;
    }
  }
} // end while

}
先将当前节点放到待处理的栈opStack中,然后从opStack取节点出来,如果取出来的节点没有Children,或者Children已经全部处理完毕,才对当前节点进行处理(dispatch),如果当前节点有Children且还没有处理完,则将当前节点的Children放到栈顶,然后重新从栈中取节点进行处理。这是很基础的深度优先遍历的实现。

那在遍历的过程中,如何针对不同的节点进行不同的处理呢?

在遍历之前,先预置一些针对不同的节点不同规则的处理器,然后在遍历过程中,通过分发器Dispatcher选择最合适的处理器进行处理。

生成ExprNodeDesc的遍历中一共先预置了8个规则Rule,每个规则对应一个处理器NodeProcessor:

Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();

opRules.put(new RuleRegExp(\"R1\", HiveParser.TOK_NULL + \"%\"),
    tf.getNullExprProcessor());
opRules.put(new RuleRegExp(\"R2\", HiveParser.Number + \"%|\" +
    HiveParser.TinyintLiteral + \"%|\" +
    HiveParser.SmallintLiteral + \"%|\" +
    HiveParser.BigintLiteral + \"%|\" +
    HiveParser.DecimalLiteral + \"%\"),
    tf.getNumExprProcessor());
opRules
    .put(new RuleRegExp(\"R3\", HiveParser.Identifier + \"%|\"
    + HiveParser.StringLiteral + \"%|\" + HiveParser.TOK_CHARSETLITERAL + \"%|\"
    + HiveParser.TOK_STRINGLITERALSEQUENCE + \"%|\"
    + \"%|\" + HiveParser.KW_IF + \"%|\" + HiveParser.KW_CASE + \"%|\"
    + HiveParser.KW_WHEN + \"%|\" + HiveParser.KW_IN + \"%|\"
    + HiveParser.KW_ARRAY + \"%|\" + HiveParser.KW_MAP + \"%|\"
    + HiveParser.KW_STRUCT + \"%|\" + HiveParser.KW_EXISTS + \"%|\"
    + HiveParser.KW_GROUPING + \"%|\"
    + HiveParser.TOK_SUBQUERY_OP_NOTIN + \"%\"),
    tf.getStrExprProcessor());
opRules.put(new RuleRegExp(\"R4\", HiveParser.KW_TRUE + \"%|\"
    + HiveParser.KW_FALSE + \"%\"), tf.getBoolExprProcessor());
opRules.put(new RuleRegExp(\"R5\", HiveParser.TOK_DATELITERAL + \"%|\"
    + HiveParser.TOK_TIMESTAMPLITERAL + \"%\"), tf.getDateTimeExprProcessor());
opRules.put(new RuleRegExp(\"R6\",
    HiveParser.TOK_INTERVAL_YEAR_MONTH_LITERAL + \"%|\"
    + HiveParser.TOK_INTERVAL_DAY_TIME_LITERAL + \"%|\"
    + HiveParser.TOK_INTERVAL_YEAR_LITERAL + \"%|\"
    + HiveParser.TOK_INTERVAL_MONTH_LITERAL + \"%|\"
    + HiveParser.TOK_INTERVAL_DAY_LITERAL + \"%|\"
    + HiveParser.TOK_INTERVAL_HOUR_LITERAL + \"%|\"
    + HiveParser.TOK_INTERVAL_MINUTE_LITERAL + \"%|\"
    + HiveParser.TOK_INTERVAL_SECOND_LITERAL + \"%\"), tf.getIntervalExprProcessor());
opRules.put(new RuleRegExp(\"R7\", HiveParser.TOK_TABLE_OR_COL + \"%\"),
    tf.getColumnExprProcessor());
opRules.put(new RuleRegExp(\"R8\", HiveParser.TOK_SUBQUERY_OP + \"%\"),
    tf.getSubQueryExprProcessor());

这里使用的分发器Dispatcher是DefaultRuleDispatcher,DefaultRuleDispatcher选择处理器的逻辑如下:

// find the firing rule
// find the rule from the stack specified
Rule rule = null;
int minCost = Integer.MAX_VALUE;
for (Rule r : procRules.keySet()) {
  int cost = r.cost(ndStack);
  if ((cost >= 0) && (cost <= minCost)) {
    minCost = cost;
    rule = r;
  }
}

NodeProcessor proc;

if (rule == null) {
  proc = defaultProc;
} else {
  proc = procRules.get(rule);
}

// Do nothing in case proc is null
if (proc != null) {
  // Call the process function
  return proc.process(nd, ndStack, procCtx, nodeOutputs);
} else {
  return null;
}

遍历所有的规则Rule,调用每个规则的cost方法计算cost,找其中cost最小的规则对应的处理器,如果没有找到,则使用默认处理器,如果没有设置默认处理器,则不做任何事情。

那么每个规则的cost是如何计算的?

– 没太看懂==|| (后续再理理)

WHERE条件语法树每个节点对应的处理器如下:

TOK_WHERE
AND --> TypeCheckProcFactory.DefaultExprProcessor
= --> TypeCheckProcFactory.DefaultExprProcessor
TOK_TABLE_OR_COL --> TypeCheckProcFactory.ColumnExprProcessor
c1 --> TypeCheckProcFactory.StrExprProcessor
100 --> TypeCheckProcFactory.NumExprProcessor
> --> TypeCheckProcFactory.DefaultExprProcessor
TOK_FUNCTION --> TypeCheckProcFactory.DefaultExprProcessor
ABS --> TypeCheckProcFactory.StrExprProcessor
TOK_TABLE_OR_COL --> TypeCheckProcFactory.ColumnExprProcessor
c2 --> TypeCheckProcFactory.StrExprProcessor
0 --> TypeCheckProcFactory.NumExprProcessor

TypeCheckProcFactory.StrExprProcessor 生成ExprNodeConstantDesc
TypeCheckProcFactory.ColumnExprProcessor 处理column,生成ExprNodeColumnDesc
TypeCheckProcFactory.NumExprProcessor生成ExprNodeConstantDesc
TypeCheckProcFactory.DefaultExprProcessor生成ExprNodeGenericFuncDesc
在深度优先遍历完WHERE语法树后,每个节点都会生成一个ExprNodeDesc,但是其实除了最顶层的AND节点生成的ExprNodeDesc有用,其他的节点生成的都是中间结果,最终都会包含在AND节点生成的ExprNodeDesc中。所以在遍历WHERE树后,通过AND节点生成的ExprNodeDesc构造FilterDesc:

new FilterDesc(genExprNodeDesc(condn, inputRR, useCaching), false)
有了FilterDesc后,就能够构造出FilterOperator了,然后再将生成的FilterOperator加入到Operator树中:

Operator ret = get((Class) conf.getClass());
ret.setConf(conf);
至此,where过滤条件对应的FilterOperator构造完毕。

接下来仔细看下AND生成的ExprNodeDesc,它其实是一个ExprNodeGenericFuncDesc:

// genericUDF是GenericUDFOPAnd,就是对应AND操作符
private GenericUDF genericUDF;
// AND是一个二元操作符,children里存的是对应的操作符
// 根据WHERE语法树,可以知道children[0]肯定又是一个ExprNodeGenericFuncDesc,而且是一个=函 // 数,而children[1]也是一个肯定又是一个ExprNodeGenericFuncDesc,而且是一个>函数,以此类 // 推,每个ExprNodeGenericFuncDesc都有对应的children
private List chidren;
// UDF的名字,这里是and
private transient String funcText;
/**

  • This class uses a writableObjectInspector rather than a TypeInfo to store
  • the canonical type information for this NodeDesc.
    */
    private transient ObjectInspector writableObjectInspector;
    每个ExprNodeDesc都对应有一个ExprNodeEvaluator,来对每个ExprNodeDesc进行实际的计算。看下ExprNodeEvaluator类的基本方法:

public abstract class ExprNodeEvaluator {
// 对应的ExprNodeDesc
protected final T expr;
// 在经过这个Evaluator计算后,它的输出值该如何解析的ObjectInspector
protected ObjectInspector outputOI;

/**

  • Initialize should be called once and only once. Return the ObjectInspector
  • for the return value, given the rowInspector.
  • 初始化方法,传入一个ObjectInspector,即传入的数据应该如何解析的ObjectInspector
  • 而需要返回经过这个Evaluator计算后的输出值的解析ObjectInspector
    */
    public abstract ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException;

// evaluate方法,调用来对row数据进行解析
public Object evaluate(Object row) throws HiveException {
return evaluate(row, -1);
}

/**

  • Evaluate the expression given the row. This method should use the
  • rowInspector passed in from initialize to inspect the row object. The
  • return value will be inspected by the return value of initialize.
  • If this evaluator is referenced by others, store it for them
    */
    protected Object evaluate(Object row, int version) throws HiveException {
    if (version < 0 || version != this.version) {
    this.version = version;
    return evaluation = _evaluate(row, version);
    }
    return evaluation;
    }
    // 由各个子类实现的方法的_evaluate方法,结合上面的evaluate方法,这里实际使用了设计模式的模板 // 方法模式
    protected abstract Object _evaluate(Object row, int version) throws HiveException;

    }
    通过ExprNodeEvaluatorFactory获取到每个ExprNodeDesc对应的ExprNodeEvaluator:

public static ExprNodeEvaluator get(ExprNodeDesc desc) throws HiveException {
// Constant node
if (desc instanceof ExprNodeConstantDesc) {
return new ExprNodeConstantEvaluator((ExprNodeConstantDesc) desc);
}

// Column-reference node, e.g. a column in the input row
if (desc instanceof ExprNodeColumnDesc) {
  return new ExprNodeColumnEvaluator((ExprNodeColumnDesc) desc);
}
// Generic Function node, e.g. CASE, an operator or a UDF node
if (desc instanceof ExprNodeGenericFuncDesc) {
  return new ExprNodeGenericFuncEvaluator((ExprNodeGenericFuncDesc) desc);
}
// Field node, e.g. get a.myfield1 from a
if (desc instanceof ExprNodeFieldDesc) {
  return new ExprNodeFieldEvaluator((ExprNodeFieldDesc) desc);
}
throw new RuntimeException(
    \"Cannot find ExprNodeEvaluator for the exprNodeDesc = \" + desc);

}
看下FilterOperator中如何使用ExprNodeEvaluator对数据进行过滤的。

首先在FilterOperator的initializeOp方法中,获取到ExprNodeEvaluator:

conditionEvaluator = ExprNodeEvaluatorFactory.get(conf.getPredicate());
然后在process方法中,调用initialize方法后,调用eveluate方法获取到整个where过滤的结果:

conditionInspector = (PrimitiveObjectInspector) conditionEvaluator
.initialize(rowInspector);

Object condition = conditionEvaluator.evaluate(row);

Boolean ret = (Boolean) conditionInspector
.getPrimitiveJavaObject(condition);
// 如果结果是true,则forward到下一个operator继续处理
if (Boolean.TRUE.equals(ret)) {
forward(row, rowInspector);
}
再来看下GenericUDFOPAnd的evaluate方法实现:

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
boolean bool_a0 = false, bool_a1 = false;
Object a0 = arguments[0].get();
if (a0 != null) {
bool_a0 = boi0.get(a0);
if (bool_a0 == false) {
result.set(false);
return result;
}
}

Object a1 = arguments[1].get();
if (a1 != null) {
  bool_a1 = boi1.get(a1);
  if (bool_a1 == false) {
    result.set(false);
    return result;
  }
}

if ((a0 != null && bool_a0 == true) && (a1 != null && bool_a1 == true)) {
  result.set(true);
  return result;
}

return null;

}
从以上代码知道,在进行AND的计算时,如果左边条件返回false,则不会进行右边条件的计算,所以AND的顺序其实是影响实际的效率的。类似的还有OR也是一样的,如果左边条件返回true,则不会进行右边条件的计算。

版权声明

本文仅代表作者观点,不代表百度立场。
本文系作者授权百度百家发表,未经许可,不得转载。

热门文章
  • Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering

    Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering
    Problem Statement 我们考虑一个具有马尔可夫性质、非线性、非高斯的状态空间模型(State Space Model):对于一个时间序列上的观测结果{yt,t∈N}\\{ y_t , t \\in N \\}{yt​,t∈N},我们认为每个观测结果yty_tyt​的生成依赖于一个无法直接观察的隐变量xt∈{xt,t∈N}x_t \\in \\{x_t , t \\in N \\}xt​∈{xt​,t∈N},即:p(...
  • 机房智能化温湿度解决方式之POE供电以太网温湿度传感器

    机房智能化温湿度解决方式之POE供电以太网温湿度传感器
    机房智能化温湿度解决方式之POE供电以太网温湿度传感器 北京盈创力和电子科技有限公司 智能型TCP网口温湿度记录仪 北京IP网络温湿度记录仪厂家,北京盈创力和 北京智能型TCP网口温湿度记录仪IP网络温湿度记录仪是一种新型的基于TCP/IP协议双绞线以太网标准温湿度采集模块,利用它可以实现现场温度值、相对湿度值的采集,同时利用其自身的RJ45通信接口可以方便地和机房监控主机或交换机集线器进行联网。 工作于-40℃~85℃工业级带...
  • Hive 系统函数及示例

    Hive 系统函数及示例
    查看所有系统函数 show functions; 函数分类 内置函数【系统函数】 数学函数: floor、round、ceil、cos、log2等 字符串函数: length、reverse、trim、lower、get_json_object、repeat等 收集函数: size 转换函数: cast 日期函数: year、month、datediff、date、date_add等 条件函数: coalesce、case…w...
  • HTTP状态保持的原理

    HTTP状态保持的原理
    a)在用户登录之后,浏览器返回响应的时候会在响应中添加上cookieb)浏览器接收到cookie之后会自动保存c)当用户再次请求同一服务器中的其他网页的时候,浏览器会自动带上之前保存的cookied)服务接收到请求之后可以请 request 对象中取到cookie 判断当前用户是否登录  Http是无状态的,就是连接时数据互通,关闭后...
  • CSRF的原理和防范措施

    CSRF的原理和防范措施
    a)攻击原理:i.用户C访问正常网站A时进行登录,浏览器保存A的cookieii.用户C再访问攻击网站B,网站B上有某个隐藏的链接或者图片标签会自动请求网站A的URL地址,例如表单提交,传指定的参数iii.而攻击网站B在访问网站A的时候,浏览器会自动带上网站A的cookieiv.所以网站A在接收到请求之后可判断当前用户是登录状态,所以...
标签列表