Creating a pipeline for streaming data processing. Part 1

Original author: Daniel Foley
  • Translation
Hello, friends! We are sharing with you a translation of an article prepared especially for students of the Data Engineer course. Let's go!



Apache Beam and Dataflow for real-time pipelines


Today's post is based on a task I recently worked on at my job. I really enjoyed implementing it and describing the work in the form of a blog post, because it gave me a chance to do some data engineering and to build something very useful for my team. Not long ago, I discovered that our systems store a fairly large volume of user logs related to one of our data products. It turned out that no one was using this data, so I immediately became curious about what we could learn if we started analyzing it regularly. However, there were several problems along the way. The first problem was that the data was stored in many different text files and was not immediately available for analysis. The second problem was that it was stored in an environment I could not reach with the analysis tools I needed.

I had to decide how to make access easier for us and add at least some value by embedding this data source into some of our user engagement solutions. After thinking for a while, I decided to build a pipeline to move this data into a cloud database so that the team and I could access it and start drawing some insights. Having completed the Data Engineering specialization on Coursera a while ago, I was eager to use some of the tools from the course in this project.

So putting the data into a cloud database seemed like a sensible way to solve my first problem, but what could I do about problem number two? Fortunately, there was a way to transfer this data into an environment where I could access tools like Python and the Google Cloud Platform (GCP). However, it was going to be a long process, so I needed something that would let me keep developing while I waited for the data transfer to finish. The solution I came up with was to generate fake data using the Faker library in Python. I had never used this library before, but I quickly realized how useful it was. This approach allowed me to start writing code and testing the pipeline without any actual data.

With all of the above in mind, in this post I will show you how I built the pipeline described above using some of the technologies available in GCP. In particular, I will use Apache Beam (the Python SDK), Dataflow, Pub/Sub and BigQuery to collect user logs, transform the data and load it into a database for further analysis. In my case, I only needed Beam's batch functionality, since my data did not arrive in real time, so Pub/Sub was not required. However, I will focus on the streaming version, as this is what you are more likely to encounter in practice.

Introduction to GCP and Apache Beam


The Google Cloud Platform provides a set of really useful tools for processing big data. Here are some of the tools that I will use:

  • Pub/Sub is a messaging service built on the publisher-subscriber pattern that allows us to ingest data in real time.
  • Dataflow is a service that simplifies building data pipelines and automatically handles tasks such as scaling the infrastructure, which means we can focus solely on writing the code for our pipeline.
  • BigQuery is a cloud-based data warehouse. If you are familiar with other SQL databases, it will not take you long to get the hang of BigQuery.
  • And finally, we will use Apache Beam, focusing on the Python SDK, to create our pipeline. This tool lets us build a pipeline for streaming or batch processing that integrates with GCP. It is especially useful for parallel processing and suits extract, transform, load (ETL) tasks, so if we need to move data from one place to another while applying transformations or calculations, Beam is a good choice.


A vast number of tools are available in GCP, so it can be difficult to grasp them all and what each one is for; nevertheless, here is a brief summary for reference.
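As a taste of how little code the Pub/Sub side involves, here is a minimal sketch of publishing a single message with the google-cloud-pubsub client. The project and topic names are placeholders for illustration, not values taken from the article.

from google.cloud import pubsub_v1

# Hypothetical project and topic names -- substitute your own.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-gcp-project", "user-logs")

# Pub/Sub messages are raw bytes, so encode the string first.
future = publisher.publish(topic_path, data="example log line".encode("utf-8"))
print(future.result())  # blocks until publishing completes and returns the message ID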

Visualizing our pipeline


Let's visualize the components of our pipeline using Figure 1. At a high level, we want to collect user data in real time, process it and load it into BigQuery. The logs are created when users interact with the product, sending requests to the server, which are then logged. This data can be especially useful for understanding how users interact with our product and whether it is working correctly. In general, the pipeline will consist of the following steps:

  1. Our users' log data is published to a Pub/Sub topic.
  2. We connect to Pub/Sub and transform the data into the appropriate format using Python and Beam (steps 3 and 4 in Figure 1).
  3. After transforming the data, Beam connects to BigQuery and appends it to our table (steps 4 and 5 in Figure 1).
  4. For analysis, we can connect to BigQuery using various tools such as Tableau and Python (a short query sketch follows this list).
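To illustrate the last step, here is a minimal sketch of querying the resulting table from Python with the google-cloud-bigquery client; the project, dataset and table names are placeholders, not necessarily the ones used later in this series.

from google.cloud import bigquery

# Hypothetical table reference -- substitute your own project/dataset/table.
client = bigquery.Client()
query = """
    SELECT status, COUNT(*) AS requests
    FROM `my-gcp-project.userlogs.logdata`
    GROUP BY status
"""
for row in client.query(query).result():
    print(row.status, row.requests)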

Beam makes this process very simple, whether we have a streaming data source or a CSV file that we want to process in batch. Later you will see that switching between the two requires only minimal changes to the code. This is one of the advantages of using Beam.
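To make that point concrete, here is a minimal sketch of what such a pipeline skeleton might look like in the Beam Python SDK. The topic, bucket and table names are placeholders, and the real parsing logic comes later in the article, so each raw line is simply wrapped in a single-column row here.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical names -- substitute your own project, topic and table.
options = PipelineOptions(streaming=True)  # streaming mode for the Pub/Sub source

with beam.Pipeline(options=options) as p:
    (
        p
        # Streaming source: read raw messages (bytes) from a Pub/Sub topic.
        | "Read" >> beam.io.ReadFromPubSub(topic="projects/my-gcp-project/topics/user-logs")
        | "Decode" >> beam.Map(lambda msg: msg.decode("utf-8"))
        # For batch processing, replace the two steps above with something like:
        # | "Read" >> beam.io.ReadFromText("gs://my-bucket/user_logs.txt")
        | "ToRow" >> beam.Map(lambda line: {"raw_log": line})
        | "Write" >> beam.io.WriteToBigQuery(
            "my-gcp-project:userlogs.logdata",
            schema="raw_log:STRING",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )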


Figure 1: The main data pipeline

Creating Pseudo Data Using Faker


As I mentioned earlier, due to limited access to the data, I decided to create pseudo-data in the same format as the real data. This was a really useful exercise, since I could write code and test the pipeline while I waited for the data. I suggest taking a look at the Faker documentation if you want to see what else this library has to offer. Our user data will generally look like the example below. Based on this format, we can generate data line by line to simulate real-time data. These logs give us information such as the date, the type of request, the server response, the IP address, and so on.

192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"

Based on the line above, we want to construct our LINE variable using the variables in curly braces below. We will also use them as field names in our table schema a bit later. If we were doing batch processing, the code would be very similar, although we would need to create a set of samples over a certain time range. To use Faker, we simply create an object and call the methods we need. In particular, Faker was useful for generating IP addresses as well as websites. I used the following methods:

LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""




fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()


from faker import Faker
import time
import random
import os
import numpy as np
from datetime import datetime, timedelta

# Template for a single log entry; each placeholder below will also
# become a field name in our table schema later on.
LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""

def generate_log_line():
    fake = Faker()
    now = datetime.now()
    remote_addr = fake.ipv4()                        # random IP address
    time_local = now.strftime('%d/%b/%Y:%H:%M:%S')   # current timestamp
    request_type = random.choice(["GET", "POST", "PUT"])
    request_path = "/" + fake.uri_path()             # random URI path
    # Mostly successful requests, with occasional 401 and 404 responses
    status = np.random.choice([200, 401, 404], p=[0.9, 0.05, 0.05])
    body_bytes_sent = random.choice(range(5, 1000, 1))
    http_referer = fake.uri()                        # random referring URL
    http_user_agent = fake.user_agent()              # random browser string
    log_line = LINE.format(
        remote_addr=remote_addr,
        time_local=time_local,
        request_type=request_type,
        request_path=request_path,
        status=status,
        body_bytes_sent=body_bytes_sent,
        http_referer=http_referer,
        http_user_agent=http_user_agent
    )
    return log_line
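As a quick way to exercise the generator and imitate logs arriving over time, we can call it in a loop with a random pause between lines; the same lines could just as well be published to a Pub/Sub topic as in the earlier sketch.

# Quick local test: emit a new fake log line every 1-5 seconds
# to imitate real-time traffic.
if __name__ == '__main__':
    while True:
        print(generate_log_line())
        time.sleep(random.uniform(1, 5))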


The end of the first part.

In the coming days we will share the continuation of the article with you, and in the meantime, as usual, we look forward to your comments ;-).

Second part
