NightPxy 个人技术博客

nel.boots

Posted on By NightPxy

介绍

nel.boots目标是集成多种框架,并为其提供默认的,统一的使用方式

nel.boots依赖配置

  • 包括但不限于配置文件(默认nel.boots.properties).
  • 其中大部分配置都已提供默认实现
  • 所有的配置都可以以配置文件的形式,屏蔽在运行时的变更
  • 所有的配置都可以在运行时变更来屏蔽配置文件中的默认情况

nel.boots将所有应用以插件形式集成到Boots对象中

  • 提供 Object BootsDefault,大部分情况下可以直接使用

Core

内置模块

部分Core环境配置

配置 说明
nel.env.date.in.formats 默认 yyyy-MM-dd,yyyyMMdd,yyyy/MM/dd,yyyy-MM-dd HH:mm:ss
nel.env.date.output.format.full yyyy-MM-dd HH:mm:ss
nel.env.date.output.format.short” yyyy-MM-dd
nel.env.date.timezone Asia/Shanghai
nel.fs.file.field.seq \t
nel.fs.file.row.seq \n
  • 常用模式(UsingPattern,LazyInitializePattern,RetryPattern…)
  • 本地文件IO
  • 配置管理
  • 对象池
  • 序列化
  • 格式化
  • Http
  • Implicit Enhanced
  • 异常体系(BootsRuntimeException,BootsUserBehaviorException)

部分使用例子


// UsingPattern,要求对象必须有close方法并且必定调用其close方法
usingPattern(HDFS){ hdfs => 
    hdfs.exists(new Path(config.path))
}


// 本地文件系统IO
BootsDefault.fs
	.config.path(path).build()
	.read().length == fileDataList.length
	

// Http Utils	
BootsDefault.http.get("https://www.baidu.com/")	

// format
BootsDefault.format.byteSize(1232786734)
BootsDefault.format.tryDate(new Date()) // if null default '-'

// convert
// 默认使用配置 nel.env.date.in.formats 转换 
BootsDefault.convert.date("2018-1-1")    

// 对象池  
val pool = new ObjectPool[BootsConnection](cfg, creator: () => T)
.....

Persistent

ORM

封装Domain-CRUD操作,同时内置执行源生SQL()
基于db配置的多种支持

  • 基于db.dialect的多数据库类型支持
    MySQL, HBase
  • 基于db.connection.provider的连接方式提供
    单例级连接 全局单例,适合一些诸如Kafka,HBase等客户端期望单例场景
    连接池 适合诸如常规JDBC场景
    会话级连接 连接在一个sessionContext中保持单例,unitOfWork完成后释放

常规使用


# 配置
# test_db db映射(支持多数据库)
nel.orm.db.test_db.mysql.url=jdbc:mysql://127.0.0.1:3306/test_db_name?serverTimezone=UTC&characterEncoding=UTF-8&useSSL=false
nel.orm.db.test_db.mysql.user=root
nel.orm.db.test_db.mysql.password=12abAB

//对象映射
//使用MySQL注解完成db映射和方言映射  
@MySQL(db = "test_db",table = "t_test",isColumnAutoMapping = true)
class TestDomain extends SomeParentDomain{
  var a:String = null
  
  def set(a:String):this.type = {
      this.a = a
      this
  }
}

/**
* 命令系-CRUD 
*/
import nel.framework.boots.orm.BootsOrm._

// createNew 创建一个实体(补全主键生成,CreateBy,Creator等等)
// saveOrUpdate 新增或更新  
// find 根据主键查找一个实体  
// load 根据主键加载一个实体(约定如果查找为null将抛出异常)
// delete 根据主键删除一个实体
// ......
// commit 显式提交当前事务(目标方言需支持事务,不显式提交将在unitOfWork尾自动调用) 
// rollback 显式回滚当前事务(目标方言需支持回滚,unitOfWork出现异常将自动回滚)

val id = "id"
BootsDefault.orm.unitOfWork("Operator")(implicit sessionContext => {
  sessionContext.createNew[TestDomain](id).set("aaa") 
})
BootsDefault.orm.unitOfWork("Operator")(implicit sessionContext => {
  sessionContext.saveOrUpdate {
	sessionContext.load[TestDomain](id).set("bbb")
  }
})


/**
* 查询系-依赖目标方言
*/

BootsDefault.orm.unitOfWork("Operator")(implicit sessionContext => {

   import nel.framework.boots.orm.dialect.mysql.MySQLDialect._
   //指定目标方言,不同例如MySQL 和 HBase 的查询系API是不同

   // mysqlExecuteByClazzMeta 执行SQL
   // mysqlQueryListByDB 指定DB执行SQL查询列表
   // mysqlQuerySingleByClazzMeta 依赖ClazzMappingMeta执行SQL查询首行
   // mysqlQueryListByClazzMeta 依赖ClazzMappingMeta执行SQL查询列表
   // mysqlQueryConditionListByClazzMeta 依赖ClazzMappingMeta执行SQL条件查询列表
   // ....

  //依据条件Tuple查询列表
  sessionContext.mysqlQueryConditionListByClazzMeta[TestDomain](("a", "aaa"))

  //依赖ClazzMappingMeta执行SQL查询
  sessionContext.mysqlQueryListByClazzMeta[TestDomain](
	"select * from test_db where a =? ;",
	ListBuffer("aaa")
  )
  
  //执行SQL查询
  sessionContext.mysqlQueryListByDB(
	db = "test_db",
	sql = "select * from test_db where a =? ;",
	parameters = ListBuffer("aaa")
  )(rs=> new TestDomain(){.....})
})

jdbc

Redis

内部使用Lettuce作为Redis的连接客户端

  • Jedis是直连Redis,多线程下非线程安全
  • Lettuce基于Netty的线程安全连接实例(StatefulRedisConnection)

Module引入

<dependency>
	<groupId>cn.night</groupId>
	<artifactId>nel.framework.boots.redis</artifactId>
	<version>${boots.version}</version>
</dependency>

可选配置

配置 说明
nel.redis.host 必填
nel.redis.port 必填
nel.redis.password 选填
nel.redis.timeout 请求超时(毫秒),默认10000
nel.redis.pool.active.max 连接池最大数,默认8,负值表示不限
nel.redis.pool.wait.max 连接池阻塞等待最大时间,默认-1,负值表示不限
nel.redis.pool.idle.max 连接池最大空闲数,默认8,负值表示不限
nel.redis.pool.idle.min 连接池最小空闲数,默认0,负值表示不限
nel.redis.data.expired 数据默认过期时间(秒),默认10

部分使用例子


// 为Boots加载Redis插件  
import nel.framework.boots.modules.redis.BootsRedis._

/**
* 写入
*/

BootsDefault.cache.redis
	.set("test_key","123") // 使用默认过期超时10秒  
	
BootsDefault.cache.redis
  .config.host("xxxx").port(1234).build()
  .set("test_key","123",30,TimeUnit.SECONDS)		
  
/**
* 读取
*/
BootsDefault.cache.redis.get[Test]("test_key")

InfluxDB

时序数据库InfluxDB的封装(基于org.influxdb客户端封装)

  • Influx-SQL执行
  • InfluxDB的批量提交封装,在大数据量提交下比较有用

Module引入

<dependency>
	<groupId>cn.night</groupId>
	<artifactId>nel.framework.boots.influx</artifactId>
	<version>${boots.version}</version>
</dependency>

可选配置

配置 说明
nel.influx_db.url 必填
nel.influx_db.user 必填
nel.influx_db.password 选填
nel.influx_db.db_name 必填
nel.influx_db.retention_policy 数据保留策略,默认default
nel.influx_db.consistency_level 事务级别:all,any,one,quorum,默认all

部分使用例子

//删除测试库
BootsDefault.influxDB.execute("drop measurement \"cpu\"")

//批量写入数据
BootsDefault.influxDB.batch(Array(
Point.measurement("cpu")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("idle", 90L)
  .addField("user", 9L)
  .addField("system", 1L)
  .build()
))

//查询
BootsDefault.influxDB.execute("SELECT idle FROM cpu").size > 0

hadoop

HDFS

Module引入

<dependency>
	<groupId>cn.night</groupId>
	<artifactId>nel.framework.boots.hdfs</artifactId>
	<version>${boots.version}</version>
</dependency>

可选配置

配置 说明
nel.fs.hdfs.url 必须配置
nel.fs.hdfs.user HDFS用户

部分使用例子


// 为Boots加载HDFS向插件
import cn.night.nel.framework.boots.modules.fs.BootsHDFS._

val path = "hdfs://XXX:9000/test/file_suit/fileSuitOutput.txt"

// 追加写入  
BootsDefault.hdfs
        .config.path(path).saveMod(SaveMode.Append).build()
        .write(fileDataList)
        
// 覆盖写入
BootsDefault.hdfs
        .config.path(path).saveMod(SaveMode.Overwrite).build()
        .write(fileDataList)

// 读取  
val list:List[String] = BootsDefault.hdfs
        .config.path(path).build()
        .read()

Zookeeper

基于org.apache.curator的封装

Module引入

Boots.zk 封装依赖 org.apache.curator

<dependency>
	<groupId>cn.night</groupId>
	<artifactId>nel.framework.boots.zk</artifactId>
	<version>${boots.version}</version>
</dependency>

可选配置

配置 说明
zk.connectionString 必填
zk.delete_auto_guaranteed 删除确认,开启后如果删除断开将会在后台线程不断重试删除节点操作.默认true.使用ZK锁务必开启此选项,因为断开后临时节点未删除成功,重连后临时节点也不会自动删除,可能会对锁造成严重破坏
zk.data_charset 默认utf-8
zk.connection_timeout 连接超时时间(秒),默认15
zk.session_timeout 会话超时时间(秒),默认30
zk.connection_retry_count 连接断开重试此时,默认3
zk.connection_retry_interval 连接断开重试间隔(秒),默认1.(指数提升1,2,4)

部分使用例子

val path = "/test/node"

BootsDefault.zk.
	.config.build
	.createWithCreatingParents(CreateMode.EPHEMERAL,path,"123")

BootsDefault.zk.
	.config.build
	.setData(path,"456")
	
BootsDefault.zk.
	.config.build
	.getData(path)	

BootsDefault.zk.
	.config.build
	.getChildren("/test")		
	
BootsDefault.zk.
	.config.build
	.delete(path)

MQ

Kafka

基于org.apache.kafka的封装

  • 发送消息封装(只关注业务部分而屏蔽发送实现细节)
    内置KafkaProducer单例
  • 消费线程池封装(只关注业务部分而屏蔽消费实现细节)
    自动Offset管理 基于消费实现方法是否成功返回true
    线程池消费封装 基于Kafka的特点设计为1+M的模式
    自动Kafka分区感知&分区有序 自动根据Kafka分区平衡实际的消息线程数(削峰填谷)
    自动消费限速

Module引入

<dependency>
	<groupId>cn.night</groupId>
	<artifactId>nel.framework.boots.kafka</artifactId>
	<version>${boots.version}</version>
</dependency>

部分使用例子

配置 说明
nel.boots.kafka.url 必填
nel.boots.kafka.send.retry.count 发送失败重试次数,默认3次
nel.boots.kafka.consumer.topics 消费topic(竖线分割),必填
nel.boots.kafka.consumer.group 消费组,必填
nel.boots.kafka.consumer.poll.max 消费最大拉取条数,默认200
nel.boots.kafka.consumer.partition.max 分区内最大消费条数,默认不限
//发送
val sender = BootsDefault.kafka.sender
      .config.kafkaUrl("192.168.18.151:9092").build()
      .sendString(
          topic = "kafka.consumer.test",
          message = "M" + idx,
          key = Some("kafka.consumer.test"))
//接收          
BootsDefault.kafka.sequenceConsumer()
  .config
	.kafkaUrl("xxx:9092")
	.kafkaConsumerTopics("your_topic")
	.kafkaConsumerGroup("your_group")
  .build()
  .process(messages => {
    //分区内消息已保持有序
	messages.foreach(message => println(message))
	//返回true将自动提交分区内Offset
	true
  }).start()
}          

RabbitMQ

基于com.rabbitmq封装

  • 发送消息封装(只关注业务部分而屏蔽发送实现细节)
    基于RabbitMQ特点设计为短连接发送发送消息
  • 消费线程池封装(只关注业务部分而屏蔽消费实现细节)
    自动ACK 基于消费实现方法是否成功返回true而ACK
    线程池消费封装 基于RabbitMQ的特点设计为M+N模式
    M个连接线程,负责拉取消息,处理满时阻塞
    N个处理线程,负责处理消息,没有消息时阻塞

Module引入

<dependency>
	<groupId>cn.night</groupId>
	<artifactId>nel.framework.boots.rabbitmq</artifactId>
	<version>${boots.version}</version>
</dependency>

可选配置

配置 说明
nel.rabbitmq.host 必填
nel.rabbitmq.queue.exchange 默认amq.direct
nel.rabbitmq.queue.auto_create 是否自动创建队列,默认false
nel.rabbitmq.send.retry.count 发送失败重试次数,默认3次
nel.rabbitmq.send.message.persist 是否要求持久化消息,默认false
nel.rabbitmq.consume.queue.name 消费队列名
nel.rabbitmq.consume.receiver.auto.ack 消息是否自动ACK,默认false(根据结果ACK)
nel.rabbitmq.consume.receiver.thread.count 消息拉取线程数,默认1
nel.rabbitmq.consume.processor.thread.count 消息处理线程数,默认10

部分使用例子


// 为Boots加载RabbitMQ插件  
import BootsRabbitMQ._

//消息发送
BootsDefault.rabbitMQ.sender()
	.config.sendRetryCount(1).build()
	.sendMessage("queueName", "123")
	
//消息消费
BootsDefault.rabbitMQ.consumer()
  .config.queueName("queueName").processorThreadCountMax(2).build()
  .process(message => {  
	println(message)
	// 返回true表示ACK成功
	true
  })
  .start()

Web

nel.boots的Web框架是架构在Spring.Boots 2.0 之上的

Rest

Module

<dependency>
	<groupId>cn.night</groupId>
	<artifactId>nel.framework.boots.web</artifactId>
	<version>${boots.version}</version>
</dependency>

部分使用例子

@RestController
@RequestMapping(path = Array("/api/monitoring/project"))
class ProjectController extends RestApi {
	import web._
	
   // Action.Get	
   @GetMapping(Array("/application/list"))
   def application_list(request: HttpServletRequest) = sso(request) { implicit sessionContext =>
    ......
   }
   
   // Action.Post
   @PostMapping(Array("/application/set"))
   def application_set(request: HttpServletRequest) = sso(request) { implicit sessionContext =>
    ......
   }
}

前端

Angular

这是一个nel的前端向框架
提供基于Angular1.5.8的样式皮肤,布局,ajax,常用如Table,Window,Panel,tips等等功能

<dependency>
	<groupId>cn.night</groupId>
	<artifactId>nel.framework.template.angular</artifactId>
	<version>${boots.version}</version>
</dependency>

Spark

自定义数据源

Spark-HBase

val df = spark.read
  .format("nel.framework.boots.spark.client.source.hbase")
  .option("spark.cus.hbase.url","192.168.198.151")
  .option("spark.cus.hbase.table","dbo:test")
  .option("spark.cus.hbase.schema","c1 int,c2 long,c3 string") // row_key将被自动添加
  .option("spark.cus.hbase.start_key","00000001")
  .option("spark.cus.hbase.end_key","10000000")
  .load()

df.write
  .format("nel.framework.boots.spark.client.source.hbase")
  .option("spark.cus.hbase.url","192.168.198.151")
  .option("spark.cus.hbase.table","dbo:test")
  .option("spark.cus.hbase.tmp_path","hdfs://192.168.198.151:9000/xx/tmp/hbase_xxx")//MR数据落地Tmp地址
  //.option("spark.cus.hbase.row_key","row_key") // 默认df中row_key做为row_key,该列必须存在且有值   
  .save()

常规使用


implicit val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WCApp")

// SparkSQL  
BootsDefault.sparkClient
  // 关闭SparkServerMetric传输
  .config.isEnableSparkServer(false).build()
  .sql(spark => {
	val lines = spark.sparkContext.textFile("D:\\data\\test\\wc.txt")
	lines.flatMap(_.split(",")).map((_, 1))
	  .reduceByKey(_ + _)
	  .collect()
	  .foreach(println(_))
  });
  
// SparkStreaming
 BootsDefault.sparkClient
   .config.isEnableSparkServer(false).build()
   .streaming(Duration(5*1000)){ ssc =>
	 val lines = ssc.socketTextStream("xxx",9999)
	 //...
   }

Spark Server Metric Collect

单次执行过程
执行信息

执行过程时序日志
时序日志

执行过程明细
执行过程

执行存储信息
存储信息

Stage明细
Stage详细