Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3916: [Rust] Add TimestampNanos types #2611

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lang/rust/avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,10 @@ fn main() -> Result<(), Error> {
record.put("time_micros", Value::TimeMicros(3));
record.put("timestamp_millis", Value::TimestampMillis(4));
record.put("timestamp_micros", Value::TimestampMicros(5));
record.put("timestamp_nanos", Value::TimestampNanos(6));
record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
record.put("local_timestamp_nanos", Value::LocalTimestampMicros(6));
record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));

writer.append(record)?;
Expand Down
54 changes: 50 additions & 4 deletions lang/rust/avro/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,10 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
| Value::TimeMicros(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i)
| Value::TimestampNanos(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i) => visitor.visit_i64(*i),
| Value::LocalTimestampMicros(i)
| Value::LocalTimestampNanos(i) => visitor.visit_i64(*i),
&Value::Float(f) => visitor.visit_f32(f),
&Value::Double(d) => visitor.visit_f64(d),
Value::Union(_i, u) => match **u {
Expand All @@ -257,8 +259,10 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
| Value::TimeMicros(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i)
| Value::TimestampNanos(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i) => visitor.visit_i64(i),
| Value::LocalTimestampMicros(i)
| Value::LocalTimestampNanos(i) => visitor.visit_i64(i),
Value::Float(f) => visitor.visit_f32(f),
Value::Double(d) => visitor.visit_f64(d),
Value::Record(ref fields) => visitor.visit_map(RecordDeserializer::new(fields)),
Expand Down Expand Up @@ -1080,7 +1084,16 @@ mod tests {
fn test_timestamp_micros() -> TestResult {
let raw_value = 1;
let value = Value::TimestampMicros(raw_value);
let result = crate::from_value::<i64>(&value)?;
let result = from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}

#[test]
fn test_avro_3916_timestamp_nanos() -> TestResult {
let raw_value = 1;
let value = Value::TimestampNanos(raw_value);
let result = from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}
Expand All @@ -1089,7 +1102,7 @@ mod tests {
fn test_avro_3853_local_timestamp_millis() -> TestResult {
let raw_value = 1;
let value = Value::LocalTimestampMillis(raw_value);
let result = crate::from_value::<i64>(&value)?;
let result = from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}
Expand All @@ -1103,6 +1116,15 @@ mod tests {
Ok(())
}

#[test]
fn test_avro_3916_local_timestamp_nanos() -> TestResult {
let raw_value = 1;
let value = Value::LocalTimestampNanos(raw_value);
let result = crate::from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}

#[test]
fn test_from_value_uuid_str() -> TestResult {
let raw_value = "9ec535ff-3e2a-45bd-91d3-0a01321b5a49";
Expand Down Expand Up @@ -1146,8 +1168,10 @@ mod tests {
("time_micros_a".to_string(), 123),
("timestamp_millis_b".to_string(), 234),
("timestamp_micros_c".to_string(), 345),
("timestamp_nanos_d".to_string(), 345_001),
("local_timestamp_millis_d".to_string(), 678),
("local_timestamp_micros_e".to_string(), 789),
("local_timestamp_nanos_f".to_string(), 345_002),
]
.iter()
.cloned()
Expand All @@ -1164,12 +1188,18 @@ mod tests {
key if key.starts_with("timestamp_micros_") => {
(k.clone(), Value::TimestampMicros(*v))
}
key if key.starts_with("timestamp_nanos_") => {
(k.clone(), Value::TimestampNanos(*v))
}
key if key.starts_with("local_timestamp_millis_") => {
(k.clone(), Value::LocalTimestampMillis(*v))
}
key if key.starts_with("local_timestamp_micros_") => {
(k.clone(), Value::LocalTimestampMicros(*v))
}
key if key.starts_with("local_timestamp_nanos_") => {
(k.clone(), Value::LocalTimestampNanos(*v))
}
_ => unreachable!("unexpected key: {:?}", k),
})
.collect();
Expand Down Expand Up @@ -1219,6 +1249,14 @@ mod tests {
"a_non_existing_timestamp_micros".to_string(),
Value::Union(0, Box::new(Value::TimestampMicros(-345))),
),
(
"a_timestamp_nanos".to_string(),
Value::Union(0, Box::new(Value::TimestampNanos(345))),
),
(
"a_non_existing_timestamp_nanos".to_string(),
Value::Union(0, Box::new(Value::TimestampNanos(-345))),
),
(
"a_local_timestamp_millis".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampMillis(678))),
Expand All @@ -1235,6 +1273,14 @@ mod tests {
"a_non_existing_local_timestamp_micros".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampMicros(-789))),
),
(
"a_local_timestamp_nanos".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampNanos(789))),
),
(
"a_non_existing_local_timestamp_nanos".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampNanos(-789))),
),
(
"a_record".to_string(),
Value::Union(
Expand Down
2 changes: 2 additions & 0 deletions lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
Schema::TimestampNanos => zag_i64(reader).map(Value::TimestampNanos),
Schema::LocalTimestampMillis => zag_i64(reader).map(Value::LocalTimestampMillis),
Schema::LocalTimestampMicros => zag_i64(reader).map(Value::LocalTimestampMicros),
Schema::LocalTimestampNanos => zag_i64(reader).map(Value::LocalTimestampNanos),
Schema::Duration => {
let mut buf = [0u8; 12];
reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
Expand Down
2 changes: 2 additions & 0 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
Value::Long(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i)
| Value::TimestampNanos(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i)
| Value::LocalTimestampNanos(i)
| Value::TimeMicros(i) => encode_long(*i, buffer),
Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
Expand Down
6 changes: 6 additions & 0 deletions lang/rust/avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,18 @@ pub enum Error {
#[error("TimestampMicros expected, got {0:?}")]
GetTimestampMicros(ValueKind),

#[error("TimestampNanos expected, got {0:?}")]
GetTimestampNanos(ValueKind),

#[error("LocalTimestampMillis expected, got {0:?}")]
GetLocalTimestampMillis(ValueKind),

#[error("LocalTimestampMicros expected, got {0:?}")]
GetLocalTimestampMicros(ValueKind),

#[error("LocalTimestampNanos expected, got {0:?}")]
GetLocalTimestampNanos(ValueKind),

#[error("Null expected, got {0:?}")]
GetNull(ValueKind),

Expand Down
2 changes: 2 additions & 0 deletions lang/rust/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,10 @@
//! record.put("time_micros", Value::TimeMicros(3));
//! record.put("timestamp_millis", Value::TimestampMillis(4));
//! record.put("timestamp_micros", Value::TimestampMicros(5));
//! record.put("timestamp_nanos", Value::TimestampNanos(6));
//! record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
//! record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
//! record.put("local_timestamp_nanos", Value::LocalTimestampMicros(6));
//! record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));
//!
//! writer.append(record)?;
Expand Down
18 changes: 18 additions & 0 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,14 @@ pub enum Schema {
TimestampMillis,
/// An instant in time represented as the number of microseconds after the UNIX epoch.
TimestampMicros,
/// An instant in time represented as the number of nanoseconds after the UNIX epoch.
TimestampNanos,
/// An instant in localtime represented as the number of milliseconds after the UNIX epoch.
LocalTimestampMillis,
/// An instant in local time represented as the number of microseconds after the UNIX epoch.
LocalTimestampMicros,
/// An instant in local time represented as the number of nanoseconds after the UNIX epoch.
LocalTimestampNanos,
/// An amount of time defined by a number of months, days and milliseconds.
Duration,
/// A reference to another schema.
Expand Down Expand Up @@ -199,8 +203,10 @@ impl From<&types::Value> for SchemaKind {
Value::TimeMicros(_) => Self::TimeMicros,
Value::TimestampMillis(_) => Self::TimestampMillis,
Value::TimestampMicros(_) => Self::TimestampMicros,
Value::TimestampNanos(_) => Self::TimestampNanos,
Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
Value::LocalTimestampNanos(_) => Self::LocalTimestampNanos,
Value::Duration { .. } => Self::Duration,
}
}
Expand Down Expand Up @@ -1942,6 +1948,12 @@ impl Serialize for Schema {
map.serialize_entry("logicalType", "timestamp-micros")?;
map.end()
}
Schema::TimestampNanos => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "long")?;
map.serialize_entry("logicalType", "timestamp-nanos")?;
map.end()
}
Schema::LocalTimestampMillis => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "long")?;
Expand All @@ -1954,6 +1966,12 @@ impl Serialize for Schema {
map.serialize_entry("logicalType", "local-timestamp-micros")?;
map.end()
}
Schema::LocalTimestampNanos => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "long")?;
map.serialize_entry("logicalType", "local-timestamp-nanos")?;
map.end()
}
Schema::Duration => {
let mut map = serializer.serialize_map(None)?;

Expand Down
54 changes: 54 additions & 0 deletions lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,14 @@ pub enum Value {
TimestampMillis(i64),
/// Timestamp in microseconds.
TimestampMicros(i64),
/// Timestamp in nanoseconds.
TimestampNanos(i64),
/// Local timestamp in milliseconds.
LocalTimestampMillis(i64),
/// Local timestamp in microseconds.
LocalTimestampMicros(i64),
/// Local timestamp in nanoseconds.
LocalTimestampNanos(i64),
/// Avro Duration. An amount of time defined by months, days and milliseconds.
Duration(Duration),
/// Universally unique identifier.
Expand Down Expand Up @@ -340,8 +344,10 @@ impl TryFrom<Value> for JsonValue {
Value::TimeMicros(t) => Ok(Self::Number(t.into())),
Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
Value::Duration(d) => Ok(Self::Array(
<[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
)),
Expand Down Expand Up @@ -428,8 +434,10 @@ impl Value {
(&Value::Long(_), &Schema::LocalTimestampMicros) => None,
(&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
(&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
(&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
(&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
(&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
(&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
(&Value::TimeMicros(_), &Schema::TimeMicros) => None,
(&Value::TimeMillis(_), &Schema::TimeMillis) => None,
(&Value::Date(_), &Schema::Date) => None,
Expand Down Expand Up @@ -689,8 +697,10 @@ impl Value {
Schema::TimeMicros => self.resolve_time_micros(),
Schema::TimestampMillis => self.resolve_timestamp_millis(),
Schema::TimestampMicros => self.resolve_timestamp_micros(),
Schema::TimestampNanos => self.resolve_timestamp_nanos(),
Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
Schema::Duration => self.resolve_duration(),
Schema::Uuid => self.resolve_uuid(),
}
Expand Down Expand Up @@ -814,6 +824,14 @@ impl Value {
}
}

fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
match self {
Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
other => Err(Error::GetTimestampNanos(other.into())),
}
}

fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
match self {
Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
Expand All @@ -834,6 +852,14 @@ impl Value {
}
}

fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
match self {
Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
other => Err(Error::GetLocalTimestampNanos(other.into())),
}
}

fn resolve_null(self) -> Result<Self, Error> {
match self {
Value::Null => Ok(Value::Null),
Expand Down Expand Up @@ -1738,6 +1764,16 @@ Field with name '"b"' is not a member of the map items"#,
assert!(value.resolve(&Schema::TimestampMicros).is_err());
}

#[test]
fn test_avro_3914_resolve_timestamp_nanos() {
let value = Value::TimestampNanos(10);
assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
assert!(value.resolve(&Schema::Int).is_err());

let value = Value::Double(10.0);
assert!(value.resolve(&Schema::TimestampNanos).is_err());
}

#[test]
fn test_avro_3853_resolve_timestamp_millis() {
let value = Value::LocalTimestampMillis(10);
Expand All @@ -1758,6 +1794,16 @@ Field with name '"b"' is not a member of the map items"#,
assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
}

#[test]
fn test_avro_3916_resolve_timestamp_nanos() {
let value = Value::LocalTimestampNanos(10);
assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
assert!(value.resolve(&Schema::Int).is_err());

let value = Value::Double(10.0);
assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
}

#[test]
fn resolve_duration() {
let value = Value::Duration(Duration::new(
Expand Down Expand Up @@ -1963,6 +2009,10 @@ Field with name '"b"' is not a member of the map items"#,
JsonValue::try_from(Value::TimestampMicros(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::TimestampNanos(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::LocalTimestampMillis(1))?,
JsonValue::Number(1.into())
Expand All @@ -1971,6 +2021,10 @@ Field with name '"b"' is not a member of the map items"#,
JsonValue::try_from(Value::LocalTimestampMicros(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::LocalTimestampNanos(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::Duration(
[1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()
Expand Down