`

《java并发编程实践》读书笔记(3)

    博客分类:
  • Book
阅读更多
上一篇
死锁
当每个人都拥有他人需要的资源, 并且等待其他人正在占有的资源, 如果大家一直占有资源, 直到获得自己需要却没被占用的资源, 那么就会产生死锁.
当一个线程永远占有一个锁, 而其他线程尝试去获得这个锁, 那么它们将永远被阻塞. 当线程占有锁L时, 想要获得锁M, 但是同时, 线程B持有M, 并尝试获得L, 两个线程将永远等待下去, 这种情况是死锁的最简单的形式.

当死锁的时候, 往往是遇到了最不幸的时候:在高负载之下.

如果所有线程以通用的固定次序获得锁, 程序就不会出现锁顺序死锁问题了.

动态的锁顺序死锁
这种情况举个例子比较好明白, 比如一个转账操作, 将账号A的钱转到账号B中, 然后在转账过程中顺序对账号A和账号B使用同步锁, 同时又出现账号B向账号A转账, 这时应为参数顺序导致死锁.
代码如下:
    public void transferMoney(Account fromAccount, Account toAccount, DollarAmount amount) {
        synchronized (fromAccount) {
            synchronized (toAccount) {
                if (fromAccount.getBalance().compareTo(amount)<0) {
                    throw new RuntimeException();
                } else {
                    fromAccount.debit(amount);
                    toAccount.credit(amount);
                }
            }
        }
    }

避免这种情况是强制指定顺序:
public void transferMoney(final Account fromAccount, final Account toAccount, final DollarAmount amount) {
        class Helper {
            public void transfer() {
                if (fromAccount.getBalance().compareTo(amount) < 0) {
                    throw new RuntimeException();
                } else {
                    fromAccount.debit(amount);
                    toAccount.credit(amount);
                }
            }
        }
       // 通过唯一hashcode来统一锁的顺序, 如果account具有唯一键, 可以采用该键来作为顺序.
        int fromHash = System.identityHashCode(fromAccount);
        int toHash = System.identityHashCode(toAccount);
        if (fromHash < toHash) {
            synchronized (fromAccount) {
                synchronized (toAccount) {
                    new Helper().transfer();
                }
            }
        } else if (fromHash > toHash) {
            synchronized (toAccount) {
                synchronized (fromAccount) {
                    new Helper().transfer();
                }
            }
        } else {
            synchronized (tieLock) { // 针对fromAccount和toAccount具有相同的hashcode
                synchronized (fromAccount) {
                    synchronized (toAccount) {
                        new Helper().transfer();
                    }
                }
            }
        }
    }



需要等待其他任务的结果是生成线程饥饿死锁的来源, 有界池和相互依赖的任务不能放在一起使用.

活锁(live lock)是线程中活跃度失败的另一种形式, 尽管没有被阻塞, 线程却仍然不能继续, 因为它不断重试相同的操作, 却总是失败. 活锁通常发生在消息处理应用程序中.

如果程序都不能保持现有的处理器处于忙碌的工作状态, 添加更多的处理器也无济于事. 线程通过分解应用程序, 总是让空闲的处理器进行未完成的工作, 从而保持所有的cpu热火朝天的工作.

可伸缩性指的是: 当增加计算资源的时候(比如增加额外CPU数量, 内存, 存储器, I/O宽带), 吞吐量和生产量能够得到相应的得到改进.

我们使用线程的重要原因之一是为了支配多处理器的能力, 我们必须保证问题被恰当地进行并行化分解, 并且我们的程序有效地使用了这种并行的潜能.

ReadWriteLock实现了一个多读少写的加锁规则: 只要没有修改, 那么多个读者可以并发的访问共享资源, 但是写者必须独占获得锁. 对于多数操作都为读操作的数据结构, ReadWriteLock比独占的锁具有更好的并发性.

单线程化的ConcurrentHashMap的性能要比同步的HashMap的性能稍好一些, 而且在并发应用中, 这种作用就十分明显了. ConcurrentHashMap的实现, 假定大多数常用的操作都是获得已存在的某个值, 因此它的优化是针对get操作. 提供更好的性能和并发性.

同步的Map实现中, 可伸缩性最主要的障碍在于整个Map存在 一个锁, 所以一次只有一个线程能够访问Map, 从另一个方面来说, ConcurrentHashMap并没有对成功的读操作加锁, 对写操作和真正需要锁的读操作使用了分离锁的方法, 因此, 多线程能够并发访问map, 而不是被阻塞.

关于救火的一个比喻
救火有两种解决方案: 一个是排成长队以传水救火, 另一个是一群人跑来跑去的救火. 在后一种方案中, 你对水源和火源都存在很大的竞争(结果是更少的水能够传递到灭火点), 还会造成更低的效率, 因为每一个工人都在不停的变换模式(注水, 跑步, 倒水, 跑步, 等等). 在第一种方案中, 从水源到燃烧的建筑物间的水流是不断的, 付出更少的体力却换得了更多的水传输过来, 每个工人只负责自己的一项工作.

高级主题
ReentrantLock实现了Lock接口, 提供了与synchronized相同的互斥和内存可见性的保证, 或者ReentrantLock的锁与进入synchronized块有相同的内存语义, 释放ReentrantLock锁与退出synchronized块有相同的意义.但是与synchronized相比, ReentrantLock为处理比可用的锁提供了更多的灵活性.

synchronized的一个缺点是不能中断正在等待获取锁的线程, 并且在请求锁失败的情况, 必须无限制的等待.

使用tryLock避免顺序死锁:
    public boolean transferMoney(final Account fromAccount, final Account toAccount, final DollarAmount amount,
            long timeout, TimeUnit unit) throws InterruptedException {
        long fixedDelay = 123;
        long randMod = 456;
        long stopTime = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            if (fromAccount.getLock().tryLock()) { // 如果不能获得锁就返回重试
                try {
                    if (toAccount.getLock().tryLock()) {
                        try {
                            if (fromAccount.getBalance().compareTo(amount) < 0) {
                                throw new RuntimeException();
                            } else {
                                fromAccount.debit(amount);
                                toAccount.credit(amount);
                            }
                        } finally {
                            toAccount.getLock().unlock();
                        }
                    }
                } finally {
                    fromAccount.getLock().unlock();
                }
            }
            
            if (System.nanoTime() > stopTime) { // 重试了指定次数仍然无法获得锁则返回失败
                return false;
            }
            
            Thread.sleep(fixedDelay + new Random().nextLong() % randMod);
        }
    }

使用预定时间锁:
    public boolean trySendOneSharedLine(String message, long timeout, TimeUnit unit) throws InterruptedException {
        long nanosToLock = unit.toNanos(timeout) - estimatedNanosToSend(message);
        if (lock.tryLock(nanosToLock, TimeUnit.NANOSECONDS)) {
            return false;
        }
        
        try {
            return sendOnSharedLined(message);
        }finally {
            lock.unlock();
        }
    }


在内部锁不能满足使用时, ReentrantLock才被作为更高级的工具, 当你需要以下高级特性时, 才应该使用: 可定时的, 可轮询的与可中断的锁获取操作, 公平队列, 或者非块结构的锁, 否则请使用synchronized.

ReentrantLock实现了标准的互斥锁: 一次最多只有一个线程能够持有相同ReentrantLock. 但是互斥通常作为保护数据一致性的很强的加锁约束, 因此过分地限制了并发性. 互斥是保守的加锁策略, 避免了"写/写", "读/写"的重叠, 但是同样避开了"读/读"的重叠. 为了提高并发性, 提供了一个读写锁, 它允许一个资源能够被多个读者访问, 或者被一个写者访问, 两者不能同时进行.

ReadWriteLock同时暴露了两个Lock对象, 一个用来读, 一个用来写. 读取ReadWriteLock锁守护的数据, 你必须首先获得读取的锁, 当需要修改守护的数据时, 你必须首先获得写入的锁.
public class ReadWriteMap<K, V> {
    private final Map<K, V> map;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock w = lock.writeLock();
    private final Lock r = lock.readLock();

    public ReadWriteMap(Map<K, V> map) {
        this.map = map;
    }

    public V put(K key, V value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }

    public V get(K key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
}



条件队列
条件队列就如同烤面包机上的面包已好的铃声, 如果你正在听着它, 当面包烤好后你可以立即注意到, 并且放下手头的事情开始品尝面包, 如果你没有听见它, 你会错过通知消息, 但是回到厨房后还是看到面包的状态, 如果已经烤完, 就取面包, 如果未烤完, 就再次监听铃声.

如果Object.wait()方法被调用表示"我要去休息了, 但是发生了需要关注的事情后叫醒我", 调用通知方法意味着"需要关注的事情发生了".

调用notify的结果是: JVM会从在这个队列中等待的众多线程中挑选出一个, 并把它唤醒; 而调用notifyAll会唤醒所有正在这个条件队列中等待的线程.调用notifyAll()的效率有时候非常低, 这主要是因为如果有10个线程在条件队列中等待, 调用notifyAll会唤醒每一个线程, 让它们去竞争锁, 然后它们中的大多数或者全部又回到休眠状态, 这意味着每一个激活单一线程执行的事件, 都会带来大量的上下文切换, 和大量竞争锁的请求.

Condition
Condition是一种广义的内部条件队列, 一个Condition和一个单独的Lock相关联, 调用Lock.newCondition()方法, 可以创建一个Condition. 每个Lock可以有任意数量的Condition对象. wait, notify, notifyAll在Condition中都有对等体:await, signal, signalAll, 而且一定要使用后者!
一个使用Condition的BoundedBuffer:
public class ConditionBoundedBuffer<T> {
    private static final int BUFFER_SIZE = 2;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final T[] items = (T[]) new Object[BUFFER_SIZE];
    private int tail, head, count;

    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[tail] = x;
            if (++tail == items.length) {
                tail = 0;
            }
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T x = items[head];
            items[head] = null;
            if (++head == items.length) {
                head = 0;
            }
            count--;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        final ConditionBoundedBuffer<String> cbb = new ConditionBoundedBuffer<String>();
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    cbb.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    cbb.put("haha");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // new Thread(new Runnable() {
        //
        // @Override
        // public void run() {
        // try {
        // cbb.take();
        // } catch (InterruptedException e) {
        // e.printStackTrace();
        // }
        // }
        // }).start();
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    cbb.put("haha");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}


ReentrantLock和Semaphore这两个接口有很多共同点, 它们都扮演了"阀门"的角色, 每次允许有限数量的线程通过, 线程到达阀门之后, 可以允许通过(lock和acquire成功返回), 也可以等待(lock和acquire阻塞), 也可以被取消(tryLock和tryAcquire返回false, 指明在允许的时间内, 锁或者许可不可用), 更进一步, 它们允许可中断, 不可中断的, 可限时的请求尝试, 它们都允许选择公平, 非公平的等待线程队列.

CountDownLatch, ReentrantLock, SynchronousQueue和FutureTask都构建于共同的基类AbstractQueuedSynchronizer. AQS获取操作可能是独占的, 就像ReentrantLock一样, 也可能是非独占的, 就像Semaphore和CountDownLatch一样, 这取决于不同的Synchronizer.

基于AQS的闭锁
public class OneShotLatch {
    private final Sync sync = new Sync();

    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            return (getState() == 1) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(1);
            return true;
        }
    }

    public void signal() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireInterruptibly(0);
    }
}

代码中通过AQS管理闭锁的状态:关闭0, 打开1。 await方法调用AQS的acqurieSharedInterruptibly, 后者随后请求OneShotLatch中的tryAcquireShared方法必须返回一个值来表明操作能否继续执行. 如果闭锁已经事先打开, tryAcquireShared会返回成功, 并允许线程通过; 否则他会返回一个值, 表明获取请求的尝试失败. acquireSharedInterruptibly方法处理失败的方式, 是把线程植入一个队列中, 该队列中的元素都是等待中的线程, asignal调用releaseShared, 进而导致tryReleaseShared被调用. tryReleasseShared的实现无条件地把闭锁的状态设置为打开, 通过返回值表明Synchronizer处于完全释放的状态.

原子变量与非阻塞同步机制
与基于锁的方案对比, 非阻塞算法的设计与实现要复杂的多. 但是他们可伸缩性和活跃度上占有很大的优势. 因为非阻塞算法可以让多个线程在竞争相同资源时不会发生阻塞, 所以它能在更精细化的层面上调整力度, 并能大大减少调度的开销. 进一步讲, 他们对死锁和其他活跃度问题具有免疫性

volatile变量与锁相比是更轻量的同步机制, 因为它不会引起上下文的切换和线程调度. 然而, volatile变量与锁相比有一定的局限性:尽管他们提供了相似的可见性保证, 但是它不能用于构造原子性的复合操作.

独占锁是一项悲观技术: 它假设最坏情况, 并且会通过获得正确的锁来避免其他线程更多打扰, 直到作出保证才能继续进行.

对于细粒度的操作, 有另外一种选择通常更加有效: 乐观的解决方法. 凭借新的方法, 我们可以指望不受打扰地完成更新, 这个方法依赖于冲突监测, 从而能判定更新过程中是否存在来自其他线程的干涉, 在冲突发生的情况下, 操作失败, 并会重试.

比较并交换(CompareAndSwap CAS)有三个操作数: 内存位置V, 旧的预期值A和新值B, 当且仅当V符合旧值A时, CAS用新值B原子化地更新V的值, 否则它什么都不会做. 在任何情况下都会返回V的真实值, CAS的意思是: 我认为V的值应该是A, 如果是, 那么将B值赋值给V, 若不是, 则不修改, 并告诉我应该为多少. CAS是一项乐观技术: 它抱着成功的希望进行更新, 并且如果另一个线程在上次检查后更新了该变量, 它能够发现错误.

当多个线程试图使用CAS同时更新相同的变量时, 其中一个会胜出, 并更新变量的值, 而其他的都会失败, 失败的线程更不会被挂起, 他们会被告知这次赛跑失利, 但是允许重试
CAS原理的一种简单实现:
public class SimulateCAS {
    private int value;

    public synchronized int get() {
        return value;
    }

    public synchronized int compareAndSwap(int expectedValue, int newValue) {
        int oldValue = value;
        if (expectedValue == oldValue) {
            value = newValue;
        }
        return oldValue;
    }

    public synchronized boolean compareAndSet(int expectedValue, int newValue) {
        return (expectedValue == compareAndSwap(expectedValue, newValue));
    }
}

CAS在阻塞计数器中的一种应用:
public class CasCounter {
    private SimulateCAS value = new SimulateCAS();

    public int getValue() {
        return value.get();
    }

    public int increment() {
        int v;
        do {
            v = value.get();
        } while (v != value.compareAndSwap(v, v + 1));
        return v + 1;
    }
}

自增计数器遵循了经典形式:取得旧值, 根据它计算出新值(加1), 并使用CAS设定新值. 如果CAS白了, 立即重试该操作.

加锁的语法可能比较简洁, 但是JVM和OS管理锁的工作却并不简单.

锁与原子化随竞争程度的不同, 性能发生的改变表明了各自的优势和劣势. 在中低程度的竞争下, 原子化提供更好的伸缩性; 在高强度的竞争下, 锁能够很好的帮助我们避免竞争. 但是在单CPU系统中, CAS算法要胜于锁的算法, 因为CAS在单CPU的系统下总是成功的, 除非线程在读-改-写操作的中途被抢占.

非阻塞算法的所有特性: 一些任务的完成具有不确定性, 并可能必须重做.

基于非阻塞的Stack实现
public class ConcurrentStack<E> {
    class Node<E> {
        E item;
        Node<E> next;
        public Node(E item) {
            this.item = item;
        }

    }

    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); // 对栈顶的一个引用

    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null) {
                return null;
            }
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }
}


CAS的基本模式: 如果更新失败就重试

构建非阻塞算法的窍门是: 缩小原子化的范围到唯一的变量.

基于非阻塞的链表
public class LinkedQueue<E> {
    static class Node<E> {

        final E item;
        final AtomicReference<Node<E>> next;

        public Node() {
            this(null, null);
        }

        public Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<Node<E>>(next);
        }
    }

    private final Node<E> dummy = new Node<E>();
    private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy);
    private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);

    public boolean put(E item) {
        Node<E> newNode = new Node<E>(item, null);
        while (true) {
            Node<E> curTailNode = tail.get();
            Node<E> tailNextNode = curTailNode.next.get();
            if (curTailNode == tail.get()) {
                if (tailNextNode == null) {
                    // 更新尾节点下一个节点
                    if (curTailNode.next.compareAndSet(null, newNode)) {
                        // 更新成功, 将尾节点指向下一个节点
                        tail.compareAndSet(curTailNode, newNode);
                        return true;
                    }
                } else {
                    // 在更新过程中, 发现尾节点的下一个节点被更新了, 将尾节点指向下一个节点
                    tail.compareAndSet(curTailNode, tailNextNode);
                }
            }
        }
    }

    public static void main(String[] args) {
        final LinkedQueue<String> queue = new LinkedQueue<String>();
        new Thread(new Runnable() {
            @Override
            public void run() {
                queue.put("item1");
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                queue.put("item2");
            }
        }).start();
    }
}

这里有一个窍门, 第一个就是即使是在多步更新中, 也要确保数据结构总能处于一致状态. 如果线程B到达时发现线程A在更新中, B可以分辨操作已部分完成, 并且知道不能立即开始自己的更新, 那么B就开始等待(反复检查重试)直到A完成更新, 这样两个线程就不会互相影响了.

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics