Sandbox - Docs - Twitter - YouTube
Memphis is a next-generation message broker.
A simple, robust, and durable cloud-native message broker wrapped with
an entire ecosystem that enables fast and reliable development of next-generation event-driven use cases.
Memphis enables building modern applications that require large volumes of streamed and enriched data,
modern protocols, zero ops, rapid development, extreme cost reduction,
and a significantly lower amount of dev time for data-oriented developers and data engineers.
$ npm install memphis-dev
for javascript, you can choose to use the import or required keyword
const memphis = require('memphis-dev');
for Typescript, use the import keyword to aid for typechecking assistance
import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';
To leverage Nestjs dependency injection feature
import { Module } from '@nestjs/common';
import { MemphisModule, MemphisService } from 'memphis-dev/nest';
import type { Memphis } from 'memphis-dev/types';
First, we need to connect with Memphis by using memphis.connect
.
/* Javascript and typescript project */
await memphis.connect({
host: "<memphis-host>",
port: <port>, // defaults to 6666
username: "<username>", // (root/application type user)
connectionToken: "<broker-token>", // you will get it on application type user creation
reconnect: true, // defaults to true
maxReconnect: 3, // defaults to 3
reconnectIntervalMs: 1500, // defaults to 1500
timeoutMs: 1500, // defaults to 1500
// for TLS connection:
keyFile: '<key-client.pem>',
certFile: '<cert-client.pem>',
caFile: '<rootCA.pem>'
});
Nest injection
@Module({
imports: [MemphisModule.register()],
})
class ConsumerModule {
constructor(private memphis: MemphisService) {}
startConnection() {
(async function () {
let memphisConnection: Memphis;
try {
memphisConnection = await this.memphis.connect({
host: "<memphis-host>",
username: "<application type username>",
connectionToken: "<broker-token>",
});
} catch (ex) {
console.log(ex);
memphisConnection.close();
}
})();
}
}
Once connected, the entire functionalities offered by Memphis are available.
To disconnect from Memphis, call close()
on the memphis object.
memphisConnection.close();
If a station already exists nothing happens, the new configuration will not be applied
const station = await memphis.station({
name: '<station-name>',
schemaName: '<schema-name>',
retentionType: memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS, // defaults to memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS
retentionValue: 604800, // defaults to 604800
storageType: memphis.storageTypes.DISK, // defaults to memphis.storageTypes.DISK
replicas: 1, // defaults to 1
idempotencyWindowMs: 0, // defaults to 120000
sendPoisonMsgToDls: true, // defaults to true
sendSchemaFailedMsgToDls: true // defaults to true
});
Creating a station with Nestjs dependency injection
@Module({
imports: [MemphisModule.register()],
})
class stationModule {
constructor(private memphis: MemphisService) { }
createStation() {
(async function () {
const station = await this.memphis.station({
name: "<station-name>",
schemaName: "<schema-name>",
retentionType: memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS, // defaults to memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS
retentionValue: 604800, // defaults to 604800
storageType: memphis.storageTypes.DISK, // defaults to memphis.storageTypes.DISK
replicas: 1, // defaults to 1
idempotencyWindowMs: 0, // defaults to 120000
sendPoisonMsgToDls: true, // defaults to true
sendSchemaFailedMsgToDls: true // defaults to true
});
})();
}
}
Memphis currently supports the following types of retention:
memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS;
Means that every message persists for the value set in retention value field (in seconds)
memphis.retentionTypes.MESSAGES;
Means that after max amount of saved messages (set in retention value), the oldest messages will be deleted
memphis.retentionTypes.BYTES;
Means that after max amount of saved bytes (set in retention value), the oldest messages will be deleted
Memphis currently supports the following types of messages storage:
memphis.storageTypes.DISK;
Means that messages persist on disk
memphis.storageTypes.MEMORY;
Means that messages persist on the main memory
Destroying a station will remove all its resources (producers/consumers)
await station.destroy();
await memphisConnection.attachSchema({ name: '<schema-name>', stationName: '<station-name>' });
await memphisConnection.detachSchema({ stationName: '<station-name>' });
The most common client operations are produce
to send messages and consume
to
receive messages.
Messages are published to a station and consumed from it by creating a consumer. Consumers are pull based and consume all the messages in a station unless you are using a consumers group, in this case messages are spread across all members in this group.
Memphis messages are payload agnostic. Payloads are Uint8Arrays
.
In order to stop getting messages, you have to call consumer.destroy()
. Destroy will terminate regardless
of whether there are messages in flight for the client.
const producer = await memphisConnection.producer({
stationName: '<station-name>',
producerName: '<producer-name>',
genUniqueSuffix: false
});
Creating producers with nestjs dependecy injection
@Module({
imports: [MemphisModule.register()],
})
class ProducerModule {
constructor(private memphis: MemphisService) { }
createProducer() {
(async function () {
const producer = await memphisConnection.producer({
stationName: "<station-name>",
producerName: "<producer-name>"
});
})();
}
}
await producer.produce({
message: '<bytes array>/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
ackWaitSec: 15 // defaults to 15
});
const headers = memphis.headers();
headers.add('<key>', '<value>');
await producer.produce({
message: '<bytes array>/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
headers: headers // defults to empty
});
Meaning your application won't wait for broker acknowledgement - use only in case you are tolerant for data loss
await producer.produce({
message: '<bytes array>/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
ackWaitSec: 15, // defaults to 15
asyncProduce: true // defaults to false
});
Stations are idempotent by default for 2 minutes (can be configured), Idempotency achieved by adding a message id
await producer.produce({
message: '<bytes array>/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
ackWaitSec: 15, // defaults to 15
msgId: 'fdfd' // defaults to null
});
await producer.destroy();
const consumer = await memphisConnection.consumer({
stationName: '<station-name>',
consumerName: '<consumer-name>',
consumerGroup: '<group-name>', // defaults to the consumer name.
pullIntervalMs: 1000, // defaults to 1000
batchSize: 10, // defaults to 10
batchMaxTimeToWaitMs: 5000, // defaults to 5000
maxAckTimeMs: 30000, // defaults to 30000
maxMsgDeliveries: 10, // defaults to 10
genUniqueSuffix: false,
startConsumeFromSequence: 1, // start consuming from a specific sequence. defaults to 1
lastMessages: -1 // consume the last N messages, defaults to -1 (all messages in the station)
});
To set Up connection in nestjs
import { MemphisServer } from 'memphis-dev/nest'
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: new MemphisServer({
host: '<memphis-host>',
username: '<application type username>',
connectionToken: '<broker-token>'
}),
},
);
await app.listen();
}
bootstrap();
To Consume messages in nestjs
export class Controller {
import { consumeMessage } from 'memphis-dev/nest';
import { Message } from 'memphis-dev/types';
@consumeMessage({
stationName: '<station-name>',
consumerName: '<consumer-name>',
consumerGroup: ''
})
async messageHandler(message: Message) {
console.log(message.getData().toString());
message.ack();
}
}
consumer.on('message', (message) => {
// processing
console.log(message.getData());
message.ack();
});
Acknowledge a message indicates the Memphis server to not re-send the same message again to the same consumer / consumers group
message.ack();
Get headers per message
headers = message.getHeaders();
Get message sequence number
sequenceNumber = message.getSequenceNumber();
consumer.on('error', (error) => {
// error handling
});
await consumer.destroy();