实践中的事件源和CQRS
任何嘗試實(shí)施完全符合ACID的系統(tǒng)的人都知道,您需要做很多事情。 您需要確保可以自由創(chuàng)建,修改和刪除數(shù)據(jù)庫實(shí)體而不會(huì)出錯(cuò),在大多數(shù)情況下,解決方案將以性能為代價(jià)。 可以用來解決此問題的一種方法是根據(jù)一系列事件而不是可變狀態(tài)來設(shè)計(jì)系統(tǒng)。 這通常稱為事件來源。
在本文中,我將展示一個(gè)演示應(yīng)用程序,該應(yīng)用程序使用開源工具包Speedment快速啟動(dòng)并運(yùn)行可擴(kuò)展的基于事件的數(shù)據(jù)庫應(yīng)用程序。 示例的完整源代碼在此處 。
什么是事件源?
在典型的關(guān)系數(shù)據(jù)庫系統(tǒng)中,您將實(shí)體的狀態(tài)存儲(chǔ)為數(shù)據(jù)庫中的一行。 狀態(tài)更改時(shí),應(yīng)用程序使用UPDATE或DELETE語句修改行。 這種方法的問題在于,當(dāng)要確保沒有更改任何行以致使系統(tǒng)處于非法狀態(tài)時(shí),它將對(duì)數(shù)據(jù)庫增加很多要求。 您不希望任何人提取比他們帳戶中更多的錢,或者要競標(biāo)已經(jīng)結(jié)束的拍賣。
在基于事件的系統(tǒng)中,我們對(duì)此采取了不同的方法。 無需將實(shí)體的狀態(tài)存儲(chǔ)在數(shù)據(jù)庫中,而是存儲(chǔ)導(dǎo)致該狀態(tài)的一系列更改 。 事件一旦創(chuàng)建便是不可變的,這意味著您僅需實(shí)現(xiàn)兩個(gè)操作CREATE和READ。 如果實(shí)體被更新或刪除,則可以通過創(chuàng)建“更新”或“刪除”事件來實(shí)現(xiàn)。
事件源系統(tǒng)可以輕松擴(kuò)展以提高性能,因?yàn)槿魏喂?jié)點(diǎn)都可以簡單地下載事件日志并重播當(dāng)前狀態(tài)。 由于寫入和查詢由不同的機(jī)器處理,因此您還可以獲得更好的性能。 這稱為CQRS(命令查詢職責(zé)隔離)。 正如您將在示例中看到的,使用Speedment工具包,我們可以在極短的時(shí)間內(nèi)獲得最終一致的實(shí)例化視圖并開始運(yùn)行。
可預(yù)訂的桑拿
為了展示構(gòu)建事件源系統(tǒng)的工作流程,我們將創(chuàng)建一個(gè)小型應(yīng)用程序來處理住宅區(qū)中共享桑拿浴室的預(yù)訂。 我們有多個(gè)租戶有興趣預(yù)訂桑拿浴室,但我們需要確保害羞的租戶永遠(yuǎn)不會(huì)意外預(yù)訂它。 我們還希望在同一系統(tǒng)中支持多個(gè)桑拿浴室。
為了簡化與數(shù)據(jù)庫的通信,我們將使用Speedment工具箱 。 Speedment是一個(gè)Java工具,它使我們能夠從數(shù)據(jù)庫生成完整的域模型,并且還可以使用優(yōu)化的Java 8流輕松查詢數(shù)據(jù)庫。 在Apache 2-license下可以使用Speedment ,在Github頁面上有很多很好的例子說明了不同的用法。
步驟1:定義數(shù)據(jù)庫架構(gòu)
第一步是定義我們的(MySQL)數(shù)據(jù)庫。 我們僅擁有一張稱為“預(yù)訂”的桌子,用于存儲(chǔ)與預(yù)訂桑拿浴室有關(guān)的事件。 請(qǐng)注意,預(yù)訂是事件而不是實(shí)體。 如果我們要取消預(yù)訂或?qū)ζ溥M(jìn)行更改,則必須將其他更改發(fā)布為新行。 我們不允許修改或刪除已發(fā)布的行。
CREATE DATABASE `sauna`;CREATE TABLE `sauna`.`booking` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,`booking_id` BIGINT NOT NULL,`event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL,`tenant` INT NULL,`sauna` INT NULL,`booked_from` DATE NULL,`booked_to` DATE NULL,PRIMARY KEY (`id`) );“ id”列是一個(gè)遞增的整數(shù),每次將新事件發(fā)布到日志時(shí)都會(huì)自動(dòng)分配。 “ booking_id”告訴我們我們指的是哪個(gè)預(yù)訂。 如果兩個(gè)事件共享相同的預(yù)訂ID,則它們引用相同的實(shí)體。 我們還有一個(gè)名為“ event_type”的枚舉,它描述了我們?cè)噲D執(zhí)行的操作。 之后是屬于預(yù)訂的信息。 如果列為NULL,則與任何先前值相比,我們將其視為未修改。
步驟2:使用加速生成代碼
下一步是使用Speedment為項(xiàng)目生成代碼。 只需創(chuàng)建一個(gè)新的maven項(xiàng)目并將以下代碼添加到pom.xml文件即可。
pom.xml
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><speedment.version>3.0.0-EA2</speedment.version><mysql.version>5.1.39</mysql.version> </properties><build><plugins><plugin><groupId>com.speedment</groupId><artifactId>speedment-maven-plugin</artifactId><version>${speedment.version}</version><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency></dependencies></plugin></plugins> </build><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.speedment</groupId><artifactId>runtime</artifactId><version>${speedment.version}</version><type>pom</type></dependency> </dependencies>如果生成項(xiàng)目,則IDE 中將出現(xiàn)一個(gè)名為speedment:tool的新maven目標(biāo)。 運(yùn)行它以啟動(dòng)Speedment用戶界面。 在其中,連接到Sauna數(shù)據(jù)庫并使用默認(rèn)設(shè)置生成代碼。 現(xiàn)在應(yīng)在項(xiàng)目中填充源文件。
提示:如果對(duì)數(shù)據(jù)庫進(jìn)行了更改,則可以使用speedment:reload -goal下載新配置,并使用speedment:generate 重新生成源。 無需重新啟動(dòng)該工具!
步驟3:創(chuàng)建物化視圖
物化視圖是一個(gè)組件,該組件定期輪詢數(shù)據(jù)庫以查看是否已添加任何新行,如果已添加,則以正確的順序下載并將它們合并到視圖中。 由于輪詢有時(shí)會(huì)花費(fèi)很多時(shí)間,因此我們希望此過程在單獨(dú)的線程中運(yùn)行。 我們可以使用Java Timer和TimerTask來實(shí)現(xiàn)。
輪詢數(shù)據(jù)庫? 真? 好吧,需要考慮的重要一點(diǎn)是,只有服務(wù)器才能輪詢數(shù)據(jù)庫,而不會(huì)輪詢客戶端。 這給我們提供了很好的可伸縮性,因?yàn)槲覀兛梢宰屔贁?shù)服務(wù)器輪詢數(shù)據(jù)庫,從而為成千上萬的租戶提供服務(wù)。 將此與常規(guī)系統(tǒng)進(jìn)行比較,在常規(guī)系統(tǒng)中,每個(gè)客戶端都會(huì)從服務(wù)器請(qǐng)求資源,然后服務(wù)器又會(huì)聯(lián)系數(shù)據(jù)庫。
BookingView.java
public final class BookingView {...public static BookingView create(BookingManager mgr) {final AtomicBoolean working = new AtomicBoolean(false);final AtomicLong last = new AtomicLong();final AtomicLong total = new AtomicLong();final String table = mgr.getTableIdentifier().getTableName();final String field = Booking.ID.identifier().getColumnName();final Timer timer = new Timer();final BookingView view = new BookingView(timer);final TimerTask task = ...;timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY);return view;} }計(jì)時(shí)器任務(wù)是匿名定義的,這就是輪詢邏輯所在的位置。
final TimerTask task = new TimerTask() {@Overridepublic void run() {boolean first = true;// Make sure no previous task is already inside this block.if (working.compareAndSet(false, true)) {try {// Loop until no events was merged // (the database is up to date).while (true) {// Get a list of up to 25 events that has not yet // been merged into the materialized object view.final List added = unmodifiableList(mgr.stream().filter(Booking.ID.greaterThan(last.get())).sorted(Booking.ID.comparator()).limit(MAX_BATCH_SIZE).collect(toList()));if (added.isEmpty()) {if (!first) {System.out.format("%s: View is up to date. A total of " + "%d rows have been loaded.%n",System.identityHashCode(last),total.get());}break;} else {final Booking lastEntity = added.get(added.size() - 1);last.set(lastEntity.getId());added.forEach(view::accept);total.addAndGet(added.size());System.out.format("%s: Downloaded %d row(s) from %s. " + "Latest %s: %d.%n", System.identityHashCode(last),added.size(),table,field,Long.parseLong("" + last.get()));}first = false;}// Release this resource once we exit this block.} finally {working.set(false);}}} };有時(shí),合并任務(wù)可能需要花費(fèi)比計(jì)時(shí)器間隔更多的時(shí)間。 為避免此問題,我們使用AtomicBoolean進(jìn)行檢查并確保只能同時(shí)執(zhí)行一個(gè)任務(wù)。 這類似于信號(hào)量,不同之處在于我們想要?jiǎng)h除沒有時(shí)間的任務(wù)而不是排隊(duì),因?yàn)槲覀兇_實(shí)不需要執(zhí)行所有任務(wù),因此只需一秒鐘即可完成一個(gè)新任務(wù)。
構(gòu)造函數(shù)和基本成員方法很容易實(shí)現(xiàn)。 我們將傳遞給類的計(jì)時(shí)器作為參數(shù)存儲(chǔ)在構(gòu)造函數(shù)中,以便在需要停止時(shí)可以取消該計(jì)時(shí)器。 我們還會(huì)存儲(chǔ)一張地圖,以將所有預(yù)訂的當(dāng)前視圖保存在內(nèi)存中。
private final static int MAX_BATCH_SIZE = 25; private final static int UPDATE_EVERY = 1_000; // Millisecondsprivate final Timer timer; private final Map<Long, Booking> bookings;private BookingView(Timer timer) {this.timer = requireNonNull(timer);this.bookings = new ConcurrentHashMap<>(); }public Stream<Booking> stream() {return bookings.values().stream(); }public void stop() {timer.cancel(); }BookingView類的最后一個(gè)缺少的部分是合并過程中上面使用的accept()方法。 在這里考慮新事件并將其合并到視圖中。
private boolean accept(Booking ev) {final String type = ev.getEventType();// If this was a creation eventswitch (type) {case "CREATE" :// Creation events must contain all information.if (!ev.getSauna().isPresent()|| !ev.getTenant().isPresent()|| !ev.getBookedFrom().isPresent()|| !ev.getBookedTo().isPresent()|| !checkIfAllowed(ev)) {return false;}// If something is already mapped to that key, refuse the // event.return bookings.putIfAbsent(ev.getBookingId(), ev) == null;case "UPDATE" :// Create a copy of the current statefinal Booking existing = bookings.get(ev.getBookingId());// If the specified key did not exist, refuse the event.if (existing != null) {final Booking proposed = new BookingImpl();proposed.setId(existing.getId());// Update non-null valuesproposed.setSauna(ev.getSauna().orElse(unwrap(existing.getSauna())));proposed.setTenant(ev.getTenant().orElse(unwrap(existing.getTenant())));proposed.setBookedFrom(ev.getBookedFrom().orElse(unwrap(existing.getBookedFrom())));proposed.setBookedTo(ev.getBookedTo().orElse(unwrap(existing.getBookedTo())));// Make sure these changes are allowed.if (checkIfAllowed(proposed)) {bookings.put(ev.getBookingId(), proposed);return true;}}return false;case "DELETE" :// Remove the event if it exists, else refuse the event.return bookings.remove(ev.getBookingId()) != null;default :System.out.format("Unexpected type '%s' was refused.%n", type);return false;} }在事件源系統(tǒng)中,規(guī)則在收到事件時(shí)不執(zhí)行,但在實(shí)現(xiàn)時(shí)才執(zhí)行。 基本上,任何人都可以在表的末尾插入新事件到系統(tǒng)中。 在這種方法中,我們選擇丟棄不遵循規(guī)則設(shè)置的事件。
步驟4:示例用法
在此示例中,我們將使用標(biāo)準(zhǔn)的Speedment API將三個(gè)新的預(yù)訂插入到數(shù)據(jù)庫中,其中兩個(gè)有效,第三個(gè)與先前的一個(gè)相交。 然后,我們將等待視圖更新并打印出所有預(yù)訂。
public static void main(String... params) {final SaunaApplication app = new SaunaApplicationBuilder().withPassword("password").build();final BookingManager bookings = app.getOrThrow(BookingManager.class);final SecureRandom rand = new SecureRandom();rand.setSeed(System.currentTimeMillis());// Insert three new bookings into the system.bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(1).setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(2).setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(3).setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS))));final BookingView view = BookingView.create(bookings);// Wait until the view is up-to-date.try { Thread.sleep(5_000); }catch (final InterruptedException ex) {throw new RuntimeException(ex);}System.out.println("Current Bookings for Sauna 1:");final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd");final Date now = Date.valueOf(LocalDate.now());view.stream().filter(Booking.SAUNA.equal(1)).filter(Booking.BOOKED_TO.greaterOrEqual(now)).sorted(Booking.BOOKED_FROM.comparator()).map(b -> String.format("Booked from %s to %s by Tenant %d.", dt.format(b.getBookedFrom().get()),dt.format(b.getBookedTo().get()),b.getTenant().getAsInt())).forEachOrdered(System.out::println);System.out.println("No more bookings!");view.stop(); }如果運(yùn)行它,將得到以下輸出:
677772350: Downloaded 3 row(s) from booking. Latest id: 3. 677772350: View is up to date. A total of 3 rows have been loaded. Current Bookings for Sauna 1: Booked from 2016-10-11 to 2016-10-12 by Tenant 2. Booked from 2016-10-13 to 2016-10-15 by Tenant 1. No more bookings!我的GitHub頁面上提供了此演示應(yīng)用程序的完整源代碼。 在這里您還可以找到許多其他示例,這些示例說明了如何在各種情況下使用Speedment快速開發(fā)數(shù)據(jù)庫應(yīng)用程序。
摘要
在本文中,我們?cè)跀?shù)據(jù)庫表上開發(fā)了一個(gè)物化視圖,該視圖可評(píng)估物化而不是插入時(shí)的事件。 這樣就可以啟動(dòng)應(yīng)用程序的多個(gè)實(shí)例,而不必?fù)?dān)心對(duì)其進(jìn)行同步,因?yàn)樗鼈冏罱K將保持一致。 然后,我們通過展示如何使用Speedment API查詢實(shí)例化視圖來生成當(dāng)前預(yù)訂列表來結(jié)束。
感謝您的閱讀,請(qǐng)?jiān)贕ithub頁面上查看更多Speedment示例 !
翻譯自: https://www.javacodegeeks.com/2016/10/event-sourcing-cqrs-practise.html
總結(jié)
以上是生活随笔為你收集整理的实践中的事件源和CQRS的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 尤尔达键盘快捷键(尤里快捷键)
- 下一篇: 金蝶中核销单的作用(金蝶核销单是什么意思