原同步服务运行正常;后因需求变更,对方在单表中新增了字段,字段总数超过 22 个
sbt assembly 编译出错
too many elements for tuple: 26, allowed: 22
Scala 的元组(Tuple)和函数(Function)最多只支持 22 个元素/参数;Scala 2.11 起 case class 本身可以超过 22 个字段,但 Slick 基于元组(tupled/unapply)的映射仍受此限制
遂找解决办法
https://underscore.io/blog/posts/2016/10/11/twenty-two.html
https://github.com/slick/slick/issues/519#issuecomment-48327043
https://github.com/underscoreio/slickless/issues/16
最终应用slickless解决
部分示例
import slick.jdbc.H2Profile.api._
import shapeless.{ HList, ::, HNil, Generic }
import slickless._

// Row type for the "users" table.
case class User(id: Long, email: String)

// Slick table definition whose default projection is built from an HList
// instead of a tuple, so it is not subject to the 22-element tuple limit.
class Users(tag: Tag) extends Table[User](tag, "users") {
  def id    = column[Long]("id", O.PrimaryKey, O.AutoInc)
  def email = column[String]("email")

  // slickless' mappedWith maps the HList of columns to User via shapeless Generic.
  def * = (id :: email :: HNil).mappedWith(Generic[User])
}

// Query entry point for the "users" table.
lazy val users = TableQuery[Users]
基于 tupled 的映射:查询结果与插入数据都需要在元组和 case class 之间做转换
// Page through the source table (tuple-based mapping): load up to PAGE_SIZE rows
// whose id is greater than lastid, then hand them to the insert/upsert handler.
case (BYOFFSET_GET, lastid: Int) => {
  println("sync start by", lastid)
  val wordsTable = TableQuery[Words]
  // val action=ORIGIN_DB.run(Word.filter(_.id > lastid).take(PAGE_SIZE).result)
  val action = ORIGIN_DB.run(wordsTable.filter(_.id > lastid).take(PAGE_SIZE).result)
  action.onComplete(data => {
    if (data.isSuccess) {
      // Only touch data.get after the success check: on a Failure it rethrows,
      // which would abort this callback before the retry branch below can run.
      val rows = data.get
      println("sync get data", rows.length)
      // The query yields plain tuples; convert each one into a Word.
      val words = rows.toList.map(a => Word.tupled(a))
      if (words.length > 0) {
        Future {
          println(s"Blocking next page 1s start")
          TimeUnit.SECONDS.sleep(1)
          println(s"Blocking next page 1s finished")
          if (is_just_insert) {
            // Sync only has to handle inserts.
            self ! (BYOFFSET_INSERT, words)
          } else {
            // Historical rows may have been updated as well.
            self ! (BYOFFSET_INSERT_UPDATE, words)
          }
        }
      } else {
        // Already caught up with the latest data: wait 5 minutes, then poll again.
        Future {
          println(s"Blocking future 5m start")
          TimeUnit.MINUTES.sleep(5)
          println(s"Blocking future 5m finished")
          self ! (BYOFFSET_GET, lastid)
        }
      }
    } else {
      // Query failed: wait 5 minutes, then retry from the same offset.
      Future {
        println(s"Blocking table "+tablename+" future 5m start")
        TimeUnit.MINUTES.sleep(5)
        println(s"Blocking table "+tablename+" future 5m finished")
        self ! (BYOFFSET_GET, lastid)
      }
    }
  })
}
// Insert a fetched page into the destination database.
// NOTE: the List element type is erased at runtime, so this pattern is unchecked.
case (BYOFFSET_INSERT, words: List[Word]) => {
  println("insert start", words.length)
  val word = TableQuery[Words]
  //Word.+=(Word.unapply(words.head).get)
  // The table is tuple-based, so every Word is converted back to its tuple
  // before the batch insert.
  val insertActions = DBIO.seq(
    word.++=(words.map(a => Word.unapply(a).get))
  )
  DEST_DB.run(insertActions).onComplete(data => {
    if (data.isSuccess) {
      println("insert data result", data)
      // After a successful insert, update the "last" (offset) table.
      val lastid = words.last.id
      Sync.lastActor ! (BYOFFSET_UPSERT_OFFSET, tablename, lastid)
    } else {
      // Insert failed: re-send the same batch to retry it.
      self ! (BYOFFSET_INSERT, words)
    }
  })
}
基于 HList/Generic 的映射则不必做上述转换,查询结果直接就是 case class
// Page through the source table (HList/Generic mapping): load up to PAGE_SIZE rows
// whose id is greater than lastid, then hand them to the insert/upsert handler.
case (BYOFFSET_GET, lastid: Int) => {
  println("table "+tablename+" sync start by", lastid)
  val usersTable = TableQuery[Users]
  // val action=ORIGIN_DB.run(User.filter(_.id > lastid).take(PAGE_SIZE).result)
  val action = ORIGIN_DB.run(usersTable.filter(_.id > lastid).take(PAGE_SIZE).result)
  action.onComplete(data => {
    if (data.isSuccess) {
      // Only touch data.get after the success check: on a Failure it rethrows,
      // which would abort this callback before the retry branch below can run.
      val rows = data.get
      println("table "+tablename+" sync get data", rows.length)
      // No tuple conversion is needed here; the rows are used as returned.
      val users = rows.toList
      if (users.length > 0) {
        Future {
          println(s"Blocking table "+tablename+" next page 1s start")
          TimeUnit.SECONDS.sleep(1)
          println(s"Blocking table "+tablename+" next page 1s finished")
          if (is_just_insert) {
            // Sync only has to handle inserts.
            self ! (BYOFFSET_INSERT, users)
          } else {
            // Historical rows may have been updated as well.
            self ! (BYOFFSET_INSERT_UPDATE, users)
          }
        }
      } else {
        // Nothing new was returned: wait 5 minutes, then poll again.
        Future {
          println(s"Blocking table "+tablename+" future 5m start")
          TimeUnit.MINUTES.sleep(5)
          println(s"Blocking table "+tablename+" future 5m finished")
          self ! (BYOFFSET_GET, lastid)
        }
      }
    } else {
      // Query failed: wait 5 minutes, then retry from the same offset.
      Future {
        println(s"Blocking table "+tablename+" future 5m start")
        TimeUnit.MINUTES.sleep(5)
        println(s"Blocking table "+tablename+" future 5m finished")
        self ! (BYOFFSET_GET, lastid)
      }
    }
  })
}
// Insert a fetched page into the destination database.
// NOTE: the List element type is erased at runtime, so this pattern is unchecked.
case (BYOFFSET_INSERT, users: List[User]) => {
  println("table "+tablename+" insert start", users.length)
  val user = TableQuery[Users]
  //User.+=(User.unapply(users.head).get)
  // Rows are inserted directly, without converting them to tuples first.
  val insertActions = DBIO.seq(
    user ++= users
  )
  DEST_DB.run(insertActions).onComplete(data => {
    if (data.isSuccess) {
      println("table "+tablename+" insert data result", data)
      // After a successful insert, update the "last" (offset) table.
      val lastid = users.last.id
      Sync.lastActor ! (BYOFFSET_UPSERT_OFFSET, tablename, lastid)
    } else {
      // Insert failed: re-send the same batch to retry it.
      self ! (BYOFFSET_INSERT, users)
    }
  })
}
scala slick 异构表同步服务
https://github.com/cclient/ScalaMysqlSync