守护石 ReadByte.com
Menu
  • 首页
  • 自媒体平台
  • 知识库
  • 联系方式
Menu

Druid Kafka摄取规格说明

Posted on 2022年8月15日2022年8月15日 by fangshun

研究Apache Druid的Kafka Stream摄取规格说明,因此针对每一项配置规格的作用都进行了详细推敲和梳理,生成了一份表格,若你们也开始学习Druid,那么这份表格对于流式数据摄取的配置,具有指导参考作用。本表格来自于守护石大数据的知识星球。

Input tuning

配置项默认值配置描述
useEarliestOffsetfalse如果supervisor管理数据源,它需要获取Kafka的一组起始offsets,那么这个true与false就决定了是从最早(the earliest)或最晚(the latest)的offsets获取,另外,在通常情况下,后续任务会从前一个Segments的结尾处开始获取offsets,因此这个配置值只是应用于任务的首次运行。
taskDurationPT1H当前任务从开始读取数据到停止以后发布它们的Segment的这个时间段的时长。默认是1小时。
taskCount1该设定表示在一个副本集上读取数据的最大任务数,默认是1,那么所有副本上读取数据的最大任务数就是taskCount * replicas(副本数),因此整个任务数(读取+发布)比此设定更高。
replicas1副本集的数量,默认为1,代表了唯一任务集,也就是无副本。Replica的多个任务总是会分配给不同的works(相当于分布在不同节点),这样可以针对过程故障提供弹性。
completionTimeoutPT30M任务发布的时刻大约是在taskDuration之后开始,发布有默认30分钟超时等待时间,超时后会变成失败并终止,因此该设定不能太低,否则的话,你的任务可能永远都不会发布。
pollTimeout100作为Kafka消费者每次拉取记录前的等待时间,毫秒级。默认是100毫秒。
startDelayPT5SSupervisor开始管理任务之前的等待时间周期。默认是5秒。
periodPT30SSupervisor执行管理逻辑的频次,默认是30秒。不过有些特定事件的响应也会使其运行,例如:任务成功、任务失败和任务达到taskDuration设定时长,因此这个period值是指迭代周期的最大时间。
lateMessageRejectionPeriodPT1H设定一个任务创建之前的时间周期(默认1小时),任何早于这个设定周期的消息都会被拒绝,例如:Supervisor在2016-01-01T12:00Z创建了一个任务,按照默认设定PT1H(1小时),那么就会使得任何早于2016-01-01T11:00Z的消息都将被丢掉。
earlyMessageRejectionPeriodPT1H与lateMessageRejectionPeriod的反之。例如:Supervisor在2016-01-01T12:00Z创建了一个任务,按照默认设定PT1H(1小时),那么就会使得任何晚于2016-01-01T14:00Z的消息都会被丢掉。
skipOffsetGapsfalse是否允许Kafka stream存在缺失偏移量的缝隙,默认为false,也就是说,如果偏移量不连续就会抛出异常。

General tuning

配置项默认值配置描述
maxRowsInMemory1000000当内存中聚合的行数到达设定的值,默认是1百万行,就会发生刷盘的中间持久化。
maxBytesInMemoryDefault: 1/6 of max JVM memory当内存中聚合的字节数到达设定的值,默认是JVM内存最大值的1/6,就会发生刷盘的中间持久化。
resetOffsetAutomaticallyfalse当Durid读取Kafka中的不可用消息,例如:发生了OffsetOutOfRangeException 异常时,默认为false,就会抛出异常,导致任务失败,需要手动干预,可能使用 重置 Supervisor API。此模式对于生产非常有用。 若为true,Druid将根据 useEarliestOffset 属性的值(true 为 earliest,false 为 latest)自动重置为Kafka中可用的earlier或latest偏移量。这可能在你不知情的情况下导致部分数据被丢弃(如果useEarliestOffset 为 false)或导致 重复(如果 useEarliestOffset 为 true)。消息将被记录,表明发生了重置,但摄取将继续。这种模式对于非生产环境非常有用,因为它让Druid尝试从问题中自动恢复,即使这些问题会导致数据被删除或重复。该特性与Kafka的 auto.offset.reset 消费者属性很相似。
intermediatePersistPeriodPT10M设定一个时间周期,默认是PT10M(10分钟),确定了发生中间持久化的速率。
intermediateHandoffPeriodP2147483647DTasks进行Segments切换(Handoff)的频率,Handoff(数据由磁盘转移到深度存储)发生在maxRowsPerSegment或者maxTotalRows被命中的时候,或者在每个intermediateHandoffPeriod时间周期(默认是2147483647天)发生一次。
maxPendingPersists空(无限制)被挂起等待启动的持久化的最大数量,如果一个新的中间持久化任务使得挂起数量超过了限制,那么摄取将被阻塞,直到当前正运行的持久化过程完成。
pushTimeout0推送Segments的等待超时时间,毫秒级,默认为0,0意味着永远等待。
handoffConditionTimeout0切换Segments的等待超时时间,毫秒级,默认为0,0意味着永远等待。
bitmaproaringbitmap indexes压缩格式,默认为roaring
dimensionCompressionlz4dimension字段的压缩格式,默认为lz4
metricCompressionlz4基本类型metric字段的压缩格式,默认lz4
longEncodinglongs类型为long字段的编码格式,无论它们是dimensions还是metrics都适用。选项auto:通过偏移量或查找表根据列基进行值编码,并以可变长度来存储。默认选项为longs:以每8字节对值进行原样存储。
workerThreadsmin(10,taskCount)supervisor用于处理工作任务请求响应的线程数,以及任何其他内部异步操作。默认在10或taskCount中取最小值。
chatThreadsmin(10, taskCount * replicas)用于indexing任务通讯的线程数。默认在10和taskCount * replicas之间取最小值。
Chat retries8HTTP请求indexing任务未响应的重试次数。默认为8次
httpTimeoutPT10S等待HTTP响应indexing任务的超时时间,默认为10秒
shutdownTimeoutPT80SSupervisor退出前试图无故障的关停任务的等待时间,默认是80秒
offsetFetchPeriodPT30SSupervisor查询Kafka和Indexing任务以便获取当前偏移量并计算延迟的频率, 默认是30秒,最小5秒,如果你的设置少于5秒,Supervisor将使用最小值。

摄取规格JSON文件提交示例

{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "bigdata-002:9092,bigdata-003:9092,bigdata-004:9092"
      },
      "topic": "netflow",
      "inputFormat": {
        "type": "json"
      },
      "useEarliestOffset": false,
      "replicas": 1,
      "skipOffsetGaps": false,
      "taskCount": 1,
      "completionTimeout": "PT30M",
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "taskDuration": "PT1H"
    },
    "tuningConfig": {
      "type": "kafka",
      "resetOffsetAutomatically": false,
      "intermediatePersistPeriod": "PT10M",
      "maxRowsInMemory": 1000000,
      "pushTimeout": 0,
      "handoffConditionTimeout": 0,
      "indexSpec": {
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "auto",
        "bitmap": {
          "type": "roaring"
        }
      },
      "workerThreads": 1,
      "chatRetries": 1,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "offsetFetchPeriod": "PT30S"
    },
    "dataSchema": {
      "dataSource": "netflow",
      "timestampSpec": {
        "column": "time",
        "format": "yyyy-MM-dd HH:mm:ss.S"
      },
      "dimensionsSpec": {
        "dimensions": [
          "node",
          "src",
          "dest"
        ]
      },
      "granularitySpec": {
        "queryGranularity": "second",
        "rollup": true,
        "segmentGranularity": "hour"
      },
      "metricsSpec": [
        {
          "name": "count",
          "type": "count"
        },
        {
          "name": "sum_size",
          "type": "longSum",
          "fieldName": "size"
        },
        {
          "name": "sum_num",
          "type": "longSum",
          "fieldName": "num"
        }
      ],
      "transformSpec": {}
    }
  }
}

发表评论 取消回复

邮箱地址不会被公开。 必填项已用*标注

图与文

只有通过专注,你才能做世界级的事情,无论你多么有能力。——比尔盖茨

标签

Cassandra (2) Druid (2) HBase (1) Influxdb (1) 大数据 (1) 微服务 (1)

近期文章

  • Druid Kafka摄取规格说明 2022年8月15日
  • Apache Druid 实时分析数据库入门介绍 2022年7月20日
  • 通俗理解大数据及其应用价值 2022年7月20日
  • 传统应用系统架构向微服务应用架构升级的实战案例 2022年7月20日
  • 探索Cassandra的去中心化分布式架构 2022年7月19日

文章归档

  • 2022年8月 (1)
  • 2022年7月 (5)
  • 2021年12月 (1)
  • 2021年9月 (1)

分类目录

  • 云计算架构应用
  • 公司介绍
  • 大数据技术研究
陕ICP备2021001914号 | 西安守护石信息科技有限公司