14. trait Connector[A] extends java.io.Closeable with Serializable {
def get: A
def close(): Unit
def using[B](f: A => B): B = f(get)
}
15. case class PoolAerospikeConnector(name: Symbol, config: AerospikeConfig)
extends Connector[AerospikeClient] {
def get: AerospikeClient =
PoolAerospikeConnector.defaultHolder.getOrCreate(name, mkClient)
def close(): Unit =
PoolAerospikeConnector.defaultHolder.remove(name)(
AerospikeConnector.aerospikeClientClosable)
private val mkClient: () => AerospikeClient =
() => new AerospikeClient(config.clientPolicy.underlying,
config.asHosts:_*)
}
object PoolAerospikeConnector {
private val defaultHolder = new DefaultResourceHolder[AerospikeClient]
}
16. case class ScalikeJdbcConnector(name: Symbol, config: Config)
extends Connector[Unit] {
def get: Unit = {
if (!ConnectionPool.isInitialized(name)) {
// Load MySQL JDBC driver class
Class.forName("com.mysql.jdbc.Driver")
ConnectionPool.add(name, config.getString("url"),
config.getString("user"), config.getString("password"))
}
}
def close(): Unit = ConnectionPool.close(name)
}
17. case class KafkaProducerConnector[K :ClassTag, V :ClassTag](
name: Symbol, config: java.util.Map[String, AnyRef])
extends Connector[ScalaKafkaProducer[K, V]] {
def get: ScalaKafkaProducer[K, V] =
KafkaProducerConnector.defaultHolder.getOrCreate(name, mkResource)
.asInstanceOf[ScalaKafkaProducer[K, V]]
def close(): Unit = KafkaProducerConnector.defaultHolder.remove(name)(
KafkaProducerConnector.kafkaProducerClosable)
private val mkResource = () => {
val keySer = mkDefaultSerializer[K](ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
val valueSer = mkDefaultSerializer[V](ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
new ScalaKafkaProducer[K, V](
new KafkaProducer[K, V](config, keySer.orNull, valueSer.orNull))
}
private def mkDefaultSerializer[A :ClassTag](configKey: String): Option[Serializer[A]] = {
if (!config.containsKey(configKey)) {
implicitly[ClassTag[A]].runtimeClass match {
case c if c == classOf[Array[Byte]] => Some(new ByteArraySerializer().asInstanceOf[Serializer[A]])
case c if c == classOf[String] => Some(new StringSerializer().asInstanceOf[Serializer[A]])
case _ => None
}
} else None
}
}