javascript
spring jpa 流式_从响应式Spring Data存储库流式传输实时更新
spring jpa 流式
這篇文章詳細(xì)介紹了從數(shù)據(jù)庫(kù)到對(duì)該數(shù)據(jù)感興趣的任何其他組件進(jìn)行流更新的幼稚實(shí)現(xiàn)。 更準(zhǔn)確地說(shuō),如何更改Spring Data R2DBC存儲(chǔ)庫(kù)以向相關(guān)訂閱者發(fā)出事件。
對(duì)R2DBC和Spring的一點(diǎn)背景知識(shí)將對(duì)這篇文章有所幫助。 我以前的著作《 使用 Microsoft SQL Server的 Spring Data R2DBC和Spring Data R2DBC進(jìn)行 異步RDBMS訪問(wèn)》在這方面應(yīng)該有所幫助。
如前所述,這將是一個(gè)幼稚的實(shí)現(xiàn)。 因此,代碼將不會(huì)花哨。
為此,我劫持了SimpleR2dbcRepository以創(chuàng)建一個(gè)存儲(chǔ)庫(kù)實(shí)現(xiàn),該存儲(chǔ)庫(kù)實(shí)現(xiàn)在每次保存新記錄時(shí)都會(huì)發(fā)出事件。 新事件將添加到DirectProcessor ,并發(fā)送到訂閱它的任何Publisher 。 看起來(lái)像:
class PersonRepository(entity: RelationalEntityInformation<Person, Int>,databaseClient: DatabaseClient,converter: R2dbcConverter,accessStrategy: ReactiveDataAccessStrategy ) : SimpleR2dbcRepository<Person, Int>(entity, databaseClient, converter, accessStrategy) {private val source: DirectProcessor<Person> = DirectProcessor.create<Person>()val events: Flux<Person> = sourceoverride fun <S : Person> save(objectToSave: S): Mono<S> {return super.save(objectToSave).doOnNext(source::onNext)} }來(lái)自SimpleR2dbcRepository唯一需要重寫(xiě)的函數(shù)是save ( saveAll委托來(lái)save )。 doOnNext添加到原始保存調(diào)用中,該調(diào)用通過(guò)調(diào)用onNext將新事件推送到source ( DirectorProcessor )。
source被強(qiáng)制轉(zhuǎn)換為Flux以防止來(lái)自存儲(chǔ)庫(kù)外部的類添加新事件。 從技術(shù)上講,他們?nèi)匀豢梢蕴砑邮录?#xff0c;但是他們需要自己進(jìn)行轉(zhuǎn)換。
您可能已經(jīng)注意到,存儲(chǔ)庫(kù)正在加載參數(shù)并將其傳遞到SimpleR2dbcRepository 。 存儲(chǔ)庫(kù)的一個(gè)實(shí)例需要手動(dòng)創(chuàng)建,因?yàn)樗哪承┮蕾図?xiàng)無(wú)法自動(dòng)注入:
@Configuration class RepositoryConfiguration {@Beanfun personRepository(databaseClient: DatabaseClient,dataAccessStrategy: ReactiveDataAccessStrategy): PersonRepository {val entity: RelationalPersistentEntity<Person> = dataAccessStrategy.converter.mappingContext.getRequiredPersistentEntity(Person::class.java) as RelationalPersistentEntity<Person>val relationEntityInformation: MappingRelationalEntityInformation<Person, Int> =MappingRelationalEntityInformation(entity, Int::class.java)return PersonRepository(relationEntityInformation,databaseClient,dataAccessStrategy.converter,dataAccessStrategy)} }至此,所有內(nèi)容都已設(shè)置好并可以使用。 以下是其工作的示例:
personRepository.events.doOnComplete { log.info("Events flux has closed") }.subscribe { log.info("From events stream - $it") } // insert people records over time MARVEL_CHARACTERS.toFlux().delayElements(Duration.of(1, SECONDS)).concatMap { personRepository.save(it) }.subscribe()哪個(gè)輸出:
29-08-2019 09:08:27.674 [reactor-tcp-nio-1] From events stream - Person(id=481, name=Spiderman, age=18) 29-08-2019 09:08:28.550 [reactor-tcp-nio-2] From events stream - Person(id=482, name=Ironman, age=48) 29-08-2019 09:08:29.555 [reactor-tcp-nio-3] From events stream - Person(id=483, name=Thor, age=1000) 29-08-2019 09:08:30.561 [reactor-tcp-nio-4] From events stream - Person(id=484, name=Hulk, age=49) 29-08-2019 09:08:31.568 [reactor-tcp-nio-5] From events stream - Person(id=485, name=Antman, age=49) 29-08-2019 09:08:32.571 [reactor-tcp-nio-6] From events stream - Person(id=486, name=Blackwidow, age=34) 29-08-2019 09:08:33.576 [reactor-tcp-nio-7] From events stream - Person(id=487, name=Starlord, age=38) 29-08-2019 09:08:34.581 [reactor-tcp-nio-8] From events stream - Person(id=488, name=Captain America, age=100) 29-08-2019 09:08:35.585 [reactor-tcp-nio-9] From events stream - Person(id=489, name=Warmachine, age=50) 29-08-2019 09:08:36.589 [reactor-tcp-nio-10] From events stream - Person(id=490, name=Wasp, age=26) 29-08-2019 09:08:37.596 [reactor-tcp-nio-11] From events stream - Person(id=491, name=Winter Soldier, age=101) 29-08-2019 09:08:38.597 [reactor-tcp-nio-12] From events stream - Person(id=492, name=Black Panther, age=42) 29-08-2019 09:08:39.604 [reactor-tcp-nio-1] From events stream - Person(id=493, name=Doctor Strange, age=42) 29-08-2019 09:08:40.609 [reactor-tcp-nio-2] From events stream - Person(id=494, name=Gamora, age=29) 29-08-2019 09:08:41.611 [reactor-tcp-nio-3] From events stream - Person(id=495, name=Groot, age=4) 29-08-2019 09:08:42.618 [reactor-tcp-nio-4] From events stream - Person(id=496, name=Hawkeye, age=47) 29-08-2019 09:08:43.620 [reactor-tcp-nio-5] From events stream - Person(id=497, name=Pepper Potts, age=44) 29-08-2019 09:08:44.627 [reactor-tcp-nio-6] From events stream - Person(id=498, name=Captain Marvel, age=59) 29-08-2019 09:08:45.631 [reactor-tcp-nio-7] From events stream - Person(id=499, name=Rocket Raccoon, age=30) 29-08-2019 09:08:46.637 [reactor-tcp-nio-8] From events stream - Person(id=500, name=Drax, age=49) 29-08-2019 09:08:47.639 [reactor-tcp-nio-9] From events stream - Person(id=501, name=Nebula, age=30)每秒保存一條記錄,該記錄與從存儲(chǔ)庫(kù)發(fā)出的事件相匹配。
請(qǐng)注意, doOnComplete事件永遠(yuǎn)不會(huì)觸發(fā)。 源永遠(yuǎn)不會(huì)關(guān)閉,因此永遠(yuǎn)不會(huì)向其任何訂戶發(fā)出完成事件。
至少在此基本實(shí)現(xiàn)中,這就是全部。 我敢肯定還有很多事情可以做,但是我首先需要弄清楚該怎么做……總而言之,加上一些補(bǔ)充,您可以將插入數(shù)據(jù)庫(kù)的數(shù)據(jù)流式傳輸?shù)綄?duì)記錄感興趣的組件被添加。
翻譯自: https://www.javacodegeeks.com/2019/09/streaming-live-updates-reactive-spring-data-repository.html
spring jpa 流式
總結(jié)
以上是生活随笔為你收集整理的spring jpa 流式_从响应式Spring Data存储库流式传输实时更新的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 柯洁简介及个人资料(九段围棋高手柯洁是最
- 下一篇: selenium并行_如何在不同的浏览器