Flink Sort-Shuffle写简析

news/2024/7/3 9:02:09

文章目录

  • 1、配置
  • 2、初始创建
  • 3、成员变量
  • 4、写shuffle文件
    • 4.1、获取SortBuffer
    • 4.2、追加数据
    • 4.3、buffer不足的处理
    • 4.4、buffer不足数据未读完
  • 5、关于排序
    • 5.1、segment申请
    • 5.2、writeIndex
      • 5.2.1、获取当前可用segment
      • 5.2.2、写入index到segment
      • 5.2.3、更新partition最后数据的索引
      • 5.2.4、分区前后数据关联
      • 5.2.5、更新公共变量的值
    • 5.3、writeRecord
    • 5.4、flushSortBuffer
      • 5.4.1、启动新region
      • 5.4.2、构建基础对象
      • 5.4.3、copyIntoSegment
        • 5.4.3.1、获取分区号
        • 5.4.3.2、获取元数据信息
        • 5.4.3.3、copyRecordOrEvent
        • 5.4.3.4、读同分区下一个数据
        • 5.4.3.5、封装buffer
      • 5.4.4、更新统计数据
      • 5.4.5、compressBuffer
      • 5.4.6、buffer写出
        • 5.4.6.1、构建ByteBuffer
        • 5.4.6.2、collectBroadcastBuffers
        • 5.4.6.3、collectUnicastBuffers
        • 5.4.6.4、collectUnicastBuffers


1、配置

  • taskmanager.network.sort-shuffle.min-parallelism

    核心配置。设置Hash Shuffle和Sort Shuffle的分界。并发数大于该值时,使用Sort Shuffle。默认是int最大值,即使用Hash Shuffle。

  • taskmanager.network.sort-shuffle.min-buffers

    每个Sort Shuffle的Result Partition使用的最小buffer数,默认64,推荐生产使用2048,但可能需要增大网络内存配置。

  • taskmanager.network.blocking-shuffle.compression.enabled

    是否启用压缩

2、初始创建

  实现类:SortMergeResultPartition
  类的创建在ResultPartitionFactory的create()方法中,根据不同的配置,会选择使用不同的ResultPartition,总共三种:PipelinedResultPartition、SortMergeResultPartition、BoundedBlockingResultPartition。其中PipelinedResultPartition用于流模式,其他的用于批模式。
  创建SortMergeResultPartition的分支条件如下

else if (type == ResultPartitionType.BLOCKING
                || type == ResultPartitionType.BLOCKING_PERSISTENT) {
            if (numberOfSubpartitions >= sortShuffleMinParallelism)

3、成员变量

  • NUM_WRITE_BUFFER_BYTES

    int类型的数值,表示为数据写入设置的buffer大小,目前固定16M,不可配置。

  • resultFile

    PartitionedFile类型,是Sort-Merge Shuffle的持久化文件代表,包含两个文件:.shuffle.data、.shuffle.index。文件根目录在tmp。
    数据文件内分为多个区域,每个区域内,相同的子分区的数据相邻存储。索引条目是(long,int)类型,long代表文件偏移量,int代表buffer数量。

  • writeSegments

    List类型,从网络buffer切出来的用于数据写入的buffer。其中,numRequiredBuffer来源在ResultPartitionFactory中,根据shuffle类型,选择不同的值,其值来源配置:taskmanager.network.sort-shuffle.min-buffers

int expectedWriteBuffers = NUM_WRITE_BUFFER_BYTES / networkBufferSize;
if (networkBufferSize > NUM_WRITE_BUFFER_BYTES) {
	expectedWriteBuffers = 1;
}

int numRequiredBuffer = bufferPool.getNumberOfRequiredMemorySegments();
int numWriteBuffers = Math.min(numRequiredBuffer / 2, expectedWriteBuffers);
int numRequiredBuffers =
	!type.isPipelined() && numberOfSubpartitions >= sortShuffleMinParallelism
	? sortShuffleMinBuffers
	: numberOfSubpartitions + 1;
  • networkBufferSize

    int类型的数值,网络缓冲区和写缓冲区的大小(buffer的大小),其值来源pageSize,由taskmanager.memory.segment-size设定

  • fileWriter

    PartitionedFileWriter类型,此ResultPartition的文件输出器。

  • subpartitionOrder

    int[]类型,分区的顺序,用于写入数据文件时的分区顺序。

  • readScheduler

    SortMergeResultPartitionReadScheduler类型,分区数据读取调度器。

  • numBuffersForSort

    int类型的数值,unicastSortBuffer和broadcastSortBuffer可使用的buffer数。

  • broadcastSortBuffer

    SortBuffer类型,用于broadcastRecord使用的buffer

  • unicastSortBuffer

    SortBuffer类型,用于飞broadcastRecord使用的buffer

4、写shuffle文件

  基于数据收发的内容,数据发送按RecordWriteOutput的collect方法开始

RecordWriteOutput.collect()
	->pushToRecordWriter()
		->RecordWriter.emit()
			->ResultPartitionWriter.emitRecord()
				->SortMergeResultPartition.emitRecord()

4.1、获取SortBuffer

  首先判断是否是Broadcast数据,然后根据条件,创建新的buffer并返回

SortBuffer sortBuffer = isBroadcast ? getBroadcastSortBuffer() : getUnicastSortBuffer();

  getUnicastSortBuffer()方法中,主要做两件事:1、flush Broadcast的buffer;2、创建新的buffer并返回。详细内容见第五章。

private SortBuffer getUnicastSortBuffer() throws IOException {
        flushBroadcastSortBuffer();

        if (unicastSortBuffer != null && !unicastSortBuffer.isFinished()) {
            return unicastSortBuffer;
        }

        unicastSortBuffer =
                new PartitionSortedBuffer(
                        lock,
                        bufferPool,
                        numSubpartitions,
                        networkBufferSize,
                        numBuffersForSort,
                        subpartitionOrder);
        return unicastSortBuffer;
    }

4.2、追加数据

  此步骤将产生的数据写入上一节产生的SortBuffer当中。注意这边的判断条件,当数据过大没有足够buffer写入时才会向下执行,否则写入完成后退出方法

if (sortBuffer.append(record, targetSubpartition, dataType)) {
	return;
}
// return false directly if it can not allocate enough buffers for the given record
if (!allocateBuffersForRecord(totalBytes)) {
	return false;
}

  写入数据的时候会在前部先写入一个元数据信息

// write the index entry and record or event data
writeIndex(targetChannel, totalBytes, dataType);
writeRecord(source);

4.3、buffer不足的处理

  此步骤是4.2步骤buffer不足的后续处理,如果数据已经全部读出,则释放该buffer并采用其他方式写入过大的数据

if (!sortBuffer.hasRemaining()) {
	// the record can not be appended to the free sort buffer because it is too large
	sortBuffer.finish();
	sortBuffer.release();
	writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
	return;
}

4.4、buffer不足数据未读完

  此步骤接续4.3,当buffer不足以写入新数据且数据未被写入shuffle文件时,增加shuffle出文件的操作并重新调用写数据方法

flushSortBuffer(sortBuffer, isBroadcast);
emit(record, targetSubpartition, dataType, isBroadcast);

5、关于排序

  PartitionSortedBuffer是会进行排序的buffer,依赖于内部的MemorySegment列表作为缓冲。相关的一些成员变量如下,index和segment使用的是同一份MemorySegment列表

/** A segment list as a joint buffer which stores all records and index entries. */
@GuardedBy("lock")
private final ArrayList<MemorySegment> segments = new ArrayList<>();

/** Addresses of the first record's index entry for each subpartition. */
private final long[] firstIndexEntryAddresses;

/** Addresses of the last record's index entry for each subpartition. */
private final long[] lastIndexEntryAddresses;

/** Array index in the segment list of the current available buffer for writing. */
private int writeSegmentIndex;

/** Next position in the current available buffer for writing. */
private int writeSegmentOffset;

5.1、segment申请

  根据第四章内容,添加数据有如下调用链:emit()->append()->allocateBuffersForRecord()
  allocateBuffersForRecord是申请segment用来存储数据的。当segment不足时,向bufferPool申请新资源。注意初始的时候,segment的列表是空的,所以最初必然是会申请的。
  注意,一个segment是可能写多个数据的,如下,writeSegmentOffset是当前segment的写入位置,如果剩余量充足,是会继续写入数据的。

int availableBytes =
	writeSegmentIndex == segments.size() ? 0 : bufferSize - writeSegmentOffset;

// return directly if current available bytes is adequate
if (availableBytes >= numBytesRequired) {
	return true;
}

5.2、writeIndex

  在落地文件层,index和数据是分文件的,在PartitionedFile的定义如下

public static final String DATA_FILE_SUFFIX = ".shuffle.data";

public static final String INDEX_FILE_SUFFIX = ".shuffle.index";

  PartitionSortedBuffer的writeIndex方法完成index向segment的写入,详细如下

5.2.1、获取当前可用segment

  获取当前可用的segment,内部使用writeSegmentIndex记录segments列表当中segment的下表

MemorySegment segment = segments.get(writeSegmentIndex);

5.2.2、写入index到segment

  写入index到segment,一个index是一个long数据,占64位。其中,高32位记录数据长度,低32位记录数据类型。此处用到了long64位、int32位、位运算相关知识。<<是左移符号

// record length takes the high 32 bits and data type takes the low 32 bits
segment.putLong(writeSegmentOffset, ((long) numRecordBytes << 32) | dataType.ordinal());

5.2.3、更新partition最后数据的索引

  更新对应partition的最后数据的索引。
  lastIndexEntryAddresses是一个列表,大小与分区数对应,每一项记录对应分区的最新数据的索引地址。
  索引地址即indexEntryAddress,也是一个long类型的数据,高32位只想segments列表中对应segment的下标,低32位指向segment内部的偏移量。此结构式后续排序的一个基础。

// segment index takes the high 32 bits and segment offset takes the low 32 bits
long indexEntryAddress = ((long) writeSegmentIndex << 32) | writeSegmentOffset;

long lastIndexEntryAddress = lastIndexEntryAddresses[channelIndex];
lastIndexEntryAddresses[channelIndex] = indexEntryAddress;

5.2.4、分区前后数据关联

  此步骤是将新数据的索引附加在上一个数据索引的后面,如果没有上一个数据,直接放入firstIndexEntryAddresses,表示当前数据是此分区最早的数据

if (lastIndexEntryAddress >= 0) {
	// link the previous index entry of the given channel to the new index entry
	segment = segments.get(getSegmentIndexFromPointer(lastIndexEntryAddress));
	segment.putLong(getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8, indexEntryAddress);
} else {
	firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
}

  以上,getSegmentIndexFromPointer和getSegmentOffsetFromPointer分别获取segment在列表中的下标以及segment内部的偏移量,如5.2.3所述

private int getSegmentIndexFromPointer(long value) {
	return (int) (value >>> 32);
}

private int getSegmentOffsetFromPointer(long value) {
	return (int) (value);
}

  getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8的意思是:8即8 bytes,也就是64 bit,按5.2.2所述,这是一个数据的索引的长度,也就是在前一个数据的索引后面加入添加上当前数据的索引。
  关于segment索引后面预留一个index空间的来源。
  参照下节5.2.5及成员变量INDEX_ENTRY_SIZE,这是一个4+4+8的值,也就是一个当前索引的长度+预留下一个索引的长度。

5.2.5、更新公共变量的值

  此步骤主要更新writeSegmentOffset的值,也就是segment的内部偏移量,可以看到,一次性偏移了两个64位的量,也就是两个索引的位置,与5.2.4追加索引关联上了

// move the write position forward so as to write the corresponding record
updateWriteSegmentIndexAndOffset(INDEX_ENTRY_SIZE);

private void updateWriteSegmentIndexAndOffset(int numBytes) {
	writeSegmentOffset += numBytes;

	// using the next available free buffer if the current is full
	if (writeSegmentOffset == bufferSize) {
		++writeSegmentIndex;
		writeSegmentOffset = 0;
	}
}

5.3、writeRecord

  此步骤用于写数据进segment。写数据步骤相对写index简单很多,就是直接将数据不断追加进segment

private void writeRecord(ByteBuffer source) {
    while (source.hasRemaining()) {
        MemorySegment segment = segments.get(writeSegmentIndex);
        int toCopy = Math.min(bufferSize - writeSegmentOffset, source.remaining());
        segment.put(writeSegmentOffset, source, toCopy);

        // move the write position forward so as to write the remaining bytes or next record
        updateWriteSegmentIndexAndOffset(toCopy);
    }
}

5.4、flushSortBuffer

  此步骤用于将buffer中的数据写出到shuffle文件当中

5.4.1、启动新region

  shuffle文件是按region存储的,每个region内,相同分区的数据写在一起,不同的region之间不保证。向shuffl文件输出的写操作的实现类是PartitionedFileWriter

fileWriter.startNewRegion(isBroadcast);

  此处会调用到PartitionedFileWriter的writeRegionIndex方法,这个方法初次进入不做操作,开启第二个region开始才会进行执行。

private void writeRegionIndex() throws IOException {
    if (Arrays.stream(subpartitionBuffers).sum() > 0) {
        for (int channel = 0; channel < numSubpartitions; ++channel) {
            writeIndexEntry(subpartitionOffsets[channel], subpartitionBuffers[channel]);
        }

        currentSubpartition = -1;
        ++numRegions;
        Arrays.fill(subpartitionBuffers, 0);
    }
}

private void writeIndexEntry(long subpartitionOffset, int numBuffers) throws IOException {
    if (!indexBuffer.hasRemaining()) {
        if (!extendIndexBufferIfPossible()) {
            flushIndexBuffer();
            indexBuffer.clear();
            allIndexEntriesCached = false;
        }
    }

    indexBuffer.putLong(subpartitionOffset);
    indexBuffer.putInt(numBuffers);
}

  相关的PartitionedFileWriter的成员如下
  1、subpartitionBuffers,分区写入的buffer数
  2、subpartitionOffsets,分区写入的偏移,也就是记录写入的数据量(bytes)
  3、indexBuffer,用于写入index的buffer,满了会溢出写到文件,此处写入index文件
  subpartitionBuffers是一个数组,每一项记录了对应分区写出的buffer数,在5.4.6节写出数据的时候会增加。此处Arrays.stream(subpartitionBuffers).sum() > 0就是判断已经存在文件输出了
  subpartitionOffsets代表数据在文件中的偏移量,也是在5.4.6节写数据的时候会更新,就是统计输出到文件的bytes数

5.4.2、构建基础对象

  这一步构建两个基础对象List toWrite、Queue segments。其中toWrite用于后续向文件输出,segments是基于writeSegments列表克隆出来的一个队列。

private Queue<MemorySegment> getWriteSegments() {
    synchronized (lock) {
        checkState(!writeSegments.isEmpty(), "Task has been canceled.");
        return new ArrayDeque<>(writeSegments);
    }
}

5.4.3、copyIntoSegment

  这一步是将segment的数据封装进buffer形成一个BufferWithChannel用于后续写出到文件。

5.4.3.1、获取分区号

  subpartitionReadOrder列表设置了分区读取顺序,可以自定义;readOrderIndex设置了当前读取的分区

// 获取
int channelIndex = subpartitionReadOrder[readOrderIndex];

// subpartitionReadOrder定义
this.subpartitionReadOrder = new int[numSubpartitions];
if (customReadOrder != null) {
    checkArgument(customReadOrder.length == numSubpartitions, "Illegal data read order.");
    System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 0, numSubpartitions);
} else {
    for (int channel = 0; channel < numSubpartitions; ++channel) {
        this.subpartitionReadOrder[channel] = channel;
    }
}

5.4.3.2、获取元数据信息

  这一步参考5.2节,根据其中的数据,反向解析出对应的index信息。此步开始是一个循环调用的操作,注意如果已经读取部分数据并且下一个读的数据是event事件类型,则跳出循环

int sourceSegmentIndex = getSegmentIndexFromPointer(readIndexEntryAddress);
int sourceSegmentOffset = getSegmentOffsetFromPointer(readIndexEntryAddress);
MemorySegment sourceSegment = segments.get(sourceSegmentIndex);

long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
int length = getSegmentIndexFromPointer(lengthAndDataType);
DataType dataType = DataType.values()[getSegmentOffsetFromPointer(lengthAndDataType)];

// return the data read directly if the next to read is an event
if (dataType.isEvent() && numBytesCopied > 0) {
    break;
}
bufferDataType = dataType;

// get the next index entry address and move the read position forward
long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
sourceSegmentOffset += INDEX_ENTRY_SIZE;

5.4.3.3、copyRecordOrEvent

  这一步就是将数据拷贝进克隆出来的segment中,注意这里只拷贝了数据。

5.4.3.4、读同分区下一个数据

  前面读出了下一个数据的地址,此处如果当前读取的数据不是分区的最后一个数据,则继续读下一个数据。基于这一步的操作,完成了同分区写在一起的目的。

if (recordRemainingBytes == 0) {
    // move to next channel if the current channel has been finished
    if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
        updateReadChannelAndIndexEntryAddress();
        break;
    }
    readIndexEntryAddress = nextReadIndexEntryAddress;
}

5.4.3.5、封装buffer

  这一步将segment封装成Buffer,再进一步添加分区号封装成BufferWithChannel

numTotalBytesRead += numBytesCopied;
Buffer buffer = new NetworkBuffer(target, (buf) -> {}, bufferDataType, numBytesCopied);
return new BufferWithChannel(buffer, channelIndex);

5.4.4、更新统计数据

  这一步是更新统计相关的数据

private void updateStatistics(Buffer buffer, boolean isBroadcast) {
    numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
    long readableBytes = buffer.readableBytes();
    numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : readableBytes);
}

5.4.5、compressBuffer

  这一步根据情况,对buffer做压缩

private BufferWithChannel compressBufferIfPossible(BufferWithChannel bufferWithChannel) {
    Buffer buffer = bufferWithChannel.getBuffer();
    if (!canBeCompressed(buffer)) {
        return bufferWithChannel;
    }

    buffer = checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
    return new BufferWithChannel(buffer, bufferWithChannel.getChannelIndex());
}

  可压缩条件如下

protected boolean canBeCompressed(Buffer buffer) {
    return bufferCompressor != null && buffer.isBuffer() && buffer.readableBytes() > 0;
}

  压缩类的创建如下,是批方式并且配置了压缩的情况下会创建压缩类

BufferCompressor bufferCompressor = null;
if (type.isBlocking() && blockingShuffleCompressionEnabled) {
    bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
}

  最后会把buffer放入toWrite列表

toWrite.add(compressBufferIfPossible(bufferWithChannel));

5.4.6、buffer写出

  这一步完成buffer数据向PartitionedFile的写出,使用PartitionedFileWriter。

fileWriter.writeBuffers(toWrite);

5.4.6.1、构建ByteBuffer

  此处会构建一个BufferWithChannel列表两倍量的ByteBuffer,两倍的原因就是会有一个每个buffer会有一个头数据。

ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithChannels.size()];

5.4.6.2、collectBroadcastBuffers

  根据情况,broadcast和非broadcast会有不同的处理,待续

5.4.6.3、collectUnicastBuffers

  这一步就是向bufferWithHeaders填充buffer和设置header的。过程就是循环获取BufferWithChannel,然后对 每个channel设置bufferWithHeaders。
  根据sort-shuffle的特性,一个region内同分区的数据会写在一起,所以有一步分区判断,但是根据5.4.3.4的内容,数据是已经聚合过的,所以这一步判断肯定是过的。如下,当partition id变更的时候,那肯定就是新的id,以前是没有数据的,也就是subpartitionBuffers[subpartition] == 0

int subpartition = bufferWithChannels.get(i).getChannelIndex();
if (subpartition != currentSubpartition) {
    checkState(
            subpartitionBuffers[subpartition] == 0,
            "Must write data of the same channel together.");
    subpartitionOffsets[subpartition] = fileOffset;
    currentSubpartition = subpartition;
}

  之后是设置bufferWithHeaders,可以看到index是2的倍数跳的。同时,此处会对一些数据做记录,比如分区的buffer数等。

Buffer buffer = bufferWithChannels.get(i).getBuffer();
int numBytes = setBufferWithHeader(buffer, bufferWithHeaders, 2 * i);
expectedBytes += numBytes;
fileOffset += numBytes;
++subpartitionBuffers[subpartition];

  setBufferWithHeader的内容主要就是设置一个头信息,然后数据使用一个ByteBuffer存储(使用netty内部的实现)。此处bufferWithChannels传入的Buffer是一个NetworkBuffer

private int setBufferWithHeader(Buffer buffer, ByteBuffer[] bufferWithHeaders, int index) {
    ByteBuffer header = BufferReaderWriterUtil.allocatedHeaderBuffer();
    BufferReaderWriterUtil.setByteChannelBufferHeader(buffer, header);

    bufferWithHeaders[index] = header;
    bufferWithHeaders[index + 1] = buffer.getNioBufferReadable();

    return header.remaining() + buffer.readableBytes();
}

  头信息的内容如下:1、数据类型;2、是否压缩;3、buffer大小

static void setByteChannelBufferHeader(Buffer buffer, ByteBuffer header) {
    header.clear();
    header.putShort(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT);
    header.putShort(buffer.isCompressed() ? BUFFER_IS_COMPRESSED : BUFFER_IS_NOT_COMPRESSED);
    header.putInt(buffer.getSize());
    header.flip();
}

5.4.6.4、collectUnicastBuffers

  最后是写出数据,利用FileChannel写出5.4.6.3节的buffer内容。此处写出出到数据文件,index文件是在5.4.1当中写出的

totalBytesWritten += expectedBytes;
BufferReaderWriterUtil.writeBuffers(dataFileChannel, expectedBytes, bufferWithHeaders);

static void writeBuffers(FileChannel channel, long bytesExpected, ByteBuffer... buffers)
        throws IOException {
    // The FileChannel#write method relies on the writev system call for data writing on linux.
    // The writev system call has a limit on the maximum number of buffers can be written in one
    // invoke whose advertised value is 1024 (see writev man page for more information), which
    // means if more than 1024 buffers is written in one invoke, it is not guaranteed that all
    // bytes can be written, so we build this safety net.
    if (bytesExpected > channel.write(buffers)) {
        for (ByteBuffer buffer : buffers) {
            writeBuffer(channel, buffer);
        }
    }
}


http://www.niftyadmin.cn/n/3018939.html

相关文章

Flink Sort-Shuffle读简析

文章目录1、SortMergeResultPartition的创建使用2、PartitionedFileReader2.1、moveToNextReadableRegion2.2、readCurrentRegion2.3、hasRemaining3、读操作的调用4、数据返回4.1、读入缓存4.2、buffersRead读取1、SortMergeResultPartition的创建使用 首先是一个读过程的一个…

信息社会

消息 通知 公告(简报) 新闻(深度分析文章) 历史(沉淀形成知识) 各部门的信息系统 陕西省住房和城乡建设厅 陕西省建筑市场监管平台陕西省质量安全监管信息系统陕西省标准定额协同管理平台陕西省房地产市场监管信息系统陕西省城市园林绿化企业信息管理系统陕西省执业资格注册人员…

java 实现方法_java常见代码(1)------常见实现方法

1.equals 和 hashcode方法class Students {String name;int age;byte[] idSequence;Overridepublic boolean equals(Object obj) {if (!(obj instanceof Students))return false;Students other (Students) obj;return name.equals(other.name)&& age other.age&…

JdbcSink 简析

文章目录1、JdbcSink1.1、参数1.2、返回2、JdbcBatchingOutputFormat2.1、参数2.2、open方法2.2.1、连接数据库2.2.2、JdbcExec2.2.3、scheduler2.3、writeRecord方法2.3.1、缓存数据2.3.2、flush1、JdbcSink 用于DataStream增加Jdbc的Sink输出&#xff0c;主要两个接口&#x…

机器学习(1)_R与神经网络之Neuralnet包

本篇博客将会介绍R中的一个神经网络算法包&#xff1a;Neuralnet&#xff0c;通过模拟一组数据&#xff0c;展现其在R中是如何使用&#xff0c;以及如何训练和预测。在介绍Neuranet之前&#xff0c;我们先简单介绍一下神经网络算法。 人工神经网络(ANN)&#xff0c;简称神经网络…

C# List.ForEach 方法

C#中List.ForEach 方法是对 List 的每个元素执行指定操作。 示例&#xff1a; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace AppExample {class Program{static void Main(string[] args){…

Adaptive调度器

文章目录1.前言2.测试3.配置启用4.其他配置参数4.1.主要配置4.2.其他可能相关的配置5.调用流程6.配置Adaptive调度器7.DefaultDeclarativeSlotPool7.1.NewSlotsListener7.2.offerSlots7.3.freeReservedSlot7.4.缩容触发8.AdaptiveScheduler8.1.使用条件8.2.计算并行度信息8.2.1…

jQuery触屏插件:Tap 代码

jQuery触屏插件&#xff1a;Tap&#xff0c;使用方法非常简单&#xff0c;例&#xff1a;$("#domid").tap(function(){alert("You tapped me! -- by"this.innerText);});依赖jquery 1.701$.fn.tap function(fn){02var collection this,03isTouch "…