Spring: Implementing TaskExecutor with Transactional Support

Spring, taking care of the developers, offers a convenient and simple facade for interacting with the transaction manager. However, will the standard mechanism always be enough to implement sophisticated architectural ideas? Obviously not.

This post will talk about Spring features -
  • take a look at examples of standard transaction management using annotations,
  • we’ll understand when it’s impossible to solve the problem using annotations,
  • and judging by the title of the article, we give an example of the implementation of transactional code execution in a new thread created using the Spring TaskExecutor.


The most commonly used transaction specification using the @Transactional annotation. To use this mechanism, it is enough to configure the preferred TransactionManager and enable annotation processing. In the case of configuration using XML files, it looks something like this:
    .....
     
    ....


Using it further is as simple as hanging the @Transactional annotation on an implementation of an interface method or over the entire implementation class.
@Service
public class FooServiceImpl implements FooService {
    @Autowired
    private FooDao fooDao;
    @Transactional
    @Override
    public void update(Foo entity)  {
        fooDao.merge(entity);
    }
}

However, in order to effectively use this mechanism, you need to remember a few, not always obvious subtleties:
  • The class must be declared as a bean (using annotations, code-based or in the xml container configuration)
  • The class must implement the interface, otherwise it will be difficult for the container to create a proxy object with which the transaction is controlled.
  • Calling transactional methods from another method of the same class will not result in a transaction! (consequence of the previous paragraph)

A similar mechanism allows you not to write a transaction management code every time!

However, can there be situations when this mechanism will not be enough? or is there a context in which annotations cannot be dispensed with? I'm afraid the answer is obvious. For example, we might want to execute part of the code in one transaction, and another part in the second (here it would be more likely to split the method into two architecturally correct). Readers, I think, have their own examples.

A more realistic example is when part of the code needs to be executed asynchronously:
@Service
public class FooServiceImpl implements FooService {
    @Autowired
    private TaskExecutor taskExecutor;
    @Autowired
    private FooDao fooDao;
    @Transactional
    @Override
    public void update(Foo entity)  {
        fooDao.merge(entity);
        taskExecutor.run(new Runnable() {
            public void run() {
                someLongTimeOperation(entity);
            }
        });
    }
    @Transactional
    @Override
    public void someLongTimeOperation(Foo entity)  {
        // тут набор ресурсоёмких операций
    }
}


What happens: before the start of the update () method, a transaction is created, then operations from the body are performed, and upon exiting the method, the transaction is closed. But in our case, a new thread is created in which the code will be executed. And it is very obvious that at the time of exit from the update () method and the associated destruction of the transaction, the execution of the code in the second running thread can / will continue. As a result, upon completion of the method, in the second thread, we get an exception and the entire transaction is “backed up”.

To the previous example, add manual transaction creation:
@Service
public class FooServiceImpl implements FooService {
    @Autowired
    private TaskExecutor taskExecutor;
    @Autowired
    private FooDao fooDao;
    @Transactional
    @Override
    public void update(final Foo entity)  {
       fooDao.merge(entity);
       final TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
       taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        someLongTimeOperation(entity);
                    }
                });
            }
        });     
    }
    @Transactional
    @Override
    public void someLongTimeOperation(Foo entity)  {
        // тут набор ресурсоёмких операций
    }
}


Now someLongTimeOperation () is executed asynchronously and in the selected transaction. However, I want a generalized implementation, so as not to duplicate the cumbersome manual control code.

Well ... here she is:
public interface TransactionalAsyncTaskExecutor extends AsyncTaskExecutor {
    void execute(Runnable task, Integer propagation, Integer isolationLevel);
}

public class DelegatedTransactionalAsyncTaskExecutor implements InitializingBean, TransactionalAsyncTaskExecutor {
    private PlatformTransactionManager transactionManager;
    private AsyncTaskExecutor delegate;
    private TransactionTemplate sharedTransactionTemplate;
    public DelegatedTransactionalAsyncTaskExecutor() {
    }
    public DelegatedTransactionalAsyncTaskExecutor(PlatformTransactionManager transactionManager, AsyncTaskExecutor delegate) {
        this.transactionManager = transactionManager;
        this.delegate = delegate;
    }
    @Override
    public void execute(final Runnable task, Integer propagation, Integer isolationLevel) {
        final TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        transactionTemplate.setPropagationBehavior(propagation);
        transactionTemplate.setIsolationLevel(isolationLevel);
        delegate.execute(new Runnable() {
            @Override
            public void run() {
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        task.run();
                    }
                });
            }
        });
    }
    @Override
    public void execute(final Runnable task) {
        execute(task, TransactionDefinition.PROPAGATION_REQUIRED, TransactionDefinition.ISOLATION_DEFAULT);
    }
    @Override
    public void execute(final Runnable task, long startTimeout) {
        final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();
        delegate.execute(new Runnable() {
            @Override
            public void run() {
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        task.run();
                    }
                });
            }
        }, startTimeout);
    }
    @Override
    public Future submit(final Runnable task) {
        final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();
        return delegate.submit(new Runnable() {
            @Override
            public void run() {
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        task.run();
                    }
                });
            }
        });
    }
    @Override
    public  Future submit(final Callable task) {
        final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();
        return delegate.submit(new Callable() {
            @Override
            public T call() throws Exception {
                return transactionTemplate.execute(new TransactionCallback() {
                    @Override
                    public T doInTransaction(TransactionStatus status) {
                        T result = null;
                        try {
                            result = task.call();
                        } catch (Exception e) {
                            e.printStackTrace();
                            status.setRollbackOnly();
                        }
                        return result;
                    }
                });
            }
        });
    }
    public PlatformTransactionManager getTransactionManager() {
        return transactionManager;
    }
    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }
    public AsyncTaskExecutor getDelegate() {
        return delegate;
    }
    public void setDelegate(AsyncTaskExecutor delegate) {
        this.delegate = delegate;
    }
    public TransactionTemplate getSharedTransactionTemplate() {
        return sharedTransactionTemplate;
    }
    public void setSharedTransactionTemplate(TransactionTemplate sharedTransactionTemplate) {
        this.sharedTransactionTemplate = sharedTransactionTemplate;
    }
    @Override
    public void afterPropertiesSet() {
        if (transactionManager == null) {
            throw new IllegalArgumentException("Property 'transactionManager' is required");
        }
        if (delegate == null) {
            delegate = new SimpleAsyncTaskExecutor();
        }
        if (sharedTransactionTemplate == null) {
            sharedTransactionTemplate = new TransactionTemplate(transactionManager);
        }
    }
}


This implementation is a wrapper, as a result of which it delays calls to any TaskExecutor of which there are several in Spring. Moreover, each call is “wrapped” in a transaction. You can manually manage transactions in Spring using TransactionTemplate, but EntityManager # getTransaction () throws an exception.

And finally, about a practical example in action:

Configure TaskExecutor:


Service example:

@Service
public class FooServiceImpl implements FooService {
    @Autowired
    private TransactionalAsyncTaskExecutor trTaskExecutor;
    @Autowired
    private FooDao fooDao;
    @Transactional
    @Override
    public void update(Foo entity)  {
        fooDao.merge(entity);                                   // Выполнится в транзакции созданной Spring'ом (tr_1).
        trTaskExecutor.run(new Runnable() {       // Запустится новый поток и новая транзакция (tr_2), метод run() выполнится паралельно текущему потоку и в рамках транзакции tr_2.
            public void run() {
                someLongTimeOperation();
            }
        });
    }                                                                             // Выход из метода и вместе с этим tr_1 завершится. Обрабока tr_2 осуществится с помощью TransactionTemplate.
    @Transactional
    @Override
    public void someLongTimeOperation(Foo entity)  {
        // тут набор ресурсоёмких операций
    }
}


Thus, we have a completely generalized wrapper implementation for TaskExecutor, which allows us to avoid duplication of transaction creation code.

Also popular now: