新聞中心
我們已經(jīng)經(jīng)歷了Apache Storm的核心技術(shù)細節(jié),現(xiàn)在是時候編寫一些簡單的場景。

讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:空間域名、雅安服務(wù)器托管、營銷軟件、網(wǎng)站建設(shè)、澄城網(wǎng)站維護、網(wǎng)站推廣。
場景 - 移動呼叫日志分析器
移動呼叫及其持續(xù)時間將作為對Apache Storm的輸入,Storm將處理和分組在相同呼叫者和接收者之間的呼叫及其呼叫總數(shù)。
Spout創(chuàng)建
Spout是用于數(shù)據(jù)生成的組件?;旧?,一個spout將實現(xiàn)一個IRichSpout接口。 “IRichSpout”接口有以下重要方法 -
-
open -為Spout提供執(zhí)行環(huán)境。執(zhí)行器將運行此方法來初始化噴頭。
-
nextTuple -通過收集器發(fā)出生成的數(shù)據(jù)。
-
close -當spout將要關(guān)閉時調(diào)用此方法。
-
declareOutputFields -聲明元組的輸出模式。
-
ack -確認處理了特定元組。
-
fail -指定不處理和不重新處理特定元組。
open
open方法的簽名如下 -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
-
conf - 為此spout提供storm配置。
-
context - 提供有關(guān)拓撲中的spout位置,其任務(wù)ID,輸入和輸出信息的完整信息。
-
collector - 使我們能夠發(fā)出將由bolts處理的元組。
nextTuple
nextTuple方法的簽名如下 -
nextTuple()
nextTuple()從與ack()和fail()方法相同的循環(huán)中定期調(diào)用。它必須釋放線程的控制,當沒有工作要做,以便其他方法有機會被調(diào)用。因此,nextTuple的第一行檢查處理是否已完成。如果是這樣,它應(yīng)該休眠至少一毫秒,以減少處理器在返回之前的負載。
close
close方法的簽名如下-
close()
declareOutputFields
declareOutputFields方法的簽名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
declarer -它用于聲明輸出流id,輸出字段等
此方法用于指定元組的輸出模式。
ack
ack方法的簽名如下 -
ack(Object msgId)
該方法確認已經(jīng)處理了特定元組。
fail
nextTuple方法的簽名如下-
ack(Object msgId)
此方法通知特定元組尚未完全處理。 Storm將重新處理特定的元組。
FakeCallLogReaderSpout
在我們的場景中,我們需要收集呼叫日志詳細信息。呼叫日志的信息包含。
- 主叫號碼
- 接收號碼
- 持續(xù)時間
由于我們沒有呼叫日志的實時信息,我們將生成假呼叫日志。假信息將使用Random類創(chuàng)建。完整的程序代碼如下。
編碼 - FakeCallLogReaderSpout.java
import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface
to access functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;
//Create instance for TopologyContext which contains topology data.
private TopologyContext context;
//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
if(this.idx <= 1000) {
List mobileNumbers = new ArrayList();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber == toMobileNumber) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
//Override all the interface methods
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map getComponentConfiguration() {
return null;
}
}
Bolt創(chuàng)建
Bolt是一個使用元組作為輸入,處理元組,并產(chǎn)生新的元組作為輸出的組件。Bolts將實現(xiàn)IRichBolt接口。在此程序中,使用兩個Bolts
類CallLogCreatorBolt和CallLogCounterBolt來執(zhí)行操作。
IRichBolt接口有以下方法 -
-
prepare -為bolt提供要執(zhí)行的環(huán)境。執(zhí)行器將運行此方法來初始化spout。
-
execute -處理單個元組的輸入
-
cleanup -當spout要關(guān)閉時調(diào)用。
-
declareOutputFields -聲明元組的輸出模式。
Prepare
prepare方法的簽名如下 -
prepare(Map conf, TopologyContext context, OutputCollector collector)
-
conf -為此bolt提供Storm配置。
-
context -提供有關(guān)拓撲中的bolt位置,其任務(wù)ID,輸入和輸出信息等的完整信息。
-
collector -使我們能夠發(fā)出處理的元組。
execute
execute方法的簽名如下-
execute(Tuple tuple)
這里的元組是要處理的輸入元組。
execute方法一次處理單個元組。元組數(shù)據(jù)可以通過Tuple類的getValue方法訪問。不必立即處理輸入元組。多元組可以被處理和輸出為單個輸出元組。處理的元組可以通過使用OutputCollector類發(fā)出。
cleanup
cleanup方法的簽名如下 -
cleanup()
declareOutputFields
declareOutputFields方法的簽名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
這里的參數(shù)declarer用于聲明輸出流id,輸出字段等。
此方法用于指定元組的輸出模式。
呼叫日志創(chuàng)建者bolt
呼叫日志創(chuàng)建者bolt接收呼叫日志元組。呼叫日志元組具有主叫方號碼,接收方號碼和呼叫持續(xù)時間。此bolt通過組合主叫方號碼和接收方號碼簡單地創(chuàng)建一個新值。新值的格式為“來電號碼 - 接收方號碼”,并將其命名為新字段“呼叫”。完整的代碼如下。
編碼 - CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map getComponentConfiguration() {
return null;
}
}
呼叫日志計數(shù)器Bolt
呼叫日志創(chuàng)建者bolt接收呼叫日志元組。呼叫日志元組具有主叫方號碼,接收方號碼和呼叫持續(xù)時間。此bolt通過組合主叫方號碼和接收方號碼簡單地創(chuàng)建一個新值。新值的格式為“來電號碼 - 接收方號碼”,并將其命名為新字段“呼叫”。完整的代碼如下。
編碼 - CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
Map counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
@Override
public Map getComponentConfiguration() {
return null;
}
}
創(chuàng)建拓撲
Storm拓撲基本上是一個Thrift結(jié)構(gòu)。 TopologyBuilder類提供了簡單而容易的方法來創(chuàng)建復(fù)雜的拓撲。TopologyBuilder類具有設(shè)置spout(setSpout)和設(shè)置bolt(setBolt)的方法。最后,TopologyBuilder有createTopology來創(chuàng)建拓撲。使用以下代碼片段創(chuàng)建拓撲 -
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping和fieldsGrouping方法有助于為spout和bolts設(shè)置流分組。
本地集群
為了開發(fā)目的,我們可以使用“LocalCluster”對象創(chuàng)建本地集群,然后使用“LocalCluster”類的“submitTopology”方法提交拓撲。 “submitTopology”的參數(shù)之一是“Config”類的實例?!癈onfig”類用于在提交拓撲之前設(shè)置配置選項。此配置選項將在運行時與集群配置合并,并使用prepare方法發(fā)送到所有任務(wù)(spout和bolt)。一旦拓撲提交到集群,我們將等待10秒鐘,集群計算提交的拓撲,然后使用“LocalCluster”的“shutdown”方法關(guān)閉集群。完整的程序代碼如下 -
編碼 - LogAnalyserStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);
//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
//Stop the topology
cluster.shutdown();
}
}
構(gòu)建和運行應(yīng)用程序
完整的應(yīng)用程序有四個Java代碼。它們是 -
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
應(yīng)用程序可以使用以下命令構(gòu)建 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
應(yīng)用程序可以使用以下命令運行 -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
輸出
一旦應(yīng)用程序啟動,它將輸出有關(guān)集群啟動過程,spout和螺栓處理的完整詳細信息,最后是集群關(guān)閉過程。在“CallLogCounterBolt”中,我們打印了呼叫及其計數(shù)詳細信息。此信息將顯示在控制臺上如下 -
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93
非JVM語言
Storm拓撲通過Thrift接口實現(xiàn),這使得輕松地提交任何語言的拓撲。Storm支持Ruby,Python和許多其他語言。讓我們來看看python綁定。
Python綁定
Python是一種通用的解釋,交互,面向?qū)ο蠛透呒壘幊陶Z言。Storm支持Python實現(xiàn)其拓撲。Python支持發(fā)射,錨定,acking和日志操作。
如你所知,bolt可以用任何語言定義。用另一種語言編寫的bolt作為子進程執(zhí)行,Storm通過stdin / stdout與JSON消息進行通信。首先拿一個支持python綁定的樣例bolt WordCount。
public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
這里的類WordCount實現(xiàn)IRichBolt接口和運行與python實現(xiàn)指定超級方法參數(shù)“splitword.py”。現(xiàn)在創(chuàng)建一個名為“splitword.py”的python實現(xiàn)。
import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()
這是Python的示例實現(xiàn),它計算給定句子中的單詞。同樣,您也可以與其他支持語言綁定。
當前名稱:創(chuàng)新互聯(lián)ApacheStorm教程:ApacheStorm工作實例
網(wǎng)頁鏈接:http://www.5511xx.com/article/djodcgc.html


咨詢
建站咨詢
