Saturday, July 23, 2016

MQTT with SSL Connection Example

SSL can be configured for a Paho MQTT client by passing a Properties object to "setSSLProperties" in MqttConnectOptions.
import java.util.Properties;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class TestCon {
 public static void main(String[] args) {
  try {
   MqttClient client = new MqttClient("ssl://broker_ip:8883", "testclient");
   MqttConnectOptions connectOptions = new MqttConnectOptions();
   connectOptions.setUserName("username");
   connectOptions.setPassword("password".toCharArray());
   Properties props = new Properties();
   props.setProperty("com.ibm.ssl.keyStore", "jksFilePath.jks");
   props.setProperty("com.ibm.ssl.keyStorePassword","jksPassword");
   connectOptions.setSSLProperties(props);
   client.connect(connectOptions);   
   MqttMessage  message = new MqttMessage();
   message.setPayload("test".getBytes());
   client.publish("topic", message);
   client.disconnect();   
  } catch (Exception e) {
   e.printStackTrace();
  }
  
 }
}
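
The example above only sets a key store. If the broker's certificate is not signed by a CA that the JVM already trusts, a trust store is usually needed as well. Below is a minimal sketch, not a definitive setup: `SslProps` and `build` are hypothetical helper names, and the file names and passwords are placeholders. It uses only `java.util.Properties`, so the result can be passed to `setSSLProperties` exactly as in the example above.

```java
import java.util.Properties;

class SslProps {
    // Hypothetical helper: collects the Paho SSL settings in one place.
    // Null arguments are skipped, so the JVM defaults apply for them.
    static Properties build(String keyStore, String keyStorePassword,
                            String trustStore, String trustStorePassword) {
        Properties props = new Properties();
        putIfPresent(props, "com.ibm.ssl.keyStore", keyStore);
        putIfPresent(props, "com.ibm.ssl.keyStorePassword", keyStorePassword);
        putIfPresent(props, "com.ibm.ssl.trustStore", trustStore);
        putIfPresent(props, "com.ibm.ssl.trustStorePassword", trustStorePassword);
        return props;
    }

    private static void putIfPresent(Properties props, String key, String value) {
        if (value != null) {
            props.setProperty(key, value);
        }
    }

    public static void main(String[] args) {
        // Placeholder paths and passwords; replace with real values.
        Properties props = build("client.jks", "clientPassword",
                                 "truststore.jks", "trustPassword");
        // With Paho on the classpath: connectOptions.setSSLProperties(props);
        System.out.println(props.stringPropertyNames().size() + " SSL properties set");
    }
}
```

The key store holds the client's own certificate (for brokers that require client authentication), while the trust store holds the certificates used to verify the broker.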

Sunday, February 28, 2016

Apache Spark with large data sets - Possible problems and solutions

| Issue | Workaround |
| --- | --- |
| Driver OOM while using reduceByKey | decrease spark.default.parallelism |
| Java killed by OOM killer on a slave node | change formula in /root/spark-ec2/deploy_templates.py on master node: spark_mb = system_ram_mb * 4 // 5 |
| SPARK-4808 Spark fails to spill with small number of large objects | update Spark to 1.4.1 |
| SPARK-5077 Map output statuses can still exceed spark.akka.frameSize | set("spark.akka.frameSize", "128") |
| SPARK-6246 spark-ec2 can't handle clusters with > 100 nodes | apply the patch for deploy script |
| SPARK-6497 Class is not registered: scala.reflect.ManifestFactory$$anon$9 | don’t force Kryo class registration |
| HADOOP-6254 s3n fails with SocketTimeoutException | use S3A file system in Hadoop 2.7.1 |
| S3 HEAD request failed for ... - ResponseCode=403, ResponseMessage=Forbidden | same as above |
| gzip input files are not splittable. (We need to save storage space by archiving datasets. The default choice, gzip, doesn’t allow random read access. This is not a bug, but it greatly increases the probability of failing with other issues and degrades performance.) | use bzip2 compression for input files |
| HADOOP-7823 port HADOOP-4012 to branch-1 (splitting support for bzip2) | update to Hadoop 2.7.1 |
| HADOOP-10614 CBZip2InputStream is not threadsafe | same as above |
| HADOOP-10400 Incorporate new S3A FileSystem implementation | same as above |
| HADOOP-11571 Über-jira: S3a stabilisation phase I | same as above |
| SPARK-6668 repeated asking to remove non-existent executor | install Hadoop 2.7.1 native libraries |
| SPARK-5348 s3a:// protocol and hadoop-aws dependency | build Spark with patch |
| Stack Overflow - How to access s3a:// files from Apache Spark? | --conf spark.hadoop.fs.s3a.access.key=... --conf spark.hadoop.fs.s3a.secret.key=... |
| HADOOP-9565 Add a Blobstore interface to add to blobstore FileSystems | use DirectOutputCommitter |
| Timeout waiting for connection from pool | conf.setInt("fs.s3a.connection.maximum", 100) |

Courtesy : http://tech.grammarly.com/blog/posts/Petabyte-Scale-Text-Processing-with-Spark.html#sthash.GsFi4n0G.dpuf
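
To make the memory workaround concrete: the formula `spark_mb = system_ram_mb * 4 // 5` gives Spark four fifths of the machine's RAM, rounded down by integer division. Here is a small worked sketch in Java; `SparkMemory` and `sparkMb` are hypothetical names used only for illustration, and the 30720 MB figure is an example value, not one taken from the original post.

```java
class SparkMemory {
    // Java equivalent of the Python expression: system_ram_mb * 4 // 5
    static long sparkMb(long systemRamMb) {
        return Math.floorDiv(systemRamMb * 4, 5L);
    }

    public static void main(String[] args) {
        long systemRamMb = 30720; // example: a node with 30 GB of RAM
        long sparkMb = sparkMb(systemRamMb);
        // 30720 * 4 = 122880, and 122880 / 5 = 24576
        System.out.println("Spark: " + sparkMb + " MB, left for the OS: "
                + (systemRamMb - sparkMb) + " MB");
    }
}
```

The remaining fifth stays available to the operating system and other processes, which is what keeps the OOM killer away from the Java process.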

Friday, February 26, 2016

Spark Join and number of partitions

The number of partitions plays an important role in how Apache Spark processes an RDD.
We may need to work out in advance how many partitions to expect after a chain of RDD operations.
We can reduce the number of partitions with coalesce, or change it in either direction with repartition.

But what happens when we join two RDDs? Since the inputs are two different RDDs, how many partitions will the result have?

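The answer is:

For joins that run through Spark SQL (DataFrames), the number is set by `spark.sql.shuffle.partitions`. You can set it to customize the result; the default value is 200. A join on plain RDDs does not read this property: when neither input has a partitioner, it uses `spark.default.parallelism` if that is set, and otherwise the larger partition count of the two inputs.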


| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations. |



http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
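
As a rough illustration of how the partition count is chosen for a join on plain RDDs (as opposed to Spark SQL joins, which use `spark.sql.shuffle.partitions`), the sketch below models the default-partitioner rule in plain Java. It is a simplified model and not Spark's own code: it assumes neither input RDD already has a partitioner, and `JoinPartitions` and `resultPartitions` are hypothetical names.

```java
import java.util.OptionalInt;

class JoinPartitions {
    // Simplified model of the rule for RDD joins without an existing partitioner:
    // use spark.default.parallelism when it is set, else the larger input count.
    static int resultPartitions(int leftPartitions, int rightPartitions,
                                OptionalInt defaultParallelism) {
        if (defaultParallelism.isPresent()) {
            return defaultParallelism.getAsInt();
        }
        return Math.max(leftPartitions, rightPartitions);
    }

    public static void main(String[] args) {
        // spark.default.parallelism not set: the larger input wins.
        System.out.println(resultPartitions(8, 20, OptionalInt.empty()));
        // spark.default.parallelism set to 50: that value is used.
        System.out.println(resultPartitions(8, 20, OptionalInt.of(50)));
    }
}
```

To pin the count regardless of these defaults, the RDD `join` method also accepts an explicit number of partitions as a second argument.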

Wednesday, February 24, 2016

World's first open source memory-centric distributed storage system, Alluxio (formerly known as Tachyon), released v1.0 on Feb 23, 2016



Alluxio is the world's first open source memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster jobs, possibly written in different computation frameworks, such as Apache Spark, Apache MapReduce, and Apache Flink. In the big data ecosystem, Alluxio lies between computation frameworks or jobs, such as Apache Spark, Apache MapReduce, or Apache Flink, and various kinds of storage systems, such as Amazon S3, OpenStack Swift, GlusterFS, HDFS, Ceph, or OSS.



"Today, we are very excited to announce the 1.0 release of Alluxio, the world’s first memory-centric virtual distributed storage system, which unifies data access and bridges computation frameworks and underlying storage systems. Applications only need to connect with Alluxio to access data stored in any underlying storage systems. Additionally, Alluxio’s memory-centric architecture enables data access orders of magnitude faster than existing solutions.
Now, organizations can run any computation framework (e.g. Apache Spark, Apache MapReduce, Apache Flink, etc.) with any storage system (e.g. Alibaba OSS, Amazon S3, OpenStack Swift, GlusterFS, Ceph, etc.), leveraging any storage media (e.g. DRAM, SSD, HDD, etc.)." Haoyuan Li, CEO of Alluxio, wrote this on his blog.


This graph shows how many people are involved in this project compared with other open source projects.


Get ready to tackle big data with Alluxio at "tachyon" speed. :)

http://www.alluxio.org/


I have already written a couple of blog posts about Tachyon:

http://devslogics.blogspot.com/2015/12/tachyon-cluster-configuration-setup.html
http://devslogics.blogspot.in/2015/12/tachyon-was-not-formatted.html


Monday, January 11, 2016

Failed to mount cgroups hierarchy at '/sys/fs/cgroup/freezer': 'freezer' is already attached to another hierarchy - MESOS

Error Log

While trying to start a slave in a Mesos cluster, the following error may occur.

I0107 12:13:11.110400 18698 slave.cpp:585] Slave terminating
I0112 10:42:21.503870  9978 main.cpp:185] Build: 2015-10-12 20:59:28  
I0112 10:42:21.504222  9978 main.cpp:187] Version: 0.25.0
I0112 10:42:21.504257  9978 main.cpp:190] Git tag: 0.25.0
I0112 10:42:21.504278  9978 main.cpp:194] Git SHA: 2dd7f7ee115fe00b8e098b0a10762a4fa8f4600f
I0112 10:42:21.508028  9978 containerizer.cpp:143] Using isolation: posix/cpu,posix/mem,filesystem/posix
Failed to create a containerizer: Could not create MesosContainerizer: Failed to create launcher: Failed to create Linux launcher: Failed to mount cgroups hierarchy at '/sys/fs/cgroup/freezer': Failed to create directory '/sys/fs/cgroup/freezer': No such file or directory

Solution

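Add --launcher=posix to the slave start command, so that the slave does not try to create the Linux launcher, which needs the cgroups freezer hierarchy.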


That is, change the start command from

mesos-slave --master=MASTER_IP:5050

to

mesos-slave --master=MASTER_IP:5050 --launcher=posix





Courtesy : Mailing list