This section describes how to use the Maxwell data synchronization tool to migrate offline binlog-based data to an MRS Kafka cluster.
Maxwell is an open-source application that reads MySQL binlogs, converts insert, update, and delete operations into JSON, and sends the result to an output such as the console, a file, or Kafka. For details about Maxwell, visit https://maxwells-daemon.io. Maxwell can be deployed on the MySQL server itself or on any other server that can communicate with MySQL.
Maxwell runs on Linux servers, including EulerOS, Ubuntu, Debian, CentOS, and OpenSUSE, and requires Java 1.8 or later.
The following provides details about data synchronization.
$ vi my.cnf

[mysqld]
server_id=1
log-bin=master
binlog_format=row
mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'localhost' identified by 'XXXXXX';
mysql> GRANT ALL on maxwell.* to 'maxwell'@'localhost';
cd /opt
tar -zxvf maxwell-XXX.tar.gz
cd maxwell-XXX
If the conf directory exists in the maxwell-XXX folder, configure the config.properties file. For details about the configuration items, see Table 1. If the conf directory does not exist, rename config.properties.example in the maxwell-XXX folder to config.properties.
| Parameter | Mandatory | Description | Default Value |
|---|---|---|---|
| user | Yes | Name of the user for connecting to MySQL, that is, the user created in 2 | - |
| password | Yes | Password for connecting to MySQL | - |
| host | No | MySQL address | localhost |
| port | No | MySQL port | 3306 |
| log_level | No | Log print level. The options are debug, info, warn, and error. | info |
| output_ddl | No | Whether to send DDL events (changes to database and table definitions) | false |
| producer | Yes | Producer type. Set this parameter to kafka. | stdout |
| producer_partition_by | No | Partition policy used to ensure that data of the same type is written to the same Kafka partition | database |
| ignore_producer_error | No | Whether to ignore errors when the producer fails to send data | true |
| metrics_slf4j_interval | No | Interval, in seconds, at which statistics on data successfully or unsuccessfully uploaded to Kafka are written to logs | 60 |
| kafka.bootstrap.servers | Yes | Address of the Kafka broker node, in the format HOST:PORT[,HOST:PORT] | - |
| kafka_topic | No | Name of the Kafka topic to write to | maxwell |
| dead_letter_topic | No | Kafka topic used to record the primary key of a record when an error occurs while sending that record | - |
| kafka_version | No | Kafka producer version used by Maxwell. It cannot be set in the config.properties file; pass it on the command line at startup, for example --kafka_version xxx. | - |
| kafka_partition_hash | No | Kafka topic partitioning algorithm. The value can be default or murmur3. | default |
| kafka_key_format | No | Key generation method of the Kafka record. The value can be array or hash. | hash |
| ddl_kafka_topic | No | Topic to which DDL operations are written when output_ddl is set to true | {kafka_topic} |
| filter | No | Used to filter databases or tables | - |
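For reference, a minimal conf/config.properties that sends data to an MRS Kafka cluster might look like the following sketch. The user name, password, broker addresses, and topic are placeholders to be replaced with your own values, and the commented filter line only illustrates the syntax:

```
# MySQL connection (the user created in 2)
user=maxwell
password=XXXXXX
host=127.0.0.1
port=3306

# Producer: send events to Kafka
producer=kafka
kafka.bootstrap.servers=kafkahost1:9092,kafkahost2:9092
kafka_topic=maxwell

# Keep all events from one database in the same partition
producer_partition_by=database

# Example filter syntax (commented out): exclude everything except the test database
# filter=exclude: *.*, include: test.*

# Verbose logging while debugging; switch to info afterwards
log_level=debug
```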
cd /opt/maxwell-1.21.0/
When using Maxwell for the first time, you are advised to change log_level in conf/config.properties to debug (debug level) so that after startup you can check whether data is being read from MySQL and sent to Kafka. After the entire process has been verified, change log_level back to info and restart Maxwell for the change to take effect.
# log level [debug | info | warn | error]
log_level=debug
source /opt/client/bigdata_env
bin/maxwell
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=kafkahost:9092 --kafka_topic=Maxwell
In the preceding command, user, password, and host indicate the MySQL username, password, and IP address, respectively. These three parameters can be set either in the configuration file or on the command line as shown above. kafkahost indicates the IP address of a Core node in the streaming cluster.
If information similar to the following appears, Maxwell has started successfully:
Success to start Maxwell [78092].
BinlogConnectorLifecycleListener - Binlog connected.
-- Creating a database
create database test;
-- Creating a table
create table test.e (
  id int(10) not null primary key auto_increment,
  m double,
  c timestamp(6),
  comment varchar(255) charset 'latin1'
);
-- Adding a record
insert into test.e set m = 4.2341, c = now(3), comment = 'I am a creature of light.';
-- Updating a record
update test.e set m = 5.444, c = now(3) where id = 1;
-- Deleting a record
delete from test.e where id = 1;
-- Modifying a table
alter table test.e add column torvalds bigint unsigned after m;
-- Deleting a table
drop table test.e;
-- Deleting a database
drop database test;
{"database":"test","table":"e","type":"insert","ts":1541150929,"xid":60556,"commit":true,"data":{"id":1,"m":4.2341,"c":"2018-11-02 09:28:49.297000","comment":"I am a creature of light."}} ......
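Each line Maxwell produces is one self-contained JSON object. As a quick sketch of how a downstream consumer might pick such a record apart (python3 is assumed to be available; the record below is the sample insert event shown above):

```shell
# One Maxwell event, copied from the sample output above
RECORD='{"database":"test","table":"e","type":"insert","ts":1541150929,"xid":60556,"commit":true,"data":{"id":1,"m":4.2341,"c":"2018-11-02 09:28:49.297000","comment":"I am a creature of light."}}'

# Pull out the routing fields: where the change happened and what kind it was
echo "$RECORD" | python3 -c '
import json, sys
event = json.load(sys.stdin)
print(event["database"], event["table"], event["type"], event["data"]["id"])'
# prints: test e insert 1
```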
After the entire process is debugged, you can change the value of log_level in the config.properties file to info to reduce the number of logs to be printed and restart Maxwell for the modification to take effect.
# log level [debug | info | warn | error]
log_level=info
ps -ef | grep Maxwell | grep -v grep
kill -9 PID
The data generated by Maxwell is in JSON format. The common fields are described as follows: