背景
一般 ES 被作为MySQL 模糊查询的替代方案,用来提升模糊查询的效率,减少数据库的慢查询。我们需要将现有 MySQL 表中数据同步到 ES 上,然后配置合适的分词器才能提供模糊查询的功能。下面将介绍MySQL 如何同步数据到 ES 以及日志文件如何同步到 ES。
同步方案
工具
Logstash
Logstash 是 Elastic Stack 的组成部分,常用于数据采集、转换和加载。它提供了一个 JDBC 插件,可以定期从 MySQL 数据库查询数据,并将结果导入到 Elasticsearch。
安装
# mac 下安装
brew install logstash
# 或者到官网下载后,直接运行可执行文件
# https://www.elastic.co/downloads/logstash
配置文件
# 定义输入(Input): 输入部分定义了Logstash从哪里获取数据。例如,从标准输入读取数据可以使用以下配置:
input {
stdin {}
}
# 定义过滤器(Filter): 过滤器部分用于处理和转换数据。例如,使用grok解析来自日志的结构化数据:
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
# 定义输出(Output): 输出部分定义了处理后的数据应该去向何处。例如,将数据发送到Elasticsearch:
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logstash-logs-%{+YYYY.MM.dd}"
}
}
运行
bin/logstash -f path_to_your_logstash.conf
Flink CDC
Flink CDC (Change Data Capture) 是一个集成到 Apache Flink 的库,它允许你捕获并处理数据库中的变更数据流。Flink CDC 可以与各种数据库如 MySQL, PostgreSQL, MongoDB, Oracle 等进行集成,实时捕获数据变化,然后可以将这些变更数据用于流处理或实时分析应用。
- 实时数据同步:捕获源数据库中的实时变更,并可靠地传输到下游系统。
- 事件时间支持:支持基于事件时间的处理,确保即使在乱序或延迟的数据流中也能正确处理。
- 状态管理:利用 Flink 的状态管理和检查点机制,确保故障恢复时数据的一致性和完整性。
- 扩展性和灵活性:可以轻松扩展以处理大规模数据流,并且可以灵活地集成到现有的 Flink 应用程序中。
从 Apache Flink的官方网站 下载最新版本的 Flink。选择适合你的系统的版本(Binary for Scala 2.11/2.12等),下载并解压:
wget https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz
tar -xzf flink-1.13.2-bin-scala_2.12.tgz
mv flink-1.13.2 /usr/local/flink
启动 Flink
cd /usr/local/flink
./bin/start-cluster.sh
可以通过访问 http://localhost:8081 来使用 Flink 的 Web UI
使用 Flink CDC
Flink CDC 可以通过添加对应的依赖到你的 Flink 应用中来使用。例如,如果你想捕获 MySQL 数据库的变更,你可以在你的 Maven 项目的 pom.xml
文件中添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
运行
mvn clean package
./bin/flink run -c com.example.MyFlinkApp /path/to/your/target.jar
然后,你可以在 Flink 程序中使用 Flink CDC 连接器,例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("mydatabase") // set captured database
.tableList("mydatabase.mytable") // set captured table
.username("yourusername")
.password("yourpassword")
.deserializer(new SimpleStringSchema()) // converts records to string
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");
stream.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
或者使用 sql-client
进入 flink 终端
./bin/sql-client.sh embedded
执行下 sql
CREATE TABLE test (`id` bigint,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '*****','port' = '3306','username' = '***','password' = '***','database-name' = '','table-name' = 'test','debezium.snapshot.locking.mode' = 'none');
CREATE TABLE test_index (`id` bigint,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://ip:9200','index' = 'test_index','username' = 'elastic','password' = 'elastic','sink.bulk-flush.max-actions' = '1','sink.bulk-flush.backoff.strategy' = 'CONSTANT','format' = 'json');
INSERT INTO test_index SELECT `id` FROM test;
腾讯云 流计算 Oceanus
https://cloud.tencent.com/product/oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
Debezium + Kafka Connect
Debezium 是一个开源的分布式平台,用于捕获数据库的变更数据(CDC)。它可以与 Apache Kafka 和 Kafka Connect 结合使用,将变更数据捕获并发布到Kafka,然后通过Kafka Connect将数据流入Elasticsearch。
步骤概述:
- 设置并运行一个Kafka集群。
- 配置Debezium来捕获MySQL的变更数据,并将这些变更流式传输到Kafka的一个主题。
- 使用Kafka Connect和其Elasticsearch Sink Connector将数据从Kafka主题同步到Elasticsearch。
Elasticdump
Elasticdump 是一个工具,用于从一个Elasticsearch索引、文件或标准输入导入和导出数据,并可以用来同步MySQL到Elasticsearch,尽管这需要一个中间步骤来转换数据格式。
步骤概述:
- 将MySQL数据导出为JSON格式。
- 使用Elasticdump将JSON数据导入到Elasticsearch。
这种方法适用于不需要实时同步的场景,更适合批量数据迁移或定期同步。
Apache NiFi
Apache NiFi是一个易于使用、强大而灵活的数据流处理和分布式数据路由平台。NiFi提供了直观的用户界面,以便于设计、调度、控制和监控数据流。
步骤概述:
- 使用NiFi的处理器来从MySQL读取数据。
- 配置处理器将数据转换为所需格式并推送到Elasticsearch。
自定义同步脚本
如果你的同步需求非常特定,或者现有工具不能满足你的需求,你可以编写自定义脚本来处理同步。这通常涉及编写一个程序或脚本,使用MySQL和Elasticsearch的客户端库来读取MySQL数据并将其索引到Elasticsearch。
同步效率
如果你的场景需要高度的实时性,复杂的数据处理和分析,且系统可以支持较高的运维复杂性,Flink CDC 可能是更好的选择。如果你需要一个简单、易于部署和维护的解决方案,且主要关注于数据同步和简单转换,Logstash 可能更适合。
- Flink CDC: Flink 是为实时数据处理设计的,具有很强的流处理能力。使用 Flink CDC,你可以实现近乎实时的数据同步。Flink 的流处理模型允许进行复杂的事件驱动处理,适合需要高实时性和复杂事件处理逻辑的场景。
- Logstash: 尽管 Logstash 可以进行实时数据同步,它主要是作为日志数据处理和轻量级的实时处理工具。Logstash 的处理延迟通常比 Flink 高,更适合日志聚合和简单的数据转换任务。
选择合适的同步工具需要考虑数据源类型、同步的实时性需求、目标架构复杂度以及成本等因素。腾讯云提供的这些工具能够满足不同场景下的数据同步需求,帮助用户实现数据的高效、安全同步。
示例
MySQL 同步到 ES
将 MySQL 数据同步到Elasticsearch通常涉及到捕获数据库的变更数据(Change Data Capture, CDC),并将这些变更实时地推送到Elasticsearch。有几种常用的方法可以实现这一目标,包括使用专门的数据同步工具、日志解析以及使用Elasticsearch的 Logstash 插件。
使用 Logstash 结合 JDBC 插件来完成这个任务
以下是如何配置 Logstash 来从 MySQL 数据库读取数据并将其同步到 Elasticsearch 的具体步骤。
Logstash通过JDBC驱动与MySQL数据库通信。你需要下载MySQL的JDBC驱动(通常是一个名为mysql-connector-java-x.x.x.jar
的文件),并将其放在一个你可以记住的位置。
https://dev.mysql.com/downloads/connector/j
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/ryan_test"
jdbc_user => "root"
jdbc_password => "******"
jdbc_driver_library => "/path/mysql-connector-j-9.0.0.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
statement => "SELECT id FROM test"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "mysql-data" # 动态索引名,按日期创建索引
document_id => "%{id}" # 假设你的MySQL数据有一个唯一的ID字段
}
记得提前创建好数据库 ryan_test 以及表 test,执行同步命令后,查看 ES
curl -X GET "http://localhost:9200/mysql-data/_search?pretty"
使用 Flink CDC 同步
日志文件同步到 ES
使用 logstash
input {
file {
path => "path/app.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" }
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
}
}
执行同步命令后查看 ES
curl -X GET "http://localhost:9200/app-logs-*/_search?pretty"