研究Apache Druid的Kafka Stream摄取规格说明,因此针对每一项配置规格的作用都进行了详细推敲和梳理,生成了一份表格,若你们也开始学习Druid,那么这份表格对于流式数据摄取的配置,具有指导参考作用。本表格来自于守护石大数据的知识星球。
Input tuning
配置项 | 默认值 | 配置描述 |
---|---|---|
useEarliestOffset | false | 如果supervisor管理数据源,它需要获取Kafka的一组起始offsets,那么这个true与false就决定了是从最早(the earliest)或最晚(the latest)的offsets获取,另外,在通常情况下,后续任务会从前一个Segments的结尾处开始获取offsets,因此这个配置值只是应用于任务的首次运行。 |
taskDuration | PT1H | 当前任务从开始读取数据到停止以后发布它们的Segment的这个时间段的时长。默认是1小时。 |
taskCount | 1 | 该设定表示在一个副本集上读取数据的最大任务数,默认是1,那么所有副本上读取数据的最大任务数就是taskCount * replicas(副本数),因此整个任务数(读取+发布)比此设定更高。 |
replicas | 1 | 副本集的数量,默认为1,代表了唯一任务集,也就是无副本。Replica的多个任务总是会分配给不同的works(相当于分布在不同节点),这样可以针对过程故障提供弹性。 |
completionTimeout | PT30M | 任务发布的时刻大约是在taskDuration之后开始,发布有默认30分钟超时等待时间,超时后会变成失败并终止,因此该设定不能太低,否则的话,你的任务可能永远都不会发布。 |
pollTimeout | 100 | 作为Kafka消费者每次拉取记录前的等待时间,毫秒级。默认是100毫秒。 |
startDelay | PT5S | Supervisor开始管理任务之前的等待时间周期。默认是5秒。 |
period | PT30S | Supervisor执行管理逻辑的频次,默认是30秒。不过有些特定事件的响应也会使其运行,例如:任务成功、任务失败和任务达到taskDuration设定时长,因此这个period值是指迭代周期的最大时间。 |
lateMessageRejectionPeriod | PT1H | 设定一个任务创建之前的时间周期(默认1小时),任何早于这个设定周期的消息都会被拒绝,例如:Supervisor在2016-01-01T12:00Z创建了一个任务,按照默认设定PT1H(1小时),那么就会使得任何早于2016-01-01T11:00Z的消息都将被丢掉。 |
earlyMessageRejectionPeriod | PT1H | 与lateMessageRejectionPeriod的反之。例如:Supervisor在2016-01-01T12:00Z创建了一个任务,按照默认设定PT1H(1小时),那么就会使得任何晚于2016-01-01T14:00Z的消息都会被丢掉。 |
skipOffsetGaps | false | 是否允许Kafka stream存在缺失偏移量的缝隙,默认为false,也就是说,如果偏移量不连续就会抛出异常。 |
General tuning
配置项 | 默认值 | 配置描述 |
---|---|---|
maxRowsInMemory | 1000000 | 当内存中聚合的行数到达设定的值,默认是1百万行,就会发生刷盘的中间持久化。 |
maxBytesInMemory | Default: 1/6 of max JVM memory | 当内存中聚合的字节数到达设定的值,默认是JVM内存最大值的1/6,就会发生刷盘的中间持久化。 |
resetOffsetAutomatically | false | 当Durid读取Kafka中的不可用消息,例如:发生了OffsetOutOfRangeException 异常时,默认为false,就会抛出异常,导致任务失败,需要手动干预,可能使用 重置 Supervisor API。此模式对于生产非常有用。
若为true,Druid将根据 useEarliestOffset 属性的值(true 为 earliest ,false 为 latest )自动重置为Kafka中可用的earlier或latest偏移量。这可能在你不知情的情况下导致部分数据被丢弃(如果useEarliestOffset 为 false )或导致 重复(如果 useEarliestOffset 为 true )。消息将被记录,表明发生了重置,但摄取将继续。这种模式对于非生产环境非常有用,因为它让Druid尝试从问题中自动恢复,即使这些问题会导致数据被删除或重复。该特性与Kafka的 auto.offset.reset 消费者属性很相似。 |
intermediatePersistPeriod | PT10M | 设定一个时间周期,默认是PT10M(10分钟),确定了发生中间持久化的速率。 |
intermediateHandoffPeriod | P2147483647D | Tasks进行Segments切换(Handoff)的频率,Handoff(数据由磁盘转移到深度存储)发生在maxRowsPerSegment或者maxTotalRows被命中的时候,或者在每个intermediateHandoffPeriod时间周期(默认是2147483647天)发生一次。 |
maxPendingPersists | 空(无限制) | 被挂起等待启动的持久化的最大数量,如果一个新的中间持久化任务使得挂起数量超过了限制,那么摄取将被阻塞,直到当前正运行的持久化过程完成。 |
pushTimeout | 0 | 推送Segments的等待超时时间,毫秒级,默认为0,0意味着永远等待。 |
handoffConditionTimeout | 0 | 切换Segments的等待超时时间,毫秒级,默认为0,0意味着永远等待。 |
bitmap | roaring | bitmap indexes压缩格式,默认为roaring |
dimensionCompression | lz4 | dimension字段的压缩格式,默认为lz4 |
metricCompression | lz4 | 基本类型metric字段的压缩格式,默认lz4 |
longEncoding | longs | 类型为long字段的编码格式,无论它们是dimensions还是metrics都适用。选项auto:通过偏移量或查找表根据列基进行值编码,并以可变长度来存储。默认选项为longs:以每8字节对值进行原样存储。 |
workerThreads | min(10,taskCount) | supervisor用于处理工作任务请求响应的线程数,以及任何其他内部异步操作。默认在10或taskCount中取最小值。 |
chatThreads | min(10, taskCount * replicas) | 用于indexing任务通讯的线程数。默认在10和taskCount * replicas之间取最小值。 |
Chat retries | 8 | HTTP请求indexing任务未响应的重试次数。默认为8次 |
httpTimeout | PT10S | 等待HTTP响应indexing任务的超时时间,默认为10秒 |
shutdownTimeout | PT80S | Supervisor退出前试图无故障的关停任务的等待时间,默认是80秒 |
offsetFetchPeriod | PT30S | Supervisor查询Kafka和Indexing任务以便获取当前偏移量并计算延迟的频率, 默认是30秒,最小5秒,如果你的设置少于5秒,Supervisor将使用最小值。 |
摄取规格JSON文件提交示例
{ "type": "kafka", "spec": { "ioConfig": { "type": "kafka", "consumerProperties": { "bootstrap.servers": "bigdata-002:9092,bigdata-003:9092,bigdata-004:9092" }, "topic": "netflow", "inputFormat": { "type": "json" }, "useEarliestOffset": false, "replicas": 1, "skipOffsetGaps": false, "taskCount": 1, "completionTimeout": "PT30M", "pollTimeout": 100, "startDelay": "PT5S", "period": "PT30S", "taskDuration": "PT1H" }, "tuningConfig": { "type": "kafka", "resetOffsetAutomatically": false, "intermediatePersistPeriod": "PT10M", "maxRowsInMemory": 1000000, "pushTimeout": 0, "handoffConditionTimeout": 0, "indexSpec": { "dimensionCompression": "lz4", "metricCompression": "lz4", "longEncoding": "auto", "bitmap": { "type": "roaring" } }, "workerThreads": 1, "chatRetries": 1, "httpTimeout": "PT10S", "shutdownTimeout": "PT80S", "offsetFetchPeriod": "PT30S" }, "dataSchema": { "dataSource": "netflow", "timestampSpec": { "column": "time", "format": "yyyy-MM-dd HH:mm:ss.S" }, "dimensionsSpec": { "dimensions": [ "node", "src", "dest" ] }, "granularitySpec": { "queryGranularity": "second", "rollup": true, "segmentGranularity": "hour" }, "metricsSpec": [ { "name": "count", "type": "count" }, { "name": "sum_size", "type": "longSum", "fieldName": "size" }, { "name": "sum_num", "type": "longSum", "fieldName": "num" } ], "transformSpec": {} } } }