עיבוד נתונים מקבילי עם תבנית Producer/Consumer
בואו נניח שאני רוצה להוציא מיילים לאלף מנויים של הבלוג כל בוקר כשאני מוציא פוסט חדש. כל עוד אנחנו במספרים קטנים אפשר לרוץ בלולאה בודדת על המידע ולשלוח את המייל אחד אחד. ככל שנצטרך לרוץ על יותר נתונים סיכוי טוב שנרצה למקבל את התהליך, כך שלא יווצר מצב שמישהו מקבל את הפוסט בשבע ועשרה במקום בשבע רק בגלל שיש עוד כמה מנויים לאתר.
המעבר מתהליך יחיד למקביליות הוא לא תמיד פשוט. אם קודם היתה לנו לולאה בודדת שרצה על כל המיילים, עכשיו יש שתי תוכניות שלא מכירות אחת את השניה שעוברות כל אחת באופן עצמאי על רשימת המיילים. איך נוודא ששתי התוכניות האלה לא ישלחו מייל לאותו בן אדם? ומה אם אחת התוכניות תתרסק באמצע?
תבנית Producer/Consumer עוזרת לנו להתמודד עם עיבוד מקבילי של מידע ומומלצת בתרחישים כאלה ורבים נוספים. נמשיך לראות איך זה בנוי.
1. ארכיטקטורה של Producer/Consumer
הארכיטקטורה איתה כדאי להתמודד עם בעיה כזאת נראית בערך כך:
ניצור Message Queue שיחזיק את כל המידע.
ניצור תוכנית Producer שתפקידה למלא את התור. התוכנית תרוץ כל בוקר בשבע ותכניס לתור את כל כתובות המייל של כל המנויים.
ניצור מספר תהליכי Consumer שתפקידם לקרוא מהתור. התהליכים האלה יהיו כל הזמן פעילים (גם בלילה כשאין פוסטים, כי זה יותר נוח ככה).
ניצור תהליך בקרה שמוודא שתמיד יש לנו 5 (או כמה שהחלטנו) תהליכים קוראים פעילים.
כל פעם שתהליך שלח מייל למנוי נסמן בטבלא ליד המנוי שהוא קיבל מייל עם מזהה של הפוסט ומתי המייל הגיע.
בשביל להתמודד עם התרסקויות אפשר להריץ את תהליך ה Producer כל עשר דקות, גם אם אין פוסטים חדשים. כל ריצה התהליך יזהה את כל האנשים שצריכים לקבל מייל ויכניס את הפרטים שלהם ל Message Queue שלנו. אם אין כאלה הכל טוב אפשר לחזור לישון ולהתעורר עוד עשר דקות. בצורה כזאת אם אחד ה Consumers התרסק באמצע שהוא שלח מיילים למנויים, כל המנויים שכבר קיבלו את המייל מסומנים בבסיס הנתונים, וכל אלה שעדיין לא קיבלו את המייל ייכנסו מחדש לתור עם ההפעלה הבאה של ה Producer.
2. קוד? ברור
נראה איך זה עובד ברובי עם דוגמא קטנה יותר של מספרים ראשוניים. ה Producer שלנו מייצר סלייסים של מספרים ושולח אותם לתור כדי ש Consumers יוכלו לבדוק מי מהמספרים בסלייס הם ראשוניים. אני עובד עם RabbitMQ בתור תור ההודעות שלי וחבילת bunny כדי לתקשר עם התור:
require "bunny"
require "json"
# Start a communication session with RabbitMQ
conn = Bunny.new
conn.start
# open a channel
ch = conn.create_channel
# declare a queue
q = ch.queue("test1")
# publish a message to the default exchange which then gets routed to this queue
(1..1_000_000).each_slice(500) do |slice|
q.publish slice.to_json
end
# close the connection
conn.stop
בצד ה Consumers כל אחד מהם יירשם לקבל הודעות מהתור ויתחיל לטפל בהודעות אחת אחרי השניה. ה Consumers ימשיכו לפעול לנצח ולכן אני מסיים אותם בלולאת sleep:
require "bunny"
require "json"
require 'prime'
# Start a communication session with RabbitMQ
conn = Bunny.new
conn.start
# open a channel
ch = conn.create_channel
# declare a queue
q = ch.queue("test1")
q.subscribe do |delivery_info, metadata, payload|
slice = JSON.parse(payload)
slice.each do |i|
puts i if Prime.prime?(i)
end
end
loop { sleep 5 }
# close the connection
conn.stop
בשביל להפעיל את הקוד נפתח 3 חלונות, קודם כל נפעיל בשניים הראשונים את ה Consumers, ואחרי זה נפעיל בשלישי את ה Producer. אם כתבתם הכל נכון תוכלו לראות על המסך מספרים אקראיים רצים בשני החלונות במקביל.
השימוש בתבנית Producer/Consumer מאפשר לנו לבנות מערכות שיגדלו טוב גם כשכמות המידע שצריך לטפל בה עולה כל הזמן. התור יחלק את ההודעות שווה בשווה בין ה Consumers וכך תמיד נוכל להוסיף עוד תהליכי טיפול כדי לשפר ביצועים.