Traditional data architectures are not enough to handle the huge amounts of data generated by millions of users. In addition, the diversity of data sources is increasing every day: distributed file systems and relational, column-oriented, document-oriented, and graph databases.
Letgo has been growing quickly over the last few years. Because of this, we needed to improve the scalability of our data platform and add further capabilities, such as dynamic infrastructure elasticity, real-time processing, and real-time complex event processing. In this talk, we will dive into our journey: from a traditional data architecture based on ETL and Redshift to the event-oriented, horizontally scalable data architecture we run today.
We will explain everything from event ingestion with Kafka / Kafka Connect to stream and batch processing with Spark. On top of that, we will discuss how we use Spark Thrift Server and the Hive Metastore as the glue to exploit all our data sources (HDFS, S3, Cassandra, Redshift, MariaDB) in a unified way from any point of our ecosystem, using technologies such as Jupyter, Zeppelin, and Superset. We will also describe how we build ETL jobs with pure Spark SQL, using Airflow for orchestration.
Along the way, we will highlight the challenges we found and how we solved them. We will share plenty of useful tips for anyone who wants to start this journey in their own company.
Designing a Horizontally Scalable Event-Driven Big Data Architecture with Apache Spark
5. LETGO DATA PLATFORM IN NUMBERS
500 GB of data daily
1 billion events processed daily
50K peak events per second
600+ event types
200 TB storage (S3)
< 1 sec NRT processing time
20. BUILDING THE DATA LAKE
STORAGE
21. BUILDING THE DATA LAKE
STORAGE
We want to store all events coming from Kafka to S3.
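A minimal sketch of this kind of ingestion job, not Letgo's exact code: a Spark Structured Streaming query that reads the raw events from Kafka and appends them to S3 as partitioned Parquet. The broker address, topic pattern, bucket path and partition columns are assumptions for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_date}

val spark = SparkSession.builder().appName("kafka-to-s3").getOrCreate()

// Read the raw event stream from Kafka (broker and topic pattern are placeholders).
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribePattern", "events\\..*")
  .load()

// Keep the payload plus the columns the data lake is partitioned by.
val events = raw
  .selectExpr("topic AS event_type", "CAST(value AS STRING) AS payload", "timestamp")
  .withColumn("dt", to_date(col("timestamp")))

// Append to S3 as Parquet, partitioned by event type and date (paths are placeholders).
events.writeStream
  .format("parquet")
  .option("path", "s3a://datalake-bucket/events")
  .option("checkpointLocation", "s3a://datalake-bucket/checkpoints/events")
  .partitionBy("event_type", "dt")
  .start()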
22. BUILDING THE DATA LAKE
STORAGE
23. SOMETIMES SHIT HAPPENS
STORAGE
24. DUPLICATED EVENTS
STORAGE
25. DUPLICATED EVENTS
STORAGE
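The deck's own answer to duplicates is the idempotent write to Cassandra keyed by event id shown on slide 27. For comparison only, a generic Structured Streaming alternative is dropDuplicates with a watermark; this sketch assumes the events expose an id at data.id and uses the Kafka timestamp as event time.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dedup-sketch").getOrCreate()

// Parse only what the deduplication needs (broker, topic and the id path are assumptions).
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr(
    "get_json_object(CAST(value AS STRING), '$.data.id') AS event_id",
    "timestamp AS ts",
    "CAST(value AS STRING) AS payload")

// Keep 24 hours of ids in state; duplicates arriving within that window are dropped.
val deduped = events
  .withWatermark("ts", "24 hours")
  .dropDuplicates("event_id", "ts")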
26. (VERY) LATE EVENTS
STORAGE
27. (VERY) LATE EVENTS
STORAGE
Dirty Buckets:
1. Read a batch of events from Kafka.
2. Write each event to Cassandra.
3. Write the dirty hours to a compacted topic, keyed by (event_type, hour).
4. Read the dirty-hours topic.
5. Read all events belonging to the dirty hours.
6. Store them in S3.
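A sketch of steps 4-6 of this flow, under assumed names for the topic, keyspace and bucket: a batch job reads the compacted dirty-hours topic, re-reads every event for those (event_type, hour) buckets from Cassandra (where step 2 already made the writes idempotent by event id), and rewrites only the affected S3 partitions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.split

val spark = SparkSession.builder().appName("dirty-buckets-compaction").getOrCreate()
import spark.implicits._

// 4) Read the compacted topic of dirty keys; assume the key format is "event_type,hour".
val dirty = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "dirty-hours")
  .load()
  .selectExpr("CAST(key AS STRING) AS key")
  .select(split($"key", ",")(0).as("event_type"), split($"key", ",")(1).as("hour"))
  .distinct()

// 5) Read all events for the dirty buckets back from Cassandra (already deduplicated).
val events = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "datalake", "table" -> "events"))
  .load()
  .join(dirty, Seq("event_type", "hour"))

// 6) Rewrite only the dirty partitions in S3 (dynamic partition overwrite, Spark 2.3+).
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
events.write
  .mode("overwrite")
  .partitionBy("event_type", "hour")
  .parquet("s3a://datalake-bucket/events")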
29. S3 PROBLEMS
STORAGE
Some S3 big data problems:
1. Eventual consistency
2. Very slow renames
30. S3 PROBLEMS: EVENTUAL CONSISTENCY
STORAGE
31. S3 PROBLEMS: EVENTUAL CONSISTENCY
STORAGE
S3Guard: S3AFileSystem, the Hadoop FileSystem for Amazon S3. [Diagram: FileSystem operations go through both a DynamoDB client and an S3 client. Write path: write fs metadata to DynamoDB and object data to S3. Read path: list object info from DynamoDB, then read object data from S3.]
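A minimal sketch of turning S3Guard on from Spark, assuming Hadoop 3.x with the hadoop-aws module on the classpath; the DynamoDB table and region are placeholders. With this in place, S3A serves listings from DynamoDB instead of the eventually consistent S3 LIST API.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3guard-enabled-job")
  // Keep S3A filesystem metadata in DynamoDB (S3Guard).
  .config("spark.hadoop.fs.s3a.metadatastore.impl",
    "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore")
  .config("spark.hadoop.fs.s3a.s3guard.ddb.table", "s3guard-metadata")
  .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-1")
  .config("spark.hadoop.fs.s3a.s3guard.ddb.table.create", "true")
  .getOrCreate()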
32. S3 PROBLEMS: SLOW RENAMES
STORAGE
Job freeze?
33. S3 PROBLEMS: SLOW RENAMES
STORAGE
New Hadoop 3.1 S3A committers:
Directory
Partitioned
Magic
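A minimal sketch of selecting one of these committers from Spark, assuming the spark-hadoop-cloud committer bindings are available; picking the partitioned committer here is just an example.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-committer-job")
  // Route Spark's output commit protocol to the S3A committers.
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  // Pick the committer: "directory", "partitioned" or "magic".
  .config("spark.hadoop.fs.s3a.committer.name", "partitioned")
  .getOrCreate()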
36. REAL-TIME USER SEGMENTATION
STREAM
[Diagram: the stream and the journal exchange a "User buckets changed" event (steps 1-2).]
37. REAL-TIME USER SEGMENTATION
STREAM
[Diagram: the same "User buckets changed" flow, with the JOURNAL and STREAM components highlighted.]
38. REAL-TIME PATTERN DETECTION
STREAM
Chat messages:
"Is it still available?"
"What condition is it in?"
"Could we meet at…?"
"Is the price negotiable?"
"I offer you …$"
39. REAL-TIME PATTERN DETECTION
STREAM
Structured data:
{
  "type": "meeting_proposal",
  "properties": {
    "location_name": "Letgo HQ",
    "geo": {
      "lat": "41.390205",
      "lon": "2.154007"
    },
    "date": "1511193820350",
    "meeting_id": "23213213213"
  }
}
40. REAL-TIME PATTERN DETECTION
STREAM
Meeting proposed + meeting accepted = emit accepted-meeting event
Meeting proposed + nothing in X time = "You have a proposal to meet"
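The talk does not show the code behind these rules; as one way to express them, here is a hedged sketch using Structured Streaming's flatMapGroupsWithState with a processing-time timeout, assuming hypothetical MeetingEvent fields and a 1-hour window for the "nothing in X time" case.

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class MeetingEvent(meetingId: String, eventType: String, ts: Long)
case class MeetingAlert(meetingId: String, alert: String)
case class ProposalState(proposedAt: Long)   // a proposal that has not been answered yet

def detect(meetingId: String,
           events: Iterator[MeetingEvent],
           state: GroupState[ProposalState]): Iterator[MeetingAlert] = {
  if (state.hasTimedOut) {
    // Rule 2: meeting proposed + nothing in X time -> remind the user.
    state.remove()
    Iterator(MeetingAlert(meetingId, "you_have_a_proposal_to_meet"))
  } else {
    // Materialize eagerly so state updates happen before Spark finalizes the group.
    val alerts = events.flatMap {
      case e if e.eventType == "meeting_proposal" =>
        state.update(ProposalState(e.ts))
        state.setTimeoutDuration("1 hour")           // the "X time" window (assumption)
        None
      case e if e.eventType == "meeting_accepted" && state.exists =>
        // Rule 1: meeting proposed + meeting accepted -> emit accepted-meeting event.
        state.remove()
        Some(MeetingAlert(meetingId, "accepted_meeting"))
      case _ => None
    }.toSeq
    alerts.iterator
  }
}

val spark = SparkSession.builder().appName("meeting-cep").getOrCreate()
import spark.implicits._

val meetingEvents: Dataset[MeetingEvent] = ???   // parsed from the chat event stream

val alerts = meetingEvents
  .groupByKey(_.meetingId)
  .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout)(detect)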
42. GEO DATA ENRICHMENT
BATCH
43. GEO DATA ENRICHMENT
BATCH
{
  "data": {
    "id": "105dg3272-8e5f-426f-bca0-704e98552961",
    "type": "some_event",
    "attributes": {
      "latitude": 42.3677203,
      "longitude": -83.1186093
    }
  },
  "meta": {
    "created_at": 1522886400036
  }
}
Technically correct, but not very actionable.
44. GEO DATA ENRICHMENT
BATCH
What we know: (42.3677203, -83.1186093)
City: Detroit
Postal code: 48206
State: Michigan
DMA: Detroit
Country: US
45. GEO DATA ENRICHMENT
BATCH
How we do it:
Populating JTS indices from WKT polygon data
Custom Spark SQL UDF
SELECT geodata.dma_name,
  geodata.dma_number AS dma_number,
  geodata.city AS city,
  geodata.state AS state,
  geodata.zip_code AS zip_code
FROM (
  SELECT
    geodata(longitude, latitude) AS geodata
  FROM ...
)
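A hedged sketch of the approach described above, assuming the org.locationtech JTS artifact and a hypothetical referencePolygons collection of (WKT polygon, geo attributes) pairs: the polygons are loaded into an in-memory STRtree index, the index is broadcast, and a Spark SQL UDF named geodata resolves a (longitude, latitude) pair to the enclosing polygon's attributes, so it can be used exactly as in the query above.

import org.apache.spark.sql.SparkSession
import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
import org.locationtech.jts.index.strtree.STRtree
import org.locationtech.jts.io.WKTReader

// The struct returned by the geodata() UDF (field names match the query above).
case class GeoData(dma_name: String, dma_number: String, city: String,
                   state: String, zip_code: String)

object GeoIndex {
  private val factory = new GeometryFactory()

  // Build an STRtree from (WKT polygon, attributes) pairs.
  def build(polygons: Seq[(String, GeoData)]): STRtree = {
    val reader = new WKTReader(factory)
    val tree = new STRtree()
    polygons.foreach { case (wkt, meta) =>
      val geom = reader.read(wkt)
      tree.insert(geom.getEnvelopeInternal, (geom, meta))
    }
    tree.build()
    tree
  }

  // Candidate lookup by envelope, then an exact point-in-polygon test.
  def lookup(tree: STRtree, lon: Double, lat: Double): Option[GeoData] = {
    val point = factory.createPoint(new Coordinate(lon, lat))
    tree.query(point.getEnvelopeInternal).toArray.collectFirst {
      case (geom: Geometry, meta: GeoData) if geom.contains(point) => meta
    }
  }
}

val spark = SparkSession.builder().appName("geo-enrichment").getOrCreate()

val referencePolygons: Seq[(String, GeoData)] = ???   // loaded from the WKT reference data
val index = spark.sparkContext.broadcast(GeoIndex.build(referencePolygons))

// Register the UDF used in the SQL above; it returns a struct (or null if no match).
spark.udf.register("geodata", (lon: Double, lat: Double) =>
  GeoIndex.lookup(index.value, lon, lat).orNull)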
53. QUERYING DATA
QUERY
[Diagram: Spark Thrift Server backed by the Hive METASTORE, which is stored in Amazon Aurora.]
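As a small illustration of the "query from anywhere" idea, and assuming a hypothetical host and a table already registered in the Metastore: any JDBC client (Jupyter, Zeppelin, Superset, BI tools) can reach every source through the Spark Thrift Server with the standard Hive JDBC driver.

import java.sql.DriverManager

// The Hive JDBC driver speaks to the Spark Thrift Server (host and port are placeholders).
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection(
  "jdbc:hive2://thrift-server.internal:10000/default", "user", "")

val rs = conn.createStatement().executeQuery(
  "SELECT dt, count(*) AS events FROM database_name.table_name GROUP BY dt")
while (rs.next()) {
  println(s"${rs.getString("dt")} -> ${rs.getLong("events")}")
}
conn.close()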
54. QUERYING DATA
QUERY
-- Data-lake table stored as JSON, partitioned by date
CREATE TABLE IF NOT EXISTS database_name.table_name(
  some_column STRING,
  ...
  dt DATE
)
USING json
PARTITIONED BY (`dt`)

-- Cassandra table exposed through the Spark Cassandra connector
CREATE TEMPORARY VIEW table_name
USING org.apache.spark.sql.cassandra
OPTIONS (
  table "table_name",
  keyspace "keyspace_name")

-- External Parquet table on S3
CREATE EXTERNAL TABLE IF NOT EXISTS database_name.table_name(
  some_column STRING...,
  dt DATE
)
PARTITIONED BY (`dt`)
USING PARQUET
LOCATION 's3a://bucket-name/database_name/table_name'

-- Redshift table exposed through spark-redshift
CREATE TABLE IF NOT EXISTS database_name.table_name
USING com.databricks.spark.redshift
OPTIONS (
  dbtable 'schema.redshift_table_name',
  tempdir 's3a://redshift-temp/',
  url 'jdbc:redshift://xxxx.redshift.amazonaws.com:5439/letgo?user=xxx&password=xxx',
  forward_spark_s3_credentials 'true')
55. QUERYING DATA
QUERY
CREATE TABLE … STORED AS vs. CREATE TABLE … USING [parquet, json, csv]:
70% higher performance with USING!
56. QUERYING DATA: BATCHES WITH SQL
QUERY
1. Creating the table
2. Inserting data
57. QUERYING DATA: BATCHES WITH SQL
QUERY
1. Creating the table:
CREATE EXTERNAL TABLE IF NOT EXISTS database.some_name(
  user_id STRING,
  column_b STRING,
  ...
)
USING PARQUET
PARTITIONED BY (`dt` STRING)
LOCATION 's3a://example/some_table'
58. QUERYING DATA: BATCHES WITH SQL
QUERY
2. Inserting data:
INSERT OVERWRITE TABLE database.some_name PARTITION(dt)
SELECT
  user_id,
  column_b,
  dt
FROM other_table
...
59. QUERYING DATA: BATCHES WITH SQL
QUERY
Problem?
60. QUERYING DATA: BATCHES WITH SQL
QUERY
200 files, because of the default value of spark.sql.shuffle.partitions (200).
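For reference, one workaround is simply lowering that setting for the job (sketch below, assuming an existing SparkSession named spark); the next slides show the pure-SQL alternative that keeps the default and shapes the output with DISTRIBUTE BY / SORT BY instead.

// Fewer shuffle partitions -> fewer output files for this job (16 is an arbitrary example).
spark.conf.set("spark.sql.shuffle.partitions", "16")
// Equivalent from SQL:
spark.sql("SET spark.sql.shuffle.partitions=16")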
61. QUERYING DATA: BATCHES WITH SQL
QUERY
INSERT OVERWRITE TABLE database.some_name PARTITION(dt)
SELECT
  user_id,
  column_b,
  dt
FROM other_table
...
?
62. QUERYING DATA: BATCHES WITH SQL
QUERY
DISTRIBUTE BY (dt):
  Only one file, not sorted.
CLUSTER BY (dt, user_id, column_b):
  Multiple files.
DISTRIBUTE BY (dt) SORT BY (user_id, column_b):
  Only one file, sorted by user_id, column_b.
  Good for joins on these columns.
63. QUERYING DATA: BATCHES WITH SQL
QUERY
INSERT OVERWRITE TABLE database.some_name PARTITION(dt)
SELECT
  user_id,
  column_b,
  dt
FROM other_table
...
DISTRIBUTE BY (dt) SORT BY (user_id)
64. QUERYING DATA: BATCHES WITH SQL
QUERY