Time-limited parallel data loading

Published on December 10, 2014

Time-limited parallel data loading

  • Tutorial
There are situations when it is necessary to obtain data from several remote sources, but so that the wait time is not too long. For example, when loading weather or currency data, we can poll several services and display the results of all those who answered for a given period of time.



If an insufficient number of services responded during this period of time, we can give additional time to wait for the download.

Total, we operate with three basic parameters:
  • Acceptable waiting time
  • Minimum number of sources
  • Extra waiting time


To make things easier, we write a loader class. Everything is very simple, first listing, then explanation:
AsyncDataLoader
public sealed class AsyncDataLoader<T>
{
    /// <summary>
    /// Инициализирует новый объек.
    /// </summary>
    public AsyncDataLoader()
    {
        EmergencyPeriod = TimeSpan.Zero;
        MinResultsCount = 0;
    }
    /// <summary>
    /// Инициализирует новый объект.
    /// </summary>
    /// <param name="dataLoaders">Функции, загружающие данные.</param>
    /// <param name="loadDataPeriod">Время, в течении которого данные должны быть загруженны.</param>
    public AsyncDataLoader(IEnumerable<Func<T>> dataLoaders, TimeSpan loadDataPeriod)
        : this(dataLoaders, loadDataPeriod, 0, TimeSpan.Zero)
    {
    }
    /// <summary>
    /// Инициализирует новый объект.
    /// </summary>
    /// <param name="dataLoaders">Функции, загружающие данные.</param>
    /// <param name="loadDataPeriod">Время, в течении которого данные должны быть загруженны.</param>
    /// <param name="minimalResultsCount">Минимально необходимое количество загруженных результатов.</param>
    /// <param name="emergencyPeriod">Время, в течении которого будет происходить дозагрузка данных.</param>
    public AsyncDataLoader(IEnumerable<Func<T>> dataLoaders, TimeSpan loadDataPeriod, int minimalResultsCount, TimeSpan emergencyPeriod)
    {
        DataLoaders = dataLoaders;
        LoadDataPeriod = loadDataPeriod;
        EmergencyPeriod = emergencyPeriod;
        MinResultsCount = minimalResultsCount;    
    }
    /// <summary>
    /// Возвращает или устанавливает время, в течении которого будут предприниматься попытки догрузить данные, если получено недостаточное количество результатов.
    /// </summary>
    public TimeSpan EmergencyPeriod
    {
        get;
        set;
    }
    /// <summary>
    /// Возвращает или устанавливает минимально допустимое количество результатов.
    /// </summary>
    public int MinResultsCount
    {
        get;
        set;
    }
    /// <summary>
    /// Возвращает или устанавливает функции, загружающие данные.
    /// </summary>
    public IEnumerable<Func<T>> DataLoaders
    {
        get;
        set;
    }
    /// <summary>
    /// Возвращает или устанавливает время, в течении которого должны быть загруженны данные.
    /// </summary>
    public TimeSpan LoadDataPeriod
    {
        get;
        set;
    }
    /// <summary>
    /// Возвращает или устанавливает признак пропуска нулевых результатов.
    /// </summary>
    public bool SkipDefaultResults
    {
        get;
        set;
    }
    /// <summary>
    /// Асинхронно загружает результаты и возвращает их.
    /// </summary>
    /// <returns>Загруженные результаты.</returns>
    public async Task<T[]> GetResultsAsync()
    {
        BlockingCollection<T> results = new BlockingCollection<T>();
        List<Task> tasks = new List<Task>();
        tasks.AddRange(DataLoaders.Select(handler => Task.Factory.StartNew(() =>
        {
            T result = handler.Invoke();
            results.Add(result);
        }, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, TaskScheduler.Default)));
        bool isAllCompleted = true;
        try
        {
            CancellationTokenSource source = new CancellationTokenSource(LoadDataPeriod);
            CancellationToken token = source.Token;
#if DEBUG
            token = CancellationToken.None; //не будем мешать отладке
#endif
            await Task.Factory.ContinueWhenAll(tasks.ToArray(), (t) =>
                {
                }, token);
        }
        catch (TaskCanceledException ex) //не всё успели? ничего страшного.
        {
            isAllCompleted = false;
        }
        if (!isAllCompleted && EmergencyPeriod > TimeSpan.Zero) //надо ли пробовать загружать дальше
        {
            Func<bool> isReadyHandler = () => results.Count >= MinResultsCount; //ок, результатов недостаточно, пытаемся грузить пока есть время.
            await WaitWhenReady(isReadyHandler, EmergencyPeriod);
        }
        if (SkipDefaultResults)
            return results.Where(r => !object.Equals(r, default(T))).ToArray();
        return results.ToArray();
    }
    /// <summary>
    /// Ждёт пока догрузятся данные.
    /// </summary>
    /// <param name="isReadyValidator">Функция, проверяющая, догрузились ли данные.</param>
    /// <param name="totalDelay">Задержка ожидания.</param>
    /// <param name="iterationsCount">Количество итерация для проверки.</param>
    private async Task WaitWhenReady(Func<bool> isReadyValidator, TimeSpan totalDelay, int iterationsCount = 7)
    {
        if (isReadyValidator())
            return;
        double milliseconds = totalDelay.TotalMilliseconds / iterationsCount;
        TimeSpan delay = TimeSpan.FromMilliseconds(milliseconds);
        for (int i = 0; i < iterationsCount; i++)
        {
            if (isReadyValidator())
                return;
            await Task.Delay(delay);
        }
    }
}


In the body of GetResultsAsync:
  1. Create a collection to store the results. The BlockingCollection class is safe when interacting with different threads;
  2. We put each handler in a separate task. We group all the tasks into a list, warn the scheduler about the long run (TaskCreationOptions.LongRunning) and ask them to add priority (TaskCreationOptions.PreferFairness);
  3. We launch all tasks on execution, setting a time limit;
  4. If necessary, give additional time to download data;
  5. Before returning, skip empty results if the SkipDefaultResults == true flag.

For the debug version, we forcefully disable the time limit in order to be able to walk around the code in a debugged function.
References: