context.setSecretId("AKIDfdsfdsfsdt1331431sdfds"); 请填写您从云API获取的secretID。
context.setSecretKey("test111usdfsdfsddsfRkeT"); 请填写您从云API获取的secretKey.
// 在数据迁移服务里面通过数据P阅获取到对应的ip,port,填写到此处
context.setServiceIp("10.66.112.181"); 请填写您从数据订阅配置获取到的IP
context.setServicePort(7507); 请填写您从数据订阅配置获取到的PORT
// 创建消费者
//SubscribeClient client=new DefaultSubscribeClient(context,true);
final DefaultSubscribeClient client = new DefaultSubscribeClient(context);
final Jedis jedis = new Jedis("127.0.0.1", 6379); 请填写您对应的redis主机和端口
final String targetDatabase = "test"; 填写您所要订阅的库名
final String targetTable = "alantest"; 填写您所要订阅的表名,表有2个字段分别是id,name。(id是做了主键)。
// 创建订阅监听者listener
ClusterListener listener = new ClusterListener() {
@Override
public void notify(List<ClusterMessage> messages) throws Exception {
// System.out.println("--------------------:" + messages.size());
for(ClusterMessage m:messages){
DataMessage.Record record = m.getRecord();
//过滤不感兴趣的订阅信息
if(!record.getDbName().equalsIgnoreCase(targetDatabase) || !record.getTablename().equalsIgnoreCase(targetTable)){
//注意:对于不感兴趣的信息也必须Ack
m.ackAsConsumed();
continue;
}
if(record.getOpt() != DataMessage.Record.Type.BEGIN && record.getOpt() != DataMessage.Record.Type.COMMIT){
List<DataMessage.Record.Field> fields = record.getFieldList();
//INSERT RECORD
//String pk = record.getPrimaryKeys();
if(record.getOpt() == DataMessage.Record.Type.INSERT){
String keyid="";
String value="";
for (DataMessage.Record.Field field : fields) {
//先获取id值,需要有primary key,然后找到名为name的列,赋值给redis 插入key和name对应的value.
if(field.getFieldname().equalsIgnoreCase("id")){
keyid=field.getValue();
continue;
}
if(field.getFieldname().equalsIgnoreCase("name")){
value=field.getValue();
}
jedis.set(keyid, value);
}
}
本页内容是否解决了您的问题?