Introduction
This project provides a Serializer, Deserializer and a Serde for Kafka Streams using Jackson for JSON processing. All of the available settings for Jackson are configurable.
What if I don't know what settings to use?
If you already define an ObjectMapper that works for you and you want to reuse its settings, use this method to dump the non-default settings. It compares all of the default settings against the supplied ObjectMapper and returns the equivalent configuration values.
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.INDENT_OUTPUT, true);
Map<String, String> actual = JacksonSerializer.nonDefaultSettings(objectMapper);
Kafka Streams Serde
The Serde can be configured by passing in the class to deserialize to. This will return a Serde configured for that object type. When configure is called, it will ignore any output.class
supplied, since the type has already been configured.
Serde<TestPojo> serde = JacksonSerde.of(TestPojo.class);
Serializer
The Serializer does not need to be specified with a type. It will serialize any object that is passed to it.
Map<String, Object> settings = ImmutableMap.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()
);
Producer<String, TestPojo> producer = new KafkaProducer<>(settings);
// Do your thing
Configuration
Name | Description | Type | Default | Importance | Valid Values |
---|---|---|---|---|---|
accept.case.insensitive.enums.enable | See ACCEPT_CASE_INSENSITIVE_ENUMS | boolean | false | medium | |
accept.case.insensitive.properties.enable | See ACCEPT_CASE_INSENSITIVE_PROPERTIES | boolean | false | medium | |
allow.coercion.of.scalars.enable | See ALLOW_COERCION_OF_SCALARS | boolean | true | medium | |
allow.explicit.property.renaming.enable | See ALLOW_EXPLICIT_PROPERTY_RENAMING | boolean | false | medium | |
allow.final.fields.as.mutators.enable | See ALLOW_FINAL_FIELDS_AS_MUTATORS | boolean | true | medium | |
auto.detect.creators.enable | See AUTO_DETECT_CREATORS | boolean | true | medium | |
auto.detect.fields.enable | See AUTO_DETECT_FIELDS | boolean | true | medium | |
auto.detect.getters.enable | See AUTO_DETECT_GETTERS | boolean | true | medium | |
auto.detect.is.getters.enable | See AUTO_DETECT_IS_GETTERS | boolean | true | medium | |
auto.detect.setters.enable | See AUTO_DETECT_SETTERS | boolean | true | medium | |
can.override.access.modifiers.enable | See CAN_OVERRIDE_ACCESS_MODIFIERS | boolean | true | medium | |
default.view.inclusion.enable | See DEFAULT_VIEW_INCLUSION | boolean | true | medium | |
fail.on.empty.beans.enable | See FAIL_ON_EMPTY_BEANS | boolean | true | medium | |
fail.on.self.references.enable | See FAIL_ON_SELF_REFERENCES | boolean | true | medium | |
fail.on.unwrapped.type.identifiers.enable | See FAIL_ON_UNWRAPPED_TYPE_IDENTIFIERS | boolean | true | medium | |
ignore.duplicate.module.registrations.enable | See IGNORE_DUPLICATE_MODULE_REGISTRATIONS | boolean | true | medium | |
ignore.merge.for.unmergeable.enable | See IGNORE_MERGE_FOR_UNMERGEABLE | boolean | true | medium | |
indent.output.enable | See INDENT_OUTPUT | boolean | false | medium | |
infer.creator.from.constructor.properties.enable | See INFER_CREATOR_FROM_CONSTRUCTOR_PROPERTIES | boolean | true | medium | |
infer.property.mutators.enable | See INFER_PROPERTY_MUTATORS | boolean | true | medium | |
java.time.module.enable | Flag to register the java time module. | boolean | false | medium | |
order.map.entries.by.keys.enable | See ORDER_MAP_ENTRIES_BY_KEYS | boolean | false | medium | |
override.public.access.modifiers.enable | See OVERRIDE_PUBLIC_ACCESS_MODIFIERS | boolean | true | medium | |
propagate.transient.marker.enable | See PROPAGATE_TRANSIENT_MARKER | boolean | false | medium | |
require.setters.for.getters.enable | See REQUIRE_SETTERS_FOR_GETTERS | boolean | false | medium | |
sort.properties.alphabetically.enable | See SORT_PROPERTIES_ALPHABETICALLY | boolean | false | medium | |
use.annotations.enable | See USE_ANNOTATIONS | boolean | true | medium | |
use.equality.for.object.id.enable | See USE_EQUALITY_FOR_OBJECT_ID | boolean | false | medium | |
use.getters.as.setters.enable | See USE_GETTERS_AS_SETTERS | boolean | true | medium | |
use.static.typing.enable | See USE_STATIC_TYPING | boolean | false | medium | |
use.std.bean.naming.enable | See USE_STD_BEAN_NAMING | boolean | false | medium | |
use.wrapper.name.as.property.name.enable | See USE_WRAPPER_NAME_AS_PROPERTY_NAME | boolean | false | medium | |
wrap.exceptions.enable | See WRAP_EXCEPTIONS | boolean | true | medium | |
wrap.root.value.enable | See WRAP_ROOT_VALUE | boolean | false | medium | |
write.char.arrays.as.json.arrays.enable | See WRITE_CHAR_ARRAYS_AS_JSON_ARRAYS | boolean | false | medium | |
write.date.keys.as.timestamps.enable | See WRITE_DATE_KEYS_AS_TIMESTAMPS | boolean | false | medium | |
write.date.timestamps.as.nanoseconds.enable | See WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS | boolean | true | medium | |
write.dates.as.timestamps.enable | See WRITE_DATES_AS_TIMESTAMPS | boolean | true | medium | |
write.dates.with.zone.id.enable | See WRITE_DATES_WITH_ZONE_ID | boolean | false | medium | |
write.durations.as.timestamps.enable | See WRITE_DURATIONS_AS_TIMESTAMPS | boolean | true | medium | |
write.enums.using.index.enable | See WRITE_ENUMS_USING_INDEX | boolean | false | medium | |
write.enums.using.to.string.enable | See WRITE_ENUMS_USING_TO_STRING | boolean | false | medium | |
write.single.elem.arrays.unwrapped.enable | See WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED | boolean | false | medium |
Deserializer
Generically processing Json
By default, if you do not configure a class to deserialize to, the deserializer will deserialize to a Jackson JsonNode. If your data consists of JSON objects, you can use ObjectNode.
Map<String, Object> settings = ImmutableMap.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()
);
Consumer<JsonNode, JsonNode> consumer = new KafkaConsumer<JsonNode, JsonNode>(settings);
// Do your thing
Strongly typed Json
You can tell the deserializer to deserialize to a Pojo.
Map<String, Object> settings = ImmutableMap.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName(),
"value.deserializer.output.class", TestPojo.class.getName()
);
Consumer<JsonNode, TestPojo> consumer = new KafkaConsumer<>(settings);
Configuration
Name | Description | Type | Default | Importance | Valid Values |
---|---|---|---|---|---|
output.class | The java class to deserialize to. | class | class com.fasterxml.jackson.databind.JsonNode | high | |
accept.case.insensitive.enums.enable | See ACCEPT_CASE_INSENSITIVE_ENUMS | boolean | false | medium | |
accept.case.insensitive.properties.enable | See ACCEPT_CASE_INSENSITIVE_PROPERTIES | boolean | false | medium | |
accept.empty.array.as.null.object.enable | See ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT | boolean | false | medium | |
accept.empty.string.as.null.object.enable | See ACCEPT_EMPTY_STRING_AS_NULL_OBJECT | boolean | false | medium | |
accept.float.as.int.enable | See ACCEPT_FLOAT_AS_INT | boolean | true | medium | |
accept.single.value.as.array.enable | See ACCEPT_SINGLE_VALUE_AS_ARRAY | boolean | false | medium | |
adjust.dates.to.context.time.zone.enable | See ADJUST_DATES_TO_CONTEXT_TIME_ZONE | boolean | true | medium | |
allow.coercion.of.scalars.enable | See ALLOW_COERCION_OF_SCALARS | boolean | true | medium | |
allow.explicit.property.renaming.enable | See ALLOW_EXPLICIT_PROPERTY_RENAMING | boolean | false | medium | |
allow.final.fields.as.mutators.enable | See ALLOW_FINAL_FIELDS_AS_MUTATORS | boolean | true | medium | |
auto.detect.creators.enable | See AUTO_DETECT_CREATORS | boolean | true | medium | |
auto.detect.fields.enable | See AUTO_DETECT_FIELDS | boolean | true | medium | |
auto.detect.getters.enable | See AUTO_DETECT_GETTERS | boolean | true | medium | |
auto.detect.is.getters.enable | See AUTO_DETECT_IS_GETTERS | boolean | true | medium | |
auto.detect.setters.enable | See AUTO_DETECT_SETTERS | boolean | true | medium | |
can.override.access.modifiers.enable | See CAN_OVERRIDE_ACCESS_MODIFIERS | boolean | true | medium | |
default.view.inclusion.enable | See DEFAULT_VIEW_INCLUSION | boolean | true | medium | |
eager.deserializer.fetch.enable | See EAGER_DESERIALIZER_FETCH | boolean | true | medium | |
fail.on.ignored.properties.enable | See FAIL_ON_IGNORED_PROPERTIES | boolean | false | medium | |
fail.on.invalid.subtype.enable | See FAIL_ON_INVALID_SUBTYPE | boolean | true | medium | |
fail.on.missing.creator.properties.enable | See FAIL_ON_MISSING_CREATOR_PROPERTIES | boolean | false | medium | |
fail.on.missing.external.type.id.property.enable | See FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY | boolean | true | medium | |
fail.on.null.creator.properties.enable | See FAIL_ON_NULL_CREATOR_PROPERTIES | boolean | false | medium | |
fail.on.null.for.primitives.enable | See FAIL_ON_NULL_FOR_PRIMITIVES | boolean | false | medium | |
fail.on.numbers.for.enums.enable | See FAIL_ON_NUMBERS_FOR_ENUMS | boolean | false | medium | |
fail.on.reading.dup.tree.key.enable | See FAIL_ON_READING_DUP_TREE_KEY | boolean | false | medium | |
fail.on.trailing.tokens.enable | See FAIL_ON_TRAILING_TOKENS | boolean | false | medium | |
fail.on.unknown.properties.enable | See FAIL_ON_UNKNOWN_PROPERTIES | boolean | true | medium | |
fail.on.unresolved.object.ids.enable | See FAIL_ON_UNRESOLVED_OBJECT_IDS | boolean | true | medium | |
ignore.duplicate.module.registrations.enable | See IGNORE_DUPLICATE_MODULE_REGISTRATIONS | boolean | true | medium | |
ignore.merge.for.unmergeable.enable | See IGNORE_MERGE_FOR_UNMERGEABLE | boolean | true | medium | |
infer.creator.from.constructor.properties.enable | See INFER_CREATOR_FROM_CONSTRUCTOR_PROPERTIES | boolean | true | medium | |
infer.property.mutators.enable | See INFER_PROPERTY_MUTATORS | boolean | true | medium | |
java.time.module.enable | Flag to register the java time module. | boolean | false | medium | |
override.public.access.modifiers.enable | See OVERRIDE_PUBLIC_ACCESS_MODIFIERS | boolean | true | medium | |
propagate.transient.marker.enable | See PROPAGATE_TRANSIENT_MARKER | boolean | false | medium | |
read.date.timestamps.as.nanoseconds.enable | See READ_DATE_TIMESTAMPS_AS_NANOSECONDS | boolean | true | medium | |
read.enums.using.to.string.enable | See READ_ENUMS_USING_TO_STRING | boolean | false | medium | |
read.unknown.enum.values.as.null.enable | See READ_UNKNOWN_ENUM_VALUES_AS_NULL | boolean | false | medium | |
read.unknown.enum.values.using.default.value.enable | See READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE | boolean | false | medium | |
require.setters.for.getters.enable | See REQUIRE_SETTERS_FOR_GETTERS | boolean | false | medium | |
sort.properties.alphabetically.enable | See SORT_PROPERTIES_ALPHABETICALLY | boolean | false | medium | |
unwrap.root.value.enable | See UNWRAP_ROOT_VALUE | boolean | false | medium | |
unwrap.single.value.arrays.enable | See UNWRAP_SINGLE_VALUE_ARRAYS | boolean | false | medium | |
use.annotations.enable | See USE_ANNOTATIONS | boolean | true | medium | |
use.big.decimal.for.floats.enable | See USE_BIG_DECIMAL_FOR_FLOATS | boolean | false | medium | |
use.big.integer.for.ints.enable | See USE_BIG_INTEGER_FOR_INTS | boolean | false | medium | |
use.getters.as.setters.enable | See USE_GETTERS_AS_SETTERS | boolean | true | medium | |
use.java.array.for.json.array.enable | See USE_JAVA_ARRAY_FOR_JSON_ARRAY | boolean | false | medium | |
use.long.for.ints.enable | See USE_LONG_FOR_INTS | boolean | false | medium | |
use.static.typing.enable | See USE_STATIC_TYPING | boolean | false | medium | |
use.std.bean.naming.enable | See USE_STD_BEAN_NAMING | boolean | false | medium | |
use.wrapper.name.as.property.name.enable | See USE_WRAPPER_NAME_AS_PROPERTY_NAME | boolean | false | medium | |
wrap.exceptions.enable | See WRAP_EXCEPTIONS | boolean | true | medium |