Coup de Grace

框架系列 关于vert.x的一些

生产环境上暂时还没有正式应用过,先把东西积累一下,说不定日后在某处推广的起来.

也是因为上一个用RxJava更新的升级包一直没测试上线…颇有遗憾.

项目地址:vert.x

要实现异步无阻塞的服务提供,在Netty的加持下我们有

这样的选择,Vert.x提供了类似NodeJs的所有功能.

优点别人吹了一遍又一遍,响应式/无阻塞/Actor消息/多协议什么的.

用到了再说吧.把它当做非传统技术的趋势合集来观望也挺好的~.


闲话

Reactor模式

实现看过不少了.最近读了篇博客,盗了个图换个画风做备忘.

涉及到Reactor模式我们就可以用Halik来调试了.

同步/异步/阻塞/非阻塞

我们异步线程可以通过await与Condition转为同步

而非阻塞程序除了I/O异常之外就不要出现阻塞部分了.比如Thread.sleep()之类.

真想要同步转异步的话Vert.x中有executeBlocking.


基础 & 组件

本节都会来自官方文档,我提纯点儿有用的人话出来~.对于本节的摘抄来讲,看懂上一节的内容十分必要.

众所周知NodeJs只利用了单核的处理能力,空间换时间换来了性能提升,Vert.x跟Go一样利用了多核的处理能力,通过使用多个EventLoop(cores*2).

Vertx

启动或者新建一个Vertx对象很简单了:Vertx vertx = Vertx.vertx();

里面可以传入一个VertxOptions来设置HA,集群模式,线程池等等.

Vertx继承了一个可供Metrics的类,用于开启数据收集.

后续不需要拆分成Verticle的,运行在本Context内的部分就可以现在通过Vertx来创建了,起任务啊,起http server啊…

执行block代码

前文说过同步转异步通过executeBlocking来进行

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);
executor.executeBlocking(future -> {
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println(res.result());
});

当然了,不需要指定线程池的可以Vertx.executeBlocking().

异步协调

全异步操作间的协调等待工作交给CompositeFuture

Future<String> future1 = Future.future();
Future<String> future2 = Future.future();
CompositeFuture.any(future1, future2).setHandler(ar -> {
  if (ar.succeeded()) ;
});

相对应的还有CompositeFuture.all()供使用

流式版本

FileSystem fs = vertx.fileSystem();
Future<Void> fut1 = Future.future();
fs.createFile("/foo", fut1.completer());
fut1.compose(v -> {
  Future<Void> fut2 = Future.future();
  fs.writeFile("/foo", Buffer.buffer(), fut2.completer());
  // Compose fut1 with fut2
  return fut2;
}).compose(v -> {
  // Compose fut1 with fut2 and fut3
  fs.move("/foo", "/bar", startFuture.completer());
}, startFuture);

Verticles

类似Actor的设定,默认生命周期方法只有启动与停止.也是承接消息的组件,可以理解为service与处理单元吧.

万事不明先继承个AbstractVerticle再说.

三种类型

These are the most common and useful type - they are always executed using an event loop thread.

This means you can write all the code in your application as single threaded and let Vert.x worrying about the threading and scaling.

No more worrying about synchronized and volatile any more, and you also avoid many other cases of race conditions and deadlock so prevalent when doing hand-rolled ‘traditional’ multi-threaded application development.

真是欢天下之大乐,选型Go的话不是有一大部分原因就是可以屏蔽开发人员对于异步事件的迷惑么~

These run using a thread from the worker pool. An instance is never executed concurrently by more than one thread.

Worker verticles are designed for calling blocking code, as they won’t block any event loops.

These run using a thread from the worker pool. An instance can be executed concurrently by more than one thread.

创建Verticle

public class MyVerticle extends AbstractVerticle {
  public void start(Future<Void> startFuture) {
    // Now deploy some other verticle:
    vertx.deployVerticle("com.foo.OtherVerticle", res -> {
      if (res.succeeded()) {
        startFuture.complete();
      } else {
        startFuture.fail(res.cause());
      }
    });
  }
}

第一次尝到全异步系统的时候可能就是这样惊*,你连组件的启动都可能是异步带回调的…

部署Verticle

Verticle myVerticle = new MyVerticle();
//指定实例数目用来使用多核能力
DeploymentOptions options = new DeploymentOptions().setInstances(16);
vertx.deployVerticle(myVerticle,options);
// OR
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", res -> {
  if (res.succeeded()) {
    System.out.println("Deployment id is: " + res.result());
  } else {
    System.out.println("Deployment failed!");
  }
});
//后续如下config可以在Context中或者使用config()直接拿到.
JsonObject config = new JsonObject().put("name", "tim").put("directory", "/blah");
DeploymentOptions options = new DeploymentOptions().setConfig(config);

Verticle内Context

对于不同类型的Verticle来说Context有不同表现

那么有如下:

Context context = vertx.getOrCreateContext();
if (context.isEventLoopContext()) ;
else if (context.isWorkerContext()) ;
else if (context.isMultiThreadedWorkerContext()) ;
else if (! Context.isOnVertxThread()) ;

上下文内最重要的存放信息

final Context context = vertx.getOrCreateContext();
context.put("data", "hello");
context.runOnContext((v) -> {
  String hello = context.get("data");
});

Verticle内定时任务

long timerID = vertx.setTimer(1000, id -> {
  System.out.println("And one second later this is printed");
});

long timerID = vertx.setPeriodic(1000, id -> {
  System.out.println("And every second this is printed");
}); 

EventBus事件总线

实现的广播/订阅,点对点,应答式都是基于消息盒子的.

我们把handler绑在一个地址上在总线中接受消息.

使用vertx最膨胀的就是各种handler了.

消息类型: 基本类型/简单类型/JSON/codec过的复杂对象.

EventBus eventBus = vertx.eventBus();
eventBus.registerCodec(myCodec);

//广播
DeliveryOptions options = new DeliveryOptions();
//说起这个header,有很大的flume event既视感啊
options.addHeader("some-header", "some-value");
//Object编码,可以直接发送obj
options.setCodecName(myCodec.name());

eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball",options);
//点对点发送,地址上有多个handler的话就是轮询一手
eventBus.send("news.uk.sport", "Yay!", ar -> {
  if (ar.succeeded()) {
    //reply回复的body    
    System.out.println(ar.result().body());
  }
});

MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
  System.out.println(message.body());
  //reply
  message.reply("how interesting!");
});
consumer.completionHandler(res -> {
  if (res.succeeded()) ;
});

集群上的EventBus配置通过EventBusOptions来进行,这就是后话了.

JSON

消息对象里面的这两位应该是一等公民了.

TCP server

创建TCP Server/Client来操作socket通信,使用NetServerNetClient,处理请求还是handler的形式.

具体的一些feature:SSL/Proxy/Logging等等.

socket上的读写Stream通过内置的Buffer对象来进行.

Http Server

Vert.x是支持Http/2 的,也就是说push API可以尝鲜一下.

具体类的话就是HttpServerHttpClient.eg如下:

HttpClientRequest request = client.post("some-uri", response -> {
  System.out.println(response.statusCode());
});

request.handler(response -> {
  System.out.println(response.statusCode());
});
request.exceptionHandler(e -> {
  e.printStackTrace();
});

其他的功能: 块传输/Http压缩/轮询与心跳/Http2/Proxy/Https等等就不介绍了,谁家都有.

代理有ProxyType.SOCKS5类型,在HttpClientOptions中指定,可以的.

对待WebSocket有这么一手:

//Server
server.websocketHandler(websocket -> {
  if (websocket.path().equals("/myapi")) {
    websocket.reject();
  } else {
    WebSocketFrame frame1 = WebSocketFrame.binaryFrame(buffer1, false);
    websocket.writeFrame(frame1);
    // Write the final frame
    WebSocketFrame frame3 = WebSocketFrame.continuationFrame(buffer2, true);
    websocket.writeFrame(frame3);
  }
});

//Client
client.websocket("/some-uri", websocket -> {
  System.out.println("Connected!");
});

SharedData

SharedData sd=vertx.sharedData();

Vert.x内共享数据几种类型:

Cluster-wide asynchronous maps

//clusted
sd.<String, String>getClusterWideMap("mymap", res -> {
  if (res.succeeded()) 
    AsyncMap<String, String> map = res.result();
});
//cluster opt
map.put("foo", "bar", resPut -> {
  if (resPut.succeeded()) ;
});

接下来的几章

接下来的几章偏工具形式,我挑感兴趣的说说.

Vertx-Core部分就到此结束了,构建一个异步无阻塞的程序的思路也浮上水面.

上文中需要等待pi cluster的部分我会择日在集群文章中补充.

后续会更新什么ext呢?比如说以下的部分都在选型范围内.

或许生产环境上的独立功能也可能用以下ext来实现,部分微服务/爬虫/自动化feature.

在我个人可控范围内的话

done.