Storm集群的组件
   
    Storm集群在表面上类似于Hadoop集群。而在Hadoop上运行“MapReduce作业”,在Storm上运行“拓扑”。
    
    “作业”和“拓扑”本身是非常不同的 – 一个关键的区别是MapReduce作业完成会结束,而拓扑一直处理消息(或直到你杀死它)。
   
Storm集群上有两种节点:主节点和工作节点。
- 主节点运行一个名为“Nimbus”的守护进程,负责在集群周围分发代码,为机器分配任务以及监控故障。
- 每个工作节点都运行一个名为“Supervisor”的守护进程。Supervisor监听分配给其机器的工作,并基于Nimbus为其分配的内容,根据需要启动和停止工作进程。
每个工作进程都执行一个拓扑的一个子集,运行的拓扑由分布在许多计算机上的许多工作进程组成。
    
    
    Nimbus和Supervisors之间的所有协调都是通过Zookeeper集群完成的。此外,Nimbus守护程序和Supervisor守护程序是快速失败和无状态的; 所有状态都保存在Zookeeper或本地磁盘上。这意味着你可以杀死-9 Nimbus或者主管,他们会像没事一样重新开始。这种设计使Storm集群非常稳定。
   
    
    
    拓扑Topology
   
拓扑是计算图。拓扑中的每个节点都包含处理逻辑,节点之间的链接指示数据应如何在节点之间传递。
    
    
    流Streams
   
Storm中的核心抽象是“流”。流是一个无限的元组序列。Storm提供了以分布式和可靠的方式将流转换为新流的原语。例如,您可以将推文流转换为趋势主题流。
Storm为进行流转换提供的基本原语是“spouts”和“bolt”。
- Spout是拓扑中数据流的来源。例如,spout可以读取Kestrel队列中的元组并将其作为流发出。
- Bolt对接收到的数据流进行处理,并可能发出新的流。Bolts可以执行任何操作,包括运行函数,过滤元组,进行流聚合,进行流连接,与数据库对话等等。
    spout和bolt网络被打包成一个“拓扑”,这是提交给Storm集群执行的顶级抽象。拓扑是流转换的图形,其中每个节点都是一个喷Spout或Bolt。图中的边缘表示哪些Bolt订阅了哪些流。当一个spout或bolt向一个流发出一个元组时,它会将元组发送给订阅该流的每个bolt。
    
    
    
    拓扑中节点之间的链接表明了元组传递的方向。
   
Storm拓扑中的每个节点并行执行。 在拓扑中,可以为每个节点指定所需的并行度,然后Storm将在集群中生成该数量的线程以执行。
拓扑永远运行,或直到你杀死它。 Storm会自动重新分配任何失败的任务。 此外,Storm保证不会丢失数据,即使计算机出现故障并且消息丢失也是如此。
    
    
    数据模型
   
    Storm使用元组作为其数据模型。
    
    元组是一个命名的值列表,元组中的字段可以是任何类型的对象。开箱即用,Storm支持所有原始类型,字符串和字节数组作为元组字段值。要使用其他类型的对象,您只需要为该类型实现一个序列化程序。
   
拓扑中的每个节点都必须声明它发出的元组的输出字段。例如,这个bolt声明它发出2元组,字段为“double”和“triple”:
public class DoubleAndTripleBolt extends BaseRichBolt {
    ......
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}
该declareOutputFields函数声明[“double”, “triple”]组件的输出字段.
    
    
    一个简单的拓扑
   
TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
此代码使用setSpout和setBolt方法定义节点。这些方法将用户指定的id,包含处理逻辑的对象以及节点所需的并行数量作为输入。在这个例子中:
- Spout被赋予id “words”,Bolt分别被赋予id “exclaim1”和“exclaim2”。
- 包含处理逻辑的对象(如new TestWordSpout())可实现Spout接口IRichSpout和Bolt接口IRichBolt。
- 最后一个参数是节点所需的并行度,是可选的。它指示应在群集中执行该组件的线程数。如果省略它,Storm将只为该节点分配一个线程。
    
     setBolt
    
    返回一个InputDeclarer对象,用于定义Bolt的输入。这里,组件“exclaim1”声明它想要使用shuffle分组读取组件“words”发出的所有元组,组件“exclaim2”声明它想要使用shuffle分组读取组件“exclaim1”发出的所有元组。
   
- 
 “shuffleGrouping”意味着元组从输入任务随机分配到bolt的任务中。
- 
 如果你想要组件“exclaim2”来读取组件“words”和组件“exclaim1”发出的所有元组,你可以像这样编写组件“exclaim2”的定义:builder.setBolt("exclaim2", new ExclamationBolt(), 5) .shuffleGrouping("words") .shuffleGrouping("exclaim1");
如您所见,可以链接输入声明以指定Bolt的多个源。
    
     Spouts负责从外界读取消息,然后向拓扑中发送新消息
    
    。
    
    TestWordSpout在这种拓扑结构中,每隔100ms就会从列表[“nathan”,“mike”,“jackson”,“golda”,“bertels”]中发出一个随机单词。nextTuple()TestWordSpout中的实现如下所示:
   
  public static class TestWordSpout implements IRichBolt {
    SpoutOutputCollector _collector;
     
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
        }
        
    @Override
    public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}
    @Override
    public void cleanup() {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
    .......
}
- 
     
 open方法是初始化方法,在拓扑运行过程,只被调用一次
 
 。元组可以随时从Spout发出,在方法open,execute等。
 
 此open只是将SpoutOutputCollector作为实例变量保存,以便稍后在execute方法中使用。
 
 
 * Map conf:其实里面保存的是topology的一些配置信息
 
 * TopologyContext context:topology的上下文,类似于servletcontext
 
 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
- 
     
 nextTuple方法
 
 是Spout用于发送元祖的方法,方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内。 每调用一次,会向外发射一条数据。方法中的 _collector.emit(new Values(word))中发送的数据new Values(word),要与 declarer.declare(new Fields(“word”));
ExclamationBolt追加字符串“!!!” 。让我们来看看完整的实现ExclamationBolt:
 public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;
        
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }
    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }
    @Override
    public void cleanup() {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
- 
     
 prepare方法
 
 为Bolt节点
 
 提供
 
 了一个用于从该Bolt中发射元组的
 
 OutputCollector
 
 ,在拓扑中只被调用一次。元组可以随时从Bolt发出,在方法prepare,execute或cleanup方法,甚至是在另一个异步的线程。
 
 此prepare实现只是将OutputCollector作为实例变量保存,以便稍后在execute方法中使用。
 
- 
     
 execute方法****从一个Bolt的输入接收一个元组并处理,在拓扑运行中一直被调用
 
 。该ExclamationBolt抓取元祖的第一个字段并发出一个“!!!” 附加到它的新的元组。如果您实现了一个订阅多个输入源的bolt,您可以通过使用该Tuple#getSourceComponent方法找出Tuple来自哪个组件。该execute方法还有一些其他的处理: **_collector.emit(
 
 tuple
 
 , new Values(tuple.getString(0) + “!!!”))**把输入元组作为第一个参数传递,
 
 输入元组在最后一行 _collector.ack(tuple)被确认
 
 ,这些是Storm的可靠性API的一部分,用于保证不会丢失数据。
- 
     
 cleanup方法
 
 在Bolt被关闭时调用,并且清除所有打开的资源。无法保证在集群上调用此方法:例如,如果正在运行任务的计算机爆炸,则无法调用该方法。该cleanup方法适用于在本地模式下运行拓扑(在进程中模拟Storm集群),并且您希望能够运行并终止许多拓扑而不会遭受任何资源泄漏。
- 
     
 declareOutputFields方法
 
 声明该节点发出的数据字段,如ExclamationBolt发出1元组,此字段称为“word”。
- 
     
 getComponentConfiguration方法
 
 允许您配置此组件运行方式的各个方面。
在Bolt实现中通常不需要cleanup和getComponentConfiguration方法。您可以更简洁地定义Bolt。ExclamationBolt可以通过扩展来更简洁地书写BaseRichBolt,如下所示:
    public static class ExclamationBolt extends BaseRichBolt {
    
    
    OutputCollector _collector;
   
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _collector = collector;
}
@Override
public void execute(Tuple tuple) {
    _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    _collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}    
}
    
    
    在本地模式下运行Topology
   
    Storm有两种操作模式:本地模式和分布式模式。在本地模式下,Storm通过使用线程模拟工作节点来完全执行。本地模式对于拓扑的测试和开发很有用。
    
    topologyTest.java
   
public class topologyTest{
     public static void main(String[] args) {
        //组装topology
        TopologyBuilder builder = new TopologyBuilder();   
             
        builder.setSpout("words", new TestWordSpout(), 10);        
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1");
        
        try{
            //创建本地storm集群
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("sumTopology", new Config(), builder.createTopology());
        }catch(TException e){
            e.printStackTrace();
        }catch (Exception e){
            e.printStackTrace();
        }
      }
   }
    
    
    流分组
   
    
     流分组告诉拓扑如何在两个组件之间发送元组
    
    。请记住,spouts和bolt在集群中并行执行任意数量的任务。如果您查看拓扑在任务级别的执行情况,它看起来像这样:
    
     
   
- 
 当一个Bolt A的任务向Bolt B发出一个元组时,它应该将该元组发送给哪个任务?
- 
“流分组”通过告诉Storm如何在多组任务之间发送元组来回答这个问题。 
    
    
    用其他语言定义Bolt
   
Bolt可以用任何语言定义。用另一种语言编写的螺栓作为子进程执行,Storm通过stdin / stdout与这些子进程通过JSON消息进行通信。通信协议只需要一个~100行适配器库,而Storm附带适用于Ruby,Python和Fancy的适配器库。
以下是SplitSentence螺栓的定义WordCountTopology:
public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
SplitSentence覆盖ShellBolt并声明它python与参数一起运行splitsentence.py。这是实施splitsentence.py:
import storm
class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])
SplitSentenceBolt().run()
    
    
    保证消息处理
   
Storm的可靠性API的一部分:Storm如何保证从喷口流出的每条消息都将得到完全处理。请参阅保证消息处理,以获取有关其工作原理的信息以及您作为用户必须执行的操作以利用Storm的可靠性功能。
    
    
    Trident
   
Storm保证每条消息至少会通过拓扑处理一次。一个常见的问题是“你怎么做像在Storm上计数那样的事情?难道你不会重复计算?” Storm有一个名为Trudent的更高级别的API,它允许您为大多数计算实现一次一次的消息传递语义。
    
    
    分布式RPC
   
    本教程展示了如何在Storm之上进行基本的流处理。使用Storm的原语可以做更多的事情。Storm最有趣的应用之一是Distributed RPC,您可以在其中并行化强大功能的计算。
    
    结论
    
    本教程概述了开发,测试和部署Storm拓扑。其余的文档深入探讨了使用Storm的所有方面。
   
 
