LINUX 如何实现多线程进行cp复制
生活随笔
收集整理的這篇文章主要介紹了
LINUX 如何实现多线程进行cp复制
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
關于這個問題,意義雖然有限因為一般來說在復制文件的時候,實際的瓶頸來自于I/O,不管開啟多少個線程實際上速度并不會快多少,但是為了練習多線程編程,
這里給出了一種C++代碼實現的方式,代碼附在最后。
實際上就是將一個文件分割為多個片段,開啟多個線程進行同時復制,如果用戶制定的并行大于服務器實際的CPU核數,程序會自動降級并行度為CPU核數,如果文件小于
100M則并行度始終為1。
root@bogon:/home/gaopeng/mmm# ./parcp log.log log10.log 2 ? ? ??
set parallel:2
Your cpu core is:4
real parallel:2
Will Create 2 Threads
140677902710528:0:174522367:3:4
140677894317824:174522368:349044736:3:4
Copy Thread:140677902710528 work 25%
Copy Thread:140677894317824 work 25%
Copy Thread:140677902710528 work 50%
Copy Thread:140677902710528 work 75%
Copy Thread:140677902710528 work 100%
Copy Thread:140677902710528 work Ok!!
Copy Thread:140677894317824 work 50%
Copy Thread:140677894317824 work 75%
Copy Thread:140677894317824 work Ok!!
復制完成后進行md5驗證
root@bogon:/home/gaopeng/mmm# md5sum log.log
f64acc21f7187a865938b340b3eda198 ?log.log
root@bogon:/home/gaopeng/mmm# md5sum log10.log
f64acc21f7187a865938b340b3eda198 ?log10.log
可以看出校驗是通過的
代碼如下:
#include<iostream>
#include <map>
#include<stdint.h>
#include<stdio.h>
#include<string.h>
#include<unistd.h>
#include<sys/types.h>
#include<sys/stat.h>
#include <sys/sysinfo.h>
#include<fcntl.h>
#include<errno.h>
#include <time.h>
#include <stdarg.h>
#include <stdlib.h>
#include <pthread.h>
#define MAX_BUFFER 65536
using namespace std;
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
class thread_info
{
????????private:
????????????????uint64_t start_pos;
????????????????uint64_t end_pos;
????????????????int fdr;
????????????????int fdw;
????????public:
????????????????pthread_t t_id;
????????????????static int do_id;
????????public:
????????????????thread_info()
????????????????{
????????????????????????start_pos = 0;
????????????????????????end_pos = 0;
????????????????????????fdr = 0;
????????????????????????fdw = 0;
????????????????????????t_id = 0;
????????????????}
????????????????void set_start(uint64_t a)
????????????????{
????????????????????????start_pos = a;
????????????????}
????????????????void set_end(uint64_t a)
????????????????{
????????????????????????end_pos = a;
????????????????}
????????????????void set_fd(int a,int b)
????????????????{
????????????????????????fdr = a;
????????????????????????fdw = b;
????????????????}
????????????????uint64_t get_start(void)
????????????????{
????????????????????????return start_pos;
????????????????}
????????????????uint64_t get_stop(void)
????????????????{
????????????????????????return end_pos;
????????????????}
????????????????int get_fdr(void)
????????????????{
????????????????????????return fdr;
????????????????}
????????????????int get_fdw(void)
????????????????{
????????????????????????return fdw;
????????????????}
????????????????void print(void)
????????????????{
????????????????????????cout<<start_pos<<":"<<end_pos<<":"<<t_id<<endl;
????????????????}
};
int thread_info::do_id = 0;
class ABS_dispatch
{
????????public:
????????????????ABS_dispatch()
????????????????{
????????????????????????par_thr = 0;
????????????????????????max_cpu = 0;
????????????????}
????????????????virtual thread_info* call_thread(void) = 0;
????????????????virtual void make_init(uint32_t t_n) = 0;
????????????????uint32_t getcpu(void)
????????????????{
????????????????????????return get_nprocs() ;
????????????????}
????????????????virtual ~ABS_dispatch(){
????????????????}
????????protected:
????????????????uint32_t par_thr;
????????????????uint32_t max_cpu;
};
class dispatch:public ABS_dispatch
{
????????public:
????????????????typedef multimap<uint64_t,uint64_t>::iterator pair_piece_iterator;
????????????????dispatch():ABS_dispatch()
????????{
????????????????file_idr = 0;
????????????????file_idw = 0;
????????????????file_size = 0;
????????????????tread_arr_p = NULL;
????????}
????????????????virtual thread_info* call_thread(void);
????????????????virtual void make_init(uint32_t t_n)
????????????????{
????????????????????????max_cpu = getcpu(); //
????????????????????????cout<<"Your cpu core is:"<<max_cpu<<"\n";
????????????????????????if(t_n > max_cpu) //parallel
????????????????????????{
????????????????????????????????cout<<"Parallel downgrad to cpu core:"<<max_cpu<<"\n";
????????????????????????????????par_thr = max_cpu;
????????????????????????}
????????????????????????else
????????????????????????{
????????????????????????????????par_thr = t_n;
????????????????????????}
????????????????}
????????????????void set_init(int file_idr,int file_idw)
????????????????{
????????????????????????file_size = lseek(file_idr,0,SEEK_END);
????????????????????????if(file_size<100000000)
????????????????????????{
????????????????????????????????cout<<"File small than 100M par = 1" <<"\n";
????????????????????????????????par_thr = 1;
????????????????????????}
????????????????????????this->file_idr = file_idr;
????????????????????????this->file_idw = file_idw;
????????????????}
????????????????uint32_t real_par()
????????????????{
????????????????????????return par_thr;
????????????????}
????????????????virtual ~dispatch()
????????????????{
????????????????????????pair_piece.clear();
????????????????????????delete [] tread_arr_p;
????????????????}
????????private:
????????????????int file_idr;
????????????????int file_idw;
????????????????multimap<uint64_t,uint64_t> pair_piece;
????????????????uint64_t file_size;
????????public:
????????????????thread_info* tread_arr_p;
};
static void* do_work(void* argc)
{
????????uint64_t b;
????????uint64_t e;
????????int fdr;
????????int fdw;
????????char* buffer[MAX_BUFFER]={0};
????????thread_info* tread_arr_p;
????????uint64_t loopc = 0;
????????uint64_t loopc25 = 0;
????????uint64_t i = 0;
????????int m = 1;
????????pthread_t t_id;
????????tread_arr_p = static_cast<thread_info*>(argc);
????????//臨界區 MUTEX
????????pthread_mutex_lock(&counter_mutex);
????????b = (tread_arr_p+ tread_arr_p->do_id)->get_start();
????????e = (tread_arr_p+ tread_arr_p->do_id)->get_stop();
????????fdr = (tread_arr_p+ tread_arr_p->do_id)->get_fdr();
????????fdw = (tread_arr_p+ tread_arr_p->do_id)->get_fdw();
????????t_id = (tread_arr_p+ tread_arr_p->do_id)->t_id ;
????????cout<< t_id <<":"<<b<<":"<<e<<":"<<fdr<<":"<<fdw<<"\n";
????????tread_arr_p->do_id++;
????????pthread_mutex_unlock(&counter_mutex);
????????//臨界區
????????loopc = e/uint64_t(MAX_BUFFER);
????????loopc25 = loopc/(uint64_t)4;
????????while(i<loopc)
????????{
????????????????if(i == loopc25*m )
????????????????{
????????????????????????cout<< "Copy Thread:"<<t_id<<" work "<<25*m<<"%\n";
????????????????????????m++;
????????????????}
????????????????memset(buffer,0,MAX_BUFFER);
????????????????pread(fdr,buffer,MAX_BUFFER,uint64_t(i*MAX_BUFFER));
????????????????pwrite(fdw,buffer,MAX_BUFFER,uint64_t(i*MAX_BUFFER));
????????????????i++;
????????}
????????memset(buffer,0,MAX_BUFFER);
????????pread(fdr,buffer,(e-uint64_t(i*MAX_BUFFER)),uint64_t(i*MAX_BUFFER));
????????pwrite(fdw,buffer,(e-uint64_t(i*MAX_BUFFER)),uint64_t(i*MAX_BUFFER));
????????cout<< "Copy Thread:"<<t_id<<" work Ok!!"<<"\n";
????????return NULL;
}
thread_info* dispatch::call_thread()
{
????????int i = 0;
????????uint64_t temp_size = 0;
????????temp_size = file_size/par_thr;
????????tread_arr_p = new thread_info[par_thr];
????????cout<<"Will Create "<<par_thr<<" Threads\n";
????????//cout<<tread_arr_p<<endl;
????????//cout<<sizeof(thread_info)<<endl;
????????for(i = 0;i<par_thr-1;i++)
????????{
????????????????pair_piece.insert( pair<uint64_t,uint64_t>(temp_size*i,temp_size*(i+1)-1 ));
????????}
????????pair_piece.insert( pair<uint64_t,uint64_t>(temp_size*i,file_size ));
????????i = 1;
????????for(pair_piece_iterator it =pair_piece.begin();it !=pair_piece.end() ;it++)
????????{
????????????????//cout<<"--Thread: "<<i<<"\n";
????????????????//cout<<it->first<<"\n";
????????????????//cout<<it->second<<"\n";
????????????????//cout<<tread_arr_p+(i-1)<<endl;
????????????????(tread_arr_p+(i-1))->set_start(it->first);
????????????????(tread_arr_p+(i-1))->set_end(it->second);
????????????????(tread_arr_p+(i-1))->set_fd(file_idr,file_idw);
????????????????pthread_create(&((tread_arr_p+(i-1))->t_id),NULL,do_work,static_cast<void*>(tread_arr_p));
????????????????//(tread_arr_p+(i-1))->print();
????????????????i++;
????????}
????????return tread_arr_p;
}
int main(int argc,char** argv)
{
????????dispatch test;
????????thread_info* thread_info_p = NULL;
????????uint32_t real_par;
????????void *tret;
????????int fdr = open(argv[1],O_RDONLY|O_NOFOLLOW);
????????int fdw = open(argv[2],O_RDWR|O_CREAT|O_EXCL,0755);
????????cout<<"Author: gaopeng QQ:22389860 Blog:http://blog.itpub.net/7728585\n";
????????if(argc<4)
????????{
????????????????cout<<"USAGE:parcp sourcefile destfile paralle\n";
????????????????return -1;
????????}
????????if(fdr == -1 || fdw == -1)
????????{
????????????????perror("open readfile:");
????????????????return -1;
????????}
????????if(fdw == -1)
????????{
????????????????perror("open wirtefile:");
????????????????return -1;
????????}
????????if(sscanf(argv[3],"%u",&real_par) == EOF)
????????{
????????????????perror("sscanf:");
????????????????return -1;
????????}
????????cout<<"set parallel:"<<real_par<<"\n";
????????//cout<<lseek(fd,0,SEEK_SET) <<endl;
????????test.make_init(real_par);
????????test.set_init(fdr,fdw);
????????real_par = test.real_par();
????????cout<<"real parallel:" <<real_par<<"\n";
????????thread_info_p = test.call_thread();
????????for(int i = 0 ;i<real_par;i++)
????????{
????????????????//cout<<(thread_info_p+i)->t_id<<endl;
????????????????pthread_join((thread_info_p+i)->t_id,&tret);
????????????????//cout<<reinterpret_cast<long>(tret)<<endl;
????????}
????????close(fdw);
????????close(fdr);
} 如果覺得不錯 您可以考慮請作者喝杯茶(微信支付): ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?作者微信號:
? ? ? ? ? ? ? ? ? ?
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生
這里給出了一種C++代碼實現的方式,代碼附在最后。
實際上就是將一個文件分割為多個片段,開啟多個線程進行同時復制,如果用戶制定的并行大于服務器實際的CPU核數,程序會自動降級并行度為CPU核數,如果文件小于
100M則并行度始終為1。
root@bogon:/home/gaopeng/mmm# ./parcp log.log log10.log 2 ? ? ??
set parallel:2
Your cpu core is:4
real parallel:2
Will Create 2 Threads
140677902710528:0:174522367:3:4
140677894317824:174522368:349044736:3:4
Copy Thread:140677902710528 work 25%
Copy Thread:140677894317824 work 25%
Copy Thread:140677902710528 work 50%
Copy Thread:140677902710528 work 75%
Copy Thread:140677902710528 work 100%
Copy Thread:140677902710528 work Ok!!
Copy Thread:140677894317824 work 50%
Copy Thread:140677894317824 work 75%
Copy Thread:140677894317824 work Ok!!
復制完成后進行md5驗證
root@bogon:/home/gaopeng/mmm# md5sum log.log
f64acc21f7187a865938b340b3eda198 ?log.log
root@bogon:/home/gaopeng/mmm# md5sum log10.log
f64acc21f7187a865938b340b3eda198 ?log10.log
可以看出校驗是通過的
代碼如下:
點擊(此處)折疊或打開
? ? ? ? ? ? ? ? ? ?
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生
總結
以上是生活随笔為你收集整理的LINUX 如何实现多线程进行cp复制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: servlet监听完成统计在线人数,显示
- 下一篇: HelloWorld实例(springm