[27]Presto Event Listener Plugin开发简述

小编 2026-06-14 阅读:990 评论:0
一、Event Listener presto事件监听器Event Listener,作为plugin监听以下事件: Query creation 查询建立相关信息 Query comple...

一、Event Listener

presto事件监听器Event Listener,作为plugin监听以下事件:

  • Query creation
    查询建立相关信息
  • Query completion (success or failure)
    查询执行相关信息,包含成功查询的细节信息,失败查询的错误码等信息
  • Split completion (success or failure)
    split执行信息,同理包含成功和失败的细节信息.

备注

一个presto集群中只能有一个Event Listener plugin

二、Implementation

如何实现一个Event Listener:

1)通过EventListenerFactory建立EventListener
2)实现EventListener,根据需要实现所需方法

public interface EventListener
{
    default void queryCreated(QueryCreatedEvent queryCreatedEvent)
    {
    }

    default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
    {
    }

    default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
    {
    }
}

其中:

  • 事件 QueryCreatedEvent 包含query建立的详细信息
  • 事件 QueryCompletedEvent 包含query执行的详细信息
  • 事件 SplitCompletedEvent 包含split执行的详细信息

备注
如果希望将自定义的异常信息监听至 QueryCompletedEvent中的failureInfo QueryFailureInfo中,自定义异常需要继承spi中标准异常,如PrestoException

3)实现一个Plugin,实现 getEventListenerFactories() 方法
Plugin的使用方式,可参见Presto函数开发简述.
4) 添加配置信息etc/event-listener.properties.
event-listener.properties 至少应配置event-listener.name , presto结合该属性和EventListenerFactory.getName() 中获得的值找到事件监听插件
Example configuration file:

event-listener.name=custom-event-listener
custom-property1=custom-value1
custom-property2=custom-value2  

三、Example

由于集群运维治理需要,有时需要将presto执行日志分类落盘。这里简单给出一个日志事件监听器插件的样例。
Presto函数开发简述 所述,插件的实现有两种方式:
1)独立工程,jar上传至安装包安装包根目录下的plguin下,重启机器激活
2)源码注入(在源码中新增module,配置maven打包方式,直接将plugin打包至安装包根目录下的plguin下)
这里使用第二种方法。

  • 步骤1
    root的pom中新增一个事件module,如:
 <module>presto-event-listener</module>

  • 步骤2
    在新增的module中实现Plugin,EventListenerFactory,EventListener

LogEventPlugin

public class LogEventPlugin
        implements Plugin
{
    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories()
    {
        EventListenerFactory listenerFactory = new LogEventListenerFactory();
        return ImmutableList.of(listenerFactory);
    }
}

LogEventListenerFactory

public class LogEventListenerFactory
        implements EventListenerFactory
{
    private ImmutableMap<String, String> config;

    @Override
    public String getName()
    {
        return \"event-listener\";
    }

    @Override
    public EventListener create(Map<String, String> config)
    {
        this.config = ImmutableMap.copyOf(config);
        if (!config.containsKey(\"log.path\")) {
            throw new RuntimeException(\"/etc/event-listener.properties file missing log.path\");
        }
        // 这里初始化log4j2基本配置
        Configuration logConfiguration = Log4j2ConfigurationFactory.createConfiguration(\"Log4j2\", config.get(\"log.path\")));
        Configurator.initialize(logConfiguration);
        return new LogEventListener(config);
    }
}

LogEventListener

public class LogEventListener
        implements EventListener
{
    private static final Logger LOGGER = LogManager.getLogger(LogEventListener.class);
    private ImmutableMap<String, String> config;

    public LogEventListener(Map<String, String> config)
    {
        this.config = ImmutableMap.copyOf(config);
    }

    /* query建立的详细信息
    @Override
    public void queryCreated(QueryCreatedEvent queryCreatedEvent)
    {
    }*

    // query执行信息,这里我们将执行失败的查询具体信息,通过日志落盘用于治理分析  
    @Override 
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
    {
        if (queryCompletedEvent.getFailureInfo().isPresent()) {
            LOGGER.info(\"Query执行异常: \"
                    + \" QueryId: \" + queryCompletedEvent.getMetadata().getQueryId()
                    + \" QuerySql: \" + queryCompletedEvent.getMetadata().getQuery()
                    + \" FailureInfo: \" + queryCompletedEvent.getFailureInfo().get().getFailuresJson());
            // 这里根据需要添加报警通知信息
            // 调用相应的定义服务
        }
    }

    /*
    // split执行的详细信息 
    @Override
    public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
    {
    }
    */
}

Log4j2ConfigurationFactory
在这个定义类中参阅Log4j2官方文档Programmatic Configuration,使用ConfigurationBuilder 实现所需的日志配置,返回一个Configuration即可,这里不详述,具体可参阅该文档。

properties文件
etc/event-listener.properties 下新建文件,添加所需配置信息 ,如:

# 强制参数
event-listener.name=event-listener
# log path
log.path=/xxxx/logs 

四、本地debug说明

若想本地debug,则在presto-main下的etc目录添加event-listener.properties,同时再congfig.properties,添加插件配置信息即可(过程同Presto函数开发简述 ):

plugin.bundles=\\
     。。。。
          ../presto-log-listener/pom.xml, \\
     。。。。
版权声明

本文仅代表作者观点,不代表百度立场。
本文系作者授权百度百家发表,未经许可,不得转载。

热门文章
  • 机房智能化温湿度解决方式之POE供电以太网温湿度传感器

    机房智能化温湿度解决方式之POE供电以太网温湿度传感器
    机房智能化温湿度解决方式之POE供电以太网温湿度传感器 北京盈创力和电子科技有限公司 智能型TCP网口温湿度记录仪 北京IP网络温湿度记录仪厂家,北京盈创力和 北京智能型TCP网口温湿度记录仪IP网络温湿度记录仪是一种新型的基于TCP/IP协议双绞线以太网标准温湿度采集模块,利用它可以实现现场温度值、相对湿度值的采集,同时利用其自身的RJ45通信接口可以方便地和机房监控主机或交换机集线器进行联网。 工作于-40℃~85℃工业级带...
  • Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering

    Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering
    Problem Statement 我们考虑一个具有马尔可夫性质、非线性、非高斯的状态空间模型(State Space Model):对于一个时间序列上的观测结果{yt,t∈N}\\{ y_t , t \\in N \\}{yt​,t∈N},我们认为每个观测结果yty_tyt​的生成依赖于一个无法直接观察的隐变量xt∈{xt,t∈N}x_t \\in \\{x_t , t \\in N \\}xt​∈{xt​,t∈N},即:p(...
  • HTTP状态保持的原理

    HTTP状态保持的原理
    a)在用户登录之后,浏览器返回响应的时候会在响应中添加上cookieb)浏览器接收到cookie之后会自动保存c)当用户再次请求同一服务器中的其他网页的时候,浏览器会自动带上之前保存的cookied)服务接收到请求之后可以请 request 对象中取到cookie 判断当前用户是否登录  Http是无状态的,就是连接时数据互通,关闭后...
  • Hive 系统函数及示例

    Hive 系统函数及示例
    查看所有系统函数 show functions; 函数分类 内置函数【系统函数】 数学函数: floor、round、ceil、cos、log2等 字符串函数: length、reverse、trim、lower、get_json_object、repeat等 收集函数: size 转换函数: cast 日期函数: year、month、datediff、date、date_add等 条件函数: coalesce、case…w...
  • CSRF的原理和防范措施

    CSRF的原理和防范措施
    a)攻击原理:i.用户C访问正常网站A时进行登录,浏览器保存A的cookieii.用户C再访问攻击网站B,网站B上有某个隐藏的链接或者图片标签会自动请求网站A的URL地址,例如表单提交,传指定的参数iii.而攻击网站B在访问网站A的时候,浏览器会自动带上网站A的cookieiv.所以网站A在接收到请求之后可判断当前用户是登录状态,所以...
标签列表