מדריך קצר לאגרגציות ב MongoDB

03/09/2021

אחד הפיצ'רים האהובים עליי ב MongoDB הוא היכולת לחבר יחד מסמכים לקבלת תוצאה באמצעות מיני שפת תכנות שלהם שנקראת Aggregation Pipeline. אגיד לכם את האמת אהבה ממבט ראשון לא היתה שם, אבל אחרי שלמדתי להשתמש בפיצ'ר ולהתמצא בתיעוד שלהם החיים שלי עם מונגו השתפרו משמעותית. בואו נראה איך זה עובד.

1. מהו Aggregation Pipeline

דמיינו שפת תכנות רק שהתוכנית שאתם כותבים בה רצה על קלט מאוד ספציפי: רשימת מסמכים מ Mongo שמתאימים לקריטריון מסוים. כל פקודה בתוכנית עושה משהו עם הקלט הזה, למשל תהיה פקודה שתוסיף שדה לכל מסמך, פקודה אחרת שמסננת החוצה חלק מהמסמכים, פקודה שלישית שמשאירה רק שדות מסוימים של המסמכים ופקודה רביעית שמאחדת מסמכים שונים לקבוצה. הפקודות האלה נקראות Aggregation Stages ואפשר למצוא רשימה שלהן ופירוט על כל אחת בדף התיעוד כאן: https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/.

וכן כל Stage כזה מקבל פרמטרים שאומרים לו איך בדיוק לעשות את הדבר שלו, למשל הפקודה $count מחליפה את כל המסמכים במסמך עם שדה בודד שהערך שלו הוא כמה מסמכים עברו דרכה. היא תקבל פרמטר שיגיד לה מה שם השדה שיהיה במסמך התוצאה. הפקודה $match מסננת את המסמכים שעוברים דרכה ותתן רק למסמכים מסוימים לעבור. היא תקבל פרמטר שאומר לאיזה מסמכים מותר להמשיך.

את ה Pipeline אנחנו כותבים בתור רצף של פקודות בתוך מערך, וכל פקודה היא אוביקט JavaScript. זה תחביר שלוקח זמן להתרגל אליו. הנה דוגמה ל Pipeline:

  [
    // First Stage
    {
      $group :
        {
          _id : "$item",
          totalSaleAmount: { $sum: { $multiply: [ "$price", "$quantity" ] } }
        }
     },
     // Second Stage
     {
       $match: { "totalSaleAmount": { $gte: 100 } }
     }
   ]

יש פה שתי פקודות: הפקודה הראשונה היא פקודת $group והשניה פקודת $match. עכשיו בואו נדמיין כמה מסמכי Mongo שהולכים לעבור דרך ה Pipeline הזה. אולי הם נראים כך:

[
  { "_id" : 1, "item" : "abc", "price" : NumberDecimal("10"), "quantity" : NumberInt("2"), "date" : ISODate("2014-03-01T08:00:00Z") },
  { "_id" : 2, "item" : "jkl", "price" : NumberDecimal("20"), "quantity" : NumberInt("1"), "date" : ISODate("2014-03-01T09:00:00Z") },
  { "_id" : 3, "item" : "xyz", "price" : NumberDecimal("5"), "quantity" : NumberInt( "10"), "date" : ISODate("2014-03-15T09:00:00Z") },
  { "_id" : 4, "item" : "xyz", "price" : NumberDecimal("5"), "quantity" :  NumberInt("20") , "date" : ISODate("2014-04-04T11:21:39.736Z") },
  { "_id" : 5, "item" : "abc", "price" : NumberDecimal("10"), "quantity" : NumberInt("10") , "date" : ISODate("2014-04-04T21:23:13.331Z") },
  { "_id" : 6, "item" : "def", "price" : NumberDecimal("7.5"), "quantity": NumberInt("5" ) , "date" : ISODate("2015-06-04T05:08:13Z") },
  { "_id" : 7, "item" : "def", "price" : NumberDecimal("7.5"), "quantity": NumberInt("10") , "date" : ISODate("2015-09-10T08:43:00Z") },
  { "_id" : 8, "item" : "abc", "price" : NumberDecimal("10"), "quantity" : NumberInt("5" ) , "date" : ISODate("2016-02-06T20:20:13Z") },
]

הפקודה הראשונה היא פקודת $group שמקבלת בתור פרמטר אוביקט. האוביקט מתאר איך יראו הקבוצות שאנחנו בונים. הוא מכיל שדה בשם _id עם הערך $item, שזה אומר בעברית שכל קבוצה תהיה מזוהה לפי הערך של שדה item של כל מסמך. אז המסמך הראשון מגיע ו Mongo מסתכל על שדה item שלו ורואה את הערך abc, מבין שאין עדיין מסמך כזה בתוצאה ונותן לו לעבור. אחרי זה עוברים עוד כמה מסמכים עד שמגיעים למסמך החמישי ששוב מכיל בשדה item את אותו ערך abc. הפעם מונגו מזהה שהוא כבר ראה את הערך הזה ומוסיף את המסמך החמישי לאותה קבוצה יחד עם המסמך הראשון.

אנחנו יודעים שכל המסמכים באותה קבוצה מכילים את אותו ערך לשדה _id שלהם, כי ככה בנינו את הקבוצות, אבל מה לגבי השדות האחרים? אז אנחנו רואים שהפרמטר ש $group קיבל היה אוביקט ויש שם באמת מפתח נוסף בשם totalSaleAmount. מפתח נוסף מגדיר שיהיה שדה נוסף באוביקט התוצאה, והערך של השדה יהיה החישוב שאנחנו רושמים באוביקט - כלומר סכום של מכפלות המחיר בכמות. בכל מסמך שמגיע מונגו ייקח את המחיר, יכפיל אותו בכמות ויוסיף את זה לערך ששמור לו עבור הקבוצה.

אחרי שהשלב הראשון מסתיים ויש לנו את הקבוצות מונגו ממשיך לשלב השני שכולל פקודת $match. פקודה זו מסננת את המסמכים (כלומר את הקבוצות) לפי הפרמטר שקיבלה. ב Pipeline הדוגמה הפרמטר הזה הגדיר שהשדה totalSaleAmount יהיה גדול או שווה ל 100, ולכן רק הקבוצות שמתאימות יופיעו בתוצאה. סך הכל תוצאת ה Pipeline תהיה:

{ "_id" : "abc", "totalSaleAmount" : NumberDecimal("170") }
{ "_id" : "xyz", "totalSaleAmount" : NumberDecimal("150") }
{ "_id" : "def", "totalSaleAmount" : NumberDecimal("112.5") }

נמשיך לעוד כמה דוגמאות והפעם על אוסף נתונים של משימות.

2. יצירת הנתונים

אלה הנתונים לדוגמה ב Collection שלי. יש פה משימות לביצוע, לכל משימה יש סטטוס (בשדה done) האם בוצעה או לא, משתמש שהמשימה בבעלותו, טקסט של המשימה ורשימת תגיות:

[
  {
    "key": 1,
    text: "do it!",
    done: false,
    owner: "dan",
    tags: [
      "home"
    ],
  },
  {
    "key": 2,
    text: "do something else",
    done: false,
    owner: "mike",
    tags: [
      "home",
      "work"
    ],
  },
  {
    key: 3,
    text: "fix stuff",
    done: true,
    owner: "mike",
    tags: [
      "home",
      "garden"
    ],
  },
  {
    key: 4,
    text: "eat something",
    done: false,
    owner: "mike",
    tags: [],
  }
]

3. ספירת משימות פתוחות של כל המשתמשים

משימה ראשונה היא למצוא כמה משימות פתוחות יש סך הכל לכל שמשתמשים יחד. שתי הפקודות שיעזרו לנו כאן הן $match ו $count: הראשונה תסנן רק את המשימות שעדיין לא בוצעו, והשניה תספור כמה משימות כאלה היו.

כך נראה ה Pipeline:

db.collection.aggregate([
  {
    $match: {
      done: false
    }
  },
  {
    $count: "totalOpenTasks",

  }
])

ואם אתם רוצים לשחק עם הקוד, להריץ דרך הדפדפן או לשנות קצת ולראות את האפקט הכנתי דף ב Mongo Playground שמתאים והוא מחכה לכם בקישור כאן: https://mongoplayground.net/p/QwZCzp4etfd

4. ספירת משימות פתוחות של משתמש מסוים

ומה אם נרצה להתאים את השאילתה רק למשתמש מסוים? הכל קל בעולם, רק צריך לשנות את התנאי ב $match. כך יראה הקוד:

db.collection.aggregate([
  {
    $match: {
      done: false,
      owner: "dan",

    }
  },
  {
    $count: "dansOpenTasks",
  }
])

וכאן יש את ה Mongo Playground המתאים לו: https://mongoplayground.net/p/s08WtYHrjbp

5. יצירת דוח: כמה משימות פתוחות יש לכל משתמש

בואו נסבך קצת את העניינים ונשאל כמה משימות פתוחות יש לכל אחד מהמשתמשים במערכת. אם נדמיין את ה Pipeline אז בשביל משימה כזאת צריך להחליף את הפקודה השניה, במקום לספור כמה מסמכים קיבלנו, נרצה לקבץ יחד לקבוצות את המשימות לפי שדה owner (כלומר לפי המשתמש שהמשימה שייכת לו), ולספור כמה מסמכים יש בכל קבוצה כזאת. התחביר קצת מסורבל והוא נראה כך:

db.collection.aggregate([
  {
    $match: {
      done: false,
    }
  },
  {
    $group: {
      "_id": "$owner",
      "openTasks": {
        $count: {}
      }
    }
  }
])

ופה אפשר לשחק עם השאילתה: https://mongoplayground.net/p/7FhgRMgHFvE

6. זיהוי המשתמש עם הכי הרבה משימות פתוחות

אחרי שגילינו כמה משימות פתוחות יש לכל משתמש אפשר להמשיך ולחפש את המשתמש שיש לו הכי הרבה משימות פתוחות. זו משימה שעשויה לבלבל כי אין ב Aggregation Pipeline פעולה של Max שמחזירה את המסמך עם הערך המקסימלי. במקום זה אנחנו צריכים להשתמש במיון וב limit. זה קצת מבלבל כי sort נשמע כמו משהו שלוקח יותר זמן מחישוב ערך מקסימלי, אבל מונגו מספיק חכם בשביל לזהות את התבנית הזאת ולא למיין באמת את כל המסמכים. הנה הקוד:

db.collection.aggregate([
  {
    $match: {
      done: false,
    }
  },
  {
    $group: {
      "_id": "$owner",
      "openTasks": {
        $count: {}
      }
    }
  },
  {
    "$sort": {
      "openTasks": -1
    }
  },
  {
    $limit: 1,
  }
])

שבאמת ייתן את התוצאה:

[
  {
    "_id": "mike",
    "openTasks": 2
  }
]

ואתם יכולים לשחק איתו בקישור הזה: https://mongoplayground.net/p/fUtBp28Cakb.

נסו למצוא איך לשנות את סדר המיון כדי למצוא את המשתמש עם הכי פחות משימות פתוחות, ואז למצוא את המשתמש עם הכי הרבה משימות סגורות.

7. יצירת ענן תגיות מכל התגיות בכל המשימות

הדוגמה האחרונה לפוסט והחביבה עליי ביותר היא היכולת לקחת את כל הערכים משדה שנשמר בתור מערך וליצור מהם קבוצה אחת גדולה שמכילה את כולם. זה עוזר כשיש לנו למשל רשימת תגיות ואנחנו רוצים לייצר ענן תגיות או פשוט להציג בצד של המסך רשימה של כל התגיות האפשריות.

ה Pipeline הראשון לוקח את שדה tags ומוציא את כל הערכים החוצה מהמערך כדי שנוכל לשחק איתם באמצעות שלב unwind, אחרי זה הוא מארגן את כולם בקבוצות אבל שימו לב למזהה שבחרתי - פשוט null. מזהה זה הוא זהה לכל המסמכים ולכן נקבל רק מסמך אחד בתוצאה. השדה tags של אותו מסמך בתוצאה מכיל את התוצאה של פקודת $addToSet, שלוקחת מכל מסמך את שדה tags שלו ומוסיפה אותו לקבוצה:

db.collection.aggregate([
  {
    "$unwind": "$tags"
  },
  {
    "$group": {
      _id: null,
      tags: {
        "$addToSet": "$tags"
      }
    }
  },

])

אתם מוזמנים לשחק עם השאילתה כאן: https://mongoplayground.net/p/2sxnLVSARWl נסו להוריד את ה $unwind ותראו אם אתם מצליחים לנחש מה יקרה.

אפשרות שניה ויותר מועילה היא להוסיף לכל תגית גם את מספר המסמכים שמתויגים בתגית זו. בשביל זה אני משתמש ב tags בתור המזהה כדי שכל המסמכים שיש להם את אותה תגית יהיו באותה קבוצה, ומוסיף להם שדה בשם count שמכיל את מספר המסמכים בקבוצה:

db.collection.aggregate([
  {
    "$unwind": "$tags"
  },
  {
    "$group": {
      _id: "$tags",
      "count": {
        "$count": {}
      }
    }
  },
])

צריך להגיד: התחביר של Aggregation Pipeline הוא לא אהבה ממבט ראשון. יש המון פקודות ולמרות זאת לפעמים אתם לא מוצאים בדיוק את הפקודה שאתם צריכים, ולפעמים צריך למצוא דרכים יצירתיות כדי לגרום ל Pipeline לבצע בדיוק את החישוב שרוצים. ועדיין, אחרי קצת אימונים אתם תראו ש Pipelines מצליחים לתת פיתרון לא פחות טוב (ואולי לפעמים קצת יותר טוב) מ SQL.