javascript
使用Spring Boot和Project Reactor处理SQS消息
我最近參與了一個項目,在該項目中,我不得不有效地處理通過AWS SQS Queue流入的大量消息。 在這篇文章(可能還有一篇)中,我將介紹使用出色的Project Reactor處理消息的方法。
以下是我要進行的設(shè)置:
設(shè)置本地AWS環(huán)境
在我進入代碼之前,讓我先做一些準備。 首先,如何獲得SNS和SQS的本地版本。 最簡單的方法之一是使用localstack 。 我使用這里描述的docker-compose版本
我將使用的第二個實用程序是AWS CLI。 該網(wǎng)站包含有關(guān)如何在本地安裝的詳細信息。
一旦這兩個實用程序都到位,快速測試應(yīng)驗證設(shè)置:
# Create a queue aws --endpoint http: //localhost:4576 sqs create-queue --queue-name test-queue # Send a sample message aws --endpoint http: //localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world" # Receive the message aws --endpoint http: //localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue項目反應(yīng)堆的基礎(chǔ)
Project Reactor實現(xiàn)了Reactive Streams規(guī)范,并提供了一種跨異步邊界處理數(shù)據(jù)流的方法,該方法尊重背壓。 這里有很多詞,但本質(zhì)上是這樣想的:
1. SQS產(chǎn)生數(shù)據(jù) 2.應(yīng)用程序?qū)⑹褂盟⑵渥鳛閿?shù)據(jù)流進行處理 3.應(yīng)用程序應(yīng)以可持續(xù)的速度使用數(shù)據(jù)–不應(yīng)輸入太多數(shù)據(jù)。這正式稱為 “背壓”
AWS開發(fā)工具包2
我將用于消耗AWS SQS數(shù)據(jù)的庫是
AWS開發(fā)工具包2 。 該庫在幕后使用了非阻塞IO。
該庫提供了撥打電話的同步版本以及異步版本。 考慮從SQS隊列中獲取記錄的同步方式:
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest import software.amazon.awssdk.services.sqs.SqsClient val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()在這里,“ software.amazon.awssdk.services.sqs.SqsClient”用于查詢sqs和同步檢索一批結(jié)果。 另一方面,異步結(jié)果如下所示:
val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: CompletableFuture<List<Message>> = sqsAsyncClient .receiveMessage(receiveMessageRequest) .thenApply { result -> result.messages() }現(xiàn)在,輸出為“ CompletableFuture”
無限循環(huán),無背壓
我最初創(chuàng)建消息流( Flux )的嘗試非常簡單–一個無限循環(huán),它輪詢AWS sqs并使用“ Flux.create”運算符從中創(chuàng)建Flux ,方法是:
fun listen(): Flux<Pair<String, () -> Unit>> { return Flux.create { sink: FluxSink<List<Message>> -> while (running) { try { val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() LOGGER.info( "Received: $messages" ) sink.next(messages) } catch (e: InterruptedException) { LOGGER.error(e.message, e) } catch (e: Exception) { LOGGER.error(e.message, e) } } } .flatMapIterable(Function.identity()) .doOnError { t: Throwable -> LOGGER.error(t.message, t) } .retry() .map { snsMessage: Message -> val snsMessageBody: String = snsMessage.body() val snsNotification: SnsNotification = readSnsNotification(snsMessageBody) snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) } } }它的工作方式是存在一個無限循環(huán),該循環(huán)使用long-polling檢查新消息。 消息可能并非在每次輪詢時都可用,在這種情況下,會將空列表添加到流中。
然后,使用“ flatMapIterable”運算符將此列表中的最多5條消息映射到單個消息流,并通過從SNS包裝器中提取消息來進一步映射(當消息從SNS轉(zhuǎn)發(fā)到SQS時,SNS將包裝器添加到消息),并在消息成功處理后刪除消息的方法(deleteHandle)作為對返回。
這種方法可以很好地工作……但是,請想象一下有大量消息進入的情況,因為循環(huán)并沒有真正意識到下游的吞吐量,它將繼續(xù)將數(shù)據(jù)泵送到流中。 中間操作員的默認行為是根據(jù)最終使用者使用數(shù)據(jù)的方式來緩沖流入的數(shù)據(jù)。 由于此緩沖區(qū)是無界的,因此系統(tǒng)可能會達到不可持續(xù)的狀態(tài)。
背壓感知流
解決方法是使用其他運算符生成數(shù)據(jù)流–
助焊劑
使用此運算符的代碼如下所示:
這種工作方式是重復(fù)調(diào)用傳遞給“ Flux.generate”運算符的塊–與while循環(huán)類似,在每個循環(huán)中,期望將一項添加到流中。 在這種情況下,添加到流中的項目恰好是一個列表,該列表像以前一樣分解為單獨的消息。
背壓在這種情況下如何工作–
因此,請再次考慮下游使用者處理速度比生成端慢的情況。 在這種情況下,Flux本身將以調(diào)用generate運算符的速率減慢速度,因此要考慮下游系統(tǒng)的吞吐量。
結(jié)論
這應(yīng)該建立一個良好的管道來處理來自SQS的消息,對此有更多細微差別,可以稍后在流中并行處理消息,我將在以后的文章中介紹。
這個例子的代碼庫可以在我的github倉庫中找到
在這里 – https://github.com/bijukunjummen/boot-with-sns-sqs。 該代碼具有完整的管道,其中包括處理消息并在處理后將其刪除。
翻譯自: https://www.javacodegeeks.com/2020/03/processing-sqs-messages-using-spring-boot-and-project-reactor.html
總結(jié)
以上是生活随笔為你收集整理的使用Spring Boot和Project Reactor处理SQS消息的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mac修改电脑硬盘格式(mac如何修改硬
- 下一篇: 应急备案凭证编号查询(应急备案凭证)