介绍
nel.boots目标是集成多种框架,并为其提供默认的,统一的使用方式
nel.boots依赖配置
- 包括但不限于配置文件(默认
nel.boots.properties
). - 其中大部分配置都已提供默认实现
- 所有的配置都可以以配置文件的形式,屏蔽在运行时的变更
- 所有的配置都可以在运行时变更来屏蔽配置文件中的默认情况
nel.boots将所有应用以插件形式集成到Boots对象中
- 提供
Object BootsDefault
,大部分情况下可以直接使用
Core
内置模块
部分Core环境配置
配置 | 说明 |
---|---|
nel.env.date.in.formats | 默认 yyyy-MM-dd,yyyyMMdd,yyyy/MM/dd,yyyy-MM-dd HH:mm:ss |
nel.env.date.output.format.full | yyyy-MM-dd HH:mm:ss |
nel.env.date.output.format.short” | yyyy-MM-dd |
nel.env.date.timezone | Asia/Shanghai |
nel.fs.file.field.seq | \t |
nel.fs.file.row.seq | \n |
… | … |
- 常用模式(UsingPattern,LazyInitializePattern,RetryPattern…)
- 本地文件IO
- 配置管理
- 对象池
- 序列化
- 格式化
- Http
- Implicit Enhanced
- 异常体系(BootsRuntimeException,BootsUserBehaviorException)
部分使用例子
// UsingPattern,要求对象必须有close方法并且必定调用其close方法
usingPattern(HDFS){ hdfs =>
hdfs.exists(new Path(config.path))
}
// 本地文件系统IO
BootsDefault.fs
.config.path(path).build()
.read().length == fileDataList.length
// Http Utils
BootsDefault.http.get("https://www.baidu.com/")
// format
BootsDefault.format.byteSize(1232786734)
BootsDefault.format.tryDate(new Date()) // if null default '-'
// convert
// 默认使用配置 nel.env.date.in.formats 转换
BootsDefault.convert.date("2018-1-1")
// 对象池
val pool = new ObjectPool[BootsConnection](cfg, creator: () => T)
.....
Persistent
ORM
封装Domain-CRUD操作,同时内置执行源生SQL()
基于db配置的多种支持
- 基于
db.dialect
的多数据库类型支持
MySQL
,HBase
- 基于
db.connection.provider
的连接方式提供
单例级连接
全局单例,适合一些诸如Kafka,HBase等客户端期望单例场景
连接池
适合诸如常规JDBC场景
会话级连接
连接在一个sessionContext中保持单例,unitOfWork完成后释放
常规使用
# 配置
# test_db db映射(支持多数据库)
nel.orm.db.test_db.mysql.url=jdbc:mysql://127.0.0.1:3306/test_db_name?serverTimezone=UTC&characterEncoding=UTF-8&useSSL=false
nel.orm.db.test_db.mysql.user=root
nel.orm.db.test_db.mysql.password=12abAB
//对象映射
//使用MySQL注解完成db映射和方言映射
@MySQL(db = "test_db",table = "t_test",isColumnAutoMapping = true)
class TestDomain extends SomeParentDomain{
var a:String = null
def set(a:String):this.type = {
this.a = a
this
}
}
/**
* 命令系-CRUD
*/
import nel.framework.boots.orm.BootsOrm._
// createNew 创建一个实体(补全主键生成,CreateBy,Creator等等)
// saveOrUpdate 新增或更新
// find 根据主键查找一个实体
// load 根据主键加载一个实体(约定如果查找为null将抛出异常)
// delete 根据主键删除一个实体
// ......
// commit 显式提交当前事务(目标方言需支持事务,不显式提交将在unitOfWork尾自动调用)
// rollback 显式回滚当前事务(目标方言需支持回滚,unitOfWork出现异常将自动回滚)
val id = "id"
BootsDefault.orm.unitOfWork("Operator")(implicit sessionContext => {
sessionContext.createNew[TestDomain](id).set("aaa")
})
BootsDefault.orm.unitOfWork("Operator")(implicit sessionContext => {
sessionContext.saveOrUpdate {
sessionContext.load[TestDomain](id).set("bbb")
}
})
/**
* 查询系-依赖目标方言
*/
BootsDefault.orm.unitOfWork("Operator")(implicit sessionContext => {
import nel.framework.boots.orm.dialect.mysql.MySQLDialect._
//指定目标方言,不同例如MySQL 和 HBase 的查询系API是不同
// mysqlExecuteByClazzMeta 执行SQL
// mysqlQueryListByDB 指定DB执行SQL查询列表
// mysqlQuerySingleByClazzMeta 依赖ClazzMappingMeta执行SQL查询首行
// mysqlQueryListByClazzMeta 依赖ClazzMappingMeta执行SQL查询列表
// mysqlQueryConditionListByClazzMeta 依赖ClazzMappingMeta执行SQL条件查询列表
// ....
//依据条件Tuple查询列表
sessionContext.mysqlQueryConditionListByClazzMeta[TestDomain](("a", "aaa"))
//依赖ClazzMappingMeta执行SQL查询
sessionContext.mysqlQueryListByClazzMeta[TestDomain](
"select * from test_db where a =? ;",
ListBuffer("aaa")
)
//执行SQL查询
sessionContext.mysqlQueryListByDB(
db = "test_db",
sql = "select * from test_db where a =? ;",
parameters = ListBuffer("aaa")
)(rs=> new TestDomain(){.....})
})
jdbc
Redis
内部使用Lettuce
作为Redis的连接客户端
- Jedis是直连Redis,多线程下非线程安全
- Lettuce基于Netty的线程安全连接实例(StatefulRedisConnection)
Module引入
<dependency>
<groupId>cn.night</groupId>
<artifactId>nel.framework.boots.redis</artifactId>
<version>${boots.version}</version>
</dependency>
可选配置
配置 | 说明 |
---|---|
nel.redis.host | 必填 |
nel.redis.port | 必填 |
nel.redis.password | 选填 |
nel.redis.timeout | 请求超时(毫秒),默认10000 |
nel.redis.pool.active.max | 连接池最大数,默认8,负值表示不限 |
nel.redis.pool.wait.max | 连接池阻塞等待最大时间,默认-1,负值表示不限 |
nel.redis.pool.idle.max | 连接池最大空闲数,默认8,负值表示不限 |
nel.redis.pool.idle.min | 连接池最小空闲数,默认0,负值表示不限 |
nel.redis.data.expired | 数据默认过期时间(秒),默认10 |
部分使用例子
// 为Boots加载Redis插件
import nel.framework.boots.modules.redis.BootsRedis._
/**
* 写入
*/
BootsDefault.cache.redis
.set("test_key","123") // 使用默认过期超时10秒
BootsDefault.cache.redis
.config.host("xxxx").port(1234).build()
.set("test_key","123",30,TimeUnit.SECONDS)
/**
* 读取
*/
BootsDefault.cache.redis.get[Test]("test_key")
InfluxDB
时序数据库InfluxDB的封装(基于org.influxdb
客户端封装)
- Influx-SQL执行
- InfluxDB的批量提交封装,在大数据量提交下比较有用
Module引入
<dependency>
<groupId>cn.night</groupId>
<artifactId>nel.framework.boots.influx</artifactId>
<version>${boots.version}</version>
</dependency>
可选配置
配置 | 说明 |
---|---|
nel.influx_db.url | 必填 |
nel.influx_db.user | 必填 |
nel.influx_db.password | 选填 |
nel.influx_db.db_name | 必填 |
nel.influx_db.retention_policy | 数据保留策略,默认default |
nel.influx_db.consistency_level | 事务级别:all,any,one,quorum,默认all |
部分使用例子
//删除测试库
BootsDefault.influxDB.execute("drop measurement \"cpu\"")
//批量写入数据
BootsDefault.influxDB.batch(Array(
Point.measurement("cpu")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("idle", 90L)
.addField("user", 9L)
.addField("system", 1L)
.build()
))
//查询
BootsDefault.influxDB.execute("SELECT idle FROM cpu").size > 0
hadoop
HDFS
Module引入
<dependency>
<groupId>cn.night</groupId>
<artifactId>nel.framework.boots.hdfs</artifactId>
<version>${boots.version}</version>
</dependency>
可选配置
配置 | 说明 |
---|---|
nel.fs.hdfs.url | 必须配置 |
nel.fs.hdfs.user | HDFS用户 |
部分使用例子
// 为Boots加载HDFS向插件
import cn.night.nel.framework.boots.modules.fs.BootsHDFS._
val path = "hdfs://XXX:9000/test/file_suit/fileSuitOutput.txt"
// 追加写入
BootsDefault.hdfs
.config.path(path).saveMod(SaveMode.Append).build()
.write(fileDataList)
// 覆盖写入
BootsDefault.hdfs
.config.path(path).saveMod(SaveMode.Overwrite).build()
.write(fileDataList)
// 读取
val list:List[String] = BootsDefault.hdfs
.config.path(path).build()
.read()
Zookeeper
基于org.apache.curator
的封装
Module引入
Boots.zk 封装依赖 org.apache.curator
<dependency>
<groupId>cn.night</groupId>
<artifactId>nel.framework.boots.zk</artifactId>
<version>${boots.version}</version>
</dependency>
可选配置
配置 | 说明 |
---|---|
zk.connectionString | 必填 |
zk.delete_auto_guaranteed | 删除确认,开启后如果删除断开将会在后台线程不断重试删除节点操作.默认true.使用ZK锁务必开启此选项,因为断开后临时节点未删除成功,重连后临时节点也不会自动删除,可能会对锁造成严重破坏 |
zk.data_charset | 默认utf-8 |
zk.connection_timeout | 连接超时时间(秒),默认15 |
zk.session_timeout | 会话超时时间(秒),默认30 |
zk.connection_retry_count | 连接断开重试此时,默认3 |
zk.connection_retry_interval | 连接断开重试间隔(秒),默认1.(指数提升1,2,4) |
部分使用例子
val path = "/test/node"
BootsDefault.zk.
.config.build
.createWithCreatingParents(CreateMode.EPHEMERAL,path,"123")
BootsDefault.zk.
.config.build
.setData(path,"456")
BootsDefault.zk.
.config.build
.getData(path)
BootsDefault.zk.
.config.build
.getChildren("/test")
BootsDefault.zk.
.config.build
.delete(path)
MQ
Kafka
基于org.apache.kafka
的封装
- 发送消息封装(只关注业务部分而屏蔽发送实现细节)
内置KafkaProducer单例 - 消费线程池封装(只关注业务部分而屏蔽消费实现细节)
自动Offset管理 基于消费实现方法是否成功返回true
线程池消费封装 基于Kafka的特点设计为1+M的模式
自动Kafka分区感知&分区有序 自动根据Kafka分区平衡实际的消息线程数(削峰填谷)
自动消费限速
Module引入
<dependency>
<groupId>cn.night</groupId>
<artifactId>nel.framework.boots.kafka</artifactId>
<version>${boots.version}</version>
</dependency>
部分使用例子
配置 | 说明 |
---|---|
nel.boots.kafka.url | 必填 |
nel.boots.kafka.send.retry.count | 发送失败重试次数,默认3次 |
nel.boots.kafka.consumer.topics | 消费topic(竖线分割),必填 |
nel.boots.kafka.consumer.group | 消费组,必填 |
nel.boots.kafka.consumer.poll.max | 消费最大拉取条数,默认200 |
nel.boots.kafka.consumer.partition.max | 分区内最大消费条数,默认不限 |
//发送
val sender = BootsDefault.kafka.sender
.config.kafkaUrl("192.168.18.151:9092").build()
.sendString(
topic = "kafka.consumer.test",
message = "M" + idx,
key = Some("kafka.consumer.test"))
//接收
BootsDefault.kafka.sequenceConsumer()
.config
.kafkaUrl("xxx:9092")
.kafkaConsumerTopics("your_topic")
.kafkaConsumerGroup("your_group")
.build()
.process(messages => {
//分区内消息已保持有序
messages.foreach(message => println(message))
//返回true将自动提交分区内Offset
true
}).start()
}
RabbitMQ
基于com.rabbitmq
封装
- 发送消息封装(只关注业务部分而屏蔽发送实现细节)
基于RabbitMQ特点设计为短连接发送发送消息 - 消费线程池封装(只关注业务部分而屏蔽消费实现细节)
自动ACK 基于消费实现方法是否成功返回true而ACK
线程池消费封装 基于RabbitMQ的特点设计为M+N模式
M个连接线程,负责拉取消息,处理满时阻塞
N个处理线程,负责处理消息,没有消息时阻塞
Module引入
<dependency>
<groupId>cn.night</groupId>
<artifactId>nel.framework.boots.rabbitmq</artifactId>
<version>${boots.version}</version>
</dependency>
可选配置
配置 | 说明 |
---|---|
nel.rabbitmq.host | 必填 |
nel.rabbitmq.queue.exchange | 默认amq.direct |
nel.rabbitmq.queue.auto_create | 是否自动创建队列,默认false |
nel.rabbitmq.send.retry.count | 发送失败重试次数,默认3次 |
nel.rabbitmq.send.message.persist | 是否要求持久化消息,默认false |
nel.rabbitmq.consume.queue.name | 消费队列名 |
nel.rabbitmq.consume.receiver.auto.ack | 消息是否自动ACK,默认false(根据结果ACK) |
nel.rabbitmq.consume.receiver.thread.count | 消息拉取线程数,默认1 |
nel.rabbitmq.consume.processor.thread.count | 消息处理线程数,默认10 |
部分使用例子
// 为Boots加载RabbitMQ插件
import BootsRabbitMQ._
//消息发送
BootsDefault.rabbitMQ.sender()
.config.sendRetryCount(1).build()
.sendMessage("queueName", "123")
//消息消费
BootsDefault.rabbitMQ.consumer()
.config.queueName("queueName").processorThreadCountMax(2).build()
.process(message => {
println(message)
// 返回true表示ACK成功
true
})
.start()
Web
nel.boots的Web框架是架构在Spring.Boots 2.0 之上的
Rest
Module
<dependency>
<groupId>cn.night</groupId>
<artifactId>nel.framework.boots.web</artifactId>
<version>${boots.version}</version>
</dependency>
部分使用例子
@RestController
@RequestMapping(path = Array("/api/monitoring/project"))
class ProjectController extends RestApi {
import web._
// Action.Get
@GetMapping(Array("/application/list"))
def application_list(request: HttpServletRequest) = sso(request) { implicit sessionContext =>
......
}
// Action.Post
@PostMapping(Array("/application/set"))
def application_set(request: HttpServletRequest) = sso(request) { implicit sessionContext =>
......
}
}
前端
Angular
这是一个nel的前端向框架
提供基于Angular1.5.8的样式皮肤,布局,ajax,常用如Table,Window,Panel,tips等等功能
<dependency>
<groupId>cn.night</groupId>
<artifactId>nel.framework.template.angular</artifactId>
<version>${boots.version}</version>
</dependency>
Spark
自定义数据源
Spark-HBase
读
val df = spark.read
.format("nel.framework.boots.spark.client.source.hbase")
.option("spark.cus.hbase.url","192.168.198.151")
.option("spark.cus.hbase.table","dbo:test")
.option("spark.cus.hbase.schema","c1 int,c2 long,c3 string") // row_key将被自动添加
.option("spark.cus.hbase.start_key","00000001")
.option("spark.cus.hbase.end_key","10000000")
.load()
写
df.write
.format("nel.framework.boots.spark.client.source.hbase")
.option("spark.cus.hbase.url","192.168.198.151")
.option("spark.cus.hbase.table","dbo:test")
.option("spark.cus.hbase.tmp_path","hdfs://192.168.198.151:9000/xx/tmp/hbase_xxx")//MR数据落地Tmp地址
//.option("spark.cus.hbase.row_key","row_key") // 默认df中row_key做为row_key,该列必须存在且有值
.save()
常规使用
implicit val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WCApp")
// SparkSQL
BootsDefault.sparkClient
// 关闭SparkServerMetric传输
.config.isEnableSparkServer(false).build()
.sql(spark => {
val lines = spark.sparkContext.textFile("D:\\data\\test\\wc.txt")
lines.flatMap(_.split(",")).map((_, 1))
.reduceByKey(_ + _)
.collect()
.foreach(println(_))
});
// SparkStreaming
BootsDefault.sparkClient
.config.isEnableSparkServer(false).build()
.streaming(Duration(5*1000)){ ssc =>
val lines = ssc.socketTextStream("xxx",9999)
//...
}
Spark Server Metric Collect
单次执行过程
执行过程时序日志
执行过程明细
执行存储信息
Stage明细