java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Lucene词向量索引文件构建

Lucene词向量索引文件构建源码解析

作者:沧叔解码

这篇文章主要为大家介绍了Lucene词向量索引文件构建源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

背景

词向量存储的信息内容其实和倒排(Posting)是一样的,也是每个term所出现的文档列表以及在文档中的位置信息,区别在于存储结构的不同:

Posting的存储结构是:field -> term -> doc -> freq/pos/offset

也就是说Posting是从字段定位到term,再定位到文档,获取位置信息。

TermVector的存储结构是:doc -> field -> term -> freq/pos/offset

TermVector是从文档定位到字段,再定位term,获取位置信息。

从上面的介绍中,我们可以看出一些基本的规律:

从上面的规律可以看出,检索过程确实使用的Posting,毕竟这是真的倒排索引。TermVector适用于从特定文档中获取某些字段中term的位置信息,典型应用就是高亮:获取特定的文档中相关term的位置。

特殊说明

词向量构建涉及到几个类之间的关系

词向量的构建中主要有3个类:

term的存储

在词向量中的term是按序存储的,但是每个Field中的所有的term,除了完整的存储第一个term之外,其他term都是存储除了跟前一个term的最长公共前缀的剩余的后缀部分。

chunk的生成条件

词向量是使用chunk来划分数据的,生成chunk的条件满足以下二者其一即可:

词向量的索引文件

词向量最终构建生成3个索引文件:

源码解读

注意:本文源码基于lucene-9.1.0版本

工具类

FieldsIndexWriter

FieldsIndexWriter这个工具类,以后介绍正排索引文件也会用到,它主要是用来生成所有chunk中的起始doc编号和chunk在数据文件中的位置信息,方便读取的时候快速定位doc所属的chunk,并从文件中读取chunk。总体逻辑是先使用临时文件存储前面说的两个信息,在真正生成索引文件的时候,使用DirectMonotonicWriter进行压缩存储,减小索引文件大小。

public final class FieldsIndexWriter implements Closeable {
  static final int VERSION_START = 0;
  static final int VERSION_CURRENT = 0;
  private final Directory dir;
  // 下面这些信息都是用来创建真正索引文件名的  
  private final String name;
  private final String suffix;
  private final String extension;
  private final String codecName;
  private final byte[] id;
  // DirectMonotonicWriter 所需的参数
  private final int blockShift;
  private final IOContext ioContext;
  // 临时文件,用来保存所有chunk中的文档数  
  private IndexOutput docsOut;
  // 临时文件,用来保存所有chunk在数据文件中的起始位置  
  private IndexOutput filePointersOut;
  // doc总数  
  private int totalDocs;
  // chunk总数  
  private int totalChunks;
  // 前一个chunk在tvd索引文件中的起始位置  
  private long previousFP;
  // 添加一个新的index,index就是用来定位doc属于哪个chunk,以及chunk在数据文件中的起始位置。
  // numDocs是chunk中的文档总数,后面真正序列化到正式的索引文件会通过换算,得到的是每个chunk的起始docID。  
  void writeIndex(int numDocs, long startPointer) throws IOException {
    assert startPointer >= previousFP;
    docsOut.writeVInt(numDocs);
    filePointersOut.writeVLong(startPointer - previousFP);
    previousFP = startPointer;
    totalDocs += numDocs;
    totalChunks++;
  }
  // metaOut是元信息索引文件  
  void finish(int numDocs, long maxPointer, IndexOutput metaOut) throws IOException {
    if (numDocs != totalDocs) {
      throw new IllegalStateException("Expected " + numDocs + " docs, but got " + totalDocs);
    }
    // 完成临时文件的写入  
    CodecUtil.writeFooter(docsOut);
    CodecUtil.writeFooter(filePointersOut);
    IOUtils.close(docsOut, filePointersOut);
    // 创建真正的索引文件  
    try (IndexOutput dataOut =
        dir.createOutput(IndexFileNames.segmentFileName(name, suffix, extension), ioContext)) {
      CodecUtil.writeIndexHeader(dataOut, codecName + "Idx", VERSION_CURRENT, id, suffix);
      // chunk中的doc总数
      metaOut.writeInt(numDocs);
      // 后面使用DirectMonotonicWriter压缩的时候需要的参数 
      metaOut.writeInt(blockShift);
      // 使用DirectMonotonicWriter写入的数据总数,为什么加1,看下面的具体写入逻辑。
      metaOut.writeInt(totalChunks + 1);
      // chunk索引文件中ChunkStartDocIDs的起始位置  
      metaOut.writeLong(dataOut.getFilePointer());
      try (ChecksumIndexInput docsIn =
          dir.openChecksumInput(docsOut.getName(), IOContext.READONCE)) {
        CodecUtil.checkHeader(docsIn, codecName + "Docs", VERSION_CURRENT, VERSION_CURRENT);
        Throwable priorE = null;
        try {
          // 压缩存储所有chunk的起始doc编号  
          final DirectMonotonicWriter docs =
              DirectMonotonicWriter.getInstance(metaOut, dataOut, totalChunks + 1, blockShift);
          long doc = 0;
          docs.add(doc); // 第一个chunk的起始doc编号肯定是0,这也是上面totalChunks + 1的原因之一。
          for (int i = 0; i < totalChunks; ++i) {
            doc += docsIn.readVInt();
            docs.add(doc);
          }
          docs.finish();
          if (doc != totalDocs) {
            throw new CorruptIndexException("Docs don't add up", docsIn);
          }
        } catch (Throwable e) {
          priorE = e;
        } finally {
          CodecUtil.checkFooter(docsIn, priorE);
        }
      }
      // 删除临时文件  
      dir.deleteFile(docsOut.getName());
      docsOut = null;
      // chunk索引文件中ChunkOffsets的起始位置
      metaOut.writeLong(dataOut.getFilePointer());
      try (ChecksumIndexInput filePointersIn =
          dir.openChecksumInput(filePointersOut.getName(), IOContext.READONCE)) {
        CodecUtil.checkHeader(
            filePointersIn, codecName + "FilePointers", VERSION_CURRENT, VERSION_CURRENT);
        Throwable priorE = null;
        try {
          // 压缩存储所有chunk在tvd文件中的起始位置  
          final DirectMonotonicWriter filePointers =
              DirectMonotonicWriter.getInstance(metaOut, dataOut, totalChunks + 1, blockShift);
          long fp = 0;
          for (int i = 0; i < totalChunks; ++i) {
            fp += filePointersIn.readVLong();
            filePointers.add(fp);
          }
          if (maxPointer < fp) {
            throw new CorruptIndexException("File pointers don't add up", filePointersIn);
          }
          filePointers.add(maxPointer); // 上面totalChunks + 1的原因之二。
          filePointers.finish();
        } catch (Throwable e) {
          priorE = e;
        } finally {
          CodecUtil.checkFooter(filePointersIn, priorE);
        }
      }
      dir.deleteFile(filePointersOut.getName());
      filePointersOut = null;
      metaOut.writeLong(dataOut.getFilePointer());
      metaOut.writeLong(maxPointer);
      CodecUtil.writeFooter(dataOut);
    }
  }
}

核心类

TermVectorsConsumer

class TermVectorsConsumer extends TermsHash {
  protected final Directory directory;
  protected final SegmentInfo info;
  protected final Codec codec;
  // 词向量持久化  
  TermVectorsWriter writer;
  /** Scratch term used by TermVectorsConsumerPerField.finishDocument. */
  final BytesRef flushTerm = new BytesRef();
  // 用来从TermVectorsConsumerPerField的bytepool中读取position信息
  final ByteSliceReader vectorSliceReaderPos = new ByteSliceReader();
  final ByteSliceReader vectorSliceReaderOff = new ByteSliceReader();
  private boolean hasVectors;
  private int numVectorFields;
  int lastDocID;
  // 每一个开启词向量构建的Field,都有一个TermVectorsConsumerPerField,当一个doc处理完之后会把所有的
  // TermVectorsConsumerPerField都序列化到Lucene90CompressingTermVectorsWriter中,然后重置等待处理下一个doc
  private TermVectorsConsumerPerField[] perFields = new TermVectorsConsumerPerField[1];
  Accountable accountable = Accountable.NULL_ACCOUNTABLE;
  TermVectorsConsumer(
      final IntBlockPool.Allocator intBlockAllocator,
      final ByteBlockPool.Allocator byteBlockAllocator,
      Directory directory,
      SegmentInfo info,
      Codec codec) {
    super(intBlockAllocator, byteBlockAllocator, Counter.newCounter(), null);
    this.directory = directory;
    this.info = info;
    this.codec = codec;
  }
  @Override
  void flush(
      Map<String, TermsHashPerField> fieldsToFlush,
      final SegmentWriteState state,
      Sorter.DocMap sortMap,
      NormsProducer norms)
      throws IOException {
    if (writer != null) {
      int numDocs = state.segmentInfo.maxDoc();
      try {
        // 把不存在词向量Filed的文档填充下  
        fill(numDocs);
        assert state.segmentInfo != null;
        // 触发词向量索引文件持久化落盘   
        writer.finish(numDocs);
      } finally {
        IOUtils.close(writer);
      }
    }
  }
  /**
   * Fills in no-term-vectors for all docs we haven't seen since the last doc that had term vectors.
   */
  void fill(int docID) throws IOException {
    while (lastDocID < docID) {
      writer.startDocument(0);
      writer.finishDocument();
      lastDocID++;
    }
  }
  // 创建  Lucene90CompressingTermVectorsWriter
  void initTermVectorsWriter() throws IOException {
    if (writer == null) {
      IOContext context = new IOContext(new FlushInfo(lastDocID, bytesUsed.get()));
      writer = codec.termVectorsFormat().vectorsWriter(directory, info, context);
      lastDocID = 0;
      accountable = writer;
    }
  }
  void setHasVectors() {
    hasVectors = true;
  }
  @Override
  void finishDocument(int docID) throws IOException {
    // 不存在词向量,直接返回
    if (!hasVectors) {
      return;
    }
    // 按字段名排序TermVectorsConsumerPerField
    ArrayUtil.introSort(perFields, 0, numVectorFields);
    initTermVectorsWriter();
    // 为了确保doc是连续,则把lastDocID到docID的空白填充下
    fill(docID);
    // 开始序列化
    writer.startDocument(numVectorFields);
    // 处理document中的所有Field,会把相关的词向量数据写到writer的缓存中
    for (int i = 0; i < numVectorFields; i++) {
      perFields[i].finishDocument();
    }
    // 结束一个document词向量的序列化  
    writer.finishDocument();
    assert lastDocID == docID : "lastDocID=" + lastDocID + " docID=" + docID;
    lastDocID++;
    super.reset();
    // 重置TermVectorsConsumerPerField 数组,等待处理下一个document  
    resetFields();
  }
  @Override
  public void abort() {
    try {
      super.abort();
    } finally {
      IOUtils.closeWhileHandlingException(writer);
      reset();
    }
  }
  void resetFields() {
    Arrays.fill(perFields, null); // don't hang onto stuff from previous doc
    numVectorFields = 0;
  }
  @Override
  public TermsHashPerField addField(FieldInvertState invertState, FieldInfo fieldInfo) {
    return new TermVectorsConsumerPerField(invertState, this, fieldInfo);
  }
  // 当结束一个Field的所有term的处理之后,就把TermVectorsConsumerPerField存在perFields中,
  // 等待把数据都序列化到Lucene90CompressingTermVectorsWriter中
  void addFieldToFlush(TermVectorsConsumerPerField fieldToFlush) {
    if (numVectorFields == perFields.length) {
      int newSize = ArrayUtil.oversize(numVectorFields + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
      TermVectorsConsumerPerField[] newArray = new TermVectorsConsumerPerField[newSize];
      System.arraycopy(perFields, 0, newArray, 0, numVectorFields);
      perFields = newArray;
    }
    perFields[numVectorFields++] = fieldToFlush;
  }
  @Override
  void startDocument() {
    resetFields();
    numVectorFields = 0;
  }
}

Lucene90CompressingTermVectorsWriter

Lucene90CompressingTermVectorsWriter是生成词向量索引文件的核心类,主要负责按照特定的索引文件格式组织数据并持久化。

父类

TermVectorsWriter

Lucene90CompressingTermVectorsWriter的父类有大量的抽象方法,剩下一个模板方法addProx用来添加term在field中的所有的位置信息。

public abstract class TermVectorsWriter implements Closeable, Accountable {
  /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
  protected TermVectorsWriter() {}
  // 开始持久化一个doc的所有词向量
  public abstract void startDocument(int numVectorFields) throws IOException;
  // 结束一个文档持久化的时候调用
  public void finishDocument() throws IOException {};
  // 开始持久化一个Field
  public abstract void startField(
      FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads)
      throws IOException;
  // 结束一个Field的处理
  public void finishField() throws IOException {};
  // 开始持久化一个term的倒排信息
  public abstract void startTerm(BytesRef term, int freq) throws IOException;
  // 结束一个term的处理
  public void finishTerm() throws IOException {}
  // 构建term的一个position信息
  public abstract void addPosition(int position, int startOffset, int endOffset, BytesRef payload)
      throws IOException;
  // 所有文档处理完成,在close方法调用之前,调用finish,numDoc是处理的文档总数
  public abstract void finish(int numDocs) throws IOException;
  // 从positions和offsets中读取所有的位置信息,其实就是从TermVectorsConsumerPerField#bytePool中读取
  public void addProx(int numProx, DataInput positions, DataInput offsets) throws IOException {
    int position = 0;
    int lastOffset = 0;
    BytesRefBuilder payload = null;
    for (int i = 0; i < numProx; i++) {
      final int startOffset;
      final int endOffset;
      final BytesRef thisPayload;
      if (positions == null) {
        position = -1;
        thisPayload = null;
      } else {
        int code = positions.readVInt();
        position += code >>> 1;
        if ((code & 1) != 0) {
          final int payloadLength = positions.readVInt();
          if (payload == null) {
            payload = new BytesRefBuilder();
          }
          payload.grow(payloadLength);
          positions.readBytes(payload.bytes(), 0, payloadLength);
          payload.setLength(payloadLength);
          thisPayload = payload.get();
        } else {
          thisPayload = null;
        }
      }
      if (offsets == null) {
        startOffset = endOffset = -1;
      } else {
        startOffset = lastOffset + offsets.readVInt();
        endOffset = startOffset + offsets.readVInt();
        lastOffset = endOffset;
      }
      // 子类实现的真正的添加  
      addPosition(position, startOffset, endOffset, thisPayload);
    }
  }
    // 删除了一些跟merge相关的方法,以后介绍merge的时候再说
  @Override
  public abstract void close() throws IOException;
}

成员变量

  // sement的名称
  private final String segment;
  // 生成tvx索引文件
  private FieldsIndexWriter indexWriter;
  // metaStream:生成tvm索引文件
  // vectorStream:生成tvd索引文件
  private IndexOutput metaStream, vectorsStream;
  // 压缩算法
  private final CompressionMode compressionMode;
  private final Compressor compressor;
  // tvx中的chunk大小是2^chunkSize
  private final int chunkSize;
  // chunk总数
  private long numChunks; 
  // 如果一个chunk中包含的doc信息是不完整的,则算一次
  private long numDirtyChunks; 
  // 在dirtyChunk中的doc总数
  private long numDirtyDocs; 
  // 处理的doc总数
  private int numDocs;
  // 构建过程中暂时存储的DocData,触发flush的话就会持久化
  private final Deque<DocData> pendingDocs;
  // 当前正在处理的doc
  private DocData curDoc;
  // 当前正在处理的field
  private FieldData curField;
  // 上一个处理的term
  private final BytesRef lastTerm;
  // 全局临时存储的buf
  private int[] positionsBuf, startOffsetsBuf, lengthsBuf, payloadLengthsBuf;
  // 存储后缀
  private final ByteBuffersDataOutput termSuffixes;
  // 存储payload信息
  private final ByteBuffersDataOutput payloadBytes;
  // 批量整型的压缩工具
  private final BlockPackedWriter writer;
  // 一个chunk中最多的文档数
  private final int maxDocsPerChunk; 
  private final ByteBuffersDataOutput scratchBuffer = ByteBuffersDataOutput.newResettableInstance();

内部类

DocData

表示当前要序列化的一个doc的所有的词向量数据信息。

private class DocData {
  // doc中有多少个field  
  final int numFields;
  // 每个field的词向量信息  
  final Deque<FieldData> fields;
  // 当前doc在全局buffer(positionsBuf, startOffsetsBuf, payloadLengthsBuf)中的起始位置  
  final int posStart, offStart, payStart;
  DocData(int numFields, int posStart, int offStart, int payStart) {
    this.numFields = numFields;
    this.fields = new ArrayDeque<>(numFields);
    this.posStart = posStart;
    this.offStart = offStart;
    this.payStart = payStart;
  }
  // 新增一个field  
  FieldData addField(
      int fieldNum, int numTerms, boolean positions, boolean offsets, boolean payloads) {
    final FieldData field;
    if (fields.isEmpty()) {
      field =
          new FieldData(
              fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart);
    } else {
      final FieldData last = fields.getLast();
      // 计算当前field的一些起始位置,也就是前一个field的起始位置+前一个field的所有的数据量  
      final int posStart = last.posStart + (last.hasPositions ? last.totalPositions : 0);
      final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions : 0);
      final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions : 0);
      field =
          new FieldData(
              fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart);
    }
    fields.add(field);
    return field;
  }
}

FieldData

存储一个field的所有的词向量的所需的数据。

private class FieldData {
  final boolean hasPositions, hasOffsets, hasPayloads;
  // flags是个混合标记位,标记是否需要构建position,offset,payload  
  final int fieldNum, flags, numTerms;
  // freqs:存储的是每个term的频率
  // prefixLengths:存储的是当前term和前一个term的公共前缀的长度
  // suffixLengths:存储的是除了当前term和前一个term的公共前缀的剩余部分的长度
  final int[] freqs, prefixLengths, suffixLengths;
  // 当前Field的position,offset,payload数据在全局buf中的起始位置  
  final int posStart, offStart, payStart;
  int totalPositions;
  // 当前处理的是第几个term  
  int ord;
  FieldData(
      int fieldNum,
      int numTerms,
      boolean positions,
      boolean offsets,
      boolean payloads,
      int posStart,
      int offStart,
      int payStart) {
    this.fieldNum = fieldNum;
    this.numTerms = numTerms;
    this.hasPositions = positions;
    this.hasOffsets = offsets;
    this.hasPayloads = payloads;
    this.flags =
        (positions ? POSITIONS : 0) | (offsets ? OFFSETS : 0) | (payloads ? PAYLOADS : 0);
    this.freqs = new int[numTerms];
    this.prefixLengths = new int[numTerms];
    this.suffixLengths = new int[numTerms];
    this.posStart = posStart;
    this.offStart = offStart;
    this.payStart = payStart;
    totalPositions = 0;
    ord = 0;
  }
  // 新增一个term
  // prefixLength:和前一个term的最长公共前缀
  // suffixLength:除了prefix剩下的就是suffix  
  void addTerm(int freq, int prefixLength, int suffixLength) {
    freqs[ord] = freq;
    prefixLengths[ord] = prefixLength;
    suffixLengths[ord] = suffixLength;
    ++ord;
  }
  // 为当前处理的term新增一个位置信息数据,数据都是暂存在全局的buffer中
  void addPosition(int position, int startOffset, int length, int payloadLength) {
    if (hasPositions) {
      if (posStart + totalPositions == positionsBuf.length) {
        positionsBuf = ArrayUtil.grow(positionsBuf);
      }
      positionsBuf[posStart + totalPositions] = position;
    }
    if (hasOffsets) {
      if (offStart + totalPositions == startOffsetsBuf.length) {
        final int newLength = ArrayUtil.oversize(offStart + totalPositions, 4);
        startOffsetsBuf = ArrayUtil.growExact(startOffsetsBuf, newLength);
        lengthsBuf = ArrayUtil.growExact(lengthsBuf, newLength);
      }
      startOffsetsBuf[offStart + totalPositions] = startOffset;
      lengthsBuf[offStart + totalPositions] = length;
    }
    if (hasPayloads) {
      if (payStart + totalPositions == payloadLengthsBuf.length) {
        payloadLengthsBuf = ArrayUtil.grow(payloadLengthsBuf);
      }
      payloadLengthsBuf[payStart + totalPositions] = payloadLength;
    }
    ++totalPositions;
  }
}

构造方法

Lucene90CompressingTermVectorsWriter(
    Directory directory,
    SegmentInfo si,
    String segmentSuffix,
    IOContext context,
    String formatName,
    CompressionMode compressionMode,
    int chunkSize,
    int maxDocsPerChunk,
    int blockShift)
    throws IOException {
  assert directory != null;
  this.segment = si.name;
  this.compressionMode = compressionMode;
  this.compressor = compressionMode.newCompressor();
  this.chunkSize = chunkSize;
  this.maxDocsPerChunk = maxDocsPerChunk;
  numDocs = 0;
  pendingDocs = new ArrayDeque<>();
  termSuffixes = ByteBuffersDataOutput.newResettableInstance();
  payloadBytes = ByteBuffersDataOutput.newResettableInstance();
  lastTerm = new BytesRef(ArrayUtil.oversize(30, 1));
  boolean success = false;
  try {
    metaStream =
        directory.createOutput(
            IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_META_EXTENSION),
            context);
    CodecUtil.writeIndexHeader(
        metaStream,
        VECTORS_INDEX_CODEC_NAME + "Meta",
        VERSION_CURRENT,
        si.getId(),
        segmentSuffix);
    assert CodecUtil.indexHeaderLength(VECTORS_INDEX_CODEC_NAME + "Meta", segmentSuffix)
        == metaStream.getFilePointer();
    vectorsStream =
        directory.createOutput(
            IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_EXTENSION), context);
    CodecUtil.writeIndexHeader(
        vectorsStream, formatName, VERSION_CURRENT, si.getId(), segmentSuffix);
    assert CodecUtil.indexHeaderLength(formatName, segmentSuffix)
        == vectorsStream.getFilePointer();
    // 生成tvx索引文件
    indexWriter =
        new FieldsIndexWriter(
            directory,
            segment,
            segmentSuffix,
            VECTORS_INDEX_EXTENSION,
            VECTORS_INDEX_CODEC_NAME,
            si.getId(),
            blockShift,
            context);
    // 记录PackedInts的版本
    metaStream.writeVInt(PackedInts.VERSION_CURRENT);
    // 记录chunkSize  
    metaStream.writeVInt(chunkSize);
    writer = new BlockPackedWriter(vectorsStream, PACKED_BLOCK_SIZE);
    // 全局buffer,用来临时存储数据  
    positionsBuf = new int[1024];
    startOffsetsBuf = new int[1024];
    lengthsBuf = new int[1024];
    payloadLengthsBuf = new int[1024];
    success = true;
  } finally {
    if (!success) {
      IOUtils.closeWhileHandlingException(metaStream, vectorsStream, indexWriter, indexWriter);
    }
  }
}

核心方法

startDocument

要开始处理一个doc了,创建一个DocData来存储这个doc所有的数据信息。

  @Override
  public void startDocument(int numVectorFields) throws IOException {
    curDoc = addDocData(numVectorFields);
  }  
  private DocData addDocData(int numVectorFields) {
    FieldData last = null;
    // 逆序遍历pendingDocs列表,获取最后一个DocData,需要根据它来计算在下一个DocData在全局buffer中的起始offset
    for (Iterator<DocData> it = pendingDocs.descendingIterator(); it.hasNext(); ) {
      final DocData doc = it.next();
      if (!doc.fields.isEmpty()) {
        last = doc.fields.getLast();
        break;
      }
    }
    final DocData doc;
    if (last == null) {
      doc = new DocData(numVectorFields, 0, 0, 0); // 第一个doc
    } else {
      final int posStart = last.posStart + (last.hasPositions ? last.totalPositions : 0);
      final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions : 0);
      final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions : 0);
      doc = new DocData(numVectorFields, posStart, offStart, payStart);
    }
    pendingDocs.add(doc);
    return doc;
  }

startField

开始处理当前doc中的一个新的Field,创建FieldData,用来存储field的所有的数据信息。

  public void startField(
      FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads)
      throws IOException {
    curField = curDoc.addField(info.number, numTerms, positions, offsets, payloads);
    lastTerm.length = 0;
  }

startTerm

计算当前term和前一个term的最长公共前缀。

  public void startTerm(BytesRef term, int freq) throws IOException {
    // 和前一个term的最长公共前缀  
    final int prefix;
    if (lastTerm.length == 0) {
      prefix = 0;
    } else {
      prefix = StringHelper.bytesDifference(lastTerm, term);
    }
    // FieldData新增term  
    curField.addTerm(freq, prefix, term.length - prefix);
    // 存储suffix  
    termSuffixes.writeBytes(term.bytes, term.offset + prefix, term.length - prefix);
    // 更新lastTerm
    if (lastTerm.bytes.length < term.length) {
      lastTerm.bytes = new byte[ArrayUtil.oversize(term.length, 1)];
    }
    lastTerm.offset = 0;
    lastTerm.length = term.length;
    System.arraycopy(term.bytes, term.offset, lastTerm.bytes, 0, term.length);
  }

addPosition

为当前处理的term新增一个位置相关的信息。

  public void addPosition(int position, int startOffset, int endOffset, BytesRef payload)
      throws IOException {
    assert curField.flags != 0;
    curField.addPosition(
        position, startOffset, endOffset - startOffset, payload == null ? 0 : payload.length);
    if (curField.hasPayloads && payload != null) {
      payloadBytes.writeBytes(payload.bytes, payload.offset, payload.length);
    }
  }

finishField

结束一个field的处理,就是简单把当前的curFiled清空,等待处理下一个field。

  public void finishField() throws IOException {
    curField = null;
  }

finishDocument

可以看到,在结束一个doc的处理时,会判断是否满足一个chunk的构建条件,如果满足的话则进行构建。

  public void finishDocument() throws IOException {
    payloadBytes.copyTo(termSuffixes);
    payloadBytes.reset();
    ++numDocs;
    if (triggerFlush()) { // 是否满足一个chunk
      flush(false); // 构建一个chunk
    }
    curDoc = null;
  }

triggerFlush

判断当前是否满足一个chunk的构建条件,二者满足其一即可:

  private boolean triggerFlush() {
    return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
  }

构建chunk

flush

flush触发构建chunk逻辑,里面主要是调度逻辑,按类别构建所需的词向量信息。

  private void flush(boolean force) throws IOException {
    // 当前要构建的chunk中有几个doc  
    final int chunkDocs = pendingDocs.size();
    numChunks++;
    if (force) { // 如果是强制构建chunk,可能是不满足chunk条件的,这种chunk被定义为dirtyChunk
      numDirtyChunks++;
      numDirtyDocs += pendingDocs.size();
    }
    // 构建chunk的索引信息
    indexWriter.writeIndex(chunkDocs, vectorsStream.getFilePointer());
    final int docBase = numDocs - chunkDocs;
    // chunk的起始docID  
    vectorsStream.writeVInt(docBase);
    final int dirtyBit = force ? 1 : 0;
    vectorsStream.writeVInt((chunkDocs &lt;&lt; 1) | dirtyBit);
    // 记录每个doc的field数量
    final int totalFields = flushNumFields(chunkDocs);
    if (totalFields &gt; 0) {
      // 记录当前chunk中所有Field的编号
      final int[] fieldNums = flushFieldNums();
      // 记录所有doc的所有field的编号
      flushFields(totalFields, fieldNums);
      // 记录所有doc的所有field的flag
      flushFlags(totalFields, fieldNums);
      // 记录所有doc的所有field的term数量
      flushNumTerms(totalFields);
      // 记录所有term的长度信息
      flushTermLengths();
      // 记录所有term的频率
      flushTermFreqs();
      // 记录所有term的position信息
      flushPositions();
      // 记录所有term的offset信息
      flushOffsets(fieldNums);
      // 记录所有position的payload信息
      flushPayloadLengths();
      // 记录所有的suffix
      byte[] content = termSuffixes.toArrayCopy();
      compressor.compress(content, 0, content.length, vectorsStream);
    }
    // 重置相关变量,等待处理下一个chunk
    pendingDocs.clear();
    curDoc = null;
    curField = null;
    termSuffixes.reset();
  }

flushNumFields

记录所有doc的字段总数,分为两种情况:

  private int flushNumFields(int chunkDocs) throws IOException {
    if (chunkDocs == 1) { // 如果chunk中只有一个doc,则就直接写这个doc的字段总数
      final int numFields = pendingDocs.getFirst().numFields;
      vectorsStream.writeVInt(numFields);
      return numFields;
    } else { // 否则,使用PackedInts压缩存储所有doc的字段数信息
      writer.reset(vectorsStream);
      int totalFields = 0;
      for (DocData dd : pendingDocs) {
        writer.add(dd.numFields);
        totalFields += dd.numFields;
      }
      writer.finish();
      return totalFields;
    }
  }

flushFieldNums

  private int[] flushFieldNums() throws IOException {
    // chunk中所有term的编号按序存储  
    SortedSet<Integer> fieldNums = new TreeSet<>();
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        fieldNums.add(fd.fieldNum);
      }
    }
    final int numDistinctFields = fieldNums.size();
    final int bitsRequired = PackedInts.bitsRequired(fieldNums.last());
    // bitsRequired最大就是32,所以低5位就够了
    final int token = (Math.min(numDistinctFields - 1, 0x07) << 5) | bitsRequired;
    vectorsStream.writeByte((byte) token);
    if (numDistinctFields - 1 >= 0x07) {
      vectorsStream.writeVInt(numDistinctFields - 1 - 0x07);
    }
    final PackedInts.Writer writer =
        PackedInts.getWriterNoHeader(
            vectorsStream, PackedInts.Format.PACKED, fieldNums.size(), bitsRequired, 1);
    for (Integer fieldNum : fieldNums) {
      writer.add(fieldNum);
    }
    writer.finish();
    // Integer转int
    int[] fns = new int[fieldNums.size()];
    int i = 0;
    for (Integer key : fieldNums) {
      fns[i++] = key;
    }
    return fns;
  }

flushFields

存储doc中所有的field的编号。

  private void flushFields(int totalFields, int[] fieldNums) throws IOException {
    scratchBuffer.reset();
    // 使用  DirectWriter 压缩存储
    final DirectWriter writer =
        DirectWriter.getInstance(
            scratchBuffer, totalFields, DirectWriter.bitsRequired(fieldNums.length - 1));
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        final int fieldNumIndex = Arrays.binarySearch(fieldNums, fd.fieldNum);
        assert fieldNumIndex >= 0;
        writer.add(fieldNumIndex);
      }
    }
    writer.finish();
    vectorsStream.writeVLong(scratchBuffer.size());
    scratchBuffer.copyTo(vectorsStream);
  }

flushFlags

存储doc中所有field的flag,分为两种情况:

  private void flushFlags(int totalFields, int[] fieldNums) throws IOException {
    // 所有doc中相同的field是否都是一样的flag
    boolean nonChangingFlags = true;
    // 如果所有相同的field的flag都一样,则最后只存储这个数组  
    int[] fieldFlags = new int[fieldNums.length];
    Arrays.fill(fieldFlags, -1);
    outer:
    for (DocData dd : pendingDocs) { // 遍历所有的doc
      for (FieldData fd : dd.fields) { // 遍历所有的field
        final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
        assert fieldNumOff >= 0;
        if (fieldFlags[fieldNumOff] == -1) {
          fieldFlags[fieldNumOff] = fd.flags;
        } else if (fieldFlags[fieldNumOff] != fd.flags) { // 有一个field不一样
          nonChangingFlags = false;
          break outer;
        }
      }
    }
    if (nonChangingFlags) { // 如果所有doc相同的field的flag都一样,
      // 写0标记这种情况
      vectorsStream.writeVInt(0);
      scratchBuffer.reset();
      final DirectWriter writer =
          DirectWriter.getInstance(scratchBuffer, fieldFlags.length, FLAGS_BITS);
      for (int flags : fieldFlags) { // 每个field只写一个flag
        assert flags >= 0;
        writer.add(flags);
      }
      writer.finish();
      vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
      scratchBuffer.copyTo(vectorsStream);
    } else { // 需要记录所有doc中的所有field的flag
      // 写1标记这种情况
      vectorsStream.writeVInt(1);
      scratchBuffer.reset();
      final DirectWriter writer = DirectWriter.getInstance(scratchBuffer, totalFields, FLAGS_BITS);
      for (DocData dd : pendingDocs) { // 遍历doc
        for (FieldData fd : dd.fields) { // 遍历field
          writer.add(fd.flags); // 记录field的flag
        }
      }
      writer.finish();
      vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
      scratchBuffer.copyTo(vectorsStream);
    }
  }

flushNumTerms

存储所有field的term数量,会先统计最大的term数量,用来获取最大的term数据值需要几个bit存储。

  private void flushNumTerms(int totalFields) throws IOException {
    int maxNumTerms = 0;
    // 获取最大的term数量的值  
    for (DocData dd : pendingDocs) { 
      for (FieldData fd : dd.fields) {
        maxNumTerms |= fd.numTerms;
      }
    }
    final int bitsRequired = DirectWriter.bitsRequired(maxNumTerms);
    vectorsStream.writeVInt(bitsRequired);
    scratchBuffer.reset();
    // 使用DirectWriter压缩存储  
    final DirectWriter writer = DirectWriter.getInstance(scratchBuffer, totalFields, bitsRequired);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        writer.add(fd.numTerms);
      }
    }
    writer.finish();
    vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
    scratchBuffer.copyTo(vectorsStream);
  }

flushTermLengths

分别存储term的prefixLength和suffixLength。

  private void flushTermLengths() throws IOException {
    // 存储prefixLength  
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        for (int i = 0; i < fd.numTerms; ++i) {
          writer.add(fd.prefixLengths[i]);
        }
      }
    }
    writer.finish();
    // 存储suffixLength  
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        for (int i = 0; i < fd.numTerms; ++i) {
          writer.add(fd.suffixLengths[i]);
        }
      }
    }
    writer.finish();
  }

flushTermFreqs

存储term的频率,这里有个小小的优化,为了提高压缩率。

  private void flushTermFreqs() throws IOException {
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        for (int i = 0; i < fd.numTerms; ++i) {
          // 已经确定了freq肯定是大于等于1,减1是为了提高writer的压缩率,读取的时候加1就行了。
          writer.add(fd.freqs[i] - 1);
        }
      }
    }
    writer.finish();
  }

flushPositions

差值存储所有的position。

  private void flushPositions() throws IOException {
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        if (fd.hasPositions) {
          int pos = 0;
          for (int i = 0; i < fd.numTerms; ++i) {
            int previousPosition = 0;
            for (int j = 0; j < fd.freqs[i]; ++j) {
              final int position = positionsBuf[fd.posStart + pos++];
              writer.add(position - previousPosition);
              previousPosition = position;
            }
          }
          assert pos == fd.totalPositions;
        }
      }
    }
    writer.finish();
  }

flushOffsets

offset的存储做了一个优化设计,原因是term出现的不同的offset跨度可能会比较大,如果把原始的offset用PackedInts进行存储,可能压缩率不会很高。因此,在正式存储offset之前,先计算平均的term长度,根据term出现的前后两个offset的position,可以估计两个position的距离,用真实的前后两个offset的距离减去这个估计的距离,就能使得offset的差值向0趋近,可以提高PackedInts的压缩率。

  private void flushOffsets(int[] fieldNums) throws IOException {
    // 至少一个字段开启了offset  
    boolean hasOffsets = false;
    // term在所有字段中出现的最后一个postition之和  
    long[] sumPos = new long[fieldNums.length];
    // term在所有字段中出现的最后一个startOffset之和  
    long[] sumOffsets = new long[fieldNums.length];
    for (DocData dd : pendingDocs) { // 遍历所有的doc
      for (FieldData fd : dd.fields) { // 遍历doc中的所有field
        hasOffsets |= fd.hasOffsets;
        if (fd.hasOffsets && fd.hasPositions) { // 如果字段开启了offset和position
          // 查找在term数组中的下标  
          final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
          int pos = 0;
          for (int i = 0; i < fd.numTerms; ++i) {
            sumPos[fieldNumOff] += positionsBuf[fd.posStart + fd.freqs[i] - 1 + pos];
            sumOffsets[fieldNumOff] += startOffsetsBuf[fd.offStart + fd.freqs[i] - 1 + pos];
            pos += fd.freqs[i];
          }
          assert pos == fd.totalPositions;
        }
      }
    }
    if (!hasOffsets) {
      return;
    }
    final float[] charsPerTerm = new float[fieldNums.length];
    // 用  sumOffsets[i] / sumPos[i] 估计第i个term的长度
    for (int i = 0; i < fieldNums.length; ++i) {
      charsPerTerm[i] =
          (sumPos[i] <= 0 || sumOffsets[i] <= 0) ? 0 : (float) ((double) sumOffsets[i] / sumPos[i]);
    }
    // tvd中存储charsPerTerm
    for (int i = 0; i < fieldNums.length; ++i) {
      vectorsStream.writeInt(Float.floatToRawIntBits(charsPerTerm[i]));
    }
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) { // 遍历所有的doc
      for (FieldData fd : dd.fields) { // 遍历doc中所有的field
        if ((fd.flags & OFFSETS) != 0) { // 如果开启了offset
          final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
          final float cpt = charsPerTerm[fieldNumOff];
          int pos = 0;
          for (int i = 0; i < fd.numTerms; ++i) { // 遍历field中所有的term
            int previousPos = 0; // 差值使用
            int previousOff = 0; // 差值使用
            for (int j = 0; j < fd.freqs[i]; ++j) { // 遍历term出现的所有位置
              final int position = fd.hasPositions ? positionsBuf[fd.posStart + pos] : 0;
              final int startOffset = startOffsetsBuf[fd.offStart + pos];
              // (int) (cpt * (position - previousPos)):当前potition和前一个position之间的长度
              // startOffset - previousOff再减去(int) (cpt * (position - previousPos))就把值降到最小
              writer.add(startOffset - previousOff - (int) (cpt * (position - previousPos)));
              previousPos = position;
              previousOff = startOffset;
              ++pos;
            }
          }
        }
      }
    }
    writer.finish();
    // lengths
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) { // 遍历所有的doc
      for (FieldData fd : dd.fields) { // 遍历所有的Field
        if ((fd.flags & OFFSETS) != 0) { // 如果开启了offset
          int pos = 0;
          for (int i = 0; i < fd.numTerms; ++i) {
            for (int j = 0; j < fd.freqs[i]; ++j) {
              writer.add(
                  // 减去前缀长度和后缀长度也是为了把值变小,减少存储空间
                  lengthsBuf[fd.offStart + pos++] - fd.prefixLengths[i] - fd.suffixLengths[i]);
            }
          }
          assert pos == fd.totalPositions;
        }
      }
    }
    writer.finish();
  }

flushPayloadLengths

存储所有的payload的长度信息。

  private void flushPayloadLengths() throws IOException {
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        if (fd.hasPayloads) {
          for (int i = 0; i &lt; fd.totalPositions; ++i) {
            writer.add(payloadLengthsBuf[fd.payStart + i]);
          }
        }
      }
    }
    writer.finish();
  }

finish

结束词向量索引文件的构建,把待处理doc列表中剩下的doc生成一个chunk。

  public void finish(int numDocs) throws IOException {
    if (!pendingDocs.isEmpty()) { // 如果还有待处理的doc,则强制生成一个chunk
      flush(true);
    }
    if (numDocs != this.numDocs) {
      throw new RuntimeException(
          "Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs);
    }
    // 生成tvx索引文件  
    indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream);
    metaStream.writeVLong(numChunks);
    metaStream.writeVLong(numDirtyChunks);
    metaStream.writeVLong(numDirtyDocs);
    CodecUtil.writeFooter(metaStream);
    CodecUtil.writeFooter(vectorsStream);
  }

TermVectorsConsumerPerField

TermVectorsConsumerPerField是TermsHashPerField的子类,TermsHashPerField在之前介绍倒排的时候已经非常详细地介绍过了。

在本文中,我们重点介绍不一样的地方。在介绍倒排的时候使用的是FreqProxTermsWriterPerField,它存储了所有的倒排数据,在所有的文档都处理完了之后才进行序列化和持久化,TermVectorsConsumerPerField和它最大的区别是每处理完一个doc,就进行序列化然后重置等待处理下一个doc。

在TermVectorsConsumerPerField的源码中,如果已经看明白之前倒排的逻辑,则大部分地方理解起来都比较容易,这里我们只看一个文档处理完之后进行序列化的逻辑,实际上在TermVectorsConsumerPerField中只负责调度Lucene90CompressingTermVectorsWriter进行操作:

  void finishDocument() throws IOException {
    // 如果没有开启词向量构建  
    if (doVectors == false) {
      return;
    }
    doVectors = false;
    // 当前field的term总数
    final int numPostings = getNumTerms();
    // 用来存储当前序列化的term
    final BytesRef flushTerm = termsWriter.flushTerm;
    TermVectorsPostingsArray postings = termVectorsPostingsArray;
    // 序列化和持久化的核心类,实际上使用的实现类:Lucene90CompressingTermVectorsWriter 
    final TermVectorsWriter tv = termsWriter.writer;
    // 对term进行排序
    sortTerms();
    // 获取排序后的termID列表
    final int[] termIDs = getSortedTermIDs();
    // 开始处理一个Field
    tv.startField(fieldInfo, numPostings, doVectorPositions, doVectorOffsets, hasPayloads);
    // 用来从bytePool中读取position信息
    final ByteSliceReader posReader = doVectorPositions ? termsWriter.vectorSliceReaderPos : null;
    // 用来从bytePool中读取offset信息  
    final ByteSliceReader offReader = doVectorOffsets ? termsWriter.vectorSliceReaderOff : null;
    // 遍历所有的term
    for (int j = 0; j < numPostings; j++) {
      final int termID = termIDs[j];
      final int freq = postings.freqs[termID];
      // 当前处理的term存入flushTerm
      termBytePool.setBytesRef(flushTerm, postings.textStarts[termID]);
      // 准备序列化term的词向量信息  
      tv.startTerm(flushTerm, freq);
      if (doVectorPositions || doVectorOffsets) {
        if (posReader != null) {
          initReader(posReader, termID, 0);
        }
        if (offReader != null) {
          initReader(offReader, termID, 1);
        }
        // 序列化所有的position和offset信息
        tv.addProx(freq, posReader, offReader);
      }
      // 结束term的处理  
      tv.finishTerm();
    }
    // 结束Field的处理  
    tv.finishField();
    reset();
    fieldInfo.setStoreTermVectors();
  }

索引文件格式

tvm

词向量索引文件的元信息,用来读取使用。

tvm.png

字段详解

Header

文件头部信息,主要是包括:

PackedItsVersion

在词向量的索引文件中有很多数据是使用PackedIts压缩存储,该字段记录PackedInts的版本。、

ChunkSize

用来判断是否满足一个chunk的一种条件,如果chunk的大小超过了ChunkSize的限制,则可以构建一个chunk

NumChunks

chunk总数

NumDirtyChunks

dirtyChunk总数

NumDirtyDocs

dirtyChunk中的doc总数

NumDocs

doc总数

BlockShift

DirectMonotonicWriter需要的参数,DirectMonotonicWriter压缩存储会生成多个block,BlockShift决定了block的大小。

TotalChunks + 1

chunk总数 + 1,在生成tvx索引文件中ChunkStartDocIDs和ChunkTVDOffsets两个字段时,使用DirectMonotonicWriter写入的值的总数。

tvxDocStartFP

tvx索引文件中ChunkStartDocIDs的起始位置

DocBlockMeta

tvx索引文件中ChunkStartDocIDs使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。

tvxOffsetStartFP

tvx中ChunkTVDOffsets的起始位置

OffsetBlockMeta

tvx索引文件中ChunkTVDOffsets使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。

SPEndPoint

tvx文件的结束位置,后面是tvx的footer信息。

MaxPointer

tvd文件的结束位置,后面tvd的footer信息。

Footer

文件尾,主要包括

tvd 字段详解

tvd索引文件主要是存储倒排信息中freq,position,offset,payload。

Header

文件头部信息,主要是包括:

chunk

在词向量的构建过程中,

Footer

文件尾,主要包括

tvx 字段详解

tvx索引文件主要存储的是tvd索引文件中的一些索引信息,tvd中每个chunk的起始docID以及存储的起始位置。

Header

文件头部信息,主要是包括:

ChunkStartDocIDs

所有chunk的起始docID,使用DirectMonotonicWriter编码存储,会生成多个block。

ChunkTVDOffsetsBlock

所有chunk在tvd索引文件中的起始位置,使用DirectMonotonicWriter编码存储,会生成多个block。

Footer

文件尾,主要包括

以上就是Lucene词向量索引文件构建源码解析的详细内容,更多关于Lucene词向量索引文件构建的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文