Java19 虚拟线程原理介绍及实现

  • Post author:
  • Post category:java


Java19发布,带来了 Java 开发者期待已久的新特性——虚拟线程。在 Java 有这个新特性之前,Golang 的 协程已经流行了很长时间,在并发编程领域大获成功。随着 Golang 的快速发展和推广,协程似乎已经成为世界上最好的语言的必备特性之一。

Java19 虚拟线程可以填补这一空白。在这篇文章中,我们将带你通过对虚拟线程的介绍以及与 Golang  协程的对比,带你领略 Java19 虚拟线程的风采。

一、Java 19线程模型引入虚拟线程概念

java线程与虚拟线程

我们常见的Java线程与系统内核线程是一一对应的,系统内核线程调度器负责调度Java线程。为了提高应用程序的性能,我们会创建越来越多的Java线程,显然系统在调度Java线程时会消耗大量资源,来处理线程上下文切换。

近几十年来,我们一直依靠上述多线程模型来解决 Java 中的并发编程问题。为了提高系统的吞吐量,我们必须不断增加线程的数量,但是机器的线程很昂贵,可用线程的数量是有限的。尽管我们使用各种线程池来最大限度地提高线程的成本效益,但在 CPU、网络或内存资源被耗尽之前,线程往往成为我们应用程序性能的瓶颈,无法释放硬件应具有的最大性能。

为了解决这个问题,Java19 引入了虚拟线程。在 Java19 中,我们以前使用的线程称为平台线程,仍然与系统内核线程一一对应。大量 (M个) 的虚拟线程,运行在少量 (N个) 的平台线程上(与 OS 线程一一对应)(M:N 调度)。JVM调度多个虚拟线程在特定平台线程上执行,并且在平台线程上一次只执行一个虚拟线程。

如何创建 Java 虚拟线程?

新的线程相关 API



Thread.ofVirtual()





Thread.ofPlatform()


分别用于创建虚拟和平台线程的新 API。


1



2



3



4



5



6



7



8




//output thread ID including virtual threads and system threads Thread.getId() deprecated from jdk19





Runnable runnable



= () ->



System



.




out




.




println




(



Thread



.




currentThread




().




threadId




());






// Create virtual threads





Thread thread



=



Thread



.




ofVirtual




().




name




(




“testVT”




).




unstarted




(



runnable



);




testVT



.




start




();






// Create virtual platform threads





Thread testPT



=



Thread



.




ofPlatform




().




name




(




“testPT”




).




unstarted




(



runnable



);




testPT



.




start




();

使用


Thread.startVirtualThread(Runnable)


快速创建虚拟线程并启动它。


1



2



3




// Output thread IDs including virtual threads and system threads





Runnable runnable



= () ->



System



.




out




.




println




(



Thread



.




currentThread




().




threadId




());




Thread thread



=



Thread



.




startVirtualThread




(



runnable



);

使用


Thread.isVirtual()方法


定义一个线程是否为虚拟线程的。


1



2


Runnable runnable



= () ->



System



.




out




.




println




(



Thread



.




currentThread




().




isVirtual




());




Thread thread



=



Thread



.




startVirtualThread




(



runnable



);

使用


Thread.join()


等待虚拟线程完成作业调度,使用


Thread.sleep()讲


虚拟线程致为休眠状态。


1



2



3



4


Runnable runnable



= () ->



System



.




out




.




println




(



Thread



.




sleep




(



10



));




Thread thread



=



Thread



.




startVirtualThread




(



runnable



);






// Wait for the virtual thread to finish





thread



.




join




();

使用


Executors.newVirtualThreadPerTaskExecutor()


。创建


ExecutorService 。ExecutorService 可以


为每个任务创建一个新的虚拟线程。


1



2



3




try





(



var executor



=



Executors



.




newVirtualThreadPerTaskExecutor




()) {





executor



.




submit




(() ->



System



.




out




.




println




(




“hello”




));





}

也可以使用线程池和 ExecutorService ,完成现有代码的替换和迁移。




因为虚拟线程是


Java19


中的一个预览特性,所以本文出现的代码用到的运行命令如下







  • 使用 编译程序




    javac –release 19 –enable-preview Main.java




    并使用




    java –enable-preview Main




    .



  • 或者使用




    java –source 19 –enable-preview Main.java




    .

二、Java虚拟线程 VS Go 协程

实现原理对比

GO GMP模型

Go 语言采用两级线程模型,其中 Goroutine 是 M:N 与系统内核线程,符合 Java 虚拟线程。 最终的 goroutine 仍然交给 OS 线程执行,但需要一个中介来提供上下文。 这是 G-M-P 模型。

  • G:goroutine,类似于进程控制块,持有栈、状态、id、函数等。G只有绑定到P才能被调度。
  • M:机器,系统线程,绑定到一个有效的 P 然后被调度。
  • P:逻辑处理器,持有各种队列G。对于G来说,P是cpu核心。对于 M,P 是上下文。
  • sched:调度器,保存GRQ(全局运行队列)、M空闲队列、P空闲队列、锁等信息。


排队

Go 调度程序有两个不同的运行队列。

  • GRQ,全局运行队列,尚未分配给 G for P(在 Go 1.1 之前只有 GRO 全局运行队列,但由于全局队列锁定的性能问题,添加了 LRQ 以减少锁定等待)。
  • LRQ,本地运行队列,每个P都有一个LRQ,负责管理分配给P的G。当LRQ中没有G要执行时,从GRQ中取出。


交接机制

当G执行阻塞操作时,GMP调度空闲M执行阻塞M LRQ中的其他G,以防止阻塞M影响LRQ中其他G的执行。

  • G1 在 M1 上运行,P 的 LRQ 有 3 个其他 G。
  • G1 进行同步调用,阻塞 M。
  • 当 M1 下只有 G1 运行且没有 P 时,Scheduler 将 M1 与 P 分离。
  • 将P绑定到空闲的M2,M2从LRQ中选择其他G运行。
  • G1 结束阻塞操作并返回到 LRQ。M1 被放置在空闲队列中进行备份。


偷工减料机制

GMP 为了最大化硬件的性能,任务窃取机制用于在 M 空闲时执行其他等待的 G。

  • 有两个P,P1和P2。
  • 如果 P1 的 Gs 都被执行,并且 LRQ 为空,则 P1 开始任务窃取。
  • 在第一种情况下,P1 从 GRQ 获得 G。
  • 在第二种情况下,P1 没有从 GRQ 获得 G,然后 P1 从 P2 LRQ 窃取 G。

切换机制是为了防止 M 阻塞,任务窃取是为了防止 M 空闲。

Java 虚拟线程调度模型

JDK 依赖于操作系统中的线程调度器来调度基于操作系统线程实现的平台线程。对于虚拟线程,JDK 有自己的调度程序。JDK的调度器不是直接将虚拟线程分配给系统线程,而是将虚拟线程分配给平台线程(这就是前面提到的虚拟线程的M:N调度)。平台线程由操作系统的线程调度系统调度。

JDK 的虚拟线程调度器是一个类似


ForkJoinPool


FIFO 模式的线程池。调度程序中的并行量取决于调度程序虚拟线程中的平台线程数。默认值是可用的 CPU 内核数,但可以使用系统属性进行调整


jdk.virtualThreadScheduler.parallelism


。注意


ForkJoinPool


这里与 不同


ForkJoinPool.commonPool()


,后者用于实现并行流,以 LIFO 模式运行。



ForkJoinPool





ExecutorService


以不同的方式工作。


ExecutorService


有一个等待队列来存储其任务,其中的线程将接收并处理这些任务。虽然


ForkJoinPool


每个线程都有一个等待队列,但当一个线程运行的任务生成另一个任务时,该任务会添加到该线程的等待队列中,当我们运行


Parallel Stream


并将一个大任务分为两个较小的任务时会发生这种情况。

为了防止

线程饥饿

问题,当一个线程的等待队列中没有更多任务时,


ForkJoinPool


还实现了另一种称为

任务窃取

的模式,这意味着一个饥饿的线程可以从另一个线程的等待队列中窃取一些任务。这类似于 Go GMP 模型中的工作窃取机制。


虚拟线程执行

通常,当虚拟线程在 JDK 中执行 I/O 或其他阻塞操作时,会从平台线程中卸载虚拟线程,例如


BlockingQueue.take()


. 当阻塞操作准备好完成时(例如,网络 IO 已接收到字节数据),调度程序将虚拟线程挂载到平台线程上以恢复执行。

JDK 中的大多数阻塞操作从平台线程中卸载虚拟线程,从而允许平台线程执行其他工作任务。但是,JDK 中的一些阻塞操作不会卸载虚拟线程,因此会阻塞平台线程。这是因为操作系统级别(例如,许多文件系统操作)或 JDK 级别(例如


Object.wait()


)的限制。当这些阻塞操作阻塞平台线程时,它们会通过临时增加平台线程的数量来补偿其他平台线程阻塞的损失。因此,调度程序中的平台线程数


ForkJoinPool


可能会暂时超过 CPU 可用的内核数。可以使用系统属性调整调度程序可用的最大平台线程数


jdk.virtualThreadScheduler.maxPoolSize


. 这种阻塞补偿机制类似于 Go GMP 模型中的切换机制。

在以下两种情况下,虚拟线程固定在运行它的平台线程上,并且在阻塞操作期间无法卸载。




  1. synchronized


    块或方法中执行代码时。
  2. 执行


    native


    方法或

    外部函数

    时。

虚拟线程是固定的,并不影响程序运行的正确性,但可能会影响系统的并发和吞吐量。如果虚拟线程执行诸如 I/O 之类的阻塞操作,或者


BlockingQueue.take()


在它被固定时,负责运行它的平台线程将在操作期间被阻塞。(如果虚拟线程不固定,在执行I/O等阻塞操作时,会从平台线程中卸载掉)。


如何卸载虚拟线程

我们通过 Stream 创建了 5 个未启动的虚拟线程,它们的任务是打印当前线程,然后休眠 10 毫秒,然后再次打印线程。然后启动这些虚拟线程并调用


jion()


以确保控制台可以看到所有内容。


1



2



3



4



5



6



7



8



9



10



11



12



13



14



15



16



17



18



19



20



21



22



23



24



25



26



27



28




public static





void




main




(



String



[]



args



)





throws




Exception



{





var threads



=



IntStream



.




range




(



0



,



5



).




mapToObj




(



index



->



Thread



.




ofVirtual




().




unstarted




(() -> {





System



.




out




.




println




(



Thread



.




currentThread




());






try





{





Thread



.




sleep




(



10



);





}





catch





(



InterruptedException e



) {







throw new




RuntimeException



(



e



);





}




System



.




out




.




println




(



Thread



.




currentThread




());





})).




toList




();


threads



.




forEach




(



Thread



::



start



);






for





(



Thread thread



:



threads



) {





thread



.




join




();





}





}






//output





src



[



main



] ~/



Downloads



/



jdk







19



.




jdk




/



Contents



/



Home



/



bin



/



java







enable







preview main7

VirtualThread



[



#23



]/



runnable




@ForkJoinPool









1







worker







3

VirtualThread



[



#22



]/



runnable




@ForkJoinPool









1







worker







2

VirtualThread



[



#21



]/



runnable




@ForkJoinPool









1







worker







1

VirtualThread



[



#25



]/



runnable




@ForkJoinPool









1







worker







5

VirtualThread



[



#24



]/



runnable




@ForkJoinPool









1







worker







4

VirtualThread



[



#25



]/



runnable




@ForkJoinPool









1







worker







3

VirtualThread



[



#24



]/



runnable




@ForkJoinPool









1







worker







2

VirtualThread



[



#21



]/



runnable




@ForkJoinPool









1







worker







4

VirtualThread



[



#22



]/



runnable




@ForkJoinPool









1







worker







2

VirtualThread



[



#23



]/



runnable




@ForkJoinPool









1







worker







3

从控制台输出中,我们可以看到 VirtualThread[#21] 首先在 ForkJoinPool 的线程 1 上运行,并在从睡眠状态返回时继续在线程 4 上运行。


为什么虚拟线程在休眠后会从一个平台线程跳转到另一个?

如果我们阅读sleep方法的源码,我们发现sleep方法在Java19中已经被重写,并且重写的方法增加了虚拟线程相关的判断。


1



2



3



4



5



6



7



8



9



10



11



12



13



14



15



16



17



18



19



20



21



22



23



24




public static





void




sleep




(




long



millis



)





throws




InterruptedException



{







if





(



millis



<



0



) {







throw new




IllegalArgumentException



(




“timeout value is negative”




);





}




if





(



currentThread



()





instanceof




VirtualThread vthread



) {






long



nanos



=



MILLISECONDS



.




toNanos




(



millis



);




vthread



.




sleepNanos




(



nanos



);






return





;





}




if





(



ThreadSleepEvent



.




isTurnedOn




()) {





ThreadSleepEvent event



=





new




ThreadSleepEvent



();






try





{





event



.




time




=



MILLISECONDS



.




toNanos




(



millis



);




event



.




begin




();




sleep0



(



millis



);





}





finally





{





event



.




commit




();





}





}





else





{





sleep0



(



millis



);





}





}

深入研究代码,我们发现虚拟线程休眠时调用的真正方法是


Continuation.yield


.


1



2



3



4



5



6



7



8



9



10



11



12



13



14




@ChangesCurrentThread







private





boolean




yieldContinuation




() {






boolean



notifyJvmti



=



notifyJvmtiEvents



;






// unmount







if





(



notifyJvmti



)



notifyJvmtiUnmountBegin



(





false





);




unmount



();






try





{







return




Continuation



.




yield




(



VTHREAD_SCOPE



);





}





finally





{







// re-mount





mount



();






if





(



notifyJvmti



)



notifyJvmtiMountEnd



(





false





);





}





}

这意味着


Continuation.yield


将当前虚拟线程的堆栈从平台线程的堆栈转移到Java堆内存中,然后将其他准备就绪的虚拟线程的堆栈从Java堆复制到当前平台线程的堆栈以继续执行。阻塞操作如 IO 或


BlockingQueue.take()


引起虚拟线程切换,就像睡眠一样。虚拟线程切换也是比较耗时的操作,但是比平台线程的上下文切换要轻很多。

Go Channel vs. Java 阻塞队列

在 Go 编程中,Goroutine 与 channel 配合得很好,使用 Goroutine 计算数组元素的总和。

go实现:


1



2



3



4



5



6



7



8



9



10



11



12



13



14



15



16



17



18



19



20



21



22




package




main




import





“fmt”




func





sum



(s []



int



, c




chan





int



) {


sum



:=




0






for




_, v



:=





range




s {


sum



+=



v

}

c



<-



sum




// send sum to c





}






func





main



() {


s



:=



[]



int



{




7



,



2



,



8



,








9



,



4



,



0



}


c



:=




make



(




chan





int



)





go





sum



(s[:



len



(s)



/




2



], c)





go





sum



(s[



len



(s)



/




2



:], c)

x, y



:= <-



c,



<-



c




// receive from c



fmt.



Println



(x, y, x



+



y)

}

Java 实现。


1



2



3



4



5



6



7



8



9



10



11



12



13



14



15



16



17



18



19



20



21



22



23



24



25



26



27



28



29




import






java.util.concurrent.ArrayBlockingQueue





;






import






java.util.concurrent.BlockingQueue





;






import






java.util.concurrent.Executors





;




public class






main4





{







static





void




sum




(




int




[]



s



,




int



start



,




int



end



,



BlockingQueue



<



Integer



>



queue



)





throws




InterruptedException



{






int



sum



=



0



;






for





(




int



i



=



start



;



i



<



end



;



i



++) {





sum



+=



s



[



i



];





}




queue



.




put




(



sum



);





}




public static





void




main




(



String



[]



args



)





throws




InterruptedException



{






int




[]



s



= {




7



,



2



,



8



, –



9



,



4



,



0



};




var queue



=





new




ArrayBlockingQueue



<



Integer



>(



1



);




Thread



.




startVirtualThread




(() -> {





sum



(



s



,



0



,



s



.




length




/



2



,



queue



);





});




Thread



.




startVirtualThread




(() -> {





sum



(



s



,



s



.




length




/



2



,



s



.




length




,



queue



);





});





int



x



=



queue



.




take




();





int



y



=



queue



.




take




();


System



.




out




.




printf




(




“%d %d %d\n”




,



x



,



y



,



x



+



y



);





}





}

由于 Java 中没有切片,因此使用数组和索引。Java 中没有通道,所以


BlockingQueue


使用类似于管道的 .

使用方法比较

定义一个


say()


方法体,方法体循环 sleep 100ms,然后输出 index,分别使用java虚拟线程和 go 协执行该方法。进行比较性能。

go实现:


1



2



3



4



5



6



7



8



9



10



11



12



13



14



15



16



17



18




package




main




import




(




“fmt”





“time”




)




func





say



(s



string



) {






for




i



:=




0



; i <



5



; i



++



{


time.



Sleep



(



100




*



time.Millisecond)

fmt.



Println



(s)

}

}




func





main



() {






go





say



(



“world”



)




say



(



“hello”



)

}

java实现。


1



2



3



4



5



6



7



8



9



10



11



12



13



14



15



16



17



18



19



20



21



22



23




public final class






VirtualThreads





{







static





void




say




(



String s



) {







try





{







for





(




int



i



=



0



;



i



<



5



;



i



++) {





Thread



.




sleep




(



Duration



.




ofMillis




(



100



));




System



.




out




.




println




(



s



);





}





}





catch





(



InterruptedException e



) {







throw new




RuntimeException



(



e



);





}





}




public static





void




main




(



String



[]



args



)





throws




InterruptedException



{





var worldThread



=



Thread



.




startVirtualThread




(





() ->



say



(




“world”




)





);





say



(




“hello”




);









// Waiting for virtual threads to finish





worldThread



.




join




();





}





}

可以看到,两种语言编写协程的方式非常相似,一般Java虚拟线程写起来稍微麻烦一点,Go使用关键字轻松创建协程。

其他

虚拟线程和异步编程

反应式编程解决了平台线程需要阻塞等待其他系统响应的问题。使用异步 API 通过回调通知您结果,而不是阻塞和等待响应。当响应到达时,JVM 从线程池中分配另一个线程来处理响应。这样,

处理单个异步请求将涉及多个线程

在异步编程中,我们可以减少系统的响应延迟,但是由于硬件的限制,平台线程的数量仍然是有限的,所以我们仍然存在系统吞吐量的瓶颈。另一个问题是

异步程序在不同的线程中执行,很难调试或分析它们

虚拟线程通过较小的语法调整提高了代码质量(降低了编码、调试和分析代码的难度),同时具有可以显着提高系统吞吐量的反应式编程的优势。


不要池化虚拟线程

因为虚拟线程非常轻量级,并且每个虚拟线程在其生命周期内只运行一个任务,所以不需要池化虚拟线程。


虚拟线程下的ThreadLocal

public class main {
    private static ThreadLocal<String> stringThreadLocal = new ThreadLocal<>();

    public static void getThreadLocal(String val) {
        stringThreadLocal.set(val);
        System.out.println(stringThreadLocal.get());
    }

    public static void main(String[] args) throws InterruptedException {

        Thread testVT1 = Thread.ofVirtual().name("testVT1").unstarted(() ->main5.getThreadLocal("testVT1 local var"));

        Thread testVT2 = Thread.ofVirtual().name("testVT2").unstarted(() ->main5.getThreadLocal("testVT2 local var"));

        testVT1.start();
        testVT2.start();

        System.out.println(stringThreadLocal.get());
        stringThreadLocal.set("main local var");
        System.out.println(stringThreadLocal.get());
        testVT1.join();
        testVT2.join();
    }
}
//output
null
main local var
testVT1 local var
testVT2 local var

虚拟线程的支持


ThreadLocal


方式与平台线程相同,平台线程无权访问虚拟线程设置的变量,虚拟线程无权访问平台线程设置的变量,由平台线程负责用于运行对虚拟线程透明的虚拟线程。但是,由于可以创建数百万个虚拟线程,因此在虚拟线程中使用之前请三思


ThreadLocal


。如果我们在应用程序中创建一百万个虚拟线程,就会有一百万个


ThreadLocal


实例及其引用的数据。大量的对象会给内存带来很大的负担。

用 ReentrantLock 替换 Synchronized

因为


Synchronized


将虚拟线程一直固定在平台线程上,阻塞操作不会卸载虚拟线程,影响程序的吞吐量,所以需要


ReentrantLock


使用


Synchronized


.

之前:


1



2



3



4



5



6



7




public synchronized





void




m




() {







try





{







// … access resource






}





finally





{







//






}





}

后:


1



2



3



4



5



6



7



8



9



10




private final




ReentrantLock lock



=





new




ReentrantLock



();




public





void




m




() {





lock



.




lock




();





// block until condition holds







try





{







// … access resource






}





finally





{





lock



.




unlock




();





}





}


如何迁移

  1. 直接用虚拟线程池代替线程池。如果您的项目使用


    CompletableFuture


    ,您也可以直接将执行异步任务的线程池替换为


    Executors.newVirtualThreadPerTaskExecutor()


    .
  2. 消除池化机制。虚拟线程非常轻量级,不需要池化。


  1. synchronized



    更改为



    ReentrantLock



    减少固定到平台线程的虚拟线程。


概括

本文介绍了 Java 线程模型、Java 虚拟线程的使用、原理和适用场景,并与流行的 Go 协程 进行了比较,也发现了两种实现方式的相似之处,希望对大家理解 Java 虚拟线程有所帮助。



版权声明:本文为lewyu521原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。