Apache Ignite: distributed computing in RAM

Original author: Piotr Mińkowski
  • Translation


Hello, Habr!

We continue to explore new Apache solutions. We expect to release the book “High Performance Spark” by Holden Karau in May (the book is currently in layout), and in August the book “Kafka: The Definitive Guide” by Neha Narkhede (still in translation). Today we want to offer a brief introductory article about Apache Ignite and gauge the level of interest in the topic.

Enjoy reading!

Apache Ignite is a relatively new solution, but its popularity is growing rapidly. It is hard to assign it to one specific category of database engines, because its feature set resembles several different tools. Its main purpose is distributed in-memory data storage in the key-value format. Ignite also has some features common to RDBMSs, in particular support for SQL queries and ACID transactions. But this does not mean it is a typical SQL transactional database: foreign key constraints are not supported, and transactions are available only at the key-value level. Nevertheless, Apache Ignite looks like a very interesting solution.
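To give a feel for the key-value API and the key-value-level transactions mentioned above, here is a minimal sketch of my own (it assumes only the ignite-core dependency on the classpath; the cache name and keys are arbitrary):

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.transactions.Transaction;

public class KeyValueDemo {
    public static void main(String[] args) {
        // Start a local Ignite node and create a transactional key-value cache.
        try (Ignite ignite = Ignition.start()) {
            IgniteCache<Long, String> cache = ignite.getOrCreateCache(
                    new CacheConfiguration<Long, String>("DemoCache")
                            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
            // ACID transaction, but only at the key-value level.
            try (Transaction tx = ignite.transactions().txStart()) {
                cache.put(1L, "first");
                cache.put(2L, "second");
                tx.commit();
            }
            System.out.println(cache.get(1L)); // prints "first"
        }
    }
}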

Apache Ignite is easy to run as a node embedded in a Spring Boot application. The easiest way to achieve this is with the Spring Data Ignite library. Apache Ignite implements the Spring Data CrudRepository interface, which supports basic CRUD operations and also provides access to the Apache Ignite SQL Grid through the unified Spring Data interfaces. Although Ignite offers its own disk-based persistent store with SQL support and ACID guarantees, here we will build a solution that keeps objects in the RAM cache and persists them in a MySQL database. The architecture of the proposed solution is shown in the figure below, and as you can see it is very simple. The application puts data into the RAM cache managed by Apache Ignite, and Ignite automatically synchronizes these changes with the database in an asynchronous background task. The way data is read should not surprise you either: if an entity is not in the cache, it is read from the database and cached for future use.



Below I will describe in detail how an application of this kind is developed. The result is posted on GitHub. I found a few more examples on the Internet, but they cover only the basics. I'll show how to configure Apache Ignite so that objects from the cache are written to the database, and how to build more complex join queries spanning multiple caches. Let's begin by setting up the database.

1. Set up the MySQL database


To run a MySQL database locally, the easiest way is, of course, to use a Docker container. With Docker for Windows, after running the command below the MySQL database is available at 192.168.99.100:33306.

	docker run -d --name mysql -e MYSQL_DATABASE=ignite -e MYSQL_USER=ignite -e MYSQL_PASSWORD=ignite123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

Next, we create the tables used by the application entities to store data: PERSON and CONTACT. The tables are related 1...N, where CONTACT contains a foreign key referencing the PERSON id.

	CREATE TABLE `person` (
  `id` int(11) NOT NULL,
  `first_name` varchar(45) DEFAULT NULL,
  `last_name` varchar(45) DEFAULT NULL,
  `gender` varchar(10) DEFAULT NULL,
  `country` varchar(10) DEFAULT NULL,
  `city` varchar(20) DEFAULT NULL,
  `address` varchar(45) DEFAULT NULL,
  `birth_date` date DEFAULT NULL,
  PRIMARY KEY (`id`)
);
CREATE TABLE `contact` (
  `id` int(11) NOT NULL,
  `location` varchar(45) DEFAULT NULL,
  `contact_type` varchar(10) DEFAULT NULL,
  `person_id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
);
ALTER TABLE `ignite`.`contact` ADD INDEX `person_fk_idx` (`person_id` ASC);
ALTER TABLE `ignite`.`contact`
ADD CONSTRAINT `person_fk` FOREIGN KEY (`person_id`) REFERENCES `ignite`.`person` (`id`) ON DELETE CASCADE ON UPDATE CASCADE;

2. Configure Maven


To get started with the Spring Data repository for Apache Ignite, the easiest way is to add the following Maven dependencies to our application's pom.xml file. All other Ignite dependencies will be pulled in automatically. We also need the MySQL JDBC driver and the Spring JDBC dependencies to configure the database connection. They are necessary because we embed Apache Ignite in the application, and it must connect to the MySQL database in order to synchronize the cache with the database tables.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-spring-data</artifactId>
  <version>${ignite.version}</version>
</dependency>

3. Configure the Ignite node


The IgniteConfiguration class allows you to configure all available settings of the Ignite node. The most important part here is the cache configuration (1). You should add the primary key and entity classes as indexed types (2). Next, enable exporting cache updates to the database (3) and reading from the database the entries that are not present in the cache (4). The interaction between the Ignite node and MySQL is configured with the CacheJdbcPojoStoreFactory class (5). There you pass the DataSource @Bean (6), the dialect (7), and the mapping between object fields and table columns (8).

@Bean
public Ignite igniteInstance() {
   IgniteConfiguration cfg = new IgniteConfiguration();
   cfg.setIgniteInstanceName("ignite-1");
   cfg.setPeerClassLoadingEnabled(true);
   CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache"); // (1)
   ccfg2.setIndexedTypes(Long.class, Contact.class); // (2)
   ccfg2.setWriteBehindEnabled(true);
   ccfg2.setWriteThrough(true); // (3)
   ccfg2.setReadThrough(true); // (4)
   CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>(); // (5)
   f2.setDataSource(datasource); // (6)
   f2.setDialect(new MySQLDialect()); // (7)
   JdbcType jdbcContactType = new JdbcType(); // (8)
   jdbcContactType.setCacheName("ContactCache");
   jdbcContactType.setKeyType(Long.class);
   jdbcContactType.setValueType(Contact.class);
   jdbcContactType.setDatabaseTable("contact");
   jdbcContactType.setDatabaseSchema("ignite");
   jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"), new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"), new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
   f2.setTypes(jdbcContactType);
   ccfg2.setCacheStoreFactory(f2);
   CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
   ccfg.setIndexedTypes(Long.class, Person.class);
   ccfg.setWriteBehindEnabled(true);
   ccfg.setReadThrough(true);
   ccfg.setWriteThrough(true);
   CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
   f.setDataSource(datasource);
   f.setDialect(new MySQLDialect());
   JdbcType jdbcType = new JdbcType();
   jdbcType.setCacheName("PersonCache");
   jdbcType.setKeyType(Long.class);
   jdbcType.setValueType(Person.class);
   jdbcType.setDatabaseTable("person");
   jdbcType.setDatabaseSchema("ignite");
   jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"), new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"), new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"), new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"), new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"), new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"), new JdbcTypeField(Types.DATE, "birth_date", Date.class, "birthDate"));
   f.setTypes(jdbcType);
   ccfg.setCacheStoreFactory(f);
   cfg.setCacheConfiguration(ccfg, ccfg2);
   return Ignition.start(cfg);
}
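If needed, the write-behind queue can be tuned further. The calls below are optional and are not part of the configuration above; they only illustrate the available knobs and would be placed right after the ContactCache configuration is created:

// Optional write-behind tuning (illustrative only; not used in the configuration above).
ccfg2.setWriteBehindFlushFrequency(5000); // flush queued updates to MySQL every 5 seconds
ccfg2.setWriteBehindFlushSize(10240);     // ...or earlier, once 10240 entries are queued
ccfg2.setWriteBehindBatchSize(512);       // write updates to the database in batches of 512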

Here is the Spring datasource configuration for MySQL running as a Docker container.

spring:
  datasource:
    name: mysqlds
    url: jdbc:mysql://192.168.99.100:33306/ignite?useSSL=false
    username: ignite
    password: ignite123


It should be noted that Apache Ignite is not without drawbacks. For example, it maps an Enum field to an integer, storing its ordinal value, even though VARCHAR is configured as the JDBC type. When such a value is read back from the database, it is not mapped correctly onto the Enum in the object, and the field ends up null. The mapping affected is the one below:

	new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type")
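One possible workaround, which is my own suggestion and not part of the original solution, is to persist the enum as a plain String field and expose a typed accessor, so the JDBC store only ever deals with VARCHAR values (the contactType field name below is assumed):

// Hypothetical workaround: keep the persisted field as a String and map it with
// new JdbcTypeField(Types.VARCHAR, "contact_type", String.class, "contactType")
private String contactType;

public ContactType getType() {
   return contactType == null ? null : ContactType.valueOf(contactType);
}

public void setType(ContactType type) {
   this.contactType = (type == null) ? null : type.name();
}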

4. Model objects


As mentioned above, there are two tables in our database schema. There are also two model classes and two cache configurations, one per model class. Below is the implementation of one model class. One of the most interesting things to note here is ID generation with the AtomicLong class, used here as a simple sequence generator. Note also the @QuerySqlField annotation: when it accompanies a field, that field can be used in SQL as a query parameter.

	@QueryGroupIndex.List(
   @QueryGroupIndex(name="idx1")
)
public class Person implements Serializable {
   private static final long serialVersionUID = -1271194616130404625L;
   private static final AtomicLong ID_GEN = new AtomicLong();
   @QuerySqlField(index = true)
   private Long id;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 0)
   private String firstName;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 1)
   private String lastName;
   private Gender gender;
   private Date birthDate;
   private String country;
   private String city;
   private String address;
   private List<Contact> contacts = new ArrayList<>();
   public void init() {
      this.id = ID_GEN.incrementAndGet();
   }
   public Long getId() {
      return id;
   }
   public void setId(Long id) {
      this.id = id;
   }
   public String getFirstName() {
      return firstName;
   }
   public void setFirstName(String firstName) {
      this.firstName = firstName;
   }
   public String getLastName() {
      return lastName;
   }
   public void setLastName(String lastName) {
      this.lastName = lastName;
   }
   public Gender getGender() {
      return gender;
   }
   public void setGender(Gender gender) {
      this.gender = gender;
   }
   public Date getBirthDate() {
      return birthDate;
   }
   public void setBirthDate(Date birthDate) {
      this.birthDate = birthDate;
   }
   public String getCountry() {
      return country;
   }
   public void setCountry(String country) {
      this.country = country;
   }
   public String getCity() {
      return city;
   }
   public void setCity(String city) {
      this.city = city;
   }
   public String getAddress() {
      return address;
   }
   public void setAddress(String address) {
      this.address = address;
   }
   public List<Contact> getContacts() {
      return contacts;
   }
   public void setContacts(List<Contact> contacts) {
      this.contacts = contacts;
   }
}
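The second model class, Contact, is not listed here. Judging by the JdbcType mapping and the controller code, it could look roughly like the sketch below (field names are taken from the cache configuration above):

public class Contact implements Serializable {
   private static final long serialVersionUID = 1L;
   private static final AtomicLong ID_GEN = new AtomicLong();
   @QuerySqlField(index = true)
   private Long id;
   @QuerySqlField(index = true)
   private Long personId;
   @QuerySqlField
   private ContactType type;
   @QuerySqlField
   private String location;
   public void init() {
      this.id = ID_GEN.incrementAndGet();
   }
   public Long getId() { return id; }
   public void setId(Long id) { this.id = id; }
   public Long getPersonId() { return personId; }
   public void setPersonId(Long personId) { this.personId = personId; }
   public ContactType getType() { return type; }
   public void setType(ContactType type) { this.type = type; }
   public String getLocation() { return location; }
   public void setLocation(String location) { this.location = location; }
}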

5. Ignite repositories


I suppose you already know how repositories are created in Spring Data JPA. Repository processing must be enabled in the main class or in a @Configuration class, which here is done with the @EnableIgniteRepositories annotation.

@SpringBootApplication
@EnableIgniteRepositories
public class IgniteRestApplication {
   @Autowired
   DataSource datasource;
   public static void main(String[] args) {
    SpringApplication.run(IgniteRestApplication.class, args);
   }
   // ...
}

Then we extend our @Repository interface from IgniteRepository, which is based on the standard CrudRepository. It supports only the inherited methods that take an id parameter. In the PersonRepository snippet below, I defined several find methods using the Spring Data naming conventions as well as Ignite queries. These examples demonstrate that query results may return either complete objects or only selected fields, depending on what we need.

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {
    List<Person> findByFirstNameAndLastName(String firstName, String lastName);
    @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<Contact> selectContacts(String firstName, String lastName);
    @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<List<?>> selectContacts2(String firstName, String lastName);
}

6. API and testing


Now we can inject the repository beans into the REST controller classes. The API provides methods for adding new objects to the cache, updating or deleting existing ones, and searching by primary key or by other, more complex indexed fields.

@RestController
@RequestMapping("/person")
public class PersonController {
    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);
    @Autowired
    PersonRepository repository;
    @PostMapping
    public Person add(@RequestBody Person person) {
        person.init();
        return repository.save(person.getId(), person);
    }
    @PutMapping
    public Person update(@RequestBody Person person) {
        return repository.save(person.getId(), person);
    }
    @DeleteMapping("/{id}")
    public void delete(@PathVariable("id") Long id) {
        repository.delete(id);
    }
    @GetMapping("/{id}")
    public Person findById(@PathVariable("id") Long id) {
        return repository.findOne(id);
    }
    @GetMapping("/{firstName}/{lastName}")
    public List<Person> findByName(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        return repository.findByFirstNameAndLastName(firstName, lastName);
    }
    @GetMapping("/contacts/{firstName}/{lastName}")
    public List<Person> findByNameWithContacts(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        List<Person> persons = repository.findByFirstNameAndLastName(firstName, lastName);
        List<Contact> contacts = repository.selectContacts(firstName, lastName);
        persons.stream().forEach(it -> it.setContacts(contacts.stream().filter(c -> c.getPersonId().equals(it.getId())).collect(Collectors.toList())));
        LOGGER.info("PersonController.findByIdWithContacts: {}", contacts);
        return persons;
    }
    @GetMapping("/contacts2/{firstName}/{lastName}")
    public List<Person> findByNameWithContacts2(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        List<List<?>> result = repository.selectContacts2(firstName, lastName);
        List<Person> persons = new ArrayList<>();
        for (List<?> l : result) {
            persons.add(mapPerson(l));
        }
        LOGGER.info("PersonController.findByIdWithContacts: {}", result);
        return persons;
    }
    private Person mapPerson(List<?> l) {
        Person p = new Person();
        Contact c = new Contact();
        p.setId((Long) l.get(0));
        p.setFirstName((String) l.get(1));
        p.setLastName((String) l.get(2));
        c.setId((Long) l.get(3));
        c.setType((ContactType) l.get(4));
        c.setLocation((String) l.get(5));
        p.addContact(c);
        return p;
    }
}

Of course, it is important to check the performance of the resulting solution, especially since it combines distributed in-memory storage with a database. For this purpose I wrote several JUnit tests that put a large number of objects into the cache and then call the search methods with random input data to measure query performance. Here is the method that generates many Person and Contact objects and puts them into the cache through the API endpoints.

@Test
public void testAddPerson() throws InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();
    for (int j = 0; j < 10; j++) {
        es.execute(() -> {
            TestRestTemplate restTemplateLocal = new TestRestTemplate();
            Random r = new Random();
            for (int i = 0; i < 1000000; i++) {
                Person p = restTemplateLocal.postForObject("http://localhost:8090/person", createTestPerson(), Person.class);
                int x = r.nextInt(6);
                for (int k = 0; k < x; k++) {
                    restTemplateLocal.postForObject("http://localhost:8090/contact", createTestContact(p.getId()), Contact.class);
                }
            }
        });
    }
    es.shutdown();
    es.awaitTermination(60, TimeUnit.MINUTES);
}
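The test above also posts Contact objects to a /contact endpoint, which is not listed in this article; a repository and controller for the ContactCache analogous to PersonRepository and PersonController are assumed. A minimal sketch follows (the findByLocation method is a guess based on the contact.location.location metric shown further below):

@RepositoryConfig(cacheName = "ContactCache")
public interface ContactRepository extends IgniteRepository<Contact, Long> {
    List<Contact> findByLocation(String location);
}

@RestController
@RequestMapping("/contact")
public class ContactController {
    @Autowired
    ContactRepository repository;
    @PostMapping
    public Contact add(@RequestBody Contact contact) {
        contact.init();
        return repository.save(contact.getId(), contact);
    }
    @GetMapping("/location/{location}")
    public List<Contact> findByLocation(@PathVariable("location") String location) {
        return repository.findByLocation(location);
    }
    @GetMapping("/{id}")
    public Contact findById(@PathVariable("id") Long id) {
        return repository.findOne(id);
    }
}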

Spring Boot provides built-in metrics that can be used to judge API response times. To activate this feature, you need to add the Spring Boot Actuator dependency. The metrics endpoint is then available at http://localhost:8090/metrics. Besides showing how long each API method takes, it also displays statistics such as the number of active threads or the amount of free memory.

7. Launching the application


Now let's launch the resulting application with the embedded Apache Ignite node. Taking into account the performance tips from the Ignite documentation, I used the JVM configuration shown below.

	java -jar -Xms512m -Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC target/ignite-rest-service-1.0-SNAPSHOT.jar

Now you can run the JUnit test class IgniteRestControllerTest. It caches some data and then calls the search methods. The metrics below were collected for a test with 1M Person objects and 2.5M Contact objects in the cache. Each search method completed in about 1 ms on average.

	{
"mem": 624886,
"mem.free": 389701,
"processors": 4,
"instance.uptime": 2446038,
"uptime": 2466661,
"systemload.average": -1,
"heap.committed": 524288,
"heap.init": 524288,
"heap.used": 133756,
"heap": 1048576,
"threads.peak": 107,
"threads.daemon": 25,
"threads.totalStarted": 565,
"threads": 80,
...
"gauge.response.person.contacts.firstName.lastName": 1,
"gauge.response.contact": 1,
"gauge.response.person.firstName.lastName": 1,
"gauge.response.contact.location.location": 1,
"gauge.response.person.id": 1,
"gauge.response.person": 0,
"counter.status.200.person.id": 1000,
"counter.status.200.person.contacts.firstName.lastName": 1000,
"counter.status.200.person.firstName.lastName": 1000,
"counter.status.200.contact": 2500806,
"counter.status.200.person": 1000000,
"counter.status.200.contact.location.location": 1000
}
