akka介绍

akka简介

   一开始想接触到akka,是在看一些并发相关资料的时候,查了下akka的官方介绍,介绍如下:Akka是一个开发库和运行环境,可以用于构建高并发、分布式、可容错、事件驱动的基于JVM的应用,使构建高并发的分布式应用更加容易。

听到高并发和分布式这两个关键字就已经足够让人想去探索究竟是什么样的框架,当深入查看各种资料后,发现当前大数据领域火热的spark、flink底层的分布式计算和通信实现都是akka,是不是很意外。虽然它是由scala编写的,但也同时提供java api接口,所以使用java一样可以使用akka。

   akka是基于actor模型的实现,actor模型也就是响应式模型,它和我们常用的基于方法堵塞式的调用不同,而是基于消息的异步调用。

在使用Java进行并发编程时需要特别的关注锁和内存原子性等一系列线程问题,而Actor模型内部的状态由它自己维护即它内部数据只能由它自己修改(通过消息传递来进行状态修改),并且一次一次actor只能处理一条消息,这就相当于java里的一个单例对象的一个加了synchronized修饰符的方法的调用,但actor不同的是它通过从邮箱存储消息,然后消息的处理是按顺序的。如果要进行并行的消息处理,就需要创建多个actor,多个actor之间消息数据是并行的。

Actor由状态(state)、行为(Behavior)和邮箱(mailBox)三部分组成

状态(state):Actor中的状态指的是Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题

行为(Behavior)行为指定的是Actor中计算逻辑,通过Actor接收到消息来改变Actor的状态

邮箱(mailBox)邮箱是Actor和Actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息

Actor的基础就是消息传递。如下图所示:

Akka的五大特性

1)易于构建并行和分布式应用 (Simple Concurrency & Distribution)
      Akka在设计时采用了异步通讯和分布式架构,并对上层进行抽象,如Actors、Futures ,STM等。
 

2)可靠性(Resilient by Design)

     系统具备自愈能力,在本地/远程都有监护。

3)高性能(High Performance

    在单机中每秒可发送50000000个消息。内存占用小,1GB内存中可保存2500000个actors。

4)弹性,无中心(Elastic Decentralized

   自适应的负责均衡,路由,分区,配置

5)可扩展(Extensible

可以使用Akka 扩展包进行扩展。

整个akka体系由如下几部分组成:

akka-actors

akka的核心,一个用于并发和分发的模型,没有线程原语的所有痛苦

akka-stream

一种直观而安全的方式来实现异步、非阻塞的回压流处理。

akka-http

现代的、快速的、异步的、流的HTTP服务器和客户端。

akka-cluster

通过在多个节点上分布您的系统来获得弹性和弹性。

akka-sharding

根据用户的身份,在集群中分配您的参与者。

Distributed Data

最终一致,高度读取和写入可用,低延迟数据

Akka Persistence

为参与者的事件包允许他们在重新启动后到达相同的状态。

Akka Management

在云系统上运行Akka系统的扩展(k8saws

Alpakka

Akka流连接器用于集成其他技术

 

简单入门使用案例:

首先maven的pom加入akka的依赖

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.3</version>
</dependency>

创建actors

要创建actor需要继承AbstractActor 类并重写它的初始行为方法createReceive ,actor接收消息后会触发createReceive方法被调用,所以actor收到消息的处理动作主要在该方法。

createReceive 没有参数返回的结果是AbstractActor.Receive. 可以通过receiveBuilder()来接收消息以及它的类型,从而判断该如何处理消息。

如下图所示:

接下来我们创建整个actor系统的上下文,并创建ActorRef引用我们的actor,由于actor是通过消息驱动的,我们是无法直接获取actor的实例,所以需要通过ActorRef来向actor投递消息;而ActorRef的创建需要通过actor上下文ActorSystem的实例来创建,如下图所示:

Props是创建actorRef的配置,如上图我们需要创建的actor类是SendActor。

上图的actorSystem创建一个名字叫做sender的actorRef,然后调用ActorRef的tell方法把消息投递给actor处理。

要停止整个actor系统,需要ActorSystem的terminate()方法。

接下来我们看下结果的输出,如下图所示

 

以上是单机情况下使用akka的案例,后面讲接收使用akka-remote进行远程调用案例。

  • 17
    点赞
  • 104
    收藏
    觉得还不错? 一键收藏
  • 11
    评论
Akka Scheduler是Akka框架中的一个任务调度器,它可以用于执行基于时间的调度任务。Akka框架是一个用于构建高并发和分布式应用的工具包,它基于Actor模型,提供了可扩展、高可用、高吞吐量的分布式计算能力。 在Akka框架中,Scheduler是一个全局的调度器,它负责执行所有的任务调度。Scheduler的使用非常简单,只需要调用ActorSystem.scheduler().schedule()方法即可创建一个新的任务调度。 Akka Scheduler支持两种类型的任务调度:一次性任务和重复任务。一次性任务只会执行一次,而重复任务会按照指定的时间间隔重复执行。调度时间可以使用Duration类型来定义,Duration类型可以表示时间间隔,例如1秒、2分钟等。 下面是一个使用Akka Scheduler创建任务调度的示例代码: ```java import akka.actor.ActorSystem; import scala.concurrent.duration.Duration; import java.util.concurrent.TimeUnit; public class SchedulerDemo { public static void main(String[] args) { ActorSystem system = ActorSystem.create("MyActorSystem"); // 创建一个一次性任务 system.scheduler().scheduleOnce(Duration.create(1, TimeUnit.SECONDS), new Runnable() { @Override public void run() { System.out.println("One time task executed."); } }, system.dispatcher()); // 创建一个重复任务 system.scheduler().schedule(Duration.Zero(), Duration.create(1, TimeUnit.SECONDS), new Runnable() { @Override public void run() { System.out.println("Recurring task executed."); } }, system.dispatcher()); } } ``` 在上面的示例中,我们创建了一个一次性任务和一个重复任务,它们分别在1秒后执行和每隔1秒执行一次。在实际开发中,我们可以使用Akka Scheduler来执行定时任务、清理任务、统计任务等。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 11
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值