1# -*- coding: utf-8 -*-
2
3import json
4from pathlib import Path
5
6import avro
7import avro.schema
8from avro.io import DatumReader, DatumWriter
9from avro.datafile import DataFileReader, DataFileWriter
10
11
def save_schema():
    """Write the example ``User`` Avro record schema to ``user.avsc``.

    The schema is serialized as JSON with a four-space indent. It declares a
    string ``name`` field and two union-typed fields that also accept
    ``null``: ``favorite_number`` (int) and ``favorite_color`` (string).
    The file is written relative to the current working directory and
    replaces any existing content.
    """
    # Field declarations, in the order they appear in the record.
    user_fields = [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]},
    ]
    # Keyword order is preserved, so the emitted JSON keeps this key order.
    user_record = dict(
        namespace="example.avro",
        type="record",
        name="User",
        fields=user_fields,
    )
    schema_text = json.dumps(user_record, indent=4)
    Path("user.avsc").write_text(schema_text)
24
25
def load_schema() -> avro.schema.Schema:
    """Read ``user.avsc`` and return the Avro schema parsed from its text.

    The file is looked up relative to the current working directory.
    """
    schema_source = Path("user.avsc").read_text()
    parsed_schema = avro.schema.parse(schema_source)
    return parsed_schema
28
29
def write_data():
    """Write two sample user records to ``users.avro``.

    The schema comes from ``load_schema()``, which reads ``user.avsc``, so
    that file must already exist. The output file is created in the current
    working directory and opened in binary write mode.

    The first record omits ``favorite_color``; the schema declares that
    field as a union that includes ``null``.
    """
    schema = load_schema()
    users = [
        {"name": "Alyssa", "favorite_number": 256},
        {"name": "Ben", "favorite_number": 7, "favorite_color": "red"},
    ]
    with Path("users.avro").open("wb") as f:
        # The writer is used as a context manager instead of calling
        # close() by hand, so the close stays paired with the block and
        # cannot be forgotten when the loop body is edited later.
        with DataFileWriter(f, DatumWriter(), schema) as writer:
            for user in users:
                writer.append(user)
41
42
def read_data():
    """Print the schema and every record stored in ``users.avro``.

    Output goes to stdout: a ``--- Schema ---`` header followed by the
    schema pretty-printed with a four-space indent, then a ``--- Data ---``
    header followed by one line per record.
    """
    with Path("users.avro").open("rb") as f:
        # The reader is used as a context manager instead of calling
        # close() by hand, keeping the close paired with the block.
        with DataFileReader(f, DatumReader()) as reader:
            print("--- Schema ---")
            # reader.schema is parsed as JSON and re-serialized only to
            # pretty-print it.
            print(json.dumps(json.loads(reader.schema), indent=4))

            print("--- Data ---")
            for user in reader:
                print(user)
54
55
def main() -> None:
    """Run the demo: write the schema, write sample data, read it back."""
    # Order matters: write_data() needs user.avsc, read_data() needs users.avro.
    save_schema()
    write_data()
    read_data()


# Guarded so that importing this module does not create files or print.
if __name__ == "__main__":
    main()