Real-Time Big Data Analysis with Storm!
2013-01-12 19:10:27   Source: Internet

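As data volumes keep growing, real-time processing has become a primary challenge for many organizations. Writing in Dr. Dobb's, Shruthi Kumar and Siddharth Patankar use a vehicle speeding monitor as the example to demonstrate real-time big data analysis with Storm. CSDN has compiled the article here.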
        // Append one column name per configured tuple field
        for (Field field : tupleInfo.getFieldList())
        {
            insertQuery.append(field.getColumnName() + ",");
        }
        // The timestamp column comes last, then the VALUES clause opens
        insertQuery.append("thresholdTimeStamp").append(") values (");
        // One "?" placeholder per tuple field
        for (Field field : tupleInfo.getFieldList())
        {
            insertQuery.append("?,");
        }

        // Final placeholder for thresholdTimeStamp
        insertQuery.append("?)");
        prepStatement = connection.prepareStatement(insertQuery.toString());
    }
    catch (SQLException e)
    {
        e.printStackTrace();
    }
}
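
The fragment above assembles a parameterized INSERT statement: the column names, then thresholdTimeStamp, then one "?" per column. As a minimal sketch of that string building in isolation, the class below does the same with plain strings. The class name InsertQueryBuilder, the "insert into" prefix, and the table and column names are assumptions for illustration; the beginning of the original statement is not shown in this excerpt.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class InsertQueryBuilder {
    // Hypothetical helper: builds
    // "insert into <table> (c1,c2,thresholdTimeStamp) values (?,?,?)".
    static String build(String tableName, List<String> columnNames) {
        StringBuilder query = new StringBuilder("insert into ").append(tableName).append(" (");
        for (String column : columnNames) {
            query.append(column).append(",");
        }
        query.append("thresholdTimeStamp) values (");
        // One placeholder per tuple column ...
        query.append(String.join("", Collections.nCopies(columnNames.size(), "?,")));
        // ... plus one for thresholdTimeStamp.
        query.append("?)");
        return query.toString();
    }

    public static void main(String[] args) {
        // Table and column names here are made up for the example.
        System.out.println(build("thresholdBreaches", Arrays.asList("vehicle_number", "speed")));
    }
}
```

Because the values are bound through placeholders rather than concatenated into the SQL text, the same prepared statement can be reused for every tuple.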

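Data is inserted into the database in batches. The insertion logic is provided by the execute() method in Listing Seven. Most of the code deals with parsing the different input types that may occur.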
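
The batching idea can be sketched on its own, separate from Storm and JDBC. In the sketch below, rows are queued and handed to a sink once a full batch has accumulated. BatchBuffer, its batch size, and the sink callback are hypothetical names standing in for the database write; with JDBC, queuing a row would typically correspond to PreparedStatement.addBatch() and flushing to executeBatch().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;   // hypothetical stand-in for the database write
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Queue one row; flush as soon as a full batch has accumulated.
    void add(T row) {
        pending.add(row);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Hand the queued rows to the sink as one batch and start a new one.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
    }

    public static void main(String[] args) {
        BatchBuffer<String> buffer =
            new BatchBuffer<>(2, batch -> System.out.println("writing " + batch));
        buffer.add("row1");
        buffer.add("row2");   // second row fills the batch and triggers a write
        buffer.add("row3");
        buffer.flush();       // write the remainder
    }
}
```

A final flush() is needed so that a partially filled batch is not left unwritten.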

Listing Seven: The data insertion code.


public void execute(Tuple tuple, BasicOutputCollector collector)
{
    batchExecuted = false;
    if (tuple != null)
    {
        List<Object> inputTupleList = (List<Object>) tuple.getValues();
        int dbIndex = 0;
        for (int i = 0; i < tupleInfo.getFieldList().size(); i++)
        {
            Field field = tupleInfo.getFieldList().get(i);
            try {
                // JDBC parameter indexes are 1-based
                dbIndex = i + 1;
                // Bind the value with the setter that matches the configured column type
                if (field.getColumnType().equalsIgnoreCase("String"))
                    prepStatement.setString(dbIndex, inputTupleList.get(i).toString());
                else if (field.getColumnType().equalsIgnoreCase("int"))
                    prepStatement.setInt(dbIndex,
                        Integer.parseInt(inputTupleList.get(i).toString()));
                else if (field.getColumnType().equalsIgnoreCase("long"))
                    prepStatement.setLong(dbIndex,
                        Long.parseLong(inputTupleList.get(i).toString()));
                else if (field.getColumnType().equalsIgnoreCase("float"))
                    prepStatement.setFloat(dbIndex,
                        Float.parseFloat(inputTupleList.get(i).toString()));
                else if (field.getColumnType().equalsIgnoreCase("double"))
                    prepStatement.setDouble(dbIndex,
                        Double.parseDouble(inputTupleList.get(i).toString()));
                else if (field.getColumnType().equalsIgnoreCase("short"))
                    prepStatement.setShort(dbIndex,
                        Short.parseShort(inputTupleList.get(i).toString()));
                else if (field.getColumnType().equalsIgnoreCase("boolean"))
                    prepStatement.setBoolean(dbIndex,
                        Boolean.parseBoolean(inputTupleList.get(i).toString()));
                else if (field.getColumnType().equalsIgnoreCase("byte"))
                    prepStatement.setByte(dbIndex,
                        Byte.parseByte(inputTupleList.get(i).toString()));
                else if (field.getColumnType().equalsIgnoreCase("Date"))
                {
                    Date dateToAdd = null;
                    if (!(inputTupleList.get(i) instanceof Date))
                    {
                        // Not a Date yet: parse its string form (HH = 24-hour clock)
                        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try
                        {
                            dateToAdd = df.parse(inputTupleList.get(i).toString());
                        }
                        catch (ParseException e)
                        {
                            System.err.println("Data type not valid");
                        }
                    }
                    else
                    {
                        dateToAdd = (Date) inputTupleList.get(i);
                    }
                    // Bind the date whether it was parsed from text or passed in directly
                    if (dateToAdd != null)
                    {
                        java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());
                        prepStatement.setDate(dbIndex, sqlDate);
                    }
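
The Date branch in Listing Seven parses text with SimpleDateFormat. In that class the pattern letters HH mean the 24-hour clock and hh the 12-hour clock, so the choice affects which hour is stored. The following is a small illustrative sketch, not part of the original bolt; DatePatternDemo and hourOfDay are hypothetical names, and the sample timestamp is made up.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

class DatePatternDemo {
    // Parse text with the given pattern and return the resulting hour of day (0-23).
    static int hourOfDay(String pattern, String text) throws ParseException {
        Date parsed = new SimpleDateFormat(pattern).parse(text);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(parsed);
        return calendar.get(Calendar.HOUR_OF_DAY);
    }

    public static void main(String[] args) throws ParseException {
        String noon = "2013-01-12 12:30:00";
        // 24-hour pattern
        System.out.println(hourOfDay("yyyy-MM-dd HH:mm:ss", noon));
        // 12-hour pattern without an AM/PM marker
        System.out.println(hourOfDay("yyyy-MM-dd hh:mm:ss", noon));
    }
}
```

With the 24-hour pattern the sample parses to hour 12. With the 12-hour pattern and no AM/PM marker, "12" is read as 12 AM, so the result is hour 0.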
