Xử lý stream với RxJS

Như bạn đã biết, RxJS chuyên dùng cho việc xử lý các data stream như các event người dùng click chuột, di chuột, hay xử lý các event có tính lặp lại theo thời gian, rồi các xử lý gọi API. Một số RxJS operator chúng ta hay dùng có thể kể đến như:

  1. map
  2. mapTo
  3. switchMap
  4. mergeMap
  5. concatMap
  6. pluck
  7. scan
  8. reduce
  9. toArray
  10. debounceTime
  11. throttleTime

Như vậy có rất nhiều operator mà RxJS cung cấp và khả năng là chúng ta hiếm khi sử dụng hết tất cả số operator đó. Tuy nhiên thay vì nhớ hết từng operator thì chúng ra nên nhớ use-case tương ứng. Ví dụ như các use-case phổ biến sau:

Tìm kiếm

Ví dụ điển hình nhất là xử lý việc search. Ta biết rằng để tìm kiếm một thông tin thì thường client sẽ request đến API mỗi khi user gõ kí tự lên ô tìm kiếm, nếu cứ mỗi lần user gõ 1 chữ mà ta lại gọi API một lần, khi gõ xong 10 kí tự ta đã thực hiện 10 request và tương ứng là 10 response trả về. Hậu quả là số lượng request lên backend sẽ rất nhiều, hơn nữa các request sẽ chồng lên nhau và gây ra trải nghiệm rất tệ cho user. Điều mà ta muốn là user phải gõ 1 số kí tự nhất định thì mới bắt đầu request lên API, và việc request chỉ được thực hiện sau khi user ngừng gõ một thời gian nào đó, sau đó thì các request trước phải bị cancel khi có request mới.

this.result$ = this.searchTerm$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  filter((term: string) => term.length > 3),
  switchMap((term: string) => this.service.search(term))
);

Trong đoạn code này ta dùng các operator của RxJS như sau:

  1. deboundTime: đợi 300 ms từ khi user ngừng gõ phím
  2. distinctUntilChanged: chỉ emit giá trị khi keyword tìm kiếm thay đổi
  3. filter: lọc điều kiện là user đã gõ keyword dài hơn 3 kí tự
  4. switchMap: emit một Obserable mới chính là request tới API để lấy kết quả tìm kiếm, đồng thời cancel tất cả các request trước đó.

Tính số Fibonacci

Ta có thể dùng RxJS để tính số Fibonacci dựa trên giải thuật quy hoạch động như sau

let seed = [0, 1]
interval(300).pipe(
  take(100),
  scan(([a, b]) => [b, a + b], seed),
  map(([, n]) => n),
  startWith(...seed)
).subscribe(console.log)

Trong ví dụ trên ta dùng toán tử take để emit ra 100 giá trị Fibonacci đầu tiên. Ta bắt đầu với seed là 2 số fibonacci đầu tiên, sau đó dùng toán tử scan của RxJS để update mảng seed: [a, b] => [b, a + b]. Sau đó ta emit ra giá trị thứ 2 của mảng seed này với toán tử map, chính là giá trị cần tính. Ngoài ra ta thêm toán tử startWith để emit thêm 2 số đầu tiên là 0, 1 trong mảng seed, nếu thiếu xử lý này thì giá trị emit sẽ là 1, 2, 3, 5… thay vì 0, 1, 1, 2…

Leave a comment