近期在学习kotlin相关知识,看到协程这块的知识,觉得对java程序员来说,是一个比较新颖的概念,而且加上之前面试的时候也被问过相关概念,遂对协程进行了一番了解。

本文结合参考了大量知名博主的博文的写作思路,加上自己对协程的一些认知,编写而成,如有雷同,尽请谅解。


现在都9102年了,Google也开始强推Kotlin作为主打的Android编程语言,作为一线开发人员,此时不学一点kotlin,更待何时?

本次分享我将会介绍kotlin里面,对于java程序员而言,比较新颖的特性——协程。

  1. 什么是协程;
  2. 协程怎么写;
  3. 协程和线程的关系;
  4. 关于协程的一些误区;

什么是协程?

小伙伴们肯定知道什么是进程和线程。

什么是进程和线程?必备面试题答案:

  • 进程是一个应用程序的一个实例,拥有应用程序打开的资源,有独立的内存空间,进程之间无法相互访问资源。

  • 线程是最小的可执行单位,一个进程可能含有多个线程,但一个线程只能从属于一个进程,线程之间共享进程的资源,可以互相访问,有自己的栈空间。

  • 对操作系统来说,线程是最小的执行单元,进程是最小的资源管理单元。

    img

协程的名字听起来和进程、线程有点相似,那么他们是不是一回事呢?

协程的概念

我们思考进程和线程的诞生过程:

  • 我们为了解决同一时刻运行多个程序的需求,我们通过分配CPU时间片,有了并发的概念;
  • 我们并发时,资源需要来回切换,为了保存中间态,达到上下文切换,我们有了进程的概念;
  • 后来有了多个CPU,我们可以真正意义上同时运行多个程序,于是我们有了并行的概念;
  • 为了解决反复切换内核态和用户态,频繁切换资源带来的性能低下,我们有了线程的概念,一个地方阻塞了,CPU不需要切换到其他进程,不需要进入内核态,同一进程下的多个线程还可以继续做其他事情,减少了性能损失;
  • 为了做到一个函数或者一段程序(我们称他为A函数)他执行的过程中可以被挂起(暂停),去做其他事情(B函数),做完之后再回到A函数,(利用或不利用A函数的结果)继续执行剩下A函数逻辑,我们有了协程的概念;

关于并发和并行:并行和并发看起来概念接近,但两者有本质上的区别。正因为 CPU 时间片足够小,即便一个单核的 CPU,也可以给我们营造多任务同时运行的假象,这就是所谓的“并发”。并行才是真正的同时运行。并发的话,更像是一种障眼法。

从上面的描述来看,其实协程没那么复杂。协程是一个抽象的概念,他可以做到自行挂起恢复,做到多协程协作运行。它跟线程最大的区别在于线程一旦开始执行,从任务的角度来看,就不会被暂停,直到任务结束这个过程都是连续的,线程之间是抢占式的调度,因此也不存在协作问题。

协程的分类

因为协程的概念比较模糊,上面说的挂起和恢复具体怎么做,也没有一个标准实现,各种语言都有他们自己的理解和实现,因此协程会给人一种摸不清头脑的感觉。

相对于线程来说,线程是由操作系统实现的,主流的操作系统都有成熟的线程模型,因此编程语言基本都是照搬操作系统的线程来实现。

协程基本都是语言层面各自的实现,每个语言都有不同的使用场景,甚至像java自己本身都没有官方的协程实现,只有一些比较冷门的开源框架(比如Quasar)对协程进行实现。


先来一个无奖竞猜:以下代码中,我们分别在Activity的OnCreate中,分别用两种方式(线程的sleep和协程的delay),延迟1000ms之后,弹出一条HelloWorld的Toast。

请问他们有什么区别吗?他们都会阻塞主线程吗?如果你知道答案,你能大概讲一下他们写成java代码的时候大概是什么样子吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@ExperimentalCoroutinesApi
class MainActivity1 : AppCompatActivity(), CoroutineScope by MainScope() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
delay1()
delay2()
}
//代码一
private fun delay1() {
Thread.sleep(1000)//sleep 1000ms
Toast.makeText(this, "HelloWorld", Toast.LENGTH_LONG).show()
}
//代码二
private fun delay2() {
launch {//启动一个协程
delay(1000)//delay 1000ms
Toast.makeText(this@MainActivity1, "HelloWorld", Toast.LENGTH_LONG).show()
}
}
}

答案先不公布,最后再进行解答。


用协程改造网络请求代码

我们平时如果需要发起一个网络请求,一般是怎么做的?

如果我们用的是Retrofit,那么代码可能是下面这样的:

  1. 先定义一个Api
1
2
3
4
interface GitHubServiceApi {
@GET("users/{user}/city")
fun getUserCity(@Path("user") user: String): Call<String>
}
  1. 请求网络的时候
1
2
3
4
5
6
7
8
9
gitHubServiceApi.getUserCity("test").enqueue(object : Callback<String> {
override fun onFailure(call: Call<String>, t: Throwable) {
handler.post { showError(t) }
}

override fun onResponse(call: Call<String>, response: Response<String>) {
handler.post { response.body()?.let(::showUserCity) ?: showError(NullPointerException()) }
}
})

请求结果回来之后,如果不是在UI现场,我们需要手动切换到UI线程来展示结果。这类代码大量存在于我们的逻辑当中,它有什么问题呢?

  • 通过 Lambda 表达式,我们让线程切换变得不是那么明显,但它仍然存在,一旦开发者出现遗漏,这里就会出现问题
  • 回调嵌套了两层,看上去倒也没什么,但真实的开发环境中逻辑一定比这个复杂的多,例如登录失败的重试
  • 重复或者分散的异常处理逻辑,在请求失败时我们调用了一次 showError,在数据读取失败时我们又调用了一次,真实的开发环境中可能会有更多的重复

假如你使用过Rxjava,你可能会想要使用Rxjava解决这个问题。Rxjava本身也是一种很好的解决这类问题的方案,但是他的操作符太繁琐了,上手门槛较高。

引入一个开源库

1
implementation 'com.jakewharton.retrofit:retrofit2-kotlin-coroutines-adapter:0.9.2'

我们可以直接改造api接口

1
2
3
4
interface GitHubServiceApi {
@GET("users/{user}/city")
fun getUserCity(@Path("user") user: String): Deferred<String>
}

构造Retrofit实例时,添加:

1
2
3
4
5
6
7
8
val gitHubServiceApi by lazy {
val retrofit = retrofit2.Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(CoroutineCallAdapterFactory())//添加对 Deferred 的支持
.build()
retrofit.create(GitHubServiceApi::class.java)
}

我们可以这么调用这个api方法

1
2
3
4
5
6
7
GlobalScope.launch(Dispatchers.Main) {
try {
showUser(gitHubServiceApi.getUserCity("test").await())
} catch (e: Exception) {
showError(e)
}
}

我们用看似同步的代码来执行了这段代码,当原有的回调返回了onFailure或者被cancel,都会抛出异常,走到showError处。

那么黑魔法发生在哪里呢?

我们通过GlobalScope.launch启动了一个协程,类似我们通过以下代码启动一个线程:

1
2
3
new Thread(new Runnable(){
//...run something
}).start();

GlobalScope是一个协程作用域的概念,涉及到协程的取消和异常传递。因为时间关系不细讲这个类。

他的launch方法签名如下:

1
2
3
4
5
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext, // 上下文,很多作用,包括写代参数,拦截协程执行等
start: CoroutineStart = CoroutineStart.DEFAULT, // 启动模式,默认是DEFAULT模式启动
block: suspend CoroutineScope.() -> Unit // 协程体,类似我们Thread的Runnable
): Job

方法参数中的上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

>Dispatchers.Main定义的主线程随平台不同而不同,在android中是UI线程,在 Java Swing 上为 `SwingDispatcher` 等等。

启动模式本身的概念不是很复杂,我们发现参数的默认参数是``` CoroutineStart.DEFAULT```,这代表了他会在launch调用之后立即启动,不需要像新建线程一样,需要调用start方法。

> 除了Default之外,还有以下模式,他们可以做到饿汉式和懒汉式启动协程的作用。具体不再详细介绍,可以参考bennyhuo老师的[这篇博文](https://www.bennyhuo.com/2019/04/08/coroutines-start-mode/)
>
> | 模式 | 功能 |
> | :----------- | :------------------------------------------------ |
> | DEFAULT | 立即执行协程体 |
> | ATOMIC | 立即执行协程体,但在开始运行之前无法取消 |
> | UNDISPATCHED | 立即在当前线程执行协程体,直到第一个 suspend 调用 |
> | LAZY | 只有在需要的情况下运行 |

最后是我们作为一个Lambda表达式传入的协程体,他和我们在Thread里面写的Runnable一样,是协程运行是执行的代码。

重新看一下这段代码:

```kotlin
GlobalScope.launch(Dispatchers.Main) {
try {
showUser(gitHubServiceApi.getUserCity("test").await())
} catch (e: Exception) {
showError(e)
}
}

我们仔细看代码,

1
2
3
4
5
6
7
8
9
10
11

其实```getUserCity```在执行的时候确实切换到了其他线程,但是返回结果的时候,我们通过await方法,从Deferred中拿到了结果,我们又回到了UI线程。

我们也许会联想到Java里面有一个叫Future的东西,他也能做到这种效果。看起来,线程是阻塞在这里,等待线程执行完成,返回结果再继续执行吧?但是这可是UI线程,这样不会阻塞UI线程吗?

答案是不会。这是一种**非阻塞式挂起**。关于非阻塞式挂起的一个定义,在下面进行了解释。

我们看看await方法是怎么实现的:

```kotlin
public suspend fun await(): T

他是一个suspend方法,这意味着他是一个可能会执行很长时间的方法,只能在另外一个suspend方法中调用。

看起来他是同步返回的方法,他直接就返回了结果,但是事实上这是编译器给我们的障眼法,我们反编译之后可以看到,他本质上是类似这么的一个方法:

1
kotlinx/coroutines/Deferred.await (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

方法其实并不是一个空入参的方法,而是传入了一个

1
2
3
4
5
6
7
8

```kotlin
@SinceKotlin("1.1")
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resume(value: T)
public fun resumeWithException(exception: Throwable)
}

resume和resumeWithException不就对应刚刚代码里面的继续执行和抛出异常吗?那么这段代码不就是一个java的经典的回调用法吗?只是编译器让我们忽略了这个细节,将回调变成了同步代码执行的方式运行,这样就避免了java的著名的回调地狱的问题。

现在我们再回过头来看看,Deferred.await()和Future.get()他们是一回事吗?并不是,Deferred.await()是编译器巧妙地让回调变成了同步方法,并不阻塞运行,而Future.get()是真实的阻塞的线程的运行。

我们讲了这么多其实想表达的就是,其实运行机制上,协程和回调没有太大区别。

协程好在哪里?

我们可以看到上述的代码,从一个异步回调式的代码,变成了一个类似同步的代码逻辑。异步回调式的代码会降低代码可读性,提高复杂度。在需要切换线程的场合里,这个问题还会变的异常复杂。而协程可以很优雅的处理这些问题。

协程的定义

这是维基百科对协程的定义:

Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed. Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.

引用bennyhuo老师对协程的定义:

简单来说就是,协程是一种非抢占式或者说协作式的计算机程序并发调度的实现,程序可以主动挂起或者恢复执行。我们在 Java 虚拟机上所认识到的线程大多数的实现是映射到内核的线程的,也就是说线程当中的代码逻辑在线程抢到 CPU 的时间片的时候才可以执行,否则就得歇着,当然这对于我们开发者来说是透明的;而经常听到所谓的协程更轻量的意思是,协程并不会映射成内核线程或者其他这么重的资源,它的调度在用户态就可以搞定,任务之间的调度并非抢占式,而是协作式的。

一些名词解释

非阻塞式挂起:就像我们刚刚说的,await并不会阻塞当前线程,但是他是对当前协程的一个挂起,所以他是非阻塞式的挂起,我们称不阻塞线程的挂起就叫非阻塞式挂起

非抢占式/协作式:非抢占式和协作式的定义类似,是一种调度方式,让程序自己决定要运行到什么时候,出让自己的控制权,直到结束的调度方式。kotlin的协程是从逻辑上达到这种效果,代码当然还是在线程上运行,并不能完全独占一个CPU。

抢占式:一种调度方式,由CPU决定给每个线程的时间片,如果一个线程用完当前分配给它的时间片,将由操作系统决定下一次占用CPU的线程。


所以我们回过头来看之前那个delay和sleep的区别,你现在还能知道他们的区别吗?

我们可以大概模拟出来他们在java语言下的模样,其实delay就是在其他地方等待了1000ms,再通过一个隐藏的回调,调用了剩余的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//仅为模拟,解释相关概念,并不代表他们在执行时真实的情况
public class MainActivity2 extends AppCompatActivity {
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
delay1();
delay2();
}
private void delay1() {
try {
Thread.sleep(1000);
Toast.makeText(MainActivity2.this, "HelloWorld", Toast.LENGTH_LONG).show();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void delay2() {
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
Toast.makeText(MainActivity2.this, "HelloWorld", Toast.LENGTH_LONG).show();
}
}, 1000);
}
}

在Android中协程怎么用?

我们解释了一大堆概念,我们可以开始上手,看看kotlin的协程到底应该怎么用,可以怎么用。

如何创建一个协程

launch并不是一个顶层函数,他需要通过一个CoroutineScope作用域来发起。我们可以通过以下几种方式创建一个协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 方法一,使用 runBlocking 顶层函数
runBlocking {
getImage(imageId)
}

// 方法二,使用 GlobalScope 单例对象
// 可以直接调用 launch 开启协程
GlobalScope.launch {
getImage(imageId)
}

// 方法三,自行通过 CoroutineContext 创建一个 CoroutineScope 对象
// 需要一个类型为 CoroutineContext 的参数
val coroutineScope = CoroutineScope(context)
coroutineScope.launch {
getImage(imageId)
}

关于runBlocking和launch的区别,看以下代码就知道了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun main() {
GlobalScope.launch {
println("1")
delay(500)
println("2")
}
println("3")

runBlocking {
println("4")
delay(1000)
println("5")
}
println("6")
}

输出的结果是

1
2
3
4
5
6
3
1
4
2
5
6

可以看到runBlocking阻塞了当前线程,直到协程体结束才会继续往下执行。一般不会直接使用runBlocking编写代码。

方法二就是我们一直在用的创建协程的方式。

方法三就是通过自定义协程作用域来创建协程。android有一个MainScope可以很方便绑定与组件的生命周期。

关于 CoroutineScopeCoroutineContext 时间有限本次不讲,同学们可以自己查找相关文档。

在Android中引入协程

kotlin的主要用户是android开发,因此我们也有针对android开发的协程配套库,在android项目中引用:

1
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1'

这个框架里面包含了 Android 专属的 Dispatcher,我们可以通过 Dispatchers.Main 来拿到这个实例;也包含了 MainScope,用于与 Android 作用域相结合。

使用MainScope绑定协程和Activity生命周期

我们可以这么写一个Activity应用MainScope:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
//CoroutineScope by MainScope()的作用类似于下面这一句。
// val mainScope = MainScope()

override fun onCreate(savedInstanceState: Bundle?) {
//..doSomething
tv_main.text = async(Dispatchers.IO){
//get Data
delay(1000)
"Hello World1"
}.await()

launch {
val text = withContext(Dispatchers.IO) {
delay(1000)
"Hello World2"
}
tv_main.text = text
}
}

override fun onDestroy() {
super.onDestroy()
//取消MainScope作用域里面的所有正在执行的协程
cancel()
}
}

我们通过接口委托,使得MainActivity有了CoroutineScope的相关方法。此时,他的launch方法默认运行在主线程,同时,我们能够结合生命周期,在Activity的onDestroy时,主动取消正在执行的协程。

谨慎使用GlobalScope

正如同他的名字一样,GlobalScope是一个顶级全局作用域,他的生命周期不被android的组件所控制。如果使用GlobalScope启动协程,那么MainScope就没有起到应有的作用。

使用协程发起多个请求并组合完成

假如我们需要实现个人中心页,用户的昵称和头像是不同接口返回的,我们需要将两个接口的返回组合,一起展示在页面上。(当然实际上图片我们一般是占位图先显示,这里仅仅是模拟一种场景)

如果没有协程,我们可能需要有两个线程同时发起请求,然后用一个同步锁来并同步两者的节奏,让他们能够最后一起展现在用户面前。

如果我们对自己的异步代码写的不够自信,还可能写出先执行一个请求,返回后再执行另外一个请求的辣鸡代码。

如果有了协程,我们可以这么实现:

1
2
3
4
5
6
7
8
//简单的实现,实际情况还要考虑失败,取消等情况
coroutineScope.launch(Dispatchers.Main) { // 开始协程:主线程
val avatar = async{api.getAvatar()} // 网络请求:IO 线程
val user = async{api.getUser()} // 网络请求:IO 线程
val merged = suspendingMerge(avatar,user) // 组合结果,自定义方法
//setVisibility
show() // 更新 UI:主线程
}

使用withContext方便的切换线程

我们可以依靠withContext,方便的把线程切换到其他地方,执行耗时操作,再返回当前线程继续执行代码。详见以下例子:

1
2
3
4
5
6
7
launch(Dispachers.Main) {              // 在 UI 线程开始
val image = getImage(imageId)
avatarIv.setImageBitmap(image) // 执行结束后,自动切换回 UI 线程
}
fun getImage(imageId: Int) = withContext(Dispatchers.IO) {
...
}

实现生产者消费者模型

思考题:我们先思索一下,一个最经典的生产者消费者模式是怎样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 生产者和消费者,wait()和notify()的实现
*/
public class Test1 {

private static int count = 0;
private static final int FULL = 3;
private static final String LOCK = "lock";

public static void main(String[] args) {
TestConsumer1 test1 = new TestConsumer1();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (LOCK) {
while (count == FULL) {
System.out.println(Thread.currentThread().getName() + "生产者队列满了,等待消费" + count);
try {
LOCK.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
count++;
System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
LOCK.notifyAll();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (LOCK) {
while (count == 0) {
System.out.println(Thread.currentThread().getName() + "消费者者队列空了,等待生产");
try {
LOCK.wait();
} catch (Exception ignored) {
}
}
count--;
System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);
LOCK.notifyAll();
}
}
}
}
}

他有什么性能问题?

  • 通过同步锁实现了线程同步
  • 每个线程可能会在wait和可运行状态之间来回切换
  • 涉及到线程的上下文切换

如果同样一段代码,使用kotlin的协程来写,是怎样的写法呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

/**
* @author zhangshuyue
**/
fun main() {
runBlocking {
val channel = Channel<Int>(10)
//val channel = Channel<Int>(Channel.UNLIMITED)//可以指定容量

val producer = GlobalScope.launch {
var i = 0
while (true){
println("before send $i")
channel.send(i++)
println("after send $i")
delay(1000)
}
}
val consumer = GlobalScope.launch {
while(true){
delay(2000) //receive 之前延迟 2s
val element = channel.receive()
println(element)
}
}
producer.join()
consumer.join()
}
}

关于协程的一些误区

  1. 协程效率很高,可以取代线程

协程的效率来自他不需要进入内核态,不需要刷新CPU高速缓存,在用户态就可以切换执行内容。

编程语言级别实现的协程就是程序内部的逻辑,不会涉及操作系统的资源之间的切换,操作系统的内核线程自然会重一些,且不说每创建一个线程就会开辟的栈带来的内存开销,线程在上下文切换的时候需要 CPU 把高速缓存清掉并从内存中替换下一个线程的内存数据,并且处理上一个内存的中断点保存就是一个开销很大的事儿。

简单的说,kotlin上的协程是通过线程池封装实现,从这个角度来说,如果是需要来回切换协程的任务,我们使用协程来优化,能够达到很好的效率。但是不要抱有不切实际的幻想,认为协程可以优化一切代码,比线程高一等。

  1. 协程一定是单线程的

协程不一定都是单线程的,一般来说,协程可以运行在单线程,也可以运行在多线程,这取决于他的实现。kotlin可以跑在java虚拟机上,也可以跑在JavaScript和native上,这些情况不能一概而论。

  1. 协程是对线程池的一个封装

在kotlin上,这句话从实现上可以这么理解,协程确实是在线程池的基础上开发出来的。但协程和线程池概念上有区别,不能完全等同,他们是两个不同的角色。

  1. 协程的挂起是非阻塞式的,线程是阻塞式的

所谓的非阻塞式指的是不阻塞当前线程,从一开始介绍的例子来看,协程的挂起是通过切换线程来达成的,他线程都切走了,当然不会卡当前线程,所以他是非阻塞式的。所谓的线程是阻塞式的,指的是单线程中的等待,当然是阻塞式的,如果通过切线程,把阻塞任务抛到异步线程中去,那么当前线程当然也是非阻塞式的。

  1. 协程不需要加锁

这是有问题的,协程在kotlin上的实现就是线程池,有可能出现多线程执行的,既然是多线程,那就有线程同步问题。

参阅资料

[1] bennyhuo老师的博客 https://www.bennyhuo.com/

[2] 扔物线老师的教程 https://kaixue.io/kotlin-coroutines-1/