storm安装与kafka整合

  • Post author:
  • Post category:其他


1.storm是什么

1.1storm的介绍

  • storm是twitter公司开源贡献给apache的一款实时流式处理的一个开源软件,主要用于解决数据的实时计算以及实时的处理等方面的问题

1.2storm的特点

  • Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理,而且处理速度很快(在一个小集群中,每个结点每秒可以处理数以百万计的消息)。Storm的部署和运维都很便捷,而且更为重要的是可以使用任意编程语言来开发应用。Storm有如下特点:


编程模型简单

  • 在大数据处理方面相信大家对hadoop已经耳熟能详,基于Google Map/Reduce来实现的Hadoop为开发者提供了map、reduce原语,使并行批处理程序变得非常地简单和优美。同样,Storm也为大数据的实时计算提供了一些简单优美的原语,这大大降低了开发并行实时处理的任务的复杂性,帮助你快速、高效的开发应用。


可扩展

  • 在Storm集群中真正运行topology的主要有三个实体:工作进程、线程和任务。Storm集群中的每台机器上都可以运行多个工作进程,每个工作进程又可创建多个线程,每个线程可以执行多个任务,任务是真正进行数据处理的实体,我们开发的spout、bolt就是作为一个或者多个任务的方式执行的。因此,计算任务在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展。


高可靠性

  • Storm可以保证spout发出的每条消息都能被“完全处理”,这也是直接区别于其他实时系统的地方,如S4。请注意,spout发出的消息后续可能会触发产生成千上万条消息,可以形象的理解为一棵消息树,其中spout发出的消息为树根,Storm会跟踪这棵消息树的处理情况,只有当这棵消息树中的所有消息都被处理了,Storm才会认为spout发出的这个消息已经被“完全处理”。如果这棵消息树中的任何一个消息处理失败了,或者整棵消息树在限定的时间内没有“完全处理”,那么spout发出的消息就会重发。考虑到尽可能减少对内存的消耗,Storm并不会跟踪消息树中的每个消息,而是采用了一些特殊的策略,它把消息树当作一个整体来跟踪,对消息树中所有消息的唯一id进行异或计算,通过是否为零来判定spout发出的消息是否被“完全处理”,这极大的节约了内存和简化了判定逻辑,后面会对这种机制进行详细介绍。这种模式,每发送一个消息,都会同步发送一个ack/fail,对于网络的带宽会有一定的消耗,如果对于可靠性要求不高,可通过使用不同的emit接口关闭该模式。上面所说的,Storm保证了每个消息至少被处理一次,但是对于有些计算场合,会严格要求每个消息只被处理一次,幸而Storm的0.7.0引入了事务性拓扑,解决了这个问题,后面会有详述。


高容错性

  • 如果在消息处理过程中出了一些异常,Storm会重新安排这个出问题的处理单元。Storm保证一个处理单元永远运行(除非你显式杀掉这个处理单元)。当然,如果处理单元中存储了中间状态,那么当处理单元重新被Storm启动的时候,需要应用自己处理中间状态的恢复。


支持多种编程语言

  • 除了用java实现spout和bolt,你还可以使用任何你熟悉的编程语言来完成这项工作,这一切得益于Storm所谓的多语言协议。多语言协议是Storm内部的一种特殊协议,允许spout或者bolt使用标准输入和标准输出来进行消息传递,传递的消息为单行文本或者是json编码的多行。Storm支持多语言编程主要是通过ShellBolt,ShellSpout和ShellProcess这些类来实现的,这些类都实现了IBolt 和 ISpout接口,以及让shell通过java的ProcessBuilder类来执行脚本或者程序的协议。可以看到,采用这种方式,每个tuple在处理的时候都需要进行json的编解码,因此在吞吐量上会有较大影响。


支持本地模式

  • Storm有一种“本地模式”,也就是在进程中模拟一个Storm集群的所有功能,以本地模式运行topology跟在集群上运行topology类似,这对于我们开发和测试来说非常有用。


高效

2.storm的架构模型

img

  1. Nimbus:负责资源分配和任务调度。新版本中的nimbus节点可以有多个,做主备
  2. Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
  3. Worker:运行具体处理组件逻辑的进程。
  4. Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。最新版本的Jstorm已经废除了task的概念

img

3.storm的安装

三台机器运行服务规划

运行服务\机器规划 Node01 Node02 Node03
Zookeeper版本 3.4.9
Zookeeper服务
Storm版本 Apache-storm-1.1.1
Nimbus服务 是(leader)
Supervisor服务
IP地址规划 192.168.52.200 192.168.52.201 192.168.52.202

4.三台机器安装zookeeper服务


Node01配置文件修改

  • 修改zoo.cfg
dataDir=/export/servers/zookeeper-3.4.9/zkData/data
dataLogDir=/export/servers/zookeeper-3.4.9/zkData/log
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
  • 修改myid

img


Node02 修改配置文件

  • 修改zoo.cfg
dataDir=/export/servers/zookeeper-3.4.9/zkData/data
dataLogDir=/export/servers/zookeeper-3.4.9/zkData/log
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888 
  • 修改myid

img


Node03修改配置文件

  • 修改zoo.cfg
dataDir=/export/servers/zookeeper-3.4.9/zkData/data
dataLogDir=/export/servers/zookeeper-3.4.9/zkData/log
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
  • 修改myid

img

  • 三台服务器启动zookeeper服务
bin/zkServer.sh  start
  • 三台机器查看zookeeper服务状态
bin/zkServer.sh status

4.1三台机器安装storm集群

1、上传storm压缩包

2、解压

tar -zxvf apache-storm-1.1.1.tar.gz -C../servers/ 

3、修改配置文件

storm.zookeeper.servers:
     -"node01"
     -"node02"
     -"node03"
# 
nimbus.seeds: ["node01","node02", "node03"]

storm.local.dir:"/export/servers/apache-storm-1.1.1/stormdata" 
ui.port: 8088
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

  • supervisor任务调度

4、将storm安装程序分发拷贝到另外两台机器上

scp -r apache-storm-1.1.1/ node02:$PWD
scp -r apache-storm-1.1.1/ node03:$PWD

5、三台机器启动storm服务

  • Node01 启动相关服务
启动 nimbus进程
nohup bin/storm nimbus >/dev/null 2>&1 &

启动web  UI 页面
nohup bin/storm ui >/dev/null 2>&1 &

启动logViewer
nohup bin/storm logviewer >/dev/null 2>&1 & 

启动supervisor
nohup bin/storm supervisor >/dev/null 2>&1 &
  • Node02启动相关服务
nimbus:nohup bin/storm nimbus >/dev/null 2>&1 & 
logviewer:nohup bin/stormlogviewer >/dev/null 2>&1 & 
supervisor:nohup bin/stormsupervisor >/dev/null 2>&1 &
  • node03启动相关服务
nimbus:nohup bin/storm nimbus> /dev/null 2>&1 & 
logviewer:nohup bin/stormlogviewer >/dev/null 2>&1 & 
supervisor:nohup bin/stormsupervisor >/dev/null 2>&1 &

5.storm的UI界面管理

访问地址


http://192.168.52.200:8088/index.html

6.storm的编程模型

img

  • DataSource:外部数据源

    • Spout:接受外部数据源的组件,将外部数据源转化成Storm内部的数据,以Tuple为基本的传输单元下发给Bolt
    • Bolt:接受Spout发送的数据,或上游的bolt的发送的数据。根据业务逻辑进行处理。发送给下一个Bolt或者是存储到某种介质上。介质可以是mongodb或mysql,或者其他。
    • Tuple:Storm内部中数据传输的基本单元,里面封装了一个List对象,用来保存数据。
    • StreamGrouping:数据分组策略 7种:shuffleGrouping(Random函数),
    • Non Grouping(Random函数),
    • FieldGrouping(Hash取模)、
    • Local or ShuffleGrouping 本地或随机,优先本地。
  • 其中Local orShuffleGrouping 是如果分组的时候接收bolt的线程和发送者在一个JVM中默认优先选择一个JVM中的bolt就是local,否则和ShuffleGrouping效果一样。

7.storm的入门程序

7.1、实现单次计数的统计

  • 第一步:创建maven java 项目,导入jar包
<dependencies>
             <dependency>
               <groupId>org.apache.storm</groupId>
               <artifactId>storm-core</artifactId>
               <version>1.1.1</version>
               <scope>provided</scope>
           </dependency>
  </dependencies>

   <build>  
    <plugins>  
        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-compiler-plugin</artifactId>  
            <configuration>  
                <source>1.8</source>
                <target>1.8</target>  
            </configuration>  
        </plugin>  
    </plugins>  
</build>
  • 第二步:开发我们的spout,随机选择一些单词发送到下一个bolt
public class RandomSpout extends BaseRichSpout{
      private SpoutOutputCollector collector;
      Random rand;
      /**
       * Map conf 系统初始化读取的配置文件
       * 
       * TopologyContextcontext  应用程序的上下文对象
       * 
       * SpoutOutputCollectorcollector  用于接收spout输出的数据
       * 
       * 这个方法主要用于系统的初始化工作,例如连接kafka,读取数据,连接mysql,或者连接redis等的初始化工作
       */

      @Override
      public voidopen(Map conf, TopologyContext context, SpoutOutputCollector collector) {
           // TODO Auto-generated method stub
           this.collector= collector; //初始化我们的系统当中的数据
           rand = newRandom();
      }
      /**
       * storm框架当中 会一直调用nextTuple将数据不断的往后发送,发送给下一个组件当中去
       */
      @Override
      public voidnextTuple() {
           //监控某个目录下面所有的文件,一旦发现新增的文件,就将文件给读取完成,然后将文件重命名
           try {
                  String[] sentences= new String[]{ "my storm word count", "hello my  storm", "hello  storm hello  world"};  
                String sentence= sentences[rand.nextInt(sentences.length)];  
                Thread.sleep(3000);
                 collector.emit(new Values(sentence));//Values 继承了arrayList 数组
           } catch (Exception e){
                 // TODO Auto-generated catch block
                 e.printStackTrace();
           }
      }

      @Override
      public voiddeclareOutputFields(OutputFieldsDeclarer declarer){
           declarer.declare(new Fields("helloStorm"));
      }
}
  • 第三步:开发我们的SplitBolt将我们的英文句子切割成一个个的单词
public class SplitBolt extends BaseBasicBolt{
      @Override
      public voidexecute(Tuple input, BasicOutputCollector collector) {
           StringstringByField = input.getStringByField("helloStorm");
           String[]split = stringByField.split(" ");
           for (String string: split) {
                 collector.emit(new Values(string,1));
           }
      }
      @Override
      public voiddeclareOutputFields(OutputFieldsDeclarer declarer){
           declarer.declare(new Fields("word","num"));
      }
}
  • 第四步:开发我们的计数器CountBolt
public class CountBolt  extends BaseBasicBolt{
      private Map<String, Integer> map  =   new HashMap<String,Integer>();
      @Override
      public void execute(Tuple input, BasicOutputCollector collector) {
           StringstringByField = input.getStringByField("word");
           IntegerintegerByField = input.getIntegerByField("num");
           if(map.containsKey(stringByField)){
                 map.put(stringByField,map.get(stringByField)+integerByField);
           }else{
                 map.put(stringByField,integerByField);
           }
           System.out.println(map.toString());
      }
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer){
           // TODO Auto-generated method stub
      }
}

第五步:组装我们的程序向storm集群进行提交

public class WordCountTopology {
      public static void main(String[] args) throwsException {
           TopologyBuildertopologyBuilder = new TopologyBuilder();
           topologyBuilder.setSpout("mySpout",new RandomSpout(),2);
           topologyBuilder.setBolt("splitBolt",new SplitBolt(),2).shuffleGrouping("mySpout");
           topologyBuilder.setBolt("countBolt",new CountBolt(),2).shuffleGrouping("splitBolt");
           Config config = new Config();
           if(args.length > 0){
                 config.setDebug(false);
                 config.setNumWorkers(1);
                 config.setNumAckers(config, 5);
                 StormSubmitter.submitTopology(args[0], config,topologyBuilder.createTopology());
           }else{
                 config.setDebug(true);
                 LocalCluster cluster = new LocalCluster();
                 cluster.submitTopology("wordCount",config, topologyBuilder.createTopology());
           }
      }
}

8.storm的并行度

img

config.setNumWorkers(1);//设置进程数
topologyBuilder.setSpout("mySpout",new RandomSpout(),3);
topologyBuilder.setBolt("splitBolt",new SplitBolt(),3).shuffleGrouping("mySpout");//线程数
topologyBuilder.setBolt("countBolt",newCountBolt(),3).setNumTasks(4).shuffleGrouping("splitBolt");
  • Storm当中的worker,executor,task之间的相互关系

    • Worker:表示一个进程
    • Executor:表示由worker启动的线程
    • 一个worker只会负责一个topology任务,不会出现一个worker负责多个topology任务的情况。
    • 一个worker进程当中,可以启动多个线程executor,也就是说,一个worker进程可以对应多个executor线程
    • task 是实际执行数据处理的最小工作单元(注意,task 并不是线程) —— 在你的代码中实现的每个spout 或者 bolt 都会在集群中运行很多个 task。在拓扑的整个生命周期中每个组件的 task 数量都是保持不变的,不过每个组件的 executor 数量却是有可能会随着时间变化。在默认情况下 task 的数量是和 executor 的数量一样的,也就是说,默认情况下 Storm 会在每个线程上运行一个 task
  • 注:调整task的数量,并不能够实际上提高storm的并行度,因为storm不管是spout还是bolt当中的代码都是串行执行的,就算一个executor对应多个task,这多个task也是串行去执行executor当中的代码,所以这个调整task的个数,实际上并不能提高storm的并行度
  • 在实际工作当中,由于spout与bolt的数量不能够精准确定,所以需要随时调整spout与bolt的数量,所以在storm当中,我们可以通过命令来动态的进行调整
storm rebalance mytopo -n3 -e mySpout=5 -e splitBolt=6 -e countBolt=8

  • 一定要注意:重新调整的时候=号两边不要有空格

9.Storm的分发策略

  • Storm当中的分组策略,一共有八种:

    所谓的grouping策略就是在Spout与Bolt、Bolt与Bolt之间传递Tuple的方式。总共有八种方式:

1)

shuffleGrouping(随机分组)随机分组

;将tuple随机分配到bolt中,能够保证各task中处理的数据均衡;

2)

fieldsGrouping

(按照字段分组,在这里即是同一个单词只能发送给一个Bolt)

​ 按字段分组; 根据设定的字段相同值得tuple被分配到同一个bolt进行处理;

​ 举例:builder.setBolt(“mybolt”,new MyStoreBolt(),5).fieldsGrouping(“checkBolt”,newFields(“uid”));

​ 说明:该bolt由5个任务task执行,相同uid的元组tuple被分配到同一个task进行处理;该task接收的元祖字段是mybolt发射出的字段信息,不受uid分组的影响。

​ 该分组不仅方便统计而且还可以通过该方式保证相同uid的数据保存不重复(uid信息写入数据库中唯一);

3)

allGrouping

(广播发送,即每一个Tuple,每一个Bolt都会收到)广播发送:所有bolt都可以收到该tuple

4)

globalGrouping

(全局分组,将Tuple分配到task id值最低的task里面)全局分组:tuple被发送给bolt的同一个并且最小task_id的任务处理,实现事务性的topology,保证全局有序,全局找task_id最小的那个线程,所有的数据都由最小的这个task_id来进行处理,保证数据的顺序

5)

noneGrouping

(随机分派)不分组:效果等同于shuffleGrouping.

6)

directGrouping

(直接分组,指定Tuple与Bolt的对应发送关系)

​ 直接分组:由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)。

7)

Local or shuffle Grouping本地或者随机分组

,优先将数据发送到本机的处理器executor,如果本机没有对应的处理器,那么再发送给其他机器的executor,避免了网络资源的拷贝,减轻网络传输的压力

8)

customGrouping

(自定义的Grouping)

在开发中我们常用 fieldsGrouping确定可以分配到同一个bolt
shuffleGrouping
Local or shuffle Grouping常用是这个 可以减少网络开销
具体看情况而定

10.storm与kafka集成

10.1旧版本的kafka与storm之间相互集成

  • 第一步:导入jar包
<dependency>
       <groupId>org.apache.storm</groupId>
       <artifactId>storm-core</artifactId>
       <version>1.1.1</version>
   </dependency>
       <!--  use old kafka spout code-->
       <dependency>
           <groupId>org.apache.storm</groupId>
           <artifactId>storm-kafka</artifactId>
           <version>1.1.1</version>
       </dependency>
       <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka_2.10</artifactId>
           <version>0.8.2.1</version>
           <exclusions>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-api</artifactId>
               </exclusion>
           </exclusions>
       </dependency>
  • 第二步:代码实现

10.2新版本的kafka与storm1.1.1集成

  • 第一步:导入jar包
<!-- use new kafka spout code -->
       <dependency>
           <groupId>org.apache.storm</groupId>
           <artifactId>storm-kafka-client</artifactId>
           <version>1.1.1</version>
       </dependency>
       <dependency>
           <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
           <version>0.10.0.0</version>
       </dependency>
<dependency>
              <groupId>org.apache.storm</groupId>
              <artifactId>storm-core</artifactId>
               <version>1.1.1</version>
               <scope>provided</scope>
           </dependency>
  • 第二步:编写我们的主函数入口程序
public class KafkStormTopo {
      public staticvoid main(String[] args) throwsException {
           KafkaSpoutConfig.Builder<String,String> builder = KafkaSpoutConfig.builder("192.168.52.200:9092,192.168.52.201:9092,192.168.52.202:9092","yun01");
        builder.setGroupId("test_storm_wc");
       KafkaSpoutConfig<String, String> kafkaSpoutConfig= builder.build();
       TopologyBuilder topologyBuilder = new TopologyBuilder();
       topologyBuilder.setSpout("WordCountFileSpout",new KafkaSpout<String,String>(kafkaSpoutConfig), 1);
        topologyBuilder.setBolt("readKafkaBolt",new KafkaBolt()).shuffleGrouping("WordCountFileSpout");
       Config config = new Config();
        if(args !=null && args.length > 0){
          config.setDebug(false);
          StormSubmitter submitter= new StormSubmitter();
          submitter.submitTopology("kafkaStromTopo", config, topologyBuilder.createTopology());
        }else{
          config.setDebug(true);
          LocalCluster cluster= new LocalCluster();
          cluster.submitTopology("kafkaStromTopo", config, topologyBuilder.createTopology());
        }
      }
}
  • 第三步:开发我们的kafkabolt作为消息处理
public class KafkaBolt extends BaseBasicBolt {
      @Override
      public voidexecute(Tuple input, BasicOutputCollector collector) {
           System.out.println(input.getValues().get(4)+"消息接受bolt");
      }
      @Override
      public voiddeclareOutputFields(OutputFieldsDeclarer declarer){
      }
}
  • input获取到的值
0索引代表kafka的topic
1索引代表kafka的分区
2索引代表kafka的偏移量
3索引代表kafka的key值
4索引代表kafka的value



版权声明:本文为qq_34795664原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。