jax-rs jax-ws_迟来总比没有好:SSE或服务器发送的事件现在已在JAX-RS中
jax-rs jax-ws
服務器發送的事件 (或簡稱為SSE )是非常有用的協議,它允許服務器通過HTTP將數據推送到客戶端。 這是我們的Web瀏覽器支持的年齡,但是令人驚訝的是, JAX-RS規范在很長一段時間內都忽略了這一點。 盡管Jersey提供了適用于SSE媒體類型的擴展名,但該API尚未正式化,因此不能移植到其他JAX-RS實現中。
幸運的是, JAX-RS 2.1 (也稱為JSR-370)通過將SSE支持(客戶端和服務器端)作為正式規范的一部分,已經改變了這一點。 在今天的帖子中,我們將研究如何使用最近發布的出色的Apache CXF框架3.2.0版將SSE支持集成到現有的Java REST(ful) Web服務中。 實際上,除了自舉之外,實際上沒有CXF特定的東西,所有示例都應在實現JAX-RS 2.1規范的任何其他框架中工作。
事不宜遲,讓我們開始吧。 由于這些天大量的Java項目都建立在出色的Spring Framework之上,因此我們的示例應用程序將使用Spring Boot和Apache CXF Spring Boot Integration使我們Swift起步。 老好伙伴Apache Maven也可以通過管理項目依賴項來幫助我們。
org.springframework.bootspring-boot-starter1.5.8.RELEASEorg.apache.cxfcxf-rt-frontend-jaxrs3.2.0org.apache.cxfcxf-spring-boot-starter-jaxrs3.2.0org.apache.cxfcxf-rt-rs-client3.2.0org.apache.cxfcxf-rt-rs-sse3.2.0在后臺, Apache CXF正在使用Atmosphere框架來實現SSE傳輸,因此這是我們必須包含的另一個依賴項。
org.atmosphereatmosphere-runtime2.4.14有關依賴于Atmosphere框架的詳細信息導致需要提供其他配置設置,即transportId ,以便確保在運行時拾取支持SSE的傳輸。 相關詳細信息可以添加到application.yml文件中:
cxf:servlet:init:transportId: http://cxf.apache.org/transports/http/sse太好了,所以基礎就在那里,繼續前進。 我們將要構建的REST(ful) Web服務將在SSE流中公開虛構的CPU平均負載(為簡單起見,隨機生成)。 Stats類將構成我們的數據模型。
public class Stats {private long timestamp;private int load;public Stats() {}public Stats(long timestamp, int load) {this.timestamp = timestamp;this.load = load;}// Getters and setters are omitted... }說到流, Reactive Streams規范已進入Java 9 ,希望我們能看到Java社區加速采用React式編程模型。 此外,在具有Reactive Streams支持的情況下,開發支持SSE的 REST(ful) Web服務將變得更加容易和直接。 為此,讓我們將RxJava 2集成到示例應用程序中。
io.reactivex.rxjava2rxjava2.1.6這是開始使用我們的StatsRestService類(典型的JAX-RS資源實現)的好時機。 JAX-RS 2.1中的關鍵SSE功能以Sse上下文對象為中心,可以像這樣注入。
@Service @Path("/api/stats") public class StatsRestService {@Context public void setSse(Sse sse) {// Access Sse context here}在Sse上下文之外,我們可以訪問兩個非常有用的抽象: SseBroadcaster和OutboundSseEvent.Builder ,例如:
private SseBroadcaster broadcaster; private Builder builder;@Context public void setSse(Sse sse) {this.broadcaster = sse.newBroadcaster();this.builder = sse.newEventBuilder(); }您可能已經猜到了, OutboundSseEvent.Builder構造了OutboundSseEvent類的實例,這些實例可以通過電線發送,而SseBroadcaster則向所有連接的客戶端廣播相同的SSE流。 話雖如此,我們可以生成OutboundSseEvent的流并將其分發給感興趣的每個人:
private static void subscribe(final SseBroadcaster broadcaster, final Builder builder) {Flowable.interval(1, TimeUnit.SECONDS).zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id)).subscribeOn(Schedulers.single()).subscribe(broadcaster::broadcast); }private static Flowable<OutboundSseEvent.Builder> eventsStream(final Builder builder) {return Flowable.generate(emitter -> emitter.onNext(builder.name("stats"))); }如果您不熟悉RxJava 2 ,請不用擔心,這就是這里發生的情況。 eventsStream方法為stats類型的SSE事件返回有效的無限OutboundSseEvent.Builder實例流。 訂閱方法稍微復雜一點。 我們首先創建一個流,該流每秒發出序號,例如fe 0、1、2、3、4、5、6等。 稍后,我們將此流與eventsStream方法返回的流組合在一起,實質上將兩個流合并為一個流,該流每秒發出一個元組(number, OutboundSseEvent.Builder ) 。 公平地說,該元組對我們不是很有用,因此我們將其轉換為OutboundSseEvent類的實例,將數字視為SSE事件標識符:
private static final Random RANDOM = new Random();private static OutboundSseEvent createSseEvent(OutboundSseEvent.Builder builder, long id) {return builder.id(Long.toString(id)).data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100))).mediaType(MediaType.APPLICATION_JSON_TYPE).build(); }OutboundSseEvent可以使用常規MessageBodyWriter解析策略在data屬性中攜帶將相對于指定的mediaType進行序列化的任何有效負載。 一旦獲得OutboundSseEvent實例,便使用SseBroadcaster :: broadcast方法將其發送出去。 請注意,我們使用subscribeOn運算符將控制流移交給了另一個線程,這通常是您一直在做的事情。
很好,希望現在清除了流部分,但是我們如何才能真正訂閱SseBroadcaster發出的SSE事件? 這比您想像的要容易:
@GET @Path("broadcast") @Produces(MediaType.SERVER_SENT_EVENTS) public void broadcast(@Context SseEventSink sink) {broadcaster.register(sink); }我們都準備好了。 這里最重要的是正在生成的內容類型,應將其設置為MediaType.SERVER_SENT_EVENTS 。 在這種情況下, SseEventSink的上下文實例變為可用,并且可以向SseBroadcaster實例注冊。
要查看我們的JAX-RS資源,我們需要使用例如JAXRSServerFactoryBean引導服務器實例,并在此過程中配置所有必需的提供程序。 請注意,我們也在顯式指定要使用的傳輸,在這種情況下為SseHttpTransportFactory.TRANSPORT_ID 。
@Configuration @EnableWebMvc public class AppConfig extends WebMvcConfigurerAdapter {@Beanpublic Server rsServer(Bus bus, StatsRestService service) {JAXRSServerFactoryBean endpoint = new JAXRSServerFactoryBean();endpoint.setBus(bus);endpoint.setAddress("/");endpoint.setServiceBean(service);endpoint.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);endpoint.setProvider(new JacksonJsonProvider());return endpoint.create();}@Overridepublic void addResourceHandlers(ResourceHandlerRegistry registry) {registry.addResourceHandler("/static/**").addResourceLocations("classpath:/web-ui/"); } }要結束循環,我們只需要為Spring Boot應用程序提供運行器即可:
@SpringBootApplication public class SseServerStarter { public static void main(String[] args) {SpringApplication.run(SseServerStarter.class, args);} }現在,如果我們運行該應用程序并使用多個Web瀏覽器或同一瀏覽器中的不同選項卡導航到http:// localhost:8080 / static / broadcast.html ,我們將觀察到所有事件內部繪制的相同事件流:
不錯,廣播當然是一個有效的用例,但是在每次端點調用時返回一個獨立的SSE流又如何呢? 簡單,只需使用SseEventSink方法(例如send和close )即可直接操作SSE流。
@GET @Path("sse") @Produces(MediaType.SERVER_SENT_EVENTS) public void stats(@Context SseEventSink sink) {Flowable.interval(1, TimeUnit.SECONDS).zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id)).subscribeOn(Schedulers.single()).subscribe(sink::send, ex -> {}, sink::close); }這次,如果我們運行該應用程序并使用多個Web瀏覽器或同一瀏覽器中的不同選項卡導航到http:// localhost:8080 / static / index.html ,我們將觀察到完全不同的圖表:
出色的服務器端API確實非常簡潔且易于使用。 但是客戶端方面,我們可以使用Java應用程序中的SSE流嗎? 答案是肯定的。 JAX-RS 2.1還概述了客戶端API,其核心是SseEventSource 。
final WebTarget target = ClientBuilder.newClient().register(JacksonJsonProvider.class).target("http://localhost:8080/services/api/stats/sse");try (final SseEventSource eventSource =SseEventSource.target(target).reconnectingEvery(5, TimeUnit.SECONDS).build()) {eventSource.register(event -> {final Stats stats = event.readData(Stats.class, MediaType.APPLICATION_JSON_TYPE);System.out.println("name: " + event.getName());System.out.println("id: " + event.getId());System.out.println("comment: " + event.getComment());System.out.println("data: " + stats.getLoad() + ", " + stats.getTimestamp());System.out.println("---------------");});eventSource.open();// Just consume SSE events for 10 secondsThread.sleep(10000); }如果運行此代碼段(假設服務器也已啟動并且正在運行),我們將在控制臺中看到類似的內容(您可能還記得,數據是隨機生成的)。
name: stats id: 0 comment: null data: 82, 1509376080027 --------------- name: stats id: 1 comment: null data: 68, 1509376081033 --------------- name: stats id: 2 comment: null data: 12, 1509376082028 --------------- name: stats id: 3 comment: null data: 5, 1509376083028 ---------------...如我們所見,服務器端的OutboundSseEvent變為客戶端的InboundSseEvent 。 客戶端可以使用通常的MessageBodyReader解析策略,通過指定預期的媒體類型來消耗數據屬性中可以反序列化的任何有效負載。
單篇文章中壓縮了很多內容。 而且,關于SSE和JAX-RS 2.1的其他事情我們在這里沒有涉及,例如使用HttpHeaders.LAST_EVENT_ID_HEADER或配置重新連接延遲。 如果有興趣學習的話,這些可能是即將發布的帖子的重要話題。
總而言之, JAX-RS對SSE的支持是我們許多人期待已久的事情。 最后,它在那里,請嘗試一下!
完整的項目資源可在Github上找到 。
翻譯自: https://www.javacodegeeks.com/2017/10/better-late-never-sse-server-sent-events-now-jax-rs.html
jax-rs jax-ws
總結
以上是生活随笔為你收集整理的jax-rs jax-ws_迟来总比没有好:SSE或服务器发送的事件现在已在JAX-RS中的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: WeGame下载速度慢怎么解决?
- 下一篇: 戴尔台式电脑进不了winpe(戴尔电脑进