Der benutzerdefinierte Kafka State Store wird nicht aktualisiert - apache-kafka - Program QA

Der benutzerdefinierte Kafka State Store wird nicht aktualisiert

2020-06-29 apache-kafka apache-kafka-streams

Ich versuche, einen benutzerdefinierten Statusspeicher zu erstellen, in dem der Schlüssel zur Wertzuordnung gespeichert ist.

Stream & Store-Konfiguration

 final Serde <HashMap <String ,? >> userSessionsSerde = Serdes.serdeFrom (neuer HashMapSerializer (), neuer HashMapDeserializer ()); 
 StoreBuilder sessionStoreBuilder = Stores.keyValueStoreBuilder (Stores.persistentKeyValueStore (storeName), 
 Serdes.String (), 
 userSessionsSerde); 
 builder.addStateStore (sessionStoreBuilder); 

 builder.stream ("Verbindungsereignisse", Consumed.with (Serdes.String (), wsSerde)) 
 .transform (wsEventTransformerSupplier, storeName) 
 .to ("Statusänderungen", Produced.with (Serdes.String (), Serdes.String ())); 

 KafkaStreams-Streams = neue KafkaStreams (builder.build (), Eigenschaften); 
 streams.start (); 

Transformator

public class WSEventProcessor implements Transformer<String, ConnectionEvent, KeyValue<String, String>> {

    private String storeName = "user-sessions";
    private KeyValueStore<String, Map<String, ConnectionEvent>> stateStore;

    final Serde<HashMap<String, ?>> userSessionsSerde = Serdes.serdeFrom(new HashMapSerializer(), new HashMapDeserializer());

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        stateStore = (KeyValueStore<String, Map<String, ConnectionEvent>>) context.getStateStore(storeName);
    }

    @Override
    public void close() {
    }

    @Override
    public KeyValue<String, String> transform(String key, ConnectionEvent value) {
        boolean sendUpdate = false;
    
        //Send null if there are no updates to be sent to downstream processors
        if(value.getState() == WebSocketConnection.CONNECTED) {
            if(stateStore.get(key) == null) {
                stateStore.put(key, new HashMap<>());
                sendUpdate = true;
            }
            stateStore.get(key).put(value.getSessionId(), value);
            return sendUpdate ? KeyValue.pair(key, "Online") : null;
        }
        else {
            stateStore.get(key).remove(value.getSessionId());
            int size = stateStore.get(key).size();
            return stateStore.get(key).isEmpty() ? KeyValue.pair(key, "Offline") : null;
        }
    }

}  

Der Statusspeicher verfügt unabhängig von verbundenen und nicht verbundenen Ereignissen immer über eine Größenzuordnung von 0 für jeden Schlüssel. Mache ich etwas falsch?

Answers

Das Wertobjekt, das Sie in stateStore.put(key, value) und stateStore.get(key) gespeichert stateStore.put(key, value) sind verschiedene Objekte (wie es serialisiert und dann deserialisiert wurde).

Ihr Problem hängt mit der Änderung des vom Statusspeicher zurückgegebenen Objekts zusammen: stateStore.get(key).put(value.getSessionId(), value) und stateStore.get(key).remove(value.getSessionId()) . Wenn Sie das Objekt stateStore.get(key) aktualisieren, stateStore.get(key) es tatsächlich nicht im stateStore.get(key) , sondern ändert nur dieses Objekt.

stateStore.put(key, calculated_value) Ihr Problem zu beheben, berechnen Sie den erforderlichen Wert (in Ihrem Fall HashMap ) und wenden stateStore.put(key, calculated_value) erst danach stateStore.put(key, calculated_value) . Wenn Sie den Schlüsselwert aus dem stateStore.put(key, null) entfernen müssen, verwenden Sie stateStore.put(key, null) . Ihre transform sollte ungefähr so ​​aussehen:

public KeyValue<String, String> transform(String key, ConnectionEvent value) {
    Map<String, Object> valueFromStateStore = stateStore.get(key);
    Map<String, Object> valueToUpdate = ofNullable(valueFromStateStore).orElseGet(Collections::emptyMap);
    KeyValue<String, String> resultKeyValue = null;

    //Send null if there are no updates to be sent to downstream processors
    if(value.getState() == WebSocketConnection.CONNECTED) {
        if(valueToUpdate.isEmpty()) {
            resultKeyValue = KeyValue.pair(key, "Online");
        }
        valueToUpdate.put(value.getSessionId(), value);
    }
    else {
        valueToUpdate.remove(value.getSessionId());
        if (valueToUpdate.isEmpty()) {
            resultKeyValue = KeyValue.pair(key, "Offline");
        }
    }

    stateStore.put(key, valueToUpdate);
    return resultKeyValue;
}

Related