狠狠撸

狠狠撸Share a Scribd company logo
蔡剑峰
大纲
? 什么是并发
? 并发编程的好处
? Java中的并发
? 并发编程的常见问题
并发的常见场景
? 12306铁路客户服务中心—— 购票系统
? Nginx Web服务器
? QQ即时聊天通讯
? 超市的收银台
并发的定义
? 在操作系统中,是指一个时间段中有几个程序都处于
已启动运行到运行完毕之间,且这几个程序都是在同
一个处理机上运行,但任一个时刻点上只有一个程序
在处理机上运行。
? 在过去30多年时间里,计算性能一直遵循着摩尔定律,
但从现在开始,它将遵循Amdahl 定律能。编写能高
效利用多处理器的代码非常具有挑战性。
——Doron Rajwan, Intel 公司科学家
Amdahl’s law(阿姆达尔定律)
N: Number of processors
F: Serial fraction
Speedup ≤
1
? +
1 ??
?
闯补惫补并发编程实践
并发的好处
? 发挥多处理器的强大性能,提升资源利用率以及系统
的吞吐率。
? 提供更好的GUI交互体验(搜狐视频可以边下边播。)
如何并发
? 单核,由于有I/O 等待,CPU较空闲,通过多进程/多
线程让CPU忙起来,提升处理速度。
? 单核,通过多路复用,利用I/O 等待的时间进行运算,
让CPU干活,提升处理速度。
? 多核,使用多路复用只能跑满一个核心,必须使用多
进程/多线程才能充分利用多核。
? 利用Map/Reduce 模式将任务分发到许多网络节点上,
同时并行计算,通过对每个节点的返回值进行归并计
算得到最终结果。
多进程
? linux 下进程开销较windows 要小很多(示例:nignx)
? 进程间数据隔离严密,较清爽、稳定
? 缺点:进程是内核对象,进程间切换比较耗资源
多线程
? 线程是比进程更轻量级的调度执行单位
? Sun官方的SDK,Java的每个线程映射一个轻量级进程
(Light Weight Process,LWP)
Java中的线程
实现线程的两种方式
? 实现 Runnable接口
? 继承罢丑谤别补诲
FutureTask
? 用于要异步获取结果或取消执行任务的场景。
? public FutureTask(Callable<V> callable)
? public boolean cancel(boolean mayInterruptIfRunning)
? public V get() throws InterruptedException,
ExecutionException
线程池ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize和最大池大小
当新任务在方法 execute(java.lang.Runnable) 中提交时,
如果运行的线程少于 corePoolSize,则创建新线程来处理
请求,即使其他辅助线程是空闲的。如果运行的线程多于
corePoolSize 而少于 maximumPoolSize,则仅当队列满时
才创建新线程。如果设置的 corePoolSize 和
maximumPoolSize 相同,则创建了固定大小的线程池。
如果将 maximumPoolSize 设置为基本的无界值(如
Integer.MAX_VALUE),则允许池适应任意数量的并发任
务。
按需构造:默认情况下,即使核心线程最初只是在新任务
到达时才创建和启动的
keepAliveTime
? 如果池中当前有多于 corePoolSize 的线程,则这些多
出的线程在空闲时间超过 keepAliveTime 时将会终止
排队BlockingQueue
所有 BlockingQueue 都可用于传输和保持提交的任务。
可以使用此队列与池大小进行交互:
? 如果运行的线程少于 corePoolSize,则 Executor 始终
首选添加新的线程,而不进行排队。
? 如果运行的线程等于或多于 corePoolSize,则
Executor 始终首选将请求加入队列,而不添加新的线
程。
? 如果无法将请求加入队列,则创建新的线程,除非创
建此线程超出 maximumPoolSize,在这种情况下,任
务将被拒绝。
RejectedExecutionHandler拒绝任务
? CallerRunsPolicy 当线程池中的线程数 >= 最大线程数后,则交由
调用者的线程来执行此Runable任务。
? AbortPolicy (默认)当线程池中的线程数 >= 最大线程数后,直接抛
出RejectExectuionExceiption.
? DiscardPolicy 当线程池中的线程数 >= 最大线程数后,不做任何动作。
? DiscardOldestPolicy 当线程池中的线程数 >= 最大线程数后,抛弃要
队列中最早的Runnable 任务。
参见: ThreadPoolExecutor 源码
Executors
1. newFixedThreadPool(int nThreads)
创建固定大小的线程池,启动corePoolSize 数量的线程后就一直
运行,不会由于keepAliveTime 到达而退出。
2. newSingleThreadExecutor()
创建大小固定为1的线程池,其它task都在LinkedBlockingQueue中.
3. newCachedThreadPool()
创建corePoolSize 为0,最大线程数为整型的最大值,线程
keepAliveTime 为1分钟,缓存队列为SynchronousQueue 的线程池。
达到最大线程数后抛出RejectExecutionException.
4. newScheduledThreadPool(int corePoolSize)
创建corePoolSize 为传入参数,最大线程数为整形的最
大值,线程keepAliveTime 为0,缓存任务的队列为
DelyedWorkQueue的线程池。
VS Timer?
Timer 只能单线程,一量失败影响到其它任务。
Timer 中task 抛出runtime 异常,所有task 不再执行。
SchuledThreadPoolExecutor 可执行callbable 的task ,从
而获得返回值。
Thread的调度
? interrupt() 中断线程。
? join() 等待该线程终止。
? sleep(long millis) 在指定的毫秒数内让当前正在执行
的线程休眠(暂停执行)
? yield() 暂停当前正在执行的线程对象,并执行其他线
程。
? Object.wait()在其他线程调用此对象的 notify() 方法
或 notifyAll() 方法前,导致当前线程等待。
? Object.notifyAll()唤醒在此对象监视器上等待的所有
线程。
闯补惫补并发编程实践
多线程示例
? ID生成器
多线程面临的风险
? 线程安全 —— 多线程环境下,数据没有同步
? 死锁 —— 循环依赖
? 性能问题—— 频繁切换线程上下文事来的开销,锁同
步机制的等待。
Java的内存模型
1. 所有的变量都存储在主内存
2. 每条线程还有自己的工作内
存,线程的工作内存中保存了该
线程使用到的变量的主内存副
本拷贝
3.线程对变量的所有操作(读
取、赋值)等,都必须在工作
内存中进行,而不能直接读写
主存中的变量。
4. 不同线程间无法直接访问对
方工作内存中的变量,线程间
变量的传值都需要通过主内存
来完成。
内存间的交互操作
? lock(锁定):作用于主内存,把一个变量标识为一条
线程独占状态。
? unlock(解锁):作用于主内存,把一个处于锁定状态
的变量释放出来,释放后才能被其它线程锁定。
? read(读取):作用于主内存,把一个变量从主内存传
输到线程的工作内存中,以便随后的load动作。
? load(载入):作用于工作内存,它把read 操作中得到
的变量值放入工作内存的变量副本。
? use(使用):作用于工作内存,把工作内存中的变量传
递给执行引擎 ,虚拟机执行需要变量值的字节码指令
时执行这个操作。
? assign(赋值):作用于工作内存,把一个从执行引擎收
到的值 赋值给工作内存的变量,虚拟机执行一个给变
量赋值的字节码指令时执行这个操作。
? store(存储):作用于工作内存,把一个工作内存的变
量传送到主存中,以便随后的write使用。
? write(写入):作用于主存,把store 操作从工作内存中
得到的变量放入主内存的变量中。
内存交互的规则
? 不允许read和load、store和write 操作之一单独出现。即不
允许一个变量从主内存读取了但工作内存不接受,或者从
工作内存发起回写了但主内存不接受的情况出现。
? 不允许一个线程丢弃了它的最近的assign操作,即变量在
工作内存中改变了之后必须把该变化同步回主存。
? 不允许一个线程无原因地(没有发生过任何assign操作),
把数据从线程的工作内存同步回主内存中。
? 一个新的变量只能在主内存中“诞生”,不允许在工作内
存中直接使用一个未被初始化(load或assign)的变量,
换句话说就是对一个变量实施use和store操作之前,必须
先执行过了assign 和 load 操作。
? 一个变量在同一个时刻只允许一条线程对其进行lock
操作,但lock操作可以被同一条线程重复执行多次,
多次执行lock后,只有执行相同次数的unlock 操作,
变量才会被解锁。
? 如果对一个变量执行lock操作,将会清空工作内存中
此变量的值,在执行引擎使用这个变量前,需要重新
执行load 或 assign 操作初始化变量的值。
? 如果一个事先没有被 lock 操作锁定,则不允许对它执
行unlock操作;也不允许去unlock 一个被其它线程锁
定住的变量。
? 对一个变量执行unlock操作之前,必须先前此变量同
步回主内存中(执行store和write操作)。
并发要解决的问题
? 共享数据
? 同步
? 性能
保护共享数据
? 锁机制
? 可见性
? 原子性
锁
? Synchronized
? ReentrantLock
? ReadWriteLock
Synchronized
? synchronized(lock){
//code acess shared state
? }
? 每个java对象都可以用做一个实现同步的锁,这些锁称为内置锁
(Intrinsic Lock)或 监视器锁(Monitor Lock).
? 线程在进入同步代码块之前会自动获得锁,并且在退出同步代
码块的时候自动释放锁。
? 同步块相当于一个互斥锁,最多只有一个线程能持有这种锁。
? 同步锁是可重入的,一个线程可以获取已持有的锁。
synchronized 可见性
? 同步块的可见性是由
“对一个变量执行
unlock操作之前,必
须把此变量同步回主
内存中(执行store和
write操作)”这条规
则获取的。
Object对象上的锁
? wait()
? notify()/notifyAll()
? 示例:生产者/消费者 模型,典型应用消息队列。
ReentrantLock
? Lock 对象必须被显式地创建、锁定 和释放 。
? 一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和
语句所访问的隐式监视器锁相同的一些基本行为和语义,但功
能更强大。
? void lock() 获取锁。 如果锁不可用,出于线程调度目的,将禁
用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。
? boolean tryLock() 仅在调用时锁为空闲状态才获取该锁。 如果
锁可用,则获取锁,并立即返回值 true。如果锁不可用,则此
方法将立即返回值 false。
? void unlock() 释放锁。
ReentrantReadWriteLock
? ReadWriteLock 维护了一对相关的锁,一个用于只读
操作,另一个用于写入操作。只要没有 writer,读取
锁可以由多个 reader 线程同时保持。写入锁是独占的。
? 与互斥锁相比,读-写锁允许对共享数据进行更高级别
的并发访问。
? readLock()返回用于读取操作的锁。
? writeLock() 返回用于写入操作的锁。
volatile修饰符
当一个字段被多个线程同时访问,至少 其中一个访问是
写操作时,除了用锁来停止同时访问,也可以用volatile.
当一个变量被定义成volatile后,具备两种特性:
1. 保证此变量对所有线程的可见性。当一条线程修改
了变量的值,这个新值对其他线程来说是立即可得
知的。
2. 禁止指令重排序优化。
指令重排序
Program Order: Execution Order:(maybe)
int w = 10;
int x = 20;
int y = 30;
int z = 40;
int a = w + z;
int b = x + y;
int x = 20;
int y = 30;
int b = x * y;
int w = 10;
int z = 40;
int a = w + z;
volatile可见性 = 线程安全?
? volatile 每次使用变量前,都必须从主内存刷新最新值。可以认
为use 操作 必须与 load和read 一起出现。
? volatile 每次修改变量后,都必须立刻同步回主内存中。 可以认
为assign 操作 必须与 store 和write 一起出现。
? volatile 方式 的ID生成器。
? 但在执行 use 与 assign 指令之间,变量有可能被其它线程修改。
? 因此必须保证对变量的操作是原子操作,如赋值才能保证线程
安全。
volatile应用场景:
1:简化实现或者同步策略验证的时候来使用它;
2:确保引用对象的可见性;
3:标示重要的生命周期的事件,例如:开始或者关闭。
脆弱的volatile的使用条件:
1:写入变量不依赖变量的当前值,或者能够保证只有单一的
线程修改变量的值;
2:变量不需要和其他变量共同参与不变约束;
3:访问变量时不需要其他原因需要加锁。
public class Singleton {
private volatile static Singleton singleton;
private Singleton() {}
public static Singleton getSingleton() {
if (singleton == null) {
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
}
AtomicInteger原子类
? 非阻塞的同步,乐观并发
? 比较并交换 Compare and Swap (CAS),CPU指令
? sun.misc.Unsafe
? public final boolean compareAndSet(int expect,
int update) {
? return unsafe.compareAndSwapInt(this,
valueOffset, expect, update);
? }
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
线程安全的数据
? 不共享的局部变量(没有全局状态,可重入的代码)
? final 类型的数据
? ThreadLocal 中的数据
Signals 信号量(同步)
? Semaphore
? CountDownLatch
? CyclicBarrier
Semaphore
? 用于控制某种资源 同时被 访问的个数
? public Semaphore(int permits)
? public void acquire() throws InterruptedException
? public void release()
? 适用场景 :数据库连接池
CountDownLatch
采用计数方式,等待计数减至零,位于
countDownLatch.await后的代码才会被执行。
? public CountDownLatch(int count)
? public void countDown()
? public void await() throws InterruptedException
? 示例:等待所有线程完成,统计时间
CyclicBarrier
与CountDownLatch 不同,CyclicBarrier 是当
await 的数量达到了设定的数量后,才继续往下执行。
? public CyclicBarrier(int parties)
? public int await() throws InterruptedException,
BrokenBarrierException
线程引入的开销(性能)
? 上下文切换
? 内存同步
? 阻塞
多线程的性能优化
? 缩小锁的范围
? 减小锁的粒度
? 缩小锁的存在时间
? 锁分段
? 替代独占锁(如:原子变量类)
并发容器类
? ConcurrentHashMap
? CopyOnWriteArrayList
? CopyOnWriteArraySet
? ArrayBlockingQueue
Executors
?Executor
?ExecutorService
?ScheduledExecutorService
?Callable
?Future
?ScheduledFuture
?Delayed
?CompletionService
?ThreadPoolExecutor
?ScheduledThreadPoolExecutor
?AbstractExecutorService
?Executors
?FutureTask
?ExecutorCompletionService
Queues
?BlockingQueue
?ConcurrentLinkedQueue
?LinkedBlockingQueue
?ArrayBlockingQueue
?SynchronousQueue
?PriorityBlockingQueue
?DelayQueue
Concurrent Collections
● ConcurrentMap
● ConcurrentHashMap
● CopyOnWriteArray{List,Set}
Synchronizers
● CountDownLatch
● Semaphore
● Exchanger
● CyclicBarrier
Timing
● TimeUnit
Locks
● Lock
● Condition
● ReadWriteLock
● AbstractQueuedSynchronizer
● LockSupport
● ReentrantLock
● ReentrantReadWriteLock
Atomics
● Atomic[Type], Atomic[Type]Array
● Atomic[Type]FieldUpdater
● Atomic{Markable,Stampable}Reference
线程监控:箩肠辞苍蝉辞濒别
Thread Dump Analyzer
自已动手实践
? 消息队列
? Web服务器
? Socket 聊天(IM)
其它内容
? JDK 7 中的 Fork/Join 模式
? 函数式编程:Erlang,Cloure
? 基于Map/Reduce 的并行计算
? 并发的架构设计(无共享架构(Share Nothing
Architecture、 CookieSession、分布式的锁、缓存、
可伸缩、服务过载保护)
延伸阅读
? 《Java Concurrentcy in Practice》
? 《深入理解Java虚拟机》
谢谢!

More Related Content

闯补惫补并发编程实践

Editor's Notes

  • #6: 目前CPU主频受限于材料、散热、工艺多因素的影响,达到了某种瓶颈,多核心才是真正的发展趋势。 没有出现10Ghz的CPU
  • #7: 参考:丑迟迟辫://别苍.飞颈办颈辫别诲颈补.辞谤驳/飞颈办颈/础尘诲补丑濒&#虫27;蝉冲濒补飞
  • #9: 1.(随着硬件发展,多核心CPU成为标配。现有公司服务器基本上都是16核心的,并发程序能够提升资源利用率,不让其它CPU打酱油。) (例如:当某个程序等待输入或输出时,在等待的同时可以运行另外一个程序。常见的生活场景:你可以一边煮饭,另外一边炒菜。) 2.服务器的TPS(Transaction Per Second) 每秒事务处理数量是衡量一个服务性能的重要指标,它代表着一秒内服务端平均能响应的请求总数。
  • #10: “多路复用”也可被称为“复用”。多路复用通常表示在一个信道上传输多路信号或数据流的过程和技术。 http://zh.wikipedia.org/wiki/%E5%A4%9A%E8%B7%AF%E5%A4%8D%E7%94%A8 多路复用示例:Socket selector 模型。
  • #26: 参考: Java Memory Model http://www.javaol.net/2010/10/java-memory-model/ Java 理论与实践: 修复 Java 内存模型,第 1 部分 http://www.ibm.com/developerworks/cn/java/j-jtp02244/
  • #27: 把主内存的变量传递给工作内存之所以需要read 和 load 的两个步骤,是因为可能需要经过cpu 缓存的中转。 (参考:汇编MOV 指令:两个操作数不能都是存储器。 http://blog.lanyue.com/view/87/1885071.htm)
  • #59: 参考: Java 多线程与并发编程专题 http://www.ibm.com/developerworks/cn/java/j-concurrent/
  • #60: 单线程一定低效吗?Redis 单线程? 避免同步?Java NIO,异步调用 ?