spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)...
首先,我們明確的是訪問Mongos和訪問單機Mongod并沒有什么區別。接下來的方法都是既可以訪問mongod又可以訪問Mongos的。
另外,讀作java寫作scala,反正大家都看得懂......大概?
1、不帶認證集群的連接方法(JAVAscala):
首先是創建連接的方法,我們先聲明一個client,然后指定訪問的DB和collection:
private lazy val mongo = new MongoClient("192.168.2.51", 27017)private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("origin2")
然后我們讀取數據:
import com.mongodb.client.model.Filters.{eq =>eqq}
val docs= dbColl.find(eqq("basiclabel.procedure", "second")).iterator()
額。。上面那段代碼是帶filter過濾的讀取數據。首先Import?com.mongodb.client.model.Filters.eq并把eq重命名為eqq,然后通過dbColl.find(Bson)方法讀取指定數據。剩下的就是正常的迭代器的使用方法了,docs獲取出來的數據是Iterator[Document]。
然后我們更新數據:
dbColl.updateOne(eqq("_id", x.get("_id")), set("segdata", fenduan(str, name)))
上面這段代碼是說找到_id對應的數據,并將其中一個字段set為一個新的值,這個值可以為Document,String,Int,List等一系列數據結構。我這里fenduan方法返回的是一個Document,做了一層嵌套。
至于插入數據更為簡單:
dbColl.insertOne(doc)
2、不帶認證的spark讀取方法(scala,理直氣壯)
兩種方式,其一是在創建sparksession的時候(SparkContext可以使用第二種方法,醒醒兄弟,2017年了),直接指定"spark.mongodb.input.uri"。然后使用正常的MongoSpark來讀取數據。(pipeline里面是過濾條件,愿意嘗試的各位可以自己試試filter下的其他方法)。使用rdd是因為rdd更適合進行map和flatmap等一系列精細的轉換操作,如果只需要讀數據,可以使用MongoSpark.read(spark)方法,直接獲取DataFrameReader。
val spark =SparkSession.builder()
.master("spark://192.168.2.51:7077")
.config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar","hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar","hdfs://192.168.2.51:9000/segwithorigin2.jar")))
.config("spark.cores.max", 80)
.config("spark.executor.cores", 16)
.config("spark.executor.memory", "32g")
.config("spark.mongodb.input.uri", "mongodb://192.168.2.51:27017/test.origin2")//.config("spark.mongodb.output.uri", "mongodb://192.168.12.161:27017/test.origin2")
.getOrCreate()
val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()
第二種方式也較為簡單,創建一個ReadConfig,這個是connector提供的一個單例類,可以設置很多參數,例如(此時不必指定"spark.mongodb.input.uri"),如下所示是通過sparkcontext和通過sparksession兩種方式讀取數據的方法:
val readConfig =ReadConfig(Map("uri" -> "mongodb://192.168.2.48:27017/","database" -> "test","collection" -> "test"))
val r2=MongoSpark.load(spark, readConfig).rdd//val r2 = MongoSpark.load(spark.sparkContext, readConfig)
3、帶認證的Java讀取方法:
帶認證的需要先創建一個MongoURI,在URI里把用戶名,密碼和認證庫都指定清楚。這種方法通用性比較強,因為spark也這么用,如果使用其他方式認證要么是必須使用庫等于認證庫,要么是沒有通用性。這種方法可以在admin認證然后去讀test的數據,就很好。
//帶認證的需要先創建一個MongoURI,在URI里把用戶名,密碼和認證庫都指定清楚,至于為什么需要指定庫建議看上一篇博客
val mongoURI = new MongoClientURI("mongodb://gaoze:gaolaoban@192.168.2.48:27017/?authSource=admin")//val mongoURI = new MongoClientURI("mongodb://192.168.2.48:27017/");
lazy val mongo = newMongoClient(mongoURI)private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("test")
//然后和1一樣
4、帶認證的Spark讀取方法:
同3一樣,在URI里加入用戶名密碼和庫就行了:
val spark =SparkSession.builder()
.master("spark://192.168.2.51:7077")
.config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar","hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar","hdfs://192.168.2.51:9000/segwithorigin2.jar")))
.config("spark.cores.max", 80)
.config("spark.executor.cores", 16)
.config("spark.executor.memory", "32g")
//這里這個配置項指定了用戶名gaoze,密碼gaolaoban,認證庫admin
.config("spark.mongodb.input.uri", "mongodb://gaoze:gaolaoban@192.168.2.51:27017/test.origin2?authSource=admin").getOrCreate()
val rdd= MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()
或者:
//這里指定了用戶名rw,密碼1,認證庫test
val readConfig =ReadConfig(Map("uri" -> "mongodb://rw:1@192.168.2.48:27017/?authSource=test","database" -> "test","collection" -> "test"))
val rdd = MongoSpark.builder().sparkSession(spark).readConfig(readConfig).build().toRDD()
//val r2 = MongoSpark.load(spark.sparkContext, readConfig)
總結
以上是生活随笔為你收集整理的spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java反射的优化_请问Java反射的性
- 下一篇: java 管道设计_使用管道流实现Jav