用 PHP 处理 10 亿行数据!(作者:校长)

回贴
回帖数 2
阅读数 443
发表时间 2024-04-02 10:05:45
📘
路婕 楼主
今天,我将带大家一起走进“挑战十亿行“数据的世界。

当然,这个事情是根据GitHub上的一个“十亿行挑战”(1brc)活动而来,现在正在进行,如果你没有听说过,可查看Gunnar Morlings 的 1brc 存储库。 https://github.com/gunnarmorling/1brc


我之所以被吸引,是因为社区中的两位同事参加了比赛。以下地址是全球各地开发者的代码排行榜。
https://github.com/gunnarmorling/1brc?tab=readme-ov-file#results

图1 10亿行代码处理速度排行榜


我只截图一小部分,感兴趣的朋友还可以浏览上面的网址。


使用PHP加入速度排行榜

大家知道,PHP 并不以速度最快而闻名。在我开发 PHP 分析器时,我想应该尝试一下它,看看能达到多快的速度。

第一种,最“幼稚”的方法

我克隆了存储库,下载了measurements.txt(一个csv格式文件)后,我开始构建第一个最“天真纯朴”的实现,但是能够解决这个挑战。

代码如下:


<?php
$stations = [];
$fp = fopen('measurements.txt', 'r');
while ($data = fgetcsv($fp, null, ';')) {
    if (!isset($stations[$data[0]])) {
        $stations[$data[0]] = [
            $data[1],
            $data[1],
            $data[1],
            1
        ];
    } else {
        $stations[$data[0]][3] ++;
        $stations[$data[0]][2] += $data[1];
        if ($data[1] < $stations[$data[0]][0]) {
            $stations[$data[0]][0] = $data[1];
        }
        if ($data[1] > $stations[$data[0]][1]) {
            $stations[$data[0]][1] = $data[1];
        }
    }
}
ksort($stations);
echo '{';
foreach($stations as $k=>&$station) {
    $station[2] = $station[2]/$station[3];
    echo $k, '=', $station[0], '/', $station[2], '/', $station[1], ', ';
}
echo '}';
这里没什么可炫耀的,我们先打开文件,然后用来fgetcsv()函数来读取数据。如果未找到元素,则创建新的空的stations数组和相应列,如要有数据则增加计数,对温度求和,并查看当前温度是否低于或高于最小值或最大值,并做相应的更新。

我把所有东西都放在一起,使用ksort()按$stations数组顺序排列,然后回显数组并计算平均温度(总和/计数)。

简单描述完算法,然后在我的笔记本电脑上运行了这个简单代码,它需要25 分钟。


就想着如何优化,下面查看分析器运行结果:


可视化的时间轴帮助我看到了,明显是 CPU 限制了速度,脚本开头的文件编译可以忽略不计,并且也没有垃圾收集事件。


火焰视图,显示了fgetcsv()函数上的瓶颈。

使用fgets()代替fgetcsv()

第一个优化是使用fgets()手动获取每一行,然后根据文件;分号分割,而不在依赖fgetcsv()。这主要是因为我看 fgetcsv()所做的远远超出了本人的“预期”图片。更换后的主要代码如下:
// ...
while ($data = fgets($fp, 999)) {
    $pos = strpos($data, ';');
    $city = substr($data, 0, $pos);
    $temp = substr($data, $pos+1, -1);
// ...

此外,我还$data[0]以及各个数组进行了重构。通过这一更改,再次运行脚本就已将运行时间降至19m 49s。从绝对数字来看,下降了很多,是 21%!


火焰图也反映了这一变化,切换到按行显示, CPU 时间也提示了中间发生的情况:

我又发现在第 18 行和第 23 行花费了约 38% 的 CPU 时间,它们是:
18 | $stations[$city][3] ++;
   | // ...
23 | if ($temp > $stations[$city][1]) {
第 18 行是循环中对数组$stations的第一次访问,它做了一个自增,第 23 行是一个比较,乍一看似乎没有怎么花费时间,让我们对它再做一些优化,会看到这里花费了时间。


首先,要尽可能地使用“引用操作符”—&:

$station = &$stations[$city];
$station[3] ++;
$station[2] += $temp;
// instead of
$stations[$city][3] ++;
$stations[$city][2] += $data[1];

这样应该有助于 PHP 在每次访问数组时不必搜索数组$stations中的键,将其视为用于访问数组中“当前”的缓存。实际上它也非常有帮助,此次运行只需要17m 48s,又减少了 10%!

只包含一处比较

在查看代码时,我偶然发现了以下这段代码:
if ($temp < $station[0]) {
    $station[0] = $temp;
}
if ($temp > $station[1]) {
    $station[1] = $temp;
}
新版本的代码执行时间为 17m 30s,又增加了约 2%。

添加类型转换

PHP被认为是一种动态语言,这是我刚开始编写软件时非常看重的东西,少了一些需要关心的问题。但另一方面,了解类型有助于引擎在运行代码时做出更好的决策。下列代码来看一下:

$temp = (float)substr($data, $pos+1, -1);

你猜怎么着?这个简单的转换,让脚本运行时间仅为13m 32s,性能提升了 21%!


18 | $station = &$stations[$city];
   | // ...
23 | } elseif ($temp > $station[1]) {
第 18 行仍然显示出 11% 的 CPU 时间消耗,这是对数组的访问(在哈希映射中查找键,哈希映射(HashMap)是 PHP 中用于关联数组的底层数据结构)。第 23 行修改后 CPU 时间从 ~32% 下降到 ~15%。这是因为 PHP 不再进行类型转换处理。

使用 JIT 怎么样?

PHP 中的 OPCache 在 CLI 状态中默认处于禁用状态,需要将opcache.enable_cli设置设置为on。


JIT(作为 OPCache 的一部分)默认启用,但由于缓冲区大小设置为0,因此相当于有效禁用,因此我设置了opcache.jit-buffer-size项,例如使用10M。应用这些更改后,我使用 JIT 重新运行脚本并看到它完成:7m 19s !花费的时间减少了45.9%!

还有什么!?

现在,我已经将运行时间从一开始的 25 分钟缩短到了大约 7 分钟。


我还发现令人惊讶的一件事是fgets()分配 56 GiB/m 的内在来读取 13 GB 文件。好像有些不太对劲,所以我又检查了fgets()的实现,看起来我可以通过省略参数来节省大量长度比较,再分配fgets():

while ($data = fgets($fp)) {
// instead of
while ($data = fgets($fp, 999)) {
比较更改前后的配置文件可以得出以下结果:

你可能认为这次又带来了很大的性能提升,但实际上只有大约 1%。这是因为这些是Zend内存管理器可以在处理的更小的二进制分配,但是它的速度非常快。

可以让它更快吗?

是的,我们可以!到目前为止,我的方法还是单线程,这是大多数 PHP 软件的本质,但 PHP 确实通过并行扩展支持用户态中的线程。


上面分析器清楚地显示,在 PHP 中读取数据是一个瓶颈。从切换fgetcsv()到fgets()手动拆分提供了帮助,但这仍然需要花费大量时间,因此下面使用线程并行读取与处理数据,然后再合并工作线程的中间结果。完整代码如下:
<?php
$file = 'measurements.txt';
$threads_cnt = 16;
/**
 * Get the chunks that each thread needs to process with start and end position.
 * These positions are aligned to \n chars because we use `fgets()` to read
 * which itself reads till a \n character.
 *
 * @return array<int, array{0: int, 1: int}>
 */
function get_file_chunks(string $file, int $cpu_count): array {
    $size = filesize($file);
    if ($cpu_count == 1) {
        $chunk_size = $size;
    } else {
        $chunk_size = (int) ($size / $cpu_count);
    }
    $fp = fopen($file, 'rb');
    $chunks = [];
    $chunk_start = 0;
    while ($chunk_start < $size) {
        $chunk_end = min($size, $chunk_start + $chunk_size);
        if ($chunk_end < $size) {
            fseek($fp, $chunk_end);
            fgets($fp); // moves fp to next \n char
            $chunk_end = ftell($fp);
        }
        $chunks[] = [
            $chunk_start,
            $chunk_end
        ];
        $chunk_start = $chunk_end+1;
    }
    fclose($fp);
    return $chunks;
}
/**
 * This function will open the file passed in `$file` and read and process the
 * data from `$chunk_start` to `$chunk_end`.
 *
 * The returned array has the name of the city as the key and an array as the
 * value, containing the min temp in key 0, the max temp in key 1, the sum of
 * all temperatures in key 2 and count of temperatures in key 3.
 *
 * @return array<string, array{0: float, 1: float, 2: float, 3: int}>
 */ 
$process_chunk = function (string $file, int $chunk_start, int $chunk_end): array {
    $stations = [];
    $fp = fopen($file, 'rb');
    fseek($fp, $chunk_start);
    while ($data = fgets($fp)) {
        $chunk_start += strlen($data);
        if ($chunk_start > $chunk_end) {
            break;
        }
        $pos2 = strpos($data, ';');
        $city = substr($data, 0, $pos2);
        $temp = (float)substr($data, $pos2+1, -1);
        if (isset($stations[$city])) {
            $station = &$stations[$city];
            $station[3] ++;
            $station[2] += $temp;
            if ($temp < $station[0]) {
                $station[0] = $temp;
            } elseif ($temp > $station[1]) {
                $station[1] = $temp;
            }
        } else {
            $stations[$city] = [
                $temp,
                $temp,
                $temp,
                1
            ];
        }
    }
    return $stations;
};
$chunks = get_file_chunks($file, $threads_cnt);
$futures = [];
for ($i = 0; $i < $threads_cnt; $i++) {
    $runtime = new \parallel\Runtime();
    $futures[$i] = $runtime->run(
        $process_chunk,
        [
            $file,
            $chunks[$i][0],
            $chunks[$i][1]
        ]
    );
}
$results = [];
for ($i = 0; $i < $threads_cnt; $i++) {
    // `value()` blocks until a result is available, so the main thread waits
    // for the thread to finish
    $chunk_result = $futures[$i]->value();
    foreach ($chunk_result as $city => $measurement) {
        if (isset($results[$city])) {
            $result = &$results[$city];
            $result[2] += $measurement[2];
            $result[3] += $measurement[3];
            if ($measurement[0] < $result[0]) {
                $result[0] = $measurement[0];
            }
            if ($measurement[1] < $result[1]) {
                $result[1] = $measurement[1];
            }
        } else {
            $results[$city] = $measurement;
        }
    }
}
ksort($results);
echo '{', PHP_EOL;
foreach($results as $k=>&$station) {
    echo "\t", $k, '=', $station[0], '/', ($station[2]/$station[3]), '/', $station[1], ',', PHP_EOL;
}
echo '}', PHP_EOL;
首先我扫描文件并将其分割成以\n对齐的块,我稍后会使用fgets()函数。当准备好块时,我启动$threads_cnt工作线程,然后所有线程都打开同一个文件并寻找分配的块,开始并读取和处理数据直到块结束,返回一个中间结果,然后主线程将其组合、排序并打印出来。

这种多线程方法只需:1 分 35 秒!


这就是结局?

当然不是。这个解决方案至少还有两件事要做:
  • 我在 Apple Silicon 硬件上的 MacOS 上运行此代码,在 PHP 的 ZTS 版本中使用 JIT 时会崩溃,因此 1m 35s 结果是没有使用上 JIT 的,如果我可以使用它,它可能会更快;
  • 我意识到我正在运行一个 PHP 版本,该版本是CFLAGS="-g -O0 ..."根据我日常工作的需要而自定义编译的。

我应该在一开始就检查一下这一块,所以我重新编译了 PHP 8.3,并修改了参数CLFAGS="-Os ..."。于是我的最终数字(16 个线程处理)是:27.7 秒!


这是一个有 10 个线程的时间线视图:


最底层的线程是主线程,等待工作线程的结果。一旦这些worker返回了中间结果,就可以看到主线程正在对所有内容进行组合和排序。我们也可以清楚地看到,主线程不再是瓶颈


如果想尝试进一步优化,还是要专注于Worker的线程处理。

我在路上学到了什么?

每个抽象层只是用可用性/集成来换取 CPU 周期或内存。fgetcsv()虽然超级易用,但隐藏了很多东西,这是有代价的。虽然fgets()也向我们隐藏了一些东西,但它让读取数据变得更灵活和方便。


在代码中添加类型,将有助于语言优化执行或停止类型转换(这是我们看不到的东西,但是需要付出 CPU 周期的代价)。


PHP的JIT 技术非常棒,尤其是在处理 CPU 密集型问题时!由于使用并行化(使用ext-parallel)线程,我们可以看到显著地提高了性能。


原文作者:校长

原文链接: https://mp.weixin.qq.com/s/mctgCCPqjQl0OKe0AtwXpw


声明:版权属于原文作者,如有侵权,请联系删除。


2024-04-02 10:20:01 路婕 最后编辑
2个回复
合拍 沙发

自古程序最高效

2024-10-15 10:23:56 合拍 回帖
禅道-Bee 板凳
2024-10-16 09:44:27 禅道-李锡碧 回帖
联系人
刘斌/高级客户经理
电话(微信)
17685869372
QQ号码
526288068
联系邮箱
liubin@chandao.com
返回顶部
刘斌
高级客户经理
17685869372
526288068
统一服务热线 4006-8899-23
我要提问提问有任何问题,您都可以在这里提问。 问题反馈反馈点击这里,让我们聆听您的建议与反馈。