S4(Simple Scalable Stream System) 流数据处理系统是Yahoo!公司提出的,在2011年的时候成为Apache软件基金下的一个孵化项目,可惜的是在2014年的时候该孵化项目“退休”了,具体原因未知!!从这里可以了解它当前的状态信息: . 阅读了所发表的论文S4:Distributed Stream Computing Platform之后,发现该系统模型有其独特之处,总结起来就是

  • 灵活
  • 系统架构模型和编程接口简单
  • 可拓展性强。

虽然该项目已经退休,但是因为其”系统架构模型和编程接口简单“,其论文通俗易懂,并且源码开放,因此决定从系统模型架构和源码两个方面较为深入地学习它。

接下来先讲系统模型架构,然后针对几个问题通过源码来寻找答案,以此来了解S4的实现过程。

系统模型架构

\"在这里插入图片描述\"

S4系统把流处理的逻辑过程表示成一个或者多个有向无环图,如上图所示,在图中有两个关键组建,Stream和ProcessingElement(PE)。在上图中Stream表示成带箭头的线,负责数据的传递;PE表示出一个节点,代表一个处理单元,也是用户业务逻辑所在地。Stream中传递的主要为以Event为基类的对象,也就是事件对象。什么是事件对象呢?从编程实现的角度来讲,一个事件对象有其时间和其他属性,这里的时间通常表示时间所发现的时间。每个Event对象能够通过三个字段信息一起确定其所属的Stream,分别是1. Event的类型,在Java中表示为继承了Event类的子类的Class类型,2. Event的Key值,这个可以为null,3. 事件的Value值。这里的Key和Value跟Hadoop的Key和Value是一个设计思想。举一个例子,看上面的图片,WordEvent统一表示单词这样的事件类型,因为单词事件类型中有很多中不同的单词,因此使用Key来表示每一个单词,使用Value表示每种单词的个数。

PE作为用户的逻辑处理单元,需要用户编程实现或者重写下面几个主要方法。系统会为用户隐藏其他细节,比如PE处理如何接收数据,如何发送处理完的中间数据等等。

	@Override
    protected void onCreate() {}
    @Override
    protected void onRemove() {}
	public void onEvent(Event event) {}
	public void onTrigger(Event event) {}

这样的系统模型,看起来非常简单且灵活,但是需要考虑并解决一些问题。
一个Stream只会传输一种类型的Event对象或者同种类型相同Key的Event对象。一个PE可能接收一个或者多个Stream的数据。然后可能向一个或者多个PE发送处理过的Event数据。一台机器中可能有一个或者多个PE,整个系统使用Zookeeper来协调整合。

下面这张图片是系统的整体架构图,Processing Node表示一台机器,在机器接收到一个事件之后,需要根据事件的类型和key值来判断该事件应该由哪一个PE来处理。在PE处理之后,由Emiter发送。了解了系统模型架构,下面通过源码来学习具体的实现。

\"在这里插入图片描述\"

源码下载地址:download, 使用Intelij IDEA导入。

主要包含三个小模块,分别为s4- , s4-comm和s4-core,第二个模块用于数据传输,所以我们的源码基本指涉及到s4- 和s4-core.

programer的编程模型

在看源码之前,非常有必要了解编程人员如何编程构建和使用PE。用户需要继承ProcessingElement类,然后覆写和实现onEvent和onTrigger等方法。其中onEvent是事件到来时的处理逻辑,onTrigger是要触发输出处理结果的逻辑。触发条件有两个,一个是事件数量触发,另一个是时间触发。在下面的代码中,在onEvent中,每来一个事件就计数加一,在onTrigger中,每来一个事件就将结果放到输出流中。

public class CounterPE extends ProcessingElement {
    private Stream<CountEvent>[] countStream;
    public CounterPE(App app) {
        super(app);
    }
    public Stream<CountEvent>[] getCountStream() {
        return countStream;
    }
    public void setCountStream(Stream<CountEvent>... countStream) {
        this.countStream = countStream;
    }
    private long counter = 0;
    public void onEvent(Event event) {
        counter += 1;
    }
    public void onTrigger(Event event) {
        CountEvent countEvent = new CountEvent(getId(), counter);
        emit(countEvent, countStream);
    }
    @Override
    protected void onCreate() {
    }
    @Override
    protected void onRemove() {
    }
}

系统实现逻辑

  • Event类是所有事件类的基类,是该系统的基础类。一个事件主要包含事件和其他保存在Map中的属性。
public class Event {
    final private long time;
    private String streamName;
    private int appId;
    private Map<String, Data<?>> map;

在Stream中,Event对象进一步被封装成EventMessage对象,顾名思义,EventMessage对象类似于一个消息报,相比于Event对象,其主要是增加了数据在不同PE,不同机器上面传输所需要的定位属性。从下面源码的定义看出,一个EventMessage能够根据appName和streamName来定位数据的接收者。

public class EventMessage {
    private String appName;
    private String streamName;
    private byte[] serializedEvent;

一个Stream对象只容纳一种类型的事件数据,使用的数据结构为ArrayBlockingQueue阻塞队列,

protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);

其特点是,当队列为空的时候,如果要获取,阻塞take的线程,直到有新的数据;如果队列满了,阻塞put的线程,直到队列不满。

  • Stream类主要字段为如下所示,具体请看注释。一个stream只存储一种类型的事件数据,保存在一个阻塞队列中,专门使用一个线程不断地读取队列中地数据并处理。下面我们来具体看Stream类如何将事件数据保存到队列中,以及如何读取和处理队列中地数据。
public class Stream<T extends Event> implements Runnable, Streamable {
    protected Key<T> key; //该对象定义了获取Event对象的key的方法
    private ProcessingElement[] targetPEs;//一个Stream中的数据可能流向多个PE
    protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
    private Thread thread;//新开一个线程不断地从阻塞队列中读取事件,并处理
    private Class<T> eventType = null;//保存该Stream的事件类型

Stream作为连接不同PE或者说不同节点的组建,所以它具有两个重要的功能,1是接收数据,2是发送由PE处理后的数据。

  • 接收数据
    Stream中的接收event的方法很简单,只是简单地把event对象放入阻塞队列中。因为会有很多个Stream对象,因此使用了Reciever类来统一管理所有的Stream类的receiveEvent方法。也就是下面的receiveEvent方法在Reciever方法中被调用。
 public void receiveEvent(EventMessage event) {
        try {
            queue.put(event);
        } catch (InterruptedException e) {
            logger.error(\"Interrupted while waiting to put an event in the queue: {}.\", e.getMessage());
            Thread.currentThread().interrupt();
        }
    }

Reciever类实现了Runnable接口,下面的方法是Reciever类的run方法。在该方法中,首先通过appName和streamName来找到对应的Stream对象,然后调用其receiveEvent方法。

public void run() {
        // TODO: this thread never seems to get interrupted. SHould we catch an interrupted exception from listener
        // here?
        while (!Thread.interrupted()) {
            byte[] message = listener.recv();
            EventMessage event = (EventMessage) serDeser.deserialize(message);

            int appId = Integer.valueOf(event.getAppName());
            String streamId = event.getStreamName();
            /*
             * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:
             * make this more efficient for the case in which we send the same event to multiple PEs.
             */
            try {
                streams.get(appId).get(streamId).receiveEvent(event);
            } catch (NullPointerException e) {
                logger.error(\"Could not find target stream for event with appId={} and streamId={}\", appId, streamId);
            }
        }
    }

小结:统一使用Reciever来接收event对象,通过appName和streamName找到对应的Stream对象,然后调用Stream对象的recieveEvent方法,将event对象放入queue中。

  • 发送数据

下面的函数用于发送一个Event对象给拓扑图中下一层节点,这里的下一层节点有可能包含当前Stream类的阻塞队列queue。Stream中的put方法表示了该发送的逻辑。在该方法中,首先设置事件的streamName和App Name。然后检测该Stream是否跟远程的节点关联,如果没有,直接把event放入当前的queue中,如果有,使用sender.sendToRemotePartitions(event)来讲event发送到远程队列中,同时put到本地的当前的queue中。下面将会讲sender.sendToRemotePartitions(event)方法。

 public void put(Event event) {
        try {
            event.setStreamId(getName());
            event.setAppId(app.getId());
            /*
             * Events may be sent to local or remote partitions or both. The following code implements the logic.
             */
            if (key != null) {
                /*
                 * We send to a specific PE instance using the key but we don\'t know if the target partition is remote
                 * or local. We need to ask the sender.
                 */
                if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {
                    /*
                     * Sender checked and decided that the target is local so we simply put the event in the queue and
                     * we save the trip over the network.
                     */
                    queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app
                            .getSerDeser().serialize(event)));
                }
            } else {
                /*
                 * We are broadcasting this event to all PE instance. In a cluster, we need to send the event to every
                 * node. The sender method takes care of the remote partitions an we take care of putting the event into
                 * the queue.
                 */
                sender.sendToRemotePartitions(event);
                queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app.getSerDeser()
                        .serialize(event)));
            }
        } catch (InterruptedException e) {
            logger.error(\"Interrupted while waiting to put an event in the queue: {}.\", e.getMessage());
            Thread.currentThread().interrupt();
        }
}

该方法用于讲一个event对象发送到拓扑图中的下一层的节点。如何定位下一层的节点呢?这里只用到了AppID (跟App Name相同)和stream Name,这两个信息便可以定位了。每一个Event对象都包含这两个属性。

emitter.send方法将event对象发送到下一个远程节点,目前有TCP和UDP两种实现。

 public void sendToRemotePartitions(Event event) {
        for (int i = 0; i < emitter.getPartitionCount(); i++) {
            /* Don\'t use the comm   when we send to the same partition. */
            if (localPartitionId != i)
                emitter.send(i, new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser  .serialize(event)));
        }
    }

讲到这里,有一个问题,Stream类的发送evnet对象的方法put方法最后由谁来调用呢?答案是先由PE来调用,最终由代码开发人员调用。下面是ProcessingElement类emit方法的代码。该方法没有使用到PE类的成员变量,这个需要注意,一个PE类没有将Stream作为成员变量。emit方法只是简单地遍历所传入的Stream对象数组,然后调用它们的put方法,该方法在上面讲过。而最终这个emit方法将由用户调用,在其所继承的ProcessingElement类中,具体代码例子请看上一节内容。

	//Helper method to be used by PE implementation classes. Sends an event to all the target streams.
    protected <T extends Event> void emit(T event, Stream<T>[] streamArray) {
        for (int i = 0; i < streamArray.length; i++) {
            streamArray[i].put(event);
        }
    }

我们知道了一个流如何获取event数据以及如何发送event数据,现在数据已经保存在一个阻塞队列中了,那么它是怎么从队列中获取数据然后处理的呢?

Event数据处理

Stream类中有一个成员变量Thread thread, 用于不断地从queue中获取和处理数据。因此,处理逻辑放在run方法中,下面是run方法源码,其主要是采用阻塞的方式不断从queue中获取数据,然后调用相应的PE的pe.handleInputEvent(event)方法。那么pe.handleInputEvent(event);方法又做了什么呢?

 @Override
    public void run() {
        while (true) {
            try {
                /* Get oldest event in queue. */
                EventMessage eventMessage = queue.take();

                @SuppressWarnings(\"unchecked\")
                T event = (T) app.getSerDeser().deserialize(eventMessage.getSerializedEvent());

                /* Send event to each target PE. */
                for (int i = 0; i < targetPEs.length; i++) {

                    if (key == null) {

                        /* Broadcast to all PE instances! */

                        /* STEP 1: find all PE instances. */

                        Collection<ProcessingElement> pes = targetPEs[i].getInstances();

                        /* STEP 2: iterate and pass event to PE instance. */
                        for (ProcessingElement pe : pes) {

                            pe.handleInputEvent(event);
                        }

                    } else {

                        /* We have a key, send to target PE. */

                        /* STEP 1: find the PE instance for key. */
                        ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));

                        /* STEP 2: pass event to PE instance. */
                        pe.handleInputEvent(event);
                    }
                }

            } catch (InterruptedException e) {
                logger.info(\"Closing stream {}.\", name);
                receiver.removeStream(this);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

PE的handleInputEvent方法主要做了两件事,1是在第一次调用该方法的时候,判断是否有该PE的持久化历史纪录,如果有的话就恢复;2是调用OverloadDispatcher类的dispatchEvent和dispatchTrigger两个方法。这两个方法将调用由用户实现的onEvent和onTrigger方法。onEvent和onTrigger方法在上一节讲过,它们是用户的逻辑处理单元。那么,dispatchEvent和dispatchTrigger两个方法是什么样子的呢?

protected void handleInputEvent(Event event) {

           ;
        if (isThreadSafe) {
              = new  (); // a dummy   TODO improve this.
        } else {
              = this;
        }
        synchronized ( ) {
            if (!recoveryAttempted) {
                recover();
                recoveryAttempted = true;
            }

            /* Dispatch onEvent() method. */
            overloadDispatcher.dispatchEvent(this, event);

            /* Dispatch onTrigger() method. */
            if (haveTriggers && isTrigger(event)) {
                overloadDispatcher.dispatchTrigger(this, event);
            }

            eventCount++;

            dirty = true;

            if (isCheckpointable()) {
                checkpoint();
            }
        }
    }

进入OverloadDispatcher 的定义,发现它是一个接口,而且没有找到实现类。有这么一行注释Implementations of this interface are typically generated at runtime.。原来该接口的实现类是动态实现的。使用的第三方工具包为org. web.asm。我们现在已经明确,下面的两个方法将会调用由用户定义的onEvent和onTrigger方法,调用的方法体是动态实现的。那么问题有:为什么使用动态的方式生成它们的实现方法呢?

public interface OverloadDispatcher {
    public void dispatchEvent(ProcessingElement pe, Event event);
    public void dispatchTrigger(ProcessingElement pe, Event event);
}

在回答这个问题之前,我们需要考虑另外一个问题,编程人员定义了一个继承了ProcessingElement的类MyPE,实现了onEvent和onTrigger方法,S4系统编程人员在发布系统之前是不知道用户的PE继承类的,系统如何知道所继承的类是MyPE,并且如何找到并加载MyPE类并调用onEvent和onTrigger方法呢?为了解决这个问题,S4使用了动态生成代码的方法。动态生成代码的类为org.apache.s4.core.gen.OverloadDispatcherGenerator。这样动态生成实现类的方式到底优雅不优雅呢?性能如何?有待考量。

多线程的使用

系统中使用到的多线程主要有:

  • 读取Stream类中的阻塞队列中的数据的时候,使用了专门的线程
  • Reciever类实现了Runnable接口,用于接收数据,并把保存到对应的Stream中。
  • 发送数据的时候,使用到了NIO之Socket NIO,使用到了Netty框架。

关键组建的关系

一个Stream对象包含多个目标PE,对于每一个到来的event对象,根据事件类型或者事件类型+事件的key来确定相应的PE。然后调用PE的处理逻辑方法。

Reciever对象保存了单个node下的所有的Stream对象引用,它统一接收event数据,然后根据事件类型或者事件类型+事件的key来确定相应的Stream对象。

问题

  • 问题: 一个PE能否处理多中Event?
    回答:一个PE只能处理一种类型的Event,但是可以处理同种类型的不同key的event。

  • 问题:如果按照上面第一张图片所示的例子,从上往下第二次拓扑中,一个单词生成对应的一个PE,那么最终会生成大量的PE,所占用的内存也是很可观的。有一些PE在生成之后很可能之用到了几次,一直占用这内存。

  • Event类中使用到了Map作为成员函数,而map在put一个数据之后就会一个默认初始化大小,如果map中只有一个数据,那么意味着存在多个空的内存占用。导致一个Event的内存使用率很低。这样的设计是不是有失优雅呢?

public class Event {
   final private long time;
   private String streamName;
   private int appId;
   private Map<String, Data<?>> map;

谢谢!

收藏 打印