Skip to content

Conversation

autophagy
Copy link
Contributor

What is the purpose of the change

Currently, when trying to collect a TIMESTAMP_LTZ value in PyFlink, you get a pickling error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromRow.
: org.apache.flink.api.python.shaded.net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments 

This is because there is no case to handle pickling LocalZonedTimestampType Instant values in the pickler, which just defaults to trying to pickle the Instant object which fails. This PR adds a clause to handle this (by creating a Timestamp value from the Instant, similar to how Timestamps are pickled), and adds a handler on the PyFlink side to convert this value to a datetime.

Brief change log

  • Adds case to handle LocalZonedTimestampType types in the pickling/unpickling logic during collection.

Verifying this change

I've added an extra case in the Table API environment type tests to include a test for TIMESTAMP_LTZ types insert/collection tests, which before this change fails with the above pickling error.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented Sep 26, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

} else {
return pickler.dumps(obj);
}
} else if (dataType instanceof LocalZonedTimestampType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of thoughts:

  • l can we add a test at the Java level
  • is it possible to test with different local zones to make sure this works as expected - as Instant is based on seconds since epoche.

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Sep 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-reviewed PR has been reviewed by the community.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants