• בלוג
  • טיפ דרמטיק - עבודה עם actors אסינכרוניים

טיפ דרמטיק - עבודה עם actors אסינכרוניים

03/07/2023

דרמטיק היא ספריה להרצת משימות ברקע בפייתון. היא מאוד דומה ל celery אבל קצת יותר מתוחזקת ויודעת לעבוד טוב יותר עם משימות שצריך לתזמן לשעה מסוימת בעתיד. פועל של דרמטיק מריץ Threads, וכל Thread יודע להריץ פונקציות שנקראות Actors. דוגמת הקוד הבאה מהאתר שלהם מראה איך למשוך דף אינטרנט ולספור כמה מילים יש בו:

1. קוד הדוגמה

import dramatiq
import requests


@dramatiq.actor
def count_words(url):
     response = requests.get(url)
     count = len(response.text.split(" "))
     print(f"There are {count} words at {url!r}.")


# or send the actor a message so that it may perform the count
# later, in a separate process.
count_words.send("http://example.com")

בשביל ש count_words באמת ירוץ לא מספיק להפעיל את הסקריפט. יש להפעיל גם תהליך נפרד שמריץ את ה Worker של dramatiq, ואת זה עושים באמצעות הפעלה בחלון נפרד של הפקודה:

dramatiq main.py

בהנחה שלקובץ התוכנית קוראים main. עכשיו כל פעם שאריץ את main בחלון השני אני מקבל את ההודעה:

There are 298 words at 'http://example.com'.

2. תרגום לקוד אסינכרוני

עד לפה הכל טוב אבל מה קורה אם רוצים להריץ קוד אסינכרוני? מה אם count_words היתה משתמשת למשל ב aiohttp במקום ב requests? אני מעדכן את הקוד:

@dramatiq.actor
async def count_words(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            response = await resp.text()
            count = len(response.split(" "))
            print(f"There are {count} words at {url!r}.")

asyncio.run(count_words("http://example.com"))

והתוכנית עובדת, אבל הפעלה אסינכרונית כבר לא ממש:

count_words.send("http://example.com")

הודעת השגיאה שמופיעה על מסך ה Worker היא:

/Users/ynonp/work/intel-dec-2020/dramatiq-demo/venv/lib/python3.10/site-packages/dramatiq/worker.py:460: RuntimeWarning: coroutine 'count_words' was never awaited
  self.process_message(message)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

בואו נתקן את זה

3. הוספת תמיכה בפונקציות אסינכרוניות ל dramatiq

בגלל שדרמטיק פותח Thread-ים לא מומלץ לפתוח Event Loop חדש מתוך פונקציית ה Actor. לפעמים זה יעבוד אבל לפעמים זה ייכשל עם שגיאות מוזרות. עדיף להעביר את ה Event Loop מה Thread הראשי ל Threadשלנו, ולהריץ ב Event Loop הראשי. אבל איך נעשה את זה אם אין לנו גישה לתהליכון הראשי?

מסתבר שגירסאות עדכניות של דרמטיק הוסיפו תמיכה במנגנון זה ואפילו בצורה פשוטה. דבר ראשון יש להסיר את דרמטיק ולהתקין את גירסת ה master שלו:

$ yes | pip uninstall dramatiq
$ pip install git+https://github.com/Bogdanp/dramatiq@master

לאחר מכן יש להתקין מידלוור של דרמטיק שמריץ משימות אסינכרוניות. אני מעדכן שוב את הקוד:

import dramatiq
from dramatiq.middleware.asyncio import AsyncIO
from dramatiq.brokers.redis import RedisBroker
import requests
import aiohttp
import asyncio
import os

broker_url = os.environ.get("BROKER_URL", "redis://localhost:6379/0")
broker = RedisBroker(url=broker_url)
broker.add_middleware(AsyncIO())
dramatiq.set_broker(broker)


@dramatiq.actor
async def count_words(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            response = await resp.text()
            count = len(response.split(" "))
            print(f"There are {count} words at {url!r}.")

# asyncio.run(count_words("http://example.com"))

# Synchronously count the words on example.com in the current process
# count_words("http://example.com")

# or send the actor a message so that it may perform the count
# later, in a separate process.
count_words.send("http://example.com")

וזה הכל - מפעיל מחדש את הפועל של dramatiq בחלון אחד וכשאני מפעיל את הסקריפט עם ה Actor האסינכרוני הכל עובד ודרמטיק יודע להריץ אותו ב Event Loop הראשי של התהליך ולחכות שיסיים.