使用Storm实现实时大数据分析!
2013-01-12 19:10:27   来源:互联网   评论:0 点击:

随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译
  1.          {   
  2.          fileReader  =  new BufferedReader(new FileReader(new File(file)));  
  3.          }  
  4.          catch (FileNotFoundException e)  
  5.          {  
  6.          System.exit(1);   
  7.          }  
  8. }                                                          
  9.  
  10. public void nextTuple()  
  11. {  
  12.          protected void ListenFile(File file)  
  13.          {  
  14.          Utils.sleep(2000);  
  15.          RandomAccessFile access = null;  
  16.          String line = null;   
  17.             try   
  18.             {  
  19.                 while ((line = access.readLine()) != null)  
  20.                 {  
  21.                     if (line !=null)  
  22.                     {   
  23.                          String[] fields=null;  
  24.                           if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());   
  25.                           else   
  26.                           fields = line.split  (tupleInfo.getDelimiter());   
  27.                           if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));  
  28.                     }  
  29.                }  
  30.             }  
  31.             catch (IOException ex){ }  
  32.             }  
  33. }  
  34.  
  35. public void declareOutputFields(OutputFieldsDeclarer declarer)  
  36. {  
  37.       String[] fieldsArr = new String [tupleInfo.getFieldList().size()];  
  38.       for(int i=0; i<tupleInfo.getFieldList().size(); i++)  
  39.       {  
  40.               fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();  
  41.       }  
  42. declarer.declare(new Fields(fieldsArr));  
  43. }      

declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。

Bolt的实现

Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。

Figure 3:Spout到Bolt的数据流程。

ThresholdCalculatorBolt

Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:

临界值检查

  • 临界值栏数检查(拆分成字段的数目)
  • 临界值数据类型(拆分后字段的类型)
  • 临界值出现的频数
  • 临界值时间段检查

Listing Four中的类,定义用来保存这些值。

Listing Four:ThresholdInfo类


  1. public class ThresholdInfo implementsSerializable  
  2.  
  3. {    
  4.         private String action;   
  5.         private String rule;   
  6.         private Object thresholdValue;  
  7.         private int thresholdColNumber;   
  8.         private Integer timeWindow;   
  9.         private int frequencyOfOccurence;   
  10. }   

基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。

Listing Five:临界值检测代码段


  1. public void execute(Tuple tuple, BasicOutputCollector collector)   
  2. {  
  3.     if(tuple!=null)   
  4.     {  
  5.         List<Object> inputTupleList = (List<Object>) tuple.getValues();  
  6.         int thresholdColNum = thresholdInfo.getThresholdColNumber();   
  7.         Object thresholdValue = thresholdInfo.getThresholdValue();   
  8.         String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();   
  9.         Integer timeWindow = thresholdInfo.getTimeWindow();  
  10.          int frequency = thresholdInfo.getFrequencyOfOccurence();  
  11.          if(thresholdDataType.equalsIgnoreCase("string"))  
  12.          {  
  13.              String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();  
  14.              String frequencyChkOp = thresholdInfo.getAction();  
  15.              if(timeWindow!=null)  
  16.              {  
  17.                  long curTime = System.currentTimeMillis();  
  18.                  long diffInMinutes = (curTime-startTime)/(1000);  
  19.                  if(diffInMinutes>=timeWindow)  
  20.                  {  
  21.                      if(frequencyChkOp.equals("=="))  
  22.                      {  
  23.                           if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  24.                           {  
  25.                               count.incrementAndGet();  
  26.                               if(count.get() > frequency)  

相关热词搜索:Storm 实时 大数据 分析

上一篇:网站统计中的数据收集原理及实现(二)
下一篇:最后一页

分享到: 收藏