In an akka-cluster environment, a JDBC database is, from the data-access point of view, detached from the other nodes in the cluster: a JDBC database is not distributed and offers no location transparency. The JDBC database server therefore has to expose its data operations as a service. In this scenario the JDBC service is the server side, and every other node, including other JDBC database nodes, acts as a client of that service. Since we have already settled on the gRPC service model for the akka-cluster environment, with database operations driven through akka-stream flow control, this post demonstrates the concrete implementation and usage of gRPC-JDBC-Streaming.

In the previous post we demonstrated the simplest JDBC-Streaming pattern, a unary request with a streamed response: the client sends one JDBCQuery to the JDBC service, the service runs the query and returns a stream of DataRows to the client. The data and service types are defined in IDL in jdbc.proto as follows:

message JDBCDataRow {
  string year = 1;
  string state = 2;
  string county = 3;
  string value = 4;
}

message JDBCQuery {
  string dbName = 1;
  string statement = 2;
  bytes parameters = 3;
  google.protobuf.Int32Value fetchSize = 4;
  google.protobuf.BoolValue autoCommit = 5;
  google.protobuf.Int32Value queryTimeout = 6;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
}
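As a side note, scalaPB maps the google.protobuf wrapper types used above (Int32Value, BoolValue) to Option fields, which is why the service code below can write q.fetchSize.getOrElse(100). A rough sketch of the shape of the generated message class (field defaults are assumed here; the actual generated file may differ):

// sketch only: approximate shape of the scalaPB-generated JDBCQuery class
case class JDBCQuery(
  dbName: String = "",
  statement: String = "",
  parameters: com.google.protobuf.ByteString = com.google.protobuf.ByteString.EMPTY,
  fetchSize: Option[Int] = None,      // google.protobuf.Int32Value
  autoCommit: Option[Boolean] = None, // google.protobuf.BoolValue
  queryTimeout: Option[Int] = None    // google.protobuf.Int32Value
)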

The message types JDBCDataRow and JDBCQuery correspond to the stream element type and the JDBCQueryContext of the JDBC-Streaming tool respectively, as shown below:

  val toRow = (rs: WrappedResultSet) => JDBCDataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  val ctx = JDBCQueryContext[JDBCDataRow](
    dbName = Symbol(q.dbName),
    statement = q.statement,
    parameters = params,
    fetchSize = q.fetchSize.getOrElse(100),
    autoCommit = q.autoCommit.getOrElse(false),
    queryTimeout = q.queryTimeout
  )

  jdbcAkkaStream(ctx, toRow)

Compiling the IDL with scalaPB produces the server-side and client-side boilerplate code. We still need to implement the concrete JDBC service:

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
  val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)

  val toRow = (rs: WrappedResultSet) => JDBCDataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
    logger.info("**** runQuery called on service side ***")
    Flow[JDBCQuery].flatMapConcat { q =>
      //unpack JDBCQuery and construct the context
      val params: Seq[Any] = unmarshal[Seq[Any]](q.parameters)
      logger.info(s"**** query parameters: ${params} ****")
      val ctx = JDBCQueryContext[JDBCDataRow](
        dbName = Symbol(q.dbName),
        statement = q.statement,
        parameters = params,
        fetchSize = q.fetchSize.getOrElse(100),
        autoCommit = q.autoCommit.getOrElse(false),
        queryTimeout = q.queryTimeout
      )
      jdbcAkkaStream(ctx, toRow)
    }
  }
}

Below is an example of calling the service from the client side:

  val query = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arizona", 5))
  )

  def queryRows: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running queryRows ...")
    Source.single(query).via(stub.runQuery)
  }

The program is run like this:

object QueryRows extends App {
  implicit val system = ActorSystem("QueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.queryRows.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

What if the client sends a whole stream of JDBCQuery messages? This is the so-called BiDi-Streaming mode, described in jdbc.proto as follows:

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
}

Note that the input of batQuery is itself a stream. The generated batQuery service method has the following signature:

  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = { ... }
  override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery

runQuery and batQuery have exactly the same signature, which means the service side behaves the same way for both: in this example each incoming JDBCQuery is answered with the corresponding stream of rows. The real difference between the two services lies on the client side. Below is the source code generated by scalaPB:

  override def runQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] =
    Flow.fromGraph(new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow]({ outputObserver =>
      new StreamObserver[grpc.jdbc.services.JDBCQuery] {
        override def onError(t: Throwable): Unit = ()
        override def onCompleted(): Unit = ()
        override def onNext(request: grpc.jdbc.services.JDBCQuery): Unit =
          ClientCalls.asyncServerStreamingCall(
            channel.newCall(METHOD_RUN_QUERY, options),
            request,
            outputObserver
          )
      }
    }))
  ...
  override def batQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] =
    Flow.fromGraph(new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow](outputObserver =>
      ClientCalls.asyncBidiStreamingCall(
        channel.newCall(METHOD_BAT_QUERY, options),
        outputObserver
      )
    ))

So on the client side we call batQuery:

  def batQueryRows: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running batQueryRows ...")
    Source.fromIterator(() => List(query, query2, query3).toIterator).via(stub.batQuery)
  }
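Because batQuery is exposed to the client as an ordinary akka-stream Flow, the usual flow-control operators can be applied to the outgoing query stream. A minimal sketch, assuming the query values and stub defined above (the throttle parameters are arbitrary and only for illustration):

  import scala.concurrent.duration._

  def throttledBatQuery: Source[JDBCDataRow, NotUsed] =
    Source(List(query, query2, query3))
      .throttle(1, 1.second, 1, ThrottleMode.shaping)  // send at most one query per second
      .via(stub.batQuery)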

Besides queries, JDBC operations must also cover data updates, including schema DDL and database updates. JDBC updates are described by a JDBCContext that is passed to the service:

case class JDBCContext(
  dbName: Symbol,
  statements: Seq[String] = Nil,
  parameters: Seq[Seq[Any]] = Nil,
  fetchSize: Int = 100,
  queryTimeout: Option[Int] = None,
  queryTags: Seq[String] = Nil,
  sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
  batch: Boolean = false,
  returnGeneratedKey: Seq[Option[Any]] = Nil,
  // no return: None, return by index: Some(1), by name: Some("id")
  preAction: Option[PreparedStatement => Unit] = None,
  postAction: Option[PreparedStatement => Unit] = None) { ... }
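In practice most of these fields are not set directly; the class carries builder-style helpers (setUpdateCommand, setBatchCommand, appendBatchParameters and so on, see JDBCEngine.scala in the appendix). A minimal sketch of building an update context that way (the table and column names are only for illustration):

  // single-statement update, asking for the generated key to be returned
  val updateCtx = JDBCContext('h2)
    .setUpdateCommand(true, "insert into members(name,description) values (?, ?)", "Peter", "new member")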

The protobuf message corresponding to this class is defined as follows:

message JDBCResult {
  bytes result = 1;
}

message JDBCUpdate {
  string dbName = 1;
  repeated string statements = 2;
  bytes parameters = 3;
  google.protobuf.Int32Value fetchSize = 4;
  google.protobuf.Int32Value queryTimeout = 5;
  int32 sqlType = 6;
  google.protobuf.Int32Value batch = 7;
  bytes returnGeneratedKey = 8;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
}

The service method runDDL returns a JDBCResult message, which wraps a marshalled return value of type Seq[Any]. The following shows how the protobuf message is packed and unpacked: on the service side the incoming JDBCUpdate is unpacked, a JDBCContext is rebuilt from it, and jdbcExecuteDDL is called:

  override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
    logger.info("**** runDDL called on service side ***")
    Flow[JDBCUpdate].flatMapConcat { context =>
      //unpack JDBCUpdate and construct the context
      val ctx = JDBCContext(
        dbName = Symbol(context.dbName),
        statements = context.statements,
        sqlType = JDBCContext.SQL_EXEDDL,
        queryTimeout = context.queryTimeout
      )
      logger.info(s"**** JDBCContext => ${ctx} ***")
      Source.fromFuture(jdbcExecuteDDL(ctx)).map { r => JDBCResult(marshal(r)) }
    }
  }

jdbcExecuteDDL returns a Future[String], as shown below:

  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    } else {
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = WrappedResultSet => {})(
              after = WrappedResultSet => {})
            ddl.apply()
          }
          "SQL_EXEDDL executed succesfully."
        }
      }
    }
  }
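jdbcExecuteDDL can also be used directly, without going through gRPC. A minimal sketch, assuming an implicit ExecutionContextExecutor in scope and a configured 'h2 connection pool (the SQL text is only for illustration):

  val ddlCtx = JDBCContext(
    dbName = 'h2,
    statements = Seq("create table if not exists demo(id int)"),
    sqlType = JDBCContext.SQL_EXEDDL
  )

  jdbcExecuteDDL(ddlCtx).onComplete {
    case scala.util.Success(msg) => println(msg)   // "SQL_EXEDDL executed succesfully."
    case scala.util.Failure(err) => println(s"DDL failed: ${err.getMessage}")
  }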

We can turn that Future into an akka-stream Source with Source.fromFuture(jdbcExecuteDDL(ctx)). On the client side we construct a JDBCUpdate and send it to the service for execution:

  val dropSQL: String = """drop table members"""

  val createSQL: String =
    """
      create table members (
        id serial not null primary key,
        name varchar(30) not null,
        description varchar(1000),
        birthday date,
        created_at timestamp not null,
        picture blob
      )"""

  val ctx = JDBCUpdate(
    dbName = "h2",
    sqlType = JDBCContext.SQL_EXEDDL,
    statements = Seq(dropSQL, createSQL)
  )

  def createTbl: Source[JDBCResult, NotUsed] = {
    logger.info(s"running createTbl ...")
    Source.single(ctx).via(stub.runDDL)
  }

Note that statements = Seq(dropSQL, createSQL) contains two independent SQL operations.
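On the client side the returned JDBCResult carries a marshalled value, so it has to be unpacked with unmarshal. A minimal sketch (the full runner is the RunDDL object in the appendix):

  client.createTbl.runForeach { r =>
    // the DDL result is a marshalled Seq[Any]
    println(unmarshal[Seq[Any]](r.result))
  }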

Next we demonstrate sending a stream of rows (stream MemberRow) from the client and letting the service insert them into the database. The IDL data types and service methods are defined as follows:

message JDBCDate {
  int32 yyyy = 1;
  int32 mm   = 2;
  int32 dd   = 3;
}

message JDBCTime {
  int32 hh   = 1;
  int32 mm   = 2;
  int32 ss   = 3;
  int32 nnn  = 4;
}

message JDBCDateTime {
  JDBCDate date = 1;
  JDBCTime time = 2;
}

message MemberRow {
  string name = 1;
  JDBCDate birthday = 2;
  string description = 3;
  JDBCDateTime created_at = 4;
  bytes picture = 5;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
  rpc insertRows(stream MemberRow) returns(JDBCResult) {}
}

The insertRows service method is implemented as follows:

  override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** insertRows called on service side ***")
    val insertSQL = """insert into members(name,birthday,description,created_at) values (?, ?, ?, ?)"""
    Flow[MemberRow].flatMapConcat { row =>
      val ctx = JDBCContext('h2).setUpdateCommand(true, insertSQL,
        row.name,
        jdbcSetDate(row.birthday.get.yyyy, row.birthday.get.mm, row.birthday.get.dd),
        row.description,
        jdbcSetNow)
      logger.info(s"**** JDBCContext => ${ctx} ***")
      Source.fromFuture(jdbcTxUpdates[Vector](ctx)).map { r => JDBCResult(marshal(r)) }
    }
  }

Again, jdbcTxUpdates returns a Future; its implementation is in the attached JDBCEngine.scala.
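As with jdbcExecuteDDL, that Future is lifted into the stream with Source.fromFuture. A minimal sketch of using jdbcTxUpdates outside of gRPC (assuming an implicit ExecutionContextExecutor and the 'h2 pool; the SQL text is only for illustration):

  val insertCtx = JDBCContext('h2)
    .setUpdateCommand(true, "insert into members(name,description,created_at) values (?, ?, ?)",
      "Demo User", "illustration only", jdbcSetNow)

  // emits the generated keys (one element here) as a Vector[Long]
  val generatedKeys: Source[Vector[Long], NotUsed] = Source.fromFuture(jdbcTxUpdates[Vector](insertCtx))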

The client builds a stream of MemberRow and sends it to the service through stub.insertRows:

  val p1 = MemberRow("Peter Chan", Some(JDBCDate(1967, 5, 17)), "new member1", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p2 = MemberRow("Alanda Wong", Some(JDBCDate(1980, 11, 10)), "new member2", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p3 = MemberRow("Kate Zhang", Some(JDBCDate(1969, 8, 13)), "new member3", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p4 = MemberRow("Tiger Chan", Some(JDBCDate(1962, 5, 1)), "new member4", None, _root_.com.google.protobuf.ByteString.EMPTY)

  def insertRows: Source[JDBCResult, NotUsed] = {
    logger.info(s"running insertRows ...")
    Source.fromIterator(() => List(p1, p2, p3, p4).toIterator).via(stub.insertRows)
  }
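MemberRow instances can also be built or modified with scalaPB's lens-style update method instead of positional constructor arguments; a small sketch using the same fields (the TestConversion object in the appendix does the same thing):

  val p5 = MemberRow(name = "Demo Member").update(
    _.birthday.yyyy := 1970,
    _.birthday.mm := 1,
    _.birthday.dd := 1,
    _.description := "new member5"
  )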

Finally, we demonstrate the jdbcBatchUpdate function: we read MemberRow records from the service and send them back to the service to be updated. The IDL is as follows:

message MemberRow {
  string name = 1;
  JDBCDate birthday = 2;
  string description = 3;
  JDBCDateTime created_at = 4;
  bytes picture = 5;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
  rpc insertRows(stream MemberRow) returns(JDBCResult) {}
  rpc updateRows(stream MemberRow) returns(JDBCResult) {}
  rpc getMembers(JDBCQuery) returns (stream MemberRow) {}
}

The service-side methods are defined as follows:

  val toMemberRow = (rs: WrappedResultSet) => MemberRow(
    name = rs.string("name"),
    description = rs.string("description"),
    birthday = None,
    createdAt = None,
    picture = _root_.com.google.protobuf.ByteString.EMPTY
  )

  override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
    logger.info("**** getMembers called on service side ***")
    Flow[JDBCQuery].flatMapConcat { q =>
      //unpack JDBCQuery and construct the context
      var params: Seq[Any] = Nil
      if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
        params = unmarshal[Seq[Any]](q.parameters)
      logger.info(s"**** query parameters: ${params} ****")
      val ctx = JDBCQueryContext[MemberRow](
        dbName = Symbol(q.dbName),
        statement = q.statement,
        parameters = params,
        fetchSize = q.fetchSize.getOrElse(100),
        autoCommit = q.autoCommit.getOrElse(false),
        queryTimeout = q.queryTimeout
      )
      jdbcAkkaStream(ctx, toMemberRow)
    }
  }

  override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** updateRows called on service side ***")
    val updateSQL = "update members set description = ?, created_at = ? where name = ?"
    Flow[MemberRow].flatMapConcat { row =>
      val ctx = JDBCContext('h2)
        .setBatchCommand(updateSQL)
        .appendBatchParameters(
          row.name + " updated.",
          jdbcSetNow,
          row.name
        ).setBatchReturnGeneratedKeyOption(true)
      logger.info(s"**** JDBCContext => ${ctx} ***")
      Source.fromFuture(jdbcBatchUpdate[Vector](ctx)).map { r => JDBCResult(marshal(r)) }
    }
  }

The source code of the jdbcBatchUpdate function is in the attached JDBCEngine.scala.
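The batch variant accumulates one parameter row per appendBatchParameters call and runs the single statement once per row inside one transaction. A minimal sketch of building such a context by hand (the parameter values are only for illustration):

  val batchCtx = JDBCContext('h2)
    .setBatchCommand("update members set description = ? where name = ?")
    .appendBatchParameters("updated by batch", "Peter Chan")
    .appendBatchParameters("updated by batch", "Alanda Wong")
    .setBatchReturnGeneratedKeyOption(false)

  val updated: Source[Vector[Long], NotUsed] = Source.fromFuture(jdbcBatchUpdate[Vector](batchCtx))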

The client-side code looks like this:

  val queryMember = JDBCQuery(
    dbName = "h2",
    statement = "select * from members"
  )

  def updateRows: Source[JDBCResult, NotUsed] = {
    logger.info(s"running updateRows ...")
    Source.single(queryMember).via(stub.getMembers).via(stub.updateRows)
  }

The following example shows how to use JDBCActionStream to process data in batches. The service-side source code is:

  val params: JDBCDataRow => Seq[Any] = row => {
    Seq((row.value.toInt * 2), row.state, row.county, row.year)
  }

  val sql = "update AQMRPT set total = ? where statename = ? and countyname = ? and reportyear = ?"

  val jdbcActionStream = JDBCActionStream('h2, sql, params)
    .setParallelism(4).setProcessOrder(false)

  val jdbcActionFlow = jdbcActionStream.performOnRow

  override def updateBat: Flow[JDBCDataRow, JDBCDataRow, NotUsed] = {
    logger.info("**** updateBat called on service side ***")
    Flow[JDBCDataRow].via(jdbcActionFlow)
  }

jdbcActionFlow is a Flow[R, R, _], so we can attach it to the upstream Flow directly with via. Here is the definition of JDBCActionStream:

  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                 statement: String, prepareParams: R => Seq[Any]) { jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
      import scala.concurrent._
      val params = prepareParams(r)
      Future {
        NamedDB(dbName) autoCommit { session =>
          session.execute(statement, params: _*)
        }
        r
      }
    }

    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)
  }

  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement = _statement, prepareParams = params)
  }

The performOnRow function is a pass-through stage that uses mapAsync to run the updates on multiple threads. It is called from the client like this:

  def updateBatches: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running updateBatches ...")
    Source.fromIterator(() => List(query, query2, query3).toIterator)
      .via(stub.batQuery)
      .via(stub.updateBat)
  }
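The same pass-through pattern can be reproduced with plain akka-stream operators: mapAsync runs a Future per element and, once it completes, emits the original element downstream. A minimal generic sketch (not part of the library code):

  import scala.concurrent.Future

  // run a side effect for every element, keep the element itself flowing downstream
  def passThrough[T](parallelism: Int)(sideEffect: T => Future[_])(
      implicit ec: ExecutionContextExecutor): Flow[T, T, NotUsed] =
    Flow[T].mapAsync(parallelism) { elem => sideEffect(elem).map(_ => elem) }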

Below is the complete source code for this demonstration:

jdbc.proto

syntax = "proto3";import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";package grpc.jdbc.services;option (scalapb.options) = {// use a custom Scala package name// package_name: "io.ontherocks.introgrpc.demo"// don't append file name to packageflat_package: true// generate one Scala file for all messages (services still get their own file)single_file: true// add imports to generated file// useful when extending traits or using custom types// import: "io.ontherocks.hellogrpc.RockingMessage"// code to put at the top of generated file// works only with `single_file: true`//preamble: "sealed trait SomeSealedTrait"
};/** Demoes various customization options provided by ScalaPBs.*/message JDBCDataRow {string year = 1;string state = 2;string county = 3;string value = 4;
}message JDBCQuery {string dbName = 1;string statement = 2;bytes parameters = 3;google.protobuf.Int32Value fetchSize= 4;google.protobuf.BoolValue autoCommit = 5;google.protobuf.Int32Value queryTimeout = 6;
}message JDBCResult {bytes result = 1;
}message JDBCUpdate {string dbName = 1;repeated string statements = 2;bytes parameters = 3;google.protobuf.Int32Value fetchSize= 4;google.protobuf.Int32Value queryTimeout = 5;int32 sqlType = 6;google.protobuf.Int32Value batch = 7;bytes returnGeneratedKey = 8;
}message JDBCDate {int32 yyyy = 1;int32 mm   = 2;int32 dd   = 3;
}message JDBCTime {int32 hh   = 1;int32 mm   = 2;int32 ss   = 3;int32 nnn  = 4;
}message JDBCDateTime {JDBCDate date = 1;JDBCTime time = 2;
}message MemberRow {string name = 1;JDBCDate birthday = 2;string description = 3;JDBCDateTime created_at = 4;bytes picture = 5;
}service JDBCServices {rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}rpc runDDL(JDBCUpdate) returns (JDBCResult) {}rpc insertRows(stream MemberRow) returns(JDBCResult) {}rpc updateRows(stream MemberRow) returns(JDBCResult) {}rpc getMembers(JDBCQuery) returns (stream MemberRow) {}
}

JDBCEngine.scala

package sdp.jdbc.engine

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.stream._
import java.time._
import scala.concurrent.duration._
import scala.concurrent._
import sdp.jdbc.FileStreaming._
import scalikejdbc.TxBoundary.Try._
import scala.concurrent.ExecutionContextExecutor
import java.io.InputStream

object JDBCContext {
  type SQLTYPE = Int
  val SQL_EXEDDL = 1
  val SQL_UPDATE = 2
  val RETURN_GENERATED_KEYVALUE = true
  val RETURN_UPDATED_COUNT = false
}

case class JDBCQueryContext[M](
  dbName: Symbol,
  statement: String,
  parameters: Seq[Any] = Nil,
  fetchSize: Int = 100,
  autoCommit: Boolean = false,
  queryTimeout: Option[Int] = None)

case class JDBCContext(
  dbName: Symbol,
  statements: Seq[String] = Nil,
  parameters: Seq[Seq[Any]] = Nil,
  fetchSize: Int = 100,
  queryTimeout: Option[Int] = None,
  queryTags: Seq[String] = Nil,
  sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
  batch: Boolean = false,
  returnGeneratedKey: Seq[Option[Any]] = Nil,
  // no return: None, return by index: Some(1), by name: Some("id")
  preAction: Option[PreparedStatement => Unit] = None,
  postAction: Option[PreparedStatement => Unit] = None) { ctx =>

  //helper functions
  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(preAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(postAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(Seq(_parameters))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
    var matchParams = true
    if (ctx.parameters != Nil)
      if (ctx.parameters.head.size != _parameters.size)
        matchParams = false
    if (matchParams) {
      ctx.copy(parameters = ctx.parameters ++ Seq(_parameters))
    } else
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  }

  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
    ctx.copy(returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil)
  }

  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )
  }

  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
  }

  def setBatchCommand(_statement: String): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
  }
}

object JDBCEngine {
  import JDBCContext._

  type JDBCDate = LocalDate
  type JDBCDateTime = LocalDateTime
  type JDBCTime = LocalTime

  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy, mm, dd)
  def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh, mm, ss, nn)
  def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) = LocalDateTime.of(date, time)
  def jdbcSetNow = LocalDateTime.now()

  type JDBCBlob = InputStream

  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToInputStream(fileName, timeOut)

  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
    implicit mat: Materializer) = InputStreamToFile(blob, fileName)

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A], extractor: WrappedResultSet => A)(
    implicit ec: ExecutionContextExecutor): Source[A, NotUsed] = {
    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
      sql.iterator
        .withDBSessionForceAdjuster(session => {
          session.connection.setAutoCommit(ctx.autoCommit)
          session.fetchSize(ctx.fetchSize)
        })
    }
    Source.fromPublisher[A](publisher)
  }

  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A], extractor: WrappedResultSet => A)(
    implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    rawSql.fetchSize(ctx.fetchSize)
    implicit val session = NamedAutoSession(ctx.dbName)
    val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
    sql.collection.apply[C]()
  }

  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    } else {
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = WrappedResultSet => {})(
              after = WrappedResultSet => {})
            ddl.apply()
          }
          "SQL_EXEDDL executed succesfully."
        }
      }
    }
  }

  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    if (ctx.statements == Nil)
      Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
    if (ctx.sqlType != SQL_UPDATE) {
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    } else {
      if (ctx.batch) {
        if (noReturnKey(ctx)) {
          val usql = SQL(ctx.statements.head)
            .tags(ctx.queryTags: _*)
            .batch(ctx.parameters: _*)
          Future {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[Seq]()
              Seq.empty[Long].to[C]
            }
          }
        } else {
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
          Future {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[C]()
            }
          }
        }
      } else {
        Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
      }
    }
  }

  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    val Some(key) :: xs = ctx.returnGeneratedKey
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p @ _ => p.head
    }
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
    Future {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result).to[C]
      }
    }
  }

  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p @ _ => p.head
    }
    val before = ctx.preAction match {
      case None => (pstm: PreparedStatement) => {}
      case Some(f) => f
    }
    val after = ctx.postAction match {
      case None => (pstm: PreparedStatement) => {}
      case Some(f) => f
    }
    val usql = new SQLUpdate(ctx.statements.head, params, ctx.queryTags)(before)(after)
    Future {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result.toLong).to[C]
      }
    }
  }

  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    if (noReturnKey(ctx))
      singleTxUpdateNoReturnKey(ctx)
    else
      singleTxUpdateWithReturnKey(ctx)
  }

  private def noReturnKey(ctx: JDBCContext): Boolean = {
    if (ctx.returnGeneratedKey != Nil) {
      val k :: xs = ctx.returnGeneratedKey
      k match {
        case None => true
        case Some(k) => false
      }
    } else true
  }

  def noActon: PreparedStatement => Unit = pstm => {}

  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    Future {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
          case Nil => Seq.fill(ctx.statements.size)(None)
          case k @ _ => k
        }
        val sqlcmd = ctx.statements zip ctx.parameters zip keys
        val results = sqlcmd.map { case ((stm, param), key) =>
          key match {
            case None =>
              new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
            case Some(k) =>
              new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
          }
        }
        results.to[C]
      }
    }
  }

  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    if (ctx.statements == Nil)
      Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
    if (ctx.sqlType != SQL_UPDATE) {
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    } else {
      if (!ctx.batch) {
        if (ctx.statements.size == 1)
          singleTxUpdate(ctx)
        else
          multiTxUpdates(ctx)
      } else
        Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
    }
  }

  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                 statement: String, prepareParams: R => Seq[Any]) { jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
      import scala.concurrent._
      val params = prepareParams(r)
      Future {
        NamedDB(dbName) autoCommit { session =>
          session.execute(statement, params: _*)
        }
        r
      }
      // Future.successful(r)
    }

    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)
  }

  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement = _statement, prepareParams = params)
  }
}

JDBCService.scala

package demo.grpc.jdbc.services

import akka.NotUsed
import akka.stream.scaladsl.{Source, Flow}
import grpc.jdbc.services._
import java.util.logging.Logger
import protobuf.bytes.Converter._
import sdp.jdbc.engine._
import JDBCEngine._
import scalikejdbc.WrappedResultSet
import scala.concurrent.ExecutionContextExecutor

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
  val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)

  val toRow = (rs: WrappedResultSet) => JDBCDataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
    logger.info("**** runQuery called on service side ***")
    Flow[JDBCQuery].flatMapConcat { q =>
      //unpack JDBCQuery and construct the context
      var params: Seq[Any] = Nil
      if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
        params = unmarshal[Seq[Any]](q.parameters)
      logger.info(s"**** query parameters: ${params} ****")
      val ctx = JDBCQueryContext[JDBCDataRow](
        dbName = Symbol(q.dbName),
        statement = q.statement,
        parameters = params,
        fetchSize = q.fetchSize.getOrElse(100),
        autoCommit = q.autoCommit.getOrElse(false),
        queryTimeout = q.queryTimeout
      )
      jdbcAkkaStream(ctx, toRow)
    }
  }

  override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery

  override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
    logger.info("**** runDDL called on service side ***")
    Flow[JDBCUpdate].flatMapConcat { context =>
      //unpack JDBCUpdate and construct the context
      val ctx = JDBCContext(
        dbName = Symbol(context.dbName),
        statements = context.statements,
        sqlType = JDBCContext.SQL_EXEDDL,
        queryTimeout = context.queryTimeout
      )
      logger.info(s"**** JDBCContext => ${ctx} ***")
      Source.fromFuture(jdbcExecuteDDL(ctx)).map { r => JDBCResult(marshal(r)) }
    }
  }

  override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** insertRows called on service side ***")
    val insertSQL = """insert into members(name,birthday,description,created_at) values (?, ?, ?, ?)"""
    Flow[MemberRow].flatMapConcat { row =>
      val ctx = JDBCContext('h2).setUpdateCommand(true, insertSQL,
        row.name,
        jdbcSetDate(row.birthday.get.yyyy, row.birthday.get.mm, row.birthday.get.dd),
        row.description,
        jdbcSetNow)
      logger.info(s"**** JDBCContext => ${ctx} ***")
      Source.fromFuture(jdbcTxUpdates[Vector](ctx)).map { r => JDBCResult(marshal(r)) }
    }
  }

  override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** updateRows called on service side ***")
    val updateSQL = "update members set description = ?, created_at = ? where name = ?"
    Flow[MemberRow].flatMapConcat { row =>
      val ctx = JDBCContext('h2)
        .setBatchCommand(updateSQL)
        .appendBatchParameters(
          row.name + " updated.",
          jdbcSetNow,
          row.name
        ).setBatchReturnGeneratedKeyOption(true)
      logger.info(s"**** JDBCContext => ${ctx} ***")
      Source.fromFuture(jdbcBatchUpdate[Vector](ctx)).map { r => JDBCResult(marshal(r)) }
    }
  }

  val toMemberRow = (rs: WrappedResultSet) => MemberRow(
    name = rs.string("name"),
    description = rs.string("description"),
    birthday = None,
    createdAt = None,
    picture = _root_.com.google.protobuf.ByteString.EMPTY
  )

  override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
    logger.info("**** getMembers called on service side ***")
    Flow[JDBCQuery].flatMapConcat { q =>
      //unpack JDBCQuery and construct the context
      var params: Seq[Any] = Nil
      if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
        params = unmarshal[Seq[Any]](q.parameters)
      logger.info(s"**** query parameters: ${params} ****")
      val ctx = JDBCQueryContext[MemberRow](
        dbName = Symbol(q.dbName),
        statement = q.statement,
        parameters = params,
        fetchSize = q.fetchSize.getOrElse(100),
        autoCommit = q.autoCommit.getOrElse(false),
        queryTimeout = q.queryTimeout
      )
      jdbcAkkaStream(ctx, toMemberRow)
    }
  }
}

JDBCServer.scala

package demo.grpc.jdbc.server

import java.util.logging.Logger
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.Server
import demo.grpc.jdbc.services._
import io.grpc.ServerBuilder
import grpc.jdbc.services._

class gRPCServer(server: Server) {

  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

  def start(): Unit = {
    server.start()
    logger.info(s"Server started, listening on ${server.getPort}")
    sys.addShutdownHook {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down")
      stop()
      System.err.println("*** server shut down")
    }
    ()
  }

  def stop(): Unit = {
    server.shutdown()
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  def blockUntilShutdown(): Unit = {
    server.awaitTermination()
  }
}

object JDBCServer extends App {
  import sdp.jdbc.config._

  implicit val system = ActorSystem("JDBCServer")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val server = new gRPCServer(
    ServerBuilder
      .forPort(50051)
      .addService(JdbcGrpcAkkaStream.bindService(new JDBCStreamingServices))
      .build()
  )
  server.start()
  //  server.blockUntilShutdown()
  scala.io.StdIn.readLine()
  ConfigDBsWithEnv("dev").close('h2)
  mat.shutdown()
  system.terminate()
}

JDBCClient.scala

package demo.grpc.jdbc.client

import grpc.jdbc.services._
import java.util.logging.Logger
import protobuf.bytes.Converter._
import akka.stream.scaladsl._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import io.grpc._
import sdp.jdbc.engine._

class JDBCStreamClient(host: String, port: Int) {
  val logger: Logger = Logger.getLogger(classOf[JDBCStreamClient].getName)

  val channel = ManagedChannelBuilder
    .forAddress(host, port)
    .usePlaintext(true)
    .build()

  val stub = JdbcGrpcAkkaStream.stub(channel)

  val query = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arizona", 2))
  )
  val query2 = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Colorado", 3))
  )
  val query3 = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arkansas", 8))
  )

  def queryRows: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running queryRows ...")
    Source.single(query).via(stub.runQuery)
  }

  def batQueryRows: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running batQueryRows ...")
    Source.fromIterator(() => List(query, query2, query3).toIterator).via(stub.batQuery)
  }

  val dropSQL: String = """drop table members"""

  val createSQL: String =
    """
      create table members (
        id serial not null primary key,
        name varchar(30) not null,
        description varchar(1000),
        birthday date,
        created_at timestamp not null,
        picture blob
      )"""

  val ctx = JDBCUpdate(
    dbName = "h2",
    sqlType = JDBCContext.SQL_EXEDDL,
    statements = Seq(dropSQL, createSQL)
  )

  def createTbl: Source[JDBCResult, NotUsed] = {
    logger.info(s"running createTbl ...")
    Source.single(ctx).via(stub.runDDL)
  }

  val p1 = MemberRow("Peter Chan", Some(JDBCDate(1967, 5, 17)), "new member1", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p2 = MemberRow("Alanda Wong", Some(JDBCDate(1980, 11, 10)), "new member2", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p3 = MemberRow("Kate Zhang", Some(JDBCDate(1969, 8, 13)), "new member3", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p4 = MemberRow("Tiger Chan", Some(JDBCDate(1962, 5, 1)), "new member4", None, _root_.com.google.protobuf.ByteString.EMPTY)

  def insertRows: Source[JDBCResult, NotUsed] = {
    logger.info(s"running insertRows ...")
    Source.fromIterator(() => List(p1, p2, p3, p4).toIterator).via(stub.insertRows)
  }

  val queryMember = JDBCQuery(
    dbName = "h2",
    statement = "select * from members"
  )

  def updateRows: Source[JDBCResult, NotUsed] = {
    logger.info(s"running updateRows ...")
    Source.single(queryMember).via(stub.getMembers).via(stub.updateRows)
  }

  def updateBatches: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running updateBatches ...")
    Source.fromIterator(() => List(query, query2, query3).toIterator)
      .via(stub.batQuery)
      .via(stub.updateBat)
  }
}

object TestConversion extends App {
  val orgval: Seq[Option[Any]] = Seq(Some(1), Some("id"), None, Some(2))
  println(s"original value: ${orgval}")
  val marval = marshal(orgval)
  println(s"marshal value: ${marval}")
  val unmval = unmarshal[Seq[Option[Any]]](marval)
  println(s"marshal value: ${unmval}")

  val m1 = MemberRow(name = "Peter")
  val m2 = m1.update(
    _.birthday.yyyy := 1989,
    _.birthday.mm := 10,
    _.birthday.dd := 3,
    _.description := "a new member"
  )
}

object QueryRows extends App {
  implicit val system = ActorSystem("QueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.queryRows.runForeach { r => println(r) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object BatQueryRows extends App {
  implicit val system = ActorSystem("BatQueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.batQueryRows.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object RunDDL extends App {
  implicit val system = ActorSystem("RunDDL")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.createTbl.runForeach { r => println(unmarshal[Seq[Any]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object InsertRows extends App {
  implicit val system = ActorSystem("InsertRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.insertRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object UpdateRows extends App {
  implicit val system = ActorSystem("UpdateRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.updateRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object BatUpdates extends App {
  implicit val system = ActorSystem("BatUpdates")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.updateBatches.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

ByteConverter.scala

package protobuf.bytes

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import com.google.protobuf.ByteString

object Converter {

  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(value)
    oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }

  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val value = ois.readObject()
    ois.close()
    value.asInstanceOf[A]
  }
}

The remaining source code and configuration can be found in the previous post.
