TopologyBuilder

  • Post author:
  • Post category:其他


TopologyBuilder是构建拓扑的类,用于指定执行的拓扑。拓扑底层是Thrift结构,由于Thrift API非常冗长,使用TopologyBuilder可以极大地简化建立拓扑的过程。
TopologyBuilder的公有方法如图3.1所示。
创建和提交拓扑的过程如下:首先,使用new关键字创建一个TopologyBuilder对象,然后调用setSpout方法设置Spout,接着调用setBolt方法设置Bolt,最后调用createTopology方法返回StormTopology对象给submitTopology方法作为输入参数。
创建并提交Topology到Storm集群的完整代码如下:
// 创建TopologyBuilder对象
TopologyBuilder builder = new TopologyBuilder();
// 添加一个id为“1”,并行度为5的TestWordSpout对象
builder.setSpout(“1”, new TestWordSpout(true), 5);
// 添加一个id为“2”,并行度为3的TestWordSpout对象
builder.setSpout(“2”, new TestWordSpout(true), 3);
// 添加一个id为“3”,并行度为3的TestWordCounter对象
// 对id为“1”的

组件

按“word”字段进行分组
// 对id为“2”的

组件

按“word”字段进行分组
builder.setBolt(“3”, new TestWordCounter(), 3)
.fieldsGrouping(“1”, new Fields(“word”))
.fieldsGrouping(“2”, new Fields(“word”));
// 添加一个id为“4”,并行度为1的TestGlobalCount对象
// 对id为“1”的组件按全局分组
builder.setBolt(“4”, new TestGlobalCount(),1)
.globalGrouping(“1”);
Map conf = new HashMap();


// 创建HashMap对象
conf.put(Config.TOPOLOGY_WORKERS, 4);


// 设置Worker的数量为4
// 提交拓扑
StormSubmitter.submitTopology(“mytopology”, conf, builder.createTopology());
在本地模式(进程中)下运行完全相同的拓扑的代码如下:
TopologyBuilder builder = new TopologyBuilder();


// 创建TopologyBuilder对象
// 添加一个id为“1”,并行度为5的TestWordSpout对象
builder.setSpout(“1”, new TestWordSpout(true), 5);
// 添加一个id为“2”,并行度为3的TestWordSpout对象
builder.setSpout(“2”, new TestWordSpout(true), 3);
// 添加一个id为“3”,并行度为3的TestWordCounter对象
// 对id为“1”的组件按“word”字段进行分组
// 对id为“2”的组件按“word”字段进行分组
builder.setBolt(“3”, new TestWordCounter(), 3)

.fieldsGrouping(“1”, new Fields(“word”))
.fieldsGrouping(“2”, new Fields(“word”));
// 添加一个id为“4”,并行度为1的TestGlobalCount对象
// 对id为“1”的组件按全局分组
builder.setBolt(“4”, new TestGlobalCount(),1)
.globalGrouping(“1”);
Map conf = new HashMap();


// 创建HashMap对象
conf.put(Config.TOPOLOGY_WORKERS, 4);


// 设置Worker的数量为4
conf.put(Config.TOPOLOGY_DEBUG, true);


// 设置调试模式为true
LocalCluster cluster = new LocalCluster();


// 创建LocalCluster对象
// 提交拓扑
cluster.submitTopology(“mytopology”, conf, builder.createTopology());
Utils.sleep(10000);


// 线程睡眠10秒,即拓扑可以运行10秒
cluster.shutdown();


// 关闭拓扑