kanes

An In-Depth, Easy-to-Understand Guide to Python Multithreading, Multiprocessing, and Coroutines

  1. Multiprocessing: Parallelism at the operating system level. Each process has its own memory space, so processes do not interfere with each other. Suitable for CPU-intensive tasks, because the work can be spread across multiple CPU cores. Creating and destroying processes is relatively expensive.
  2. Multithreading: Multiple execution flows within the same process, all sharing that process's memory. Suitable for I/O-intensive tasks (such as network requests and file reads/writes), because other threads can keep running while one thread waits for I/O. In CPython, however, the GIL (Global Interpreter Lock) allows only one thread to execute Python bytecode at a time, so multithreading does not achieve true parallelism for CPU-intensive tasks. Creating and destroying threads is cheaper than creating and destroying processes.
  3. Coroutines: Cooperative multitasking in user space. Coroutines are lightweight and are scheduled by the program itself rather than by the operating system. They run within a single thread and yield control explicitly with await (or yield from in legacy generator-based coroutines), allowing other coroutines to run. Particularly suitable for large numbers of I/O-intensive tasks, because switching overhead is very low. On their own, coroutines cannot use multiple CPU cores for parallel computation.

1. Multiprocessing

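Use the multiprocessing module. On platforms that start child processes with the spawn method (Windows, and macOS by default since Python 3.8), the main logic must be placed inside an if __name__ == "__main__": block. Each child re-imports the main module, and without the guard it would try to start new processes again.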

import multiprocessing
import time
import os

def process_task(name, duration):
    """Simulate a time-consuming task"""
    print(f"Process {name} (PID: {os.getpid()}) started, taking {duration} seconds...")
    time.sleep(duration)
    print(f"Process {name} (PID: {os.getpid()}) ended")

if __name__ == "__main__": # Windows needs this protection
    print("--- Starting multiprocessing example ---")
    start_time = time.time()

    # Create two processes
    p1 = multiprocessing.Process(target=process_task, args=("Process A", 2))
    p2 = multiprocessing.Process(target=process_task, args=("Process B", 3))

    # Start processes
    p1.start()
    p2.start()

    # Wait for all processes to complete
    p1.join()
    p2.join()

    end_time = time.time()
    print(f"--- Multiprocessing example ended, total time: {end_time - start_time:.2f} seconds ---")
    # time.sleep only simulates waiting, so the total time is close to max(2, 3) = 3 seconds.
    # With two truly CPU-intensive tasks on a multi-core CPU, the total would also be close to 3 seconds,
    # because the two processes run in parallel on separate cores.

Result Analysis:
Process A and Process B start almost simultaneously, and their PIDs are different. The total time is close to the longest of the two tasks. This indicates that they are executed in parallel, each running in its own process.
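
The example above only sleeps, so it never exercises the CPU. As a rough sketch of the CPU-intensive case, the following compares a serial loop with multiprocessing.Pool. The function count_primes and the chosen limits are hypothetical stand-ins for real work, and the actual speedup depends on how many cores are available.

```python
import multiprocessing
import time


def count_primes(limit):
    """Hypothetical CPU-bound task: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


if __name__ == "__main__":
    limits = [40_000, 40_000, 40_000, 40_000]

    # Serial baseline: one process does all four jobs in turn
    start = time.perf_counter()
    serial = [count_primes(x) for x in limits]
    print(f"serial:   {time.perf_counter() - start:.2f} s")

    # Parallel: four worker processes, each with its own interpreter and GIL
    start = time.perf_counter()
    with multiprocessing.Pool(processes=4) as pool:
        parallel = pool.map(count_primes, limits)
    print(f"parallel: {time.perf_counter() - start:.2f} s")

    print("same results:", serial == parallel)
```

Pool.map returns results in the same order as the inputs, which makes it a convenient drop-in for a list comprehension when each item is independent.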


2. Multithreading

Use the threading module. Threads run within the same process and share memory.

import threading
import time
import os

def thread_task(name, duration):
    """Simulate a time-consuming task"""
    # Note: This simulates an I/O-intensive task (waiting)
    print(f"Thread {name} (PID: {os.getpid()}) started, taking {duration} seconds...")
    time.sleep(duration)
    print(f"Thread {name} (PID: {os.getpid()}) ended")

if __name__ == "__main__": # Although threading does not enforce this, it is conventionally placed here
    print("--- Starting multithreading example ---")
    start_time = time.time()

    # Create two threads
    t1 = threading.Thread(target=thread_task, args=("Thread A", 2))
    t2 = threading.Thread(target=thread_task, args=("Thread B", 3))

    # Start threads
    t1.start()
    t2.start()

    # Wait for all threads to complete
    t1.join()
    t2.join()

    end_time = time.time()
    print(f"--- Multithreading example ended, total time: {end_time - start_time:.2f} seconds ---")
    # Since this simulates I/O (sleep releases GIL), the total time will be close to max(2, 3)=3 seconds.
    # If it were purely CPU computation, due to GIL, the total time would be close to 2+3=5 seconds.

Result Analysis:
Thread A and Thread B start almost simultaneously, but their PIDs are the same because they run within the same process. The total time is close to the longest of the two tasks because time.sleep() releases the GIL while waiting, allowing other threads to run.
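
Because threads share the process's memory, they can update the same object directly, which is also why synchronization is needed. The following is a small sketch of that idea, not part of the original example: the names counter, add_many, and run_threads are hypothetical, and a threading.Lock protects the shared counter.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(times):
    """Increment the shared counter `times` times, holding the lock for each update."""
    global counter
    for _ in range(times):
        with counter_lock:  # only one thread may modify counter at a time
            counter += 1


def run_threads(num_threads=4, times=10_000):
    """Start several threads that all update the same counter, then return the total."""
    global counter
    counter = 0
    threads = [threading.Thread(target=add_many, args=(times,)) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == "__main__":
    print(run_threads())  # 4 threads x 10,000 increments each
```

Separate processes would each get their own copy of counter, so the same code written with multiprocessing.Process would need a queue, pipe, or shared-memory object instead.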


3. Coroutines - Using asyncio

Use the asyncio module. Coroutines run in a single thread and are scheduled by an event loop. The await keyword is the key: it pauses the current coroutine and yields control back to the event loop, which can then run other coroutines that are ready.

import asyncio
import time
import os

async def async_task(name, duration):
    """Simulate an asynchronous time-consuming task"""
    # Note: Here you must use await asyncio.sleep() to simulate asynchronous waiting
    print(f"Coroutine {name} (PID: {os.getpid()}) started, taking {duration} seconds...")
    await asyncio.sleep(duration) # <-- Key: the coroutine yields control here
    print(f"Coroutine {name} (PID: {os.getpid()}) ended")

async def main():
    """Main coroutine, creates and runs other coroutines"""
    print("--- Starting coroutine example ---")
    start_time = time.time()

    # Create two coroutine objects
    task1 = async_task("Coroutine A", 2)
    task2 = async_task("Coroutine B", 3)

    # Use asyncio.gather to run coroutines concurrently and wait for them to complete
    await asyncio.gather(task1, task2)

    end_time = time.time()
    print(f"--- Coroutine example ended, total time: {end_time - start_time:.2f} seconds ---")
    # Total time is close to max(2, 3)=3 seconds, as they can switch execution while waiting.
    
if __name__ == "__main__":
    asyncio.run(main()) # Start the event loop and run the main coroutine

Result Analysis:
Coroutine A and Coroutine B start almost simultaneously, and their PIDs are also the same (because they run in the same process and thread). The total time is close to the longest of the two tasks. This is because when Coroutine A reaches await asyncio.sleep(2), it yields control to the event loop, which finds Coroutine B ready to run (it just started) and switches to execute Coroutine B. When Coroutine B also encounters await asyncio.sleep(3), it similarly yields control. The event loop waits for whichever coroutine finishes waiting first and then resumes its execution.
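
To see why the example uses await asyncio.sleep() rather than time.sleep(), the sketch below runs two coroutines of each kind through asyncio.gather and measures the total time. The names polite, blocking, and timed are hypothetical, and the 0.2-second duration is chosen only to keep the demo short.

```python
import asyncio
import time


async def polite(duration):
    await asyncio.sleep(duration)  # yields to the event loop while waiting


async def blocking(duration):
    time.sleep(duration)  # blocks the whole thread; no other coroutine can run


async def timed(worker, duration=0.2):
    """Run two copies of `worker` with gather and return the elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(worker(duration), worker(duration))
    return time.perf_counter() - start


if __name__ == "__main__":
    # Expected to be close to 0.2 s: both waits overlap
    print(f"asyncio.sleep: {asyncio.run(timed(polite)):.2f} s")
    # Expected to be close to 0.4 s: the second coroutine cannot start until the first returns
    print(f"time.sleep:    {asyncio.run(timed(blocking)):.2f} s")
```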


4. Summary and Comparison

| Feature | Multiprocessing (multiprocessing) | Multithreading (threading) | Coroutines (asyncio) |
| --- | --- | --- | --- |
| Parallelism/Concurrency | Parallel: truly uses multiple cores | Concurrent: efficient for I/O-intensive work, limited by the GIL for CPU-intensive work | Concurrent: cooperative, switches within a single thread |
| Resource Consumption | Heavy (independent memory space) | Light (shared memory space) | Lightest (user space, small stack overhead) |
| Switching Method | OS scheduling (preemptive) | OS scheduling (preemptive) | Program control via await/yield from (cooperative) |
| Applicable Scenarios | CPU-intensive tasks that need full use of multiple cores | I/O-intensive tasks, simple concurrency needs | Large numbers of I/O-intensive tasks, many concurrent connections |
| Data Sharing | Requires IPC (pipes, queues, etc.), complex | Shared memory; requires locks and other synchronization mechanisms, more complex | Shared memory, usually through parameter passing or shared objects; needs coroutine safety |
| Error Handling | One process crashing does not affect the others | An uncaught exception ends only that thread, but a hard crash in any thread takes down the entire process | An exception in one coroutine usually only affects itself, but an uncaught one may affect the event loop |
| Python Limitations | Not affected by the GIL (each process has its own interpreter) | Affected by the GIL (only one thread executes Python bytecode at a time) | Not directly affected by the GIL (it runs in one thread anyway), but CPU-intensive tasks block the entire event loop |
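
The last row notes that blocking work stalls the event loop. One way to combine the models is to hand a blocking call to a worker thread with asyncio.to_thread, sketched here on the assumption of Python 3.9 or later; blocking_io is a hypothetical stand-in for a library call that has no async version.

```python
import asyncio
import time


def blocking_io(name, duration):
    """Hypothetical blocking call, e.g. a synchronous client library."""
    time.sleep(duration)
    return f"{name} done"


async def main():
    start = time.perf_counter()
    # Each call runs in a separate worker thread, so the event loop stays responsive
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, "A", 0.2),
        asyncio.to_thread(blocking_io, "B", 0.2),
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f} s")
```

This helps with blocking I/O only. For CPU-intensive work the threads would still contend for the GIL, so a process pool (for example loop.run_in_executor with a concurrent.futures.ProcessPoolExecutor) is the usual choice.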

5. Coroutines with async, await, and asyncio

What happens if asyncio, await, and async are not written?

async and await are the syntax, and asyncio is the runtime, of the cooperative coroutine model. None of the three can be left out.

  1. If async def is not written:

    • If you define a function but forget to write async def, it becomes a regular synchronous function.
    • Consequences:
      • Cannot use await: If await is used inside this function, Python will raise a SyntaxError because await can only be used inside async def functions.
      • Cannot be awaited: This regular function will return a result (or raise an exception) directly after being called; it is not an awaitable object, and you cannot await it inside other async def functions.
    # Error example: using await in a regular function
    # def blocking_function(): # Missing async
    #     await asyncio.sleep(1) # SyntaxError: 'await' outside async function
    #     print("This will not run")
  2. If await is not written:

    • Suppose that inside an async def function you call something awaitable (for example another coroutine function such as Workspace_url(...), or asyncio.sleep(...)) but forget to write await.
    • Consequences:
      • Will not wait: The current coroutine continues immediately, without pausing for the awaitable's result.

      • The awaitable is ignored (with a warning): Calling Workspace_url(...) or asyncio.sleep(...) returns a coroutine object, but without await that object is never handed to the event loop, so its body never runs. It sits there like an unused variable until it is garbage collected, at which point Python emits "RuntimeWarning: coroutine '...' was never awaited". This is a common source of bugs in asyncio programs.

  3. If asyncio.run() (or equivalent event loop startup code) is not written:

    • Suppose you define a top-level async def main() function but simply call main().
    • Consequences:
      • Asynchronous code will not run: Calling main() only returns a coroutine object. That object contains all the asynchronous logic, but it has not been submitted to any event loop. The event loop is the "engine" that asynchronous code needs in order to run. Until the event loop is started and given the top-level coroutine, no code inside the async def functions (including their await calls) is executed.
    import asyncio

    async def my_app():
        print("I am an asynchronous application")
        await asyncio.sleep(1)
        print("I should have finished running")
    
    # Error example: directly calling async function
    # my_app() # Calling returns a coroutine object, but nothing will be printed
    # print("Program ends") # This line will execute immediately
    
    # Correct approach is:
    # asyncio.run(my_app()) # Start the event loop and run my_app coroutine
    
  • async def marks a function as asynchronous, causing its call to return a coroutine object.
  • await is used inside asynchronous functions, marking a pause point that yields control back to the event loop and waits for an awaitable object to complete.
  • asyncio (especially the event loop, started via asyncio.run or other means) is the runtime environment for executing coroutines, receiving coroutine objects, scheduling their execution, and switching between them at await points.
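
The "forgotten await" case from item 2 can be observed directly. In this sketch, record is a hypothetical coroutine with a visible side effect (appending to a list), which makes it easy to see whether its body ran.

```python
import asyncio

log = []


async def record(message):
    """Hypothetical coroutine with a visible side effect."""
    log.append(message)
    return message


async def main():
    log.clear()
    forgotten = record("not awaited")      # only creates a coroutine object; the body has not run
    print(type(forgotten).__name__, log)   # prints: coroutine []
    result = await record("awaited")       # runs the body and returns its value
    print(result, log)                     # prints: awaited ['awaited']
    forgotten.close()                      # discard it explicitly to avoid the "never awaited" warning
    return result


if __name__ == "__main__":
    asyncio.run(main())
```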

6. Using create_task with Coroutines

How can multiple coroutines start running simultaneously, rather than one await completing before awaiting another (which would be serial)?

asyncio.create_task() is one of the key functions to solve this problem.

The role of asyncio.create_task(coro)

  • Functionality: Wraps an awaitable coroutine object (coro) into a Task object and schedules it to be executed by the currently running event loop.
  • Return Value: Returns an asyncio.Task object.

What is a Task?

  • asyncio.Task is one of the core objects provided by asyncio.
  • It can be seen as a handle or representative of a coroutine that is currently running (or scheduled to run) in the event loop.
  • Task objects are themselves awaitable. You can await a Task object to wait for the coroutine it wraps to complete and get its result.
  • Task objects provide methods to check the state of the coroutine (whether it is complete, cancelled, etc.), retrieve results, get exceptions, or cancel the coroutine.

asyncio.create_task() vs Direct await

  1. Directly awaiting a coroutine call:

    import asyncio

    async def main():
        print("main start")
        await some_async_function() # <-- main pauses here until some_async_function completes
        print("main end")

    async def some_async_function():
        print("some_async_function start")
        await asyncio.sleep(2)
        print("some_async_function end")

    asyncio.run(main())

    # Execution order: main start -> some_async_function start -> wait 2 seconds -> some_async_function end -> main end
    # main waits for some_async_function to complete, so the two run serially.
    

    A direct await means that the current coroutine must wait for the awaited object to complete before it can continue.

  2. Using asyncio.create_task():

    import asyncio

    async def main():
        print("main start")
        # Create a Task, scheduling some_async_function on the event loop
        task = asyncio.create_task(some_async_function())
        print("some_async_function has been scheduled as a Task, main continues executing")

        # create_task returns immediately; main keeps running without waiting for task.
        # The task gets its first chance to run when main next yields at an await.

        # If main returned without awaiting task, asyncio.run() would cancel the unfinished task.
        # To make sure the task completes, await it somewhere:
        await task # <-- main yields here; the task runs, and main resumes once it completes
        print("task completed, main end")

    async def some_async_function():
        print("some_async_function start")
        await asyncio.sleep(2)
        print("some_async_function end")

    asyncio.run(main())

    # Execution order: main start -> some_async_function has been scheduled... -> (main yields at await task) some_async_function start -> wait 2 seconds -> some_async_function end -> (main resumes) task completed, main end
    # Scheduling the task does not block main; any code between create_task and await task runs before the task starts.
    # The final await task ensures main does not end before the task completes.
    

    asyncio.create_task() tells the event loop: “This is another coroutine task, add it to your to-do list and run it when appropriate.” Calling create_task does not pause the calling coroutine; it immediately returns a Task object and continues executing its own code.
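
The difference between the two approaches shows up clearly in timing. The sketch below runs the same two waits once with direct awaits and once with create_task; the names step, sequential, and concurrent are hypothetical, and the 0.2-second delays are only there to keep the demo short.

```python
import asyncio
import time


async def step(name, duration):
    await asyncio.sleep(duration)
    return name


async def sequential():
    """Await each coroutine directly: the second starts only after the first finishes."""
    start = time.perf_counter()
    a = await step("A", 0.2)
    b = await step("B", 0.2)
    return [a, b], time.perf_counter() - start


async def concurrent():
    """Schedule both as Tasks first, then await them: the waits overlap."""
    start = time.perf_counter()
    task_a = asyncio.create_task(step("A", 0.2))
    task_b = asyncio.create_task(step("B", 0.2))
    a = await task_a
    b = await task_b
    return [a, b], time.perf_counter() - start


if __name__ == "__main__":
    print(asyncio.run(sequential()))  # elapsed time is roughly the sum of the delays
    print(asyncio.run(concurrent()))  # elapsed time is roughly the longest delay
```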

Typical Use Cases for create_task()

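  1. Starting "background" tasks: You want a coroutine to start executing but do not care exactly when it completes, or only occasionally need to check its status. Examples include a logging coroutine or a monitoring coroutine. Keep a reference to the returned Task (in a variable or a set, for instance), because the event loop holds only weak references to tasks and an unreferenced task can be garbage collected before it finishes.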
  2. When independent task management is needed: You may need to get the Task object to cancel a running task (task.cancel()), check if it has completed (task.done()), or retrieve results (task.result()).
  3. Used with asyncio.gather(): asyncio.gather() can accept both coroutine objects and Task objects directly. While passing coroutine objects directly is more concise, create_task is useful when you need to create Task objects for some preprocessing or management.
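
For the background-task use case, one common pattern is to keep the Task objects in a set and remove each one when it finishes. The following is a sketch of that pattern under assumed names (background_tasks, spawn, heartbeat), not code from the original article.

```python
import asyncio

background_tasks = set()


async def heartbeat(name, beats, log):
    """Hypothetical background job: record a few ticks."""
    for i in range(beats):
        await asyncio.sleep(0.01)
        log.append(f"{name}:{i}")


def spawn(coro):
    """Schedule coro as a Task and hold a reference until it is done."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference keeps the task alive
    task.add_done_callback(background_tasks.discard)  # drop the reference once it finishes
    return task


async def main():
    log = []
    spawn(heartbeat("monitor", 3, log))
    spawn(heartbeat("logger", 2, log))
    # main could do other work here; before exiting, wait for whatever is still running
    await asyncio.gather(*background_tasks)
    return sorted(log)


if __name__ == "__main__":
    print(asyncio.run(main()))
```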

Combining asyncio.gather() with create_task()

Use create_task to start multiple coroutines, then use asyncio.gather to wait for them to complete.

import asyncio
import random

async def download_page(url):
    print(f"Starting download: {url}")
    await asyncio.sleep(random.randint(1, 3)) # Simulate download time
    print(f"Download complete: {url}")
    return f"Content of {url[:20]}..."

async def main_with_gather():
    urls = ["url1", "url2", "url3"] # Placeholders; substitute real URLs
    print("main_with_gather: Preparing to create tasks")

    # Create a list of Tasks
    tasks = []
    for url in urls:
        # Calling download_page returns a coroutine object; create_task wraps it in a Task and schedules it
        task = asyncio.create_task(download_page(url))
        tasks.append(task)
    print("main_with_gather: All tasks have been created")

    # await asyncio.gather waits for all Task objects in the tasks list to complete
    # While main_with_gather waits, the event loop runs the tasks concurrently
    results = await asyncio.gather(*tasks)
    print("main_with_gather: All tasks have completed")

    for result in results:
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main_with_gather())

asyncio.create_task() turns each download_page(url) coroutine into a Task that can run independently in the event loop. await asyncio.gather(*tasks) then makes the main_with_gather coroutine pause until all these Tasks are complete. During this waiting period, the event loop will handle switching between these Tasks, achieving concurrent downloads.
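
One detail worth knowing when reading the results: asyncio.gather returns them in the order the awaitables were passed in, not in the order they finished. A minimal sketch with hypothetical names (fetch, tracked) and short delays:

```python
import asyncio


async def fetch(label, delay):
    await asyncio.sleep(delay)
    return label


async def main():
    finished = []  # records the actual completion order

    async def tracked(label, delay):
        result = await fetch(label, delay)
        finished.append(result)
        return result

    tasks = [
        asyncio.create_task(tracked("slow", 0.3)),
        asyncio.create_task(tracked("fast", 0.1)),
    ]
    results = await asyncio.gather(*tasks)
    return results, finished


if __name__ == "__main__":
    results, finished = asyncio.run(main())
    print("gather order:    ", results)   # ['slow', 'fast'], matching the input order
    print("completion order:", finished)  # ['fast', 'slow']
```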

Note: create_task() must be called after the event loop is already running. When using asyncio.run(main()) to enter the main coroutine, the event loop is already running, so it is safe to use create_task in main or in other coroutines called by main.
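
The requirement for a running event loop can be checked with a short experiment. In this sketch (noop and try_create_task_without_loop are hypothetical names), calling create_task from ordinary synchronous code raises RuntimeError, while the same call inside a coroutine started by asyncio.run works.

```python
import asyncio


async def noop():
    return "done"


def try_create_task_without_loop():
    """Call create_task from synchronous code, where no event loop is running."""
    coro = noop()
    try:
        asyncio.create_task(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine was never scheduled; close it to avoid a warning
        return f"RuntimeError: {exc}"
    return "no error"


async def main():
    # Inside a coroutine run by asyncio.run(), the loop is running, so this is fine
    task = asyncio.create_task(noop())
    return await task


if __name__ == "__main__":
    print(try_create_task_without_loop())
    print(asyncio.run(main()))
```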

Some Methods of Task Objects

  • task.done(): Returns True if the Task has finished, whether it returned normally, raised an exception, or was cancelled.
  • task.result(): Returns the Task's result. If the Task is not yet done, it raises InvalidStateError; if the Task finished with an exception, that exception is re-raised; if the Task was cancelled, it raises CancelledError.
  • task.exception(): Returns the exception the Task raised, or None if it completed normally. It raises InvalidStateError if the Task is not yet done and CancelledError if the Task was cancelled.
  • task.cancel(): Requests cancellation. An asyncio.CancelledError is raised inside the wrapped coroutine at the point where it is suspended. The coroutine can catch it to clean up, but should normally re-raise it so that the Task actually ends up cancelled.
  • task.cancelled(): Returns True if the Task was cancelled.
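
A short sketch of these methods in use, with a hypothetical worker coroutine that cleans up on cancellation. The events list exists only so the sequence of steps can be inspected afterwards.

```python
import asyncio


async def worker(events):
    try:
        await asyncio.sleep(10)  # stands in for long-running work
    except asyncio.CancelledError:
        events.append("worker cleanup")  # release resources here
        raise  # re-raise so the Task is marked as cancelled


async def main():
    events = []
    task = asyncio.create_task(worker(events))
    await asyncio.sleep(0.05)  # give the worker time to start
    events.append(f"done before cancel: {task.done()}")

    task.cancel()  # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        events.append("main saw cancellation")

    return events, task.done(), task.cancelled()


if __name__ == "__main__":
    for item in asyncio.run(main()):
        print(item)
```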

7. General Template for Coroutines

# This is a general code template for Python coroutines (asyncio)

import asyncio
import time
import random # Import random to simulate different task durations

# =============================================================================
# Step 1: Define independent asynchronous tasks (coroutine functions)
# Use the async def keyword to define, indicating this is a coroutine function.
# Coroutine functions will use await to call other awaitable objects (like async I/O operations, asyncio-provided async tools, or other coroutines).
# These functions typically contain the actual asynchronous workload.
# =============================================================================
async def async_worker_task(task_id: int, simulate_duration: int):
    """
    Define a coroutine function for an asynchronous worker task.

    Args:
        task_id: Unique identifier for the task.
        simulate_duration: Simulated time the task will take (actually the time for await asyncio.sleep).
    """
    # Print when the task starts, typically you will see multiple tasks' "start" prints appearing almost simultaneously
    print(f"[Task {task_id}] Starting execution, expected duration {simulate_duration} seconds...")

    # Use await to perform an asynchronous operation.
    # asyncio.sleep() is an awaitable object; awaiting it pauses the current coroutine,
    # yielding control to the event loop, which can run other ready coroutines during this time.
    # This simulates waiting for an async I/O operation (like a network response, database query result, etc.), without blocking the entire thread.
    await asyncio.sleep(simulate_duration)

    # Print when the task is complete
    print(f"[Task {task_id}] Execution complete.")

    # The task can return a result
    return f"Task {task_id} completed, took {simulate_duration} seconds."


# =============================================================================
# Step 2: Define the main asynchronous function (Orchestrator / Entry Point)
# This is the entry point for the entire async program, and it is also an async def function.
# It is responsible for creating, organizing, and scheduling other coroutine tasks.
# Typically, asyncio.gather or asyncio.create_task will be used here to manage the concurrent execution of multiple tasks.
# =============================================================================
async def main_async_orchestrator():
    """
    Main asynchronous function, responsible for creating and running worker tasks.
    """
    print("Main orchestrator: Asynchronous process starting")
    start_time = time.time()

    # Prepare some task parameters
    task_configurations = [
        (1, random.randint(1, 4)), # Task 1, random duration 1-4 seconds
        (2, random.randint(2, 5)), # Task 2, random duration 2-5 seconds
        (3, random.randint(1, 3)), # Task 3, random duration 1-3 seconds
        (4, random.randint(3, 6)), # Task 4, random duration 3-6 seconds
    ]

    # =========================================================================
    # Step 3: Create coroutine objects
    # Calling async def functions returns coroutine objects; at this point, the code inside the coroutine has not started executing.
    # =========================================================================
    # Create a list of coroutine objects
    coroutine_objects = [
        async_worker_task(task_id, duration)
        for task_id, duration in task_configurations
    ]
    print(f"Main orchestrator: {len(coroutine_objects)} coroutine objects created.")


    # =========================================================================
    # Step 4: Concurrently execute coroutine tasks
    # Using asyncio.gather() is the most common method; it can run all "awaitable objects"
    # (including coroutine objects and Task objects) in the list concurrently and wait for them to complete.
    # await asyncio.gather(...) will pause the current main coroutine and yield control to the event loop,
    # which will concurrently schedule the coroutines in coroutine_objects to run.
    # When all coroutines are complete, gather returns a list containing the return values of all coroutines.
    # =========================================================================
    print("Main orchestrator: Running tasks concurrently using asyncio.gather...")
    # await is key here; the main coroutine will wait for all child tasks to complete
    results = await asyncio.gather(*coroutine_objects)

    print("Main orchestrator: All concurrent tasks have completed.")
    print("Main orchestrator: The results of all tasks are as follows:")
    for res in results:
        print(f"- {res}")

    # =========================================================================
    # Optional: Demonstrate using asyncio.create_task to create a task without waiting immediately
    # If you want a task to run in the background without waiting for it to complete, you can use asyncio.create_task()
    # background_task = asyncio.create_task(some_other_async_task())
    # print("Main orchestrator: Started a background task...")
    # Note: asyncio.run() cancels any tasks that are still pending when the main coroutine returns,
    # so await background_task somewhere if it must run to completion.
    # =========================================================================

    end_time = time.time()
    print(f"Main orchestrator: Asynchronous process ended. Total time: {end_time - start_time:.2f} seconds")


# =============================================================================
# Step 5: Entry point of the program
# Use the standard if __name__ == "__main__": protection block.
# Here, call asyncio.run() to start the event loop and run the top-level main asynchronous function.
# asyncio.run() is the recommended way for Python 3.7+, which will handle creating the event loop,
# running the passed coroutine until completion, and finally closing the event loop.
# =============================================================================
if __name__ == "__main__":
    print("Program starting")
    # Call asyncio.run() to execute our main asynchronous function main_async_orchestrator
    asyncio.run(main_async_orchestrator())
    print("Program has fully exited.")

Sample output (the durations are random, so the numbers differ from run to run):

Program starting
Main orchestrator: Asynchronous process starting
Main orchestrator: 4 coroutine objects created.
Main orchestrator: Running tasks concurrently using asyncio.gather...
[Task 1] Starting execution, expected duration 2 seconds...
[Task 2] Starting execution, expected duration 5 seconds...
[Task 3] Starting execution, expected duration 2 seconds...
[Task 4] Starting execution, expected duration 5 seconds...
[Task 1] Execution complete.
[Task 3] Execution complete.
[Task 2] Execution complete.
[Task 4] Execution complete.
Main orchestrator: All concurrent tasks have completed.
Main orchestrator: The results of all tasks are as follows:
- Task 1 completed, took 2 seconds.
- Task 2 completed, took 5 seconds.
- Task 3 completed, took 2 seconds.
- Task 4 completed, took 5 seconds.
Main orchestrator: Asynchronous process ended. Total time: 5.00 seconds
Program has fully exited.
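
The template assumes every worker succeeds. If some may fail, one option is to pass return_exceptions=True to asyncio.gather so that exceptions come back as ordinary items in the results list instead of propagating out of the await. A minimal sketch, with a hypothetical worker that fails on request:

```python
import asyncio


async def worker(task_id, fail=False):
    await asyncio.sleep(0.01)
    if fail:
        raise ValueError(f"task {task_id} failed")
    return f"task {task_id} ok"


async def main():
    # With return_exceptions=True, a failing worker does not abort the others
    results = await asyncio.gather(
        worker(1),
        worker(2, fail=True),
        worker(3),
        return_exceptions=True,
    )
    for res in results:
        if isinstance(res, Exception):
            print(f"- error: {res}")
        else:
            print(f"- {res}")
    return results


if __name__ == "__main__":
    asyncio.run(main())
```

Without return_exceptions=True, the first exception is raised from the await on gather, while the remaining awaitables keep running in the background.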

This article is updated synchronously to xLog by Mix Space
The original link is https://blog.kanes.top/posts/default/understanding-python-multithreading-multiprocessing-coroutines

