Time-limited parallel data loading

  • Tutorial
There are situations when it is necessary to obtain data from several remote sources, but so that the wait time is not too long. For example, when loading weather or currency data, we can poll several services and display the results of all those who answered for a given period of time.



If an insufficient number of services responded during this period of time, we can give additional time to wait for the download.

Total, we operate with three basic parameters:
  • Acceptable waiting time
  • Minimum number of sources
  • Extra waiting time


To make things easier, we write a loader class. Everything is very simple, first listing, then explanation:
AsyncDataLoader
publicsealedclassAsyncDataLoader<T>
{
    ///<summary>/// Инициализирует новый объек.///</summary>publicAsyncDataLoader()
    {
        EmergencyPeriod = TimeSpan.Zero;
        MinResultsCount = 0;
    }
    ///<summary>/// Инициализирует новый объект.///</summary>///<param name="dataLoaders">Функции, загружающие данные.</param>///<param name="loadDataPeriod">Время, в течении которого данные должны быть загруженны.</param>publicAsyncDataLoader(IEnumerable<Func<T>> dataLoaders, TimeSpan loadDataPeriod)
        : this(dataLoaders, loadDataPeriod, 0, TimeSpan.Zero)
    {
    }
    ///<summary>/// Инициализирует новый объект.///</summary>///<param name="dataLoaders">Функции, загружающие данные.</param>///<param name="loadDataPeriod">Время, в течении которого данные должны быть загруженны.</param>///<param name="minimalResultsCount">Минимально необходимое количество загруженных результатов.</param>///<param name="emergencyPeriod">Время, в течении которого будет происходить дозагрузка данных.</param>publicAsyncDataLoader(IEnumerable<Func<T>> dataLoaders, TimeSpan loadDataPeriod, int minimalResultsCount, TimeSpan emergencyPeriod)
    {
        DataLoaders = dataLoaders;
        LoadDataPeriod = loadDataPeriod;
        EmergencyPeriod = emergencyPeriod;
        MinResultsCount = minimalResultsCount;    
    }
    ///<summary>/// Возвращает или устанавливает время, в течении которого будут предприниматься попытки догрузить данные, если получено недостаточное количество результатов.///</summary>public TimeSpan EmergencyPeriod
    {
        get;
        set;
    }
    ///<summary>/// Возвращает или устанавливает минимально допустимое количество результатов.///</summary>publicint MinResultsCount
    {
        get;
        set;
    }
    ///<summary>/// Возвращает или устанавливает функции, загружающие данные.///</summary>public IEnumerable<Func<T>> DataLoaders
    {
        get;
        set;
    }
    ///<summary>/// Возвращает или устанавливает время, в течении которого должны быть загруженны данные.///</summary>public TimeSpan LoadDataPeriod
    {
        get;
        set;
    }
    ///<summary>/// Возвращает или устанавливает признак пропуска нулевых результатов.///</summary>publicbool SkipDefaultResults
    {
        get;
        set;
    }
    ///<summary>/// Асинхронно загружает результаты и возвращает их.///</summary>///<returns>Загруженные результаты.</returns>publicasync Task<T[]> GetResultsAsync()
    {
        BlockingCollection<T> results = new BlockingCollection<T>();
        List<Task> tasks = new List<Task>();
        tasks.AddRange(DataLoaders.Select(handler => Task.Factory.StartNew(() =>
        {
            T result = handler.Invoke();
            results.Add(result);
        }, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, TaskScheduler.Default)));
        bool isAllCompleted = true;
        try
        {
            CancellationTokenSource source = new CancellationTokenSource(LoadDataPeriod);
            CancellationToken token = source.Token;
#if DEBUG
            token = CancellationToken.None; //не будем мешать отладке#endifawait Task.Factory.ContinueWhenAll(tasks.ToArray(), (t) =>
                {
                }, token);
        }
        catch (TaskCanceledException ex) //не всё успели? ничего страшного.
        {
            isAllCompleted = false;
        }
        if (!isAllCompleted && EmergencyPeriod > TimeSpan.Zero) //надо ли пробовать загружать дальше
        {
            Func<bool> isReadyHandler = () => results.Count >= MinResultsCount; //ок, результатов недостаточно, пытаемся грузить пока есть время.await WaitWhenReady(isReadyHandler, EmergencyPeriod);
        }
        if (SkipDefaultResults)
            return results.Where(r => !object.Equals(r, default(T))).ToArray();
        return results.ToArray();
    }
    ///<summary>/// Ждёт пока догрузятся данные.///</summary>///<param name="isReadyValidator">Функция, проверяющая, догрузились ли данные.</param>///<param name="totalDelay">Задержка ожидания.</param>///<param name="iterationsCount">Количество итерация для проверки.</param>privateasync Task WaitWhenReady(Func<bool> isReadyValidator, TimeSpan totalDelay, int iterationsCount = 7)
    {
        if (isReadyValidator())
            return;
        double milliseconds = totalDelay.TotalMilliseconds / iterationsCount;
        TimeSpan delay = TimeSpan.FromMilliseconds(milliseconds);
        for (int i = 0; i < iterationsCount; i++)
        {
            if (isReadyValidator())
                return;
            await Task.Delay(delay);
        }
    }
}


In the body of GetResultsAsync:
  1. Create a collection to store the results. The BlockingCollection class is safe when interacting with different threads;
  2. We put each handler in a separate task. We group all the tasks into a list, warn the scheduler about the long run (TaskCreationOptions.LongRunning) and ask them to add priority (TaskCreationOptions.PreferFairness);
  3. We launch all tasks on execution, setting a time limit;
  4. If necessary, give additional time to download data;
  5. Before returning, skip empty results if the SkipDefaultResults == true flag.

For the debug version, we forcefully disable the time limit in order to be able to walk around the code in a debugged function.
References:

Also popular now: