Working with the AVRO format in Python: the fastavro library
This article describes how to use the AVRO serialization format in Python. It gives a brief description of the AVRO schema, explains its least obvious points, and provides concrete Python code examples. Schema evolution, RPC, and AVRO IDL are deliberately left out of scope.
All examples use the fastavro library, which the author had to modify significantly to make it comply with the specification and stay compatible with the Java implementation.
Historical background
Why AVRO rather than JSON, BSON, msgpack, protobuf, ASN.1, Thrift, or YAML?
When we decomposed a monolithic system, we needed to define how the microservices would talk to each other. The choice between RabbitMQ and Kafka did not take long, and we settled on the latter. The next task, choosing a serialization system, took a lot more effort.
When choosing a serialization system, the following requirements were taken into account:
Support for multiple programming languages
The core of our code base is Python 2.7. In the future, we would also like to move performance-sensitive processes to other languages.
Validation of serialized data
In dynamic, interpreted Python it is too easy to send the “wrong” data by accident. In our Kafka pub-sub model, it was very important to guarantee that the data entering a topic is correct. We needed a system that would let us type Kafka topics.
Type support
Our system makes heavy use of the Decimal, UUID, and Datetime types. Alas, the well-known serialization formats, from ASN.1 to msgpack, mostly describe the serialization of low-level types (int / float / string / bytes) and offer no complete solution for the types we care about.
Based on these considerations, the choice fell on AVRO. But it suddenly turned out (in spring 2017) that although logical types were supported in the specification and in the Java libraries, they were simply ignored both by the official AVRO implementation for Python and by the competing fastavro. I had to add the support myself.
fastavro turned out to have the most sensible (and the fastest) code, so it was decided to modify this library. This was my first experience with open source.
What is AVRO
AVRO is a data serialization system created as part of the Hadoop project. Data is serialized into a binary format using a previously defined JSON schema, and a schema (possibly a different one) is also required for deserialization.
AVRO also lets you pack many records described by a single schema into one container, which makes it possible to transfer large amounts of data efficiently and avoid the overhead inherent in other formats.
AVRO schema
I will not describe the rules for building schemas in detail, because they are covered in the official documentation. I will dwell only on the basics and on the points that are far from obvious.
An AVRO schema is a JSON document that describes the data structure being serialized / deserialized. Data types can be:
- Primitive
  - null
  - boolean
  - int - 32-bit signed integer
  - long - 64-bit signed integer
  - float
  - double
  - string - unicode string
  - bytes - byte sequence
  - fixed - the same as bytes, but with a length specified in the schema (the specification lists it among the complex types)
- Compound
  - union - sum type
  - record - product type
  - enum - enumeration
  - array - array / list
  - map - associative array
- Logical (in AVRO terminology)
  - decimal - fixed-point number
  - date - date
  - time-millis - time with millisecond precision
  - time-micros - time with microsecond precision
  - timestamp-millis - date-time with millisecond precision
  - timestamp-micros - date-time with microsecond precision
  - uuid - universally unique identifier
Although the logical types listed above have long been standard in relational databases and modern programming languages, serialization libraries have tended to ignore them, forcing developers to reduce them to primitive types. Fortunately, AVRO solves this problem.
Consider a simple schema:
{
    "type": "record",
    "namespace": "notificator",
    "name": "out",
    "doc": "HTTP notification",
    "fields": [
        {
            "doc": "task id",
            "name": "id",
            "type": "long"
        },
        {
            "name": "datetime",
            "doc": "time the task was queued",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        },
        {
            "doc": "notification source",
            "name": "source",
            "type": [
                "null",
                "string"
            ]
        },
        {
            "doc": "Method",
            "name": "method",
            "default": "POST",
            "type": {
                "type": "enum",
                "name": "methods",
                "symbols": [
                    "POST",
                    "GET"
                ]
            }
        },
        {
            "name": "url",
            "type": "string"
        },
        {
            "name": "headers",
            "type": {
                "type": "map",
                "values": "string"
            }
        },
        {
            "doc": "body",
            "name": "body",
            "type": "bytes"
        }
    ]
}
The schema begins with the declaration of a record type with the given name and namespace. These first fields are used by code generation systems, which are not relevant for Python because our schema is processed dynamically. Next comes the list of the fields of our record with their types.
Of particular interest is the declaration of the datetime field, because it contains a logical type. It is important to remember that logical types must be specified as a nested type object inside the field's type description.
Wrong:
{
    "name": "datetime",
    "doc": "time the task was queued",
    "type": "long",
    "logicalType": "timestamp-millis"
},
Right:
{
    "name": "datetime",
    "doc": "time the task was queued",
    "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
    }
},
Next comes the source field, declared as the union "type": ["null", "string"]. This means that the value of source can be one of two types: null or string. Not only primitive types can be combined this way, but composite and logical ones as well. Examples of such combinations, as well as more complex schemas, can be found here.
Another non-obvious point concerns default: the default value must match the first type listed in the union.
Wrong:
{
    "name": "f",
    "type": ["long", "string"],
    "default": "asd"
},
Right:
{
    "name": "f",
    "type": ["string", "long"],
    "default": "asd"
},
The logical types decimal (fixed-point number) and uuid deserve special mention.
decimal requires additional parameters: the total number of digits in the number (precision) and the number of digits after the decimal point (scale):
{
    "name": "money",
    "doc": "16 digits, 4 after the decimal point",
    "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 16,
        "scale": 4
    }
}
The UUID is interesting in that it is not in the specification, but it is implemented. What is rather strange is how it is done: the UUID is encoded as a string.
{
    "name": "uuid",
    "type": {
        "type": "string",
        "logicalType": "uuid"
    }
}
I had to add such an implementation to fastavro.
Examples of working with fastavro
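How to read from a container
import fastavro

with open('some-file.avro', 'rb') as fo:
    # reader_schema can be used to override the schema stored in the container
    reader = fastavro.reader(fo, reader_schema=None)
    # the schema the container was written with
    # (older releases exposed it as reader.schema)
    schema = reader.writer_schema
    for record in reader:
        # process_record is a placeholder for your own handler
        process_record(record)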
How to write to a container
from fastavro import writer

schema = {
    'doc': 'A weather reading.',
    'name': 'Weather',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': 'station', 'type': 'string'},
        {'name': 'time', 'type': 'long'},
        {'name': 'temp', 'type': 'int'},
    ],
}

records = [
    {'station': '011990-99999', 'temp': 0, 'time': 1433269388},
    {'station': '011990-99999', 'temp': 22, 'time': 1433270389},
    {'station': '011990-99999', 'temp': -11, 'time': 1433273379},
    {'station': '012650-99999', 'temp': 111, 'time': 1433275478},
]

with open('weather.avro', 'wb') as out:
    writer(out, schema, records)
How to serialize and deserialize data outside a container
Used when transmitting data as messages.
from io import BytesIO

import fastavro


def serialize(schema, data):
    # write a single record without the container header
    bytes_writer = BytesIO()
    fastavro.schemaless_writer(bytes_writer, schema, data)
    return bytes_writer.getvalue()


def deserialize(schema, binary):
    # the reader must be given the schema the data was written with
    bytes_reader = BytesIO(binary)
    data = fastavro.schemaless_reader(bytes_reader, schema)
    return data
How to suppress reading of logical types
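import fastavro

# return the raw value unchanged instead of converting it
# (older releases exposed the same dict as fastavro._reader.LOGICAL_READERS)
fastavro.read.LOGICAL_READERS['long-timestamp-millis'] = lambda d, w, r: d
Now the logical type timestamp-millis will be read as a long rather than as a Python datetime.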
How to load a schema in advance
fastavro provides the acquaint_schema function, which reads a schema into the internal repository (there is more to it, but that is a different story).
Given the schema
{
    "name": "decimal_18_6",
    "namespace": "types",
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 18,
    "scale": 6
}
and loading it with acquaint_schema, you can later refer to the type by its short name:
"fields": [
    {
        "name": "money1",
        "type": "types.decimal_18_6"
    },
    {
        "name": "money2",
        "type": "types.decimal_18_6"
    }
]
Please note that the type name used for the reference includes its namespace: types.decimal_18_6
It is also necessary in some non-trivial cases.