翻译主要的创建参数备忘,忒TM烦人的多


import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.log4j.Logger;import java.util.*;
import java.util.concurrent.ExecutionException;public class KafkaClient implements AutoCloseable{private KafkaAdminClient client;private int timeout=6000;private final static Logger logger=Logger.getLogger(KafkaClient.class);public KafkaClient(KafkaSettings settings){Map<String,Object> map=new HashMap<>();String[] hosts=settings.getArray(KafkaSettings.PREFIX_ALL,AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,",");map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(hosts));client=(KafkaAdminClient)KafkaAdminClient.create(map);if (logger.isInfoEnabled()){logger.info("Kafka集群已成功连接");}}public KafkaClient(String[] hosts){Map<String,Object> map=new HashMap<>();map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(hosts));client=(KafkaAdminClient)KafkaAdminClient.create(map);if (logger.isInfoEnabled()){logger.info("Kafka集群已成功连接");}}/*** 创建一个新的topic* @param name topic名* @param numPartitions 分区数* @param replicationFactor 副本数* @return 是否创建成功* @throws*/public boolean createTopic(String name, int numPartitions, short replicationFactor) throws ExecutionException, InterruptedException {Map<String,String> topicConfig=new HashMap<>();/** 旧日志段的保留测率,删除或压缩,此时选择删除 */topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG,TopicConfig.CLEANUP_POLICY_DELETE);/** 过期数据的压缩方式,如果上面选项为压缩的话才有效 *///topicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG,"snappy");/*** The amount of time to retain delete tombstone markers for log compacted topics.* This setting also gives a bound on the time in which a consumer must complete a* read if they begin from offset 0 to ensure that they get a valid snapshot of the* final stage (otherwise delete tombstones may be collected before they complete their scan).* 默认1天* */topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG,"86400000");/** 文件在文件系统上被删除前的保留时间,默认为60秒 */topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG,"60000");/** 将数据强制刷入日志的条数间隔 *///topicConfig.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG,"9223372036854775807");/** 将数据强制刷入日志的时间间隔 *///topicConfig.put(TopicConfig.FLUSH_MS_CONFIG,"9223372036854775807");/** offset设置 *///topicConfig.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG,"4096");/** 每个批量消息最大字节数 *///topicConfig.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,"1000012");/** 记录标记时间与kafka本机时间允许的最大间隔,超过此值的将被拒绝 *///topicConfig.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG,"9223372036854775807");/** 标记时间类型,是创建时间还是日志时间 CreateTime/LogAppendTime *///topicConfig.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,"CreateTime");/** 如果日志压缩设置为可用的话,设置日志压缩器清理日志的频率。默认情况下,压缩比率超过50%时会避免清理日志。此比率限制重复日志浪费的最大空间,设置为50%,意味着最多50%的日志是重复的。更高的比率设置意味着更少、更高效的清理,但会浪费更多的磁盘空间。*///topicConfig.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG,"0.5");/** 消息在日志中保持未压缩状态的最短时间,只对已压缩的日志有效 *///topicConfig.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG,"0");/** 当一个producer的ack设置为all(或者-1)时,此项设置的意思是认为新记录写入成功时需要的最少副本写入成功数量。如果此最小数量没有达到,则producer抛出一个异常(NotEnoughReplicas 或者NotEnoughReplicasAfterAppend)。你可以同时使用min.insync.replicas 和ack来加强数据持久话的保障。一个典型的情况是把一个topic的副本数量设置为3,min.insync.replicas的数量设置为2,producer的ack模式设置为all,这样当没有足够的副本没有写入数据时,producer会抛出一个异常。*/topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,"1");/** 如果设置为true,会在新日志段创建时预分配磁盘空间 */topicConfig.put(TopicConfig.PREALLOCATE_CONFIG,"true");/** 当保留策略为删除(delete)时,此设置控制在删除就日志段来清理磁盘空间前,保存日志段的partition能增长到的最大尺寸。* 默认情况下没有尺寸大小限制,只有时间限制。。由于此项指定的是partition层次的限制,它的数量乘以分区数才是topic层面保留的数量。 */// topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG,"-1");/*** 当保留策略为删除(delete)时,此设置用于控制删除旧日志段以清理磁盘空间前,日志保留的最长时间。默认为7天。* 这是consumer在多久内必须读取数据的一个服务等级协议(SLA)。* */topicConfig.put(TopicConfig.RETENTION_MS_CONFIG,"604800000");/*** 此项用于控制日志段的大小,日志的清理和持久话总是同时发生,所以大的日志段代表更少的文件数量和更小的操作粒度。* */topicConfig.put(TopicConfig.SEGMENT_BYTES_CONFIG,"1073741824");/*** 此项用于控制映射数据记录offsets到文件位置的索引的大小。我们会给索引文件预先分配空间,然后在日志滚动时收缩它。* 一般情况下你不需要改动这个设置。* *///topicConfig.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG,"10485760");/**  从预订的段滚动时间中减去最大的随机抖动,避免段滚动时的惊群(thundering herds)  *///topicConfig.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG,"0");/** 此项用户控制kafka强制日志滚动时间,在此时间后,即使段文件没有满,也会强制滚动,以保证持久化操作能删除或压缩就数据。默认7天 */topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG,"604800000");/*** 是否把一个不在isr中的副本被选举为leader作为最后手段,即使这样做会带来数据损失* */topicConfig.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,"false");NewTopic newTopic=new NewTopic(name,numPartitions,replicationFactor);newTopic.configs(topicConfig);CreateTopicsOptions options=new CreateTopicsOptions();options.timeoutMs(timeout);CreateTopicsResult result=client.createTopics(ImmutableList.of(newTopic),options);for(Map.Entry<String,KafkaFuture<Void>> e : result.values().entrySet()){KafkaFuture<Void> future= e.getValue();future.get();boolean success=!future.isCompletedExceptionally();if(logger.isInfoEnabled()&&success){logger.info("已成功创建Kafka topic "+name+" ,分区 "+numPartitions+" ,副本 "+replicationFactor);}return success;}return false;}/*** 当topic不存在时,主动创建* @param name topic名* @param numPartitions 分区数* @param replicationFactor 副本数* @return 是否可以使用此topic,如果为true,可能是新创建或已存在* @throws ExecutionException* @throws InterruptedException*/public boolean createTopicIfNotExists(String name, int numPartitions, short replicationFactor) throws ExecutionException, InterruptedException {if(!listTopics().contains(name)){return createTopic(name,numPartitions,replicationFactor);}if(logger.isInfoEnabled()){logger.info("Kafka topic "+name+" 已存在");}return true;}/*** 列出所有非内部topic* @return* @throws ExecutionException* @throws InterruptedException*/public Set<String> listTopics() throws ExecutionException, InterruptedException {ListTopicsOptions options=new ListTopicsOptions();//设置超时时间options.timeoutMs(timeout);//不列出kafka内部topicoptions.listInternal(false);ListTopicsResult result=client.listTopics(options);Set<String> topics= result.names().get();if(logger.isDebugEnabled()){logger.debug("发现"+topics.size()+"个Kafka topic : "+topics.toString());}return topics;}/*** 检查topic是否存在* @param topics 待检查的topic* @return topic是否存在* @throws ExecutionException* @throws InterruptedException*/public List<PairObject<String,Boolean>> checkExists(String[] topics) throws ExecutionException, InterruptedException {Set<String> topicSet= listTopics();List<PairObject<String,Boolean>> exists=new ArrayList<>();for (String topic :topics){exists.add(new PairObject<>(topic,topicSet.contains(topic)));}return exists;}/*** 删除指定topic(如果broker那没有设置允许删除topic的话,此调用会持续等待最终超时返回)* @param topics 待删除的topic* @return 删除是否成功* @throws ExecutionException* @throws InterruptedException*/public List<PairObject<String,Boolean>> deleteTopic(String[] topics) throws ExecutionException, InterruptedException {DeleteTopicsOptions options=new DeleteTopicsOptions();options.timeoutMs(timeout);DeleteTopicsResult deleteTopicsResult=client.deleteTopics(Arrays.asList(topics),options);List<PairObject<String,Boolean>> result=new ArrayList<>();for(Map.Entry<String,KafkaFuture<Void>> e : deleteTopicsResult.values().entrySet()){String topic=e.getKey();KafkaFuture<Void> future=e.getValue();future.get();result.add(new PairObject<>(topic,!future.isCompletedExceptionally()));}return result;}@Overridepublic void close() throws Exception {if (client!=null){client.close();}}public void setTimeout(int timeout) {this.timeout = timeout;}
}

kafka接口操作topic相关推荐

  1. Kafka解析之topic创建(3)——合法性验证

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  2. 8.解析Kafka中的 Topic 和 Partition

    目录 1.什么是Topic 2.什么是Partition 3.Consumer Group 消费者组 4.Topic 和 Partition 的存储 5.producer消息分发策略 6.消费者如何消 ...

  3. 解析Kafka中的 Topic 和 Partition

    topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据 1.什么是Topic ...

  4. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...

    原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...

  5. kafka监听topic消费_分布式专题|最近一直死磕kafka设计原理,都肝吐了

    kafka架构图 kafka核心控制器 定义 在kafka集群中,会选举出一个broker作为控制器(controller),负责管理集群中所有的分区和副本的状态: 职责 监听broker变化,通过监 ...

  6. ACL+SASL的认证配置后的Kafka命令操作(Windows版)

    ACL+SASL的认证配置后的Kafka命令操作 Windows环境 背景 版本 操作 配置文件准备 Zookeeper配置文件 Clients配置文件 Kafka Server配置文件 JAAS配置 ...

  7. Kafka原理+操作+实战

    Kafka原理+操作+实战 前面我和大家交流了kafka的部署安装.对于部署安装这都是小意思,不值得太多的提及.重点还是需要知道kafka原理.熟练掌握kafka命令以及灵活用于kafka场景. 干货 ...

  8. Doris系列之导入Kafka数据操作

    Doris系列 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Sp ...

  9. Java-利用Spring提供的Resource/ResourceLoader接口操作资源文件

    背景 资源访问接口 主要方法 主要实现类 例子 WritableResource ClassPathResource ServletContextResource 对资源文件编码 资源加载 资源地址表 ...

  10. 02_clickhouse安装,卸载,启动/关闭服务,交互式命令(数据库创建,数据导入,查询),批模式数据导入,MySQL接口操作ClickHouse,DBeaver可视化工具安装与使用(学习笔记)

    1 ClickHouse安装 安装文件清单 clickhouse-client-${version}.noarch.rpm clickhouse-common-static-dbg-${version ...

最新文章

  1. 通俗讲java反射机制ioc,结合反射说明SpringIOC的实现原理
  2. jquery ajax返回Internal server error 500错误解决方案
  3. Win2003下Asp配置技巧 http 500内部服务器错误
  4. 《转》python学习--基础上
  5. 订单最小量限制的增强
  6. 网络测试及故障诊断方法及工具
  7. linux 解决依赖性错误,linux – 由于单模块依赖性,XSP配置失败
  8. c语言规定 函数返回值6,C语言六函数.ppt
  9. Android 应用开发----ViewPager---PagerTitleStrip添加标题栏
  10. TypeError: softmax() got an unexpected keyword argument 'axis'
  11. 【报告分享】2019年中国首席营销官(CMO)调查白皮书.pdf(附下载链接)
  12. How do I select an ITEM from a combobox?
  13. jpadao层继承什么_实木复合地板特点是什么
  14. 大专学历造假改成了 211 拿到了抖音 Offer
  15. baidumap api MySQL_百度地图API获取数据
  16. VC++实现快速截屏
  17. 水星网卡 linux驱动,水星网卡驱动下载_硬件驱动下载
  18. android输入法剪贴板,QQ输入法手安卓V5.4剪贴板 任性粘贴
  19. LeetCode K站中转内最便宜的航班(回溯法、动态规划)
  20. html实现拼图游戏,html、css、js实现拼图游戏

热门文章

  1. JVET专家组下360Lib全景视频投影格式测试平台
  2. 博弈论分析题_《博弈论》期末考试试题
  3. 腾讯入股合作金融云,花费4亿元获长亮科技7.14%股份
  4. 计算机硬盘中有许多碎片,电脑磁盘碎片整理有什么用(需要经常清理吗)
  5. 传奇怎么设置沙巴克自动攻城
  6. 给大家分享一下我的数字化转型研究资料
  7. 异数OS 星星之火(一)-- 异数OS-织梦师云 用户使用手册
  8. python灰色关联度分析_基于灰色关联度重庆万州区边坡稳定影响因素分析
  9. 求职互联网技术岗应届生面试必备技巧分享
  10. CSS的选择器(超详细!!)