Lightweight reactive extension to the official Node.js MongoDB driver.

Features

  • ObjectId mapping. Automatically converts the _id field from an ObjectId to a string;
  • Reactive. Fires events when a document is created, updated, or deleted in the database;
  • CUD operation timestamps. Automatically sets the createdOn, updatedOn, and deletedOn timestamps for CUD operations;
  • Schema validation. Validates your data before saving;
  • Paging. Implements a high-level paging API;
  • Soft delete. By default, documents are not removed from the collection but are marked with the deletedOn field;
  • Extendable. The API is easy to extend: you can add new methods or override existing ones;
  • Outbox support. node-mongo can create collections with the _outbox postfix that store all CUD events for implementing the transactional outbox pattern.

The following example shows some of these features:

import { eventBus, InMemoryEvent } from '@paralect/node-mongo';

await userService.updateOne(
  { _id: '62670b6204f1aab85e5033dc' },
  (doc) => ({ firstName: 'Mark' }),
);

eventBus.onUpdated('users', ['firstName', 'lastName'], async (data: InMemoryEvent<User>) => {
  await userService.atomic.updateOne(
    { _id: data.doc._id },
    { $set: { fullName: `${data.doc.firstName} ${data.doc.lastName}` } },
  );
});

Installation

npm i @paralect/node-mongo

Connect to Database

Usually, you need to define a file called db that does two things:

  1. Creates a database instance and connects to the database;
  2. Exposes a factory method, createService, for creating the Services that work with MongoDB.

db.ts
import { Database, Service, ServiceOptions, IDocument } from '@paralect/node-mongo';

import config from 'config';

const database = new Database(config.mongo.connection, config.mongo.dbName);
database.connect();

class CustomService<T extends IDocument> extends Service<T> {
  // You can add new methods or override existing ones here
}

function createService<T extends IDocument>(collectionName: string, options: ServiceOptions = {}) {
  return new CustomService<T>(collectionName, database, options);
}

export default {
  database,
  createService,
};

Services

Service is a collection wrapper that adds all node-mongo features. Under the hood, it uses the native methods of the Node.js MongoDB driver.

The createService method returns a service instance. It accepts two parameters: the collection name and ServiceOptions.

user.service.ts
import { z } from 'zod';

import db from 'db';

const schema = z.object({
  _id: z.string(),
  createdOn: z.date().optional(),
  updatedOn: z.date().optional(),
  deletedOn: z.date().optional().nullable(),
  fullName: z.string(),
}).strict();

type User = z.infer<typeof schema>;

const service = db.createService<User>('users', {
  schemaValidator: (obj) => schema.parseAsync(obj),
});

export default service;

update-user.ts
import userService from 'user.service';

await userService.insertOne({ fullName: 'Max' });

Schema validation

Node-mongo supports any schema library, but we recommend Zod because it can generate TypeScript types from schemas.

Zod

const schema = z.object({
  _id: z.string(),
  createdOn: z.date().optional(),
  updatedOn: z.date().optional(),
  deletedOn: z.date().optional().nullable(),
  fullName: z.string(),
});

type User = z.infer<typeof schema>;

const service = createService<User>('users', {
  schemaValidator: (obj) => schema.parseAsync(obj),
});

Joi

const schema = Joi.object({
  _id: Joi.string().required(),
  createdOn: Joi.date(),
  updatedOn: Joi.date(),
  deletedOn: Joi.date().allow(null),
  fullName: Joi.string().required(),
});

type User = {
  _id: string;
  createdOn?: Date;
  updatedOn?: Date;
  deletedOn?: Date | null;
  fullName: string;
};

const service = createService<User>('users', {
  schemaValidator: (obj) => schema.validateAsync(obj),
});

Node-mongo validates each document against the schema before saving it.
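
Because schemaValidator is just an async function of type (obj: any) => Promise<any>, a validator can also be written by hand without a schema library. The sketch below is illustrative only: checkUser is a hypothetical helper and the User shape is copied from the examples above. How node-mongo uses the resolved value is not covered here.

```typescript
// Hypothetical hand-written validator; not part of node-mongo.
type User = {
  _id: string;
  createdOn?: Date;
  updatedOn?: Date;
  deletedOn?: Date | null;
  fullName: string;
};

// Synchronous check: throws on the first problem, returns the typed document otherwise.
function checkUser(obj: any): User {
  if (typeof obj !== 'object' || obj === null) {
    throw new Error('document must be an object');
  }
  if (typeof obj._id !== 'string') {
    throw new Error('_id must be a string');
  }
  if (typeof obj.fullName !== 'string') {
    throw new Error('fullName must be a string');
  }
  for (const field of ['createdOn', 'updatedOn'] as const) {
    if (obj[field] !== undefined && !(obj[field] instanceof Date)) {
      throw new Error(`${field} must be a Date`);
    }
  }
  if (obj.deletedOn !== undefined && obj.deletedOn !== null && !(obj.deletedOn instanceof Date)) {
    throw new Error('deletedOn must be a Date or null');
  }
  return obj as User;
}

// Matches the schemaValidator signature: (obj: any) => Promise<any>.
// An async function turns a thrown error into a rejected promise.
const schemaValidator = async (obj: any): Promise<User> => checkUser(obj);
```

It could then be passed to a service in the same way as the Zod and Joi validators: createService<User>('users', { schemaValidator }).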

Reactivity

The key feature of node-mongo is that each create, update, or delete operation publishes a CUD event:

  • ${collectionName}.created
  • ${collectionName}.updated
  • ${collectionName}.deleted

Events make it easy to update denormalized data and to implement complex business logic without tightly coupling different entities.

The SDK supports two types of events:

In-memory events

  • Enabled by default;
  • Events can be lost on service failure;
  • Events are stored in eventBus (Node.js EventEmitter instance);
  • To handle this type of event, use the Events API;
  • Designed for transferring events inside a single Node.js process. Event handlers listen to the node-mongo eventBus.
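
Since the eventBus is described as a Node.js EventEmitter, the flow of an in-memory event can be modeled with a plain emitter. This is a simplified sketch, not the library's code: bus, publishUpdated, and the sample documents are hypothetical, and only the ${collectionName}.updated naming and the InMemoryEvent shape come from this document.

```typescript
import { EventEmitter } from 'node:events';

type InMemoryEvent<T = any> = {
  doc: T;
  prevDoc?: T;
  name: string;
  createdOn: Date;
};

type User = { _id: string; firstName: string; lastName: string };

// Stand-in for the node-mongo eventBus (assumption: a plain EventEmitter).
const bus = new EventEmitter();

// Hypothetical publisher: emits `${collectionName}.updated` with the new and previous document.
function publishUpdated<T>(collectionName: string, doc: T, prevDoc: T): void {
  const name = `${collectionName}.updated`;
  const event: InMemoryEvent<T> = { doc, prevDoc, name, createdOn: new Date() };
  bus.emit(name, event);
}

const received: InMemoryEvent<User>[] = [];

// Handlers live in the same process; if it crashes before they run, the event is lost.
bus.on('users.updated', (event: InMemoryEvent<User>) => {
  received.push(event);
});

publishUpdated<User>(
  'users',
  { _id: '1', firstName: 'Mark', lastName: 'Smith' },
  { _id: '1', firstName: 'Max', lastName: 'Smith' },
);
```

Because EventEmitter calls listeners synchronously, the handler has already run when publishUpdated returns. Nothing is persisted, which is why these events do not survive a process failure.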

Transactional events

  • Can be enabled by setting { outbox: true } when creating a service;
  • Guarantees that every database write produces an event;
  • Events are stored in special collections with _outbox postfix;
  • To handle this type of event, use watch (a method for working with Change Streams) on the outbox collection;
  • Designed for transferring events to a message broker such as Kafka. Event handlers should listen to message broker events (you need to implement this layer yourself).
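
The transactional outbox pattern can be illustrated with a small in-memory model. This is only a sketch of the idea, not node-mongo or MongoDB code: Store, insertWithOutbox, and relay are hypothetical names. A real setup would rely on the database to write both records atomically and would read the outbox with a change stream.

```typescript
type OutboxEvent = { type: string; doc: Record<string, unknown>; published: boolean };

// In-memory stand-in for a database with a collection and its outbox.
type Store = {
  users: Record<string, unknown>[];
  users_outbox: OutboxEvent[];
};

// Write the document and its event as one unit, so a stored document always has an event.
function insertWithOutbox(store: Store, doc: Record<string, unknown>): void {
  const users = [...store.users, doc];
  const outbox = [...store.users_outbox, { type: 'users.created', doc, published: false }];
  // "Commit": swap in both new arrays together.
  store.users = users;
  store.users_outbox = outbox;
}

// Relay: forward unpublished events to a broker and mark them as published.
function relay(store: Store, send: (event: OutboxEvent) => void): number {
  let sent = 0;
  for (const event of store.users_outbox) {
    if (!event.published) {
      send(event);
      event.published = true;
      sent += 1;
    }
  }
  return sent;
}

const store: Store = { users: [], users_outbox: [] };
const broker: string[] = []; // stand-in for a message broker topic

insertWithOutbox(store, { _id: '1', fullName: 'Max' });
const firstRun = relay(store, (event) => broker.push(event.type));
const secondRun = relay(store, (event) => broker.push(event.type));
```

The second relay run sends nothing because the event is already marked as published. Keeping the event next to the data is what lets a relay resume after a crash without losing writes.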

At the start of a project, we recommend using in-memory events. As your application grows more demanding, migrate to transactional events.

Options and Types

ServiceOptions

interface ServiceOptions {
  skipDeletedOnDocs?: boolean;
  schemaValidator?: (obj: any) => Promise<any>;
  publishEvents?: boolean;
  addCreatedOnField?: boolean;
  addUpdatedOnField?: boolean;
  outbox?: boolean;
  escapeRegExp?: boolean;
  collectionOptions?: CollectionOptions;
  collectionCreateOptions?: CreateCollectionOptions;
}

| Option | Description | Default value |
| --- | --- | --- |
| skipDeletedOnDocs | Skip documents with the deletedOn field. | true |
| schemaValidator | Validation function that is called when data is saved. | - |
| publishEvents | Publish CUD events on save. | true |
| addCreatedOnField | Set the createdOn field to the current timestamp on document creation. | true |
| addUpdatedOnField | Set the updatedOn field to the current timestamp on document update. | true |
| outbox | Use transactional events instead of in-memory events. | false |
| escapeRegExp | Escape $regex values to prevent special characters from being interpreted as patterns. | false |
| collectionOptions | MongoDB CollectionOptions | {} |
| collectionCreateOptions | MongoDB CreateCollectionOptions | {} |
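
As an illustration of what the escapeRegExp option is meant to prevent, the sketch below escapes regular expression metacharacters in a user-supplied search string. The helper name escapeRegExpValue is hypothetical, and the exact escaping node-mongo performs is an assumption. The code uses the commonly seen JavaScript escaping pattern.

```typescript
// Hypothetical helper: escape characters that have a special meaning in a regular expression.
function escapeRegExpValue(value: string): string {
  return value.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

const userInput = 'a.b*c';

// Unescaped: "." matches any character and "b*" matches zero or more "b".
const loose = new RegExp(userInput);
// Escaped: matches the literal text only.
const literal = new RegExp(escapeRegExpValue(userInput));

const escaped = escapeRegExpValue(userInput); // 'a\\.b\\*c'
const looseMatches = loose.test('axbbbc'); // true
const literalMatchesOther = literal.test('axbbbc'); // false
const literalMatchesSame = literal.test('a.b*c'); // true
```

Without escaping, a search for the text a.b*c would also match unrelated values such as axbbbc.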

CreateConfig

Overrides ServiceOptions parameters for create operations.

type CreateConfig = {
  validateSchema?: boolean,
  publishEvents?: boolean,
};

ReadConfig

Overrides ServiceOptions parameters for read operations.

type ReadConfig = {
  skipDeletedOnDocs?: boolean,
};
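
A minimal sketch of how a read filter could honor skipDeletedOnDocs. The function name applySoftDeleteFilter is hypothetical, and the deletedOn: { $exists: false } condition is an assumption about how soft-deleted documents are excluded, not a statement of the library's internals.

```typescript
type Filter = Record<string, unknown>;
type ReadConfig = { skipDeletedOnDocs?: boolean };

// Hypothetical helper: add a condition that hides soft-deleted documents unless disabled.
function applySoftDeleteFilter(filter: Filter, config: ReadConfig = {}): Filter {
  const { skipDeletedOnDocs = true } = config; // default documented for ServiceOptions
  if (!skipDeletedOnDocs) {
    return { ...filter };
  }
  // Assumption: a document counts as deleted once it has a deletedOn field.
  return { ...filter, deletedOn: { $exists: false } };
}

const activeOnly = applySoftDeleteFilter({ fullName: 'Max' });
const includingDeleted = applySoftDeleteFilter({ fullName: 'Max' }, { skipDeletedOnDocs: false });
```

Passing { skipDeletedOnDocs: false } for a single read leaves the filter untouched, so soft-deleted documents are returned as well.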

UpdateConfig

Overrides ServiceOptions parameters for update operations.

type UpdateConfig = {
  skipDeletedOnDocs?: boolean,
  validateSchema?: boolean,
  publishEvents?: boolean,
};

DeleteConfig

Overrides ServiceOptions parameters for delete operations.

type DeleteConfig = {
  skipDeletedOnDocs?: boolean,
  publishEvents?: boolean,
};
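
The override behavior described for the per-operation config types can be pictured as a merge in which the per-call value wins over the service-level one. This is a sketch under that assumption: resolveOptions and ServiceDefaults are hypothetical names, and the relationship between validateSchema and schemaValidator is not modeled.

```typescript
type ServiceDefaults = { skipDeletedOnDocs: boolean; publishEvents: boolean };
type DeleteConfig = { skipDeletedOnDocs?: boolean; publishEvents?: boolean };

// Hypothetical helper: per-call config takes precedence, undefined keys fall back to the service.
function resolveOptions(service: ServiceDefaults, config: DeleteConfig = {}): ServiceDefaults {
  return {
    skipDeletedOnDocs: config.skipDeletedOnDocs ?? service.skipDeletedOnDocs,
    publishEvents: config.publishEvents ?? service.publishEvents,
  };
}

const serviceOptions: ServiceDefaults = { skipDeletedOnDocs: true, publishEvents: true };

// Silence events for a single call without touching the service-wide setting.
const quiet = resolveOptions(serviceOptions, { publishEvents: false });
const unchanged = resolveOptions(serviceOptions);
```

Using ?? instead of || matters here: an explicit false in the per-call config must override a service-level true.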

InMemoryEvent

type InMemoryEvent<T = any> = {
  doc: T,
  prevDoc?: T,
  name: string,
  createdOn: Date
};

InMemoryEventHandler

type InMemoryEventHandler = (evt: InMemoryEvent) => Promise<void> | void;

OnUpdatedProperties

type OnUpdatedProperties = Array<Record<string, unknown> | string>;
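
One plausible reading of OnUpdatedProperties is that a string entry means "this field changed" and an object entry means "this field changed to this value". The sketch below implements that reading. It is an assumption for illustration, not the library's implementation, and hasChanged is a hypothetical helper that only compares top-level fields with strict equality.

```typescript
type OnUpdatedProperties = Array<Record<string, unknown> | string>;
type Doc = Record<string, unknown>;

// Hypothetical helper: decide whether an update touches any of the watched properties.
function hasChanged(doc: Doc, prevDoc: Doc, properties: OnUpdatedProperties): boolean {
  return properties.some((property) => {
    if (typeof property === 'string') {
      // String entry: the field has a different value than before.
      return doc[property] !== prevDoc[property];
    }
    // Object entry: every listed field now equals the given value and did not before.
    return Object.entries(property).every(
      ([key, value]) => doc[key] === value && prevDoc[key] !== value,
    );
  });
}

const prev = { firstName: 'Max', lastName: 'Smith', status: 'pending' };
const next = { firstName: 'Mark', lastName: 'Smith', status: 'active' };

const nameChanged = hasChanged(next, prev, ['firstName', 'lastName']); // true
const lastNameChanged = hasChanged(next, prev, ['lastName']); // false
const becameActive = hasChanged(next, prev, [{ status: 'active' }]); // true
const becameBlocked = hasChanged(next, prev, [{ status: 'blocked' }]); // false
```

Under this reading, a handler registered for ['firstName', 'lastName'] runs when either field changes, which fits the fullName example at the top of this document.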

Extending API

Extending API for a single service.

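import _ from 'lodash';

const service = db.createService<User>('users', {
  schemaValidator: (obj) => schema.parseAsync(obj),
});

const privateFields = [
  'passwordHash',
  'signupToken',
  'resetPasswordToken',
];

// Illustrative placeholder for a custom method (the lastRequest field is an assumption).
const updateLastRequest = (_id: string) => service.atomic.updateOne(
  { _id },
  { $set: { lastRequest: new Date() } },
);

// Returns the user without its private fields.
const getPublic = (user: User | null) => _.omit(user, privateFields);

// Attach the custom methods to this service only.
export default Object.assign(service, {
  updateLastRequest,
  getPublic,
});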

Extending API for all services.

const database = new Database(config.mongo.connection, config.mongo.dbName);

class CustomService<T extends IDocument> extends Service<T> {
  createOrUpdate = async (query: any, updateCallback: (item?: T) => Partial<T>) => {
    const docExists = await this.exists(query);

    if (!docExists) {
      const newDoc = updateCallback();
      return this.insertOne(newDoc);
    }

    return this.updateOne(query, (doc) => updateCallback(doc));
  };
}

function createService<T extends IDocument>(collectionName: string, options: ServiceOptions = {}) {
  return new CustomService<T>(collectionName, database, options);
}

const userService = createService<User>('users', {
  schemaValidator: (obj) => schema.parseAsync(obj),
});

await userService.createOrUpdate(
  { _id: 'some-id' },
  () => ({ fullName: 'Max' }),
);