Getting started with Tarantool in a Java project

  • Tutorial
    In the article below I will try to briefly talk about what Tarantool is and how to start using it in an existing project if you are programming in Java. If you program in another language, then you may be interested in some tools available in the connector, such as the ability to edit xlog files and create snap files from any data. If you do not know what Tarantool is, then it is better to read this post .

    Tarantool is a data warehouse tuple key. All data and indexes are stored in RAM. Values ​​make up a tuple, then tuple , tuples - space, further space, spaces - data model. 3 types of data are supported: 32 bit unsigned integer, 64 bit unsigned integer and binary string, then NUM , NUM64 and STR respectively. For any space, the type and structure of the primary index must be defined, for example: HASH in the fields 1,2 where 1 is NUM and 2 is NUM64. Secondary indexes are specified in the same way as primary indexes. DML operations are atomic at the tuple level and are performed only on the primary index. To perform several operations atomically, you need to use the built-in language Lua . Data safety is ensured by saving a snapshot of the current state, then snapshot , and a binary log, then xlog. For storage of tuples slab is used .

    In the examples below, the java connector is used, more details can be found at dgreenru.github.com/tarantool-java . The latest stable version at the time of writing is 0.1.2 . Below I will consider an example of using additional functionality that allows you to transfer and synchronize data with any other repositories.

Example of migrating a MySQL table to the Tarantool Box:



mysql> desc user;
+------------+--------------+------+-----+-------------------+----------------+
| Field      | Type         | Null | Key | Default           | Extra          |
+------------+--------------+------+-----+-------------------+----------------+
| id         | int(11)      | NO   | PRI | NULL              | auto_increment |
| username   | varchar(255) | NO   | UNI | NULL              |                |
| email      | varchar(255) | NO   | UNI | NULL              |                |
| enabled    | tinyint(1)   | NO   |     | 1                 |                |
| registered | timestamp    | NO   |     | CURRENT_TIMESTAMP |                |
+------------+--------------+------+-----+-------------------+----------------+
5 rows in set


    Primary index id and 2 secondary unique indexes username and email. Of the places that are not tolerated by default, auto_increment and timestamp can be distinguished . For the former, you can use the box.auto_increment stored procedure , and for the latter, you can store data in yyyyMMddhhmmss format or seconds. If the user table is small enough, then you can simply read the data from mysql and paste it into the Tarantool Box, I will not stop on this task, but tell you what to do if the table is very large, i.e. contains a lot of records, albeit each one is small. First you need to unload the data in a format convenient for us, preferably not taking up much server resources.

mysql> select * into outfile '/tmp/user' from user;
Query OK, 73890541 rows affected
$ head -1 /tmp/user
1	username	email@domain.tld	1	2012-10-14 01:27:05


    By copying the file to the desired server or local computer, you can begin to process it and convert it to the Tarantool Box format. In the example below, for simplicity, escape sequences are not considered. If you have tabs, line breaks, carriage returns, backslashes or fields that contain NULL values ​​in your tables, you need to add their processing yourself.

BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream("/tmp/user.gz")), "utf-8"));
SnapshotWriter writer = new SnapshotWriter(new FileOutputStream("/tmp/user.snap").getChannel());
String line = null;
DateFormat indf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
DateFormat outdf = new SimpleDateFormat("yyyyMMddhhmmss");
Pattern pattern = Pattern.compile("\t");
while ((line = reader.readLine()) != null) {
	try {
		String[] values = pattern.split(line);
		if (values.length == 5) {
			Integer id = Integer.parseInt(values[0]);
			String username = values[1];
			String email = values[2];
			byte[] enabled = { Byte.valueOf(values[3]) };
			Long registered = Long.parseLong(outdf.format(indf.parse(values[4])));
			Tuple tuple = new Tuple(5).setInt(0, id).setString(1, username, "UTF-8")
			.setString(2, email, "UTF-8").setBytes(3, enabled).setLong(4, registered);
			writer.writeRow(0, tuple);
		} else {
			System.err.println("Line should be splited in 5 parts, but has " + values.length + " for " + line);
		}
	} catch (Exception e) {
		System.err.println("Can't parse line " + line);
		e.printStackTrace();
	}
}
writer.close();
reader.close();


As a result, we have a file

$ ls -sh /tmp/user.snap
16.1G /tmp/user.snap


Now you need to configure space 0 accordingly.

# Этот параметр ограничивает суммарный размер памяти выделенной под slab блоки. 
# Индексы и другие накладные расходы хранятся вне slab, 
# поэтому суммарная память процесса может быть до 2х больше.
# В нашем случае логично поставить здесь 24 гигабайта.
slab_alloc_arena = 24
# Так же имеет смысл откорректировать количество записей в одном xlog файле.
rows_per_wal = 500000
# И конечно же конфигурация ключей space 0
space[0].enabled = 1
# id. Чтобы использовать box.auto_increment тип дерева должен быть TREE. 
space[0].index[0].type = "TREE"
space[0].index[0].unique = 1
space[0].index[0].key_field[0].fieldno = 0
space[0].index[0].key_field[0].type = "NUM"
#username
space[0].index[1].type = "HASH"
space[0].index[1].unique = 1
space[0].index[1].key_field[0].fieldno = 1
space[0].index[1].key_field[0].type = "STR"
#password
space[0].index[2].type = "HASH"
space[0].index[2].unique = 1
space[0].index[2].key_field[0].fieldno = 2
space[0].index[2].key_field[0].type = "STR"


Next, we need to replace 00000000000000000001.snap located in the work_dir folder from the configuration file with the file we created.

$ mv /tmp/user.snap /var/lib/tarantool/00000000000000000001.snap


and try to start the server

$ tarantool_box --background   
$ ps -C tarantool_box -o pid=,cmd=
 8853 tarantool_box: primary pri: 33013 sec: 33014 adm: 33015


also look at the tarantool.log file, if it starts successfully, it will end with lines similar to the ones below, in case of an error, you will immediately see the reason.

1350504007.249 7127 1/sched _ I> Space 0: done
1350504007.249 7127 101/33013/primary _ I> bound to port 33013
1350504007.249 7127 101/33013/primary _ I> I am primary
1350504007.249 7127 102/33014/secondary _ I> bound to port 33014
1350504007.250 7127 103/33015/admin _ I> bound to port 33015
1350504007.251 7127 1/sched _ C> log level 4
1350504007.251 7127 1/sched _ C> entering event loop					


Further, the correctness of the data insertion can be checked in a simple way.

$ tarantool -a 127.0.0.1
127.0.0.1> select * from t0 where k0 = 1
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', '\x01', '\x21\x8b\xe4\xc9\x4c\x12']
127.0.0.1> select * from t0 where k1 = 'username'
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', '\x01', '\x21\x8b\xe4\xc9\x4c\x12']
127.0.0.1> select * from t0 where k2 = 'email@domain.tld'
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', '\x01', '\x21\x8b\xe4\xc9\x4c\x12']


those. we checked the location of data on the 3 keys specified by us in the config. Next, you can see the amount of memory consumed by the process in the system and the report of the show slab command in the Tarantool Box console.

    Tarantool Box is launched, now you need to take care of backing up the data and keeping the MySQL table up to date in case some queries use the data from it for their own purposes. It is quite simple to organize using the ReplicationClient class. It will allow you to have an almost complete xlog backup without using a full-fledged slave server and organize updating the table in MySQL without the cost of additional resources and time. Remember to specify replication_portin the config to make replication possible. The class described below, we save all the logs received from the server into files with a length of 50 thousand records. The algorithm is quite simple:
1. search for existing logs
2. determine the maximum lsn
3. connect to the replication port
4. translate the received data to the file
The MySQL update logic is missing in this code, but it is easy to implement it by changing the loop in the main function a little. The difficult place is the extension of the ReplicationClient class with code that writes the received data to the binary log, expanding it to the xlog format. At this place you can not particularly stop, because This example is more likely a workpiece for a real application than a demonstration of use.

public class Backup {
	protected DecimalFormat xlogNameFormat = new DecimalFormat("00000000000000000000");
	protected String folder;
	protected FileChannel xlogChannel;
	protected int row;
	protected int limit = 50000;
	protected long lsn = 0L;
	protected ReplicationClient client;
	protected XLogWriter writer;
	public void setLimit(int limit) {
		this.limit = limit;
	}
	public Backup(String folder, String host, int port) throws IOException {
		this.folder = folder;
	}
	protected void getLatestLSN(String folder) throws IOException, FileNotFoundException {
		final File backupFolder = new File(folder);
		String[] xlogs = backupFolder.list(new FilenameFilter() {
			@Override
			public boolean accept(File dir, String name) {
				return name.endsWith(".xlog");
			}
		});
		boolean hasLogs = xlogs != null && xlogs.length > 0;
		if (hasLogs) {
			Arrays.sort(xlogs);
			XLogReader reader = new XLogReader(new FileInputStream(folder + "/" + xlogs[xlogs.length - 1]).getChannel());
			XLogEntry xlogEntry = null;
			while ((xlogEntry = reader.nextEntry()) != null) {
				lsn = xlogEntry.header.lsn;
			}
			reader.close();
		}
	}
	public void start() throws IOException {
		getLatestLSN(folder);
		System.out.println("Planning to start from lsn: " + lsn);
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					synchronized (this) {
						close();
					}
				} catch (IOException e) {
					throw new IllegalStateException("Can't close xlog", e);
				}
			}
		}));
		final ByteBuffer rowStartMarker = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(Const.ROW_START_MARKER);
		client = new ReplicationClient(SocketChannel.open(new InetSocketAddress("127.0.0.1", 33016)), lsn + 1L) {
			@Override
			protected ByteBuffer readBody(Header header) throws IOException {
				if (Backup.this.xlogChannel == null) {
					Backup.this.xlogChannel = nextFile(folder);
				}
				ByteBuffer body = super.readBody(header);
				this.header.flip();
				rowStartMarker.flip();
				synchronized (Backup.this) {
					while (rowStartMarker.hasRemaining())
						Backup.this.xlogChannel.write(rowStartMarker);
					while (this.header.hasRemaining())
						Backup.this.xlogChannel.write(this.header);
					while (body.hasRemaining())
						Backup.this.xlogChannel.write(body);
					Backup.this.xlogChannel.force(false);
					body.flip();
				}
				return body;
			}
		};
	}
	public XLogEntry nextEntry() throws IOException {
		XLogEntry entry = client.nextEntry();
		lsn = entry.header.lsn;
		if (++row >= limit) {
			close();
			xlogChannel = nextFile(folder);
			row = 0;
		}
		return entry;
	}
	protected FileChannel nextFile(String folder) throws IOException {
		String fileName = folder + "/" + xlogNameFormat.format(lsn + 1L) + ".xlog";
		new File(fileName).createNewFile();
		FileChannel channel = new FileOutputStream(fileName, true).getChannel();
		writer = new XLogWriter(channel);
		return channel;
	}
	public void close() throws IOException {
		if (writer != null) {
			writer.close();
		}
	}
	public static void main(String[] args) throws IOException {
		final Backup backup = new Backup("/home/dgreen/backup", "localhost", 33016);
		backup.start();
		XLogEntry entry = null;
		while ((entry = backup.nextEntry()) != null) {
			StringBuilder pk = new StringBuilder();
			for (int i = 0; i < entry.tuple.size(); i++) {
				if (pk.length() > 0) {
					pk.append(" - ");
				}
				switch (entry.tuple.getBytes(i).length) {
				case 4:
					pk.append(String.valueOf(entry.tuple.getInt(i)));
					break;
				case 8:
					pk.append(String.valueOf(entry.tuple.getLong(i)));
					break;
				default:
					pk.append(entry.tuple.getString(i, "UTF-8"));
				}
			}
			switch (entry.op) {
			case Update.OP_CODE:
				System.out.println("Got update on #" + pk.toString());
				break;
			case Insert.OP_CODE:
				System.out.println("Got insert " + pk.toString());
				break;
			case Delete.OP_CODE:
				System.out.println("Got delete of #" + pk.toString());
				break;
			default:
				System.out.println("Got unknown op " + entry.op + " " + pk.toString());
				break;
			}
		}
	}
}


    I would also like to note separately that when using the functionality for working with xlog files, it is almost impossible to lose data, even if you accidentally deleted a tuple or completely cleared space using the XLogReader and XLogWriter classes, you can easily edit xlog.

    That's all, basically, once again I remind you that you can learn more about the connector at dgreenru.github.com/tarantool-java , the source code of the examples used is available in the github repository.

Also popular now: