About Oracle Coherence in Russian: Why is it needed?

    In this article you will find the answer to this question, along with an explanation of the basic concepts behind distributed computing in Oracle Coherence. This is an introductory article whose main goal is to explain the “glossary” of terms used by Coherence developers. I will give the terms in English as well, to make it easier to find more information on the subject.
    If you are interested in this topic, read on.

    So, suppose we need to quickly perform some computation over a large volume of data. What does “large volume” of data mean? A volume so large that it makes no sense to download it to the client, because the client cannot hold all the necessary data on its side. The dilemma is how to get the result without transferring the entire data set to the client. A possible solution is to split the large data set into subsets and collect intermediate results on the client in a loop. This works, except that sequential execution takes much longer than processing the whole set at once (time is spent on request/response round trips, preparing each subset, and sending it to the client for computation). The data may also become stale during this sequential process.
    This is where solutions such as Oracle Coherence, Hadoop, GemFire, etc. come to the rescue.

    Let's walk through the basics of Oracle Coherence.
    We read the documentation and see the following: “Oracle Coherence is an in memory data grid solution that enables ...”.
    “In memory” means that the data is kept in RAM (it can also be spilled to disk, but more on that later).
    “Data grid solution” means that the data is distributed across a cluster rather than concentrated in one place.

    But first things first. Let's start by understanding what “building blocks” are available for solving such tasks.

    Node

    A node is just a Java process (running the com.tangosol.net.DefaultCacheServer class) with coherence.jar and the configuration files on the classpath. You can run multiple nodes on the same or different machines, under the same or different users, without restrictions. It is important to understand that a node is just a Java process, and it can and should be debugged the same way as any server application you write.
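    For example, a node can be started from the command line roughly like this (the classpath entries here are illustrative):

        java -cp coherence.jar:conf com.tangosol.net.DefaultCacheServer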

    Cluster

    A cluster is a collection of several nodes. With the default configuration, nodes find each other automatically via multicast. If necessary, you can configure WKA (well known addresses) instead, for example if the system administrators are unhappy that you have “flooded the whole network with your multicast”. A cluster always has a master (senior member) that monitors the state of the cluster: how many nodes there are, which of them store data, where to copy data if one of the nodes goes down, and so on. The master node is the first node that was started; if it goes down for some reason, the next senior node automatically becomes the master. Note that the master node is not involved in data processing: calculations are performed on the nodes holding the required data. As a rule, nodes are divided by purpose: proxy, compute, and data storage nodes. If all the nodes go down, you have no data left, so you need to think in advance about how data and changes will be persisted and how they will be reloaded when the system starts up.
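    As a sketch, WKA can be set in an override file along these lines (the addresses are illustrative, and element details vary between Coherence versions, so check the documentation for yours):

        <coherence>
            <cluster-config>
                <unicast-listener>
                    <well-known-addresses>
                        <socket-address id="1">
                            <address>192.168.0.1</address>
                            <port>8088</port>
                        </socket-address>
                        <socket-address id="2">
                            <address>192.168.0.2</address>
                            <port>8088</port>
                        </socket-address>
                    </well-known-addresses>
                </unicast-listener>
            </cluster-config>
        </coherence>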
    During development, it is recommended to configure the development environment to be similar to production. This will help you find many serialization and inter-node communication errors before you release a version to production.

    Node configuration

    By default, configuration files are not required; in that case the files packaged inside coherence.jar are used. The default configuration files are not suitable for a production system and need to be adapted to the specific task. Some even recommend deleting these files from coherence.jar.
    There are 3 main configuration files that you have to work with:
    tangosol-coherence.xml - this file is responsible for the configuration of the cluster as a whole. For example, the cluster name is configured in this file.
    coherence-cache-config.xml - this file is responsible for the configuration of various caches that the cluster will serve.
    coherence-pof-config.xml - this file is intended to determine what data will be processed by the cluster. Also, this file defines how the data will be serialized for transmission and storage in the cluster.

    There are also so-called override files (e.g. tangosol-coherence-override.xml). Settings in an override file overwrite the settings from the base file. That is, if you have both tangosol-coherence.xml and tangosol-coherence-override.xml on the classpath, all settings will be loaded from the first file and then overwritten with the settings from the second.
    You can have several identical files on the classpath, but only the first file found will be used. You can also specify the necessary configuration files with system properties (-D).
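    For example, a node can be pointed at specific configuration files via system properties (a sketch, using the tangosol.* property names that match the flags used elsewhere in this article):

        java -Dtangosol.coherence.cacheconfig=coherence-cache-config.xml \
             -Dtangosol.coherence.override=tangosol-coherence-override.xml \
             -cp coherence.jar:conf com.tangosol.net.DefaultCacheServer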
    When a node starts, it writes to the log which files were used to configure it. Something similar to the following appears in the logs:
    Loaded operational configuration from resource ...
    Loaded operational overrides from resource ...
    Loaded operational overrides from resource ...

    Proxy (extend) nodes

    Proxy (extend) nodes are nodes that provide clients with access to the cluster. Configuration is required both on the server side and on the client side. For example, if you have an application written for the .NET platform, you have to install the .NET client (coherence-net-<version>.zip) and provide a coherence-cache-config.xml that describes the cluster to connect to. On the server side, you need to provide a coherence-cache-config.xml that specifies the address and port on which to listen for incoming connections. Both the client and the server must also provide coherence-pof-config.xml, which describes the data formats used for communication between client and server.
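    On the server side, the listening address is declared in a proxy scheme in coherence-cache-config.xml, roughly like this (the names, address, and port are illustrative):

        <proxy-scheme>
            <scheme-name>extend-proxy</scheme-name>
            <service-name>ExtendTcpProxyService</service-name>
            <acceptor-config>
                <tcp-acceptor>
                    <local-address>
                        <address>192.168.0.1</address>
                        <port>9099</port>
                    </local-address>
                </tcp-acceptor>
            </acceptor-config>
            <autostart>true</autostart>
        </proxy-scheme>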
    Proxy nodes should not be used for calculations. If your debugger stops on a proxy node while you are debugging the application, it usually means the cluster is configured incorrectly.

    Data storage nodes (storage nodes)

    These are nodes started with the system property -Dtangosol.coherence.distributed.localstorage=true. By default a node stores data in the Java heap, but data can also be “offloaded” to disk and loaded back as needed. You can run calculations on these nodes, but you must produce as little garbage as possible in the process, so that the node does not die from lack of memory (OutOfMemoryError). If a node goes down for any reason, its data is copied to the other nodes, and the overall free capacity of the cluster decreases. This can trigger a cascade effect if there is not enough free space in the cluster: the whole cluster can then go down, node after node. As a rule, important data has a second copy (which is prescribed in the configuration), so the loss of a single node is not critical.
    Data that is an intermediate result and is easily recalculated from the main data does not need a second copy. Storage can be configured so that copies reside on another node, on another physical machine, or even on a different rack in another city. These are all configuration parameters; nothing needs to be programmed here. The storage parameters are quite flexible and allow you to tune the system for a specific task.
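    As a sketch, the number of copies is set with backup-count in the cache configuration (scheme names here are illustrative; a backup-count of 1 means one extra copy of each partition):

        <distributed-scheme>
            <scheme-name>storage-scheme</scheme-name>
            <service-name>DistributedCache</service-name>
            <backup-count>1</backup-count>
            <backing-map-scheme>
                <local-scheme/>
            </backing-map-scheme>
            <autostart>true</autostart>
        </distributed-scheme>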

    Compute Nodes (application tier / storage disabled nodes)

    These are nodes started with the system property -Dtangosol.coherence.distributed.localstorage=false. They are used to distribute computations evenly across the cluster; intermediate results can also be cached on them. All the business logic you implement for the application should be executed on these nodes of the cluster.

    Let's look at how a call is forwarded through the hierarchy of nodes. You have data storage nodes, compute nodes, and proxy nodes. On the proxy nodes, no caches are configured and no data is stored. On the compute nodes, you configure caches, but without the ability to store data in them. On the storage nodes, you have the data. From the client you should not work directly with the caches in which the data is stored; instead, always use the compute nodes to perform operations on the data. This isolates the data from the client applications, which gives you the opportunity to change the storage architecture later without changing the client. All nodes in the cluster “know” where each cache is located. It follows that if you send a task for execution to a cache configured for computation, it will be executed on the group of compute nodes using the data from the storage nodes. This may not sound clear yet, but it is a separate topic for an article.

    Localization of data (data affinity)

    In some cases, it is useful to have data grouped together according to some principle. For example, you can group the data so that dependent data ends up on the same node (or even the same physical machine). In that case you avoid network latency, and the calculations run faster.
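    A minimal sketch of such grouping via the KeyAssociation interface (the OrderKey class and its fields are hypothetical):

        import com.tangosol.net.cache.KeyAssociation;
        import java.io.Serializable;

        // Orders keyed this way land in the same partition as other entries
        // whose key is the customer id, so dependent data stays together.
        public class OrderKey implements KeyAssociation, Serializable {
            private final String orderId;
            private final String customerId;

            public OrderKey(String orderId, String customerId) {
                this.orderId = orderId;
                this.customerId = customerId;
            }

            @Override
            public Object getAssociatedKey() {
                return customerId; // keys with equal associated keys are co-located
            }

            // hashCode and equals are required for any key class (omitted for brevity)
        }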

    The mechanisms for submitting a task for execution are as follows: EntryAggregator, EntryProcessor, InvocationService, MapListener, EventInterceptor

    Aggregator (EntryAggregator) is a task that is executed over copies of the data. That is, you cannot change data in the cache from an aggregator; it works with read-only data. Typical tasks are sum, minimum, and maximum.
    Processor (EntryProcessor) is a task that changes data inside the cache, on the nodes where the data physically resides. A nice feature of the processor is that it locks the data during processing: if you have several operations that must be executed sequentially on an entry, use a processor, since only one processor at a time will work on a given piece of data (a minimal sketch follows after this list).
    InvocationService is a node-level task. In this case you are working, roughly speaking, with Java processes rather than data. Tasks of this type must implement Invocable, which in turn is Runnable.
    MapListener is a task that is executed asynchronously, as a reaction to events at the cache level.
    EventInterceptor is similar to the previous one in the sense that it is executed as a reaction to an event, with a difference: a listener is executed on every node on which the cache is configured, while an interceptor is executed only on the nodes that hold the data for which the event fires. An interceptor can also be configured to run before or after the event.
    A detailed explanation of how various types of tasks work is beyond the scope of this article.
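    Still, as promised in the list above, here is a minimal EntryProcessor sketch (the cache name and stored type are hypothetical; in a real cluster the processor class would also be registered for POF serialization):

        import com.tangosol.net.CacheFactory;
        import com.tangosol.net.NamedCache;
        import com.tangosol.util.InvocableMap;
        import com.tangosol.util.processor.AbstractProcessor;

        // Atomically increments a counter: the entry is locked while the
        // processor runs on the storage node, so calls cannot interleave.
        public class IncrementProcessor extends AbstractProcessor {
            @Override
            public Object process(InvocableMap.Entry entry) {
                Integer value = (Integer) entry.getValue();
                int next = (value == null) ? 1 : value + 1;
                entry.setValue(next); // the change happens where the data lives
                return next;
            }

            // usage from the client side:
            public static void main(String[] args) {
                NamedCache counters = CacheFactory.getCache("counters");
                Object result = counters.invoke("my-counter", new IncrementProcessor());
                System.out.println("counter is now " + result);
            }
        }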

    POF (portable object format) serialization

    All data in the cluster is stored in byte arrays. The fields of a serialized object are stored sequentially (each field has its own index), exactly as you write them in the readExternal/writeExternal methods of a class implementing the PortableObject interface, or in the serialize/deserialize methods of a class implementing the PofSerializer interface. The byte array is also scanned sequentially, and a non-obvious conclusion follows from this: the most frequently used fields should be closer to the beginning of the byte array. Object data written to the array can be nested and have its own serialization. That is, by implementing the PortableObject or PofSerializer interfaces, you translate the hierarchical structure of a Java object into the flat structure of a byte array.
    Coherence provides serialization for the standard objects from the JDK (java.lang). All objects to be stored in the cluster must be described in the coherence-pof-config.xml file. Each data type has its own number, and the numbers of your own data types should begin at 1000. Thus you get a structure that is easily portable from one platform to another, and from one programming language to another. Each class that will be serialized in the cluster must have properly implemented hashCode and equals methods.
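    A minimal sketch of a POF-serializable class (the Trade class, its fields, and the indices are illustrative):

        import com.tangosol.io.pof.PofReader;
        import com.tangosol.io.pof.PofWriter;
        import com.tangosol.io.pof.PortableObject;
        import java.io.IOException;

        public class Trade implements PortableObject {
            // POF indices: put the most frequently used fields first
            public static final int SYMBOL = 0;
            public static final int PRICE  = 1;

            private String symbol;
            private double price;

            public Trade() { } // POF requires a public no-argument constructor

            @Override
            public void readExternal(PofReader in) throws IOException {
                symbol = in.readString(SYMBOL);
                price  = in.readDouble(PRICE);
            }

            @Override
            public void writeExternal(PofWriter out) throws IOException {
                out.writeString(SYMBOL, symbol);
                out.writeDouble(PRICE, price);
            }

            // hashCode and equals must also be implemented (omitted for brevity)
        }

    The class is then registered in coherence-pof-config.xml with a type id of 1000 or higher, for example:

        <user-type>
            <type-id>1001</type-id>
            <class-name>Trade</class-name>
        </user-type>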

    Retrieving data from a cluster (ValueExtractor)

    From the previous paragraph, we know that data is stored in byte arrays. In order to extract a part of the data, you write a class implementing the ValueExtractor interface. Coherence uses this class to get the necessary part of the serialized object and present it as a class you can work with. That is, you can “pull out” of the data not the whole object, but only what you need at the moment for the calculation, thereby reducing the amount of data sent over the network.
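    For POF data there is a ready-made PofExtractor that pulls a single field out of the byte array without deserializing the whole object (a sketch; the cache name is illustrative, and Trade.PRICE is the index constant from the previous example):

        import com.tangosol.net.CacheFactory;
        import com.tangosol.net.NamedCache;
        import com.tangosol.util.ValueExtractor;
        import com.tangosol.util.extractor.PofExtractor;
        import com.tangosol.util.filter.GreaterFilter;
        import java.util.Set;

        public class ExtractorExample {
            public static void main(String[] args) {
                NamedCache trades = CacheFactory.getCache("trades");
                // reads only the "price" field on the storage nodes
                ValueExtractor price = new PofExtractor(Double.class, Trade.PRICE);
                // keys of all trades with price > 100
                Set keys = trades.keySet(new GreaterFilter(price, 100.0));
                System.out.println("matches: " + keys.size());
            }
        }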

    Partition

    Coherence stores data in key-value form, but key and value are logical concepts of the system. At the physical level, data is grouped into partitions, so several keys and values may belong to the same partition. A partition is the unit of data storage: when nodes go down and data is redistributed between the remaining nodes, partitions are copied as a whole. The class that assigns a partition to a specific object implements the KeyPartitioningStrategy interface. By default, the partition is assigned according to the hash code of the Binary key object (com.tangosol.util.Binary “wraps” the byte array). You can influence how partitions are assigned by providing your own implementation of the KeyPartitioningStrategy interface.
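    A rough sketch of a custom strategy (the region-prefix convention here is hypothetical, and depending on the version the key may be passed in serialized form, so check the documentation before doing this for real):

        import com.tangosol.net.PartitionedService;
        import com.tangosol.net.partition.KeyPartitioningStrategy;

        // Puts all keys with the same "region" prefix into the same partition.
        public class RegionPartitioningStrategy implements KeyPartitioningStrategy {
            private PartitionedService service;

            @Override
            public void init(PartitionedService service) {
                this.service = service;
            }

            @Override
            public int getKeyPartition(Object key) {
                // region is the part of the key before the first ':' (hypothetical)
                String s = String.valueOf(key);
                int i = s.indexOf(':');
                String region = (i < 0) ? s : s.substring(0, i);
                return Math.abs(region.hashCode() % service.getPartitionCount());
            }

            @Override
            public int[] getAssociatedPartitions(Object key) {
                return new int[] { getKeyPartition(key) };
            }
        }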

    Index

    Like an index in a database, an index in Coherence is used to speed up data retrieval. To create an index, the QueryMap.addIndex(ValueExtractor extractor, boolean ordered, java.util.Comparator comparator) method is used.
    The ValueExtractor is used to select the data for the index from the byte array. Calling addIndex does not at all mean that the cluster will start indexing the data right now; the call is a recommendation to create the index when resources allow. After the index is created, data changes are reflected in it correctly. The method can be called several times, and if the index already exists it will not be recreated. An index is a node-level structure: when data is copied from one node to another, the index is not copied; instead, it is updated in accordance with the data residing on the node. The data in an index is stored in deserialized form, so if you need to get data quickly and without deserialization, create an index. Naturally, you have to “pay” for the convenience and speed, and you pay with free space in the cluster and computing resources. Internally, an index consists of two sub-indexes (forward and reverse). The forward index stores data in the form key -> value (as provided by the extractor); the reverse index stores data in the form value -> set of keys.
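    A minimal usage sketch (the cache name is illustrative; the extractor is the one from the ValueExtractor section above):

        import com.tangosol.net.CacheFactory;
        import com.tangosol.net.NamedCache;
        import com.tangosol.util.extractor.PofExtractor;

        public class IndexExample {
            public static void main(String[] args) {
                NamedCache trades = CacheFactory.getCache("trades");
                // ordered index on the "price" field; queries that use an equal
                // extractor will hit the index instead of scanning byte arrays
                trades.addIndex(new PofExtractor(Double.class, Trade.PRICE), true, null);
            }
        }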

    Caches: replicated, distributed, near

    Replicated is a cache in which all the data is stored in deserialized form on each of the nodes. It provides the fastest read operations, but slow write operations, because written data must be copied to every node on which the cache is configured. It is typically used for small amounts of rarely changing data.
    Distributed is the main cache type used for storing data. It overcomes the limit on the RAM available to an individual node by “spreading” the data across the whole cluster, provides horizontal scalability through adding new nodes to the cluster, and ensures fault tolerance by keeping copies of the data on other nodes.
    Near is a hybrid cache type that is configured on the calling side (the caller can be either a client or another node of the cluster). It usually “stands” in front of a distributed cache and stores the most frequently used data in deserialized form. With a near cache, the data may turn out to be stale, so you need to configure how it will be invalidated or updated.
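    A near cache is declared in coherence-cache-config.xml roughly like this (scheme names are illustrative; storage-scheme is the distributed scheme sketched earlier):

        <near-scheme>
            <scheme-name>near-trades</scheme-name>
            <front-scheme>
                <local-scheme>
                    <!-- keep at most 1000 hot entries on the calling side -->
                    <high-units>1000</high-units>
                </local-scheme>
            </front-scheme>
            <back-scheme>
                <distributed-scheme>
                    <scheme-ref>storage-scheme</scheme-ref>
                </distributed-scheme>
            </back-scheme>
            <!-- how stale front entries are invalidated -->
            <invalidation-strategy>present</invalidation-strategy>
        </near-scheme>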

    Service

    This is a group of Java threads responsible for communicating with the other nodes of the cluster, executing the tasks sent against the stored data, copying data when other nodes fail, and other data-maintenance work. It sounds rather abstract, but it is exactly what allows you to work with the data easily. A service can serve several caches. The one thing that is important to know and remember during development is that a service does not support re-entrant calls: if you send an EntryProcessor for execution and from within it call a cache serviced by the same service, you will immediately get an InterruptedException.
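    A sketch of that anti-pattern (the cache names are hypothetical; both caches are assumed to be served by the same service):

        import com.tangosol.net.CacheFactory;
        import com.tangosol.util.InvocableMap;
        import com.tangosol.util.processor.AbstractProcessor;

        public class ReentrantProcessor extends AbstractProcessor {
            @Override
            public Object process(InvocableMap.Entry entry) {
                // ANTI-PATTERN: a re-entrant call into a cache served by the
                // same service from inside a processor; this will fail
                return CacheFactory.getCache("other-cache-same-service").get("key");
            }
        }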

    Now that we have covered the basic building blocks and concepts, we can answer the question of why Coherence is needed at all.
    The answer is simple: you get a fault-tolerant, horizontally scalable data store that provides fast access to data for parallel computing. Thanks to having several nodes, there is no limit on the amount of data you can store in the cluster (you are limited, of course, by the total memory of the physical machines allocated to the task), and no size limit on an individual key-value pair. You can also extract from the stored data only what is needed for a calculation, so a minimum of information is copied over the network; indeed, the whole ideology of Coherence is built around sending over the network only what is needed. You can also configure the services and computation tiers flexibly enough for your task. As a result, complex tasks are solved quickly.
    From a management point of view, you are buying a solution that satisfies many requirements at once. Once the data has been loaded into the system, it can be extracted in various ways and used by other systems that use Coherence as a data store. Thus, with Coherence as the foundation, you can build an ecosystem for extracting and processing data.

    If you are interested in this topic, I can continue the series of articles on Coherence. Write what you are interested in, and I will try to answer.
    So far, the planned topics are:
    • configuration of the basic cluster structure
    • work with EntryAggregator
    • work with EntryProcessor
    • asynchronous calls in Coherence
    • a look inside the binary object
    • work with indexes

    In general, bear in mind that it is very easy to get started with Coherence, but very difficult to make things work well and fast; the goal of this series of articles is therefore to fill the gap between initial familiarity with the system and an advanced developer level.

    Recently a colleague of mine, David Whitmarsh, wrote a book for advanced developers, which I recommend reading. You will not find basic knowledge in this book, but you will find examples of solutions (with explanations) to rather complex problems.
