使用 Channel

到目前為止,我們使用了 takeput effect 來和 Redux Store 溝通。Channels 概括了這些 Effect 與外部事件來源或 Saga 它們之間的溝通。它們也可以從 Store 指定隊列的 action。

在這個部份,我們將會看到:

  • 如何從 Store 使用 yield actionChannel Effect 來緩衝指定的 action。

  • 如何使用 eventChannel factory function 連結 take Effect 到外部的事件來源。

  • 如何使用通用的 channel factory function 建立一個 channel,並在兩個 Saga 之間使用 takeput Effect 做溝通。

使用 actionChannel Effect

讓我們回顧一下經典的範例:

import { take, fork, ... } from 'redux-saga/effects'

function* watchRequests() {
  while (true) {
    const {payload} = yield take('REQUEST')
    yield fork(handleRequest, payload)
  }
}

function* handleRequest(payload) { ... }

上方的範例說明典型的 watch-and-fork 的模式,watchRequests saga 使用 fork 來避免阻塞,因此不會錯過任何來自 store 的 action。handleRequest task 是在每次得到 REQUEST action 時被建立,所以如果有許多 action 在一個 race 被觸發,可以同時執行許多 handleRequest task。

想像現在我們需要以下的功能:我們想要每次只處理一個 REQUEST,意思是如果我們在一個時間點有四個 action,我們想要一個一個處理 REQUEST action,處理完第一個 action 後,再接著處理第二個 action...。

所以我們想要的是隊列所有還沒被處理的 action,一旦我們處理完目前的 request,我們可以從隊列取得下一個訊息。

library 提供一個 actionChannel helper Effect,讓我們可以處理這些東西。讓我們來看如何使用它重新撰寫先前的範例:

import { take, actionChannel, call, ... } from 'redux-saga/effects'

function* watchRequests() {
  // 1- 建立一個 channel 給 request action
  const requestChan = yield actionChannel('REQUEST')
  while (true) {
    // 2- 從 channel 取得
    const {payload} = yield take(requestChan)
    // 3- 注意,我們使用一個阻塞的呼叫
    yield call(handleRequest, payload)
  }
}

function* handleRequest(payload) { ... }

第一件事情是建立一個 action channel,我們使用 yield actionChannel(pattern),這個 pattern 被解讀成我們先前提到的 take(pattern) 並使用相同的規則。這兩個形式不同的地方是,如果 Saga 還沒準備好接收它們的話(例如一個被阻塞的 API 呼叫),actionChannel 可以緩衝傳入的訊息

接下來是 yield take(requestChan),除了使用一個 pattern 從 Redux Store 接收指定的 action 之外,take 也可以被用在 channel(在上面我們從指定的 Redux Store 建立 channel 物件)。take 可以阻塞 Saga,直到在 channel 有一個可用的訊息。如果有一個訊息被儲存在基礎緩衝區,take 也可以立即的恢復。

最重要的是注意到我們如何使用一個阻塞的 call。Saga 將停留在阻塞狀態,直到 call(handleRequest) 回傳,但如果其他的 REQUEST action 被 dispatch,而 Saga 仍然被阻塞時,透過 requestChan 被隊列在內部。當 Saga 從 call(handleRequest) 恢復並執行下一個 yield take(requestChan),take 將 resolve 被隊列的訊息。

預設上,actionChannel 沒有限制緩衝所有傳入的訊息。如果你想要更多的緩衝控制,你可提供一個 Buffer 的參數到 effect creator。library 提供一些普遍的 buffer(none、dropping、sliding),但你也可以提供你自己的 buffer 實作,更多細節請參考 API 文件

例如,如果我們只想要處理最近的五筆資料,你可以使用:

import { buffers } from 'redux-saga'
import { actionChannel } from 'redux-saga/effects'

function* watchRequests() {
  const requestChan = yield actionChannel('REQUEST', buffers.sliding(5))
  ...
}

使用 eventChannel factory 連結外部的事件

像是 actionChannel(Effect)、eventChannel(一個 factory function,而不是一個 Effect)為 Redux Store 以外的事件來源建立一個 Channel。

這是一個從 interval 建立 Channel 的範例:

import { eventChannel, END } from 'redux-saga'

function countdown(secs) {
  return eventChannel(emitter => {
      const iv = setInterval(() => {
        secs -= 1
        if (secs > 0) {
          emitter(secs)
        } else {
          // 這裡造成 channel 關閉
          emitter(END)
        }
      }, 1000);
      // 訂閱者必須回傳取消訂閱功能
      return () => {
        clearInterval(iv)
      }
    }
  )
}

eventChannel 第一個參數是一個 subscriber function。訂閱者的規則是初始化外部的來源(上面使用 setInterval),透過提供的 emitter 將來源路由所有傳入的事件調用到 channel。在上面的範例我們在每秒調用 emitter

注意:你需要清除你的事件來源,不是通過事件 channel 傳送 null 或 undefined。雖然可以透過數字傳送,但我們推薦像是 redux action 一樣,組織你的事件 channel 資料。

注意,也可以調用 emitter(END)。channel 被關閉時,我們使用 emitter(END) 來通知所有 channel consumer,意思是沒有其他的訊息可以可以通過這個 channel。

讓我看一下如何從 Saga 使用這個 channel。(這個範例是來自 repo 的 cancellable-counter )

import { take, put, call } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'

// 在每秒間隔建立一個事件 Channel
function countdown(seconds) { ... }

export function* saga() {
  const chan = yield call(countdown, value)
  try {    
    while (true) {
      // take(END) 將造成 saga 終止,跳到 finally 區塊
      let seconds = yield take(chan)
      console.log(`countdown: ${seconds}`)
    }
  } finally {
    console.log('countdown terminated')
  }
}

所以 Saga yield 一個 take(chan) 造成阻塞,直到一個訊息被 put 在 channel。在我們上面的範例,它對應到我們調用 emitter(secs),注意我們還在在一個 try/finally 區塊執行整個 while (true {...} 迴圈。當間隔終止時,countdown function 透過調用 emitter(END) 關閉 channel。在 channel 的 take effect 關閉 channel 終止所有被阻塞的 Saga。在我們的範例,終止 Saga 將造成它跳到 finally 區塊(如果有提供的話,否則 Saga 只是簡單的終止)。

訂閱者回傳一個 unsubscribe function,這是被用來在事件來源完成之前,透過 channel 取消訂閱。在 Saga 內使用來自事件 channel 的訊息,如果我們想要在事件來源完成之前提早離開(例如:Saga 已經被取消),你可以從來源呼叫 chan.close() 關閉 channel 並取消訂閱。

例如,我們可以讓我們的 Saga 支援取消:

import { take, put, call, cancelled } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'

// 在每秒間隔建立一個事件 Channel
function countdown(seconds) { ... }

export function* saga() {
  const chan = yield call(countdown, value)
  try {    
    while (true) {
      let seconds = yield take(chan)
      console.log(`countdown: ${seconds}`)
    }
  } finally {
    if (yield cancelled()) {
      chan.close()
      console.log('countdown cancelled')
    }    
  }
}

這裡是另一個例子,你如何使用事件 channel 去傳送 WebSockeet 事件到你的 saga(例如:使用 socket.io library)。 假設你等待伺服器的一個 ping 訊息,然後在 delay 後回覆一個 pong 訊息。

import { take, put, call, apply } from 'redux-saga/effects'
import { eventChannel, delay } from 'redux-saga'
import { createWebSocketConnection } from './socketConnection'

// 這個 function 從一個指定的 socket 建立一個 event channel
// 設定傳入 `ping` events 的 subscription
function createSocketChannel(socket) {
  // `eventChannel` 接收一個 subscriber function
  // subscriber function 接收一個 `emit` 參數,把 message 放到 channel 上 the channel
  return eventChannel(emit => {

    const pingHandler = (event) => {
      // 放入 event payload 到 channel
      // 這可以讓 Saga 從被回傳的 channel 接收 payload
      emit(event.payload)
    }

    // 設定 subscription
    socket.on('ping', pingHandler)

    // subscriber 必須回傳一個 unsubscribe function
    // 當 saga 呼叫 `channel.close` 方法將會被調用
    const unsubscribe = () => {
      socket.off('ping', pingHandler)
    }

    return unsubscribe
  })
}

// 透過調用的 `socket.emit('pong')` 回傳一個 `pong` 訊息
function* pong(socket) {
  yield call(delay, 5000)
  yield apply(socket, socket.emit, ['pong']) // 呼叫 `emit` 作為一個方法並以 `socket` 作為 context
}

export function* watchOnPings() {
  const socket = yield call(createWebSocketConnection)
  const socketChannel = yield call(createSocketChannel, socket)

  while (true) {
    const payload = yield take(socketChannel)
    yield put({ type: INCOMING_PONG_PAYLOAD, payload })
    yield fork(pong, socket)
  }
}

注意:預設上,訊息在一個 eventChannel 不會被緩衝。你可以提供一個緩衝到 eventChannel factory 來指定 channel 的緩衝策略(例如:eventChannel(subscriber, buffer))。 更多資訊請參考 API 文件

使用 channel 在 Saga 之間溝通

除了 action channel 和事件 channel 之外,你也可以直接建立 channel,預設上可以不用連結任何的來源,你可以在 channel 手動的 put。當你想要在 saga 之間使用 channel 溝通是非常方便的。

為了說明,讓我們回顧先前的請求操作範例:

import { take, fork, ... } from 'redux-saga/effects'

function* watchRequests() {
  while (true) {
    const {payload} = yield take('REQUEST')
    yield fork(handleRequest, payload)
  }
}

function* handleRequest(payload) { ... }

我們可以看到 watch-and-fork pattern 允許我們同時操作多個請求,在併行下,沒有工作 task 的數量限制。然後我們使用 actionChannel effect 來限制併發一次執行一個 task。

因此,我們要求在同一時間內執行三個 task,當我們取得一個 request,而且執行的 task 小於三個,我們會立即的處理 request,但是如果我們已經有三個 task 執行了,我們將 task 隊列,並等待其中一個 slots 完成。

下面的範例使用 channel 解決:

import { channel } from 'redux-saga'
import { take, fork, ... } from 'redux-saga/effects'

function* watchRequests() {
  // 建立一個 channel 隊列傳入的請求
  const chan = yield call(channel)

  // 建立三個 worker thread
  for (var i = 0; i < 3; i++) {
    yield fork(handleRequest, chan)
  }

  while (true) {
    const {payload} = yield take('REQUEST')
    yield put(chan, payload)
  }
}

function* handleRequest(chan) {
  while (true) {
    const payload = yield take(chan)
    // 處理請求
  }
}

在上面的範例中,我們使用 channel factory 建立一個 channel。我們取回一個 channel,預設上我們放入所有緩衝的訊息(除非有一個正在等待的 taker,如果有訊息的話立即恢復 taker)。

watchRequests saga fork 三個 worker saga。注意,建立的 channel 提供給所有被 fork 的 saga,watchRequests 將使用這個 channel 來 dispatch 工作到三個 worker saga。在每個 REQUEST action,Saga 簡單的在 channel 放入 payload,任何空閒的 worker 會接收 payload,也就是說它將透過 channel 被隊列,直到一個 woker Saga 準備接收它。

所有的 worker 執行一個典型的 while 迴圈。在每次迭代 worker 將取得下一次的 request,或者阻塞直到有可用的訊息。注意,這個機制在三個 worker 之間提供一個自動載入平衡。

results matching ""

    No results matching ""