diff --git a/.idea/compiler.xml b/.idea/compiler.xml index c06a160..5bce655 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -7,11 +7,11 @@ - + - + diff --git a/.idea/dictionaries/sss.xml b/.idea/dictionaries/sss.xml index 8f2a077..031bbe4 100644 --- a/.idea/dictionaries/sss.xml +++ b/.idea/dictionaries/sss.xml @@ -1,10 +1,15 @@ + adconf creatives finchley getor javaedge + kaiping + pian + shishusheng + tiepian yyyy diff --git a/README.md b/README.md index b77de06..60d2e84 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,4 @@ -# 0 联系我 -![](http://upload-images.jianshu.io/upload_images/4685968-6a8b28d2fd95e8b7?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240 "图片标题") -1.[Java开发技术交流Q群](https://jq.qq.com/?_wv=1027&k=5UB4P1T) - -2.[完整博客链接](http://www.shishusheng.com) - -3.[个人知乎](http://www.zhihu.com/people/shi-shu-sheng-) - -4.[gayhub](https://github.com/Wasabi1234) - -# SpringCloud+Kafka+MySQL实现微服务架构下的广告系统 -掌握了广告系统,就是掌握了很多互联网公司的收入核心,你自然也就是公司的核心! +# 基于微服务架构的广告系统 # 环境参数 JDK 1.8 @@ -17,29 +6,5 @@ SpringCloud: Finchley.RELEASE Kafka 2.0 Maven 3.5.0 MySQL 5.7 -IDE IntelliJ IDEA: 2019.1 - -# 1 博客教程 -## 1.1 [项目简介](https://www.jianshu.com/p/69a703b080b6) -广告系统的介绍、所使用的技术介绍、学习规划,广告系统的思想、广告系统的技术实现架构、准备工作和广告系统的代码目录结构等 - -## 1.2 [脚手架开发](https://www.jianshu.com/p/5504269507f1) -广告系统使用SpringCloud微服务框架开发,并使用Maven做多模块管理。该部分完成项目骨架的开发,包括搭建注册中心和服务网关,同时也会对Maven的重要特性做介绍 - -## 1.3 [微服务通用模块开发](https://www.jianshu.com/p/e3c2b9e729dd) -实现广告系统微服务通用的功能,例如:统一响应格式、全局异常处理、通用代码定义、通用配置定义等 - -## 1.4 [广告投放系统的开发](https://www.jianshu.com/p/bfde2ed70106) -实现投放系统,投放系统即实现对广告数据的存储. -会对表结构设计进行介绍,使用 JPA 实现对各个数据表的增删改查。同时,由于会涉及Spring相关的知识点(Spring IOC、Spring MVC、SpringBoot),也会对这些做基础介绍 - -## 1.5 [广告检索系统 - 微服务调用](https://www.jianshu.com/p/9c62c0fbd139) -介绍使用 Ribbon 与 Feign 组件实现微服务之间的调用 - -## 1.6 [广告检索系统 - 广告数据索引的设计与实现](https://www.jianshu.com/p/685f1a5aafd0) -广告检索系统的核心是实现广告检索服务,为加快广告检索的速度,良好的索引设计是不可缺少的。本章首先对索引的设计与维护进行介绍,之后,实现广告数据的索引服务。 - -## 1.7 [广告检索系统 - 加载全量索引]() +IDE IntelliJ IDEA: 2018.3 -![](https://upload-images.jianshu.io/upload_images/4685968-c88d77d6f38a771b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) -![](https://upload-images.jianshu.io/upload_images/4685968-091b54b1fa8b5748.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) diff --git a/ad-gateway/src/main/resources/application.yml b/ad-gateway/src/main/resources/application.yml index deab9cf..9da5a10 100755 --- a/ad-gateway/src/main/resources/application.yml +++ b/ad-gateway/src/main/resources/application.yml @@ -14,4 +14,8 @@ zuul: sponsor: path: /ad-sponsor/** serviceId: eureka-client-ad-sponsor - strip-prefix: false \ No newline at end of file + strip-prefix: false + search: + path: /ad-search/** + serviceId: eureka-client-ad-search + strip-prefix: false diff --git a/javaedge-ad-service/ad-common/src/main/java/com/javaedge/ad/dump/table/AdPlanTable.java b/javaedge-ad-service/ad-common/src/main/java/com/javaedge/ad/dump/table/AdPlanTable.java index d9af2e5..01c3113 100644 --- a/javaedge-ad-service/ad-common/src/main/java/com/javaedge/ad/dump/table/AdPlanTable.java +++ b/javaedge-ad-service/ad-common/src/main/java/com/javaedge/ad/dump/table/AdPlanTable.java @@ -18,6 +18,10 @@ public class AdPlanTable { private Long id; private Long userId; private Integer planStatus; + + /** + * 对于 Date 类型,由于将 binlog 转化为了字符串,需搞清楚 Date 的字符串表达形式 + */ private Date startDate; private Date endDate; } diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/CommonStatus.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/CommonStatus.java new file mode 100644 index 0000000..a20ed8e --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/CommonStatus.java @@ -0,0 +1,22 @@ +package com.javaedge.ad.index; + +import lombok.Getter; + +/** + * @author sss + * @date 2019/2/27 + */ +@Getter +public enum CommonStatus { + + VALID(1, "有效状态"), + INVALID(0, "无效状态"); + + private Integer status; + private String desc; + + CommonStatus(Integer status, String desc) { + this.status = status; + this.desc = desc; + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/DataLevel.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/DataLevel.java new file mode 100755 index 0000000..c6c5b2a --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/DataLevel.java @@ -0,0 +1,23 @@ +package com.javaedge.ad.index; + +import lombok.Getter; + +/** + * @author sss + * @date 2019-02-22 + */ +@Getter +public enum DataLevel { + + LEVEL2("2", "level 2"), + LEVEL3("3", "level 3"), + LEVEL4("4", "level 4"); + + private String level; + private String desc; + + DataLevel(String level, String desc) { + this.level = level; + this.desc = desc; + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitConstants.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitConstants.java new file mode 100755 index 0000000..3b015f6 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitConstants.java @@ -0,0 +1,36 @@ +package com.javaedge.ad.index.adunit; + +/** + * @author sss + * @date 2019-02-26 + */ +public class AdUnitConstants { + + public static class POSITION_TYPE { + + /** + * 开屏广告位 + */ + public static final int KAIPING = 1; + + /** + * 贴片广告 + */ + public static final int TIEPIAN = 2; + + /** + * 视频播放中显示的广告 + */ + public static final int TIEPIAN_MIDDLE = 4; + + /** + * 视频播放暂停时广告 + */ + public static final int TIEPIAN_PAUSE = 8; + + /** + * 视频播放后的广告位 + */ + public static final int TIEPIAN_POST = 16; + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitIndex.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitIndex.java index fb877e3..d4d1b62 100644 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitIndex.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitIndex.java @@ -2,9 +2,10 @@ import com.javaedge.ad.index.IndexAware; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Component; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** @@ -21,6 +22,42 @@ public class AdUnitIndex implements IndexAware { objectMap = new ConcurrentHashMap<>(); } + + public Set match(Integer positionType) { + + Set adUnitIds = new HashSet<>(); + + objectMap.forEach((k, v) -> { + if (AdUnitObject.isAdSlotTypeOK(positionType, + v.getPositionType())) { + adUnitIds.add(k); + } + }); + + return adUnitIds; + } + + public List fetch(Collection adUnitIds) { + + if (CollectionUtils.isEmpty(adUnitIds)) { + return Collections.emptyList(); + } + + List result = new ArrayList<>(); + + adUnitIds.forEach(u -> { + AdUnitObject object = get(u); + if (object == null) { + log.error("AdUnitObject not found: {}", u); + return; + } + result.add(object); + }); + + return result; + } + + @Override public AdUnitObject get(Long key) { return objectMap.get(key); diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitObject.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitObject.java index 6fe69dd..d39caa0 100644 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitObject.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/adunit/AdUnitObject.java @@ -40,4 +40,42 @@ void update(AdUnitObject newObject) { this.adPlanObject = newObject.getAdPlanObject(); } } + + private static boolean isKaiPing(int positionType) { + return (positionType & AdUnitConstants.POSITION_TYPE.KAIPING) > 0; + } + + private static boolean isTiePian(int positionType) { + return (positionType & AdUnitConstants.POSITION_TYPE.TIEPIAN) > 0; + } + + private static boolean isTiePianMiddle(int positionType) { + return (positionType & AdUnitConstants.POSITION_TYPE.TIEPIAN_MIDDLE) > 0; + } + + private static boolean isTiePianPause(int positionType) { + return (positionType & AdUnitConstants.POSITION_TYPE.TIEPIAN_PAUSE) > 0; + } + + private static boolean isTiePianPost(int positionType) { + return (positionType & AdUnitConstants.POSITION_TYPE.TIEPIAN_POST) > 0; + } + + public static boolean isAdSlotTypeOK(int adSlotType, int positionType) { + + switch (adSlotType) { + case AdUnitConstants.POSITION_TYPE.KAIPING: + return isKaiPing(positionType); + case AdUnitConstants.POSITION_TYPE.TIEPIAN: + return isTiePian(positionType); + case AdUnitConstants.POSITION_TYPE.TIEPIAN_MIDDLE: + return isTiePianMiddle(positionType); + case AdUnitConstants.POSITION_TYPE.TIEPIAN_PAUSE: + return isTiePianPause(positionType); + case AdUnitConstants.POSITION_TYPE.TIEPIAN_POST: + return isTiePianPost(positionType); + default: + return false; + } + } } diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/creative/CreativeIndex.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/creative/CreativeIndex.java index 0132623..296bfa6 100644 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/creative/CreativeIndex.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/creative/CreativeIndex.java @@ -27,7 +27,6 @@ public class CreativeIndex implements IndexAware { } public List fetch(Collection adIds) { - if (CollectionUtils.isEmpty(adIds)) { return Collections.emptyList(); } @@ -40,10 +39,8 @@ public List fetch(Collection adIds) { log.error("CreativeObject not found: {}", u); return; } - result.add(object); }); - return result; } @@ -54,7 +51,6 @@ public CreativeObject get(Long key) { @Override public void add(Long key, CreativeObject value) { - log.info("before add: {}", objectMap); objectMap.put(key, value); log.info("after add: {}", objectMap); @@ -62,7 +58,6 @@ public void add(Long key, CreativeObject value) { @Override public void update(Long key, CreativeObject value) { - log.info("before update: {}", objectMap); CreativeObject oldObject = objectMap.get(key); @@ -77,7 +72,6 @@ public void update(Long key, CreativeObject value) { @Override public void delete(Long key, CreativeObject value) { - log.info("before delete: {}", objectMap); objectMap.remove(key); log.info("after delete: {}", objectMap); diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/creativeunit/CreativeUnitIndex.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/creativeunit/CreativeUnitIndex.java index fb36102..06a3de3 100644 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/creativeunit/CreativeUnitIndex.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/creativeunit/CreativeUnitIndex.java @@ -50,7 +50,6 @@ public CreativeUnitObject get(String key) { @Override public void add(String key, CreativeUnitObject value) { - log.info("before add: {}", objectMap); objectMap.put(key, value); @@ -74,13 +73,11 @@ public void add(String key, CreativeUnitObject value) { @Override public void update(String key, CreativeUnitObject value) { - log.error("CreativeUnitIndex not support update"); } @Override public void delete(String key, CreativeUnitObject value) { - log.info("before delete: {}", objectMap); objectMap.remove(key); @@ -99,7 +96,6 @@ public void delete(String key, CreativeUnitObject value) { } public List selectAds(List unitObjects) { - if (CollectionUtils.isEmpty(unitObjects)) { return Collections.emptyList(); } @@ -107,7 +103,6 @@ public List selectAds(List unitObjects) { List result = new ArrayList<>(); for (AdUnitObject unitObject : unitObjects) { - Set adIds = unitCreativeMap.get(unitObject.getUnitId()); if (CollectionUtils.isNotEmpty(adIds)) { result.addAll(adIds); diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/district/UnitDistrictIndex.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/district/UnitDistrictIndex.java index d033fda..30c624c 100644 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/district/UnitDistrictIndex.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/district/UnitDistrictIndex.java @@ -1,6 +1,7 @@ package com.javaedge.ad.index.district; import com.javaedge.ad.index.IndexAware; +import com.javaedge.ad.search.vo.feature.DistrictFeature; import com.javaedge.ad.utils.CommonUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -39,16 +40,19 @@ public void add(String key, Set value) { log.info("UnitDistrictIndex, before add: {}", unitDistrictMap); + changeADD(key, value, districtUnitMap, unitDistrictMap); + + log.info("UnitDistrictIndex, after add: {}", unitDistrictMap); + } + + public static void changeADD(String key, Set value, Map> districtUnitMap, Map> unitDistrictMap) { Set unitIds = CommonUtils.getorCreate(key, districtUnitMap, ConcurrentSkipListSet::new); unitIds.addAll(value); for (Long unitId : value) { - Set districts = CommonUtils.getorCreate(unitId, unitDistrictMap, ConcurrentSkipListSet::new); districts.add(key); } - - log.info("UnitDistrictIndex, after add: {}", unitDistrictMap); } @Override @@ -62,6 +66,12 @@ public void delete(String key, Set value) { log.info("UnitDistrictIndex, before delete: {}", unitDistrictMap); + changeRM(key, value, districtUnitMap, unitDistrictMap); + + log.info("UnitDistrictIndex, after delete: {}", unitDistrictMap); + } + + public static void changeRM(String key, Set value, Map> districtUnitMap, Map> unitDistrictMap) { Set unitIds = CommonUtils.getorCreate(key, districtUnitMap, ConcurrentSkipListSet::new); unitIds.removeAll(value); @@ -69,27 +79,25 @@ public void delete(String key, Set value) { Set districts = CommonUtils.getorCreate(unitId, unitDistrictMap, ConcurrentSkipListSet::new); districts.remove(key); } - - log.info("UnitDistrictIndex, after delete: {}", unitDistrictMap); } -// public boolean match(Long adUnitId, List districts) { -// -// if (unitDistrictMap.containsKey(adUnitId) && -// CollectionUtils.isNotEmpty(unitDistrictMap.get(adUnitId))) { -// -// Set unitDistricts = unitDistrictMap.get(adUnitId); -// -// List targetDistricts = districts.stream() -// .map( -// d -> CommonUtils.stringConcat( -// d.getProvince(), d.getCity() -// ) -// ).collect(Collectors.toList()); -// -// return CollectionUtils.isSubCollection(targetDistricts, unitDistricts); -// } -// -// return false; -// } + public boolean match(Long adUnitId, List districts) { + + if (unitDistrictMap.containsKey(adUnitId) && + CollectionUtils.isNotEmpty(unitDistrictMap.get(adUnitId))) { + + Set unitDistricts = unitDistrictMap.get(adUnitId); + + List targetDistricts = districts.stream() + .map( + d -> CommonUtils.stringConcat( + d.getProvince(), d.getCity() + ) + ).collect(Collectors.toList()); + + return CollectionUtils.isSubCollection(targetDistricts, unitDistricts); + } + + return false; + } } diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/interest/UnitItIndex.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/interest/UnitItIndex.java index f23f6ee..90156c4 100644 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/interest/UnitItIndex.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/interest/UnitItIndex.java @@ -1,6 +1,7 @@ package com.javaedge.ad.index.interest; import com.javaedge.ad.index.IndexAware; +import com.javaedge.ad.index.district.UnitDistrictIndex; import com.javaedge.ad.utils.CommonUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -45,20 +46,7 @@ public void add(String key, Set value) { log.info("UnitItIndex, before add: {}", unitItMap); - Set unitIds = CommonUtils.getorCreate( - key, itUnitMap, - ConcurrentSkipListSet::new - ); - unitIds.addAll(value); - - for (Long unitId : value) { - - Set its = CommonUtils.getorCreate( - unitId, unitItMap, - ConcurrentSkipListSet::new - ); - its.add(key); - } + UnitDistrictIndex.changeADD(key, value, itUnitMap, unitItMap); log.info("UnitItIndex, after add: {}", unitItMap); } @@ -75,17 +63,7 @@ public void delete(String key, Set value) { log.info("UnitItIndex, before delete: {}", unitItMap); // 先取出对应的推广 单元id - Set unitIds = CommonUtils.getorCreate(key, itUnitMap, ConcurrentSkipListSet::new); - - // 再对单元 ids 进行 rm 操作,考虑到部分删除,不是直接对 itunitmap 删除 key 对应值 - // 因为 key 所对应 推广单元的 ids 并不一定全量,我们要实现可删除部分兴趣 - // eg. 一个 ittag 对应到推广单元的1,2,3,我们可以删除1,2 而不是3.所以先取再删. - unitIds.removeAll(value); - - for (Long unitId : value) { - Set itTagSet = CommonUtils.getorCreate(unitId, unitItMap, ConcurrentSkipListSet::new); - itTagSet.remove(key); - } + UnitDistrictIndex.changeRM(key, value, itUnitMap, unitItMap); log.info("UnitItIndex, after delete: {}", unitItMap); } diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/keyword/UnitKeywordIndex.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/keyword/UnitKeywordIndex.java index 47f8c04..7cf3adf 100644 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/keyword/UnitKeywordIndex.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/index/keyword/UnitKeywordIndex.java @@ -1,6 +1,7 @@ package com.javaedge.ad.index.keyword; import com.javaedge.ad.index.IndexAware; +import com.javaedge.ad.index.district.UnitDistrictIndex; import com.javaedge.ad.utils.CommonUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -49,17 +50,7 @@ public void add(String key, Set value) { log.info("UnitKeywordIndex, before add: {}", unitKeywordMap); - Set unitIdSet = CommonUtils.getorCreate(key, keywordUnitMap, ConcurrentSkipListSet::new); - - unitIdSet.addAll(value); - - for (Long unitId : value) { - Set keywordSet = CommonUtils.getorCreate( - unitId, unitKeywordMap, - ConcurrentSkipListSet::new - ); - keywordSet.add(key); - } + UnitDistrictIndex.changeADD(key, value, keywordUnitMap, unitKeywordMap); log.info("UnitKeywordIndex, after add: {}", unitKeywordMap); } @@ -82,16 +73,7 @@ public void delete(String key, Set value) { log.info("UnitKeywordIndex, before delete: {}", unitKeywordMap); - Set unitIds = CommonUtils.getorCreate(key, keywordUnitMap, ConcurrentSkipListSet::new); - - unitIds.removeAll(value); - - for (Long unitId : value) { - - Set keywordSet = CommonUtils.getorCreate(unitId, unitKeywordMap, ConcurrentSkipListSet::new); - - keywordSet.remove(key); - } + UnitDistrictIndex.changeRM(key, value, keywordUnitMap, unitKeywordMap); log.info("UnitKeywordIndex, after delete: {}", unitKeywordMap); } diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/BinlogClient.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/BinlogClient.java new file mode 100755 index 0000000..89ea883 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/BinlogClient.java @@ -0,0 +1,67 @@ +package com.javaedge.ad.mysql; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.javaedge.ad.mysql.listener.AggregationListener; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author sss + * @date 2019-02-25 + */ +@Slf4j +@Component +public class BinlogClient { + + private BinaryLogClient client; + + private final BinlogConfig config; + private final AggregationListener listener; + + @Autowired + public BinlogClient(BinlogConfig config, AggregationListener listener) { + this.config = config; + this.listener = listener; + } + + public void connect() { + + new Thread(() -> { + client = new BinaryLogClient( + config.getHost(), + config.getPort(), + config.getUsername(), + config.getPassword() + ); + + if (!StringUtils.isEmpty(config.getBinlogName()) && + !config.getPosition().equals(-1L)) { + client.setBinlogFilename(config.getBinlogName()); + client.setBinlogPosition(config.getPosition()); + } + + client.registerEventListener(listener); + + try { + log.info("connecting to mysql start"); + client.connect(); + log.info("connecting to mysql done"); + } catch (IOException ex) { + ex.printStackTrace(); + } + + }).start(); + } + + public void close() { + try { + client.disconnect(); + } catch (IOException ex) { + ex.printStackTrace(); + } + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/BinlogConfig.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/BinlogConfig.java new file mode 100755 index 0000000..c191742 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/BinlogConfig.java @@ -0,0 +1,27 @@ +package com.javaedge.ad.mysql; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @author sss + * @date 2019-02-25 + */ +@Component +@ConfigurationProperties(prefix = "adconf.mysql") +@Data +@NoArgsConstructor +@AllArgsConstructor +public class BinlogConfig { + + private String host; + private Integer port; + private String username; + private String password; + + private String binlogName; + private Long position; +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/TemplateHolder.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/TemplateHolder.java new file mode 100755 index 0000000..5303cb8 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/TemplateHolder.java @@ -0,0 +1,102 @@ +package com.javaedge.ad.mysql; + +import com.alibaba.fastjson.JSON; +import com.javaedge.ad.mysql.constant.OpType; +import com.javaedge.ad.mysql.dto.ParseTemplate; +import com.javaedge.ad.mysql.dto.TableTemplate; +import com.javaedge.ad.mysql.dto.Template; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * @author sss + * @date 2019-02-25 + */ +@Slf4j +@Component +public class TemplateHolder { + + private ParseTemplate template; + private final JdbcTemplate jdbcTemplate; + + private String SQL_SCHEMA = "select table_schema, table_name, " + + "column_name, ordinal_position from information_schema.columns " + + "where table_schema = ? and table_name = ?"; + + @Autowired + public TemplateHolder(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + @PostConstruct + private void init() { + loadJson("template.json"); + } + + public TableTemplate getTable(String tableName) { + return template.getTableTemplateMap().get(tableName); + } + + private void loadJson(String path) { + + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + InputStream inStream = cl.getResourceAsStream(path); + + try { + Template template = JSON.parseObject( + inStream, + Charset.defaultCharset(), + Template.class + ); + this.template = ParseTemplate.parse(template); + loadMeta(); + } catch (IOException ex) { + log.error(ex.getMessage()); + throw new RuntimeException("fail to parse json file"); + } + } + + private void loadMeta() { + + for (Map.Entry entry : + template.getTableTemplateMap().entrySet()) { + + TableTemplate table = entry.getValue(); + + List updateFields = table.getOpTypeFieldSetMap().get( + OpType.UPDATE + ); + List insertFields = table.getOpTypeFieldSetMap().get( + OpType.ADD + ); + List deleteFields = table.getOpTypeFieldSetMap().get( + OpType.DELETE + ); + + jdbcTemplate.query(SQL_SCHEMA, new Object[]{ + template.getDatabase(), table.getTableName() + }, (rs, i) -> { + + int pos = rs.getInt("ORDINAL_POSITION"); + String colName = rs.getString("COLUMN_NAME"); + + if ((null != updateFields && updateFields.contains(colName)) + || (null != insertFields && insertFields.contains(colName)) + || (null != deleteFields && deleteFields.contains(colName))) { + table.getPosMap().put(pos - 1, colName); + } + + return null; + }); + } + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/constant/Constant.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/constant/Constant.java new file mode 100755 index 0000000..8d84b5e --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/constant/Constant.java @@ -0,0 +1,95 @@ +package com.javaedge.ad.mysql.constant; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author sss + * @date 2019-02-25 + */ +public class Constant { + + private static final String DB_NAME = "javaedge_ad_data"; + + public static class AD_PLAN_TABLE_INFO { + + public static final String TABLE_NAME = "ad_plan"; + + public static final String COLUMN_ID = "id"; + public static final String COLUMN_USER_ID = "user_id"; + public static final String COLUMN_PLAN_STATUS = "plan_status"; + public static final String COLUMN_START_DATE = "start_date"; + public static final String COLUMN_END_DATE = "end_date"; + } + + public static class AD_CREATIVE_TABLE_INFO { + + public static final String TABLE_NAME = "ad_creative"; + + public static final String COLUMN_ID = "id"; + public static final String COLUMN_TYPE = "type"; + public static final String COLUMN_MATERIAL_TYPE = "material_type"; + public static final String COLUMN_HEIGHT = "height"; + public static final String COLUMN_WIDTH = "width"; + public static final String COLUMN_AUDIT_STATUS = "audit_status"; + public static final String COLUMN_URL = "url"; + } + + public static class AD_UNIT_TABLE_INFO { + + public static final String TABLE_NAME = "ad_unit"; + + public static final String COLUMN_ID = "id"; + public static final String COLUMN_UNIT_STATUS = "unit_status"; + public static final String COLUMN_POSITION_TYPE = "position_type"; + public static final String COLUMN_PLAN_ID = "plan_id"; + } + + public static class AD_CREATIVE_UNIT_TABLE_INFO { + + public static final String TABLE_NAME = "creative_unit"; + + public static final String COLUMN_CREATIVE_ID = "creative_id"; + public static final String COLUMN_UNIT_ID = "unit_id"; + } + + public static class AD_UNIT_DISTRICT_TABLE_INFO { + + public static final String TABLE_NAME = "ad_unit_district"; + + public static final String COLUMN_UNIT_ID = "unit_id"; + public static final String COLUMN_PROVINCE = "province"; + public static final String COLUMN_CITY = "city"; + } + + public static class AD_UNIT_IT_TABLE_INFO { + + public static final String TABLE_NAME = "ad_unit_it"; + + public static final String COLUMN_UNIT_ID = "unit_id"; + public static final String COLUMN_IT_TAG = "it_tag"; + } + + public static class AD_UNIT_KEYWORD_TABLE_INFO { + + public static final String TABLE_NAME = "ad_unit_keyword"; + + public static final String COLUMN_UNIT_ID = "unit_id"; + public static final String COLUMN_KEYWORD = "keyword"; + } + + public static Map table2Db; + + static { + + table2Db = new HashMap<>(); + + table2Db.put(AD_PLAN_TABLE_INFO.TABLE_NAME, DB_NAME); + table2Db.put(AD_CREATIVE_TABLE_INFO.TABLE_NAME, DB_NAME); + table2Db.put(AD_UNIT_TABLE_INFO.TABLE_NAME, DB_NAME); + table2Db.put(AD_CREATIVE_UNIT_TABLE_INFO.TABLE_NAME, DB_NAME); + table2Db.put(AD_UNIT_DISTRICT_TABLE_INFO.TABLE_NAME, DB_NAME); + table2Db.put(AD_UNIT_IT_TABLE_INFO.TABLE_NAME, DB_NAME); + table2Db.put(AD_UNIT_KEYWORD_TABLE_INFO.TABLE_NAME, DB_NAME); + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/constant/OpType.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/constant/OpType.java old mode 100644 new mode 100755 index ff3612a..e6f6184 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/constant/OpType.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/constant/OpType.java @@ -1,13 +1,30 @@ package com.javaedge.ad.mysql.constant; +import com.github.shyiko.mysql.binlog.event.EventType; + /** * @author sss - * @date 2019-02-12 + * @date 2019-02-25 */ -public enum OpType { +public enum OpType { ADD, UPDATE, DELETE, OTHER; + + public static OpType to(EventType eventType) { + + switch (eventType) { + case EXT_WRITE_ROWS: + return ADD; + case EXT_UPDATE_ROWS: + return UPDATE; + case EXT_DELETE_ROWS: + return DELETE; + + default: + return OTHER; + } + } } diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/BinlogRowData.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/BinlogRowData.java new file mode 100755 index 0000000..b20543f --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/BinlogRowData.java @@ -0,0 +1,25 @@ +package com.javaedge.ad.mysql.dto; + +import com.github.shyiko.mysql.binlog.event.EventType; +import lombok.Data; + +import java.util.List; +import java.util.Map; + +/** + * 日志文件相应的Java对象 + * + * @author sss + * @date 2019-02-25 + */ +@Data +public class BinlogRowData { + + private TableTemplate table; + + private EventType eventType; + + private List> after; + + private List> before; +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/JsonTable.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/JsonTable.java new file mode 100644 index 0000000..41db439 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/JsonTable.java @@ -0,0 +1,35 @@ +package com.javaedge.ad.mysql.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * 表达模板文件中的表 + * + * @author sss + * @date 2019-02-25 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class JsonTable { + + private String tableName; + private Integer level; + + private List insert; + private List update; + private List delete; + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Column { + + private String column; + } +} + diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/MySqlRowData.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/MySqlRowData.java new file mode 100755 index 0000000..5bcce80 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/MySqlRowData.java @@ -0,0 +1,28 @@ +package com.javaedge.ad.mysql.dto; + +import com.javaedge.ad.mysql.constant.OpType; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @author sss + * @date 2019-02-25 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class MySqlRowData { + + private String tableName; + + private String level; + + private OpType opType; + + private List> fieldValueMap = new ArrayList<>(); +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/ParseTemplate.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/ParseTemplate.java new file mode 100755 index 0000000..110afd6 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/ParseTemplate.java @@ -0,0 +1,74 @@ +package com.javaedge.ad.mysql.dto; + +import com.javaedge.ad.mysql.constant.OpType; +import lombok.Data; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * 解析模板 + * + * @author sss + * @date 2019-02-25 + */ +@Data +public class ParseTemplate { + + private String database; + + private Map tableTemplateMap = new HashMap<>(); + + public static ParseTemplate parse(Template _template) { + + ParseTemplate template = new ParseTemplate(); + template.setDatabase(_template.getDatabase()); + + for (JsonTable table : _template.getTableList()) { + + String name = table.getTableName(); + Integer level = table.getLevel(); + + TableTemplate tableTemplate = new TableTemplate(); + tableTemplate.setTableName(name); + tableTemplate.setLevel(level.toString()); + template.tableTemplateMap.put(name, tableTemplate); + + // 遍历操作类型对应的列 + Map> opTypeFieldSetMap = + tableTemplate.getOpTypeFieldSetMap(); + + for (JsonTable.Column column : table.getInsert()) { + getAndCreateIfNeed( + OpType.ADD, + opTypeFieldSetMap, + ArrayList::new + ).add(column.getColumn()); + } + for (JsonTable.Column column : table.getUpdate()) { + getAndCreateIfNeed( + OpType.UPDATE, + opTypeFieldSetMap, + ArrayList::new + ).add(column.getColumn()); + } + for (JsonTable.Column column : table.getDelete()) { + getAndCreateIfNeed( + OpType.DELETE, + opTypeFieldSetMap, + ArrayList::new + ).add(column.getColumn()); + } + } + + return template; + } + + private static R getAndCreateIfNeed(T key, Map map, + Supplier factory) { + return map.computeIfAbsent(key, k -> factory.get()); + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/TableTemplate.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/TableTemplate.java new file mode 100755 index 0000000..2e0930d --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/TableTemplate.java @@ -0,0 +1,32 @@ +package com.javaedge.ad.mysql.dto; + +import com.javaedge.ad.mysql.constant.OpType; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 表达模板文件中的 表&层级 + * + * @author sss + * @date 2019-02-25 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TableTemplate { + + private String tableName; + private String level; + + private Map> opTypeFieldSetMap = new HashMap<>(); + + /** + * 字段索引 -> 字段名 的映射关系 + * */ + private Map posMap = new HashMap<>(); +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/Template.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/Template.java new file mode 100644 index 0000000..90b8e7d --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/dto/Template.java @@ -0,0 +1,23 @@ +package com.javaedge.ad.mysql.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * 表达整个模板文件 + * + * @author sss + * @date 2019-02-25 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Template { + + private String database; + private List tableList; +} + diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/AggregationListener.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/AggregationListener.java new file mode 100755 index 0000000..19de372 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/AggregationListener.java @@ -0,0 +1,172 @@ +package com.javaedge.ad.mysql.listener; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventData; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import com.javaedge.ad.mysql.TemplateHolder; +import com.javaedge.ad.mysql.dto.BinlogRowData; +import com.javaedge.ad.mysql.dto.TableTemplate; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 监听&解析 Binlog + * + * @author sss + * @date 2019-02-25 + */ +@Slf4j +@Component +public class AggregationListener implements BinaryLogClient.EventListener { + + private String dbName; + private String tableName; + + private Map listenerMap = new HashMap<>(); + + private final TemplateHolder templateHolder; + + @Autowired + public AggregationListener(TemplateHolder templateHolder) { + this.templateHolder = templateHolder; + } + + private String genKey(String dbName, String tableName) { + return dbName + ":" + tableName; + } + + public void register(String _dbName, String _tableName, Ilistener ilistener) { + log.info("register : {}-{}", _dbName, _tableName); + this.listenerMap.put(genKey(_dbName, _tableName), ilistener); + } + + @Override + public void onEvent(Event event) { + + EventType type = event.getHeader().getEventType(); + log.debug("event type: {}", type); + + if (type == EventType.TABLE_MAP) { + TableMapEventData data = event.getData(); + this.tableName = data.getTable(); + this.dbName = data.getDatabase(); + return; + } + + if (type != EventType.EXT_UPDATE_ROWS + && type != EventType.EXT_WRITE_ROWS + && type != EventType.EXT_DELETE_ROWS) { + return; + } + + // 表名和库名是否已经完成填充 + if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tableName)) { + log.error("no meta data event"); + return; + } + + // 找出对应表有兴趣的监听器 + String key = genKey(this.dbName, this.tableName); + Ilistener listener = this.listenerMap.get(key); + if (null == listener) { + log.debug("skip {}", key); + return; + } + + log.info("trigger event: {}", type.name()); + + try { + + BinlogRowData rowData = buildRowData(event.getData()); + if (rowData == null) { + return; + } + + rowData.setEventType(type); + listener.onEvent(rowData); + + } catch (Exception ex) { + ex.printStackTrace(); + log.error(ex.getMessage()); + } finally { + this.dbName = ""; + this.tableName = ""; + } + } + + private List getAfterValues(EventData eventData) { + + if (eventData instanceof WriteRowsEventData) { + return ((WriteRowsEventData) eventData).getRows(); + } + + if (eventData instanceof UpdateRowsEventData) { + return ((UpdateRowsEventData) eventData).getRows().stream() + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + } + + if (eventData instanceof DeleteRowsEventData) { + return ((DeleteRowsEventData) eventData).getRows(); + } + + return Collections.emptyList(); + } + + private BinlogRowData buildRowData(EventData eventData) { + + TableTemplate table = templateHolder.getTable(tableName); + + if (null == table) { + log.warn("table {} not found", tableName); + return null; + } + + List> afterMapList = new ArrayList<>(); + + for (Serializable[] after : getAfterValues(eventData)) { + + Map afterMap = new HashMap<>(); + + int colLen = after.length; + + for (int ix = 0; ix < colLen; ++ix) { + + // 取出当前位置对应的列名 + String colName = table.getPosMap().get(ix); + + // 如果没有则说明不关心这个列 + if (null == colName) { + log.debug("ignore position: {}", ix); + continue; + } + + String colValue = after[ix].toString(); + afterMap.put(colName, colValue); + } + + afterMapList.add(afterMap); + } + + BinlogRowData rowData = new BinlogRowData(); + rowData.setAfter(afterMapList); + rowData.setTable(table); + + return rowData; + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/Ilistener.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/Ilistener.java new file mode 100755 index 0000000..a1c7f8b --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/Ilistener.java @@ -0,0 +1,24 @@ +package com.javaedge.ad.mysql.listener; + +import com.javaedge.ad.mysql.dto.BinlogRowData; + +/** + * 监听器 + * + * @author sss + * @date 2019-02-25 + */ +public interface Ilistener { + + /** + * 对应不同表定义不同的数据更新方法,即注册不同的监听器. + */ + void register(); + + /** + * 事件监听 + * + * @param eventData 对应于源码中的Event对象. + */ + void onEvent(BinlogRowData eventData); +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/IncrementListener.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/IncrementListener.java new file mode 100755 index 0000000..fcdac26 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/mysql/listener/IncrementListener.java @@ -0,0 +1,85 @@ +package com.javaedge.ad.mysql.listener; + +import com.github.shyiko.mysql.binlog.event.EventType; +import com.javaedge.ad.mysql.constant.Constant; +import com.javaedge.ad.mysql.constant.OpType; +import com.javaedge.ad.mysql.dto.BinlogRowData; +import com.javaedge.ad.mysql.dto.MySqlRowData; +import com.javaedge.ad.mysql.dto.TableTemplate; +import com.javaedge.ad.sender.ISender; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author sss + * @date 2019-02-26 + */ +@Slf4j +@Component +public class IncrementListener implements Ilistener { + + @Resource(name = "kafkaSender") + private ISender sender; + + private final AggregationListener aggregationListener; + + @Autowired + public IncrementListener(AggregationListener aggregationListener) { + this.aggregationListener = aggregationListener; + } + + @Override + @PostConstruct + public void register() { + + log.info("IncrementListener register db and table info"); + Constant.table2Db.forEach((k, v) -> + aggregationListener.register(v, k, this)); + } + + @Override + public void onEvent(BinlogRowData eventData) { + + TableTemplate table = eventData.getTable(); + EventType eventType = eventData.getEventType(); + + // 包装成最后需要投递的数据 + MySqlRowData rowData = new MySqlRowData(); + + rowData.setTableName(table.getTableName()); + rowData.setLevel(eventData.getTable().getLevel()); + OpType opType = OpType.to(eventType); + rowData.setOpType(opType); + + // 取出模板中该操作对应的字段列表 + List fieldList = table.getOpTypeFieldSetMap().get(opType); + if (null == fieldList) { + log.warn("{} not support for {}", opType, table.getTableName()); + return; + } + + for (Map afterMap : eventData.getAfter()) { + + Map _afterMap = new HashMap<>(); + + for (Map.Entry entry : afterMap.entrySet()) { + + String colName = entry.getKey(); + String colValue = entry.getValue(); + + _afterMap.put(colName, colValue); + } + + rowData.getFieldValueMap().add(_afterMap); + } + + sender.sender(rowData); + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/runner/BinlogRunner.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/runner/BinlogRunner.java new file mode 100755 index 0000000..94aac3b --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/runner/BinlogRunner.java @@ -0,0 +1,30 @@ +package com.javaedge.ad.runner; + +import com.javaedge.ad.mysql.BinlogClient; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +/** + * @author sss + * @date 2019-02-25 + */ +@Slf4j +@Component +public class BinlogRunner implements CommandLineRunner { + + private final BinlogClient client; + + @Autowired + public BinlogRunner(BinlogClient client) { + this.client = client; + } + + @Override + public void run(String... strings) throws Exception { + + log.info("Coming in BinlogRunner..."); + client.connect(); + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/ISearch.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/ISearch.java new file mode 100644 index 0000000..fbf8246 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/ISearch.java @@ -0,0 +1,18 @@ +package com.javaedge.ad.search; + +import com.javaedge.ad.search.vo.SearchRequest; +import com.javaedge.ad.search.vo.SearchResponse; + +/** + * @author sss + * @date 2019-02-26 + */ +public interface ISearch { + + /** + * + * @param request + * @return + */ + SearchResponse fetchAds(SearchRequest request); +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/controller/SearchController.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/controller/SearchController.java new file mode 100755 index 0000000..d46dc14 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/controller/SearchController.java @@ -0,0 +1,75 @@ +package com.javaedge.ad.search.controller; + +import com.alibaba.fastjson.JSON; +import com.javaedge.ad.annotation.IgnoreResponseAdvice; +import com.javaedge.ad.client.SponsorClient; +import com.javaedge.ad.client.vo.AdPlan; +import com.javaedge.ad.client.vo.AdPlanGetRequest; +import com.javaedge.ad.search.ISearch; +import com.javaedge.ad.search.vo.SearchRequest; +import com.javaedge.ad.search.vo.SearchResponse; +import com.javaedge.ad.vo.CommonResponse; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.client.RestTemplate; + +import java.util.List; + +/** + * @author sss + * @date 2019-02-26 + */ +@Slf4j +@RestController +public class SearchController { + + private final ISearch search; + + private final RestTemplate restTemplate; + + private final SponsorClient sponsorClient; + + @Autowired + public SearchController(RestTemplate restTemplate, + SponsorClient sponsorClient, ISearch search) { + this.restTemplate = restTemplate; + this.sponsorClient = sponsorClient; + this.search = search; + } + + @PostMapping("/fetchAds") + public SearchResponse fetchAds(@RequestBody SearchRequest request) { + + log.info("ad-search: fetchAds -> {}", + JSON.toJSONString(request)); + return search.fetchAds(request); + } + + @IgnoreResponseAdvice + @PostMapping("/getAdPlans") + public CommonResponse> getAdPlans( + @RequestBody AdPlanGetRequest request + ) { + log.info("ad-search: getAdPlans -> {}", + JSON.toJSONString(request)); + return sponsorClient.getAdPlans(request); + } + + @SuppressWarnings("all") + @IgnoreResponseAdvice + @PostMapping("/getAdPlansByRibbon") + public CommonResponse> getAdPlansByRebbon( + @RequestBody AdPlanGetRequest request + ) { + log.info("ad-search: getAdPlansByRibbon -> {}", + JSON.toJSONString(request)); + return restTemplate.postForEntity( + "http://eureka-client-ad-sponsor/ad-sponsor/get/adPlan", + request, + CommonResponse.class + ).getBody(); + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/impl/SearchImpl.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/impl/SearchImpl.java new file mode 100755 index 0000000..3622c4e --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/impl/SearchImpl.java @@ -0,0 +1,250 @@ +package com.javaedge.ad.search.impl; + +import com.alibaba.fastjson.JSON; +import com.javaedge.ad.index.CommonStatus; +import com.javaedge.ad.index.DataTable; +import com.javaedge.ad.index.adunit.AdUnitIndex; +import com.javaedge.ad.index.adunit.AdUnitObject; +import com.javaedge.ad.index.creative.CreativeIndex; +import com.javaedge.ad.index.creative.CreativeObject; +import com.javaedge.ad.index.creativeunit.CreativeUnitIndex; +import com.javaedge.ad.index.district.UnitDistrictIndex; +import com.javaedge.ad.index.interest.UnitItIndex; +import com.javaedge.ad.index.keyword.UnitKeywordIndex; +import com.javaedge.ad.search.ISearch; +import com.javaedge.ad.search.vo.SearchRequest; +import com.javaedge.ad.search.vo.SearchResponse; +import com.javaedge.ad.search.vo.feature.DistrictFeature; +import com.javaedge.ad.search.vo.feature.FeatureRelation; +import com.javaedge.ad.search.vo.feature.ItFeature; +import com.javaedge.ad.search.vo.feature.KeywordFeature; +import com.javaedge.ad.search.vo.media.AdSlot; +import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.stereotype.Service; + +import java.util.*; + +/** + * @author sss + * @date 2019-02-26 + */ +@Slf4j +@Service +public class SearchImpl implements ISearch { + + public SearchResponse fallback(SearchRequest request, Throwable e) { + return null; + } + + @Override + @HystrixCommand(fallbackMethod = "fallback") + public SearchResponse fetchAds(SearchRequest request) { + + // 请求的广告位信息 + List adSlots = request.getRequestInfo().getAdSlots(); + + // 三个 Feature + KeywordFeature keywordFeature = + request.getFeatureInfo().getKeywordFeature(); + DistrictFeature districtFeature = + request.getFeatureInfo().getDistrictFeature(); + ItFeature itFeature = + request.getFeatureInfo().getItFeature(); + + FeatureRelation relation = request.getFeatureInfo().getRelation(); + + // 构造响应对象 + SearchResponse response = new SearchResponse(); + Map> adSlot2Ads = + response.getAdSlot2Ads(); + + for (AdSlot adSlot : adSlots) { + + Set targetUnitIdSet; + + // 根据流量类型获取初始 AdUnit + Set adUnitIdSet = DataTable.of( + AdUnitIndex.class + ).match(adSlot.getPositionType()); + + if (relation == FeatureRelation.AND) { + + filterKeywordFeature(adUnitIdSet, keywordFeature); + filterDistrictFeature(adUnitIdSet, districtFeature); + filterItTagFeature(adUnitIdSet, itFeature); + + targetUnitIdSet = adUnitIdSet; + + } else { + targetUnitIdSet = getORRelationUnitIds( + adUnitIdSet, + keywordFeature, + districtFeature, + itFeature + ); + } + + List unitObjects = + DataTable.of(AdUnitIndex.class).fetch(targetUnitIdSet); + + filterAdUnitAndPlanStatus(unitObjects, CommonStatus.VALID); + + List adIds = DataTable.of(CreativeUnitIndex.class) + .selectAds(unitObjects); + List creatives = DataTable.of(CreativeIndex.class) + .fetch(adIds); + + // 通过 AdSlot 实现对 CreativeObject 的过滤 + filterCreativeByAdSlot( + creatives, + adSlot.getWidth(), + adSlot.getHeight(), + adSlot.getType() + ); + + adSlot2Ads.put( + adSlot.getAdSlotCode(), buildCreativeResponse(creatives) + ); + } + + log.info("fetchAds: {}-{}", + JSON.toJSONString(request), + JSON.toJSONString(response)); + + return response; + } + + private Set getORRelationUnitIds(Set adUnitIdSet, + KeywordFeature keywordFeature, + DistrictFeature districtFeature, + ItFeature itFeature) { + + if (CollectionUtils.isEmpty(adUnitIdSet)) { + return Collections.emptySet(); + } + + Set keywordUnitIdSet = new HashSet<>(adUnitIdSet); + Set districtUnitIdSet = new HashSet<>(adUnitIdSet); + Set itUnitIdSet = new HashSet<>(adUnitIdSet); + + filterKeywordFeature(keywordUnitIdSet, keywordFeature); + filterDistrictFeature(districtUnitIdSet, districtFeature); + filterItTagFeature(itUnitIdSet, itFeature); + + return new HashSet<>( + CollectionUtils.union( + CollectionUtils.union(keywordUnitIdSet, districtUnitIdSet), + itUnitIdSet + ) + ); + } + + private void filterKeywordFeature( + Collection adUnitIds, KeywordFeature keywordFeature) { + + if (CollectionUtils.isEmpty(adUnitIds)) { + return; + } + + if (CollectionUtils.isNotEmpty(keywordFeature.getKeywords())) { + + CollectionUtils.filter( + adUnitIds, + adUnitId -> + DataTable.of(UnitKeywordIndex.class) + .match(adUnitId, + keywordFeature.getKeywords()) + ); + } + } + + private void filterDistrictFeature( + Collection adUnitIds, DistrictFeature districtFeature + ) { + if (CollectionUtils.isEmpty(adUnitIds)) { + return; + } + + if (CollectionUtils.isNotEmpty(districtFeature.getDistricts())) { + + CollectionUtils.filter( + adUnitIds, + adUnitId -> + DataTable.of(UnitDistrictIndex.class) + .match(adUnitId, + districtFeature.getDistricts()) + ); + } + } + + private void filterItTagFeature(Collection adUnitIds, + ItFeature itFeature) { + + if (CollectionUtils.isEmpty(adUnitIds)) { + return; + } + + if (CollectionUtils.isNotEmpty(itFeature.getIts())) { + + CollectionUtils.filter( + adUnitIds, + adUnitId -> + DataTable.of(UnitItIndex.class) + .match(adUnitId, + itFeature.getIts()) + ); + } + } + + private void filterAdUnitAndPlanStatus(List unitObjects, + CommonStatus status) { + + if (CollectionUtils.isEmpty(unitObjects)) { + return; + } + + CollectionUtils.filter( + unitObjects, + object -> object.getUnitStatus().equals(status.getStatus()) + && object.getAdPlanObject().getPlanStatus().equals(status.getStatus()) + ); + } + + private void filterCreativeByAdSlot(List creatives, + Integer width, + Integer height, + List type) { + + if (CollectionUtils.isEmpty(creatives)) { + return; + } + + CollectionUtils.filter( + creatives, + creative -> + creative.getAuditStatus().equals(CommonStatus.VALID.getStatus()) + && creative.getWidth().equals(width) + && creative.getHeight().equals(height) + && type.contains(creative.getType()) + ); + } + + private List buildCreativeResponse( + List creatives + ) { + + if (CollectionUtils.isEmpty(creatives)) { + return Collections.emptyList(); + } + + CreativeObject randomObject = creatives.get( + Math.abs(new Random().nextInt()) % creatives.size() + ); + + return Collections.singletonList( + SearchResponse.convert(randomObject) + ); + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/SearchRequest.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/SearchRequest.java new file mode 100644 index 0000000..346c06f --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/SearchRequest.java @@ -0,0 +1,64 @@ +package com.javaedge.ad.search.vo; + +import com.javaedge.ad.search.vo.feature.DistrictFeature; +import com.javaedge.ad.search.vo.feature.FeatureRelation; +import com.javaedge.ad.search.vo.feature.ItFeature; +import com.javaedge.ad.search.vo.feature.KeywordFeature; +import com.javaedge.ad.search.vo.media.AdSlot; +import com.javaedge.ad.search.vo.media.App; +import com.javaedge.ad.search.vo.media.Device; +import com.javaedge.ad.search.vo.media.Geo; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SearchRequest { + + /** + * 媒体方的请求标识 + */ + private String mediaId; + /** + * 请求基本信息 + */ + private RequestInfo requestInfo; + /** + * 匹配信息 + */ + private FeatureInfo featureInfo; + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class RequestInfo { + + private String requestId; + + private List adSlots; + private App app; + private Geo geo; + private Device device; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class FeatureInfo { + + private KeywordFeature keywordFeature; + private DistrictFeature districtFeature; + private ItFeature itFeature; + + private FeatureRelation relation = FeatureRelation.AND; + } +} + diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/SearchResponse.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/SearchResponse.java new file mode 100755 index 0000000..81b9ab5 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/SearchResponse.java @@ -0,0 +1,60 @@ +package com.javaedge.ad.search.vo; + +import com.javaedge.ad.index.creative.CreativeObject; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SearchResponse { + + public Map> adSlot2Ads = new HashMap<>(); + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Creative { + + private Long adId; + private String adUrl; + private Integer width; + private Integer height; + private Integer type; + private Integer materialType; + + /** + * 展示监测 url + */ + private List showMonitorUrl = + Arrays.asList("www.shishusheng.com", "www.shishusheng.com"); + /** + * 点击监测 url + */ + private List clickMonitorUrl = + Arrays.asList("www.shishusheng.com", "www.shishusheng.com"); + } + + public static Creative convert(CreativeObject object) { + + Creative creative = new Creative(); + creative.setAdId(object.getAdId()); + creative.setAdUrl(object.getAdUrl()); + creative.setWidth(object.getWidth()); + creative.setHeight(object.getHeight()); + creative.setType(object.getType()); + creative.setMaterialType(object.getMaterialType()); + + return creative; + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/DistrictFeature.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/DistrictFeature.java new file mode 100755 index 0000000..84591c8 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/DistrictFeature.java @@ -0,0 +1,28 @@ +package com.javaedge.ad.search.vo.feature; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DistrictFeature { + + private List districts; + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class ProvinceAndCity { + + private String province; + private String city; + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/FeatureRelation.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/FeatureRelation.java new file mode 100755 index 0000000..ceca370 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/FeatureRelation.java @@ -0,0 +1,11 @@ +package com.javaedge.ad.search.vo.feature; + +/** + * @author sss + * @date 2019-02-26 + */ +public enum FeatureRelation { + + OR, + AND +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/ItFeature.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/ItFeature.java new file mode 100755 index 0000000..178cded --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/ItFeature.java @@ -0,0 +1,19 @@ +package com.javaedge.ad.search.vo.feature; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ItFeature { + + private List its; +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/KeywordFeature.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/KeywordFeature.java new file mode 100755 index 0000000..5658ce9 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/feature/KeywordFeature.java @@ -0,0 +1,19 @@ +package com.javaedge.ad.search.vo.feature; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class KeywordFeature { + + private List keywords; +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/AdSlot.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/AdSlot.java new file mode 100755 index 0000000..427b7c5 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/AdSlot.java @@ -0,0 +1,33 @@ +package com.javaedge.ad.search.vo.media; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class AdSlot { + + // 广告位编码 + private String adSlotCode; + + // 流量类型 + private Integer positionType; + + // 宽和高 + private Integer width; + private Integer height; + + // 广告物料类型: 图片, 视频 + private List type; + + // 最低出价 + private Integer minCpm; +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/App.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/App.java new file mode 100755 index 0000000..e7bed54 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/App.java @@ -0,0 +1,24 @@ +package com.javaedge.ad.search.vo.media; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class App { + + // 应用编码 + private String appCode; + // 应用名称 + private String appName; + // 应用包名 + private String packageName; + // activity 名称 + private String activityName; +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/Device.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/Device.java new file mode 100755 index 0000000..29527cc --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/Device.java @@ -0,0 +1,36 @@ +package com.javaedge.ad.search.vo.media; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Device { + + // 设备 id + private String deviceCode; + + // mac + private String mac; + + // ip + private String ip; + + // 机型编码 + private String model; + + // 分辨率尺寸 + private String displaySize; + + // 屏幕尺寸 + private String screenSize; + + // 设备序列号 + private String serialName; +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/Geo.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/Geo.java new file mode 100755 index 0000000..a804505 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/search/vo/media/Geo.java @@ -0,0 +1,21 @@ +package com.javaedge.ad.search.vo.media; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author sss + * @date 2019-02-26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Geo { + + private Float latitude; + private Float longitude; + + private String city; + private String province; +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/ISender.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/ISender.java new file mode 100755 index 0000000..8427baa --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/ISender.java @@ -0,0 +1,17 @@ +package com.javaedge.ad.sender; + +import com.javaedge.ad.mysql.dto.MySqlRowData; + +/** + * @author sss + * @date 2019-02-26 + */ +public interface ISender { + + /** + * 发送消息到 MQ + * + * @param rowData + */ + void sender(MySqlRowData rowData); +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/index/IndexSender.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/index/IndexSender.java new file mode 100644 index 0000000..9647cb2 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/index/IndexSender.java @@ -0,0 +1,263 @@ +package com.javaedge.ad.sender.index; + +import com.alibaba.fastjson.JSON; +import com.javaedge.ad.dump.table.AdCreativeTable; +import com.javaedge.ad.dump.table.AdCreativeUnitTable; +import com.javaedge.ad.dump.table.AdPlanTable; +import com.javaedge.ad.dump.table.AdUnitDistrictTable; +import com.javaedge.ad.dump.table.AdUnitItTable; +import com.javaedge.ad.dump.table.AdUnitKeywordTable; +import com.javaedge.ad.dump.table.AdUnitTable; +import com.javaedge.ad.handler.AdLevelDataHandler; +import com.javaedge.ad.index.DataLevel; +import com.javaedge.ad.mysql.constant.Constant; +import com.javaedge.ad.mysql.dto.MySqlRowData; +import com.javaedge.ad.sender.ISender; +import com.javaedge.ad.utils.CommonUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @author sss + * @date 2019-02-26 + */ +@Slf4j +@Component("indexSender") +public class IndexSender implements ISender { + + @Override + public void sender(MySqlRowData rowData) { + + String level = rowData.getLevel(); + + if (DataLevel.LEVEL2.getLevel().equals(level)) { + level2RowData(rowData); + } else if (DataLevel.LEVEL3.getLevel().equals(level)) { + Level3RowData(rowData); + } else if (DataLevel.LEVEL4.getLevel().equals(level)) { + level4RowData(rowData); + } else { + log.error("MysqlRowData ERROR: {}", JSON.toJSONString(rowData)); + } + } + + private void level2RowData(MySqlRowData rowData) { + if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) { + List planTables = new ArrayList<>(); + + for (Map fieldValueMap : rowData.getFieldValueMap()) { + AdPlanTable planTable = new AdPlanTable(); + + fieldValueMap.forEach((k, v) -> { + switch (k) { + case Constant.AD_PLAN_TABLE_INFO.COLUMN_ID: + planTable.setId(Long.valueOf(v)); + break; + case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID: + planTable.setUserId(Long.valueOf(v)); + break; + case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS: + planTable.setPlanStatus(Integer.valueOf(v)); + break; + case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE: + planTable.setStartDate( + CommonUtils.parseStringDate(v) + ); + break; + case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE: + planTable.setEndDate( + CommonUtils.parseStringDate(v) + ); + break; + } + }); + + planTables.add(planTable); + } + + planTables.forEach(p -> + AdLevelDataHandler.handleLevel2(p, rowData.getOpType())); + } else if (rowData.getTableName().equals( + Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME + )) { + List creativeTables = new ArrayList<>(); + + for (Map fieldValeMap : + rowData.getFieldValueMap()) { + + AdCreativeTable creativeTable = new AdCreativeTable(); + + fieldValeMap.forEach((k, v) -> { + switch (k) { + case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_ID: + creativeTable.setAdId(Long.valueOf(v)); + break; + case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE: + creativeTable.setType(Integer.valueOf(v)); + break; + case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE: + creativeTable.setMaterialType(Integer.valueOf(v)); + break; + case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT: + creativeTable.setHeight(Integer.valueOf(v)); + break; + case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH: + creativeTable.setWidth(Integer.valueOf(v)); + break; + case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS: + creativeTable.setAuditStatus(Integer.valueOf(v)); + break; + case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL: + creativeTable.setAdUrl(v); + break; + } + }); + + creativeTables.add(creativeTable); + } + + creativeTables.forEach(c -> + AdLevelDataHandler.handleLevel2(c, rowData.getOpType())); + } + } + + private void Level3RowData(MySqlRowData rowData) { + + if (rowData.getTableName().equals( + Constant.AD_UNIT_TABLE_INFO.TABLE_NAME)) { + + List unitTables = new ArrayList<>(); + + for (Map fieldValueMap : + rowData.getFieldValueMap()) { + + AdUnitTable unitTable = new AdUnitTable(); + + fieldValueMap.forEach((k, v) -> { + switch (k) { + case Constant.AD_UNIT_TABLE_INFO.COLUMN_ID: + unitTable.setUnitId(Long.valueOf(v)); + break; + case Constant.AD_UNIT_TABLE_INFO.COLUMN_UNIT_STATUS: + unitTable.setUnitStatus(Integer.valueOf(v)); + break; + case Constant.AD_UNIT_TABLE_INFO.COLUMN_POSITION_TYPE: + unitTable.setPositionType(Integer.valueOf(v)); + break; + case Constant.AD_UNIT_TABLE_INFO.COLUMN_PLAN_ID: + unitTable.setPlanId(Long.valueOf(v)); + break; + } + }); + + unitTables.add(unitTable); + } + + unitTables.forEach(u -> + AdLevelDataHandler.handleLevel3(u, rowData.getOpType())); + } else if (rowData.getTableName().equals( + Constant.AD_CREATIVE_UNIT_TABLE_INFO.TABLE_NAME + )) { + List creativeUnitTables = new ArrayList<>(); + + for (Map fieldValueMap : + rowData.getFieldValueMap()) { + + AdCreativeUnitTable creativeUnitTable = new AdCreativeUnitTable(); + + fieldValueMap.forEach((k, v) -> { + switch (k) { + case Constant.AD_CREATIVE_UNIT_TABLE_INFO.COLUMN_CREATIVE_ID: + creativeUnitTable.setAdId(Long.valueOf(v)); + break; + case Constant.AD_CREATIVE_UNIT_TABLE_INFO.COLUMN_UNIT_ID: + creativeUnitTable.setUnitId(Long.valueOf(v)); + break; + } + }); + + creativeUnitTables.add(creativeUnitTable); + } + + creativeUnitTables.forEach( + u -> AdLevelDataHandler.handleLevel3(u, rowData.getOpType()) + ); + } + } + + private void level4RowData(MySqlRowData rowData) { + switch (rowData.getTableName()) { + case Constant.AD_UNIT_DISTRICT_TABLE_INFO.TABLE_NAME: + List districtTables = new ArrayList<>(); + for (Map fieldValueMap : rowData.getFieldValueMap()) { + AdUnitDistrictTable districtTable = new AdUnitDistrictTable(); + + fieldValueMap.forEach((k, v) -> { + switch (k) { + case Constant.AD_UNIT_DISTRICT_TABLE_INFO.COLUMN_UNIT_ID: + districtTable.setUnitId(Long.valueOf(v)); + break; + case Constant.AD_UNIT_DISTRICT_TABLE_INFO.COLUMN_PROVINCE: + districtTable.setProvince(v); + break; + case Constant.AD_UNIT_DISTRICT_TABLE_INFO.COLUMN_CITY: + districtTable.setCity(v); + break; + } + }); + districtTables.add(districtTable); + } + districtTables.forEach( + d -> AdLevelDataHandler.handleLevel4(d, rowData.getOpType()) + ); + break; + case Constant.AD_UNIT_IT_TABLE_INFO.TABLE_NAME: + List itTables = new ArrayList<>(); + for (Map fieldValueMap : rowData.getFieldValueMap()) { + AdUnitItTable itTable = new AdUnitItTable(); + fieldValueMap.forEach((k, v) -> { + switch (k) { + case Constant.AD_UNIT_IT_TABLE_INFO.COLUMN_UNIT_ID: + itTable.setUnitId(Long.valueOf(v)); + break; + case Constant.AD_UNIT_IT_TABLE_INFO.COLUMN_IT_TAG: + itTable.setItTag(v); + break; + } + }); + itTables.add(itTable); + } + itTables.forEach( + i -> AdLevelDataHandler.handleLevel4(i, rowData.getOpType()) + ); + break; + case Constant.AD_UNIT_KEYWORD_TABLE_INFO.TABLE_NAME: + List keywordTables = new ArrayList<>(); + + for (Map fieldValueMap : rowData.getFieldValueMap()) { + AdUnitKeywordTable keywordTable = new AdUnitKeywordTable(); + + fieldValueMap.forEach((k, v) -> { + switch (k) { + case Constant.AD_UNIT_KEYWORD_TABLE_INFO.COLUMN_UNIT_ID: + keywordTable.setUnitId(Long.valueOf(v)); + break; + case Constant.AD_UNIT_KEYWORD_TABLE_INFO.COLUMN_KEYWORD: + keywordTable.setKeyword(v); + break; + } + }); + keywordTables.add(keywordTable); + } + + keywordTables.forEach( + k -> AdLevelDataHandler.handleLevel4(k, rowData.getOpType()) + ); + break; + } + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/kafka/KafkaSender.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/kafka/KafkaSender.java new file mode 100755 index 0000000..8495e42 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/sender/kafka/KafkaSender.java @@ -0,0 +1,46 @@ +package com.javaedge.ad.sender.kafka; + +import com.alibaba.fastjson.JSON; +import com.javaedge.ad.mysql.dto.MySqlRowData; +import com.javaedge.ad.sender.ISender; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Optional; + +/** + * @author sss + * @date 2019-02-26 + */ +@Component("kafkaSender") +public class KafkaSender implements ISender { + + @Value("${adconf.kafka.topic}") + private String topic; + + @Resource + private KafkaTemplate kafkaTemplate; + + @Override + public void sender(MySqlRowData rowData) { + kafkaTemplate.send(topic, JSON.toJSONString(rowData)); + } + + @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search") + public void processMysqlRowData(ConsumerRecord record) { + Optional kafkaMessage = Optional.ofNullable(record.value()); + if (kafkaMessage.isPresent()) { + Object message = kafkaMessage.get(); + MySqlRowData rowData = JSON.parseObject( + message.toString(), + MySqlRowData.class + ); + System.out.println("kafka processMysqlRowData: " + + JSON.toJSONString(rowData)); + } + } +} diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/service/BinlogServiceTest.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/service/BinlogServiceTest.java new file mode 100644 index 0000000..401a912 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/service/BinlogServiceTest.java @@ -0,0 +1,67 @@ +package com.javaedge.ad.service; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.EventData; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; + +/** + * @author sss + * @date 2019-02-22 + */ +public class BinlogServiceTest { + +// Write--------------- +// WriteRowsEventData{tableId=85, includedColumns={0, 1, 2}, rows=[ +// [10, 10, 宝马] +//]} +// Update-------------- +// UpdateRowsEventData{tableId=85, includedColumnsBeforeUpdate={0, 1, 2}, +// includedColumns={0, 1, 2}, rows=[ +// {before=[10, 10, 宝马], after=[10, 11, 宝马]} +//]} +// Delete-------------- +// DeleteRowsEventData{tableId=85, includedColumns={0, 1, 2}, rows=[ +// [11, 10, 奔驰] +//]} + + +// Write--------------- +// WriteRowsEventData{tableId=70, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[ +// [12, 10, plan, 1, Tue Jan 01 08:00:00 CST 2019, Tue Jan 01 08:00:00 CST 2019, Tue Jan 01 08:00:00 CST 2019, Tue Jan 01 08:00:00 CST 2019] +//]} + + // Tue Jan 01 08:00:00 CST 2019 + + public static void main(String[] args) throws Exception { + + BinaryLogClient client = new BinaryLogClient( + "127.0.0.1", + 3306, + "root", + "root" + ); +// client.setBinlogFilename("binlog.000037"); +// client.setBinlogPosition(); + + client.registerEventListener(event -> { + + EventData data = event.getData(); + + if (data instanceof UpdateRowsEventData) { + System.out.println("Update--------------"); + System.out.println(data.toString()); + } else if (data instanceof WriteRowsEventData) { + System.out.println("Write---------------"); + System.out.println(data.toString()); + } else if (data instanceof DeleteRowsEventData) { + System.out.println("Delete--------------"); + System.out.println(data.toString()); + } + }); + + client.connect(); + } +} + diff --git a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/utils/CommonUtils.java b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/utils/CommonUtils.java index 5ac838e..34ff18b 100644 --- a/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/utils/CommonUtils.java +++ b/javaedge-ad-service/ad-search/src/main/java/com/javaedge/ad/utils/CommonUtils.java @@ -33,7 +33,12 @@ public static String stringConcat(String... args) { return result.toString(); } - // Tue Jan 01 08:00:00 CST 2019 + /** + * Tue Jan 01 08:00:00 CST 2019 + * + * @param dateString + * @return + */ public static Date parseStringDate(String dateString) { try { diff --git a/javaedge-ad-service/ad-search/src/main/resources/application.yml b/javaedge-ad-service/ad-search/src/main/resources/application.yml index 81f4191..bd96c7d 100644 --- a/javaedge-ad-service/ad-search/src/main/resources/application.yml +++ b/javaedge-ad-service/ad-search/src/main/resources/application.yml @@ -50,7 +50,7 @@ adconf: username: root password: root binlogName: binlog.000038 - position: 60451 + position: -1 kafka: topic: ad-search-mysql-data diff --git a/javaedge-ad-service/ad-search/src/main/resources/example.sql b/javaedge-ad-service/ad-search/src/main/resources/example.sql new file mode 100755 index 0000000..19941ad --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/resources/example.sql @@ -0,0 +1,37 @@ +-- 更新, 插入, 删除 ad_plan +update ad_plan set start_date = '2018-11-30 00:00:00' where id = 10; + +INSERT INTO `ad_plan` (`user_id`, `plan_name`, `plan_status`, `start_date`, `end_date`, `create_time`, `update_time`) +VALUES (10, '第二个推广计划', 1, '2018-01-01 00:00:00', '2019-12-01 00:00:00', '2018-01-01 00:00:00', '2018-01-01 00:00:00'); + +delete from ad_plan where id = 12; + + +-- 更新, 插入, 删除 ad_creative +update ad_creative set url = 'https://www.shishusheng.com'; +INSERT INTO `ad_creative` (`name`, `type`, `material_type`, `height`, `width`, `size`, `duration`, `audit_status`, `user_id`, `url`, `create_time`, `update_time`) VALUES ('第二个创意', 1, 1, 720, 1080, 1024, 0, 1, 10, 'www.shishusheng.com', '2018-01-01 00:00:00', '2018-01-01 00:00:00'); +delete from ad_creative where id = 13; + + +-- 更新, 插入, 删除 ad_unit +update ad_unit set unit_status = 1 where id = 10; +INSERT INTO `ad_unit` (`plan_id`, `unit_name`, `unit_status`, `position_type`, `budget`, `create_time`, `update_time`) VALUES (10, '第二个推广单元', 1, 1, 15000000, '2018-01-01 00:00:00', '2018-01-01 00:00:00'); +delete from ad_unit where id = 11; + + +-- 插入, 删除 creative_unit +INSERT INTO `creative_unit` (`creative_id`, `unit_id`) VALUES (10, 12); +delete from creative_unit where creative_id = 10 and unit_id = 12; + + +-- 插入, 删除 ad_unit_district +INSERT INTO `ad_unit_district` (`unit_id`, `province`, `city`) VALUES (10, '辽宁省', '大连市'); +delete from ad_unit_district where unit_id = 10 and province = '辽宁省' and city = '大连市'; + +-- 插入, 删除 ad_unit_it +INSERT INTO `ad_unit_it` (`unit_id`, `it_tag`) VALUES (10, '徒步'); +delete from ad_unit_it where unit_id = 10 and it_tag = '徒步'; + +-- 插入, 删除 ad_unit_keyword +INSERT INTO `ad_unit_keyword` (`unit_id`, `keyword`) VALUES (10, '标志'); +delete from ad_unit_keyword where unit_id = 10 and keyword = '标志'; diff --git a/javaedge-ad-service/ad-search/src/main/resources/template.json b/javaedge-ad-service/ad-search/src/main/resources/template.json new file mode 100644 index 0000000..14ab7c5 --- /dev/null +++ b/javaedge-ad-service/ad-search/src/main/resources/template.json @@ -0,0 +1,128 @@ +{ + "database": "javaedge_ad_data", + "tableList": [ + { + "tableName": "ad_plan", + "level": 2, + "insert": [ + {"column": "id"}, + {"column": "user_id"}, + {"column": "plan_status"}, + {"column": "start_date"}, + {"column": "end_date"} + ], + "update": [ + {"column": "id"}, + {"column": "user_id"}, + {"column": "plan_status"}, + {"column": "start_date"}, + {"column": "end_date"} + ], + "delete": [ + {"column": "id"} + ] + }, + { + "tableName": "ad_unit", + "level": 3, + "insert": [ + {"column": "id"}, + {"column": "unit_status"}, + {"column": "position_type"}, + {"column": "plan_id"} + ], + "update": [ + {"column": "id"}, + {"column": "unit_status"}, + {"column": "position_type"}, + {"column": "plan_id"} + ], + "delete": [ + {"column": "id"} + ] + }, + { + "tableName": "ad_creative", + "level": 2, + "insert": [ + {"column": "id"}, + {"column": "type"}, + {"column": "material_type"}, + {"column": "height"}, + {"column": "width"}, + {"column": "audit_status"}, + {"column": "url"} + ], + "update": [ + {"column": "id"}, + {"column": "type"}, + {"column": "material_type"}, + {"column": "height"}, + {"column": "width"}, + {"column": "audit_status"}, + {"column": "url"} + ], + "delete": [ + {"column": "id"} + ] + }, + { + "tableName": "creative_unit", + "level": 3, + "insert": [ + {"column": "creative_id"}, + {"column": "unit_id"} + ], + "update": [ + ], + "delete": [ + {"column": "creative_id"}, + {"column": "unit_id"} + ] + }, + { + "tableName": "ad_unit_district", + "level": 4, + "insert": [ + {"column": "unit_id"}, + {"column": "province"}, + {"column": "city"} + ], + "update": [ + ], + "delete": [ + {"column": "unit_id"}, + {"column": "province"}, + {"column": "city"} + ] + }, + { + "tableName": "ad_unit_it", + "level": 4, + "insert": [ + {"column": "unit_id"}, + {"column": "it_tag"} + ], + "update": [ + ], + "delete": [ + {"column": "unit_id"}, + {"column": "it_tag"} + ] + }, + { + "tableName": "ad_unit_keyword", + "level": 4, + "insert": [ + {"column": "unit_id"}, + {"column": "keyword"} + ], + "update": [ + ], + "delete": [ + {"column": "unit_id"}, + {"column": "keyword"} + ] + } + ] +}