Apache spark

AntonAnokhin1 13,931 views 21 slides Jul 24, 2015
Slide 1
Slide 1 of 21
Slide 1
1
Slide 2
2
Slide 3
3
Slide 4
4
Slide 5
5
Slide 6
6
Slide 7
7
Slide 8
8
Slide 9
9
Slide 10
10
Slide 11
11
Slide 12
12
Slide 13
13
Slide 14
14
Slide 15
15
Slide 16
16
Slide 17
17
Slide 18
18
Slide 19
19
Slide 20
20
Slide 21
21

About This Presentation

Apache Spark at i-Free Meet-up


Slide Content

Apache
Михаил Михайленко, Moneytap /I-Free, 2015
http://spark.apache.org/
http://i-free.com/
http://moneytapp.com/
https://vk.com/sniff303

Что это такое?
Framework для написания распределенных приложений
обрабатывающих данные. Решает задачи:
•Map-Reduce, но в памяти (это не означает, что объем данных должен быть меньше чем
объем RAM)
•Stream Processing

Для чего это нужно?
•Любые аналитические отчеты, к примеру, как Яндекс. Метрика или
Google Analytics
•Задачи машинного обучения (классификация, прогнозы...)
•…

Альтернативы и отличии
Spark как альтернатива Hadoop для Map- Reduce
•Доступное API на Scala, Python и Java
•Не требует очень тяжелой настройки для небольших инсталляций
•Выигрывает в скорости, осуществляя процессинг данных, храня их в памяти и вылезая на
диск только при необходимости
Spark Streaming как альтернатива Apache Storm
•Гарантированная обработка каждого события только один раз

Как все устроено?
Driver Program — часть вашего приложения отвечающего за связь c кластером.
Cluster Manager — процесс или группа процессов, осуществляющих распределение ресурсов
всего кластера.
Worker Node — группа процессов на различных машинах, осуществляют контроль за ресурсами
машины, на которой находятся.
Executor — процессы, непосредственно исполняющие задачи .

Режимы работы
•Local
•Standalone (+ ZooKeeper)
•Hadoop YARN
•Apache MESOS

Как с этим работать?

Map-Reduce
Источник изображения: http://xiaochongzhang.me/blog/?p=338

Resilient Distributed Dataset
RDD - Распределенная, неизменяемая коллекция элементов,
доступная для параллельной обработки. Она знает как вычислить саму
себя в случае сбоя, а так же х ранит ссылки на своих предков. Может
быть записана на диск.
RDD RDD RDD
transform transform

Загрузка данных в RDD
Источники:
•Local file system
•HDFS
•Cassandra
•HBase
•Amazon S3
Форматы:
•Plain text
•GZIPpped plain text
•Hadoop InputFormat’s
JavaRDD<String> localRDD = sparkContext.textFile("/path/data_2015_05_*.txt.gz ");
JavaRDD<String> hdfsRDD = sparkContext.textFile("hdfs://...");

Операции над RDD
Transformations — преобразуют
данные из RDD, "лениво" создавая
новый RDD.
- .map(function)
- .flatMap(function)
- .filter(function)
- .sample(n)
- .union(anotherRDD)
- .intersection(anotherRDD)
- .distinct()
- .groupByKey()
- .reduceByKey()
- .join(anotherRDD)
Actions - возвращают результаты в
ваше приложение.
- .reduce(function)
- .collect()
- .count()
- .take(n)
- .takeOrdered(n, comparator)
- .foreach(function)
Persistence — сохранение RDD.
- .saveAs...()
- .persist(memoryLevel)
- .unpersist()
Map<String, Long> sdkVersions = sparkContext.textFile(filePath)
.filter(s -> s.contains("AD_GET"))
.map(s -> Extractors.extractSdkVersion(s))
.mapToPair(t -> new Tuple2<>(t._2(), 1L))
.reduceByKey((left, right) -> left + right)
.collectAsMap();

Общие переменные
Broadcast variables —
Read-only переменные, значение
которых доступно с любого Executor'а
вашей программы. Разъезжаются по
кластеру с помощью P2P протокола.
Accumulators —
Переменные, изменять значения
которых можно только путем
прибавления к ним какого либо
значения.
List<Long> largeList = ...;
Broadcast<List<Long>> broadcastVar = sparkContext.broadcast(largeList);
Accumulator<Long> accum = sparkContext.accumulator(0);
public class MapAccumulator implements AccumulatorParam<Map<Long, Long>> {
@Override
public Map<Long, Long> addAccumulator(Map<Long, Long> m1, Map<Long, Long> m2) {
for (Map.Entry<Long, Long> m2entry : m2.entrySet()) {
Long m1val = m1.get(m2entry.getKey());
if (m1val == null) {
m1val = m2entry.getValue();
} else {
m1val += m2entry.getValue();
}
m1.put(m2entry.getKey(), m1val);
}
return m1;
}
//…
}

Пример!

public class SparkExample {
public static void main(String... args) {
if (args.length < 3) {
throw new IllegalArgumentException ();
}
String date = args[0];
String appId = args[1];
String network = args[2];
final String filePath = String.format("/var/stat/%s/mt/%s/%s/*.ldjson.gz ", date, appId, network);
SparkConf sparkConfiguration =
new SparkConf().setAppName("SparkExample-" + date + "-" + appId + "-" + network);
JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
JavaRDD<Tuple3<String, String, Long>> dataForApp =
sparkContext .textFile(filePath)
.filter(StringFilter .containJsonKeyValue( "statisticEventType", "AD_GET"))
.map(line -> {
JsonExtractor extract = JsonExtractor.from(line);
return new Tuple3<>(
extract .visitorId(), extract .device(), extract.timestamp()
);
})
.setName( "SparkExampleRDD")
.persist(StorageLevel .MEMORY_ONLY_SER());
Map<String, Long> topDevices = dataForApp.mapToPair(t -> new Tuple2<>(t._2(), 1L))
.reduceByKey((left, right) -> left + right)
.top( 50, DeviceTupleComparator .instance())
.stream()
.collect(Collectors .toMap(Tuple2::_1, Tuple2::_2));

JavaRDD<Tuple2<Long, Long>> usersToSessions =
dataForApp .mapToPair(t -> new Tuple2<>(t._1(), t._3()))
.groupByKey()
.flatMap(t -> {
Iterator<Long> timestamps = t._2().iterator();
SessionCalculator sessions = SessionCalculator.from(timestamps);
if (sessions.isAny()) {
return Collections.singletonList(
new Tuple2<>(sessions.getCount(),
sessions.getApproximateLength()));
} else {
return Collections.emptyList();
}
});
Accumulator<Double> activeUsersAccumulator = sparkContext.accumulator(0.0D);
Accumulator<Map<Long, Long>> sessionLengthAccumulator =
sparkContext .accumulator(new HashMap<>(), MapAccumulator .get());
Accumulator<Map<Long, Long>> sessionCountAccumulator =
sparkContext .accumulator(new HashMap<>(), MapAccumulator .get());
usersToSessions .foreach(t -> {
activeUsersAccumulator .add(1.0D); // active users
Long count = t._1(); // session count for user
Map<Long, Long> map2 = new HashMap<>();
map2 .put(count, 1L);
sessionCountAccumulator .add(map2);
Long minute = t._2(); // session length for user
Map<Long, Long> map = new HashMap<>();
map .put(minute, 1L);
sessionLengthAccumulator .add(map);
});

Map<Long, Long> sessionLengthDistribution = sessionLengthAccumulator .value();
Map<Long, Long> sessionCountDistribution = sessionCountAccumulator .value();
Long activeUsers = activeUsersAccumulator .value().longValue();
System.out.printf("topDevices: %s", topDevices);
System.out.printf("sessionLengthDistribution: %s ", sessionLengthDistribution);
System.out.printf("sessionCountDistribution: %s ", sessionCountDistribution);
System.out.printf("activeUsers: %s", activeUsers);
dataForApp.unpersist(true);
sparkContext .stop();
}
}

Запуск кластера Spark в простейшем режиме
1.Скачать и распаковать архив
2.Отредактировать conf/spark-env.sh


3.Отредактировать conf/spark-defaults.conf 


4.Запустить master и worker процессы
SPARK_MASTER_IP=...
SPARK_WORKER_MEMORY=...
spark.master=spark:// ...
spark.executor.memory=...
$ ./sbin/start-master.sh
$ ./bin/spark-class org.apache.spark.deploy.worker.Worker

Запуск вашего приложения
1.Запаковать ваш класс в «fat-jar» со всеми
зависимостями, кроме библиотек Spark’а.
2.Выполнить
$ ./bin/spark-submit --class com.ifree.SparkExample \
                     spark-example.jar
                     2015-05-26 \
                     c87ad063-c38f-4d2d-bbfe-d7ddfec5aab0 \
                      moneytapp

Личный опыт

Советы при эксплуатации
•Иногда ноды падают, используйте Monit для мониторинга и
восстановления.
•Не используйте Java сериализацию. Никогда. Используйте, к примеру,
Kryo.

•Бейте исходные данные на логические куски. Меньше входной кусок,
меньше чтения с диска, меньше время обработки.
•Не используйте Enum’ы в ваших Spark приложениях.
•Экономьте память, не плодите лишние объекты, старайтесь по
возможности переиспользовать их.
•SparkSQL, на самом деле, не так удобен, как кажется на первый
взгляд.
•Нормальный менеджер задач отсутствует*, приготовьтесь иметь дело
с CRON’ом или собственными костылями. Другого способа запустить
задачу, кроме как через spark-submit — нет.
spark.serializer=org.apache.spark.serializer.KryoSerializer

Спасибо за внимание!
http://spark.apache.org/
http://lambda-architecture.net/
https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x
Tags