Apache Ignite: distributed computing in RAM

Original author: Piotr Mińkowski
  • Translation


Hello, Habr!

We continue to follow new Apache solutions. We hope to release Holden Karau's book “High Performance Spark” in May (the book is in typesetting), and in August the book “Kafka: The Definitive Guide” by Neha Narkhede (also in translation). Today we would like to offer a short introductory article about Apache Ignite and gauge the level of interest in the topic.

Enjoy reading!

Apache Ignite is a relatively new solution, but its popularity is growing rapidly. It is hard to assign it to one specific category of database engines, because it combines features of several tools. Its main purpose is distributed in-memory data storage in the key-value format. Ignite also has some features typical of an RDBMS, in particular support for SQL queries and ACID transactions. But that does not make it a typical SQL transactional database: foreign key constraints are not supported, and transactions are available only at the key-value level. Still, Apache Ignite looks like a very interesting solution.

Apache Ignite is easy to launch as a node embedded in a Spring Boot application. The simplest way to achieve this is with the Spring Data Ignite library. Apache Ignite implements the Spring Data CrudRepository interface, supporting basic CRUD operations as well as access to the Apache Ignite SQL Grid through the unified Spring Data interfaces. Although Ignite can also persist data to disk with SQL support and ACID semantics, in this article we build a solution that keeps cache objects in RAM and stores them in a MySQL database. The architecture of the proposed solution is shown in the figure below; as you can see, it is very simple. The application puts data into the in-memory cache managed by Apache Ignite, and Ignite automatically synchronizes these changes with the database in an asynchronous background task. The read path should not surprise you either: if an entity is not cached, it is read from the database and placed in the cache for future use.



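From the application's point of view, this read-through / write-behind behavior boils down to ordinary cache calls. Below is a minimal sketch of the two paths; it assumes an Ignite node configured as in section 3 and the Person model class defined later in the article.

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;

public class CacheFlowSketch {

   // Minimal sketch of the read and write paths described above.
   // Assumes a node started with read-through and write-behind enabled
   // for "PersonCache" (see section 3) and the Person class from section 4.
   public static Person readWriteExample(Ignite ignite, long id) {
      IgniteCache<Long, Person> cache = ignite.cache("PersonCache");

      // put(): the entry lands in RAM immediately; with write-behind enabled,
      // Ignite flushes it to the MySQL person table in a background task.
      Person person = new Person();
      person.init();
      person.setFirstName("John");
      person.setLastName("Smith");
      cache.put(person.getId(), person);

      // get(): if the key is missing from the cache, read-through loads it
      // from MySQL via the configured CacheJdbcPojoStore and keeps it cached.
      return cache.get(id);
   }
}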
Here I will describe in detail how an application of this kind is developed. The result is available on GitHub. I found a few more examples on the Internet, but they cover only the basics. I will show how to configure Apache Ignite to write cached objects to the database and how to build more complex join queries that span multiple caches. Let's start by setting up the database.

1. Set up the MySQL database


To run the MySQL database locally, it is most convenient to use a Docker container. With Docker for Windows, the MySQL database is then available at 192.168.99.100:33306.

	docker run -d --name mysql -e MYSQL_DATABASE=ignite -e MYSQL_USER=ignite -e MYSQL_PASSWORD=ignite123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

Next, we create the tables used by the application entities to store data: PERSON and CONTACT. The tables are related 1...N: the CONTACT table contains a foreign key referencing the PERSON id.

	CREATE TABLE `person` (
  `id` int(11) NOT NULL,
  `first_name` varchar(45) DEFAULT NULL,
  `last_name` varchar(45) DEFAULT NULL,
  `gender` varchar(10) DEFAULT NULL,
  `country` varchar(10) DEFAULT NULL,
  `city` varchar(20) DEFAULT NULL,
  `address` varchar(45) DEFAULT NULL,
  `birth_date` date DEFAULT NULL,
  PRIMARY KEY (`id`)
);
 
CREATE TABLE `contact` (
  `id` int(11) NOT NULL,
  `location` varchar(45) DEFAULT NULL,
  `contact_type` varchar(10) DEFAULT NULL,
  `person_id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
);
 
ALTER TABLE `ignite`.`contact` ADD INDEX `person_fk_idx` (`person_id` ASC);
ALTER TABLE `ignite`.`contact`
ADD CONSTRAINT `person_fk` FOREIGN KEY (`person_id`) REFERENCES `ignite`.`person` (`id`) ON DELETE CASCADE ON UPDATE CASCADE;

2. Configure Maven


To get started with the Spring Data repository for Apache Ignite, the easiest way is to add the following Maven dependencies to our application's pom.xml file. All other Ignite dependencies will be pulled in automatically. We also need the MySQL JDBC driver and the Spring JDBC dependencies to configure the database connection. They are necessary because we embed Apache Ignite in the application and need to connect to the MySQL database in order to synchronize the cache with the database tables.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <scope>runtime</scope>
</dependency>
<dependency>
   <groupId>org.apache.ignite</groupId>
   <artifactId>ignite-spring-data</artifactId>
   <version>${ignite.version}</version>
</dependency>

3. Configure the Ignite node


The IgniteConfiguration class allows you to configure all available settings of the Ignite node. The most important part here is the cache configuration (1). You should add the primary key and entity classes as indexed types (2). Next, enable writing cache updates through to the database (3) and reading from the database any data that is missing from the cache (4). The interaction between the Ignite node and MySQL is configured with the CacheJdbcPojoStoreFactory class (5). You need to pass it the DataSource @Bean (6), the dialect (7), and the mapping between object fields and table columns (8).

@Bean
public Ignite igniteInstance() {
   IgniteConfiguration cfg = new IgniteConfiguration();
   cfg.setIgniteInstanceName("ignite-1");
   cfg.setPeerClassLoadingEnabled(true);
 
   CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache"); // (1)
   ccfg2.setIndexedTypes(Long.class, Contact.class); // (2)
   ccfg2.setWriteBehindEnabled(true);
   ccfg2.setWriteThrough(true); // (3)
   ccfg2.setReadThrough(true); // (4)
   CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>(); // (5)
   f2.setDataSource(datasource); // (6)
   f2.setDialect(new MySQLDialect()); // (7)
   JdbcType jdbcContactType = new JdbcType(); // (8)
   jdbcContactType.setCacheName("ContactCache");
   jdbcContactType.setKeyType(Long.class);
   jdbcContactType.setValueType(Contact.class);
   jdbcContactType.setDatabaseTable("contact");
   jdbcContactType.setDatabaseSchema("ignite");
   jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcContactType.setValueFields(
      new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"),
      new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"),
      new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
   f2.setTypes(jdbcContactType);
   ccfg2.setCacheStoreFactory(f2);
 
   CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
   ccfg.setIndexedTypes(Long.class, Person.class);
   ccfg.setWriteBehindEnabled(true);
   ccfg.setReadThrough(true);
   ccfg.setWriteThrough(true);
   CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
   f.setDataSource(datasource);
   f.setDialect(new MySQLDialect());
   JdbcType jdbcType = new JdbcType();
   jdbcType.setCacheName("PersonCache");
   jdbcType.setKeyType(Long.class);
   jdbcType.setValueType(Person.class);
   jdbcType.setDatabaseTable("person");
   jdbcType.setDatabaseSchema("ignite");
   jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcType.setValueFields(
      new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"),
      new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"),
      new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"),
      new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"),
      new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"),
      new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"),
      new JdbcTypeField(Types.DATE, "birth_date", Date.class, "birthDate"));
   f.setTypes(jdbcType);
   ccfg.setCacheStoreFactory(f);
 
   cfg.setCacheConfiguration(ccfg, ccfg2);
   return Ignition.start(cfg);
}

Here is the Spring data source configuration for MySQL running in the Docker container.

spring:
  datasource:
    name: mysqlds
    url: jdbc:mysql://192.168.99.100:33306/ignite?useSSL=false
    username: ignite
    password: ignite123


It should be noted that Apache Ignite is not without drawbacks. For example, it maps an Enum to an integer and writes its ordinal value, even though VARCHAR is configured as the JDBC type. When such a value is read back from the database, it is not mapped correctly to the Enum in the object, and the field comes back as null.

	new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type")
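One possible workaround (my own sketch, not part of the original example) is to map the VARCHAR column onto a plain String field and convert it to the enum in the model class; adopting it would also require adjusting the queries and code that reference the type field.

// Hypothetical workaround: map contact_type to a String field ("typeName"
// is an assumed name) so the VARCHAR value round-trips correctly...
new JdbcTypeField(Types.VARCHAR, "contact_type", String.class, "typeName")

// ...and convert it on demand in the Contact class:
public ContactType getType() {
   return typeName == null ? null : ContactType.valueOf(typeName);
}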

4. Model objects


As mentioned above, there are two tables in our database schema, so there are also two model classes and two cache configurations, one per model class. Below is the implementation of the Person model class. One of the more interesting details is ID generation with the AtomicLong class, which acts as a simple sequence generator. Also note the @QuerySqlField annotation: when it accompanies a field, that field can be used as a query parameter in SQL.

	@QueryGroupIndex.List(
   @QueryGroupIndex(name="idx1")
)
public class Person implements Serializable {
 
   private static final long serialVersionUID = -1271194616130404625L;
   private static final AtomicLong ID_GEN = new AtomicLong();
 
   @QuerySqlField(index = true)
   private Long id;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 0)
   private String firstName;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 1)
   private String lastName;
   private Gender gender;
   private Date birthDate;
   private String country;
   private String city;
   private String address;
   private List<Contact> contacts = new ArrayList<>();
 
   public void init() {
      this.id = ID_GEN.incrementAndGet();
   }
 
   public Long getId() {
      return id;
   }
 
   public void setId(Long id) {
      this.id = id;
   }
 
   public String getFirstName() {
      return firstName;
   }
 
   public void setFirstName(String firstName) {
      this.firstName = firstName;
   }
 
   public String getLastName() {
      return lastName;
   }
 
   public void setLastName(String lastName) {
      this.lastName = lastName;
   }
 
   public Gender getGender() {
      return gender;
   }
 
   public void setGender(Gender gender) {
      this.gender = gender;
   }
 
   public Date getBirthDate() {
      return birthDate;
   }
 
   public void setBirthDate(Date birthDate) {
      this.birthDate = birthDate;
   }
 
   public String getCountry() {
      return country;
   }
 
   public void setCountry(String country) {
      this.country = country;
   }
 
   public String getCity() {
      return city;
   }
 
   public void setCity(String city) {
      this.city = city;
   }
 
   public String getAddress() {
      return address;
   }
 
   public void setAddress(String address) {
      this.address = address;
   }
 
   public List<Contact> getContacts() {
      return contacts;
   }
 
   public void setContacts(List<Contact> contacts) {
      this.contacts = contacts;
   }

   // Helper used by PersonController.mapPerson() below
   public void addContact(Contact contact) {
      this.contacts.add(contact);
   }
 
}
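The Contact model class is not listed in the article; based on the JDBC field mapping from section 3 and the controller code below, a minimal sketch could look like this (the original class in the GitHub repository may differ in details).

public class Contact implements Serializable {

   private static final long serialVersionUID = 1L; // value assumed
   private static final AtomicLong ID_GEN = new AtomicLong();

   @QuerySqlField(index = true)
   private Long id;
   @QuerySqlField(index = true)
   private Long personId;
   @QuerySqlField
   private ContactType type;
   @QuerySqlField
   private String location;

   public void init() {
      this.id = ID_GEN.incrementAndGet();
   }

   public Long getId() {
      return id;
   }

   public void setId(Long id) {
      this.id = id;
   }

   public Long getPersonId() {
      return personId;
   }

   public void setPersonId(Long personId) {
      this.personId = personId;
   }

   public ContactType getType() {
      return type;
   }

   public void setType(ContactType type) {
      this.type = type;
   }

   public String getLocation() {
      return location;
   }

   public void setLocation(String location) {
      this.location = location;
   }

}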

5. Ignite repositories


I assume you know how repositories are created in Spring Data JPA. Here, repository processing must be enabled with the @EnableIgniteRepositories annotation on the main class or on a @Configuration class.

@SpringBootApplication
@EnableIgniteRepositories
public class IgniteRestApplication {
 
   @Autowired
   DataSource datasource;
 
   public static void main(String[] args) {
    SpringApplication.run(IgniteRestApplication.class, args);
   }
 
   // ...
}

Then we extend our repository interface from IgniteRepository, which in turn extends the basic CrudRepository interface. It supports only the inherited methods that take an id parameter. In the PersonRepository snippet below I defined several find methods, using both the Spring Data naming convention and Ignite queries. These examples demonstrate that a query can return either the complete object or selected fields from it, depending on what we need.

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {
 
    List<Person> findByFirstNameAndLastName(String firstName, String lastName);
 
    @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<Contact> selectContacts(String firstName, String lastName);
 
    @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<List<?>> selectContacts2(String firstName, String lastName);
}
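The second cache has an analogous repository, which is not shown in the article but backs the /contact endpoint used by the test in section 6. A minimal sketch (the findByLocation method is an assumption based on the metrics output in section 7):

@RepositoryConfig(cacheName = "ContactCache")
public interface ContactRepository extends IgniteRepository<Contact, Long> {

    List<Contact> findByLocation(String location);
}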

6. API and testing


Now we can inject the repository beans into the REST controller classes. The API provides methods for adding new objects to the cache, updating or deleting existing ones, and searching by primary key or by other, more complex indexes.

@RestController
@RequestMapping("/person")
public class PersonController {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);
 
    @Autowired
    PersonRepository repository;
 
    @PostMapping
    public Person add(@RequestBody Person person) {
        person.init();
        return repository.save(person.getId(), person);
    }
 
    @PutMapping
    public Person update(@RequestBody Person person) {
        return repository.save(person.getId(), person);
    }
 
    @DeleteMapping("/{id}")
    public void delete(@PathVariable("id") Long id) {
        repository.delete(id);
    }
 
    @GetMapping("/{id}")
    public Person findById(@PathVariable("id") Long id) {
        return repository.findOne(id);
    }
 
    @GetMapping("/{firstName}/{lastName}")
    public List<Person> findByName(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        return repository.findByFirstNameAndLastName(firstName, lastName);
    }
 
    @GetMapping("/contacts/{firstName}/{lastName}")
    public List<Person> findByNameWithContacts(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        List<Person> persons = repository.findByFirstNameAndLastName(firstName, lastName);
        List<Contact> contacts = repository.selectContacts(firstName, lastName);
        persons.stream().forEach(it -> it.setContacts(contacts.stream().filter(c -> c.getPersonId().equals(it.getId())).collect(Collectors.toList())));
        LOGGER.info("PersonController.findByIdWithContacts: {}", contacts);
        return persons;
    }
 
    @GetMapping("/contacts2/{firstName}/{lastName}")
    public List<Person> findByNameWithContacts2(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        List<List<?>> result = repository.selectContacts2(firstName, lastName);
        List<Person> persons = new ArrayList<>();
        for (List<?> l : result) {
            persons.add(mapPerson(l));
        }
        LOGGER.info("PersonController.findByIdWithContacts: {}", result);
        return persons;
    }
 
    private Person mapPerson(List<?> l) {
        Person p = new Person();
        Contact c = new Contact();
        p.setId((Long) l.get(0));
        p.setFirstName((String) l.get(1));
        p.setLastName((String) l.get(2));
        c.setId((Long) l.get(3));
        c.setType((ContactType) l.get(4));
        c.setLocation((String) l.get(5));
        p.addContact(c);
        return p;
    }
 
}
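The controller behind the /contact endpoint used by the test below is not listed either; a possible sketch, mirroring PersonController (the /location/{location} mapping is an assumption based on the metrics in section 7):

@RestController
@RequestMapping("/contact")
public class ContactController {

    @Autowired
    ContactRepository repository;

    @PostMapping
    public Contact add(@RequestBody Contact contact) {
        contact.init();
        return repository.save(contact.getId(), contact);
    }

    @GetMapping("/location/{location}")
    public List<Contact> findByLocation(@PathVariable("location") String location) {
        return repository.findByLocation(location);
    }
}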

Of course, it is important to check the performance of the resulting solution, especially since it combines distributed in-memory storage with a database. To do this, I wrote several JUnit tests that put a large number of objects into the cache and then call the search methods with random input data to check query performance. Below is a method that generates many Person and Contact objects and puts them into the cache through the API endpoints.

@Test
public void testAddPerson() throws InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();
    for (int j = 0; j < 10; j++) {
        es.execute(() -> {
            TestRestTemplate restTemplateLocal = new TestRestTemplate();
            Random r = new Random();
            for (int i = 0; i < 1000000; i++) {
                Person p = restTemplateLocal.postForObject("http://localhost:8090/person", createTestPerson(), Person.class);
                int x = r.nextInt(6);
                for (int k = 0; k < x; k++) {
                    restTemplateLocal.postForObject("http://localhost:8090/contact", createTestContact(p.getId()), Contact.class);
                }
            }
        });
    }
    es.shutdown();
    es.awaitTermination(60, TimeUnit.MINUTES);
}
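The createTestPerson() and createTestContact() helpers are not listed in the article; the actual implementations are in the GitHub repository, but they generate random data roughly along these lines (all field values here are illustrative):

private static final String[] FIRST_NAMES = {"John", "Anna", "Peter", "Maria"};
private static final String[] LAST_NAMES = {"Smith", "Nowak", "Kowalski", "Brown"};

private Person createTestPerson() {
    Random r = new Random();
    Person p = new Person();
    p.setFirstName(FIRST_NAMES[r.nextInt(FIRST_NAMES.length)]);
    p.setLastName(LAST_NAMES[r.nextInt(LAST_NAMES.length)]);
    p.setGender(Gender.values()[r.nextInt(Gender.values().length)]);
    p.setCountry("PL");
    p.setCity("Warsaw");
    p.setAddress("Main Street " + r.nextInt(100));
    p.setBirthDate(new Date());
    return p;
}

private Contact createTestContact(Long personId) {
    Random r = new Random();
    Contact c = new Contact();
    c.setPersonId(personId);
    c.setType(ContactType.values()[r.nextInt(ContactType.values().length)]);
    c.setLocation("Location " + r.nextInt(10));
    return c;
}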

Spring Boot provides out-of-the-box metrics that let you judge the response speed of the API. To activate this feature, you need to add the Spring Boot Actuator dependency. The metrics endpoint is available at localhost:8090/metrics. It shows not only how long each API method takes, but also statistics such as the number of active threads or the amount of free memory.

7. Launching the application


Now let's launch the resulting application with the embedded Apache Ignite node. Following the performance tips in the Ignite documentation, I used the JVM configuration shown below.

	java -jar -Xms512m -Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC target/ignite-rest-service-1.0-SNAPSHOT.jar

Now you can run the JUnit test class IgniteRestControllerTest. It puts a certain amount of data into the cache and then calls the search methods. The metrics below were collected for a test with 1M Person objects and 2.5M Contact objects in the cache. Each search method completed in about 1 ms on average.

{
  "mem": 624886,
  "mem.free": 389701,
  "processors": 4,
  "instance.uptime": 2446038,
  "uptime": 2466661,
  "systemload.average": -1,
  "heap.committed": 524288,
  "heap.init": 524288,
  "heap.used": 133756,
  "heap": 1048576,
  "threads.peak": 107,
  "threads.daemon": 25,
  "threads.totalStarted": 565,
  "threads": 80,
  ...
  "gauge.response.person.contacts.firstName.lastName": 1,
  "gauge.response.contact": 1,
  "gauge.response.person.firstName.lastName": 1,
  "gauge.response.contact.location.location": 1,
  "gauge.response.person.id": 1,
  "gauge.response.person": 0,
  "counter.status.200.person.id": 1000,
  "counter.status.200.person.contacts.firstName.lastName": 1000,
  "counter.status.200.person.firstName.lastName": 1000,
  "counter.status.200.contact": 2500806,
  "counter.status.200.person": 1000000,
  "counter.status.200.contact.location.location": 1000
}