使用Storm实现实时大数据分析!
2013-01-12 19:10:27   来源:互联网   评论:0 点击:
			随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译
			- for(Field fields : tupleInfo.getFieldList())
 - {
 - insertQuery.append(fields.getColumnName()+",");
 - }
 - insertQuery.append("thresholdTimeStamp").append(") values (");
 - for(Field fields : tupleInfo.getFieldList())
 - {
 - insertQuery.append("?,");
 - }
 - insertQuery.append("?)");
 - prepStatement = connection.prepareStatement(insertQuery.toString());
 - }
 - catch (SQLException e)
 - {
 - e.printStackTrace();
 - }
 - }
 
数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不同类型输入的解析。
Listing Seven:数据插入的代码部分。
- 
public void execute(Tuple tuple, BasicOutputCollector collector)
 - {
 - batchExecuted=false;
 - if(tuple!=null)
 - {
 - List<Object> inputTupleList = (List<Object>) tuple.getValues();
 - int dbIndex=0;
 - for(int i=0;i<tupleInfo.getFieldList().size();i++)
 - {
 - Field field = tupleInfo.getFieldList().get(i);
 - try {
 - dbIndex = i+1;
 - if(field.getColumnType().equalsIgnoreCase("String"))
 - prepStatement.setString(dbIndex, inputTupleList.get(i).toString());
 - else if(field.getColumnType().equalsIgnoreCase("int"))
 - prepStatement.setInt(dbIndex,
 - Integer.parseInt(inputTupleList.get(i).toString()));
 - else if(field.getColumnType().equalsIgnoreCase("long"))
 - prepStatement.setLong(dbIndex,
 - Long.parseLong(inputTupleList.get(i).toString()));
 - else if(field.getColumnType().equalsIgnoreCase("float"))
 - prepStatement.setFloat(dbIndex,
 - Float.parseFloat(inputTupleList.get(i).toString()));
 - else if(field.getColumnType().equalsIgnoreCase("double"))
 - prepStatement.setDouble(dbIndex,
 - Double.parseDouble(inputTupleList.get(i).toString()));
 - else if(field.getColumnType().equalsIgnoreCase("short"))
 - prepStatement.setShort(dbIndex,
 - Short.parseShort(inputTupleList.get(i).toString()));
 - else if(field.getColumnType().equalsIgnoreCase("boolean"))
 - prepStatement.setBoolean(dbIndex,
 - Boolean.parseBoolean(inputTupleList.get(i).toString()));
 - else if(field.getColumnType().equalsIgnoreCase("byte"))
 - prepStatement.setByte(dbIndex,
 - Byte.parseByte(inputTupleList.get(i).toString()));
 - else if(field.getColumnType().equalsIgnoreCase("Date"))
 - {
 - Date dateToAdd=null;
 - if (!(inputTupleList.get(i) instanceof Date))
 - {
 - DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
 - try
 - {
 - dateToAdd = df.parse(inputTupleList.get(i).toString());
 - }
 - catch (ParseException e)
 - {
 - System.err.println("Data type not valid");
 - }
 - }
 - else
 - {
 - dateToAdd = (Date)inputTupleList.get(i);
 - java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());
 - prepStatement.setDate(dbIndex, sqlDate);
 - }
 
                上一篇:网站统计中的数据收集原理及实现(二)
                下一篇:大数据定义和十大应用案例
            
          分享到:
		  
          
		  
		  
		  
		  
      
	  
		收藏
	  
	  
      
            
      评论排行
- ·Windows(Win7)下用Xming...(92)
 - ·使用jmx client监控activemq(20)
 - ·Hive查询OOM分析(14)
 - ·复杂网络架构导致的诡异...(8)
 - ·使用 OpenStack 实现云...(7)
 - ·影响Java EE性能的十大问题(6)
 - ·云计算平台管理的三大利...(6)
 - ·Mysql数据库复制延时分析(5)
 - ·OpenStack Nova开发与测...(4)
 - ·LTPP一键安装包1.2 发布(4)
 - ·Linux下系统或服务排障的...(4)
 - ·PHP发布5.4.4 和 5.3.1...(4)
 - ·RSYSLOG搭建集中日志管理服务(4)
 - ·转换程序源码的编码格式[...(3)
 - ·Linux 的木马程式 Wirenet 出现(3)
 - ·Nginx 发布1.2.1稳定版...(3)
 - ·zend framework文件读取漏洞分析(3)
 - ·Percona Playback 0.3 development release(3)
 - ·运维业务与CMDB集成关系一例(3)
 - ·应该知道的Linux技巧(3)
 




