springCloud 初探
分布式系統理論
分布式系統是若干個獨立計算機的集合,這些計算機的集合,這些計算機對于用戶來說就像單個相關系統。分布式系統是由一組通過網絡進行通信,為了完成共同的任務而協調工作的計算機節點組成的系統。分布式系統的出現是為了用廉價的,普通的機器完成單個計算機無法完成的計算,存儲任務。其目的是:利用更多的機器,處理更多的數據。
RPC
定義: RPC是指遠程過程調用,是一種進程間的通信方式,它是一種技術的思想,而不是規范。它允許程序調用另一個地址空間的過程或函數,而不是程序顯式編碼這個遠程調用的細節。即程序員無論是調用本地還是遠程的函數,本質上編寫的調用代碼基本相同。
RPC原理:
?
RPC的核心:通訊, 序列化(方便數據傳輸)。
序列化:數據傳輸需要轉換。
解決這些核心問題我們可以使用Doubb。
springCloud技術概況
?springCloud技術分布圖:
springCloud升級:
springBoot和springCloud版本兼容查詢
https://spring.io/projects/spring-cloud#learn
springCloud 對應依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.3</version><type>pom</type><scope>import</scope> </dependency>Eureka
?作用: Eureka能夠自動注冊并發現微服務,然后對服務的狀態,信息進行集中管理,這樣我們需要獲取其他服務的信息時,我們只需要向Eureka進行查詢就可以了。
這樣服務之間的強廣聯性就會被進一步減弱。
?對應依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId><version>3.1.3</version> </dependency>配置application.yml文件
eureka:client:#由于我們是作為服務端角色,所以不需要獲取服務端,改為false, 默認為truefetch-registry: false#暫時不需要將自己注冊到eurekaregister-with-eureka: falseservice-url:defaultZone: http://localhost:8888/eureka且要在啟動類中添加注解:?@EnableEurekaServer?
效果圖:
?接下來將各個服務作為客戶端。
作為客戶端的對應依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId><version>3.1.3</version> </dependency>配置各個服務的application.yml,讓將服務地址指向eureka服務的地址,這樣才能實現注冊。?
客戶端application.yml配置:
eureka:client:service-url:defaultZone: http://localhost:8888/eurekaspring:application:name: ”對應名字“效果圖:
當我們的服務啟動之后,每隔一段時間eureka會發送一次心跳包,這樣eureka就能檢測到我們的服務是否還在正常運行。
通過eureka來調用服務?
例子一:
舊代碼
public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate();DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);books.add(i, book);}userBook.setBook(books);return userBook;}這里其實就是通過詢問eureka對應的服務名來獲取對應的ip地址。?
?負載均衡的實現
? ? 同一個服務器實際上可以注冊很多個的, 但它們的端口是不同的,比如我們創建多個用戶查詢服務,將原有的端口進行修改,由idea中設置啟動參數來決定,這樣就可以創建幾個同端口的相同服務了。?
效果圖中說明在用戶服務處有多個相同的服務。?
?當我們要使用用戶服務時,如果有第一個用戶服務down掉的話,就會有另一個用戶服務來執行對應的操作,防止整哥微服務不可用,大大提高了安全性。
當存在多個相同服務的時候就會通過對應的負載均衡的策略使每個服務都被調用起來,從而實現負載均衡。
新的實現代碼
1.在condig包中創建一個BeanConfiguration.java。
@Configuration public class BeanConfiguration {@Bean//負載均衡@LoadBalancedpublic RestTemplate restTemplate(){return new RestTemplate();} }2.在對應的service層中的使用@Autowired進行自動注入使用,將原來的ip地址改為在eureka中對應的服務名字。
@Resource private RestTemplate template;public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid); //將原來的ip地址改為在eureka中對應的服務名字DbUser user = template.getForObject("http://user-service/dbUser/" + uid, DbUser.class);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){DbBook book = template.getForObject("http://book-service/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);books.add(i, book);}userBook.setBook(books);return userBook;}注冊中心高可用
?為了防止eureka down掉,我們可以搭建eureka集群。
效果圖:
?搭建eureka集群步驟:
1.修改兩個eureka服務端的配置文件。
applicationn01.yml
server:port: 9999 eureka:instance:#由于不支持多個localhost的eureka的服務器,但是又只能在本地測試,所有就只能自定義主機名稱了hostname: eureka01client:#不需要獲取服務端fetch-registry: false#去掉register-with-eureka選項,讓eureka服務器自己注冊到其他的eureka服務器,這樣才能相互啟用service-url:#注意這里要填寫其他的eureka服務器地址,不用寫自己的defaultZone: http://eureka02:9999/erekaapplication02.yml
server:port: 9999 eureka:instance:#由于不支持多個localhost的eureka的服務器,但是又只能在本地測試,所有就只能自定義主機名稱了hostname: eureka02client:#不需要獲取服務端fetch-registry: false#去掉register-with-eureka選項,讓eureka服務器自己注冊到其他的eureka服務器,這樣才能相互啟用service-url:#注意這里要填寫其他的eureka服務器地址,不用寫自己的defaultZone: http://eureka01:8888/ereka2.啟動eureka集群
在因為是本地測試所以我們要修改本地的hosts。
eureka01:?
eureka02:?
3.把所有的微服務在eureka集群中都注冊一次
service-url:defaultZone: http://localhost:8888/eureka, http://localhost:9999/eureka?在一個eureka down掉的時候,另一個eureka還會繼續工作,這時我們就可以對應down 掉的eureka進行維修,這樣就實現了高可用。
OpenFeign
? ? OpenFeign和RestTemplate一樣,也是一種HTTP客戶端請求工具,但它使用起來更加便捷。
對應依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><version>3.1.3</version> </dependency>在對應的啟動類上加上 @EnableFeignClients?
配對應的FeignClient服務接口?。(可以單獨創建一個包來存放這些服務接口)
服務接口格式:
@FeignClient("在eureka中配置的服務名字") public interface BookClient { //該接口里面的方法為你要調用的Controller中的方法,這些方法的路徑要寫全@GetMapping("/dbBook/{id}")DbBook queryById(@PathVariable("id") Integer id); }?要使用此接口時可以通過@Resource進行注入。(類似Dao層的mybatis,其通過@FeignClient將接口注入到spring中)
?調用此接口的Service層中的代碼就要修改為下:
public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid); // DbUser user = template.getForObject("http://user-service/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://book-service/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}OpenFeign服務降級
為對應的client接口創建實現類。
@Component //注入到spring中,使得BookClient能調用到此實現類 public class BookFallBackClient implements BookClient{@Overridepublic DbBook queryById(Integer id) {return new DbBook();} } @Component 注入到spring中,使得UserClient能調用到此實現類 public class UserFallBackClient implements UserClient{@Overridepublic DbUser queryById(Integer id) {return new DbUser();} }?將此實現類添加到client的備選方案中。
?在application.xml配置熔斷支持
feign:circuitbreaker:enabled: true?效果圖:
Hystrix??
??Hystrix服務熔斷
? ? 微服務之間是可以相互調用的。?
由于位于最底端的服務提供者發生了故障,那么此時會直接導致ABCD全線崩潰,就像雪崩一樣。
這種情況實際上是不可避免的,由于太多的因素,比如網絡故障,系統故障,硬件問題,會導致這種極端的情況發生,因此我們需要找到對應的方法來解決次問題。
為了解決分布式系統的雪崩問題,springCloud提供了Hystrix熔斷組件,它就像我們家中的保險絲一樣,當電流過載的時候直接熔斷掉,防止危險的進一步發生,從而保障家庭用電的安全,可以想象一下,如果整條鏈路上的服務已經全線崩潰,這時還在不斷地有大量的請求到達,想要各個服務進行處理,肯定是會使得情況越來越糟。
熔斷機制是應對雪崩效應的一種微服務鏈路保護機制,當檢測到鏈路的某個微服務不可用或者響應的時間太長時,會進行服務降級,進而熔斷該節點微服務的調用,快速返回"錯誤"的響應信息,當檢測到該節點微服務響應正常后恢復調用鏈路。
實際上,熔斷就是在降級的基礎上進一步形成的,也就是說,在一段時間內多次調用失敗,那么就直接升級為熔斷。
?當需要的服務正常啟動后,熔斷機制就會關閉了。
Hystrix服務降級
? ? 服務降級并不會直接返回錯誤信息,而是可以提供一個補救的措施,正常響應給請求者,這樣相當于服務依然可用,但是服務能力肯定是下降的。?
對應依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-hystrix</artifactId><version>2.2.10.RELEASE</version> </dependency>在啟動類上添加注解: @EnableHystrix。
?
?在對應的Controller層中編寫編寫備選方案。
//備選方案,返回一個空的對象public ResponseEntity<UserBook> onError(Integer uid) {UserBook userBook = new UserBook();userBook.setUser(null);userBook.setBook(Collections.emptyList());return ResponseEntity.ok(userBook);}在對的方法上添加注解:@HystrixCommand(fallbackMethod = "備選方案名")
?效果圖:?
Gateway路由網關?
可能并不是所有的微服務都需要直接暴露給外部調用,這時我們就可以使用路由機制,添加一層防護,讓所有的請求全部通過路由來轉發到各個微服務,并且轉發給多個相同微服務實例也可以實現負載均衡。
?部署網關
對應依賴為下:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId><version>3.1.3</version></dependency>第一個是網關的依賴,第二個是跟其他微服務一樣,需要注冊到eureka中才能生效,不要添加web依賴,使用的是WebFlux框架。
編寫配置文件
eureka:client:service-url:defaultZone: http://localhost:8888/eureka, http://localhost:9999/eureka spring:application:name: gateway server:port: 8500?繼續在配置文件中配置路由功能。
spring:cloud:gateway:routes:- id: borrowService #路由的名字uri: lb://borrow-service #路由的地址,lb表示使用負載均衡到微服務,也可以使用Http正常轉發predicates: #路由規則 斷言什么請求會被路由- Path=/dbBorrow/queryUserBook/** #只要訪問這個路徑,一律都被路由到上面指定的服務?在輸入路徑后,如果路徑符合斷言,就會將uri和Path進行拼接。
路由過濾器
路由過濾器支持以某中方式修改傳入的HTTP請求或傳出的HTTP響應,路由過濾器的范圍是某個過濾器,跟之前的斷言一樣
修改對應的配置文件。
spring:application:name: gatewaycloud:gateway:routes:- id: borrowService #路由的名字uri: lb://borrow-service #路由的地址,lb表示使用負載均衡到微服務,也可以使用Http正常轉發predicates: #路由規則 斷言什么請求會被路由- Path=/dbBorrow/queryUserBook/** #只要訪問這個路徑,一律都被路由到上面指定的服務- id: bookServiceuri: lb://book-servicepredicates:- Path=/dbBook/**filters: #添加過濾器- AddRequestHeader=Test, HELLO WORLD!?在對應的Controller層中測試是否過濾成功。
效果圖:
?
設置全局過濾器
?例子:攔截沒有攜帶指定參數的請求。
在gateway項目中創建一個實現類。
@Component public class TestFilter implements GlobalFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // exchange表示請求過來的信息//先獲取ServerHttpRequest對象,注意不是HttpServletRequestServerHttpRequest request = exchange.getRequest();//打印請求攜帶的所有參數System.out.println(request.getQueryParams());//判斷是否包含test參數,且參數值是否為1List<String> test = request.getQueryParams().get("test");if(test != null && test.contains("1")){//將ServerHttpExchange向過濾鏈的下一級傳遞,類似javaWeb中的過濾器return chain.filter(exchange);} else {//直接在這里不再向下傳遞,然后返回響應return exchange.getResponse().setComplete();}} }次過濾器會作用于整個網關。
效果圖:
只要路徑中沒有攜帶test=1就會被攔截。
當然過濾器會存在很多個的,所以我們手動指定過濾器之間的順序。可以通過實現Ordered接口,重寫getOredr方法來設置執行的順序。
@Component public class TestFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // exchange表示請求過來的信息//先獲取ServerHttpRequest對象,注意不是HttpServletRequestServerHttpRequest request = exchange.getRequest();//打印請求攜帶的所有參數System.out.println(request.getQueryParams());//判斷是否包含test參數,且參數值是否為1List<String> test = request.getQueryParams().get("test");if(test != null && test.contains("1")){//將ServerHttpExchange向過濾鏈的下一級傳遞,類似javaWeb中的過濾器return chain.filter(exchange);} else {//直接在這里不再向下傳遞,然后返回響應return exchange.getResponse().setComplete();}}@Overridepublic int getOrder() {//返回的數字表示執行的順序 return 0;} }注意Oreder的值越小執行的優先級就越高,并且無論是配置文件里編寫的單個路由過濾器還是全局過濾器,都會受到Order的影響(單個路由過濾器的Order值按從上到下的順序從1開始遞增),最終是按照Order值決定哪個路由過濾器先執行,當Order值相同時,全局路由過濾器會優先于單個路由過濾器執行。?
微服務CAP原則
在一個分布式系統中存在 Consistency(一致性),Availabiity(可用性),Partition Tolerance(區分容錯性)三者是不可同時保證的,最多同時保證其中的兩個。
?一致性:在分布式系統中的所有數據備份,在同一時刻都是相同的值。(所有的節點無論何時訪問都能拿到最新的值)
可用性:系統中非故障節點收到的每個都必須得到響應。(不如我們之前使用的服務降級和熔斷,其實就是一種維持可用性的措施,雖然服務器返回的是無意義的數據,但不至于用戶的請求會被服務器忽略)
區分容錯性:一個分布式系統里面,節點之間組成的網絡本應該是相互連通的,然而可能因為一些故障(比如網絡丟包等,這是很難避免的),使得一些節點之間不能連通,整個網絡分成了幾塊區域,數據就散落在這些不相連通的區域里面。(這樣就可能出現某些被分區節點存放的數據訪問失敗,我們需要來容忍這些不可靠的情況)
總的來說,數據存放的節點越多,分區容忍性就越高,都是要復制更新的次數就會越來越多,同時為了保證一致性,更新所有節點數據所需要的時間就越長,那么可用性就會降低。
所以存在三種方案:
??
springCloud Alibaba的使用??
? ? springCloud的缺點:
1.springCloud部分組件停止維護和更新了,給開發帶來了不便。
2.springCloud部分開發環境復雜,沒有完善的可視化界面,我們需要大量的二次開發和定制。
3.springCloud配置復雜,難上手,部分配置差別難以群分和合理應用。
? ? springCloud Alibaba的優點:阿里使用過的組件經歷了考驗,性能強悍,設計合理,現在開源出來給大家使用成套的產品搭配完善的可視化界面給開發帶來運維帶來了極大的便利,搭建簡單,學習曲線低。
spring-cloud-alibaba對應依賴:
<dependencyManagement><dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.2.7.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies> </dependencyManagement>?Nacos
Nacos: 一個易于使用的動態服務發現、配置和服務管理平臺,用于構建云原生應用程序。
作用: 將其作為服務注冊中心。
?Nacos作為服務注冊中心對應依賴:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>在github中下載nacos并將其復制到項目中。
在startup.sh中設置nacos讓其在前臺運行。?
配置服務。?
直接使用nacos。
后臺啟動nacos:?bash nacos/bin/startup.sh。(以集群的方式)?
關閉nacos: bash nacos/bin/shutdown.sh。
后臺啟動naocs: bash nacos/bin/startup.sh -m standalone。(以單例的方式啟動)
nacos 的地址為: localhost:8084/nacos。
?導入對應依賴。
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.1</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2021.0.1.0</version><type>pom</type><scope>import</scope></dependency>在子項目中添加依賴。
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2021.1</version></dependency>設置配置文件。
spring:cloud: #配置nacos注冊中心地址nacos:discovery:server-addr: lcoalhost:8848效果圖:
?使用openFeign調用服務。
對應依賴為下:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-loadbalancer</artifactId><version>3.1.1</version></dependency>和原生的調用方式一樣。
臨時實例和非臨時實例的區別:
臨時實例:和eureka一樣,采用心跳機制向nacos發送請求保持在線的狀態,一但心跳停止,就代表實例下線,不保留實例信息。
非臨時實例:由nacos主動進行聯系,如果連接失敗,那么不會刪除實例信息,而是將健康狀態設置為false,相當于對某個實例狀態持續進行監控。
?設置非臨時實例
設置配置文件。
cloud:nacos:discovery: # 修改為false 表示其是個非臨時文件ephemeral: false?對應的實例下線時會顏色會變為紅色。
?集群分區
對應依賴為下:
spring:cloud:nacos:discovery:#對應的集群名cluster-name: name效果圖
在默認情況下,集群間的調用方式采用的是輪番調用,使用為了實現就近原則,我們要對配置文件進行相應配置。
spring:cloud:#將loadbalancer的nacos支持開啟,集成nacos負載均衡loadbalancer:nacos:enabled: true?在同個集群中如果存在多個相同的服務時,就會根據權重來執行對應的服務,我們可以在nacos頁面中進行設置。
也可以通過配置文件進行修改。
spring:cloud:nacos:discovery:#設置權重,默認為1weight: 2?配置中心
我們可以通過配置來加載遠程配置,這樣我們可以遠端集中管理配置文件。
我們可以通過在nacos中,點擊新建配置項,進行配置。
點擊發布。
在項目中添加依賴。
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId><version>3.1.2</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId><version>2021.1</version></dependency>編寫bootstrap.yml文件。
spring:application:#去遠程倉庫中調用對應名字的配置name: user-serviceprofiles:active: devcloud:nacos:config:file-extension: ymlserver-addr: localhost:8848此時兩個yml文件都會被使用。
?在默認情況下,在nacos中修改yml文件,對應的服務中的yml的信息并不會發生對應的改變,雖然對其做了監聽。
為了能夠保證在nacos中修改的yml文件立刻生效,我們需要添加注解@RefreshScope。
?效果圖:
原配置文件?
?修改后的配置文件
?結果
命名空間?
?新建命名空間。
在配置文件中配置所屬的命名空間。
spring:cloud:nacos:discovery:namespace: 對應命名空間的id命名空間不相同的服務是不能相互調用的。
指定分組
?修改配置文件。
spring:cloud:nacos:discovery:group: 對應的組名默認為?DEFAULT_GROUP。?
高可用
?在本地數據庫導入nacos中的sql文件。
?配置conf/application.properties文件。
?將cluster.conf.example重命名為cluster.conf,并對其內容進行修改,輸入多個的nanos地址,這里使用內網映射。
?因為要創建nacos集群,所以我們要創建多個nacos,通過復制修改好的nacos進行操作,這時我們只要再對每個nacos修改端口號就可以了。(要以集群的方式啟動,可能會啟動失敗,多啟動幾次)
效果圖:?
我們需要一個負載均衡器來管理這些nacos,這里我們使用Nginx。
在Mac上安裝Nginx。
brew install nginx編輯Nginx。
nano /usr/local/etc/nginx/nginx.conf編輯內容:
# 添加我們剛創建好的nacos服務器 upstream nacos-server {#被代理的服務群,nacos-server服務群的名字server localhost:8801;server localhost:8802; }server {#監聽的端口listen 80;#服務名server_name localhost;#類似過濾器,當訪問到符合此要求的路徑時就會去訪問對應的服務群location /nacos {proxy_pass http://nacos-server;} }重啟Nginx。
brew services restart nginx將各個微服務的nacos的注冊地址改為localhost:80/nacos。(其會實現負載均衡)
在云服務器上做反向代理的例子。
1.配置nacos的端口號和對應的數據庫。
2.啟動倆個nacos。
3.在Nginx中配置反向代理。
集群效果:
在每個微服務中將nacos的地址改為云服務器上Nginx反向代理的地址。
??
最終效果圖:
?Sentinel流量防衛兵
?Sentinel 可以做到熔斷和降級,可以取代Hystrix。
Sentinel具有以下功能:
- 豐富的適用場景:哨兵在阿里巴巴被廣泛使用,在過去10年中,幾乎涵蓋了Double-11(11.11)購物節的所有核心場景,例如需要限制突發流量以滿足系統容量的“第二次殺戮”,消息峰值剪切和山谷填充,不可靠的下游服務的斷路,集群流量控制等。
- 實時監控:哨兵還提供實時監控功能。您可以實時查看一臺機器的運行時信息,以及少于500個節點的集群的聚合運行時信息。
- 廣泛的開源生態系統:Sentinel提供與Spring Cloud、Dubbo和gRPC等常用框架和庫的開箱即用集成。您只需將適配器依賴項添加到您的服務中,即可輕松使用Sentinel。
- Polyglot支持:Sentinel為Java、Go和C++提供了原生支持。
- 各種SPI擴展:Sentinel提供易于使用的SPI擴展接口,允許您快速自定義邏輯,例如自定義規則管理、調整數據源等。
下載對應的jar包。
sentinel V1.8.3
?將jar導入到項目,為其創建一個服務。
啟動項目。?
?添加對應的依賴。
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId><version>2021.0.1.0</version></dependency>對微服務中application.yml進行配置。(實際上Sentinel是本地在管理,但我們可以連接到監控頁面,這樣可以圖形化操作了)
spring:cloud:sentinel: # 添加監控頁面地址transport:dashboard: localhost:8858為了提高可讀性和節省空間,Sentinel只監視被調用過的微服務。所以為了讓sentinel監視我們的微服務,我們需要手動的調用一次微服務。
效果圖:
流量控制
我們的機器不可能無限的接受和處理客戶端的請求,如果不加以限制,當發生高并發時,就會使得系統的資源很快的被耗盡。為了避免這種情況,我們可以添加流量控制,當一段時間內的流量達到一定的閥值的時候,新的請求將不會再進行處理,這樣不僅可以合理地應對高并發的情況,同時也可以在一定的程度上保護服務器不受到外界的攻擊。
解決方案
?針對判斷是否超過流量的閥值的四種算法
1.漏桶算法
?2.令牌桶算法(有點像游戲里的能量條機制)
?3.固定時間窗口算法
4.滑動時間窗口算法
通過Sentinel進行設置
?閥值類型:QPS就是每秒中的請求數量,并發線程數是按服務當時使用的線程數據進行統計的。
流控模式:當達到閥值時,流控的對象,這里暫時只用直接。
流控效果:就是我們上面所說的三種方案。
流控模式的區別:
1.直接:只針對當前接口。
2.關聯:當其他接口超過閥值時,會導致當前接口被限流。(別的接口出現問題由當前接口承擔責任)
3.鏈路:更細粒度的限流,能精確到具體的方法。
鏈路模式能夠更加精準的進行流量控制,鏈路流控模式指的是,當從指定接口過來的資源請求達到限流條件時,開啟限流,這里得先講解一下@SentinelResource的使用。
我們可以對某個方法進行限流控制,無論是誰在何處調用了它,這里需要使用到@SentinelResource,一但方法被標注,那么就會進行監控。
在調用的方法上添加注解@SentinelResource。
@SentinelResource("detail")@Overridepublic UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate(); // DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}修改配置文件。
spring:cloud:sentinel:#關閉Context收斂,這樣被監控方法可以進行不同鏈路的單獨監控web-context-unify: false運行查看效果。
?對精確限流設置限流策略。
?設置閥值為1,然后進行連續訪問測試,發現精確限流的位置拋出了異常。
?那么這個鏈路選項實際上就是決定只限流從哪個方向來的調用,比如我們只對borrow2這個接口對queryByUid方法的調用進行限流,那么我們就可以為其制定鏈路。
?入口資源的設置表示:進行精確限流的路徑,如果設置了入口資源,那么其他路徑調用被精確限流的方法時則不會被限流。
系統保護規則
?檢測對應的設備來進行限流。
限流和異常處理?
我們看到被限流之后返回的Sentinel默認的數據,其實我們可以返回我們自己定義的數據。?
?這里我們先創建好被限流狀態下需要返回的內容,自定義一個請求映射。
?在要返回自定義的微服務的Controller中添加自定義的錯誤頁面。
@RequestMapping("/blocked")public JSONObject blocked() {JSONObject json = new JSONObject();json.put("code", 403);json.put("success", false);json.put("message", "訪問頻率過快, 請稍后訪問!");return json;}在配置文件中配置限流頁面返回信息。
spring:cloud:sentinel:block-page: /dbUser/blocked效果圖:?
?對于方法級別的限流,當某個方法被限流時,會直接在后臺拋出異常,那么這種情況可以通過Sentinel添加一個替代方案,這樣當我們發現異常時會直接執行我們的代替方案并返回。
在service層中添加代替方案。
@SentinelResource(value = "detail", blockHandler = "blocked")//限流之后代替返回的其他方案,這樣就不會使用默認的拋出異常的方式了@Overridepublic UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate(); // DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}//代替方案public UserBook blocked(Integer uid, BlockException blockException) {UserBook userBook = new UserBook();userBook.setBook(Collections.emptyList());userBook.setUser(new DbUser());return userBook;}blockHandler只會處理限流的異常,而不會處理方法體內的其他代碼異常。
如果要處理限流以外的其他異常,我們可以通過其他參數進行處理。
@RequestMapping("/test")@SentinelResource(value = "test",fallback = "except", //fallback指定出現異常是的替代方案exceptionsToIgnore = IOException.class) //忽略注定的異常,也就是出現這些指定的異常時不回調用的替代方案public String test() {throw new RuntimeException("拋出異常");}public String except(Throwable t) {return t.getMessage();}效果圖:?
?當在@SentinelResource中同時存在 fallback和blockHandler時,在拋出限流異常范圍內的異常的時候就先調用blockHandler中的替代方案,其他的時候就會調用fallback。(注意在兩個都打存在的時候,因為限流會在方法執行前調用,所以在限流代替方案執行完以后還會在執行出現其異常時的代替方案)
熱點參數限流
我們可以對某一熱點進行精準限流,比如在某一時刻,不同參數被攜帶訪問的頻率是不一樣的:
http://localhost:8082/test?a=10 訪問100次
http://localhost:8082/test?b=10 訪問0次
http://localhost:8082/test?c=10 訪問3次?
由于攜帶的參數a的請求比較多,我們就可以只對攜帶參數a的請求進行限流。?
創建一個新的測試請求映射。
@RequestMapping("/test")@SentinelResource("test") //注意這里需要添加@SentinelResource才可以,用戶資源名稱就使用這里定義的資源名稱public String findBorrow(@RequestParam(value = "a", required = false) String a,@RequestParam(value = "b", required = false) String b,@RequestParam(value = "c", required = false) String c) {return "請求成功!" + "a = " + a + " b = " + b + " c = " + c;}在Senntinel中設置熱點配置。(我們對a進行了限流)
?效果圖:
?我們也可以對某個參數的特定值進行特定限流。(我們對a進行特定值限流)?
效果圖:
Sentinel的服務熔斷和降級
為了防止鏈路故障,我們能進行隔離,這里我們有兩種隔離方案。
1.線程池隔離
線程池隔離實際上就是對每個服務的遠程調用單獨開放線程池,比如服務A要調用服務B,那么只基于固定數量的線程池,這樣即使在短時間內出現大量請求,由于沒有線程可以分配,所以就不會導致資源耗盡。?
?2.信號量隔離
信號量隔離是使用Semaphore類實現的,思想基本跟上面的相同,也是限定指定的線程數量能夠同時進行服務調用,但它相對于線程池開銷會更小一些,使用效果同樣優秀,也支持超時等,Sentinel就是采用這個方案進行隔離的。?
說回我們的熔斷與降級,當下游的服務因為某些原因變得不可用或響應過慢時,上游為了保證自己整體的高可用性,不再繼續調用目標服務而是快速返回或執行自己的代替方案。?
?整個過程分為三個狀態:
1.關閉:熔斷器不工作,所有的請求全部該干嘛就干嘛。
2.開啟:熔斷器工作,所有的請求一律降級。
3.半開:嘗試進行一下正常的流程,要是還是不行就繼續保持開啟的狀態,否則關閉。
?在Sentinel設置熔斷規則
熔斷策略
1.慢調用比例:如果出現那種半天都處理不完的調用,有可能就是服務出現故障,這個選項是按照最大效應時間(RT)進行判斷的,如果一次請求的處理時間超過了指定的RT,那么就會判斷為慢調用,在一個統計時長內,如果請求數目大于最小請求數目,并且被判斷定為慢調用的請求比例已經超過閥值,將觸發熔斷,經過熔斷時長之后,將會進入到半開狀態進行試探。(這里和Hystrix一致)
?測試代碼
@RequestMapping("/borrow2/{uid}")public String test2(@PathVariable("uid") Integer uid) throws InterruptedException {Thread.sleep(1000);return "hello World!";}在在對應的微服務中設置熔斷設置。
?效果圖:
?2.異常比例:與慢調用比例相似,不過這里判斷的是出現異常的次數。
測試代碼
@RequestMapping("/borrow3/{uid}")public String test3(@PathVariable("uid") Integer uid) throws Exception {throw new RuntimeException();}在Sentinel中配置熔斷配置。
效果圖:
3.異常數:很異常比例好像,但有明確指出異常數量。
降級策略
我們需要在@SentinelResource中配置blockHandler參數。(這里跟之前方法限流的配置是一樣的,因為如果添加了@SentinelResource注解,那么這里就會進行方法級別細粒度的限制,和之前方法級別限流一樣,會在降級之后直接拋出異常,如果不添加則返回默認的限流頁面,blockHandler的目的就是處理這種Sentinel機制的異常,所以這里其實和之前的限流配置是一個道理,因此下面熔斷配置也應該對value自定義名稱的資源進行配置,才能作用到此方法上)
測試代碼:
//降級測試@RequestMapping("/borrow4/{uid}")@SentinelResource(value = "findBorrow", blockHandler = "test")public ResponseEntity<UserBook> findBorrow(@PathVariable("uid") Integer uid) throws Exception {throw new RuntimeException();}//代替方案public ResponseEntity<UserBook> test(Integer uid, BlockException e) {System.out.println(e.getClass());UserBook userBook = new UserBook();userBook.setUser(new DbUser());userBook.setBook(Collections.emptyList());return ResponseEntity.ok(userBook);}在Sentinel中設置熔斷規則。
?效果圖:
?拋出降級異常。
openFeign支持Sentinel
?前面我們使用Hystrix的時候,就可以直接對openFeign的每個接口調用單獨進行服務降級,而使用Sentinel,也可以的。
在配置文件中開啟支持。
feign:sentinel:enabled: true?和之前的openfign整合eureka的服務降級配置相似。
BookClient接口:
@FeignClient(value = "book-service",fallback = BookClientImpl.class) public interface BookClient {@GetMapping("/dbBook/{id}")DbBook queryById(@PathVariable("id") Integer id); }BookClient替代方案:
@Component public class BookClientImpl implements BookClient{@Overridepublic DbBook queryById(Integer id) {return new DbBook();} }啟用代替方案效果圖:?
?Sentinel整合RestTemplate使用服務降級
在config進行配置。
@Configuration public class RestTemplateConfig {@Bean@LoadBalanced@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class,fallback = "對應的降級方案", fallbackClass = ExceptionUtil.class)public RestTemplate restTemplate() {return new RestTemplate();} }?Seata與分布式事務
Seata 是一款開源的分布式事務解決方案,致力于提供高性能和簡單易用的分布式事務服務。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務模式,為用戶打造一站式的分布式解決方案。
事務特性
?分布式講解方案
1.XA分布式事務協議 -2PC (兩階段提交實現)?
這里的PC實際上指的是Prepare和Commit,也就是說它分為兩個階段,一個是準備一個是提交,整個過程的參與者一共有兩個角色,一個是事務的執行者,一個是事務的協調者,實際上整個事務的運作需要毅力啊協調者來維持。
?在準備和提交階段,會進行:
準備階段:
? ? 一個分布式事務是由協調者來開啟的,首先協調者會向所有的事務執行者發送事務內容,等待所有的事務執行者答復。
各個事務執行者開始執行事務操作,但不會進行提交,并將undo和redo信息記錄到事務日志中。
如果事務執行者執行事務成功,那么就告訴協調者成功Yes,否則告訴協調者失敗No,不能提交事務。
提交階段:
? ? 當前有的執行者都反饋完成之后,進入第二階段。
協調者會檢測各個執行者的反饋內容,如果所有的返回都是成功,那么就告訴所有的執行者可以提交事務了,最后再釋放鎖的資源。
如果有至少一個執行者返回失敗或超時面,那么就讓所有的執行者都會回滾,分布式事務執行失敗。
雖然這種方式看起來比簡單,但是存在以下幾個問題:
1.事務協調者是非常核心的角色,一旦出現問題,將導致整個分布式不能正常運行。
2.如果提交階段發生網絡問題,導致某事務執行者沒有收到協調者發來的提交命令,將導致某些執行者沒提交,這樣肯定是不行的。
2.XA分布式事務協議 -3PC(三階段提交實現)
三階段提交是在二階段提交的基礎上的改進播版本,主要是加了超時記機制,同時在協調者和執行者都引入了超時機制。
三個階段分別進行:
CanCommit階段:
? ? 協調者向執行者發送CanCommit請求,詢問是否可以執行事務提交操作,然后開始等待執行者的響應。
ProeCommit階段:
? ? 協調者根據執行者的反應情況來決定是否可以進入第二階段事務的PreCommit操作。
如果所有的執行者都返回Yes,則協調者向所有的執行者發送PreCommit請求,并進入Prepared階段,執行者接收到請求后,會執行事務操作,并將undo和redo信息記錄到事務日志中,如果成功執行,則返回成功的響應。
如果所有的執行者至少有一個返回No,則協調者會向所有的執行者發送abort請求,所有的執行者在收到請求或超時一段時間沒有收到任何請求時,會直接中斷事務。
DoCommit階段:
? ? 該階段進行真正的事務提交。
? ? 協調者接收到所有執行者發送的成功響應,那么它就從PreCommit狀態進入DOCommit狀態,并向所有的執行者發送doCommit請求,執行者接收到doCommit請求之后,開始執行事務提交,并在完成事務提交之后釋放所有的事務資源,并最后向協調者發送確認響應,協調者接收到所有執行者的確認響應之后,完成事務(如果因為網絡問題導致執行者沒有接收到doCommit請求,執行者會在超時之后直接提交事務,雖然執行者只是猜測協調者返回的是doCommit請求,但是因為前面的兩個流程都正常執行,所以能夠在一定程度上認為本次事務是成功的,因此會直接提交)
? ? 協調者沒有接收到至少一個執行者發送的成功響應(可能是響應超時),那么就會執行中斷事務,協調者會向所有的執行者發送abort請求,執行者接收到abort請求之后,利用其在PreCommit階段記錄的undo信息來執行事務的回滾操作,并在完成回滾操作之后釋放所有的事務資源,執行者完成事務回滾之后,向協調者發送確認信息,協調者接收到參與者反饋的確認信息之后,執行事務的中斷。
第三階段的特點:
1.3PC在2PC的第一階段和第二階段中插入一個準備階段,保證在最后提交階段之前各參與節點的狀態是一致的。
2.一旦參與者無法及時收到來自協調者的信息之后,會默認執行Commit,這樣就不會因為協調者單方面故障導致全局出現問題。
3.但是我們知道,實際上超時之后的Commit決策本質上就是一個賭注罷了,如果此時協調者發送的是abort請求但是超時未接收,那么就會直接導致數據一致性的問題。?
3.TCC (補償事務)
補償事務TCC就是Try,Comfirm,Cancel,它對業務有入侵性,一共分為三個階段。
Try階段:
? ? 比如我們需要借書時,將書籍的庫存-1,并將用戶的借閱量-1,但是這個操作,除了直接對庫存和借閱量進行修改之外,還需要將減去的值,單獨存放到凍結表中,但是此時不會創建借閱信息,也就是說只是預先把關鍵的東西給處理了,預留業務資源出來。
Confirm階段:
? ? 如果Try執行成功無誤,那么就進入Confirm階段,接著之前,我們就該創建借閱信息了,只能使用Try階段預留的業務資源,如果創建成功,那么就對Try階段凍結的值進行解凍,整個流程就完成了,如果失敗了,就會進入Cancel階段。
Cancel階段:
? ? 將凍結的東西還給人家,進行回滾。
TCC特點:
跟XA協議相比,TCC就沒有協調者這一角色的參與了,而是自主通過上一階段的執行情況來確保正常,充分利用了集群的優勢,性能也是有很大的提升,但是缺點也很明顯,它與業務具有一定的關聯性,需要開發者去編寫更多的補償代碼,同時并不一定所有的業務流程都適用于這種形式。?
?Seata機制簡介
?seata支持四種事務:
1.AT:標志上就是2PC的升級版,在AT模式下,用戶只需要關心自己的"業務SQL"。
? ? ?一階段:seata會攔截"業務SQL",首先解析SQL語義,找到"業務SQL"要更新的業務數據,在業務數據更新之前,將其保存成"before image",然后進行"業務SQL"更新業務數據,在業務數據更新后,再將其保存到"after image",最后生成行鎖。以上操作只在一個數據庫事務內完成,這樣保證了第一階段操作的原子性。
? ? 二階段如果確定提交的話,因為"業務SQL"在一階段已經提交到數據庫,所以Seata框架只需要將第一階段保存的快照數據和行鎖刪掉,完成數據清除即可,當然如果需要回滾,那么就用"before image"還原業務數據,但在還原前首先要校驗臟讀,對比"數據庫當前業務數據"和"after image",如果兩份數據完全一致就說明沒有臟讀,可以還原業務數據,如果不一致就說明有臟讀,出現臟讀就需要轉人工處理。??
2.TCC:和我們上面講解的思路是一樣的。
3.XA:同上,但是要求數據庫本身支持這種模式才可以。
4.saga:用用處理長事務,每個執行者需要實現事務的正向操作和補償操作。
?那么,以AT模式為例,我們的程序是如何才能做到不對業務進行侵入的情況下實現分布式事務能?實際上,Seata客戶端,是通過對數據源進行代理實現的,使用的是DataSourceProxy類,所以在程序這邊,我們只需要將對應的代理類注冊到Bean即可。(0.9版本之后支持自動代理,并不需要我們手動導入)
?使用file進行部署(以AT為例)
下載seata-server。
seats-server
?在idea中配置seata服務。
?seata支持本地部署也支持服務注冊與發現中心部署。(比如eureka,nacos)
seata存在著事務分組機制:
1.事務分組:seata資源邏輯,可以按微服務的需要,在應用程序(客戶端)對自定義事務進行分組,每個組取一個名字。
2.集群:seata-server服務端一個或多個節點組成的集群cluster。應用程序(客戶端)使用時需要指定事務邏輯分組與seata服務器集群(默認為default)的映射關系。
為啥要設計成通過事務分組再直接映射到集群?為什么不直接將事務指定到集群呢?
獲取事務分組到映射集群的配置。這樣設計后,事務分組可以作為資源的邏輯隔離單位,出現某集群故障時可以快速failover(故障切換),只切換對應的分組,可以把故障縮減到服務級別,但提前也是你有足夠server集群。
將各個服務作為seata的客戶端導入對于依賴:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>2021.0.1.0</version></dependency>修改配置文件。
seata:service:vgroup-mapping:#這里需要對事務組進行映射,默認組名為應用名稱-seata-service-group,將其映射到default集群#這個很關鍵,一定要配置,不然會找不到服務borrow-service-seata-service-group: defaultgrouplist: localhost:8868也可以設置自定義的服務分組。
seata:service:vgroup-mapping:#這里需要對事務組進行映射,默認組名為應用名稱-seata-service-group,將其映射到default集群#這個很關鍵,一定要配置,不然會找不到服務xx服務名xx-seata-service-group: xxxgrouplist: localhost:8868tx-service-group: xxx現在我們接著來配置開啟分布式事務,首先在啟動類上添加注解,此注解會添加一個后置處理器將數據源封裝為支持分布式事務的代理數據源(1.4.2版本還是要手動添加此注解才能生效)?
?接著我們需要在開啟分布式事務的方法上添加注解@GlobalTransactional。
?因為Seata會分析修改數據的sql,同時生成對應的反向回滾sql,這個回滾記錄會存放在undo_log表中,所以要求每個Client都有一個對應的undo_log表(也就是說服務連接的數據庫都需要創建一個這樣的表,因為我們的例子就一個數據庫,所有只要創建一個表)
創建undo_log表的sql語句。
CREATE TABLE IF NOT EXISTS `undo_log` (`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` DATETIME NOT NULL COMMENT 'create datetime',`log_modified` DATETIME NOT NULL COMMENT 'modify datetime',`ext` VARCHAR(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8;?啟動服務,進行測試。
第一次借閱成功。?
第二次借閱失敗。
?查看數據庫是否進行回滾。(這里進行了回滾)?
我們可以打印XID,查看其對應的XID,在service層添加語句。
System.out.println(RootContext.getXID());也可以在日志中查看。
使用nacos模式部署
我們先為Seata在nacos中配置一個命名空間。
修改seata的/conf/registry.conf文件,修改其注冊和配置中的“type”,“namespace”,“password”,“username”。
?注冊信息配置完成之后,接著我們需要將配置文件也放到Nacos中,讓Nacos管理配置,這樣我們就可以對配置進行熱更新了,一旦環境需要改變,只需要直接到Nacos中修改即可。
??
?我們需要對配置導入到Nacos中,我們打開seata源碼的 /script/config-center/nacos目錄,這是官方提供上傳腳本,我們直接運行即可。(去github下載seata源碼)
?在nacos中查看seata的配置。
?把所有微服務的事務分組信息的配置放在nacos中,我們還需要將對應的事務組映射配置也添加上,DataId格式為service.vgroupMapping.'事務的名稱'。??
?接下來我們要對服務端的配置進行修改,我們刪除原本的seata配置,添加新的seata配置。
seata:#注冊registry:type: nacosnacos:#使用seata命名空間,這樣才能找到seata服務,由于組名我們設置的是SEATA_GROUP就是默認的名字,所以就不用配了namespace: 550e71d6-4604-4952-a24b-b0d3781d8223username: nacospassword: nacos#配置config:type: nacosnacos:namespace: 550e71d6-4604-4952-a24b-b0d3781d8223username: nacospassword: nacos? 啟動seata服務,在nacos中對應的命名空間觀察seata服務是否正常啟動。
啟動各個微服務,各個服務使用nacos配置成功。
測試事務效果圖:
??
?我們還可以配置一下事務會話信息的存儲方式,默認是file類型的,那么就會在運行目錄下創建file_store目錄,我們可以將其放到數據庫中存儲,只需要修改一下數據即可。
?默認情況的存儲方式:
將會話信息存放到數據庫中?
修改nacos中的seata配置store.mode,store.session.mode,將存儲方式改為數據庫方式。
?
?將數據庫的配置信息進行修改。
1.數據庫啟動。
?2.數據庫的URL。
3.數據庫用戶名和密碼。
?
創建seata數據庫。
-- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` (`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`status` TINYINT NOT NULL,`application_id` VARCHAR(32),`transaction_service_group` VARCHAR(128),`transaction_name` VARCHAR(128),`timeout` INT,`begin_time` BIGINT,`application_data` VARCHAR(2000),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`xid`),KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;-- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` (`branch_id` BIGINT NOT NULL,`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`resource_group_id` VARCHAR(32),`resource_id` VARCHAR(256),`branch_type` VARCHAR(8),`status` TINYINT,`client_id` VARCHAR(64),`application_data` VARCHAR(2000),`gmt_create` DATETIME(6),`gmt_modified` DATETIME(6),PRIMARY KEY (`branch_id`),KEY `idx_xid` (`xid`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;-- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` (`row_key` VARCHAR(128) NOT NULL,`xid` VARCHAR(128),`transaction_id` BIGINT,`branch_id` BIGINT NOT NULL,`resource_id` VARCHAR(256),`table_name` VARCHAR(32),`pk` VARCHAR(36),`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`row_key`),KEY `idx_status` (`status`),KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE IF NOT EXISTS `distributed_lock` (`lock_key` CHAR(20) NOT NULL,`lock_value` VARCHAR(20) NOT NULL,`expire` BIGINT,primary key (`lock_key`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('HandleAllSession', ' ', 0);數據庫效果圖:
運行事務效果圖:?
分布式權限校驗??
因為是分布式服務,每個微服務存儲的sessionb是各不相同的,而我們需要的是保證所有的微服務都能同步這些session信息,這樣我們才能實現某一個微服務登錄時,其他微服務都能知道。
實現上述要求的方案
方案一:我們可以在每臺服務器上都復制一份Session,但這樣顯然是很浪費時間的,并且用戶驗證數據占用的內存會成倍的增加。
方案二:將Session移出服務器,用統一訪問Redis或是Mysql即可,這樣就能保證服務都可以同步Seesion了。
?明顯方案二是可行的。
每個微服務需要添加驗證機制,導入對應依賴。
<!-- springSession Redis支持--><dependency><groupId>org.springframework.session</groupId><artifactId>spring-session-data-redis</artifactId><version>2.7.0</version></dependency> <!-- 添加Redis的Starter--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.7.0</version></dependency>導入springSecurity框架的依賴。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId><version>2.7.0</version></dependency>對每個微服務的配置文件進行修改。
spring:session:store-type: redisredis:host: lcoalhost:6379進行測試。
因為spring-security的安全機制,所以我們需要攜帶對應的cookies才能訪問對應的微服務。
其登錄頁面的用戶名為:user,密碼在idea的運行日志中。
?然后服務端會將session存放到數據庫中。
但是我們服務對應的服務還是會報500錯誤,只是因為我們使用了RestTemplate,RestTemplate類似一個瀏覽器,由于該微服務調用了其他的微服務,又因為RestTemplate在訪問微服務時沒有攜帶對應的cookies,所以報出500錯誤。
OAuth2.0實現單點操作
?前面我們雖然使用了統一存儲來解決Session共享的問題,但是我們發現就算實現了Session共享,依舊存在一些問題,由于我們每個微服務都有自己的驗證模板,實際上整個系統上存在冗余功能的,同時還有我們上面出現的問題,那么能否實現只在一個服務進行等錄,就可以訪問其他的服務能?
?實際上之前的登錄模式稱為多點登錄,而我們希望的是實現單點登錄。
這里我們需要了解一種全新的登錄方式:OAuth2.0,我們經常看到一些網站支持第三方登錄,就是使用OAuth2.0?來實現第三方授權,基于第三方應用訪問用戶信息的權限。(本質上就是給別人調用自己服務接口的權限)
?四種授權模式
1.客戶端模式(Client Credentials)
這是最簡單的一種模式,我們可以直接向驗證服務器 請求可以Token,服務器拿到Token之后,才能去訪問服務資源,這樣資源服務器才能知道我們是誰以及是否成功登錄。(不需要密碼驗證)
雖然這種模式比較簡便,但是已經失去了用戶驗證的意義,壓根就不是給用戶校驗準備的,而是更適合內部調用的場景。?
2.密碼模式(Resource Owner Password Credentials)
密碼模式相比客戶端模式,就多了用戶名和密碼的信息,用戶需要提供對應的賬號的用戶名和密碼,才能獲取Token。
?雖然這樣看起來比較合理,但是會直接將賬號和密碼泄露給客戶端,需要后臺完全信任客戶端不會拿賬號和密碼去干其他壞事,所以也不是我們常見的。(可能前端或第三方會拿著你的賬號,登錄你的服務干壞事,很不安全)
3.隱式授權模式(Implicit Grant)
?首先用戶訪問頁面時,會重定向到認證服務器上,接著認證服務器給用戶一個認證頁面,等待用戶授權,用戶填寫信息完成授權后,認證服務器返回Token。?
?它適用于沒有服務端的第三方應用頁面,并且相比前面一種形式,驗證都是在驗證服務器進行的,敏感信息不會輕易泄露,但是Token依然存在泄漏的風險。
?4.授權碼模式(Authrization Code)
這種模式是最安全的一種模式,也是推薦使用的一種,比如我們手機上的很多APP都是使用的這種方式。
相比隱式授權模式,它并不會直接返回Token,而是返回授權碼,真正的Token是通過應用服務器訪問驗證服務器獲得的。在一開始的時候,應用服務器(客戶端通過訪問自己的應用服務器來進行訪問其他的服務)和驗證服務器之間會共享一個“secret”(沒有登錄的時候是沒有的“secret”),這個東西沒有其他人知道,而驗證服務器在用戶驗證之后,會返回一個授權碼,應用服務器最后將授權碼和“secret”一起交給驗證服務器進行驗證,并且Token也是在服務器之間傳遞,不會直接給客戶端。?
?這樣就算有人中途竊取了授權碼,也毫無意義,因為Token的獲取必須同時攜帶授權碼和“secret”,但是“secret”第三方是無法得知的,并且Token不會直接給客戶端,大大減少了泄漏的風險。
OAth2.0不應該是那種第三方應用為了請求我們的服務而使用的嗎,而我們這里需要的只是實現同一個應用內部服務之間的認證,其實我們也可以利用OAuth2.0來實現單點登錄,只是少了資源服務器這個角色,客戶端就是我們的整個系統,接下來就讓我們來實現一下。
?搭建驗證服務器
第一步就是最重要的,我們需要搭建一個驗證服務器,它是我們進行權限校驗的核心,驗證服務器有很多的第三方實現,也有Spring官方通過的實現,這里我們使用Spring官方通過的驗證服務器。?
?導入對應依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency><!-- oauth依賴不再內置到spring-cloudy依賴中,需要指定對應的版本,且其已經整合到spring-security中了--><dependency><groupId>org.springframework.security.oauth</groupId><artifactId>spring-security-oauth2</artifactId><version>2.5.2.RELEASE</version></dependency>修改配置文件。
server:port: 8500servlet:#為了防止一會在服務間跳轉導致Cookies打架。(因為所有的服務地址但是localhost,都會存JSESSIONID)#這里修改一下context-path,這樣保存的Cookie會使用指定的路徑,就不會和其他服務打架了#但注意之后的所有請求都得在最前面添加這個路徑context-path: /sso?編寫spingSecurity配置類和OAuth2的配置類。
springSecurity配置類:
@Configuration public class SecurityConfiguration extends WebSecurityConfigurerAdapter {@Overrideprotected void configure(AuthenticationManagerBuilder auth) throws Exception {BCryptPasswordEncoder bCryptPasswordEncoder = new BCryptPasswordEncoder();//對密碼進行加密的類auth.inMemoryAuthentication()//直接創建一個用戶.passwordEncoder(bCryptPasswordEncoder).withUser("test").password(bCryptPasswordEncoder.encode("123456")).roles();}@Overrideprotected void configure(HttpSecurity http) throws Exception {http.authorizeRequests().anyRequest().authenticated().and().formLogin().permitAll();//使用表單登錄}@Bean@Override //這里我們需要將AuthenticationManager注冊為Bean,因為我們要在OAuth2中使用它public AuthenticationManager authenticationManagerBean() throws Exception {return super.authenticationManagerBean();} }OAth2配置類:
@EnableAuthorizationServer //開啟驗證服務器 @Configuration public class OAuth2Configuration extends AuthorizationServerConfigurerAdapter {@Resourceprivate AuthenticationManager manager;private final BCryptPasswordEncoder encoder = new BCryptPasswordEncoder();//對密碼進行加密的類/*** 這個方法是對客戶端進行配置,一個驗證服務器可以預設很多個客戶端,* 之后這些指定的客戶端就可以按照下面指定的方式進行驗證* @param clients 客戶端配置工具*/@Overridepublic void configure(ClientDetailsServiceConfigurer clients) throws Exception {clients.inMemory() //這里我們直接硬編碼創建,當然也可以像Security那樣自定義或是使用JDBC從數據庫讀取.withClient("web") //客戶端名稱,隨便起就行.secret(encoder.encode("654321")) //只與客戶端分享的secret,隨便寫,但是注意要加密.autoApprove(false) //自動審批,這里關閉,要的就是一會體驗那種感覺.scopes("book", "user", "borrow") //授權范圍,這里我們使用全部all.authorizedGrantTypes("client_credentials", "password", "implicit", "authorization_code", "refresh_token");//授權模式,一共支持5種,除了之前我們介紹的四種之外,還有一個刷新Token的模式//這里我們直接把五種都寫上,方便一會實驗,當然各位也可以單獨只寫一種一個一個進行測試//現在我們指定的客戶端就支持這五種類型的授權方式了}@Overridepublic void configure(AuthorizationServerSecurityConfigurer security) {security.passwordEncoder(encoder) //編碼器設定為BCryptPasswordEncoder.allowFormAuthenticationForClients() //允許客戶端使用表單驗證,一會我們POST請求中會攜帶表單信息.checkTokenAccess("permitAll()"); //允許所有的Token查詢請求}@Overridepublic void configure(AuthorizationServerEndpointsConfigurer endpoints) {endpoints.authenticationManager(manager);//由于SpringSecurity新版本的一些底層改動,這里需要配置一下authenticationManager,才能正常使用password模式} }? ? ? ? 然后我們使用測試工具進行測試。?
1.首先我們從最簡單的客戶端模式進行測試,客戶端模式只需要提供id和secret即可直接拿到Token,注意需要添加一個grant_type來表明我們的授權方式,默認請求路徑為:http://localhost:8500/sso/oauth/token。
測試結果圖:
?我們可以通訪問http://localhost:8500/sso/oauth/check_token來驗證我們的Token是否有效。?
?
2.我們進行密碼模式的測試,這里的請求參數為username和password,授權模式改為passwod。
?在請求頭中添加Basic驗證信息。
測試結果圖:
?Token驗證(返回用戶名):
?3.隱式授權模式,這種模式我們需要在驗證服務器上進行驗證,而不是直接請求Token,驗證登錄請求地址:http://localhost:8500/sso/oauth/authorize?client_id=web&response_type=token。
注意response_type一定要是Token類型,這樣才會返回Token,瀏覽器發起請求后,可以看到SpringSecurity的登錄頁面。
登錄之后會有個錯誤信息,這是因為登錄成功后,驗證服務器需要將結果給回客戶端,所以需要提供供客戶端的回調地址,這樣瀏覽器就會被重定向到指定的回調地址并且請求中回攜帶Token信息,這里我們隨便配置一個回調信息。
@Overridepublic void configure(ClientDetailsServiceConfigurer clients) throws Exception {clients.inMemory() //這里我們直接硬編碼創建,當然也可以像Security那樣自定義或是使用JDBC從數據庫讀取.withClient("web") //客戶端名稱,隨便起就行.secret(encoder.encode("654321")) //只與客戶端分享的secret,隨便寫,但是注意要加密.autoApprove(false) //自動審批,這里關閉,要的就是一會體驗那種感覺.scopes("book", "user", "borrow") //授權范圍,這里我們使用全部all.redirectUris("localhost:8202/login").authorizedGrantTypes("client_credentials", "password", "implicit", "authorization_code", "refresh_token");//授權模式,一共支持5種,除了之前我們介紹的四種之外,還有一個刷新Token的模式//這里我們直接把五種都寫上,方便一會實驗,當然各位也可以單獨只寫一種一個一個進行測試//現在我們指定的客戶端就支持這五種類型的授權方式了}進行授權。
最終會將Toekn返回到指定的頁面。
4.最安全的授權碼模式,這種模式其實流程和隱式授權模式是一樣的,當是請求的是Code類型:http:localhost:8500/sso/oauth/authorize?/client_id=web&response_type=code。
在訪問此地址依舊會進入回調地址,但是這時給的就是授權碼了,而不是直接返回Token。
然后我們可以使用這個授權碼和secret來獲取Token。
5.最后還有一個是刷新Token用的模式,當我們的Token過期時,我們就可以使用refresh_token來申請一個新的Token,當我們使用授權碼模式時,在成功驗證以后驗證服務器會返回一個refresh_token,如果我們需要刷新一下Token,就執行下面操作。
?在SecurityConfiguration配置類中將UserDetailsService注入spring容器中。
@Bean@Overridepublic UserDetailsService userDetailsServiceBean() throws Exception {return super.userDetailsServiceBean();}在OAuth2Configuration類中使用UserDetailsService。
@ResourceUserDetailsService service;@Overridepublic void configure(AuthorizationServerEndpointsConfigurer endpoints) {endpoints.userDetailsService(service).authenticationManager(manager);//由于SpringSecurity新版本的一些底層改動,這里需要配置一下authenticationManager,才能正常使用password模式}進行測試。
redis與分布式(docker模擬集群搭建)
拉取redis文件。
docker pull redis在云服務器上創建等會需要掛載的目錄和文件
- 創建data目錄(存放數據文件,包括用于持久化的dump.rdb)
- 創建配置文件
?創建redis容器,運行redis,并進行數據和配置文件的掛載。
docker run -p 3346:6379 --name redis -v /mydata/redis/data:/data \ -v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \ -d redis redis-server /etc/redis/redis.conf:左邊的表示云服務器的真實端口,右邊表示redis容器中的端口。
--name 表示為容器設置的名稱,-d 后面的名稱表示鏡像的名稱。
進入對應的容器
docker exec -it 容器對應id或是容器名稱 bash #進入到docker中 redis-cli #加入容器中的redis中#也可以直接進入 docker exec -it 容器對應的id或是容器的名稱 redis-cli效果圖:
docker 搭建redis集群(使用三個redis搭建集群)
?在對應的目錄下創建用來掛在數據的目錄。
mkdir -p /mydata/redis/cluster/node1/data #集群一 mkdir -p /mydata/redis/cluster/node2/data #集群二 mkdir -p /mydata/redis/cluster/node3/data #集群三為了方便我們這里就不去手動修改redis.conf,我們這里使用命令我們設置操作。
1.為先創建對應的redis容器。
docker create --name redis-node1 -v /mydata/redis/cluster/node1/data:/data \ -p 3346:6379 redis --cluster-enabled yes \ --cluster-config-file redis-node1.conf首先間將存儲數據目錄進行掛載,配置端口,設置集群模式為true,設置集群的配置文件,做集群的時候會將一些配置添加到該配置文件中。(該文件會自動生成)
2.啟動該容器。
docker start redis-node1另一個redis的配置跟redis-node1是一樣的,只要將容器名稱和掛載路徑進行修改即可。
?現在我們只是單純的搭建了兩個單獨的redis,我們還需要將兩者聯系起來。
因為我們設置docker中部署,所以docker 會給每一個容器分配一個ip地址,我們需要查看對應的ip才能進行聯系。
查看方式。
docker inspect 對應的容器名ip為IpAddress的參數。?
?
執行命令創建集群。
redis-cli --cluster create 172.17.0.3:6379 172.17.0.4:6379 172.17.0.4:6379 --cluster-replicas 0這里cluster-replicas表示主從比例,我們設置為0,就說明全部為master,如果我們需要做主從關系的話,也就是不將cluster-replicas的比例設置為0,那我們需要保證有三個以上的master(主節點),不然是無法搭建集群的。(并且搭建集群的最少節點數為3個,就是需要保證有三個master)
錯誤信息:
按其默認的配置,我們輸入"yes"?。
測試集群是否生效
?加入對應的容器。
docker exec -it redis-node1 Redis-cli -c #-c必須要加,表示以集群啟動,這樣在做數據庫操作時不會因為插槽的限制而不完成不了操作因為redis-node1表示負責管理5798插槽的,所以集群就中找到負責管理該插槽的節點進行set操作。?
?接下來我們去其他節點查看是否將數據插入成功。
我們隨便在一個節點中查詢對應的值,即使該節點沒有需要的值,該集群會自動在節點中查找需要的值。
?查詢集群節點信息。
cluster nodes?如果需要刪除集群的話,我們只需要刪除集群中使用節點的集群配置信息redis-node*.conf。
如果需要在集群中添加從節點,我們可以執行對應命令:
redis-cli --cluster add-node 172.17.0.8:6379 --cluster-slave --cluster-master 對應的主節點的id如果需要提高java代碼獲取信息的話,我們就需要將各個節點的配置進行掛載,然后修改配置信息,將保護模式關閉,并將綁定Ip注釋掉。
?集群模式是自帶哨兵模式的。(當主節點down掉時,會將其從節點作為新的主節點)
哨兵模式的選舉規則
1.首先會根據優先級進行選擇,可以在配置文件中進行配置,添加"relica-priority"配置項(默認是100),越小表示的優先級就越高。
2.如果優先級一樣,那就選擇偏移量最大的。
3.要是還是選不出來,就選擇runid(啟動時隨機生成的)最小的。
實現分布式鎖
?為了解決電商超買的問題,我們可以通過redis設置分布式鎖。
#只有當key值不存在的時候才能進行設置,其實際上是set if no exists的縮寫 setnx key value利用這種特性,我們就可以在不同的服務中實現分布式鎖,如果某個服務器加了鎖但是卡頓了,或是直接崩潰了,那么這把鎖豈不是就永遠無法釋放了,因此我們可以考慮添加一個過期時間。
set key value EX num NX?這里使用set命令,最后加一個NX表示使用setx的模式,和上面一樣,但是可以通過EX設置過期時間,這里設置為num秒,也就是說如果num秒還沒釋放,那就自動刪除。
上鎖出現的問題
1.?
?2.?
?3.
?要解決這個問題,我們可以借助一下Redisson框架,它是Redis官方推薦的java版Redis客戶端,它提供的功能非常強大,也非常大,Redisson內部提供一個監控鎖的看門狗。它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期,它為我們提供了很多中分布式的實現,這里我們嘗試使用它的分布式鎖的功能。
導入依賴。
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.2.1</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.17.0</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency>?我們先測試沒有加鎖的情況。
測試代碼:
import redis.clients.jedis.Jedis;public class Main {public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {try(Jedis jedis = new Jedis("127.0.0.1", 6379)){for (int j = 0; j < 100; j++) { //每個客戶端獲取a然后增加a的值再寫回去,如果不加鎖那么肯定會出問題,在每次插入的過程中,其實內部值已經發生改變了,比如說同一時間,其獲取到a的值是相同的,也就是說這么多個插入最終只是+1。int a = Integer.parseInt(jedis.get("a")) + 1;jedis.set("a", a+"");}}}).start();}} }結果圖:?
沒有到1000,說明出現錯誤了,?加上鎖試試。
測試代碼:
import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import redis.clients.jedis.Jedis;public class Main {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379"); //配置連接的Redis服務器,也可以指定集群RedissonClient client = Redisson.create(config); //創建RedissonClient客戶端for (int i = 0; i < 10; i++) {new Thread(() -> {try(Jedis jedis = new Jedis("127.0.0.1", 6379)){RLock lock = client.getLock("testLock"); //指定鎖的名稱,拿到鎖對象for (int j = 0; j < 100; j++) {lock.lock(); //加鎖int a = Integer.parseInt(jedis.get("a")) + 1;jedis.set("a", a+"");lock.unlock(); //解鎖}}System.out.println("結束!");}).start();}} }效果圖:
符合最終的結果。?
Mysql與分布式??
?主從復制
當我們使用Mysql的時候,也可以采用主從復制的策略,它的實現基本和Redis相似,也可以采用增量復制的方式,Mysql會在運行的過程中,會記錄二進制日志,所有的DML和DDL操作都會被記錄進日志中,主數據庫只需要將記錄的操作復制給從庫,讓從庫也運行一次,那么就可以實現主從復制,但是注意它不會在一開始進行全量復制,所以最好在開始主從復制之前將數據庫的內容保持一致。
和Redis一樣,一但我們實現了主從復制,那么就算主庫出現故障,從庫也能正常提供服務,并且可以實現讀寫分離等操作,這里我們使用一主一從方式來搭建。?
通過docker 拉取鏡像,通過設置兩個不同的端口,啟動兩個mysql。
docker run -p 3346:3306 --name main_mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql #-p 設置端口進行端口映射,-e編輯mysql root用戶的密碼,-d表示后太啟動 docker run -p 3347:3306 --name slave_mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql?5.進入容器。
docker exec -it 容器Id或name /bin/bash #再切目錄: cd /etc/mysql。 如果vim指令不存在,說明沒有安裝,我們需要進行安裝。 apt-get update apt-get install vim修改主從mysql的配置 my.cnf。
主表中進行配置:
#如果是在同一個服務器上那就要保證server-id不同,不然會發生沖突 server-id=100 #開啟二進制日志功能,因為是mater所以是必要的(名字自己定) log-bin=mysql-bin?從表中進行配置:
#設置server_id,注意要唯一 server-id=101 #開啟二進制日志功能,以備Slave作為其它Slave的Master時使用(子節點可以作為別人的主節點) log-bin=mysql-slave-bin #relay_log配置中繼日志 relay_log=edu-mysql-relay-bin??
重啟docker 容器使其配置生效。
docker restart main_mysql進入mysql。
mysql mysql -uroot -p123456創建用戶并授權允許從庫服務連接主庫的服務。
#創建一個用戶 賬號為:slave,密碼為:123456 CREATE USER test IDENTIFIED WITH mysql_native_password BY '123456';#給這個slave用戶授權,授權主從復制權限和主從復制的連接GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO test;如果報以下錯誤,說明已經存在主從關系了,我們需要先關閉主從關系。
stop slave; reset master;刷新一下mysql權限。
flush privileges查詢主庫File和Position。
show master status?查看docker 給對應的容器分配的ip。
docker inspect 對應的容器名?進入從數據庫,進行對指定對應的主庫。
#指定ip,端口,和用戶,將對應的mysql數據庫設置為該mysql數據庫的主節點,我們還需要指定主節點的二進制日志文件和主節點的偏移量和主節點的重連次數 change master to master_host='172.17.0.4', master_user='test', master_password='123456', master_port=3306, master_log_file='mysql-bin.000004', master_log_pos= 861, master_connect_retry=30;#例子二: change master to master_host='172.17.0.2', master_user='test', master_password='123456', master_port=3306, master_log_file='mysql-bin.000003', master_log_pos= 157, master_connect_retry=30;開啟主從復制功能。
start replica使用對應命令?查看從庫的狀態。
show slave status \G效果圖:
分庫分表?
?在大型的互聯網系統中,可能單臺的Mysql已經無法滿足業務的需求,這時候就需要進行擴容了。
單臺主機的硬件資源是存在瓶頸的,不可能無限制地擴展,這時候我們就得通過多臺實例來進行容量的橫向擴容,我們可以將數據分散儲存,讓多臺主機共同來保存數據。
擴容方法分為有兩種。
1.垂直拆分:我們的表和數據庫都可以進行垂直拆分的,就是將數據庫中所有的表按照業務功能進行拆分到各個數據庫中(有點類似微服務),而對于一張表也可以通過外鍵之類的機制將其拆分為多個表。
????????
?2.水平拆分:水平拆分針對的不是表,而是數據,我們可以讓很多個具有相同表的數據庫存放一部分數據,相當于是將數據分散存儲在各個節點上。
那么要實現這樣的拆分操作,我們自行去編寫代碼的工作量是比較大的,因此目前實際上已經有一些解決方案了,比如我們可以使用MyCat(也就是數據庫中間插件,相當于掛了一層代理,再通過MyCat進行分庫分表操作數據庫,只需要連接就可以使用,類似的還有ShardingSphere-Proxy)或是Sharding JDBC(應用程序中直接對SQL語句進行分析,然后轉換成分庫分表操作,需要我們自己編寫一些邏輯代碼)?
?Sharding JDBC?
shardingJDBC官網
?定位為輕量級Java框架,在java的JDBC層提供的額外服務,它使用客戶端直連數據庫,以Jar包形式提供服務,無需額外部署和依賴,可以理解為增強版的JDBC驅動,完全兼容JDBC和各種ORM框架。
1.適用于如何基于JDBC的ORM框架,如:JPA,Mybatis,Spring JDBC Template或直接使用JDBC。
2.支持任何第三方的數據庫連接池,如 DBCP,C3P0,BoneCP,HikariCP。
3.支持任意實現JDBC規范的數據庫,目前支持Mysql,Oracle,SQLServer以及任何可以使用JDBC的數據庫。
水平拆分實現
導入對應依賴。
<!-- shardingJDBC的依賴--><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId><version>5.1.1</version></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency>?在配置文件中配置數據源。
spring:shardingsphere:datasource:#幾個數據就配幾個,這里是名稱,格式就是名稱+數字names: db0,db1#為每個數據源單獨配置db0:#數據源實現類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數據源實現類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456到目前配置為止,實際上這些配置都是常規的操作,在編寫代碼時關注點依然放在業務上,現在我們就來編寫配置文件,我們需要告訴ShardingJDBC要如何進行分片,首先明確:現在就是兩個數據庫都有Test表存放用戶數據,我們的目標是將用戶信息分別存放到這兩個數據庫表中。
?繼續修改配置文件,設置其切片模式。
spring:shardingsphere:rules:sharding:tables:#這里填寫表的名稱,程序中對這張表的所有操作都會采用下面的路由方案#比如我們上面Mybatis就是對test表進行操作,所以會走下面的路由方案test:#這里填寫實際的路由節點,比如我們要分兩個庫,那么就可以把兩個數據庫都寫上去,以及對應的表#也可以使用表達式,比如下面的可以簡寫為db$->{0,1}.testactual-data-nodes: db0.test,db1.test#這里是分庫的策略配置database-strategy:#這里選擇標準策略,也可以配置復雜策略,基于多個鍵進行分片standard:#參與分片運算的字段,下面的算法會根據這里提供的字段進行運算sharding-column: id#這里填寫我們下面自定義的算法名稱sharding-algorithm-name: my-algsharding-algorithms:#自定義用戶新的算法,名稱自定義my-alg:#算法類型,官方內置了很多種、type: MODprops:sharding-count: 2props:sql-show: true進行測試。
我們在mapper中編簡單的插入語句,對應其進行測試。
在測試類中編寫代碼。
@SpringBootTest class ShardingtestApplicationTests { @AutowiredUserMapper userMapper;@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser(new User(i,"xxx", "cccc"));}}}?測試結果圖:
以為我們自定義的算法是通過取模的結果來存放到不同的數據庫中,在圖中我們可以發現取模為0時將數據存放到db0中,而取模為1時就將數據存放到db1中。
可以看到所有的SQL語句都有一個Logic SQL(這個就是我們在Mybatis里面寫的,是什么就是什么)緊接著下面就是Actual SQL,也就是說每個邏輯SQL最終會根據我們的策略轉換為實際SQL,它的id是0,那么實際轉換出來的SQL會在db0這個數據源進行插入。
我們查看數據庫中的信息。
分表查詢和查詢操作
現在我們在我們的數據庫中有test_0,test_1兩張表,表結構一樣,但是我們也可以希望能夠根據id取模運算的結果分別放到這兩個不同的表中,其實和分庫差不多。
1.邏輯表:相同結構的水平拆分數據庫(表)的邏輯名稱,是SQL中表的邏輯標識。如:訂單數據根據主鍵尾數拆分為10張表,分別是t_order_0到t_order_9?,它們的邏輯表名為t_order。
2.真實表:在水平分的數據庫中真實存在的物理表,即上個例子中的t_order_0到t_order_9。
我們創建兩個跟test表結構相同的test_0,test_1。
修改配置文件改為分表操作。
spring:shardingsphere:rules:sharding:tables:test:actual-data-nodes: db0.test_$->{0..1}#現在我們來配置一下分表策略,注意這里是table-strategy上面是database-strategytable-strategy:#基本都跟之前是一樣的standard:sharding-column: idsharding-algorithm-name: my-algsharding-algorithms:my-alg:#這里我們演示一下INLINE方式,我們可以自行編寫表達式來決定,自己編寫表達式type: INLINEprops:#比如我們還是希望進行模2計算得到數據該去的表#只需要給一個最終的表名稱就行了test_,后面的數字是表達式取模算出的#實際上這樣寫和MOD模式一模一樣algorithm-expression: test_$->{id % 2}#沒錯,查詢也會根據分片策略來進行,但是如果我們使用的是范圍查詢,那么依然會進行全量查詢#這個我們后面緊接著會講,這里先寫上吧allow-range-query-with-inline-sharding: falseprops:sql-show: true插入數據效果圖:?
?在id去摸為0時就插入test_0,取模為1時就插入test_1。
接下來我們進行范圍查詢操作,在Mapper層中編寫范圍查詢。
@Mapper public interface UserMapper {@Select("select * from test where id between #{startId} and #{endId}")List<User> getUserById(int startId, int endId); }在test層做測試。?
@Testvoid contextLoads() {System.out.println(userMapper.getUserById(3,5));}運行測試,發生報錯。
???????
這是因為默認情況下?allow-range-query-with-inline-sharding配置就是為false,就是不支持范圍查詢,所以我們要將其設置為true。
重新運行的效果圖:
分布式序列算法?
在復雜分布式系統中,特別是微服務架構中,往往需要大量的數據和信息進行唯一的標識,隨著系統的復雜,數據的增多,分庫分表成為了常見的方案,對數據分庫分表后需要有一個唯一的Id來標識一條數據后消息(如訂單號,交易流水,事件編號等),此時一個能夠生成全局唯一Id的系統是非常重要的。
我們之前創建過學生信息表,圖書借閱表,圖書管理表,所有的信息都會有一個Id作為主鍵,并且這個Id有以下要求:
1.為了區別其他的數據,這個Id必須是全局唯一的。
2.主鍵應該盡可能的保持有序,這樣會大大提高索引的查詢效率。
?在我們的分布式系統下有兩種方案來解決此問題
1.使用UUID:UUID是由一組32位數的16進制數字隨機生成的,我們可以直接使用JDK為我們提供的UUID類來實現。
@Testvoid test01() {System.out.println(UUID.randomUUID().toString());}效果圖:
UUID的生成速度非常快,可以看到確實是能夠保證唯一性,因為每都不一樣,而且這樣長一串那重復的幾率會非常小。但是它并不滿足我們上面的第二個要求,也就是說我們需要盡可能的保證有序,而這里我們得到的都是一些無序的Id。
2.雪花算法(Snowflake)
?它會生成一個64bit大小的整型的Id,int肯定是裝不下的。
可以看到它主要是三個部分組成的,時間+工作機器Id+序列號,時間以毫秒為單位,41個bit位能表示約為70年的時間,時間紀元從2016年11月1日零點開始,可以使用到2086年,工作機器ID其實就是節點Id,每個節點的Id都不同,那么就可以區分出來,10個bit位可以表示最多1024個節點,最后12位就是每個節點的序列號,因此每臺機器每毫秒?就可以有4096個序列號。
它兼具了上面所說的唯一性和有序性了,但是依然是有缺點的,第一是時間問題,如果機器時間出現倒退,那么就會導致生成重復的Id,并且節點容量只有1024個,如果是超大規模集群,也是存在隱患的。
?修改數據庫的id類型,因為是要裝下64的數據,所以我們要為其配置bigint類型。
設置mybatis的插入操作。
@Update("insert into test(name, password) values(#{name},#{password} )")int addUser2(User user); }?修改配置文件。
spring:shardingsphere:datasource:#幾個數據就配幾個,這里是名稱,格式就是名稱+數字names: db0,db1#為每個數據源單獨配置db0:#數據源實現類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數據源實現類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456rules:sharding:tables:test:actual-data-nodes: db0.test,db1.test#這里還是使用分庫策略database-strategy:standard:sharding-column: idsharding-algorithm-name: my-alg#這里使用自定義的主鍵生成策略key-generate-strategy:column: idkey-generator-name: my-genkey-generators:#這里寫我們自定義的主鍵生成算法my-gen:#使用雪花算法type: SNOWFLAKEprops:#工作機器ID,保證唯一就行worker-id: 666sharding-algorithms:my-alg:type: MODprops:sharding-count: 2props:sql-show: true測試類代碼。
@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser2(new User("aaa", "cccc"));}}效果圖:
數據庫信息:
?
?如果我們要使用UUID的話,只要在配置文件中將自定義生成主鍵算法的type改為UUID即可。
讀寫分離
?在從表中的配置文件中設置開啟只讀模式 read-only=1。(如果你是root的話還是可以入數據的,而普通用戶就只能讀取了)
?配置好主從關系。(前講過了)
然后修改配置文件。
spring:shardingsphere:datasource:#幾個數據就配幾個,這里是名稱,格式就是名稱+數字names: db0,db1#為每個數據源單獨配置db0:#數據源實現類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數據源實現類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456rules:#配置讀寫分離readwrite-splitting:data-sources:#名稱隨便寫user-db:#使用靜態類型,動態Dynamic類型可以自動發現auto-aware-data-source-nametype: Staticprops:#配置寫庫(只能一個)write-data-source-name: db0#配置從庫(多個,逗號隔開)read-data-source-names: db1#負載均衡策略,可以自定義load-balancer-name: my-loadload-balancers:#自定義的負載均衡策略my-load:type: ROUND_ROBINprops:sql-show: true測試代碼:
@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser(new User(i,"aaa", "cccc"));}System.out.println(userMapper.getUserById(3, 5));}?測試效果圖:
?插入語句全部在主節點數據庫中執行,而查詢操作都在從節點數據庫中操作。
RabbitMQ(消息隊列)
我們之前如果需要進行遠程調用,那么一般可以通過發送HTTP請求完成,而現在,我們可以使用第二種方式,就是消息隊列,它能夠將發送的消息放到隊列中,當新的消息入列時,會通知接收方進行處理,一般消息發送稱為生產者,接收方稱為消費者。
這樣我們所有的請求都可以直接丟到消息隊列中,再由消費者取出,不再是直接連接消費者的形式了,而是加了一個中間商,這也是一種很好的解決方案,并且在高并發的情況下,消息隊列也能起到綜合的作用,堆積一部分請求,再由消費者來慢慢處理,而不會像直接調用那樣請求蜂擁而至。
消息隊列的具體實現:
在云服務器上安裝和部署(在docker進行)
在docker 中拉去Ribbitmq鏡像。
在docker 中運行ribbitmq。
docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq?查看rabbitmq的狀態。
rabbitmqctl status接著我們還可以將Rabbitmq的管理面板開啟,這樣就可以在瀏覽器上進行實時訪問和監控了。?
rabbitmq-plugins enable rabbitmq_management?開啟面板。
?賬號和密碼都為:guest。
給Rabbitmq設置新的用戶。
rabbitmqctl add_user 用戶名 密碼給予新的用戶管理員權限。
rabbitmqctl set_user_tags 用戶名 administrator消息隊列的基本原理:
????????
生產者(Publisher)和消費者(Consumer):不用多說了吧。
Channel:我們的客戶端連接都會使用一個Channel,再通過Channel去訪問到RabbitMQ服務器,注意通信協議不是http,而是amqp協議。
Exchange:類似于交換機一樣的存在,會根據我們的請求,轉發給相應的消息隊列,每個隊列都可以綁定到Exchange上,這樣Exchange就可以將數據轉發給隊列了,可以存在很多個,不同的Exchange類型可以用于實現不同消息的模式。
Queue:消息隊列本體,生產者所有的消息都存放在消息隊列中,等待消費者取出。
Virtual Host:有點類似于環境隔離,不同環境都可以單獨配置一個Virtual Host,每個Virtual Host可以包含很多個Exchange和Queue,每個Virtual Host相互之間不影響。
?如果出現以下錯誤,需要在rabbitmq的配置文件進行更改
?修改方式為下:
因為是使用docker 容器安裝的,所有需要進入容器 docker exec -it rabbitmq /bin/bash進入目錄 cd /etc/rabbitmq/conf.d/執行命令 echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf退出容器 exit重啟rabbitmq docker restart rabbitmqrabbitmq的使用
?1.最簡單模式:
?(一個生產者->消息隊列->一個消費者)
生產者只需要將數據丟入消息隊列,二消費者只需要將數據從消息隊列中取出,這樣就實現了生產者和消費者的消息交互。
創建測試環境。
當前的用戶就添加了剛剛我們新建的測試環境。
現在我們來看看交換機。
?交換機列表中自動為我們新增了剛剛創建好的預設交換機,一共7個。
?
?第一個交換機是所有虛擬主機都會自帶的一個默認交換機,并且此交換機不可能刪除,此交換機默認綁定所有的消息隊列,如果是通過默認交換機發送消息,那么就會根據消息的"rountKey"(類似IP地址)決定發送給哪個同名的消息隊列(是消息隊列的名稱不是它的routingKey),同時也不能顯示地將消息隊列綁定或解綁到此交換機。
?我們可以看到詳細信息中,當前交換機特性是持久化的,也就是說就算機器重啟,那么此交換機也會被保留,如果不是持久化,那么一旦重啟就會消失,實際上我們在列表中看到D字樣就是表示此交換機是持久化的,包括消息隊列也是這樣的,所有自動生成的交換機都是持久化的。
?第二個交換機是個直連的交換機。
?這個交換機和我們剛剛介紹的默認交換機類型是一致的,并且也是持久化的,但是我們可以看到它是具有綁定關系的,如果沒有指定的消息隊列綁定到此交換機上,那么這個交換機會無法正常將信息存放到指定的消息隊列中,也是根據對應的routingKey尋找消息隊列(但是可以自定義)
?創建隊列。
在我創建隊列的選項中的auto delete的作用是: 需要至少有一個消費者連接到這個隊列,之后,一旦所有與這個隊列連接的消費斷開時,就會自動刪除此隊列。
?通過默認交換機綁定我們創建的隊列并將數據傳入消息隊列中,因為我們默認的交換機是自動綁定的,我們直接傳入數據。
?現在在queueTest中就存在一條數據了。
?獲取數據。
?在獲取數據位置有四種消息的處理方式。
1.Nack message requeue true:拒絕消息,也就是說不會將消息從消息隊列取出,并且重新排隊,一次可以拒絕多個消息。
2. Ack message requeue false:確認應答,確認后消息會從消息隊列中移除,一次可以確認多個消息。
3.Reject message requeue true/false:也是拒絕此消息,但是可以指定是否重新排隊。
?而我們的通過綁定直接交換機也可以達到將數據傳入消息隊列中并取出的效果,我們在直接交換機中綁定隊列并發送消息到隊列中。
在對應的隊列中獲取消息。
?刪除隊列中的所有消息。
?刪除此隊列。
?使用java操作消息隊列
導入對應依賴。
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency>我們來思想一下生產者和消費者,首先是生產者,生產者負責將信息發送給消息隊列。
@Testvoid contextLoads() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");try {factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}這里我們可以直接在程序中定義并創建消息隊列(實際上是和我們在管理界面創建一樣的效果)客戶端需要通過連接創建一個新的通道,同一個連接下可以有很多個通道,這樣就不用創建很多個連接也能支持分開發送。?
@Testvoid contextLoads() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");//創建連接try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){ //通過Connection創建新的Channel//聲明隊列,如果此隊列不存在,會自動創建channel.queueDeclare("queueTest", false, false, false, null);//將隊列綁定到交換機channel.queueBind("queueTest", "amq.direct", "queuekey");//發布新的消息,注意消息需要轉換為byte[]channel.basicPublish("amq.direct", "queuekey", null, "Hello World!".getBytes());}catch (Exception e){e.printStackTrace();}}其中queueDeclare方法的參數如下:
queue:隊列的名稱(默認創建后routingKey和隊列名稱一致)
durable:是否持久化。
exclusive:是否排他,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,并在連接斷開時自動刪除。排他隊列是基于Connection可見,同一個Connection的不同Channel是可以同時訪問同一個連接創建的排他隊列,并且,如果一個Connection已經聲明了一個排他隊列,其他的Connection是不允許建立同名的排他隊列的,即使該隊列是持久化的,一旦Connection關閉或者客戶端退出,該排他隊列都會自動被刪除。
autoDelete:是否自動刪除。
arguments:設置隊列的其他一些參數,這里我們暫時不需要什么其他參數。
其中queueBind方法參數如下:
queue:需要綁定的隊列名稱。
exchange:需要綁定的交換機名稱。
routingKey:不用多說了吧。
其中basicPublish方法的參數如下:
exchange: 對應的Exchange名稱,我們這里就使用第二個直連交換機。
routingKey:這里我們填寫綁定時指定的routingKey,其實和之前在管理頁面操作一樣。
props:其他的配置。
body:消息本體。
接著我們運行測試代碼,并在控制面板中測試。
消費者測試。????????
@Testvoid contextLoads() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");//這里不使用try-with-resource,因為消費者是一直等待新的消息到來,然后按照//我們設定的邏輯進行處理,所以這里不能在定義完成之后就關閉連接Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創建一個基本的消費者channel.basicConsume("queueTest", false, (s, delivery) -> {//delivery里面是消息的一些內容System.out.println(new String(delivery.getBody()));//basicAck是確認應答,第一個參數是當前的消息標簽,第二個的參數表示//是否批量處理消息隊列中所有的消息,如果為false表示只處理當前消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//basicNack是拒絕應答,最后一個參數表示是否將當前消息放回隊列,如果//為false,那么消息就會被丟棄//channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);//跟上面一樣,最后一個參數為false,只不過這里省了//channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}, s -> {});其中basicConsume方法參數如下:
queue - 消息隊列名稱,直接指定。
autoAck - 自動應答,消費者從消息隊列取出數據后,需要跟服務器進行確認應答,當服務器收到確認后,會自動將消息刪除,如果開啟自動應答,那么消息發出后會直接刪除。
deliver - 消息接收后的函數回調,我們可以在回調中對消息進行處理,處理完成后,需要給服務器確認應答。
cancel - 當消費者取消訂閱時進行的函數回調,這里暫時用不到。
在springBoot整合消息隊列?
導入對應依賴。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>?在配置文件中修改配置。
spring:rabbitmq:addresses: 127.0.0.1username: testpassword: 123456virtual-host: /test我們創建一個配置類。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfiguration {@Bean("directExchange") //定義交換機Bean,可以很多個public Exchange exchange(){return ExchangeBuilder.directExchange("amq.direct").build();}@Bean("queueTest") //定義消息隊列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.build();}@Bean("binding")public Binding binding(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){//將我們剛剛定義的交換機和隊列進行綁定return BindingBuilder.bind(queue) //綁定隊列.to(exchange) //到交換機.with("queuekey") //使用自定義的routingKey.noargs();} }?接下來我們來創建一個生產者,這里我們直接編寫在測試用例中:
//RabbitTemplate為我們封裝了大量的RabbitMQ操作,已經由Starter提供,因此直接注入使用即可@ResourceRabbitTemplate template;@Testvoid publisher() {//使用convertAndSend方法一步到位,參數基本和之前是一樣的//最后一個消息本體可以是Object類型,真是大大的方便template.convertAndSend("amq.direct", "queueTest", "Hello World!");}創建消費者,創建監聽器。
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest") //定義此方法為隊列queueTest的監聽器,一旦監聽到新的消息,就會接受并處理public void test(Message message){ //如果我們Message的類型改為String類型也是可以的,它會自動將我們的數據轉換為String類型System.out.println(new String(message.getBody()));} }效果圖:
我們可以往消息隊列中提交數據,然后等待消費者返回信息。
@Testvoid publisher() {//使用convertAndSend方法一步到位,參數基本和之前是一樣的//最后一個消息本體可以是Object類型,真是大大的方便Object queuekey = template.convertSendAndReceive("amq.direct", "queuekey", "Hello World!");System.out.println("收到消費者的響應");}消費者的響應方式,就是通過配置監聽器的監聽方法的返回值來實現。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest") //定義此方法為隊列queueTest的監聽器,一旦監聽到新的消息,就會接受并處理public String test(String message){System.out.println(message);return "消費者已經做出響應";} }進行測試。
消息隊列處理json數據?
如果需要傳入的數據為json類型時我們該怎么做呢?
在rabbitmq中配置類中將json數據轉換器注入spring中。
@Configuration public class RabbitConfiguration {@Bean("jacksonConverter") //直接創建一個用于JSON轉換的Beanpublic Jackson2JsonMessageConverter converter(){return new Jackson2JsonMessageConverter();}}在消費者類中指定消息轉換器。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver(User user){System.out.println(user);} }進行測試。
因為我們在spring注入json轉換器,所以我們可以在測試類中直接給傳入數據實現類,其會自動將我們的實現類轉換為json類型。
實現類:
@Data public class User {int id;String name; }發送消息到消息隊列中的測試類。
@Testvoid publisher() {template.convertAndSend("amq.direct", "queuekey",new User());}測試結果圖:
?死信隊列
?消息隊列中的數據,如果遲遲沒有消費者來處理,那么就會一直占用消息隊列的空間,比如我們模擬一下搶車票的場景,用戶下單高鐵之后,會進行搶座,然后再進行付款,但是如果用戶下單之后并沒有及時的付款,這張票不可能一直讓該用戶占用著,因為你不買別人還要買呢,所以會在一段時間后超時,讓這張票可以繼續被其他人購買。
這時,我們就可以使用死信隊列,將那些用戶超時未付款的或是用戶主動取消的訂單,進行進一步的處理,以下類型的消息都會被判定為死信。
1.消息被拒絕(basic.reject/basic.nack),并且requeue = false。
2.消息TTL過期。
3.隊列達到最大值。
?那么如何構建這樣的一種使用模式呢?實際上本質上就是一個死信交換機+綁定的死信隊列,當正常隊列中的消息被判定為死信時,會被發送到對應的死信交換機中,死信隊列也有對應的消費者去處理消息。
這里我們直接在消息隊列配置類中創建一個新的死信隊列,并對其進行綁定。
@Bean("directDlExchange")public Exchange dlExchange(){//創建一個新的死信交換機//在這里做配置的話,即使在交換機中沒有該交換機,其也會自動被創建return ExchangeBuilder.directExchange("dlx.direct").build();}@Bean("testDlQueue") //創建一個新的死信隊列public Queue dlQueue(){return QueueBuilder.nonDurable("testDl").build();}@Bean("dlBinding") //死信交換機和死信隊列進綁定public Binding dlBinding(@Qualifier("directDlExchange") Exchange exchange,@Qualifier("testDlQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("queueDlKey").noargs();}?再將該隊列綁定為其他隊列的死信隊列。
@Bean("queueTest") //定義消息隊列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").build();}修改消費者類的配置信息,將今天的隊列改為死信隊列,當死信隊列有消息時就進行消費。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "testDl", messageConverter = "jacksonConverter")public void receiver(User user){System.out.println(user);} }測試情況一
當前消息隊列中存在一個消息,我們對獲取其消息并將其在放到消息隊列中,讓死信隊列處理。
可以發現死信隊列處理了這個消息。?
?測試情況二
在rabbitmq的配置類中進行配置,配置隊列中消息的生命周期。
@Bean("queueTest") //定義消息隊列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").ttl(500).build();}可以發現對消息隊列發送的消息在0.5秒后就被死信隊列消費了。?
?測試第三中情況
在rabbitmq的配置類中對消息隊列的長度進行配置。?
@Bean("queueTest") //定義消息隊列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").maxLength(3)//設置消息隊列的長度.build();}往消息隊列中添加四組數據,我們可以發現第一組數據被死信隊列消費了。
?工作隊列模式
?實際上這種模式就非常適合多個工人等待新的任務到來的場景,我們的任務有很多個,一個一個丟到消息隊列中,而此時個人有很多個,那么我們就可以將這些任務分配給各個工人,讓他們各自負責一些任務,并且做的快的工人還可以多完成一些。(能者多勞)
我們只需要創建多個監聽器即可。(這里我們就先創建兩個監聽器)
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver1(User user){System.out.println(user);}@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver2(User user){System.out.println(user);} }在隊列中添加多個消息,觀察消費者的處理情況。
可以發現消費者采用的是輪番的策略,進行消息消費的。
默認情況下,一個消費者可以同時處理250個消息,我們也可以對其進行修改。
?在rabbitmq的配置類中條件監聽器創建者工廠。
@Resourceprivate CachingConnectionFactory connectionFactory;@Bean(name = "listenerContainer")public SimpleRabbitListenerContainerFactory listenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(1); //將PrefetchCount設定為1表示一次只能取一個return factory;}?在消費者類中設置對應的監聽器生產者。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", containerFactory = "listenerContainer")public void receiver1(User user){System.out.println("消費者1處理消息" + user);}@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", containerFactory = "listenerContainer")public void receiver2(User user){System.out.println("消費者2處理消息" + user);} }進行測試。
當我們想同時創建多個功能相同的消費者時,我們只要進行下列配置即可。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", concurrency = "count")public void receiver1(User user){System.out.println("消費者1處理消息" + user);}}測試效果圖:
發布訂閱模式
比如我們在購買了云服務器,但是最近快到期了,那么就會給你的手機和郵箱發送消息,告訴你需要續費了,但是手機短信和郵箱發送?并不是同一個業務提供的,但是現在我們又希望能夠都去執行,所以就可以用到發布訂閱模式,簡而言之就是,發布一次,消費多個。
因為我們之前使用的是直連交換機,是一對一的關系,肯定是不行的,我們這里需要用到另一種類型的交換機,叫做fanout(扇出)類型,這是一種廣播類型,消息會被廣播到所有與此交換機綁定的消息隊列中。
?在rabbitmq配置類中進行配置,創建多個隊列,并將這些對應的隊列綁定到扇出交換機上。
@Configuration public class RabbitConfiguration {@Resourceprivate CachingConnectionFactory connectionFactory;@Bean("fanoutExchange")public Exchange exchange(){//注意這里是fanoutExchangereturn ExchangeBuilder.fanoutExchange("amq.fanout").build();}@Bean("queueTest1") //定義消息隊列public Queue queue1(){return QueueBuilder.nonDurable("queueTest1") //非持久化類型.build();}@Bean("queueTest2") //定義消息隊列public Queue queue2(){return QueueBuilder.nonDurable("queueTest2") //非持久化類型.build();}@Bean("binding1")public Binding binding1(@Qualifier("fanoutExchange") Exchange exchange,@Qualifier("queueTest1") Queue queue){//將我們剛剛定義的交換機和隊列進行綁定return BindingBuilder.bind(queue) //綁定隊列.to(exchange) //到交換機.with("queuekey1") //使用自定義的routingKey.noargs();}@Bean("binding2")public Binding binding2(@Qualifier("fanoutExchange") Exchange exchange,@Qualifier("queueTest2") Queue queue){//將我們剛剛定義的交換機和隊列進行綁定return BindingBuilder.bind(queue) //綁定隊列.to(exchange) //到交換機.with("queuekey2") //使用自定義的routingKey.noargs();}@Bean("jacksonConverter") //直接創建一個用于JSON轉換的Beanpublic Jackson2JsonMessageConverter converter(){return new Jackson2JsonMessageConverter();}}修改監聽器。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest1", messageConverter = "jacksonConverter")public void receiver1(User user){System.out.println("隊列一接收到消息" + user);}@RabbitListener(queues = "queueTest2", messageConverter = "jacksonConverter")public void receiver2(User user){System.out.println("隊列二接收到消息" + user);} }?測試結果圖:
在對應的交換機中沒有指定routingKey時發送數據,兩個隊列都會收到消息。
?路由模式
?我們可以在綁定時指定想要的routingKey只有生產者發送時指定了對應的routingKey才能到達對應的隊列。
?當然除了我們之前的一次綁定之外,同一個消息隊列可以多次綁定到交換機,并且使用不同的routingKey,這樣只要滿足其中一個都可以被發送到此消息隊列中。
在rabbitmq的配置類中進行配置。
@Configuration public class RabbitConfiguration {@Bean("directExchange")public Exchange exchange(){return ExchangeBuilder.directExchange("amq.direct").build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding") //使用yyds1綁定public Binding binding(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("key1").noargs();}@Bean("binding2") //使用yyds2綁定public Binding binding2(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("key2").noargs();} }?修改監聽器。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest")public void receiver1(String message) {System.out.println("隊列一接收到消息" + message);} }我們在交換機中添加兩條消息,分別通過不同的routingKey。
進行測試,通過不同的routingkey進入了同一個消息隊列中?。
主題模式
?實際上這種模式就是一種模糊匹配模式,我們可以將routingKey以模糊匹配的方式去進行轉發。?
?我們可以使用*或#來表示:
1.*:表示容易的一個單詞。
2.#:表示0個或多個單詞。
?修改rabbitmq的配置類。
@Configuration public class RabbitConfiguration {@Bean("topicExchange") //這里使用預置的Topic類型交換機public Exchange exchange(){return ExchangeBuilder.topicExchange("amq.topic").build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding")public Binding binding2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("*.test.*").noargs();} }在預設的topic交換機中以a.test.c?作為routingKey將數據傳入對應的消息隊列中。
?消費者去消費了此消息。
"#"也是差不多的效果。
?除了我們這里使用的默認主題交換機之外,還有一個叫做amq.rabbitmq.trace的交換機。
?可以看到它也是topic類型的,那么這個交換機是做什么的呢?實際上這個是用于幫助我們記錄和追蹤生產者和消費者使用消息隊列的交換機,它是一個內部的交換機。
接著我們需要在rabbitmq主機中將/test的追蹤功能開啟。
rabbitmqctl trace_on -p /test?創建新的消息隊列。
?將消息隊列綁定到amq.rabbitmq.trace交換機上,要將生產者輸入的交換機和消費者獲取數據的隊列全部存放到剛剛那個trace消息隊列中。?
?我們獲取trace中的消息,會得到一個交換機和消息隊列。
?第四種交換機類型
?第四種交換機類型header,它是根據頭部消息來決定的,在我們發送的消息中是可以攜帶一些頭部消息的(類似于HTTP),我們可以根據這些頭部信息來決定路由哪個消息隊列中。
???????修改rabbitmq的配置類。
@Configuration public class RabbitConfiguration {@Bean("headerExchange") //注意這里返回的是HeadersExchangepublic HeadersExchange exchange(){return ExchangeBuilder.headersExchange("amq.headers") //RabbitMQ為我們預置了兩個,這里用第一個就行.build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding")public Binding binding2(@Qualifier("headerExchange") HeadersExchange exchange, //這里和上面一樣的類型@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange) //使用HeadersExchange的to方法,可以進行進一步配置//.whereAny("a", "b").exist(); 這個是只要存在任意一個指定的頭部Key就行//.whereAll("a", "b").exist(); 這個是必須存在所有指定的的頭部Key.where("test").matches("hello"); //比如我們現在需要消息的頭部信息中包含test,并且值為hello才能轉發給我們的消息隊列//.whereAny(Collections.singletonMap("test", "hello")).match(); 傳入Map也行,批量指定鍵值對} }將數據傳入amq.header交換機,并設置頭部信息,進行測試。
查看queueTest隊列的消息,可以發現消息隊列中的消息被消費了。
docker 搭建rabbitmq集群?
下載rabbitmq的管理者版本。
docker pull rabbitmq:3.9.5-management創建三個rabbitmq。(如果需要查看各自的控制面板的話,我們只需要為每個rabbitmq綁定15672端口)
docker run -d --hostname myRabbit1 --name rabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management docker run -d --hostname myRabbit2 --name rabbit2 -p 5673:5672 --link rabbit1:myRabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management docker run -d --hostname myRabbit3 --name rabbit3 -p 15673:5672 --link rabbit1:myRabbit1 --link rabbit2:myRabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management-e RABBITMQ_ERLANG_COOKIE=‘rabbitcookie’ : 必須設置為相同,因為 Erlang節點間是通過認證Erlang cookie的方式來允許互相通信的。
–link rabbit1:myRabbit1 --link rabbit2:myRabbit2: 不要漏掉,否則會 一直處在 Cluster status of node rabbit@myRabbit3 … 沒有反應。
啟動完成之后,使用docker ps命令查看運行情況,確保RabbitMQ都已經啟動。
將RabbitMQ節點加入到集群。
#進入rabbitmq02容器,重新初始化一下,將02節點加入到集群中 docker exec -it rabbit2 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster --ram rabbit@myRabbit1 #參數“--ram”表示設置為內存節點,忽略該參數默認為磁盤節點,@后面的為ip名,以為我們在啟動rabbitmq時給其ip設置了新的名字,且我們以一節點作為主節點其他作為從節點。 rabbitmqctl start_app exit#進入rabbitmq03容器,重新初始化一下,將03節點加入到集群中 docker exec -it rabbit3 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster --ram rabbit@myRabbit1 rabbitmqctl start_app exit內存節點將所有的隊列、交換器、綁定、用戶等元數據定義都存儲在內存中;而磁盤節點將元數據存儲在磁盤中。單節點系統只允許磁盤類型的節點,否則當節點重啟以后,所有的配置信息都會丟失。如果采用集群的方式,可以選擇至少配置一個節點為磁盤節點,其余部分配置為內存節點,這樣可以獲得更快的響應。所以本集群中配置節點一位磁盤節點,節點二和節點三位內存節點。
?此時我只是完成了簡單的集群,接下來我們還要配置鏡像隊列(類型主從復制),我們這里在終端中配置,其實也可以直接在控制面板中在admin中配置對應的策略。
#隨便進入一個容器 docker exec -it rabbit1 bash#設置策略匹配所有名稱的隊列都進行高可用配置,且實現自動同步。 rabbitmqctl set_policy -p / ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'#查詢策略 rabbitmqctl list_policies -p / #查看vhost下的所有的策略(policies )當主節點中的隊列down后從節點中的隊列就會被使用,且在主節點中的隊列恢復以后,其會變成從隊列來繼續使用。
1.策略名稱,我們命名為ha(高可用);
2.-p / 設置vhost,可以使用rabbitmqctl list_policies -p / 查看該vhost 下所有的策略(policies )。
3.隊列名稱的匹配規則,使用正則表達式表示;
4.為鏡像隊列的主體規則,是json字符串,分為三個屬性:ha-mode | ha-params | ha-sync-mode,分別的解釋如下:
????????ha-mode:鏡像模式,分類:all/exactly/nodes,all存儲在所有節點;exactly存儲x個節點,節點的個數由ha-params指定;nodes指定存儲的節點上名稱,通過ha-params指定;
????????ha-params:作為參數,為ha-mode的補充;
????????ha-sync-mode:鏡像消息同步方式:automatic(自動),manually(手動);
?消息隊列中間件
?由于使用不同的消息隊列,我們不能保證系統相同,為了注重邏輯,springCloud Stream它能夠屏蔽底層實現,我們使用統一的消息隊列操作方式就能操作多種不同類型的消息隊列。
它屏蔽了Rabbitmq底層操作,讓我們使用統一 的Input和Output形式。以Binder為中間件,這樣我們就算切換了不同的消息隊列,也無需修改代碼,而具體某種消息隊列的底層實現是交給Stream在做的。?
創建兩個模塊,一個是生產者一個是消費者。
?導入對應依賴。
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>3.2.4</version></dependency>在生產者的配置文件中進行配置。
server:port: 8001 spring:cloud:stream:binders: #此處配置要綁定的rabbitmq服務的配置信息cloud-server: #綁定的名稱,自定義一個就行type: rabbit #消息主件類型,這里使用rabbit,所以這粒就填寫rabbitenvironment: #服務器相關信息,以為是自定義的名稱,所以下面會爆紅,不影響運行spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /bindings:test-out-0: #自定義的綁定名稱destination: test.exchange #目的地,就是交換機的名稱。如果不存在就會創建在生產者的controller層編寫發送消息的controller。
@RestController public class PublisherController {@ResourceStreamBridge streamBridge;public String publish() {//第一個次數其實就是Rabbitmq的交換機名稱//這個交換機的名稱有些規則//輸入: <名稱>-in-<index>//輸出: <名稱>-out-<index>//這里我們使用輸出方式,來將數據發送到消息隊列,注意這里的名稱會和之后的消費者Bean名稱進行對應streamBridge.send("test-out-0", "hello world");return "發送成功"+new Date();} }編寫消費者配置文件。
server:port: 8001 spring:cloud:stream:binders: #此處配置要綁定的rabbitmq服務的配置信息cloud-server: #綁定的名稱,自定義一個就行type: rabbit #消息主件類型,這里使用rabbit,所以這粒就填寫rabbitenvironment: #服務器相關信息,以為是自定義的名稱,所以下面會爆紅,不影響運行spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /bindings:test-in-0:destination: test.exchange創建一個類用于做消費者的配置。
@Component public class TestConsumer {@Bean("test")//這里的注入名要和剛剛生產者的綁定名稱中的名稱相同public Consumer<String> consumer() {return System.out::println;} }啟動測試。
?其自動幫我們創建了對應的消息隊列。
?消息發送以后,消費者去去消費了這個消息。
這樣我們就通過springCloud Stream屏蔽底層Rabbitmq來直接進行消息的操作了。??
總結
以上是生活随笔為你收集整理的springCloud 初探的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java泛型类的构造函数_Java泛型构
- 下一篇: Python中Pyinstaller使用