v0.1.5-SNAPSHOT

Motivation

Personal data protection is becoming increasingly important. We chose not to compromise: we encrypt everything end to end.

Quicksign uses Kafka as the backbone of its microservices architecture, so we needed a solution to achieve end-to-end encryption in Kafka. Since none appeared to be available, we implemented our own. We believe that opening it to the community will help broaden best practices in personal data protection and improve its security through scrutiny by a larger audience.

This library is the one actually used on our platform, but some encryption and operational details are not revealed here; as such, this library on its own does not reflect the overall mechanism used at Quicksign to protect our users' data.

Usage

Import the Maven dependency:

<dependency>
    <groupId>io.quicksign</groupId>
    <artifactId>kafka-encryption-core</artifactId>
    <version>0.1.5-SNAPSHOT</version>
</dependency>

Design goals

  • Support multiple encryption key management strategies (KMS, embedded, per message, rolling windows, per tenant, custom)

  • Support for Kafka Streams intermediate topics

  • Detect when a payload is not encrypted to skip decryption

  • Support for Camel

  • Support for Spring Boot
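
The "detect when a payload is not encrypted" goal above can be sketched as follows. The library's real wire format is not documented here, so the `QSE1` magic header below is purely illustrative, not the actual on-wire marker:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative sketch only: a deserializer can check for a magic header
// and pass plain (legacy, unencrypted) payloads through without decryption.
// The magic value "QSE1" is a hypothetical placeholder.
public class EncryptedPayloadDetector {
    private static final byte[] MAGIC = "QSE1".getBytes(StandardCharsets.US_ASCII);

    public static boolean looksEncrypted(byte[] payload) {
        if (payload == null || payload.length < MAGIC.length) {
            return false;
        }
        // Compare only the header prefix against the magic marker
        return Arrays.equals(Arrays.copyOfRange(payload, 0, MAGIC.length), MAGIC);
    }
}
```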

Use cases

Key repository sample (Consumer API)

In this example, an encryption key reference is stored in the record payload. The encryption key is generated by (and stored in) the key repository. Two records with the same record key embed the same encryption key reference. The key repository in this example is a simple in-memory repository, but in a real-world deployment it would be a key vault such as Google Cloud KMS or HashiCorp Vault.
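
The repository idea can be sketched as below. This is a hypothetical stand-in, not the library's actual KeyRepository interface: it maps a key reference to an AES key, generating one on first use so that records sharing a reference share a key:

```java
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

// Hypothetical sketch, not the library's KeyRepository interface:
// maps an encryption key reference to an AES key, creating it on first use.
public class InMemoryKeyRepository {
    private final Map<String, SecretKey> keys = new ConcurrentHashMap<>();

    public SecretKey getOrCreateKey(String keyReference) {
        return keys.computeIfAbsent(keyReference, ref -> {
            try {
                KeyGenerator generator = KeyGenerator.getInstance("AES");
                generator.init(256);
                return generator.generateKey();
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e);
            }
        });
    }

    public static void main(String[] args) {
        InMemoryKeyRepository repo = new InMemoryKeyRepository();
        // Two records carrying the same key reference resolve to the same key.
        SecretKey k1 = repo.getOrCreateKey("record-key-42");
        SecretKey k2 = repo.getOrCreateKey("record-key-42");
        System.out.println(k1 == k2); // prints "true"
    }
}
```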

Running the sample

cd samples/kafka-encryption-keyrepo-sample
docker-compose up # on Windows and OSX, you may need to adjust the advertised host (see below)
sh runSamples.sh

You need a running Kafka broker in order to run the examples; here’s how you can start one using Docker.

On OSX and Windows

docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=192.168.99.100 landoop/fast-data-dev:2.0.1

On Linux

docker run --rm --net=host -e ADV_HOST=localhost landoop/fast-data-dev:2.0.1

Usage

Sample Main
// 1- SETUP

// key provider backed by a simple in memory key repository
KeyProvider keyProvider = new RepositoryBasedKeyProvider(new SampleKeyRepository(), new SampleKeyNameObfuscator());

// We create an encryption keyref for each record's key.
// Two records with the same record key have the same encryption key ref.
KeyReferenceExtractor keyReferenceExtractor = new RepositoryBasedKeyReferenceExtractor(new SampleKeyNameExtractor(), new SampleKeyNameObfuscator());

// The payload is encrypted using AES
CryptoAlgorithm cryptoAlgorithm = new AesGcmNoPaddingCryptoAlgorithm();


// 2- RUN

ExecutorService executorService = Executors.newFixedThreadPool(3);

// the producer encrypts the message
executorService.submit(new SampleProducer(keyProvider, keyReferenceExtractor, cryptoAlgorithm));

// this consumer reads the records but doesn't decrypt them... so you can see that the payload can't be read by default
executorService.submit(new SampleRawConsumer());

// this consumer reads and decrypts... and dumps the payload in clear text
executorService.submit(new SampleDecryptingConsumer(keyProvider, cryptoAlgorithm));
Producer side
Encryptor encryptor = new DefaultEncryptor(keyProvider, cryptoAlgorithm);

// Wrap base LongSerializer and StringSerializer with encrypted wrappers
CryptoSerializerPairFactory cryptoSerializerPairFactory = new CryptoSerializerPairFactory(encryptor, keyReferenceExtractor);
SerializerPair<Long, String> serializerPair = cryptoSerializerPairFactory.build(new LongSerializer(), new StringSerializer());

Properties producerProperties = new Properties();
producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (KafkaProducer<Long, String> producer =
             new KafkaProducer<>(producerProperties, serializerPair.getKeySerializer(), serializerPair.getValueSerializer())) {

    for (long i = 0L; i < Long.MAX_VALUE; i++) {
        producer.send(new ProducerRecord<>("sampletopic", i, "test number " + i));
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            return;
        }
    }
}
Consumer side
// The encryption key reference is embedded in each message

Decryptor decryptor = new DefaultDecryptor(keyProvider, cryptoAlgorithm);

// Construct decrypting deserializer
CryptoDeserializerFactory cryptoDeserializerFactory = new CryptoDeserializerFactory(decryptor);

Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "samplecrypted");
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(
        consumerProperties,
        new LongDeserializer(),
        cryptoDeserializerFactory.buildFrom(new StringDeserializer()))) {

    consumer.subscribe(Collections.singleton("sampletopic"));
    while (true) {
        ConsumerRecords<Long, String> records = consumer.poll(1000L);
        records.forEach(
                record -> System.out.println(
                        "-------------------------------------------------------------\n" +
                                "decrypted record: key=" + record.key() + ", offset=" + record.offset() + ", value=" + record.value() +
                                "\n-------------------------------------------------------------\n\n")
        );
    }
}

Embedded per message encryption key sample (Consumer API)

In this example, a different encryption key is generated for each message, encrypted using a master key, and stored embedded with the payload.
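
This is the classic envelope-encryption pattern. The sketch below illustrates the idea with plain `javax.crypto` calls; it is not the library's internal wire format, and the method names are ours:

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Illustrative envelope-encryption sketch (not the library's internal format):
// a fresh AES-256 data key encrypts the payload with AES/GCM, and the data key
// itself is wrapped (encrypted) under a long-lived master key.
public class EnvelopeEncryptionSketch {
    static final SecureRandom RANDOM = new SecureRandom();

    static SecretKey newAesKey() throws Exception {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(256);
        return generator.generateKey();
    }

    static byte[] gcmEncrypt(SecretKey key, byte[] iv, byte[] plaintext) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        return cipher.doFinal(plaintext);
    }

    static byte[] gcmDecrypt(SecretKey key, byte[] iv, byte[] ciphertext) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, iv));
        return cipher.doFinal(ciphertext);
    }

    public static void main(String[] args) throws Exception {
        SecretKey masterKey = newAesKey(); // would live in a keystore or KMS
        SecretKey dataKey = newAesKey();   // fresh key for this one record

        byte[] payloadIv = new byte[12];
        byte[] keyIv = new byte[12];
        RANDOM.nextBytes(payloadIv);
        RANDOM.nextBytes(keyIv);

        // Producer side: encrypt the payload with the per-record data key,
        // then wrap the data key under the master key and ship both.
        byte[] ciphertext = gcmEncrypt(dataKey, payloadIv,
                "test number 1".getBytes(StandardCharsets.UTF_8));
        byte[] wrappedKey = gcmEncrypt(masterKey, keyIv, dataKey.getEncoded());

        // Consumer side: unwrap the data key, then decrypt the payload.
        SecretKey unwrapped = new SecretKeySpec(gcmDecrypt(masterKey, keyIv, wrappedKey), "AES");
        byte[] plaintext = gcmDecrypt(unwrapped, payloadIv, ciphertext);
        System.out.println(new String(plaintext, StandardCharsets.UTF_8)); // prints "test number 1"
    }
}
```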

Running the sample

cd samples/kafka-encryption-generatedkey-sample
docker-compose up # on Windows and OSX, you may need to adjust the advertised host (see below)
sh generateMasterKey.sh
sh runSamples.sh

You need a running Kafka broker in order to run the examples; here’s how you can start one using Docker.

On OSX and Windows

docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=192.168.99.100 landoop/fast-data-dev:2.0.1

On Linux

docker run --rm --net=host -e ADV_HOST=localhost landoop/fast-data-dev:2.0.1

Usage

Producer side
MasterKeyEncryption masterKeyEncryption = new KeyStoreBasedMasterKey(
        new File("/tmp/sample.pkcs12"),
        "sample", "sample",
        new AesGcmNoPaddingCryptoAlgorithm()
);
// Use an AES256 key generator
AES256CryptoKeyGenerator cryptoKeyGenerator = new AES256CryptoKeyGenerator();

// Generate a different key for each message and encrypt it using the master key
KeyPerRecordKeyReferenceExtractor keyReferenceExtractor = new KeyPerRecordKeyReferenceExtractor(
        cryptoKeyGenerator, masterKeyEncryption);

// The key is embedded in each message
PerRecordKeyProvider keyProvider = new PerRecordKeyProvider(masterKeyEncryption);

// The payload is encrypted using AES
AesGcmNoPaddingCryptoAlgorithm cryptoAlgorithm = new AesGcmNoPaddingCryptoAlgorithm();
Encryptor encryptor = new DefaultEncryptor(keyProvider, cryptoAlgorithm);

// Wrap base LongSerializer and StringSerializer with encrypted wrappers
CryptoSerializerPairFactory cryptoSerializerPairFactory = new CryptoSerializerPairFactory(encryptor,
        keyReferenceExtractor);
SerializerPair<Long, String> serializerPair = cryptoSerializerPairFactory.build(new LongSerializer(), new StringSerializer());

Properties producerProperties = new Properties();
producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (KafkaProducer<Long, String> producer =
             new KafkaProducer<>(producerProperties, serializerPair.getKeySerializer(), serializerPair.getValueSerializer())) {

    for (long i = 0L; i < Long.MAX_VALUE; i++) {
        producer.send(new ProducerRecord<>("sampletopic", i, "test number " + i));
        if (i % 10 == 9) {

            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                return;
            }
        }
    }
}
Consumer side
MasterKeyEncryption masterKeyEncryption = new KeyStoreBasedMasterKey(
        new File("/tmp/sample.pkcs12"),
        "sample", "sample",
        new AesGcmNoPaddingCryptoAlgorithm()
);
// The key is embedded in each message
PerRecordKeyProvider keyProvider = new PerRecordKeyProvider(masterKeyEncryption);

// The payload is encrypted using AES
AesGcmNoPaddingCryptoAlgorithm cryptoAlgorithm = new AesGcmNoPaddingCryptoAlgorithm();
Decryptor decryptor = new DefaultDecryptor(keyProvider, cryptoAlgorithm);

// Construct decrypting deserializer
CryptoDeserializerFactory cryptoDeserializerFactory = new CryptoDeserializerFactory(decryptor);

Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "samplecrypted");
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(
        consumerProperties,
        new LongDeserializer(),
        cryptoDeserializerFactory.buildFrom(new StringDeserializer()))) {

    consumer.subscribe(Collections.singleton("sampletopic"));
    while (true) {
        ConsumerRecords<Long, String> records = consumer.poll(1000L);
        records.forEach(
                record -> System.out.println("decrypted record: key=" + record.key() + ", offset=" + record.offset() + ", value=" + record.value())
        );
    }
}

Key repository sample (Streams API)

This is a simple sample modeling bank accounts. Each account has an associated key used to encrypt all operations associated with that account.

For this account, we manage the bank view and the agency view.

  • The bank has access to the full key repository (accounts 0 to 9).

  • Agency 1 only has access to a subset of the key repository (accounts 0 to 4).

  • Agency 2 only has access to a subset of the key repository (accounts 5 to 9).

All operations are written encrypted into a common topic named operations.
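
The access model can be sketched as follows. This is a hypothetical illustration (the class and method names are ours, not the library's): each view holds only its subset of account keys, so records for accounts outside that subset simply cannot be decrypted by it:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the per-account access model described above:
// an agency's key repository holds only a subset of the account keys, so
// operations on other accounts stay opaque to it.
public class PerAccountKeyAccess {
    private final Map<Integer, String> keysByAccount;

    PerAccountKeyAccess(Map<Integer, String> keysByAccount) {
        this.keysByAccount = keysByAccount;
    }

    // Returns the key if this view may read the account, else empty.
    Optional<String> keyFor(int accountId) {
        return Optional.ofNullable(keysByAccount.get(accountId));
    }

    public static void main(String[] args) {
        // Agency 1 only holds keys for accounts 0 to 4.
        PerAccountKeyAccess agency1 = new PerAccountKeyAccess(
                Map.of(0, "k0", 1, "k1", 2, "k2", 3, "k3", 4, "k4"));
        System.out.println(agency1.keyFor(3).isPresent()); // prints "true": in its subset
        System.out.println(agency1.keyFor(7).isPresent()); // prints "false": key not held
    }
}
```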

Running the sample

cd samples/kafkastream-with-keyrepo-sample
docker-compose up # on Windows and OSX, you may need to adjust the advertised host (see below)
sh generateKeys.sh
sh runSamples.sh

You need a running Kafka broker in order to run the examples; here’s how you can start one using Docker.

On OSX and Windows

docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=192.168.99.100 landoop/fast-data-dev:2.0.1

On Linux

docker run --rm --net=host -e ADV_HOST=localhost landoop/fast-data-dev:2.0.1

Usage

Sample Main
KeyRepository fullKeyRepository = new KeyStoreBasedKeyRepository(
        new File("/tmp/samplestream.pkcs12"),
        "sample"
);

KeyRepository agency1KeyRepository = new KeyStoreBasedKeyRepository(
        new File("/tmp/samplestream1.pkcs12"),
        "sample"
);

KeyRepository agency2KeyRepository = new KeyStoreBasedKeyRepository(
        new File("/tmp/samplestream2.pkcs12"),
        "sample"
);

KeyProvider fullKeyProvider = new RepositoryBasedKeyProvider(fullKeyRepository, new SampleKeyNameObfuscator());
KeyProvider agency1KeyProvider = new RepositoryBasedKeyProvider(agency1KeyRepository, new SampleKeyNameObfuscator());
KeyProvider agency2KeyProvider = new RepositoryBasedKeyProvider(agency2KeyRepository, new SampleKeyNameObfuscator());

// We create an encryption keyref for each record's key.
// Two records with the same record key have the same encryption key ref.
KeyReferenceExtractor keyReferenceExtractor = new RepositoryBasedKeyReferenceExtractor(new SampleKeyNameExtractor(), new SampleKeyNameObfuscator());


ExecutorService executorService = Executors.newFixedThreadPool(4);


SampleProducer sampleProducer = new SampleProducer(fullKeyProvider, keyReferenceExtractor);

executorService.submit(sampleProducer);

try {
    Thread.sleep(10000L);
}
catch (InterruptedException e) {
    System.exit(0);
}

SampleStream fullView = new SampleStream("full", fullKeyProvider, keyReferenceExtractor);
SampleStream agency1View = new SampleStream("agency1", agency1KeyProvider, keyReferenceExtractor);
SampleStream agency2View = new SampleStream("agency2", agency2KeyProvider, keyReferenceExtractor);

executorService.submit(fullView);
executorService.submit(agency1View);
executorService.submit(agency2View);
Producer side
SampleProducer.java
// The payload is encrypted using AES
AesGcmNoPaddingCryptoAlgorithm cryptoAlgorithm = new AesGcmNoPaddingCryptoAlgorithm();
Encryptor encryptor = new DefaultEncryptor(keyProvider, cryptoAlgorithm);

// Wrap base LongSerializer and StringSerializer with encrypted wrappers
CryptoSerializerPairFactory cryptoSerializerPairFactory = new CryptoSerializerPairFactory(encryptor,
        keyReferenceExtractor);
SerializerPair<Integer, String> serializerPair = cryptoSerializerPairFactory.build(new IntegerSerializer(), new StringSerializer());

Properties producerProperties = new Properties();
producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

Random random = new Random();

try (KafkaProducer<Integer, String> producer =
             new KafkaProducer<>(producerProperties, serializerPair.getKeySerializer(), serializerPair.getValueSerializer())) {

    for (long i = 0L; i < Long.MAX_VALUE; i++) {
        long accountId = i % 10L;
        producer.send(new ProducerRecord<>("operations", (int) accountId, "" + (random.nextInt(1000) - 500)));

        if (i % 100 == 99) {
            try {
                Thread.sleep(10000L);
            }
            catch (InterruptedException e) {
                return;
            }
        }

    }
}
Stream side
SampleStream.java
AesGcmNoPaddingCryptoAlgorithm cryptoAlgorithm = new AesGcmNoPaddingCryptoAlgorithm();
Decryptor decryptor = new DefaultDecryptor(keyProvider, cryptoAlgorithm);
Encryptor encryptor = new DefaultEncryptor(keyProvider, cryptoAlgorithm);
CryptoSerdeFactory cryptoSerdeFactory = new CryptoSerdeFactory(encryptor, decryptor, keyReferenceExtractor);

KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(name + "_balance");


Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, name);
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());


StreamsBuilder streamsBuilder = new StreamsBuilder();

SerdesPair<Integer, String> serdesPair = cryptoSerdeFactory.buildSerdesPair(Serdes.Integer(), Serdes.String());

streamsBuilder.stream("operations", serdesPair.toConsumed())
        .filter((i, s) -> s != null) // messages that could not be decrypted (key not in this repository) are null
        .groupByKey()
        .reduce((s1, s2) -> "" + (Integer.valueOf(s1) + Integer.valueOf(s2)),
                serdesPair.applyTo(Materialized.as(storeSupplier)));

KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
kafkaStreams.start();

and display the aggregated balances:

SampleStream.java
ReadOnlyKeyValueStore<Integer, String> store = kafkaStreams.store(
        name + "_balance", QueryableStoreTypes.<Integer, String>keyValueStore()
);

ArrayNode arrayNode = JsonNodeFactory.instance.arrayNode();

KeyValueIterator<Integer, String> iterator = store.all();
iterator.forEachRemaining(kv -> {
    ObjectNode node = JsonNodeFactory.instance.objectNode();
    node.put("account", kv.key);
    node.put("balance", kv.value);
    arrayNode.add(node);
});

ObjectNode node = JsonNodeFactory.instance.objectNode();
node.set(name, arrayNode);

System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node));