前文作者讲述了BottledWater-PG安装部署,并在pg中实现了数据改变,向kafka发送消息的案例,详细参考《BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台》。此前作者写过一篇pg的异步消息实现的实时地图应用案例《postgres+socket.io+nodejs实时地图应用实践》,本文将改用BottledWater-PG实现一遍。

一 服务器端

var fs = require('fs');
var http = require('http');
var socket = require('socket.io');
var Kafka = require('node-rdkafka');var server = http.createServer(function(req, res) {res.writeHead(200, { 'Content-type': 'text/html'});res.end(fs.readFileSync(__dirname + '/index.html'));}).listen(8081, function() {console.log('Listening at: http://localhost:8081');
});
//注册socket.io
var socketio=socket.listen(server);
socketio.on('connection', function (socketclient) {console.log('已连接socket:');//socketclient.broadcast.emit('GPSCoor', data.payload);//广播给别人//socketclient.emit('GPSCoor', data.payload);//广播给自己});var consumer = new Kafka.KafkaConsumer({//'debug': 'all','metadata.broker.list': '192.168.43.27:9092','group.id': 'node-rdkafka-consumer-flow-example','enable.auto.commit': false
});var topicName = 'gps';//logging debug messages, if debug is enabled
consumer.on('event.log', function(log) {console.log(log);
});//打印错误
consumer.on('error', function(err) {console.error('Error from consumer');console.error(err);
});consumer.on('ready', function(arg) {console.log('consumer ready.' + JSON.stringify(arg));consumer.subscribe([topicName]);//准备消费消息consumer.consume();
});consumer.on('data', function(m) {console.log(m);let _data;if(m.value==null)//delete操作发送来的消息{_data=JSON.parse(m.key);_data.tg_op='delete';}else{_data=m.value.toString();_data=JSON.parse(_data);}   console.log(_data);socketio.emit('GPSCoor', _data);//广播给所有的客户端
});consumer.on('disconnected', function(arg) {console.log('consumer disconnected. ' + JSON.stringify(arg));
});//启动
consumer.connect();

二 客户端

<html>
<head><meta charset='utf-8'><title>实时地图应用</title><link rel="stylesheet" href="http://openlayers.org/en/v3.18.2/css/ol.css" type="text/css"><script src="http://openlayers.org/en/v3.18.2/build/ol.js"></script><script src="/socket.io/socket.io.js"></script><script>var wktform=new ol.format.WKT();//wkt解析var gpsSource=new ol.source.Vector();function init(){var gpsLayer=new ol.layer.Vector({source:gpsSource,style:new ol.style.Style({image: new ol.style.Icon(({anchor: [0.5, 1],src: 'http://openlayers.org/en/v3.18.2/examples/data/icon.png'}))})});var map = new ol.Map({layers : [new ol.layer.Tile({title : '街道图',visible : true,source : new ol.source.XYZ({url : 'http://www.google.cn/maps/vt?pb=!1m5!1m4!1i{z}!2i{x}!3i{y}!4i256!2m3!1e0!2sm!3i342009817!3m9!2szh-CN!3sCN!5e18!12m1!1e47!12m3!1e37!2m1!1ssmartmaps!4e0&token=32965'})}),gpsLayer],target : 'map',controls : ol.control.defaults({attributionOptions : ({collapsible : false})}),view : new ol.View({center : [0, 0],zoom : 2})});var iosocket = io.connect();//接受服务端消息iosocket.on('GPSCoor', function(data) {console.log(data);var id=data.id.int;var feature;if(data.tg_op=='delete'){feature=gpsSource.getFeatureById(id);if(feature)gpsSource.removeFeature(feature);//删除点}else{var geom=data.geom.string;geom=wktform.readGeometry(geom);geom.transform('EPSG:4326','EPSG:3857');feature=gpsSource.getFeatureById(id);if(feature)feature.setGeometry(geom);//修改已有点else{feature=new ol.Feature({geometry:geom});feature.setId(id);gpsSource.addFeature(feature);//地图新增点}}});}</script>
</head>
<body onload="init()"><div id="map"></div>
</body>
</html>

三 测试成果

3.1 新增

mcsas=# insert into gps(name,geom) values ('opy','Point(118 31.5)');
INSERT 0 1
mcsas=# insert into gps(name,geom) values ('ty','Point(117 30.5)');
INSERT 0 1
新增成果.png

3.2 修改

mcsas=# update gps set geom='Point(115 40)' where name='opy';
UPDATE 1
修改成果.png

3.3 删除

mcsas=# delete from gps where name='opy';
DELETE 1
删除结果.png

四 总结

BottledWater-PG主要作用是将pg库中的表的增删改的消息都发往了kafka,应用程序并没有直接连接数据库,而是直接去消费kafka的消息。在表发生insert,update,delete能获取消息,但是truncate table并未向kafka生成消息,不知是否是我哪里遗漏。
  作者之前曾使用pg自带的notify与listen实现异步消息发送,该方法借助了表的触发器实现。应用程序是直连数据库且数据增删改都会走触发器。
  匆忙中,作者并未对比两者之间孰优孰劣,但一个直连库,一个间接消费,在不同需求中可选择一个比较符合要求的方案而加以应用。

基于BottledWater-PG+nodejs实时地图应用实践相关推荐

  1. postgres+socket.io+nodejs实时地图应用实践

    2019独角兽企业重金招聘Python工程师标准>>> nodejs一直以异步io著称,其语言特性尤其擅长于在realtime应用中,如聊天室等.在进行实时应用开发时,必不可少的需要 ...

  2. 基于Confluent+Flink的实时数据分析最佳实践

    简介:在实际业务使用中,需要经常实时做一些数据分析,包括实时PV和UV展示,实时销售数据,实时店铺UV以及实时推荐系统等,基于此类需求,Confluent+实时计算Flink版是一个高效的方案. 业务 ...

  3. 【Baidu Apollo】基于人工驾驶路径的实时地图生成

    Apollo相对地图 基于人工驾驶路径的实时地图生成 百度资深软件工程师 Yifei Jiang   本文转自百度开发者社区 相对地图是在Apollo 2.5的时候第一次对外开放.在3.0的时候我们和 ...

  4. 基于Flink+ClickHouse构建实时游戏数据分析最佳实践

    简介:本实践介绍如何快速收集海量用户行为数据,实现秒级响应的实时用户行为分析,并通过实时流计算.云数据库ClickHouse等技术进行深入挖掘和分析,得到用户特征和画像,实现个性化系统推荐服务. 直达 ...

  5. 趣头条基于 Flink 的实时平台建设实践

    本文由趣头条实时平台负责人席建刚分享趣头条实时平台的建设,整理者叶里君.文章将从平台的架构.Flink 现状,Flink 应用以及未来计划四部分分享. 一.平台架构 1.Flink 应用时间线 首先是 ...

  6. Lyft推出一种新的实时地图匹配算法

    点击上方"3D视觉工坊",选择"星标" 干货第一时间送达 打车有时也会职业病发作,琢磨一下车辆调度是怎么做的,路径规划算法要怎么写,GPS偏移该怎么纠正等等.不 ...

  7. QCon技术干货:个推基于Docker和Kubernetes的微服务实践

    2019独角兽企业重金招聘Python工程师标准>>> 2016年伊始,Docker无比兴盛,如今Kubernetes万人瞩目.在这个无比需要创新与速度的时代,由容器.微服务.Dev ...

  8. 百度爱番番实时CDP建设实践

    导读:随着营销3.0时代的到来,企业愈发需要依托强大CDP能力解决其严重的数据孤岛问题,帮助企业加温线索.促活客户.但什么是CDP.好的CDP应该具备哪些关键特征?本文在回答此问题的同时,详细讲述了爱 ...

  9. 分布式大数据多维分析引擎:Kylin 在百度地图的实践

    2019独角兽企业重金招聘Python工程师标准>>> 1. 前言 百度地图开放平台业务部数据智能组主要负责百度地图内部相关业务的大数据计算分析,处理日常百亿级规模数据,为不同业务提 ...

  10. 基于Docker和Kubernetes的企业级DevOps实践训练营

    基于Docker和Kubernetes的企业级DevOps实践训练营 课程准备 离线镜像包 百度:https://pan.baidu.com/s/1N1AYGCYftYGn6L0QPMWIMw 提取码 ...

最新文章

  1. 双水泵轮换工作原理图_「物业管理工作」水泵维护保养规程
  2. 你听过BA、DA、AA、TA么?全网疯传的架构实践全景图!
  3. python 作用域 前缀_Python 之作用域和名字空间
  4. 【采用】解读消金业务风控模型的6个层级
  5. Java 8系列之Lambda表达式
  6. jsp模糊查询_查询知识产权和商标专利的网站汇总!
  7. zcmu4959: ly的新闹钟(有个点容易忽略)
  8. 修改hosts 流畅使用coursera
  9. SonicWall 防火墙曝严重漏洞,有些设备仍无补丁
  10. 14章类型信息之使用类字面常量
  11. 利用MFC Picture Control控件 加载bmp,png
  12. Vmware WorkStation Pro 14 激活密钥
  13. 星宸科技SSD202D芯片+无线投屏协议在摩托车智能仪表,电动车智能仪表批量出货。
  14. 「AI Timer 说」我只是没有行动而已,我笃定。
  15. python剪刀石头布_Python Tkinter教程系列01:剪刀石头布游戏
  16. python从第二行开始读取文件_python学习笔记—— 从第二行开始读文件
  17. 解决:springboot生成jar运行没有主清单属性
  18. Java map去空值
  19. 普元云计算-聊聊前端工程化的实践与未来
  20. 关于用软碟通UltralSO安装fedora时出现的问题

热门文章

  1. 360路由器故障显示DNS服务器,360安全路由器dns异常解决方法
  2. dayjs格式化使用
  3. Cookie和Session的使用及其案例分析
  4. 大揭秘(1):网店背后的隐秘产业链【连载】
  5. dell服务器系统备份软件,使用 AlienRespawn 备份 Alienware 电脑的系统
  6. 如何在线下载哔哩哔哩上的视频
  7. 本地html网页载入很慢,网页打开很慢是什么原因?怎么解决
  8. java+selenium——Navigate命令
  9. idea Push Tags选All还是Current Branch?
  10. android 图片/视频混合轮播控件banner