Apache
Михаил Михайленко, Moneytap /I-Free, 2015
http://spark.apache.org/
http://i-free.com/
http://moneytapp.com/
https://vk.com/sniff303
Что это такое?
Framework для написания распределенных приложений
обрабатывающих данные. Решает задачи:
•Map-Reduce, но в памяти (это не означает, что объем данных должен быть меньше чем
объем RAM)
•Stream Processing
Для чего это нужно?
•Любые аналитические отчеты, к примеру, как Яндекс. Метрика или
Google Analytics
•Задачи машинного обучения (классификация, прогнозы...)
•…
Альтернативы и отличии
Spark как альтернатива Hadoop для Map- Reduce
•Доступное API на Scala, Python и Java
•Не требует очень тяжелой настройки для небольших инсталляций
•Выигрывает в скорости, осуществляя процессинг данных, храня их в памяти и вылезая на
диск только при необходимости
Spark Streaming как альтернатива Apache Storm
•Гарантированная обработка каждого события только один раз
Как все устроено?
Driver Program — часть вашего приложения отвечающего за связь c кластером.
Cluster Manager — процесс или группа процессов, осуществляющих распределение ресурсов
всего кластера.
Worker Node — группа процессов на различных машинах, осуществляют контроль за ресурсами
машины, на которой находятся.
Executor — процессы, непосредственно исполняющие задачи .
Режимы работы
•Local
•Standalone (+ ZooKeeper)
•Hadoop YARN
•Apache MESOS
Как с этим работать?
Map-Reduce
Источник изображения: http://xiaochongzhang.me/blog/?p=338
Resilient Distributed Dataset
RDD - Распределенная, неизменяемая коллекция элементов,
доступная для параллельной обработки. Она знает как вычислить саму
себя в случае сбоя, а так же х ранит ссылки на своих предков. Может
быть записана на диск.
RDD RDD RDD
transform transform
Загрузка данных в RDD
Источники:
•Local file system
•HDFS
•Cassandra
•HBase
•Amazon S3
Форматы:
•Plain text
•GZIPpped plain text
•Hadoop InputFormat’s
JavaRDD<String> localRDD = sparkContext.textFile("/path/data_2015_05_*.txt.gz ");
JavaRDD<String> hdfsRDD = sparkContext.textFile("hdfs://...");
Операции над RDD
Transformations — преобразуют
данные из RDD, "лениво" создавая
новый RDD.
- .map(function)
- .flatMap(function)
- .filter(function)
- .sample(n)
- .union(anotherRDD)
- .intersection(anotherRDD)
- .distinct()
- .groupByKey()
- .reduceByKey()
- .join(anotherRDD)
Actions - возвращают результаты в
ваше приложение.
- .reduce(function)
- .collect()
- .count()
- .take(n)
- .takeOrdered(n, comparator)
- .foreach(function)
Persistence — сохранение RDD.
- .saveAs...()
- .persist(memoryLevel)
- .unpersist()
Map<String, Long> sdkVersions = sparkContext.textFile(filePath)
.filter(s -> s.contains("AD_GET"))
.map(s -> Extractors.extractSdkVersion(s))
.mapToPair(t -> new Tuple2<>(t._2(), 1L))
.reduceByKey((left, right) -> left + right)
.collectAsMap();
Общие переменные
Broadcast variables —
Read-only переменные, значение
которых доступно с любого Executor'а
вашей программы. Разъезжаются по
кластеру с помощью P2P протокола.
Accumulators —
Переменные, изменять значения
которых можно только путем
прибавления к ним какого либо
значения.
List<Long> largeList = ...;
Broadcast<List<Long>> broadcastVar = sparkContext.broadcast(largeList);
Accumulator<Long> accum = sparkContext.accumulator(0);
public class MapAccumulator implements AccumulatorParam<Map<Long, Long>> {
@Override
public Map<Long, Long> addAccumulator(Map<Long, Long> m1, Map<Long, Long> m2) {
for (Map.Entry<Long, Long> m2entry : m2.entrySet()) {
Long m1val = m1.get(m2entry.getKey());
if (m1val == null) {
m1val = m2entry.getValue();
} else {
m1val += m2entry.getValue();
}
m1.put(m2entry.getKey(), m1val);
}
return m1;
}
//…
}
Пример!
public class SparkExample {
public static void main(String... args) {
if (args.length < 3) {
throw new IllegalArgumentException ();
}
String date = args[0];
String appId = args[1];
String network = args[2];
final String filePath = String.format("/var/stat/%s/mt/%s/%s/*.ldjson.gz ", date, appId, network);
SparkConf sparkConfiguration =
new SparkConf().setAppName("SparkExample-" + date + "-" + appId + "-" + network);
JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
JavaRDD<Tuple3<String, String, Long>> dataForApp =
sparkContext .textFile(filePath)
.filter(StringFilter .containJsonKeyValue( "statisticEventType", "AD_GET"))
.map(line -> {
JsonExtractor extract = JsonExtractor.from(line);
return new Tuple3<>(
extract .visitorId(), extract .device(), extract.timestamp()
);
})
.setName( "SparkExampleRDD")
.persist(StorageLevel .MEMORY_ONLY_SER());
Map<String, Long> topDevices = dataForApp.mapToPair(t -> new Tuple2<>(t._2(), 1L))
.reduceByKey((left, right) -> left + right)
.top( 50, DeviceTupleComparator .instance())
.stream()
.collect(Collectors .toMap(Tuple2::_1, Tuple2::_2));
JavaRDD<Tuple2<Long, Long>> usersToSessions =
dataForApp .mapToPair(t -> new Tuple2<>(t._1(), t._3()))
.groupByKey()
.flatMap(t -> {
Iterator<Long> timestamps = t._2().iterator();
SessionCalculator sessions = SessionCalculator.from(timestamps);
if (sessions.isAny()) {
return Collections.singletonList(
new Tuple2<>(sessions.getCount(),
sessions.getApproximateLength()));
} else {
return Collections.emptyList();
}
});
Accumulator<Double> activeUsersAccumulator = sparkContext.accumulator(0.0D);
Accumulator<Map<Long, Long>> sessionLengthAccumulator =
sparkContext .accumulator(new HashMap<>(), MapAccumulator .get());
Accumulator<Map<Long, Long>> sessionCountAccumulator =
sparkContext .accumulator(new HashMap<>(), MapAccumulator .get());
usersToSessions .foreach(t -> {
activeUsersAccumulator .add(1.0D); // active users
Long count = t._1(); // session count for user
Map<Long, Long> map2 = new HashMap<>();
map2 .put(count, 1L);
sessionCountAccumulator .add(map2);
Long minute = t._2(); // session length for user
Map<Long, Long> map = new HashMap<>();
map .put(minute, 1L);
sessionLengthAccumulator .add(map);
});
Запуск кластера Spark в простейшем режиме
1.Скачать и распаковать архив
2.Отредактировать conf/spark-env.sh
3.Отредактировать conf/spark-defaults.conf
4.Запустить master и worker процессы
SPARK_MASTER_IP=...
SPARK_WORKER_MEMORY=...
spark.master=spark:// ...
spark.executor.memory=...
$ ./sbin/start-master.sh
$ ./bin/spark-class org.apache.spark.deploy.worker.Worker
Запуск вашего приложения
1.Запаковать ваш класс в «fat-jar» со всеми
зависимостями, кроме библиотек Spark’а.
2.Выполнить
$ ./bin/spark-submit --class com.ifree.SparkExample \
spark-example.jar
2015-05-26 \
c87ad063-c38f-4d2d-bbfe-d7ddfec5aab0 \
moneytapp
Личный опыт
Советы при эксплуатации
•Иногда ноды падают, используйте Monit для мониторинга и
восстановления.
•Не используйте Java сериализацию. Никогда. Используйте, к примеру,
Kryo.
•Бейте исходные данные на логические куски. Меньше входной кусок,
меньше чтения с диска, меньше время обработки.
•Не используйте Enum’ы в ваших Spark приложениях.
•Экономьте память, не плодите лишние объекты, старайтесь по
возможности переиспользовать их.
•SparkSQL, на самом деле, не так удобен, как кажется на первый
взгляд.
•Нормальный менеджер задач отсутствует*, приготовьтесь иметь дело
с CRON’ом или собственными костылями. Другого способа запустить
задачу, кроме как через spark-submit — нет.
spark.serializer=org.apache.spark.serializer.KryoSerializer
Спасибо за внимание!
http://spark.apache.org/
http://lambda-architecture.net/
https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x