From 26616791c5ea340b07ab5fbeaf3d2922812d7d9f Mon Sep 17 00:00:00 2001 From: Chunk Date: Mon, 27 Apr 2015 18:07:39 +0800 Subject: [PATCH] RDD-hbase bug fixed.(with 'repartition()') --- mspark/SC.py | 7 ++++--- test/test_data.py | 7 +++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/mspark/SC.py b/mspark/SC.py index 03ab128..5d7e5b9 100644 --- a/mspark/SC.py +++ b/mspark/SC.py @@ -247,9 +247,10 @@ def format_out(row, cols, withdata=False): puts.append((key, [key] + col + [str(data)])) return puts + # scconf = SparkConf() # scconf.setSparkHome("HPC-server") \ -# .setMaster("spark://HPC-server:7077") \ +# .setMaster("spark://HPC-server:7077") \ # .setAppName("example") # sc = SparkContext(conf=scconf) # @@ -342,7 +343,7 @@ class Sparker(object): self.model = None - def read_hbase(self, table_name, func=None, collect=False): + def read_hbase(self, table_name, func=None, collect=False, parallelism=40): """ ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data @@ -372,7 +373,7 @@ class Sparker(object): if collect: return hbase_rdd.collect() else: - return hbase_rdd + return hbase_rdd.repartition(parallelism) def write_hbase(self, table_name, data, fromrdd=False, columns=None, withdata=False): """ diff --git a/test/test_data.py b/test/test_data.py index 54180d9..7635f4f 100755 --- a/test/test_data.py +++ b/test/test_data.py @@ -93,7 +93,7 @@ def test_ILSVRC_S_LOCAL(): timer.report() -def test_ILSVRC_S_SPARK(category='Train_200'): +def test_ILSVRC_S_SPARK(category='Train_1000'): timer = Timer() timer.mark() @@ -102,7 +102,7 @@ def test_ILSVRC_S_SPARK(category='Train_200'): dil.format() dil.store_img() timer.report() - return + # return dils = ILSVRC_S.DataILSVRC_S(base='ILSVRC2013_DET_val', category=category) @@ -110,6 +110,9 @@ def test_ILSVRC_S_SPARK(category='Train_200'): dils._extract_data(mode='spark', writeback=False) timer.report() + # print dils.rdd_data.count() # pass + # return + timer.mark() dils._embed_data(mode='spark', rate=0.2, readforward=False, writeback=False) timer.report() -- libgit2 0.21.2