[Nest] 19 - 06/16/2021, 1:09:12 PM [ClientKafka] ERROR [Connection] Response Heartbeat(key: 12, version: 3) {"timestamp":"2021-06-16T13:09:12.779Z","logger":"kafkajs","broker":"kafka-0.kafka-headless.dev.svc.cluster.local:9092","clientId":"reviews-ts-service-client","error":"The group is rebalancing, so a rejoin is needed","correlationId":1241,"size":10} +2857ms
[Nest] 19 - 06/16/2021, 1:09:12 PM [ClientKafka] ERROR [Runner] The group is rebalancing, re-joining {"timestamp":"2021-06-16T13:09:12.779Z","logger":"kafkajs","groupId":"reviews-consumer-ts-customer-client","memberId":"reviews-ts-service-client-453b2860-fdab-4c01-aa98-e015667b8d3b","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":330} +1m




sessionTimeout: 60000,
heartbeatInterval: 40000,
maxWaitTimeInMs: 43000,

sessionTimeout : it should be greater than the processing time of method.
heartbeatInterval : someone said, it should 2/3 of sessionTimeout
maxWaitTimeInMs : it must be _greater with heartbeatInterval

This issue was resolved by above configuration.





  groupId: <String>,
  partitionAssigners: <Array>,
  sessionTimeout: <Number>,
  rebalanceTimeout: <Number>,
  heartbeatInterval: <Number>,
  metadataMaxAge: <Number>,
  allowAutoTopicCreation: <Boolean>,
  maxBytesPerPartition: <Number>,
  minBytes: <Number>,
  maxBytes: <Number>,
  maxWaitTimeInMs: <Number>,
  retry: <Object>,
partitionAssignersList of partition assigners[PartitionAssigners.roundRobin]
sessionTimeoutTimeout in milliseconds used to detect failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance30000
rebalanceTimeoutThe maximum time that the coordinator will wait for each member to rejoin when rebalancing the group60000
heartbeatIntervalThe expected time in milliseconds between heartbeats to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active. The value must be set lower than session timeout3000
metadataMaxAgeThe period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions300000 (5 minutes)
allowAutoTopicCreationAllow topic creation when querying metadata for non-existent topicstrue
maxBytesPerPartitionThe maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition1048576 (1MB)
minBytesMinimum amount of data the server should return for a fetch request, otherwise wait up to maxWaitTimeInMs for more data to accumulate.1
maxBytesMaximum amount of bytes to accumulate in the response. Supported by Kafka >= (10MB)
maxWaitTimeInMsThe maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by minBytes5000
retrySee retry for more information{ retries: 10 }
readUncommittedConfigures the consumer isolation level. If false (default), the consumer will not return any transactional messages which were not committed.false


最后修改:2022 年 04 月 29 日 05 : 32 PM