Contents

  • 1, Operation notes
    • a, connector troubleshooting
    • b, Create and view loaded connectors (REST API)
    • c, Specify connector converters
    • d, View, create, and delete schemas (created automatically by source connectors)
  • 2, Data flow between relational databases
    • a, pgsql --> mysql
    • b, mysql --> mysql
    • c, sqlserver --> sqlserver
    • d, sqlserver --> mysql: query mode (column mapping)
    • e, sqlserver --> oracle
    • f, oracle --> oracle
  • 3, Relational databases: importing into HBase
    • a, debezium-mysql-cdc --> hbase: real-time data
    • b, oracle --> hbase: incremental (timestamp column)

1, Operation notes

Data sources (pgsql, mysql, oracle, sqlserver) --> ksqlDB (Kafka) --> targets (mysql, sqlserver, oracle, hbase)

a, connector troubleshooting

// connector troubleshooting, method 1: describe the connector in ksqlDB
ksql> describe connector `mysql-source-t1`;
Name                 : mysql-source-t1
Class                : io.confluent.connect.jdbc.JdbcSourceConnector
Type                 : source
State                : RUNNING
WorkerId             : 10.0.2.15:8083

 Task ID | State  | Error Trace
------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Cannot make incremental queries using incrementing column id on `test`.`t1` because this column is nullable.
    at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:471)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:165)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
------------------------------------------------------------------------------------------------

// connector troubleshooting, method 2: query the Kafka Connect REST API
$ curl -s "http://localhost:8083/connectors"
["jdbc_source_mysql_10"]
#Check the status of the connector and its task[s]:
$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_10/status"|jq '.'
{
  "name": "users_sink_mysql",
  "connector": { "state": "RUNNING", "worker_id": "10.0.2.15:8083" },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.0.2.15:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime....."
    }
  ],
  "type": "sink"
}
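Once the root cause is fixed (for example the nullable incrementing column above), the failed task can be restarted without recreating the connector. A minimal sketch using the standard Kafka Connect REST endpoints; the connector name jdbc_source_mysql_10 is simply the one from the example above:

# Restart only the failed task (task 0 in the status output above)
$ curl -s -X POST "http://localhost:8083/connectors/jdbc_source_mysql_10/tasks/0/restart"
# Or restart the whole connector
$ curl -s -X POST "http://localhost:8083/connectors/jdbc_source_mysql_10/restart"
# Then re-check the state
$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_10/status" | jq '.tasks[].state'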

b, Create and view loaded connectors (REST API)

# Create a connector via the REST API
curl -s -X POST -H 'Content-Type: application/json'  http://localhost:8083/connectors  -d '
{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:sqlite:test.db",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "test-sqlite-jdbc-",
    "name": "jdbc-source"
  }
}'

# Delete a connector via the REST API
$ curl -X DELETE localhost:8083/connectors/jdbc-source

# List the loaded connector plugins via the REST API
$  curl -sS localhost:8083/connector-plugins | jq .[].class
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
"io.confluent.connect.jdbc.JdbcSinkConnector"
...
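A few more REST calls are handy when managing connectors — a sketch, assuming the same Connect worker on localhost:8083 and the jdbc-source connector created above:

# View a connector's current configuration
$ curl -s "http://localhost:8083/connectors/jdbc-source/config" | jq .
# Pause and resume a connector (e.g. while changing the source table)
$ curl -s -X PUT "http://localhost:8083/connectors/jdbc-source/pause"
$ curl -s -X PUT "http://localhost:8083/connectors/jdbc-source/resume"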

c, Specify connector converters

  • "key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
  • "value.converter" = 'io.confluent.connect.avro.AvroConverter'   // or org.apache.kafka.connect.json.JsonConverter (see the sketch below)
  • "value.converter.schema.registry.url" = 'http://localhost:8081',
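These settings can be passed per connector, overriding the Connect worker defaults. A minimal sketch of how they would sit inside a ksqlDB CREATE CONNECTOR definition; the connector name and connection details here are hypothetical:

CREATE SOURCE CONNECTOR `mysql-source-avro` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
        "connection.url"='jdbc:mysql://192.168.56.1:3306/test?user=root&password=root',
        "mode"='incrementing',
        "incrementing.column.name"='id',
        "table.whitelist"='per',
        "topic.prefix"='mysql-avro-',
        "key.converter"='org.apache.kafka.connect.storage.StringConverter',
        "value.converter"='io.confluent.connect.avro.AvroConverter',
        "value.converter.schema.registry.url"='http://localhost:8081');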

d, View, create, and delete schemas (created automatically by source connectors)

Example of defining a custom topic schema: https://docs.confluent.io/platform/current/schema-registry/develop/using.html#schemaregistry-using

#create a new topic called test and register its schema:
#avro Primitive Types:
/* null: no value
boolean: a binary value
int: 32-bit signed integer
long: 64-bit signed integer
float: single precision (32-bit) IEEE 754 floating-point number
double: double precision (64-bit) IEEE 754 floating-point number
bytes: sequence of 8-bit unsigned bytes
string: unicode character sequence
*/
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" http://localhost:8081/subjects/test-value/versions -d '
{
  "type": "record",
  "name": "Payment",
  "namespace": "io.confluent.examples.clients.basicavro",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}'

# 2, View a schema: view the latest version of a subject in Schema Registry:
[root@hadoop01 /]# curl --silent -X GET http://localhost:8081/subjects/ | jq
[
  "oracl-t1PER-value",
  "redo-log-topic-22-value",
  "oracle-redo-log-topicCloud-key"
]
[root@hadoop01 /]# curl --silent -X GET http://localhost:8081/subjects/oracl-t1PER-value/versions/latest | jq .
{
  "subject": "oracl-t1PER-value",
  "version": 1,
  "id": 2,
  "schema": "{\"type\":\"record\",\"name\":\"PER\",\"fields\":[{\"name\":\"ID\",\"type\":{\"type\":\"bytes\",\"scale\":0,\"precision\":38,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"0\",\"connect.decimal.precision\":\"38\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}},{\"name\":\"NAME\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"PER\"}"
}

# 3, Delete the old schema version
curl -X DELETE http://localhost:8081/subjects/oracl-t1PER-value/versions/latest
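A couple of related Schema Registry calls — a sketch, assuming the registry at localhost:8081 and the subject shown above:

# List all registered versions of a subject
$ curl --silent -X GET http://localhost:8081/subjects/oracl-t1PER-value/versions | jq .
# Delete one specific version (here version 1)
$ curl -X DELETE http://localhost:8081/subjects/oracl-t1PER-value/versions/1
# Delete the whole subject (all versions)
$ curl -X DELETE http://localhost:8081/subjects/oracl-t1PER-value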

2, Data flow between relational databases

a, pgsql --> mysql

References:

  • pgsql --> elasticsearch demo1
  • pgsql --> elasticsearch demo2 (mastering-kafka-streams-and-ksqldb/chapter-09)
  • JDBC connector docs: https://docs.confluent.io/kafka-connect-jdbc/current/

Source connector parameter note: poll.interval.ms=5000 means the connector polls for new data every 5 seconds (the default).

#mysql> create database test;
#        create table test.per (id int PRIMARY KEY, title VARCHAR(120));
#pgsql:  docker run -d --name pgsql -p 5432:5432 -e POSTGRES_USER=root -e POSTGRES_PASSWORD=root -e POSTGRES_DB=root   postgres:9.6.19-alpine
#psql -U root -d root>  CREATE TABLE titles ( id SERIAL PRIMARY KEY, title VARCHAR(120) );
#       INSERT INTO titles (title) values ('Stranger Things');
#       INSERT INTO titles (title) values ('Black Mirror');
ksql> CREATE SOURCE CONNECTOR `postgres-source` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
        "connection.url"='jdbc:postgresql://192.168.56.117:5432/root?user=root&password=root',
        "mode"='incrementing',
        "incrementing.column.name"='id',
        "numeric.mapping"='best_fit',
        "topic.prefix"='',
        "table.whitelist"='titles',
        "key"='id');

#======================= View the data in the Kafka topic ==================
ksql> set 'auto.offset.reset'='earliest';
ksql> print titles;
Key format: JSON or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/06/18 08:14:05.976 Z, key: 1, value: {"id": 1, "title": "Stranger Things"}
rowtime: 2021/06/18 08:14:05.976 Z, key: 2, value: {"id": 2, "title": "Black Mirror"}

ksql> CREATE SINK CONNECTOR `mysql-sink` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSinkConnector',
        "connection.url"='jdbc:mysql://192.168.56.1:3306/test?user=root&password=root',
        "insert.mode"='upsert',
        "topics"='titles',
        "table.name.format"='per',
        "pk.mode"='record_value',
        "pk.fields"='id');
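To confirm that both connectors are healthy and rows actually land in MySQL, a quick check can look like this (a sketch; connector and table names follow the example above):

ksql> show connectors;
ksql> describe connector `mysql-sink`;
# on the MySQL side
mysql> select * from test.per;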

b, mysql --> mysql

# mysql:    CREATE TABLE per (id int PRIMARY KEY , title varchar(20))
# mysql:    CREATE TABLE per2(id int PRIMARY KEY , title varchar(20))
ksql>
CREATE SOURCE CONNECTOR `mysql-source-per` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
        "connection.url"='jdbc:mysql://192.168.56.1:3306/test?user=root&password=root',
        "mode"='incrementing',
        "incrementing.column.name"='id',
        "numeric.mapping"='best_fit',
        "topic.prefix"='mysql-source-',
        "table.whitelist"='per',
        "key"='id');

# View the source data stream
ksql> set 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> print `mysql-source-per`;
Key format: JSON or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/06/19 16:35:59.911 Z, key: 1, value: {"id": 1, "title": "Stranger Things"}
rowtime: 2021/06/19 16:35:59.913 Z, key: 2, value: {"id": 2, "title": "Black Mirror"}

CREATE SINK CONNECTOR `mysql-sink-per2` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSinkConnector',
        "connection.url"='jdbc:mysql://192.168.56.1:3306/test?user=root&password=root',
        "insert.mode"='upsert',
        "topics"='mysql-source-per',
        "table.name.format"='per2',
        "pk.mode"='record_value',
        "pk.fields"='id');
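When the test pipeline is no longer needed, both connectors can be dropped directly from ksqlDB — a minimal sketch using the names created above (DROP CONNECTOR removes the connector but leaves the underlying topic in place):

ksql> DROP CONNECTOR `mysql-sink-per2`;
ksql> DROP CONNECTOR `mysql-source-per`;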

c, sqlserver --> sqlserver

# sqlserver: CREATE TABLE t1(id int PRIMARY KEY ,name varchar(10), job varchar(20))
# sqlserver: CREATE TABLE t2(id int PRIMARY KEY ,name varchar(10), job varchar(20))
ksql>
CREATE SOURCE CONNECTOR `sqlserver-source-t11` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
        "connection.url"='jdbc:sqlserver://192.168.56.117:1434;databaseName=master',
        "connection.user"='sa',
        "connection.password"='abc123_123',
        "mode"='incrementing',
        "incrementing.column.name"='id',
        "numeric.mapping"='best_fit',
        "topic.prefix"='sqlserver-',
        "table.whitelist"='t1',
        "key"='id');

CREATE SINK CONNECTOR `sqlserver-sink-t2` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSinkConnector',
        "connection.url"='jdbc:sqlserver://192.168.56.117:1434;databaseName=master',
        "connection.user"='sa',
        "connection.password"='abc123_123',
        "insert.mode"='upsert',
        "topics"='sqlserver-t1',
        "table.name.format"='t2',
        "pk.mode"='record_value',
        "pk.fields"='id');

d, sqlserver --> mysql: query mode (column mapping)

# sqlserver: CREATE TABLE t1(id int PRIMARY KEY ,name varchar(10), job varchar(20))
# mysql:    CREATE TABLE per2(id int PRIMARY KEY ,name2 varchar(10), title varchar(20))
ksql>
// query mode
CREATE SOURCE CONNECTOR `sqlserver2mysql-source-t11` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
        "connection.url"='jdbc:sqlserver://192.168.56.117:1434;databaseName=master',
        "connection.user"='sa',
        "connection.password"='abc123_123',
        "mode"='incrementing',
        "incrementing.column.name"='id',
        "numeric.mapping"='best_fit',
        "topic.prefix"='sqlserver2mysql',
        "query"='select id ,name as name2, job as title from t1',
        "key"='id');

CREATE SINK CONNECTOR `sqlserver2mysql-sink-per2` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSinkConnector',
        "connection.url"='jdbc:mysql://192.168.56.1:3306/test?user=root&password=root',
        "insert.mode"='upsert',
        "topics"='sqlserver2mysql',
        "table.name.format"='per2',
        "pk.mode"='record_value',
        "pk.fields"='id');
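Note that in query mode no table name is appended, so topic.prefix is used as the full topic name and the sink's topics setting must match it exactly. A quick way to inspect the remapped columns before wiring up the sink (a sketch, same names as above):

ksql> set 'auto.offset.reset'='earliest';
ksql> print `sqlserver2mysql`;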

e, sqlserver --> oracle

# sqlserver: CREATE TABLE t1(id int PRIMARY KEY ,name varchar(10), job varchar(20))
# oracle:    CREATE TABLE t2(ID int PRIMARY KEY ,NAME varchar2(10), TITLE varchar2(20))
ksql>
CREATE SOURCE CONNECTOR `sqlserver2oracl-source-t1` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
        "connection.url"='jdbc:sqlserver://192.168.56.117:1434;databaseName=master',
        "connection.user"='sa',
        "connection.password"='abc123_123',
        "mode"='incrementing',
        "incrementing.column.name"='ID',
        "numeric.mapping"='best_fit',
        "topic.prefix"='sqlserver2oracl-t1',
        "query"='select id as ID  ,name as NAME , job as TITLE from t1',
        "key"='ID');

CREATE SINK CONNECTOR `sqlserver2oracl-sink-t2` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSinkConnector',
        "connection.url"='jdbc:oracle:thin:@192.168.1.1:1521:test',
        "connection.user"='test1',
        "connection.password"='test1',
        "insert.mode"='upsert',
        "topics"='sqlserver2oracl-t1',
        "table.name.format"='T2',
        "pk.mode"='record_value',
        "pk.fields"='ID');

f, oracle --> oracle

  • Note: schema.pattern keeps the connector from scanning every table (an Oracle instance is usually a single database containing multiple schemas, i.e. per-user namespaces).
# oracle: CREATE TABLE t1(id int PRIMARY KEY ,name varchar(10), job varchar(20))
# oracle: CREATE TABLE t2(ID int PRIMARY KEY ,NAME varchar2(10),job varchar(20),  TITLE varchar2(20))
ksql>
CREATE SOURCE CONNECTOR `oracl-source-t1` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
        "connection.url"='jdbc:oracle:thin:@192.168.56.1:1521:orcl',
        "connection.user"='test1',
        "connection.password"='test1',
        "mode"='incrementing',
        "incrementing.column.name"='ID',
        "numeric.mapping"='best_fit',
        "topic.prefix"='oracl-t1',
        "query"='select id ,name  , job from t1',
        "schema.pattern"='TEST1',
        "key"='ID');

# View the source data stream:
ksql> show topics;
 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 default_ksql_processing_log | 1          | 1
 oracl-t1                    | 1          | 1

ksql> print `oracl-t1`;
Key format: JSON or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/06/25 03:57:43.484 Z, key: 1, value: {"ID": "\u0001", "NAME": "a", "JOB": "java"}
rowtime: 2021/06/25 03:57:43.485 Z, key: 2, value: {"ID": "\u0002", "NAME": "b", "JOB": "py"}

CREATE SINK CONNECTOR `oracl-sink-t2` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSinkConnector',
        "connection.url"='jdbc:oracle:thin:@192.168.56.1:1521:orcl',
        "connection.user"='test1',
        "connection.password"='test1',
        "insert.mode"='upsert',
        "topics"='oracl-t1',
        "table.name.format"='T2',
        "pk.mode"='record_value',
        "pk.fields"='ID');

3, Relational databases: importing into HBase

kafka-hbase-sink plugin zip (the field-type garbling issue has been fixed): https://pan.baidu.com/s/1f75ECo-i_iBRtfcF5vYRRA  (access code: abcd)
Source code: https://github.com/tayalnishu/kafka-connect-hbase

  • Change 1: The JsonEventParser / AvroEventParser classes in the original code keep each field's native data type; if the HBase rowkey is one of those typed fields, `scan 'hbaseTab'` shows garbled values, so all fields are written uniformly as String.
  • Change 2: Replace avroData.fromConnectData(schema, value) with (Struct) value, and use schema.name() to additionally distinguish the field types [bytes, long].
  • Change 3: For an HBase table with a single column family, allow choosing which columns are inserted (hbase.destTable.d.columns=d1,d2).
  • Change 4: Parse the op_type field to detect delete operations and delete the corresponding HBase rows.

a, debezium-mysql-cdc --> hbase: real-time data

  • Download the zip (choose version 0.9.4): https://www.confluent.io/hub/debezium/debezium-connector-mysql
  • confluent-hub install debezium-debezium-connector-mysql-0.9.4.zip

doc:
https://docs.confluent.io/debezium-connect-mysql-source/current/mysql_source_connector_config.html#mysql-source-connector-config,
https://debezium.io/documentation/reference/0.9/tutorial.html

# Install the hbase-sink plugin:
# shell $>  confluent-hub install ~/nishutayal-kafka-connect-hbase-1.0.1.zip
# hbase:   create 'www', 'f'
# mysql:  create table customers ( id int primary key, first_name varchar(100), last_name varchar(100), email varchar(100) );
/*
my.cnf configuration (enable the binlog):
server-id         = 1
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10
*/
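# Before creating the Debezium connector, confirm the binlog is actually enabled with row format
# (a quick sanity check; assumes a local mysql client):
# mysql> show variables like 'log_bin';            (should be ON)
# mysql> show variables like 'binlog_format';      (should be ROW)
# mysql> show variables like 'binlog_row_image';   (should be FULL)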
ksql> CREATE SOURCE CONNECTOR `mysql-dbz-source` WITH(
        "connector.class"='io.debezium.connector.mysql.MySqlConnector',
        "transforms"='unwrap',
        "transforms.unwrap.type"='io.debezium.transforms.UnwrapFromEnvelope',
        "transforms.unwrap.drop.tombstones"=true,
        "transforms.unwrap.delete.handling.mode"='rewrite',
        "tasks.max"='1',
        "database.hostname"='192.168.56.117',
        "database.port"='32768',
        "database.server.name"='dbz3',
        "database.user"='root',
        "database.password"='123456',
        "database.server.id"='1',
        "database.whitelist"='test',
        "database.history.kafka.bootstrap.servers"='localhost:9092',
        "database.history.kafka.topic"='schema-changes.inventory'
);

# View the source data stream
ksql> print `dbz3.inventory.customers`;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2021/06/28 03:59:38.246 Z, key: [Struct{@7594262153124655485/-], value: {"id": 1001, "first_name": "Sally", "last_name": "Thomas", "email": "sally.thomas@acme.com", "__deleted": "false"}
rowtime: 2021/06/28 03:59:38.246 Z, key: [Struct{@7594262153124655741/-], value: {"id": 1002, "first_name": "George", "last_name": "Bailey", "email": "gbailey@foobar.com", "__deleted": "false"}

/* hbase sink configuration parameters:
name=kafka-cdc-hbase
connector.class=io.svectors.hbase.sink.HBaseSinkConnector
tasks.max=1
topics=test
zookeeper.quorum=localhost:2181
event.parser.class=io.svectors.hbase.parser.AvroEventParser
hbase.table.name=destTable
hbase.destTable.rowkey.columns=id
hbase.destTable.rowkey.delimiter=|
hbase.destTable.family=c,d
hbase.destTable.c.columns=c1,c2
hbase.destTable.d.columns=d1,d2 */
CREATE SINK CONNECTOR `www-hbase-sink2` WITH(
        "connector.class"='io.svectors.hbase.sink.HBaseSinkConnector',
        "zookeeper.quorum"='192.168.56.161:2181',
        "event.parser.class"='io.svectors.hbase.parser.AvroEventParser',
        "hbase.table.name"='www',
        "hbase.www.rowkey.columns"='id',
        "hbase.www.family"='f',
        "topics"='dbz3.inventory.customers');
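Once the sink connector is running, the captured rows should appear in HBase keyed by id — a minimal check from the hbase shell (a sketch; table 'www' as created above, rowkey values depend on the customers data):

hbase(main):001:0> scan 'www'
hbase(main):002:0> get 'www', '1001'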

b, oracle --> hbase: incremental (timestamp column)

  • Note the timezone used by the JDBC connection (UTC by default): "db.timezone": "Asia/Shanghai"
  • The initial timestamp defaults to 1970; to start from the current time, set "timestamp.initial": -1 (the value must be numeric).
    To get the epoch value for a custom starting time (a config sketch follows the date examples below):
[root@c7-docker logs]# date -d '2 minute ago'
Mon Jul 19 09:15:25 CST 2021
[root@c7-docker logs]# date -d '2 minute ago' +%s
1626657334
[root@c7-docker logs]# date -d '10 minute ago' +%s
1626656860
[root@c7-docker logs]# date -d '1 hour ago'
Mon Jul 19 08:17:08 CST 2021
[root@c7-docker logs]# date -d '2 hour ago'
Mon Jul 19 07:17:11 CST 2021
[root@c7-docker logs]# date -d '2 day ago'
Sat Jul 17 09:17:15 CST 2021
[root@c7-docker logs]# date -d '1 month ago' +%s
1624065481
[root@c7-docker logs]# date -d '1 year ago' +%s
1595121487
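Note that date +%s prints epoch seconds; the JDBC source connector appears to interpret timestamp.initial as an epoch value in milliseconds, so the number may need to be multiplied by 1000 (verify against the docs for the connector version in use). A sketch of where the value goes in the connector config; the concrete numbers are only illustrative:

# epoch milliseconds for a custom starting point (GNU date)
[root@c7-docker logs]# date -d '2021-07-19 09:00:00' +%s%3N
1626656400000
# in the connector config:
#   "timestamp.initial": 1626656400000      (start from that point in time)
#   "timestamp.initial": -1                 (start from the current time)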
/* Method 1: nested SQL — the time column is converted inside a derived (temporary) table
[root@c7-docker ~]# cat a.json
{
  "name": "oracl-source-t1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@192.168.56.117:1523:helowin",
    "connection.user": "scott",
    "connection.password": "scott",
    "mode": "timestamp",
    "timestamp.column.name": "RECORD_TIME",
    "topic.prefix": "oracl-source-t1",
    "numeric.mapping": "best_fit",
    "validate.non.null": false,
    "query": "SELECT * FROM ( SELECT id, age, TO_DATE(COLUMN1, 'yyyy-mm-dd hh24:mi:ss') AS record_time FROM T1 )",
    "schema.pattern": "SCOTT",
    "db.timezone": "Asia/Shanghai",
    "key": "ID",
    "name": "oracl-source-t1"
  }
}
[root@c7-docker ~]# curl -s -X POST -H 'Content-Type: application/json'  http://localhost:8083/connectors  -d @a.json
*/

# Method 2: use a view to expose the time column after format conversion
#oracle1: CREATE TABLE patient( id varchar(10), name varchar(10),visit_time varchar(20) );
#   CREATE VIEW PATIENT_view AS select id as id2  ,name as name2, TO_DATE(VISIT_TIME,'yyyy-mm-dd hh24:mi:ss') AS VISIT_TIME   from patient
#oracle2: CREATE TABLE patient2( id2 varchar(10), name2 varchar(10),visit_time varchar(20) );

# validate.non.null: By default, the JDBC connector will validate that all incrementing and timestamp tables have NOT NULL
#   set for the columns being used as their ID/timestamp. If the tables don't, the JDBC connector will fail to start.
#   Setting this to false will disable these checks.
ksql>
CREATE SOURCE CONNECTOR `oracl-source-patient` WITH(
        "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
        "connection.url"='jdbc:oracle:thin:@192.168.56.161:1521:orcl',
        "connection.user"='test1',
        "connection.password"='test1',
        "table.types"='VIEW',
        "table.whitelist"='PATIENT_VIEW',
        "schema.pattern"='TEST1',
        "mode"='timestamp',
        "numeric.mapping"='best_fit',
        "timestamp.column.name"='VISIT_TIME',
        "db.timezone"='Asia/Shanghai',
        "validate.non.null"=false,
        "TIMESTAMP"='VISIT_TIME',
        "TIMESTAMP_FORMAT"='yyyy-mm-dd hh24:mi:ss',
        "topic.prefix"='oracl-patient',
        "key"='ID2');

# View the source data stream
# INSERT INTO TEST1.PATIENT(ID, NAME, VISIT_TIME) VALUES('000123', '张三', '2020-01-01 12:23');
ksql> print `oracl-patientPATIENT_VIEW`;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/06/25 07:45:21.103 Z, key: 000123, value: {"ID2": "000123", "NAME2": "张三", "TIME": 1577881340000}
rowtime: 2021/06/25 07:45:21.103 Z, key: b123axa, value: {"ID2": "b123axa", "NAME2": "测试1", "TIME": 1577881340000}

CREATE SINK CONNECTOR `www-hbase-sink2` WITH(
        "connector.class"='io.svectors.hbase.sink.HBaseSinkConnector',
        "zookeeper.quorum"='192.168.56.161:2181',
        "event.parser.class"='io.svectors.hbase.parser.AvroEventParser',
        "hbase.table.name"='www',
        "hbase.www.rowkey.columns"='ID2',
        "hbase.www.family"='f',
        "topics"='oracl-patientPATIENT_VIEW');
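To verify the incremental flow end to end, insert a row with a later VISIT_TIME and check that it reaches HBase after the next poll — a sketch using the objects defined above (the new id and values are only illustrative):

# Oracle side: a new row with a later timestamp
# SQL> INSERT INTO TEST1.PATIENT(ID, NAME, VISIT_TIME) VALUES('000124', 'test2', '2021-06-26 10:00:00');
# SQL> COMMIT;
# HBase side: the new rowkey should appear after the next poll (poll.interval.ms, 5 s by default)
hbase(main):001:0> scan 'www'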
