程序员文章、书籍推荐和程序员创业信息与资源分享平台

网站首页 > 技术文章 正文

python线程安全:使用Rlock实现可重入锁

hfteth 2024-12-17 11:38:27 技术文章 18 ℃

前面我们使用lock来对共享资源的并发修改进行控制,

但是如果函数a中加锁lock,而a函数调用b函数,b内部也对lock加锁,就可能出现死锁

使用 python线程安全:RLock 用于可重入锁定

同一个锁多加加锁。

如果由于代码中的错误或疏忽而未正确释放锁,则可能导致死锁,即线程无限期地等待锁被释放。死锁的原因包括:

  • 嵌套锁获取:如果线程尝试获取它已持有的锁,则可能会发生死锁。在传统锁中,尝试在同一线程中多次获取相同的锁会导致线程自行阻塞,如果没有外部干预,这种情况将无法解决。
  • 多个锁获取:当使用多个锁,并且线程以不一致的顺序获取它们时,可能会出现死锁。如果两个线程都持有一个锁并等待另一个锁,则两个线程都无法继续,从而导致死锁。

这个死锁问题可以通过使用 RLock 来解决,RLock 是一个可重入锁。当保持线程再次请求锁时,它不会阻塞。换句话说 线程在释放锁之前多次获取锁。这在递归函数或线程需要重新输入已锁定的锁定资源的情况下非常有用。

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.RLock()

    def deposit(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for .deposit()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for .deposit()"
            )
            time.sleep(0.1)
            self._update_balance(amount)

    def _update_balance(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for ._update_balance()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for ._update_balance()"
            )
            self.balance += amount

account = BankAccount()

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for _ in range(3):
        executor.submit(account.deposit, 100)



class Semaphore:
 
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
     
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __exit__(self, t, v, tb):
        self.release()


threading.RLock() 因为该对象允许线程多次获取相同的锁

使用Semaphore信号量限制访问

当资源数量有限并且许多线程尝试访问这些有限的资源时,信号量非常有用。它使用计数器来限制多个线程对关键部分的访问

import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)

def now():
    return time.strftime("%H:%M:%S")

def serve_customer(name):
    print(f"{now()}: {name} is waiting for a teller.")
    with teller_semaphore:
        print(f"{now()}: {name} is being served by a teller.")
        # Simulate the time taken for the teller to serve the customer
        time.sleep(random.randint(1, 3))
        print(f"{now()}: {name} is done being served.")

customers = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=5) as executor:
    for customer_name in customers:
        thread = executor.submit(serve_customer, customer_name)

print(f"{now()}: All customers have been served.")
teller_semaphore = threading.Semaphore(2)

Semaphore(2)中的2表示资源 数是 2 ,也就是最多能让2个线程同时访问某个资源 。

09:57:50: Customer 1 is waiting for a teller.
09:57:50: Customer 1 is being served by a teller.
09:57:50: Customer 2 is waiting for a teller.
09:57:50: Customer 2 is being served by a teller.
09:57:50: Customer 3 is waiting for a teller.
09:57:50: Customer 4 is waiting for a teller.
09:57:50: Customer 5 is waiting for a teller.
09:57:52: Customer 1 is done being served.
09:57:52: Customer 3 is being served by a teller.
09:57:53: Customer 2 is done being served.
09:57:53: Customer 4 is being served by a teller.
09:57:55: Customer 4 is done being served.09:57:55: Customer 3 is done being served.
09:57:55: Customer 5 is being served by a teller.
09:57:57: Customer 5 is done being served.
09:57:57: All customers have been served.

Java中的 Semaphore信号量在原理上与Python中一模一样


 }import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    // 创建一个Semaphore实例,许可数量为3
    private static final Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        // 创建并启动三个线程
        for (int i = 1; i <= 3; i++) {
            new Thread(new Task(semaphore), "线程" + i).start();
        }
    }

    static class Task implements Runnable {
        private final Semaphore semaphore;

        public Task(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                // 请求许可
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " 获取许可,正在执行");
                Thread.sleep(1000); // 模拟任务执行
                System.out.println(Thread.currentThread().getName() + " 执行完毕,释放许可");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // 释放许可
                semaphore.release();
            }
        }
    }
}
  • semaphore.acquire();获取锁
  • semaphore.release();释放锁

高级编程语言中的多线程lock的原理都差不多

同步Synchronization和conditon

事件通知

Event对象的默认状是False,,调用set方法其状态变成True,调用reset方法恢复到False状态

线程中调用event.wait方法会阻塞线程,直接event的状态变成True

import threading
import time
from concurrent.futures import ThreadPoolExecutor

bank_open = threading.Event()
transactions_open = threading.Event()

def serve_customer(customer_data):
    print(f"{customer_data['name']} is waiting for the bank to open.")

    bank_open.wait()
    print(f"{customer_data['name']} entered the bank")
    if customer_data["type"] == "WITHDRAW_MONEY":
        print(f"{customer_data['name']} is waiting for transactions to open.")
        transactions_open.wait()
        print(f"{customer_data['name']} is starting their transaction.")

        # Simulate the time taken for performing the transaction
        time.sleep(2)

        print(
            f"{customer_data['name']} completed transaction and exited bank"
        )
    else:
        # Simulate the time taken for banking
        time.sleep(2)
        print(f"{customer_data['name']} has exited bank")

customers = [
    {"name": "Customer 1", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 2", "type": "CHECK_BALANCE"},
    {"name": "Customer 3", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 4", "type": "WITHDRAW_MONEY"},
]

with ThreadPoolExecutor(max_workers=4) as executor:
    for customer_data in customers:
        executor.submit(serve_customer, customer_data)

    print("Bank manager is preparing to open the bank.")
    time.sleep(2)
    print("Bank is now open!")
    bank_open.set()  # Signal that the bank is open

    time.sleep(3)
    print("Transactions are now open!")
    transactions_open.set()

print("All customers have completed their transactions.")

在这个代码中使用了两个event对象,第一个表示

  • bank_open开门状态

bank_open.wait()之后的代码会阻塞,直到bank_open.set被调用后

  • transactions_open交易开始状态

transactions_open.wait()之后的代码会阻塞,直到transactions_open.set被调用后

Tags:

最近发表
标签列表