Node.js 中的通信方式都有哪些
#nodejs
目录
- 1. 总结
- 2. child_process 子进程通信
- 3. cluster ==集群模式==通信
- 4. Socket 通信
- 5. 消息队列(使用 Redis)
- 6. 共享内存(使用 Node.js worker_threads)
- 7. HTTP/HTTPS 通信
- 8. WebSocket 通信
- 9. 进程管理和监控
1. 总结
- child_process:
- 适用于
父子进程
通信
- 适用于
- cluster:
- 适用于
主从架构的多进程应用
- 适用于
- Socket:
- 适用于
跨机器
的进程通信
- 适用于
- 消息队列:
- 适用于解耦的异步通信 (redis)
- 共享==内存==:
- 适用于高性能数据共享(
worker_threads
)
- 适用于高性能数据共享(
- HTTP / HTTPS:
- 适用于
RESTful
服务通信
- 适用于
- WebSocket:
- 适用于实时==双向通信==
- 进程管理:
- 适用于复杂的多进程应用
下面展开聊
2. child_process 子进程通信
spawn
方式fork
方式
// 1.1 spawn 方式
const { spawn } = require('child_process');
// 父进程
const child = spawn('node', ['child.js']);
child.stdout.on('data', (data) => {
console.log('子进程输出:', data.toString());
});
child.stderr.on('data', (data) => {
console.error('子进程错误:', data.toString());
});
child.on('close', (code) => {
console.log(`子进程退出,退出码 ${code}`);
});
// 1.2 fork 方式
const { fork } = require('child_process');
// 父进程 (parent.js)
const child = fork('./child.js');
// 发送消息给子进程
child.send({ hello: 'world' });
// 接收子进程消息
child.on('message', (message) => {
console.log('收到子进程消息:', message);
});
// 子进程 (child.js)
process.on('message', (message) => {
console.log('收到父进程消息:', message);
// 回复父进程
process.send({ received: true });
});
3. cluster ==集群模式==通信
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// 主进程代码
console.log(`主进程 ${process.pid} 正在运行`);
// 生成工作进程
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
// 主进程发送消息给工作进程
worker.send({ type: 'hello', from: 'master' });
// 接收工作进程消息
worker.on('message', (message) => {
console.log(`主进程收到来自工作进程 ${worker.id} 的消息:`, message);
});
}
// 监听工作进程退出
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 已退出`);
});
} else {
// 工作进程代码
http.createServer((req, res) => {
res.writeHead(200);
res.end('你好,世界\n');
}).listen(8000);
// 工作进程接收消息
process.on('message', (message) => {
console.log(`工作进程 ${process.pid} 收到消息:`, message);
});
// 工作进程发送消息给主进程
process.send({ type: 'ready', pid: process.pid });
}
4. Socket 通信
// 3.1 TCP Socket
const net = require('net');
// TCP 服务器
const server = net.createServer((socket) => {
socket.on('data', (data) => {
console.log('收到数据:', data.toString());
socket.write('服务器已收到消息');
});
});
server.listen(8080, () => {
console.log('TCP 服务器启动在端口 8080');
});
// TCP 客户端
const client = new net.Socket();
client.connect(8080, 'localhost', () => {
client.write('你好,服务器');
});
// 3.2 Unix Domain Socket
const server = net.createServer((socket) => {
socket.on('data', (data) => {
console.log('收到数据:', data.toString());
});
});
server.listen('/tmp/node.sock');
5. 消息队列(使用 Redis)
const Redis = require('ioredis');
const redis = new Redis();
// 发布者
class Publisher {
constructor() {
this.redis = new Redis();
}
publish(channel, message) {
return this.redis.publish(channel, JSON.stringify(message));
}
}
// 订阅者
class Subscriber {
constructor() {
this.redis = new Redis();
}
subscribe(channel, callback) {
this.redis.subscribe(channel, (err, count) => {
if (err) {
console.error('订阅错误:', err);
return;
}
console.log(`订阅了 ${count} 个频道`);
});
this.redis.on('message', (channel, message) => {
callback(channel, JSON.parse(message));
});
}
}
// 使用示例
const publisher = new Publisher();
const subscriber = new Subscriber();
subscriber.subscribe('news', (channel, message) => {
console.log(`收到来自 ${channel} 的消息:`, message);
});
publisher.publish('news', { title: '重要通知', content: '系统更新' });
6. 共享内存(使用 Node.js worker_threads)
const { Worker, isMainThread, parentPort, workerData, SharedArrayBuffer } = require('worker_threads');
if (isMainThread) {
// 主线程代码
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);
const worker = new Worker(__filename, {
workerData: { sharedBuffer }
});
worker.on('message', (message) => {
console.log('主线程收到消息:', message);
console.log('共享内存数据:', sharedArray[0]);
});
// 修改共享内存
sharedArray[0] = 100;
worker.postMessage('已更新共享内存');
} else {
// 工作线程代码
const sharedArray = new Int32Array(workerData.sharedBuffer);
parentPort.on('message', (message) => {
console.log('工作线程收到消息:', message);
console.log('共享内存数据:', sharedArray[0]);
// 修改共享内存
sharedArray[0] += 1;
parentPort.postMessage('工作线程已修改共享内存');
});
}
7. HTTP/HTTPS 通信
const http = require('http');
const axios = require('axios');
// HTTP 服务器
const server = http.createServer((req, res) => {
if (req.method === 'POST') {
let body = '';
req.on('data', chunk => {
body += chunk.toString();
});
req.on('end', () => {
console.log('收到数据:', JSON.parse(body));
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ status: 'ok' }));
});
}
});
server.listen(3000, () => {
console.log('HTTP 服务器运行在端口 3000');
});
// HTTP 客户端
async function sendRequest() {
try {
const response = await axios.post('http://localhost:3000', {
message: '你好,服务器'
});
console.log('服务器响应:', response.data);
} catch (error) {
console.error('请求错误:', error);
}
}
8. WebSocket 通信
const WebSocket = require('ws');
// WebSocket 服务器
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
console.log('新的连接建立');
ws.on('message', (message) => {
console.log('收到消息:', message.toString());
// 广播消息给所有客户端
wss.clients.forEach((client) => {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(message.toString());
}
});
});
});
// WebSocket 客户端
const ws = new WebSocket('ws://localhost:8080');
ws.on('open', () => {
ws.send('客户端连接成功');
});
ws.on('message', (data) => {
console.log('收到服务器消息:', data.toString());
});
9. 进程管理和监控
class ProcessManager {
constructor() {
this.workers = new Map();
this.setupMaster();
}
setupMaster() {
cluster.setupMaster({
exec: 'worker.js',
args: ['--use', 'http']
});
cluster.on('online', (worker) => {
console.log(`工作进程 ${worker.process.pid} 已启动`);
this.workers.set(worker.id, worker);
});
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 退出`);
this.workers.delete(worker.id);
// 自动重启进程
if (signal !== 'SIGTERM') {
this.createWorker();
}
});
}
createWorker() {
const worker = cluster.fork();
return worker;
}
broadcastMessage(message) {
for (const worker of this.workers.values()) {
worker.send(message);
}
}
shutdown() {
for (const worker of this.workers.values()) {
worker.kill('SIGTERM');
}
}
}
// 使用示例
const manager = new ProcessManager();
// 创建工作进程
for (let i = 0; i < require('os').cpus().length; i++) {
manager.createWorker();
}
// 广播消息
manager.broadcastMessage({ type: 'config', data: { port: 8000 } });