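The previous post covered the distributed pub/sub messaging mechanism. That mechanism shares messages inside a single cluster: publishers and subscribers all live on nodes of the same cluster, and the DistributedPubSubMediator instances on those nodes build the underlying message channels through the cluster's internal communication. At the actor pub/sub level this gives location transparency. In practice, though, many front ends act as clients of a cluster while running outside it, and two independent clusters may also need to interact, so the client and the server are not always in the same cluster. ClusterClient is the solution for letting actors outside a cluster communicate with actors inside it.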

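In effect, the ClusterClient pattern is a style of service built on message publish/subscribe: the client requests a service by sending a message, and the server receives the request message and performs the corresponding computation.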

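The cluster client pattern has two parts: the client side, ClusterClient, and the server side, ClusterClientReceptionist. As the name suggests, the latter plays the role of a receptionist that handles service requests from clients outside the cluster. A ClusterClientReceptionist is deployed on every node of the cluster (or on the nodes with a selected role). Each one connects to the DistributedPubSubMediator on its own node, forming a higher-level message subscriber layer. The connection between ClusterClient and ClusterClientReceptionist then forms a unified environment in which the distributed pub/sub mechanism discussed in the previous post can be used.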

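ClusterClient is the message publishing side. It is an actor running on a machine outside the target cluster. When an actor on that machine needs to send a message to an actor inside the cluster, it goes through the local ClusterClient actor, which uses the channel set up with a ClusterClientReceptionist in the cluster to deliver the message to an actor registered with the DistributedPubSubMediator attached to that receptionist. A machine that uses the cluster client must therefore start the ClusterClient service locally (that is, run this actor); it is one end of the communication bridge.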

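On startup, ClusterClient connects to a ClusterClientReceptionist using preconfigured addresses (contact points). It then maintains its internal list of contact points from the list published by the receptionist. This list can be persisted and used to reconnect to the cluster after a system restart. Once connected, ClusterClient monitors the receptionist it is talking to and automatically switches to another ClusterClientReceptionist when necessary. Messages sent through ClusterClient are wrapped in one of three envelopes: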

1. ClusterClient.Send

2. ClusterClient.SendToAll

3. ClusterClient.Publish

These three were discussed in the previous post, so they are not repeated here.
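The contact-point handling and connection monitoring described above are driven by settings under akka.cluster.client, which ClusterClientSettings(system) reads. The fragment below is only a sketch of what a client-side configuration might look like. It assumes the setting names from the Akka 2.5 reference configuration of akka-cluster-tools (check them against the version you use), and the values are illustrative rather than recommendations:

```hocon
akka.cluster.client {
  # Receptionist paths the client tries first (assumed addresses for a local test cluster)
  initial-contacts = [
    "akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist",
    "akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist"
  ]
  # How often the client asks the receptionist for an updated contact list
  refresh-contacts-interval = 60s
  # Heartbeat settings used to detect that the current receptionist is gone
  heartbeat-interval = 2s
  acceptable-heartbeat-pause = 13s
  # Messages buffered while no receptionist connection is established
  buffer-size = 1000
}
```

With initial-contacts set this way, calling withInitialContacts in code becomes optional.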

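ClusterClientReceptionist is the message-receiving interface inside the cluster. An actor in the cluster that wants to receive messages must be registered with the DistributedPubSubMediator on its local node, which gives ClusterClientReceptionist the list of addresses of all service actors in the cluster. The recipient information (an actor path or a topic) carried in the message sent through ClusterClient determines which actor finally receives the message and provides the service. Services are registered like this: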

// Register service A
  val serviceA = system.actorOf(Props[Service], "serviceA")
  ClusterClientReceptionist(system).registerService(serviceA)
// Register service B
  val serviceB = system.actorOf(Props[Service], "serviceB")
  ClusterClientReceptionist(system).registerService(serviceB)

Calling the services through ClusterClient:

  val client = system.actorOf(ClusterClient.props(
  ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
  client ! ClusterClient.Send("/user/serviceA", DoThis, localAffinity = true)
  client ! ClusterClient.SendToAll("/user/serviceB", DoThat)

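Note: the service actors registered with ClusterClientReceptionist (serviceA and serviceB here) must handle the DoThis and DoThat messages and implement the corresponding computation.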

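In a real application, pay attention to what sender() means. As seen by the service actor, sender() is the ClusterClientReceptionist, not the client. As seen by the actor that sent the message, the sender() of a reply is deadLetters. If the service actor needs to know the requester's actual address, the sender can embed its own address in the message it sends.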
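One way to embed the requester's address is to carry an ActorRef inside the request. The following is a minimal sketch, not part of the original example: the message types ShoutRequest and ShoutReply and the actor ReplyingCat are hypothetical names, and it assumes the cluster node can open a remoting connection back to the client and that the messages are serializable.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef}

// Hypothetical request carrying the requester's own ActorRef
final case class ShoutRequest(text: String, replyTo: ActorRef)
// Hypothetical reply sent straight back to the requester
final case class ShoutReply(sound: String)

class ReplyingCat extends Actor with ActorLogging {
  override def receive: Receive = {
    case ShoutRequest(text, replyTo) =>
      // sender() here is the receptionist, so use the embedded reference instead
      log.info(s"request '$text' relayed by ${sender().path}")
      replyTo ! ShoutReply("MIAOM")
  }
}
```

On the client side, an actor would send something like `clusterClient ! ClusterClient.Send("/user/CatSound", ShoutRequest("Shout", self), localAffinity = true)` and handle ShoutReply in its own receive.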

The following simple example demonstrates this. First we design two service actors, Cat and Dog, which provide different animal sounds as their service:

import akka.actor.{Actor, ActorLogging}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Subscribe

class Cat extends Actor with ActorLogging {
  // Also subscribe through pub/sub so that Publish("Shout", ...) reaches this actor
  val mediator = DistributedPubSub(context.system).mediator
  override def preStart() = {
    mediator ! Subscribe("Shout", self)
    super.preStart()
  }

  override def receive: Receive = {
    case "Shout" =>
      log.info("*******I am a cat, MIAOM ...******")
  }
}

class Dog extends Actor with ActorLogging {
  // Also subscribe through pub/sub so that Publish("Shout", ...) reaches this actor
  val mediator = DistributedPubSub(context.system).mediator
  override def preStart() = {
    mediator ! Subscribe("Shout", self)
    super.preStart()
  }
  override def receive: Receive = {
    case "Shout" =>
      log.info("*****I am a dog, WANG WANG...*****")
  }
}

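As you can see, these are two very ordinary actors. We still combine them with the distributed pub/sub from the previous post (the mediator subscription) to verify that cluster-client is built on distributed pub/sub. Next, we place these two actors (services) on different cluster nodes: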

import akka.actor.{ActorSystem, Props}
import akka.cluster.client.ClusterClientReceptionist
import com.typesafe.config.ConfigFactory

object Cat {
  def props = Props[Cat]
  def create(port: Int): ActorSystem  = {
    // Override only the remoting port; everything else comes from application.conf
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem",config)
    val catSound = system.actorOf(props,"CatSound")

    // Make the actor reachable from cluster clients as /user/CatSound
    ClusterClientReceptionist(system).registerService(catSound)
    system
  }
}

object Dog {
  def props = Props(new Dog)
  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem",config)
    val dogSound = system.actorOf(props,"DogSound")
    // Make the actor reachable from cluster clients as /user/DogSound
    ClusterClientReceptionist(system).registerService(dogSound)
    system
  }
}

Note that the cluster is named ClusterSystem. On each actor's node we registered the service with ClusterClientReceptionist.registerService. The conf used by this cluster is:

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"]
    log-info = off
  }
}

This is a fairly complete cluster configuration; only the port still needs to be set per node. Then run the two nodes:

object PetHouse extends App {

  val sysCat = Cat.create(2551)
  val sysDog = Dog.create(2552)

  scala.io.StdIn.readLine()

  sysCat.terminate()
  sysDog.terminate()

}

This builds the Cat and Dog actors on nodes 2551 and 2552 and registers them with ClusterClientReceptionist.registerService. Now for the client:

import akka.actor.{ActorPaths, ActorSystem}
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import akka.cluster.client.ClusterClient.{Publish, Send}
import com.typesafe.config.ConfigFactory

object PetClient extends App {

  val conf = ConfigFactory.load("client")
  val clientSystem = ActorSystem("ClientSystem",conf)
/* Read the contact-points addresses from the conf file
  val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
    case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"
  }.toSet
*/

  // Start with a single contact point; the system adds the others automatically
  val initialContacts = Set(
    ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")
  )

  val clusterClient = clientSystem.actorOf(
    ClusterClient.props(
      ClusterClientSettings(clientSystem)
        .withInitialContacts(initialContacts)),
    "petClient")

  // Point-to-point: address each service by its actor path
  clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)
  clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)

  println(s"sent shout messages ...")
  scala.io.StdIn.readLine()

  // Pub/sub: every subscriber of the "Shout" topic receives the message
  clusterClient ! Publish("Shout","Shout")
  println(s"publish shout messages ...")

  scala.io.StdIn.readLine()
  clientSystem.terminate()
}

The client's ActorSystem is named ClientSystem and lives outside the ClusterSystem cluster. Its conf file is:

akka {

  actor.provider = remote

  remote.netty.tcp.port= 2553
  remote.netty.tcp.hostname=127.0.0.1

}

contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

Setting actor.provider=remote means the client does not need to provide seed nodes. The output:

[INFO] [12/08/2018 09:32:51.432] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:32:51.435] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****

Both ClusterClient and the receptionist emit messages about their own state. We can intercept these messages and react to them. See the listener sample code below:

package petsound
import akka.actor._
import akka.cluster.client._
class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit = {
    clusterClient ! SubscribeContactPoints
    super.preStart()
  }

  override def receive: Receive = {
    case ContactPoints(cps) =>
      // Current snapshot of known contact points, sent once after subscribing
      cps.foreach {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}
    case ContactPointAdded(cp) =>
      log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
    case ContactPointRemoved(cp) =>
      log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")

  }
}

class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit = {
    receptionist ! SubscribeClusterClients
    super.preStart()
  }

  override def receive: Receive = {
    case ClusterClients(cs) =>
      // Current snapshot of connected clients, sent once after subscribing
      cs.foreach {aref => log.info(s"*******ClusterClients: ${aref.path.address.toString}*******")}
    case ClusterClientUp(cc) =>
      log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
    case ClusterClientUnreachable(cc) =>
      log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")

  }
}

These two event listeners are installed as follows:

    // In Cat.create, after registering the service:
    val receptionist = ClusterClientReceptionist(system).underlying
    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listner")

    // In Dog.create, after registering the service:
    val receptionist = ClusterClientReceptionist(system).underlying
    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listner")

  // In PetClient, right after the cluster client is created:
  val clusterClient = clientSystem.actorOf(
    ClusterClient.props(
      ClusterClientSettings(clientSystem)
        .withInitialContacts(initialContacts)),
    "petClient")

  clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listner")

The output:

[INFO] [12/09/2018 09:42:40.838] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPoints:akka.tcp://ClusterSystem@127.0.0.1:2551******
[INFO] [12/09/2018 09:42:40.947] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPointAdded: akka.tcp://ClusterSystem@127.0.0.1:2552*******
[INFO] [12/09/2018 09:42:40.967] [ClientSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClientSystem@127.0.0.1:2553/user/petClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist]


[INFO] [12/09/2018 09:42:40.979] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUp: akka.tcp://ClientSystem@127.0.0.1:2553*******

[INFO] [12/09/2018 09:54:34.363] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUnreachable: akka.tcp://ClientSystem@127.0.0.1:2553*******

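Here is one more demonstration, following the same idea as the previous post: a cluster client sends MongoDB commands to a MongoDB service actor that is registered in the cluster through ClusterClientReceptionist. The service receives the command and runs it against MongoDB. Below is the MongoDB service actor: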

package petsound
import akka.actor._
import com.typesafe.config._
import akka.actor.ActorSystem
import org.mongodb.scala._
import sdp.grpc.services.ProtoMGOContext
import sdp.mongo.engine.MGOClasses._
import sdp.mongo.engine.MGOEngine._
import sdp.result.DBOResult._
import akka.cluster.client._

import scala.collection.JavaConverters._
import scala.util._

class MongoAdder extends Actor with ActorLogging {
  import monix.execution.Scheduler.Implicits.global
  implicit val mgosys = context.system
  implicit val ec = mgosys.dispatcher

  val clientSettings: MongoClientSettings = MongoClientSettings.builder()
    .applyToClusterSettings {b =>
      b.hosts(List(new ServerAddress("localhost:27017")).asJava)
    }.build()

  implicit val client: MongoClient = MongoClient(clientSettings)

  val ctx = MGOContext("testdb","friends")

  override def receive: Receive = {

    case someProto @ Some(proto:ProtoMGOContext) =>
      val ctx = MGOContext.fromProto(proto)
      log.info(s"****** received MGOContext: $someProto *********")

      val task = mgoUpdate[Completed](ctx).toTask
      task.runOnComplete {
        case Success(s) => println("operations completed successfully.")
        case Failure(exception) => println(s"error: ${exception.getMessage}")
      }

  }
}

object MongoAdder {

  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)

    // Register the service so that cluster clients can reach /user/MongoAdder
    val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")
    ClusterClientReceptionist(system).registerService(mongoAdder)

    // Listen for cluster-client connection events on this node
    val receptionist = ClusterClientReceptionist(system).underlying
    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")

    system

  }

}

MongoAdder runs in the same cluster, ClusterSystem. The code above already includes the service registration. The client sends MongoDB commands as follows:

 // MongoDB operation demo
  import org.mongodb.scala._
  import sdp.mongo.engine.MGOClasses._

  val ctx = MGOContext("testdb","friends")

  val chen = Document("" -> "", "" -> "大文","age" -> 28)
  val zhang = Document("" -> "", "" -> "小海","age" -> 7)
  val lee = Document("" -> "", "" -> "","age" -> 45)
  val ouyang = Document("" -> "欧阳", "" -> "","age" -> 120)

  // Wrap the insert command in the context and send it as a protobuf message
  val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))
  clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)

Because the MongoDB commands are serialized with protobuf, client.conf must be changed to tell Akka to use the protobuf serializer for these messages:

akka {

  actor {
    provider = remote
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote.netty.tcp.port= 2553
  remote.netty.tcp.hostname=127.0.0.1

}

contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

Below is the complete sample source code for this discussion:

build.sbt

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "akka-cluster-client"

version := "0.1"

scalaVersion := "2.12.7"

scalacOptions += "-Ypartial-unification"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.17",
  "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.17",
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  // "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.monix" %% "monix" % "2.3.0",
  //for mongodb 4.0
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",
  //other dependencies
  "co.fs2" %% "fs2-core" % "0.9.7",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
  "org.typelevel" %% "cats-core" % "0.9.0",
  "io.monix" %% "monix-execution" % "3.0.0-RC1",
  "io.monix" %% "monix-eval" % "3.0.0-RC1"
)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
)

resources/application.conf

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"]
    log-info = off
  }
}

resources/client.conf

akka {

  actor {
    provider = remote
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote.netty.tcp.port= 2553
  remote.netty.tcp.hostname=127.0.0.1

}

contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

protobuf/sdp.proto

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

package sdp.grpc.services;


message ProtoDate {
  int32 yyyy = 1;
  int32 mm   = 2;
  int32 dd   = 3;
}

message ProtoTime {
  int32 hh   = 1;
  int32 mm   = 2;
  int32 ss   = 3;
  int32 nnn  = 4;
}

message ProtoDateTime {
   ProtoDate date = 1;
   ProtoTime time = 2;
}

message ProtoAny {
  bytes value = 1;
}

protobuf/mgo.proto

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";


option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

package sdp.grpc.services;

import "sdp.proto";

message ProtoMGOBson {
  bytes bson = 1;
}

message ProtoMGODocument {
  bytes document = 1;
}

message ProtoMGOResultOption { //FindObservable
   int32 optType = 1;
   ProtoMGOBson bsonParam = 2;
   int32 valueParam = 3;
}

message ProtoMGOAdmin{
  string tarName = 1;
  repeated ProtoMGOBson bsonParam  = 2;
  ProtoAny options = 3;
  string objName = 4;
}

message ProtoMGOContext {  //MGOContext
  string dbName = 1;
  string collName = 2;
  int32 commandType = 3;
  repeated ProtoMGOBson bsonParam = 4;
  repeated ProtoMGOResultOption resultOptions = 5;
  repeated string targets = 6;
  ProtoAny options = 7;
  repeated ProtoMGODocument documents = 8;
  google.protobuf.BoolValue only = 9;
  ProtoMGOAdmin adminOptions = 10;
}

converters/ByteConverter.scala

package protobuf.bytes
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import com.google.protobuf.ByteString
object Converter {

  // Serialize a value with Java serialization and wrap the bytes in a ByteString
  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(value)
    oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }

  // Read a value back from a ByteString; the caller chooses the expected type A
  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val value = ois.readObject()
    ois.close()
    value.asInstanceOf[A]
  }


}
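The marshal/unmarshal pair above is a Java-serialization round trip. The sketch below shows the same round trip using only the standard library, with Array[Byte] standing in for ByteString so that it runs without the protobuf dependency. The names RoundTripDemo, toBytes and fromBytes are made up for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object RoundTripDemo {
  // Write the value with Java serialization and return the raw bytes
  def toBytes(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }

  // Read the value back; the cast is unchecked, so A must match what was written
  def fromBytes[A](bytes: Array[Byte]): A = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  val original: Map[String, String] = Map("db" -> "testdb", "coll" -> "friends")
  val restored: Map[String, String] = fromBytes[Map[String, String]](toBytes(original))
}
```

Because the cast in fromBytes is unchecked, asking for the wrong type only fails later, when the value is first used.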

converters/DBOResultType.scala

package sdp.result

import cats._
import cats.data.EitherT
import cats.data.OptionT
import monix.eval.Task
import cats.implicits._

import scala.concurrent._

import scala.collection.TraversableOnce

object DBOResult {


  type DBOError[A] = EitherT[Task,Throwable,A]
  type DBOResult[A] = OptionT[DBOError,A]

  implicit def valueToDBOResult[A](a: A): DBOResult[A] =
         Applicative[DBOResult].pure(a)
  implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
         OptionT((o: Option[A]).pure[DBOError])
  implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
 //   val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
         OptionT.liftF(EitherT.fromEither[Task](e))
  }
  implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
       val task = Task.fromFuture[A](fut)
       val et = EitherT.liftF[Task,Throwable,A](task)
       OptionT.liftF(et)
  }

  implicit class DBOResultToTask[A](r: DBOResult[A]) {
    def toTask = r.value.value
  }

  implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
    def someValue: Option[A] = r match {
      case Left(err) => (None: Option[A])
      case Right(oa) => oa
    }
  }

  def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
    if (coll.isEmpty)
      optionToDBOResult(None: Option[C[A]])
    else
      optionToDBOResult(Some(coll): Option[C[A]])
}
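The someValue helper in DBOResult flattens an Either[Throwable, Option[A]] into an Option[A], treating an error the same as a missing value. Here is a standalone sketch of that behaviour using only the standard library; the object name SomeValueDemo and the sample values are invented for illustration.

```scala
object SomeValueDemo {
  // Same shape as DBOResultToOption: an error and an empty result both become None
  implicit class EitherOptionOps[A](r: Either[Throwable, Option[A]]) {
    def someValue: Option[A] = r match {
      case Left(_)   => None
      case Right(oa) => oa
    }
  }

  val found: Either[Throwable, Option[Int]]   = Right(Some(28))
  val missing: Either[Throwable, Option[Int]] = Right(None)
  val failed: Either[Throwable, Option[Int]]  = Left(new RuntimeException("connection lost"))

  // One entry per case above, in the same order
  val results: Seq[Option[Int]] = Seq(found, missing, failed).map(_.someValue)
}
```

The trade-off is that the caller can no longer tell "not found" apart from "failed", so this fits best where the error has already been logged or handled.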

filestream/FileStreaming.scala

package sdp.file

import java.io.{B					