透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制
In-memory Channel是當前Knative Eventing中默認的Channel, 也是一般剛接觸Knative Eventing首先了解到的Channel。本文通過分析 In-memory Channel 來進一步了解 Knative Eventing 中Broker/Trigger事件處理機制。
事件處理概覽
我們先整體看一下Knative Eventing 工作機制示意圖:
通過 namespace 創建默認 Broker 如果不指定Channel,會使用默認的 Inmemory Channel。
下面我們從數據平面開始分析Event事件是如何通過In-memory Channel分發到Knative Service
Ingress
Ingress是事件進入Channel前的第一級過濾,但目前的功能僅僅是接收事件然后轉發到Channel。過濾功能處理TODO狀態。
func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {tctx := cloudevents.HTTPTransportContextFrom(ctx)if tctx.Method != http.MethodPost {resp.Status = http.StatusMethodNotAllowedreturn nil}// tctx.URI is actually the path...if tctx.URI != "/" {resp.Status = http.StatusNotFoundreturn nil}ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName))defer func() {stats.Record(ctx, MeasureEventsTotal.M(1))}()send := h.decrementTTL(&event)if !send {ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL"))return nil}// TODO Filter.ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched"))return h.sendEvent(ctx, tctx, event) }In-memory Channel
Broker 字面意思為代理者,那么它代理的是誰呢?是Channel。為什么要代理Channel呢,而不直接發給訪問Channel。這個其實涉及到Broker/Trigger設計的初衷:對事件過濾處理。我們知道Channel(消息通道)負責事件傳遞,Subscription(訂閱)負責訂閱事件,通常這二者的模型如下:
這里就涉及到消息隊列和訂閱分發的實現。那么在In-memory Channel中如何實現的呢?
其實 In-memory 的核心處理在Fanout Handler中,它負責將接收到的事件分發到不同的 Subscription。
In-memory Channel處理示意圖:
事件接收并分發核心代碼如下:
func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {return func(_ provisioners.ChannelReference, m *provisioners.Message) error {if f.config.AsyncHandler {go func() {// Any returned error is already logged in f.dispatch()._ = f.dispatch(m)}()return nil}return f.dispatch(m)} }當前分發機制默認是異步機制(可通過AsyncHandler參數控制分發機制)。
消息分發機制:
// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out // requests return successfully, then return nil. Else, return an error. func (f *Handler) dispatch(msg *provisioners.Message) error {errorCh := make(chan error, len(f.config.Subscriptions))for _, sub := range f.config.Subscriptions {go func(s eventingduck.SubscriberSpec) {errorCh <- f.makeFanoutRequest(*msg, s)}(sub)}for range f.config.Subscriptions {select {case err := <-errorCh:if err != nil {f.logger.Error("Fanout had an error", zap.Error(err))return err}case <-time.After(f.timeout):f.logger.Error("Fanout timed out")return errors.New("fanout timed out")}}// All Subscriptions returned err = nil.return nil }通過這里的代碼,我們可以看到分發處理超時機制。默認為60s。也就是說如果分發的請求響應超過60s,那么In-memory會報錯:Fanout timed out。
Filter
一般的消息分發會將消息發送給訂閱的服務,但在 Broker/Trigger 模型中需要對事件進行過濾處理,這個處理的地方就是在Filter 中。
- 根據請求獲取Trigger信息。Filter中會根據請求的信息拿到 Trigger 名稱,然后通過獲取Trigger對應的資源信息拿到過濾規則
- 根據Trigger 過濾規則進行事件的過濾處理
- 最后將滿足過濾規則的分發到Kservice
其中過濾規則處理代碼如下:
func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {if ts.Filter == nil || ts.Filter.SourceAndType == nil {r.logger.Error("No filter specified")ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail"))return false}// Record event count and filtering timestartTS := time.Now()defer func() {filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond)stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS))}()filterType := ts.Filter.SourceAndType.Typeif filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() {r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type()))ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))return false}filterSource := ts.Filter.SourceAndType.Sources := event.Context.AsV01().SourceactualSource := s.String()if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource {r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource))ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))return false}ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass"))return true }當前的機制是所有的訂閱事件都會通過 Filter 集中進行事件過濾,如果一個Broker有大量的訂閱Trigger,是否會給Filter帶來性能上的壓力? 這個在實際場景 Broker/Trigger 的運用中需要考慮到這個問題。
結論
作為內置的默認Channel實現,In-memory 可以說很好的完成了事件接收并轉發的使命,并且 Knative Eventing 在接下來的迭代中會支持部署時指定設置默認的Channel。有興趣的同學可以持續關注一下。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C语言动态内存管理和动态内存分配
- 下一篇: Linus 本尊来了!为什么 KubeC