Feign接口 多线程问题
Spring Cloud Feign傳輸Header,并保證多線程情況下也適用
一、現(xiàn)象
微服務(wù)在生產(chǎn)中,常遇到需要把 header 傳遞到下一子服務(wù)的情況(如服務(wù)A訪問服務(wù)B的接口,需要傳遞header),網(wǎng)上大多數(shù)的方案是實現(xiàn) RequestInterceptor 接口,在重寫方法中,把 header 填進 Feign 的請求中。我們先按這種方式,簡單實現(xiàn)代碼如下:
1、繼承RequestInterceptor
服務(wù)A新建類,繼承 RequestInterceptor,把 header 設(shè)置到請求中,注意 header 的key若是大寫時,請求中一般會被轉(zhuǎn)為小寫,所以建議header的key一般設(shè)置為小寫。
?
package com.he.feign.config;import feign.RequestInterceptor; import feign.RequestTemplate; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest; import java.util.Enumeration;/*** <b>@Desc</b>: ? 1、繼承RequestInterceptor,把header設(shè)置到請求中,注意header的key若是大寫時,請求中會被轉(zhuǎn)為小寫* <b>@Author</b>: hesh* <b>@Date</b>: ? 2020/6/21* <b>@Modify</b>:*/ @Configuration public class FeignConfig implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();//當主線程的請求執(zhí)行完畢后,Servlet容器會被銷毀當前的Servlet,因此在這里需要做判空if (attributes != null) {HttpServletRequest request = attributes.getRequest();Enumeration<String> headerNames = request.getHeaderNames();while (headerNames.hasMoreElements()) {String name = headerNames.nextElement();//不能把所有消息頭都傳遞下去,否則會引起其他異常;header的name都是小寫if (name.equals("feignheader")) {requestTemplate.header(name,request.getHeader(name));}}}}}
2、修改 hystrix 的隔離策略為 semaphore
RequestContextHolder.getRequestAttributes()方法,實際上是從ThreadLocal變量中取得相應(yīng)信息的。hystrix斷路器的默認隔離策略為THREAD,該策略是無法取得ThreadLocal值的,所以需要修改hystrix的隔離策略,一般是改為[semaphore],在服務(wù)A中的 yml 新增配置如下#2、hystrix 的隔離策略改為 SEMAPHORE
?
3、客戶端A的測試代碼
3.1、服務(wù)A的controller接口
?
package com.he.feign.controller;import com.he.feign.feign.HeaderFeign; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>: ? 測試* <b>@Author</b>: hesh* <b>@Date</b>: ? 2020/6/21* <b>@Modify</b>:*/ @Slf4j @RequestMapping("/test_header") @RestController public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//請求需要帶請求頭(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主線程阻塞等待結(jié)果,由于請求仍有效沒執(zhí)行完畢,此時Servlet容器不會銷毀HttpServletRequest,//所以請求屬性還保存在請求鏈路中,能被傳遞下去CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;} }
3.2、Feign類
feignclient的注解可以省略configuration配置,即configuration = FeignConfig.class可不聲明
?
?
?
4、服務(wù)端B的接口代碼
?
package com.he.eurekaclient.controller;import com.he.eurekaclient.feign.HelloFeign; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest; import java.util.Enumeration;/*** <b>@Desc</b>: ? 測試header傳遞* <b>@Author</b>: hesh* <b>@Date</b>: ? 2020/6/21* <b>@Modify</b>:*/ @RequestMapping("/header") @RestController public class HeaderController {@Value("${spring.application.name}")private String appName;@Autowiredprivate HttpServletRequest servletRequest;//請求需要帶請求頭(key-value): feignHeader-test@GetMapping("/test")public String test() {StringBuffer sb = new StringBuffer("hello from ").append(appName).append("\n");StringBuffer requestURL = servletRequest.getRequestURL();sb.append("requestURL: ").append(requestURL).append("\n");boolean isContain = false;sb.append("headers: \n");Enumeration<String> headerNames = servletRequest.getHeaderNames();//header的name都是小寫while (headerNames.hasMoreElements()){String headername = headerNames.nextElement();String headerValue = servletRequest.getHeader(headername);sb.append(headername).append("-").append(headerValue).append("\n");if (headername.equals("feignheader")) isContain = true;}if (!isContain) {sb.append("--error--").append("not contain required header!");}return sb.toString();} }
5、啟動服務(wù),在postman中測試如下
5.1、調(diào)用接口 http://localhost:8060/test_header/main_thread,結(jié)果如下
5.2、調(diào)用接口 http://localhost:8060/test_header/sub_thread ,結(jié)果如下
5.3、調(diào)用 http://localhost:8060/test_header/sub_thread/block,結(jié)果如下
從5.1 – 5.3的查詢結(jié)果,可以得到結(jié)論
經(jīng)過上述的配置后,用戶線程(主線程)中調(diào)用非feign請求,可把header傳遞到服務(wù)B中;
若在用戶線程(主線程)中啟動子線程,并在子線程中調(diào)用feign請求,header傳遞不到服務(wù)B中;
即是子線程最終異步轉(zhuǎn)同步阻塞等待結(jié)果,header仍傳遞不到服務(wù)B中。
二、網(wǎng)絡(luò)上大多數(shù)的解決方案
出現(xiàn)上面的原因, 主要是 RequestAttributes 默認不是線程共享的;主線程調(diào)用子線程時,沒把 RequestAttributes 共享給子線程。因此,只要在主線程調(diào)用其他線程前將RequestAttributes對象設(shè)置為子線程共享,就能把header等信息傳遞下去。
1、因此,網(wǎng)絡(luò)上大多數(shù)的解決方案如下,在主線程調(diào)用子線程前,增加下面配置
RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//請求屬性可繼承,線程共享
1
修改后的代碼如下
?
2、重新啟動服務(wù)A,再次調(diào)用兩個帶子線程的接口,現(xiàn)象如下
調(diào)用 http://localhost:8060/test_header/sub_thread/block,結(jié)果如下
調(diào)用接口 http://localhost:8060/test_header/sub_thread ,結(jié)果如下
測試結(jié)果,有以下兩種現(xiàn)象
在主線程get()阻塞等待子線程執(zhí)行完畢時,每次請求都成功;
主線程直接啟動子線程,且執(zhí)行完自己邏輯后便結(jié)束不需理會子線程結(jié)果的,請求偶爾成功, 偶爾失敗;
這是為什么呢,作者認為主要是以下原因
Servlet容器中Servlet屬性生命周期與接收請求的用戶線程(父線程)同步, 隨著父線程執(zhí)行完destroy()而銷毀;
子線程雖然可以從父線程共享信息中獲得了請求屬性,但這個屬性由父線程維護
當父線程比子線程執(zhí)行完慢時,請求屬性還在,子線程請求成功;當快時,請求屬性隨著父線程結(jié)束而銷毀,子線程的請求屬性變?yōu)閚ull,請求失敗。
由此可見,簡單的設(shè)置 RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);
在多線程情況下, 并非是一勞永逸的。
三、作者的解決方案
針對上面的問題,及問題根本原因,我們團隊的解決方案仍是使用 ThreadLocal,進行線程間的變量共享通信。
1、新建 ThreadLocalUtil
package com.he.feign.thread;import java.util.HashMap; import java.util.Map;/*** <b>@Desc</b>: ? 線程共享* <b>@Author</b>: hesh* <b>@Date</b>: ? 2020/6/22* <b>@Modify</b>:*/ public class ThreadLocalUtil {//使用InheritableThreadLocal,使得共享變量可被子線程繼承private static final InheritableThreadLocal<Map<String,String>> headerMap = new InheritableThreadLocal<Map<String, String>>(){@Overrideprotected Map<String, String> initialValue() {return new HashMap<>();}};public static Map<String,String> get(){return headerMap.get();}public static String get(String key) {return headerMap.get().get(key);}public static void set(String key, String value){headerMap.get().put(key,value);} }?
2、修改服務(wù)A 的接口 TestHeaderController
3、修改服務(wù)A的 FeignConfig
?
4、重啟服務(wù)A,測試結(jié)果如下
4.1、連續(xù)調(diào)用 http://localhost:8060/test_header/sub_thread 接口,日志打印如下
結(jié)合執(zhí)行日志可知,header信息通過feign成功傳遞到下一個服務(wù),而且不再出現(xiàn)偶爾失敗的情況!
4.2、連續(xù)調(diào)用接口 http://localhost:8060/test_header/sub_thread/block
綜上可見,真正解決從網(wǎng)關(guān)或者上層鏈路,把header經(jīng)過feign傳遞到另一個服務(wù),既要配置feign,也需要結(jié)合threadlocal。
下一步的優(yōu)化,可設(shè)置攔截器或者切面,把header信息統(tǒng)一設(shè)置到threadlocal中。
package com.he.feign.config;import com.he.feign.thread.ThreadLocalUtil; import org.springframework.web.servlet.HandlerInterceptor;import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.Enumeration; import java.util.Objects;/*** <b>@Desc</b>: ? 攔截器* <b>@Author</b>: hesh* <b>@Date</b>: ? 2020/6/22* <b>@Modify</b>:*/ public class MyInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//攔截請求,設(shè)置header到ThreadLocal中Enumeration<String> headerNames = request.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,request.getHeader(name));}}return true;}}? package com.he.feign.config;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;/*** <b>@Desc</b>: ? web配置* <b>@Author</b>: hesh* <b>@Date</b>: ? 2020/6/25* <b>@Modify</b>:*/ @Configuration public class WebConfig extends WebMvcConfigurationSupport {@Overrideprotected void addInterceptors(InterceptorRegistry registry) {//添加自定義的攔截器registry.addInterceptor(new MyInterceptor()).addPathPatterns("/**");} }?
TestHeaderController修改如下
以上,便是作者針對spring cloud feign 傳遞 header 信息在多線程情況下失敗問題的解決方式,若有錯誤請指正,歡迎交流指導(dǎo)。
————————————————
版權(quán)聲明:本文為CSDN博主「HE-RUNNING」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
?
以上是剛開始使用的文章,但是隨著使用線程池,就出現(xiàn)了問題,由于線程池是把線程回收,不是新建,就出現(xiàn)了在變量傳遞的時候,下次取到線程是從上一次父線程提供的共享變量導(dǎo)致了變量錯亂問題。經(jīng)過研究 阿里的解決方案出現(xiàn)在眼前
?
加入以下pom依賴:
<dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId><version>2.2.0</version> </dependency>
轉(zhuǎn)載改造hystrix線程池方法:
改造線程池方式
上面介紹了改造線程的方式,并且通過建一個同樣的Java類來覆蓋Jar包中的實現(xiàn),感覺有點投機取巧,其實不用這么麻煩,Hystrix默認提供了HystrixPlugins類,可以讓用戶自定義線程池,下面來看看怎么使用:
在啟動之前調(diào)用進行注冊自定義實現(xiàn)的邏輯:
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());ThreadLocalHystrixConcurrencyStrategy就是我們自定義的創(chuàng)建線程池的類,需要繼承HystrixConcurrencyStrategy,前面也有講到通過調(diào)試代碼發(fā)現(xiàn)最終獲取線程池的代碼就在HystrixConcurrencyStrategy中。
我們只需要重寫getThreadPool方法即可完成對線程池的改造,由于TtlExecutors只能修飾ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我們需要對ThreadPoolExecutor進行包裝一層,最終在execute方法中對線程修飾,也就相當于改造了線程池。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import com.netflix.hystrix.HystrixThreadPoolKey; import com.netflix.hystrix.HystrixThreadPoolProperties; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; import com.netflix.hystrix.strategy.properties.HystrixProperty; import com.netflix.hystrix.util.PlatformSpecific;public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);@Overridepublic ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);final int dynamicCoreSize = corePoolSize.get();final int dynamicMaximumSize = maximumPoolSize.get();if (dynamicCoreSize > dynamicMaximumSize) {logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize+ ". Maximum size will be set to " + dynamicCoreSize+ ", the coreSize value, since it must be equal to or greater than the coreSize value");return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit,workQueue, threadFactory);} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit,workQueue, threadFactory);}}@Overridepublic ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,HystrixThreadPoolProperties threadPoolProperties) {final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();final int dynamicCoreSize = threadPoolProperties.coreSize().get();final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();final int maxQueueSize = threadPoolProperties.maxQueueSize().get();final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);if (allowMaximumSizeToDivergeFromCoreSize) {final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();if (dynamicCoreSize > dynamicMaximumSize) {logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize+ ". Maximum size will be set to " + dynamicCoreSize+ ", the coreSize value, since it must be equal to or greater than the coreSize value");return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime,TimeUnit.MINUTES, workQueue, threadFactory);} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime,TimeUnit.MINUTES, workQueue, threadFactory);}} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES,workQueue, threadFactory);}}private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {if (!PlatformSpecific.isAppEngineStandardEnvironment()) {return new ThreadFactory() {private final AtomicInteger threadNumber = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r,"hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());thread.setDaemon(true);return thread;}};} else {return PlatformSpecific.getAppEngineThreadFactory();}} }ThreadLocalThreadPoolExecutor的代碼:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;import com.alibaba.ttl.TransmittableThreadLocal; import com.alibaba.ttl.TtlRunnable;public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();public static TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<Long>();public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(maximumPoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}@Overridepublic void execute(Runnable command) {super.execute(TtlRunnable.get(command));} }啟動時加入插件
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());?使用方法:調(diào)用feign client服務(wù)之前,設(shè)置線程變量
ThreadLocalThreadPoolExecutor.THREAD_LOCAL.set(10086L);?在FeignAuthConfiguration里,調(diào)用appTokenHolder.get();之前加入設(shè)置租戶id
Long tenantId = ThreadLocalThreadPoolExecutor.THREAD_LOCAL.get(); DefaultAppTokenHolder.TENANT_FOR_NO_SESSION.set(tenantId);
使用線程變量三種方式測試:
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;import com.alibaba.ttl.TransmittableThreadLocal; import com.alibaba.ttl.TtlRunnable;public class Test {public static void main(String[] args) throws InterruptedException, ExecutionException { // testThreadLocal1();// testThreadLocal2();testThreadLocal3();}private static void testThreadLocal1() throws InterruptedException, ExecutionException {final ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {local.set(i + "");System.out.println(local.get());Future<?> future = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ":" + local.get());local.set(null);}});future.get();System.out.println(local.get());local.set(null);}}private static void testThreadLocal2() throws InterruptedException, ExecutionException {ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {local.set(i + "");System.out.println(local.get());Future<?> future = executorService.submit(new ParamRunnable(i + ""));future.get();System.out.println(local.get());local.set(null);}}private static void testThreadLocal3() throws InterruptedException, ExecutionException {final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {context.set(i + "");System.out.println(context.get());Future<?> future = executorService.submit(TtlRunnable.get(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName() + ":" + context.get());context.set(null);}}));future.get();System.out.println(context.get());context.set(null);}}private static class ParamRunnable implements Runnable {private String param;public ParamRunnable(String param) {this.param = param;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ":" + param);}}}?
原文鏈接:https://blog.csdn.net/weishaoqi2/article/details/106964787
總結(jié)
以上是生活随笔為你收集整理的Feign接口 多线程问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka logstash elk
- 下一篇: logstash windows