SET TIMESTAMP=1527917394/*!*/;
BEGIN
/*!*/;
# at 3751
#180602 13:29:54 server id 1  end_log_pos 3819 CRC32 0x8dabdf01  Table_map: `webservice`.`building` mapped to number 74
# at 3819
#180602 13:29:54 server id 1  end_log_pos 3949 CRC32 0x59a8ed85  Update_rows: table id 74 flags: STMT_END_F

BINLOG '
UisSWxMBAAAARAAAAOsOAAAAAEoAAAAAAAEACndlYnNlcnZpY2UACGJ1aWxkaW5nAAYIDwEPEREGwACAAQAAAAHfq40=
UisSWx8BAAAAggAAAG0PAAAAAEoAAAAAAAEAAgAG///A1gcAAAAAAAALYnVpbGRpbmctMTAADwB3UkRNbjNLYlV5d1k3ajVbD64WWw+uFsDWBwAAAAAAAAtidWlsZGluZy0xMAEPAHdSRE1uM0tiVXl3WTdqNVsPrhZbD64Whe2oWQ==
'/*!*/;
### UPDATE `webservice`.`building`
### WHERE
###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
###   @3=0 /* TINYINT meta=0 nullable=0 is_null=0 */
###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
### SET
###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
###   @3=1 /* TINYINT meta=0 nullable=0 is_null=0 */
###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
# at 3949
#180602 13:29:54 server id 1  end_log_pos 3980 CRC32 0x58226b8f  Xid = 182
COMMIT/*!*/;
SET TIMESTAMP=1527919329/*!*/;
update building set Status=1 where Id=2000
/*!*/;
# at 688
#180602 14:02:09 server id 1  end_log_pos 719 CRC32 0x4c550a7d  Xid = 200
COMMIT/*!*/;
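Events in either format can be consumed programmatically by registering with the server as a replication slave, which is exactly what the sync tools discussed below do. As a minimal sketch, assuming the third-party python-mysql-replication package and the connection details used in this article (the actual tools implement the same loop in Go and Scala):

# A minimal sketch, assuming the python-mysql-replication package
# (pymysqlreplication) is installed. Connection values mirror the
# examples in this article.
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import UpdateRowsEvent

stream = BinLogStreamReader(
    connection_settings={"host": "172.16.32.5", "port": 3306,
                         "user": "elastic", "passwd": "Elastic_123"},
    server_id=1001,              # pseudo slave id, must be unique
    only_schemas=["webservice"],
    only_tables=["building"],
    only_events=[UpdateRowsEvent],
    blocking=True,
)

for event in stream:
    for row in event.rows:
        # before_values / after_values correspond to the WHERE / SET
        # column lists in the decoded row event shown above.
        print(row["before_values"], "->", row["after_values"])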
mysqldump is a tool for exporting full data from a MySQL database. It can be used in the following way:

mysqldump -uelastic -p'Elastic_123' --host=172.16.32.5 -F webservice > dump.sql
This command exports the full data of the database webservice from the remote MySQL instance 172.16.32.5:3306 and writes it to the dump.sql file. The -F parameter makes the server rotate to a new binlog file after the data is exported, so that all subsequent data operations are recorded in the new binlog. The contents of the dump.sql file are as follows:

-- MySQL dump 10.13  Distrib 5.6.40, for Linux (x86_64)
--
-- Host: 172.16.32.5    Database: webservice
-- ------------------------------------------------------
-- Server version    5.5.5-10.1.9-MariaDBV1.0R012D002-20171127-1822

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

--
-- Table structure for table `building`
--

DROP TABLE IF EXISTS `building`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `building` (
  `Id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `BuildingId` varchar(64) NOT NULL COMMENT 'Virtual building ID',
  `Status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Virtual building status. 0: processing; 1: normal; -1: stopped; -2: terminating; -3: terminated',
  `BuildingName` varchar(128) NOT NULL DEFAULT '' COMMENT 'Virtual building name',
  `CreateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT 'Creation time',
  `UpdateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT 'Update time',
  PRIMARY KEY (`Id`),
  UNIQUE KEY `BuildingId` (`BuildingId`)
) ENGINE=InnoDB AUTO_INCREMENT=2010 DEFAULT CHARSET=utf8 COMMENT='Virtual building table';
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `building`
--

LOCK TABLES `building` WRITE;
/*!40000 ALTER TABLE `building` DISABLE KEYS */;
INSERT INTO `building` VALUES (2000,'building-2',0,'6YFcmntKrNBIeTA','2018-05-30 13:28:31','2018-05-30 13:28:31'),(2001,'building-4',0,'4rY8PcVUZB1vtrL','2018-05-30 13:28:34','2018-05-30 13:28:34'),(2002,'building-5',0,'uyjHVUYrg9KeGqi','2018-05-30 13:28:37','2018-05-30 13:28:37'),(2003,'building-7',0,'DNhyEBO4XEkXpgW','2018-05-30 13:28:40','2018-05-30 13:28:40'),(2004,'building-1',0,'TmtYX6ZC0RNB4Re','2018-05-30 13:28:43','2018-05-30 13:28:43'),(2005,'building-6',0,'t8YQcjeXefWpcyU','2018-05-30 13:28:49','2018-05-30 13:28:49'),(2006,'building-10',0,'WozgBc2IchNyKyE','2018-05-30 13:28:55','2018-05-30 13:28:55'),(2007,'building-3',0,'yJk27cmLOVQLHf1','2018-05-30 13:28:58','2018-05-30 13:28:58'),(2008,'building-9',0,'RSbjotAh8tymfxs','2018-05-30 13:29:04','2018-05-30 13:29:04'),(2009,'building-8',0,'IBOMlhaXV6k226m','2018-05-30 13:29:31','2018-05-30 13:29:31');
/*!40000 ALTER TABLE `building` ENABLE KEYS */;
UNLOCK TABLES;

/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

-- Dump completed on 2018-06-02 14:23:51
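With a full dump like this in hand, the initial load into ES is just a bulk indexing pass over the exported rows. A minimal sketch, assuming the official elasticsearch-py client and the ES endpoint used later in this article:

# A minimal sketch of the full-sync step, assuming the elasticsearch-py
# client. The rows literal mirrors the INSERT statement in dump.sql.
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["http://172.16.32.64:9200"])

rows = [
    (2000, "building-2", 0, "6YFcmntKrNBIeTA",
     "2018-05-30 13:28:31", "2018-05-30 13:28:31"),
    # ... remaining rows from dump.sql ...
]

columns = ["Id", "BuildingId", "Status", "BuildingName",
           "CreateTime", "UpdateTime"]

actions = (
    {
        "_index": "building",
        "_id": row[0],  # use the primary key Id as the ES document id
        "_source": dict(zip(columns, row)),
    }
    for row in rows
)

helpers.bulk(es, actions)  # helpers.bulk chunks the actions internally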
go-mysql-elasticsearch is an open-source tool for syncing MySQL data to an ES cluster. For more information, see its GitHub page.

Here is how go-mysql-elasticsearch works: when launched for the first time, it uses the mysqldump tool to perform a full sync of the source MySQL database and writes the data to ES through the Elasticsearch client. It then implements a MySQL client acting as the slave and connects it to the source MySQL database which, as the master, syncs all data updates to the slave through binlog events. By parsing these events, the tool obtains the updated data and writes it to ES.

Before starting the sync, grant the replication and reload privileges to the MySQL account that go-mysql-elasticsearch will use:

GRANT REPLICATION SLAVE ON *.* TO 'elastic'@'172.16.32.44';
GRANT RELOAD ON *.* TO 'elastic'@'172.16.32.44';
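Conceptually, the incremental phase maps each binlog row event onto an ES write keyed by the row's primary key. A minimal sketch of that mapping, reusing the hedged pymysqlreplication and elasticsearch-py setup from the earlier snippets (go-mysql-elasticsearch implements this loop in Go):

# A minimal sketch of the incremental sync step: translate one binlog
# row event into ES index/delete operations, keyed by the primary key.
from pymysqlreplication.row_event import (
    WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent)

def apply_event(es, event):
    """Apply a single binlog row event to the 'building' index."""
    for row in event.rows:
        if isinstance(event, DeleteRowsEvent):
            es.delete(index="building", id=row["values"]["Id"])
        elif isinstance(event, UpdateRowsEvent):
            doc = row["after_values"]   # new column values after UPDATE
            es.index(index="building", id=doc["Id"], body=doc)
        elif isinstance(event, WriteRowsEvent):
            doc = row["values"]         # column values of the INSERT
            es.index(index="building", id=doc["Id"], body=doc)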
To install and configure go-mysql-elasticsearch:

1. Install the Go environment and configure the GOPATH environment variable.
2. Run the go get github.com/siddontang/go-mysql-elasticsearch command.
3. Run the cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch command.
4. Run the make command to start compiling. After the compilation succeeds, an executable file named go-mysql-elasticsearch is generated in the go-mysql-elasticsearch/bin directory.
5. Run the vi etc/river.toml command to modify the configuration file so as to sync the webservice.building table of the 172.16.0.101:3306 database to the building index of the 172.16.32.64:9200 ES cluster. (For a detailed description of the configuration file, see the project documentation.) The configuration used here is as follows:

# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "172.16.0.101:3306"
my_user = "bellen"
my_pass = "Elastic_123"
my_charset = "utf8"

# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "172.16.32.64:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""

# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
data_dir = "./var"

# Inner Http status address
stat_addr = "127.0.0.1:12800"

# pseudo server id like a slave
server_id = 1001

# mysql or mariadb
flavor = "mariadb"

# mysqldump execution path
# if not set or empty, ignore mysqldump.
mysqldump = "mysqldump"

# if we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false

# minimal items to be inserted in one bulk
bulk_size = 128

# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "200ms"

# Ignore table without primary key
skip_no_pk_table = false

# MySQL data source
[[source]]
schema = "webservice"
tables = ["building"]

[[rule]]
schema = "webservice"
table = "building"
index = "building"
type = "buildingtype"
Run the ./bin/go-mysql-elasticsearch -config=./etc/river.toml command to start the sync. The log output looks like this:

2018/06/02 16:13:21 INFO create BinlogSyncer with config {1001 mariadb 172.16.0.101 3306 bellen utf8 false false <nil> false false 0 0s 0s 0}
2018/06/02 16:13:21 INFO run status http server 127.0.0.1:12800
2018/06/02 16:13:21 INFO skip dump, use last binlog replication pos (mysql-bin.000001, 120) or GTID %!s(<nil>)
2018/06/02 16:13:21 INFO begin to sync binlog from position (mysql-bin.000001, 120)
2018/06/02 16:13:21 INFO register slave for master server 172.16.0.101:3306
2018/06/02 16:13:21 INFO start sync binlog at binlog file (mysql-bin.000001, 120)
2018/06/02 16:13:21 INFO rotate to (mysql-bin.000001, 120)
2018/06/02 16:13:21 INFO rotate binlog to (mysql-bin.000001, 120)
2018/06/02 16:13:21 INFO save position (mysql-bin.000001, 120)
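Once the process is up, you can spot-check that documents are landing in the target index. A minimal sketch, again assuming the elasticsearch-py client:

# A quick verification that rows reached the "building" index,
# assuming the elasticsearch-py client.
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://172.16.32.64:9200"])
resp = es.search(index="building", body={"query": {"match_all": {}}})
print(resp["hits"]["total"])   # expect the 10 rows from the full dump
for hit in resp["hits"]["hits"]:
    print(hit["_id"], hit["_source"].get("BuildingId"))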
go-mysql-elasticsearch provides the most basic capability to sync data from MySQL to ES in real time. If your business requires more complex features, such as handling changes to the MySQL table structure during operation, you can customize it through secondary development.

mypipe is an open-source tool that syncs MySQL binlog events to Kafka. To use it, first grant the replication privileges to the MySQL account it will connect with:

GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'elastic'@'%' IDENTIFIED BY 'Elastic_123'
mypipe encodes binlog mutations in Avro format before publishing them to Kafka. For consuming these messages, mypipe provides a KafkaGenericMutationAvroConsumer class that can be directly inherited for use. You can also implement the parsing on your own. To set up mypipe:

1. Run the git clone https://github.com/mardambey/mypipe.git command.
2. Run the ./sbt package command.
3. Configure mypipe-runner/src/main/resources/application.conf:

mypipe {
  # Avro schema repository client class name
  schema-repo-client = "mypipe.avro.schema.SchemaRepo"

  # consumers represent sources for mysql binary logs
  consumers {
    localhost {
      # database "host:port:user:pass" array
      source = "172.16.0.101:3306:elastic:Elastic_123"
    }
  }

  # data producers export data out (stdout, other stores, external services, etc.)
  producers {
    kafka-generic {
      class = "mypipe.kafka.producer.KafkaMutationGenericAvroProducer"
    }
  }

  # pipes join consumers and producers
  pipes {
    kafka-generic {
      enabled = true
      consumers = ["localhost"]
      producer {
        kafka-generic {
          metadata-brokers = "172.16.16.22:9092"
        }
      }
      binlog-position-repo {
        # saved to a file, this is the default if unspecified
        class = "mypipe.api.repo.ConfigurableFileBasedBinaryLogPositionRepository"
        config {
          file-prefix = "stdout-00" # required if binlog-position-repo is specified
          data-dir = "/tmp/mypipe/data" # defaults to mypipe.data-dir if not present
        }
      }
    }
  }
}
4. Configure mypipe-api/src/main/resources/reference.conf by modifying the include-event-condition option to specify the database and table to be synced:

include-event-condition = """ db == "webservice" && table == "building" """
5. Create the topic webservice_building_generic on the Kafka broker. By default, mypipe uses ${db}_${table}_generic as the topic name and sends the data to this topic.
6. Run the ./sbt "project runner" "runMain mypipe.runner.PipeRunner" command to start mypipe.

After a mutation on the building table, a message like the following can be consumed from the topic:

ConsumerRecord(topic=u'webservice_building_generic', partition=0, offset=2, timestamp=None, timestamp_type=None, key=None, value='\x00\x01\x00\x00\x14webservice\x10building\xcc\x01\x02\x91,\xae\xa3fc\x11\xe8\xa1\xaaRT\x00Z\xf9\xab\x00\x00\x04\x18BuildingName\x06xxx\x14BuildingId\nId-10\x00\x02\x04Id\xd4%\x00', checksum=128384379, serialized_key_size=-1, serialized_value_size=88)
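The record above is what a plain Kafka consumer sees; the value field holds the Avro-encoded mutation. A minimal sketch that prints such records, assuming the kafka-python client (decoding the Avro payload is left to KafkaGenericMutationAvroConsumer or your own parser):

# A minimal sketch that consumes raw mutation records from the mypipe
# topic, assuming the kafka-python client. The value bytes are
# Avro-encoded; decoding them is left to KafkaGenericMutationAvroConsumer
# or a hand-written parser.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "webservice_building_generic",
    bootstrap_servers="172.16.16.22:9092",
    auto_offset_reset="earliest",
)

for record in consumer:
    print(record.topic, record.partition, record.offset, record.value)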