av激情亚洲男人的天堂国语,日韩欧美精品一中文字幕,无码av一区二区三区无码,国产又色又爽又刺激的a片,国产又色又爽又刺激的a片

用這四招優(yōu)雅地實現SpringBoot異步線程間數據傳遞

Spring Boot 自定義線程池實現異步開發(fā)相信看過陳某的文章都了解,但是在實際開發(fā)中需要在父子線程之間傳遞一些數據,比如用戶信息,鏈路信息等等

網站建設哪家好,找創(chuàng)新互聯公司!專注于網頁設計、網站建設、微信開發(fā)、小程序制作、集團企業(yè)網站建設等服務項目。為回饋新老客戶創(chuàng)新互聯還提供了清徐免費建站歡迎大家使用!

比如用戶登錄信息使用ThreadLocal存放保證線程隔離,代碼如下:

/**
* @author 公眾號:碼猿技術專欄
* @description 用戶上下文信息
*/
public class OauthContext {
private static final ThreadLocal loginValThreadLocal=new ThreadLocal<>();

public static LoginVal get(){
return loginValThreadLocal.get();
}
public static void set(LoginVal loginVal){
loginValThreadLocal.set(loginVal);
}
public static void clear(){
loginValThreadLocal.remove();
}
}

那么子線程想要獲取這個LoginVal如何做呢?

今天就來介紹幾種優(yōu)雅的方式實現Spring Boot 內部的父子線程的數據傳遞。

1. 手動設置

每執(zhí)行一次異步線程都要分為兩步:

  • 獲取父線程的LoginVal
  • 將LoginVal設置到子線程,達到復用

代碼如下:

public void handlerAsync() {
//1. 獲取父線程的loginVal
LoginVal loginVal = OauthContext.get();
log.info("父線程的值:{}",OauthContext.get());
CompletableFuture.runAsync(()->{
//2. 設置子線程的值,復用
OauthContext.set(loginVal);
log.info("子線程的值:{}",OauthContext.get());
});
}

雖然能夠實現目的,但是每次開異步線程都需要手動設置,重復代碼太多,看了頭疼,你認為優(yōu)雅嗎?

2. 線程池設置TaskDecorator

TaskDecorator是什么?官方api的大致意思:這是一個執(zhí)行回調方法的裝飾器,主要應用于傳遞上下文,或者提供任務的監(jiān)控/統計信息。

知道有這么一個東西,如何去使用?

TaskDecorator是一個接口,首先需要去實現它,代碼如下:

/**
* @author 公眾號:碼猿技術專欄
* @description 上下文裝飾器
*/
public class ContextTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
//獲取父線程的loginVal
LoginVal loginVal = OauthContext.get();
return () -> {
try {
// 將主線程的請求信息,設置到子線程中
OauthContext.set(loginVal);
// 執(zhí)行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結束,清空這些信息,否則可能造成內存泄漏
OauthContext.clear();
}
};
}
}

這里我只是設置了LoginVal,實際開發(fā)中其他的共享數據,比如SecurityContext,RequestAttributes....

TaskDecorator需要結合線程池使用,實際開發(fā)中異步線程建議使用線程池,只需要在對應的線程池配置一下,代碼如下:

@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
poolTaskExecutor.setCorePoolSize(xx);
poolTaskExecutor.setMaxPoolSize(xx);
// 設置線程活躍時間(秒)
poolTaskExecutor.setKeepAliveSeconds(xx);
// 設置隊列容量
poolTaskExecutor.setQueueCapacity(xx);
//設置TaskDecorator,用于解決父子線程間的數據復用
poolTaskExecutor.setTaskDecorator(new ContextTaskDecorator());
poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任務結束后再關閉線程池
poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return poolTaskExecutor;
}

此時業(yè)務代碼就不需要去設置子線程的值,直接使用即可,代碼如下:

public void handlerAsync() {
log.info("父線程的用戶信息:{}", OauthContext.get());
//執(zhí)行異步任務,需要指定的線程池
CompletableFuture.runAsync(()-> log.info("子線程的用戶信息:{}", OauthContext.get()),taskExecutor);
}

來看一下結果,如下圖:

這里使用的是CompletableFuture執(zhí)行異步任務,使用@Async這個注解同樣是可行的。

注意:無論使用何種方式,都需要指定線程池

3. InheritableThreadLocal

這種方案不建議使用,InheritableThreadLocal雖然能夠實現父子線程間的復用,但是在線程池中使用會存在復用的問題。

這種方案使用也是非常簡單,直接用InheritableThreadLocal替換ThreadLocal即可,代碼如下:

/**
* @author 公眾號:碼猿技術專欄
* @description 用戶上下文信息
*/
public class OauthContext {
private static final InheritableThreadLocal loginValThreadLocal=new InheritableThreadLocal<>();

public static LoginVal get(){
return loginValThreadLocal.get();
}
public static void set(LoginVal loginVal){
loginValThreadLocal.set(loginVal);
}
public static void clear(){
loginValThreadLocal.remove();
}
}

4. TransmittableThreadLocal

TransmittableThreadLocal是阿里開源的工具,彌補了InheritableThreadLocal的缺陷,在使用線程池等會池化復用線程的執(zhí)行組件情況下,提供??ThreadLocal??值的傳遞功能,解決異步執(zhí)行時上下文傳遞的問題。

使用起來也是非常簡單,添加依賴如下:


com.alibaba
transmittable-thread-local
2.14.2

OauthContext改造代碼如下:

/**
* @author 公眾號:碼猿技術專欄
* @description 用戶上下文信息
*/
public class OauthContext {
private static final TransmittableThreadLocal loginValThreadLocal=new TransmittableThreadLocal<>();

public static LoginVal get(){
return loginValThreadLocal.get();
}
public static void set(LoginVal loginVal){
loginValThreadLocal.set(loginVal);
}
public static void clear(){
loginValThreadLocal.remove();
}
}

TransmittableThreadLocal原理

從定義來看,TransimittableThreadLocal繼承于InheritableThreadLocal,并實現TtlCopier接口,它里面只有一個copy方法。所以主要是對InheritableThreadLocal的擴展。

public class TransmittableThreadLocal extends InheritableThreadLocal implements TtlCopier

在TransimittableThreadLocal中添加holder屬性。這個屬性的作用就是被標記為具備線程傳遞資格的對象都會被添加到這個對象中。

要標記一個類,比較容易想到的方式,就是給這個類新增一個Type字段,還有一個方法就是將具備這種類型的的對象都添加到一個靜態(tài)全局集合中。之后使用時,這個集合里的所有值都具備這個標記。

// 1. holder本身是一個InheritableThreadLocal對象
// 2. 這個holder對象的value是WeakHashMap, ?>
// 2.1 WeekHashMap的value總是null,且不可能被使用。
// 2.2 WeekHasshMap支持value=null
private static InheritableThreadLocal, ?>> holder = new InheritableThreadLocal, ?>>() {
@Override
protected WeakHashMap, ?> initialValue() {
return new WeakHashMap, Object>();
}

/**
* 重寫了childValue方法,實現上直接將父線程的屬性作為子線程的本地變量對象。
*/
@Override
protected WeakHashMap, ?> childValue(WeakHashMap, ?> parentValue) {
return new WeakHashMap, Object>(parentValue);
}
};

應用代碼是通過TtlExecutors工具類對線程池對象進行包裝。工具類只是簡單的判斷,輸入的線程池是否已經被包裝過、非空校驗等,然后返回包裝類ExecutorServiceTtlWrapper。根據不同的線程池類型,有不同和的包裝類。

@Nullable
public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) {
if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) {
return executorService;
}
return new ExecutorServiceTtlWrapper(executorService);
}

進入包裝類ExecutorServiceTtlWrapper。可以注意到不論是通過ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都會將線程對象包裝成TtlCallable或者TtlRunnable,用于在真正執(zhí)行run方法前做一些業(yè)務邏輯。

/**
* 在ExecutorServiceTtlWrapper實現submit方法
*/
@NonNull
@Override
public Future submit(@NonNull Callable task) {
return executorService.submit(TtlCallable.get(task));
}

/**
* 在ExecutorTtlWrapper實現execute方法
*/
@Override
public void execute(@NonNull Runnable command) {
executor.execute(TtlRunnable.get(command));
}

所以,重點的核心邏輯應該是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable為例,TtlRunnable同理類似。在分析call()方法之前,先看一個類Transmitter。

public static class Transmitter {
/**
* 捕獲當前線程中的是所有TransimittableThreadLocal和注冊ThreadLocal的值。
*/
@NonNull
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

/**
* 捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。
*/
private static HashMap, Object> captureTtlValues() {
HashMap, Object> ttl2Value =
new HashMap, Object>();
for (TransmittableThreadLocal threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}

/**
* 捕獲注冊的ThreadLocal的值,也就是原本線程中的ThreadLocal,可以注冊到TTL中,在
* 進行線程池本地變量傳遞時也會被傳遞。
*/
private static HashMap, Object> captureThreadLocalValues() {
final HashMap, Object> threadLocal2Value =
new HashMap, Object>();
for(Map.Entry,TtlCopier>entry:threadLocalHolder.entrySet()){
final ThreadLocal threadLocal = entry.getKey();
final TtlCopier copier = entry.getValue();
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}

/**
* 將捕獲到的本地變量進行替換子線程的本地變量,并且返回子線程現有的本地變量副本backup。
* 用于在執(zhí)行run/call方法之后,將本地變量副本恢復。
*/
@NonNull
public static Object replay(@NonNull Object captured) {
final Snapshot capturedSnapshot = (Snapshot) captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value),
replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}

/**
* 替換TransmittableThreadLocal
*/
@NonNull
private static HashMap, Object> replayTtlValues(@NonNull HashMap, Object> captured) {
// 創(chuàng)建副本backup
HashMap, Object> backup =
new HashMap, Object>();

for (final Iterator> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal threadLocal = iterator.next();
// 對當前線程的本地變量進行副本拷貝
backup.put(threadLocal, threadLocal.get());

// 若出現調用線程中不存在某個線程變量,而線程池中線程有,則刪除線程池中對應的本地變量
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 將捕獲的TTL值打入線程池獲取到的線程TTL中。
setTtlValuesTo(captured);
// 是一個擴展點,調用TTL的beforeExecute方法。默認實現為空
doExecuteCallback(true);
return backup;
}

private static HashMap, Object> replayThreadLocalValues(@NonNull HashMap, Object> captured) {
final HashMap, Object> backup =
new HashMap, Object>();
for (Map.Entry, Object> entry : captured.entrySet()) {
final ThreadLocal threadLocal = entry.getKey();
backup.put(threadLocal, threadLocal.get());
final Object value = entry.getValue();
if (value == threadLocalClearMark) threadLocal.remove();
else threadLocal.set(value);
}
return backup;
}

/**
* 清除單線線程的所有TTL和TL,并返回清除之氣的backup
*/
@NonNull
public static Object clear() {
final HashMap, Object> ttl2Value =
new HashMap, Object>();

final HashMap, Object> threadLocal2Value =
new HashMap, Object>();
for(Map.Entry,TtlCopier>entry:threadLocalHolder.entrySet()){
final ThreadLocal threadLocal = entry.getKey();
threadLocal2Value.put(threadLocal, threadLocalClearMark);
}
return replay(new Snapshot(ttl2Value, threadLocal2Value));
}

/**
* 還原
*/
public static void restore(@NonNull Object backup) {
final Snapshot backupSnapshot = (Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}

private static void restoreTtlValues(@NonNull HashMap, Object> backup) {
// 擴展點,調用TTL的afterExecute
doExecuteCallback(false);

for (final Iterator> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal threadLocal = iterator.next();

if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}

// 將本地變量恢復成備份版本
setTtlValuesTo(backup);
}

private static void setTtlValuesTo(@NonNull HashMap, Object> ttlValues) {
for (Map.Entry, Object> entry : ttlValues.entrySet()) {
TransmittableThreadLocal threadLocal = entry.getKey();
threadLocal.set(entry.getValue());
}
}

private static void restoreThreadLocalValues(@NonNull HashMap, Object> backup) {
for (Map.Entry, Object> entry : backup.entrySet()) {
final ThreadLocal threadLocal = entry.getKey();
threadLocal.set(entry.getValue());
}
}

/**
* 快照類,保存TTL和TL
*/
private static class Snapshot {
final HashMap, Object> ttl2Value;
final HashMap, Object> threadLocal2Value;

private Snapshot(HashMap, Object> ttl2Value,
HashMap, Object> threadLocal2Value) {
this.ttl2Value = ttl2Value;
this.threadLocal2Value = threadLocal2Value;
}
}

進入??TtlCallable#call()??方法。

@Override
public V call() throws Exception {
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterCall &&
!capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after call!");
}
// 調用replay方法將捕獲到的當前線程的本地變量,傳遞給線程池線程的本地變量,
// 并且獲取到線程池線程覆蓋之前的本地變量副本。
Object backup = replay(captured);
try {
// 線程方法調用
return callable.call();
} finally {
// 使用副本進行恢復。
restore(backup);
}
}

到這基本上線程池方式傳遞本地變量的核心代碼已經大概看完了。總的來說在創(chuàng)建TtlCallable對象是,調用capture()方法捕獲調用方的本地線程變量,在call()執(zhí)行時,將捕獲到的線程變量,替換到線程池所對應獲取到的線程的本地變量中,并且在執(zhí)行完成之后,將其本地變量恢復到調用之前。

總結

上述列舉了4種方案,陳某這里推薦方案2和方案4,其中兩種方案的缺點非常明顯,實際開發(fā)中也是采用的方案2或者方案4。


網站標題:用這四招優(yōu)雅地實現SpringBoot異步線程間數據傳遞
當前網址:http://uogjgqi.cn/article/dpdgppo.html
掃二維碼與項目經理溝通

我們在微信上24小時期待你的聲音

解答本文疑問/技術咨詢/運營咨詢/技術建議/互聯網交流