Знакомство с Кафкой
Apache Kafka — это распределенная система обмена сообщениями с публикацией и подпиской. Первоначально он был разработан корпорацией LinkedIn, которая в 2010 году внесла свой вклад в Apache Foundation и стала проектом с открытым исходным кодом высшего уровня. Kafka используется для создания конвейеров данных в реальном времени и потоковых приложений. Он обладает горизонтальной масштабируемостью, отказоустойчивостью, чрезвычайно высокой скоростью и широко используется.
Kafka — это не только система распределенных сообщений, но и поддержка потоковых вычислений, поэтому, прежде чем представить приложение Kafka в Apache Flink, давайте возьмем простой пример Kafka, чтобы интуитивно понять, что такое Kafka.
Установить
Эта статья не является систематическим и подробным введением в Kafka, а предназначена для того, чтобы каждый мог интуитивно понять Kafka, чтобы его можно было хорошо применить в Apahe Flink, поэтому мы устанавливаем Kafka самым простым способом.
Скачать бинарный пакет
curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
Разархивируйте установку
Для установки Kafka нужно только распаковать загруженный tgz следующим образом:
jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz
jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
LICENSE NOTICE bin config libs site-docs
Среди них bin содержит все команды управления Kafka, такие как сервер Kafka, который мы начнем дальше.
Запустить KafkaServer
Kafka — это система публикации и подписки, и подписка на сообщения должна сначала иметь службу. Мы запускаем экземпляр Kafka Server. Kafka должен использовать ZooKeeper.Для производственного развертывания нам нужно установить кластер ZooKeeper, что не входит в рамки этой статьи, поэтому мы используем сценарий, предоставленный Kafka, для установки экземпляра ZooKeeper только с одним узлом. следующее:
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &
[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
....
....
[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
После запуска ZooKeeper привязывается к порту 2181 (по умолчанию). Затем мы запускаем сервер Kafka следующим образом:
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
...
[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Если все пойдет хорошо, установка Kafka завершена.
Создать тему
Kafka — это система подписки на сообщения.Сначала создайте тему, на которую можно подписаться.flink-tipic
Тема, в новом терминале выполните следующую команду:
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic
Created topic "flink-tipic".
Следующая информация об успешном создании также будет выведена в терминал Kafka Server:
...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...
показано вышеflink-topic
Конфигурация основных атрибутов, таких как метод сжатия сообщений, формат сообщений, количество резервных копий и т. д.
В дополнение к просмотру журнала мы можем использовать команду display для запроса, успешно ли мы создалиflink-topic
,следующее:
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181
flink-tipic
если выводflink-tipic
, значит наша тема успешно создана.
Так где тема сохранена? Как Кафка публикует сообщения и подписывается на них? Для наглядности давайте посмотрим на следующую схематическую диаграмму архитектуры Kafka для простого понимания:
Чтобы кратко представить, Kafka использует ZooKeeper для хранения информации о кластере, то есть экземпляре сервера Kafka, который мы начали выше.В кластере может быть несколько экземпляров сервера Kafka.Сервер Kafka называется Broker, и темы, которые мы создаем, могут быть в одном или более Брокеры. Kafka использует режим Push для отправки сообщений и режим Pull для получения сообщений.
Отправить сообщение
Как отправить сообщение в существующую тему?Конечно, мы можем написать код для отправки сообщения по пути API. В то же время вы также можете использовать метод команды для удобной отправки сообщений следующим образом:
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg
>Kafka connector
Выше мы отправили два сообщенияKafka test msg
иKafka connector
прибытьflink-topic
Тема.
прочитать сообщение
Что делать, если сообщение указанной темы прочитано? То же самое можно сделать как API, так и командным способом, читаем в командном режимеflink-topic
сообщение следующим образом:
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector
в--from-beginning
Описывает, где мы читаем сообщение с начала темы.
Flink Kafka Connector
Ранее мы установили среду Kafka самым простым способом, а затем представили использование Flink Kafka Connector в указанной выше среде. Базовые знания, связанные с соединителем Flink, будут представлены в «Серии обсуждений Apache Flink (14) — соединители», здесь мы непосредственно представляем содержимое, связанное с соединителем Kafka.
Apache Flink предоставляет несколько версий Kafka Connector.В этой статье в качестве примера используется flink-1.7.0.
maven-зависимости
Чтобы использовать Kakfa Connector, нам нужно добавить зависимость от Kafka Connector в наш pom следующим образом:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.0</version>
</dependency>
Потребитель Flink Kafka должен знать, как преобразовывать двоичные данные в Kafka в объекты Java/Scala.DeserializationSchema
Позволяет пользователю указать такой режим. Вызовите метод T deserialize(byte[] message) для каждого сообщения Kafka, передав значение из Kafka.
Examples
В нашем примере данные считываются из Kafka, а затем после простой обработки записываются в Kafka. Нам нужно создать еще одну тему для написания, а именно:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output
Итак, в примере, который мы используем, источникflink-topic
, для раковиныslink-topic-output
.
Simple ETL
Мы предполагаем, что в Kafka хранится простая строка, поэтому нам нуженserialize
иdeserialize
реализации, то есть мы хотим определить реализациюDeserializationSchema
иSerializationSchema
Классы сериализации и десериализации. Поскольку наш пример представляет собой строку, мы определяемKafkaMsgSchema
Реализуйте класс, а затем напишите основную программу Flink.
KafkaMsgSchema — полный код
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
private static final long serialVersionUID = 1L;
private transient Charset charset;
public KafkaMsgSchema() {
// 默认UTF-8编码
this(Charset.forName("UTF-8"));
}
public KafkaMsgSchema(Charset charset) {
this.charset = Preconditions.checkNotNull(charset);
}
public Charset getCharset() {
return this.charset;
}
public String deserialize(byte[] message) {
// 将Kafka的消息反序列化为java对象
return new String(message, charset);
}
public boolean isEndOfStream(String nextElement) {
// 流永远不结束
return false;
}
public byte[] serialize(String element) {
// 将java对象序列化为Kafka的消息
return element.getBytes(this.charset);
}
public TypeInformation<String> getProducedType() {
// 定义产生的数据Typeinfo
return BasicTypeInfo.STRING_TYPE_INFO;
}
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeUTF(this.charset.name());
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
String charsetName = in.readUTF();
this.charset = Charset.forName(charsetName);
}
}
Основная программа - полный код
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import java.util.Properties;
public class KafkaExample {
public static void main(String[] args) throws Exception {
// 用户参数获取
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
// Stream 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source的topic
String sourceTopic = "flink-topic";
// Sink的topic
String sinkTopic = "flink-topic-output";
// broker 地址
String broker = "localhost:9092";
// 属性参数 - 实际投产可以在命令行传入
Properties p = parameterTool.getProperties();
p.putAll(parameterTool.getProperties());
p.put("bootstrap.servers", broker);
env.getConfig().setGlobalJobParameters(parameterTool);
// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(
sourceTopic,
new KafkaMsgSchema(),
p);
// 设置读取最早的数据
// consumer.setStartFromEarliest();
// 读取Kafka消息
DataStream<String> input = env.addSource(consumer);
// 数据处理
DataStream<String> result = input.map(new MapFunction<String, String>() {
public String map(String s) throws Exception {
String msg = "Flink study ".concat(s);
System.out.println(msg);
return msg;
}
});
// 创建生产者
FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(
sinkTopic,
new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),
p,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
// 将数据写入Kafka指定Topic中
result.addSink(producer);
// 执行job
env.execute("Kafka Example");
}
}
Запустите основную программу следующим образом:
Процесс моей тестовой операции выглядит следующим образом:
запускать
flink-topic
иflink-topic-output
тяга потребления;по команде
flink-topic
добавить тестовое сообщениеonly for test
;Подтвердите добавленное тестовое сообщение с помощью команды print
only for test
;Простейший FlinkJob
source->map->sink
Сопоставьте тестовое сообщение:"Flink study ".concat(s)
;Распечатать данные стока через команду;
#### Встроенные схемы
Apache Flink предоставляет следующие три встроенные схемы для распространенных форматов сообщений:
TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema)
Он создает схему на основе TypeInformation Flink. Это полезно, если данные записываются и считываются Flink.JsonDeserializationSchema (and JSONKeyValueDeserializationSchema)
Он преобразует сериализованный JSON в объект ObjectNode, из которого можно получить доступ к полям с помощью objectNode.get("field") as (Int/String/...)(). Объектный узел KeyValue содержит поля «ключ» и «значение», которые содержат все поля и необязательное поле «метаданные», которое предоставляет смещение/раздел/тему этого сообщения.AvroDeserializationSchema
Он считывает данные, сериализованные с использованием формата Avro, используя статически предоставленную схему. Он может вывести схему из классов, сгенерированных Avro (AvroDeserializationSchema.forSpecific(...)), или может использовать предоставленную вручную схему с GenericRecords (используя AvroDeserializationSchema.forGeneric(...))
Чтобы использовать встроенные схемы, вам необходимо добавить следующие зависимости:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.7.0</version>
</dependency>
прочитать конфигурацию местоположения
Когда мы используем данные Kafka, нам может потребоваться указать место потребления.FlinkKafkaConsumer
Предоставляет множество удобных настроек местоположения, например:
Consumer.setStartFromEarliest() — начать с самой старой записи;
Consumer.setStartFromLatest() — начать с последней записи;
Consumer.setStartFromTimestamp(...);// начать с указанной временной метки эпохи (миллисекунды);
Consumer.setStartFromGroupOffsets(); // Поведение по умолчанию, продолжение потребления со смещения последнего потребления.
Приведенная выше спецификация позиции может быть точной для каждого раздела, например следующий код:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始
consumer.setStartFromSpecificOffsets(specificStartOffsets);
Это по-прежнему значение по умолчанию для неуказанных разделов.setStartFromGroupOffsets
Способ.
Открытие темы
Kafka поддерживает автоматическое обнаружение темы, то есть создается обычным способом.FlinkKafkaConsumer
,Например:
// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>( java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
new KafkaMsgSchema(),
p);
В приведенном выше примере, когда задание запускается, потребитель подпишется на все темы, имена которых соответствуют указанному регулярному выражению (начиная сsourceTopic
Начальное и конечное значения одного числа).
Определить водяной знак (окно)
Применение Kafka Connector не ограничивается описанным выше простым извлечением данных.Мы чаще ожидаем оконных операций с данными Kafka во время события, поэтому нам необходимо определить водяной знак в Flink Kafka Source.
Чтобы определить Event-time, во-первых, нужно передать атрибут времени в данные Kafka, предполагая, что наши данныеString#Long
формат, напримерonly for test#1000
. тогда мы будемLong
как столбец времени.
KafkaWithTsMsgSchema — полный код
Чтобы проанализировать вышеуказанный формат данных Kafka, нам нужно разработать пользовательскую схему, например вызовKafkaWithTsMsgSchema
,будетString#Long
Разбор как javaTuple2<String, Long>
, полный код выглядит следующим образом:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private transient Charset charset;
public KafkaWithTsMsgSchema() {
this(Charset.forName("UTF-8"));
}
public KafkaWithTsMsgSchema(Charset charset) {
this.charset = Preconditions.checkNotNull(charset);
}
public Charset getCharset() {
return this.charset;
}
public Tuple2<String, Long> deserialize(byte[] message) {
String msg = new String(message, charset);
String[] dataAndTs = msg.split("#");
if(dataAndTs.length == 2){
return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
}else{
// 实际生产上需要抛出runtime异常
System.out.println("Fail due to invalid msg format.. ["+msg+"]");
return new Tuple2<String, Long>(msg, 0L);
}
}
@Override
public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
return false;
}
public byte[] serialize(Tuple2<String, Long> element) {
return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
}
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeUTF(this.charset.name());
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
String charsetName = in.readUTF();
this.charset = Charset.forName(charsetName);
}
@Override
public TypeInformation<Tuple2<String, Long>> getProducedType() {
return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
}
}
Генерация водяных знаков
Извлечение временных меток и создание водяных знаков требует реализации собственного извлечения времени и генератора водяных знаков. Внутри Apache Flink есть 2 способа:
AssignerWithPunctuatedWatermarks — водяные знаки создаются для каждой записи.
-
AssignerWithPeriodicWatermarks — периодическое создание водяных знаков.
мы начинаем с
AssignerWithPunctuatedWatermarks
В качестве примера напишите собственное извлечение времени и генератор водяных знаков. код показывает, как показано ниже:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
public class KafkaAssignerWithPunctuatedWatermarks
implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {
// 利用提取的时间戳创建Watermark
return new Watermark(l);
}
@Override
public long extractTimestamp(Tuple2<String, Long> o, long l) {
// 提取时间戳
return o.f1;
}
}
Основная программа - Полная программа
Мы вычисляем окно Tumble размером 1 секунду и вычисляем наибольшее значение в пределах окна. Полная процедура выглядит следующим образом:
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import java.util.Properties;
public class KafkaWithEventTimeExample {
public static void main(String[] args) throws Exception {
// 用户参数获取
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
// Stream 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 Event-time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Source的topic
String sourceTopic = "flink-topic";
// Sink的topic
String sinkTopic = "flink-topic-output";
// broker 地址
String broker = "localhost:9092";
// 属性参数 - 实际投产可以在命令行传入
Properties p = parameterTool.getProperties();
p.putAll(parameterTool.getProperties());
p.put("bootstrap.servers", broker);
env.getConfig().setGlobalJobParameters(parameterTool);
// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(
sourceTopic,
new KafkaWithTsMsgSchema(),
p);
// 读取Kafka消息
TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
DataStream<Tuple2<String, Long>> input = env
.addSource(consumer).returns(typeInformation)
// 提取时间戳,并生产Watermark
.assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());
// 数据处理
DataStream<Tuple2<String, Long>> result = input
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.max(0);
// 创建生产者
FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(
sinkTopic,
new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),
p,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
// 将数据写入Kafka指定Topic中
result.addSink(producer);
// 执行job
env.execute("Kafka With Event-time Example");
}
}
Тест работает следующим образом
Чтобы объяснить кратко, мы вводим числа следующим образом:
Msg | Watermark |
---|---|
E#1000000 | 1000000 |
A#3000000 | 3000000 |
B#5000000 | 5000000 |
C#5000100 | 5000100 |
E#5000120 | 5000120 |
A#7000000 | 7000000 |
что мы видим5000000~7000000
данные, между которымиB#5000000
, C#5000100
иE#5000120
является содержимым того же окна. Вычислить значение MAX, сравнить по строке, выводится самое большое сообщениеE#5000120
.
Кафка несет временные метки
В Kafka-0.10+ сообщения могут содержать метки времени, а это означает, что нет необходимости добавлять столбец данных в виде меток времени отдельно в msg. Это проще только при использовании Flink как для записи, так и для чтения. В общем, вышеприведенного примера достаточно.
резюме
В этой статье рассказывается о том, как Kafka применяется во Flink. Она начинается с введения в простую установку Kafka и демонстрации команд для отправки и получения сообщений, а затем используется простое извлечение данных и пример окна во время события. чтобы вы интуитивно почувствовали, как использовать Kafka с Apache Flink.
Статьи, которые могут вас заинтересовать:
Более практические случаи будут обновлены позже...