Parsim 25TB with AWK and R
- Transfer
How to read this article : I apologize for the fact that the text turned out so long and chaotic. To save you time, I begin each chapter with the introduction of “What I Learned,” in which I explain the essence of the chapter in one or two sentences.
“Just show the solution!” If you just want to see what I’ve come to, then go to the chapter “Become more inventive,” but I think it’s more interesting and useful to read about failures.
Recently, I was instructed to set up a process for processing a large volume of the original DNA sequences (technically, this is an SNP chip). It was necessary to quickly obtain data on a given genetic location (called SNP) for subsequent modeling and other tasks. With the help of R and AWK, I was able to clean and organize the data in a natural way, greatly accelerating the processing of requests. This was not easy for me and required numerous iterations. This article will help you avoid some of my mistakes and demonstrate what I did in the end.
First, some introductory explanations.
Data
Our University Genetic Information Processing Center has provided us with 25 TB of TSV data. I got them broken into 5 packages compressed by Gzip, each of which contained about 240 four-gigabyte files. Each row contained data for one SNP of one person. In total, data on ~ 2.5 million SNPs and ~ 60 thousand people were transmitted. In addition to SNP information, there were numerous columns in the files with numbers reflecting various characteristics, such as reading intensity, frequency of different alleles, etc. There were about 30 columns with unique values.
goal
As with any data management project, the most important thing was to determine how the data would be used. In this case, for the most part, we will select models and workflows for SNP based on SNP . That is, at the same time we will need data for only one SNP. I had to learn how to extract all the records related to one of the 2.5 million SNPs as simply as possible, faster and cheaper.
How not to do it
I will quote a suitable cliche:
I did not fail a thousand times, I just discovered a thousand ways not to parse a bunch of data in a format convenient for queries.
First try
What I learned : There is no cheap way to parse 25 TB at a time.
After listening to the subject “Advanced Big Data Processing Methods” at Vanderbilt University, I was sure that it was a hat. Perhaps it will take an hour or two to configure the Hive server to run through all the data and report on the result. Since our data is stored in AWS S3, I used the Athena service , which allows you to apply Hive SQL queries to S3 data. No need to configure / raise the Hive-cluster, and even pay only for the data you are looking for.
After I showed Athena my data and its format, I ran a few tests with similar queries:
select * from intensityData limit 10;
And quickly got well-structured results. Done.
Until we tried to use the data in the work ...
I was asked to pull out all the information on SNP in order to test the model on it. I ran a query:
select * from intensityData
where snp = 'rs123456';
... and waited. After eight minutes and more than 4 TB of the requested data, I got the result. Athena charges a fee for the amount of data found, at $ 5 per terabyte. So this single request cost $ 20 and eight minutes of waiting. To run the model according to all the data, it was necessary to wait 38 years and pay $ 50 million. Obviously, this did not suit us.
It was necessary to use Parquet ...
What I learned : Be careful with the size of your Parquet files and their organization.
At first I tried to correct the situation by converting all TSVs to Parquet files . They are convenient for working with large data sets, because the information in them is stored in a columnar form: each column lies in its own memory / disk segment, unlike text files in which lines contain elements of each column. And if you need to find something, then just read the necessary column. In addition, a range of values is stored in each file in a column, so if the desired value is not in the column range, Spark will not waste time scanning the entire file.
I ran a simple AWS Glue taskto convert our TSV to Parquet and abandoned new files in Athena. It took about 5 hours. But when I launched the request, it took about the same time and a little less money to complete it. The fact is that Spark, trying to optimize the task, simply unpacked one TSV-chunk and put it in its own Parquet-chunk. And since each chunk was large enough and contained the full records of many people, all SNPs were stored in each file, so Spark had to open all the files to extract the necessary information.
Curiously, the default (and recommended) compression type in Parquet - snappy - is not splitable. Therefore, each executor stuck to the task of unpacking and downloading the full 3.5 GB dataset.
We understand the problem
What I learned : sorting is difficult, especially if the data is distributed.
It seemed to me that now I understood the essence of the problem. All I had to do was sort the data by SNP column, not by people. Then several SNPs will be stored in a separate data chunk, and then the Parquet smart function “open only if the value is in the range” will manifest itself in all its glory. Unfortunately, sorting out billions of rows scattered across a cluster has proven to be a daunting task.
AWS certainly does not want to return the money because of “I'm an absent-minded student.” After I started sorting on Amazon Glue, it worked for 2 days and crashed.
What about partitioning?
What I learned : Partitions in Spark should be balanced.
Then the idea came to me to partition the data on the chromosomes. There are 23 of them (and a few more, given mitochondrial DNA and unmapped areas).
This will allow you to split the data into smaller portions. If you add only one line to the Spark export function in the Glue script
partition_by = "chr"
, then the data should be sorted into buckets. The genome consists of numerous fragments called chromosomes.
Unfortunately, this did not work. Chromosomes have different sizes, and therefore a different amount of information. This means that the tasks that Spark sent to workers were not balanced and performed slowly, because some nodes finished earlier and were idle. However, the tasks were completed. But when requesting one SNP, imbalance again caused problems. The cost of processing SNPs on larger chromosomes (that is, where we want to get the data from) decreased by only about 10 times. A lot, but not enough.
And if you divide into even smaller partitions?
What I learned : never try to do 2.5 million partitions at all.
I decided to go for a walk and partitioned every SNP. This guaranteed the same size of partitions. BAD WAS AN IDEA . I took advantage of Glue and added an innocent string
partition_by = 'snp'
. The task started and started to run. A day later, I checked and saw that nothing was written in S3 so far, so I killed the task. It looks like Glue was writing intermediate files to a hidden place in S3, and a lot of files, maybe a couple of millions. As a result, my mistake cost more than a thousand dollars and did not please my mentor.Partitioning + sorting
What I learned : sorting is still difficult, as is setting up Spark.
The last attempt at partitioning was that I partitioned the chromosomes and then sorted each partition. In theory, this would speed up each request, because the desired SNP data should be within several Parquet chunks within a given range. Alas, sorting even partitioned data has proven to be a difficult task. As a result, I switched to EMR for a custom cluster and used eight powerful instances (C5.4xl) and Sparklyr to create a more flexible workflow ...
# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
group_by(chr) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr')
)
... however, the task was still not completed. I tuned in every way: I increased the memory allocation for each query executor, used nodes with a large amount of memory, used broadcasting variables, but each time it turned out to be half measures, and gradually the performers started to fail, until everything stopped.
I'm getting more inventive
What I learned : sometimes special data require special solutions.
Each SNP has a position value. This is the number corresponding to the number of bases lying along its chromosome. This is a good and natural way to organize our data. At first I wanted to partition by region of each chromosome. For example, positions 1 - 2000, 2001 - 4000, etc. But the problem is that SNPs are not evenly distributed across the chromosomes, which is why the size of the groups will vary greatly.
As a result, I came to be divided into categories (rank) positions. According to the already downloaded data, I ran a request for a list of unique SNPs, their positions and chromosomes. Then he sorted the data inside each chromosome and collected SNP into groups (bin) of a given size. Say 1000 SNP each. This gave me an SNP relationship with a group-in-chromosome.
In the end, I made groups (bin) on 75 SNP, I will explain the reason below.
snp_to_bin <- unique_snps %>%
group_by(chr) %>%
arrange(position) %>%
mutate(
rank = 1:n()
bin = floor(rank/snps_per_bin)
) %>%
ungroup()
First try with Spark
What I learned : Spark integration is fast, but partitioning is still expensive.
I wanted to read this small (2.5 million lines) data frame in Spark, combine it with raw data, and then partition by freshly added column
bin
.
# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
group_by(chr_bin) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr_bin')
)
I used
sdf_broadcast()
, so Spark finds out that it should send a data frame to all nodes. This is useful if the data is small and required for all tasks. Otherwise, Spark tries to be smart and distributes data as needed, which can cause brakes. And again, my idea did not work: the tasks worked for a while, completed the merger, and then, like the executors launched by partitioning, they started to fail.
Add AWK
What I learned : do not sleep when the basics teach you. Surely someone already solved your problem back in the 1980s.
Up to this point, the cause of all my failures with Spark was the confusion of data in the cluster. Perhaps the situation can be improved by pre-processing. I decided to try to split the raw text data into chromosome columns, so I was hoping to provide Spark with “pre-partitioned” data.
I searched on StackOverflow for how to break down column values and found such a great answer. Using AWK, you can split a text file into column values by writing to the script rather than sending the results to
stdout
. For testing, I wrote a Bash script. I downloaded one of the packed TSVs, then unpacked it using
gzip
and sent to awk
.gzip -dc path/to/chunk/file.gz |
awk -F '\t' \
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'
It worked!
Core filling
What I learned :
gnu parallel
is a magic thing, everyone should use it. The separation was rather slow, and when I started
htop
to test the use of a powerful (and expensive) EC2 instance, it turned out that I was using only one core and approximately 200 MB of memory. In order to solve the problem and not lose a lot of money, it was necessary to figure out how to parallelize the work. Fortunately, in Jeron Janssens’s stunning Data Science at the Command Line book, I found a chapter on parallelization. From it I learned about a gnu parallel
very flexible method for implementing multithreading on Unix.When I started the partition using a new process, everything was fine, but there was a bottleneck - downloading S3 objects to disk was not too fast and not completely parallelized. To fix this, I did this:
- I found out that it is possible to implement the S3-download step directly in the pipeline, completely eliminating the intermediate storage on disk. This means that I can avoid writing raw data to disk and use even smaller, and therefore cheaper storage on AWS.
- The team
aws configure set default.s3.max_concurrent_requests 50
greatly increased the number of threads that the AWS CLI uses (by default there are 10). - I switched to the EC2 instance optimized for network speed, with the letter n in the name. I found that the loss of computing power when using n-instances is more than offset by an increase in download speed. For most tasks, I used c5n.4xl.
- Changed
gzip
topigz
, this is a gzip tool that can do cool things to parallelize the initially unparalleled task of unpacking files (this helped the least).
# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50
for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do
aws s3 cp s3://$batch_loc$chunk_file - |
pigz -dc |
parallel --block 100M --pipe \
"awk -F '\t' '{print \$1\",...\"$30\">\"chunked/{#}_chr\"\$15\".csv\"}'"
# Combine all the parallel process chunks to single files
ls chunked/ |
cut -d '_' -f 2 |
sort -u |
parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
# Clean up intermediate data
rm chunked/*
done
These steps are combined with each other so that everything works very quickly. Thanks to the increased download speed and the rejection of writing to disk, I could now process a 5-terabyte package in just a few hours.
This tweet was supposed to mention 'TSV'. Alas.
Using re-parsed data
What I learned : Spark loves uncompressed data and does not like to combine partitions.
Now the data was in S3 in an unpacked (read, shared) and semi-ordered format, and I could return to Spark again. A surprise awaited me: I again failed to achieve the desired! It was very difficult to tell Spark exactly how the data was partitioned. And even when I did this, it turned out that there were too many partitions (95 thousand), and when I
coalesce
reduced their number to reasonable limits, it ruined my partitioning. I am sure this can be fixed, but in a couple of days of searching, I could not find a solution. In the end, I completed all the tasks in Spark, although it took some time, and my split Parquet files were not very small (~ 200 Kb). However, the data was where it was needed.Too small and different, wonderful!
Testing local Spark requests
What I learned : Spark has too much overhead in solving simple problems.
By downloading the data in a smart format, I was able to test the speed. I set up a script on R to start the local Spark server, and then I loaded the Spark data frame from the specified repository of Parquet groups (bin). I tried to load all the data, but could not get Sparklyr to recognize partitioning.
sc <- Spark_connect(master = "local")
desired_snp <- 'rs34771739'
# Start a timer
start_time <- Sys.time()
# Load the desired bin into Spark
intensity_data <- sc %>%
Spark_read_Parquet(
name = 'intensity_data',
path = get_snp_location(desired_snp),
memory = FALSE )
# Subset bin to snp and then collect to local
test_subset <- intensity_data %>%
filter(SNP_Name == desired_snp) %>%
collect()
print(Sys.time() - start_time)
Execution took 29.415 seconds. Much better, but not too good for mass testing anything. In addition, I could not speed up the work with caching, because when I tried to cache the data frame in memory, Spark always crashed, even when I allocated more than 50 GB of memory for a dataset that weighed less than 15.
Return to AWK
What I learned : AWK associative arrays are very efficient.
I understood that I could achieve a higher speed. I recalled that in Bruce Barnett ’s excellent AWK guide, I read about a cool feature called “ associative arrays ”. In fact, these are key-value pairs, which for some reason were called differently in AWK, and therefore I somehow did not particularly mention them. Roman Cheplyaka recalled that the term “associative arrays” is much older than the term “key-value pair”. Even if you search for key-value in Google Ngram, you will not see this term there, but you will find associative arrays! In addition, the key-value pair is most often associated with databases, so it is much more logical to compare with hashmap. I realized that I could use these associative arrays to connect my SNPs to the bin table and raw data without using Spark.
For this, in the AWK script, I used a block
BEGIN
. This is a piece of code that is executed before the first line of data is transferred to the main body of the script.join_data.awk
BEGIN {
FS=",";
batch_num=substr(chunk,7,1);
chunk_id=substr(chunk,15,2);
while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}
The command
while(getline...)
loaded all the lines from the CSV group (bin), set the first column (SNP name) as the key for the associative array bin
and the second value (group) as the value. Then, in the block {
}
, which is executed with respect to all the rows of the main file, each line is sent to the output file gets a unique name depending on the group (bin directory): ..._bin_"bin[$1]"_...
. Variables
batch_num
and chunk_id
reflect the data provided by the conveyor, thus avoiding race conditions, and each thread of execution, running parallel
, writing in its own unique file.Since I scattered all the raw data into folders on the chromosomes left after my previous experiment with AWK, now I could write another Bash script to process on the chromosome at a time and give deeper partitioned data to S3.
DESIRED_CHR='13'
# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=\""$DESIRED_CHR"\" -v chunk=\"{}\" -f split_on_chr_bin.awk"
# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*
The script has two sections
parallel
. The first section reads data from all files containing information on the desired chromosome, then this data is distributed across streams that scatter files into the corresponding groups (bin). In order to avoid a race condition when multiple streams are recorded in a single file, the AWK passes the file names to record data in different places, for example
chr_10_bin_52_batch_2_aa.csv
. As a result, many small files are created on the disk (for this I used terabyte EBS volumes). The conveyor from the second section
parallel
goes through the groups (bin) and combines their individual files into common CSV c cat
, and then sends them for export.Broadcast to R?
What I learned : you can access
stdin
and stdout
from the R-script, and therefore use it in the pipeline. Within the Bash-script, you can see a line like this:
...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R...
. It translates all concatenated group files (bin) into the R script below. {}
is a special technique parallel
that inserts any data sent by it into the specified stream directly into the command itself. The option {#}
provides a unique thread ID, and {%}
represents the job slot number (repeated, but never at the same time). A list of all options can be found in the documentation.#!/usr/bin/env Rscript
library(readr)
library(aws.s3)
# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]
data_cols <- list(SNP_Name = 'c', ...)
s3saveRDS(
read_csv(
file("stdin"),
col_names = names(data_cols),
col_types = data_cols
),
object = data_destination
)
When the variable is
file("stdin")
transferred to readr::read_csv
, the data translated into the R-script is loaded into the frame, which is then written directly to S3 as a .rds
file using aws.s3
. RDS is a bit like a younger version of Parquet, without the frills of column storage.
After completing the Bash script, I received a
.rds
bunch of files lying in S3, which allowed me to use efficient compression and built-in types. Despite using the brake R, everything worked very fast. It is not surprising that the fragments on R that are responsible for reading and writing data are well optimized. After testing on one medium-sized chromosome, the task was completed on the C5n.4xl instance in about two hours.
S3 limitations
What I learned : thanks to the smart implementation of paths, S3 can process many files.
I was worried if S3 could handle a lot of files transferred to it. I could make the file names meaningful, but how will S3 look for them?
Folders in S3 are just for beauty, in fact, the system is not interested in the symbol
/
. From the S3 FAQ page. It seems that S3 represents the path to a specific file as a simple key in a kind of hash table or document-based database. A bucket can be thought of as a table, and files as entries in this table.
Since speed and efficiency are important for making a profit at Amazon, it’s not surprising that this key-to-file-to-file system is awesomely optimized. I tried to find a balance: so that I didn’t have to make a lot of get-requests, but so that the requests would be executed quickly. It turned out that it is best to do about 20 thousand bin files. I think if you continue to optimize, you can increase the speed (for example, make a special bucket only for data, thus reducing the size of the search table). But there was no longer time and money for further experiments.
What about cross-compatibility?
What I learned: the main reason for losing time is the premature optimization of your storage method.
At this moment it is very important to ask yourself: “Why use a proprietary file format?” The reason is the download speed (packed gzip CSV files were loaded 7 times longer) and compatibility with our work processes. I can reconsider my decision if R can easily load Parquet (or Arrow) files without load in the form of Spark. In our laboratory everyone uses R, and if I need to convert the data to another format, then I still have the original text data, so I can just start the pipeline again.
Work sharing
What I learned : do not try to optimize tasks manually, let the computer do it.
I debugged the workflow on one chromosome, now I need to process all the other data.
I wanted to raise several EC2 instances for the conversion, but at the same time I was afraid of getting an extremely unbalanced load in different processing tasks (just as Spark suffered from unbalanced partitions). In addition, I did not want to raise one instance per chromosome, because AWS accounts have a default limit of 10 instances.
Then I decided to write a script in R to optimize processing tasks.
First I asked S3 to calculate how much storage space each chromosome occupies.
library(aws.s3)
library(tidyverse)
chr_sizes <- get_bucket_df(
bucket = '...', prefix = '...', max = Inf
) %>%
mutate(Size = as.numeric(Size)) %>%
filter(Size != 0) %>%
mutate(
# Extract chromosome from the file name
chr = str_extract(Key, 'chr.{1,4}\\.csv') %>%
str_remove_all('chr|\\.csv')
) %>%
group_by(chr) %>%
summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB
# A tibble: 27 x 2
chr total_size
1 0 163.
2 1 967.
3 10 541.
4 11 611.
5 12 542.
6 13 364.
7 14 375.
8 15 372.
9 16 434.
10 17 443.
# … with 17 more rows
Then I wrote a function that takes the total size, shuffles the order of the chromosomes, divides them into groups
num_jobs
and tells how different the sizes of all the processing jobs are.num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7
shuffle_job <- function(i){
chr_sizes %>%
sample_frac() %>%
mutate(
cum_size = cumsum(total_size),
job_num = ceiling(cum_size/job_size)
) %>%
group_by(job_num) %>%
summarise(
job_chrs = paste(chr, collapse = ','),
total_job_size = sum(total_size)
) %>%
mutate(sd = sd(total_job_size)) %>%
nest(-sd)
}
shuffle_job(1)
# A tibble: 1 x 2
sd data
1 153.
Then I ran a thousand stirs with purrr and picked the best.
1:1000 %>%
map_df(shuffle_job) %>%
filter(sd == min(sd)) %>%
pull(data) %>%
pluck(1)
So I got a set of tasks very similar in size. Then it only remained to wrap my previous Bash script in a large loop
for
. It took about 10 minutes to write this optimization. And this is much less than what I would have spent on manually creating tasks in the case of their imbalance. Therefore, I believe that with this preliminary optimization I did not lose.for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi
At the end I add the shutdown command:
sudo shutdown -h now
... and everything worked out! Using AWS CLI, I raised instances and, through the option,
user_data
passed them the Bash scripts of their processing tasks. They were executed and automatically turned off, so I did not pay for excessive processing power.aws ec2 run-instances ...\
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<>}]" \
--user-data file://<>
We are packing!
What I learned : API should be simple for the sake of simplicity and flexibility of use.
Finally, I got the data in the right place and form. It remained to simplify the process of using data as much as possible, so that my colleagues would be easier. I wanted to make a simple API for creating queries. If in the future I decide to switch from
.rds
to Parquet files, then this should be a problem for me, and not for colleagues. For this, I decided to make an internal R-package. I compiled and documented a very simple package that contains just a few functions for accessing the data collected around the function
get_snp
. I also made pkgdown a site for my colleagues so that they can easily see examples and documentation.Intelligent Caching
What I learned : if your data is well prepared, caching will be easy!
Since one of the main workflows applied the same analysis model to the SNP package, I decided to use binning to my advantage. When transmitting data via SNP, all information from the group (bin) is attached to the returned object. That is, old queries can (in theory) speed up the processing of new queries.
# Part of get_snp()
...
# Test if our current snp data has the desired snp.
already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin
if(!already_have_snp){
# Grab info on the bin of the desired snp
snp_results <- get_snp_bin(desired_snp)
# Download the snp's bin data
snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
} else {
# The previous snp data contained the right bin so just use it
snp_results <- prev_snp_results
}
...
When building the package, I ran a lot of benchmarks to compare speed when using different methods. I recommend not to neglect this, because sometimes the results are unexpected. For example, it
dplyr::filter
turned out to be much faster than capturing rows using index-based filtering, and getting a single column from a filtered data frame worked much faster than using indexing syntax. Note that the object
prev_snp_results
contains a key snps_in_bin
. This is an array of all unique SNPs in the group (bin), allowing you to quickly check if there is already data from a previous request. It also makes it easy to loop through all the SNPs in the group (bin) with this code:# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin
for(current_snp in snps_in_bin){
my_snp_results <- get_snp(current_snp, my_snp_results)
# Do something with results
}
results
Now we can (and started to seriously) run models and scenarios that were previously inaccessible to us. The best thing is that my lab colleagues don't have to think about any difficulties. They just have a working function.
And although the package saves them from the details, I tried to make the data format simple enough so that they could figure it out if I suddenly disappear tomorrow ...
The speed increased noticeably. Usually we scan functionally significant fragments of the genome. Previously, we could not do this (it turned out too expensive), but now, thanks to the group (bin) structure and caching, it takes an average of less than 0.1 seconds to query one SNP, and the data usage is so low that the cost of S3 is cheap.
Заключение
This article is not a guide at all. The solution was individual, and almost certainly not optimal. Rather, it is a travel story. I want others to understand that such solutions do not appear fully formed in the head, this is the result of trial and error. In addition, if you are looking for a data analysis specialist, then keep in mind that to use these tools effectively, you need experience, and experience requires money. I am happy that I had the means to pay, but many others who can do the same job better than me will never have such an opportunity due to lack of money even for an attempt.
Big data tools are universal. If you have the time, then you can almost certainly write a faster solution using smart data cleansing, storage, and extraction techniques. Ultimately, it comes down to an analysis of costs and benefits.
What I learned:
- There is no cheap way to parse 25 TB at a time;
- Be careful with the size of your Parquet files and their organization;
- Partitions in Spark must be balanced;
- never try to do 2.5 million partitions at all;
- sorting is still difficult, as is setting up Spark;
- sometimes special data requires special solutions;
- Spark integration is fast, but partitioning is still expensive;
- Do not sleep when the basics teach you, for sure, someone already solved your problem back in the 1980s;
gnu parallel
- this is a magic thing, everyone should use it;- Spark loves uncompressed data and does not like to combine partitions;
- Spark has too much overhead in solving simple tasks;
- associative arrays in AWK are very efficient;
- You can access
stdin
andstdout
from the R-script, and therefore use it in the pipeline; - thanks to smart path implementation, S3 can process many files;
- the main reason for losing time is the premature optimization of your storage method;
- do not try to optimize tasks manually; let the computer do it;
- The API should be simple for the sake of simplicity and flexibility of use;
- if your data is well prepared, caching will be easy!