ZeroMQ pull-push pattern for Node JS
The article emphasizes ZeroMQ's flexibility for messaging in Node.js, highlighting the pull-push pattern ideal for high-volume distributed systems.
Daniel Gustaw
• 3 min read
ZeroMQ is a messaging library that enables fast and efficient communication between distributed systems. With ZeroMQ, developers can easily build complex messaging systems that can handle high volumes of data and are resilient to network failures. In Node.js, the ZeroMQ library can be used to create messaging endpoints and send and receive messages between them.
In this article, we will explore how to use the ZeroMQ library in Node.js by examining a code snippet that demonstrates how to send and receive messages using the ZeroMQ Push
and Pull
sockets. The code shows how to create a push
function that sends a message to a Pull
socket, and a pull
function that receives messages from a Push
socket. We will examine the various components of the code and explain how they work together to create a reliable messaging system. We will also explore some best practices for using ZeroMQ in Node.js and discuss some common use cases for the library.
The built-in core ZeroMQ patterns are:
- Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
- Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
- Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
- Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with “normal” pairs of sockets.
We are interested strictly in pipeline
. Our goal is write code from wich I can import and export pull
and push
functions to easy use them later without worrying if socket is open or maintaining socket in context or global state.
import {Push, Pull, MessageLike} from "zeromq";
// Import the required ZeroMQ sockets and types
const sleep = (time: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, time))
// Define a sleep function that returns a Promise that resolves after a given time interval
const pushSocket = new Push();
const pullSocket = new Pull();
// Create a new ZeroMQ Push socket and a new Pull socket
export async function push<T extends MessageLike | MessageLike[]>(job: T): Promise<void> {
// Define a push function that takes a generic message and sends it to the Pull socket
// The function returns a Promise that resolves when the message is sent
if(!pushSocket.writable){
await pushSocket.bind("tcp://*:7777");
}
// If the socket is not yet writable, bind it to the address "tcp://*:7777"
await pushSocket.send(job);
// Send the message to the connected Pull socket
}
export async function pull<T>(): Promise<void> {
// Define a pull function that receives messages from the Push socket
// The function returns a Promise that resolves when a message is received
pullSocket.connect("tcp://localhost:7777");
// Connect the Pull socket to the address "tcp://localhost:7777"
for await (const [msg] of pullSocket) {
await sleep(2000);
console.log(`Received Job ${msg.toString()}`);
}
// Use a for-await-of loop to iterate over incoming messages from the Pull socket
// The sleep function is called to simulate a delay in processing the message
// The message is then logged to the console
}
Assuming that i called this file zero.ts
I can use it in script below
import {pull, push} from "./zero";
pull().catch(console.error)
async function main() {
for(let i = 1; i <= 10; i++) {
console.log("push", i);
await push(`ok ${i}`)
}
}
main().catch(console.error)
I will see almost instant push 1..10
and then any 2
seconds new lines with Received Job
.
Of course you can use many instances of scripts that are workers and do these jobs in independent processes. In this case I wanted streamline outgoing http requests so I decided to process them in single pull
and inject these requests from different places of code by push
. So you can see that there is huge variety of applications especially, taking into account that I presented only one pattern from 4 available in ZeroMQ.
Other articles
You can find interesting also.
Xss attack using script style and image
Learn how to infect a page using an XSS attack with the script, style, or image tags. You can see how to replace the content of the page with your own even without javascript.
Daniel Gustaw
• 4 min read
tRPC - super fast development cycle for fullstack typescript apps
We building tRPC client and server with query, mutation, authentication and subscriptions. Authentication for websocket can be tricky and it is in this case so there are presented three approaches to solve this problem.
Daniel Gustaw
• 15 min read
How many families can fit on the plane - an algorithmics problem
We compare two solutions to the problem of counting free sets of adjacent seats. You will learn how to use Profiling and how much difference the use of pop and shift makes on arrays in js.
Daniel Gustaw
• 12 min read