Beam Interpreter

Original link: http://zeppelin.apache.org/docs/0.7.2/interpreter/beam.html

Translation link: http://www.apache.wiki/pages/viewpage.action?pageId=10030766

Contributor: 片刻 (ApacheCN / Apache中文网)

Overview

Apache Beam is an open source, unified platform for data processing pipelines. A pipeline can be built using one of the Beam SDKs, and its execution is handled by one of Beam's runners. Currently, Beam supports the Apache Flink Runner, the Apache Spark Runner, and the Google Dataflow Runner.

How to use

Basically, you write normal Beam Java code, in which you decide which Runner to use. You should put the code in a class with a main method, because the interpreter invokes that main method to execute the pipeline. Unlike Zeppelin's normal pattern, each paragraph is treated as a separate job and has no relationship to any other paragraph. A minimal skeleton of such a paragraph is sketched below.
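The following is only a structural sketch of a %beam paragraph, not part of the original example: the class name is hypothetical, the Create input is a placeholder, and the commented-out setRunner line marks where the Runner would be chosen (otherwise the default runner is used).

%beam
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.runners.flink.FlinkRunner;

public class PipelineSkeleton { // hypothetical class name
  // The interpreter invokes this main method to execute the pipeline.
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // options.setRunner(FlinkRunner.class); // decide the Runner here
    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("Hello", "Beam")); // placeholder input
    p.run();
  }
}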

Below is a word count example in which the data is represented as an in-memory string array. It can read data from a file instead by replacing Create.of(SENTENCES).withCoder(StringUtf8Coder.of()) with TextIO.Read.from("path/to/filename.txt") (see the sketch after the example).

%beam

// most used imports
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.spark.*;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.options.PipelineOptions;

public class MinimalWordCount {
  // Collects the formatted results so they can be printed after the run.
  static List<String> s = new ArrayList<>();

  static final String[] SENTENCES_ARRAY = new String[] {
    "Hadoop is the Elephant King!",
    "A yellow and elegant thing.",
    "He never forgets",
    "Useful data, or lets",
    "An extraneous element cling!",
    "A wonderful king is Hadoop.",
    "The elephant plays well with Sqoop.",
    "But what helps him to thrive",
    "Are Impala, and Hive,",
    "And HDFS in the group.",
    "Hadoop is an elegant fellow.",
    "An elephant gentle and mellow.",
    "He never gets mad,",
    "Or does anything bad,",
    "Because, at his core, he is yellow",
  };
  static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.create().as(Options.class);
    options.setRunner(FlinkRunner.class); // run this pipeline on the Flink runner
    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
     // split every sentence into words, dropping empty tokens
     .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) {
          for (String word : c.element().split("[^a-zA-Z']+")) {
            if (!word.isEmpty()) {
              c.output(word);
            }
          }
        }
      }))
     // count the occurrences of each distinct word
     .apply(Count.<String>perElement())
     // format each (word, count) pair as a tab-separated line
     .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
        @Override
        public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
            throws Exception {
          s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
        }
      }));
    p.run();
    // print the results using Zeppelin's %table display system
    System.out.println("%table word\tcount");
    for (int i = 0; i < s.size(); i++) {
      System.out.print(s.get(i));
    }
  }
}
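As mentioned above, swapping the source transform is all it takes to read the input from a file rather than the in-memory array. The following is only a sketch of that variant: the input path comes from the note above, the output path is a hypothetical placeholder, and the ExtractWords / Count / FormatResults transforms from the full example would go where the comment indicates.

%beam
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.examples.WordCount.Options;

public class FileWordCount {
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.create().as(Options.class);
    options.setRunner(FlinkRunner.class);
    Pipeline p = Pipeline.create(options);
    // Read one String element per line of the input file, instead of Create.of(SENTENCES).
    p.apply(TextIO.Read.from("path/to/filename.txt"))
     // ... apply the same ExtractWords / Count / FormatResults transforms here ...
     .apply(TextIO.Write.to("path/to/output")); // hypothetical output location
    p.run();
  }
}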