大数据常用工具类
工具類
config.properties
# jbdc配置
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop101:3306/database?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=000000
?
# Kafka
kafka.broker.list=hadoop101:9092,hadoop102:9092,hadoop103:9092
?
# Redis配置
redis.host=hadoop101
redis.port=6379
?
# hive 的數據庫名(選配)
hive.database=database
Properties.Util
import java.io.InputStreamReader
import java.util.Properties
?
object PropertiesUtil {
?
? ?def load(propertieName: String): Properties = {
? ? ? ?val prop = new Properties();
? ? ? ?prop.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName),
? ? ? ? ? ?"UTF-8"))
? ? ? ?prop
? }
?
}
MyJdbcUtil
import com.alibaba.druid.pool.DruidDataSourceFactory
import java.sql.PreparedStatement
import java.util.Properties
import javax.sql.DataSource
?
object JdbcUtil {
?
? ?var dataSource: DataSource = init()
?
? ?def init() = {
? ? ? ?val properties = new Properties()
? ? ? ?val prop = PropertiesUtil.load("config.properties")
?
? ? ? ?properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
? ? ? ?properties.setProperty("url", prop.getProperty("jdbc.url"))
? ? ? ?properties.setProperty("username", prop.getProperty("jdbc.user"))
? ? ? ?properties.setProperty("password", prop.getProperty("jdbc.password"))
? ? ? ?properties.setProperty("maxActive", prop.getProperty("jdbc.datasource.size"))
?
? ? ? ?DruidDataSourceFactory.createDataSource(properties)
?
? }
?
? ?def executeUpdate(sql: String, params: Array[Any]): Int = { // "insert into xxx values (?,?,?)"
? ? ? ?var rtn = 0
? ? ? ?var pstmt: PreparedStatement = null
? ? ? ?val connection = dataSource.getConnection
? ? ? ?try {
? ? ? ? ? ?connection.setAutoCommit(false)
? ? ? ? ? ?pstmt = connection.prepareStatement(sql)
?
? ? ? ? ? ?if (params != null && params.length > 0) {
? ? ? ? ? ? ? ?for (i <- 0 until params.length) {
? ? ? ? ? ? ? ? ? ?pstmt.setObject(i + 1, params(i))
? ? ? ? ? ? ? }
? ? ? ? ? }
? ? ? ? ? ?rtn = pstmt.executeUpdate()
? ? ? ? ? ?connection.commit()
? ? ? } catch {
? ? ? ? ? ?case e: Exception => e.printStackTrace
? ? ? }
? ? ? ?rtn
? }
?
? ?def executeBatchUpdate(sql: String, paramsList: Iterable[Array[Any]]): Array[Int] = {
? ? ? ?var rtn: Array[Int] = null
? ? ? ?var pstmt: PreparedStatement = null
? ? ? ?val connection = dataSource.getConnection
? ? ? ?try {
? ? ? ? ? ?connection.setAutoCommit(false)
? ? ? ? ? ?pstmt = connection.prepareStatement(sql)
? ? ? ? ? ?for (params <- paramsList) {
? ? ? ? ? ? ? ?if (params != null && params.length > 0) {
? ? ? ? ? ? ? ? ? ?for (i <- 0 until params.length) {
? ? ? ? ? ? ? ? ? ? ? ?pstmt.setObject(i + 1, params(i))
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ?pstmt.addBatch()
? ? ? ? ? ? ? }
? ? ? ? ? }
? ? ? ? ? ?rtn = pstmt.executeBatch()
? ? ? ? ? ?connection.commit()
? ? ? } catch {
? ? ? ? ? ?case e: Exception => e.printStackTrace
? ? ? }
? ? ? ?rtn
? }
?
? ?// 測試
? ?def main(args: Array[String]): Unit = {
// ? ? ? JdbcUtil.executeUpdate("insert into table_1 values(?,?,?,?,?)", Array("take100", "100", 100, 200,300))
? ? ? ?JdbcUtil.executeBatchUpdate("insert into table_1 values(?,?,?,?,?)",List(Array("take101", "100", 200, 200,200),Array("take102", "100", 300, 300,300)))
? }
}
MyRedisUtil
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
?
object MyRedisUtil {
?
? ?var jedisPool: JedisPool = null
?
? ?def getJedisClient: Jedis = {
? ? ? ?if (jedisPool == null) {
? ? ? ? ? ?println("開辟一個連接池")
? ? ? ? ? ?val prop = PropertiesUtil.load("config.properties")
? ? ? ? ? ?val host = prop.getProperty("redis.host")
? ? ? ? ? ?val port = prop.getProperty("redis.port").toInt
?
? ? ? ? ? ?val jedisPoolConfig = new JedisPoolConfig()
? ? ? ? ? ?jedisPoolConfig.setMaxTotal(100) ?//最大連接數
? ? ? ? ? ?jedisPoolConfig.setMaxIdle(20) ? //最大空閑
? ? ? ? ? ?jedisPoolConfig.setMinIdle(20) ? ? //最小空閑
? ? ? ? ? ?jedisPoolConfig.setBlockWhenExhausted(true) ?//忙碌時是否等待
? ? ? ? ? ?jedisPoolConfig.setMaxWaitMillis(500)//忙碌時等待時長 毫秒
? ? ? ? ? ?jedisPoolConfig.setTestOnBorrow(true) //每次獲得連接的進行測試
?
? ? ? ? ? ?jedisPool = new JedisPool(jedisPoolConfig, host, port)
? ? ? }
? ? ? ?println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
? ? ? ?println("獲得一個連接")
? ? ? ?jedisPool.getResource
? }
? ?
}
MyKafkaUitl
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
?
object MyKafkaUtil {
?
? ?private val properties: Properties = PropertiesUtil.load("config.properties")
? ?val broker_list = properties.getProperty("kafka.broker.list")
?
? ?// kafka消費者配置
? ?val kafkaParam = Map(
? ? ? ?"bootstrap.servers" -> broker_list,//用于初始化鏈接到集群的地址
? ? ? ?"key.deserializer" -> classOf[StringDeserializer],
? ? ? ?"value.deserializer" -> classOf[StringDeserializer],
? ? ? ?//用于標識這個消費者屬于哪個消費團體
? ? ? ?"group.id" -> "gmall_consumer_group",
? ? ? ?//如果沒有初始化偏移量或者當前的偏移量不存在任何服務器上,可以使用這個配置屬性
? ? ? ?//可以使用這個配置,latest自動重置偏移量為最新的偏移量
? ? ? ?"auto.offset.reset" -> "latest",
? ? ? ?//如果是true,則這個消費者的偏移量會在后臺自動提交,但是kafka宕機容易丟失數據
? ? ? ?//如果是false,會需要手動維護kafka偏移量
? ? ? ?"enable.auto.commit" -> (true: java.lang.Boolean)
? )
?
? ?// 創建DStream,返回接收到的輸入數據
? ?// LocationStrategies:根據給定的主題和集群地址創建consumer
? ?// LocationStrategies.PreferConsistent:持續的在所有Executor之間分配分區
? ?// ConsumerStrategies:選擇如何在Driver和Executor上創建和配置Kafka Consumer
? ?// ConsumerStrategies.Subscribe:訂閱一系列主題
? ?def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
? ? ? ?val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
? ? ? ?dStream
? }
}
MyEsUtil
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, BulkResult, Index}
import java.util.Objects
?
object MyEsUtil {
? ?private val ES_HOST = "http://hadoop101"
? ?private val ES_HTTP_PORT = 9200
? ?private var factory: JestClientFactory = null
?
? ?/**
? ? ?* 獲取客戶端
? ? ?*
? ? ?* @return jestclient
? ? ?*/
? ?def getClient: JestClient = {
? ? ? ?if (factory == null) build()
? ? ? ?factory.getObject
? }
?
? ?/**
? ? ?* 關閉客戶端
? ? ?*/
? ?def close(client: JestClient): Unit = {
? ? ? ?if (!Objects.isNull(client)) try
? ? ? ? ? ?client.shutdownClient()
? ? ? ?catch {
? ? ? ? ? ?case e: Exception =>
? ? ? ? ? ? ? ?e.printStackTrace()
? ? ? }
? }
?
? ?/**
? ? ?* 建立連接
? ? ?*/
? ?private def build(): Unit = {
? ? ? ?factory = new JestClientFactory
? ? ? ?factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT).multiThreaded(true)
? ? ? ? ? .maxTotalConnection(20) //連接總數
? ? ? ? ? .connTimeout(10000).readTimeout(10000).build)
?
? }
?
? ?// 批量插入
? ?def insertBulk(indexName: String, docList: List[Any]): Unit = {
? ? ? ?val jest: JestClient = getClient
? ? ? ?val bulkBuilder = new Bulk.Builder
? ? ? ?bulkBuilder.defaultIndex(indexName).defaultType("_ex")
? ? ? ?println(docList.mkString("\n"))
? ? ? ?for (doc <- docList) {
?
? ? ? ? ? ?val index: Index = new Index.Builder(doc).build()
? ? ? ? ? ?bulkBuilder.addAction(index)
? ? ? }
? ? ? ?val result: BulkResult = jest.execute(bulkBuilder.build())
? ? ? ?println(s"保存es= ${result.getItems.size()} 條")
? ? ? ?close(jest)
? }
?
? ?// 測試
? ?def main(args: Array[String]): Unit = {
? ? ? ?val jest: JestClient = getClient
? ? ? ?val doc = "{\n \"name\":\"yiyi\",\n \"age\": 17\n}"
? ? ? ?val index: Index = new Index.Builder(doc).index("myesutil_test").`type`("_doc").build()
? ? ? ?jest.execute(index)
? }
?
}
?
---------------------
原文:https://blog.csdn.net/qq_31108141/article/details/88367058
轉載于:https://www.cnblogs.com/Bkxk/p/10563723.html
總結
- 上一篇: html语义化练习易牛课堂代码
- 下一篇: 一个关于Integer的秘密