Reactive programming trong RXJS

Photo by Yunfan Xu on Unsplash

Reactive programming trong RXJS

·

23 min read

Reactive programming

"Reactive programming là lập trình bất đồng bộ xoay quanh luồng, luồng là đối tượng vận chuyển chuỗi dữ liệu, sự kiện nối tiếp nhau”. Có nhiều khái niệm cần nắm được sẽ được giới thiệu ở dưới.

Observable

"Là đối tượng đại diện cho 1 nguồn (dữ liệu hoặc sự kiện) nào đấy, nguồn này sẽ không được kích hoạt nếu không đăng kí, đăng kí bằng cách gọi phương thức subscribe, khi đó có thể nhận được giá trị từ nguồn phát ra (emit ra)".

Observer

"Là các callback next, error, complete để lắng nghe và xử lí logic khi nhận được data từ các sự kiện đến, các callback error và complete chỉ xảy ra 1 lần và chỉ 1 trong 2 xảy ra".
  • next: xử lí trong trường hợp observable thực thi thành công - vd call api lên server trả về data thành công, status 200, mỗi lần có 1 giá trị emit thì sẽ chạy vào next 1 lần.
  • error: xử lí khi observable gửi đi 1 tín hiệu thực thi thất bại - vd call api bị server trả về lỗi 401, 404, 5x..., observable chỉ gửi đi tín hiệu error và close stream.
  • complete: xử lí khi observable này đã hoàn thành, observable chỉ phát tín hiệu complete ra 1 lần và close stream, sau khi đã emit hết data trong next, nếu observable thực thi thất bại và trả về error thì sẽ không gửi đi tín hiệu complete.

Subscription

"Sau khi Observable subscribe sẽ sinh ra 1 đối tượng Subscription, dùng để hủy việc tiếp tục xử lí (call unsubscribe()), tránh memory leak".

Operators

"Các function để làm việc với Observable, nhận vào observable và trả về 1 observable khác, giúp xử lí logic dễ dàng và nhanh hơn".

Pipe

"Là 1 operator chứa các operator khác"

Creation Operators

 các operator để tạo 1 Observable mới như: of, from, fromEvent, fromPromise, interval, timer, throwError ...”

of: tạo 1 Observable từ bất kì giá trị nào, có thể truyền vào nhiều giá trị

   const observable = of({name: 'ABC'}, [1,2,3], function hello() { return 'Hello'});
=> observable.subscribe(
    data => console.log('next: ', data),
    err => console.log('error: ', err),
    () => console.log('complete!'),
  );
  // outputs : next: {name: 'ABC'}, [1,2,3], function hello(){ return 'Hello'}
  "complete!".

Use case: dùng nếu yêu cầu trả về 1 observable chứa giá trị bất kì hoặc 1 observable rỗng thì có thể dùng of

from: tạo 1 Observable từ 1 promise, iterable (iterable là mảng, string, map, set ...)

     var array = [10, 20, 30];
  var result = from(array);
  result.subscribe(x => console.log(x)); // 10 20 30

Use case: dùng khi muốn chuyển 1 promise thành observable, muốn emit từng giá trị một

fromEvent: tạo 1 observable từ 1 sự kiện: click, keyup, keydown ...

   fromEvent(document, 'click').subscribe(val => console.log(val)) // MouseEvent {isTrusted: true}

Use case: dùng để chuyển bất kì sự kiện nào sang observable: tạo 1 sự kiện về chuột (click, up,down ...), key, nhập form ... nhớ unsubscribe

interval: tạo 1 observable và emit data theo chu kì giống như setInterval

   interval(1000).subscribe(val => console.log(val)); // 0 1 2 ...

Use case: dùng trong TH tạo 1 sự kiện lặp lại liên tục ví dụ muốn call api để lấy thông báo sau mỗi 1 phút

timer: sẽ emit sau 1 khoảng thời gian giống setTimeout hoặc sau khi chờ 1 thời gian sẽ emit theo chu kì giống interval

   VD1: giống setTimeout
timer(1000).subscribe(val => console.log(val), null, () => console.log('complete')); // 0 complete
   VD2: giống interval
timer(1000, 5000).subscribe(val => console.log(val)); // 0 - (sau 5s) - 1 - (sau 5s) - 2 ...

Use case: dùng khi muốn thực hiện 1 action nào đấy nhưng phải chờ 1 thời gian mới kích hoạt nó

throwError: tạo 1 observable từ data truyền vào nhưng không phải chạy vào next mà chạy vào error của observer

   // tạo 1 observable bằng throwError
const source = throwError('This is an error!');
const subscribe = source.subscribe(
val => console.log(val),
err => console.log(`Error: ${err}`)) // 'Error: This is an error!'

Use case: dùng để bắn lỗi ra, ví dụ interceptor sẽ bắn lỗi ra và tại api call nó sẽ nhận về

Transformation Operators

“Là các operator dùng để chuyển đổi data: map, pluck, mapTo, scan, toArray, buffer …”.

map: duyệt qua từng phần tử, chuyển đổi giá trị của chúng trước khi trả về

   VD1: cộng mỗi phần tử với 1 số
const source = from([1, 2, 3, 4, 5]);
//add 10 to each value
const example = source.pipe(map(val => val + 10));
//output: 11,12,13,14,15
   VD2: chỉ trả về 1 thuộc tính của 1 đối tượng : 
const source = from([
    { name: 'Joe', age: 30 },
    { name: 'Frank', age: 20 },
    { name: 'Ryan', age: 50 }
]);
// chỉ trả về name
const example = source.pipe(map(({ name }) => name));
// output: "Joe","Frank","Ryan"
   VD3: trả về thêm data
const users = [
{id: 'ddfe3653-1569-4f2f-b57f-bf9bae542662', username: 'tiepphan', firstname: 'tiep', lastname:     'phan'},
{id: '34784716-019b-4868-86cd-02287e49c2d3', username: 'nartc', firstname: 'chau', lastname: 'tran'},];
const source = from(users);
source.pipe(
    map(user => {
        return {
            ...user,
            fullname: `${user.firstname} ${user.lastname}`
        };
    })
)
// output {id: 'ddfe3653-1569-4f2f-b57f-bf9bae542662', username: 'tiepphan', firstname: 'tiep', lastname: 'phan', fullname: 'tiep phan'}
{id: '34784716-019b-4868-86cd-02287e49c2d3', username: 'nartc', firstname: 'chau', lastname: 'tran', fullname: 'chau tran'}

Use case: dùng nếu muốn biến đổi data trước khi trả về, hoặc chỉ cần trả về giá trị 1 biến trong đối tượng vd giá trị trước khi trả về sẽ sẽ uppercase string, nhân 2 ...

pluck: Giống như map với TH trả về 1 thuộc tính của đối tượng

  const source = from([
{ name: 'Joe', age: 30, job: { title: 'Developer', language: 'JavaScript' } },
{ name: 'Sarah', age: 35 }
]);
// lấy name viết như sau
const example1 = source.pipe(pluck('name'));
// output: 'Joe' , 'Sarah'
// lấy title trong job viết như sau
const example2 = source.pipe(pluck('job', 'title'));
// output: 'Developer' , undefined (ubndefined vì phần tử thứ 2 ({ name: 'Sarah', age: 35 }) không có job )
const subscribe = example2.subscribe(val => console.log(val));

Use case: ví dụ 1 object {name: 'A', age: 20} mà chỉ muốn lấy biến name thì dùng pluck

mapTo: tất cả các giá trị emit ra đều convert thành 1 giá trị chung

const source = from([1, 2, 3, 4, 5]);
// chuyển tất cả các giá trị thành 10
const example = source.pipe(mapTo(10));
// output: 10 10 10 10 10
const subscribe = example.subscribe(val => console.log(val));

Use case: ví dụ tất cả các giá trị trả về đều biến thành true/false ...

scan: Khi muốn có 1 action nào đấy lên 1 phần tử (giống như map) nhưng cần lấy giá trị của các lần trả về trước thì có thể dùng scan

   const source = from([1,2,3,4,5]);
const sourceScan = source.pipe(scan((accumulator, currentValue) => accumulator + currentValue, 0));
accumulator : kết quá lưu trữ từ trước (tức là mỗi lần lặp, accumulator + currentValue bằng bao nhiêu thì nó sẽ gán lại vào accumulator)
currentvalue: giá trị phần tử hiện tại
0: giá trị khởi tạo <=> accumulator ban đầu = 0
source.subscribe(val => console.log(val));
// result: 1 3 6 10 15

Use case: ví dụ đếm số lần người dùng đã click vào 1 button

toArray: Gộp tất cả các giá trị được emit ra và trả về 1 mảng

  interval(100)
.pipe(take(10), toArray()) // .pipe(take(10)) tức  interval(100) emit 10 lần rồi complete
.subscribe(console.log);
// output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

buffer: gộp các giá output từ observable bên ngoài thành 1 mảng nhưng phải đợi đến khi observable bên trong (observable truyền vào buffer) emit thì mới emit các output này

  const interval$ = interval(1000);
const timer$ = timer(15000);
interval$.pipe(buffer(timer$)).subscribe(
  val => console.log('Buffered Values: ', val)
);
//output: sau 15s sẽ in ra [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
tức là interval$ emit giá trị sau mỗi giây, nhưng phải đợi đến khi timer$ emit thì tất cả các giá trị trong interval$ mới được in ra, và chúng được gộp thành 1 mảng

Filtering Operators

"Dùng để lọc, lấy ra các giá trị thỏa mãn điều kiện nào đó - filter, first, last, find, single, take, takeUntil, skip, distinctUntilChanged, throttle, throttleTime, debounce, debounceTime ...".

filter: Lấy ra những giá trị thỏa mãn điều kiện lọc

  from([1, 2, 3, 4, 5, 6])
.pipe(
    filter((x) => x % 2 === 0) // số chẵn
).subscribe(console.log); // output: 2, 4, 6

use case: chỉ lấy những giá trị thỏa mãn 1 điều kiện nào đấy, ví dụ chỉ lấy những phần tử chẵn trong mảng

first: chỉ lấy ra giá 1 trị đầu tiên, sau đó complete, nếu không có gía trị nào được emit, nó sẽ throw ra 1 error

  VD1: // lấy ra giá tri đầu tiên
from([1, 2, 3, 4, 5, 6])
.pipe(first())
.subscribe(console.log, null, () => console.log('complete')); // output: 1 -> complete

  VD2: // throw error
of() // không có giá trị nào
.pipe(first())
.subscribe(null, console.log, null); // Error: EmptyError

  VD3: // có thể nhận vào 1 điều kiện và có thể nhận vào 1 defaultValue để khi không có giá trị nào được emit, nó sẽ emit defaultvalue và tránh bị throw error
from([1, 2, 3, 4, 5, 6])
.pipe(
    first(x => x > 7, 'defaultValue'),) // set default value
.subscribe(console.log, null, () => console.log('complete')); // output: 'defaultValue' -> complete

Use case: chỉ lấy 1 lấy giá trị đầu tiên thỏa mãn điều kiện nào đấy

last: giống như first nhưng không lấy các giá trị đầu tiên mà lấy các giá trị cuối cùng thỏa mãn

  VD1: from([1, 2, 3, 4, 5, 6])
.pipe(last())
.subscribe(console.log, null, () => console.log('complete')); // output: 6 -> complete
  VD2: of() // an empty Observable
.pipe(last())
.subscribe(null, console.log, null); // Error: EmptyError

find: lấy ra giá 1 trị đầu tiên thỏa m, sau đó complete, nếu không có giá trị nào được emit, nó sẽ complete

  from([1, 2, 3, 4, 5, 6])
.pipe(find((x) => x % 7 === 0) // không có giá trị nào thỏa mãn
).subscribe(console.log, null, () => console.log('complete')); // output: complete

single: tương tự first - emit 1 giá trị duy nhất thỏa mãn, nếu có nhiều giá trị thỏa mãn => throw error, nếu không có giá trị nào thỏa mãn thì sẽ trả về undefined sau đó complete

  VD1: TH không có giá trị nào thỏa mãn
from([1, 2, 3])
.pipe(single(x => x === 12))
.subscribe(console.log,x => console.log(x),() => console.log('complete'));
// output: undefined, complete
  VD2: TH có giá trị thỏa mãn
from([1, 2, 3])
.pipe(single(x => x === 2))
.subscribe(console.log,x => console.log(x),() => console.log('complete'));
// output: 2, complete

take: lấy bao nhiêu giá trị được emit từ observable

  VD1:
from([1, 2, 3, 4])
.pipe(take(2)) // lấy 2 giá trị
.subscribe(console.log, null, () => console.log('complete')); // output: 1, 2 -> complete
  VD2: nếu không có giá trị nào thỏa mãn thì sẽ complete
of().pipe(take(2))
.subscribe(console.log, null, () => console.log('complete'));
//output : complete

use case: ví dụ lấy vị trí chuột khi user click vào page lần đầu, người dùng click nhiều lần => chỉ muốn lấy lần click đầu tiên hoặc 1 số lần đầu tiên, 1 observable trả về liên tục như interval, retry ... mà chỉ cần lấy data ở 1 số lần đầu tiên

takeUntil: nhận vào 1 observable và observable bên ngoài sẽ emit giá trị tới khi observable bên trong (observable truyền vào takeUntil) emit thì complete

  interval(1000)
.pipe(takeUntil(fromEvent(document, 'click'))) // emit giá trị cho tới khi click
.subscribe(console.log, null, () => console.log('complete')); // output: 0, 1, 2, 3, 4 -- click --> 'complete'

skip: bỏ qua bao nhiêu giá trị đầu tiên sau đấy mới emit ra

  VD1:
from([1, 2, 3, 4])
.pipe(skip(1)) // bỏ qua giá trị đầu tiên
.subscribe(console.log, null, () => console.log('complete')); // output: 2, 3, 4 --> complete
  VD2: nếu skip tất cả, nó sẽ complete
from([1, 2, 3, 4])
.pipe(skip(4)) // skip cả 4 giá trị
.subscribe(console.log, null, () => console.log('complete')); // output: complete

Use case: ví dụ 1 observable luôn phát ra giá trị mặc định lần đầu hoặc 1 số lần mà muốn bỏ qua nó thì dùng skip

distinct: chỉ emit các giá trị mà observable chưa từng emit, nếu không có giá trị nào nó sẽ complete

  from([ 'A', 'B', 'A' ])
.pipe(distinct())
.subscribe(console.log, null, () => console.log('complete')); // output: A, B -> complete

distinctUntilChanged: giá trị đang chuẩn bị emit sẽ được emit nếu nó khác với giá trị vừa được emit trước đấy

  from([1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4])
.pipe(distinctUntilChanged())
.subscribe(console.log, null, () => console.log('complete')); // output: 1, 2, 1, 2, 3, 4 -> complete

use case: nếu chỉ muốn search khi giá trị input khác với giá trị vừa search thì có thể dùng distinctUntilChanged

throttle(innerObservable : Observable): observable gốc emit giá trị đầu tiên, tiếp theo chờ tới khi innerObservable emit thì giá trị mà observable gốc emit mới được lấy, nếu giá trị chưa được lấy mà observable gốc lại emit ra thì giá trị emit trước sẽ bị bỏ qua, và vẫn chờ innerObservable emit (từ luồng cũ chứ không phải luồng mới mà observable gốc emit lần thứ 2 sinh ra) xong thì nó mới được lấy

  //emit giá trị sau mỗi 1 second
const source = interval(1000);
// chờ đến khi click thì giá trị cuối cùng source emit mới được emit ra
const example = source.pipe(throttle(val => fromEvent(document, 'click')));
//output: 0 (chạy thêm 3s rồi click) 4 (chạy thêm 4s rồi click) 9
example.subscribe(val => console.log(val));

throttleTime (time: number) - observable gốc emit giá trị đầu tiên, tiếp theo chờ trong 1 khoảng thời gian (time) thì giá trị emit mới được lấy, nếu giá trị chưa được lấy mà observable gốc lại emit ra thì giá trị emit trước sẽ bị bỏ qua, sau khi thời gian chờ hết thì giá trị emit mới nhất sẽ được lấy

  //emit giá trị sau mỗi 1 second
const source = interval(1000);
// bỏ qua 1 seconds, emit latest value
const example = source.pipe(throttleTime(1000));
//output: 0..2..4..6
const subscribe = example.subscribe(val => console.log(val));

use case với throttle/throttleTime

  • User click liên tục vào nút search để gọi API, ta không muốn gọi quá nhiều api như vậy, ta có thể set cho khoảng thời gian là 2s thì mới call lại api 1 lần
  • hoặc call api mỗi khi thay đổi text ở input, hoặc khi click vào 1 button thì call api, hover chuột để hiển thị hay làm gì đó
  • Dùng với ý định cho 1 thứ gì đó xảy ra theo chu kì

debounce(innerObservable : Observable): sau khi observable gốc emit ra, phải chờ innerObservable emit thì giá trị này mới được lấy, nếu giá trị chưa được lấy mà observable gốc lại emit ra thì giá trị emit trước sẽ bị bỏ qua và giá trị mới emit lại chờ tiếp innerObservable emit (luồng mới mà observable gốc emit lần thứ 2 sinh ra) thì mới được lấy

  //emit giá trị sau mỗi 1 second
const source = interval(1000);
// sau khi click thì source mới emit
const example = source.pipe(debounce(() => fromEvent(document, 'click')));
//output: click -- 0 ... (chạy 10s) - click - 10
const subscribe = example.subscribe(val => console.log(val));

debounceTime(time: number): sau khi observable gốc emit ra, phải chờ 1 khoảng thời gian (time) thì giá trị này mới được lấy, nếu giá trị chưa được lấy mà observable gốc lại emit ra thì giá trị emit trước sẽ bị bỏ qua và giá trị mới emit lại chờ 1 khoảng thời gia (time) thì mới được lấy

  //emit giá trị sau mỗi 1 second
const source = interval(1000);
const example = source.pipe(debounceTime(800));// chờ sau 800ms, giá trị emit cuối cùng cuả source sẽ được lấy
const subscribe = example.subscribe(val => console.log(val)); 0 1 2 ...

use case với debounce/debounceTime

  • Nếu chưa muốn thực hiện ngay action nào đấy mà dừng 1 khoảng thời gian nào đấy hay chờ cho xong hành động nào đó trước thì mới thực hiện hành động sau
  • Nếu muốn người dùng nhập text xong mới thực hiện search, filter, hoặc lấy tọa độ chuột sau khi ngừng click/hover ...
  • Dùng cho sự kiện resize window mà muốn lấy kích thước được resize cuối cùng

Khác nhau giữa debounce/debounceTime và throttle/throttleTime là:

  • debounce/debounceTime: mỗi khi observable gốc emit ra data thì nó sẽ đặt lại việc chờ - tức là cứ emit ra thì chờ 1 thời gian (hoặc đến khi observable bên trong debounce thực thi), nếu chưa chờ xong mà lại emit tiếp thì thời gian chờ sẽ tính lại từ 0 và giá trị từ lần emit trước sẽ bị bỏ qua.
  • throttle/throttleTime: mỗi khi observable gốc emit ra data, thời gian sẽ được tính tiếp (hoặc observable bên trong throttle thực thi) - tức là nếu emit data ra, chờ được 3s mà lại emit data mới thì thời gian chờ sẽ tính từ 3 và giá trị emit cũ cũng sẽ bị bỏ qua.

Combination Operators

“Là các operator sử dụng để kết hợp các Observables lại với nhau: forkJoin, combineLatest, zip, merge, concat …”.

forkJoin: nhận vào 1 array observable và thực thi cùng lúc, sau khi tất cả các observable khác hoàn thành, forkJoin sẽ emit ra 1 array chứa kết qủa của các observable đó, nếu 1 observable không complete thì forkJoin sẽ không complete, nếu có 1 observable có lỗi thì forkJoin bắn ra lỗi

  const example = forkJoin([of('Hello'), of('World')]).subscribe(console.log)
// output: ["Hello", "World"]
-- thường dùng khi call nhiều api cùng lúc

Use case: khi muốn call nhiều api cùng lúc

combineLatest: Nhận vào 1 array observable, khi tất cả các observable này commit ít nhất 1 lần thì combineLastest sẽ emit ra 1 array chứa kết quả của các observable đó, mỗi lần 1 observable emit kết quả ra thì combineLatest sẽ emit ra 1 array mới, chứa kết quả là các giá trị gần nhất mà các observable emit. Nếu 1 observable không complete thì combineLatest sẽ không complete, nếu có 1 observable có lỗi thì combineLatest bắn ra lỗi

  combineLatest([
interval(2000).pipe(map((x) => `First: ${x}`)), // {1}
interval(1000).pipe(map((x) => `Second: ${x}`)), // {2}
interval(3000).pipe(map((x) => `Third: ${x}`)), // {3}
]).subscribe(console.log);
// output
["First: 0", "Second: 2", "Third: 0"]
["First: 1", "Second: 2", "Third: 0"]
["First: 1", "Second: 3", "Third: 0"]
["First: 1", "Second: 4", "Third: 0"]
["First: 2", "Second: 4", "Third: 0"]
["First: 2", "Second: 5", "Third: 0"]
["First: 2", "Second: 5", "Third: 1"]

Giải thích như sau

  • Sau 1s => chỉ có observable second emit 0 (sau mỗi 1000 miliseconds = 1s), các observable khác chưa emit ra giá trị nào => combineLatest chưa emit
  • Sau 2s => observable first emit 0 (sau mỗi 2000 miliseconds = 2s emit 1 lần), observable seconds emit 1, observable third chưa emit => combineLatest chưa emit
  • Sau 3s => observable third emit, lúc này giá trị gần nhất observable first emit là 0 (vì mới chạy được 3s nên nó chưa emit lần thứ 2 - là giây thứ 4), observable seconds emit 2, observable third emit 0 => combineLatest emit ["First: 0", "Second: 2", "Third: 0"]
  • Sau 4s => observable first emit là 1 (đã 4s => emit lần 2), giá trị gần nhất observable second emit là 2 (tại sao observable second không emit tại thời điểm này dù đã chạy thêm 1s, tại vì observable first đã emit trước, nên combineLatest emit data ngay lập tức), giá trị gần nhất observable third emit là 0 => combineLatest emit ["First: 1", "Second: 2", "Third: 0"]
  • Vẫn ở giây thứ 4 => giá trị gần nhất observable first emit là 1, observable second emit 3 (lúc này observable second mới emit), giá trị gần nhất observable third emit là 0 => combineLatest emit ["First: 1", "Second: 3", "Third: 0"] ...

use case: Dùng khi paging với sự kiện thay đổi page và thay đổi số lượng recore trên 1 page, tìm kiếm bản ghi khi có nhiều selectbox ...

zip: truyền vào 1 list observable và trả về array kết quả, kết quả này được zip theo cặp

  • observable1 emit lần 1, observable2 emit lần 1, observabl3 emit lần 1 => zip emit data lần 1 - [result1 - obser1, result1 - obser2, result1 - obser3]
  • observable1 emit lần 2, observable2 emit lần 2, observabl3 emit lần 2 => zip emit data lần 2 - [result12 - obser1, result2 - obser2, result2 - obser3]
  • Nếu observable1 emit lần 3, observable2 emit lần 3, observabl3 không còn data để emit => không có gì được emit và sẽ complete

Use case: Nếu muốn lấy toạ độ chuột hoặc thời gian của người dùng từ lúc họ mousedown cho đến lúc họ mouseup

merge: nhận vào 1 list observable, theo dạng tham số không phải kiểu array, các observable này được chạy song song, có thể chỉ định cho bao nhiều observable được chạy cùng 1 lúc

  VD1: merge(of(1, 2, 3), of('das'), output of(7, 9)).subscribe(console.log);
// output - 1, 2, 3, 'das', 7, 9
// nếu có lỗi thì bắn ra lỗi, không complete nếu 1 trong số các observable không complete
merge(of(1, 2, 3), of('das'), throwError('er'), of(7, 9)).subscribe(res => console.log(res), errr => console.log(errr), () => console.log('complete'));
// output: 1, 2, 3, das, er
  VD2: // chỉ định bao nhiêu observable có thể chạy cùng lúc
merge(
    interval(3000),
    interval(3000),
    of(7, 9),
    2// => chỉ định số observable chạy cùng lúc
).subscribe(res => console.log(res), null, () => console.log('complete'));
// => 0, 0, 1, 1, 2, 2, 3, 3, 4, 4

Use case: sử dụng khi có 1 FormGroup và muốn lắng nghe vào từng FormControl.valueChanges để thực hiện 1 nghiệp vụ nào đó, không cần quan tâm thứ tự việc FormControl nào sẽ thay đổi, chỉ cần là nếu FormControl đó thay đổi thì sẽ xử lý

concat: nhận vào 1 list observable giống như merge, các observable được chạy tuần tự

  • Nếu observable trước emit và complete, concat() sẽ emit giá trị mà observable vừa emit rồi sẽ subscribe vào Observable kế tiếp.
  • Nếu observable trước error, concat() sẽ error ngay lặp tức và chuỗi Observable phía sau sẽ bị bỏ qua.
  • Nếu observable trước complete mà không emit, concat() sẽ bỏ qua và subscribe vào Observable kế tiếp
  • Nếu observable trước emit và không complete, concat() sẽ emit giá trị mà observable này emit NHƯNG sẽ không subscribe vào Observable kế tiếp vì observable không complete. VD: concat(of(4, 5, 6).pipe(delay(1000)), of(1, 2, 3)).subscribe(observer); // output: // sau 1s: // 4-5-6-1-2-3 // output: 'complete'

Error Handling Operators

"Là các operator dùng để xử lí lỗi".

catchError: bắt lỗi và trả về 1 observable - thường là trả về nội dung lỗi

  // tạo 1 observable bằng throwError
const source = throwError('This is an error!');
// handle error, returning observable with error message
const example = source.pipe(catchError(val => of(`I caught: ${val}`)));
//output: 'I caught: This is an error'
example.subscribe(val => console.log(val));

use case: ví dụ call nhiều api 1 lúc sẽ dùng forkjoin, nhưng nếu 1 api lỗi thì forkJoin sẽ bắn lỗi và không nhận được data từ các api không bị lỗi, vậy nếu muốn nhận data từ các api không bị lỗi thì có thể dùng catchError và xử lí logic cho nó

retry: dùng để subscribe lại observable, có thể dùng cho TH call lại API nếu bị lỗi

  return this.http.get<Config>(this.configUrl)
.pipe(retry(3)); // retry 3 lần tối đa lần nếu observable subscribe lỗi

use case: ví dụ TH call api bị lỗi (chỉ dùng khi get), ta muốn call lại, nhớ set số lần retry để tránh vòng lặp vô hạn

Multicasting

Có thể hiểu về unicast và multicast như sau

  • Unicast: mỗi khi observable emit 1 giá trị, chỉ 1 thằng nhận được giá trị đấy ( ví dụ người A nói chuyện với người B => A là observable, A emit ra lời nói thì chỉ có B (observer) nghe thấy ).
  • Multicast: mỗi khi observable emit 1 giá trị, nhiều thằng sẽ nhận được giá trị đấy (ví dụ người A phát biểu tại hội nghị => A là observabe, A emit ra lời phát biểu thì cả hội nghị (các observer) nghe thấy). Dưới đây sẽ tìm hiểu các đối tượng để multicast.

Subject: là một observable đặc biệt có khả năng thực hiện việc gửi dữ liệu đến nhiều Observers

  const subject = new Subject();
const observableA = subject.subscribe({
next: (v) => console.log('observerA: ' + v)});
const observableB = subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
});
subject.next(15);
subject.next(18);
//output - observerA: 15 observerB: 15 observerA: 18 observerB: 18
=> mỗi lần subject.next thì data đươc gửi đến observableA và observableB => đây chính là multicast

BehaviorSubject: giống như Subject nhưng có thể lưu lại giá trị gần nhất đã emit (chỉ 1 giá trị), khi khởi tạo cần tryền 1 giá trị mặc định

  VD1: Khi dùng Subject
const subject = new Subject();
subject.next(1);
subject.subscribe({
    next: v => console.log("observerA: " + v)
});
=> output không có gì cả vì sau khi next(1), giá trị 1 không được Subject lưu lại
  Khi dùng BehaviorSubject
const subject = new BehaviorSubject(0); => 0 là giá trị mặc định
subject.next(1);
subject.subscribe({
    next: v => console.log("observerA: " + v)
});
=> output: observerA: 1 (vì khi emit - subject.next(1) thì BehaviorSubject đã lưu lại giá trị này)

ReplaySubject: Tương tự như BehaviorSubject nhưng có thể lưu trữ nhiều giá trị

  const subject = new ReplaySubject(2); => lưu trữ 2 giá trị được emit gần nhất
subject.next(1);
subject.next(5);
subject.next(3);
subject.subscribe({
    next: v => console.log("observerA: " + v)
});
=> output: observerA: 5, observerA: 3

AsyncSubject: Chỉ emit giá trị cuối cùng, điều kiện là observable phải complete thì nó mới emit

Use case các loại subject: khi có dữ liệu mới mà muốn update cho nhiều component, có thể dùng coi subject là 1 kho lưu trữ, tức là dùng để chia sẻ dữ liệu với nhau

Higher Order Observables

“Là những operators mà sẽ nhận vào Outer Observable và sẽ trả về một Inner Observable: switchMap, mergeMap, concatMap …”.

"Mỗi khi Outer Observable emit thì sẽ có 1 stream mới được sinh ra"

switchMap: Chỉ cho phép 1 stream được chạy, nếu stream trước chưa complete thì unsubscribe

  fromEvent(document, 'click').pipe(
    switchMap(() => interval(1000).pipe(take(5)))
).subscribe(console.log);

Giải thích

  • fromEvent(document, 'click') là Outer Observable
  • interval(1000) là Inner Observable
  • khi click => Outer Observable emit => stream mới được tạo ra (interval(1000)) và nó sẽ emit 5 giá trị 0 1 2 3 4, nếu mới emit đến 2 mà Outer Observable emit lần 2 (click tiếp) => stream này sẽ bị unsubscribe (các giá trị 3, 4 của stream này sẽ không được emit ), stream mới được tạo ra và nó sẽ subscribe và emit 0 1 2 3 4
VD: Nếu nhập 'ABC' => call API (1), nhập tiếp 'ABCDE' => data truyền lên sẽ là data mới ('ABCDE' chứ không phải 'ABC'), lúc này không cần data của request (1) nữa, có thể dùng switchMap để hủy request cũ (1) đi.

Use case: dùng khi mà có action mới thì hủy bỏ action cũ đi: ví dụ khi nhập data/ chọn checkbox ... nói chung là thay đổi data trên form để search, filter thì sẽ call api => ta có thể dùng switchMap để khi có event form change data thì ta sẽ hủy lần call api trước đi và call api với dữ liệu mới, chỉ nên dùng với get

mergeMap: Có thể chạy nhiều stream cùng 1 lúc

  fromEvent(document, 'click').pipe( mergeMap(() => interval(1000).pipe(take(10)), 2) // 2 là cho phép 2 stream chạy cùng lúc
).subscribe(console.log);

Giải thích Khi click => Outer Observable emit => stream được tạo ra (interval(1000)) và stream này sẽ emit 5 giá trị 0 1 2 3 4, nếu mới emit đến 3 mà Outer Observable emit lần 2 (click tiếp) => lúc này lại có 1 stream mới được tạo ra và nó sẽ chạy song song với stream thứ nhất // output 0 1 2 3 0 (stream thứ 2 được tạo) 4 (stream thứ nhất kết thúc) 1 2 3 4

Use case: dùng với mục đích là không cần hủy cái cũ và có thể cho nhiều observable chạy đồng thời, hoặc muốn control số lượng observable chạy đồng thời - ví dụ upload file lên server dạng chạy song song, nhưng có thể chỉ cho 1 số lượng nào đấy upload lên cùng 1 lúc để hiệu năng tốt hơn, chú ý unsubscribe

concatMap: Chỉ cho phép 1 stream được chạy, các stream khác phải đợi stream trước complete mới được chạy

  fromEvent(document, 'click')
    .pipe(concatMap(() => interval(1000).pipe(take(5))))
.subscribe(console.log);

Giải thích Khi click => Outer Observable emit => stream mới được tạo - interval(1000) và stream này sẽ emit 5 giá trị 0 1 2 3 4, nếu mới emit đến 2 mà Outer Observable emit lần 2 (click tiếp) => lúc này lại có 1 stream mới được tạo ra và nó sẽ đợi cho đến khi stream trước complete thì mới emit ra // output 0 1 2 (click => stream mới được tạo, nhưng phải đợi stream thứ nhất kết thúc thì stream này mới emit ) 3 4 (stream thứ nhất kết thúc) 0 (stream mới bắt đầu emit) 1 2 3 4

Use case: dùng khi muốn thực hiện các công việc theo thứ tự, ví dụ upload nhiều file nhưng muốn upload tuần tự, chú ý unsubscribe