简单分析Flink算子返回NULL导致的异常

知乎 · · 152 次点击 · · 开始浏览    

假设我们作业中有这样一段逻辑stream.map(xxx).filter(_ != null).xxx,并且map算子有可能返回NULL,你觉得作业运行会抛NPE吗?明明下游有filter not null,不应该出错才对?但实际情况是运行中有可能抛出异常。

1.异常信息

可能抛出的异常信息大致如下:

// 1. 如果map算子返回值类型为Java Tuple
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:111)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    ...
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
// 2. 如果map算子返回值类型为Scala Case Class或Scala Tuple
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    ...
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
// 3. 如果map算子返回值类型为Scala Option
Caused by: scala.MatchError: null
    at org.apache.flink.api.scala.typeutils.OptionSerializer.copy(OptionSerializer.scala:50)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.copy(OptionSerializer.scala:29)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    ...
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

2.问题代码

可以看出,上述异常都是StreamMap.processElement方法抛出的,这个对应了我们代码中的map操作,具体异常信息和map的返回值类型有关;

以返回值为Java/Scala Tuple为例示意一下问题代码:

// java版本  
text.map(e -> {
  if(xxxx) {
    return Tuple2.of("hello", 1);
  } else {
    // 在某些条件下返回NULL
    return null;
  }
}).returns(new TypeHint<Tuple2<String, Integer>>(){})
.filter(Objects::nonNull)
.print()

// scala版本
text.map(e => {
  if(xxxx) {
    ("hello", 1)
  } else {
    // 在某些条件下返回NULL
    null
  }
})
.filter(_ != null)
.print()

2. 原因分析

如图(这里为了演示故意设置了disableOperatorChaining,一般情况这两个算子会串起来),如果“Map”想要传一个NULL值给下游的“Filter”,那它必须传一个具体的值给下游来表明是NULL(如果什么都不传的话下游根本不知道有数据);那么应该传什么值来表示NULL呢?不同的数据类型实现方法不同,但其本质思路是一样的,就是通过一个标志位来表示是不是NULL,类似 <nullFlag><value>

以String为例,Flink中StringSerializer会先写一个int标志位来表示String的长度;如果string == null,则标志位为0;否则的话标志位为string.length() + 1;这样的话NULL就是 <0>,空字符串就是 <1>,其他字符串是 <string.length + 1><stringContent>(当然,这里的标志位还担任着记录字符串长度的职责);

// 核心代码逻辑
public static final void writeString(CharSequence cs, DataOutput out) throws IOException {
  if (cs != null) {
   // 如果string不为null,则标志位是string的长度加1;
   // the length we write is offset by one, because a length of zero indicates a null value
   int lenToWrite = cs.length()+1;
   // 可以看出最大能够序列化长度为Integer.MAX_VALUE - 1的字符串;
  if (lenToWrite < 0) {
    throw new IllegalArgumentException("CharSequence is too long.");
  }
  ...
  } else {
    // 如果string为null,则标志位写0;
    out.write(0);
  }
}

对于大部分数据类型,使用标志位的好处是可以支持传递NULL值;缺点也很明显,就是浪费了带宽,多了标志位信息的传递。

那为什么还会出现上面的异常呢?那是因为并不是所有类型都支持NULL的,目前我所知的不支持NULL的类型包括Scala Option、Java/Scala Tuple和Scala Case Class;至于为什么不支持NULL,根据我搜到的解释,原因如下:

  • Scala Option不支持NULL,是因为Option就是设计来避免NULL的;如果Option类型返回NULL,本身就是个BUG(但奇怪的是Java Optional类型就可以返回NULL,不过Java Optional是通过KryoSerializer序列化的;Scala Option是通过CaseClassSerializer序列化的);
Stephan Ewen: Using null for an Option value is by itself a bug (after all, Option is explicitly designed to avoid null)
  • Java/Scala Tuple和Scala Case Class不支持NULL的原因不确定,不过它们是支持把变量设置为NULL的,具体方式见下一章节;

3. 解决方法

  • 对于Option类型,用None替代NULL;
  • 对于Tuple或Scala Case Class,可以设置变量为NULL;比如new Tuple2<String, Integer>(null, null)
  • 用flatMap替换map:
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
    if(xxx) {// 如果生成的数据不为NULL;      
      out.collect(Tuple2.of("hello", 1));
    }
  }
}).print()

4. 参考

本文来自:知乎

感谢作者:知乎

查看原文:简单分析Flink算子返回NULL导致的异常

152 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传