• בלוג
  • איך ולמה להקים Fanout Exchange ב RabbitMQ

איך ולמה להקים Fanout Exchange ב RabbitMQ

23/04/2021

תור ההודעות RabbitMQ הוא אחד האהובים עליי בגלל תיעוד מעולה והתקנה פשוטה. בין היכולות החזקות של רביט היא האפשרות לנהל את שליחת ההודעות באמצעות רכיב שנקרא Exchnage. בואו נראה איך להשתמש בו כדי לשלוח הודעה למספר רב של נמענים.

1. הגדרת Fanout Exchange

האקסצ'יינג' הוא המנגנון שאחראי על קבלת הודעה ושליחתה לתורים הרלוונטים. ב RabbitMQ יש 4 סוגים של Exchange:

  1. Direct Exchange
  2. Fanout Exchange
  3. Topic Exchange
  4. Headers Exchange

הראשון, Direct Exchange, פשוט שולח את ההודעה לתור שמתאים לה לפי מפתח הניתוב של ההודעה (שזה בדרך כלל שם התור אליו היא מיועדת). השני, fanout, עליו נדבר בפוסט זה, משכפל כל הודעה ושולח עותק שלה לכל התורים ב Exchange. ב Topic Exchange ההודעה משוכפלת ונשלחת לתורים מסוימים לפי מילות קוד במפתח הניתוב והסוג הרביעי מאפשר שליחה לפי אוביקט JSON של קריטריונים מורכבים יותר המצורף לכל הודעה.

ב Python, הפקודה:

channel.exchange_declare(exchange='demo', exchange_type='fanout')

מגדירה Exchange לפי ה type שנבחר ועם השם שעובר לפרמטר exchange.

2. שליחת הודעות

אחרי שהגדרנו את ה Exchange אנחנו יכולים לשלוח אליו הודעה עם הפקודה:

channel.basic_publish(exchange='demo',
                      routing_key='hello',
                      body='Hello World!')

ולמרות שאני מעביר ערך למשתנה routing_key המחשב מתעלם מערך זה מאחר ו Fanout Exchange פשוט משכפל את ההודעה לכל התורים.

התוכנית המלאה ב Python שתתחבר ל RabbitMQ על המחשב המקומי, תגדיר את ה Exchange ותשלח הודעה נראית כך:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='demo', exchange_type='fanout')

channel.basic_publish(exchange='demo',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

3. קבלת הודעות

בצד המקבל אנחנו יוצרים תור הודעות ומחברים אותו ל Exchange שיצרנו. בשביל שכל תהליך ייצור תור הודעות משלו אני מעביר מחרוזת ריקה בתור שם התור, וזה גורם ל Rabbit לבחור בשבילי את השם. הקוד הבא יוצר תור חדש ומחבר אותו ל Exchange:

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='demo', queue=queue_name)

אחרי שאני מפעיל את הקוד הזה מכמה תוכניות שרצות במקביל אני מקבל Exchange מרכזי עם הרבה תורים שמחוברים אליו. בגלל שסוג ה Exchange הוא fanout, כל הודעה שתגיע ל Exchange תישלח אוטומטית לכל התורים.

התוכנית המלאה ב Python שתתחבר ל RabbitMQ על המחשב המקומי, תיצור Exchange, תחבר אליו תור ותאזין להודעות בתור נראית כך:

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='demo', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='demo', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

עכשיו מתחיל הכיף: נוכל להפעיל במקביל מספר עותקים של תוכנית קריאת ההודעות, כל עותק בחלון חדש. בחלון אחרון נפעיל את התוכנית ששולחת הודעה ואז נראה במקביל בכל החלונות שמאזינים את שורת ההדפסה מקוד הקריאה.

4. למה זה טוב

שליחת הודעה למספר תורים במקביל מאפשרת טיפול במגוון של מצבים:

  1. עבור לוגים - אפשר להגדיר מספר תוכניות שיקראו הודעות לוג, כל אחת תכתוב את ההודעה ליעד אחר: תוכנית אחת תכתוב לקובץ, שניה לבסיס הנתונים ושלישית תשלח את הלוג למאגר לוגים בענן.

  2. עבור מספר תהליכים שקורים במקביל בעקבות אירוע - למשל כשמשתמש נרשם למערכת נרצה גם לשלוח לו אימייל, וגם לסמן בסרביס של הסטטיסטיקה אירוע של הצטרפות למערכת. שני תהליכים אלה לא קשורים והם מתרחשים בסרביסים שונים, ולכן נוח להשתמש ב Message Queue כדי לטפל בשניהם.

  3. לצורך Debug או מעקב הודעות - אפשר להגדיר סרביס ששומר את כל ההודעות שעוברות ב Exchange לבסיס הנתונים וכך אנחנו יכולים לוודא שהודעות מסוג מסוים לא ילכו לאיבוד.

בעולם האמיתי נוכל להגדיר Exchange לכל סוג אירוע ותור עבור כל תהליך שצריך לטפל באירוע. בעזרת Fanout Exchange כל התורים יקבלו במקביל את ההודעה על האירוע וכל סרביס יוכל לבצע את החלק שלו בטיפול.