f (!empty($uidarr)) { $uids = array(); $update = array(); foreach ($uidarr as $_uid => $n) { $uids[] = $_uid; $update[$_uid] = array('articles-' => $n); 'mysql' != $conf['cache']['type'] and cache_delete('user-' . $_uid); } user_big_update(array('uid' => $uids), $update); } !empty($operate_create) && function_exists('operate_big_insert') and operate_big_insert($operate_create); return TRUE; } // 大数据量容易超时 删除用户时使用,删除主题 回复 栏目统计 附件 全站统计 function well_thread_delete_all_by_uid($uid) { // 用户主题数 $user = user_read_cache($uid); set_time_limit(0); // 删除所有回复 if ($user['comments']) { $postist = comment_pid_find_by_uid($uid, 1, $user['comments'], FALSE); $pidarr = array(); foreach ($postist as $val) { $pidarr[] = $val['pid']; } unset($postist); !empty($pidarr) and comment_delete_by_pids($pidarr); } if ($user['articles']) { // 如果主题、附件和回复数量太大会超时 $tidlist = thread_tid_find_by_uid($uid, 1, $user['articles'], FALSE, 'tid', array('tid')); $tidarr = array(); foreach ($tidlist as $val) { $tidarr[] = $val['tid']; } unset($tidlist); !empty($tidarr) and well_thread_delete_all($tidarr); } return TRUE; } // 搜索标题 function well_thread_find_by_keyword($keyword, $d = NULL) { if (empty($keyword)) return NULL; $db = $_SERVER['db']; $d = $d ? $d : $db; if (!$d) return FALSE; $threadlist = db_sql_find("SELECT * FROM `{$d->tablepre}website_thread` WHERE subject LIKE '%$keyword%' LIMIT 60;", 'tid', $d); if ($threadlist) { $threadlist = arrlist_multisort($threadlist, 'tid', FALSE); foreach ($threadlist as &$thread) { well_thread_format($thread); // 关键词标色 //$thread['subject'] = comment_highlight_keyword($thread['subject'], $keyword); } } return $threadlist; } // 查找 最后评论 lastpid function well_thread_find_lastpid($tid) { $arr = comment_pid_read(array('tid' => $tid), array('pid' => -1), array('pid')); $lastpid = empty($arr) ? 0 : $arr['pid']; return $lastpid; } // 更新最后的 uid function well_thread_update_last($tid) { if (empty($tid)) return FALSE; $lastpid = well_thread_find_lastpid($tid); if (empty($lastpid)) return FALSE; $lastpost = comment_read($lastpid); if (empty($lastpost)) return FALSE; $r = well_thread_update($tid, array('lastuid' => $lastpost['uid'])); return $r; } function well_thread_maxid() { $n = db_maxid('website_thread', 'tid'); return $n; } // 主题状态 0:通过 1~9 审核:1待审核 10~19:10退稿 11逻辑删除 function well_thread_format(&$thread) { global $gid, $uid, $forumlist; $conf = _SERVER('conf'); if (empty($thread)) return; $thread['create_date_fmt'] = humandate($thread['create_date']); $thread['last_date_fmt'] = humandate($thread['last_date']); $thread['create_date_fmt_ymd'] = date('Y-m-d', $thread['create_date']); $thread['last_date_fmt_ymd'] = date('Y-m-d', $thread['last_date']); $user = user_read_cache($thread['uid']); $onlinelist = online_user_list_cache(); $user['online_status'] = isset($onlinelist[$user['uid']]) ? 1 : 0; $thread['username'] = $user['username']; $thread['user_avatar_url'] = array_value($user, 'avatar_url'); $thread['user'] = user_safe_info($user); unset($user); $forum = array_value($forumlist, $thread['fid']); $thread['forum_name'] = array_value($forum, 'name'); $thread['forum_url'] = array_value($forum, 'url'); if ($thread['last_date'] == $thread['create_date']) { $thread['last_date_fmt'] = ''; $thread['lastuid'] = 0; $thread['lastusername'] = ''; } else { $lastuser = $thread['lastuid'] ? user_read_cache($thread['lastuid']) : array(); $thread['lastusername'] = $thread['lastuid'] ? $lastuser['username'] : lang('guest'); $thread['lastuser'] = $thread['lastuid'] ? user_safe_info($lastuser) : array(); unset($lastuser); } $thread['url'] = url('read-' . $thread['tid'], '', FALSE); if ($conf['url_rewrite_on'] > 1) { !empty($forum['well_alias']) and $thread['url'] = url(urlencode($forum['well_alias'])."-$thread[create_date]a$thread[tid]", '', FALSE); } else { $thread['url'] = url("read-$thread[create_date]a$thread[tid]", '', FALSE); } $thread['user_url'] = url('user-' . $thread['uid']); $thread['sticky_class'] = ''; if ($thread['sticky'] > 0) { if (1 == $thread['sticky']) { $thread['sticky_class'] = 'success'; } elseif (2 == $thread['sticky']) { $thread['sticky_class'] = 'warning'; } elseif (3 == $thread['sticky']) { $thread['sticky_class'] = 'danger'; } } $nopic = view_path() . 'img/nopic.png'; if ($thread['icon']) { $attach_dir_save_rule = array_value($conf, 'attach_dir_save_rule', 'Ym'); $day = date($attach_dir_save_rule, $thread['icon']); $thread_format_icon_default = 1; if (1 == $thread_format_icon_default) { // 本地文件绝对路径 $destfile = $conf['upload_path'] . 'thumbnail/' . $day . '/' . $thread['uid'] . '_' . $thread['tid'] . '_' . $thread['icon'] . '.jpeg'; // 本地 $thread['icon_fmt'] = is_file($destfile) ? file_path($thread['attach_on']) . 'thumbnail/' . $day . '/' . $thread['uid'] . '_' . $thread['tid'] . '_' . $thread['icon'] . '.jpeg' : $nopic; if (1 == $conf['attach_on'] && 1 == $thread['attach_on']) { // 云储存 $thread['icon_fmt'] = file_path($thread['attach_on']) . 'thumbnail/' . $day . '/' . $thread['uid'] . '_' . $thread['tid'] . '_' . $thread['icon'] . '.jpeg'; } elseif (2 == $conf['attach_on'] && 2 == $thread['attach_on']) { // 图床 未上传成功 本地图片在的话使用本地,不在则默认 $thread['icon_fmt'] = $thread['image_url'] ? $thread['image_url'] : $thread['icon_fmt']; } } } else { $thread['icon_fmt'] = $nopic; } // 回复页面 $thread['pages'] = ceil($thread['posts'] / $conf['comment_pagesize']); $thread['tag_fmt'] = $thread['tag'] ? xn_json_decode($thread['tag']) : ''; // 权限判断 $thread['allowupdate'] = ($uid == $thread['uid']) || forum_access_mod($thread['fid'], $gid, 'allowupdate'); $thread['allowdelete'] = (group_access($gid, 'allowuserdelete') and $uid == $thread['uid']) || forum_access_mod($thread['fid'], $gid, 'allowdelete'); $thread['allowtop'] = forum_access_mod($thread['fid'], $gid, 'allowtop'); $thread = well_thread_safe_info($thread); } function well_thread_format_last_date(&$thread) { if ($thread['last_date'] != $thread['create_date']) { $thread['last_date_fmt'] = humandate($thread['last_date']); } else { $thread['create_date_fmt'] = humandate($thread['create_date']); } } // 对 $threadlist 权限过滤 function well_thread_list_access_filter(&$threadlist, $gid) { global $forumlist; if (empty($threadlist)) return NULL; foreach ($threadlist as $tid => $thread) { if (empty($forumlist[$thread['fid']]['accesson'])) continue; if ($thread['sticky'] > 0) continue; if (!forum_access_user($thread['fid'], $gid, 'allowread')) { unset($threadlist[$tid]); } } } function well_thread_safe_info($thread) { unset($thread['userip'], $thread['user']['threads'], $thread['user']['posts'], $thread['user']['credits'], $thread['user']['golds'], $thread['user']['money']); empty($thread['user']) || $thread['user'] = user_safe_info($thread['user']); return $thread; } // 过滤安全数据 function well_thread_filter(&$val) { unset($val['userip'], $val['fid'], $val['flagid'], $val['type'], $val['user'], $val['create_date']); } //------------------------ 其他方法 ------------------------ // 集合主题tid,统一拉取,避免多次查询thread表 function thread_unified_pull($arr) { global $gid, $fid; $stickylist = array_value($arr, 'stickylist', array()); $tidlist = array_value($arr, 'tidlist', array()); //$fid = array_value($arr, 'fid'); // 合并过滤空数组 //$tidlist = array_filter($stickylist + $tidlist); $tidarrlist = $tidlist = $stickylist + $tidlist; // 版块自定义 list($flaglist, $flagtids) = flag_thread_by_fid($fid); empty($flagtids) || $tidarrlist += $flagtids; unset($flagtids); // 在这之前合并所有二维数组 tid值为键/array('tid值' => tid值) $tidarr = empty($tidarrlist) ? array() : arrlist_values($tidarrlist, 'tid'); // 在这之前使用array_merge()前合并所有一维数组 tid/array(1,2,3) if (empty($tidarr)) return NULL; $tidarr = array_unique($tidarr); // 主题相关统一遍历后再归类 $arrlist = well_thread_find($tidarr, count($tidarr)); // 过滤没有权限访问的主题 / filter no permission thread well_thread_list_access_filter($arrlist, $gid); $threadlist = array(); foreach ($arrlist as $_tid => &$_thread) { $_thread = well_thread_safe_info($_thread); // 归类列表数据 isset($tidlist[$_thread['tid']]) and $threadlist[$_tid] = $_thread; // flag thread if (!empty($flaglist)) { foreach ($flaglist as $key => $val) { if (isset($val['tids']) && in_array($_thread['tid'], $val['tids'])) { $flaglist[$key]['list'][array_search($_thread['tid'], $val['tids'])] = $_thread; ksort($flaglist[$key]['list']); } } } } // 按之前tidlist排序 $threadlist = array2_sort_key($threadlist, $tidlist, 'tid'); unset($arrlist, $tidlist); $arr = array('threadlist' => $threadlist, 'flaglist' => $flaglist); return $arr; } // read.php 详情页其他主题调用,集合tid统一拉取数据,最后归类 function thread_other_pull($thread) { global $forumlist, $gid; $fid = array_value($thread, 'fid'); $forum = array_value($forumlist, $fid); if (empty($forum)) return NULL; //$tid = array_value($thread, 'tid'); //$tag_fmt = array_value($thread, 'tag_fmt'); $arrlist = array(); $tidlist = array(); // 版块自定义 list($flaglist, $flagtids) = flag_thread_by_fid($fid); empty($flagtids) || $tidlist += $flagtids; unset($flagtids); // 在这之前合并所有二维数组 tid值为键/array('tid值' => tid值) $tidarr = empty($tidlist) ? array() : arrlist_values($tidlist, 'tid'); // 在这之前使用array_merge()前合并所有一维数组 tid/array(1,2,3) if (empty($tidarr)) return NULL; $tidarr = array_unique($tidarr); // 主题相关统一遍历后再归类 $threadlist = well_thread_find($tidarr, count($tidarr)); // 过滤没有权限访问的主题 / filter no permission thread well_thread_list_access_filter($threadlist, $gid); foreach ($threadlist as &$_thread) { $_thread = well_thread_safe_info($_thread); // flag thread if (!empty($flaglist)) { foreach ($flaglist as $key => $val) { if (isset($val['tids']) && in_array($_thread['tid'], $val['tids'])) { $flaglist[$key]['list'][array_search($_thread['tid'], $val['tids'])] = $_thread; ksort($flaglist[$key]['list']); } } } } unset($threadlist); if (!empty($flaglist)) { foreach ($flaglist as &$val) { $i = 0; if (!isset($val['list'])) continue; foreach ($val['list'] as &$v) { ++$i; $v['i'] = $i; } } $arrlist['flaglist'] = $flaglist; unset($flaglist); } return $arrlist; } //--------------------------cache-------------------------- // 已格式化 从缓存中读取,避免重复从数据库取数据 function well_thread_read_cache($tid) { global $conf; $key = 'website_thread_' . $tid; static $cache = array(); // 用静态变量只能在当前 request 生命周期缓存,跨进程需要再加一层缓存:redis/memcached/xcache/apc if (isset($cache[$key])) return $cache[$key]; if ('mysql' == $conf['cache']['type']) { $r = well_thread_read($tid); } else { $r = cache_get($key); if (NULL === $r) { $r = well_thread_read($tid); $r and cache_set($key, $r, 1800); } } $cache[$key] = $r ? $r : NULL; return $cache[$key]; } ?>什么是Apache Spark
最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

什么是Apache Spark

网站源码admin3浏览0评论

什么是Apache Spark

Apache Spark 是一个闪电般的开源数据处理引擎,用于机器学习和人工智能应用程序,由最大的大数据开源社区提供支持。

什么是Apache Spark?

Apache Spark(Spark)是一个用于大型数据集的开源数据处理引擎。它旨在提供大数据所需的计算速度、可扩展性和可编程性,特别是流数据、图形数据、机器学习和人工智能 (AI) 应用程序。

Spark 的分析引擎处理数据的速度比其他产品快 10 到 100 倍。它通过在大型计算机群集之间分配处理工作来扩展,并具有内置的并行性和容错功能。它甚至包括数据分析师和数据科学家中流行的编程语言API,包括Scala,Java,Python和R。

在大数据行业,大家经常拿Spark与Apache Hadoop进行比较,特别是Hadoop的原生数据处理组件MapReduce。Spark和MapReduce之间的主要区别在于,Spark处理数据并将其保存在内存中以供后续步骤使用,而无需写入或读取磁盘,从而大大加快了处理速度。

Spark于2009年在加州大学伯克利分校开发。今天,它由Apache软件基金会维护,并拥有最大的大数据开源社区,拥有超过1000名贡献者。它还被列为几个商业大数据产品的核心组件。

Apache Spark 的工作原理

Apache Spark采用分层的主/从架构。Spark 驱动程序是控制集群管理器的主节点,它管理工作器(从属)节点并将数据结果传送到应用程序客户端。以下是Apache Spark架构

基于应用程序代码,Spark Driver 生成SparkContext,它与集群管理器(Spark 的独立集群管理器或其他集群管理器(如 Hadoop YARN、Kubernetes 或 Mesos))一起工作,并执行跨节点分发和监控。它还创建了弹性分布式数据集(RDD),这是Spark卓越处理速度的关键所在。

弹性分布式数据集 (RDD)

弹性分布式数据集 (RDD) 是元素的容错集合,可以分布在群集中的多个节点之间并并行处理。RDD是Apache Spark中的基本结构。

Spark 通过引用数据源或使用 SparkContext 并行化方法将现有集合并行化到 RDD 中进行处理。当数据加载到 RDD 后,Spark 会对内存中的 RDD 执行转换和操作,这是 Spark 速度的关键。Spark 还会将数据存储在内存中,除非系统内存不足或用户决定将数据写入磁盘以实现持久化目的。

RDD 中的每个数据集都分为逻辑分区,这些逻辑分区可以在集群的不同节点上进行计算。而且,用户可以执行两种类型的RDD操作:转换和操作。转换是应用于创建新 RDD 的操作。操作用于指示 Apache Spark 应用计算并将结果传递回驱动程序。

Spark支持RDD上的各种操作和转换。此分发由 Spark 完成,因此用户不必担心计算正确的分发。

有向无环图 (DAG)

与MapReduce中的两阶段执行过程相反,Spark创建了一个有向无环图(DAG)来调度任务和跨集群的工作节点编排。当 Spark 在任务执行过程中执行和转换数据时,DAG 计划程序通过编排群集中的工作器节点来提高效率。并对任务进行跟踪,以使容错成为可能,因为它将记录的操作并重新应用于先前状态中的数据。

数据帧和数据集

除了RDD之外,Spark还处理另外两种数据类型:DataFrames和Datasets。

数据帧是最常见的结构化应用程序编程接口 (API),表示包含行和列的数据表。尽管RDD一直是Spark的关键功能,但它现在处于维护模式。由于Spark的机器学习库(MLlib)的普及,DataFrames作为MLlib的主要API占据了主导地位。在使用MLlib API时,这一点很重要,因为DataFrames提供了不同语言(如Scala,Java,Python和R)的一致性。

数据集是数据帧的扩展,提供类型安全、面向对象的编程接口。默认情况下,数据集是强类型 JVM 对象的集合,与数据帧不同。

Spark SQL允许从DataFrames和SQL数据存储(如Apache Hive)查询数据。Spark SQL 查询在另一种语言中运行时返回数据帧或数据集。

Spark核心

Spark Core 是所有并行数据处理的基础,可处理调度、优化、RDD 和数据抽象。Spark Core 为 Spark 库、Spark SQL、Spark Streaming、MLlib 机器学习库和 GraphX 图形数据处理提供了功能基础。Spark 核心和集群管理器在 Spark 集群中分发数据并将其抽象化。这种分布和抽象使得处理大数据非常快速和用户友好。

Spark接口

Spark 包括各种应用程序编程接口 (API),可将 Spark 的强大功能带给最广泛的受众。Spark SQL允许以关系方式与RDD数据进行交互。Spark还有一个记录良好的API,用于Scala,Java,Python和R。Spark 中的每种语言 API 在处理数据的方式上都有其特定的细微差别。RDD、数据帧和数据集在每种语言 API 中都可用。通过针对如此多语言的 API,Spark 使具有开发、数据科学和统计背景的更多不同人群可以访问大数据处理。

Apache Spark 和机器学习

Spark 具有各种各样的库,可将功能扩展到机器学习、人工智能 (AI) 和流处理。

Apache Spark MLlib

Apache Spark的关键功能之一是Spark MLlib中可用的机器学习功能。Apache Spark MLlib 提供了一个开箱即用的解决方案,用于执行分类和回归、协同过滤、聚类、分布式线性代数、决策树、随机森林、梯度提升树、频繁模式挖掘、评估指标和统计。MLlib的功能与Spark可以处理的各种数据类型相结合,使Apache Spark成为不可或缺的大数据工具。

Spark GraphX

除了具有API功能外,Spark还具有Spark GraphX,这是Spark的新功能,旨在解决图形问题。GraphX 是一种图抽象,它扩展了图和图并行计算的 RDD。Spark GraphX与存储互连信息或连接信息网络的图形数据库集成,例如社交网络。

Spark Streaming

Spark Streaming 是核心 Spark API 的扩展,可实现实时数据流的可扩展容错处理。当Spark Streaming处理数据时,它可以将数据传送到文件系统,数据库和实时仪表板,以便使用Spark的机器学习和图形处理算法进行实时流分析。Spark Streaming 基于 Spark SQL 引擎构建,还允许增量批处理,从而更快地处理流数据。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2014-09-18,如有侵权请联系 cloudcommunity@tencent 删除机器学习apachespark集群数据

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论