标题:多任务的调度情况 node ts。
简介:在我们开发中有些关于分布式的任务需要部署执行的时候,那么这里就需要到关于这个逻辑的代码,来帮助我们解决这个繁琐的事情。以下的内容就给出了关于这块的实现,结构,只是需要在具体的逻辑中去添加我们需要实现的逻辑即可。
整体的结构中就好比是有一个包工头,他带领了很多的员工,然后包工头接了很多的工作事情,这些需要一些员工来执行,那么包工头就会将这些工作事情告诉员工们执行,员工们就做具体的事情,等待工作完成后告诉包工头自己任务完成即可,或可增加事情的结果等。
关于上面的介绍后,就需要建立一个包工头与员工的通信情况,这里采用了websocket来做。
有如下消息工具:
MsgUitl.js–来控制消息的解码编码与解析,具体的逻辑可以在后面自定义扩展
var iconv_lite = require("iconv-lite");
/**
* 编码默认使用utf-8
*/
var coding = "utf-8";
/**
* 解码消息 将消息的格式解析
*
* @param {ArrayBuffer} data
* @returns {string}
*/
var decode = function (data) {
//
let str = iconv_lite.decode(data, "utf-8");
// console.log("decodeMsg : ", str);
return str;
}
/**
* 编码消息 将消息的格式编码
*
* @param {string} msg
*
* @returns {ArrayBuffer}
*/
var encode = function (msg) {
let data = iconv_lite.encode(msg, "utf-8");
// console.log("encodeMsg : ", msg);
return data;
}
/**
* 解码消息 将消息内容解析
*
* @param {ArrayBuffer} data
* @returns {msg:data}
*/
var decodeMsg = function (data) {
//
let str = decode(data);
// console.log("decodeMsg : ", str);
return JSON.parse(str);
}
/**
* 编码消息 将消息内容编码
*
* @param {Object} data
*
* @returns {ArrayBuffer}
*/
var encodeMsg = function (data) {
let jsonStr = JSON.stringify({ msg: data });
let msgData = encode(jsonStr);
// console.log("encodeMsg : ", msg);
return msgData;
}
//导出
exports.encodeMsg = encodeMsg;
exports.decodeMsg = decodeMsg;
然后咱们创建任务具体的相关工具类来处理任务相关的事情–JobUilt.js
/**
* 注册angent的数据
*
* @param {string} angentName
* @returns
*/
var registerAngentData = function (angentName) {
return { register: angentName };
}
/**
* 获取任务数据
*
* @param {Object} data //具体任务数据
* @param {string} angentName
* @returns { angentInfo: getAngentInfo(angentName), data: data }
*/
var getJobData = function (data, angentName) {
let jobData = { angentInfo: getAngentInfo(angentName), data: data };
return jobData;
}
/**
* 获取一个任务响应数据
*
* @param {Object} data //具体任务数据
* @param {string} angentName
* @param {status} status 状态 0 成功 1 失败 2 异常
*
* @returns { angentInfoRev: getAngentInfo(angentName), data: data }
*/
var getJobRevData = function (data, angentName, status) {
let jobData = { angentInfoRev: getAngentInfo(angentName), data: data, status: status };
return jobData;
}
/**
* 获取angentInfo信息
*
* @param {object} angentName
* @returns {angentName:string}
*/
var getAngentInfo = function (angentName) {
return { angentName: angentName };
}
//导出
exports.getJobData = getJobData;
exports.getAngentInfo = getAngentInfo;
exports.getJobRevData = getJobRevData;
exports.registerAngentData = registerAngentData;
有了上述的工具情况,咱们就可以创建 包工头了 Master.js
// 加载node上websocket模块 ws;
var configModel = require("../Config");
var MsgUtil = require("../MsgUtil");
var JobUtil = require("../JobUtil");
/**
* 所有的可以用的angent
*/
var allAngents = new Set();
/**
* 根据名来指定的angent客户
*/
var allMapAngents = {};
/**
* 记录在执行任务的angent
*/
var doJobAngents = new Set();
/**
* 根据客户来找AngentName名字
*
* @param {ws} client_sock
* @returns
*/
var getAngentNameByClientSock = function (client_sock) {
for (let key in allMapAngents) {
if (allMapAngents[key] == client_sock) {
return key;
}
}
return null;
}
/**
* 获取一个空闲的angent
*
* @returns
*/
var getOneFreeAngent = function () {
for (let key in allMapAngents) {
let client_sock = allMapAngents[key];
if (doJobAngents.has(client_sock)) {
continue
}
return client_sock;
}
return null;
}
/**
* 增加一个可以用的angent
*
* @param {string} angentName
* @param {Object ws} client_sock
*/
var addAngent = function (angentName, client_sock) {
//
let isOk = true;//是否注册成功
//
allAngents.add(client_sock);
allMapAngents[angentName] = client_sock;
//发送注册结果
let msg = JobUtil.registerAngentData(isOk);
let dataMsg = MsgUtil.encodeMsg(msg);
client_sock.send(dataMsg);
//打印可以的angent
console.log(allAngents.size, " 个angent 可用");
}
/**
* 增加一个可以用的angent
*
* @param {string} angentName
* @param {Object ws} client_sock
*/
var removeAngent = function (angentName, client_sock) {
allMapAngents[angentName] = null;
allAngents.delete(client_sock);
//
doJobAngents.delete(client_sock);//将做任务的他删除
//打印可以的angent
console.log(allAngents.size, " 个angent 可用");
}
/**
* 模拟派发任务
*/
var distributeJob = function (jobInfo) {
if (allAngents.size > 0) {
let client_sock = getOneFreeAngent();
if (client_sock) {
//
doJobAngents.add(client_sock);//做任务记录
//
let angentName = getAngentNameByClientSock(client_sock);
//发送任务
let msg = JobUtil.getJobData(jobInfo, angentName);
let dataMsg = MsgUtil.encodeMsg(msg);
client_sock.send(dataMsg);
//
console.log("可以派发任务 给 ", angentName);
return true;
}
}
console.warn("没有可用的angent---");
return false;
}
/**
* 开启master
*/
var start = function () {
// 加载node上websocket模块 ws;
var ws = require("ws");
// 启动基于websocket的服务器,监听我们的客户端接入进来。
var server = new ws.Server({
host: configModel.config.ip,
port: configModel.config.port,
});
// 监听接入进来的客户端事件
function websocket_add_listener(client_sock) {
// close事件
client_sock.on("close", function () {
let angentName = getAngentNameByClientSock(client_sock);
removeAngent(angentName, client_sock);
console.log("client close angentName:", angentName);
});
// error事件
client_sock.on("error", function (err) {
let angentName = getAngentNameByClientSock(client_sock);
removeAngent(angentName, client_sock);
console.log("client error angentName:", angentName, " err", err);
});
// end
// message 事件, data已经是根据websocket协议解码开来的原始数据;
// websocket底层有数据包的封包协议,所以,绝对不会出现粘包的情况。
// 每解一个数据包,就会触发一个message事件;
// 不会出现粘包的情况,send一次,就会把send的数据独立封包。
// 如果我们是直接基于TCP,我们要自己实现类似于websocket封包协议就可以完全达到一样的效果;
client_sock.on("message", function (data) {
// console.log(data);
//收到
let msgData = MsgUtil.decodeMsg(data);
console.log("msg rev : ", msgData);
//
if (msgData.msg.register) {
//存在注册
console.log("注册 angent Name:", msgData.msg.register);
addAngent(msgData.msg.register, client_sock);
//
} else if (msgData.msg.angentInfoRev) {
//任务响应
switch (msgData.msg.status) {
case 0://成功
console.log("任务 ", JSON.stringify(msgData.msg.data), " 成功");
break
case 1://失败
console.log("任务 ", JSON.stringify(msgData.msg.data), " 失败");
break
case 2://异常
console.log("任务 ", JSON.stringify(msgData.msg.data), " 异常");
break
}
//将任务执行的angent移除
doJobAngents.delete(client_sock);
}
});
// end
}
// connection 事件, 有客户端接入进来;
function on_server_client_comming(client_sock) {
console.log("client comming");
websocket_add_listener(client_sock);
}
server.on("connection", on_server_client_comming);
// error事件,表示的我们监听错误;
function on_server_listen_error(err) {
console.error(err);
}
server.on("error", on_server_listen_error);
// headers事件, 回给客户端的字符。
function on_server_headers(data) {
// console.log(data);
}
server.on("headers", on_server_headers);
}
start();
//模拟定时去派发任务
let jobId = 1;
let testDoJob = function () {
setInterval(() => {
if (distributeJob({ jobId: jobId })) {
jobId++;
}
}, 3000);
}
testDoJob();
console.log("========启动Master========");
有了包工头也需要有员工 – 咱们创建员工 Angent.js
var minimist = require('minimist');
var configModel = require("../Config");
var MsgUtil = require("../MsgUtil");
var JObUtil = require("../JobUtil");
var argv = minimist(process.argv.slice(2));
console.log("命令行的参数情况:", argv);
//自己名
let usAngentName = argv.n;
var ws = require("ws");
/**
* 做任务
*
* @param {Object} jobData
* @param {ws} sock
*/
var doJob = function (jobData, sock) {
console.log("做任务:", JSON.stringify(jobData));
//模拟做任务成功
setTimeout(() => {
//告诉做任务成功了
let msg = JObUtil.getJobRevData(jobData, usAngentName, 0);
let data = MsgUtil.encodeMsg(msg);
console.log("===>告诉做任务成功了***");
console.log("send msg : ", msg);
//console.log("send data : ", data);
sock.send(data);
}, 5000);
}
/**
* 开启
*/
var start = function () {
// 创建了一个客户端的socket,然后让这个客户端去连接服务器的socket
var sock = new ws("ws://" + configModel.config.ip + ":" + configModel.config.port);
sock.on("open", function () {
console.log("connect success !!!!");
//发送注册消息
let data = JObUtil.registerAngentData(usAngentName);
let msgData = MsgUtil.encodeMsg(data);
sock.send(msgData);
});
sock.on("error", function (err) {
console.error("error: ", err);
});
sock.on("close", function () {
console.error("master close");
});
sock.on("message", function (data) {
// console.log(data);
let msgData = MsgUtil.decodeMsg(data);
console.log("client ", usAngentName, " rev msg:", msgData);
if (msgData.msg.register) {
//注册消息
console.log("注册 ", msgData.msg.register == true ? "成功" : "失败!");
} else if (msgData.msg.angentInfo) {
//任务消息
doJob(msgData.msg.data, sock);
}
});
}
//开启
start();
然后咱们就再给出个配置文件 Config.js
/**
* 配置master的情况
*/
class Config {
ip = "127.0.0.1";
port = 6080;
}
exports.config = new Config();
然后咱们给出简单的开启的批处理脚本
开启Angent-001.bat
这里如果要多开几个就可以直接修改后面的-n参数即可
CHCP 65001
echo off
call node src/angents/Angent.js -n angent-001
pause
开启Master.bat
CHCP 65001
echo off
call node src/master/Master.js
pause
下面给出整体的目录结构方便参考
备注:上诉内容是自己整理起来的处理方式,如果有借鉴之处请谅解,如有不当之处请见谅。