BigData-News (shixiaopengql/BigData-News): A Real-Time Big Data System for News Sites, Based on Spark 2.2 and CDH 5.14
1. Overview
2. Environment Setup
2.1 CDH 5.14.2 (installation steps: see the reference link). The exact versions follow what was actually used; CDH's cross-version compatibility is good. Services are distributed across the hosts as follows:
| Service | hadoop01 | hadoop02 | hadoop03 |
| --- | --- | --- | --- |
| HDFS | NameNode | DataNode | DataNode |
| HBase | HMaster, HRegionServer | HRegionServer | HRegionServer |
| Hive | Hive | | |
| Flume | Flume | Flume | Flume |
| Kafka | Kafka | | |
| YARN | ResourceManager | NodeManager | NodeManager |
| Oozie | Oozie | | |
| Hue | Hue | | |
| Spark2 | Spark | | |
| Zookeeper | Zookeeper | | |
| MySQL | MySQL | | |
2.2 Host Configuration
1. hadoop01: 4 cores, 16 GB RAM, CentOS 7.2
2. hadoop02: 2 cores, 8 GB RAM, CentOS 7.2
3. hadoop03: 2 cores, 8 GB RAM, CentOS 7.2
2.3 Project Architecture
2.4 Install Dependency Packages
# yum -y install psmisc MySQL-python at bc bind-libs bind-utils cups-client cups-libs cyrus-sasl-gssapi cyrus-sasl-plain ed fuse fuse-libs httpd httpd-tools keyutils-libs-devel krb5-devel libcom_err-devel libselinux-devel libsepol-devel libverto-devel mailcap.noarch mailx mod_ssl openssl-devel pcre-devel postgresql-libs python-psycopg2 redhat-lsb-core redhat-lsb-submod-security.x86_64 spax time zlib-devel wget psmisc
# chmod +x /etc/rc.d/rc.local
# echo "echo 0 > /proc/sys/vm/swappiness" >>/etc/rc.d/rc.local
# echo "echo never > /sys/kernel/mm/transparent_hugepage/defrag" >>/etc/rc.d/rc.local
# echo 0 > /proc/sys/vm/swappiness
# echo never > /sys/kernel/mm/transparent_hugepage/defrag
# yum -y install rpcbind
# systemctl start rpcbind
# echo "systemctl start rpcbind" >> /etc/rc.d/rc.local
Install Perl support:
yum install perl*    (install the Perl packages via yum)
yum install cpan     (Perl libraries need CPAN support; see the CPAN documentation for details)
3. Writing the Data-Generation Simulator
3.1 Simulate the log produced by nginx. Data source: the user query logs downloaded from Sogou Labs (a web query log collection covering roughly one month, June 2008, of query requests and user clicks for part of the Sogou search engine).
3.2 Data Cleaning
The data format is: access time \t user ID \t [query terms] \t rank of the URL in the returned results \t sequence number of the user's click \t URL clicked by the user. The user ID is assigned automatically from the cookie of the browser used to access the search engine, so different queries entered in the same browser session share the same user ID.
Replace the tabs in the file with commas:
cat weblog.log|tr "\t" "," > weblog2.log
Replace the spaces in the file with commas:
cat weblog2.log|tr " " "," > weblog.log
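After this cleanup every line should contain six comma-separated fields in the order described above. As a small illustration of how those fields map to the names used later for the HBase columns and the Weblog case class (the values below are placeholders, not real log data, and the object name is purely illustrative):

```scala
object LogLineExample {
  def main(args: Array[String]): Unit = {
    // Placeholder line in the cleaned, comma-separated format (not real log data)
    val line = "<access-time>,<user-id>,<query>,<result-rank>,<click-order>,<clicked-url>"
    // The same six-field order is reused later by the Flume HBase sink and the Spark job
    val Array(datetime, userid, searchname, retorder, cliorder, cliurl) = line.split(",")
    println(s"datetime=$datetime, userid=$userid, searchname=$searchname, " +
      s"retorder=$retorder, cliorder=$cliorder, cliurl=$cliurl")
  }
}
```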
3.3 Key Code
public static void readFileByLines(String fileName) {
    FileInputStream fis = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    String tempString = null;
    try {
        System.out.println("Reading the file line by line, one full line at a time:");
        // Obtain a byte stream from the file on the local file system
        fis = new FileInputStream(fileName);
        isr = new InputStreamReader(fis, "GBK");
        br = new BufferedReader(isr);
        int count = 0;
        while ((tempString = br.readLine()) != null) {
            count++;
            // Sleep briefly so the output resembles a live log stream
            Thread.sleep(300);
            // Re-encode the GBK line as UTF-8
            String str = new String(tempString.getBytes("GBK"), "UTF8");
            // Print the line together with its line number
            System.out.println("row:" + count + ">>>>>>>>" + str);
            // Append the line to the output log file (writeFile and writeFileName are defined elsewhere in the class)
            writeFile(writeFileName, str);
        }
        isr.close();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        if (isr != null) {
            try {
                isr.close();
            } catch (IOException e1) {
            }
        }
    }
}
3.4 Package the program as weblogs.jar (see the packaging steps), then write the shell script weblog-shell.sh:
#!/bin/bash
echo "start log......"
# The first argument is the source log file; the second is the generated output log file
java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log
3.5 Make weblog-shell.sh executable:
chmod 777 weblog-shell.sh
4. Flume Data Collection Configuration
4.1 The Flume agents on hadoop02 and hadoop03 forward their collected data to hadoop01; the Flume configuration files on hadoop02 and hadoop03 are essentially identical.
flume-collect-conf.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/datas/weblog-flume.log
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01
a1.sinks.k1.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.keep-alive = 5
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4.2 Flume on hadoop01 receives the data sent by the Flume agents on hadoop02 and hadoop03 and delivers it to both HBase and Kafka. The configuration is as follows:
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink
a1.sources.r1.type = avro
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = hadoop01
a1.sources.r1.port = 5555
a1.sources.r1.threads = 5
#****************************flume + hbase******************************
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20
a1.sinks.hbaseSink.type = asynchbase
## HBase table name
a1.sinks.hbaseSink.table = weblogs
## HBase column family
a1.sinks.hbaseSink.columnFamily = info
## Custom serializer for asynchronous writes to HBase
a1.sinks.hbaseSink.serializer = main.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
## HBase column names
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl
#****************************flume + kafka******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20
a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = hadoop01:9092
a1.sinks.kafkaSink.topic = webCount
a1.sinks.kafkaSink.zookeeperConnect = hadoop01:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
4.3 Shell Scripts to Run Flume
flume-collect-start.sh, distributed to /opt/shell/ on hadoop02 and hadoop03:
#!/bin/bash
echo "flume-collect start ......"
sh /bin/flume-ng agent --conf conf -f /opt/conf/flume-collect-conf.properties -n a1 -Dflume.root.logger=INFO,console
flume-kfk-hb-start.sh, distributed to /opt/shell/ on hadoop01:
#!/bin/bash
echo "flume-kfk-hb start ......"
sh /bin/flume-ng agent --conf conf -f /opt/conf/flume-hbase-kafka-conf.properties -n a1 -Dflume.root.logger=INFO,console
4.4 Flume-to-HBase Integration
Download the Flume source code and import it into IDEA:
1) Download apache-flume-1.7.0-src.tar.gz and extract it locally
2) Import the Flume source code into IDEA
3) Modify the flume-ng-hbase-sink module
4) Modify SimpleAsyncHbaseEventSerializer.java
5) See the source code for the full details
Key code of KfkAsyncHbaseEventSerializer.java:
@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        byte[] rowKey;
        try {
            /*--------------------------- modified code: start ---------------------------*/
            // Parse the configured column names
            String[] columns = new String(this.payloadColumn).split(",");
            // Parse the values of each line collected by Flume
            String[] values = new String(this.payload).split(",");
            for (int i = 0; i < columns.length; i++) {
                // Validation: make sure the columns and values line up
                if (columns.length != values.length) break;
                byte[] colColumn = columns[i].getBytes();
                byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                // Access time
                String datetime = values[0].toString();
                // User id
                String userid = values[1].toString();
                // Build a custom row key for this business case
                rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                // Queue the put request
                PutRequest putRequest = new PutRequest(table, rowKey, cf,
                        colColumn, colValue);
                actions.add(putRequest);
            }
            /*--------------------------- modified code: end ---------------------------*/
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}
4.5 Package the project as vita-flume-ng-hbase-sink.jar and distribute it to the Flume lib directory (Flume/libs/) in CDH.
5. Kafka Configuration (test environment: Kafka is deployed only on hadoop01, without high availability)
5.1 Configuration
Configure advertised.listeners=PLAINTEXT://xxxx:9092
5.2 Verify that producing and consuming work
// Create a topic with replication factor 1 and 1 partition; skip this if auto.create.topics.enable is set to true
sh bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --topic webCount --replication-factor 1 --partitions 1
//producer
sh /bin/kafka-console-producer --broker-list hadoop01:9092 --topic webCount
//consumer
sh /bin/kafka-console-consumer --zookeeper hadoop01:2181 --topic webCount --from-beginning
//delete topic
sh /bin/kafka-topics --delete --zookeeper hadoop01:2181 --topic webCount
//topic list
sh /bin/kafka-topics --zookeeper hadoop01:2181 --list
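Besides the console producer, the pipeline can be smoke-tested by pushing a message in the cleaned six-field format directly into the webCount topic. A minimal sketch using the Kafka Java client from Scala (assumes kafka-clients is on the classpath; the object name and placeholder values are illustrative and not part of the original project):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop01:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Placeholder record in the comma-separated format the streaming job expects
    val line = "<access-time>,<user-id>,<query>,<result-rank>,<click-order>,<clicked-url>"
    producer.send(new ProducerRecord[String, String]("webCount", line))
    producer.close()
  }
}
```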
5.3 Write the Kafka consumer script kfk-test-consumer.sh and distribute it to /opt/shell/:
#!/bin/bash
echo "kfk-kafka-consumer.sh start......"
/bin/kafka-console-consumer --zookeeper hadoop01:2181 --from-beginning --topic webCount
6. HBase Configuration
6.1 Create the business table
create 'weblogs','info'
// Check the row count
count 'weblogs'
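Beyond the HBase shell, the table can also be spot-checked programmatically. A minimal sketch using the HBase Java client from Scala (assumes hbase-client is on the classpath and that the ZooKeeper quorum below matches the cluster; not part of the original project):

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes

object WeblogsScan {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("weblogs"))
    try {
      val scanner = table.getScanner(new Scan())
      // Print the first 10 rows of the 'info' column family
      scanner.next(10).foreach { result =>
        val rowKey = Bytes.toString(result.getRow)
        val searchname = Bytes.toString(
          result.getValue(Bytes.toBytes("info"), Bytes.toBytes("searchname")))
        println(s"rowkey=$rowKey searchname=$searchname")
      }
      scanner.close()
    } finally {
      table.close()
      connection.close()
    }
  }
}
```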
7. Hive Configuration
7.1 Enable the Hive and HBase integration in CDH, or set the following property for Hive:
hbase.zookeeper.quorum = hadoop01,hadoop02,hadoop03
7.2 Create an external table in Hive backed by the HBase table:
CREATE EXTERNAL TABLE weblogs(
id string,
datetime string,
userid string,
searchname string,
retorder string,
cliorder string,
cliurl string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping"=
":key,info:datetime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl")
TBLPROPERTIES("hbase.table.name"="weblogs");
# Count the records stored in HBase
select count(*) from weblogs;
# List tables
show tables;
# View the first 10 rows
select * from weblogs limit 10;
8. Structured Streaming Configuration
8.1 Test Spark and MySQL
spark.sql("select count(1) from weblogs").show()
8.2 Integrating Structured Streaming with MySQL
Create the database and table in MySQL that will receive the results:
create database test;
use test;
CREATE TABLE `webCount` (
`titleName` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
`count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
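To confirm that Spark can reach this MySQL table before wiring up the stream, a quick batch read over JDBC is handy. A minimal sketch (assumes the MySQL JDBC driver is available to Spark; the object name is illustrative, and the connection details reuse those from the streaming code in 8.3; not part of the original project):

```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

object MysqlConnectivityCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("mysql-connectivity-check")
      .getOrCreate()

    val props = new Properties()
    props.setProperty("user", "root")            // credentials as used in 8.3
    props.setProperty("password", "root")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    // Read the (initially empty) webCount table; a failure here points to a
    // driver, network, or permission problem rather than a streaming issue.
    val df = spark.read.jdbc("jdbc:mysql://hadoop01:3306/test", "webCount", props)
    df.printSchema()
    println(s"webCount currently holds ${df.count()} rows")

    spark.stop()
  }
}
```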
8.3 Key Structured Streaming code:
package com.vita.spark

import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

/**
 * Structured Streaming reads data from Kafka and stores it in the relational database MySQL.
 * Structured Streaming currently requires Kafka 0.10 or later.
 */
object StructuredStreamingKafka {

  case class Weblog(datetime: String,
                    userid: String,
                    searchname: String,
                    retorder: String,
                    cliorder: String,
                    cliurl: String)

  val LOGGER: Logger = LogManager.getLogger("vita")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("yarn")
      .appName("streaming")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop01:9092")
      .option("subscribe", "webCount")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]
    // lines.map(_.split(",")).foreach(x => print(" 0 = " + x(0) + " 1 = " + x(1) + " 2 = " + x(2) + " 3 = " + x(3) + " 4 = " + x(4) + " 5 = " + x(5)))

    // Parse each comma-separated line into a Weblog record
    val weblog = lines.map(_.split(","))
      .map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))

    // Count how many times each search term appears
    val titleCount = weblog
      .groupBy("searchname")
      .count()
      .toDF("titleName", "count")

    val url = "jdbc:mysql://hadoop01:3306/test"
    val username = "root"
    val password = "root"

    // JDBCSink is a user-defined ForeachWriter[Row]; a sketch of one possible implementation follows this listing
    val writer = new JDBCSink(url, username, password)
    // val writer = new MysqlSink(url, username, password)

    val query = titleCount
      .writeStream
      .foreach(writer)
      .outputMode("update")
      .trigger(ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}
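The JDBCSink used above is not listed in this write-up. Below is a minimal sketch of what such a sink can look like as a ForeachWriter[Row] writing into the webCount table from 8.2; the project's actual implementation may differ, and the plain INSERT here is only the simplest possible strategy:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

class JDBCSink(url: String, user: String, password: String) extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection(url, user, password)
    statement = connection.prepareStatement(
      "insert into webCount(titleName, `count`) values (?, ?)")
    true
  }

  override def process(value: Row): Unit = {
    // Column order follows toDF("titleName", "count") in StructuredStreamingKafka
    statement.setString(1, value.getString(0))
    statement.setLong(2, value.getLong(1))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}
```

Because the query runs with outputMode("update"), each trigger re-emits the rows whose counts changed, so the naive INSERT above accumulates duplicate titleName rows; a production sink would typically upsert keyed on titleName instead.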
8.4 Package the project as spark-weblogs.jar.
9. Startup Procedure
9.1 Start ZooKeeper, Hadoop, HBase, MySQL, YARN, Flume, and Kafka from CDH.
9.2 First, on hadoop01, run /opt/shell/flume-kfk-hb-start.sh so incoming data is delivered to both HBase and Kafka.
9.3 On hadoop02 and hadoop03, run /opt/shell/flume-collect-start.sh to send data to hadoop01.
9.4 On hadoop01, submit the Spark job
Spark on YARN, with the spark-sql-kafka package:
sh /bin/spark2-submit \
--class com.vita.spark.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--executor-cores 2 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
/opt/jars/spark-weblogs.jar \
10
To debug the Spark code remotely from IDEA (see the reference link), submit with JDWP options:
sh /bin/spark2-submit \
--class com.vita.spark.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--executor-cores 1 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
--driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005" \
/opt/jars/spark-weblogs.jar \
10
To kill the Spark job on YARN: yarn application -kill [application-id]
9.5 On hadoop02 and hadoop03, run /opt/weblog-shell.sh to start generating log data; StructuredStreamingKafka then reads the data from Kafka, processes it, and stores the results in MySQL.
9.6 Log in to MySQL and check the webCount table.