
Example: Task generator (yield)

QTasks supports generator tasks (tasks that use yield) in both synchronous and asynchronous form. Each value can be processed as soon as it is generated, and the task can be launched from an asynchronous QueueTasks application regardless of the function type.


🔧 Example 1: Asynchronous generation with intermediate processing

async def yield_func(result):
    print(f"Processed: {result}")
    return result + 2

@app.task(generate_handler=yield_func, echo=True)
async def test_yield(n: int):
    for _ in range(n):
        n += 1
        yield n

Input: 5

Console output:

Processed: 6
Processed: 7
Processed: 8
Processed: 9
Processed: 10

Task result: [8, 9, 10, 11, 12]


🔧 Example 2: Synchronous generation with post-processing

def sync_handler(value):
    print("SYNC:", value)
    return value * 2

@app.task(generate_handler=sync_handler)
def sync_gen(n: int):
    for i in range(n):
        yield i + 1

Input: 5

Console output:

SYNC: 1
SYNC: 2
SYNC: 3
SYNC: 4
SYNC: 5

Task result: [2, 4, 6, 8, 10]


🔧 Example 3: Generating and collecting IDs via an external service

async def save_to_db(value):
    db.insert({"value": value})
    return value

@app.task(generate_handler=save_to_db)
def ids():
    for id_ in range(5):
        yield f"user_{id_}"

Use case: each generated ID is written to the database automatically, and the task returns the list of saved values.


⚙️ How does generate_handler work inside QTasks?

  • The generate_handler function is called for every value the task yields.
  • The value returned by the handler is appended to the task's result list.
  • If generate_handler is defined, the task is executed via run_task_gen; otherwise it is executed in the usual way.
  • The generating setting in TaskExecSchema controls the switch to generator mode.
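
The behaviour described in this list can be sketched in plain Python. This is only an illustration under stated assumptions: run_generator is a hypothetical helper, not the actual run_task_gen from QTasks, whose internals are not shown here. It accepts a sync or async generator together with a sync or async handler and collects the handler's return values.

```python
import asyncio
import inspect


async def run_generator(gen, handler):
    """Hypothetical sketch, not the QTasks implementation."""
    results = []

    async def handle(value):
        out = handler(value)
        # An async handler returns an awaitable; a sync one returns a value.
        if inspect.isawaitable(out):
            out = await out
        return out

    if inspect.isasyncgen(gen):
        async for value in gen:
            results.append(await handle(value))
    else:
        for value in gen:
            results.append(await handle(value))
    return results


def sync_gen(n):
    for i in range(n):
        yield i + 1


async def async_gen(n):
    for i in range(n):
        yield i + 1


def double(value):
    return value * 2


async def add_two(value):
    return value + 2


sync_result = asyncio.run(run_generator(sync_gen(5), double))
mixed_result = asyncio.run(run_generator(sync_gen(3), add_two))
async_result = asyncio.run(run_generator(async_gen(3), double))
print(sync_result, mixed_result, async_result)
```

The first call mirrors Example 2 (sync generator, sync handler), and the second mirrors the combination used in Example 3 (sync generator, async handler).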

🏢 Example of use in a company

Suppose you have a task that sends notifications to employees on a schedule:

async def log_notification(msg):
    logger.info("Sent: %s", msg)  # logging expects a format placeholder for each argument
    return msg

@app.task(generate_handler=log_notification)
def send_notifications():
    for employee in get_employees():
        yield f"Notification sent: {employee.email}"

Result:

  • The generator iterates through the list of employees.
  • generate_handler is called for each message and logs it (or forwards it to another system).
  • A list of notifications is returned.

This approach is useful for:

  • Integrations with external APIs
  • Gradual streaming
  • Tasks where transformation and control of each step are important

✅ Summary

Combining yield with generate_handler lets a task process each value as it is produced, which suits per-item logging, streaming, and multi-step pipelines. Both sync and async functions are supported, so the pattern fits either development style.