java - Why is a Kafka KTable missing entries?


I have a single-instance Java application that uses a KTable from Kafka Streams. I was able to retrieve data using the KTable until some of the messages seemed to vanish. There should be ~33k messages with unique keys in there.

When I want to retrieve messages by key, I don't get some of the messages. I use a ReadOnlyKeyValueStore to retrieve them:

    final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store =
        ((KafkaStreams) streams).store(storeName, QueryableStoreTypes.keyValueStore());
    store.get(key);

These are the configuration settings I set for KafkaStreams:

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Kafka: 0.10.2.0-cp1
Confluent: 3.2.0

My investigations brought me to some worrying insights. Using the REST Proxy, I manually read the partitions and found out that some offsets return an error.

Request: /topics/{topic}/partitions/{partition}/messages?offset={offset}

    {
        "error_code": 50002,
        "message": "kafka error: fetch response contains error code: 1"
    }

No client, neither the Java client nor the command-line tools, returns an error. They silently skip over the faulty or missing messages, resulting in missing data in the KTables. Everything was fine, and then, without notice, it seems that some of the messages somehow got corrupted.

I have 2 brokers, and all topics have a replication factor of 2 and are fully replicated. Both brokers, queried separately, return the same result. Restarting the brokers makes no difference.

  • What could possibly cause this?
  • How can I detect this case in the client?

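By default, the topic config cleanup.policy (broker-wide default: log.cleanup.policy) is set to delete, so old messages are removed once retention expires, regardless of their keys. Set it to compact to keep the latest message for each key. See the log compaction section of the Kafka documentation.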
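A minimal sketch of why the policy matters, assuming a toy in-memory model rather than Kafka's API: the class and method names (RetentionDemo, applyDelete, applyCompact, buildTable) are hypothetical. Deletion is simplified to dropping every record below a retention boundary (real brokers drop whole segments by time or size), and compaction to keeping the newest record per key (real brokers never compact the active segment). Replaying each surviving log into a map shows which keys a KTable can still see.

```java
import java.util.*;

/** Hypothetical toy model of a single partition; not Kafka's API. */
class RetentionDemo {
    // One log entry: offset, key and value.
    static final class Rec {
        final long offset; final String key; final String value;
        Rec(long offset, String key, String value) {
            this.offset = offset; this.key = key; this.value = value;
        }
    }

    // Simplified cleanup.policy=delete: everything below the retention boundary
    // is dropped, even if no newer record exists for that key.
    static List<Rec> applyDelete(List<Rec> log, long firstRetainedOffset) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : log) if (r.offset >= firstRetainedOffset) out.add(r);
        return out;
    }

    // Simplified cleanup.policy=compact: keep only the latest record per key.
    // Surviving records keep their original offsets (no renumbering).
    static List<Rec> applyCompact(List<Rec> log) {
        Map<String, Long> latest = new HashMap<>();
        for (Rec r : log) latest.put(r.key, r.offset);
        List<Rec> out = new ArrayList<>();
        for (Rec r : log) if (latest.get(r.key) == r.offset) out.add(r);
        return out;
    }

    // Rebuilding a table = replaying the log and keeping the last value per key.
    static Map<String, String> buildTable(List<Rec> log) {
        Map<String, String> table = new TreeMap<>();
        for (Rec r : log) table.put(r.key, r.value);
        return table;
    }

    static List<Rec> sampleLog() {
        return Arrays.asList(
            new Rec(0, "a", "a1"),
            new Rec(1, "b", "b1"),
            new Rec(2, "a", "a2"),
            new Rec(3, "c", "c1"));
    }

    public static void main(String[] args) {
        List<Rec> log = sampleLog();
        // Pretend retention has removed offsets 0 and 1.
        System.out.println(buildTable(applyDelete(log, 2)).keySet()); // prints [a, c]: "b" is lost
        System.out.println(buildTable(applyCompact(log)).keySet());   // prints [a, b, c]
    }
}
```

Under delete, key "b" disappears because its only record was old; under compact, only the superseded "a1" is dropped, so every key survives with its latest value.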

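Deleting old messages does not renumber the remaining ones; it only raises the partition's minimum (log start) offset. Trying to retrieve a message below that minimum causes an error, and the error is vague: error code 1 in the Kafka protocol means OFFSET_OUT_OF_RANGE. The Kafka Streams client (with auto.offset.reset set to earliest, as in the configuration above) simply starts reading messages from the minimum offset, so there is no error. The only visible effect is missing data in the KTables.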
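The silent skip, and one way to notice it, can be sketched with another toy model. Everything here (OffsetResetDemo, Partition, readAll, deletedCount) is hypothetical and only mimics the behaviour described above: a fetch below the log start offset fails with an out-of-range error, and a consumer configured like auto.offset.reset=earliest jumps to the log start without telling the application. Against a real cluster, the log start offset per partition can be obtained with KafkaConsumer.beginningOffsets(...); comparing it with 0 (or with an offset you recorded earlier) reveals that older records were deleted. This only detects deletion at the head of the log; compaction legitimately leaves gaps inside it.

```java
import java.util.*;

/** Hypothetical model of an out-of-range fetch and an "earliest" reset. */
class OffsetResetDemo {
    // Stand-in for Kafka error code 1 (OFFSET_OUT_OF_RANGE).
    static final class OffsetOutOfRange extends RuntimeException {
        OffsetOutOfRange(long requested, long logStart) {
            super("offset " + requested + " < log start offset " + logStart);
        }
    }

    // A partition whose oldest records were removed by retention.
    static final class Partition {
        final long logStartOffset; final long logEndOffset;
        Partition(long logStartOffset, long logEndOffset) {
            this.logStartOffset = logStartOffset; this.logEndOffset = logEndOffset;
        }
        long fetch(long offset) {
            if (offset < logStartOffset) throw new OffsetOutOfRange(offset, logStartOffset);
            return offset; // stand-in for the record stored at this offset
        }
    }

    // Mimics auto.offset.reset=earliest: on an out-of-range fetch the consumer
    // jumps to the log start offset, and no error reaches the application.
    static List<Long> readAll(Partition p, long startAt) {
        List<Long> read = new ArrayList<>();
        long pos = startAt;
        while (pos < p.logEndOffset) {
            try {
                read.add(p.fetch(pos));
                pos++;
            } catch (OffsetOutOfRange e) {
                pos = p.logStartOffset;
            }
        }
        return read;
    }

    // Detection idea: records between expectedStart and the log start are gone.
    static long deletedCount(Partition p, long expectedStart) {
        return Math.max(0, p.logStartOffset - expectedStart);
    }

    public static void main(String[] args) {
        Partition p = new Partition(3, 6);     // offsets 0..2 already deleted
        System.out.println(readAll(p, 0));      // prints [3, 4, 5]
        System.out.println(deletedCount(p, 0)); // prints 3
    }
}
```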

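While the application is running, its caches may still serve data after the messages have been deleted from Kafka itself. That data vanishes only after a cleanup, once the local state is rebuilt from what remains in the topic.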
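A small sketch of the stale-cache effect, assuming a plain map as the application's local copy and a list as the topic; StaleStateDemo and restore are hypothetical names. In Kafka Streams, one way the local state gets wiped is KafkaStreams.cleanUp(), which deletes the application's local state directory (it must be called while the instance is not running); whether that is the "cleanup" meant above is an assumption.

```java
import java.util.*;

/** Hypothetical model: a local copy outlives records deleted from the topic. */
class StaleStateDemo {
    // Rebuild the local store by replaying whatever is still in the topic.
    static Map<String, String> restore(List<String[]> topic) {
        Map<String, String> store = new TreeMap<>();
        for (String[] kv : topic) store.put(kv[0], kv[1]);
        return store;
    }

    public static void main(String[] args) {
        List<String[]> topic = new ArrayList<>();
        topic.add(new String[] {"a", "a1"});
        topic.add(new String[] {"b", "b1"});
        Map<String, String> store = restore(topic); // built while both records existed

        topic.remove(0); // retention deletes the oldest record ("a") from the topic

        System.out.println(store.get("a")); // prints a1: stale local copy still answers
        store = restore(topic);             // local state wiped and rebuilt from the topic
        System.out.println(store.get("a")); // prints null: the key has vanished
    }
}
```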

