איך שולחים ומקבלים הודעות בתור RabbitMQ מתוך שרת Node.JS Express
פוסט זה מכיל טיפ קצר על עבודה עם Node.JS. כדי ללמוד Node.JS הרבה יותר לעומק אני ממליץ לכם לבדוק את קורס Node.JS כאן באתר. הקורס כולל עשרות שיעורי וידאו מוקלטים והמון תרגול שילוו אתכם מהצעד הראשון בעבודה עם Node.JS ועד הנושאים המתקדמים.
כשאני כותב קוד מדי פעם אני נתקע על שטות שאני חושב שצריכה להיות מאוד ברורה. השטות של היום היתה RabbitMQ ובפרט איך לחבר אותו ל Express. הנה הקוד שבסוף עבד לי בצירוף כמה אילוצים ששווה לשים לב אליהם כשאתם מקודדים דברים דומים.
1. על מה חשוב להסתכל
מצאתי המון מדריכים ברשת שמראים איך להתחבר משרת Express ל RabbitMQ אבל אף אחד מהם לא התאים בדיוק, ולכן לקח לי יחסית הרבה זמן לבנות את האינטגרציה. אלה הנקודות המרכזיות שחיפשתי בפיתרון:
פתיחת Connection ל RabbitMQ היא יקרה. אנחנו רוצים לעשות את זה גג פעם אחת ביישום.
פתיחת Channel היא לא יותר מדי יקרה, ו RabbitMQ ממש אוהב לסגור לנו Channel-ים באמצע החיים, לפעמים סתם בשביל להרגיז. ה Channel מייצג רצף תקשורת מסוים והגישה של RabbitMQ היא שכל הודעה לא במקום והוא סוגר את ה Channel. לכן חשוב מאוד לא להשתמש באותו Channel בכמה Thread-ים, או במקרה של Express לא משתפים Connection בין כמה בקשות.
גם אם publish מחזיר false הוא עדיין שלח את ההודעה. אבל אם publish זרק Exception כן כדאי לדווח על זה הלאה כי זה אומר שה Connection סגור.
והכי חשוב הוא שגם RabbitMQ יכול להיות למטה מכל מיני סיבות. כשזה קורה אנחנו צריכים להחזיר שגיאה למשתמשים שמנסים לבצע פעולה בנתיב שמשדר לתור, אבל עדיין להשאיר את השרת זמין לטיפול בנתיבים אחרים וכל הזמן לנסות להתחבר מחדש כדי שנוכל לחזור לעבוד כש Rabbit יעלה חזרה.
עם ארבעת הנקודות מול העיניים אפשר להמשיך לכתוב את הקוד.
2. קוד שמטפל בחיבור ל Rabbit
ספריית amqplib של Node.JS לא מטפלת בחיבור מחדש אחרי ניתוק אז אנחנו צריכים לסגור בשבילם את הפינה. אפשר לבנות מחלקה שעוטפת Connection שתדאג לזה בצורה, כך שה Connection יישמר בתור שדה מידע במחלקה וכל פעם שהוא ייסגר אוטומטית ננסה להתחבר מחדש. הנה הקוד:
class RabbitMQ {
constructor(url) {
this.url = url;
}
async init() {
while (true) {
try {
this.connection = await amqp.connect(this.url);
this.connection.on('close', () => {
console.log('Connection closed, reconnecting in 1 second');
this.init();
});
break;
} catch (err) {
console.log('Connection failed, retrying in 1 second');
await sleep(1000);
}
}
return this.connection;
}
}
function sleep(ms) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve();
}, ms);
});
}
במצב ששרת RabbitMQ זמנית לא זמין אוביקט ה connection ייסגר ואז כל הפעולות שננסה לעשות עליו ייכשלו. אבל, בזכות ה Event Handler שהתקנתי בצורה אוטומטית ננסה להתחבר מחדש ואחרי שיהיה שוב חיבור נמשיך לעבוד עם ה Connection החדש.
3. קוד שרת ששולח הודעות
אז איך עובדים עם המחלקה RabbitMQ אתם שואלים? המשחק הוא פשוט - בעליה של השרת אני רוצה להפעיל את init ולחכות שיצליח, ואחרי זה בכל בקשה נכנסת אנסה לפתוח Channel עבור אותה בקשה. אם הניסיון מצליח אפשר יהיה להמשיך לטפל בבקשה ואם הוא נכשל (למשל כי Rabbit זמנית לא זמין) אז נחזיר שגיאה.
את הפעולה הזאת אני עושה ב Middleware שיופעל עבור כל הבקשות שצריכות את RabbitMQ. ה Middleware ואיתחול השרת נראים כך:
const rabbit = new RabbitMQ(RABBITMQ_URL);
async function setup(app) {
await rabbit.init();
app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
console.log('setup');
app.use(async function(req, _res, next) {
try {
// Fetches the active connection or re-connect
const connection = rabbit.connection;
const channel = await connection.createChannel();
req.locals = req.locals || {};
req.locals.channel = channel;
return next();
} catch (err) {
console.log(err);
return next(err);
}
});
app.use('/', indexRouter);
}
setup(app);
צריך להגיד - בדוגמת הקוד כאן ה Middleware פועל על כל הנתיבים, אבל בגלל שזה Middleware מאוד קל להוסיף הגבלה שיפעל רק על נתיבים מסוימים או על Router מסוים.
בתוך קוד הטיפול בנתיב אני יכול למשוך את ה Channel ולפרסם אליו הודעות באופן הבא:
router.all('/ipn', function(req, res) {
try {
const { channel } = req.locals;
const data = { type: 'PaymentReceived', payload: req.body };
channel.publish('', 'payments-received', Buffer.from(JSON.stringify(data), 'utf8'));
res.sendStatus(201);
} catch (err) {
console.log(err);
res.sendStatus(500);
}
});
לא חייבים לבדוק את ערך ההחזר של publish כי גם אם הוא false ההודעה עדיין נשלחה לתור. כן צריך לטפל ב Exception שאומר שהערוץ או החיבור נסגר.
4. קבלת הודעות
בצד המקבל עדיין כדאי להשתמש במחלקה RabbitMQ שעוטפת את ה connection ומתחברת מחדש, אבל עם טוויסט: הפעם אנחנו צריכים להפעיל מחדש את הפונקציה consume כל פעם שמתחברים מחדש ל RabbitMQ כדי להאזין מחדש להודעות. בשביל זה אני מוסיף למחלקה שדה מידע נוסף שיחזיק מערך של תורים ופונקציות טיפול בהודעות בכל תור, ומפעיל את consume כל פעם אחרי שהתחברתי מחדש. הקוד נראה כך:
class RabbitMQ {
constructor(url) {
this.url = url;
this.messageHandlers = [];
}
async init() {
while (true) {
try {
this.connection = await amqp.connect(this.url);
this.connection.on('close', () => {
console.log('Connection closed, reconnecting in 1 second');
this.init();
});
break;
} catch (err) {
console.log('Connection failed, retrying in 1 second');
console.log(err);
await sleep(1000);
}
}
const channel = await this.connection.createChannel();
this.messageHandlers.forEach(({ queue, callback }) => {
channel.consume(queue, callback.bind(null, channel));
});
return this.connection;
}
addMessageHandler(queue, callback) {
this.messageHandlers.push({ queue, callback });
}
}
בשביל להשתמש בו אני יכול להפעיל אותו מתוך פונקציה כזאת:
exports.start = async function start() {
const rabbit = new RabbitMQ(RABBITMQ_URL);
rabbit.addMessageHandler('payments-received', (channel, msg) => {
console.log(`Received message: "${msg.content.toString()}"`);
channel.ack(msg);
});
await rabbit.init();
}
והתוצאה היא קוד שרת שמאזין להודעות וכל פעם שמקבל הודעה דרך RabbitMQ הוא ידפיס את התוכן שלה למסך, ויותר חשוב - אם RabbitMQ התנתק או לא זמין זמנית אז השרת ימשיך לנסות להתחבר מחדש עד ש Rabbit יעלה שוב.
5. נ.ב. למה לא להפיל את כולם בדומינו?
גישה חלופית נפוצה בעבודה עם קונטיינרים היא "שיטת הדומינו" (אני חושב שהרגע המצאתי את השם הזה, אז אל תשתמשו בו בשיחות עם אנשים מבחוץ), הרעיון בגדול זה שאני ממילא רץ בקונטיינר שרץ בתוך איזה מנהל קונטיינרים, אז כשיש בעיה אני יכול לשבור את הקונטיינר שלי ומנהל הקונטיינרים כבר יפעיל קונטיינר חדש עם הקוד. בגישה כזאת אני משתמש בסקריפט כמו wait-for-it בעליה בשביל לוודא ש Rabbit זמין וככה אני יודע שהקונטיינר שלי יעלה שוב רק כש Rabbit גם עולה שוב. אני קורא לה שיטת הדומינו כי היא אומרת שכשסרביס נופל אז נפיל איתו את כל הסרביסים שקשורים אליו ועוד מעט מנהל הקונטיינרים שלנו ירים את הכל מחדש והכל יסתדר.
אני לא אוהב את הגישה הזאת משתי סיבות:
מבחינת משאבים לסגור ולפתוח קונטיינרים כל היום זה בזבזני, ובמיוחד כשלא חייבים.
מבחינת המערכת, אם אני סוגר את כל הקונטיינר של השרת עד ש Rabbit יהיה זמין, אולי אני מפסיד נתיבים שהייתי יכול כן לטפל בהם שלא צריכים כתיבה ל Rabbit, אבל עכשיו יחזירו שגיאה כי הורדתי את הקונטיינר.
מחשבה על הבעיות שיקרו וטיפול בהן בקוד עוזרים לנו לכתוב סרביסים שיהיו חסינים יותר לבעיות וכך יתרמו ליציבות הכללית של המערכת.