使用Storm实现实时大数据分析!
2013-01-12 19:10:27   来源:互联网   评论:0 点击:

随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译
  1.                                    count.incrementAndGet();  
  2.                                    if(count.get() > frequency)  
  3.                                        splitAndEmit(inputTupleList,collector);  
  4.                               }  
  5.                           }  
  6.                           else if(frequencyChkOp.equals(">"))  
  7.                           {  
  8.                                if(valueToCheck > Double.parseDouble(thresholdValue.toString()))  
  9.                                 {  
  10.                                    count.incrementAndGet();  
  11.                                    if(count.get() > frequency)  
  12.                                        splitAndEmit(inputTupleList,collector);  
  13.                                }  
  14.                            }  
  15.                            else if(frequencyChkOp.equals("=="))  
  16.                            {  
  17.                               if(valueToCheck == Double.parseDouble(thresholdValue.toString()))  
  18.                               {  
  19.                                   count.incrementAndGet();  
  20.                                   if(count.get() > frequency)  
  21.                                       splitAndEmit(inputTupleList,collector);  
  22.                                }  
  23.                            }  
  24.                            else if(frequencyChkOp.equals("!="))  
  25.                            {  
  26.     . . .  
  27.                             }  
  28.                        }  
  29.              }  
  30.       else 
  31.           splitAndEmit(null,collector);  
  32.       }  
  33.       else 
  34.      {  
  35.            System.err.println("Emitting null in bolt");  
  36.            splitAndEmit(null,collector);  
  37.     }  

经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。

DBWriterBolt

经过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。

Listing Six:建表编码。


  1. public void prepare( Map StormConf, TopologyContext context )   
  2. {         
  3.     try   
  4.     {  
  5.         Class.forName(dbClass);  
  6.     }   
  7.     catch (ClassNotFoundException e)   
  8.     {  
  9.         System.out.println("Driver not found");  
  10.         e.printStackTrace();  
  11.     }  
  12.    
  13.     try   
  14.     {  
  15.        connection driverManager.getConnection(   
  16.            "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);  
  17.        connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();  
  18.    
  19.        StringBuilder createQuery = new StringBuilder(  
  20.            "CREATE TABLE IF NOT EXISTS "+tableName+"(");  
  21.        for(Field fields : tupleInfo.getFieldList())  
  22.        {  
  23.            if(fields.getColumnType().equalsIgnoreCase("String"))  
  24.                createQuery.append(fields.getColumnName()+" VARCHAR(500),");  
  25.            else 
  26.                createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");  
  27.        }  
  28.        createQuery.append("thresholdTimeStamp timestamp)");  
  29.        connection.prepareStatement(createQuery.toString()).execute();  
  30.    
  31.        // Insert Query  
  32.        StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");  
  33.        String tempCreateQuery = new String();  
  34. for(Field fields

相关热词搜索:Storm 实时 大数据 分析

上一篇:网站统计中的数据收集原理及实现(二)
下一篇:最后一页

分享到: 收藏