Dev Notes Blog

Angular RxJS Interop

10th August 2024
Last updated: 6th September 2025

Angular’s reactive programming landscape has been significantly enriched with the introduction of the @angular/core/rxjs-interop package. This package provides a range of utilities designed to bridge the gap between Angular Signals and RxJS Observables, making it easier to handle reactive data streams throughout your application.

toSignal

The toSignal function creates a Signal that tracks the value of an Observable. Like Angular’s async pipe, toSignal subscribes to the Observable immediately, which may trigger side effects. Unlike the async pipe, toSignal is not limited to templates: it can be called anywhere an injection context is available, or elsewhere with an explicit injector.

One of the key advantages of toSignal is its automatic subscription management. When you use toSignal, the subscription it creates is automatically cleaned up when the component or service that called it is destroyed. This ensures that your application remains efficient and free of memory leaks.

signal.component.ts

import { AsyncPipe, JsonPipe } from '@angular/common';
import { Component } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';
import { fromEvent, map } from 'rxjs';

@Component({
  selector: 'app-signal',
  standalone: true,
  imports: [AsyncPipe, JsonPipe],
  template: `
    <div>Click Observable: {{ clickEvent$ | async | json }}</div>
    <div>Click Signal: {{ clickSignal() | json }}</div>
  `,
})
export class SignalComponent {
  clickEvent$ = fromEvent(window, 'click').pipe(
    map((click) => {
      const pointerEvent: PointerEvent = click as PointerEvent;
      return {
        x: pointerEvent.clientX,
        y: pointerEvent.clientY,
      };
    }),
  );

  clickSignal = toSignal(this.clickEvent$);
}

initialValue

  • Purpose: Sets the initial value for the Signal created by toSignal.
  • Details: This value will be used by the Signal until the Observable emits its first value, helping to avoid undefined states.

counterObservable = interval(1000);
counter = toSignal(this.counterObservable, { initialValue: 0 });

requireSync

  • Purpose: Declares that the Observable emits a value synchronously on subscription.
  • Details: If set to true, toSignal expects the Observable to emit synchronously, as a BehaviorSubject does. This removes the need for an initialValue and keeps undefined out of the Signal’s type, but toSignal throws a runtime error if the Observable doesn’t emit immediately.

counter = new BehaviorSubject(1);
counter$ = this.counter.asObservable();
counterSignal = toSignal(this.counter$, { requireSync: true });
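
The contract behind requireSync can be pictured with a small dependency-free model. This is only a sketch of the idea, not Angular's implementation: Subscribe and readSync are hypothetical names, and a plain callback stands in for an Observable.

```typescript
// Hypothetical stand-in for an Observable: a function that registers a listener.
type Subscribe<T> = (next: (value: T) => void) => void;

// Model of the requireSync contract: the source must deliver a value
// before subscribe() returns, otherwise reading fails loudly.
function readSync<T>(subscribe: Subscribe<T>): T {
  const box: { filled: boolean; value?: T } = { filled: false };

  subscribe((value) => {
    box.value = value;
    box.filled = true;
  });

  if (!box.filled) {
    throw new Error('Source did not emit synchronously');
  }
  return box.value as T;
}

// Behaves like a BehaviorSubject: hands over its current value on subscribe.
const eagerSource: Subscribe<number> = (next) => next(1);

// Behaves like interval(): nothing is emitted while subscribing.
const lazySource: Subscribe<number> = () => {};

const eagerValue = readSync(eagerSource);

let lazyFailed = false;
try {
  readSync(lazySource);
} catch {
  lazyFailed = true;
}
```

In this model the eager source yields a value straight away, while the lazy source fails, which mirrors why requireSync suits a BehaviorSubject but not interval.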

injector

  • Purpose: Provides the Injector to supply the DestroyRef needed for subscription cleanup.
  • Details: If not provided, DestroyRef is automatically retrieved from the current injection context, ensuring that the Observable subscription is properly managed.

signal.component.ts

import { AsyncPipe } from '@angular/common';
import { Component, inject, Injector, OnInit, Signal } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

@Component({
  selector: 'app-signal',
  standalone: true,
  imports: [AsyncPipe],
  template: `
    <div>Counter Observable: {{ counter$ | async }}</div>
    <div>Counter Signal: {{ counter() }}</div>
  `,
})
export class SignalComponent implements OnInit {
  private injector = inject(Injector);

  counter$ = interval(1000);
  counter!: Signal<number | undefined>;

  ngOnInit(): void {
    // ngOnInit is not an injection context, so the injector is passed explicitly.
    this.counter = toSignal(this.counter$, { injector: this.injector });
  }
}

manualCleanup

  • Purpose: Controls whether the subscription should be cleaned up automatically or manually.
  • Details: When set to true, the subscription persists until the Observable completes, bypassing automatic cleanup through DestroyRef. Use it only with Observables that complete on their own.

rejectErrors

  • Purpose: Determines how to handle errors from the Observable.
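  • Details: By default, toSignal catches an error from the Observable and rethrows it whenever the Signal is read. If rejectErrors is enabled, toSignal instead throws the error back to RxJS, where it surfaces as an uncaught exception. The Signal then keeps returning the last good value, because an Observable that has errored emits nothing further. This mirrors the behavior of the async pipe.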

signal.component.ts

import { AsyncPipe } from '@angular/common';
import { Component } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';
import { interval, map } from 'rxjs';

@Component({
  selector: 'app-signal',
  standalone: true,
  imports: [AsyncPipe],
  template: `
    <div>Signal {{ counter() }}</div>
    <div>Observable {{ counterObservable$ | async }}</div>
    <button (click)="logSignal()">Log</button>
  `,
})
export class SignalComponent {
  counterObservable$ = interval(1000).pipe(
    map((value) => {
      if (value > 10) {
        throw new Error('Ups');
      }
      return value;
    }),
  );

  counter = toSignal(this.counterObservable$, {
    initialValue: 0,
    rejectErrors: true,
  });

  logSignal() {
    console.log({ signal: this.counter() }); // -> 10
  }
}

equal

  • Purpose: Defines how equality is determined for values emitted by the Observable.
  • Details: This option lets you specify a custom comparison function, ensuring that only significant changes are reflected in the Signal. Comparisons are also made against the initialValue, if provided.
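
As an illustration, a comparator for the click coordinates used earlier might look like the sketch below. Point, samePoint, and acceptedValues are hypothetical names introduced here; acceptedValues only models which emissions would count as a change and is not part of Angular. The comparator accepts undefined because, without an initialValue, the Signal starts out as undefined.

```typescript
interface Point {
  x: number;
  y: number;
}

// Candidate for the `equal` option: compares coordinates instead of object identity.
function samePoint(a: Point | undefined, b: Point | undefined): boolean {
  if (a === undefined || b === undefined) {
    return a === b;
  }
  return a.x === b.x && a.y === b.y;
}

// Dependency-free model: keep only the values that differ from the current one,
// which is how an equality function filters updates.
function acceptedValues<T>(
  initial: T,
  emissions: T[],
  equal: (a: T, b: T) => boolean,
): T[] {
  const accepted: T[] = [];
  let current = initial;
  for (const next of emissions) {
    if (!equal(current, next)) {
      current = next;
      accepted.push(next);
    }
  }
  return accepted;
}

const clicks: Array<Point | undefined> = [
  { x: 1, y: 2 },
  { x: 1, y: 2 }, // new object, same coordinates: ignored
  { x: 3, y: 4 },
];

// Starts from undefined, like a Signal created without an initialValue.
const changes = acceptedValues<Point | undefined>(undefined, clicks, samePoint);
```

In a component, such a comparator would be passed along the lines of toSignal(this.clickEvent$, { equal: samePoint }), so that a second click at the same coordinates does not count as a change.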

toObservable

The toObservable utility converts a Signal into an Observable, enabling you to react to Signal changes using RxJS operators. This conversion is particularly useful for integrating Angular’s reactivity model with the broader ecosystem of RxJS-based data handling.

counter = signal(0);
counter$ = toObservable(this.counter);

How It Works

  • Effect-Driven Tracking: toObservable uses an effect to track the Signal and pushes each value it reads into a ReplaySubject, so a new subscriber receives the latest value as soon as it subscribes.
  • Timing Considerations: The first value from the Signal may be emitted synchronously, but all subsequent values are emitted asynchronously, when the effect runs.
  • Synchronous vs. Asynchronous: Unlike Observables, Signals never notify about changes synchronously. If a Signal is updated several times before the effect runs, toObservable emits only once, with the final, stabilized value.

signal.component.ts

import { AsyncPipe } from '@angular/common';
import { Component, OnInit, signal } from '@angular/core';
import { toObservable } from '@angular/core/rxjs-interop';

@Component({
  selector: 'app-signal',
  standalone: true,
  imports: [AsyncPipe],
  template: `
    <div>Counter Observable: {{ counter$ | async }}</div>
    <div>Counter Signal: {{ counter() }}</div>
  `,
})
export class SignalComponent implements OnInit {
  counter = signal(42);
  counter$ = toObservable(this.counter);

  ngOnInit(): void {
    this.counter.set(1);
    this.counter.set(2);
    this.counter.set(3);
    // Output -> only the final value (3) will be emitted
  }
}

In this example, although the Signal’s value changes multiple times, toObservable only emits the final stabilized value, ensuring efficient and predictable data flow.
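
The coalescing behavior can also be modelled without Angular. The sketch below is a deliberate simplification, not the framework's code: CoalescingBridge is a hypothetical class, and calling flush() by hand stands in for the effect run that Angular schedules.

```typescript
// Hypothetical model: writes only mark the bridge as dirty; listeners are
// notified with the latest value when flush() runs.
class CoalescingBridge<T> {
  private value: T;
  private dirty = true; // the initial value counts as pending
  private listeners: Array<(value: T) => void> = [];

  constructor(initial: T) {
    this.value = initial;
  }

  set(value: T): void {
    this.value = value;
    this.dirty = true;
  }

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  // Stand-in for the scheduled effect: emits at most once per flush.
  flush(): void {
    if (!this.dirty) {
      return;
    }
    this.dirty = false;
    for (const listener of this.listeners) {
      listener(this.value);
    }
  }
}

const bridge = new CoalescingBridge<number>(42);
const emitted: number[] = [];
bridge.subscribe((value) => {
  emitted.push(value);
});

bridge.set(1);
bridge.set(2);
bridge.set(3);
bridge.flush(); // emitted is now [3]
bridge.flush(); // nothing changed, so nothing is emitted
```

The intermediate values 42, 1, and 2 never reach the listener, because only the value present at flush time is read.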

Injection Context

By default, toObservable must be called in an injection context, such as during the construction of a component or service. If no injection context is available, you can pass an Injector explicitly.

signal.component.ts

import { AsyncPipe } from '@angular/common';
import { Component, inject, Injector, OnInit, Signal } from '@angular/core';
import { toObservable, toSignal } from '@angular/core/rxjs-interop';
import { interval, Observable } from 'rxjs';

@Component({
  selector: 'app-signal',
  standalone: true,
  imports: [AsyncPipe],
  template: `
    <div>Counter Observable: {{ counter$ | async }}</div>
    <div>Counter2 Observable: {{ newCounter$ | async }}</div>
    <div>Counter Signal: {{ counter() }}</div>
  `,
})
export class Signal2Component implements OnInit {
  private injector = inject(Injector);

  counter$ = interval(1000);
  counter!: Signal<number | undefined>;
  newCounter$!: Observable<number | undefined>;

  ngOnInit(): void {
    // Both calls run outside an injection context, so each receives the injector.
    this.counter = toSignal(this.counter$, { injector: this.injector });
    this.newCounter$ = toObservable(this.counter, { injector: this.injector });
  }
}

outputFromObservable

The outputFromObservable function allows you to declare an Angular output that uses an RxJS Observable as its source. This is particularly useful when you want to emit events to parent components using an Observable. The behavior is straightforward: new values from the Observable are forwarded to the Angular output, errors need to be handled manually, and the output stops emitting when the Observable completes.

child.component.ts

import { Component } from '@angular/core';
import { outputFromObservable } from '@angular/core/rxjs-interop';
import { interval, Subject } from 'rxjs';

@Component({
  selector: 'app-child',
  standalone: true,
  template: `<button (click)="change()">Change Name</button>`,
})
export class ChildComponent {
  nameSubject = new Subject<string>();
  nameChange$ = this.nameSubject.asObservable();
  nameChange = outputFromObservable(this.nameChange$);

  intervalChange = outputFromObservable(interval(1000));

  change() {
    this.nameSubject.next('Andrés');
  }
}

<app-child (nameChange)="nameChanged($event)" (intervalChange)="logInterval($event)" />

outputToObservable

The outputToObservable function converts an Angular output into an Observable. This allows you to subscribe to the output using RxJS and respond to events in a reactive manner.

import { AsyncPipe } from '@angular/common';
import { Component, OnInit, output, viewChild } from '@angular/core';
import { outputFromObservable, outputToObservable } from '@angular/core/rxjs-interop';
import { interval, Observable } from 'rxjs';

@Component({
  selector: 'app-interval',
  standalone: true,
  template: `<label>Interval</label> <button (click)="changeName()">Change Name</button>`,
})
export class IntervalComponent {
  intervalChange = outputFromObservable(interval(1000));
  nameChange = output<string>();

  changeName() {
    this.nameChange.emit('Andrés');
  }
}

@Component({
  selector: 'app-root-output',
  standalone: true,
  imports: [IntervalComponent, AsyncPipe],
  template: ` <div>
    <app-interval />
    <h1>IntervalChange {{ logInterval$ | async }}</h1>
    <h1>NameChange {{ nameChanged$ | async }}</h1>
  </div>`,
})
export class AppOutputComponent implements OnInit {
  childComponent = viewChild.required(IntervalComponent);
  logInterval$!: Observable<number>;
  nameChanged$!: Observable<string>;

  ngOnInit(): void {
    // Convert both child outputs into Observables for use with the async pipe.
    this.logInterval$ = outputToObservable(this.childComponent().intervalChange);
    this.nameChanged$ = outputToObservable(this.childComponent().nameChange);
  }
}

takeUntilDestroyed

The takeUntilDestroyed operator is a convenient way to automatically complete an Observable when the component, directive, or service using it is destroyed. This prevents memory leaks and ensures that your Observables don’t keep running after the context they are tied to has been destroyed. When called without arguments, takeUntilDestroyed must run in an injection context, such as a constructor or field initializer; anywhere else, for example in ngOnInit, pass a DestroyRef explicitly.

take-until-destroyed.component.ts

import { Component } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

@Component({
  selector: 'app-take-until',
  standalone: true,
  template: ``,
})
export class TakeUntilDestroyedComponent {
  constructor() {
    // The constructor is an injection context, so no DestroyRef argument is needed.
    interval(1000)
      .pipe(takeUntilDestroyed())
      .subscribe((value) => console.log(value));
  }
}

DestroyRef

take-until-destroyed.component.ts

import { AsyncPipe } from '@angular/common';
import { Component, DestroyRef, effect, inject, signal, viewChild } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

@Component({
  selector: 'app-child-interval',
  standalone: true,
  template: `<label>Child Interval</label>`,
})
export class ChildIntervalComponent {
  destroyRef = inject(DestroyRef);
}

@Component({
  selector: 'app-parent-interval',
  standalone: true,
  imports: [ChildIntervalComponent, AsyncPipe],
  template: ` <div>
    @if (visible()) {
      <app-child-interval />
    }
    <br />
    <button (click)="hideOrShow()">Destroy Child</button>
  </div>`,
})
export class ParentIntervalComponent {
  childComponent = viewChild(ChildIntervalComponent);
  visible = signal(true);

  intervalEffect = effect(() => {
    const child = this.childComponent();
    if (child) {
      // The subscription ends when the child's DestroyRef fires.
      interval(1000)
        .pipe(takeUntilDestroyed(child.destroyRef))
        .subscribe((value) => console.log('Parent', value));
    }
  });

  hideOrShow() {
    this.visible.update((visible) => !visible);
  }
}

In this code, we use the takeUntilDestroyed operator and pass the destroyRef from the child component. This means that when the child component is destroyed, the interval will automatically stop running.

Article author: Andrés Arias