Hello,
I created the simplest PyFlink application I could to reproduce a memory leak we are experiencing: it reads from a Kinesis stream, performs an aggregation, then sinks to another Kinesis stream. Over one week of running this sample, the "Container Memory" metric went from 60% to 70%, and the "Last Checkpoint Size" metric kept increasing, from 128 KB to 800 KB. The full application behaves similarly and its containers restart after 24 to 48 hours. Any clue whether it's something I can fix on our side, or is it a bug on the AWS side? I'm running Flink 1.18.1.
Edit: I removed the aggregation stage so that the application just reads from the input stream and writes to the output stream. There is no memory leak in this case, so I have to deduce the leak is related to the aggregation stage, yet I cannot see anything in my example application that could explain this behavior.
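For reference, the variant without the aggregation stage is essentially the sample below with aggregate_signals removed. A rough sketch of what I tested (it reuses the imports and the create_sink helper from the sample below; main_passthrough is just an illustrative name, not the exact code I ran):

def main_passthrough():
    env = StreamExecutionEnvironment.get_execution_environment()
    consumer_config = {
        "aws.region": "eu-west-1",
        "flink.stream.initpos": "LATEST",
        "flink.stream.recordpublisher": "EFO",
        "flink.stream.efo.consumername": "test-flink",
    }
    # read raw records and forward them unchanged to the output stream
    raw_ds = env.add_source(
        FlinkKinesisConsumer(
            "enriched_signal_stream", SimpleStringSchema(), consumer_config
        )
    )
    raw_ds.sink_to(create_sink())
    env.execute_async("Analytics")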
Thanks for any help
This is the sample application:
import sys
import json

from pyflink.datastream import StreamExecutionEnvironment, DataStream
from pyflink.datastream.connectors.kinesis import (
    KinesisStreamsSink,
    PartitionKeyGenerator,
    FlinkKinesisConsumer,
)
from pyflink.datastream.window import (
    PurgingTrigger,
    ProcessingTimeTrigger,
    TumblingProcessingTimeWindows,
)
from pyflink.datastream.functions import AggregateFunction
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.common.types import Row
from pyflink.common.time import Time

print(f"Python version is {sys.version}, {sys.version_info}")

def decode_signal(signal: str) -> Row:
    sg = json.loads(signal)
    result = {
        "watcher_uuid": sg["watcher"]["uuid"],
        "pokemon_value": sg["pokemon"]["value"],
        "alert_received_at": sg["alert_received_at"],
        "scenario": sg["scenario"],
    }
    return Row(**result)

def create_consumer(env: StreamExecutionEnvironment) -> DataStream:
    consumer_config = {
        "aws.region": "eu-west-1",
        "flink.stream.initpos": "LATEST",
        "flink.stream.recordpublisher": "EFO",
        "flink.stream.efo.consumername": "test-flink",
    }
    enriched_ds = env.add_source(
        FlinkKinesisConsumer(
            "enriched_signal_stream", SimpleStringSchema(), consumer_config
        )
    )
    enriched_ds = enriched_ds.map(
        lambda x: decode_signal(x),
        output_type=Types.ROW_NAMED(
            field_names=[
                "watcher_uuid",
                "pokemon_value",
                "alert_received_at",
                "scenario",
            ],
            field_types=[
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
            ],
        ),
    )
    return enriched_ds

# aggregate signals for the same watcher and pokemon
class AggregateSignalsFunction(AggregateFunction):
    def create_accumulator(self) -> Row:
        return Row(total_signals=0)

    def add(self, signal: Row, accumulator: Row) -> Row:
        accumulator["total_signals"] += 1
        return accumulator

    def get_result(self, accumulator: Row) -> Row:
        return accumulator

    def merge(self, acc_a: Row, acc_b: Row) -> Row:
        acc_a["total_signals"] += acc_b["total_signals"]
        return acc_a

def aggregate_signals(consumer: DataStream) -> DataStream:
    aggregated_pokemon_watcher_ds = (
        consumer.key_by(
            lambda row: row["pokemon_value"] + "_" + row["watcher_uuid"],
            key_type=Types.STRING(),
        )
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
        .trigger(PurgingTrigger(ProcessingTimeTrigger()))
        .aggregate(
            AggregateSignalsFunction(),
            output_type=Types.ROW_NAMED(
                field_names=["total_signals"],
                field_types=[Types.INT()],
            ),
            accumulator_type=Types.ROW_NAMED(
                field_names=["total_signals"],
                field_types=[Types.INT()],
            ),
        )
        .name("aggregate_pokemon_watcher_signals_by_5min")
        .uid("aggregate_pokemon_watcher_signals_by_5min")
    )
    return aggregated_pokemon_watcher_ds

def create_sink() -> KinesisStreamsSink:
    sink_properties = {
        # Required
        "aws.region": "eu-west-1",
    }
    kds_sink = (
        KinesisStreamsSink.builder()
        .set_kinesis_client_properties(sink_properties)
        .set_serialization_schema(SimpleStringSchema())
        .set_partition_key_generator(PartitionKeyGenerator.fixed())
        .set_stream_name("test_flink_sink")
        .set_fail_on_error(False)
        .set_max_batch_size(500)
        .set_max_in_flight_requests(50)
        .set_max_buffered_requests(10000)
        .set_max_batch_size_in_bytes(5 * 1024 * 1024)
        .set_max_time_in_buffer_ms(5000)
        .set_max_record_size_in_bytes(1 * 1024 * 1024)
        .build()
    )
    return kds_sink

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    # create streaming consumer from Kinesis Stream
    consumer = create_consumer(env)
    # aggregate signals by pokemon watcher
    aggregated_stream = aggregate_signals(consumer)
    # create sink to Kinesis Stream
    sink = create_sink()
    # sink aggregated rows to Kinesis Stream
    aggregated_stream.map(
        lambda x: json.dumps(x.as_dict()), output_type=Types.STRING()
    ).sink_to(sink)
    try:
        env.execute_async("Analytics")
    except Exception as e:
        print(e)
        raise


if __name__ == "__main__":
    main()
Thanks for your answer. However, I have the impression that the code I've shared here already respects the principles you listed, and I hardly see how to make it any simpler. And it still leaks memory.