Integrating Spark with relational databases such as MySQL and Oracle: paging with JdbcRDD

Alex / 8-25 9:29 / Spark / Tag: spark

Spark provides a JdbcRDD class. It reads data over JDBC and turns it into an RDD, and from there you can do whatever you like with it (grin).


Here is the JdbcRDD constructor:

new JdbcRDD(
	sc: SparkContext,
	getConnection: () ⇒ Connection,
	sql: String,
	lowerBound: Long,
	upperBound: Long,
	numPartitions: Int,
	mapRow: (ResultSet) ⇒ T = JdbcRDD.resultSetToObjectArray)(implicit arg0: ClassTag[T])

 

/**
 *:getConnection 
 * --------------a function that returns an open Connection. The RDD takes care of closing the connection.
 *:sql
 *---------------the text of the query. The query must contain two ? placeholders for parameters used to partition the results. E.g. "select title, author from books where ? <= id and id <= ?"
 *:lowerBound
 *---------------the minimum value of the first placeholder
 *:upperBound
 *---------------the maximum value of the second placeholder. The lower and upper bounds are inclusive.
 *:numPartitions
 *---------------the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20)
 *:mapRow
 *---------------a function from a ResultSet to a single row of the desired result type(s). This should only call getInt, getString, etc; the RDD takes care of calling next. The default maps a ResultSet to an array of Object.
 * */
 
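1. getConnection returns an open connection to the database. JdbcRDD takes care of closing it.
2. sql is the query. It must contain two ? placeholders, which are used to split the ResultSet into partitions, for example: "select title, author from books where ? <= id and id <= ?"
3. lowerBound, upperBound and numPartitions are the value for the first placeholder, the value for the second placeholder, and the number of partitions. For example, with a lowerBound of 1, an upperBound of 20 and a numPartitions of 2, the query runs twice, once with (1, 10) and once with (11, 20).
4. mapRow is the conversion function. It turns the current row of the ResultSet into the single-row value the RDD should hold. That can be an Array, some other type, or your own case class. The default converts each row into an array of Object.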
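
To make the partitioning in item 3 concrete, here is a minimal sketch of how the inclusive range might be divided. The arithmetic follows my reading of JdbcRDD's getPartitions, so treat it as an approximation rather than the definitive implementation. The names PartitionBounds and ranges are made up for this illustration and are not part of Spark.

```scala
// Illustration only: PartitionBounds and ranges are hypothetical names, not Spark API.
object PartitionBounds {
  // Split the inclusive range [lowerBound, upperBound] into numPartitions inclusive sub-ranges.
  def ranges(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    // Both bounds are inclusive, hence the + 1 here and the - 1 on each end.
    val length = BigInt(1) + upperBound - lowerBound
    (0 until numPartitions).map { i =>
      val start = BigInt(lowerBound) + (length * i) / numPartitions
      val end = BigInt(lowerBound) + (length * (i + 1)) / numPartitions - 1
      (start.toLong, end.toLong)
    }
  }

  def main(args: Array[String]): Unit = {
    // Each pair would be bound to the two ? placeholders of one partition's query.
    ranges(1, 20, 2).foreach(println)
  }
}
```

With these definitions, ranges(1, 20, 2) gives (1, 10) and (11, 20), matching the documentation, and ranges(1, 100, 3) gives (1, 33), (34, 66) and (67, 100).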

Once you are familiar with these parameters, I believe most of your confusion will clear up.
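
As a small illustration of the mapRow parameter, the sketch below maps each row of the books query from the documentation to a case class. Book, BookRow and toBook are hypothetical names chosen for this example, and the column order (title first, author second) is assumed from that query.

```scala
import java.sql.ResultSet

// Hypothetical row type for "select title, author from books ...".
case class Book(title: String, author: String)

object BookRow {
  // mapRow only reads the current row; JdbcRDD itself calls next().
  val toBook: ResultSet => Book =
    rs => Book(rs.getString(1), rs.getString(2))
}
```

Passing BookRow.toBook as the last constructor argument should then yield a JdbcRDD[Book] instead of an RDD of Object arrays.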

 

Here is a simple example:

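import java.sql.DriverManager

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object SparkToJDBC {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "mysql")
    val rdd = new JdbcRDD(
      sc,
      // Opens one connection per partition; JdbcRDD closes it.
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
      },
      "SELECT content FROM mysqltest WHERE ID >= ? AND ID <= ?",
      1, 100, 3, // lowerBound, upperBound, numPartitions
      r => r.getString(1)).cache()

    // Count the rows whose content contains "success".
    println(rdd.filter(_.contains("success")).count())
    sc.stop()
  }
}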

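This reads the mysqltest table and counts the rows where ID >= 1 && ID <= 100 && content.contains("success"). As the code shows, the sql argument of JdbcRDD must contain two ? placeholders, and those placeholders receive lowerBound and upperBound as the lower and upper limits of the WHERE clause. The constructor signature also shows that lowerBound and upperBound can only be of type Long. Comparisons on any other type are not supported.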
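
Because the bounds must be Long, one possible workaround for a date or timestamp column is to compare on a numeric form of it. The sketch below is my own assumption, not something taken from the examples in this post: it turns one day into inclusive epoch-second bounds, which could be paired on MySQL with a query such as "... WHERE UNIX_TIMESTAMP(created_at) >= ? AND UNIX_TIMESTAMP(created_at) <= ?". The names EpochBounds and dayBounds and the column created_at are hypothetical. UTC is assumed here, while UNIX_TIMESTAMP uses the session time zone, so the two would have to agree.

```scala
import java.time.{LocalDate, ZoneOffset}

// Illustration only: EpochBounds and dayBounds are hypothetical names.
object EpochBounds {
  // Inclusive (lowerBound, upperBound) in epoch seconds for one calendar day, assuming UTC.
  def dayBounds(day: String): (Long, Long) = {
    val date = LocalDate.parse(day) // expects yyyy-MM-dd
    val lower = date.atStartOfDay().toEpochSecond(ZoneOffset.UTC)
    // One second before the next midnight, because the upper bound is inclusive.
    val upper = date.plusDays(1).atStartOfDay().toEpochSecond(ZoneOffset.UTC) - 1
    (lower, upper)
  }

  def main(args: Array[String]): Unit = {
    val (lower, upper) = dayBounds("2016-08-25")
    println(s"lowerBound=$lower upperBound=$upper")
  }
}
```

The two values can then be passed as lowerBound and upperBound. Whether the database can still use an index on the column when it is wrapped in a function is a separate question worth checking.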

==========================================================================================

Here is a more advanced example:

 

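    // Assumed to be defined elsewhere: sc, jdbc_driverName, jdbc_url, username, password,
    // and nyr, a year-month-day date string.
    val b_str = nyr + " 00:00:00"
    val e_str = nyr + " 23:59:59" // not used by the queries below

    // JdbcRDD insists on two ? placeholders, so the count query carries two dummy
    // conditions (both bound to 0) that are intended as no-ops.
    val strSql1 = s" select count(1) from table1 t where t.optime >= '$b_str' and t.optime <> ? and t.optime <> ? "

    // Number the matching rows with a MySQL user variable so they can be paged by rownum.
    val strSql2 = " select t.field1, t.optime from " +
      s" (select t.*, @rownum:=@rownum+1 AS rownum from table1 t, (SELECT @rownum:=0) a where t.optime >= '$b_str') t " +
      " where t.rownum >= ? and t.rownum <= ?"

    // Returns the total number of rows that match the time condition.
    val count_rdd_lrb = new JdbcRDD(sc, () => {
        Class.forName(jdbc_driverName)
        DriverManager.getConnection(jdbc_url, username, password)
      },
      strSql1,
      0, 0, 1,
      // Read the value inside mapRow, as the JdbcRDD documentation asks.
      rs => rs.getLong(1)
    )

    val count = count_rdd_lrb.first()

    // Start a second JdbcRDD that splits rows 1..count into 100 partitions and loads them as an RDD.
    val rdd_lrb = new JdbcRDD(sc, () => {
        Class.forName(jdbc_driverName)
        DriverManager.getConnection(jdbc_url, username, password)
      },
      strSql2,
      1, count, 100,
      // Processing function: pull the columns out with getString, getLong, etc.,
      // process them, and return a tuple or a case class. A tuple of strings is only an example.
      rs => (rs.getString(1), rs.getString(2))
    )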

 

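The advanced example was written by 穆清松, an expert on our team from whom I have learned a great deal. Thanks also to my colleague 莫鹏 for answering my questions.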

Permalink: http://www.lishiyu.cn/post/101.html

Reference: https://www.iteblog.com/archives/1113

 

 

 

 

