1. Multithreading Module#
The threading module in Python provides support for threads.
threading Module#
Common functions in the threading module:
- threading.current_thread(): Returns the current thread object.
- threading.enumerate(): Returns a list of currently running threads. "Currently running" means threads that have started but not yet finished, excluding those that have not started or have terminated.
- threading.active_count(): Returns the number of currently running threads, which is the same as len(threading.enumerate()).
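As a minimal sketch of these three functions, the snippet below keeps one worker alive on an Event so that it can be observed. The names `release`, `wait_for_signal`, and `worker-1` are made up for illustration.

```python
import threading

release = threading.Event()  # hypothetical signal that keeps the worker alive


def wait_for_signal():
    # Block until the main thread sets the event.
    release.wait()


worker = threading.Thread(target=wait_for_signal, name="worker-1")
count_before = threading.active_count()
worker.start()

# While the worker is blocked, it is both counted and listed.
count_during = threading.active_count()
names_during = [t.name for t in threading.enumerate()]
caller_name = threading.current_thread().name

release.set()
worker.join()
count_after = threading.active_count()

print(caller_name, count_before, count_during, count_after, names_during)
```

The worker appears in the list only between start() and the moment it finishes, which matches the definition of "currently running" given above.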
Thread Class#
Create a thread object using threading.Thread().
Main parameters:
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group: Defaults to None; reserved for a future extension when a ThreadGroup class is implemented.
- target: The callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
- name: The thread name. By default, a unique name of the form "Thread-N" is constructed, where N is a small decimal number.
- args: A tuple of positional arguments to pass to the target function. Defaults to ().
- kwargs: A dictionary of keyword arguments to pass to the target function. Defaults to {}.
- daemon: Whether the thread is a daemon thread. Defaults to None, meaning the value is inherited from the creating thread.
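The parameters can be seen together in a short sketch. The function `greet` and the list `results` are hypothetical names used only for this illustration.

```python
import threading

results = []


def greet(greeting, name, punctuation="!"):
    # Record which thread ran the call and what it produced.
    thread_name = threading.current_thread().name
    results.append((thread_name, f"{greeting}, {name}{punctuation}"))


t = threading.Thread(
    target=greet,                 # callable invoked by run()
    name="greeter",               # explicit name instead of "Thread-N"
    args=("Hello", "world"),      # positional arguments for target
    kwargs={"punctuation": "?"},  # keyword arguments for target
    daemon=False,                 # non-daemon thread
)
t.start()
t.join()
print(results)
```

Note the trailing comma when args holds a single value, for example args=("Hello",); without it the parentheses do not create a tuple.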
Common methods and attributes of the Thread class
- run(): Method that represents the thread's activity.
- start(): Starts the thread's activity.
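- join(timeout=None): Waits for the thread to terminate. This blocks the calling thread until the thread whose join() method was called terminates, either normally or through an unhandled exception, or until the optional timeout elapses.
- is_alive(): Returns whether the thread is alive. (The older spelling isAlive() was removed in Python 3.9.)
- getName(): Returns the thread name. Deprecated since Python 3.10 in favor of the name attribute.
- setName(): Sets the thread name. Deprecated since Python 3.10 in favor of the name attribute.
- name: The name of the thread object.
- setDaemon(): Sets whether it is a daemon thread. Deprecated since Python 3.10 in favor of the daemon attribute.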
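A small sketch of how start(), join() with a timeout, is_alive(), and name interact. The Event `stop` and the function `wait_until_stopped` are assumptions introduced to keep the thread alive long enough to inspect.

```python
import threading

stop = threading.Event()  # hypothetical signal that lets the thread finish


def wait_until_stopped():
    stop.wait()


t = threading.Thread(target=wait_until_stopped)
alive_before_start = t.is_alive()   # not started yet

t.name = "waiter"                   # attribute form of setting the name
t.start()

t.join(timeout=0.1)                 # gives up after 0.1 s; the thread is still blocked
alive_after_timeout = t.is_alive()

stop.set()
t.join()                            # now returns once the thread has finished
alive_after_join = t.is_alive()

print(t.name, alive_before_start, alive_after_timeout, alive_after_join)
```

Because join() always returns None, calling is_alive() afterwards is the way to tell whether a timed join() ended because the thread finished or because the timeout elapsed.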
2. GIL Lock#
GIL, the Global Interpreter Lock, is a feature of the CPython interpreter that allows only one thread per process to execute Python bytecode at any given time.
If a program needs to exploit multiple CPU cores so that several tasks truly run at the same time, multiprocessing is the better fit, even though processes carry a higher resource overhead.
If a program does not need multiple cores, multithreading is the better fit.
In typical programs, computation benefits from multiple cores, while I/O operations do not, because a thread that is blocked on I/O releases the GIL and lets other threads run. Hence the common guideline:
- For compute-intensive tasks, use multiprocessing, for example: large data calculations [cumulative calculation example].
- For I/O-intensive tasks, use multithreading, for example: file reading and writing, network data transmission [downloading Douyin videos example].
Serial processing:
import time

start = time.time()
result = 0
for i in range(100000000):
    result += i
print(result)
end = time.time()
print("Time taken:", end - start)
Multiprocess processing:
import time
import multiprocessing

def task(start, end, queue):
    # Sum one half of the range and hand the partial result back to the parent.
    result = 0
    for i in range(start, end):
        result += i
    queue.put(result)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    start_time = time.time()
    p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
    p1.start()
    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
    p2.start()
    v1 = queue.get(block=True)  # blocks until a result is available
    v2 = queue.get(block=True)  # blocks until a result is available
    print(v1 + v2)
    end_time = time.time()
    print("Time taken:", end_time - start_time)
Combining multithreading and multiprocessing:
import multiprocessing
import threading

def thread_task():
    pass

def task(start, end):
    # Each process starts three threads of its own.
    t1 = threading.Thread(target=thread_task)
    t1.start()
    t2 = threading.Thread(target=thread_task)
    t2.start()
    t3 = threading.Thread(target=thread_task)
    t3.start()

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task, args=(0, 50000000))
    p1.start()
    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
    p2.start()
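The examples above cover the compute-intensive case. For the I/O-intensive case, here is a minimal sketch in which time.sleep() stands in for a blocking network or disk call. The function `fake_download` and the 0.2 second delay are made up for illustration.

```python
import threading
import time

DELAY = 0.2  # seconds each simulated I/O call blocks (assumed value)
finished = []


def fake_download(task_id):
    # time.sleep() stands in for blocking I/O; like real blocking I/O,
    # it releases the GIL while waiting, so other threads can run.
    time.sleep(DELAY)
    finished.append(task_id)


start = time.perf_counter()
threads = [threading.Thread(target=fake_download, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The four waits overlap instead of running back to back.
print(sorted(finished), round(elapsed, 2))
```

On an otherwise idle machine the elapsed time should be close to one delay rather than four, which is why threads are usually sufficient for I/O-bound work despite the GIL.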
3. Multithreading Development#
import threading

def task(arg):
    pass

# Create a Thread object that wraps the task, together with its arguments, to be run once the thread is scheduled.
t = threading.Thread(target=task, args=('xxx',))
# Mark the thread as ready (waiting to be scheduled); the code below continues immediately.
t.start()
print("Continuing execution...")  # After running all of its code, the main thread does not exit; it waits for the child thread.
Common methods of threads#
- t.start(): the thread becomes ready and waits to be scheduled; exactly when it runs is decided by the operating system's scheduler.
import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number += 1

t = threading.Thread(target=_add, args=(loop,))
t.start()
print(number)  # printed without waiting for t, so the count is usually incomplete
- t.join(): waits for the thread's task to complete before the calling thread continues.
import threading

number = 0

def _add():
    global number
    for i in range(10000000):
        number += 1

t = threading.Thread(target=_add)
t.start()
t.join()  # The main thread is waiting...
print(number)
import threading

number = 0

def _add():
    global number
    for i in range(10000000):
        number += 1

def _sub():
    global number
    for i in range(10000000):
        number -= 1

t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join()  # t1 thread completes before continuing
t2.start()
t2.join()  # t2 thread completes before continuing
print(number)
import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number += 1

def _sub(count):
    global number
    for i in range(count):
        number -= 1

t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join()  # t1 thread completes before continuing
t2.join()  # t2 thread completes before continuing
print(number)  # both threads ran concurrently without a lock, so 0 is not guaranteed
- t.setDaemon(boolean): sets whether the thread is a daemon thread; it must be called before start(). Since Python 3.10, assigning t.daemon = boolean is preferred and setDaemon() is deprecated.
  - t.setDaemon(True): sets it as a daemon thread; when the main thread finishes, the child thread is shut down automatically.
  - t.setDaemon(False): sets it as a non-daemon thread; the main thread waits for the child thread and only ends after the child thread completes. (default)
import threading
import time

def task(arg):
    time.sleep(5)
    print('Task')

t = threading.Thread(target=task, args=(11,))
t.daemon = True  # True/False; equivalent to the deprecated t.setDaemon(True)
t.start()
print('END')
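The rule that the daemon flag must be set before start() can be checked directly. In this sketch the Event `stop` is an assumption used to keep the thread alive while the flag is changed too late.

```python
import threading

stop = threading.Event()  # hypothetical signal that lets the thread finish

t = threading.Thread(target=stop.wait)
t.daemon = True           # allowed: the thread has not been started yet
t.start()

try:
    t.daemon = False      # too late: the thread is already running
    error = None
except RuntimeError as exc:
    error = str(exc)

stop.set()
t.join()
print(t.daemon, error)
```

The failed assignment leaves the flag unchanged, so the thread stays a daemon thread for its whole lifetime.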
- Setting and getting thread names
import threading

def task(arg):
    # Get the name of the thread currently executing this code
    name = threading.current_thread().name
    print(name)

for i in range(10):
    t = threading.Thread(target=task, args=(11,))
    t.name = 'Day Demon-{}'.format(i)  # attribute form of the deprecated t.setName(...)
    t.start()
- Custom thread class: put the work the thread should perform directly in the run() method
import threading

class MyThread(threading.Thread):
    def run(self):
        # self._args is the private attribute in which Thread stores the args tuple
        print('Executing this thread', self._args)

t = MyThread(args=(100,))
t.start()
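If a custom thread class also defines its own `__init__`, the base constructor has to run before the thread is used. A minimal sketch, assuming a hypothetical `SquareThread` class that keeps its input and result as public attributes rather than relying on private ones:

```python
import threading


class SquareThread(threading.Thread):
    def __init__(self, value):
        super().__init__()   # initialise the Thread machinery first
        self.value = value
        self.result = None   # filled in by run()

    def run(self):
        # The work performed when the thread is started.
        self.result = self.value ** 2


t = SquareThread(12)
t.start()
t.join()                     # wait so that result is guaranteed to be set
print(t.result)
```

Storing the result on the instance is one simple way to get a value back from a thread, since run() cannot return anything to the caller of start().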
4. Thread Locks#
When multiple threads modify a global variable, an update that takes several steps can be interrupted: if the thread is switched out before it stores its final result and another thread runs in the meantime, the global variable ends up with inconsistent data.
import threading

balance = 0

def change_it(n):
    # Deposit first and then withdraw, the result should be 0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(1000):
        change_it(n)

# Repeat the experiment 100 times here
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
import threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # Deposit first and then withdraw, the result should be 0:
    global balance
    # Acquire lock for thread synchronization
    lock.acquire()
    try:
        balance = balance + n
        balance = balance - n
    finally:
        # Release lock, even if an error occurs, allowing the next thread to proceed
        lock.release()

def run_thread(n):
    for i in range(1000):
        change_it(n)

# Repeat the experiment 100 times here
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
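Lock objects also work as context managers, so the acquire and release pair can be written as a `with` block. A minimal sketch, assuming a hypothetical shared counter kept in a dictionary named `state`:

```python
import threading

state = {"count": 0}            # hypothetical shared data
state_lock = threading.Lock()


def increment(times):
    for _ in range(times):
        # The lock is acquired on entry and released on exit,
        # even if the body raises an exception.
        with state_lock:
            state["count"] += 1


threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(state["count"])
```

Because every read-modify-write of the counter happens while the lock is held, the final count equals the total number of increments.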
To lock manually in a program, there are two common lock types: Lock and RLock.
- Lock: a synchronization lock (the same thread cannot acquire it again while holding it, so nested locking is not supported)
import threading

num = 0
lock_object = threading.Lock()

def task():
    print("Starting")
    lock_object.acquire()  # The first thread to arrive enters and locks; other threads must wait.
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # The thread exits and unlocks, allowing other threads to enter and execute.
    print(num)

for i in range(2):
    t = threading.Thread(target=task)
    t.start()
- RLock: a recursive lock (the same thread can acquire it again while holding it, so nested locking is supported)
import threading

num = 0
lock_object = threading.RLock()

def task():
    print("Starting")
    lock_object.acquire()  # The first thread to arrive enters and locks; other threads must wait.
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # The thread exits and unlocks, allowing other threads to enter and execute.
    print(num)

for i in range(2):
    t = threading.Thread(target=task)
    t.start()
RLock can be acquired several times by the same thread, as long as every acquire is matched by a release; Lock cannot.
import threading

lock = threading.RLock()

# Programmer A developed a function that other developers can call; it relies on a lock to keep data safe.
def func():
    with lock:
        pass

# Programmer B developed a function that simply calls func.
def run():
    print("Other functionality")
    func()  # Calls the function written by Programmer A, which uses a lock internally.
    print("Other functionality")

# Programmer C developed a function that takes the lock itself and also calls func.
def process():
    with lock:
        print("Other functionality")
        func()  # The lock is acquired a second time here; only RLock supports this (Lock would deadlock).
        print("Other functionality")
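The difference between the two lock types can be observed without risking a deadlock by using non-blocking acquires. This is a small sketch; the variable names are made up for illustration.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

# A Lock that is already held cannot be acquired again, even by the same thread.
plain.acquire()
second_plain = plain.acquire(blocking=False)          # fails instead of blocking
plain.release()

# An RLock lets the owning thread acquire it again.
reentrant.acquire()
second_reentrant = reentrant.acquire(blocking=False)  # succeeds for the owner
reentrant.release()   # one release per successful acquire
reentrant.release()

print(second_plain, second_reentrant)
```

With a blocking acquire, the second call on the plain Lock would wait forever, which is the deadlock that process() above avoids by using RLock.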
5. Queue#
A queue can be seen as a simple form of event and condition synchronization: thread A stores data in the queue and thread B takes data out of it, and neither has to handle synchronization or locking itself. The main methods are:
- put(): Adds an item to the queue.
- get(): Removes and returns an item from the queue.
- task_done(): Called by a consumer to indicate that an item it previously retrieved with get() has been fully processed.
- join(): Blocks until task_done() has been called for every item that was put into the queue.
import threading
import queue
import random
import time

class Producer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = random.randint(0, 1000)
            self.queue.put(integer)
            print('%d put to queue by %s' % (integer, self.name))
            time.sleep(1)

class Consumer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = self.queue.get()
            print('%d popped from queue by %s' % (integer, self.name))
            self.queue.task_done()

if __name__ == '__main__':
    q = queue.Queue()  # named q so that the queue module is not shadowed
    t1 = Producer(q)
    t2 = Consumer(q)
    t1.start()
    t2.start()
    # Both threads loop forever, so these join() calls never return.
    t1.join()
    t2.join()
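The producer and consumer above never stop. One common way to shut a consumer down is a sentinel value; the sketch below assumes None is never a real work item and uses hypothetical names (`SENTINEL`, `tasks`, `squares`, `consume`).

```python
import queue
import threading

SENTINEL = None          # hypothetical marker meaning "no more items"
tasks = queue.Queue()
squares = []


def consume():
    while True:
        item = tasks.get()
        if item is SENTINEL:
            tasks.task_done()   # the sentinel counts as a queued item too
            break
        squares.append(item * item)
        tasks.task_done()       # mark this item as fully processed


consumer = threading.Thread(target=consume)
consumer.start()

for n in range(5):
    tasks.put(n)
tasks.put(SENTINEL)

tasks.join()      # returns once task_done() was called for every put()
consumer.join()   # the consumer exits after seeing the sentinel
print(squares)
```

With several consumers, one sentinel per consumer would be needed so that each of them sees a stop marker.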