使用Storm实现实时大数据分析!
2013-01-12 19:10:27   来源:互联网   评论:0 点击:

随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译
  1.             }     
  2.             }   
  3.         catch (SQLException e)   
  4.         {  
  5.              e.printStackTrace();  
  6.         }  
  7.     }  
  8.     Date now = new Date();            
  9.     try 
  10.     {  
  11.         prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));  
  12.         prepStatement.addBatch();  
  13.         counter.incrementAndGet();  
  14.         if (counter.get()== batchSize)   
  15.         executeBatch();  
  16.     }   
  17.     catch (SQLException e1)   
  18.     {  
  19.         e1.printStackTrace();  
  20.     }             
  21.    }  
  22.    else 
  23.    {  
  24.         long curTime = System.currentTimeMillis();  
  25.        long diffInSeconds = (curTime-startTime)/(60*1000);  
  26.        if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)  
  27.        {  
  28.             try {  
  29.                 executeBatch();  
  30.                 startTime = System.currentTimeMillis();  
  31.             }  
  32.             catch (SQLException e) {  
  33.                  e.printStackTrace();  
  34.             }  
  35.        }  
  36.    }  
  37. }  
  38.    
  39. public void executeBatch() throws SQLException  
  40. {  
  41.     batchExecuted=true;  
  42.     prepStatement.executeBatch();  
  43.     counter = new AtomicInteger(0);  

一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会建立topology并准备执行。下面就来看一下执行步骤。

在本地集群上运行和测试topology

  • 通过TopologyBuilder建立topology。
  • 使用Storm Submitter,将topology递交给集群。以topology的名字、配置和topology的对象作为参数。
  • 提交topology。

Listing Eight:建立和执行topology。


  1. public class StormMain  
  2. {  
  3.      public static void main(String[] args) throws AlreadyAliveException,   
  4.                                                    InvalidTopologyException,   
  5.                                                    InterruptedException   
  6.      {  
  7.           ParallelFileSpout parallelFileSpout = new ParallelFileSpout();  
  8.           ThresholdBolt thresholdBolt = new ThresholdBolt();  
  9.           DBWriterBolt dbWriterBolt = new DBWriterBolt();  
  10.           TopologyBuilder builder = new TopologyBuilder();  
  11.           builder.setSpout("spout", parallelFileSpout, 1);  
  12.           builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");  
  13.           builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");  
  14.           if(this.argsMain!=null && this.argsMain.length > 0)   
  15.           {  
  16.               conf.setNumWorkers(1);  
  17.               StormSubmitter.submitTopology(   
  18.                    this.argsMain[0], conf, builder.createTopology());  
  19.           }  
  20.           else 
  21.           {      
  22.               Config conf = new Config();  
  23.               conf.setDebug(true);  
  24.               conf.setMaxTaskParallelism(3);  
  25.               LocalCluster cluster = new LocalCluster();  
  26.               cluster.submitTopology(  
  27.               "Threshold_Test", conf, builder.createTopology());  
  28.           }  
  29.      }  

topology被建立后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不需要做任何的修改。这也是Storm的另一大特色之一。

这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将可以轻松的使用Storm进行实时处理。如果你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择。

原文链接:Easy, Real-Time Big Data Analysis Using Storm (编译/仲浩 王旭东/审校)

相关热词搜索:Storm 实时 大数据 分析

上一篇:网站统计中的数据收集原理及实现(二)
下一篇:最后一页

分享到: 收藏