spark Java oracle, Spark 2.x from the Basics to the Depths, Part 6: Reading a Relational Database with JdbcRDD in the RDD Java API...
The following example uses the Spark RDD Java API to read data from a relational database. It uses a local Derby database, but MySQL, Oracle, or any other relational database with a JDBC driver would work the same way:

package com.twq.javaapi.java7;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import java.io.Serializable;
import java.sql.*;

public class JavaJdbcRDDSuite implements Serializable {

    public static void prepareData() throws ClassNotFoundException, SQLException {
        // Use the local embedded Derby database; MySQL or another relational database would also work
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        Connection connection =
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");
        try {
            // Create table FOO: ID is an auto-incrementing identity column, DATA is an INTEGER column
            Statement create = connection.createStatement();
            create.execute(
                "CREATE TABLE FOO(" +
                "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
                "DATA INTEGER)");
            create.close();
            // Insert five rows: IDs 1 to 5 with DATA = 2, 4, 6, 8, 10
            PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
            for (int i = 1; i <= 5; i++) {
                insert.setInt(1, i * 2);
                insert.executeUpdate();
            }
            insert.close();
        } catch (SQLException e) {
            // SQLState X0Y32 means the table already exists (e.g. from a previous run): ignore it, rethrow anything else
            if (e.getSQLState().compareTo("X0Y32") != 0) {
                throw e;
            }
        } finally {
            connection.close();
        }
    }

    public static void shutdownDB() throws SQLException {
        try {
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
        } catch (SQLException e) {
            // Throw if not normal single database shutdown
            // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
            if (e.getSQLState().compareTo("08006") != 0) {
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");
        // Prepare the data
        prepareData();
        // Build the JdbcRDD
        JavaRDD<Integer> rdd = JdbcRDD.create(
            sc,
            new JdbcRDD.ConnectionFactory() {
                @Override
                public Connection getConnection() throws SQLException {
                    return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
                }
            },
            // The two ? placeholders receive each partition's lower and upper ID bound
            "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
            1, 5, 1, // lowerBound, upperBound (both inclusive), numPartitions
            new Function<ResultSet, Integer>() {
                @Override
                public Integer call(ResultSet r) throws Exception {
                    // Map each row to the value of its first column (DATA)
                    return r.getInt(1);
                }
            }
        );
        // Result: [2, 4, 6, 8, 10]
        System.out.println(rdd.collect());
        shutdownDB();
        sc.stop();
    }
}
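In the call above, the arguments 1, 5, 1 are lowerBound, upperBound and numPartitions. JdbcRDD runs the query once per partition, binding that partition's sub-range to the two ? placeholders. The plain-JDK sketch below shows how an inclusive ID range can be split that way. It assumes, rather than guarantees, that it mirrors the integer arithmetic in Spark 2.x's JdbcRDD partitioning. The class name JdbcRangeSplitSketch and the method splitRange are hypothetical and not part of Spark.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Spark code): splits the inclusive range
 * [lowerBound, upperBound] into numPartitions sub-ranges, the way the
 * JdbcRDD partitioning is assumed to work. Each {start, end} pair is what
 * would be bound to the two '?' placeholders for one partition.
 */
public class JdbcRangeSplitSketch {

    /** Returns one {start, end} pair (both inclusive) per partition. */
    public static List<long[]> splitRange(long lowerBound, long upperBound, int numPartitions) {
        // Bounds are inclusive, hence the +1 here and the -1 on each end below.
        // Assumption: plain long arithmetic, so i * length must not overflow.
        long length = 1 + upperBound - lowerBound;
        List<long[]> ranges = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            long start = lowerBound + (i * length) / numPartitions;
            long end = lowerBound + ((i + 1) * length) / numPartitions - 1;
            ranges.add(new long[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Same ID range as the example (1..5), split into 1, 2 and 3 partitions.
        for (int n = 1; n <= 3; n++) {
            StringBuilder sb = new StringBuilder("numPartitions=" + n + ":");
            for (long[] r : splitRange(1, 5, n)) {
                sb.append(" [").append(r[0]).append(", ").append(r[1]).append("]");
            }
            System.out.println(sb);
        }
    }
}
```

Running main prints `numPartitions=1: [1, 5]`, `numPartitions=2: [1, 2] [3, 5]` and `numPartitions=3: [1, 1] [2, 3] [4, 5]`. With the original arguments 1, 5, 1 there is a single partition, so the query runs once for IDs 1 to 5. The split is even over the ID values, not over the rows. If the IDs have large gaps or are skewed, partitions can hold very different numbers of rows, so a dense, indexed numeric column is a good choice for the bounds.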