Presentation used during Ticketmaster's internal engineering conference in early 2018.
A lot of confidential data has been removed from the original presentation, so some slides are simplified or removed entirely.
Slide Content
USING NEW TOOLS FOR
MIGRATING TM DATA
Dejan Vukmirovic, Ticketmaster Conference, 2018
CONTEXT
BUSINESS BACKGROUND
▸Ticketmaster International (TM) is segmented into markets.
▸Each user must create a new account
in each market (country) they want to purchase tickets in.
▸Users can’t log in to different markets (no SSO).
▸Roughly 2% of existing TM accounts
have a shared email address across markets.
WHY ARE WE
BUILDING THIS
?!!?!?!?!
TARGET
▸New B2C Accounts service that will:
▸Introduce a higher-level entity,
customer, that will also wrap existing member data.
▸Expose both internal and public APIs
(registration, login, update of details…).
▸Stream aggregated customer/member data.
▸Eventually enable removal of
member functionality from the legacy backend (MFX).
WHAT ARE WE
BUILDING ???
ARCHITECTURE
IDEA
DATABASE (OPTIONS)
▸Couchbase was the initial choice as:
▸it is blazingly fast storage for access by key
▸it has a built-in query language, N1QL (SQL for JSON)
▸Initial development, about a month, was done with CB.
▸It got vetoed by Ops and DBAs as:
▸it is a nightmare to maintain
▸scaling is manual, done by explicitly adding new nodes
▸All other NoSQL options were unsupported by TM’s Tech Radar.
AWS AURORA
▸“…is a fully managed, MySQL-compatible,
relational database running in the AWS Cloud.”
▸Based on usage, storage automatically grows,
up to 64 TB, in 10 GB steps, with no impact on performance.
▸Automatically maintains 6 copies of data
across 3 Availability Zones.
▸A single master node;
the other nodes form a “read cluster”.
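Since Aurora is MySQL-compatible, the service can talk to it with a plain MySQL JDBC driver. A minimal sketch (not from the deck; the endpoint names, database and credentials handling are hypothetical) of pointing writes at the master’s cluster endpoint and reads at the read cluster:

```java
import java.sql.Connection;
import java.sql.DriverManager;

public class AuroraConnections {
    // Hypothetical endpoints: Aurora exposes a writer (cluster) endpoint and a read-only endpoint.
    private static final String WRITER_URL =
            "jdbc:mysql://accounts.cluster-xxxx.eu-west-1.rds.amazonaws.com:3306/accounts";
    private static final String READER_URL =
            "jdbc:mysql://accounts.cluster-ro-xxxx.eu-west-1.rds.amazonaws.com:3306/accounts";

    public static Connection forWrites(String user, String password) throws Exception {
        return DriverManager.getConnection(WRITER_URL, user, password); // goes to the single master
    }

    public static Connection forReads(String user, String password) throws Exception {
        return DriverManager.getConnection(READER_URL, user, password); // load-balanced over the read cluster
    }
}
```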
READING FROM FEED (OPTIONS)
▸Fork of MOTIF (a project owned by another TM team).
▸Piggy-back on existing MOTIF with additional config.
▸New Java app/service with ActiveMQ integration
(Spring Messaging, Apache Camel…).
▸Apache Storm cluster.
…AND THE
WINNER IS
▸“…is a scalable, distributed
computation framework.”
▸Storm executes topologies, made up of spouts and bolts.
▸Multiple topologies can be run on the same cluster.
APACHE STORM
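A minimal sketch of wiring such a topology with Storm’s Java API (not from the original deck); MemberSpout and ForwardBolt are hypothetical placeholders for the real spout and bolt described later:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class MemberTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // A spout emits tuples into the topology; a bolt consumes and processes them.
        builder.setSpout("member-spout", new MemberSpout());   // hypothetical spout
        builder.setBolt("forward-bolt", new ForwardBolt())     // hypothetical bolt
               .shuffleGrouping("member-spout");

        Config conf = new Config();
        StormSubmitter.submitTopology("member-migration", conf, builder.createTopology());
    }
}
```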
DOWNSTREAM (OPTIONS)
▸Goals:
▸Separate aggregated-data consumption from the service.
▸Support a large number of simultaneous clients.
▸Keep messages persistent,
with no processing needed to generate messages on a client’s request.
▸“Traditional” messaging platforms (ActiveMQ, RabbitMQ…)
were rejected due to limited retention policies.
???
APACHE KAFKA
▸“…is a scalable, durable, and fault-tolerant
publish-subscribe messaging system.”
▸Kafka exposes topics.
▸Each topic is split into partitions,
each containing unique data.
▸All partitions are replicated across brokers,
so in case of a failure
the data will be served from other nodes.
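A minimal sketch of publishing to such a topic with the Kafka Java client (not from the deck; broker addresses, topic name and payload are made up). Records with the same key always land in the same partition, which keeps per-customer ordering:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomerPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // hypothetical brokers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // wait for all in-sync replicas, for durability

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The key (here a customer id) determines the partition the record goes to.
            producer.send(new ProducerRecord<>("customers", "customer-123", "{\"id\":\"customer-123\"}"));
        }
    }
}
```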
SOLUTION
RUNNING THE ENV
STORM TOPOLOGY
▸A spout listens to MFX’s ActiveMQ
and sends the unchanged message to a bolt.
▸The bolt attaches metadata, without changing the original message,
and sends it via HTTP to the Accounts service (sketched below).
So, no computation? No message processing?
Is this overkill?!
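A rough sketch of the bolt described above, assuming Storm’s Java API and plain HttpURLConnection; the class name, Accounts URL and envelope format are illustrative assumptions, not the actual implementation:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical bolt: wraps the original MFX message with metadata and posts it to the Accounts service.
public class ForwardBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String original = tuple.getString(0);
        // Attach metadata as an envelope; the original message body stays unchanged.
        String envelope = "{\"receivedAt\":" + System.currentTimeMillis()
                + ",\"source\":\"mfx-activemq\",\"payload\":" + original + "}";
        try {
            HttpURLConnection conn = (HttpURLConnection)
                    new URL("http://accounts.internal/api/members").openConnection(); // hypothetical URL
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(envelope.getBytes("UTF-8"));
            }
            if (conn.getResponseCode() < 300) {
                collector.ack(tuple);   // acknowledge only once the service accepted the message
            } else {
                collector.fail(tuple);  // leave the message for retry
            }
        } catch (Exception e) {
            collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream.
    }
}
```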
RUNNING THE STORM
▸Components:
▸Manager: Zookeeper, Nimbus, UI, LogViewer
▸Worker: Supervisor, LogViewer
▸All components are dockerised.
▸In Production:
3 Manager servers and 4 Worker servers,
total of 7 EC2 instances.
KAFKA TOPIC(S)
▸A single topic with customer data.
▸The topic is partitioned across 3 brokers, with 3 replicas.
▸Note: there is an additional topic that is used only
to provide a healthcheck for monitoring.
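A sketch of creating such topics with Kafka’s AdminClient; the topic names, the partition count of 3 and the broker address are assumptions for illustration (the slides only state 3 brokers and 3 replicas):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Customer data topic: partitions spread over the 3 brokers, replication factor 3.
            NewTopic customers = new NewTopic("customers", 3, (short) 3);
            // Small helper topic used only for monitoring healthchecks.
            NewTopic healthcheck = new NewTopic("customers-healthcheck", 1, (short) 3);
            admin.createTopics(Arrays.asList(customers, healthcheck)).all().get();
        }
    }
}
```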
RUNNING THE KAFKA
▸Components:
▸Zookeeper
▸Brokers
▸Brokers are not dockerised.
▸In Production:
6 Zookeeper servers and 3 Broker servers,
total of 6 EC2 instances.
▸Each Broker records its data to local disk.
ENVIRONMENT PROVISIONING
▸Achieved with Terraform scripts, via Gitlab runners.
▸The scripts fully describe the
required servers, connectivity, clusters, etc.
▸This enables an automated process with predictable results.
▸The dev team developed most of the scripts.
▸A number of blueprints came from the Architecture team.
LOG AGGREGATION
▸All applications (Tomcat, Apache proxy) are
1) in Docker container
2) accompanied with additional FluentD container
▸FluentD is either
1) listening to console output (Tomcat)
2) reading from files (Apache)
and sending logs to ElasticSearch cluster.
▸ElasticSearch runs as a managed AWS service.
▸Application logs are in JSON format,
as it is easier for transport (no regex parsing)
and easy for ElasticSearch to index.
CAPACITY TESTING
▸Developed a test framework in Gatling.
▸Terraform setup to create a cluster of EC2s,
each running the same Gatling scenario.
▸Lightweight compared to alternatives like SOASTA.
▸Started testing early, with the
first “complete” version
of the service.
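At the time Gatling scenarios were written in its Scala DSL; purely as an illustration, here is roughly the same thing in Gatling’s later Java DSL, against a hypothetical Accounts endpoint and with made-up load numbers:

```java
import static io.gatling.javaapi.core.CoreDsl.*;
import static io.gatling.javaapi.http.HttpDsl.*;

import io.gatling.javaapi.core.ScenarioBuilder;
import io.gatling.javaapi.core.Simulation;
import io.gatling.javaapi.http.HttpProtocolBuilder;

public class AccountsCapacitySimulation extends Simulation {

    // Hypothetical service endpoint; the real URLs are internal to TM.
    HttpProtocolBuilder httpProtocol = http.baseUrl("https://accounts.internal");

    ScenarioBuilder scn = scenario("Read customer")
            .exec(http("get customer").get("/api/customers/123"));

    {
        // Ramp up 500 virtual users over 60 seconds; every node in the
        // Terraform-created cluster runs the same simulation.
        setUp(scn.injectOpen(rampUsers(500).during(60))).protocols(httpProtocol);
    }
}
```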
ADDITIONAL WORK
“OFFICIAL” SPOUT FOR JMS INTEGRATION
▸https://github.com/ptgoetz/storm-jms
Not hosted under the Apache organisation itself, but listed on the Apache website.
▸We found 2 major bugs, not edge cases.
▸Bug 1: NullPointerException
at the very start of the Storm topology.
▸Bug 2: ClassCastException
after we introduced timeouts on streams.
▸Bypassed the issues by overriding
the Spout implementation with our own custom one.
NEW TM’S HEALTHCHECK STANDARD
▸The original spec stated that
when any non-critical component is down,
the returned HTTP status code must be 299 (Degraded).
▸ELB (Elastic Load Balancer) on AWS doesn’t support
custom status codes like 299,
and stops routing traffic to that node.
▸v2 of the spec states that when “degraded”
the service should return 200 (Success)
with the info in the payload.
We contributed
the v2 implementation.
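A minimal sketch of a v2-style healthcheck, assuming Spring MVC; the endpoint path, payload fields and the dependency check are illustrative assumptions:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical healthcheck endpoint following the v2 spec:
// always 200 while the service can serve traffic, with degradation details in the body.
@RestController
public class HealthcheckController {

    @GetMapping("/healthcheck")
    public ResponseEntity<Map<String, Object>> healthcheck() {
        boolean kafkaUp = checkKafka(); // a non-critical dependency (assumption for the example)
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("status", kafkaUp ? "OK" : "DEGRADED");
        body.put("kafka", kafkaUp ? "UP" : "DOWN");
        // 200 keeps the ELB routing traffic to this node; the payload carries the real state.
        return ResponseEntity.ok(body);
    }

    private boolean checkKafka() {
        return true; // placeholder for a real dependency check
    }
}
```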
TROUBLESHOOTING DOWNSTREAM
▸Created a utility “tool” for reading entire Kafka topics.
▸It is run from Rundeck.
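A rough sketch of what such a tool boils down to with the Kafka Java client: a throwaway consumer group that starts from the earliest offset and prints everything. The broker address, topic name and stop condition are assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");                     // hypothetical broker
        props.put("group.id", "topic-dump-" + System.currentTimeMillis()); // throwaway consumer group
        props.put("auto.offset.reset", "earliest");                         // start from the beginning of the topic
        props.put("enable.auto.commit", "false");                           // read-only inspection, no offsets committed
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("customers"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                if (records.isEmpty()) break; // assume we reached the end of the topic
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```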
MIGRATING DATA
THE PROBLEM
▸Total of 15 markets with 60M members.
▸Biggest market is Canada with ~9.5M members.
▸Full-syncs via ActiveMQ, one instance per market,
are required to get all data from the legacy MFX backend.
▸Full-syncs block the queues for all other TM products,
so they had to be done quickly and in “dead hours”.
▸For confidence in the migration it was required to test
with “sanitised” data from the production environment.
PARALLELISM IN STORM
▸Each Worker node can support multiple executors/slots
that execute any part of the topology (spout or bolt).
▸Parallelism is achieved through
the number of executors and
the number of topology parts.
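In code, those knobs are the parallelism hints passed when wiring the topology and the number of worker slots. A sketch using the numbers from the first run described on the next slide (the spout and bolt classes are hypothetical):

```java
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

public class ParallelismExample {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // The parallelism hint is the number of executors for that component.
        builder.setSpout("member-spout", new MemberSpout(), 1);  // e.g. 1 spout per market (hypothetical spout)
        builder.setBolt("forward-bolt", new ForwardBolt(), 8)    // e.g. 8 bolt executors (hypothetical bolt)
               .shuffleGrouping("member-spout");

        Config conf = new Config();
        conf.setNumWorkers(4); // worker processes ("slots") spread across the Worker nodes
    }
}
```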
FIRST RUN
▸“Default” setup on Storm:
4 slots,
1 spout per market
8 bolts
no other settings
▸Initial results were great,
with a speed of over 20k members per minute.
▸But when we moved to markets
with millions of members…
▸Using t2.micro EC2s for:
Storm worker nodes
Application servers
A Storm worker node crashed.
Heap space…
WHERE IS THE MEMORY USAGE?
▸1: ActiveMQ consumption was on auto-acknowledge,
so new messages were consumed instantly.
▸1: The JMS Spout did not use Storm’s internal ACK mechanism.
▸2: We set no limit on the buffer on the bolt’s side,
so spouts did not wait before sending new messages.
STORM CONFIGS
▸Configured the bolt’s inbound buffer size
to receive at most 20,000 messages.
▸Used bigger EC2 instances of type T with more RAM.
▸Aaand… still memory problems.
A bit later in the migration. But still happening.
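The slides don’t show the exact settings; a hedged sketch of the kind of Storm config involved, assuming the 1.x Config API (the buffer key and both values are assumptions, and the receive buffer must be a power of two):

```java
import org.apache.storm.Config;

public class FlowControlConfig {
    public static Config migrationConfig() {
        Config conf = new Config();
        // Cap how many emitted-but-unacked tuples a spout may have in flight;
        // the spout stops emitting until some are acked (requires explicit acking).
        conf.setMaxSpoutPending(20000);
        // Executor receive buffer (disruptor queue) size; must be a power of two,
        // so a "max 20000 messages" limit maps to something like 16384.
        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        return conf;
    }
}
```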
ACTIVEMQ CONFIGS
▸1st: We started using queuePrefetch in connection URL.
“…to limit the maximum number of messages
that can be dispatched to an individual consumer at once"
▸That alone didn’t help, as auto-acknowledge would
“empty” the prefetch queue again almost instantly.
▸2nd: We started using Storm’s explicit acknowledgement,
so messages would be removed from ActiveMQ
after being processed by the bolt.
▸Again, same thing…
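The queuePrefetch option goes straight into the ActiveMQ connection URL; a small sketch with a made-up broker host and prefetch value:

```java
import org.apache.activemq.ActiveMQConnectionFactory;

public class PrefetchConnection {
    public static ActiveMQConnectionFactory factory() {
        // Limit how many messages the broker pushes to a single consumer ahead of processing.
        // The prefetch value and broker host here are assumptions for illustration.
        return new ActiveMQConnectionFactory(
                "tcp://mfx-broker:61616?jms.prefetchPolicy.queuePrefetch=1000");
    }
}
```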
ACTIVEMQ CONFIGS
▸3rd: We extended the JMS Spout implementation
to handle ActiveMQ’s specific type of acknowledgement
of a single message (sketched below).
▸So, we finally got the data pipeline!!!
▸Individual acknowledgement has an additional benefit:
if messages fail to be delivered,
they remain on ActiveMQ for later analysis or retry.
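A minimal sketch of ActiveMQ’s INDIVIDUAL_ACKNOWLEDGE mode outside of Storm, to show the mechanism the extended spout relies on; the broker host and queue name are made up:

```java
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;

public class IndividualAckConsumer {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://mfx-broker:61616"); // hypothetical broker host
        Connection connection = factory.createConnection();
        connection.start();

        // ActiveMQ-specific mode: each message is acknowledged on its own,
        // instead of CLIENT_ACKNOWLEDGE acking everything received so far.
        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        MessageConsumer consumer =
                session.createConsumer(session.createQueue("members.fullsync")); // hypothetical queue

        Message message = consumer.receive();
        if (message != null) {
            // ... process / forward the message ...
            message.acknowledge(); // only this message is removed from the broker
        }
        connection.close();
    }
}
```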
PERFORMANCE DROP
▸Now migrations would run with no errors…
▸But after some time the speed of processing messages
would drop to less than half.
▸Reason: we were using EC2 instances of the T type, and
their throttling starved the CPU.
▸Solution: we started using M4 instances
(very expensive, with more RAM than we need).
We reverted back to T types
after validating the migrations were OK.
LOG FLOOD
▸During the migration application boxes
would become unresponsive.
▸Reason: no disk space left.
▸FluentD was unable to send data
as fast as it was generated by the application,
so it was writing it to local disk.
▸Solution: stopped logging everything,
except access logs and errors, during the migration.
▸Good enough to track progress.
▸Potential risk for troubleshooting.
ZOOKEEPER ISSUE
▸Occasionally the Storm cluster would stop working.
▸Nimbus nodes would report being unable
to elect a leader.
▸Reason: Zookeeper instances collapsed
due to a disk space issue.
▸Solution: enable the additional log retention (purging) policy
that Zookeeper can perform itself.
FINAL SETUP
▸Final bottleneck - CPU usage on Aurora.
▸We ended up with:
▸4 spouts per market
▸a total of 20 bolts
▸12 executors
(as memory consumption was no longer an issue,
3 executors share the available RAM on each EC2 node)
FINAL PERFORMANCE
▸AWS Aurora at 99% CPU
▸Member processing speed stable at 1.2M per hour
▸Storm’s bolt capacity between 1.0 and 1.2
KEY WINS
CLOUD WIN
▸AWS+Terraform enabled us to recreate test environments
multiple times a day.
▸We would never have been able to pull this off on-prem,
as environment requirements must be provided
…in advance
…based on estimates
TIAP
AWS
DATA WIN
▸Storm enabled us to write simple (almost no) code,
and to focus instantly on tuning the throughput.
▸Kafka also offloaded a lot of work from the team
in making sure consistent data is provided downstream.
▸Note: during the 1st year of the live service we experienced issues
with the replication of data on Kafka collapsing.
▸The solution was to not store data to local disk on the instances,
and to use the Secor tool for permanently storing data in S3
and later replaying messages to a new topic/cluster.