Working with Aerospike in Scala with macro magic




In our big data department, part of the data is stored in Aerospike. This data has quite a few consumers, among them two applications written in Scala, whose interaction with the database will expand as business demands keep growing. The only decent driver for us was the Java client listed on the Aerospike website itself (http://www.aerospike.com/docs/client/java). Converting Scala data types (especially hierarchical ones) into the corresponding Aerospike types leads to a lot of boilerplate. To avoid this, we needed a more convenient, yet still type-safe, interface.


Engineers do not like writing the same code many times, and they try to simplify and optimize every repetitive action. Such tasks are often solved with code generation. So we decided to write our own library for working with Aerospike, using macros.



A bit about Aerospike


Aerospike is a distributed, schema-less key-value database based on the hash table principle. Our bank uses it heavily for building distributed caches and for tasks that require low response times. The database is easy to install and administer, which simplifies its adoption and support.


About the storage model: the namespace and setName parameters are associated with record keys, and the data itself is stored in so-called bins. Values can be of various types: Integers, Strings, Bytes, Doubles, Lists, Maps, Sorted Maps, GeoJSON. Interestingly, a bin's type is not fixed. Having written, say, an Integer, you can later overwrite it with a value of any other type. The drivers for this database contain a fair amount of code for serializing values of the external model into the internal one.
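To make the storage model concrete, here is a toy in-memory sketch. It is not the Aerospike API: ToyKey and ToyStore are hypothetical names. A record is addressed by namespace, set name and user key, and it holds named bins. A bin can be overwritten with a value of a different type.

```scala
import scala.collection.mutable

// Hypothetical toy model of Aerospike's storage layout, for illustration only.
// A record is addressed by (namespace, set name, user key).
final case class ToyKey(namespace: String, setName: String, userKey: Any)

object ToyStore {
  // Each record is a map of bin name -> bin value; bin types are not fixed.
  private val records = mutable.Map.empty[ToyKey, mutable.Map[String, Any]]

  // Writes (or overwrites) the given bins of a record.
  def put(key: ToyKey, bins: (String, Any)*): Unit = {
    val record = records.getOrElseUpdate(key, mutable.Map.empty[String, Any])
    bins.foreach { case (name, value) => record(name) = value }
  }

  // Returns an immutable snapshot of the record's bins, if the record exists.
  def get(key: ToyKey): Option[Map[String, Any]] =
    records.get(key).map(_.toMap)
}
```

Suppose you put `"age" -> 42` and later `"age" -> "forty-two"` under the same key. The second write simply replaces the bin value, whatever its previous type.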


About creating the DSL


Let's walk through simple examples of how we designed our DSL, why we decided to use macros, and what came of it all.


With limited time (interaction with this database is only a small part of the project), it is hard to write an entire client that implements the protocol. Besides, such a client would require more maintenance effort. So we settled on writing a wrapper around an existing client. Let's look at some examples.


We took the Aerospike Java Client version 3.3.1 as a basis. It is available on www.aerospike.com, and the sources are on GitHub. A large part of its methods operate on keys and bins from the com.aerospike.client package. The Java client supports working with the database in both synchronous and asynchronous modes. We use the asynchronous com.aerospike.client.async.AsyncClient. The easiest way to create it:


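import com.aerospike.client.Host
import com.aerospike.client.async.{AsyncClient, AsyncClientPolicy}
val client = new AsyncClient(new AsyncClientPolicy, hosts.map(new Host(_, port)): _*)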

where hosts is a List[String] containing the hosts of your database, and port is an Int (3000 by default).


If you pass invalid host values or a wrong port when creating the client, the driver throws an error, because it checks the connection on creation:


scala> new AsyncClient(new AsyncClientPolicy, List().map(new Host(_, port)): _*)
com.aerospike.client.AerospikeException$Connection: Error Code 11: Failed to connect to host(s):

Type mapping between the DSL, the Java Client, and the database


| Scala         | Java Client   | Aerospike     |
|-------------- |-------------- |-----------    |
| Int           | IntegerValue  | Integer       |
| Long          | LongValue     | Integer       |
| String        | StringValue   | String        |
| Boolean       | BooleanValue  | Integer       |
| Float         | FloatValue    | Double        |
| Double        | DoubleValue   | Double        |
| Seq           | ListValue     | List          |
| Map           | MapValue      | Map           |
| Char          | StringValue   | String        |
| Short         | IntegerValue  | Integer       |
| Byte          | IntegerValue  | Integer       |
| HList         | MapValue      | Map           |
| case class T  | MapValue      | Map           |

The table shows that many similar conversions lie ahead, and nobody wants to write all of them by hand every time.
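As a quick cross-check of the table, the hypothetical helper below returns the Aerospike storage type that each Scala value ends up as, following the rows above. HList and case classes are left out, since handling them needs shapeless or macros.

```scala
// Hypothetical helper that encodes the type mapping table:
// Scala value -> Aerospike storage type.
object StorageTypes {
  def storedAs(value: Any): String = value match {
    case _: Int | _: Long | _: Short | _: Byte | _: Boolean => "Integer"
    case _: Float | _: Double                               => "Double"
    case _: String | _: Char                                => "String"
    case _: Map[_, _]                                       => "Map"
    case _: Seq[_]                                          => "List"
    case other =>
      throw new IllegalArgumentException(s"No mapping for ${other.getClass.getName}")
  }
}
```

Note that Boolean, Short and Byte share the Integer storage type with Int and Long. This is why reading a value back requires knowing which Scala type was originally written.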


Our first thought was reflection, but the runtime version does not suit us because it is slow and expensive. That leaves compile-time reflection, which lets us generate the converters and get error messages at compile time.


For any database operation, the methods of our DSL interface accept only concrete key and bin values, and the macros do all the conversions for us. The main idea was to get rid of the boilerplate and spare the user from a thorough study of Aerospike's internal data structures. We determined in advance the optimal storage option for each type of value being written.


Consider one of the most common operations with Aerospike: adding a record and then reading it by key. We will use the put method. First, we need functions that convert values of specific types into the driver's internal models: keys into com.aerospike.client.Key, and bins into com.aerospike.client.Bin.
Let the key be a String, and in various services we will write bins of types String, Int, and Boolean.


Let's write the key conversion function:


import com.aerospike.client.Key
import com.aerospike.client.Value.StringValue
def createStringKey(namespace: String, setName: String, value: String): Key =
   new Key(namespace, setName, new StringValue(value))

and, similarly, the bin converters:


import com.aerospike.client.Bin
import com.aerospike.client.Value.{IntegerValue, StringValue, BooleanValue}
def createStringBin(name: String, value: String): Bin = new Bin(name, new StringValue(value))
def createIntBin(name: String, value: Int): Bin = new Bin(name, new IntegerValue(value))
def createBooleanBin(name: String, value: Boolean): Bin = new Bin(name, new BooleanValue(value))

The signature of the Java library method we need (there are several overloads; we take the one with the fewest parameters):


public void put(WritePolicy policy, Key key, Bin... bins) throws AerospikeException;

So, calls using this library will look like this:


import com.aerospike.client.policy.WritePolicy
client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue1"),
   Seq(createStringBin("binName1", "binValue1"), createStringBin("binName2", "binValue2")): _*)
client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue2"),
   Seq(createIntBin("binName1", 2), createIntBin("binName2", 4)): _*)
client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue3"),
   Seq(createBooleanBin("binName1", true), createBooleanBin("binName2", false)): _*)

Not too pretty, right? Let's try to simplify:


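 import com.aerospike.client.{Bin, Key}
 import com.aerospike.client.Value.{BooleanValue, IntegerValue, StringValue}
 import com.aerospike.client.async.AsyncClient
 import com.aerospike.client.policy.WritePolicy
 def createKey[T](ns: String, sn: String, value: T): Key = {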
   val key = value match {
     case s: String => new StringValue(s)
     case i: Int => new IntegerValue(i)
     case b: Boolean => new BooleanValue(b)
     case _ => throw new Exception("Not implemented")
   }
   new Key(ns, sn, key)
 }
 def createBin[T](name: String, value: T): Bin = {
   value match {
     case s: String => new Bin(name, new StringValue(s))
     case i: Int => new Bin(name, new IntegerValue(i))
     case b: Boolean => new Bin(name, new BooleanValue(b))
     case _ => throw new Exception("Not implemented")
   }
 }
 def putValues[K, B](client: AsyncClient, namespace: String, setName: String,
                     keyValue: K, bins: Seq[(String, B)])(implicit wPolicy: WritePolicy): Unit = {
   client.put(wPolicy, createKey(namespace, setName, keyValue), bins.map(b => createBin(b._1, b._2)): _*)
 }
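The weakness of this version is that createKey and createBin accept any T. An unsupported type therefore compiles fine and fails only at run time. The sketch below reproduces the same pattern-matching shape so that it runs without the Aerospike client. It uses hypothetical stand-in value classes, not the driver's IntegerValue and friends.

```scala
// Hypothetical stand-ins for the driver's value classes, for illustration only.
sealed trait SimpleValue
final case class IntV(v: Int) extends SimpleValue
final case class StrV(v: String) extends SimpleValue
final case class BoolV(v: Boolean) extends SimpleValue

object RuntimeDispatch {
  // Same shape as createBin above: the type parameter is erased,
  // so the decision is made by a runtime match on the value itself.
  def toValue[T](value: T): SimpleValue = value match {
    case s: String  => StrV(s)
    case i: Int     => IntV(i)
    case b: Boolean => BoolV(b)
    case _          => throw new Exception("Not implemented")
  }
}
```

`RuntimeDispatch.toValue(1.5)` type-checks without complaint and throws only when it is executed. This is exactly the kind of error we would rather get from the compiler.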

Now we need to get rid of the createKey and createBin functions by adding some implicit magic.


We need helper objects that produce the corresponding models of the driver, based on the input data types:


KeyWrapper: [K => Key]
BinWrapper: [B => Bin]

Now we can gather all the logic in one method:


case class SingleBin[B](name: String, value: B)
def putValues[K, B](client: AsyncClient, key: K, value: SingleBin[B])(implicit kC: KeyWrapper[K],
 bC: BinWrapper[B], wPolicy: WritePolicy): Unit = client.put(wPolicy, kC(key), bC(value))

where WritePolicy is a container object holding various write parameters. We use the default one, created with new WritePolicy.
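Here is a minimal, self-contained sketch of this type-class design with hand-written instances. These instances are roughly what the macros are meant to spare us from writing. The Aerospike types are replaced by hypothetical stand-ins (SKey, SBin, SValue, FakeClient) so that the code runs without the driver. Only the shape of KeyWrapper, BinWrapper, SingleBin and putValues follows the text.

```scala
// Hypothetical stand-ins for com.aerospike.client.{Key, Bin, Value}.
sealed trait SValue
final case class SInt(v: Int) extends SValue
final case class SStr(v: String) extends SValue
final case class SKey(namespace: String, setName: String, value: SValue)
final case class SBin(name: String, value: SValue)

final case class SingleBin[B](name: String, value: B)

// Type classes: K => Key and SingleBin[B] => Bin.
trait KeyWrapper[K] { def apply(k: K): SKey }
trait BinWrapper[B] { def apply(b: SingleBin[B]): SBin }

object KeyWrapper {
  // Hand-written instance; namespace and set are hard-coded for brevity.
  implicit val stringKey: KeyWrapper[String] = new KeyWrapper[String] {
    def apply(k: String): SKey = SKey("test", "demo", SStr(k))
  }
}

object BinWrapper {
  implicit val intBin: BinWrapper[Int] = new BinWrapper[Int] {
    def apply(b: SingleBin[Int]): SBin = SBin(b.name, SInt(b.value))
  }
  implicit val stringBin: BinWrapper[String] = new BinWrapper[String] {
    def apply(b: SingleBin[String]): SBin = SBin(b.name, SStr(b.value))
  }
}

// Hypothetical fake client that only records what would be written.
final class FakeClient {
  var written: List[(SKey, SBin)] = Nil
  def put(key: SKey, bin: SBin): Unit = written = (key, bin) :: written
}

object Dsl {
  // The converters are found through implicit resolution on K and B.
  def putValues[K, B](client: FakeClient, key: K, value: SingleBin[B])(
      implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Unit =
    client.put(kC(key), bC(value))
}
```

A call like `Dsl.putValues(client, "k1", SingleBin("age", 42.0))` does not compile here, because there is no `BinWrapper[Double]` in implicit scope. The type-class design catches unsupported types at compile time, which the runtime pattern match could not do.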


The most obvious option would be to write out every wrapper instance by hand. But why do that when we know exactly how each instance is built? This is where macros come in handy.


The simplest option is to describe how a particular type of converter is created, using quasiquotes. Let's start with the keys:


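 import com.aerospike.client.{Key, Value}
 import com.aerospike.client.Value.{BooleanValue, IntegerValue, StringValue}
 import scala.language.experimental.macros
 import scala.reflect.macros.blackbox.Context
 trait KeyWrapper[KT] {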
   val namespace: String = ""
   val setName: String = ""
   def apply(k: KT): Key
   def toValue(v: KT): Value = v match {
     case b: Int => new IntegerValue(b)
     case b: String => new StringValue(b)
     case b: Boolean => new BooleanValue(b)
     case _ => throw new Exception("not implemented")
   }
 }
 object KeyWrapper {
   implicit def materialize[T](implicit dbc: DBCredentials): KeyWrapper[T] = macro impl[T]
   def impl[T: c.WeakTypeTag](c: Context)(dbc: c.Expr[DBCredentials]): c.Expr[KeyWrapper[T]] = {
     import c.universe._
     val tpe = weakTypeOf[T]
     val ns = reify(dbc.splice.namespace)
     val sn = reify(dbc.splice.setname)
     val imports =
       q"""
         import com.aerospike.client.{Key, Value}
         import collection.JavaConversions._
         import com.aerospike.client.Value._
         import scala.collection.immutable.Seq
         import ru.tinkoff.aerospikescala.domain.ByteSegment
         import scala.util.{Failure, Success, Try}
        """
     c.Expr[KeyWrapper[T]] {
       q"""
       $imports
       new KeyWrapper[$tpe] {
         override val namespace = $ns
         override val setName = $sn
         def apply(k: $tpe): Key = new Key(namespace, setName, toValue(k))
       }
      """
     }
   }
 }

where DBCredentials contains the namespace and setName.


Thus, we can write a service method, and the converters it needs are generated automatically when it is compiled.
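The macro-generated KeyWrapper takes namespace and setName from an implicit DBCredentials. Below is a hand-written sketch of the same wiring. It uses hypothetical stand-ins for the driver's Key and Value, and the field names of DBCredentials here are an assumption. In the DSL itself, the instance is produced by KeyWrapper.materialize.

```scala
// Hypothetical stand-ins for com.aerospike.client.{Key, Value}.
sealed trait KValue
final case class KInt(v: Int) extends KValue
final case class KStr(v: String) extends KValue
final case class KKey(namespace: String, setName: String, value: KValue)

// Assumed shape of the credentials holder.
final case class DBCredentials(namespace: String, setName: String)

trait KeyWrapper[K] {
  def apply(k: K): KKey
}

object KeyWrapper {
  // Like materialize[T](implicit dbc: DBCredentials), but written by hand
  // for two key types: the credentials are captured when the instance is built.
  implicit def stringKey(implicit dbc: DBCredentials): KeyWrapper[String] =
    new KeyWrapper[String] {
      def apply(k: String): KKey = KKey(dbc.namespace, dbc.setName, KStr(k))
    }
  implicit def intKey(implicit dbc: DBCredentials): KeyWrapper[Int] =
    new KeyWrapper[Int] {
      def apply(k: Int): KKey = KKey(dbc.namespace, dbc.setName, KInt(k))
    }
}

object KeyDemo {
  // Any method with an implicit KeyWrapper[K] parameter gets a fully built key.
  def keyFor[K](k: K)(implicit kw: KeyWrapper[K]): KKey = kw(k)
}
```

With an `implicit val dbc: DBCredentials` in scope at the call site, `KeyDemo.keyFor("alice")` resolves `stringKey(dbc)`. If no credentials are in scope, the call fails to compile rather than producing a key with an empty namespace.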




Bins are a bit more complicated. We need to read back values that are stored in the database, which were previously converted into Aerospike's internal format. For this we use the simplest of the driver's methods:


public Record get(Policy policy, Key key) throws AerospikeException;

where the return value is:


public Record(
     Map bins,
     int generation,
     int expiration
  )

and the data we need is in Map bins. This is where a problem arises (see the type mapping table). Our goal is to generate converters at compile time and to return a value of exactly the type that was written earlier. So we have to anticipate how to write the function that extracts the value we need from the database. Moreover, the types we receive in bins come from the java.util package, so we will need the converters from the corresponding scala.collection packages.
Now let's write the bin converter:
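To illustrate the problem: Int and Boolean are stored as Aerospike integers (see the table), so they come back from the driver as java.lang.Long, and the bins arrive as a java.util.Map. The sketch below simulates such a bins map by hand; it does not call the driver. It converts the map with scala.jdk.CollectionConverters, the Scala 2.13+ successor of the JavaConversions/JavaConverters imports used in this article. The decoder names are hypothetical.

```scala
import scala.jdk.CollectionConverters._

object DecodeBins {
  // Simulates what the driver hands back: a java.util.Map of bin name -> Object,
  // where integer-typed bins (including booleans) are java.lang.Long.
  def sampleBins(): java.util.Map[String, AnyRef] = {
    val m = new java.util.HashMap[String, AnyRef]()
    m.put("count", java.lang.Long.valueOf(123L))
    m.put("active", java.lang.Long.valueOf(1L))
    m.put("name", "strValue")
    m
  }

  // Per-type decoders in the spirit of BinWrapper.fetch.
  def asInt(any: Any): Option[Int] = any match {
    case v: java.lang.Long => Some(v.intValue)
    case _                 => None
  }

  def asBoolean(any: Any): Option[Boolean] = any match {
    case v: java.lang.Long => Some(v.longValue == 1L)
    case _                 => None
  }

  def asString(any: Any): Option[String] = any match {
    case v: String => Some(v)
    case _         => None
  }

  // Converts the Java map into an immutable Scala map.
  def toScala(bins: java.util.Map[String, AnyRef]): Map[String, AnyRef] =
    bins.asScala.toMap
}
```

The same stored Long can be decoded as an Int or as a Boolean. Only the type the caller expects tells us which decoder to use, and that type is exactly what a compile-time converter knows.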


trait BinWrapper[BT] {
 import com.aerospike.client.Value._
 import com.aerospike.client.{Bin, Record, Value}
 import scala.collection.JavaConversions._
 import scala.collection.immutable.Map
 import scala.reflect.runtime.universe._
 type Singleton = SingleBin[BT]
 type Out = (Map[String, Option[BT]], Int, Int)
 def apply(one: Singleton): Bin = {
   if (one.name.length > 14) throw new IllegalArgumentException("Current limit for bin name is 14 characters")
   else new Bin(one.name, toValue(one.value))
 }
 def toValue(v: BT): Value = v match {
   case b: Int => new IntegerValue(b)
   case b: String => new StringValue(b)
   case b: Boolean => new BooleanValue(b)
   case _ => throw new Exception("not implemented")
 }
 def apply(r: Record): Out = {
   val outValue: Map[String, Option[BT]] = {
     val jMap = r.bins.view collect {
       case (name, bt: Any) => name -> fetch(bt)
     }
     jMap.toMap
   }
   if (outValue.values.isEmpty && r.bins.nonEmpty) throw new ClassCastException(
     s"Failed to cast ${weakTypeOf[BT]}. Please, implement fetch function in BinWrapper")
   else (outValue, r.generation, r.expiration)
 }
 def fetch(any: Any): Option[BT]
}

The apply method takes a Record as its parameter. Everything here can be generalized right up to the point where the value's concrete type has to be parsed. That part is easier to implement with macros:


import scala.language.experimental.macros
import scala.reflect.macros.blackbox
object BinWrapper {
 implicit def materialize[T]: BinWrapper[T] = macro materializeImpl[T]
 def materializeImpl[T: c.WeakTypeTag](c: blackbox.Context): c.Expr[BinWrapper[T]] = {
   import c.universe._
   val tpe = weakTypeOf[T]
   val singleton = weakTypeOf[SingleBin[T]]
   val out = weakTypeOf[(Map[String, Option[T]], Int, Int)]
   val tpeSt = q"${tpe.toString}"
   val fetchValue = tpe match {
     case t if t =:= weakTypeOf[String] => q"""override def fetch(any: Any): Option[$tpe] = any match {
       case v: String => Option(v)
       case oth => scala.util.Try(oth.toString).toOption
     } """
     case t if t =:= weakTypeOf[Boolean] => q"""override def fetch(any: Any): Option[$tpe] = any match {
       case v: java.lang.Long => Option(v == 1)
       case _ => None
     } """
     case t if t =:= weakTypeOf[Int] => q"""override def fetch(any: Any): Option[$tpe] = any match {
       case v: java.lang.Long => Option(v.toInt)
       case oth => scala.util.Try(oth.toString.toInt).toOption
     } """
     case t if t.toString.contains("HNil") || t.toString.contains("HList") =>
       q"""override def fetch(any: Any): Option[$tpe] = any match {
             case m: java.util.HashMap[Any, Any] =>
             val newList = castHListElements(m.asScala.values.toList, $tpeSt)
             newList.toHList[$tpe]
             case oth => None
           } """
     case _ => q""""""
   }
   val imports =
     q"""
        import java.util.{List => JList, Map => JMap}
        import com.aerospike.client.{Bin, Record, Value}
        import com.aerospike.client.Value.{BlobValue, ListValue, MapValue, ValueArray}
        import scala.collection.JavaConversions._
        import scala.collection.JavaConverters._
        import shapeless.{HList, _}
        import shapeless.HList.hlistOps
        import syntax.std.traversable._
        ....
      """
   c.Expr[BinWrapper[T]] {
     q"""
     $imports
     new BinWrapper[$tpe] {
       override def apply(one: $singleton): Bin = {
           if (one.name.length > 14) throw new IllegalArgumentException("Current limit for bin name is 14 characters")
          else new Bin(one.name, toValue(one.value))
        }
       override def apply(r: Record): $out = {
          val outValue: Map[String, Option[$tpe]] = {
          val jMap = r.bins.view collect {
           case (name, bt: Any) =>
           val res = fetch(bt)
           if (res.isEmpty && r.bins.nonEmpty) throwClassCast($tpeSt) else name -> res
          }
         jMap.toMap
         }
        (outValue, r.generation, r.expiration)
       }
       $fetchValue
     }
   """
   }
 }
}

The macros did all the work for us. Instances of all the required converters are generated automatically, and method calls contain only the key and bin values themselves.
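The sketch below shows the split between the generic apply(r: Record) and the type-specific fetch without a macro or a database. It is a hand-written equivalent of what the generated Int instance provides. FakeRecord and SimpleBinReader are hypothetical stand-ins for com.aerospike.client.Record and BinWrapper.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for com.aerospike.client.Record.
final case class FakeRecord(bins: java.util.Map[String, AnyRef], generation: Int, expiration: Int)

trait SimpleBinReader[BT] {
  type Out = (Map[String, Option[BT]], Int, Int)

  // Generic part: walk the bins and delegate the type-specific work to fetch.
  def apply(r: FakeRecord): Out = {
    val outValue: Map[String, Option[BT]] =
      r.bins.asScala.map { case (name, v) => name -> fetch(v) }.toMap
    (outValue, r.generation, r.expiration)
  }

  // Type-specific part, which the article's macro generates per type.
  def fetch(any: Any): Option[BT]
}

object SimpleBinReader {
  // Hand-written counterpart of the macro's Int case.
  val intReader: SimpleBinReader[Int] = new SimpleBinReader[Int] {
    def fetch(any: Any): Option[Int] = any match {
      case v: java.lang.Long => Some(v.intValue)
      case _                 => None
    }
  }

  // Mirrors the bin-name length check in BinWrapper.apply(one: Singleton).
  def checkBinName(name: String): Either[String, String] =
    if (name.length > 14) Left("Current limit for bin name is 14 characters") else Right(name)
}
```

Only fetch differs from type to type, so it is the only piece that has to be generated. The rest of the trait is shared by every instance.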




Quasiquotes are easy to work with: their behavior is predictable and there are no pitfalls. Keep in mind that, with this approach, every library needed by the methods described in the quasiquotes must be imported into the file where the macro is used. That is why I added an imports value to both converters right away, so as not to copy a long list of imports into every file.


Now we have everything we need to write the wrapper service:


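import com.aerospike.client.async.IAsyncClient
import com.aerospike.client.policy.{Policy, WritePolicy}

class SpikeImpl(client: IAsyncClient) {
  // Writes a single bin under the given key; both converters are materialized by the macros
  def putValue[K, B](key: K, value: SingleBin[B])(implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Unit = {
    val wPolicy = new WritePolicy
    client.put(wPolicy, kC(key), bC(value))
  }
  // Reads the record by key and returns the value of its first bin, if any.
  // client.get returns null when the key does not exist, hence the Option wrapper.
  def getByKey[K, B](k: K)(implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Option[B] = {
    val policy = new Policy
    Option(client.get(policy, kC(k))).flatMap(record => bC.apply(record)._1.headOption.flatMap(_._2))
  }
}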

Now we can check that our service works:


import shapeless.{HList, _}
import shapeless.HList.hlistOps
import scala.language.experimental.macros
import com.aerospike.client.Host
import com.aerospike.client.async.{AsyncClient, AsyncClientPolicy}

object HelloAerospike extends App {
  val hosts = List("127.0.0.1") // placeholder: list your cluster's hosts here
  val port = 3000               // Aerospike's default port
  val client = new AsyncClient(new AsyncClientPolicy, hosts.map(new Host(_, port)): _*)
  val database = new SpikeImpl(client)
  implicit val dbc = DBCredentials("namespace", "setName")
  database.putValue("key", SingleBin("binName", 123 :: "strValue" :: true :: HNil))
  val hlistBin = database.getByKey[String, Int :: String :: Boolean :: HNil]("key")
    .getOrElse(throw new Exception("Failed to get bin value"))
  println("hlistBin value = " + hlistBin)
  client.close()
}

Let's run it and look at the database:


Mac-mini-administrator-5:~ MarinaSigaeva$ ssh user@host
user@host's password:
Last login: Wed Nov 23 19:41:56 2016 from 1.1.1.1
[user@host ~]$ aql
Aerospike Query Client
Version 3.9.1.2
Copyright 2012-2016 Aerospike. All rights reserved.
aql> select * from namespace.setName
+------------------------------------------+
| binName                                  |
+------------------------------------------+
| MAP('{"0":123, "1":"strValue", "2":1}')  |
+------------------------------------------+
1 row in set (0.049 secs)
aql>

The data has been written. Now let's see what the application prints to the console:


[info] Compiling 1 Scala source to /Users/Marina/Desktop/forks/playground/target/scala-2.11/classes...
[info] Running HelloAerospike
hlistBin value = 123 :: strValue :: true :: HNil
[success] Total time: 0 s, completed 23.11.2016 20:01:44
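The aql output shows how the HList ended up in the database: as a map keyed by element position ("0", "1", "2"), in which true was stored as 1. The DSL itself does this with shapeless (castHListElements, toHList). Below is a shapeless-free sketch of the same layout for a plain Scala tuple. TupleAsMap is a hypothetical helper. Using Int position keys is an assumption, because aql's JSON rendering does not reveal the actual key type.

```scala
// Hypothetical encoder/decoder mirroring the layout seen in aql:
// position -> element, with Boolean stored as an integer (1 or 0).
object TupleAsMap {
  def encode(t: (Int, String, Boolean)): Map[Int, Any] =
    Map(0 -> t._1.toLong, 1 -> t._2, 2 -> (if (t._3) 1L else 0L))

  // Integers come back as Long (see the bin decoding above), so decode accordingly.
  def decode(m: Map[Int, Any]): Option[(Int, String, Boolean)] =
    (m.get(0), m.get(1), m.get(2)) match {
      case (Some(i: Long), Some(s: String), Some(b: Long)) => Some((i.toInt, s, b == 1L))
      case _                                                 => None
    }
}
```

The decoder returns None instead of throwing when an element has an unexpected type, in line with the Option-based fetch functions in BinWrapper.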

For Scala developers, this solution may be more intuitive than the Java library. The code for the current DSL is published on GitHub with a detailed how-to and a cookbook that will be extended over time. In light of recent events (the release of Scala 2.12), there is now an interesting opportunity to experiment with scala-meta. I hope this experience will be useful to you in solving similar problems.

