`

《并发编程实践》读书笔记(2)

    博客分类:
  • Book
阅读更多
前一篇

构建并发应用程序

两种实现

所有线程在单一线程中顺序执行, 会产生糟糕的响应性和吞吐量; 每任务每线程会给资源管理带来麻烦.

Excutor基于生产者-消费者模式, 提交任务的执行者是生产者, 执行任务的线程是消费者, 如果要在你的程序中实现一个生产者-消费者的设计, 使用Excutor通常是最简单的方式.

两种实现方式:
每线程每任务
public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

一个线程所有任务:
public class WithinThreadExecutor implements Executor {
    public void execute(Runnable command) {
        command.run();
    }
}


线程池
Executors有很多静态的工厂方法:
newFixedThreadPool创建一个定长的线城市, 每当提交一个任务就创建一个线程, 直到达到池的最大长度, 这时线程池会保持长度不再变化.

newCachedThreadPool 创建一个可缓存的线程池, 如果当前线程的长度超过了处理的需要时, 它可以灵活的回收空闲的线程, 当需求增加时, 它可以灵活的增加新的线程, 而并不会对池的长度做任何限制.

newSingleThreadExecutor 创建一个单线程化的executor, 它只创建唯一的工作者线程来执行任务, 如果这个线程异常结束, 会有另一个取代它, 但是任务会保存在一个queue中等待执行.

newScheduleThreadPool 创建一个定长的线程池, 而且支持定时的以及周期性执行任务, 类似timer.

Executor生命周期
作为Executor的继承者ExecutorService有三种状态: running, shuting down, terminated.
创建之后是running.
调用shutdown, 将停止接受新的任务, 同时等待已经提交的任务完成, 包括尚未完成的任务
调用showdownNow会启动一个强制的关闭过程, 尝试取消所有运行中的任务和排在队列中尚未开始的任务.

对于关闭后提交到ExecutorService中的任务, 会被rejected execution handler处理.

所有任务完成之后, 进入terminated状态, 可以调用awaitTermination等待ExecutorService到达终止状态, 也可以轮询检查isTerminated判断是否终止. 通常shutdown会紧随awaitTermination之后, 这样可以产生同步地关闭ExecutorService的效果.

Timer只会创建一个线程来执行所有task, 如果一个task非常耗时, 会导致其他的task的实效准确性出问题. 而Executors.newScheduledThreadPool()创建的ScheduledThreadPoolExecutor则不会有这样的问题

Timer的另一个问题是, 如果TimeTask抛出未检查异常, 将导致不可预料的行为. 因为Timer并不捕获异常.在JDK5.之后几乎没有必要使用Timer了.

可以使用DelayQueue来实现自己的调度服务, 它使BlockingQueue的一种实现. 其内部包含了一个Delayed对象的容器. Delayed是一种延迟时间对象, 只有元素过期后,它才会让你执行take获取元素(这个元素实现了Delayed接口.)

在给多个工作者划分相异任务时, 各个任务的大小可能完全不同, 比如给两个工作者划分了任务a和任务b, 但是a执行花费的时间是b执行时间的10倍, 那么整个过程仅仅加速了9%而已. 最后多个工作者之间划分任务, 总会涉及到一些任务协调上的开销, 为了使得划分任务是值得的, 这一开销不能多于通过并行性带来的生产力的提高.

将大量相互独立且同类的任务进行并发的处理, 会将程序的任务量分配到不同的任务中, 这样才能真正获得性能上的提升.

CompletionService用来将Executor与BlockingQueue进行结合, 将Callable任务提交给它执行, 然后使用类似队列中的take和poll在结果完整时获得这个结果.

ExecutorCompletionService在构造函数中创建一个BlockingQueue用来保存结果, 在计算完成时会调用FutureTask的done()方法, 而用来包装任务的QueueFuture会复写done方法, 用来将结果置入BlockingQueue.
public class CompletionRenderer {
    class ImageInfo {
        public ImageData downloadImage() {
            return null;
        }
    }

    class ImageData {
    }

    private final ExecutorService executor = Executors.newCachedThreadPool();

    void renderPage(String source) {
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        CompletionService<ImageData> service = new ExecutorCompletionService<ImageData>(executor);

        for (final ImageInfo imageInfo : imageInfos) {
            service.submit(new Callable<ImageData>() {
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }

        renderText(source);

        for (int i = 0; i < imageInfos.size(); i++) {
            Future<ImageData> f;
            try {
                f = service.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    private void renderImage(ImageData imageData) {
    }

    private void renderText(String source) {
    }

    private List<ImageInfo> scanForImageInfo(String source) {
        return null;
    }

}


还有另外一种方式通过invokeAll将多个任务提交到一个ExecutorService中, 并在指定的时间内执行完成, 超过时限后, 任务要么正常完成,  要么被取消. 可以通过get和isCancelled来查明属于那一种情况.

任务的取消
在多任务协作机制中, 有一种会通过设置取消标志, 任务会定期查看: 如果发现标志被设置过, 任务就会提前结束.
这里有一个例子:
public class PrimeGenerator implements Runnable {
    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    private volatile boolean cancelled;
    @Override
    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        this.cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }

    public static void main(String[] args) throws Exception {
        PrimeGenerator g = new PrimeGenerator();
        new Thread(g).start();
        try {
            Thread.sleep(10);
        } finally {
            g.cancel();
        }
        System.out.println(g.get());
    }
}


对中断的理解应该是:它并不会真正中断一个正在运行的线程; 它仅仅只是发送中断请求.

中断通常是实现取消最明智的选择.
中断结合BlockingQueue的例子:
public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    public PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {

        BigInteger p = BigInteger.ONE;
        while (!Thread.currentThread().isInterrupted()) {
            queue.put(p.nextProbablePrime());
        }
        } catch (InterruptedException e) {

        }
    }

    public void cancel() {
        interrupt();
    }
}


如果一个方法需要处理一批任务, 并在所有任务结束前不返回, 那么它可以通过使用私有的Executor来简化服务的生命周期, 其中Executor的寿命限定在该方法中:
    boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final AtomicBoolean hasNewMail = new AtomicBoolean(false); // 使用Atmic去掉volatile是因为内部Runnable访问hasNewMail标识, 那么必须是final的, 这样才能避免被修改
        try {
            for (final String host : hosts) {
                exec.execute(new Runnable() {
                    @Override
                    public void run() {
                        if (checkMail(host)) {
                            hasNewMail.set(true);
                        }
                    }

                });
            }
        } finally {
            exec.shutdown(); //关闭
            exec.awaitTermination(timeout, unit); //等待结束
        }
        return hasNewMail.get();
    }


Executor的shutdownNow方法强行关闭一个ExecutorService时, 它试图取消正在进行的任务, 并返回那些已经提交, 但未开始的任务清单, 这样这些任务可以通过日志存起来, 或者存起来等待进一步处理. 注意, shutdownNow返回的Runnable的对象可能并不是提交给ExecutorService的相同的对象: 它们可能是经过保障的已提交的任务的实例.

线程池
当任务是同类的, 独立的时候, 线程池才会有最佳的工作表现.

大多数平台类库中的阻塞方法, 都同时有限时的和非限时的两个版本, 比如Thread.join, BlockingQueue.put, CountDownLatch.await, Selector.select. 如果出现超等待, 你可以把任务标识为失败, 中止它或者把它重新放回队列, 准备之后执行.

如果线程池频频被阻塞的任务充满, 它同样也可能是池太小的一个信号.

对于计算密集的任务, 一个有N个处理器的系统通常需要有N+1个线程的线程池来获得最优的利用率(当计算时出现一个页错误或者其他原因而暂停, 刚好有一个额外的线程, 可以确保这种情况下CPU周期不会中断工作)

ThreadPoolExecutor
核心池大小, 最大池大小, 存活时间共同管理着线程的创建与销毁. 即使没有任务执行, 池的大小也等于核心池的大小, 并且直到工作队列充满前, 池都不会创建更多的线程. 最大池大小是可以同时活动的线程的上限, 如果一个线程已经闲置的时间超过了存活时间, 它将成为一个被收回的候选者, 如果当前的池的大小超过了核心池的大小, 线程会终止该候选者.

newFixedThreadPool工厂为请求的池设置了核心池的大小和最大池的大小, 而且它永远不会超时;
newCachedThreadPool工厂将最大池的大小设置为Integer.MAX_VALUE, 核心池的大小设置为0, 超时时间设置为1分钟, 这样创建出来的无限扩大的线程池会在需求量减少的情况下减少线程数量.

newFixedThreadPool和newSingleThreadExecutor默认使用的是一个无限的LinkedBlockingQueue, 如果所有的工作者线程都处于忙碌状态, 任务将会在队列中等候, 如果任务持续地到达, 超过了它被执行的速度, 队列会无限地增加.

对于庞大或者无限的池, 你可以使用SynchronousQueue, 完全绕开队列, 将任务直接从生产者移交到工作者线程, 因为SynchronousQueue并不是一个真正的队列, 而是一种管理直接在线程间移交信息的机制.

只有当池的大小是无限的, 或者可以接受任务被拒绝, SynchronousQueue才是一个有实际价值的选择, newCachedThreadPool工厂就是用了SynchronousQueue.

只有当任务彼此独立时, 才能使有限线程池或者有限工作队列的使用是合理的. 倘若任务之间是互相依赖, 有限的线程池就有可能引起线程饥饿死锁; 使用一个无限的池配置可以避免这类问题, 就像newCachedThreadPool所作的.

饱和策略
默认的终止(abort)策略会引起execute抛出RejectedExecutionException: 调用者可以捕获这个隐藏然后编写满足自己需求的处理代码
当最新提交的任务不能进入队列等待执行时, 遗弃(discard)策略会默认放弃这个任务.
遗弃最旧(discard-oldest)策略选择丢弃的任务是本应该接下来就应该执行的任务, 该策略还会尝试去重新提交新任务.
调用者运行(caller-runs)策略的实现方式, 既不会丢弃哪个任务, 也不会抛出任何异常. 它会把一些任务退回到调用者那里, 从此缓解新任务流. 他不会在池线程中执行最新提交的任务, 但是他会在一个调用了execute的线程中执行.
当工作队列充满后, 并没有预置的饱和策略来阻塞execute. 但是, 使用Semaphore信号量可以实现这个效果. Semaphore会限制任务注入率.
public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semahpore;

    public BoundedExecutor(Executor exec, int bound) {
        super();
        this.exec = exec;
        this.semahpore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException {
        semahpore.acquire();
        try {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semahpore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semahpore.release();
        }
    }
}


定制ThreadPoolExecutor
大多数通过狗展函数传给ThreadPoolExecutor的参数(核心池大小, 最大池大小, 活动时间, 拒绝处理器) 都可以在创建之后通过setter进行设置, 但是如果通过Executors的unconfigableExecutorService的工厂方法得到一个现有的ExecutorService, 并对它进行了包装, 它只暴露出ExecutoeService的方法, 因此不能进行进一步的配置. 如果你将ExecutorService暴露给你不信任的代码, 不希望它被修改, 可以用一个unconfigableExecutorService包装它.

扩展ThreadPoolExecutor
ThreadPoolExecutor提供了一个钩子方法(afterExecute, beforeExecute, terminated), 无论任务是正常地从run返回, 还似乎抛出一个异常, afterExecute都会被调用, 如果抛出一个Error则不会, 如果任务抛出一个RuntimeException, 任务不会被执行, afterExecute也不会被调用. terminated钩子会在县城池完成关闭动作后被掉哦了给, 也就是当所有任务已完成并且所有工作者线程也已经关闭后, 会执行, 这样可以用来释放Executor在生命周期中分配的一些资源, 还可以发出通知, 记录日志或者完成统计信息.
一个对执行时间进行log的例子:
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTask = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            totalTime.addAndGet(taskTime);
            numTask.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTask.get()));
        }finally {
            super.terminated();
        }
    }
}


并行递归算法
当每个迭代彼此独立, 并且完成循环体中每个迭代的工作, 意义都足够重大, 足以弥补管理一个新任务的开销时, 这个顺序循环是适合并行化的.
将串行执行转换成并行执行:
    void processSequentially(List<Element> element) {
        for (Element e : element) {
            process(e);
        }
    }

    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements) {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    process(e);
                }
            });
        }
        exec.invokeAll(null); // 如果需要等到所有处理结束之后才能继续执行可以加上这句
    }

将顺序递归转换成并行递归:
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
        for (Node<T> node : nodes) {
            results.add(node.compute());
            sequentialRecursive(node.getChildren(), results);
        }
    }

    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
        for (final Node<T> node : nodes) {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    results.add(node.compute());
                }
            });
            parallelRecursive(exec, nodes, results);
        }
    }

    public <T> Collection<T> getParallelResult(List<Node<T>> nodes) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }


笔记3
分享到:
评论
1 楼 guozhigang 2011-09-29  
通常shutdown会紧随awaitTermination之后,应该改成 shutdown 后面紧随 awaitTermination吧。

相关推荐

    java并发编程实践笔记

    java并发编程实践笔记java并发编程实践笔记java并发编程实践笔记java并发编程实践笔记

    java并发编程实践pdf笔记

    java并发编程实战pdf学习笔记 总结了重要的知识点

    java并发编程实践高清中文版+源码

    《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...

    java并发编程实践笔记资料.pdf

    java并发编程实践笔记资料.pdf

    JAVA并发编程实践

    《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...

    JAVA并发编程实践.pdf

    《JAVA并发编程实践》适合于具有一定Java编程经验的程序员、希望了解Java SE 5以及6在线程技术上的改进和新特性的程序员,以及Java和并发编程的爱好者。 作者简介 作者:(美)戈茨 等 本书作者系lava标准化组织...

    java并发编程实践(第一版)

    java并发编程实践java并发编程实践java并发编程实践java并发编程实践

    JAVA并发编程实践高清中文带书签

    《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...

    Java并发编程实践.pdf

    Java并发编程实践.pdf

    Java并发编程实践

    Java并发编程实践

    java并发编程实践

    java并发编程实践

    java 并发编程实践

    java 并发编程实践java 并发编程实践java 并发编程实践java 并发编程实践java 并发编程实践

    java并发编程实践(中文版pdf全部40M分2部分上传)2

    《JAVA并发编程实践》既能够成为读者的理论支持,又可以作为构建可靠的、可伸缩的、可维护的并发程序的技术支持。《JAVA并发编程实践》并不仅仅提供并发API的清单及其机制,还提供了设计原则、模式和思想模型,使...

    [原]Java并发编程实践-读书笔记

    《Java并发编程实践》一书的个人读书笔记。主要列举包括各个章节的关键知识点,便于反复阅读和知识复习掌握。

    JAVA并发编程实践 .pdf

    《Java并发编程实战》深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册。书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发...《Java并发编程实战》适合Java程序开发人员阅读。

    Java并发编程实践-电子书

    Java并发编程实践-电子书-01章.pdf Java并发编程实践-电子书-02章.pdf Java并发编程实践-电子书-03章.pdf Java并发编程实践-电子书-04章.pdf Java并发编程实践-电子书-05章.pdf Java并发编程实践-电子书-06章.pdf ...

    Java并发编程学习笔记.rar

    Java并发编程学习笔记

Global site tag (gtag.js) - Google Analytics