利用java多线程向MongoDB中批量插入静态文件
生活随笔
收集整理的這篇文章主要介紹了
利用java多线程向MongoDB中批量插入静态文件
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
第一步、開發環境:
win7 64位(注:MongoDb在32位windows上有數量限制(2G),詳見官方文檔)
Mongodb3.2
mongofb_java_driver 3.2.2
第二部、安裝mongodb,并開啟服務
略:可參見官方文檔
第三部、代碼
import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.LinkedList; import java.util.List; import org.bson.Document; import com.mongodb.MongoClient; import com.mongodb.MongoWriteException; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase;/*** created by soarhu 2016/4/21*/public class MongodbBatchInsetUtils {static final int ThreadNum=3;//設置向MongoDb中插入數據的線程數static int ThreadSizeCount = 0;//用于計算子線程完成數static final String HOST = "127.0.0.1";//主機static final int PORT = 27017;//端口 static final String DATABASE_NAME="mydb";//存儲數據庫名稱,如果不存在會自動創建數據庫static final String COLLECTION_NAME="md";//存儲Collectionpublic static final String DIR = "E:\\targets";//掃描文件路徑public static final String FILE_SUFFIX = "html";//掃描文件類型,不設置,默認為所有文件public static final String CHARSET = "UTF-8";//文件處理編碼格式public static void main(String[] args) {MongoClient client =new MongoClient(HOST,PORT);MongoDatabase dataBase = client.getDatabase(DATABASE_NAME);MongoCollection<Document> collection = dataBase.getCollection(COLLECTION_NAME);Pool p = new Pool();Produce pro = new Produce(p);Long startTime = System.currentTimeMillis(); new Thread(pro).start();//開啟從磁盤讀取文件的線程Thread[] th = new Thread[ThreadNum];for(int i=0;i<ThreadNum;i++){//開啟向mongoDb寫入數據的線程Thread a = new Thread(new Customer(p,collection));a.start();th[i]=a;}boolean res=true;while(res){if(MongodbBatchInsetUtils.ThreadSizeCount==ThreadNum+1){res=false;Long endTime = System.currentTimeMillis();System.out.println("數據寫入完成,吸入總數:"+p.hasUploadToDB+",共花費時間約為:"+(endTime-startTime)+"ms\n");for(Thread t:th){t.interrupt();//在子線程將數據寫完后,中斷子線程。 }if(null!=client){client.close();//關閉連接collection=null;dataBase=null;} } else {System.out.println("已寫入數據:"+p.hasUploadToDB);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}} }//生產者,從磁盤讀取數據 class Produce implements Runnable{private Pool pool=null;public Produce(Pool pool){this.pool= pool;}@Overridepublic void run() {getFilesInDir(MongodbBatchInsetUtils.DIR, MongodbBatchInsetUtils.FILE_SUFFIX);MongodbBatchInsetUtils.ThreadSizeCount++;System.out.println("READING FINISHED!!");}//遞歸讀取dir目錄中所有以suffix結尾的文件,若不指定文件類型,默認讀取所有文件public void getFilesInDir(String dir,String suffix){if(null!=dir && dir.trim().length()>0){File file = new File(dir.trim());if(file.exists() && file.isDirectory()){File[] flist = file.listFiles();if(null!=flist && flist.length>0){for(File f:flist){if(f.isFile()){if(null==suffix|| "".equals(suffix)){pool.putFile(f);}if(null!=suffix &&suffix.trim().length()>0){if(f.getName().endsWith(suffix.trim())){pool.putFile(f);}else{throw new RuntimeException("找不到對應文件類型");}}}else{getFilesInDir(f.getAbsolutePath(),suffix);}}}else{throw new RuntimeException("文件內容為空");}}else{throw new RuntimeException("目錄不存在,請檢查路徑正確性!");}}} }//消費者,向mongoDb中寫數據 class Customer implements Runnable{private Pool pool=null;MongoCollection<Document> collection = null;public Customer(Pool pool,MongoCollection<Document> collection){this.pool = pool;this.collection = collection;}@Overridepublic void run() {while(true){File f = pool.fetchFile();if(null==f){return ;}try {saveToMonGoDb(f); // if(pool.hasUploadToDB%1000==0) // System.out.println("已寫入數據:"+pool.hasUploadToDB);} catch (MongoWriteException e) {System.out.println("寫入數據庫異常:"+e.getMessage());return ;}if(pool.getSize()==0){System.out.println(Thread.currentThread().getName()+" :WRITTING FINISHED!!");MongodbBatchInsetUtils.ThreadSizeCount++;}}}//將文件以文件名為id,文件內容為值保存在數據庫中private void saveToMonGoDb(File file){String _id = file.getName().substring(0,file.getName().lastIndexOf("."));String content = readFileContext(file, MongodbBatchInsetUtils.CHARSET);Document document = new Document("_id",_id).append("content", content);collection.insertOne(document);}//讀取文件內容,以charSet編碼處理public static String readFileContext(File file,String charSet) {StringBuilder sb;BufferedReader reader=null;try {reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), charSet)); String line = null;sb = new StringBuilder();while(null!=(line = reader.readLine())){sb.append(line+"\n");}return sb.toString();}catch (Exception e) {System.out.println("文件讀取失敗!"+e.getMessage());}finally{try {reader.close();} catch (IOException e) {e.printStackTrace();}}return null;}}//池,緩沖區 class Pool{volatile int size=0;//緩沖區中條目數量volatile int limit =1000;volatile int hasUploadToDB=0;volatile private List<File> files = new LinkedList<File>();//入棧public synchronized void putFile(File file){while(files.size()==limit){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}files.add(file);notifyAll();++size;}//出棧public synchronized File fetchFile(){while(files.size()==0 ){try {this.wait();} catch (InterruptedException e) {return null;}}File file = null;notify();if(files.size()>0){file = files.remove(0);--size;++hasUploadToDB;}return file;}public int getSize(){return this.size;}}
?
轉載于:https://www.cnblogs.com/alienSmoking/p/5422675.html
總結
以上是生活随笔為你收集整理的利用java多线程向MongoDB中批量插入静态文件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一套伊璇补水美白价格多少钱?
- 下一篇: 乌镇白天好玩还是晚上好玩